From 01e1d881caf23b7f8b3c03fda1e38df34516b844 Mon Sep 17 00:00:00 2001 From: Shehan M Parmar Date: Thu, 30 Jan 2025 15:16:35 -0500 Subject: [PATCH 1/7] created DynamicOpenMMFlowMaker; moved BaseOpenMMMaker import outside of TYPE_CHECKING to use as callable in default_factory --- src/atomate2/openmm/flows/core.py | 43 ++++++++++++++++++++++++++++--- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/src/atomate2/openmm/flows/core.py b/src/atomate2/openmm/flows/core.py index 8b226fc95e..9a74113ac6 100644 --- a/src/atomate2/openmm/flows/core.py +++ b/src/atomate2/openmm/flows/core.py @@ -11,16 +11,13 @@ from jobflow import Flow, Job, Maker, Response from monty.json import MontyDecoder, MontyEncoder -from atomate2.openmm.jobs.base import openmm_job +from atomate2.openmm.jobs.base import BaseOpenMMMaker, openmm_job from atomate2.openmm.jobs.core import NVTMaker, TempChangeMaker from atomate2.openmm.utils import create_list_summing_to if TYPE_CHECKING: from openff.interchange import Interchange - from atomate2.openmm.jobs.base import BaseOpenMMMaker - - def _get_calcs_reversed(job: Job | Flow) -> list[Calculation | list]: """Unwrap a nested list of calcs from jobs or flows.""" if isinstance(job, Flow): @@ -230,3 +227,41 @@ def anneal_flow( makers=[raise_temp_maker, nvt_maker, lower_temp_maker], collect_outputs=False, ) + + +@dataclass +class DynamicOpenMMFlowMaker(Maker): + """Run a dynamic equlibration or production simulation. + + Create a dynamic flow out of an existing OpenMM simulation + job or a linear sequence of linked jobs, i.e., an OpenMM + flow. + + Attributes + ---------- + name : str + The name of the dynamic OpenMM job or flow. Default is the name + of the inherited maker name with "dynamic" preprended. + tags : list[str] + Tags to apply to the final job. Will only be applied if collect_jobs is True. + maker: Union[BaseOpenMMMaker, OpenMMFlowMaker] + A list of makers to string together. 
+ collect_outputs : bool + If True, a final job is added that collects all jobs into a single + task document. + """ + + name: str = field(default=None) + maker: BaseOpenMMMaker | OpenMMFlowMaker = field(default_factory=BaseOpenMMMaker) + + def __post_init__(self) -> None: + """Post init formatting of arguments.""" + if self.name is None: + self.name = f"dynamic {self.maker.name}" + + def make( + self, + interchange: Interchange | OpenMMInterchange | str, + prev_dir: str | None = None, + ) -> Flow: + pass From 7cd990639a657e815a068635f24b2de9216f692e Mon Sep 17 00:00:00 2001 From: Shehan M Parmar Date: Thu, 30 Jan 2025 15:46:06 -0500 Subject: [PATCH 2/7] use of lambda function is more appropriate for default_factory to avoid shared mutable defaults --- src/atomate2/openmm/flows/core.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/atomate2/openmm/flows/core.py b/src/atomate2/openmm/flows/core.py index 9a74113ac6..b134a09a61 100644 --- a/src/atomate2/openmm/flows/core.py +++ b/src/atomate2/openmm/flows/core.py @@ -77,7 +77,7 @@ class OpenMMFlowMaker(Maker): The name of the production job. Default is "production". tags : list[str] Tags to apply to the final job. Will only be applied if collect_jobs is True. - makers: list[BaseOpenMMMaker] + makers: Union[BaseOpenMMMaker, OpenMMFlowMaker] A list of makers to string together. 
collect_outputs : bool If True, a final job is added that collects all jobs into a single @@ -252,7 +252,9 @@ class DynamicOpenMMFlowMaker(Maker): """ name: str = field(default=None) - maker: BaseOpenMMMaker | OpenMMFlowMaker = field(default_factory=BaseOpenMMMaker) + maker: BaseOpenMMMaker | OpenMMFlowMaker = field( + default_factory=lambda: BaseOpenMMMaker() + ) def __post_init__(self) -> None: """Post init formatting of arguments.""" From 2cf8a9517c79af6d1d851dfae76012e3053b7f1f Mon Sep 17 00:00:00 2001 From: Shehan M Parmar Date: Sat, 1 Feb 2025 17:57:35 -0500 Subject: [PATCH 3/7] added dynamic flow logic and apply_flow_control instace classes; added placeholder should_continue function --- src/atomate2/openmm/flows/core.py | 136 +++++++++++++++++++++++++++++- 1 file changed, 132 insertions(+), 4 deletions(-) diff --git a/src/atomate2/openmm/flows/core.py b/src/atomate2/openmm/flows/core.py index b134a09a61..23d3c24399 100644 --- a/src/atomate2/openmm/flows/core.py +++ b/src/atomate2/openmm/flows/core.py @@ -63,6 +63,54 @@ def collect_outputs( return Response(output=task_doc) +@openmm_job +def dynamic_collect_outputs( + all_docs: list[OpenMMTaskDocument] +) -> list[OpenMMTaskDocument]: + """Wrapper to run "collect_outputs" jobs dynamically """ + if not all_docs: + return [] + + doc = all_docs.pop(0) + print(f"collecting outputs for:\n{doc}") + prev_dir = doc.dir_name + tags = None # need to pass this in + job_uuids = doc.job.uuid + calcs_reversed = doc.calcs_reversed + task_type = "collect" + collect_job = collect_outputs( + prev_dir=prev_dir, + tags=tags or None, + job_uuids=job_uuids, + calcs_reversed=calcs_reversed, + task_type=task_type, + ) + if all_docs: + next_collect_job = dynamic_collect_outputs(all_docs=all_docs) + flow = Flow(jobs=[collect_job, next_collect_job],output=next_job.output) + return Response(replace=flow) + else: + return Response(output=[doc], addition=collect_job) + +def default_should_continue( + task_doc: OpenMMTaskDocument, + 
stage_index: int, + max_stages: int, +) -> bool: + """Default dynamic flow logic for deciding to continue flow (True) + + This serves as a template for any bespoke "should_continue" functions + written by the user. By default, simulation logic depends on stability + of potential energy as a function of time, dU_dt. + """ + print(f"%%%%% Running stage index {stage_index} %%%%%") + print(f"task_doc={task_doc}") + # TODO: Add PE vs time functionality ... + # TODO: add density vs. time functionality ... + if stage_index < max_stages: + return True + else: + return False @dataclass class OpenMMFlowMaker(Maker): @@ -242,20 +290,29 @@ class DynamicOpenMMFlowMaker(Maker): name : str The name of the dynamic OpenMM job or flow. Default is the name of the inherited maker name with "dynamic" preprended. - tags : list[str] - Tags to apply to the final job. Will only be applied if collect_jobs is True. maker: Union[BaseOpenMMMaker, OpenMMFlowMaker] A list of makers to string together. + max_stages: int + Maximum number of stages to run consecutively before terminating + dynamic flow logic. collect_outputs : bool If True, a final job is added that collects all jobs into a single task document. + should_continue: ... + A function ... Default is _default_should_continue...#TODO """ name: str = field(default=None) maker: BaseOpenMMMaker | OpenMMFlowMaker = field( default_factory=lambda: BaseOpenMMMaker() ) - + max_stages: int = field(default=5) + collect_outputs: bool = True + should_continue: Callable[[OpenMMTaskDocument, int, int], bool] = field( + default_factory=lambda: default_should_continue + ) + stage_task_type: str = "collect" # or default to OpenMMFlowMaker final_task_type? 
+ def __post_init__(self) -> None: """Post init formatting of arguments.""" if self.name is None: @@ -266,4 +323,75 @@ def make( interchange: Interchange | OpenMMInterchange | str, prev_dir: str | None = None, ) -> Flow: - pass + + print(f"inside DOFM: -> {self.maker}") + # Initial segment job (or flow) of entire dynamic flow + stage_n = self.dynamic_flow( + interchange=interchange, + prev_dir=prev_dir, + stage_index=0, # later can make this an input, n, for checkpointing + stage_docs=[], + ) + + return Flow([stage_n], name=self.name, output=stage_n.output) + + def dynamic_flow( + self, + interchange: Interchange | OpenMMInterchange | str, + prev_dir: str | None, + stage_index: int, + stage_docs: list[OpenMMTaskDocument], + ) -> Response: + """Run stage n and dynamically decide to continue or terminate flow.""" + + # `stage` is either a job or a flow + stage = self.maker.make( + interchange=interchange, + prev_dir=prev_dir, + ) + + # stage decision login--> (a) terminate, (b) continue, (c) pause, etc + stage_control = self.apply_flow_control( + stage_doc=stage.output, + stage_index=stage_index, + stage_docs=stage_docs, + ) + + # conditionally run flow until should_continue=False + flow = Flow( + jobs=[stage, stage_control], + output=stage_control.output, + name=f"{self.name} Stage {stage_index}", + ) + return Response(replace=flow) + + def apply_flow_control( + self, + stage_doc: OpenMMTaskDocument, + stage_index: int, + stage_docs: list[OpenMMTaskDocument], + ) -> Response: + + # update "stage" docs + stage_docs = stage_docs + [stage_doc] + + continue_flow = self.should_continue( + stage_doc, + stage_index, + self.max_stages, + ) + + if continue_flow and stage_index < self.max_stages: + next_stage = self.dynamic_flow( + interchange=stage_doc.interchange, + prev_dir=stage_doc.dir_name, + stage_index=stage_index+1, + stage_docs=stage_docs, + ) + return Response(replace=Flow([next_stage], output=next_stage.output)) + + if self.collect_outputs: + print(stage_docs) + 
dynamic_collection = dynamic_collect_outputs(all_docs=stage_docs) + final_flow = Flow([dynamic_collection], output=dynamic_collection.output) + return None From 4bc4d9082d6744b8a4ac12cde60c6c98caec42b3 Mon Sep 17 00:00:00 2001 From: Shehan M Parmar Date: Mon, 3 Feb 2025 21:12:36 -0500 Subject: [PATCH 4/7] restructured dynamic_flow and removed apply_flow_control --- src/atomate2/openmm/flows/core.py | 254 +++++++++++++++++++++--------- 1 file changed, 182 insertions(+), 72 deletions(-) diff --git a/src/atomate2/openmm/flows/core.py b/src/atomate2/openmm/flows/core.py index 23d3c24399..d32fd360d9 100644 --- a/src/atomate2/openmm/flows/core.py +++ b/src/atomate2/openmm/flows/core.py @@ -3,12 +3,14 @@ from __future__ import annotations import json +import numpy as np +from scipy.signal import savgol_filter from dataclasses import dataclass, field from pathlib import Path from typing import TYPE_CHECKING from emmet.core.openmm import Calculation, OpenMMInterchange, OpenMMTaskDocument -from jobflow import Flow, Job, Maker, Response +from jobflow import Flow, Job, Maker, Response, job from monty.json import MontyDecoder, MontyEncoder from atomate2.openmm.jobs.base import BaseOpenMMMaker, openmm_job @@ -92,25 +94,88 @@ def dynamic_collect_outputs( else: return Response(output=[doc], addition=collect_job) +@openmm_job def default_should_continue( - task_doc: OpenMMTaskDocument, + task_docs: list[OpenMMTaskDocument], stage_index: int, - max_stages: int, -) -> bool: + max_stages: int, + physical_property: str = "potential_energy", + target: float | None = None, + threshold: float = 1e-3, +) -> Response: """Default dynamic flow logic for deciding to continue flow (True) This serves as a template for any bespoke "should_continue" functions written by the user. By default, simulation logic depends on stability of potential energy as a function of time, dU_dt. 
""" - print(f"%%%%% Running stage index {stage_index} %%%%%") - print(f"task_doc={task_doc}") - # TODO: Add PE vs time functionality ... - # TODO: add density vs. time functionality ... + print(f"%%%%% Running should_continue for stage index {stage_index} %%%%%") + task_doc = task_docs[-1] + setattr(task_doc, "stage_index", stage_index) + print(f"task_doc.stage_index = {stage_index}") + print(f"there are {len(task_docs)} task_docs") + + potential_energy: list[float] = [] + density: list[float] = [] + for doc in task_docs: + potential_energy.extend(doc.calcs_reversed[0].output.potential_energy) + density.extend(doc.calcs_reversed[0].output.density) + dt = doc.calcs_reversed[0].input.state_interval + print(f"total number of energies: {len(potential_energy)}") + + # toss out first 20% of values + burn_in = int(0.2 * len(potential_energy)) + if physical_property == "density": + values = np.array(density[burn_in:]) + elif physical_property == "potential_energy": + values = np.array(potential_energy[burn_in:]) + + if int(0.2 * len(values)) % 2 == 0: + window_length = max(5, int(0.2 * len(values)) + 1) + else: + window_length = max(5, int(0.2 * len(values))) + print(f"window_length={window_length}") + dValue_dt = savgol_filter(values, window_length, polyorder=3, deriv=1, delta=dt) + decay_rate = np.max(np.abs(dValue_dt)) + avg = np.mean(values) + std = np.std(values) + fluctuations = std / np.abs(avg) + + print(f"checking {physical_property}") + print(f"Mean of last 80% energies: {avg:.5f}") + print(f"Std of last 80% energies: {std:.5f}") + print(f"Fluctuations: {fluctuations:.5f}") + print(f"N^(1/2) ~ {len(values)**0.5}") + print(f"decay_rate: {decay_rate:.5f}") + if target: + delta = np.abs( (avg - target) / target ) + if delta < threshold: + print(f"{physical_property} within {threshold} threshold") + should_continue = False + print(f"should_continue={should_continue}") + else: + print(f"{physical_property} error at {delta*100} percent") + should_continue = True + 
print(f"should_continue={should_continue}") + elif stage_index > max_stages: + print(f"MAX STAGES REACHED!!") + should_continue=False + print(f"should_continue={should_continue}") + elif decay_rate < threshold: + print(f"EQUILIBRIUM REACHED!!") + should_continue=False + print(f"should_continue={should_continue}") + else: + print(f"decay_rate = {decay_rate}") + should_continue=True + print(f"should_continue={should_continue}") + if stage_index < max_stages: - return True + should_continue=True else: - return False + should_continue=False + setattr(task_doc, "should_continue", should_continue) + return Response(output=task_doc) @dataclass class OpenMMFlowMaker(Maker): @@ -308,10 +373,14 @@ class DynamicOpenMMFlowMaker(Maker): ) max_stages: int = field(default=5) collect_outputs: bool = True - should_continue: Callable[[OpenMMTaskDocument, int, int], bool] = field( + should_continue: Callable[[list[OpenMMTaskDocument], int, int], Response] = field( default_factory=lambda: default_should_continue ) stage_task_type: str = "collect" # or default to OpenMMFlowMaker final_task_type? 
+ + jobs: list = field(default_factory=list) + job_uuids: list = field(default_factory=list) + calcs_reversed: list[Calculation] = field(default_factory=list) def __post_init__(self) -> None: """Post init formatting of arguments.""" @@ -324,74 +393,115 @@ def make( prev_dir: str | None = None, ) -> Flow: - print(f"inside DOFM: -> {self.maker}") - # Initial segment job (or flow) of entire dynamic flow - stage_n = self.dynamic_flow( + # Run initial stage job + stage_job_0 = self.maker.make( interchange=interchange, - prev_dir=prev_dir, - stage_index=0, # later can make this an input, n, for checkpointing - stage_docs=[], + prev_dir=prev_dir, ) + self.jobs.append(stage_job_0) + print(f"stage_job_0->{stage_job_0.name}") + + # collect the uuids and calcs for the final collect job + if isinstance(stage_job_0, Flow): + self.job_uuids.extend(stage_job_0.job_uuids) + else: + self.job_uuids.append(stage_job_0.uuid) + self.calcs_reversed.append(_get_calcs_reversed(stage_job_0)) + print(f"appended stage {self.jobs[-1]}") + print(f"appended job_uuid {self.job_uuids[-1]}") + print(f"appended calc {self.calcs_reversed[-1]}") + + # Determine stage job control logic + control_stage_0 = self.should_continue( + task_docs=[stage_job_0.output], + stage_index=0, + max_stages=self.max_stages, + ) + self.jobs.append(control_stage_0) + print(f"control_stage_0->{control_stage_0}") + + # stage_0 = Flow([stage_job_0, control_stage_0]) + + stage_n = self.dynamic_flow( + prev_stage_index=0, + prev_docs=[control_stage_0.output], + ) + self.jobs.append(stage_n) + + if self.collect_outputs: + collect_job = collect_outputs( + stage_n.output.dir_name, + tags=None, #self.tags or None, + job_uuids=self.job_uuids, + calcs_reversed=self.calcs_reversed, + task_type=self.stage_task_type, + ) + # jobs.append(collect_job) + stage_n_flow = Flow([stage_job_0,control_stage_0,stage_n,collect_job]) + else: + stage_n_flow = Flow([stage_job_0,control_stage_0,stage_n]) + + return stage_n_flow - return 
Flow([stage_n], name=self.name, output=stage_n.output) - + #return Response(output=stage_n.output) + + @job def dynamic_flow( self, - interchange: Interchange | OpenMMInterchange | str, - prev_dir: str | None, - stage_index: int, - stage_docs: list[OpenMMTaskDocument], - ) -> Response: + prev_stage_index: int, + prev_docs: list[OpenMMTaskDocument], + ) -> Response | None: """Run stage n and dynamically decide to continue or terminate flow.""" - - # `stage` is either a job or a flow - stage = self.maker.make( - interchange=interchange, - prev_dir=prev_dir, - ) - # stage decision login--> (a) terminate, (b) continue, (c) pause, etc - stage_control = self.apply_flow_control( - stage_doc=stage.output, - stage_index=stage_index, - stage_docs=stage_docs, + print(f"previous stage index: {prev_stage_index}") + prev_stage = prev_docs[-1] + + # stage control logic: (a) begin, (b) continue, (c) terminate, (d) pause + if not prev_stage.should_continue: # terminate + print(f"Equilibration complete!") + return Response(output=prev_stage) + if prev_stage_index > self.max_stages: # pause + print(f"Reached max stages with equilibration incomplete") + print(f"prev_stage.output should_continue={prev_stage.should_continue}") + #setattr(prev_stage.output, "should_continue", False) + #setattr(prev_stage.output, "stage_index", stage_index) + return Response(output=prev_stage) + + stage_index = prev_stage_index + 1 + + stage_job_n = self.maker.make( + interchange=prev_stage.interchange, + prev_dir=prev_stage.dir_name, ) - - # conditionally run flow until should_continue=False - flow = Flow( - jobs=[stage, stage_control], - output=stage_control.output, - name=f"{self.name} Stage {stage_index}", + self.jobs.append(stage_job_n) + print(f"stage_job_n->{stage_job_n.name}") + + # collect the uuids and calcs for the final collect job + if isinstance(stage_job_n, Flow): + self.job_uuids.extend(stage_job_n.job_uuids) + else: + self.job_uuids.append(stage_job_n.uuid) + 
self.calcs_reversed.append(_get_calcs_reversed(stage_job_n)) + print(f"appended stage {self.jobs[-1]}") + print(f"appended job_uuid {self.job_uuids[-1]}") + print(f"appended calc {self.calcs_reversed[-1]}") + print(f"stage_job_n->{stage_job_n}") + + control_stage_n = self.should_continue( + task_docs=prev_docs+[stage_job_n.output], + stage_index=stage_index, + max_stages=self.max_stages, ) - return Response(replace=flow) - - def apply_flow_control( - self, - stage_doc: OpenMMTaskDocument, - stage_index: int, - stage_docs: list[OpenMMTaskDocument], - ) -> Response: - - # update "stage" docs - stage_docs = stage_docs + [stage_doc] - - continue_flow = self.should_continue( - stage_doc, - stage_index, - self.max_stages, + self.jobs.append(control_stage_n) + print(f"control_stage_n->{control_stage_n}") + + next_stage_n = self.dynamic_flow( + prev_stage_index=stage_index, + prev_docs=prev_docs + [control_stage_n.output], ) - - if continue_flow and stage_index < self.max_stages: - next_stage = self.dynamic_flow( - interchange=stage_doc.interchange, - prev_dir=stage_doc.dir_name, - stage_index=stage_index+1, - stage_docs=stage_docs, - ) - return Response(replace=Flow([next_stage], output=next_stage.output)) - - if self.collect_outputs: - print(stage_docs) - dynamic_collection = dynamic_collect_outputs(all_docs=stage_docs) - final_flow = Flow([dynamic_collection], output=dynamic_collection.output) - return None + self.jobs.append(next_stage_n) + stage_n_flow = Flow([stage_job_n,control_stage_n, next_stage_n]) + + return Response(replace=stage_n_flow, output=next_stage_n.output) + + From 2439f18b795798a8b09bfe114726477ec33e8ec0 Mon Sep 17 00:00:00 2001 From: Shehan M Parmar Date: Mon, 3 Feb 2025 21:14:01 -0500 Subject: [PATCH 5/7] removed dynamic_collect_outputs --- src/atomate2/openmm/flows/core.py | 29 ----------------------------- 1 file changed, 29 deletions(-) diff --git a/src/atomate2/openmm/flows/core.py b/src/atomate2/openmm/flows/core.py index d32fd360d9..9c614bf789 
100644 --- a/src/atomate2/openmm/flows/core.py +++ b/src/atomate2/openmm/flows/core.py @@ -65,35 +65,6 @@ def collect_outputs( return Response(output=task_doc) -@openmm_job -def dynamic_collect_outputs( - all_docs: list[OpenMMTaskDocument] -) -> list[OpenMMTaskDocument]: - """Wrapper to run "collect_outputs" jobs dynamically """ - if not all_docs: - return [] - - doc = all_docs.pop(0) - print(f"collecting outputs for:\n{doc}") - prev_dir = doc.dir_name - tags = None # need to pass this in - job_uuids = doc.job.uuid - calcs_reversed = doc.calcs_reversed - task_type = "collect" - collect_job = collect_outputs( - prev_dir=prev_dir, - tags=tags or None, - job_uuids=job_uuids, - calcs_reversed=calcs_reversed, - task_type=task_type, - ) - if all_docs: - next_collect_job = dynamic_collect_outputs(all_docs=all_docs) - flow = Flow(jobs=[collect_job, next_collect_job],output=next_job.output) - return Response(replace=flow) - else: - return Response(output=[doc], addition=collect_job) - @openmm_job def default_should_continue( task_docs: list[OpenMMTaskDocument], From e3ffa977081f5624eb1602fa3f8212330bff7539 Mon Sep 17 00:00:00 2001 From: Shehan M Parmar Date: Wed, 5 Feb 2025 20:18:26 -0500 Subject: [PATCH 6/7] added and passed DynamicOpenMMFlowMaker tests; fixed dynamic flow logic --- src/atomate2/openmm/flows/__init__.py | 2 +- src/atomate2/openmm/flows/core.py | 290 +++++++++++++------------- tests/openmm_md/flows/test_core.py | 52 ++++- 3 files changed, 201 insertions(+), 143 deletions(-) diff --git a/src/atomate2/openmm/flows/__init__.py b/src/atomate2/openmm/flows/__init__.py index 734b8e4d2a..aab17b501d 100644 --- a/src/atomate2/openmm/flows/__init__.py +++ b/src/atomate2/openmm/flows/__init__.py @@ -1,3 +1,3 @@ """Core Flows for OpenMM module.""" -from atomate2.openmm.flows.core import OpenMMFlowMaker +from atomate2.openmm.flows.core import DynamicOpenMMFlowMaker, OpenMMFlowMaker diff --git a/src/atomate2/openmm/flows/core.py b/src/atomate2/openmm/flows/core.py index 
9c614bf789..15988b8185 100644 --- a/src/atomate2/openmm/flows/core.py +++ b/src/atomate2/openmm/flows/core.py @@ -3,23 +3,44 @@ from __future__ import annotations import json -import numpy as np -from scipy.signal import savgol_filter from dataclasses import dataclass, field from pathlib import Path from typing import TYPE_CHECKING +import numpy as np from emmet.core.openmm import Calculation, OpenMMInterchange, OpenMMTaskDocument -from jobflow import Flow, Job, Maker, Response, job +from jobflow import CURRENT_JOB, Flow, Job, Maker, Response, job from monty.json import MontyDecoder, MontyEncoder +from scipy.signal import savgol_filter from atomate2.openmm.jobs.base import BaseOpenMMMaker, openmm_job from atomate2.openmm.jobs.core import NVTMaker, TempChangeMaker from atomate2.openmm.utils import create_list_summing_to if TYPE_CHECKING: + from collections.abc import Callable + from openff.interchange import Interchange + +def _get_final_jobs(input_jobs: list[Job] | Flow) -> list[Job]: + """Unwrap nested jobs from a dynamic flow.""" + jobs = input_jobs.jobs if isinstance(input_jobs, Flow) else input_jobs + if not jobs: + return [] + + # check if the last job is a flow with .maker.jobs + last = jobs[-1] + if ( + hasattr(last, "maker") + and hasattr(last.maker, "jobs") + and isinstance(last.maker.jobs, list) + ): + # recursively explore .maker.jobs + return _get_final_jobs(last.maker.jobs) + return jobs + + def _get_calcs_reversed(job: Job | Flow) -> list[Calculation | list]: """Unwrap a nested list of calcs from jobs or flows.""" if isinstance(job, Flow): @@ -65,89 +86,71 @@ def collect_outputs( return Response(output=task_doc) + @openmm_job def default_should_continue( - task_docs: list[OpenMMTaskDocument], + task_docs: list[OpenMMTaskDocument], stage_index: int, max_stages: int, - physical_property: str = "potential_energy", + physical_property: str = "potential_energy", target: float | None = None, threshold: float = 1e-3, + burn_in_ratio: float = 0.2, ) -> 
Response: - """Default dynamic flow logic for deciding to continue flow (True) - + """Decide dynamic flow logic (True). + This serves as a template for any bespoke "should_continue" functions - written by the user. By default, simulation logic depends on stability - of potential energy as a function of time, dU_dt. + written by the user. By default, simulation logic depends on stability + of potential energy as a function of time, dU_dt. """ - print(f"%%%%% Running should_continue for stage index {stage_index} %%%%%") task_doc = task_docs[-1] - setattr(task_doc, "stage_index", stage_index) - print(f"task_doc.stage_index = {stage_index}") - print(f"there are {len(task_docs)} task_docs") + # get key physical parameters from calculation list potential_energy: list[float] = [] density: list[float] = [] for doc in task_docs: potential_energy.extend(doc.calcs_reversed[0].output.potential_energy) density.extend(doc.calcs_reversed[0].output.density) dt = doc.calcs_reversed[0].input.state_interval - print(f"total number of energies: {len(potential_energy)}") - - # toss out first 20% of values - burn_in = int(0.2 * len(potential_energy)) + if physical_property == "density": - values = np.array(density[burn_in:]) + values = np.array(density) elif physical_property == "potential_energy": - values = np.array(potential_energy[burn_in:]) - - if int(0.2 * len(values)) % 2 == 0: - window_length = max(5, int(0.2 * len(values)) + 1) - else: - window_length = max(5, int(0.2 * len(values))) - print(f"window_length={window_length}") - dValue_dt = savgol_filter(values, window_length, polyorder=3, deriv=1, delta=dt) - decay_rate = np.max(np.abs(dValue_dt)) + values = np.array(potential_energy) + + # toss out first X% of values, default 20% + burn_in = int(burn_in_ratio * len(values)) + values = values[burn_in:] + window_length = max(5, burn_in + 1) if burn_in % 2 == 0 else max(5, burn_in) + avg = np.mean(values) - std = np.std(values) - fluctuations = std / np.abs(avg) - - 
print(f"checking {physical_property}") - print(f"Mean of last 80% energies: {avg:.5f}") - print(f"Std of last 80% energies: {std:.5f}") - print(f"Fluctuations: {fluctuations:.5f}") - print(f"N^(1/2) ~ {len(values)**0.5}") - print(f"decay_rate: {decay_rate:.5f}") - if target: - delta = np.abs( (avg - target) / target ) - if delta < threshold: - print(f"{physical_property} within {threshold} threshold") - should_continue = False - print(f"should_continue={should_continue}") - else: - print(f"{physical_property} error at {delta*100} percent") - should_continue = True - print(f"should_continue={should_continue}") - elif stage_index > max_stages: - print(f"MAX STAGES REACHED!!") - should_continue=False - print(f"should_continue={should_continue}") - elif decay_rate < threshold: - print(f"EQUILIBRIUM REACHED!!") - should_continue=False - print(f"should_continue={should_continue}") - else: - print(f"decay_rate = {decay_rate}") - should_continue=True - print(f"should_continue={should_continue}") - - if stage_index < max_stages: - should_continue=True - else: - should_continue=False - setattr(task_doc, "should_continue", should_continue) + dvalue_dt = savgol_filter( + values / avg, window_length, polyorder=3, deriv=1, delta=dt + ) + decay_rate = np.max(np.abs(dvalue_dt)) + job = CURRENT_JOB.job + + if target: + delta = np.abs((avg - target) / target) + should_continue = not delta < threshold + job.append_name( + f" [Stage {stage_index}, delta={delta:.3e}" + f"-> should_continue={should_continue}]" + ) + elif stage_index > max_stages or decay_rate < threshold: # max_stages exceeded + should_continue = False + else: # decay_rate not stable + should_continue = True + + job.append_name( + f" [Stage {stage_index}, decay_rate={decay_rate:.3e}" + f"-> should_continue={should_continue}]" + ) + + task_doc.should_continue = should_continue return Response(output=task_doc) + @dataclass class OpenMMFlowMaker(Maker): """Run a production simulation. 
@@ -316,46 +319,66 @@ def anneal_flow( @@ dataclass class DynamicOpenMMFlowMaker(Maker): """Run a dynamic equilibration or production simulation. - + Create a dynamic flow out of an existing OpenMM simulation job or a linear sequence of linked jobs, i.e., an OpenMM - flow. + flow. Attributes ---------- name : str - The name of the dynamic OpenMM job or flow. Default is the name - of the inherited maker name with "dynamic" preprended. + The name of the dynamic OpenMM job or flow. Default is the name + of the inherited maker name with "dynamic" prepended. + tags : list[str] + Tags to apply to the final job. Will only be applied if collect_outputs is True. maker: Union[BaseOpenMMMaker, OpenMMFlowMaker] - A list of makers to string together. + A single (either job or flow) maker to make dynamic. max_stages: int Maximum number of stages to run consecutively before terminating - dynamic flow logic. + dynamic flow logic. collect_outputs : bool If True, a final job is added that collects all jobs into a single task document. - should_continue: ... - A function ... Default is _default_should_continue...#TODO + should_continue: Callable + A general function for evaluating properties in `calcs_reversed` + to determine simulation flow logic (i.e., termination, pausing, + or continuing). + jobs: list[BaseOpenMMMaker | OpenMMFlowMaker] + A running list of jobs in simulation flow. + job_uuids: list + A running list of job uuids in simulation flow. + calcs_reversed: list[Calculation] + A running list of Calculations in simulation flow. 
""" - + name: str = field(default=None) + tags: list[str] = field(default_factory=list) maker: BaseOpenMMMaker | OpenMMFlowMaker = field( default_factory=lambda: BaseOpenMMMaker() ) max_stages: int = field(default=5) collect_outputs: bool = True - should_continue: Callable[[list[OpenMMTaskDocument], int, int], Response] = field( - default_factory=lambda: default_should_continue - ) - stage_task_type: str = "collect" # or default to OpenMMFlowMaker final_task_type? - + ( + list[OpenMMTaskDocument], + int, + int, + str, + float | None, + float, + float, + ) + should_continue: Callable[ + [list[OpenMMTaskDocument], int, int, str, float | None, float, str], Response + ] = field(default_factory=lambda: default_should_continue) + jobs: list = field(default_factory=list) job_uuids: list = field(default_factory=list) calcs_reversed: list[Calculation] = field(default_factory=list) - + stage_task_type: str = "collect" + def __post_init__(self) -> None: """Post init formatting of arguments.""" - if self.name is None: + if self.name is None: self.name = f"dynamic {self.maker.name}" def make( @@ -363,116 +386,101 @@ def make( interchange: Interchange | OpenMMInterchange | str, prev_dir: str | None = None, ) -> Flow: - + """Run the dynamic simulation using the provided Interchange object. + + Parameters + ---------- + interchange : Interchange + The Interchange object containing the system + to simulate. + prev_task : Optional[ClassicalMDTaskDocument] + The directory of the previous task. + + Returns + ------- + Flow + A Flow object containing the OpenMM jobs for the simulation. 
+ """ # Run initial stage job stage_job_0 = self.maker.make( interchange=interchange, prev_dir=prev_dir, ) self.jobs.append(stage_job_0) - print(f"stage_job_0->{stage_job_0.name}") - + # collect the uuids and calcs for the final collect job if isinstance(stage_job_0, Flow): self.job_uuids.extend(stage_job_0.job_uuids) else: self.job_uuids.append(stage_job_0.uuid) self.calcs_reversed.append(_get_calcs_reversed(stage_job_0)) - print(f"appended stage {self.jobs[-1]}") - print(f"appended job_uuid {self.job_uuids[-1]}") - print(f"appended calc {self.calcs_reversed[-1]}") - + # Determine stage job control logic control_stage_0 = self.should_continue( - task_docs=[stage_job_0.output], + task_docs=[stage_job_0.output], stage_index=0, - max_stages=self.max_stages, + max_stages=self.max_stages, ) self.jobs.append(control_stage_0) - print(f"control_stage_0->{control_stage_0}") - - # stage_0 = Flow([stage_job_0, control_stage_0]) - + stage_n = self.dynamic_flow( prev_stage_index=0, prev_docs=[control_stage_0.output], ) self.jobs.append(stage_n) - - if self.collect_outputs: - collect_job = collect_outputs( - stage_n.output.dir_name, - tags=None, #self.tags or None, - job_uuids=self.job_uuids, - calcs_reversed=self.calcs_reversed, - task_type=self.stage_task_type, - ) - # jobs.append(collect_job) - stage_n_flow = Flow([stage_job_0,control_stage_0,stage_n,collect_job]) - else: - stage_n_flow = Flow([stage_job_0,control_stage_0,stage_n]) - - return stage_n_flow + return Flow([stage_job_0, control_stage_0, stage_n], output=stage_n.output) - #return Response(output=stage_n.output) - @job def dynamic_flow( self, prev_stage_index: int, prev_docs: list[OpenMMTaskDocument], - ) -> Response | None: + ) -> Response | None: """Run stage n and dynamically decide to continue or terminate flow.""" - - print(f"previous stage index: {prev_stage_index}") - prev_stage = prev_docs[-1] - + prev_stage = prev_docs[-1] + # stage control logic: (a) begin, (b) continue, (c) terminate, (d) pause - if not 
prev_stage.should_continue: # terminate - print(f"Equilibration complete!") - return Response(output=prev_stage) - if prev_stage_index > self.max_stages: # pause - print(f"Reached max stages with equilibration incomplete") - print(f"prev_stage.output should_continue={prev_stage.should_continue}") - #setattr(prev_stage.output, "should_continue", False) - #setattr(prev_stage.output, "stage_index", stage_index) + if ( + prev_stage_index >= self.max_stages or not prev_stage.should_continue + ): # pause + if self.collect_outputs: + collect_job = collect_outputs( + prev_stage.dir_name, + tags=self.tags or None, + job_uuids=self.job_uuids, + calcs_reversed=self.calcs_reversed, + task_type=self.stage_task_type, + ) + return Response(replace=collect_job, output=collect_job.output) return Response(output=prev_stage) stage_index = prev_stage_index + 1 - + stage_job_n = self.maker.make( - interchange=prev_stage.interchange, + interchange=prev_stage.interchange, prev_dir=prev_stage.dir_name, ) self.jobs.append(stage_job_n) - print(f"stage_job_n->{stage_job_n.name}") - + # collect the uuids and calcs for the final collect job if isinstance(stage_job_n, Flow): self.job_uuids.extend(stage_job_n.job_uuids) else: self.job_uuids.append(stage_job_n.uuid) self.calcs_reversed.append(_get_calcs_reversed(stage_job_n)) - print(f"appended stage {self.jobs[-1]}") - print(f"appended job_uuid {self.job_uuids[-1]}") - print(f"appended calc {self.calcs_reversed[-1]}") - print(f"stage_job_n->{stage_job_n}") - + control_stage_n = self.should_continue( - task_docs=prev_docs+[stage_job_n.output], + task_docs=[*prev_docs, stage_job_n.output], stage_index=stage_index, - max_stages=self.max_stages, + max_stages=self.max_stages, ) self.jobs.append(control_stage_n) - print(f"control_stage_n->{control_stage_n}") - + next_stage_n = self.dynamic_flow( prev_stage_index=stage_index, - prev_docs=prev_docs + [control_stage_n.output], + prev_docs=[*prev_docs, control_stage_n.output], ) 
self.jobs.append(next_stage_n) - stage_n_flow = Flow([stage_job_n,control_stage_n, next_stage_n]) - + stage_n_flow = Flow([stage_job_n, control_stage_n, next_stage_n]) + return Response(replace=stage_n_flow, output=next_stage_n.output) - - diff --git a/tests/openmm_md/flows/test_core.py b/tests/openmm_md/flows/test_core.py index f30a23ce85..f4ced23f1e 100644 --- a/tests/openmm_md/flows/test_core.py +++ b/tests/openmm_md/flows/test_core.py @@ -12,7 +12,7 @@ from monty.json import MontyDecoder from openmm.app import PDBFile -from atomate2.openmm.flows.core import OpenMMFlowMaker +from atomate2.openmm.flows.core import DynamicOpenMMFlowMaker, OpenMMFlowMaker from atomate2.openmm.jobs import EnergyMinimizationMaker, NPTMaker, NVTMaker @@ -178,6 +178,56 @@ def test_flow_maker(interchange, run_job): assert len(u.trajectory) == 5 +def test_dynamic_flow_maker(interchange, run_job): + from functools import partial + + from jobflow import run_locally + + from atomate2.openmm.flows.core import _get_final_jobs, default_should_continue + + should_continue = partial( + default_should_continue, + physical_property="potential_energy", + threshold=1e-2, + ) + should_continue.__name__ = "should_continue" + + # Create an instance of DynamicFlowMaker with custom parameters + dynamic_flow_maker = DynamicOpenMMFlowMaker( + name="test dynamic equilibration", + tags=["test"], + maker=NPTMaker(n_steps=200, pressure=1.0, state_interval=10, traj_interval=10), + max_stages=10, + should_continue=should_continue, + ) + + production_flow = dynamic_flow_maker.make(interchange) + response_dict = run_locally(Flow([production_flow])) + task_doc = list(response_dict.values())[-1][2].output + + assert isinstance(task_doc, OpenMMTaskDocument) + assert task_doc.state == "successful" + assert (len(task_doc.calcs_reversed) - 1) <= dynamic_flow_maker.max_stages + assert task_doc.calcs_reversed[-1].task_name == "npt simulation" + assert task_doc.calcs_reversed[0].task_name == "npt simulation" + assert 
task_doc.tags == ["test"] + assert task_doc.job_uuids[0] is not None + + ## Check the individual jobs in the flow + job_list = _get_final_jobs(production_flow) + npt_job_0 = job_list[0] + assert isinstance(npt_job_0.maker, NPTMaker) + + npt_stages = 0 + for job in job_list: + if isinstance(job.maker, NPTMaker): + npt_stages += 1 + + assert (npt_stages - 1) <= dynamic_flow_maker.max_stages + assert task_doc.calcs_reversed[0].output.traj_file == f"trajectory{npt_stages}.dcd" + assert task_doc.calcs_reversed[0].output.traj_file == f"state{npt_stages}.csv" + + def test_traj_blob_embed(interchange, run_job, tmp_path): nvt = NVTMaker(n_steps=2, traj_interval=1, embed_traj=True) From c05c7c21192b772126d54aafc4ea1cae4b0c7d65 Mon Sep 17 00:00:00 2001 From: Shehan M Parmar Date: Wed, 5 Feb 2025 20:24:52 -0500 Subject: [PATCH 7/7] undo incorrect change --- src/atomate2/openmm/flows/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/atomate2/openmm/flows/core.py b/src/atomate2/openmm/flows/core.py index 15988b8185..cc48342356 100644 --- a/src/atomate2/openmm/flows/core.py +++ b/src/atomate2/openmm/flows/core.py @@ -164,7 +164,7 @@ class OpenMMFlowMaker(Maker): The name of the production job. Default is "production". tags : list[str] Tags to apply to the final job. Will only be applied if collect_jobs is True. - makers: Union[BaseOpenMMMaker, OpenMMFlowMaker] + makers: list[BaseOpenMMMaker] A list of makers to string together. collect_outputs : bool If True, a final job is added that collects all jobs into a single