如何编写和扩展 workflows#

书写 workflows#

AiiDA 中的 workflow 是一个 process ,它调用其他 workflow 和计算,并可选择 返回 数据,因此可以编码典型科学 workflow 的逻辑。目前,有两种方法可以实现 workflow 进程:

下面我们将简要介绍如何编写这两种 workflow 类型。

备注

有关 workflow 概念以及工作函数和工作链之间区别的更多详情,请参见相应的 topics section

备注

开发 workflow 可能需要运行多个冗长的计算。可以考虑使用 enabling caching 来避免重复冗长的 workflow 步骤。

工作职能#

工作函数 是一个进程函数,它调用一个或多个计算函数,并 返回 由它调用的计算函数 创建*的数据。此外,工作函数还可以调用其他工作函数,从而允许您编写嵌套的 workflow。编写工作函数(其 provenance 已自动存储)就像编写 Python 函数并用 workfunction 装饰器装饰一样简单:

"""Basic calcfunction-based workflows for demonstration purposes."""

from aiida.engine import calcfunction, workfunction


@calcfunction
def add(x, y):
    return x + y


@calcfunction
def multiply(x, y):
    return x * y


@workfunction
def add_multiply(x, y, z):
    """Add two numbers and multiply it with a third."""
    addition = add(x, y)
    product = multiply(addition, z)
    return product

important 在此重申, workfunction -decorated add_multiply() 函数并不 创建 任何新数据 node。 add()multiply() 计算函数创建了 Int 数据 node,工作函数所做的只是 返回 multiply() 计算函数的结果。此外,计算函数和 workflow 函数都只能接受和返回数据 nodes,即 Data 类的子类实例。

工作链#

当你要运行的 workflow 比较复杂,需要较长时间才能完成时,最好编写 工作链 。在 AiiDA 中编写工作链需要创建一个继承自 WorkChain 类的类。下面是一个工作链的例子,输入三个整数,将前两个整数相乘,然后将第三个整数相加,得到最终结果:

"""Implementation of the MultiplyAddWorkChain for testing and demonstration purposes."""

from aiida.engine import ToContext, WorkChain, calcfunction
from aiida.orm import AbstractCode, Int
from aiida.plugins.factories import CalculationFactory

ArithmeticAddCalculation = CalculationFactory('core.arithmetic.add')


@calcfunction
def multiply(x, y):
    return x * y


class MultiplyAddWorkChain(WorkChain):
    """WorkChain to multiply two numbers and add a third, for testing and demonstration purposes."""

    @classmethod
    def define(cls, spec):
        """Specify inputs and outputs."""
        super().define(spec)
        spec.input('x', valid_type=Int)
        spec.input('y', valid_type=Int)
        spec.input('z', valid_type=Int)
        spec.input('code', valid_type=AbstractCode)
        spec.outline(
            cls.multiply,
            cls.add,
            cls.validate_result,
            cls.result,
        )
        spec.output('result', valid_type=Int)
        spec.exit_code(400, 'ERROR_NEGATIVE_NUMBER', message='The result is a negative number.')

    def multiply(self):
        """Multiply two integers."""
        self.ctx.product = multiply(self.inputs.x, self.inputs.y)

    def add(self):
        """Add two numbers using the `ArithmeticAddCalculation` calculation job plugin."""
        inputs = {'x': self.ctx.product, 'y': self.inputs.z, 'code': self.inputs.code}
        future = self.submit(ArithmeticAddCalculation, **inputs)

        return ToContext(addition=future)

    def validate_result(self):
        """Make sure the result is not negative."""
        result = self.ctx.addition.outputs.sum

        if result.value < 0:
            return self.exit_codes.ERROR_NEGATIVE_NUMBER

    def result(self):
        """Add the result to the outputs."""
        self.out('result', self.ctx.addition.outputs.sum)

你可以给工作链取任何有效的 Python 类名,但惯例是让它以 WorkChain 结尾,这样它所引用的内容就会一目了然。让我们逐一查看 MultiplyAddWorkChain 的方法:

