Source code for aiida.backends.sqlalchemy.utils

# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved.                     #
# This file is part of the AiiDA code.                                    #
#                                                                         #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file        #
# For further information please visit http://www.aiida.net               #
###########################################################################
# pylint: disable=import-error,no-name-in-module
"""Utility functions specific to the SqlAlchemy backend."""


def delete_nodes_and_connections_sqla(pks_to_delete):  # pylint: disable=invalid-name
    """
    Delete all nodes corresponding to pks in the input.

    The group memberships of the nodes and every link entering or leaving them are removed as well,
    all within a single backend transaction.

    :param pks_to_delete: A list, tuple or set of pks that should be deleted. Any other iterable
        (e.g. a generator) is accepted too; it is consumed exactly once.
    """
    # Materialise the pks a single time. The statements below all need the same collection, and
    # re-listing a one-shot iterable for each of them would leave every statement after the first
    # with an empty list, i.e. memberships removed but links and nodes left behind.
    pks = list(pks_to_delete)

    if not pks:
        # Nothing to delete: do not open a transaction nor emit DELETE statements with an empty IN clause.
        return

    # pylint: disable=no-value-for-parameter
    from aiida.backends.sqlalchemy.models.node import DbNode, DbLink
    from aiida.backends.sqlalchemy.models.group import table_groups_nodes
    from aiida.manage.manager import get_manager

    backend = get_manager().get_backend()

    with backend.transaction() as session:
        # I am first making a statement to delete the membership of these nodes to groups.
        # Since table_groups_nodes is a sqlalchemy.schema.Table, I am using expression language to compile
        # a stmt to be executed by the session. It works, but it's not nice that two different ways are used!
        # Can this be changed?
        stmt = table_groups_nodes.delete().where(table_groups_nodes.c.dbnode_id.in_(pks))
        session.execute(stmt)
        # First delete links, then the Nodes, since we are not cascading deletions.
        # Here I delete the links coming out of the nodes marked for deletion.
        session.query(DbLink).filter(DbLink.input_id.in_(pks)).delete(synchronize_session='fetch')
        # Here I delete the links pointing to the nodes marked for deletion.
        session.query(DbLink).filter(DbLink.output_id.in_(pks)).delete(synchronize_session='fetch')
        # Now I am deleting the nodes
        session.query(DbNode).filter(DbNode.id.in_(pks)).delete(synchronize_session='fetch')
def flag_modified(instance, key):
    """Mark attribute `key` of `instance` as modified, unwrapping a `ModelWrapper` first if needed.

    This is a thin wrapper around `sqlalchemy.orm.attributes.flag_modified`. Since SqlAlchemy 1.2.12 (and maybe
    earlier, but not in 1.0.19) that function checks that the key is actually present in the instance and raises
    otherwise. Passing a model instance that is wrapped in a `ModelWrapper` therefore results in an
    `InvalidRequestError`. To avoid that, the wrapped model is dereferenced here before delegating to SqlAlchemy.

    :param instance: a model instance, optionally wrapped in a `ModelWrapper`
    :param key: the name of the attribute to flag as modified
    """
    from sqlalchemy.orm.attributes import flag_modified as flag_modified_sqla
    from aiida.orm.implementation.sqlalchemy.utils import ModelWrapper

    # SqlAlchemy has to receive the underlying model, never the wrapper around it.
    model = instance._model if isinstance(instance, ModelWrapper) else instance  # pylint: disable=protected-access

    flag_modified_sqla(model, key)
def install_tc(session):
    """
    Install the transitive closure table with SqlAlchemy.

    The SQL is generated by `get_pg_tc` for the `db_dblink` links table and the `db_dbpath` closure table,
    and is then executed on the given session.

    :param session: the session on which the generated SQL is executed
    """
    tc_sql = get_pg_tc(
        links_table_name='db_dblink',
        links_table_input_field='input_id',
        links_table_output_field='output_id',
        closure_table_name='db_dbpath',
        closure_table_parent_field='parent_id',
        closure_table_child_field='child_id',
    )
    session.execute(tc_sql)
