Skip to content

Commit

Permalink
chore: simplify process monitoring codes in flow.py
Browse files Browse the repository at this point in the history
  • Loading branch information
ZLI-afk committed Jan 27, 2024
1 parent 5e3eb36 commit 60a8db1
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 95 deletions.
177 changes: 84 additions & 93 deletions apex/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ def __init__(
executor: Optional[DispatcherExecutor] = None,
upload_python_packages: Optional[List[os.PathLike]] = None,
):
self.download_path = None
self.upload_path = None
self.workflow = None
self.relax_param = None
self.props_param = None

self.relax_make_op = relax_make_op
self.relax_post_op = relax_post_op
self.props_make_op = props_make_op
Expand All @@ -61,6 +67,48 @@ def __init__(
self.executor = executor
self.upload_python_packages = upload_python_packages

def _monitor_relax(self):
print('Waiting for relaxation result...')
while True:
time.sleep(4)
step_info = self.workflow.query()
relax_post = step_info.get_step(name='relaxation-cal')[0]
if relax_post['phase'] == 'Succeeded':
print(f'Relaxation finished (ID: {self.workflow.id}, UID: {self.workflow.uid})')
print('Retrieving completed tasks to local...')
download_artifact(
artifact=relax_post.outputs.artifacts['retrieve_path'],
path=self.download_path
)
break

def _monitor_props(
self,
subprops_key_list: List[str],
):
print(f'Waiting for sub-property results ({len(subprops_key_list)} left)...')
while True:
time.sleep(4)
step_info = self.workflow.query()
for kk in subprops_key_list:
try:
step = step_info.get_step(key=kk)[0]
except IndexError:
continue
if step['phase'] == 'Succeeded':
print(f'Sub-workflow {kk} finished (ID: {self.workflow.id}, UID: {self.workflow.uid})')
print('Retrieving completed tasks to local...')
download_artifact(
artifact=step.outputs.artifacts['retrieve_path'],
path=self.download_path
)
subprops_key_list.remove(kk)
if subprops_key_list:
print(f'Waiting for sub-property results ({len(subprops_key_list)} left)...')
if not subprops_key_list:
print(f'Workflow all finished (ID: {self.workflow.id}, UID: {self.workflow.uid})')
break

def _set_relax_flow(
self,
input_work_dir: dflow.common.S3Artifact,
Expand Down Expand Up @@ -179,28 +227,21 @@ def submit_relax(
download_path: Union[os.PathLike, str],
relax_parameter: dict,
labels: Optional[dict] = None
):
wf = Workflow(name='relaxation', labels=labels)
) -> str:
self.upload_path = upload_path
self.download_path = download_path
self.relax_param = relax_parameter
self.workflow = Workflow(name='relaxation', labels=labels)
relaxation = self._set_relax_flow(
input_work_dir=upload_artifact(upload_path),
relax_parameter=relax_parameter
)
wf.add(relaxation)
wf.submit()
self.workflow.add(relaxation)
self.workflow.submit()
# Wait for and retrieve relaxation
self._monitor_relax()

print('Waiting for relaxation result...')
while wf.query_status() in ["Pending", "Running"]:
time.sleep(4)
assert (wf.query_status() == 'Succeeded')
print(f'Relaxation Workflow finished (ID: {wf.id}, UID: {wf.uid})')
print('Retrieving completed tasks to local...')
final_step = wf.query_step(name='relaxation-cal')[0]

download_artifact(
final_step.outputs.artifacts['retrieve_path'],
path=download_path
)
return wf.id
return self.workflow.id

@json2dict
def submit_props(
Expand All @@ -209,40 +250,21 @@ def submit_props(
download_path: Union[os.PathLike, str],
props_parameter: dict,
labels: Optional[dict] = None
):
wf = Workflow(name='property', labels=labels)
) -> str:
self.upload_path = upload_path
self.download_path = download_path
self.props_param = props_parameter
self.workflow = Workflow(name='property', labels=labels)
subprops_list, subprops_key_list = self._set_props_flow(
input_work_dir=upload_artifact(upload_path),
props_parameter=props_parameter
)
wf.add(subprops_list)
wf.submit()
self.workflow.add(subprops_list)
self.workflow.submit()
# wait for and retrieve sub-property flows
self._monitor_props(subprops_key_list)

print(f'Waiting for sub-property results ({len(subprops_key_list)} left)...')
# Hearing sub-property flows
while True:
time.sleep(4)
step_info = wf.query()
for kk in subprops_key_list:
try:
step = step_info.get_step(key=kk)[0]
except IndexError:
continue
if step['phase'] == 'Succeeded':
print(f'Sub-workflow {kk} finished (ID: {wf.id}, UID: {wf.uid})')
print('Retrieving completed tasks to local...')
download_artifact(
artifact=step.outputs.artifacts['retrieve_path'],
path=download_path
)
subprops_key_list.remove(kk)
if subprops_key_list:
print(f'Waiting for sub-property results ({len(subprops_key_list)} left)...')
if not subprops_key_list:
print(f'Workflow all finished (ID: {wf.id}, UID: {wf.uid})')
break

return wf.id
return self.workflow.id

@json2dict
def submit_joint(
Expand All @@ -252,57 +274,26 @@ def submit_joint(
relax_parameter: dict,
props_parameter: dict,
labels: Optional[dict] = None
):
wf = Workflow(name='joint', labels=labels)
) -> str:
self.upload_path = upload_path
self.download_path = download_path
self.relax_param = relax_parameter
self.props_param = props_parameter
self.workflow = Workflow(name='joint', labels=labels)
relaxation = self._set_relax_flow(
input_work_dir=upload_artifact(upload_path),
relax_parameter=relax_parameter
relax_parameter=self.relax_param
)
subprops_list, subprops_key_list = self._set_props_flow(
input_work_dir=relaxation.outputs.artifacts["output_all"],
props_parameter=props_parameter
props_parameter=self.props_param
)
wf.add(relaxation)
wf.add(subprops_list)
wf.submit()

print('Waiting for relaxation result...')
# Hearing relaxation
while True:
time.sleep(4)
step_info = wf.query()
relax_post = step_info.get_step(name='relaxation-cal')[0]
if relax_post['phase'] == 'Succeeded':
print(f'Relaxation finished (ID: {wf.id}, UID: {wf.uid})')
print('Retrieving completed tasks to local...')
download_artifact(
artifact=relax_post.outputs.artifacts['retrieve_path'],
path=download_path
)
break

print(f'Waiting for sub-property results ({len(subprops_key_list)} left)...')
# Hearing sub-property flows
while True:
time.sleep(4)
step_info = wf.query()
for kk in subprops_key_list:
try:
step = step_info.get_step(key=kk)[0]
except IndexError:
continue
if step['phase'] == 'Succeeded':
print(f'Sub-workflow {kk} finished (ID: {wf.id}, UID: {wf.uid})')
print('Retrieving completed tasks to local...')
download_artifact(
artifact=step.outputs.artifacts['retrieve_path'],
path=download_path
)
subprops_key_list.remove(kk)
if subprops_key_list:
print(f'Waiting for sub-property results ({len(subprops_key_list)} left)...')
if not subprops_key_list:
print(f'Workflow all finished (ID: {wf.id}, UID: {wf.uid})')
break
self.workflow.add(relaxation)
self.workflow.add(subprops_list)
self.workflow.submit()
# Wait for and retrieve relaxation
self._monitor_relax()
# Wait for and retrieve sub-property flows
self._monitor_props(subprops_key_list)

return wf.id
return self.workflow.id
4 changes: 2 additions & 2 deletions apex/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,12 @@ def submit(
):
if is_sub:
# reset dflow global config for sub-processes
print(f'Sub-process working on: {work_dir}')
logging.info(msg=f'Sub-process working on: {work_dir}')
config.update(conf)
s3_config.update(s3_conf)
logging.basicConfig(level=logging.INFO)
else:
print(f'Working on: {work_dir}')
logging.info(msg=f'Working on: {work_dir}')

with tempfile.TemporaryDirectory() as tmp_dir:
logging.debug(msg=f'Temporary upload directory:{tmp_dir}')
Expand Down

0 comments on commit 60a8db1

Please sign in to comment.