@classmethod
def define(cls, spec):
    """Specify inputs and outputs."""
    super().define(spec)
    spec.input('x', valid_type=Int)
    spec.input('y', valid_type=Int)
    spec.input('z', valid_type=Int)
    spec.input('code', valid_type=AbstractCode)
    spec.outline(
        cls.multiply,
        cls.add,
        cls.validate_result,
        cls.result,
    )
    spec.output('result', valid_type=Int)
    spec.exit_code(400, 'ERROR_NEGATIVE_NUMBER', message='The result is a negative number.')

每个工作链最需要实现的 important 方法是 define() 方法。该类方法必须始终调用其父类的 define() 方法。接下来,应使用 define() 方法定义工作链的规范,这些规范包含在工作链 spec 中:

  • 输入**,使用 spec.input() 方法指定。 input() 方法的第一个参数是一个字符串,用于指定输入的标签,例如 'x' 。通过 valid_type 关键字参数,可以指定所需的 node 输入类型。其他关键字参数允许开发人员为输入设置默认值,或指示输入不应存储在数据库中,详情请参见 the process topics section

  • 使用 spec.outline() 方法指定 workflow 的 轮廓 或逻辑。workflow 的大纲由 WorkChain 类的方法构建而成。对于 MultiplyAddWorkChain ,大纲是一个简单的线性步骤序列,但也可以直接在大纲中加入实际逻辑,以定义更复杂的 workflow。更多详情,请参阅 work chain outline section

  • 输出**,使用 spec.output() 方法指定。这种方法的用法与 input() 方法非常相似。

  • 工作链的 退出代码 ,使用 spec.exit_code() 方法指定。退出代码用于向用户明确传达工作链的已知故障模式。第一个和第二个参数定义了故障情况下工作链的 exit_status ( 400 ),以及开发人员可用于引用退出代码的字符串 ( ERROR_NEGATIVE_NUMBER )。可以使用 message 关键字参数提供描述性的退出信息。对于 MultiplyAddWorkChain ,我们要求最终结果不是负数,这将在大纲的 validate_result 步骤中进行检查。

备注

有关 define() 方法和工艺规范的更多信息,请参见 corresponding section in the topics

multiply 方法是 MultiplyAddWorkChain 工作链大纲的第一步。

def multiply(self):
    """Multiply two integers."""
    self.ctx.product = multiply(self.inputs.x, self.inputs.y)

这一步只需在工作链的 x and y 输入 上运行计算函数 multiply() 。为了存储该函数的结果并在大纲的下一步中使用,需要使用 self.ctx 将其添加到工作链的 上下文 中。

def add(self):
    """Add two numbers using the `ArithmeticAddCalculation` calculation job plugin."""
    inputs = {'x': self.ctx.product, 'y': self.inputs.z, 'code': self.inputs.code}
    future = self.submit(ArithmeticAddCalculation, **inputs)

    return ToContext(addition=future)

add() 方法是工作链大纲中的第二步。由于这一步使用 ArithmeticAddCalculation 计算作业,我们首先要在字典中设置 CalcJob 的输入。接下来,在向守护进程提交该计算作业时,important 可以通过 self.submit() 使用工作链实例中的提交方法。由于加法的结果只有在计算任务完成后才能获得,因此 submit() 方法会返回 未来 ArithmeticAddCalculation 进程的 CalcJobNode 。为了告诉工作链在继续执行 workflow 之前等待该进程结束,我们返回了 ToContext 类,并通过字典指定未来计算作业 node 应分配给 'addition' 上下文键。

警告

切勿在 WorkChain 中使用全局 submit() 函数向守护进程提交计算。这样做会在运行时引发异常。有关详细信息,请参阅 topics section on work chains

备注

您也可以通过传递未来进程作为关键字参数(如 ToContext(addition=calcjob_node) )来初始化 ToContext 实例,而不是传递字典。有关 ToContext 类的更多信息,请参阅 the topics section on submitting sub processes

def validate_result(self):
    """Make sure the result is not negative."""
    result = self.ctx.addition.outputs.sum

    if result.value < 0:
        return self.exit_codes.ERROR_NEGATIVE_NUMBER

