Source code for aiida.storage.psql_dos.migrations.utils.legacy_workflows
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
# ruff: noqa: N806
"""Utilities for removing legacy workflows."""
import codecs
import json
import sys
import click
from sqlalchemy.sql import func, select, table
from aiida.cmdline.utils import echo
[docs]
def json_serializer(obj):
"""JSON serializer for objects not serializable by default json code"""
from datetime import date, datetime
from uuid import UUID
if isinstance(obj, UUID):
return str(obj)
if isinstance(obj, (datetime, date)):
return obj.isoformat()
raise TypeError(f'Type {type(obj)} not serializable')
[docs]
def export_workflow_data(connection, profile):
"""Export existing legacy workflow data to a JSON file."""
from tempfile import NamedTemporaryFile
DbWorkflow = table('db_dbworkflow')
DbWorkflowData = table('db_dbworkflowdata')
DbWorkflowStep = table('db_dbworkflowstep')
count_workflow = connection.execute(select(func.count()).select_from(DbWorkflow)).scalar()
count_workflow_data = connection.execute(select(func.count()).select_from(DbWorkflowData)).scalar()
count_workflow_step = connection.execute(select(func.count()).select_from(DbWorkflowStep)).scalar()
# Nothing to do if all tables are empty
if count_workflow == 0 and count_workflow_data == 0 and count_workflow_step == 0:
return
if not profile.is_test_profile:
echo.echo('\n')
echo.echo_warning('The legacy workflow tables contain data but will have to be dropped to continue.')
echo.echo_warning('If you continue, the content will be dumped to a JSON file, before dropping the tables.')
echo.echo_warning('This serves merely as a reference and cannot be used to restore the database.')
echo.echo_warning('If you want a proper backup, make sure to dump the full database and backup your repository')
if not click.confirm('Are you sure you want to continue', default=True):
sys.exit(1)
delete_on_close = profile.is_test_profile
data = {
'workflow': [dict(row._mapping) for row in connection.execute(select('*').select_from(DbWorkflow))],
'workflow_data': [dict(row._mapping) for row in connection.execute(select('*').select_from(DbWorkflowData))],
'workflow_step': [dict(row._mapping) for row in connection.execute(select('*').select_from(DbWorkflowStep))],
}
with NamedTemporaryFile(
prefix='legacy-workflows', suffix='.json', dir='.', delete=delete_on_close, mode='wb'
) as handle:
filename = handle.name
json.dump(data, codecs.getwriter('utf-8')(handle), default=json_serializer)
# If delete_on_close is False, we are running for the user and add additional message of file location
if not delete_on_close:
echo.echo_report(f'Exported workflow data to {filename}')