From 5d6db4aee0154c535cf50d6943cdfe8954e2741b Mon Sep 17 00:00:00 2001
From: Neil Thomas
Date: Thu, 8 Aug 2019 16:29:32 -0700
Subject: [PATCH 1/2] Use h5py for output data writing and consolidation to reduce memory footprint

---
 setup.py         |  2 +-
 tape/__main__.py | 54 ++++++++++++++++++++++++++----------------------
 2 files changed, 30 insertions(+), 26 deletions(-)

diff --git a/setup.py b/setup.py
index ea595eb..c360485 100644
--- a/setup.py
+++ b/setup.py
@@ -20,7 +20,7 @@
     install_requires=[
         'tensorflow-gpu<1.14.0',  # https://github.com/IDSIA/sacred/issues/493
         'numpy',
-        'rinokeras==1.1.1',
+        'rinokeras==1.1.2',
         'biopython',
         'sacred',
         'table_logger',
diff --git a/tape/__main__.py b/tape/__main__.py
index 60a75ce..d92698d 100644
--- a/tape/__main__.py
+++ b/tape/__main__.py
@@ -5,7 +5,9 @@
 import os
 import shutil
 import pickle as pkl
+import uuid
 
+import h5py
 import tensorflow as tf
 
 from tape.tasks import TaskBuilder, Task, AbstractLanguageModelingTask
@@ -207,31 +209,33 @@ def cleanup_folders(outdir: str, model, tasks, debug):
 
 
 def consolidate_data(outfile, include_hidden: bool = False):
+    """
+    Turn batched h5 output file into flat h5 file
+    """
-    with open(outfile, 'rb') as f:
-        outputs = pkl.load(f)
-
-    data = defaultdict(list)  # type: ignore
-
-    for output in outputs:
-        output = output[0]
-        length = output['protein_length']
-        for key, protein_batch in output.items():
-            for protein_length, protein_data in zip(length, protein_batch):
-                if np.isscalar(protein_data):
-                    data[key].append(protein_data)
-                elif protein_data.ndim == 1 and protein_data.dtype in [np.float32, np.float64]:
-                    data[key].append(protein_data)
-                else:
-                    data[key].append(protein_data[:protein_length])
-
-    data = dict(data)
-
-    if not include_hidden:
-        del data['encoder_output']
-
-    with open(outfile, 'wb') as f:
-        pkl.dump(data, f)
+    tmp_id = uuid.uuid1().hex  # just in case there's some weirdness
+    tmp_filename = 'outputs_tmp_{}.h5'.format(tmp_id)
+    i = 0
+    with h5py.File(outfile, 'r') as f, h5py.File(tmp_filename, 'w') as f_out:
+        for key in f.keys():  # iterate over all batches
+            output = f[key]
+            length = output['protein_length'][()]
+            for key, protein_batch in output.items():
+                protein_batch = protein_batch[()]
+                # iterate over all proteins in the batch
+                for protein_length, protein_data in zip(length, protein_batch):
+                    grp = f_out.create_group(str(i))
+                    if np.isscalar(protein_data):
+                        grp.create_dataset(key, data=protein_data)
+                    elif protein_data.ndim == 1 and protein_data.dtype in [np.float32, np.float64]:
+                        grp.create_dataset(key, data=protein_data)
+                    else:
+                        # truncate by length of the sequence to remove padding
+                        grp.create_dataset(key, data=protein_data[:protein_length])
+                    i += 1
+
+    # be careful, this could take up many GB of disk space! (especially for the LSTM)
+    os.replace(tmp_filename, outfile)
 
 
 @proteins.command
@@ -270,7 +274,7 @@ def eval(_run, _config, tasks: Union[str, List[str]], model: str):
         experiment.distribution_strategy, task_model, _config['load_task_from'])
 
     task_dir = os.path.dirname(_config['load_task_from'])
-    outfile = os.path.join(task_dir, 'outputs.pkl')
+    outfile = os.path.join(task_dir, 'outputs.h5')
     print('Saving outputs to {}'.format(outfile))
     test_metrics = test_graph.run_epoch(save_outputs=outfile)
     print(test_metrics.get_average())

From c61ad8264604beb9475e4301a9705f454d79c38c Mon Sep 17 00:00:00 2001
From: Neil Thomas
Date: Fri, 9 Aug 2019 14:31:48 -0700
Subject: [PATCH 2/2] Properly pick up each sequence in a batch

---
 tape/__main__.py | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/tape/__main__.py b/tape/__main__.py
index d92698d..cc2b9a7 100644
--- a/tape/__main__.py
+++ b/tape/__main__.py
@@ -220,11 +220,15 @@ def consolidate_data(outfile, include_hidden: bool = False):
         for key in f.keys():  # iterate over all batches
             output = f[key]
             length = output['protein_length'][()]
+            n_seqs = len(length)
             for key, protein_batch in output.items():
                 protein_batch = protein_batch[()]
                 # iterate over all proteins in the batch
-                for protein_length, protein_data in zip(length, protein_batch):
-                    grp = f_out.create_group(str(i))
+                for index, protein_length, protein_data in zip(range(i, i+n_seqs), length, protein_batch):
+                    try:
+                        grp = f_out[str(index)]
+                    except KeyError:
+                        grp = f_out.create_group(str(index))
                     if np.isscalar(protein_data):
                         grp.create_dataset(key, data=protein_data)
                     elif protein_data.ndim == 1 and protein_data.dtype in [np.float32, np.float64]:
@@ -232,7 +236,7 @@
                     else:
                         # truncate by length of the sequence to remove padding
                         grp.create_dataset(key, data=protein_data[:protein_length])
-                    i += 1
+        i += n_seqs
 
     # be careful, this could take up many GB of disk space! (especially for the LSTM)
     os.replace(tmp_filename, outfile)
@@ -276,7 +280,7 @@ def eval(_run, _config, tasks: Union[str, List[str]], model: str):
     task_dir = os.path.dirname(_config['load_task_from'])
     outfile = os.path.join(task_dir, 'outputs.h5')
     print('Saving outputs to {}'.format(outfile))
-    test_metrics = test_graph.run_epoch(save_outputs=outfile)
+    test_metrics = test_graph.run_epoch(save_outputs=outfile, save_format='h5')
     print(test_metrics.get_average())
 
     consolidate_data(outfile, include_hidden=True)
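
Note on the consolidated output: after consolidate_data runs, outputs.h5 is a flat file with one group per protein, named '0', '1', ..., and one dataset per output key inside each group. A minimal sketch of reading it back with h5py follows; 'protein_length' and 'encoder_output' are the only key names visible in these patches, so any other dataset names depend on the task that was evaluated:

    import h5py

    with h5py.File('outputs.h5', 'r') as f:
        # groups are created by consolidate_data and named '0', '1', ...
        for name in sorted(f.keys(), key=int):
            grp = f[name]
            protein_length = grp['protein_length'][()]
            # present when hidden states were saved; already truncated to the
            # true sequence length, so no padding needs to be stripped here
            if 'encoder_output' in grp:
                embedding = grp['encoder_output'][()]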