# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
"""Test export file migration from export version 0.5 to 0.6"""
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import
from aiida.backends.general.migrations.calc_state import STATE_MAPPING
from aiida.backends.testbase import AiidaTestCase
from aiida.backends.tests.utils.archives import get_json_files
from aiida.tools.importexport.migration.utils import verify_metadata_version
from aiida.tools.importexport.migration.v05_to_v06 import migrate_v5_to_v6
class TestMigrateV05toV06(AiidaTestCase):
    """Test migration of export files from export version 0.5 to 0.6"""

    @classmethod
    def setUpClass(cls, *args, **kwargs):
        """Define the archive locations that the tests pass on to `get_json_files`."""
        super(TestMigrateV05toV06, cls).setUpClass(*args, **kwargs)

        # Utility helpers: keyword arguments unpacked into `get_json_files` by the tests below
        cls.external_archive = {'filepath': 'archives', 'external_module': 'aiida-export-migration-tests'}
        cls.core_archive = {'filepath': 'export/migrate'}

    def test_migrate_v5_to_v6(self):
        """Test that migrating the simple v0.5 archive reproduces the reference v0.6 archive.

        Both `metadata.json` and `data.json` of the migrated v0.5 archive are compared against the content of
        `export_v0.6_simple.aiida`, after removing the fields that legitimately differ (the AiiDA version and the
        conversion info).
        """
        from aiida import get_version

        # Get metadata.json and data.json as dicts from v0.5 file archive
        metadata_v5, data_v5 = get_json_files('export_v0.5_simple.aiida', **self.core_archive)
        verify_metadata_version(metadata_v5, version='0.5')

        # Get metadata.json and data.json as dicts from v0.6 file archive
        metadata_v6, data_v6 = get_json_files('export_v0.6_simple.aiida', **self.core_archive)
        verify_metadata_version(metadata_v6, version='0.6')

        # Migrate to v0.6 (the dictionaries are modified in place)
        migrate_v5_to_v6(metadata_v5, data_v5)
        verify_metadata_version(metadata_v5, version='0.6')

        # Remove AiiDA version, since this may change regardless of the migration function
        metadata_v5.pop('aiida_version')
        metadata_v6.pop('aiida_version')

        # Assert conversion message in `metadata.json` is correct and then remove it for later assertions
        self.maxDiff = None  # pylint: disable=invalid-name
        conversion_message = 'Converted from version 0.5 to 0.6 with AiiDA v{}'.format(get_version())
        self.assertEqual(
            metadata_v5.pop('conversion_info')[-1],
            conversion_message,
            msg='The conversion message after migration is wrong'
        )
        metadata_v6.pop('conversion_info')

        # Assert changes were performed correctly
        self.assertDictEqual(
            metadata_v5,
            metadata_v6,
            msg='After migration, metadata.json should equal intended metadata.json from archives'
        )
        self.assertDictEqual(
            data_v5, data_v6, msg='After migration, data.json should equal intended data.json from archives'
        )

    def test_migrate_v5_to_v6_calc_states(self):
        """Test the data migration of legacy `JobCalcState` attributes.

        This test has to use a local archive because the current archive from the `aiida-export-migration-tests`
        module does not include a `CalcJobNode` with a legacy `state` attribute.
        """
        # Get metadata.json and data.json as dicts from v0.5 file archive
        metadata, data = get_json_files('export_v0.5_simple.aiida', **self.core_archive)
        verify_metadata_version(metadata, version='0.5')

        calc_job_node_type = 'process.calculation.calcjob.CalcJobNode.'
        node_data = data['export_data'].get('Node', {})
        node_attributes = data['node_attributes']

        # Record the legacy `state` of every `CalcJobNode` before the migration touches the attributes
        calc_jobs = {}
        for pk, values in node_data.items():
            if values['node_type'] == calc_job_node_type and 'state' in node_attributes.get(pk, {}):
                calc_jobs[pk] = node_attributes[pk]['state']

        # Without at least one such node the assertions below would never run and the test would pass vacuously
        if not calc_jobs:
            raise RuntimeError(
                'the archive `export_v0.5_simple.aiida` did not contain a `CalcJobNode` with the attribute '
                '`state` which is required for this test.'
            )

        # Migrate to v0.6
        migrate_v5_to_v6(metadata, data)
        verify_metadata_version(metadata, version='0.6')

        # Look the attributes up again after the migration
        node_attributes = data['node_attributes']

        # The export archive contains a single `CalcJobNode` that had `state=FINISHED`.
        for pk, state in calc_jobs.items():

            attributes = node_attributes[pk]
            mapped = STATE_MAPPING[state]

            # Only the fields for which the mapping defines a value are compared
            if mapped.exit_status is not None:
                self.assertEqual(attributes['exit_status'], mapped.exit_status)

            if mapped.process_state is not None:
                self.assertEqual(attributes['process_state'], mapped.process_state)

            if mapped.process_status is not None:
                self.assertEqual(attributes['process_status'], mapped.process_status)

            self.assertEqual(attributes['process_label'], 'Legacy JobCalculation')

    def test_migrate_v5_to_v6_datetime(self):
        """Test the data migration of serialized datetime objects.

        Datetime attributes were serialized into strings, by first converting to UTC and then printing with the format
        '%Y-%m-%dT%H:%M:%S.%f'. In the database migration, datetimes were serialized *including* timezone information.
        Here we test that the archive migration correctly reattaches the timezone information. The archive that we are
        using `export_v0.5_simple.aiida` contains a node with the attribute "scheduler_lastchecktime".
        """
        # Get metadata.json and data.json as dicts from v0.5 file archive
        metadata, data = get_json_files('export_v0.5_simple.aiida', **self.core_archive)
        verify_metadata_version(metadata, version='0.5')

        # Locate the first node with the attribute *before* migrating, so that `data` is not modified while
        # its `node_attributes` are being iterated over
        for key, values in data['node_attributes'].items():
            if 'scheduler_lastchecktime' in values:
                serialized_original = values['scheduler_lastchecktime']
                break
        else:
            raise RuntimeError(
                'the archive `export_v0.5_simple.aiida` did not contain a node with the attribute '
                '`scheduler_lastchecktime` which is required for this test.'
            )

        msg = 'the serialized datetime before migration should not contain a plus: {}'.format(serialized_original)
        self.assertTrue('+' not in serialized_original, msg=msg)

        # Migrate to v0.6
        migrate_v5_to_v6(metadata, data)
        verify_metadata_version(metadata, version='0.6')

        # The migrated value should be the original string with the UTC offset appended
        serialized_migrated = data['node_attributes'][key]['scheduler_lastchecktime']
        self.assertEqual(serialized_migrated, serialized_original + '+00:00')

    def test_migrate_v5_to_v6_complete(self):
        """Test migration for file containing complete v0.5 era possibilities"""
        # Get metadata.json and data.json as dicts from v0.5 file archive
        metadata, data = get_json_files('export_v0.5_manual.aiida', **self.external_archive)
        verify_metadata_version(metadata, version='0.5')

        # Migrate to v0.6
        migrate_v5_to_v6(metadata, data)
        verify_metadata_version(metadata, version='0.6')

        self.maxDiff = None  # pylint: disable=invalid-name

        # Explicitly check that conversion dictionaries were removed
        illegal_data_dicts = {'node_attributes_conversion', 'node_extras_conversion'}
        for dict_ in illegal_data_dicts:
            self.assertNotIn(dict_, data, msg="dictionary '{}' should have been removed from data.json".format(dict_))