Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase robustness: version check, prevent precompilation with MPI, automatically precompile in the workchain #17

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ aiida-dftk = 'aiida_dftk.cli:cmd_root'

[project.entry-points.'aiida.calculations']
'dftk' = 'aiida_dftk.calculations:DftkCalculation'
'dftk.precompile' = 'aiida_dftk.calculations:PrecompileCalculation'

[project.entry-points.'aiida.parsers']
'dftk' = 'aiida_dftk.parsers:DftkParser'
Expand Down
102 changes: 85 additions & 17 deletions src/aiida_dftk/calculations.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,22 @@
import os
import json
import typing as ty
from pathlib import Path

from aiida import orm
from aiida.common import datastructures, exceptions
from aiida.engine import CalcJob
from aiida.engine import CalcJob, ExitCode
from aiida_pseudo.data.pseudo import UpfData
from pymatgen.core import units


_AIIDA_DFTK_VERSION_SPEC = "0.2.0"


class DftkCalculation(CalcJob):
"""`CalcJob` implementation for DFTK."""

INPUT_FILENAME = 'run_dftk.json'
LOGFILE = 'run_dftk.log'
SCFRES_SUMMARY_NAME = 'self_consistent_field.json'
# TODO: don't limit postscf
_SUPPORTED_POSTSCF = ['compute_forces_cart', 'compute_stresses_cart', 'compute_bands']
Expand Down Expand Up @@ -45,7 +49,7 @@ def define(cls, spec):
options = spec.inputs['metadata']['options']

options['parser_name'].default = 'dftk'
options['input_filename'].default = f'run_dftk.json'
options['input_filename'].default = cls.INPUT_FILENAME
options['max_wallclock_seconds'].default = 1800

# TODO: Why is this here?
Expand All @@ -62,6 +66,8 @@ def define(cls, spec):
spec.exit_code(501, 'ERROR_SCF_OUT_OF_WALLTIME',message='The SCF was interuptted due to out of walltime. Non-recovarable error.')
spec.exit_code(502, 'ERROR_POSTSCF_OUT_OF_WALLTIME',message='The POSTSCF was interuptted due to out of walltime.')
spec.exit_code(503, 'ERROR_BANDS_CONVERGENCE_NOT_REACHED', message='The BANDS minimization cycle did not converge.')
# Significant errors but calculation can be used to restart
spec.exit_code(400, 'ERROR_PACKAGE_IMPORT_FAILED', message="Failed to import AiiDA DFTK or write first log message. Typically indicates an environment issue.")

# Outputs
spec.output('output_parameters', valid_type=orm.Dict, help='output parameters')
Expand Down Expand Up @@ -98,16 +104,20 @@ def _validate_options(self):
)

def _validate_inputs(self):
"""Validate input parameters.

Check that the post-SCF function(s) are supported.
"""
"""Validate input parameters."""
parameters = self.inputs.parameters.get_dict()
if 'postscf' in parameters:
for postscf in parameters['postscf']:
if postscf['$function'] not in self._SUPPORTED_POSTSCF:
raise exceptions.InputValidationError(f"Unsupported postscf function: {postscf['$function']}")

# We want the option to be set for `verdi calcjob inputcat` to work,
# but we don't allow overriding it because it would affect the name of the log file.
if self.metadata.options.input_filename != self.INPUT_FILENAME:
raise exceptions.InputValidationError(
f"Input filename must be {self.INPUT_FILENAME}. Was: {self.metadata.options.input_filename}"
)

def _validate_pseudos(self):
"""Validate the pseudopotentials.

Expand Down Expand Up @@ -146,16 +156,23 @@ def _generate_inputdata(

local_copy_pseudo_list = []

data = {'periodic_system': {}, 'model_kwargs': {}, 'basis_kwargs': {}, 'scf': {}, 'postscf': []}
data = {
'periodic_system': {},
'pseudopotentials': {},
'model_kwargs': {},
'basis_kwargs': {},
'scf': {},
'postscf': [],
}
data['periodic_system']['bounding_box'] = [[x * units.ang_to_bohr for x in vec] for vec in structure.cell]
data['periodic_system']['atoms'] = []
for site in structure.sites:
data['periodic_system']['atoms'].append({
'symbol': site.kind_name,
'position': [X * units.ang_to_bohr for X in list(site.position)],
'pseudopotential': f'{self._PSEUDO_SUBFOLDER}{pseudos[site.kind_name].filename}'
})
pseudo = pseudos[site.kind_name]
for symbol, pseudo in pseudos.items():
data['pseudopotentials'][symbol] = f'{self._PSEUDO_SUBFOLDER}{pseudo.filename}'
local_copy_pseudo_list.append((pseudo.uuid, pseudo.filename, f'{self._PSEUDO_SUBFOLDER}{pseudo.filename}'))
data['basis_kwargs']['kgrid'], data['basis_kwargs']['kshift'] = kpoints.get_kpoints_mesh()

Expand All @@ -170,12 +187,6 @@ def _generate_inputdata(

return data, local_copy_pseudo_list

def _generate_cmdline_params(self) -> ty.List[str]:
# Define the command based on the input settings
cmd_params = []
cmd_params.extend(['-e', 'using AiidaDFTK; AiidaDFTK.run()', self.metadata.options.input_filename])
return cmd_params

def _generate_retrieve_list(self, parameters: orm.Dict) -> list:
"""Generate the list of files to retrieve based on the type of calculation requested in the input parameters.

