From 7ee7c84fe7a56d0f3c843e4543112200392c8692 Mon Sep 17 00:00:00 2001 From: Nezar Abdennur Date: Thu, 19 Dec 2024 07:08:39 -0600 Subject: [PATCH 1/3] compat: Set multiprocessing fork method explicitly through context --- src/cooler/_reduce.py | 3 ++- src/cooler/cli/balance.py | 5 +++-- src/cooler/cli/cload.py | 8 +++++--- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/cooler/_reduce.py b/src/cooler/_reduce.py index f184913..2a480d1 100644 --- a/src/cooler/_reduce.py +++ b/src/cooler/_reduce.py @@ -722,7 +722,8 @@ def coarsen_cooler( try: # Note: fork before opening to prevent inconsistent global HDF5 state if nproc > 1: - pool = mp.Pool(nproc) + ctx = mp.get_context("fork") + pool = ctx.Pool(nproc) kwargs.setdefault("lock", lock) iterator = CoolerCoarsener( diff --git a/src/cooler/cli/balance.py b/src/cooler/cli/balance.py index 309a528..94043e8 100755 --- a/src/cooler/cli/balance.py +++ b/src/cooler/cli/balance.py @@ -2,9 +2,9 @@ import click import h5py +import multiprocess as mp import numpy as np import pandas as pd -from multiprocess import Pool from .._balance import balance_cooler from ..api import Cooler @@ -236,7 +236,8 @@ def balance( try: if nproc > 1: - pool = Pool(nproc) + ctx = mp.get_context("fork") + pool = ctx.Pool(nproc) map_ = pool.imap_unordered else: map_ = map diff --git a/src/cooler/cli/cload.py b/src/cooler/cli/cload.py index ded4b19..32f4300 100644 --- a/src/cooler/cli/cload.py +++ b/src/cooler/cli/cload.py @@ -2,11 +2,11 @@ import click import h5py +import multiprocess as mp import numpy as np import pandas as pd import simplejson as json from cytoolz import compose -from multiprocess import Pool from ..create import ( HDF5Aggregator, @@ -230,7 +230,8 @@ def tabix( try: map_func = map if nproc > 1: - pool = Pool(nproc) + ctx = mp.get_context("fork") + pool = ctx.Pool(nproc) logger.info(f"Using {nproc} cores") map_func = pool.imap @@ -331,7 +332,8 @@ def pairix( try: map_func = map if nproc > 1: - pool = Pool(nproc) + ctx = mp.get_context("fork") + pool = ctx.Pool(nproc) logger.info(f"Using {nproc} cores") map_func = pool.imap From ceb416ab494aaae30245571d3909bc63ea9ac3a2 Mon Sep 17 00:00:00 2001 From: Nezar Abdennur Date: Fri, 20 Dec 2024 17:54:54 -0600 Subject: [PATCH 2/3] Add accessors for reusable global mp context and lock --- src/cooler/_reduce.py | 7 +- src/cooler/cli/_util.py | 4 +- src/cooler/cli/balance.py | 4 +- src/cooler/cli/cload.py | 6 +- src/cooler/cli/coarsen.py | 4 +- src/cooler/cli/zoomify.py | 6 +- src/cooler/parallel.py | 155 +++++++++++++++++++++++++++----------- 7 files changed, 125 insertions(+), 61 deletions(-) diff --git a/src/cooler/_reduce.py b/src/cooler/_reduce.py index 2a480d1..bcb475d 100644 --- a/src/cooler/_reduce.py +++ b/src/cooler/_reduce.py @@ -8,7 +8,6 @@ from typing import Any, Literal import h5py -import multiprocess as mp import numpy as np import pandas as pd @@ -17,7 +16,7 @@ from ._version import __format_version_mcool__ from .api import Cooler from .create import ContactBinner, create -from .parallel import lock +from .parallel import get_mp_context, get_mp_lock from .util import GenomeSegmentation, parse_cooler_uri __all__ = ["coarsen_cooler", "merge_coolers", "zoomify_cooler"] @@ -637,6 +636,7 @@ def __iter__(self) -> Iterator[dict[str, np.ndarray]]: for i in range(0, len(spans), batchsize): try: if batchsize > 1: + lock = get_mp_lock() lock.acquire() results = self._map(self.aggregate, spans[i : i + batchsize]) finally: @@ -722,7 +722,8 @@ def coarsen_cooler( try: # Note: fork before opening to 
prevent inconsistent global HDF5 state if nproc > 1: - ctx = mp.get_context("fork") + ctx = get_mp_context() + lock = get_mp_lock() pool = ctx.Pool(nproc) kwargs.setdefault("lock", lock) diff --git a/src/cooler/cli/_util.py b/src/cooler/cli/_util.py index 1f10b2a..e3dd647 100644 --- a/src/cooler/cli/_util.py +++ b/src/cooler/cli/_util.py @@ -6,11 +6,11 @@ from functools import wraps import click -import multiprocess as mp import numpy as np import pandas as pd from .. import util +from ..parallel import get_mp_context class DelimitedTuple(click.types.ParamType): @@ -149,7 +149,7 @@ def check_ncpus(arg_value): if arg_value <= 0: raise click.BadParameter("n_cpus must be >= 1") else: - return min(arg_value, mp.cpu_count()) + return min(arg_value, get_mp_context().cpu_count()) @contextmanager diff --git a/src/cooler/cli/balance.py b/src/cooler/cli/balance.py index 94043e8..3924b17 100755 --- a/src/cooler/cli/balance.py +++ b/src/cooler/cli/balance.py @@ -2,12 +2,12 @@ import click import h5py -import multiprocess as mp import numpy as np import pandas as pd from .._balance import balance_cooler from ..api import Cooler +from ..parallel import get_mp_context from ..util import bedslice, parse_cooler_uri from . import cli, get_logger @@ -236,7 +236,7 @@ def balance( try: if nproc > 1: - ctx = mp.get_context("fork") + ctx = get_mp_context() pool = ctx.Pool(nproc) map_ = pool.imap_unordered else: diff --git a/src/cooler/cli/cload.py b/src/cooler/cli/cload.py index 32f4300..28b2371 100644 --- a/src/cooler/cli/cload.py +++ b/src/cooler/cli/cload.py @@ -2,7 +2,6 @@ import click import h5py -import multiprocess as mp import numpy as np import pandas as pd import simplejson as json @@ -16,6 +15,7 @@ create_cooler, sanitize_records, ) +from ..parallel import get_mp_context from . import cli, get_logger from ._util import parse_bins, parse_field_param, parse_kv_list_param @@ -230,7 +230,7 @@ def tabix( try: map_func = map if nproc > 1: - ctx = mp.get_context("fork") + ctx = get_mp_context() pool = ctx.Pool(nproc) logger.info(f"Using {nproc} cores") map_func = pool.imap @@ -332,7 +332,7 @@ def pairix( try: map_func = map if nproc > 1: - ctx = mp.get_context("fork") + ctx = get_mp_context() pool = ctx.Pool(nproc) logger.info(f"Using {nproc} cores") map_func = pool.imap diff --git a/src/cooler/cli/coarsen.py b/src/cooler/cli/coarsen.py index cb22ded..ed81a24 100644 --- a/src/cooler/cli/coarsen.py +++ b/src/cooler/cli/coarsen.py @@ -3,7 +3,7 @@ import click from .._reduce import coarsen_cooler -from ..parallel import lock +from ..parallel import get_mp_lock from ..util import parse_cooler_uri from . import cli from ._util import parse_field_param @@ -90,6 +90,6 @@ def coarsen(cool_uri, factor, nproc, chunksize, field, out, append): columns=columns, dtypes=dtypes, agg=agg, - lock=lock if same_file else None, + lock=get_mp_lock() if same_file else None, mode="a" if append else "w", ) diff --git a/src/cooler/cli/zoomify.py b/src/cooler/cli/zoomify.py index 5192146..0995f42 100644 --- a/src/cooler/cli/zoomify.py +++ b/src/cooler/cli/zoomify.py @@ -11,7 +11,7 @@ preferred_sequence, zoomify_cooler, ) -from ..parallel import lock +from ..parallel import get_mp_lock from ..util import parse_cooler_uri from . 
import cli, get_logger from ._util import parse_field_param @@ -142,7 +142,7 @@ def zoomify( if legacy: n_zooms, zoom_levels = legacy_zoomify( - cool_uri, outfile, nproc, chunksize, lock=lock + cool_uri, outfile, nproc, chunksize, lock=get_mp_lock() ) if balance: @@ -237,7 +237,7 @@ def zoomify( resolutions, chunksize, nproc=nproc, - lock=lock, + lock=get_mp_lock(), columns=columns, dtypes=dtypes, agg=agg, diff --git a/src/cooler/parallel.py b/src/cooler/parallel.py index bb28f66..acec1b8 100644 --- a/src/cooler/parallel.py +++ b/src/cooler/parallel.py @@ -1,53 +1,125 @@ -""" -Experimental API for developing split-apply-combine style algorithms on -coolers. +"""Experimental API for split-apply-combine style algorithms on coolers.""" -""" from __future__ import annotations +import multiprocessing as mp +import os +import sys from collections.abc import Iterable, Iterator, Sequence from functools import partial, reduce -from typing import Any, Callable - -from multiprocess import Lock +from typing import Any, Callable, Literal from ._typing import MapFunctor from .api import Cooler from .core import get from .util import partition -__all__ = ["lock", "partition", "split"] +__all__ = ["clear_mp_context", "get_mp_context", "get_mp_lock", "partition", "split"] -""" -Two possible reasons for using a lock +KeyType = Any +_ctx = None +_lock = None -(1) Prevent a concurrent process from opening an HDF5 file while the same -file is open for writing. In order for reading processes to obtain the correct -state, make sure the writing process finishes writing (flushes its buffers and -actually closes the file) before reading processes attempt to open it. -This explicit synchronization shouldn't be necessary if using the file in -SWMR mode. -See also: -* -* +def get_mp_context( + method: Literal["fork", "spawn", "forkserver"] | None = None, +) -> mp.context.BaseContext: + """Get a global multiprocessing context. -(2) Synchronize file access when opened before a fork(). Fork-based (Unix) -multiprocessing and concurrent reading are compatible as long as the fork -happens before the child processes open the file. If an HDF5 file is already -open before forking, the child processes inherit the same global HDF5 state, -which leads to a race condition that causes simultaneous access to fail. One -can either use a lock to prevent the race condition, or close and re-open the -file in the workers after the fork. + The context will be initialized on the first call. -See also: -* -* . + Parameters + ---------- + method : Literal["fork", "spawn", "forkserver"], optional + The method to use for starting child processes. The default is "fork" + on POSIX and "spawn" on Windows, which can be overridden by setting the + environment variable ``COOLER_DEFAULT_MP_METHOD``. -""" -lock = Lock() + Returns + ------- + mp.context.BaseContext -KeyType = Any + Notes + ----- + As of Python 3.14, "fork" is no longer the default start method on any + platform. This function preserves "fork" as the default start method on + POSIX systems and "spawn" on Windows systems. + """ + global _ctx + if _ctx is None: + if sys.platform == "win32": + default_method = "spawn" + else: + default_method = "fork" + default_method = os.getenv("COOLER_DEFAULT_MP_METHOD", default_method) + _ctx = mp.get_context(method or default_method) + elif method is not None: + if _ctx.get_start_method() != method: + raise RuntimeError( + "Cooler's multiprocessing context has already been set with " + "a different start method. Clear before resetting." 
+ ) + return _ctx + + +def get_mp_lock() -> mp.synchronize.Lock: + """Get a global multiprocessing lock. + + The lock is initialized on the first call and reused thereafter until + cleared. If the global context has not already been set, the default + context is created. + + Returns + ------- + lock : mp.synchronize.Lock + + See also + -------- + clear_mp_lock + + Notes + ----- + Consider two reasons for using a lock: + + (1) Prevent a concurrent process from opening an HDF5 file while the same + file is open for writing. In order for reading processes to obtain the correct + state, make sure the writing process finishes writing (flushes its buffers and + actually closes the file) before reading processes attempt to open it. + This explicit synchronization shouldn't be necessary if using the file in + SWMR mode. + + See also: + * + * + + (2) Synchronize file access when opened before a fork(). Fork-based (Unix) + multiprocessing and concurrent reading are compatible as long as the fork + happens before the child processes open the file. If an HDF5 file is already + open before forking, the child processes inherit the same global HDF5 state, + which leads to a race condition that causes simultaneous access to fail. One + can either use a lock to prevent the race condition, or close and re-open the + file in the workers after the fork. + + See also: + * + * . + """ + global _lock + + if _lock is None: + ctx = get_mp_context() + _lock = ctx.Lock() + + return _lock + + +def clear_mp_context() -> None: + """Clear the global multiprocessing context and lock.""" + global _ctx, _lock + if _ctx is not None: + _ctx = None + if _lock is not None: + _lock = None def apply_pipeline( @@ -189,10 +261,7 @@ def prepare(self, func: Callable[[Any], Any]) -> MultiplexDataPipe: return self def pipe( - self, - func: Callable[[Any], Any] | Callable[[Any, Any], Any], - *args, - **kwargs + self, func: Callable[[Any], Any] | Callable[[Any, Any], Any], *args, **kwargs ) -> MultiplexDataPipe: """ Append new task(s) to the pipeline @@ -230,10 +299,7 @@ def run(self) -> Iterable[Any]: return self.map(pipeline, self.keys) def gather( - self, - combine: Callable[[Iterable], Sequence] = list, - *args, - **kwargs + self, combine: Callable[[Iterable], Sequence] = list, *args, **kwargs ) -> Sequence[Any]: """ Run the pipeline and gather outputs @@ -291,6 +357,7 @@ def __call__(self, span: tuple[int, int]) -> dict[str, Any]: chunk = {} try: if self.use_lock: + lock = get_mp_lock() lock.acquire() with self.cooler.open("r") as grp: if self.include_chroms: @@ -309,12 +376,8 @@ def split( map: MapFunctor = map, chunksize: int = 10_000_000, spans: Iterable[tuple[int, int]] | None = None, - **kwargs + **kwargs, ) -> MultiplexDataPipe: if spans is None: spans = partition(0, int(clr.info["nnz"]), chunksize) - return MultiplexDataPipe( - get=chunkgetter(clr, **kwargs), - keys=spans, - map=map - ) + return MultiplexDataPipe(get=chunkgetter(clr, **kwargs), keys=spans, map=map) From 9c6232377bbf5a3eb0936c4f86778987277e4d58 Mon Sep 17 00:00:00 2001 From: Nezar Abdennur Date: Sat, 11 Jan 2025 03:24:45 -0500 Subject: [PATCH 3/3] Pass locks explicitly from caller --- src/cooler/_balance.py | 26 ++++++------- src/cooler/_reduce.py | 79 ++++++++++++++++----------------------- src/cooler/cli/balance.py | 1 - src/cooler/cli/coarsen.py | 37 +++++++++++------- src/cooler/cli/zoomify.py | 34 +++++++++++------ src/cooler/parallel.py | 25 +++++++------ tests/test_parallel.py | 3 +- 7 files changed, 108 insertions(+), 97 deletions(-) diff --git 
a/src/cooler/_balance.py b/src/cooler/_balance.py index 0dc69c9..a001c3d 100644 --- a/src/cooler/_balance.py +++ b/src/cooler/_balance.py @@ -3,7 +3,7 @@ import warnings from functools import partial from operator import add -from typing import Literal +from typing import Any, Literal import numpy as np @@ -79,14 +79,14 @@ def _balance_genomewide( tol, max_iters, rescale_marginals, - use_lock, + lock, ): scale = 1.0 n_bins = len(bias) for _ in range(max_iters): marg = ( - split(clr, spans=spans, map=map, use_lock=use_lock) + split(clr, spans=spans, map=map, lock=lock) .prepare(_init) .pipe(filters) .pipe(_timesouterproduct, bias) @@ -134,7 +134,7 @@ def _balance_cisonly( tol, max_iters, rescale_marginals, - use_lock, + lock, ): chroms = clr.chroms()["name"][:] chrom_ids = np.arange(len(clr.chroms())) @@ -153,7 +153,7 @@ def _balance_cisonly( var = np.nan for _ in range(max_iters): marg = ( - split(clr, spans=spans, map=map, use_lock=use_lock) + split(clr, spans=spans, map=map, lock=lock) .prepare(_init) .pipe(filters) .pipe(_timesouterproduct, bias) @@ -206,7 +206,7 @@ def _balance_transonly( tol, max_iters, rescale_marginals, - use_lock, + lock, ): scale = 1.0 n_bins = len(bias) @@ -221,7 +221,7 @@ def _balance_transonly( for _ in range(max_iters): marg = ( - split(clr, spans=spans, map=map, use_lock=use_lock) + split(clr, spans=spans, map=map, lock=lock) .prepare(_init) .pipe(filters) .pipe(_zero_cis) @@ -276,7 +276,7 @@ def balance_cooler( max_iters: int = 200, chunksize: int = 10_000_000, map: MapFunctor = map, - use_lock: bool = False, + lock: Any | None = None, store: bool = False, store_name: str = "weight", ) -> tuple[np.ndarray, dict]: @@ -375,7 +375,7 @@ def balance_cooler( if min_nnz > 0: filters = [_binarize, *base_filters] marg_nnz = ( - split(clr, spans=spans, map=map, use_lock=use_lock) + split(clr, spans=spans, map=map, lock=lock) .prepare(_init) .pipe(filters) .pipe(_marginalize) @@ -385,7 +385,7 @@ def balance_cooler( filters = base_filters marg = ( - split(clr, spans=spans, map=map, use_lock=use_lock) + split(clr, spans=spans, map=map, lock=lock) .prepare(_init) .pipe(filters) .pipe(_marginalize) @@ -424,7 +424,7 @@ def balance_cooler( tol, max_iters, rescale_marginals, - use_lock, + lock, ) elif trans_only: bias, scale, var = _balance_transonly( @@ -437,7 +437,7 @@ def balance_cooler( tol, max_iters, rescale_marginals, - use_lock, + lock, ) else: bias, scale, var = _balance_genomewide( @@ -450,7 +450,7 @@ def balance_cooler( tol, max_iters, rescale_marginals, - use_lock, + lock, ) stats = { diff --git a/src/cooler/_reduce.py b/src/cooler/_reduce.py index bcb475d..fcd8fbc 100644 --- a/src/cooler/_reduce.py +++ b/src/cooler/_reduce.py @@ -16,7 +16,6 @@ from ._version import __format_version_mcool__ from .api import Cooler from .create import ContactBinner, create -from .parallel import get_mp_context, get_mp_lock from .util import GenomeSegmentation, parse_cooler_uri __all__ = ["coarsen_cooler", "merge_coolers", "zoomify_cooler"] @@ -516,8 +515,10 @@ def __init__( agg: dict[str, Any] | None, batchsize: int, map: MapFunctor = map, + lock=None, ): self._map = map + self.lock = lock self.source_uri = source_uri self.batchsize = batchsize @@ -582,7 +583,7 @@ def _each(group): .reset_index(drop=True) ) - def _aggregate(self, span: tuple[int, int]) -> pd.DataFrame: + def aggregate(self, span: tuple[int, int]) -> pd.DataFrame: lo, hi = span clr = Cooler(self.source_uri) @@ -622,26 +623,20 @@ def _aggregate(self, span: tuple[int, int]) -> pd.DataFrame: .reset_index() ) - def 
aggregate(self, span: tuple[int, int]) -> pd.DataFrame: - try: - chunk = self._aggregate(span) - except MemoryError as e: # pragma: no cover - raise RuntimeError(str(e)) from e - return chunk - def __iter__(self) -> Iterator[dict[str, np.ndarray]]: - # Distribute batches of `batchsize` pixel spans at once. + # Distribute consecutive pixel spans across the workers in batches. + # In the single-process case, each batch is one pixel span. + # With n processes, each batch is n pixel spans - one span per process. batchsize = self.batchsize spans = list(zip(self.edges[:-1], self.edges[1:])) for i in range(0, len(spans), batchsize): try: - if batchsize > 1: - lock = get_mp_lock() - lock.acquire() + if batchsize > 1 and self.lock is not None: + self.lock.acquire() results = self._map(self.aggregate, spans[i : i + batchsize]) finally: - if batchsize > 1: - lock.release() + if batchsize > 1 and self.lock is not None: + self.lock.release() for df in results: yield {k: v.values for k, v in df.items()} @@ -655,6 +650,7 @@ def coarsen_cooler( columns: list[str] | None = None, dtypes: dict[str, Any] | None = None, agg: dict[str, Any] | None = None, + map: MapFunctor = map, **kwargs, ) -> None: """ @@ -719,40 +715,29 @@ def coarsen_cooler( else: dtypes.setdefault(col, input_dtypes[col]) - try: - # Note: fork before opening to prevent inconsistent global HDF5 state - if nproc > 1: - ctx = get_mp_context() - lock = get_mp_lock() - pool = ctx.Pool(nproc) - kwargs.setdefault("lock", lock) - - iterator = CoolerCoarsener( - base_uri, - factor, - chunksize, - columns=columns, - agg=agg, - batchsize=nproc, - map=pool.map if nproc > 1 else map, - ) - - new_bins = iterator.new_bins + iterator = CoolerCoarsener( + base_uri, + factor, + chunksize, + columns=columns, + agg=agg, + batchsize=nproc, + map=map, + lock=kwargs.get("lock", None), + ) - kwargs.setdefault("append", True) + new_bins = iterator.new_bins - create( - output_uri, - new_bins, - iterator, - dtypes=dtypes, - symmetric_upper=clr.storage_mode == "symmetric-upper", - **kwargs, - ) + kwargs.setdefault("append", True) - finally: - if nproc > 1: - pool.close() + create( + output_uri, + new_bins, + iterator, + dtypes=dtypes, + symmetric_upper=clr.storage_mode == "symmetric-upper", + **kwargs, + ) def zoomify_cooler( @@ -764,6 +749,7 @@ def zoomify_cooler( columns: list[str] | None = None, dtypes: dict[str, Any] | None = None, agg: dict[str, Any] | None = None, + map: MapFunctor = map, **kwargs, ) -> None: """ @@ -874,6 +860,7 @@ def zoomify_cooler( dtypes=dtypes, agg=agg, mode="r+", + map=map, **kwargs, ) diff --git a/src/cooler/cli/balance.py b/src/cooler/cli/balance.py index 3924b17..2654dd0 100755 --- a/src/cooler/cli/balance.py +++ b/src/cooler/cli/balance.py @@ -255,7 +255,6 @@ def balance( max_iters=max_iters, ignore_diags=ignore_diags, rescale_marginals=True, - use_lock=False, map=map_, ) diff --git a/src/cooler/cli/coarsen.py b/src/cooler/cli/coarsen.py index ed81a24..74b68cb 100644 --- a/src/cooler/cli/coarsen.py +++ b/src/cooler/cli/coarsen.py @@ -3,7 +3,7 @@ import click from .._reduce import coarsen_cooler -from ..parallel import get_mp_lock +from ..parallel import get_mp_context, get_mp_lock from ..util import parse_cooler_uri from . import cli from ._util import parse_field_param @@ -81,15 +81,26 @@ def coarsen(cool_uri, factor, nproc, chunksize, field, out, append): # Default aggregation. Dtype will be inferred. 
columns, dtypes, agg = ["count"], None, None - coarsen_cooler( - cool_uri, - out, - factor, - chunksize=chunksize, - nproc=nproc, - columns=columns, - dtypes=dtypes, - agg=agg, - lock=get_mp_lock() if same_file else None, - mode="a" if append else "w", - ) + try: + map_func = map + if nproc > 1: + ctx = get_mp_context() + pool = ctx.Pool(nproc) + map_func = pool.map + + coarsen_cooler( + cool_uri, + out, + factor, + chunksize=chunksize, + columns=columns, + dtypes=dtypes, + agg=agg, + nproc=nproc, + map=map_func, + lock=get_mp_lock() if same_file else None, + mode="a" if append else "w", + ) + finally: + if nproc > 1: + pool.close() diff --git a/src/cooler/cli/zoomify.py b/src/cooler/cli/zoomify.py index 0995f42..1ae9ccd 100644 --- a/src/cooler/cli/zoomify.py +++ b/src/cooler/cli/zoomify.py @@ -11,7 +11,7 @@ preferred_sequence, zoomify_cooler, ) -from ..parallel import get_mp_lock +from ..parallel import get_mp_context, get_mp_lock from ..util import parse_cooler_uri from . import cli, get_logger from ._util import parse_field_param @@ -230,18 +230,28 @@ def zoomify( columns, dtypes, agg = ["count"], None, None # logger.info("Applying resolutions {}".format(resolutions)) + try: + map_func = map + if nproc > 1: + ctx = get_mp_context() + pool = ctx.Pool(nproc) + map_func = pool.map - zoomify_cooler( - [cool_uri, *list(base_uri)], - outfile, - resolutions, - chunksize, - nproc=nproc, - lock=get_mp_lock(), - columns=columns, - dtypes=dtypes, - agg=agg, - ) + zoomify_cooler( + [cool_uri, *list(base_uri)], + outfile, + resolutions, + chunksize, + nproc=nproc, + map=map_func, + lock=get_mp_lock(), + columns=columns, + dtypes=dtypes, + agg=agg, + ) + finally: + if nproc > 1: + pool.close() if balance: invoke_balance(balance_args, resolutions, outfile) diff --git a/src/cooler/parallel.py b/src/cooler/parallel.py index acec1b8..5215376 100644 --- a/src/cooler/parallel.py +++ b/src/cooler/parallel.py @@ -228,10 +228,8 @@ def __copy__(self) -> MultiplexDataPipe: other._prepare = self._prepare return other - def __reduce__(self) -> dict: - d = self.__dict__.copy() - d.pop("map", None) - return d + def __reduce__(self) -> tuple[Callable, tuple]: + return self.__class__, (self.get, self.keys, None) def __iter__(self) -> Iterator[Any]: return iter(self.run()) @@ -345,20 +343,25 @@ def __init__( clr: Cooler, include_chroms: bool = False, include_bins: bool = True, - use_lock: bool = False, + lock: mp.synchronize.Lock | None = None, ): self.cooler = clr self.include_chroms = include_chroms self.include_bins = include_bins - self.use_lock = use_lock + self.lock = lock + + def __reduce__(self) -> tuple[Callable, tuple]: + return ( + self.__class__, + (self.cooler, self.include_chroms, self.include_bins, None) + ) def __call__(self, span: tuple[int, int]) -> dict[str, Any]: lo, hi = span chunk = {} try: - if self.use_lock: - lock = get_mp_lock() - lock.acquire() + if self.lock: + self.lock.acquire() with self.cooler.open("r") as grp: if self.include_chroms: chunk["chroms"] = get(grp["chroms"], as_dict=True) @@ -366,8 +369,8 @@ def __call__(self, span: tuple[int, int]) -> dict[str, Any]: chunk["bins"] = get(grp["bins"], as_dict=True) chunk["pixels"] = get(grp["pixels"], lo, hi, as_dict=True) finally: - if self.use_lock: - lock.release() + if self.lock: + self.lock.release() return chunk diff --git a/tests/test_parallel.py b/tests/test_parallel.py index a8e0d43..9e5b40c 100644 --- a/tests/test_parallel.py +++ b/tests/test_parallel.py @@ -1,4 +1,5 @@ import os.path as op +from multiprocessing import Lock from 
operator import add import cooler @@ -55,7 +56,7 @@ def test_chunkgetter(): assert "bins" in chunk assert "pixels" in chunk - getter = parallel.chunkgetter(clr, use_lock=True) + getter = parallel.chunkgetter(clr, lock=Lock()) chunk = getter((lo, hi)) assert isinstance(chunk, dict) assert len(chunk["pixels"]["bin1_id"]) == 2
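
Note (not part of the patches above): a minimal usage sketch of the reworked parallel API introduced by this series, mirroring what the updated `cooler coarsen` command does after PATCH 3/3. The file URIs, coarsening factor, and worker count below are hypothetical; `get_mp_context`/`get_mp_lock` come from the patched `cooler.parallel`, and `coarsen_cooler` now accepts `map` and an explicit `lock` from the caller.

    from cooler import coarsen_cooler
    from cooler.parallel import get_mp_context, get_mp_lock

    src = "test.1kb.cool"    # hypothetical input URI
    dst = "test.10kb.cool"   # hypothetical output URI
    nproc = 4

    # Shared context: "fork" on POSIX, "spawn" on Windows, unless overridden
    # via COOLER_DEFAULT_MP_METHOD or an explicit method argument.
    ctx = get_mp_context()
    pool = ctx.Pool(nproc)
    try:
        coarsen_cooler(
            src,
            dst,
            factor=10,
            chunksize=10_000_000,
            nproc=nproc,
            map=pool.map,
            # The lock is strictly needed only when the output lives in the
            # same file as the input (the CLI passes None otherwise).
            lock=get_mp_lock(),
        )
    finally:
        pool.close()

Passing the pool's `map` and an explicit lock from the caller replaces the old pattern of relying on the module-level `lock` and the `use_lock` flag.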