###########################################################################
# Copyright (c), The AiiDA team. All rights reserved.                     #
# This file is part of the AiiDA code.                                    #
#                                                                         #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file        #
# For further information please visit http://www.aiida.net               #
###########################################################################
"""Functionality for dumping of ProcessNodes."""
from __future__ import annotations
import logging
from pathlib import Path
from types import SimpleNamespace
from typing import List
import yaml
from aiida.common import LinkType
from aiida.common.exceptions import NotExistentAttributeError
from aiida.orm import (
CalcFunctionNode,
CalcJobNode,
CalculationNode,
LinkManager,
ProcessNode,
WorkChainNode,
WorkflowNode,
WorkFunctionNode,
)
from aiida.orm.utils import LinkTriple
LOGGER = logging.getLogger(__name__)
class ProcessDumper:
def __init__(
self,
include_inputs: bool = True,
include_outputs: bool = False,
include_attributes: bool = True,
include_extras: bool = True,
overwrite: bool = False,
flat: bool = False,
) -> None:
self.include_inputs = include_inputs
self.include_outputs = include_outputs
self.include_attributes = include_attributes
self.include_extras = include_extras
self.overwrite = overwrite
self.flat = flat
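# A minimal usage sketch, assuming a loaded AiiDA profile and a terminated
# process node; the PK `1234` below is purely hypothetical:
#
#   from aiida import load_profile, orm
#   load_profile()
#
#   dumper = ProcessDumper(include_outputs=True, overwrite=True)
#   dump_path = dumper.dump(process_node=orm.load_node(1234), output_path=None)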
@staticmethod
def _generate_default_dump_path(process_node: ProcessNode) -> Path:
"""Simple helper function to generate the default parent-dumping directory if none given.
This function is not called for the recursive sub-calls of `_dump_calculation` as it just creates the default
parent folder for the dumping, if no name is given.
:param process_node: The `ProcessNode` for which the directory is created.
:return: The absolute default parent dump path.
"""
pk = process_node.pk
try:
return Path(f'dump-{process_node.process_label}-{pk}')
except AttributeError:
# This case came up during testing; it's unclear how relevant it is in practice
return Path(f'dump-{process_node.process_type}-{pk}')
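# For example, a `MultiplyAddWorkChain` with (hypothetical) PK 1234 yields the
# default dump directory `dump-MultiplyAddWorkChain-1234`.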
@staticmethod
def _generate_readme(process_node: ProcessNode, output_path: Path) -> None:
"""Generate README.md file in main dumping directory.
:param process_node: `CalculationNode` or `WorkflowNode`.
:param output_path: Output path for dumping.
"""
import textwrap
from aiida.cmdline.utils.ascii_vis import format_call_graph
from aiida.cmdline.utils.common import (
get_calcjob_report,
get_node_info,
get_process_function_report,
get_workchain_report,
)
pk = process_node.pk
_readme_string = textwrap.dedent(
f"""\
This directory contains the files involved in the calculation/workflow
`{process_node.process_label} <{pk}>` run with AiiDA.

Child calculations/workflows (also called `CalcJob`s/`CalcFunction`s and `WorkChain`s/`WorkFunction`s in AiiDA
jargon) run by the parent workflow are contained in the directory tree as sub-folders, sorted by their
creation time. The directory tree thus mirrors the logical execution of the workflow, which can also be queried
by running `verdi process status {pk}` on the command line.

By default, input and output files of each calculation can be found in the corresponding "inputs" and "outputs"
directories (the former also contains the hidden ".aiida" folder with machine-readable job execution settings).
Additional input and output files (depending on the type of calculation) are placed in the "node_inputs" and
"node_outputs" directories, respectively.

Lastly, every folder also contains a hidden, human-readable `.aiida_node_metadata.yaml` file with the relevant
AiiDA node data for further inspection."""
)
# `verdi process status`
process_status = format_call_graph(calc_node=process_node, max_depth=None, call_link_label=True)
_readme_string += f'\n\n\nOutput of `verdi process status {pk}`:\n\n```shell\n{process_status}\n```'
# `verdi process report`
# Copied over from `cmd_process`
if isinstance(process_node, CalcJobNode):
process_report = get_calcjob_report(process_node)
elif isinstance(process_node, WorkChainNode):
process_report = get_workchain_report(process_node, levelname='REPORT', indent_size=2, max_depth=None)
elif isinstance(process_node, (CalcFunctionNode, WorkFunctionNode)):
process_report = get_process_function_report(process_node)
else:
process_report = f'Nothing to show for node type {process_node.__class__}'
_readme_string += f'\n\n\nOutput of `verdi process report {pk}`:\n\n```shell\n{process_report}\n```'
# `verdi process show`
process_show = get_node_info(node=process_node)
_readme_string += f'\n\n\nOutput of `verdi process show {pk}`:\n\n```shell\n{process_show}\n```'
(output_path / 'README.md').write_text(_readme_string)
@staticmethod
def _generate_child_node_label(index: int, link_triple: LinkTriple) -> str:
"""Small helper function to generate and clean directory label for child nodes during recursion.
:param index: Index assigned to step at current level of recursion.
:param link_triple: `LinkTriple` of `ProcessNode` explored during recursion.
:return: Chlild node label during recursion.
"""
node = link_triple.node
link_label = link_triple.link_label
# Generate directories with naming scheme akin to `verdi process status`
label_list = [f'{index:02d}', link_label]
try:
process_label = node.process_label
if process_label is not None and process_label != link_label:
label_list += [process_label]
except AttributeError:
process_type = node.process_type
if process_type is not None and process_type != link_label:
label_list += [process_type]
node_label = '-'.join(label_list)
# `CALL-` is part of the link labels even for the MultiplyAddWorkChain -> seems general enough, so remove it
node_label = node_label.replace('CALL-', '')
node_label = node_label.replace('None-', '')
return node_label
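# For example, a second child reached via the (hypothetical) call link label
# `CALL-add` with process label `ArithmeticAddCalculation` yields
# `02-CALL-add-ArithmeticAddCalculation`, which the `CALL-` cleanup above
# reduces to `02-add-ArithmeticAddCalculation`.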
def dump(
self,
process_node: ProcessNode,
output_path: Path | None = None,
io_dump_paths: List[str | Path] | None = None,
) -> Path:
"""Dumps all data involved in a `ProcessNode`, including its outgoing links.
Note that if an outgoing link is a `WorkflowNode`, the function recursively calls itself, while files are
only actually created when a `CalculationNode` is reached.
:param process_node: The parent `ProcessNode` node to be dumped.
:param output_path: The output path where the directory tree will be created.
:param io_dump_paths: Subdirectories created for each `CalculationNode`.
Default: ['inputs', 'outputs', 'node_inputs', 'node_outputs']
"""
if output_path is None:
output_path = self._generate_default_dump_path(process_node=process_node)
self._validate_make_dump_path(validate_path=output_path)
if isinstance(process_node, CalculationNode):
self._dump_calculation(
calculation_node=process_node,
output_path=output_path,
io_dump_paths=io_dump_paths,
)
elif isinstance(process_node, WorkflowNode):
self._dump_workflow(
workflow_node=process_node,
output_path=output_path,
io_dump_paths=io_dump_paths,
)
self._generate_readme(process_node=process_node, output_path=output_path)
return output_path
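# A sketch of a call with custom subdirectory names (the names below are
# hypothetical and replace the defaults `inputs`, `outputs`, `node_inputs` and
# `node_outputs`, in that order):
#
#   dump_path = dumper.dump(
#       process_node=orm.load_node(1234),
#       output_path=Path('my_dump'),
#       io_dump_paths=['raw_inputs', 'raw_outputs', 'extra_inputs', 'extra_outputs'],
#   )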
def _dump_workflow(
self, workflow_node: WorkflowNode, output_path: Path, io_dump_paths: List[str | Path] | None = None
) -> None:
"""Recursive function to traverse a `WorkflowNode` and dump its `CalculationNode` s.
:param workflow_node: `WorkflowNode` to be traversed. Will be updated during recursion.
:param output_path: Dumping parent directory. Will be updated during recursion.
:param io_dump_paths: Custom subdirectories for `CalculationNode` s, defaults to None
"""
self._validate_make_dump_path(validate_path=output_path)
self._dump_node_yaml(process_node=workflow_node, output_path=output_path)
called_links = workflow_node.base.links.get_outgoing(link_type=(LinkType.CALL_CALC, LinkType.CALL_WORK)).all()
called_links = sorted(called_links, key=lambda link_triple: link_triple.node.ctime)
for index, link_triple in enumerate(called_links, start=1):
child_node = link_triple.node
child_label = self._generate_child_node_label(index=index, link_triple=link_triple)
child_output_path = output_path.resolve() / child_label
# Recursive function call for `WorkflowNode`
if isinstance(child_node, WorkflowNode):
self._dump_workflow(
workflow_node=child_node,
output_path=child_output_path,
io_dump_paths=io_dump_paths,
)
# Once a `CalculationNode` is reached as a child, dump it
elif isinstance(child_node, CalculationNode):
self._dump_calculation(
calculation_node=child_node,
output_path=child_output_path,
io_dump_paths=io_dump_paths,
)
def _dump_calculation(
self,
calculation_node: CalculationNode,
output_path: Path,
io_dump_paths: List[str | Path] | None = None,
) -> None:
"""Dump the contents of a `CalculationNode` to a specified output path.
:param calculation_node: The `CalculationNode` to be dumped.
:param output_path: The path where the files will be dumped.
:param io_dump_paths: Subdirectories created for the `CalculationNode`.
Default: ['inputs', 'outputs', 'node_inputs', 'node_outputs']
"""
self._validate_make_dump_path(validate_path=output_path)
self._dump_node_yaml(process_node=calculation_node, output_path=output_path)
io_dump_mapping = self._generate_calculation_io_mapping(io_dump_paths=io_dump_paths)
# Dump the repository contents of the node
calculation_node.base.repository.copy_tree(output_path.resolve() / io_dump_mapping.repository)
# Dump the repository contents of `outputs.retrieved`
try:
calculation_node.outputs.retrieved.base.repository.copy_tree(
output_path.resolve() / io_dump_mapping.retrieved
)
except NotExistentAttributeError:
pass
# Dump the node_inputs
if self.include_inputs:
input_links = calculation_node.base.links.get_incoming(link_type=LinkType.INPUT_CALC)
self._dump_calculation_io(parent_path=output_path / io_dump_mapping.inputs, link_triples=input_links)
# Dump the node_outputs apart from `retrieved`
if self.include_outputs:
output_links = list(calculation_node.base.links.get_outgoing(link_type=LinkType.CREATE))
output_links = [output_link for output_link in output_links if output_link.link_label != 'retrieved']
self._dump_calculation_io(
parent_path=output_path / io_dump_mapping.outputs,
link_triples=output_links,
)
def _dump_calculation_io(self, parent_path: Path, link_triples: LinkManager | List[LinkTriple]) -> None:
"""Small helper function to dump linked input/output nodes of a `CalculationNode`.

:param parent_path: Parent directory for dumping the linked node contents.
:param link_triples: List of link triples.
"""
for link_triple in link_triples:
link_label = link_triple.link_label
if not self.flat:
linked_node_path = parent_path / Path(*link_label.split('__'))
else:
# Don't use the link label at all -> the relative path inside a FolderData is still retained
linked_node_path = parent_path
link_triple.node.base.repository.copy_tree(linked_node_path.resolve())
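# For example, an input node attached via the (hypothetical) nested link label
# `pseudos__Si` is dumped to `<parent_path>/pseudos/Si` when `flat` is False,
# and directly into `<parent_path>` when `flat` is True.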
def _validate_make_dump_path(self, validate_path: Path, safeguard_file: str = '.aiida_node_metadata.yaml') -> Path:
"""Create default dumping directory for a given process node and return it as absolute path.
:param validate_path: Path to validate for dumping.
:param safeguard_file: Dumping-specific file to avoid deleting wrong directory.
Default: `.aiida_node_metadata.yaml`
:return: The absolute created dump path.
"""
import shutil
if validate_path.is_dir():
# Existing, empty directory -> OK
if not any(validate_path.iterdir()):
pass
# Existing, non-empty directory and overwrite False -> FileExistsError
elif not self.overwrite:
raise FileExistsError(f'Path `{validate_path}` already exists and overwrite set to False.')
# Existing, non-empty directory and overwrite True
# Check for safeguard file ('.aiida_node_metadata.yaml') for safety
# If present -> Remove directory
elif (validate_path / safeguard_file).is_file():
LOGGER.info(f'Overwrite set to true, will overwrite directory `{validate_path}`.')
shutil.rmtree(validate_path)
# Existing, non-empty directory and overwrite True
# Check for safeguard file ('.aiida_node_metadata.yaml') for safety
# If absent -> Don't remove the directory, so as not to accidentally delete the wrong one
else:
raise Exception(
f"Path `{validate_path}` already exists and doesn't contain safeguard file {safeguard_file}."
f' Not removing for safety reasons.'
)
# Not included in the if-else above so as to avoid repeating the `mkdir` call.
# `exist_ok=True` as checks implemented above
validate_path.mkdir(exist_ok=True, parents=True)
return validate_path.resolve()
def _generate_calculation_io_mapping(self, io_dump_paths: List[str | Path] | None = None) -> SimpleNamespace:
"""Helper function to generate mapping for entities dumped for each `CalculationNode`.
This is to avoid exposing AiiDA terminology, like `repository` to the user, while keeping track of which
entities should be dumped into which directory, and allowing for alternative directory names.
:param io_dump_paths: Subdirectories created for the `CalculationNode`.
Default: ['inputs', 'outputs', 'node_inputs', 'node_outputs']
:return: SimpleNamespace mapping.
"""
aiida_entities_to_dump = ['repository', 'retrieved', 'inputs', 'outputs']
default_calculation_io_dump_paths = ['inputs', 'outputs', 'node_inputs', 'node_outputs']
empty_calculation_io_dump_paths = [''] * 4
if self.flat and io_dump_paths is None:
LOGGER.info(
'Flat set to True and no `io_dump_paths` provided. Dumping into a flat directory; files might be overwritten.'
)
return SimpleNamespace(**dict(zip(aiida_entities_to_dump, empty_calculation_io_dump_paths)))
elif not self.flat and io_dump_paths is None:
LOGGER.info(
'Flat set to False and no `io_dump_paths` provided. '
+ f'Will use the defaults {default_calculation_io_dump_paths}.'
)
return SimpleNamespace(**dict(zip(aiida_entities_to_dump, default_calculation_io_dump_paths)))
elif self.flat and io_dump_paths is not None:
LOGGER.info('Flat set to True and `io_dump_paths` provided. These will be used, but `node_inputs` will not be nested.')
return SimpleNamespace(**dict(zip(aiida_entities_to_dump, io_dump_paths)))
else:
LOGGER.info(
'Flat set to False and `io_dump_paths` provided. Will use the provided `io_dump_paths`.'
)
return SimpleNamespace(**dict(zip(aiida_entities_to_dump, io_dump_paths))) # type: ignore[arg-type]
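# With the defaults (`flat=False`, no `io_dump_paths`), the returned namespace maps
# `repository` -> 'inputs', `retrieved` -> 'outputs', `inputs` -> 'node_inputs',
# and `outputs` -> 'node_outputs'.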
def _dump_node_yaml(
self,
process_node: ProcessNode,
output_path: Path,
output_filename: str = '.aiida_node_metadata.yaml',
) -> None:
"""Dump the selected `ProcessNode` properties, attributes, and extras to a YAML file.
:param process_node: The `ProcessNode` to dump.
:param output_path: The path to the directory where the YAML file will be saved.
:param output_filename: The name of the output YAML file. Defaults to `.aiida_node_metadata.yaml`.
"""
node_properties = [
'label',
'description',
'pk',
'uuid',
'ctime',
'mtime',
'node_type',
'process_type',
'is_finished_ok',
]
user_properties = ('first_name', 'last_name', 'email', 'institution')
computer_properties = ('label', 'hostname', 'scheduler_type', 'transport_type')
node_dict = {}
metadata_dict = {}
# Add actual node `@property`s to dictionary
for metadata_property in node_properties:
metadata_dict[metadata_property] = getattr(process_node, metadata_property)
node_dict['Node data'] = metadata_dict
# Add user data
try:
node_dbuser = process_node.user
user_dict = {}
for user_property in user_properties:
user_dict[user_property] = getattr(node_dbuser, user_property)
node_dict['User data'] = user_dict
except AttributeError:
pass
# Add computer data
try:
node_dbcomputer = process_node.computer
computer_dict = {}
for computer_property in computer_properties:
computer_dict[computer_property] = getattr(node_dbcomputer, computer_property)
node_dict['Computer data'] = computer_dict
except AttributeError:
pass
# Add node attributes
if self.include_attributes:
node_attributes = process_node.base.attributes.all
node_dict['Node attributes'] = node_attributes
# Add node extras
if self.include_extras:
node_extras = process_node.base.extras.all
if node_extras:
node_dict['Node extras'] = node_extras
output_file = output_path.resolve() / output_filename
with open(output_file, 'w') as handle:
yaml.dump(node_dict, handle, sort_keys=False)
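# A sketch of the resulting YAML layout (keys follow the sections assembled above;
# the values shown are illustrative only):
#
#   Node data:
#     label: ''
#     pk: 1234
#     process_type: aiida.workflows:core.arithmetic.multiply_add
#   User data:
#     email: user@example.com
#   Node attributes:
#     ...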