Commit 41ee586

Merge pull request mmcdermott#255 from mmcdermott/28_filelock_library

Use the `filelock` library for file locking, not a custom solution. Closes mmcdermott#28

mmcdermott authored Feb 20, 2025
2 parents 3afad1a + 2cd60ca commit 41ee586

Showing 2 changed files with 25 additions and 190 deletions.
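
Background on the change: the filelock package provides cross-process advisory locks keyed on a lock-file path, and acquire(timeout=0) attempts the lock exactly once, raising filelock.Timeout if another process already holds it. Below is a minimal sketch of the pattern this diff adopts; the guarded_write helper and its file names are illustrative, not part of the PR.

    from pathlib import Path

    from filelock import FileLock, Timeout


    def guarded_write(out_fp: Path, text: str) -> bool:
        """Write text to out_fp unless another process holds its lock."""
        lock_fp = out_fp.with_suffix(f"{out_fp.suffix}.lock")
        lock = FileLock(str(lock_fp))
        try:
            lock.acquire(timeout=0)  # Non-blocking: raises Timeout if held elsewhere.
        except Timeout:
            return False  # Another worker owns this output; skip it.
        try:
            out_fp.write_text(text)
            return True
        finally:
            lock.release()
            lock_fp.unlink()  # filelock leaves the lock file on disk; remove it explicitly.
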
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -17,7 +17,7 @@ classifiers = [
"Operating System :: OS Independent",
]
dependencies = [
-    "polars~=1.14", "pyarrow", "nested_ragged_tensors>=0.0.8", "hydra-core", "numpy", "meds==0.3.3",
+    "polars~=1.14", "pyarrow", "nested_ragged_tensors>=0.0.8", "hydra-core", "numpy", "meds==0.3.3", "filelock",
]

[tool.setuptools_scm]
213 changes: 24 additions & 189 deletions src/MEDS_transforms/mapreduce/utils.py
@@ -9,6 +9,7 @@
from typing import TypeVar

import pyarrow.parquet as pq
+from filelock import FileLock, Timeout
from omegaconf import DictConfig

logger = logging.getLogger(__name__)
@@ -50,77 +51,6 @@ def is_complete_parquet_file(fp: Path) -> bool:
        return False


-def get_earliest_lock(cache_directory: Path) -> datetime | None:
-    """Returns the earliest start time of any lock file present in a cache directory, or None if none exist.
-
-    Args:
-        cache_directory: The cache directory to check for the presence of a lock file.
-
-    Examples:
-        >>> import tempfile
-        >>> directory = tempfile.TemporaryDirectory()
-        >>> root = Path(directory.name)
-        >>> empty_directory = root / "cache_empty"
-        >>> empty_directory.mkdir(exist_ok=True, parents=True)
-        >>> cache_directory = root / "cache_with_locks"
-        >>> locks_directory = cache_directory / "locks"
-        >>> locks_directory.mkdir(exist_ok=True, parents=True)
-        >>> time_1 = datetime(2021, 1, 1)
-        >>> time_1_str = time_1.strftime(LOCK_TIME_FMT)  # "2021-01-01T00:00:00.000000"
-        >>> lock_fp_1 = locks_directory / f"{time_1_str}.json"
-        >>> _ = lock_fp_1.write_text(json.dumps({"start": time_1_str}))
-        >>> time_2 = datetime(2021, 1, 2, 3, 4, 5)
-        >>> time_2_str = time_2.strftime(LOCK_TIME_FMT)  # "2021-01-02T03:04:05.000000"
-        >>> lock_fp_2 = locks_directory / f"{time_2_str}.json"
-        >>> _ = lock_fp_2.write_text(json.dumps({"start": time_2_str}))
-        >>> get_earliest_lock(cache_directory)
-        datetime.datetime(2021, 1, 1, 0, 0)
-        >>> get_earliest_lock(empty_directory) is None
-        True
-        >>> lock_fp_1.unlink()
-        >>> get_earliest_lock(cache_directory)
-        datetime.datetime(2021, 1, 2, 3, 4, 5)
-        >>> directory.cleanup()
-    """
-    locks_directory = cache_directory / "locks"
-
-    lock_times = [
-        datetime.strptime(json.loads(lock_fp.read_text())["start"], LOCK_TIME_FMT)
-        for lock_fp in locks_directory.glob("*.json")
-    ]
-
-    return min(lock_times) if lock_times else None
-
-
-def register_lock(cache_directory: Path) -> tuple[datetime, Path]:
-    """Register a lock file in a cache directory.
-
-    Args:
-        cache_directory: The cache directory to register a lock file in.
-
-    Examples:
-        >>> import tempfile
-        >>> directory = tempfile.TemporaryDirectory()
-        >>> root = Path(directory.name)
-        >>> cache_directory = root / "cache_with_locks"
-        >>> lock_time, lock_fp = register_lock(cache_directory)
-        >>> assert (datetime.now() - lock_time).total_seconds() < 1, "Lock time should be ~ now."
-        >>> lock_fp.is_file()
-        True
-        >>> lock_fp.read_text() == f'{{"start": "{lock_time.strftime(LOCK_TIME_FMT)}"}}'
-        True
-        >>> directory.cleanup()
-    """
-    lock_directory = cache_directory / "locks"
-    lock_directory.mkdir(exist_ok=True, parents=True)
-
-    lock_time = datetime.now()
-    lock_fp = lock_directory / f"{lock_time.strftime(LOCK_TIME_FMT)}.json"
-    lock_fp.write_text(json.dumps({"start": lock_time.strftime(LOCK_TIME_FMT)}))
-    return lock_time, lock_fp


def default_file_checker(fp: Path) -> bool:
    """Check if a file exists and is complete."""
    if fp.suffix == ".parquet":
@@ -136,7 +66,6 @@ def rwlock_wrap(
    compute_fn: Callable[[DF_T], DF_T],
    do_overwrite: bool = False,
    out_fp_checker: Callable[[Path], bool] = default_file_checker,
-    register_lock_fn: Callable[[Path], tuple[datetime, Path]] = register_lock,  # For dependency injection
) -> bool:
    """Wrap a series of file-in file-out map transformations on a dataframe with caching and locking.
@@ -201,91 +130,19 @@ def rwlock_wrap(
    ...
    >>> assert not out_fp.is_file()  # Out file should not be created when the process crashes

-    If we check the locks during computation, one should be present
-    >>> cache_directory = root / f".output.csv_cache"
-    >>> lock_dir = cache_directory / "locks"
-    >>> assert not list(lock_dir.iterdir()), "Lock dir starts empty"
-    >>> def lock_dir_checker_fn(df: pl.DataFrame) -> pl.DataFrame:
-    ...     print(f"Lock dir empty? {not (list(lock_dir.iterdir()))}")
-    ...     return df
-    >>> result_computed = rwlock_wrap(in_fp, out_fp, read_fn, write_fn, lock_dir_checker_fn)
-    Lock dir empty? False
-    >>> result_computed
-    True
-    >>> assert not list(lock_dir.iterdir()), "Lock dir should be empty again"
-    >>> out_fp.unlink()
+    If the lock file already exists, the function will not do anything
+    >>> def compute_fn(df: pl.DataFrame) -> pl.DataFrame:
+    ...     return df.with_columns(pl.col("c") * 2).filter(pl.col("c") > 4)
+    >>> out_fp = root / "output.csv"
+    >>> lock_fp = root / "output.csv.lock"
+    >>> with FileLock(str(lock_fp)):
+    ...     result_computed = rwlock_wrap(in_fp, out_fp, read_fn, write_fn, compute_fn)
+    ...     assert not result_computed

-    If we register a lock before we run, the process won't actually compute
-    >>> compute_fn = lambda df: df
-    >>> lock_time, lock_fp = register_lock(cache_directory)
+    The lock file will be removed after successful processing.
    >>> result_computed = rwlock_wrap(in_fp, out_fp, read_fn, write_fn, compute_fn)
-    >>> result_computed
-    False
-    >>> len(list(lock_dir.iterdir()))  # The lock file at lock_fp should still exist
-    1
-    >>> lock_fp.unlink()
-    >>> assert not list(lock_dir.iterdir()), "Lock dir should be empty again"

-    If two processes collide when writing locks during lock registration before reading, the one that
-    writes a lock with an earlier timestamp wins and the later one does not read:
-    >>> def read_fn_and_print(in_fp: Path) -> pl.DataFrame:
-    ...     print("Reading!")
-    ...     return read_fn(in_fp)
-    >>> def register_lock_with_conflict_fntr(early: bool) -> callable:
-    ...     fake_lock_time = datetime(2021, 1, 1, 0, 0, 0) if early else datetime(5000, 1, 2, 0, 0, 0)
-    ...     def fn(cache_directory: Path) -> tuple[datetime, Path]:
-    ...         lock_fp = cache_directory / "locks" / f"{fake_lock_time.strftime(LOCK_TIME_FMT)}.json"
-    ...         lock_fp.write_text(json.dumps({"start": fake_lock_time.strftime(LOCK_TIME_FMT)}))
-    ...         return register_lock(cache_directory)
-    ...     return fn
-    >>> result_computed = rwlock_wrap(
-    ...     in_fp, out_fp, read_fn_and_print, write_fn, compute_fn,
-    ...     register_lock_fn=register_lock_with_conflict_fntr(early=True)
-    ... )
-    >>> result_computed
-    False
-    >>> len(list(lock_dir.iterdir()))  # The lock file added during the registration should still exist.
-    1
-    >>> next(lock_dir.iterdir()).unlink()
-    >>> result_computed = rwlock_wrap(
-    ...     in_fp, out_fp, read_fn_and_print, write_fn, compute_fn,
-    ...     register_lock_fn=register_lock_with_conflict_fntr(early=False)
-    ... )
-    Reading!
-    >>> result_computed
-    True
-    >>> len(list(lock_dir.iterdir()))  # The lock file added during the registration should still exist.
-    1
-    >>> next(lock_dir.iterdir()).unlink()
-    >>> out_fp.unlink()

-    If two processes collide when writing locks during reading, the one that writes a lock with an earlier
-    timestamp wins:
-    >>> def read_fn_with_lock_fntr(early: bool) -> callable:
-    ...     fake_lock_time = datetime(2021, 1, 1, 0, 0, 0) if early else datetime(5000, 1, 2, 0, 0, 0)
-    ...     def fn(in_fp: Path) -> pl.DataFrame:
-    ...         print("Reading!")
-    ...         df = read_fn(in_fp)
-    ...         lock_fp = lock_dir / f"{fake_lock_time.strftime(LOCK_TIME_FMT)}.json"
-    ...         lock_fp.write_text(json.dumps({"start": fake_lock_time.strftime(LOCK_TIME_FMT)}))
-    ...         return df
-    ...     return fn
-    >>> result_computed = rwlock_wrap(in_fp, out_fp, read_fn_with_lock_fntr(True), write_fn, compute_fn)
-    Reading!
-    >>> result_computed
-    False
-    >>> len(list(lock_dir.iterdir()))  # The lock file added during the read should still exist.
-    1
-    >>> next(lock_dir.iterdir()).unlink()
-    >>> result_computed = rwlock_wrap(in_fp, out_fp, read_fn_with_lock_fntr(False), write_fn, compute_fn)
-    Reading!
-    >>> result_computed
-    True
-    >>> len(list(lock_dir.iterdir()))  # The lock file added during the read should still exist.
-    1
-    >>> next(lock_dir.iterdir()).unlink()
-    >>> out_fp.unlink()
-    >>> directory.cleanup()
+    >>> assert result_computed
+    >>> assert not lock_fp.exists()
    """

    if out_fp_checker(out_fp):
@@ -296,49 +153,27 @@ def rwlock_wrap(
            logger.info(f"{out_fp} exists; returning.")
            return False

-    cache_directory = out_fp.parent / f".{out_fp.parts[-1]}_cache"
-    cache_directory.mkdir(exist_ok=True, parents=True)

-    earliest_lock_time = get_earliest_lock(cache_directory)
-    if earliest_lock_time is not None:
-        logger.info(f"{out_fp} is in progress as of {earliest_lock_time}. Returning.")
-        return False

-    st_time, lock_fp = register_lock_fn(cache_directory)

-    logger.info(f"Registered lock at {st_time}. Double checking no earlier locks have been registered.")
-    earliest_lock_time = get_earliest_lock(cache_directory)
-    if earliest_lock_time < st_time:
-        logger.info(f"Earlier lock found at {earliest_lock_time}. Deleting current lock and returning.")
-        lock_fp.unlink()
-        return False

-    logger.info(f"Reading input dataframe from {in_fp}")
-    df = read_fn(in_fp)
-    logger.info("Read dataset")

-    earliest_lock_time = get_earliest_lock(cache_directory)
-    if earliest_lock_time < st_time:
-        logger.info(
-            f"Earlier lock found post read at {earliest_lock_time}. Deleting current lock and returning."
-        )
-        lock_fp.unlink()
+    lock_fp = out_fp.with_suffix(f"{out_fp.suffix}.lock")
+    lock = FileLock(str(lock_fp))
+    try:
+        lock.acquire(timeout=0)
+    except Timeout:
+        logger.info(f"Lock found at {lock_fp}. Returning.")
        return False

    try:
+        st_time = datetime.now()
+        logger.info(f"Reading input dataframe from {in_fp}")
+        df = read_fn(in_fp)
+        logger.info("Read dataset")
        df = compute_fn(df)
        logger.info(f"Writing final output to {out_fp}")
        write_fn(df, out_fp)
        logger.info(f"Succeeded in {datetime.now() - st_time}")
-        logger.info(f"Leaving cache directory {cache_directory}, but clearing lock at {lock_fp}")
-        lock_fp.unlink()

        return True

-    except Exception as e:
-        logger.warning(f"Clearing lock due to Exception {e} at {lock_fp} after {datetime.now() - st_time}")
+    finally:
+        lock.release()
        lock_fp.unlink()
-        raise e


def shuffle_shards(shards: list[str], cfg: DictConfig) -> list[str]:
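
A note on the finally block above: FileLock.release() drops the OS-level lock but, at least with the default Unix lock implementation, does not delete the lock file itself, which appears to be why the new code unlinks lock_fp explicitly. A small sketch of that behavior, using an illustrative temporary path:

    import tempfile
    from pathlib import Path

    from filelock import FileLock

    with tempfile.TemporaryDirectory() as tmp:
        lock_fp = Path(tmp) / "output.csv.lock"  # Illustrative lock path.
        with FileLock(str(lock_fp)):
            pass  # Critical section (read/compute/write) would go here.
        print(lock_fp.exists())  # True on POSIX: release() keeps the file around.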
