Source code for aiida.work.util

# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved.                     #
# This file is part of the AiiDA code.                                    #
#                                                                         #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida_core #
# For further information on the license, see the LICENSE.txt file        #
# For further information please visit http://www.aiida.net               #
###########################################################################

import importlib
from threading import local
from aiida.orm.calculation import Calculation
from aiida.common.links import LinkType
from aiida.orm.data.frozendict import FrozenDict


# The name of the node attribute under which the label of a process is stored.
PROCESS_LABEL_ATTR = '_process_label'


class ProcessStack(object):
    """
    Keep track of the per-thread call stack of processes.

    The class cannot be instantiated: all state lives in thread-local storage
    and is reached exclusively through class methods.  Two parallel lists are
    maintained per thread, one with the process instances and one with their
    pids, and they are always pushed to and popped from together.
    """
    # Use thread-local storage for the stack
    _thread_local = local()

    @classmethod
    def get_active_process_id(cls):
        """
        Get the pid of the process at the top of the stack

        :return: The pid
        :raises IndexError: if the stack of the current thread is empty
        """
        return cls.top().pid

    @classmethod
    def get_active_process_calc_node(cls):
        """
        Get the calculation node of the process at the top of the stack

        :return: The calculation node
        :rtype: :class:`aiida.orm.implementation.general.calculation.job.AbstractJobCalculation`
        :raises IndexError: if the stack of the current thread is empty
        """
        return cls.top().calc

    @classmethod
    def top(cls):
        """
        Get the process that was pushed last in the current thread.

        :return: The process instance at the top of the stack
        :raises IndexError: if the stack of the current thread is empty
        """
        return cls.stack()[-1]

    @classmethod
    def stack(cls):
        """
        Get the list of process instances of the current thread, creating it
        on first access.

        :return: The (mutable) list holding the process instances
        """
        try:
            return cls._thread_local.wf_stack
        except AttributeError:
            # First access from this thread: lazily create its stack
            cls._thread_local.wf_stack = []
            return cls._thread_local.wf_stack

    @classmethod
    def pids(cls):
        """
        Get the list of process ids of the current thread, creating it on
        first access.

        :return: The (mutable) list holding the pids
        """
        try:
            return cls._thread_local.pids_stack
        except AttributeError:
            # First access from this thread: lazily create its pid stack
            cls._thread_local.pids_stack = []
            return cls._thread_local.pids_stack

    @classmethod
    def push(cls, process):
        """
        Push a process on the stack of the current thread.

        The ``_parent`` attribute of the process is set to the process that
        was at the top of the stack, or to None if the stack was empty.

        :param process: The process instance, which must expose a ``pid``
        """
        try:
            process._parent = cls.top()
        except IndexError:
            # Empty stack: this process has no parent
            process._parent = None
        cls.stack().append(process)
        cls.pids().append(process.pid)

    @classmethod
    def pop(cls, process=None, pid=None):
        """
        Pop a process from the stack.

        To make sure the stack is not corrupted the process instance or pid
        of the calling process should be supplied so we can verify that it
        really is top of the stack.  If both are supplied only the process
        instance is checked.

        :param process: The process instance
        :param pid: The process id.
        :raises ValueError: if neither process nor pid is supplied
        :raises AssertionError: if the supplied process or pid is not the one
            at the top of the stack
        :raises IndexError: if the stack of the current thread is empty
        """
        if process is None and pid is None:
            raise ValueError("Must supply process or pid")

        # These checks protect the integrity of the stack, so they are raised
        # explicitly instead of through ``assert`` statements, which would be
        # stripped when Python runs with -O.
        if process is not None:
            if process is not cls.top():
                raise AssertionError(
                    "Can't pop a process that is not top of the stack")
        elif pid != cls.pids()[-1]:
            raise AssertionError(
                "Can't pop a process that is not top of the stack")

        popped = cls.stack().pop()
        popped._parent = None
        cls.pids().pop()

    def __init__(self):
        raise NotImplementedError("Can't instantiate the ProcessStack")
def load_class(classstring):
    """
    Load a class from its fully qualified dotted name.

    :param classstring: Dotted path such as ``package.module.ClassName``
    :return: The attribute named by the last component, looked up on the
        module named by everything before the last dot
    """
    # Everything before the final dot names the module, the rest the attribute
    module_path, _, class_name = classstring.rpartition(".")
    return getattr(importlib.import_module(module_path), class_name)
def is_workfunction(func):
    """
    Tell whether an object has been marked as a workfunction.

    :param func: The object to inspect
    :return: The value of the ``_is_workfunction`` attribute of the object,
        or False if it has no such attribute
    """
    return getattr(func, '_is_workfunction', False)
def get_or_create_output_group(calculation):
    """
    Build a frozendict Data node whose values are the output Data nodes of
    the given Calculation.

    The outputs reached through CREATE links are collected first and then
    updated with those reached through RETURN links, so for a label present
    in both the RETURN entry is kept.  A new FrozenDict is constructed on
    every call.

    :param calculation: Calculation
    :return: A FrozenDict wrapping the collected outputs
    :raises TypeError: if ``calculation`` is not a Calculation instance
    """
    if not isinstance(calculation, Calculation):
        raise TypeError("Can only create output groups for type Calculation")

    created = calculation.get_outputs_dict(link_type=LinkType.CREATE)
    returned = calculation.get_outputs_dict(link_type=LinkType.RETURN)

    # RETURN entries take precedence over CREATE entries with the same key
    created.update(returned)
    return FrozenDict(dict=created)