diff --git a/activitysim/cli/create.py b/activitysim/cli/create.py index 7211471d7..b810cdcd2 100644 --- a/activitysim/cli/create.py +++ b/activitysim/cli/create.py @@ -265,7 +265,7 @@ def download_asset( url: str, target_path: str, sha256: str = None, - link: bool = True, + link: bool | str | Path = True, base_path: str | None = None, unpack: str | None = None, ): diff --git a/activitysim/core/workflow/runner.py b/activitysim/core/workflow/runner.py index 8ba37a314..8eba53cbe 100644 --- a/activitysim/core/workflow/runner.py +++ b/activitysim/core/workflow/runner.py @@ -3,16 +3,14 @@ import logging import multiprocessing import time -import warnings +from collections.abc import Callable, Iterable from datetime import timedelta -from typing import Callable, Iterable from activitysim.core import tracing from activitysim.core.exceptions import DuplicateWorkflowNameError from activitysim.core.workflow.accessor import FromState, StateAccessor from activitysim.core.workflow.checkpoint import ( CHECKPOINT_NAME, - CHECKPOINT_TABLE_NAME, FINAL_CHECKPOINT_NAME, LAST_CHECKPOINT, ) @@ -130,7 +128,6 @@ def __call__(self, models, resume_after=None, memory_sidecar_process=None): _resume_after = resume_after if _resume_after: - if ( _resume_after != self._obj.checkpoint.last_checkpoint_name() or self._obj.uncheckpointed_table_names() @@ -372,9 +369,42 @@ def by_name(self, model_name, **kwargs): f"##### skipping {self.step_name} checkpoint for {model_name}" ) - def all(self, resume_after=LAST_CHECKPOINT, memory_sidecar_process=None): + def all( + self, + resume_after=LAST_CHECKPOINT, + memory_sidecar_process=None, + config_logger=True, + filter_warnings=True, + ): + t0 = time.time() try: - t0 = tracing.print_elapsed_time() + if "preload_injectables" not in self._obj: + # register abm steps and other abm-specific injectables + from activitysim import abm # noqa: F401 + + if config_logger: + self._obj.logging.config_logger() + + if ( + memory_sidecar_process is None + and self._obj.settings.memory_profile + and not self._obj.settings.multiprocess + ): + from activitysim.core.memory_sidecar import MemorySidecar + + # Memory sidecar is only useful for single process runs + # multiprocess runs log memory usage without blocking in the controlling process. + mem_prof_log = self._obj.get_log_file_path("memory_profile.csv") + memory_sidecar_process = MemorySidecar(mem_prof_log) + local_memory_sidecar_process = memory_sidecar_process + else: + local_memory_sidecar_process = None + + from activitysim.core import config + + if filter_warnings: + config.filter_warnings(self._obj) + logging.captureWarnings(capture=True) if self._obj.settings.multiprocess: logger.info("run multiprocess simulation") @@ -395,11 +425,16 @@ def all(self, resume_after=LAST_CHECKPOINT, memory_sidecar_process=None): memory_sidecar_process=memory_sidecar_process, ) + if local_memory_sidecar_process: + local_memory_sidecar_process.stop() + except Exception: # log time until error and the error traceback tracing.print_elapsed_time("all models until this error", t0) logger.exception("activitysim run encountered an unrecoverable error") raise + else: + tracing.print_elapsed_time("all models completed", t0) def _log_elapsed_time(self, msg, t0=None, level=25): t1 = time.time() diff --git a/activitysim/examples/external.py b/activitysim/examples/external.py index 447cbe0ce..d592d403c 100644 --- a/activitysim/examples/external.py +++ b/activitysim/examples/external.py @@ -257,7 +257,7 @@ def download_external_example( cache_dir = cache_dir.joinpath(name) cache_dir.mkdir(parents=True, exist_ok=True) - working_dir = Path(working_dir) + working_dir = Path(working_dir).absolute() working_dir.mkdir(parents=True, exist_ok=True) common_prefix = "." @@ -309,6 +309,8 @@ def download_external_example( raise ValueError( f"unknown archive file type {''.join(target_path.suffixes)}" ) + else: + working_subdir = working_dir.joinpath(name) # download assets if any: if assets: