mirror of
https://github.com/OpenBMB/MiniCPM-V.git
synced 2026-02-05 02:09:20 +08:00
437 lines
14 KiB
Python
437 lines
14 KiB
Python
import itertools
|
|
import json
|
|
import os
|
|
import re
|
|
from collections import namedtuple
|
|
|
|
import torch
|
|
from tqdm import tqdm
|
|
|
|
|
|
class InferenceSampler(torch.utils.data.sampler.Sampler):
    """Sampler that hands each distributed rank one contiguous slice of indices.

    The rank and world size are read from ``torch.distributed`` when the
    sampler is constructed, and the slice ``range(begin, end)`` owned by this
    rank is computed once and reused for iteration and ``len()``.
    """

    def __init__(self, size):
        self._size = int(size)
        assert size > 0
        self._rank = torch.distributed.get_rank()
        self._world_size = torch.distributed.get_world_size()
        self._local_indices = self._get_local_indices(
            size, self._world_size, self._rank
        )

    @staticmethod
    def _get_local_indices(total_size, world_size, rank):
        """Return the ``range`` of dataset indices assigned to ``rank``.

        ``total_size`` items are split into ``world_size`` consecutive shards;
        the first ``total_size % world_size`` shards get one extra item.
        """
        per_rank, remainder = divmod(total_size, world_size)
        sizes = [
            per_rank + 1 if r < remainder else per_rank
            for r in range(world_size)
        ]

        start = sum(sizes[:rank])
        stop = min(sum(sizes[:rank + 1]), total_size)
        return range(start, stop)

    def __iter__(self):
        # Yield this rank's indices in ascending order.
        for index in self._local_indices:
            yield index

    def __len__(self):
        # Number of indices owned by this rank.
        return len(self._local_indices)
|
|
|
|
def collate_fn_vqa(batches):
    """Turn a list of per-sample mappings into six parallel lists.

    Every sample must provide ``image_path``, ``question`` and ``gt_answers``.
    ``ocr_tokens``, ``question_id`` and ``question_type`` are optional; a
    sample without one of them contributes ``None`` in that position.

    Returns:
        A tuple ``(image_paths, questions, gt_answers, ocr_tokens,
        question_ids, question_type)``.
    """
    def optional(sample, key):
        # Missing optional fields become None so all lists stay aligned.
        return sample[key] if key in sample else None

    image_paths = []
    questions = []
    gt_answers = []
    for sample in batches:
        image_paths.append(sample['image_path'])
    for sample in batches:
        questions.append(sample['question'])
    for sample in batches:
        gt_answers.append(sample['gt_answers'])

    ocr_tokens = [optional(sample, 'ocr_tokens') for sample in batches]
    question_ids = [optional(sample, 'question_id') for sample in batches]
    question_type = [optional(sample, 'question_type') for sample in batches]

    return image_paths, questions, gt_answers, ocr_tokens, question_ids, question_type
|
|
|
|
def has_word(sentence, word):
    """Return True if ``word`` occurs in ``sentence`` as a whole token.

    ``word`` is matched literally (it is regex-escaped). A ``\\b`` word
    boundary is required on a side only when the character of ``word`` on
    that side is alphanumeric, so ``"cat"`` does not match inside
    ``"concatenate"`` while a token such as ``"$5"`` can still be found
    after a space.

    An empty ``word`` returns False: there is no first/last character to
    inspect (the previous code raised IndexError), and an empty string is
    not treated as a word.
    """
    if not word:
        return False

    prefix = r"\b" if word[0].isalnum() else r""
    suffix = r"\b" if word[-1].isalnum() else r""

    return re.search(prefix + re.escape(word) + suffix, sentence) is not None
|
|
|
|
def remove_special_chars(s):
    """Return ``s`` with every character removed that is not an ASCII
    letter, an ASCII digit, or whitespace."""
    disallowed = re.compile(r"[^a-zA-Z0-9\s]")
    return disallowed.sub("", s)
