Source code for aiida.backends.sqlalchemy.migrations.versions.239cea6d2452_provenance_redesign

# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved.                     #
# This file is part of the AiiDA code.                                    #
#                                                                         #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file        #
# For further information please visit http://www.aiida.net               #
###########################################################################
# pylint: disable=invalid-name,no-member
"""Migration after the provenance redesign

Revision ID: 239cea6d2452
Revises: 140c971ae0a3
Create Date: 2018-12-04 21:14:15.250247

"""

# Remove when https://github.com/PyCQA/pylint/issues/1931 is fixed
# pylint: disable=no-name-in-module,import-error
from alembic import op
from sqlalchemy import String, Integer
from sqlalchemy.sql import table, column, select, text
from sqlalchemy.dialects.postgresql import UUID

# revision identifiers, used by Alembic.
revision = '239cea6d2452'
down_revision = '140c971ae0a3'
branch_labels = None
depends_on = None


def migrate_infer_calculation_entry_point(connection):
    """Populate ``process_type`` on calculation nodes based on their ``type`` string.

    All distinct ``type`` values of ``db_dbnode`` rows matching ``calculation.%`` are collected and handed to
    ``infer_calculation_entry_point``, which returns a mapping of type string to entry point string. Every node
    of a given type then gets its ``process_type`` column set to the mapped value.

    When a mapped value does not contain ``ENTRY_POINT_STRING_SEPARATOR`` it is treated as a fallback, i.e. the
    type string could not be mapped onto a known entry point. The UUIDs of the affected nodes are collected and
    reported through ``write_database_integrity_violation`` so the user can consult them afterwards.

    :param connection: the database connection on which the queries are executed
    """
    from aiida.manage.database.integrity import write_database_integrity_violation
    from aiida.manage.database.integrity.plugins import infer_calculation_entry_point
    from aiida.plugins.entry_point import ENTRY_POINT_STRING_SEPARATOR

    # Lightweight table description: only the columns this migration touches
    node_table = table(
        'db_dbnode',
        column('id', Integer),
        column('uuid', UUID),
        column('type', String),
        column('process_type', String),
    )

    type_query = select([node_table.c.type]).where(node_table.c.type.like('calculation.%'))
    type_rows = connection.execute(type_query).fetchall()
    distinct_types = {row[0] for row in type_rows}
    entry_point_by_type = infer_calculation_entry_point(type_strings=distinct_types)

    unmapped = []

    for node_type, entry_point in entry_point_by_type.items():
        type_matches = node_table.c.type == op.inline_literal(node_type)

        # A mapped value without the separator means no known entry point was found for this type string and
        # the helper returned its fallback value instead. Record every affected node for the report.
        if ENTRY_POINT_STRING_SEPARATOR not in entry_point:
            uuid_rows = connection.execute(select([node_table.c.uuid]).where(type_matches)).fetchall()
            unmapped.extend([str(row.uuid), node_type, entry_point] for row in uuid_rows)

        # The process type is written for every type string, fallback or not
        update_statement = node_table.update().where(type_matches).values(
            process_type=op.inline_literal(entry_point)
        )
        connection.execute(update_statement)

    if not unmapped:
        return

    write_database_integrity_violation(
        unmapped,
        ['UUID', 'type (old)', 'process_type (fallback)'],
        'found calculation nodes with a type string that could not be mapped onto a known entry point',
        'inferred `process_type` for all calculation nodes, using fallback for unknown entry points',
    )
