Modify eval_mm for MiniCPM-V 2.6
eval_mm/vlmevalkit/vlmeval/vlm/__init__.py
@@ -3,5 +3,4 @@ import torch
 torch.set_grad_enabled(False)
 torch.manual_seed(1234)
 from .base import BaseModel
-from .minicpm_llama3_v_2_5 import MiniCPM_Llama3_V
-from .minicpm_v import MiniCPM_V
+from .minicpm_v import MiniCPM_V, MiniCPM_Llama3_V, MiniCPM_V_2_6
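After this change all three wrappers live in the single minicpm_v module. A minimal usage sketch (the import path and the generate() wrapper are assumptions based on the BaseModel interface in the hunks below):

from vlmeval.vlm import MiniCPM_V_2_6  # assumed package path

model = MiniCPM_V_2_6(model_path='openbmb/MiniCPM-V-2_6')
message = [dict(type='image', value='demo.jpg'),
           dict(type='text', value='Describe this image.')]
print(model.generate(message))  # generate() validates the message, then calls generate_inner()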
eval_mm/vlmevalkit/vlmeval/vlm/base.py
@@ -1,12 +1,15 @@
 from ..smp import *
-from ..utils.dataset_config import img_root_map
+from ..dataset import img_root_map
 from abc import abstractmethod


 class BaseModel:

     INTERLEAVE = False
-    allowed_types = ['text', 'image']
+    allowed_types = ['text', 'image', 'video']

     def __init__(self):
         self.dump_image_func = None

     def use_custom_prompt(self, dataset):
         """Whether to use custom prompt for the given dataset.
@@ -33,34 +36,11 @@ class BaseModel:
         """
         raise NotImplementedError

+    def set_dump_image(self, dump_image_func):
+        self.dump_image_func = dump_image_func
+
     def dump_image(self, line, dataset):
-        """Dump the image(s) of the input line to the corresponding dataset folder.
-
-        Args:
-            line (line of pd.DataFrame): The raw input line.
-            dataset (str): The name of the dataset.
-
-        Returns:
-            str | list[str]: The paths of the dumped images.
-        """
-        ROOT = LMUDataRoot()
-        assert isinstance(dataset, str)
-        img_root = osp.join(ROOT, 'images', img_root_map[dataset] if dataset in img_root_map else dataset)
-        os.makedirs(img_root, exist_ok=True)
-        if isinstance(line['image'], list):
-            tgt_path = []
-            assert 'image_path' in line
-            for img, im_name in zip(line['image'], line['image_path']):
-                path = osp.join(img_root, im_name)
-                if not read_ok(path):
-                    decode_base64_to_image_file(img, path)
-                tgt_path.append(path)
-        else:
-            tgt_path = osp.join(img_root, f"{line['index']}.jpg")
-            if not read_ok(tgt_path):
-                decode_base64_to_image_file(line['image'], tgt_path)
-            tgt_path = [tgt_path]
-        return tgt_path
+        return self.dump_image_func(line)

     @abstractmethod
     def generate_inner(self, message, dataset=None):
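The hunk above turns dump_image into a thin delegator: the base64-decoding logic leaves BaseModel, and the evaluation pipeline injects a dataset-specific dumper instead. A sketch of the intended wiring, with a hypothetical dataset wrapper and factory (names assumed, not from this diff):

model = MiniCPM_V_2_6()
dataset = build_dataset('MMBench_DEV_EN')   # hypothetical factory from the pipeline
model.set_dump_image(dataset.dump_image)    # inject the dataset's own image dumper
tgt_path = model.dump_image(line, 'MMBench_DEV_EN')  # now simply dump_image_func(line)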
@@ -134,7 +114,25 @@ class BaseModel:
         assert item['type'] in self.allowed_types, f'Invalid input type: {item["type"]}'
         return self.generate_inner(message, dataset)

-    def message_to_promptimg(self, message):
+    def chat(self, messages, dataset=None):
+        """The main function for multi-turn chatting. Will call `chat_inner` with the preprocessed input messages."""
+        assert hasattr(self, 'chat_inner'), 'The API model should have the `chat_inner` method.'
+        for msg in messages:
+            assert isinstance(msg, dict) and 'role' in msg and 'content' in msg, msg
+            assert self.check_content(msg['content']) in ['str', 'dict', 'liststr', 'listdict'], msg
+            msg['content'] = self.preproc_content(msg['content'])
+
+        while len(messages):
+            try:
+                return self.chat_inner(messages, dataset=dataset)
+            except:
+                messages = messages[1:]
+                while len(messages) and messages[0]['role'] != 'user':
+                    messages = messages[1:]
+                continue
+        return 'Chat Mode: Failed with all possible conversation turns.'
+
+    def message_to_promptimg(self, message, dataset=None):
         assert not self.INTERLEAVE
         model_name = self.__class__.__name__
         warnings.warn(
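A sketch of the multi-turn input that the new chat() accepts, matching the assertions above: each message is a dict with 'role' and 'content', where content is a plain string or a list of typed parts. If chat_inner raises, the loop drops the oldest turns until the history again starts with a 'user' message, then retries:

model = MiniCPM_Llama3_V()  # defines chat_inner, see the hunk further below
messages = [
    {'role': 'user', 'content': [dict(type='image', value='cat.jpg'),
                                 dict(type='text', value='What animal is this?')]},
    {'role': 'assistant', 'content': 'A cat.'},
    {'role': 'user', 'content': 'What color is its fur?'},
]
reply = model.chat(messages, dataset=None)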
@@ -146,5 +144,24 @@ class BaseModel:
             image = None
         else:
             prompt = '\n'.join([x['value'] for x in message if x['type'] == 'text'])
-            image = [x['value'] for x in message if x['type'] == 'image'][0]
+            images = [x['value'] for x in message if x['type'] == 'image']
+            if 'BLINK' == dataset:
+                image = concat_images_vlmeval(images, target_size=512)
+            else:
+                image = images[0]
         return prompt, image
+
+    def message_to_promptvideo(self, message):
+        if self.VIDEO_LLM:
+            num_videos = len([x for x in message if x['type'] == 'video'])
+            if num_videos == 0:
+                prompt = '\n'.join([x['value'] for x in message if x['type'] == 'text'])
+                video = None
+            else:
+                prompt = '\n'.join([x['value'] for x in message if x['type'] == 'text'])
+                video = [x['value'] for x in message if x['type'] == 'video'][0]
+            return prompt, video
+        else:
+            import sys
+            warnings.warn('Model does not support video input.')
+            sys.exit(-1)
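For non-interleaved models, message_to_promptimg flattens a typed message into one prompt string plus a single image; the change above keeps all images for BLINK by tiling them onto one canvas via concat_images_vlmeval. A small example (MiniCPM_V is used here because its generate_inner, shown later, calls this method):

model = MiniCPM_V()
message = [dict(type='text', value='Which photo is sharper?'),
           dict(type='image', value='a.jpg'),
           dict(type='image', value='b.jpg')]
prompt, image = model.message_to_promptimg(message, dataset='BLINK')
# prompt == 'Which photo is sharper?'
# image is one 512px composite of a.jpg and b.jpg instead of a.jpg alone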
eval_mm/vlmevalkit/vlmeval/vlm/minicpm_llama3_v_2_5.py
@@ -1,155 +0,0 @@
-import torch
-from PIL import Image
-from transformers import AutoModel, AutoTokenizer
-
-from ..smp import *
-from ..utils import DATASET_TYPE
-from .base import BaseModel
-
-
-class MiniCPM_Llama3_V(BaseModel):
-    INSTALL_REQ = False
-    INTERLEAVE = True
-
-    def __init__(self, model_path='openbmb/MiniCPM-V', **kwargs):
-        assert model_path is not None
-        self.model_path = model_path
-        self.ckpt = model_path
-
-        print(f'load from {self.model_path}')
-        self.model = AutoModel.from_pretrained(self.model_path, trust_remote_code=True)
-        if '.pt' in model_path:
-            print(f'load from {model_path}')
-            self.state_dict = torch.load(self.ckpt, map_location='cpu')
-            self.model.load_state_dict(self.state_dict, strict=False)
-        self.model = self.model.to(dtype=torch.float16)
-        self.model.eval().cuda()
-        self.kwargs = kwargs
-        self.tokenizer = AutoTokenizer.from_pretrained(self.model_path, trust_remote_code=True)
-        torch.cuda.empty_cache()
-        self.num_beams = 1 if self.model_path == 'openbmb/MiniCPM-V' else 3
-        self.options_system_prompt = ('Carefully read the following question and select the letter corresponding '
-                                      'to the correct answer. Highlight the applicable choices without giving '
-                                      'explanations.')
-        self.wo_options_system_prompt = 'Carefully read the following question Answer the question directly.'
-        self.detail_system_prompt = 'Answer this question in detail.'
-        self.vqa_prompt = 'Answer the question using a single word or phrase.'
-
-    def use_custom_prompt(self, dataset):
-        if listinstr(['multi-choice', 'VQA'], DATASET_TYPE(dataset)):
-            return True
-        elif dataset is not None and listinstr(['HallusionBench'], dataset):
-            return True
-        return False
-
-    def build_prompt(self, line, dataset=None):
-        if dataset is None:
-            dataset = self.dataset
-
-        if isinstance(line, int):
-            line = self.data.iloc[line]
-
-        tgt_path = self.dump_image(line, dataset)
-        system_prompt = ''
-
-        question = line['question']
-        if DATASET_TYPE(dataset) == 'multi-choice':
-            options = {
-                cand: line[cand]
-                for cand in string.ascii_uppercase
-                if cand in line and not pd.isna(line[cand])
-            }
-            options_prompt = 'Options:\n'
-            for key, item in options.items():
-                options_prompt += f'{key}. {item}\n'
-            hint = line['hint'] if ('hint' in line and not pd.isna(line['hint'])) else None
-            prompt = ''
-            if hint is not None:
-                prompt += f'Hint: {hint}\n'
-            prompt += f'Question: {question}\n'
-            if len(options):
-                prompt += options_prompt
-                system_prompt = self.options_system_prompt + "\nPlease just indicate your choice."
-            else:
-                system_prompt = self.wo_options_system_prompt
-
-            if 'MMMU' in dataset:  # Corner Case
-                prompt = system_prompt + '\n' + prompt
-                system_prompt = ''
-        elif dataset is not None and listinstr(['HallusionBench'], dataset):
-            question = line['question'] + " Yes or No?"
-            prompt = question
-
-        elif dataset is not None and listinstr(['OCRBench'], dataset):
-            system_prompt = self.vqa_prompt
-            question = line['question']
-            prompt = question
-        elif DATASET_TYPE(dataset) == 'VQA':
-            if listinstr(['LLaVABench'], dataset):
-                system_prompt = ""
-                prompt = question
-            elif listinstr(['MMVet'], dataset):
-                system_prompt = self.detail_system_prompt
-                prompt = question
-            else:
-                system_prompt = self.vqa_prompt
-                prompt = question
-
-        msgs = []
-        if system_prompt:
-            msgs.append(dict(type='text', value=system_prompt))
-        if isinstance(tgt_path, list):
-            msgs.extend([dict(type='image', value=p) for p in tgt_path])
-        else:
-            msgs = [dict(type='image', value=tgt_path)]
-        msgs.append(dict(type='text', value=prompt))
-
-        return msgs
-
-    def generate_inner(self, message, dataset=None):
-        if DATASET_TYPE(dataset) == 'multi-choice':
-            max_new_tokens = 200
-        elif DATASET_TYPE(dataset) == 'Y/N':
-            max_new_tokens = 3
-        else:
-            max_new_tokens = 1024
-
-        '''
-        nums_beams = 3
-        '''
-        default_kwargs = dict(
-            max_new_tokens=max_new_tokens,
-            sampling=False,
-            num_beams=self.num_beams,
-        )
-        default_kwargs.update(self.kwargs)
-
-        content = []
-
-        # message = [
-        #     {'type': 'text', 'value': 'sys prompt'},
-        #     {'type': 'image', 'value': '/path/to/image1.jpg'},
-        #     {'type': 'text', 'value': 'Here is an image:'},
-        # ]
-
-        for x in message:
-            if x['type'] == 'text':
-                content.append(x['value'])
-            elif x['type'] == 'image':
-                image = Image.open(x['value']).convert('RGB')
-                content.append(image)
-        msgs = [{'role': 'user', 'content': content}]
-
-        res = self.model.chat(
-            image=None,
-            msgs=msgs,
-            context=None,
-            tokenizer=self.tokenizer,
-            **default_kwargs
-        )
-
-        if isinstance(res, tuple) and len(res) > 0:
-            res = res[0]
-        # print(f"content: {content}, res: {res}")
-        return res
eval_mm/vlmevalkit/vlmeval/vlm/minicpm_v.py
@@ -1,10 +1,13 @@
 import math
 import torch
+import random
+import numpy as np
 from PIL import Image
 from transformers import AutoModel, AutoTokenizer

 from .base import BaseModel
 from ..smp import *
-from ..utils import DATASET_TYPE
+from ..dataset import DATASET_TYPE


 class MiniCPM_V(BaseModel):
@@ -59,10 +62,10 @@ class MiniCPM_V(BaseModel):
         return message

     def generate_inner(self, message, dataset=None):
-        prompt, image_path = self.message_to_promptimg(message)
+        prompt, image_path = self.message_to_promptimg(message, dataset=dataset)
         image = Image.open(image_path).convert('RGB')
         msgs = [{'role': 'user', 'content': prompt}]
-        if DATASET_TYPE(dataset) == 'multi-choice':
+        if DATASET_TYPE(dataset) == 'MCQ':
            max_new_tokens = 20
         elif DATASET_TYPE(dataset) == 'Y/N':
             max_new_tokens = 100
@@ -83,3 +86,374 @@ class MiniCPM_V(BaseModel):
             **default_kwargs
         )
         return res
+
+
+class MiniCPM_Llama3_V(BaseModel):
+
+    INSTALL_REQ = False
+    INTERLEAVE = True
+
+    def __init__(self, model_path='openbmb/MiniCPM-Llama3-V-2_5', **kwargs):
+        assert model_path is not None
+        self.model_path = model_path
+        print(f'load from {self.model_path}')
+        self.model = AutoModel.from_pretrained(self.model_path, trust_remote_code=True)
+        self.model = self.model.to(dtype=torch.float16)
+        self.model.eval().cuda()
+        self.kwargs = kwargs
+        self.tokenizer = AutoTokenizer.from_pretrained(self.model_path, trust_remote_code=True)
+        torch.cuda.empty_cache()
+        self.num_beams = 1 if self.model_path == 'openbmb/MiniCPM-V' else 3
+        self.options_system_prompt = ('Carefully read the following question and select the letter corresponding '
+                                      'to the correct answer. Highlight the applicable choices without giving '
+                                      'explanations.')
+        self.wo_options_system_prompt = 'Carefully read the following question. Answer the question directly.'
+        self.detail_system_prompt = 'Answer this question in detail.'
+        self.vqa_prompt = 'Answer the question using a single word or phrase.'
+
+    def use_custom_prompt(self, dataset):
+        if listinstr(['MCQ', 'VQA'], DATASET_TYPE(dataset)):
+            return True
+        elif dataset is not None and listinstr(['HallusionBench'], dataset):
+            return True
+        return False
+
+    def build_prompt(self, line, dataset=None):
+        if isinstance(line, int):
+            line = self.data.iloc[line]
+
+        tgt_path = self.dump_image(line, dataset)
+        system_prompt = ''
+
+        question = line['question']
+        if DATASET_TYPE(dataset) == 'MCQ':
+            options = {
+                cand: line[cand]
+                for cand in string.ascii_uppercase
+                if cand in line and not pd.isna(line[cand])
+            }
+            options_prompt = 'Options:\n'
+            for key, item in options.items():
+                options_prompt += f'{key}. {item}\n'
+            hint = line['hint'] if ('hint' in line and not pd.isna(line['hint'])) else None
+            prompt = ''
+            if hint is not None:
+                prompt += f'Hint: {hint}\n'
+            prompt += f'Question: {question}\n'
+            if len(options):
+                prompt += options_prompt
+                system_prompt = self.options_system_prompt + '\nPlease just indicate your choice.'
+            else:
+                system_prompt = self.wo_options_system_prompt
+            if 'MMMU' in dataset:  # Corner Case
+                prompt = system_prompt + '\n' + prompt
+                system_prompt = ''
+        elif dataset is not None and listinstr(['HallusionBench'], dataset):
+            question = line['question'] + ' Yes or No?'
+            prompt = question
+        elif dataset is not None and listinstr(['MME'], dataset):
+            question = line['question'] + ' Yes or No?'
+            prompt = question
+        elif dataset is not None and listinstr(['OCRBench'], dataset):
+            system_prompt = self.vqa_prompt
+            question = line['question']
+            prompt = question
+        elif DATASET_TYPE(dataset) == 'VQA':
+            if listinstr(['LLaVABench', 'MMLongBench_DOC'], dataset):
+                system_prompt = ''
+                prompt = question
+            elif listinstr(['MMVet'], dataset):
+                system_prompt = self.detail_system_prompt
+                prompt = question
+            else:
+                system_prompt = self.vqa_prompt
+                prompt = question
+
+        msgs = []
+        if system_prompt:
+            msgs.append(dict(type='text', value=system_prompt))
+        if isinstance(tgt_path, list):
+            msgs.extend([dict(type='image', value=p) for p in tgt_path])
+        else:
+            msgs = [dict(type='image', value=tgt_path)]
+        msgs.append(dict(type='text', value=prompt))
+        return msgs
+
+    def generate_inner(self, message, dataset=None):
+        if DATASET_TYPE(dataset) == 'MCQ':
+            max_new_tokens = 200
+        elif DATASET_TYPE(dataset) == 'Y/N':
+            max_new_tokens = 3
+        else:
+            max_new_tokens = 1024
+
+        default_kwargs = dict(
+            max_new_tokens=max_new_tokens,
+            sampling=False,
+            num_beams=self.num_beams,
+        )
+        default_kwargs.update(self.kwargs)
+
+        content = []
+        for x in message:
+            if x['type'] == 'text':
+                content.append(x['value'])
+            elif x['type'] == 'image':
+                image = Image.open(x['value']).convert('RGB')
+                content.append(image)
+        msgs = [{'role': 'user', 'content': content}]
+
+        res = self.model.chat(
+            msgs=msgs,
+            context=None,
+            image=None,
+            tokenizer=self.tokenizer,
+            **default_kwargs
+        )
+
+        if isinstance(res, tuple) and len(res) > 0:
+            res = res[0]
+        return res
+
+    def chat_inner(self, message, dataset=None):
+        max_new_tokens = 1024
+
+        default_kwargs = dict(
+            max_new_tokens=max_new_tokens,
+            sampling=False,
+            num_beams=self.num_beams,
+        )
+        default_kwargs.update(self.kwargs)
+
+        msgs = []
+        for msg in message:
+            content = []
+            if len(msg['content']) == 1 and msg['content'][0]['type'] == 'text':
+                msg_new = {'role': msg['role'], 'content': msg['content'][0]['value']}
+                msgs.append(msg_new)
+                continue
+
+            for x in msg['content']:
+                if x['type'] == 'text':
+                    content.append(x['value'])
+                elif x['type'] == 'image':
+                    image = Image.open(x['value']).convert('RGB')
+                    content.append(image)
+            msg_new = {'role': msg['role'], 'content': content}
+            msgs.append(msg_new)
+
+        res = self.model.chat(
+            msgs=msgs,
+            context=None,
+            image=None,
+            tokenizer=self.tokenizer,
+            **default_kwargs)
+
+        if isinstance(res, tuple) and len(res) > 0:
+            res = res[0]
+        return res
+
+
+class MiniCPM_V_2_6(BaseModel):
+    INSTALL_REQ = False
+    INTERLEAVE = True
+
+    def __init__(self, model_path='openbmb/MiniCPM-V', **kwargs):
+        random.seed(0)
+        np.random.seed(0)
+        torch.manual_seed(0)
+        torch.cuda.manual_seed_all(0)
+
+        assert model_path is not None
+        self.model_path = model_path
+        print(f'load from path {self.model_path}')
+        self.model = AutoModel.from_pretrained(self.model_path, trust_remote_code=True)
+        self.model = self.model.to(dtype=torch.bfloat16)
+        self.model.eval().cuda()
+
+        self.kwargs = kwargs
+        self.tokenizer = AutoTokenizer.from_pretrained(self.model_path, trust_remote_code=True)
+        torch.cuda.empty_cache()
+        self.num_beams = 1 if self.model_path == 'openbmb/MiniCPM-V' else 3
+
+        self.options_suffix_prompt = '''\nAnswer with the option's letter from the given choices directly.'''
+        self.wo_options_system_prompt = 'Carefully read the following question. Answer the question directly.'
+        self.detail_system_prompt = 'Answer this question in detail.'
+        self.vqa_prompt = 'Answer the question using a single word or phrase.'
+
+        self.multi_choice_cot_prompt = ('''Carefully read the following multichoice question, solve it step '''
+                                        '''by step and finally pick the option associated with the correct '''
+                                        '''answer in the format of "Answer: selected option".\n\n''')
+        self.short_ans_cot_prompt = ('''Read the following question carefully, solve it step by step, and '''
+                                     '''then output the final answer in the format of "Answer: single number '''
+                                     '''or single word or phrase".\n\n''')
+
+    def use_custom_prompt(self, dataset=None):
+        if dataset is None:
+            return False
+        if listinstr(['MCQ', 'VQA', 'Y/N'], DATASET_TYPE(dataset)):
+            return True
+        return False
+
+    def use_cot(self, dataset=None):
+        if dataset is None:
+            return False
+        if listinstr(['MMMU', 'HallusionBench', 'OCRBench', 'ChartQA'], dataset):
+            return True
+        elif listinstr(['MathVista', 'MMVet', 'MMBench', 'MMStar', 'AI2D', 'RealWorldQA',
+                        'POPE', 'ScienceQA', 'TextVQA', 'DocVQA'], dataset):
+            return False
+        else:
+            return False
+
+    def use_upsize(self, dataset=None):
+        if dataset is None:
+            return False
+        if listinstr(['MMVet', 'MMBench', 'MMStar', 'AI2D', 'OCRBench'], dataset):
+            return True
+        else:
+            return False
+
+    def build_prompt(self, line, dataset=None):
+        if isinstance(line, int):
+            line = self.data.iloc[line]
+
+        tgt_path = self.dump_image(line, dataset)
+        system_prompt, prompt = '', ''
+
+        question = line['question']
+
+        if not self.use_cot(dataset):
+            if DATASET_TYPE(dataset) == 'MCQ':
+                options = {
+                    cand: line[cand]
+                    for cand in string.ascii_uppercase
+                    if cand in line and not pd.isna(line[cand])
+                }
+                options_prompt = 'Options:\n'
+                for key, item in options.items():
+                    options_prompt += f'{key}. {item}\n'
+                hint = line['hint'] if ('hint' in line and not pd.isna(line['hint'])) else None
+                if hint is not None:
+                    prompt += f'Hint: {hint}\n'
+                prompt += f'Question: {question}\n'
+                if len(options):
+                    prompt += options_prompt
+                    prompt += self.options_suffix_prompt
+                else:
+                    system_prompt = self.wo_options_system_prompt
+
+                if 'MMMU' in dataset:
+                    if len(system_prompt) > 0:
+                        prompt = system_prompt + '\n' + prompt
+                        system_prompt = ''
+            elif dataset is not None and listinstr(['HallusionBench'], dataset):
+                question += ' Yes or No?'
+                prompt = question
+            elif dataset is not None and listinstr(['OCRBench'], dataset):
+                system_prompt = self.vqa_prompt
+                prompt = question
+            elif DATASET_TYPE(dataset) == 'VQA':
+                if listinstr(['LLaVABench'], dataset):
+                    system_prompt = ''
+                elif listinstr(['MMVet'], dataset):
+                    system_prompt = self.detail_system_prompt
+                else:
+                    system_prompt = self.vqa_prompt
+                prompt = question
+            else:
+                prompt = question
+        else:
+            has_options = True
+            if DATASET_TYPE(dataset) == 'MCQ':
+                options = {
+                    cand: line[cand]
+                    for cand in string.ascii_uppercase
+                    if cand in line and not pd.isna(line[cand])
+                }
+                options_prompt = ''
+                for key, item in options.items():
+                    options_prompt += f'{key}. {item}\n'
+                hint = line['hint'] if ('hint' in line and not pd.isna(line['hint'])) else None
+                if hint is not None:
+                    prompt += f'Hint: {hint}\n'
+                prompt += f'{question}\n'
+
+                if len(options):
+                    prompt += options_prompt
+                else:
+                    has_options = False
+
+                if 'MMMU' in dataset:
+                    if len(system_prompt) > 0:
+                        prompt = system_prompt + '\n' + prompt
+                        system_prompt = ''
+            else:
+                prompt = question
+
+            if DATASET_TYPE(dataset) in ['MCQ', 'Y/N', 'VQA']:
+                if DATASET_TYPE(dataset) == 'MCQ':
+                    if has_options:
+                        prompt = self.multi_choice_cot_prompt + prompt
+                    else:
+                        prompt = self.short_ans_cot_prompt + prompt
+                elif DATASET_TYPE(dataset) == 'Y/N':
+                    prompt = self.short_ans_cot_prompt + prompt
+                else:
+                    prompt = self.short_ans_cot_prompt + prompt
+
+        msgs = []
+        if system_prompt:
+            msgs.append(dict(type='text', value=system_prompt))
+        if isinstance(tgt_path, list):
+            msgs.extend([dict(type='image', value=p) for p in tgt_path])
+        else:
+            msgs = [dict(type='image', value=tgt_path)]
+        msgs.append(dict(type='text', value=prompt))

+        return msgs
+
+    def generate_inner(self, message, dataset=None):
+        max_new_tokens = 2048
+        default_kwargs = dict(
+            max_new_tokens=max_new_tokens,
+            sampling=False,
+            num_beams=self.num_beams,
+        )
+        default_kwargs.update(self.kwargs)
+
+        content = []
+
+        for x in message:
+            if x['type'] == 'text':
+                content.append(x['value'])
+            elif x['type'] == 'image':
+                image = Image.open(x['value']).convert('RGB')
+                if not self.use_upsize(dataset):
+                    content.append(image)
+                else:
+                    img_width, img_height = image.width, image.height
+                    if (img_width * img_height) >= (1344 * 1344):
+                        content.append(image)
+                    else:
+                        ratio = math.sqrt((1344 * 1344) / (img_width * img_height))
+                        max_img_width = int(img_width * ratio)
+                        new_img_width = random.randint(img_width, max_img_width)
+                        new_img_height = int(new_img_width / img_width * img_height)
+                        resized_image = image.resize((new_img_width, new_img_height))
+                        content.append(resized_image)
+        msgs = [{'role': 'user', 'content': content}]
+
+        res = self.model.chat(
+            image=None,
+            msgs=msgs,
+            context=None,
+            tokenizer=self.tokenizer,
+            max_inp_length=8192,
+            **default_kwargs
+        )
+
+        if isinstance(res, tuple) and len(res) > 0:
+            res = res[0]
+
+        return res
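The use_upsize branch in MiniCPM_V_2_6.generate_inner upscales any image whose area is below 1344 x 1344 pixels, picking a random new width between the original width and the width that would reach that area, with the aspect ratio preserved. A worked example of the arithmetic, using an assumed 640 x 480 input:

import math
import random

img_width, img_height = 640, 480                    # 307,200 px < 1,806,336 px
ratio = math.sqrt((1344 * 1344) / (img_width * img_height))   # ~2.42
max_img_width = int(img_width * ratio)              # 1551
new_img_width = random.randint(img_width, max_img_width)      # e.g. 1100
new_img_height = int(new_img_width / img_width * img_height)  # 825, same aspect ratio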