|
|
|
|
def levenshtein_distance(s1, s2):
    """Return the Levenshtein (edit) distance between two sequences.

    Uses the two-row dynamic-programming formulation; the shorter input is
    used for the row dimension so the rows have ``min(len) + 1`` entries.
    """
    shorter, longer = (s2, s1) if len(s1) > len(s2) else (s1, s2)

    # Row for the empty prefix of `longer`: distance equals prefix length.
    previous_row = list(range(len(shorter) + 1))

    for row_number, long_char in enumerate(longer, start=1):
        current_row = [row_number]
        for col, short_char in enumerate(shorter):
            if short_char == long_char:
                # Matching characters: carry the diagonal cost unchanged.
                cost = previous_row[col]
            else:
                # Cheapest of substitution, deletion, insertion, plus one.
                cost = 1 + min(
                    previous_row[col], previous_row[col + 1], current_row[-1]
                )
            current_row.append(cost)
        previous_row = current_row

    return previous_row[-1]
|
|
|
|
class VQAEval:
    """Answer normalisation and scoring helpers for VQA-style benchmarks.

    Provides text clean-up (punctuation stripping, number-word mapping,
    article removal, contraction repair) and two metrics:
    ``evaluate_vqa_human`` (leave-one-out agreement with the ground-truth
    answers) and ``evaluate_anls`` (thresholded normalised Levenshtein
    similarity).
    """

    def __init__(self):
        # Apostrophe-less (or misplaced-apostrophe) spellings -> canonical
        # contraction.  Lookups happen on lower-cased words (see
        # processDigitArticle), so keys containing capitals never match.
        self.contractions = {
            "aint": "ain't",
            "arent": "aren't",
            "cant": "can't",
            "couldve": "could've",
            "couldnt": "couldn't",
            "couldn'tve": "couldn't've",
            "couldnt've": "couldn't've",
            "didnt": "didn't",
            "doesnt": "doesn't",
            "dont": "don't",
            "hadnt": "hadn't",
            "hadnt've": "hadn't've",
            "hadn'tve": "hadn't've",
            "hasnt": "hasn't",
            "havent": "haven't",
            "hed": "he'd",
            "hed've": "he'd've",
            "he'dve": "he'd've",
            "hes": "he's",
            "howd": "how'd",
            "howll": "how'll",
            "hows": "how's",
            "Id've": "I'd've",
            "I'dve": "I'd've",
            "Im": "I'm",
            "Ive": "I've",
            "isnt": "isn't",
            "itd": "it'd",
            "itd've": "it'd've",
            "it'dve": "it'd've",
            "itll": "it'll",
            "let's": "let's",
            "maam": "ma'am",
            "mightnt": "mightn't",
            "mightnt've": "mightn't've",
            "mightn'tve": "mightn't've",
            "mightve": "might've",
            "mustnt": "mustn't",
            "mustve": "must've",
            "neednt": "needn't",
            "notve": "not've",
            "oclock": "o'clock",
            "oughtnt": "oughtn't",
            "ow's'at": "'ow's'at",
            "'ows'at": "'ow's'at",
            "'ow'sat": "'ow's'at",
            "shant": "shan't",
            "shed've": "she'd've",
            "she'dve": "she'd've",
            "she's": "she's",
            "shouldve": "should've",
            "shouldnt": "shouldn't",
            "shouldnt've": "shouldn't've",
            "shouldn'tve": "shouldn't've",
            "somebody'd": "somebodyd",
            "somebodyd've": "somebody'd've",
            "somebody'dve": "somebody'd've",
            "somebodyll": "somebody'll",
            "somebodys": "somebody's",
            "someoned": "someone'd",
            "someoned've": "someone'd've",
            "someone'dve": "someone'd've",
            "someonell": "someone'll",
            "someones": "someone's",
            "somethingd": "something'd",
            "somethingd've": "something'd've",
            "something'dve": "something'd've",
            "somethingll": "something'll",
            "thats": "that's",
            "thered": "there'd",
            "thered've": "there'd've",
            "there'dve": "there'd've",
            "therere": "there're",
            "theres": "there's",
            "theyd": "they'd",
            "theyd've": "they'd've",
            "they'dve": "they'd've",
            "theyll": "they'll",
            "theyre": "they're",
            "theyve": "they've",
            "twas": "'twas",
            "wasnt": "wasn't",
            "wed've": "we'd've",
            "we'dve": "we'd've",
            "weve": "we've",
            "werent": "weren't",
            "whatll": "what'll",
            "whatre": "what're",
            "whats": "what's",
            "whatve": "what've",
            "whens": "when's",
            "whered": "where'd",
            "wheres": "where's",
            "whereve": "where've",
            "whod": "who'd",
            "whod've": "who'd've",
            "who'dve": "who'd've",
            "wholl": "who'll",
            "whos": "who's",
            "whove": "who've",
            "whyll": "why'll",
            "whyre": "why're",
            "whys": "why's",
            "wont": "won't",
            "wouldve": "would've",
            "wouldnt": "wouldn't",
            "wouldnt've": "wouldn't've",
            "wouldn'tve": "wouldn't've",
            "yall": "y'all",
            "yall'll": "y'all'll",
            "y'allll": "y'all'll",
            "yall'd've": "y'all'd've",
            "y'alld've": "y'all'd've",
            "y'all'dve": "y'all'd've",
            "youd": "you'd",
            "youd've": "you'd've",
            "you'dve": "you'd've",
            "youll": "you'll",
            "youre": "you're",
            "youve": "you've",
        }
        # Number words (and "none") -> digit strings.
        self.manualMap = {
            "none": "0",
            "zero": "0",
            "one": "1",
            "two": "2",
            "three": "3",
            "four": "4",
            "five": "5",
            "six": "6",
            "seven": "7",
            "eight": "8",
            "nine": "9",
            "ten": "10",
        }
        # Words dropped entirely during normalisation.
        self.articles = ["a", "an", "the"]

        # NOTE(review): "(?!<=\d)" is a negative look-AHEAD for the literal
        # text "<=" + digit, which can never match at a "." and is therefore
        # a no-op; the pattern effectively means "a period not followed by a
        # digit".  It looks like a typo for the look-behind "(?<!\d)", but
        # the pattern text is kept unchanged on purpose: correcting it would
        # change scores (e.g. "3." would no longer normalise to "3").
        # Presumably inherited from a reference VQA evaluation script --
        # confirm before altering.
        self.periodStrip = re.compile(r"(?!<=\d)(\.)(?!\d)")
        # A comma sitting between two digits, e.g. "1,000".
        self.commaStrip = re.compile(r"(\d)(\,)(\d)")
        # Punctuation marks handled by processPunctuation.
        self.punct = [
            ";",
            r"/",
            "[",
            "]",
            '"',
            "{",
            "}",
            "(",
            ")",
            "=",
            "+",
            "\\",
            "_",
            "-",
            ">",
            "<",
            "@",
            "`",
            ",",
            "?",
            "!",
        ]

    def clean_text(self, text):
        """Normalise ``text``: flatten newlines/tabs to spaces, strip the
        ends, then apply processPunctuation and processDigitArticle."""
        text = text.replace("\n", " ").replace("\t", " ").strip()
        text = self.processPunctuation(text)
        text = self.processDigitArticle(text)
        return text

    def evaluate_vqa_human(self, answer, gt_answers):
        """Leave-one-out agreement accuracy (TextVQA, VQAv2, OKVQA, vizwiz).

        For each ground-truth answer in turn, that answer is held out and
        the score is ``min(1, matches / 3)`` where ``matches`` is the number
        of remaining normalised ground-truth answers equal to the normalised
        ``answer``.  The result is the mean of those scores, or 0 when
        ``gt_answers`` is empty.
        """
        answer = self.clean_text(answer)
        # Ground-truth answers get punctuation/digit/article normalisation
        # only; unlike `answer`, their whitespace is not flattened or
        # stripped first.
        gt_answers = [
            self.processDigitArticle(self.processPunctuation(ans))
            for ans in gt_answers
        ]

        gtAcc = []
        for idx in range(len(gt_answers)):
            others = gt_answers[:idx] + gt_answers[idx + 1:]
            matches = sum(1 for other in others if other == answer)
            gtAcc.append(min(1, float(matches) / 3))

        return float(sum(gtAcc)) / len(gtAcc) if gtAcc else 0

    def evaluate_anls(self, answer, gt_answers, threshold=0.5):
        """Thresholded normalised Levenshtein similarity (DocVQA,
        InfographicsVQA, STVQA).

        ``answer`` and every ground-truth answer are lower-cased and have
        whitespace runs collapsed to single spaces.  The similarity to the
        closest ground-truth answer is ``1 - distance / max(len)``; values
        below ``threshold`` are reported as 0.

        Args:
            answer: predicted answer string.
            gt_answers: a list of reference strings, or a single reference
                (any non-list value is wrapped in a list).
            threshold: minimum similarity that still earns credit.

        Returns:
            The similarity score, or 0 when it is below ``threshold`` or
            when there are no ground-truth answers to compare against
            (previously an empty list raised ValueError from ``min``).
        """
        answer = ' '.join(answer.strip().lower().split())
        if not isinstance(gt_answers, list):
            gt_answers = [gt_answers]
        if not gt_answers:
            return 0
        gt_answers = [' '.join(gt.strip().lower().split()) for gt in gt_answers]

        values = []
        for gt_answer in gt_answers:
            dist = levenshtein_distance(answer, gt_answer)
            length = max(len(answer), len(gt_answer))
            # Two empty strings are identical: normalised distance 0.
            values.append(0.0 if length == 0 else float(dist) / float(length))

        score = 1 - min(values)
        return 0 if score < threshold else score

    def processPunctuation(self, inText):
        """Strip punctuation from ``inText``.

        Each mark in ``self.punct`` is deleted outright when it appears next
        to a space in the input, or when the input contains a digit-comma-
        digit group; otherwise it is replaced by a space.  Finally every
        period matched by ``self.periodStrip`` is removed.
        """
        # The comma test depends only on the input, so evaluate it once
        # instead of once per punctuation mark.
        has_digit_comma = self.commaStrip.search(inText) is not None

        outText = inText
        for p in self.punct:
            # Adjacency is tested against the ORIGINAL text, while the
            # replacement is applied to the progressively cleaned text.
            if has_digit_comma or (p + " ") in inText or (" " + p) in inText:
                outText = outText.replace(p, "")
            else:
                outText = outText.replace(p, " ")

        # The old call passed re.UNICODE as the third positional argument,
        # which Pattern.sub interprets as `count` (re.UNICODE == 32), so at
        # most 32 periods were removed.  Flags cannot be supplied to a
        # compiled pattern's sub(); remove every match instead.
        return self.periodStrip.sub("", outText)

    def processDigitArticle(self, inText):
        """Lower-case ``inText``, map number words to digits, drop articles
        and repair known contractions; words are re-joined with single
        spaces."""
        outText = []
        for word in inText.lower().split():
            # .get() rather than .setdefault(): the latter inserted every
            # unseen word into manualMap, growing the table on each call.
            word = self.manualMap.get(word, word)
            if word not in self.articles:
                outText.append(word)

        outText = [self.contractions.get(word, word) for word in outText]
        return " ".join(outText)
