# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida_core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
from aiida.backends.testbase import AiidaTestCase
from aiida.backends.utils import get_workflow_list
from aiida.common.datastructures import wf_states
from aiida.orm import User
from aiida.workflows.test import WFTestEmpty
from aiida.orm.implementation import get_workflow_info
from aiida.workflows.test import WFTestSimpleWithSubWF
[docs]class TestWorkflowBasic(AiidaTestCase):
"""
These tests check the basic features of workflows.
Now only the load_workflow function is tested.
"""
[docs] def test_load_workflows(self):
"""
Test for load_node() function.
"""
from aiida.orm import load_workflow
a = WFTestEmpty()
a.store()
self.assertEquals(a.pk, load_workflow(wf_id=a.pk).pk)
self.assertEquals(a.pk, load_workflow(wf_id=a.uuid).pk)
self.assertEquals(a.pk, load_workflow(pk=a.pk).pk)
self.assertEquals(a.pk, load_workflow(uuid=a.uuid).pk)
with self.assertRaises(ValueError):
load_workflow(wf_id=a.pk, pk=a.pk)
with self.assertRaises(ValueError):
load_workflow(pk=a.pk, uuid=a.uuid)
with self.assertRaises(ValueError):
load_workflow(pk=a.uuid)
with self.assertRaises(ValueError):
load_workflow(uuid=a.pk)
with self.assertRaises(ValueError):
load_workflow()
[docs] def test_listing_workflows(self):
"""
Test ensuring that the workflow listing works as expected.
(Listing initialized & running workflows and not listing finished
workflows or workflows with errors).
"""
# Assuming there is only one user
dbuser = User.search_for_users(email=self.user_email)[0]
# Creating a workflow & storing it
a = WFTestEmpty()
a.store()
# Setting manually the state to RUNNING.
a.set_state(wf_states.RUNNING)
# Getting all the available workflows of the current user
# and checking if we got the right one.
wfqs = get_workflow_list(all_states=True, user=dbuser)
self.assertTrue(len(wfqs) == 1, "We expect one workflow")
a_prime = wfqs[0].get_aiida_class()
self.assertEqual(a.uuid, a_prime.uuid, "The uuid is not the expected "
"one")
# We ask all the running workflows. We should get one workflow.
wfqs = get_workflow_list(all_states=True, user=dbuser)
self.assertTrue(len(wfqs) == 1, "We expect one workflow")
a_prime = wfqs[0].get_aiida_class()
self.assertEqual(a.uuid, a_prime.uuid, "The uuid is not the expected "
"one")
# We change the state of the workflow to FINISHED.
a.set_state(wf_states.FINISHED)
# Getting all the available workflows of the current user
# and checking if we got the right one.
wfqs = get_workflow_list(all_states=True, user=dbuser)
self.assertTrue(len(wfqs) == 1, "We expect one workflow")
a_prime = wfqs[0].get_aiida_class()
self.assertEqual(a.uuid, a_prime.uuid, "The uuid is not the expected "
"one")
# We ask all the running workflows. We should get zero results.
wfqs = get_workflow_list(all_states=False, user=dbuser)
self.assertTrue(len(wfqs) == 0, "We expect zero workflows")
# We change the state of the workflow to INITIALIZED.
a.set_state(wf_states.INITIALIZED)
# We ask all the running workflows. We should get one workflow.
wfqs = get_workflow_list(all_states=True, user=dbuser)
self.assertTrue(len(wfqs) == 1, "We expect one workflow")
a_prime = wfqs[0].get_aiida_class()
self.assertEqual(a.uuid, a_prime.uuid, "The uuid is not the expected "
"one")
# We change the state of the workflow to ERROR.
a.set_state(wf_states.ERROR)
# We ask all the running workflows. We should get zero results.
wfqs = get_workflow_list(all_states=False, user=dbuser)
self.assertTrue(len(wfqs) == 0, "We expect zero workflows")
[docs] def test_workflow_info(self):
"""
This test checks that the workflow info is generate without any
exceptions
:return:
"""
# Assuming there is only one user
dbuser = User.search_for_users(email=self.user_email)[0]
# Creating a simple workflow & storing it
a = WFTestEmpty()
a.store()
# Emulate the workflow list
for w in get_workflow_list(all_states=True, user=dbuser):
if not w.is_subworkflow():
get_workflow_info(w)
# Create a workflow with sub-workflows and store it
b = WFTestSimpleWithSubWF()
b.store()
# Emulate the workflow list
for w in get_workflow_list(all_states=True, user=dbuser):
if not w.is_subworkflow():
get_workflow_info(w)
# Start the first workflow and perform a workflow list
b.start()
for w in get_workflow_list(all_states=True, user=dbuser):
if not w.is_subworkflow():
get_workflow_info(w)
[docs] def test_wf_get_state(self):
"""
Simple test that checks the state of the workflows. We create two
workflows since the test order in the SQLA was influencing the value
of aiida.backends.sqlalchemy.models.workflow.DbWorkflow.state which
should be a Choice object, according to the SQLA doc. Sometimes it
was automatically converted to unicode.
Since we are interested to get a unicode from
aiida.orm.implementation.general.workflow.AbstractWorkflow#get_state
we enforce this conversion at
aiida.orm.implementation.sqlalchemy.workflow.Workflow#get_state
For more info, check issue #951
"""
# Creating two simple workflows & storing them
wf1 = WFTestEmpty()
wf1.store()
wf2 = WFTestEmpty()
wf2.store()
# Checking that the get_state doesn't throw exceptions and that
# it is a valid state
self.assertIn(wf1.get_state(), wf_states)
self.assertIn(wf2.get_state(), wf_states)
[docs] def test_wf_ctime(self):
import datetime
import pytz
# Get the current datetime (before the creation of the workflow)
dt_before = datetime.datetime.now(pytz.utc)
# Creating a simple workflow & storing it
wf = WFTestEmpty()
wf.store()
# Get the current datetime (after the creation of the workflow)
dt_after = datetime.datetime.now(pytz.utc)
self.assertLessEqual(dt_before, wf.ctime, "The workflow doesn't have"
"a valid creation time")
self.assertGreaterEqual(dt_after, wf.ctime, "The workflow doesn't "
"have a valid creation "
"time")
[docs] def test_failing_calc_in_wf(self):
"""
This test checks that a workflow (but also a workflow with
sub-workflows) that has an exception at one of its steps stops
properly and it is not left as RUNNING.
"""
from aiida.daemon.workflowmanager import execute_steps
from aiida.workflows.test import (FailingWFTestSimple,
FailingWFTestSimpleWithSubWF)
try:
# Testing the error propagation of a simple workflow
wf = FailingWFTestSimple()
wf.store()
step_no = 0
wf.start()
while wf.is_running():
execute_steps()
step_no += 1
self.assertLess(step_no, 5, "This workflow should have stopped "
"since it is failing")
# Testing the error propagation of a workflow with subworkflows
wf = FailingWFTestSimpleWithSubWF()
wf.store()
step_no = 0
wf.start()
while wf.is_running():
execute_steps()
step_no += 1
self.assertLess(step_no, 5, "This workflow should have stopped "
"since it is failing")
finally:
pass
[docs] def test_result_parameter_name_colision(self):
"""
This test checks that the the workflow parameters and results do not
collide. This was a problem in SQLA (Issue #960) but a test for both
backends is added (for completeness).
"""
# Creating a simple workflow & storing it
wf = WFTestEmpty()
wf.store()
# Set some parameters
params = {'band_calculation_set': 2,
'codename': 'pw-5.2.0',
'pseudo_family': 'SSSP_v0.7_eff_PBE'}
wf.set_params(params)
# Add some results that their names collide with the parameter names
wf.add_result('structure', 'test_string_1')
wf.add_result('codename', 'test_string_2')
# Check that we have the correct results
self.assertDictEqual(
{'structure': 'test_string_1', 'codename': 'test_string_2'},
wf.get_results(), "The workflow results are not the expected "
"ones.")
# Check that we have the correct parameters
self.assertDictEqual(params, wf.get_parameters(),
"The workflow parameters are not the expected "
"ones.")
[docs] def tearDown(self):
"""
Cleaning the database after each test. Since I don't
want the workflows of one test to interfere with the
workflows of the other tests.
"""
self._class_was_setup = True
self.clean_db()
self.insert_data()