Expand All @@ -188,6 +199,7 @@ def _generate_retrieve_list(self, parameters: orm.Dict) -> list:
f"{item['$function']}.json" if item['$function'] == 'compute_bands' else f"{item['$function']}.hdf5"
for item in parameters['postscf']
]
retrieve_list.append(self.LOGFILE)
retrieve_list.append('timings.json')
retrieve_list.append(f'{self.SCFRES_SUMMARY_NAME}')
return retrieve_list
Expand Down Expand Up @@ -232,7 +244,14 @@ def prepare_for_submission(self, folder):
remote_copy_list.append(checkpointfile_info)

# prepare command line parameters
cmdline_params = self._generate_cmdline_params()
cmdline_params = [
# Precompilation under MPI generally deadlocks. Make sure everything is already precompiled.
'--compiled-modules=strict',
'-e', 'using AiidaDFTK; AiidaDFTK.run(inputfile="{}", allowed_versions="{}")'.format(
self.metadata.options.input_filename,
_AIIDA_DFTK_VERSION_SPEC,
),
]

# prepare retrieve list
retrieve_list = self._generate_retrieve_list(self.inputs.parameters)
Expand All @@ -251,3 +270,52 @@ def prepare_for_submission(self, folder):
calcinfo.local_copy_list = local_copy_list

return calcinfo

class PrecompileCalculation(CalcJob):
"""Calcjob implementation to precompile AiidaDFTK."""

_SUCCESS_PRINT = "Import successful"

@classmethod
def define(cls, spec):
"""Define the process specification."""
super().define(spec)

options = spec.inputs['metadata']['options']
options['resources'].default = {'num_machines': 1, 'num_mpiprocs_per_machine': 1}

def prepare_for_submission(self, folder):
# Set up the `CodeInfo` to pass to `CalcInfo`
codeinfo = datastructures.CodeInfo()
codeinfo.code_uuid = self.inputs.code.uuid
codeinfo.cmdline_params = [
'-e', f'using Pkg; Pkg.precompile(; strict=true); using AiidaDFTK; println("{self._SUCCESS_PRINT}")'
]
codeinfo.withmpi = False

# Set up the `CalcInfo` so AiiDA knows what to do with everything
calcinfo = datastructures.CalcInfo()
calcinfo.codes_info = [codeinfo]

return calcinfo

# Easier to override the parse method than to write a parser.
def parse(self, *args, **kwargs):
exit_code = super().parse(*args, **kwargs)
if exit_code.status != 0:
return exit_code

retrieved = self.node.outputs.retrieved
filename_stdout = self.node.get_option('scheduler_stdout')

if filename_stdout is None:
self.report('could not determine `stdout` filename because `scheduler_stdout` option was not set.')
else:
try:
scheduler_stdout = retrieved.base.repository.get_object_content(filename_stdout, mode='r')
if self._SUCCESS_PRINT in scheduler_stdout:
return ExitCode(0)
except FileNotFoundError:
self.report(f'could not parse scheduler output: the `{filename_stdout}` file is missing')

return self.exit_codes.ERROR_UNSPECIFIED
11 changes: 11 additions & 0 deletions src/aiida_dftk/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,17 @@ def parse(self, **kwargs):
else:
return self.exit_codes.ERROR_POSTSCF_OUT_OF_WALLTIME

# Check error file
try:
errors_log = self.retrieved.base.repository.get_object_content(DftkCalculation.LOGFILE)
if "Imports succeeded" not in errors_log:
return self.exit_codes.ERROR_PACKAGE_IMPORT_FAILED
except FileNotFoundError:
return self.exit_codes.ERROR_PACKAGE_IMPORT_FAILED

if "Finished successfully" not in errors_log:
return self.exit_codes.ERROR_UNSPECIFIED

