From 2b36afaa7873ee7f90a0ae5977976ac57b4e6370 Mon Sep 17 00:00:00 2001 From: Daniel Tchon Date: Wed, 4 Jan 2023 12:53:40 -0800 Subject: [PATCH 01/11] Since `use_mpi=False`, set `method=local` as well --- xfel/command_line/striping.py | 1 + 1 file changed, 1 insertion(+) diff --git a/xfel/command_line/striping.py b/xfel/command_line/striping.py index ea63a8d220..ce088b0b2a 100644 --- a/xfel/command_line/striping.py +++ b/xfel/command_line/striping.py @@ -22,6 +22,7 @@ multiprocessing_override_str = ''' mp { + method = local use_mpi = False } ''' From 3c96767cc971935b68ea6b1e8d0f952be2577ce2 Mon Sep 17 00:00:00 2001 From: Daniel Tchon Date: Wed, 4 Jan 2023 15:53:04 -0800 Subject: [PATCH 02/11] Since `use_mpi=False`, set other mp commands for local --- xfel/command_line/striping.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/xfel/command_line/striping.py b/xfel/command_line/striping.py index ce088b0b2a..f9606bf2cc 100644 --- a/xfel/command_line/striping.py +++ b/xfel/command_line/striping.py @@ -24,6 +24,8 @@ mp { method = local use_mpi = False + mpi_command = source + local.include_mp_in_command = False } ''' From 817c1022a3eaa1bf3102c673995d778502279054 Mon Sep 17 00:00:00 2001 From: Daniel Tchon Date: Wed, 11 Jan 2023 18:11:53 -0800 Subject: [PATCH 03/11] Crudely stripe or chunk expts/refls instead of expt/refl files --- xfel/command_line/striping.py | 77 +++++++++++++++++++++++++---------- 1 file changed, 55 insertions(+), 22 deletions(-) diff --git a/xfel/command_line/striping.py b/xfel/command_line/striping.py index f9606bf2cc..cd34d15adf 100644 --- a/xfel/command_line/striping.py +++ b/xfel/command_line/striping.py @@ -1,4 +1,7 @@ from __future__ import absolute_import, division, print_function + +import abc + from six.moves import range # -*- Mode: Python; c-basic-offset: 2; indent-tabs-mode: nil; tab-width: 8 -*- # @@ -9,6 +12,7 @@ # dials.combine_experiments (optionally with clustering and selecting clusters). # from dials.util import show_mail_on_error +from dxtbx.model import ExperimentList from libtbx.phil import parse from libtbx.utils import Sorry from libtbx import easy_run @@ -246,6 +250,46 @@ for interactive unit cell clustering, use combine_experiments.clustering.dendrogram=True """ + +def chunk_pairs(expt_paths, refl_paths, max_size=1000): + """Distribute matching expt-refl pairs into chunks with < `max_size` expts""" + expt_lengths = [len(ExperimentList.from_file(expt_path, check_format=False)) + for expt_path in expt_paths] + chunk_count = math.ceil(sum(expt_lengths) / max_size) + estimated_fill = sum(expt_lengths) / chunk_count + chunks_indices = [[] for _ in range(chunk_count)] + chunk_lengths = [0] * chunk_count + currently_filled_chunk = 0 + for len_index, len_ in enumerate(expt_lengths): + if len_ / 2 + chunk_lengths[currently_filled_chunk] > estimated_fill: + currently_filled_chunk = min(chunk_count - 1, currently_filled_chunk + 1) + chunks_indices[currently_filled_chunk].append(len_index) + chunk_lengths[currently_filled_chunk] += len_ + chunked_expts, chunked_refls = [], [] + for chunk_indices in chunks_indices: + chunked_expts.append([expt_paths[i] for i in chunk_indices]) + chunked_refls.append([refl_paths[i] for i in chunk_indices]) + return chunked_expts, chunked_refls, chunk_lengths + + +def stripe_pairs(expt_paths, refl_paths, max_size=1000): + """Distribute matching expt-refl pairs into stripes with <`max_size` expts""" + expt_lengths = [len(ExperimentList.from_file(expt_path, check_format=False)) + for expt_path in expt_paths] + stripe_count = math.ceil(sum(expt_lengths) / max_size) + stripe_indices = [[] for _ in range(stripe_count)] + stripe_lengths = [0, ] * stripe_count + for len_index, len_ in enumerate(expt_lengths): + currently_filled_stripe = stripe_lengths.index(min(stripe_lengths)) + stripe_indices[currently_filled_stripe].append(len_index) + stripe_lengths[currently_filled_stripe] += len_ + striped_expts, striped_refls = [], [] + for chunk_indices in stripe_indices: + striped_expts.append([expt_paths[i] for i in chunk_indices]) + striped_refls.append([refl_paths[i] for i in chunk_indices]) + return striped_expts, striped_refls, stripe_lengths + + def allocate_chunks(results_dir, trial_no, rgs_selected=None, @@ -276,7 +320,6 @@ def allocate_chunks(results_dir, rgs[rg] = [run] else: rgs[rg].append(run) - batch_chunk_nums_sizes = {} batch_contents = {} if respect_rungroup_barriers: batchable = {rg:{rg:runs} for rg, runs in six.iteritems(rgs)} @@ -315,35 +358,25 @@ def allocate_chunks(results_dir, print("no images found for %s" % batch) del batch_contents[batch] continue - n_chunks = int(math.ceil(n_img/max_size)) - chunk_size = int(math.ceil(n_img/n_chunks)) - batch_chunk_nums_sizes[batch] = (n_chunks, chunk_size) if len(batch_contents) == 0: raise Sorry("no DIALS integration results found.") refl_ending += extension batch_chunks = {} - for batch, num_size_tuple in six.iteritems(batch_chunk_nums_sizes): - num, size = num_size_tuple - batch_chunks[batch] = [] + for batch in batchable: contents = batch_contents[batch] expts = [c for c in contents if c.endswith(expt_ending)] refls = [c for c in contents if c.endswith(refl_ending)] expts, refls = match_dials_files(expts, refls, expt_ending, refl_ending) - if stripe: - for i in range(num): - expts_stripe = expts[i::num] - refls_stripe = refls[i::num] - batch_chunks[batch].append((expts_stripe, refls_stripe)) - print("striped %d experiments in %s with %d experiments per stripe and %d stripes" % \ - (len(expts), batch, len(batch_chunks[batch][0][0]), len(batch_chunks[batch]))) - else: - for i in range(num): - expts_chunk = expts[i*size:(i+1)*size] - refls_chunk = refls[i*size:(i+1)*size] - batch_chunks[batch].append((expts_chunk, refls_chunk)) - print("chunked %d experiments in %s with %d experiments per chunk and %d chunks" % \ - (len(expts), batch, len(batch_chunks[batch][0][0]), len(batch_chunks[batch]))) - return batch_chunks + pack_func = stripe_pairs if stripe else chunk_pairs + pack_name = "stripe" if stripe else "chunk" + expts_packs, refls_packs, pack_lengths = pack_func(expts, refls) + for expts_pack, refls_pack in zip(expts_packs, refls_packs): + batch_chunks[batch] = (expts_pack, refls_pack) + r = '{}ed {} experiments from {} files in {} into {} {}s with sizes = {}' + print(r.format(pack_name.title(), sum(pack_lengths), len(expts), batch, + len(pack_lengths), pack_name, pack_lengths)) + return batch_chunks + def parse_retaining_scope(args, phil_scope=phil_scope): if "-c" in args: From 208056575212c72ce5ed1bcd692d7cd38249231d Mon Sep 17 00:00:00 2001 From: Daniel Tchon Date: Wed, 11 Jan 2023 18:33:47 -0800 Subject: [PATCH 04/11] Fix batch_chunks format passed to files --- xfel/command_line/striping.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/xfel/command_line/striping.py b/xfel/command_line/striping.py index cd34d15adf..7c85feb929 100644 --- a/xfel/command_line/striping.py +++ b/xfel/command_line/striping.py @@ -363,6 +363,7 @@ def allocate_chunks(results_dir, refl_ending += extension batch_chunks = {} for batch in batchable: + batch_chunks[batch] = [] contents = batch_contents[batch] expts = [c for c in contents if c.endswith(expt_ending)] refls = [c for c in contents if c.endswith(refl_ending)] @@ -371,7 +372,7 @@ def allocate_chunks(results_dir, pack_name = "stripe" if stripe else "chunk" expts_packs, refls_packs, pack_lengths = pack_func(expts, refls) for expts_pack, refls_pack in zip(expts_packs, refls_packs): - batch_chunks[batch] = (expts_pack, refls_pack) + batch_chunks[batch].append((expts_pack, refls_pack)) r = '{}ed {} experiments from {} files in {} into {} {}s with sizes = {}' print(r.format(pack_name.title(), sum(pack_lengths), len(expts), batch, len(pack_lengths), pack_name, pack_lengths)) From a557b2b3557836fe451a2f2fe484d6a9670b59e8 Mon Sep 17 00:00:00 2001 From: Daniel Tchon Date: Wed, 11 Jan 2023 18:57:36 -0800 Subject: [PATCH 05/11] Sort expt and refl paths before striping to fix in-chunk order --- xfel/command_line/striping.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/xfel/command_line/striping.py b/xfel/command_line/striping.py index 7c85feb929..769f7056b8 100644 --- a/xfel/command_line/striping.py +++ b/xfel/command_line/striping.py @@ -365,8 +365,8 @@ def allocate_chunks(results_dir, for batch in batchable: batch_chunks[batch] = [] contents = batch_contents[batch] - expts = [c for c in contents if c.endswith(expt_ending)] - refls = [c for c in contents if c.endswith(refl_ending)] + expts = sorted([c for c in contents if c.endswith(expt_ending)]) + refls = sorted([c for c in contents if c.endswith(refl_ending)]) expts, refls = match_dials_files(expts, refls, expt_ending, refl_ending) pack_func = stripe_pairs if stripe else chunk_pairs pack_name = "stripe" if stripe else "chunk" From c46bf5d88ea3cf7b06c72d9a85abbf33b7323462 Mon Sep 17 00:00:00 2001 From: Daniel Tchon Date: Thu, 12 Jan 2023 11:19:07 -0800 Subject: [PATCH 06/11] Obey `max_size=striping.chunk_size` when striping/chunking --- xfel/command_line/striping.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/xfel/command_line/striping.py b/xfel/command_line/striping.py index 769f7056b8..05746d674f 100644 --- a/xfel/command_line/striping.py +++ b/xfel/command_line/striping.py @@ -54,7 +54,7 @@ .help = "Enable to select results evenly spaced across each rungroup" "(stripes) as opposed to contiguous chunks." chunk_size = 1000 - .type = float + .type = int(value_min=1) .help = "Maximum number of images per chunk or stripe." respect_rungroup_barriers = True .type = bool @@ -370,7 +370,7 @@ def allocate_chunks(results_dir, expts, refls = match_dials_files(expts, refls, expt_ending, refl_ending) pack_func = stripe_pairs if stripe else chunk_pairs pack_name = "stripe" if stripe else "chunk" - expts_packs, refls_packs, pack_lengths = pack_func(expts, refls) + expts_packs, refls_packs, pack_lengths = pack_func(expts, refls, max_size) for expts_pack, refls_pack in zip(expts_packs, refls_packs): batch_chunks[batch].append((expts_pack, refls_pack)) r = '{}ed {} experiments from {} files in {} into {} {}s with sizes = {}' From 928e75bc43e54de280a93edeca61d450a8cb43e0 Mon Sep 17 00:00:00 2001 From: Daniel Tchon Date: Thu, 12 Jan 2023 12:52:04 -0800 Subject: [PATCH 07/11] Fix typo in striping output --- xfel/command_line/striping.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/xfel/command_line/striping.py b/xfel/command_line/striping.py index 05746d674f..9c87c75e08 100644 --- a/xfel/command_line/striping.py +++ b/xfel/command_line/striping.py @@ -369,13 +369,13 @@ def allocate_chunks(results_dir, refls = sorted([c for c in contents if c.endswith(refl_ending)]) expts, refls = match_dials_files(expts, refls, expt_ending, refl_ending) pack_func = stripe_pairs if stripe else chunk_pairs - pack_name = "stripe" if stripe else "chunk" expts_packs, refls_packs, pack_lengths = pack_func(expts, refls, max_size) for expts_pack, refls_pack in zip(expts_packs, refls_packs): batch_chunks[batch].append((expts_pack, refls_pack)) - r = '{}ed {} experiments from {} files in {} into {} {}s with sizes = {}' - print(r.format(pack_name.title(), sum(pack_lengths), len(expts), batch, - len(pack_lengths), pack_name, pack_lengths)) + r = '{} {} experiments from {} files in {} into {} {}s with sizes = {}' + print(r.format("Striped" if stripe else "Chunked", sum(pack_lengths), + len(expts), batch, len(pack_lengths), + "stripes" if stripe else "chunks", pack_lengths)) return batch_chunks From 0b2321654a6811f98a7529811c152c3d804152b9 Mon Sep 17 00:00:00 2001 From: Daniel Tchon Date: Fri, 13 Jan 2023 11:02:28 -0800 Subject: [PATCH 08/11] Overwrite `mp.mpi_option` which previously killed integration --- xfel/command_line/striping.py | 1 + 1 file changed, 1 insertion(+) diff --git a/xfel/command_line/striping.py b/xfel/command_line/striping.py index 9c87c75e08..f2dc014e06 100644 --- a/xfel/command_line/striping.py +++ b/xfel/command_line/striping.py @@ -29,6 +29,7 @@ method = local use_mpi = False mpi_command = source + mpi_option = "" local.include_mp_in_command = False } ''' From 8261e88190b312a214754db78b60364fefef1a1e Mon Sep 17 00:00:00 2001 From: Daniel Tchon Date: Fri, 13 Jan 2023 12:20:21 -0800 Subject: [PATCH 09/11] Fix minor typo in striping output string --- xfel/command_line/striping.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xfel/command_line/striping.py b/xfel/command_line/striping.py index f2dc014e06..ac3a8c433f 100644 --- a/xfel/command_line/striping.py +++ b/xfel/command_line/striping.py @@ -373,7 +373,7 @@ def allocate_chunks(results_dir, expts_packs, refls_packs, pack_lengths = pack_func(expts, refls, max_size) for expts_pack, refls_pack in zip(expts_packs, refls_packs): batch_chunks[batch].append((expts_pack, refls_pack)) - r = '{} {} experiments from {} files in {} into {} {}s with sizes = {}' + r = '{} {} experiments from {} files in {} into {} {}{} with sizes = {}' print(r.format("Striped" if stripe else "Chunked", sum(pack_lengths), len(expts), batch, len(pack_lengths), "stripes" if stripe else "chunks", pack_lengths)) From ff79a9d2bbc3e1a3a4e8b7261b9ecb89b9b57a19 Mon Sep 17 00:00:00 2001 From: Daniel Tchon Date: Fri, 27 Jan 2023 18:10:16 -0800 Subject: [PATCH 10/11] Remove unused abc import --- xfel/command_line/striping.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/xfel/command_line/striping.py b/xfel/command_line/striping.py index ac3a8c433f..ed6a16f749 100644 --- a/xfel/command_line/striping.py +++ b/xfel/command_line/striping.py @@ -1,7 +1,4 @@ from __future__ import absolute_import, division, print_function - -import abc - from six.moves import range # -*- Mode: Python; c-basic-offset: 2; indent-tabs-mode: nil; tab-width: 8 -*- # From 529231d6bf2f51a864c9a3da622b00dc64712954 Mon Sep 17 00:00:00 2001 From: Daniel Tchon Date: Wed, 1 Feb 2023 14:04:41 -0800 Subject: [PATCH 11/11] Fix format string when reporting striping/chunking --- xfel/command_line/striping.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xfel/command_line/striping.py b/xfel/command_line/striping.py index ed6a16f749..a6c0dc2c30 100644 --- a/xfel/command_line/striping.py +++ b/xfel/command_line/striping.py @@ -370,7 +370,7 @@ def allocate_chunks(results_dir, expts_packs, refls_packs, pack_lengths = pack_func(expts, refls, max_size) for expts_pack, refls_pack in zip(expts_packs, refls_packs): batch_chunks[batch].append((expts_pack, refls_pack)) - r = '{} {} experiments from {} files in {} into {} {}{} with sizes = {}' + r = '{} {} experiments from {} files in {} into {} {} with sizes = {}' print(r.format("Striped" if stripe else "Chunked", sum(pack_lengths), len(expts), batch, len(pack_lengths), "stripes" if stripe else "chunks", pack_lengths))