ArithmeticAddCalculation 计算工作完成后,工作链的下一步是验证结果,即验证结果不是负数。从上下文中提取 addition node 后,我们从 ArithmeticAddCalculation 输出中提取 sum node 并将其存储到 result 变量中。如果 Int node 的值为负数,则返回 define() 方法中定义的 ERROR_NEGATIVE_NUMBER 退出代码。请注意,一旦在大纲的任何步骤中返回退出代码,工作链将被终止,不会再执行其他步骤。

def result(self):
    """Add the result to the outputs."""
    self.out('result', self.ctx.addition.outputs.sum)

大纲的最后一步是使用 self.out() 方法将结果传递到工作链的输出端。第一个参数 ( 'result' ) 指定了输出的标签,与 define() 方法中提供给规范的标签相对应。第二个参数是从上下文中以 'addition' 键存储的 Int node 中提取的工作链结果。

有关 workflow 及其用法的更全面讨论,请阅读 the corresponding topics section

扩展 workflows#

在设计 workflow 时,很多情况下您都希望重复使用现有流程。本节将介绍如何通过将 workflow 包在其他流程中或将它们连接在一起来扩展 workflow。

举例来说,如果您想扩展 MultiplyAddWorkChain 的功能,再增加一步分析,检查结果是否为偶数。最后一步可以写成简单的 calcfunction

@calcfunction
def is_even(number):
    """Check if a number is even."""
    return Bool(number % 2 == 0)

我们只需在 MultiplyAddWorkChain 的基础上编写一个新的 workflow,在大纲中增加一个运行 is_even 计算函数的步骤即可。然而,这会导致大量代码重复,而且由多个工作链组成的较长的 workflow 处理起来会非常麻烦(参见下面的下拉面板)。

BadMultiplyAddIsEvenWorkChain
class BadMultiplyAddIsEvenWorkChain(WorkChain):
    """WorkChain to multiply two numbers and add a third, for testing and demonstration purposes."""

    @classmethod
    def define(cls, spec):
        """Specify inputs and outputs."""
        super().define(spec)
        spec.input('x', valid_type=Int)
        spec.input('y', valid_type=Int)
        spec.input('z', valid_type=Int)
        spec.input('code', valid_type=AbstractCode)
        spec.outline(
            cls.multiply,
            cls.add,
            cls.validate_result,
            cls.is_even,
        )
        spec.output('is_even', valid_type=Bool)
        spec.exit_code(400, 'ERROR_NEGATIVE_NUMBER', message='The result is a negative number.')

    def multiply(self):
        """Multiply two integers."""
        self.ctx.product = multiply(self.inputs.x, self.inputs.y)

    def add(self):
        """Add two numbers using the `ArithmeticAddCalculation` calculation job plugin."""
        inputs = {'x': self.ctx.product, 'y': self.inputs.z, 'code': self.inputs.code}
        future = self.submit(ArithmeticAddCalculation, **inputs)

        return ToContext(addition=future)

    def validate_result(self):
        """Make sure the result is not negative."""
        result = self.ctx.addition.outputs.sum

        if result.value < 0:
            return self.exit_codes.ERROR_NEGATIVE_NUMBER

    def is_even(self):
        """Check if the result is even."""
        result_is_even = is_even(self.ctx.addition.outputs.sum)

        self.out('is_even', result_is_even)

备注

我们删除了大纲中的 result 步,也删除了 result 输出。在这个工作链中,我们假定目前只对结果是否偶数感兴趣。

我们只需在新工作链的一个步骤中提交 MultiplyAddWorkChain ,然后在第二个步骤中调用 is_even ,就可以避免代码重复:

