# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
# pylint: disable=too-many-lines
"""Unittests for REST API."""
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import
import tempfile
from aiida import orm
from aiida.backends.testbase import AiidaTestCase
from aiida.common import json
from aiida.common.links import LinkType
from aiida.restapi.api import App, AiidaApi
[docs]class RESTApiTestCase(AiidaTestCase):
"""
Setup of the tests for the AiiDA RESTful-api
"""
_url_prefix = '/api/v4'
_dummy_data = {}
_PERPAGE_DEFAULT = 20
_LIMIT_DEFAULT = 400
[docs] @classmethod
def setUpClass(cls, *args, **kwargs): # pylint: disable=too-many-locals, too-many-statements
"""
Basides the standard setup we need to add few more objects in the
database to be able to explore different requests/filters/orderings etc.
"""
# call parent setUpClass method
super(RESTApiTestCase, cls).setUpClass()
# connect the app and the api
# Init the api by connecting it the the app (N.B. respect the following
# order, api.__init__)
kwargs = dict(PREFIX=cls._url_prefix, PERPAGE_DEFAULT=cls._PERPAGE_DEFAULT, LIMIT_DEFAULT=cls._LIMIT_DEFAULT)
cls.app = App(__name__)
cls.app.config['TESTING'] = True
AiidaApi(cls.app, **kwargs)
# create test inputs
cell = ((2., 0., 0.), (0., 2., 0.), (0., 0., 2.))
structure = orm.StructureData(cell=cell)
structure.append_atom(position=(0., 0., 0.), symbols=['Ba'])
structure.store()
structure.add_comment('This is test comment.')
structure.add_comment('Add another comment.')
cif = orm.CifData(ase=structure.get_ase())
cif.store()
parameter1 = orm.Dict(dict={'a': 1, 'b': 2})
parameter1.store()
parameter2 = orm.Dict(dict={'c': 3, 'd': 4})
parameter2.store()
kpoint = orm.KpointsData()
kpoint.set_kpoints_mesh([4, 4, 4])
kpoint.store()
resources = {'num_machines': 1, 'num_mpiprocs_per_machine': 1}
calcfunc = orm.CalcFunctionNode(computer=cls.computer)
calcfunc.store()
calc = orm.CalcJobNode(computer=cls.computer)
calc.set_option('resources', resources)
calc.set_attribute('attr1', 'OK')
calc.set_attribute('attr2', 'OK')
calc.set_extra('extra1', False)
calc.set_extra('extra2', 'extra_info')
calc.add_incoming(structure, link_type=LinkType.INPUT_CALC, link_label='link_structure')
calc.add_incoming(parameter1, link_type=LinkType.INPUT_CALC, link_label='link_parameter')
aiida_in = 'The input file\nof the CalcJob node'
# Add the calcjob_inputs folder with the aiida.in file to the CalcJobNode repository
with tempfile.NamedTemporaryFile(mode='w+') as handle:
handle.write(aiida_in)
handle.flush()
handle.seek(0)
calc.put_object_from_filelike(handle, key='calcjob_inputs/aiida.in', force=True)
calc.store()
# create log message for calcjob
import logging
from aiida.common.log import LOG_LEVEL_REPORT
from aiida.common.timezone import now
from aiida.orm import Log
log_record = {
'time': now(),
'loggername': 'loggername',
'levelname': logging.getLevelName(LOG_LEVEL_REPORT),
'dbnode_id': calc.id,
'message': 'This is a template record message',
'metadata': {
'content': 'test'
},
}
Log(**log_record)
aiida_out = 'The output file\nof the CalcJob node'
retrieved_outputs = orm.FolderData()
# Add the calcjob_outputs folder with the aiida.out file to the FolderData node
with tempfile.NamedTemporaryFile(mode='w+') as handle:
handle.write(aiida_out)
handle.flush()
handle.seek(0)
retrieved_outputs.put_object_from_filelike(handle, key='calcjob_outputs/aiida.out', force=True)
retrieved_outputs.store()
retrieved_outputs.add_incoming(calc, link_type=LinkType.CREATE, link_label='retrieved')
kpoint.add_incoming(calc, link_type=LinkType.CREATE, link_label='create')
calc1 = orm.CalcJobNode(computer=cls.computer)
calc1.set_option('resources', resources)
calc1.store()
dummy_computers = [{
'name': 'test1',
'hostname': 'test1.epfl.ch',
'transport_type': 'ssh',
'scheduler_type': 'pbspro',
}, {
'name': 'test2',
'hostname': 'test2.epfl.ch',
'transport_type': 'ssh',
'scheduler_type': 'torque',
}, {
'name': 'test3',
'hostname': 'test3.epfl.ch',
'transport_type': 'local',
'scheduler_type': 'slurm',
}, {
'name': 'test4',
'hostname': 'test4.epfl.ch',
'transport_type': 'ssh',
'scheduler_type': 'slurm',
}]
for dummy_computer in dummy_computers:
computer = orm.Computer(**dummy_computer)
computer.store()
# Prepare typical REST responses
cls.process_dummy_data()
[docs] def get_dummy_data(self):
return self._dummy_data
[docs] def get_url_prefix(self):
return self._url_prefix
[docs] @classmethod
def process_dummy_data(cls):
# pylint: disable=fixme
"""
This functions prepare atomic chunks of typical responses from the
RESTapi and puts them into class attributes
"""
# TODO: Storing the different nodes as lists and accessing them
# by their list index is very fragile and a pain to debug.
# Please change this!
computer_projections = ['id', 'uuid', 'name', 'hostname', 'transport_type', 'scheduler_type']
computers = orm.QueryBuilder().append(orm.Computer, tag='comp', project=computer_projections).order_by({
'comp': [{
'id': {
'order': 'asc'
}
}]
}).dict()
# Cast UUID into a string (e.g. in sqlalchemy it comes as a UUID object)
computers = [_['comp'] for _ in computers]
for comp in computers:
if comp['uuid'] is not None:
comp['uuid'] = str(comp['uuid'])
cls._dummy_data['computers'] = computers
calculation_projections = ['id', 'uuid', 'user_id', 'node_type']
calculations = orm.QueryBuilder().append(orm.CalculationNode, tag='calc',
project=calculation_projections).order_by({
'calc': [{
'id': {
'order': 'desc'
}
}]
}).dict()
calculations = [_['calc'] for _ in calculations]
for calc in calculations:
if calc['uuid'] is not None:
calc['uuid'] = str(calc['uuid'])
cls._dummy_data['calculations'] = calculations
data_projections = ['id', 'uuid', 'user_id', 'node_type']
data_types = {
'cifdata': orm.CifData,
'parameterdata': orm.Dict,
'structuredata': orm.StructureData,
'data': orm.Data,
}
for label, dataclass in data_types.items():
data = orm.QueryBuilder().append(dataclass, tag='data', project=data_projections).order_by({
'data': [{
'id': {
'order': 'desc'
}
}]
}).dict()
data = [_['data'] for _ in data]
for datum in data:
if datum['uuid'] is not None:
datum['uuid'] = str(datum['uuid'])
cls._dummy_data[label] = data
[docs] def split_path(self, url):
# pylint: disable=no-self-use
"""
Split the url with "?" to get url path and it's parameters
:param url: Web url
:return: url path and url parameters
"""
parts = url.split('?')
path = ''
query_string = ''
if parts:
path = parts[0]
if len(parts) > 1:
query_string = parts[1]
return path, query_string
# node details and list with limit, offset, page, perpage
[docs] def process_test(
self,
entity_type,
url,
full_list=False,
empty_list=False,
expected_list_ids=None,
expected_range=None,
expected_errormsg=None,
uuid=None,
result_node_type=None,
result_name=None
):
# pylint: disable=too-many-arguments
"""
Check whether response matches expected values.
:param entity_type: url requested fot the type of the node
:param url: web url
:param full_list: if url is requested to get full list
:param empty_list: if the response list is empty
:param expected_list_ids: list of expected ids from data
:param expected_range: [start, stop] range of expected ids from data
:param expected_errormsg: expected error message in response
:param uuid: url requested for the node pk
:param result_node_type: node type in response data
:param result_name: result name in response e.g. incoming, outgoing
"""
if expected_list_ids is None:
expected_list_ids = []
if expected_range is None:
expected_range = []
if result_node_type is None and result_name is None:
result_node_type = entity_type
result_name = entity_type
url = self._url_prefix + url
with self.app.test_client() as client:
rv_response = client.get(url)
response = json.loads(rv_response.data)
if expected_errormsg:
self.assertEqual(response['message'], expected_errormsg)
else:
if full_list:
expected_data = self._dummy_data[result_node_type]
elif empty_list:
expected_data = []
elif expected_list_ids:
expected_data = [self._dummy_data[result_node_type][i] for i in expected_list_ids]
elif expected_range != []:
expected_data = self._dummy_data[result_node_type][expected_range[0]:expected_range[1]]
else:
from aiida.common.exceptions import InputValidationError
raise InputValidationError('Pass the expected range of the dummydata')
expected_node_uuids = [node['uuid'] for node in expected_data]
result_node_uuids = [node['uuid'] for node in response['data'][result_name]]
self.assertEqual(expected_node_uuids, result_node_uuids)
self.compare_extra_response_data(entity_type, url, response, uuid)
[docs]class RESTApiTestSuite(RESTApiTestCase):
# pylint: disable=too-many-public-methods
"""
Define unittests for rest api
"""
[docs] def test_computers_details(self):
"""
Requests the details of single computer
"""
node_uuid = self.get_dummy_data()['computers'][1]['uuid']
RESTApiTestCase.process_test(
self, 'computers', '/computers/' + str(node_uuid), expected_list_ids=[1], uuid=node_uuid
)
[docs] def test_computers_list(self):
"""
Get the full list of computers from database
"""
RESTApiTestCase.process_test(self, 'computers', '/computers?orderby=+id', full_list=True)
[docs] def test_computers_list_limit_offset(self):
"""
Get the list of computers from database using limit
and offset parameter.
It should return the no of rows specified in limit from
database starting from the no. specified in offset
"""
RESTApiTestCase.process_test(
self, 'computers', '/computers?limit=2&offset=2&orderby=+id', expected_range=[2, 4]
)
[docs] def test_computers_list_limit_only(self):
"""
Get the list of computers from database using limit
parameter.
It should return the no of rows specified in limit from
database.
"""
RESTApiTestCase.process_test(self, 'computers', '/computers?limit=2&orderby=+id', expected_range=[None, 2])
[docs] def test_computers_list_offset_only(self):
"""
Get the list of computers from database using offset
parameter
It should return all the rows from database starting from
the no. specified in offset
"""
RESTApiTestCase.process_test(self, 'computers', '/computers?offset=2&orderby=+id', expected_range=[2, None])
[docs] def test_computers_list_limit_offset_perpage(self):
"""
If we pass the limit, offset and perpage at same time, it
would return the error message.
"""
expected_error = 'perpage key is incompatible with limit and offset'
RESTApiTestCase.process_test(
self, 'computers', '/computers?offset=2&limit=1&perpage=2&orderby=+id', expected_errormsg=expected_error
)
[docs] def test_computers_list_page_limit_offset(self):
"""
If we use the page, limit and offset at same time, it
would return the error message.
"""
expected_error = 'requesting a specific page is incompatible with ' \
'limit and offset'
RESTApiTestCase.process_test(
self, 'computers', '/computers/page/2?offset=2&limit=1&orderby=+id', expected_errormsg=expected_error
)
[docs] def test_complist_pagelimitoffset_perpage(self):
"""
If we use the page, limit, offset and perpage at same time, it
would return the error message.
"""
expected_error = 'perpage key is incompatible with limit and offset'
RESTApiTestCase.process_test(
self,
'computers',
'/computers/page/2?offset=2&limit=1&perpage=2&orderby=+id',
expected_errormsg=expected_error
)
[docs] def test_computers_list_page_default(self):
"""
it returns the no. of rows defined as default perpage option
from database.
no.of pages = total no. of computers in database / perpage
"/page" acts as "/page/1?perpage=default_value"
"""
RESTApiTestCase.process_test(self, 'computers', '/computers/page?orderby=+id', full_list=True)
[docs] def test_computers_list_page_perpage(self):
"""
no.of pages = total no. of computers in database / perpage
Using this formula it returns the no. of rows for requested page
"""
RESTApiTestCase.process_test(
self, 'computers', '/computers/page/1?perpage=2&orderby=+id', expected_range=[None, 2]
)
[docs] def test_computers_list_page_perpage_exceed(self):
"""
no.of pages = total no. of computers in database / perpage
If we request the page which exceeds the total no. of pages then
it would return the error message.
"""
expected_error = 'Non existent page requested. The page range is [1 : ' \
'3]'
RESTApiTestCase.process_test(
self, 'computers', '/computers/page/4?perpage=2&orderby=+id', expected_errormsg=expected_error
)
############### list filters ########################
[docs] def test_computers_filter_id1(self):
"""
Add filter on the id of computer and get the filtered computer
list (e.g. id=1)
"""
node_pk = self.get_dummy_data()['computers'][1]['id']
RESTApiTestCase.process_test(self, 'computers', '/computers?id=' + str(node_pk), expected_list_ids=[1])
[docs] def test_computers_filter_id2(self):
"""
Add filter on the id of computer and get the filtered computer
list (e.g. id > 2)
"""
node_pk = self.get_dummy_data()['computers'][1]['id']
RESTApiTestCase.process_test(
self, 'computers', '/computers?id>' + str(node_pk) + '&orderby=+id', expected_range=[2, None]
)
[docs] def test_computers_filter_pk(self):
"""
Add filter on the id of computer and get the filtered computer
list (e.g. id=1)
"""
node_pk = self.get_dummy_data()['computers'][1]['id']
RESTApiTestCase.process_test(self, 'computers', '/computers?pk=' + str(node_pk), expected_list_ids=[1])
[docs] def test_computers_filter_name(self):
"""
Add filter for the name of computer and get the filtered computer
list
"""
RESTApiTestCase.process_test(self, 'computers', '/computers?name="test1"', expected_list_ids=[1])
[docs] def test_computers_filter_hostname(self):
"""
Add filter for the hostname of computer and get the filtered computer
list
"""
RESTApiTestCase.process_test(self, 'computers', '/computers?hostname="test1.epfl.ch"', expected_list_ids=[1])
[docs] def test_computers_filter_transport_type(self):
"""
Add filter for the transport_type of computer and get the filtered
computer
list
"""
RESTApiTestCase.process_test(
self, 'computers', '/computers?transport_type="local"&name="test3"&orderby=+id', expected_list_ids=[3]
)
############### list orderby ########################
[docs] def test_computers_orderby_id_asc(self):
"""
Returns the computers list ordered by "id" in ascending
order
"""
RESTApiTestCase.process_test(self, 'computers', '/computers?orderby=id', full_list=True)
[docs] def test_computers_orderby_id_asc_sign(self):
"""
Returns the computers list ordered by "+id" in ascending
order
"""
RESTApiTestCase.process_test(self, 'computers', '/computers?orderby=+id', full_list=True)
[docs] def test_computers_orderby_id_desc(self):
"""
Returns the computers list ordered by "id" in descending
order
"""
RESTApiTestCase.process_test(self, 'computers', '/computers?orderby=-id', expected_list_ids=[4, 3, 2, 1, 0])
[docs] def test_computers_orderby_name_asc(self):
"""
Returns the computers list ordered by "name" in ascending
order
"""
node_pk = self.get_dummy_data()['computers'][0]['id']
RESTApiTestCase.process_test(
self, 'computers', '/computers?pk>' + str(node_pk) + '&orderby=name', expected_list_ids=[1, 2, 3, 4]
)
[docs] def test_computers_orderby_name_asc_sign(self):
"""
Returns the computers list ordered by "+name" in ascending
order
"""
node_pk = self.get_dummy_data()['computers'][0]['id']
RESTApiTestCase.process_test(
self, 'computers', '/computers?pk>' + str(node_pk) + '&orderby=+name', expected_list_ids=[1, 2, 3, 4]
)
[docs] def test_computers_orderby_name_desc(self):
"""
Returns the computers list ordered by "name" in descending
order
"""
node_pk = self.get_dummy_data()['computers'][0]['id']
RESTApiTestCase.process_test(
self, 'computers', '/computers?pk>' + str(node_pk) + '&orderby=-name', expected_list_ids=[4, 3, 2, 1]
)
[docs] def test_computers_orderby_scheduler_type_asc(self):
"""
Returns the computers list ordered by "scheduler_type" in ascending
order
"""
node_pk = self.get_dummy_data()['computers'][0]['id']
RESTApiTestCase.process_test(
self,
'computers',
'/computers?transport_type="ssh"&pk>' + str(node_pk) + '&orderby=scheduler_type',
expected_list_ids=[1, 4, 2]
)
[docs] def test_comp_orderby_scheduler_ascsign(self):
"""
Returns the computers list ordered by "+scheduler_type" in ascending
order
"""
node_pk = self.get_dummy_data()['computers'][0]['id']
RESTApiTestCase.process_test(
self,
'computers',
'/computers?transport_type="ssh"&pk>' + str(node_pk) + '&orderby=+scheduler_type',
expected_list_ids=[1, 4, 2]
)
[docs] def test_computers_orderby_schedulertype_desc(self):
"""
Returns the computers list ordered by "scheduler_type" in descending
order
"""
node_pk = self.get_dummy_data()['computers'][0]['id']
RESTApiTestCase.process_test(
self,
'computers',
'/computers?pk>' + str(node_pk) + '&transport_type="ssh"&orderby=-scheduler_type',
expected_list_ids=[2, 4, 1]
)
############### list orderby combinations #######################
[docs] def test_computers_orderby_mixed1(self):
"""
Returns the computers list first order by "transport_type" in
ascending order and if it is having same transport_type, order it
by "id"
"""
node_pk = self.get_dummy_data()['computers'][0]['id']
RESTApiTestCase.process_test(
self,
'computers',
'/computers?pk>' + str(node_pk) + '&orderby=transport_type,id',
expected_list_ids=[3, 1, 2, 4]
)
[docs] def test_computers_orderby_mixed2(self):
"""
Returns the computers list first order by "scheduler_type" in
descending order and if it is having same scheduler_type, order it
by "name"
"""
node_pk = self.get_dummy_data()['computers'][0]['id']
RESTApiTestCase.process_test(
self,
'computers',
'/computers?pk>' + str(node_pk) + '&orderby=-scheduler_type,name',
expected_list_ids=[2, 3, 4, 1]
)
[docs] def test_computers_orderby_mixed3(self):
"""
Returns the computers list first order by "scheduler_type" in
ascending order and if it is having same scheduler_type, order it
by "hostname" descending order
Response::
test4 slurm
test3 slurm
test2 torque
test1 pbspro
localhost pbspro
==========
Expected::
test1 pbspro
localhost pbspro
test4 slurm
test3 slurm
test2 torque
test1 test4
RESTApiTestCase.process_test(self, "computers",
"/computers?orderby=+scheduler_type,
-hostname",
expected_list_ids=[1,0,4,3,2])
"""
############### list filter combinations #######################
[docs] def test_computers_filter_mixed1(self):
"""
Add filter for the hostname and id of computer and get the
filtered computer list
"""
node_pk = self.get_dummy_data()['computers'][0]['id']
RESTApiTestCase.process_test(
self, 'computers', '/computers?id>' + str(node_pk) + '&hostname="test1.epfl.ch"', expected_list_ids=[1]
)
[docs] def test_computers_filter_mixed2(self):
"""
Add filter for the id, hostname and transport_type of the computer
and get the filtered computer list
"""
node_pk = self.get_dummy_data()['computers'][0]['id']
RESTApiTestCase.process_test(
self,
'computers',
'/computers?id>' + str(node_pk) + '&hostname="test3.epfl.ch"&transport_type="ssh"',
empty_list=True
)
############### list all parameter combinations #######################
[docs] def test_computers_mixed1(self):
"""
url parameters: id, limit and offset
"""
node_pk = self.get_dummy_data()['computers'][0]['id']
RESTApiTestCase.process_test(
self, 'computers', '/computers?id>' + str(node_pk) + '&limit=2&offset=3&orderby=+id', expected_list_ids=[4]
)
[docs] def test_computers_mixed2(self):
"""
url parameters: id, page, perpage
"""
node_pk = self.get_dummy_data()['computers'][0]['id']
RESTApiTestCase.process_test(
self,
'computers',
'/computers/page/2?id>' + str(node_pk) + '&perpage=2&orderby=+id',
expected_list_ids=[3, 4]
)
[docs] def test_computers_mixed3(self):
"""
url parameters: id, transport_type, orderby
"""
node_pk = self.get_dummy_data()['computers'][0]['id']
RESTApiTestCase.process_test(
self,
'computers',
'/computers?id>=' + str(node_pk) + '&transport_type="ssh"&orderby=-id&limit=2',
expected_list_ids=[4, 2]
)
########## pass unknown url parameter ###########
[docs] def test_computers_unknown_param(self):
"""
url parameters: id, limit and offset
from aiida.common.exceptions import InputValidationError
RESTApiTestCase.node_exception(self, "/computers?aa=bb&id=2", InputValidationError)
"""
############### calculation retrieved_inputs and retrieved_outputs #############
[docs] def test_calculation_retrieved_outputs(self):
"""
Get the list of given calculation retrieved_outputs
"""
node_uuid = self.get_dummy_data()['calculations'][1]['uuid']
url = self.get_url_prefix() + '/calcjobs/' + str(node_uuid) + '/output_files'
with self.app.test_client() as client:
response_value = client.get(url)
response = json.loads(response_value.data)
self.assertEqual(response['data'], [{'name': 'calcjob_outputs', 'type': 'DIRECTORY'}])
############### calculation incoming #############
[docs] def test_calculation_iotree(self):
"""
Get filtered incoming list for given calculations
"""
node_uuid = self.get_dummy_data()['calculations'][1]['uuid']
url = self.get_url_prefix() + '/nodes/' + str(node_uuid) + '/links/tree?in_limit=1&out_limit=1'
with self.app.test_client() as client:
response_value = client.get(url)
response = json.loads(response_value.data)
self.assertEqual(len(response['data']['nodes']), 1)
self.assertEqual(len(response['data']['nodes'][0]['incoming']), 1)
self.assertEqual(len(response['data']['nodes'][0]['outgoing']), 1)
self.assertEqual(len(response['data']['metadata']), 1)
expected_attr = [
'ctime', 'mtime', 'id', 'node_label', 'node_type', 'uuid', 'description', 'incoming', 'outgoing'
]
received_attr = response['data']['nodes'][0].keys()
for attr in expected_attr:
self.assertIn(attr, received_attr)
RESTApiTestCase.compare_extra_response_data(self, 'nodes', url, response, uuid=node_uuid)
############### calculation attributes #############
[docs] def test_calculation_attributes(self):
"""
Get list of calculation attributes
"""
attributes = {
'attr1': 'OK',
'attr2': 'OK',
'resources': {
'num_machines': 1,
'num_mpiprocs_per_machine': 1
},
}
node_uuid = self.get_dummy_data()['calculations'][1]['uuid']
url = self.get_url_prefix() + '/nodes/' + str(node_uuid) + '/contents/attributes'
with self.app.test_client() as client:
rv_obj = client.get(url)
response = json.loads(rv_obj.data)
self.assertNotIn('message', response)
self.assertEqual(response['data']['attributes'], attributes)
RESTApiTestCase.compare_extra_response_data(self, 'nodes', url, response, uuid=node_uuid)
[docs] def test_contents_attributes_filter(self):
"""
Get list of calculation attributes with filter attributes_filter
"""
node_uuid = self.get_dummy_data()['calculations'][1]['uuid']
url = self.get_url_prefix() + '/nodes/' + str(node_uuid) + '/contents/attributes?attributes_filter="attr1"'
with self.app.test_client() as client:
rv_obj = client.get(url)
response = json.loads(rv_obj.data)
self.assertNotIn('message', response)
self.assertEqual(response['data']['attributes'], {'attr1': 'OK'})
RESTApiTestCase.compare_extra_response_data(self, 'nodes', url, response, uuid=node_uuid)
############### calculation node attributes filter #############
[docs] def test_calculation_attributes_filter(self):
"""
Get the list of given calculation attributes filtered
"""
attributes = {
'attr1': 'OK',
'attr2': 'OK',
'resources': {
'num_machines': 1,
'num_mpiprocs_per_machine': 1
},
}
node_uuid = self.get_dummy_data()['calculations'][1]['uuid']
url = self.get_url_prefix() + '/nodes/' + str(node_uuid) + '?attributes=true'
with self.app.test_client() as client:
response_value = client.get(url)
response = json.loads(response_value.data)
self.assertEqual(response['data']['nodes'][0]['attributes'], attributes)
############### calculation node extras_filter #############
############### structure node attributes filter #############
[docs] def test_structure_attributes_filter(self):
"""
Get the list of given calculation attributes filtered
"""
cell = [[2., 0., 0.], [0., 2., 0.], [0., 0., 2.]]
node_uuid = self.get_dummy_data()['structuredata'][0]['uuid']
url = self.get_url_prefix() + '/nodes/' + str(node_uuid) + '?attributes=true&attributes_filter=cell'
with self.app.test_client() as client:
rv_obj = client.get(url)
response = json.loads(rv_obj.data)
self.assertEqual(response['data']['nodes'][0]['attributes']['cell'], cell)
############### node full_type filter #############
[docs] def test_nodes_full_type_filter(self):
"""
Get the list of nodes filtered by full_type
"""
expected_node_uuids = []
for calc in self.get_dummy_data()['calculations']:
if calc['node_type'] == 'process.calculation.calcjob.CalcJobNode.':
expected_node_uuids.append(calc['uuid'])
url = self.get_url_prefix() + '/nodes/' + '?full_type="process.calculation.calcjob.CalcJobNode.|"'
with self.app.test_client() as client:
rv_obj = client.get(url)
response = json.loads(rv_obj.data)
for node in response['data']['nodes']:
self.assertIn(node['uuid'], expected_node_uuids)
############### Structure visualization and download #############
[docs] def test_structure_derived_properties(self):
"""
Get the list of give calculation incoming
"""
node_uuid = self.get_dummy_data()['structuredata'][0]['uuid']
url = self.get_url_prefix() + '/nodes/' + str(node_uuid) + '/contents/derived_properties'
with self.app.test_client() as client:
rv_obj = client.get(url)
response = json.loads(rv_obj.data)
self.assertNotIn('message', response)
self.assertEqual(
response['data']['derived_properties']['dimensionality'], {
u'dim': 3,
u'value': 8.0,
u'label': u'volume'
}
)
self.assertEqual(response['data']['derived_properties']['formula'], 'Ba')
RESTApiTestCase.compare_extra_response_data(self, 'nodes', url, response, uuid=node_uuid)
[docs] def test_structure_download(self):
"""
Test download of structure file
"""
from aiida.orm import load_node
node_uuid = self.get_dummy_data()['structuredata'][0]['uuid']
url = self.get_url_prefix() + '/nodes/' + node_uuid + '/download?download_format=xsf'
with self.app.test_client() as client:
rv_obj = client.get(url)
structure_data = load_node(node_uuid)._exportcontent('xsf')[0] # pylint: disable=protected-access
self.assertEqual(rv_obj.data, structure_data)
[docs] def test_cif(self):
"""
Test download of cif file
"""
from aiida.orm import load_node
node_uuid = self.get_dummy_data()['cifdata'][0]['uuid']
url = self.get_url_prefix() + '/nodes/' + node_uuid + '/download?download_format=cif'
with self.app.test_client() as client:
rv_obj = client.get(url)
cif = load_node(node_uuid)._prepare_cif()[0] # pylint: disable=protected-access
self.assertEqual(rv_obj.data, cif)
############### projectable_properties #############
[docs] def test_projectable_properties(self):
"""
test projectable_properties endpoint
"""
for nodetype in ['nodes', 'processes', 'computers', 'users', 'groups']:
url = self.get_url_prefix() + '/' + nodetype + '/projectable_properties'
with self.app.test_client() as client:
rv_obj = client.get(url)
response = json.loads(rv_obj.data)
self.assertNotIn('message', response)
expected_keys = ['display_name', 'help_text', 'is_display', 'is_foreign_key', 'type']
# check fields
for _, pinfo in response['data']['fields'].items():
available_keys = pinfo.keys()
for prop in expected_keys:
self.assertIn(prop, available_keys)
# check order
available_properties = response['data']['fields'].keys()
for prop in response['data']['ordering']:
self.assertIn(prop, available_properties)
[docs] def test_node_namespace(self):
"""
Test the rest api call to get list of available node namespace
"""
url = self.get_url_prefix() + '/nodes/full_types'
with self.app.test_client() as client:
rv_obj = client.get(url)
response = json.loads(rv_obj.data)
expected_data_keys = ['path', 'namespace', 'subspaces', 'label', 'full_type']
response_keys = response['data'].keys()
for dkay in expected_data_keys:
self.assertIn(dkay, response_keys)
RESTApiTestCase.compare_extra_response_data(self, 'nodes', url, response)
[docs] def test_repo(self):
"""
Test to get repo list or repo file contents for given node
"""
from aiida.orm import load_node
node_uuid = self.get_dummy_data()['calculations'][1]['uuid']
url = self.get_url_prefix() + '/nodes/' + str(node_uuid) + '/repo/list?filename="calcjob_inputs"'
with self.app.test_client() as client:
response_value = client.get(url)
response = json.loads(response_value.data)
self.assertEqual(response['data']['repo_list'], [{'type': 'FILE', 'name': 'aiida.in'}])
url = self.get_url_prefix() + '/nodes/' + str(node_uuid) + '/repo/contents?filename="calcjob_inputs/aiida.in"'
with self.app.test_client() as client:
response_obj = client.get(url)
input_file = load_node(node_uuid).get_object_content('calcjob_inputs/aiida.in', mode='rb')
self.assertEqual(response_obj.data, input_file)
[docs] def test_process_report(self):
"""
Test process report
"""
node_uuid = self.get_dummy_data()['calculations'][1]['uuid']
url = self.get_url_prefix() + '/processes/' + str(node_uuid) + '/report'
with self.app.test_client() as client:
response_value = client.get(url)
response = json.loads(response_value.data)
expected_keys = response['data'].keys()
for key in ['logs']:
self.assertIn(key, expected_keys)
expected_log_keys = response['data']['logs'][0].keys()
for key in ['time', 'loggername', 'levelname', 'dbnode_id', 'message']:
self.assertIn(key, expected_log_keys)