def get_pg_tc(
    links_table_name, links_table_input_field, links_table_output_field, closure_table_name,
    closure_table_parent_field, closure_table_child_field
):
    """
    Return the transitive closure table template

    The returned string is SQL (PL/pgSQL) that drops and re-creates the `update_tc()` trigger function and the
    `autoupdate_tc` trigger on the links table, with all table and column names filled in.

    :param links_table_name: name of the table holding the links; the trigger is created on it
    :param links_table_input_field: column of the links table holding the input (source) of a link
    :param links_table_output_field: column of the links table holding the output (target) of a link
    :param closure_table_name: name of the transitive closure table maintained by the trigger
    :param closure_table_parent_field: column of the closure table holding the parent of a path
    :param closure_table_child_field: column of the closure table holding the child of a path
    :return: the SQL text with every placeholder substituted
    """
    from string import Template

    # `$name` is a `string.Template` placeholder; `$$` is the Template escape for a literal `$`, so that
    # `$$BODY$$` ends up as the `$BODY$` dollar-quote delimiter in the generated SQL.
    # NOTE(review): the trigger is declared for INSERT OR DELETE OR UPDATE, but the function body only has
    # branches for 'INSERT' and 'DELETE'; an UPDATE falls through to the final RETURN NULL.
    # NOTE(review): the DELETE branch creates and drops a regular (non temporary) table `PurgeList`.
    sql_template = """
        DROP TRIGGER IF EXISTS autoupdate_tc ON $links_table_name;
        DROP FUNCTION IF EXISTS update_tc();

        CREATE OR REPLACE FUNCTION update_tc()
          RETURNS trigger AS
        $$BODY$$
        DECLARE
            new_id INTEGER;
            old_id INTEGER;
            num_rows INTEGER;
        BEGIN

          IF tg_op = 'INSERT' THEN

            IF EXISTS (
              SELECT Id FROM $closure_table_name
              WHERE $closure_table_parent_field = new.$links_table_input_field
                AND $closure_table_child_field = new.$links_table_output_field
                AND depth = 0
              )
            THEN
              RETURN null;
            END IF;

            IF new.$links_table_input_field = new.$links_table_output_field
            OR EXISTS (
              SELECT id FROM $closure_table_name
              WHERE $closure_table_parent_field = new.$links_table_output_field
                AND $closure_table_child_field = new.$links_table_input_field
              )
            THEN
              RETURN null;
            END IF;

            INSERT INTO $closure_table_name (
              $closure_table_parent_field,
              $closure_table_child_field,
              depth)
            VALUES (
              new.$links_table_input_field,
              new.$links_table_output_field,
              0);

            new_id := lastval();

            UPDATE $closure_table_name
              SET entry_edge_id = new_id
                , exit_edge_id = new_id
                , direct_edge_id = new_id
              WHERE id = new_id;

            INSERT INTO $closure_table_name (
              entry_edge_id,
              direct_edge_id,
              exit_edge_id,
              $closure_table_parent_field,
              $closure_table_child_field,
              depth)
            SELECT id
              , new_id
              , new_id
              , $closure_table_parent_field
              , new.$links_table_output_field
              , depth + 1
            FROM $closure_table_name
            WHERE $closure_table_child_field = new.$links_table_input_field;

            INSERT INTO $closure_table_name (
              entry_edge_id,
              direct_edge_id,
              exit_edge_id,
              $closure_table_parent_field,
              $closure_table_child_field,
              depth)
            SELECT new_id
              , new_id
              , id
              , new.$links_table_input_field
              , $closure_table_child_field
              , depth + 1
            FROM $closure_table_name
            WHERE $closure_table_parent_field = new.$links_table_output_field;

            INSERT INTO $closure_table_name (
              entry_edge_id,
              direct_edge_id,
              exit_edge_id,
              $closure_table_parent_field,
              $closure_table_child_field,
              depth)
            SELECT A.id
              , new_id
              , B.id
              , A.$closure_table_parent_field
              , B.$closure_table_child_field
              , A.depth + B.depth + 2
            FROM $closure_table_name A
              CROSS JOIN $closure_table_name B
            WHERE A.$closure_table_child_field = new.$links_table_input_field
              AND B.$closure_table_parent_field = new.$links_table_output_field;

          END IF;

          IF tg_op = 'DELETE' THEN

            IF NOT EXISTS(
              SELECT id FROM $closure_table_name
              WHERE $closure_table_parent_field = old.$links_table_input_field
                AND $closure_table_child_field = old.$links_table_output_field
                AND depth = 0 )
            THEN
              RETURN NULL;
            END IF;

            CREATE TABLE PurgeList (Id int);

            INSERT INTO PurgeList
              SELECT id FROM $closure_table_name
              WHERE $closure_table_parent_field = old.$links_table_input_field
                AND $closure_table_child_field = old.$links_table_output_field
                AND depth = 0;

            WHILE (1 = 1)
            loop

              INSERT INTO PurgeList
                SELECT id FROM $closure_table_name
                WHERE depth > 0
                  AND ( entry_edge_id IN ( SELECT Id FROM PurgeList )
                  OR direct_edge_id IN ( SELECT Id FROM PurgeList )
                  OR exit_edge_id IN ( SELECT Id FROM PurgeList ) )
                  AND Id NOT IN (SELECT Id FROM PurgeList );

              GET DIAGNOSTICS num_rows = ROW_COUNT;
              if (num_rows = 0) THEN
                EXIT;
              END IF;
            end loop;

            DELETE FROM $closure_table_name WHERE Id IN ( SELECT Id FROM PurgeList);
            DROP TABLE PurgeList;

          END IF;

          RETURN NULL;

        END
        $$BODY$$
          LANGUAGE plpgsql VOLATILE
          COST 100;

        CREATE TRIGGER autoupdate_tc
          AFTER INSERT OR DELETE OR UPDATE
          ON $links_table_name FOR each ROW
          EXECUTE PROCEDURE update_tc();
        """

    substitutions = {
        'links_table_name': links_table_name,
        'links_table_input_field': links_table_input_field,
        'links_table_output_field': links_table_output_field,
        'closure_table_name': closure_table_name,
        'closure_table_parent_field': closure_table_parent_field,
        'closure_table_child_field': closure_table_child_field,
    }

    return Template(sql_template).substitute(substitutions)