mirror of
https://github.com/OpenBMB/MiniCPM-V.git
synced 2026-02-05 10:19:18 +08:00
190 lines
7.2 KiB
Python
190 lines
7.2 KiB
Python
import re
|
|
import math
|
|
from typing import List
|
|
|
|
from vlmeval.dataset.utils.judge_util import build_judge
|
|
from vlmeval.smp import *
|
|
from .image_base import ImageBaseDataset
|
|
from .mmlongbench import concat_images, MMLongBench_auxeval, anls_compute
|
|
|
|
|
|
FAIL_MSG = 'Failed to obtain answer via API.'
|
|
|
|
|
|
def get_f1(gt, pred):
|
|
gt_bow, pred_bow = gt.strip().split(), pred.strip().split()
|
|
if not gt_bow or not pred_bow:
|
|
return 0.0
|
|
|
|
recall = len([pred_e for pred_e in pred_bow if pred_e in gt_bow]) / len(gt_bow)
|
|
precision = len([pred_e for pred_e in pred_bow if pred_e in gt_bow]) / len(pred_bow)
|
|
f1 = 2 * recall * precision / (recall + precision) if (recall + precision) > 1e-4 else 0.0
|
|
return f1
|
|
|
|
|
|
def SlideVQA_acc(result_file):
|
|
data = load(result_file)
|
|
anls_list, em_list, f1_list = list(), list(), list()
|
|
for i in range(len(data)):
|
|
item = data.iloc[i]
|
|
if isinstance(item['answer'], float) and math.isnan(item['answer']):
|
|
item['answer'] = 'Not answerable'
|
|
|
|
item['answer'] = re.sub('\n', '', item['answer']).lower()
|
|
item['pred'] = str(item['pred']).lower()
|
|
anls_score = anls_compute(item['answer'], item['pred'])
|
|
em_score = (item['answer'].strip() == item['pred'].strip())
|
|
f1_score = get_f1(item['answer'], item['pred'])
|
|
anls_list.append(anls_score)
|
|
em_list.append(em_score)
|
|
f1_list.append(f1_score)
|
|
print('---------------------')
|
|
print(item['answer'], item['pred'], anls_score, em_score, f1_score)
|
|
|
|
data['anls'] = anls_list
|
|
data['em'] = em_list
|
|
data['f1'] = f1_list
|
|
dump(data, result_file)
|
|
|
|
res = dict()
|
|
res['category'], res['num'] = ['anls', 'EM', 'F1'], [len(data), len(data), len(data)]
|
|
res['avg'] = [sum(anls_list) / len(data), sum(em_list) / len(data), sum(f1_list) / len(data)]
|
|
res = pd.DataFrame(res)
|
|
return res
|
|
|
|
|
|
class SlideVQA(ImageBaseDataset):
|
|
|
|
TYPE = 'VQA'
|
|
|
|
DATASET_URL = {
|
|
'SLIDEVQA_MINI': 'https://opencompass.openxlab.space/utils/VLMEval/SLIDEVQA_MINI.tsv',
|
|
'SLIDEVQA': 'https://opencompass.openxlab.space/utils/VLMEval/SLIDEVQA.tsv',
|
|
}
|
|
DATASET_MD5 = {
|
|
'SLIDEVQA_MINI': '6d9a8d8814fa5b7669deb2af3a3208eb',
|
|
'SLIDEVQA': '5e822c2f800e94c1e23badfd478326b6',
|
|
}
|
|
|
|
SUPPORTED_MODELS = {
|
|
'GPT4': (1, 1),
|
|
'GPT4V': (1, 1),
|
|
'GPT4V_HIGH': (1, 1),
|
|
'GPT4o': (1, 1),
|
|
'GPT4o_HIGH': (1, 1),
|
|
'GPT4o_MINI': (1, 1),
|
|
'XComposer2d5': (1, -1),
|
|
'XComposer2_4KHD': (1, -1),
|
|
'MiniCPM-Llama3-V-2_5': (1, 5),
|
|
'InternVL-Chat-V1-5': (5, 2),
|
|
}
|
|
|
|
def __init__(self, dataset, **kwargs):
|
|
self.model_list = list(self.SUPPORTED_MODELS.keys())
|
|
model_name = kwargs['model']
|
|
if not listinstr(self.model_list, model_name):
|
|
raise AssertionError("{} doesn't support the evaluation on SlideVQA.".format(model_name))
|
|
super(SlideVQA, self).__init__(dataset)
|
|
|
|
self.is_api = True if listinstr(['GPT4'], model_name) else False
|
|
self.max_pages = 120
|
|
concat_num, column_num = self.SUPPORTED_MODELS.get(model_name)
|
|
self.concat_num = concat_num
|
|
self.column_num = column_num
|
|
|
|
def dump_image(self, origin_line):
|
|
os.makedirs(self.img_root, exist_ok=True)
|
|
|
|
line = origin_line.copy()
|
|
if not isinstance(line['image_path'], List):
|
|
line['image_path'] = [line['image_path']]
|
|
line['image_path'] = line['image_path'][:self.max_pages]
|
|
|
|
if 'image' in line:
|
|
if isinstance(line['image'], list):
|
|
tgt_path = []
|
|
assert 'image_path' in line
|
|
for img, im_name in zip(line['image'], line['image_path']):
|
|
path = osp.join(self.img_root, im_name)
|
|
if not read_ok(path):
|
|
decode_base64_to_image_file(img, path)
|
|
tgt_path.append(path)
|
|
else:
|
|
tgt_path = osp.join(self.img_root, f"{line['index']}.jpg")
|
|
if not read_ok(tgt_path):
|
|
decode_base64_to_image_file(line['image'], tgt_path)
|
|
tgt_path = [tgt_path]
|
|
else:
|
|
assert 'image_path' in line
|
|
tgt_path = toliststr(line['image_path'])
|
|
|
|
if self.concat_num > 0 and not self.is_api:
|
|
concatenated_images = concat_images(tgt_path, max_concat=self.concat_num, column_num=self.column_num)
|
|
|
|
old_tgt_path = tgt_path
|
|
assert isinstance(old_tgt_path, list)
|
|
if self.column_num != -1:
|
|
tgt_path = [
|
|
'_'.join(old_tgt_path[0].split('_')[:-1]) + '_concat{}_{}.jpg'.format(self.concat_num, i)
|
|
for i in range(len(concatenated_images))
|
|
]
|
|
else:
|
|
tgt_path = ['_'.join(old_tgt_path[0].split('_')[:-1]) + '_concat_all.jpg']
|
|
|
|
for path, concatenated_image in zip(tgt_path, concatenated_images):
|
|
if not read_ok(path):
|
|
decode_base64_to_image_file(encode_image_to_base64(concatenated_image), path)
|
|
num_images, image_size = len(old_tgt_path), concatenated_image.size
|
|
print('concat {} images to a new one with size {}. save at {}'.format(num_images, image_size, path))
|
|
return tgt_path
|
|
|
|
@classmethod
|
|
def evaluate(self, eval_file, **judge_kwargs):
|
|
logger = get_logger('Evaluation')
|
|
model = judge_kwargs['model']
|
|
|
|
suffix = eval_file.split('.')[-1]
|
|
storage = eval_file.replace(f'.{suffix}', f'_{model}.xlsx')
|
|
tmp_file = eval_file.replace(f'.{suffix}', f'_{model}.pkl')
|
|
|
|
if osp.exists(storage):
|
|
logger.warning(f'GPT scoring file {storage} already exists, will reuse it in SlideVQA_eval. ')
|
|
else:
|
|
data = load(eval_file)
|
|
model = build_judge(max_tokens=128, **judge_kwargs)
|
|
lt = len(data)
|
|
lines = [data.iloc[i] for i in range(lt)]
|
|
tups = [(model, line) for line in lines]
|
|
indices = [line['index'] for line in lines]
|
|
|
|
ans = {}
|
|
if osp.exists(tmp_file):
|
|
ans = load(tmp_file)
|
|
tups = [x for x, i in zip(tups, indices) if i not in ans]
|
|
indices = [i for i in indices if i not in ans]
|
|
|
|
if len(indices):
|
|
new_results = list()
|
|
for model, line in tqdm(tups):
|
|
res = MMLongBench_auxeval(model, line)
|
|
new_results.append(res)
|
|
|
|
log_map, res_map, pred_map = {}, {}, {}
|
|
all_inds = [line['index'] for line in lines]
|
|
for k, v in zip(all_inds, new_results):
|
|
log_map[k] = v['log']
|
|
res_map[k] = v['res']
|
|
pred_map[k] = v['pred']
|
|
data['res'] = [res_map[idx] for idx in data['index']]
|
|
data['log'] = [log_map[idx] for idx in data['index']]
|
|
data['pred'] = [pred_map[idx] for idx in data['index']]
|
|
dump(data, storage)
|
|
|
|
score = SlideVQA_acc(storage)
|
|
score_pth = storage.replace('.xlsx', '_score.csv')
|
|
|
|
dump(score, score_pth)
|
|
logger.info(f'SlideVQA successfully finished evaluating {eval_file}, results saved in {score_pth}')
|
|
logger.info('Score: ')
|
|
logger.info(score)
|