Add eval_mm dir

This commit is contained in:
trainfanlhy
2024-05-28 01:21:34 +08:00
parent 7e12387362
commit 65f5567a3a
49 changed files with 5610 additions and 0 deletions

View File

@@ -0,0 +1,7 @@
import torch
torch.set_grad_enabled(False)
torch.manual_seed(1234)
from .base import BaseModel
from .minicpm_llama3_v_2_5 import MiniCPM_Llama3_V
from .minicpm_v import MiniCPM_V

View File

@@ -0,0 +1,150 @@
from ..smp import *
from ..utils.dataset_config import img_root_map
from abc import abstractmethod
class BaseModel:
INTERLEAVE = False
allowed_types = ['text', 'image']
def use_custom_prompt(self, dataset):
"""Whether to use custom prompt for the given dataset.
Args:
dataset (str): The name of the dataset.
Returns:
bool: Whether to use custom prompt. If True, will call `build_prompt` of the VLM to build the prompt.
Default to False.
"""
return False
@abstractmethod
def build_prompt(self, line, dataset):
"""Build custom prompts for a specific dataset. Called only if `use_custom_prompt` returns True.
Args:
line (line of pd.DataFrame): The raw input line.
dataset (str): The name of the dataset.
Returns:
str: The built message.
"""
raise NotImplementedError
def dump_image(self, line, dataset):
"""Dump the image(s) of the input line to the corresponding dataset folder.
Args:
line (line of pd.DataFrame): The raw input line.
dataset (str): The name of the dataset.
Returns:
str | list[str]: The paths of the dumped images.
"""
ROOT = LMUDataRoot()
assert isinstance(dataset, str)
img_root = osp.join(ROOT, 'images', img_root_map[dataset] if dataset in img_root_map else dataset)
os.makedirs(img_root, exist_ok=True)
if isinstance(line['image'], list):
tgt_path = []
assert 'image_path' in line
for img, im_name in zip(line['image'], line['image_path']):
path = osp.join(img_root, im_name)
if not read_ok(path):
decode_base64_to_image_file(img, path)
tgt_path.append(path)
else:
tgt_path = osp.join(img_root, f"{line['index']}.jpg")
if not read_ok(tgt_path):
decode_base64_to_image_file(line['image'], tgt_path)
tgt_path = [tgt_path]
return tgt_path
@abstractmethod
def generate_inner(self, message, dataset=None):
raise NotImplementedError
def check_content(self, msgs):
"""Check the content type of the input. Four types are allowed: str, dict, liststr, listdict.
"""
if isinstance(msgs, str):
return 'str'
if isinstance(msgs, dict):
return 'dict'
if isinstance(msgs, list):
types = [self.check_content(m) for m in msgs]
if all(t == 'str' for t in types):
return 'liststr'
if all(t == 'dict' for t in types):
return 'listdict'
return 'unknown'
def preproc_content(self, inputs):
"""Convert the raw input messages to a list of dicts.
Args:
inputs: raw input messages.
Returns:
list(dict): The preprocessed input messages. Will return None if failed to preprocess the input.
"""
if self.check_content(inputs) == 'str':
return [dict(type='text', value=inputs)]
elif self.check_content(inputs) == 'dict':
assert 'type' in inputs and 'value' in inputs
return [inputs]
elif self.check_content(inputs) == 'liststr':
res = []
for s in inputs:
mime, pth = parse_file(s)
if mime is None or mime == 'unknown':
res.append(dict(type='text', value=s))
else:
res.append(dict(type=mime.split('/')[0], value=pth))
return res
elif self.check_content(inputs) == 'listdict':
for item in inputs:
assert 'type' in item and 'value' in item
mime, s = parse_file(item['value'])
if mime is None:
assert item['type'] == 'text'
else:
assert mime.split('/')[0] == item['type']
item['value'] = s
return inputs
else:
return None
def generate(self, message, dataset=None):
"""Generate the output message.
Args:
message (list[dict]): The input message.
dataset (str, optional): The name of the dataset. Defaults to None.
Returns:
str: The generated message.
"""
assert self.check_content(message) in ['str', 'dict', 'liststr', 'listdict'], f'Invalid input type: {message}'
message = self.preproc_content(message)
assert message is not None and self.check_content(message) == 'listdict'
for item in message:
assert item['type'] in self.allowed_types, f'Invalid input type: {item["type"]}'
return self.generate_inner(message, dataset)
def message_to_promptimg(self, message):
assert not self.INTERLEAVE
model_name = self.__class__.__name__
warnings.warn(
f'Model {model_name} does not support interleaved input. '
'Will use the first image and aggregated texts as prompt. ')
num_images = len([x for x in message if x['type'] == 'image'])
if num_images == 0:
prompt = '\n'.join([x['value'] for x in message if x['type'] == 'text'])
image = None
else:
prompt = '\n'.join([x['value'] for x in message if x['type'] == 'text'])
image = [x['value'] for x in message if x['type'] == 'image'][0]
return prompt, image

