mirror of
https://github.com/FunAudioLLM/CosyVoice.git
synced 2026-02-04 17:39:25 +08:00
feat: Support DPO
This commit is contained in:
committed by
lyuxiang.lx
parent
3770c1c8b1
commit
6d876f573c
184
cosyvoice/utils/executor_dpo.py
Normal file
184
cosyvoice/utils/executor_dpo.py
Normal file
@@ -0,0 +1,184 @@
|
||||
# Copyright (c) 2020 Mobvoi Inc (Binbin Zhang)
|
||||
# 2024 Alibaba Inc (authors: Xiang Lyu)
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from contextlib import nullcontext
|
||||
import os
|
||||
|
||||
import torch
|
||||
import torch.distributed as dist
|
||||
|
||||
from cosyvoice.utils.train_utils_dpo import update_parameter_and_lr, log_per_step, log_per_save, batch_forward, batch_backward, save_model, cosyvoice_join
|
||||
from cosyvoice.utils.losses_dpo import DPOLoss
|
||||
|
||||
|
||||
class Executor:
|
||||
|
||||
def __init__(self, gan: bool = False, dpo: bool = False, beta: float = 0.01, label_smoothing: float = 0.0, ipo: bool = False):
|
||||
self.gan = gan
|
||||
self.step = 0
|
||||
self.epoch = 0
|
||||
self.rank = int(os.environ.get('RANK', 0))
|
||||
self.device = torch.device('cuda:{}'.format(self.rank))
|
||||
self.dpo = dpo
|
||||
if self.dpo:
|
||||
self.dpo_loss = DPOLoss(beta, label_smoothing, ipo)
|
||||
else:
|
||||
self.dpo_loss = None
|
||||
|
||||
def train_one_epoc(self, model, optimizer, scheduler, train_data_loader, cv_data_loader, writer, info_dict, scaler, group_join, ref_model=None):
|
||||
''' Train one epoch
|
||||
'''
|
||||
|
||||
lr = optimizer.param_groups[0]['lr']
|
||||
logging.info('Epoch {} TRAIN info lr {} rank {}'.format(self.epoch, lr, self.rank))
|
||||
logging.info('using accumulate grad, new batch size is {} times'
|
||||
' larger than before'.format(info_dict['accum_grad']))
|
||||
# A context manager to be used in conjunction with an instance of
|
||||
# torch.nn.parallel.DistributedDataParallel to be able to train
|
||||
# with uneven inputs across participating processes.
|
||||
model.train()
|
||||
if self.dpo:
|
||||
assert ref_model is not None
|
||||
ref_model.eval()
|
||||
model_context = model.join if info_dict['train_engine'] == 'torch_ddp' else nullcontext
|
||||
with model_context():
|
||||
for batch_idx, batch_dict in enumerate(train_data_loader):
|
||||
info_dict["tag"] = "TRAIN"
|
||||
info_dict["step"] = self.step
|
||||
info_dict["epoch"] = self.epoch
|
||||
info_dict["batch_idx"] = batch_idx
|
||||
if cosyvoice_join(group_join, info_dict):
|
||||
break
|
||||
|
||||
# Disable gradient synchronizations across DDP processes.
|
||||
# Within this context, gradients will be accumulated on module
|
||||
# variables, which will later be synchronized.
|
||||
if info_dict['train_engine'] == 'torch_ddp' and (batch_idx + 1) % info_dict["accum_grad"] != 0:
|
||||
context = model.no_sync
|
||||
# Used for single gpu training and DDP gradient synchronization
|
||||
# processes.
|
||||
else:
|
||||
context = nullcontext
|
||||
|
||||
with context():
|
||||
info_dict = batch_forward(model, batch_dict, scaler, info_dict, ref_model, self.dpo_loss)
|
||||
info_dict = batch_backward(model, scaler, info_dict)
|
||||
|
||||
info_dict = update_parameter_and_lr(model, optimizer, scheduler, scaler, info_dict)
|
||||
log_per_step(writer, info_dict)
|
||||
# NOTE specify save_per_step in cosyvoice.yaml if you want to enable step save
|
||||
if info_dict['save_per_step'] > 0 and (self.step + 1) % info_dict['save_per_step'] == 0 and \
|
||||
(batch_idx + 1) % info_dict["accum_grad"] == 0:
|
||||
dist.barrier()
|
||||
self.cv(model, cv_data_loader, writer, info_dict, on_batch_end=False, ref_model=ref_model, dpo_loss=self.dpo_loss)
|
||||
model.train()
|
||||
if (batch_idx + 1) % info_dict["accum_grad"] == 0:
|
||||
self.step += 1
|
||||
dist.barrier()
|
||||
self.cv(model, cv_data_loader, writer, info_dict, on_batch_end=True, ref_model=ref_model, dpo_loss=self.dpo_loss)
|
||||
|
||||
def train_one_epoc_gan(self, model, optimizer, scheduler, optimizer_d, scheduler_d, train_data_loader, cv_data_loader,
|
||||
writer, info_dict, scaler, group_join):
|
||||
''' Train one epoch
|
||||
'''
|
||||
|
||||
lr = optimizer.param_groups[0]['lr']
|
||||
logging.info('Epoch {} TRAIN info lr {} rank {}'.format(self.epoch, lr, self.rank))
|
||||
logging.info('using accumulate grad, new batch size is {} times'
|
||||
' larger than before'.format(info_dict['accum_grad']))
|
||||
# A context manager to be used in conjunction with an instance of
|
||||
# torch.nn.parallel.DistributedDataParallel to be able to train
|
||||
# with uneven inputs across participating processes.
|
||||
model.train()
|
||||
model_context = model.join if info_dict['train_engine'] == 'torch_ddp' else nullcontext
|
||||
with model_context():
|
||||
for batch_idx, batch_dict in enumerate(train_data_loader):
|
||||
info_dict["tag"] = "TRAIN"
|
||||
info_dict["step"] = self.step
|
||||
info_dict["epoch"] = self.epoch
|
||||
info_dict["batch_idx"] = batch_idx
|
||||
if cosyvoice_join(group_join, info_dict):
|
||||
break
|
||||
|
||||
# Disable gradient synchronizations across DDP processes.
|
||||
# Within this context, gradients will be accumulated on module
|
||||
# variables, which will later be synchronized.
|
||||
if info_dict['train_engine'] == 'torch_ddp' and (batch_idx + 1) % info_dict["accum_grad"] != 0:
|
||||
context = model.no_sync
|
||||
# Used for single gpu training and DDP gradient synchronization
|
||||
# processes.
|
||||
else:
|
||||
context = nullcontext
|
||||
|
||||
with context():
|
||||
batch_dict['turn'] = 'discriminator'
|
||||
info_dict = batch_forward(model, batch_dict, scaler, info_dict)
|
||||
info_dict = batch_backward(model, scaler, info_dict)
|
||||
info_dict = update_parameter_and_lr(model, optimizer_d, scheduler_d, scaler, info_dict)
|
||||
optimizer.zero_grad()
|
||||
log_per_step(writer, info_dict)
|
||||
with context():
|
||||
batch_dict['turn'] = 'generator'
|
||||
info_dict = batch_forward(model, batch_dict, scaler, info_dict)
|
||||
info_dict = batch_backward(model, scaler, info_dict)
|
||||
info_dict = update_parameter_and_lr(model, optimizer, scheduler, scaler, info_dict)
|
||||
optimizer_d.zero_grad()
|
||||
log_per_step(writer, info_dict)
|
||||
# NOTE specify save_per_step in cosyvoice.yaml if you want to enable step save
|
||||
if info_dict['save_per_step'] > 0 and (self.step + 1) % info_dict['save_per_step'] == 0 and \
|
||||
(batch_idx + 1) % info_dict["accum_grad"] == 0:
|
||||
dist.barrier()
|
||||
self.cv(model, cv_data_loader, writer, info_dict, on_batch_end=False)
|
||||
model.train()
|
||||
if (batch_idx + 1) % info_dict["accum_grad"] == 0:
|
||||
self.step += 1
|
||||
dist.barrier()
|
||||
self.cv(model, cv_data_loader, writer, info_dict, on_batch_end=True)
|
||||
|
||||
@torch.inference_mode()
|
||||
def cv(self, model, cv_data_loader, writer, info_dict, on_batch_end=True, ref_model=None, dpo_loss=None):
|
||||
''' Cross validation on
|
||||
'''
|
||||
logging.info('Epoch {} Step {} on_batch_end {} CV rank {}'.format(self.epoch, self.step + 1, on_batch_end, self.rank))
|
||||
model.eval()
|
||||
if self.dpo:
|
||||
assert ref_model is not None
|
||||
ref_model.eval()
|
||||
total_num_utts, total_loss_dict = 0, {} # avoid division by 0
|
||||
for batch_idx, batch_dict in enumerate(cv_data_loader):
|
||||
info_dict["tag"] = "CV"
|
||||
info_dict["step"] = self.step
|
||||
info_dict["epoch"] = self.epoch
|
||||
info_dict["batch_idx"] = batch_idx
|
||||
|
||||
num_utts = len(batch_dict["utts"])
|
||||
total_num_utts += num_utts
|
||||
|
||||
if self.gan is True:
|
||||
batch_dict['turn'] = 'generator'
|
||||
info_dict = batch_forward(model, batch_dict, None, info_dict, ref_model, dpo_loss)
|
||||
|
||||
for k, v in info_dict['loss_dict'].items():
|
||||
if k not in total_loss_dict:
|
||||
total_loss_dict[k] = []
|
||||
total_loss_dict[k].append(v.item() * num_utts)
|
||||
log_per_step(None, info_dict)
|
||||
for k, v in total_loss_dict.items():
|
||||
total_loss_dict[k] = sum(v) / total_num_utts
|
||||
info_dict['loss_dict'] = total_loss_dict
|
||||
log_per_save(writer, info_dict)
|
||||
model_name = 'epoch_{}_whole'.format(self.epoch) if on_batch_end else 'epoch_{}_step_{}'.format(self.epoch, self.step + 1)
|
||||
save_model(model, model_name, info_dict)
|
||||
57
cosyvoice/utils/losses_dpo.py
Normal file
57
cosyvoice/utils/losses_dpo.py
Normal file
@@ -0,0 +1,57 @@
|
||||
import torch
|
||||
import torch.nn.functional as F
|
||||
from typing import Tuple
|
||||
|
||||
|
||||
def tpr_loss(disc_real_outputs, disc_generated_outputs, tau):
|
||||
loss = 0
|
||||
for dr, dg in zip(disc_real_outputs, disc_generated_outputs):
|
||||
m_DG = torch.median((dr - dg))
|
||||
L_rel = torch.mean((((dr - dg) - m_DG) ** 2)[dr < dg + m_DG])
|
||||
loss += tau - F.relu(tau - L_rel)
|
||||
return loss
|
||||
|
||||
|
||||
def mel_loss(real_speech, generated_speech, mel_transforms):
|
||||
loss = 0
|
||||
for transform in mel_transforms:
|
||||
mel_r = transform(real_speech)
|
||||
mel_g = transform(generated_speech)
|
||||
loss += F.l1_loss(mel_g, mel_r)
|
||||
return loss
|
||||
|
||||
|
||||
class DPOLoss(torch.nn.Module):
|
||||
"""
|
||||
DPO Loss
|
||||
"""
|
||||
|
||||
def __init__(self, beta: float, label_smoothing: float = 0.0, ipo: bool = False) -> None:
|
||||
super().__init__()
|
||||
self.beta = beta
|
||||
self.label_smoothing = label_smoothing
|
||||
self.ipo = ipo
|
||||
|
||||
def forward(
|
||||
self,
|
||||
policy_chosen_logps: torch.Tensor,
|
||||
policy_rejected_logps: torch.Tensor,
|
||||
reference_chosen_logps: torch.Tensor,
|
||||
reference_rejected_logps: torch.Tensor,
|
||||
) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
|
||||
pi_logratios = policy_chosen_logps - policy_rejected_logps
|
||||
ref_logratios = reference_chosen_logps - reference_rejected_logps
|
||||
logits = pi_logratios - ref_logratios
|
||||
if self.ipo:
|
||||
losses = (logits - 1 / (2 * self.beta)) ** 2 # Eq. 17 of https://arxiv.org/pdf/2310.12036v2.pdf
|
||||
else:
|
||||
# Eq. 3 https://ericmitchell.ai/cdpo.pdf; label_smoothing=0 gives original DPO (Eq. 7 of https://arxiv.org/pdf/2305.18290.pdf)
|
||||
losses = (
|
||||
-F.logsigmoid(self.beta * logits) * (1 - self.label_smoothing)
|
||||
- F.logsigmoid(-self.beta * logits) * self.label_smoothing
|
||||
)
|
||||
loss = losses.mean()
|
||||
chosen_rewards = self.beta * (policy_chosen_logps - reference_chosen_logps).detach()
|
||||
rejected_rewards = self.beta * (policy_rejected_logps - reference_rejected_logps).detach()
|
||||
|
||||
return loss, chosen_rewards, rejected_rewards
|
||||
364
cosyvoice/utils/train_utils_dpo.py
Normal file
364
cosyvoice/utils/train_utils_dpo.py
Normal file
@@ -0,0 +1,364 @@
|
||||
# Copyright (c) 2021 Mobvoi Inc. (authors: Binbin Zhang)
|
||||
# 2023 Horizon Inc. (authors: Xingchen Song)
|
||||
# 2024 Alibaba Inc (authors: Xiang Lyu)
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import os
|
||||
import torch
|
||||
import json
|
||||
import re
|
||||
import datetime
|
||||
import yaml
|
||||
|
||||
import deepspeed
|
||||
import torch.optim as optim
|
||||
import torch.distributed as dist
|
||||
|
||||
from torch.utils.tensorboard import SummaryWriter
|
||||
from torch.utils.data import DataLoader
|
||||
from torch.nn.utils import clip_grad_norm_
|
||||
|
||||
from deepspeed.runtime.zero.stage_1_and_2 import estimate_zero2_model_states_mem_needs_all_live
|
||||
|
||||
from cosyvoice.dataset.dataset import Dataset
|
||||
from cosyvoice.utils.scheduler import WarmupLR, NoamHoldAnnealing, ConstantLR
|
||||
|
||||
|
||||
def init_distributed(args):
|
||||
world_size = int(os.environ.get('WORLD_SIZE', 1))
|
||||
local_rank = int(os.environ.get('LOCAL_RANK', 0))
|
||||
rank = int(os.environ.get('RANK', 0))
|
||||
logging.info('training on multiple gpus, this gpu {}'.format(local_rank) +
|
||||
', rank {}, world_size {}'.format(rank, world_size))
|
||||
if args.train_engine == 'torch_ddp':
|
||||
torch.cuda.set_device(local_rank)
|
||||
dist.init_process_group(args.dist_backend)
|
||||
else:
|
||||
deepspeed.init_distributed(dist_backend=args.dist_backend)
|
||||
return world_size, local_rank, rank
|
||||
|
||||
|
||||
def init_dataset_and_dataloader(args, configs, gan):
|
||||
data_pipeline = configs['data_pipeline_gan'] if gan is True else configs['data_pipeline']
|
||||
train_dataset = Dataset(args.train_data, data_pipeline=data_pipeline, mode='train', gan=gan, shuffle=True, partition=True)
|
||||
cv_dataset = Dataset(args.cv_data, data_pipeline=data_pipeline, mode='train', gan=gan, shuffle=False, partition=False)
|
||||
|
||||
# do not use persistent_workers=True, as whisper tokenizer opens tiktoken file each time when the for loop starts
|
||||
train_data_loader = DataLoader(train_dataset,
|
||||
batch_size=None,
|
||||
pin_memory=args.pin_memory,
|
||||
num_workers=args.num_workers,
|
||||
prefetch_factor=args.prefetch)
|
||||
cv_data_loader = DataLoader(cv_dataset,
|
||||
batch_size=None,
|
||||
pin_memory=args.pin_memory,
|
||||
num_workers=args.num_workers,
|
||||
prefetch_factor=args.prefetch)
|
||||
return train_dataset, cv_dataset, train_data_loader, cv_data_loader
|
||||
|
||||
|
||||
def check_modify_and_save_config(args, configs):
|
||||
if args.train_engine == "torch_ddp":
|
||||
configs['train_conf']["dtype"] = 'fp32'
|
||||
else:
|
||||
with open(args.deepspeed_config, 'r') as fin:
|
||||
ds_configs = json.load(fin)
|
||||
if "fp16" in ds_configs and ds_configs["fp16"]["enabled"]:
|
||||
configs['train_conf']["dtype"] = "fp16"
|
||||
elif "bf16" in ds_configs and ds_configs["bf16"]["enabled"]:
|
||||
configs['train_conf']["dtype"] = "bf16"
|
||||
else:
|
||||
configs['train_conf']["dtype"] = "fp32"
|
||||
assert ds_configs["train_micro_batch_size_per_gpu"] == 1
|
||||
# if use deepspeed, override ddp config
|
||||
configs['train_conf']['save_per_step'] = int(configs['train_conf']['save_per_step'] *
|
||||
configs['train_conf']['accum_grad'] / ds_configs["gradient_accumulation_steps"])
|
||||
configs['train_conf']['accum_grad'] = ds_configs["gradient_accumulation_steps"]
|
||||
configs['train_conf']['grad_clip'] = ds_configs["gradient_clipping"]
|
||||
configs['train_conf']['log_interval'] = ds_configs["steps_per_print"]
|
||||
return configs
|
||||
|
||||
|
||||
def wrap_cuda_model(args, model):
|
||||
local_world_size = int(os.environ.get('LOCAL_WORLD_SIZE', 1))
|
||||
world_size = int(os.environ.get('WORLD_SIZE', 1))
|
||||
if args.train_engine == "torch_ddp": # native pytorch ddp
|
||||
assert (torch.cuda.is_available())
|
||||
model.cuda()
|
||||
model = torch.nn.parallel.DistributedDataParallel(model, find_unused_parameters=True)
|
||||
else:
|
||||
if int(os.environ.get('RANK', 0)) == 0:
|
||||
logging.info("Estimating model states memory needs (zero2)...")
|
||||
estimate_zero2_model_states_mem_needs_all_live(
|
||||
model,
|
||||
num_gpus_per_node=local_world_size,
|
||||
num_nodes=world_size // local_world_size)
|
||||
return model
|
||||
|
||||
|
||||
def init_optimizer_and_scheduler(args, configs, model, gan):
|
||||
if gan is False:
|
||||
if configs['train_conf']['optim'] == 'adam':
|
||||
optimizer = optim.Adam(model.parameters(), **configs['train_conf']['optim_conf'])
|
||||
elif configs['train_conf']['optim'] == 'adamw':
|
||||
optimizer = optim.AdamW(model.parameters(), **configs['train_conf']['optim_conf'])
|
||||
else:
|
||||
raise ValueError("unknown optimizer: " + configs['train_conf'])
|
||||
|
||||
if configs['train_conf']['scheduler'] == 'warmuplr':
|
||||
scheduler_type = WarmupLR
|
||||
scheduler = WarmupLR(optimizer, **configs['train_conf']['scheduler_conf'])
|
||||
elif configs['train_conf']['scheduler'] == 'NoamHoldAnnealing':
|
||||
scheduler_type = NoamHoldAnnealing
|
||||
scheduler = NoamHoldAnnealing(optimizer, **configs['train_conf']['scheduler_conf'])
|
||||
elif configs['train_conf']['scheduler'] == 'constantlr':
|
||||
scheduler_type = ConstantLR
|
||||
scheduler = ConstantLR(optimizer)
|
||||
else:
|
||||
raise ValueError("unknown scheduler: " + configs['train_conf'])
|
||||
|
||||
# use deepspeed optimizer for speedup
|
||||
if args.train_engine == "deepspeed":
|
||||
def scheduler(opt):
|
||||
return scheduler_type(opt, **configs['train_conf']['scheduler_conf'])
|
||||
model, optimizer, _, scheduler = deepspeed.initialize(
|
||||
args=args,
|
||||
model=model,
|
||||
optimizer=None,
|
||||
lr_scheduler=scheduler,
|
||||
model_parameters=model.parameters())
|
||||
|
||||
optimizer_d, scheduler_d = None, None
|
||||
|
||||
else:
|
||||
# currently we wrap generator and discriminator in one model, so we cannot use deepspeed
|
||||
if configs['train_conf']['optim'] == 'adam':
|
||||
optimizer = optim.Adam(model.module.generator.parameters(), **configs['train_conf']['optim_conf'])
|
||||
elif configs['train_conf']['optim'] == 'adamw':
|
||||
optimizer = optim.AdamW(model.module.generator.parameters(), **configs['train_conf']['optim_conf'])
|
||||
else:
|
||||
raise ValueError("unknown optimizer: " + configs['train_conf'])
|
||||
|
||||
if configs['train_conf']['scheduler'] == 'warmuplr':
|
||||
scheduler_type = WarmupLR
|
||||
scheduler = WarmupLR(optimizer, **configs['train_conf']['scheduler_conf'])
|
||||
elif configs['train_conf']['scheduler'] == 'NoamHoldAnnealing':
|
||||
scheduler_type = NoamHoldAnnealing
|
||||
scheduler = NoamHoldAnnealing(optimizer, **configs['train_conf']['scheduler_conf'])
|
||||
elif configs['train_conf']['scheduler'] == 'constantlr':
|
||||
scheduler_type = ConstantLR
|
||||
scheduler = ConstantLR(optimizer)
|
||||
else:
|
||||
raise ValueError("unknown scheduler: " + configs['train_conf'])
|
||||
|
||||
if configs['train_conf']['optim_d'] == 'adam':
|
||||
optimizer_d = optim.Adam(model.module.discriminator.parameters(), **configs['train_conf']['optim_conf'])
|
||||
elif configs['train_conf']['optim_d'] == 'adamw':
|
||||
optimizer_d = optim.AdamW(model.module.discriminator.parameters(), **configs['train_conf']['optim_conf'])
|
||||
else:
|
||||
raise ValueError("unknown optimizer: " + configs['train_conf'])
|
||||
|
||||
if configs['train_conf']['scheduler_d'] == 'warmuplr':
|
||||
scheduler_type = WarmupLR
|
||||
scheduler_d = WarmupLR(optimizer_d, **configs['train_conf']['scheduler_conf'])
|
||||
elif configs['train_conf']['scheduler_d'] == 'NoamHoldAnnealing':
|
||||
scheduler_type = NoamHoldAnnealing
|
||||
scheduler_d = NoamHoldAnnealing(optimizer_d, **configs['train_conf']['scheduler_conf'])
|
||||
elif configs['train_conf']['scheduler'] == 'constantlr':
|
||||
scheduler_type = ConstantLR
|
||||
scheduler_d = ConstantLR(optimizer_d)
|
||||
else:
|
||||
raise ValueError("unknown scheduler: " + configs['train_conf'])
|
||||
return model, optimizer, scheduler, optimizer_d, scheduler_d
|
||||
|
||||
|
||||
def init_summarywriter(args):
|
||||
writer = None
|
||||
if int(os.environ.get('RANK', 0)) == 0:
|
||||
os.makedirs(args.model_dir, exist_ok=True)
|
||||
writer = SummaryWriter(args.tensorboard_dir)
|
||||
return writer
|
||||
|
||||
|
||||
def save_model(model, model_name, info_dict):
|
||||
rank = int(os.environ.get('RANK', 0))
|
||||
model_dir = info_dict["model_dir"]
|
||||
save_model_path = os.path.join(model_dir, '{}.pt'.format(model_name))
|
||||
|
||||
if info_dict["train_engine"] == "torch_ddp":
|
||||
if rank == 0:
|
||||
torch.save({**model.module.state_dict(), 'epoch': info_dict['epoch'], 'step': info_dict['step']}, save_model_path)
|
||||
else:
|
||||
with torch.no_grad():
|
||||
model.save_checkpoint(save_dir=model_dir,
|
||||
tag=model_name,
|
||||
client_state=info_dict)
|
||||
if rank == 0:
|
||||
info_path = re.sub('.pt$', '.yaml', save_model_path)
|
||||
info_dict['save_time'] = datetime.datetime.now().strftime('%d/%m/%Y %H:%M:%S')
|
||||
with open(info_path, 'w') as fout:
|
||||
data = yaml.dump(info_dict)
|
||||
fout.write(data)
|
||||
logging.info('[Rank {}] Checkpoint: save to checkpoint {}'.format(rank, save_model_path))
|
||||
|
||||
|
||||
def cosyvoice_join(group_join, info_dict):
|
||||
world_size = int(os.environ.get('WORLD_SIZE', 1))
|
||||
local_rank = int(os.environ.get('LOCAL_RANK', 0))
|
||||
rank = int(os.environ.get('RANK', 0))
|
||||
|
||||
if info_dict["batch_idx"] != 0:
|
||||
# we try to join all rank in both ddp and deepspeed mode, in case different rank has different lr
|
||||
try:
|
||||
dist.monitored_barrier(group=group_join,
|
||||
timeout=group_join.options._timeout)
|
||||
return False
|
||||
except RuntimeError as e:
|
||||
logging.info("Detected uneven workload distribution: {}\n".format(e) +
|
||||
"Break current worker to manually join all workers, " +
|
||||
"world_size {}, current rank {}, current local_rank {}\n".
|
||||
format(world_size, rank, local_rank))
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
def batch_forward(model, batch, scaler, info_dict, ref_model=None, dpo_loss=None):
|
||||
device = int(os.environ.get('LOCAL_RANK', 0))
|
||||
|
||||
dtype = info_dict["dtype"]
|
||||
if dtype == "fp16":
|
||||
dtype = torch.float16
|
||||
elif dtype == "bf16":
|
||||
dtype = torch.bfloat16
|
||||
else: # fp32
|
||||
dtype = torch.float32
|
||||
|
||||
if info_dict['train_engine'] == 'torch_ddp':
|
||||
autocast = torch.cuda.amp.autocast(enabled=scaler is not None)
|
||||
else:
|
||||
autocast = torch.cuda.amp.autocast(enabled=True, dtype=dtype, cache_enabled=False)
|
||||
|
||||
with autocast:
|
||||
info_dict['loss_dict'] = model(batch, device)
|
||||
if ref_model and dpo_loss:
|
||||
chosen_logps = info_dict['loss_dict']["chosen_logps"]
|
||||
rejected_logps = info_dict['loss_dict']["rejected_logps"]
|
||||
sft_loss = info_dict['loss_dict']['loss']
|
||||
with torch.no_grad():
|
||||
ref_model = ref_model.to(device)
|
||||
ref_loss_dict = ref_model(batch, device)
|
||||
reference_chosen_logps = ref_loss_dict["chosen_logps"]
|
||||
reference_rejected_logps = ref_loss_dict["rejected_logps"]
|
||||
preference_loss, chosen_reward, reject_reward = dpo_loss(
|
||||
chosen_logps, rejected_logps, reference_chosen_logps, reference_rejected_logps
|
||||
)
|
||||
dpo_acc = (chosen_reward > reject_reward).float().mean()
|
||||
info_dict['loss_dict']["loss"] = preference_loss + sft_loss
|
||||
info_dict['loss_dict']["sft_loss"] = sft_loss
|
||||
info_dict['loss_dict']["dpo_loss"] = preference_loss
|
||||
info_dict['loss_dict']["dpo_acc"] = dpo_acc
|
||||
info_dict['loss_dict']["chosen_reward"] = chosen_reward.mean()
|
||||
info_dict['loss_dict']["reject_reward"] = reject_reward.mean()
|
||||
return info_dict
|
||||
|
||||
|
||||
def batch_backward(model, scaler, info_dict):
|
||||
if info_dict["train_engine"] == "deepspeed":
|
||||
scaled_loss = model.backward(info_dict['loss_dict']['loss'])
|
||||
else:
|
||||
scaled_loss = info_dict['loss_dict']['loss'] / info_dict['accum_grad']
|
||||
if scaler is not None:
|
||||
scaler.scale(scaled_loss).backward()
|
||||
else:
|
||||
scaled_loss.backward()
|
||||
|
||||
info_dict['loss_dict']['loss'] = scaled_loss
|
||||
return info_dict
|
||||
|
||||
|
||||
def update_parameter_and_lr(model, optimizer, scheduler, scaler, info_dict):
|
||||
grad_norm = 0.0
|
||||
if info_dict['train_engine'] == "deepspeed":
|
||||
info_dict["is_gradient_accumulation_boundary"] = model.is_gradient_accumulation_boundary()
|
||||
model.step()
|
||||
grad_norm = model.get_global_grad_norm()
|
||||
elif (info_dict['batch_idx'] + 1) % info_dict["accum_grad"] == 0:
|
||||
# Use mixed precision training
|
||||
if scaler is not None:
|
||||
scaler.unscale_(optimizer)
|
||||
grad_norm = clip_grad_norm_(model.parameters(), info_dict['grad_clip'])
|
||||
# We don't check grad here since that if the gradient
|
||||
# has inf/nan values, scaler.step will skip
|
||||
# optimizer.step().
|
||||
if torch.isfinite(grad_norm):
|
||||
scaler.step(optimizer)
|
||||
scaler.update()
|
||||
else:
|
||||
grad_norm = clip_grad_norm_(model.parameters(), info_dict['grad_clip'])
|
||||
if torch.isfinite(grad_norm):
|
||||
optimizer.step()
|
||||
optimizer.zero_grad()
|
||||
scheduler.step()
|
||||
info_dict["lr"] = optimizer.param_groups[0]['lr']
|
||||
info_dict["grad_norm"] = grad_norm
|
||||
return info_dict
|
||||
|
||||
|
||||
def log_per_step(writer, info_dict):
|
||||
tag = info_dict["tag"]
|
||||
epoch = info_dict.get('epoch', 0)
|
||||
step = info_dict["step"]
|
||||
batch_idx = info_dict["batch_idx"]
|
||||
loss_dict = info_dict['loss_dict']
|
||||
rank = int(os.environ.get('RANK', 0))
|
||||
|
||||
# only rank 0 write to tensorboard to avoid multi-process write
|
||||
if writer is not None:
|
||||
if (info_dict['train_engine'] == 'deepspeed' and info_dict['is_gradient_accumulation_boundary'] is True) or \
|
||||
(info_dict['train_engine'] == 'torch_ddp' and (info_dict['batch_idx'] + 1) % info_dict['accum_grad'] == 0):
|
||||
for k in ['epoch', 'lr', 'grad_norm']:
|
||||
writer.add_scalar('{}/{}'.format(tag, k), info_dict[k], step + 1)
|
||||
for k, v in loss_dict.items():
|
||||
writer.add_scalar('{}/{}'.format(tag, k), v, step + 1)
|
||||
|
||||
# TRAIN & CV, Shell log (stdout)
|
||||
if (info_dict['batch_idx'] + 1) % info_dict['log_interval'] == 0:
|
||||
log_str = '{} Batch {}/{} '.format(tag, epoch, batch_idx + 1)
|
||||
for name, value in loss_dict.items():
|
||||
log_str += '{} {:.6f} '.format(name, value)
|
||||
if tag == "TRAIN":
|
||||
log_str += 'lr {:.8f} grad_norm {:.6f}'.format(
|
||||
info_dict["lr"], info_dict['grad_norm'])
|
||||
log_str += ' rank {}'.format(rank)
|
||||
logging.debug(log_str)
|
||||
|
||||
|
||||
def log_per_save(writer, info_dict):
|
||||
tag = info_dict["tag"]
|
||||
epoch = info_dict["epoch"]
|
||||
step = info_dict["step"]
|
||||
loss_dict = info_dict["loss_dict"]
|
||||
lr = info_dict['lr']
|
||||
rank = int(os.environ.get('RANK', 0))
|
||||
logging.info(
|
||||
'Epoch {} Step {} CV info lr {} {} rank {}'.format(
|
||||
epoch, step + 1, lr, rank, ' '.join(['{}_{}'.format(k, v) for k, v in loss_dict.items()])))
|
||||
|
||||
if writer is not None:
|
||||
for k in ['epoch', 'lr']:
|
||||
writer.add_scalar('{}/{}'.format(tag, k), info_dict[k], step + 1)
|
||||
for k, v in loss_dict.items():
|
||||
writer.add_scalar('{}/{}'.format(tag, k), v, step + 1)
|
||||
Reference in New Issue
Block a user