# Check retrieve list to know which files the calculation is expected to have produced.
try:
self._parse_optional_result(
Expand Down
46 changes: 43 additions & 3 deletions src/aiida_dftk/workflows/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,22 @@
"""Base DFTK WorkChain implementation."""
from aiida import orm
from aiida.common import AttributeDict, exceptions
from aiida.engine import BaseRestartWorkChain, ProcessHandlerReport, process_handler, while_
from aiida.engine import BaseRestartWorkChain, ProcessHandlerReport, process_handler, while_, if_, ToContext
from aiida.plugins import CalculationFactory

from aiida_dftk.utils import create_kpoints_from_distance, validate_and_prepare_pseudos_inputs

DftkCalculation = CalculationFactory('dftk')
PrecompileCalculation = CalculationFactory('dftk.precompile')


class DftkBaseWorkChain(BaseRestartWorkChain):
"""Base DFTK Workchain to perform a DFT calculation. Validates parameters and restart."""

_process_class = DftkCalculation

_attempted_precompilation_extra = "attempted_precompilation"

@classmethod
def define(cls, spec):
"""Define the process specification."""
Expand Down Expand Up @@ -44,7 +47,12 @@ def define(cls, spec):
while_(cls.should_run_process)(
mfherbst marked this conversation as resolved.
Show resolved Hide resolved
cls.prepare_process,
cls.run_process,
cls.inspect_process,
if_(cls.should_attempt_precompilation)(
cls.run_precompilation,
cls.inspect_precompilation,
).else_(
cls.inspect_process,
)
),
cls.results,
)
Expand All @@ -59,6 +67,8 @@ def define(cls, spec):
message='Neither the `options` nor `automatic_parallelization` input was specified.')
spec.exit_code(204, 'ERROR_INVALID_INPUT_RESOURCES_UNDERSPECIFIED',
message='The `metadata.options` did not specify both `resources.num_machines` and `max_wallclock_seconds`.')
spec.exit_code(300, 'ERROR_PRECOMPILATION_FAILURE',
message='Failed to precompile AiidaDFTK. Typically indicates an environment issue.')

def setup(self):
"""Call the `setup` of the `BaseRestartWorkChain` and then create the inputs dictionary in `self.ctx.inputs`.
Expand Down Expand Up @@ -146,7 +156,37 @@ def report_error_handled(self, calculation, action):
:param action: a string message with the action taken
"""
self.report(f'{calculation.process_label}<{calculation.pk}> failed with exit status {calculation.exit_status}: {calculation.exit_message}')
self.report(f'Action taken: {action}')
self.report(f'action taken: {action}')

def should_attempt_precompilation(self):
if self.node.base.extras.get(self._attempted_precompilation_extra, False):
return False
node = self.ctx.children[self.ctx.iteration - 1]
return node.exit_code == DftkCalculation.exit_codes.ERROR_PACKAGE_IMPORT_FAILED

def run_precompilation(self):
self.node.base.extras.set(self._attempted_precompilation_extra, True)
calculation = self.ctx.children[self.ctx.iteration - 1]
self.report_error_handled(calculation, 'attempting to precompile')

node = self.submit(PrecompileCalculation, inputs={
"code": calculation.inputs.code,
})
self.report(f'launching {node.process_label}<{node.pk}>')

return ToContext(precompile_job=node)

def inspect_precompilation(self):
calculation = self.ctx.precompile_job

if calculation.exit_status != 0:
self.report(f'{calculation.process_label}<{calculation.pk}> failed with exit status {calculation.exit_status}: {calculation.exit_message}')
self.report(f'the issue can be diagnosed by running `verdi process report {calculation.pk}` and checking the logs.')
self.report('aborting workchain')
return self.exit_codes.ERROR_PRECOMPILATION_FAILURE

self.report(f'{calculation.process_label}<{calculation.pk}> succeeded. Resuming workchain iterations.')
return None

@process_handler(priority=500, exit_codes=[DftkCalculation.exit_codes.ERROR_SCF_CONVERGENCE_NOT_REACHED])
def handle_scf_convergence_not_reached(self, _):
Expand Down
17 changes: 14 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,30 @@
import logging
import os
from pathlib import Path
import pytest

import aiida_dftk.calculations

pytest_plugins = 'aiida.manage.tests.pytest_fixtures'

_LOGGER = logging.getLogger(__name__)
_julia_project_path = os.path.join(__file__, "..", "julia_environment")
_julia_project_path = Path(__file__).parent / "julia_environment"


def pytest_sessionstart():
"""Instantiates the test Julia environment before any test runs."""
import subprocess

with open(_julia_project_path / "Project.toml", "w") as f:
f.write(f"""
[deps]
AiidaDFTK = "26386dbc-b74b-4d9a-b75a-41d28ada84fc"

[compat]
AiidaDFTK = "{aiida_dftk.calculations._AIIDA_DFTK_VERSION_SPEC}"
""")

# Pkg.Registry.add() seems necessary for GitHub Actions
subprocess.run(['julia', f'--project={_julia_project_path}', '-e', 'using Pkg; Pkg.Registry.add(); Pkg.resolve(); Pkg.precompile();'], check=True)
subprocess.run(['julia', f'--project={_julia_project_path}', '-e', 'using Pkg; Pkg.Registry.add(); Pkg.resolve();'], check=True)


@pytest.fixture
Expand Down
3 changes: 1 addition & 2 deletions tests/julia_environment/.gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
LocalPreferences.toml
Manifest.toml
*
5 changes: 0 additions & 5 deletions tests/julia_environment/Project.toml

This file was deleted.

Loading