def upgrade():
    """The upgrade migration actions.

    Steps, in the order they are executed:

    1. Set ``process_type`` on all calculation nodes via ``migrate_infer_calculation_entry_point``. This has to
       run first because it selects nodes on the old ``calculation.%`` type strings.
    2. Call ``detect_unexpected_links`` on the connection.
    3. Execute a single multi-statement SQL script that:

       * deletes ``returnlink`` and ``calllink`` rows whose ``input_id`` node has a type starting with
         ``calculation.job`` or ``calculation.inline``;
       * deletes ``createlink`` rows whose ``input_id`` node has a type starting with ``calculation.function``
         or ``calculation.work``;
       * renames the node types: ``ProcessCalculation`` becomes ``WorkCalculation``, a ``WorkCalculation`` with
         a ``function_name`` attribute becomes ``WorkFunctionNode``, any remaining ``WorkCalculation`` becomes
         ``WorkChainNode``, every ``calculation.job.%`` type becomes ``CalcJobNode``, ``InlineCalculation``
         becomes ``CalcFunctionNode`` and ``FunctionCalculation`` becomes ``WorkFunctionNode``;
       * renames the link types: ``createlink`` to ``create``, ``returnlink`` to ``return``, and ``inputlink`` /
         ``calllink`` to ``input_calc`` / ``call_calc`` or ``input_work`` / ``call_work`` depending on whether
         the ``output_id`` node is a ``node.process.calculation%`` or ``node.process.workflow%`` node.

    The order of the SQL statements matters: the link deletions match on the old node types, the
    ``function_name`` update must precede the catch-all ``WorkCalculation`` update, and the input/call link
    renames match on the new node types.
    """
    connection = op.get_bind()

    # Migrate calculation nodes by inferring the process type from the type string
    migrate_infer_calculation_entry_point(connection)

    # Detect if the database contains any unexpected links
    # NOTE(review): `detect_unexpected_links` is not defined in the visible part of this file - confirm that it
    # is available at module level before running this migration.
    detect_unexpected_links(connection)

    # The explanatory `--` comments inside the script follow the statement they describe
    statement = text(
        """
        DELETE FROM db_dblink WHERE db_dblink.id IN (
            SELECT db_dblink.id FROM db_dblink
            INNER JOIN db_dbnode ON db_dblink.input_id = db_dbnode.id
            WHERE
                (db_dbnode.type LIKE 'calculation.job%' OR db_dbnode.type LIKE 'calculation.inline%')
                AND db_dblink.type = 'returnlink'
        ); -- Delete all outgoing RETURN links from JobCalculation and InlineCalculation nodes

        DELETE FROM db_dblink WHERE db_dblink.id IN (
            SELECT db_dblink.id FROM db_dblink
            INNER JOIN db_dbnode ON db_dblink.input_id = db_dbnode.id
            WHERE
                (db_dbnode.type LIKE 'calculation.job%' OR db_dbnode.type LIKE 'calculation.inline%')
                AND db_dblink.type = 'calllink'
        ); -- Delete all outgoing CALL links from JobCalculation and InlineCalculation nodes

        DELETE FROM db_dblink WHERE db_dblink.id IN (
            SELECT db_dblink.id FROM db_dblink
            INNER JOIN db_dbnode ON db_dblink.input_id = db_dbnode.id
            WHERE
                (db_dbnode.type LIKE 'calculation.function%' OR db_dbnode.type LIKE 'calculation.work%')
                AND db_dblink.type = 'createlink'
        ); -- Delete all outgoing CREATE links from WorkCalculation nodes

        UPDATE db_dbnode SET type = 'calculation.work.WorkCalculation.'
        WHERE type = 'calculation.process.ProcessCalculation.';
            -- First migrate very old `ProcessCalculation` to `WorkCalculation`

        UPDATE db_dbnode SET type = 'node.process.workflow.workfunction.WorkFunctionNode.'
        WHERE type = 'calculation.work.WorkCalculation.' AND attributes ? 'function_name';
            -- WorkCalculations that have a `function_name` attribute are `WorkFunctionNode`

        UPDATE db_dbnode SET type = 'node.process.workflow.workchain.WorkChainNode.'
        WHERE type = 'calculation.work.WorkCalculation.';
            -- Update type for `WorkCalculation` nodes - all what is left should be `WorkChainNodes`

        UPDATE db_dbnode SET type = 'node.process.calculation.calcjob.CalcJobNode.'
        WHERE type LIKE 'calculation.job.%'; -- Update type for JobCalculation nodes

        UPDATE db_dbnode SET type = 'node.process.calculation.calcfunction.CalcFunctionNode.'
        WHERE type = 'calculation.inline.InlineCalculation.'; -- Update type for InlineCalculation nodes

        UPDATE db_dbnode SET type = 'node.process.workflow.workfunction.WorkFunctionNode.'
        WHERE type = 'calculation.function.FunctionCalculation.'; -- Update type for FunctionCalculation nodes

        UPDATE db_dblink SET type = 'create' WHERE type = 'createlink'; -- Rename `createlink` to `create`
        UPDATE db_dblink SET type = 'return' WHERE type = 'returnlink'; -- Rename `returnlink` to `return`

        UPDATE db_dblink SET type = 'input_calc' FROM db_dbnode
        WHERE db_dblink.output_id = db_dbnode.id AND db_dbnode.type LIKE 'node.process.calculation%'
        AND db_dblink.type = 'inputlink';
            -- Rename `inputlink` to `input_calc` if the target node is a calculation type node

        UPDATE db_dblink SET type = 'input_work' FROM db_dbnode
        WHERE db_dblink.output_id = db_dbnode.id AND db_dbnode.type LIKE 'node.process.workflow%'
        AND db_dblink.type = 'inputlink';
            -- Rename `inputlink` to `input_work` if the target node is a workflow type node

        UPDATE db_dblink SET type = 'call_calc' FROM db_dbnode
        WHERE db_dblink.output_id = db_dbnode.id AND db_dbnode.type LIKE 'node.process.calculation%'
        AND db_dblink.type = 'calllink';
            -- Rename `calllink` to `call_calc` if the target node is a calculation type node

        UPDATE db_dblink SET type = 'call_work' FROM db_dbnode
        WHERE db_dblink.output_id = db_dbnode.id AND db_dbnode.type LIKE 'node.process.workflow%'
        AND db_dblink.type = 'calllink';
            -- Rename `calllink` to `call_work` if the target node is a workflow type node
        """
    )
    connection.execute(statement)