View File

@@ -0,0 +1,155 @@
import torch
from PIL import Image
from transformers import AutoModel, AutoTokenizer
from ..smp import *
from ..utils import DATASET_TYPE
from .base import BaseModel
class MiniCPM_Llama3_V(BaseModel):
INSTALL_REQ = False
INTERLEAVE = True
def __init__(self, model_path='openbmb/MiniCPM-V', **kwargs):
assert model_path is not None
self.model_path = model_path
self.ckpt = model_path
print(f'load from {self.model_path}')
self.model = AutoModel.from_pretrained(self.model_path, trust_remote_code=True)
if '.pt' in model_path:
print(f'load from {model_path}')
self.state_dict = torch.load(self.ckpt, map_location='cpu')
self.model.load_state_dict(self.state_dict, strict=False)
self.model = self.model.to(dtype=torch.float16)
self.model.eval().cuda()
self.kwargs = kwargs
self.tokenizer = AutoTokenizer.from_pretrained(self.model_path, trust_remote_code=True)
torch.cuda.empty_cache()
self.num_beams = 1 if self.model_path == 'openbmb/MiniCPM-V' else 3
self.options_system_prompt = ('Carefully read the following question and select the letter corresponding '
'to the correct answer. Highlight the applicable choices without giving '
'explanations.')
self.wo_options_system_prompt = 'Carefully read the following question Answer the question directly.'
self.detail_system_prompt = 'Answer this question in detail.'
self.vqa_prompt = 'Answer the question using a single word or phrase.'
def use_custom_prompt(self, dataset):
if listinstr(['multi-choice', 'VQA'], DATASET_TYPE(dataset)):
return True
elif dataset is not None and listinstr(['HallusionBench'], dataset):
return True
return False
def build_prompt(self, line, dataset=None):
if dataset is None:
dataset = self.dataset
if isinstance(line, int):
line = self.data.iloc[line]
tgt_path = self.dump_image(line, dataset)
system_prompt = ''
question = line['question']
if DATASET_TYPE(dataset) == 'multi-choice':
options = {
cand: line[cand]
for cand in string.ascii_uppercase
if cand in line and not pd.isna(line[cand])
}
options_prompt = 'Options:\n'
for key, item in options.items():
options_prompt += f'{key}. {item}\n'
hint = line['hint'] if ('hint' in line and not pd.isna(line['hint'])) else None
prompt = ''
if hint is not None:
prompt += f'Hint: {hint}\n'
prompt += f'Question: {question}\n'
if len(options):
prompt += options_prompt
system_prompt = self.options_system_prompt + "\nPlease just indicate your choice."
else:
system_prompt = self.wo_options_system_prompt
if 'MMMU' in dataset: # Corner Case
prompt = system_prompt + '\n' + prompt
system_prompt = ''
elif dataset is not None and listinstr(['HallusionBench'], dataset):
question = line['question'] + " Yes or No?"
prompt = question
elif dataset is not None and listinstr(['OCRBench'], dataset):
system_prompt = self.vqa_prompt
question = line['question']
prompt = question
elif DATASET_TYPE(dataset) == 'VQA':
if listinstr(['LLaVABench'], dataset):
system_prompt = ""
prompt = question
elif listinstr(['MMVet'], dataset):
system_prompt = self.detail_system_prompt
prompt = question
else:
system_prompt = self.vqa_prompt
prompt = question
msgs = []
if system_prompt:
msgs.append(dict(type='text', value=system_prompt))
if isinstance(tgt_path, list):
msgs.extend([dict(type='image', value=p) for p in tgt_path])
else:
msgs = [dict(type='image', value=tgt_path)]
msgs.append(dict(type='text', value=prompt))
return msgs
def generate_inner(self, message, dataset=None):
if DATASET_TYPE(dataset) == 'multi-choice':
max_new_tokens = 200
elif DATASET_TYPE(dataset) == 'Y/N':
max_new_tokens = 3
else:
max_new_tokens = 1024
'''
nums_beams = 3
'''
default_kwargs = dict(
max_new_tokens=max_new_tokens,
sampling=False,
num_beams=self.num_beams,
)
default_kwargs.update(self.kwargs)
content = []
# message = [
# {'type': 'text', 'value': 'sys prompt'},
# {'type': 'image', 'value': '/path/to/image1.jpg'},
# {'type': 'text', 'value': 'Here is an image:'},
# ]
for x in message:
if x['type'] == 'text':
content.append(x['value'])
elif x['type'] == 'image':
image = Image.open(x['value']).convert('RGB')
content.append(image)
msgs = [{'role': 'user', 'content': content}]
res = self.model.chat(
image = None,
msgs=msgs,
context=None,
tokenizer=self.tokenizer,
**default_kwargs
)
if isinstance(res, tuple) and len(res) > 0:
res = res[0]
# print(f"content: {content}, res: {res}")
return res