class BetterMultiplyAddIsEvenWorkChain(WorkChain):
    """WorkChain to multiply two numbers and add a third, for testing and demonstration purposes."""

    @classmethod
    def define(cls, spec):
        """Specify inputs and outputs."""
        super().define(spec)
        spec.input('x', valid_type=Int)
        spec.input('y', valid_type=Int)
        spec.input('z', valid_type=Int)
        spec.input('code', valid_type=AbstractCode)
        spec.outline(
            cls.multiply_add,
            cls.is_even,
        )
        spec.output('is_even', valid_type=Bool)

    def multiply_add(self):
        """Multiply two integers and add a third."""
        inputs = {'x': self.inputs.x, 'y': self.inputs.y, 'z': self.inputs.z, 'code': self.inputs.code}
        future = self.submit(MultiplyAddWorkChain, **inputs)

        return ToContext(multi_addition=future)

    def is_even(self):
        """Check if the result is even."""
        result_is_even = is_even(self.ctx.multi_addition.outputs.result)

        self.out('is_even', result_is_even)

这已经简化了扩展工作链,避免了在大纲中重复 MultiplyAddWorkChain 的步骤。但是,我们仍然需要复制 MultiplyAddWorkChain 的所有输入定义,并在将其传递给 self.submit 方法之前手动从输入中提取出来。幸运的是,有一种更好的方法可以 暴露 工作链子流程的输入和输出。

公开输入和输出#

在许多情况下,工作链可以方便地公开其所封装子进程的输入,这样用户就可以直接指定这些输入,同时还可以公开作为父工作链结果之一的某些输出。在上一节介绍的简单示例中,只需复制粘贴子进程 MultiplyAddWorkChain 的输入和输出端口定义就不会太麻烦。但是,一旦开始使用更多的输入来包装流程,这种方法很快就会变得繁琐且容易出错。

为防止输入和输出规范的复制粘贴, ProcessSpec 类提供了 expose_inputs()expose_outputs() 方法。为一个特定的 Process 类调用 expose_inputs() 会自动将该类的输入复制到流程规范的输入命名空间中:

@classmethod
def define(cls, spec):
    """Specify inputs and outputs."""
    super().define(spec)
    spec.expose_inputs(MultiplyAddWorkChain)  # Expose the inputs instead of copying their definition
    spec.outline(
        cls.multiply_add,
        cls.is_even,
    )
    spec.output('is_even', valid_type=Bool)

备注

公开功能不仅限于 WorkChain 实现,而且适用于所有进程类,例如 CalcJob 插件。它甚至还适用于流程函数(即 calcfunctionsworkfunctions ),因为在引擎盖下会为它们即时生成一个实际的 Process 类。对于流程函数,如果可以从提供的函数类型提示和文档中推断出 valid_typehelp 属性,那么暴露输入的 valid_typehelp 属性甚至会被保留下来(详见 type validationdocstring parsing )。

请注意,命名空间中已存在的任何输入都将被覆盖。为避免这种情况,该方法接受 namespace 参数,这将导致输入被复制到该命名空间,而不是顶层命名空间。这对暴露输入特别有用,因为**进程都有 metadata 输入。如果不在命名空间中公开输入,那么公开类的 metadata 输入端口将覆盖主机的输入端口,这通常是不可取的。让我们将 MultiplyAddWorkChain 的输入复制到 multiply_add 命名空间:

@classmethod
def define(cls, spec):
    """Specify inputs and outputs."""
    super().define(spec)
    spec.expose_inputs(MultiplyAddWorkChain, namespace='multiply_add')
    spec.outline(
        cls.multiply_add,
        cls.is_even,
    )
    spec.output('is_even', valid_type=Bool)

这样就能以非常高效的方式公开封装进程类的端口规范。要轻松获取传递给进程的输入,可以使用 exposed_inputs() 方法。请注意方法名称的过去式。该方法将一个进程类和一个可选的命名空间作为参数,并将返回进程启动时传入该命名空间的输入。有了这个实用程序,我们就可以简化大纲中的 multiply_add 步骤:

def multiply_add(self):
    """Multiply two integers and add a third."""
    future = self.submit(MultiplyAddWorkChain, **self.exposed_inputs(MultiplyAddWorkChain, 'multiply_add'))
    return ToContext(multi_addition=future)

这样,我们就不必手动从 self.inputs 中找出所有单独的输入,而只需调用这个方法,从而节省了时间和代码行数。最终的 MultiplyAddIsEvenWorkChain 可以在下面的下拉面板中找到。