def downgrade():
    """The downgrade migration actions.

    Restores the node and link type strings that were in use before the provenance redesign by executing a
    single multi-statement SQL script:

    * node types ``CalcJobNode``, ``CalcFunctionNode``, ``WorkFunctionNode`` and ``WorkChainNode`` are mapped back
      to ``JobCalculation``, ``InlineCalculation``, ``FunctionCalculation`` and ``WorkCalculation``;
    * link types ``input_calc`` / ``input_work`` become ``inputlink``, ``call_calc`` / ``call_work`` become
      ``calllink``, ``create`` becomes ``createlink`` and ``return`` becomes ``returnlink``.

    The downgrade is lossy with respect to ``upgrade``: the links deleted there are not recreated, the
    ``process_type`` column is left untouched, every ``CalcJobNode`` ends up with the generic
    ``calculation.job.JobCalculation.`` type regardless of its original ``calculation.job.%`` type, and every
    ``WorkFunctionNode`` becomes a ``FunctionCalculation`` even if it originated from a ``WorkCalculation``.
    """
    connection = op.get_bind()

    # The literals below must mirror the ones written by `upgrade`: the inline calculation type is
    # `calculation.inline.InlineCalculation.` and the calculation-targeting link types are `input_calc` and
    # `call_calc`. With any other spelling those rows would silently not be reverted.
    statement = text(
        """
        UPDATE db_dbnode SET type = 'calculation.job.JobCalculation.'
        WHERE type = 'node.process.calculation.calcjob.CalcJobNode.';

        UPDATE db_dbnode SET type = 'calculation.inline.InlineCalculation.'
        WHERE type = 'node.process.calculation.calcfunction.CalcFunctionNode.';

        UPDATE db_dbnode SET type = 'calculation.function.FunctionCalculation.'
        WHERE type = 'node.process.workflow.workfunction.WorkFunctionNode.';

        UPDATE db_dbnode SET type = 'calculation.work.WorkCalculation.'
        WHERE type = 'node.process.workflow.workchain.WorkChainNode.';

        UPDATE db_dblink SET type = 'inputlink'
        WHERE type = 'input_calc' OR type = 'input_work';

        UPDATE db_dblink SET type = 'calllink'
        WHERE type = 'call_calc' OR type = 'call_work';

        UPDATE db_dblink SET type = 'createlink'
        WHERE type = 'create';

        UPDATE db_dblink SET type = 'returnlink'
        WHERE type = 'return';
        """
    )
    connection.execute(statement)