|
|
|
|
|
|
def evaluate_dataset(dataset_name, answer_file_path, model_name, method = None):
    """Score a predictions file and write a per-item annotated copy.

    Loads the JSON predictions at ``answer_file_path`` (each item must have
    ``answer`` and ``gt_answers``), scores every item, stores the score under
    the item's ``accuracy`` key, prints ``<dataset_name>:<average>`` and
    writes the annotated predictions next to the input file as
    ``<stem>_<model_name>_<method><ext>``.

    Args:
        dataset_name: selects the metric -- ``"textVQA"`` uses
            ``VQAEval.evaluate_vqa_human``, ``"docVQA"`` uses
            ``VQAEval.evaluate_anls``; any other name falls back to the
            evaluator's ``evaluate_has`` method.
        answer_file_path: path of the JSON predictions file.
        model_name: used only to name the output file.
        method: used only to name the output file.

    Returns:
        The mean per-item score, or 0.0 when the file holds no predictions
        (previously a ZeroDivisionError).

    Raises:
        AttributeError: if the fallback metric is needed but the evaluator
            does not provide ``evaluate_has``.
    """
    with open(answer_file_path, 'r', encoding='utf-8') as f:
        predictions = json.load(f)

    evaluator = VQAEval()

    # Resolve the metric once, up front, so an unsupported dataset fails
    # before any scoring work instead of on the first item.
    if dataset_name in ["textVQA"]:
        banner = "evaluating vqa..."
        score_fn = evaluator.evaluate_vqa_human
    elif dataset_name in ['docVQA']:
        banner = "evaluating anls..."
        score_fn = evaluator.evaluate_anls
    else:
        banner = None
        score_fn = getattr(evaluator, 'evaluate_has', None)
        if score_fn is None:
            raise AttributeError(
                f"No metric available for dataset {dataset_name!r}: "
                f"{type(evaluator).__name__} does not define 'evaluate_has'"
            )

    # The banner is announced once, and only when there is something to score.
    if banner is not None and predictions:
        print(banner)

    total_accuracy = 0
    num = 0
    for item in predictions:
        accuracy = score_fn(item['answer'], item['gt_answers'])
        item['accuracy'] = accuracy
        total_accuracy += accuracy
        num += 1

    average_accuracy = total_accuracy / num if num else 0.0
    print(f'{dataset_name}:{average_accuracy}')

    # Insert the suffix before the extension only; str.replace('.json', ...)
    # would also rewrite a '.json' occurring earlier in the path.
    root, ext = os.path.splitext(answer_file_path)
    answer_model_method_path = f'{root}_{model_name}_{method}{ext}'
    with open(answer_model_method_path, "w", encoding='utf-8') as f:
        json.dump(predictions, f, indent=4, ensure_ascii=False)

    return average_accuracy
