Use streamed reads to save memory

This commit is contained in:
lyuxiang.lx
2024-11-11 22:14:52 +08:00
parent 487701c98c
commit 5ed5bb15c8

View File

@@ -40,17 +40,18 @@ def parquet_opener(data, mode='train', tts_data={}):
         assert 'src' in sample
         url = sample['src']
         try:
-            df = pq.read_table(url).to_pandas()
-            for i in range(len(df)):
-                if mode == 'inference' and df.loc[i, 'utt'] not in tts_data:
-                    continue
-                sample.update(dict(df.loc[i]))
-                if mode == 'train':
-                    # NOTE do not return sample directly, must initialize a new dict
-                    yield {**sample}
-                else:
-                    for index, text in enumerate(tts_data[df.loc[i, 'utt']]):
-                        yield {**sample, 'tts_index': index, 'tts_text': text}
+            for df in pq.ParquetFile(url).iter_batches(batch_size=64):
+                df = df.to_pandas()
+                for i in range(len(df)):
+                    if mode == 'inference' and df.loc[i, 'utt'] not in tts_data:
+                        continue
+                    sample.update(dict(df.loc[i]))
+                    if mode == 'train':
+                        # NOTE do not return sample directly, must initialize a new dict
+                        yield {**sample}
+                    else:
+                        for index, text in enumerate(tts_data[df.loc[i, 'utt']]):
+                            yield {**sample, 'tts_index': index, 'tts_text': text}
         except Exception as ex:
             logging.warning('Failed to open {}, ex info {}'.format(url, ex))