View File

@@ -0,0 +1,85 @@
import torch
from PIL import Image
from transformers import AutoModel, AutoTokenizer
from .base import BaseModel
from ..smp import *
from ..utils import DATASET_TYPE
class MiniCPM_V(BaseModel):
INSTALL_REQ = False
INTERLEAVE = False
def __init__(self, model_path='openbmb/MiniCPM-V', **kwargs):
assert model_path is not None
self.model_path = model_path
print(f'load from {self.model_path}')
self.model = AutoModel.from_pretrained(self.model_path, trust_remote_code=True)
self.model = self.model.to(dtype=torch.bfloat16)
self.model.eval().cuda()
self.kwargs = kwargs
self.tokenizer = AutoTokenizer.from_pretrained(self.model_path, trust_remote_code=True)
torch.cuda.empty_cache()
self.num_beams = 1 if self.model_path == 'openbmb/MiniCPM-V' else 3
def use_custom_prompt(self, dataset):
assert dataset is not None
if listinstr(['MMMU'], dataset):
return True
return False
def build_prompt(self, line, dataset=None):
assert dataset is None or isinstance(dataset, str)
assert self.use_custom_prompt(dataset)
tgt_path = self.dump_image(line, dataset)
question = line['question']
options = {
cand: line[cand]
for cand in string.ascii_uppercase
if cand in line and not pd.isna(line[cand])
}
options_prompt = 'Options:\n'
for key, item in options.items():
options_prompt += f'{key}. {item}\n'
hint = line['hint'] if ('hint' in line and not pd.isna(line['hint'])) else None
prompt = ''
if hint is not None:
prompt += f'Hint: {hint}\n'
prompt += f'{question}\n'
if len(options):
prompt += options_prompt
prompt = 'Study the image carefully and pick the option associated with the correct answer. \
Focus solely on selecting the option and avoid including any other content.\n' + prompt
message = [dict(type='text', value=prompt)]
message.extend([dict(type='image', value=p) for p in tgt_path])
return message
def generate_inner(self, message, dataset=None):
prompt, image_path = self.message_to_promptimg(message)
image = Image.open(image_path).convert('RGB')
msgs = [{'role': 'user', 'content': prompt}]
if DATASET_TYPE(dataset) == 'multi-choice':
max_new_tokens = 20
elif DATASET_TYPE(dataset) == 'Y/N':
max_new_tokens = 100
else:
max_new_tokens = 1024
default_kwargs = dict(
max_new_tokens=max_new_tokens,
sampling=False,
num_beams=self.num_beams
)
default_kwargs.update(self.kwargs)
res, _, _ = self.model.chat(
image=image,
msgs=msgs,
context=None,
tokenizer=self.tokenizer,
**default_kwargs
)
return res