|
|
|
|
|
|
def evaluate_VQA(
    model,
    dataset,
    model_name,
    dataset_name,
    time,
    batch_size=1,
    generate_method="interleave",
    answer_path='./answers',
):
    """Run ``model`` over ``dataset``, save its answers and score them.

    Batches are produced by a DataLoader using ``collate_fn_vqa``; when
    ``torch.distributed`` is initialised each rank processes its own shard
    (via ``InferenceSampler``) and the per-rank predictions are gathered on
    every rank before rank 0 writes and scores them.

    Args:
        model: object exposing ``generate`` and/or
            ``generate_with_interleaved``, called with ``images``,
            ``questions`` and ``datasetname`` keyword arguments.
        dataset: dataset whose items are accepted by ``collate_fn_vqa``.
        model_name: recorded in every prediction and used in the output path.
        dataset_name: forwarded to the model and used as the output file stem.
        time: sub-directory name under ``answer_path/model_name``.
        batch_size: DataLoader batch size.
        generate_method: ``"old"`` -> ``model.generate``;
            ``"interleave"`` -> ``model.generate_with_interleaved``.
        answer_path: root directory for answer files.

    Returns:
        ``None`` on non-zero ranks of a distributed run; ``-1.0`` for the
        ``"docVQATest"`` dataset (answers are saved but not scored);
        otherwise the value returned by ``evaluate_dataset``.

    Raises:
        ValueError: if ``generate_method`` is not one of the supported names.
    """
    # Validate the generation mode before doing any work, rather than on the
    # first batch.
    if generate_method == "old":
        generate = model.generate
    elif generate_method == "interleave":
        generate = model.generate_with_interleaved
    else:
        raise ValueError(f"Wrong generate paradigm {generate_method}!")

    distributed = torch.distributed.is_initialized()
    sampler = InferenceSampler(len(dataset)) if distributed else None

    dataloader = torch.utils.data.DataLoader(
        dataset=dataset,
        batch_size=batch_size,
        sampler=sampler,
        collate_fn=collate_fn_vqa
    )

    answer_dir = os.path.join(answer_path, model_name, time)
    os.makedirs(answer_dir, exist_ok=True)

    predictions = []

    for batch in tqdm(dataloader, desc="Running inference"):
        # ocr_tokens are collated but not consumed here.
        image_paths, questions, gt_answers, ocr_tokens_list, question_ids, question_type = batch

        with torch.no_grad():
            outputs = generate(images=image_paths, questions=questions, datasetname=dataset_name)

        for i, output in enumerate(outputs):
            predictions.append({
                'question_id': question_ids[i],
                'question': questions[i],
                'answer': output,
                'gt_answers': gt_answers[i],
                'image_path': image_paths[i],
                'model_name': model_name,
                'question_type': question_type[i]
            })

    if distributed:
        torch.distributed.barrier()
        # Collect every rank's predictions (in rank order) on all ranks.
        world_size = torch.distributed.get_world_size()
        merged_predictions = [None for _ in range(world_size)]
        torch.distributed.all_gather_object(merged_predictions, predictions)
        predictions = list(itertools.chain.from_iterable(merged_predictions))

        # Only rank 0 writes the answer file and computes the score.
        if torch.distributed.get_rank() != 0:
            return None

    answer_file_path = os.path.join(answer_dir, f"{dataset_name}.json")
    print(f"answer_file_path:{answer_file_path}")

    with open(answer_file_path, "w", encoding='utf-8') as f:
        json.dump(predictions, f, indent=4, ensure_ascii=False)

    # The docVQATest answers are saved but not scored here.
    if dataset_name in ["docVQATest"]:
        return -1.0

    return evaluate_dataset(answer_file_path=answer_file_path, dataset_name=dataset_name, model_name=model_name)
|