MultiplyAddIsEvenWorkChain
class MultiplyAddIsEvenWorkChain(WorkChain):
    """WorkChain to multiply two numbers and add a third, for testing and demonstration purposes."""

    @classmethod
    def define(cls, spec):
        """Specify inputs and outputs."""
        super().define(spec)
        spec.expose_inputs(MultiplyAddWorkChain, namespace='multiply_add')
        spec.outline(
            cls.multiply_add,
            cls.is_even,
        )
        spec.output('is_even', valid_type=Bool)

    def multiply_add(self):
        """Multiply two integers and add a third."""
        future = self.submit(MultiplyAddWorkChain, **self.exposed_inputs(MultiplyAddWorkChain, 'multiply_add'))
        return ToContext(multi_addition=future)

    def is_even(self):
        """Check if the result is even."""
        result_is_even = is_even(self.ctx.multi_addition.outputs.result)

        self.out('is_even', result_is_even)

在提交或运行使用命名空间输入的工作链(上例中为 multiply_add )时,important 应在提供输入时使用命名空间:

add_code = load_code(label='add')
inputs = {
    'multiply_add': {'x': Int(1), 'y': Int(2), 'z': Int(3), 'code': add_code}
}

workchain_node = submit(MultiplyAddWorkChain, **inputs)

运行 MultiplyAddIsEvenWorkChain 后,可以使用 verdi process status 命令查看工作链调用进程的分层概览:

$ verdi process status 164
MultiplyAddIsEvenWorkChain<164> Finished [0] [1:is_even]
    ├── MultiplyAddWorkChain<165> Finished [0] [3:result]
    │   ├── multiply<166> Finished [0]
    │   └── ArithmeticAddCalculation<168> Finished [0]
    └── is_even<172> Finished [0]

请注意,该命令还会递归显示 MultiplyAddIsEvenWorkChain 工作链子进程调用的进程。

如前所述,还可以使用 expose_outputs() 方法公开 MultiplyAddWorkChain 的输出。假设我们要将 MultiplyAddWorkChainresult 添加为扩展工作链的输出之一:

@classmethod
def define(cls, spec):
    """Specify inputs and outputs."""
    super().define(spec)
    spec.expose_inputs(MultiplyAddWorkChain, namespace='multiply_add')
    spec.outline(
        cls.multiply_add,
        cls.is_even,
    )
    spec.expose_outputs(MultiplyAddWorkChain)
    spec.output('is_even', valid_type=Bool)

由于并非所有进程类都共享一个输出端口,因此在公开输出时使用 namespace 参数并不那么重要。不过,请注意不要覆盖父工作链的输出,以防它们的输出端口名称相同。我们仍然需要将 MultiplyAddWorkChainresult 传递给父工作链的输出。例如,我们可以在 is_even 步骤中使用 out() 方法来实现这一点:

def is_even(self):
    """Check if the result is even."""
    result = self.ctx.multi_addition.outputs.result
    result_is_even = is_even(result)

    self.out('result', result)
    self.out('is_even', result_is_even)

如果我们想将单个输出传递给父工作链,这种方法就很好用,但如果传递多个输出,这种方法又会变得繁琐且容易出错。我们可以将 exposed_outputs() 方法与 out_many() 方法结合使用:

def is_even(self):
    """Check if the result is even."""
    result_is_even = is_even(self.ctx.multi_addition.outputs.result)

    self.out_many(self.exposed_outputs(self.ctx.multi_addition, MultiplyAddWorkChain))
    self.out('is_even', result_is_even)

The exposed_outputs() method returns a dictionary of the exposed outputs of the MultiplyAddWorkChain, extracted from the workchain node stored in the multi_addition key of the context. The out_many() method takes this dictionary and assigns its values to the output ports with names equal to the corresponding keys.

重要

除了避免代码重复和错误外,使用公开输入和输出的方法还有一个好处,那就是当子工作链的输入或输出发生变化时,我们不必调整父工作链。这使得代码更易于维护。