Source code for aiida.restapi.common.identifiers

# -*- coding: utf-8 -*-
# Copyright (c), The AiiDA team. All rights reserved.                     #
# This file is part of the AiiDA code.                                    #
#                                                                         #
# The code is hosted on GitHub at #
# For further information on the license, see the LICENSE.txt file        #
# For further information please visit               #
"""Utility functions to work with node "full types" which are unique node identifiers.

A node's `full_type` is defined as a string that uniquely defines the node type. A valid `full_type` is constructed by
concatenating the `node_type` and `process_type` of a node with the `FULL_TYPE_CONCATENATOR`. Each segment of the full
type can optionally be terminated by a single `LIKE_OPERATOR_CHARACTER` to indicate that the `node_type` or
`process_type` should start with that value but can be followed by any amount of other characters. A full type is
invalid if it does not contain exactly one `FULL_TYPE_CONCATENATOR` character. Additionally, each segment can contain
at most one occurrence of the `LIKE_OPERATOR_CHARACTER` and it has to be at the end of the segment.

Examples of valid full types:


Examples of invalid full types:

    'data.bool'  # Only a single segment without concatenator
    'data.|bool.Bool.|process.'  # More than one concatenator
    'process.calculation%.calcfunction.|aiida.calculations:arithmetic.add'  # Like operator not at end of segment
    'process.calculation%.calcfunction.%|aiida.calculations:arithmetic.add'  # More than one operator in segment

from import MutableMapping

from aiida.common.escaping import escape_for_sql_like

DEFAULT_NAMESPACE_LABEL = '~no-entry-point~'

[docs]def validate_full_type(full_type): """Validate that the `full_type` is a valid full type unique node identifier. :param full_type: a `Node` full type :raises ValueError: if the `full_type` is invalid :raises TypeError: if the `full_type` is not a string type """ from aiida.common.lang import type_check type_check(full_type, str) if FULL_TYPE_CONCATENATOR not in full_type: raise ValueError( 'full type `{}` does not include the required concatenator symbol `{}`.'.format( full_type, FULL_TYPE_CONCATENATOR ) ) elif full_type.count(FULL_TYPE_CONCATENATOR) > 1: raise ValueError( 'full type `{}` includes the concatenator symbol `{}` more than once.'.format( full_type, FULL_TYPE_CONCATENATOR ) )
[docs]def construct_full_type(node_type, process_type): """Return the full type, which uniquely identifies any `Node` with the given `node_type` and `process_type`. :param node_type: the `node_type` of the `Node` :param process_type: the `process_type` of the `Node` :return: the full type, which is a unique identifier """ if node_type is None: process_type = '' if process_type is None: process_type = '' return '{}{}{}'.format(node_type, FULL_TYPE_CONCATENATOR, process_type)
[docs]def get_full_type_filters(full_type): """Return the `QueryBuilder` filters that will return all `Nodes` identified by the given `full_type`. :param full_type: the `full_type` unique node identifier :return: dictionary of filters to be passed for the `filters` keyword in `QueryBuilder.append` :raises ValueError: if the `full_type` is invalid :raises TypeError: if the `full_type` is not a string type """ validate_full_type(full_type) filters = {} node_type, process_type = full_type.split(FULL_TYPE_CONCATENATOR) for entry in (node_type, process_type): if entry.count(LIKE_OPERATOR_CHARACTER) > 1: raise ValueError('full type component `{}` contained more than one like-operator character'.format(entry)) if LIKE_OPERATOR_CHARACTER in entry and entry[-1] != LIKE_OPERATOR_CHARACTER: raise ValueError('like-operator character in full type component `{}` is not at the end'.format(entry)) if LIKE_OPERATOR_CHARACTER in node_type: # Remove the trailing `LIKE_OPERATOR_CHARACTER`, escape the string and reattach the character node_type = node_type[:-1] node_type = escape_for_sql_like(node_type) + LIKE_OPERATOR_CHARACTER filters['node_type'] = {'like': node_type} else: filters['node_type'] = escape_for_sql_like(node_type) if LIKE_OPERATOR_CHARACTER in process_type: # Remove the trailing `LIKE_OPERATOR_CHARACTER`, escape the string and reattach the character process_type = process_type[:-1] process_type = escape_for_sql_like(process_type) + LIKE_OPERATOR_CHARACTER filters['process_type'] = {'like': process_type} else: if process_type: filters['process_type'] = escape_for_sql_like(process_type) return filters
[docs]def load_entry_point_from_full_type(full_type): """Return the loaded entry point for the given `full_type` unique node identifier. :param full_type: the `full_type` unique node identifier :raises ValueError: if the `full_type` is invalid :raises TypeError: if the `full_type` is not a string type :raises `~aiida.common.exceptions.EntryPointError`: if the corresponding entry point cannot be loaded """ from aiida.common import EntryPointError from aiida.common.utils import strip_prefix from aiida.plugins.entry_point import is_valid_entry_point_string, load_entry_point, load_entry_point_from_string data_prefix = 'data.' validate_full_type(full_type) node_type, process_type = full_type.split(FULL_TYPE_CONCATENATOR) if is_valid_entry_point_string(process_type): try: return load_entry_point_from_string(process_type) except EntryPointError: raise EntryPointError('could not load entry point `{}`'.format(process_type)) elif node_type.startswith(data_prefix): base_name = strip_prefix(node_type, data_prefix) entry_point_name = base_name.rsplit('.', 2)[0] try: return load_entry_point('', entry_point_name) except EntryPointError: raise EntryPointError('could not load entry point `{}`'.format(process_type)) # Here we are dealing with a `ProcessNode` with a `process_type` that is not an entry point string. # Which means it is most likely a full module path (the fallback option) and we cannot necessarily load the # class from this. We could try with `importlib` but not sure that we should raise EntryPointError('entry point of the given full type cannot be loaded')
[docs]class Namespace(MutableMapping): """Namespace that can be used to map the node class hierarchy.""" namespace_separator = '.' # Very ugly ad-hoc mapping of `path` to `label` for the non-leaf entries in the nested `Namespace` mapping: mapping_path_to_label = { 'node': 'Node', '': 'Data', 'node.process': 'Process', 'node.process.calculation': 'Calculation', 'node.process.calculation.calcjob': 'Calculation job', 'node.process.calculation.calcfunction': 'Calculation function', 'node.process.workflow': 'Workflow', 'node.process.workflow.workchain': 'Work chain', 'node.process.workflow.workfunction': 'Work function', } # This is a hard-coded mapping to generate the correct full types for process node namespaces of external # plugins. The `node_type` in that case is fixed and the `process_type` should start with the entry point group # followed by the plugin name and the wildcard. process_full_type_mapping = { 'process.calculation.calcjob.': 'process.calculation.calcjob.CalcJobNode.|aiida.calculations:{plugin_name}.%', 'process.calculation.calcfunction.': 'process.calculation.calcfunction.CalcFunctionNode.|aiida.calculations:{plugin_name}.%', 'process.workflow.workfunction.': 'process.workflow.workfunction.WorkFunctionNode.|aiida.workflows:{plugin_name}.%', 'process.workflow.workchain.': 'process.workflow.workchain.WorkChainNode.|aiida.workflows:{plugin_name}.%', }
[docs] def __str__(self): import json return json.dumps(self.get_description(), sort_keys=True, indent=4)
[docs] def __init__(self, namespace, path=None, label=None, full_type=None, is_leaf=True): """Construct a new node class namespace.""" # pylint: disable=super-init-not-called self._namespace = namespace self._path = path if path else namespace self._full_type = self._infer_full_type(full_type) self._subspaces = {} self._is_leaf = is_leaf try: self._label = label if label is not None else self.mapping_path_to_label[path] except KeyError: self._label = self._path.rpartition('.')[-1] # Manual override for process subspaces that contain entries corresponding to nodes with "unregistered" process # types. In this case, the label should become `Unregistered` and the full type set to `None` because we cannot # query for all nodes that fall under this category. if namespace == DEFAULT_NAMESPACE_LABEL: self._label = 'Unregistered' self._full_type = None
[docs] def _infer_full_type(self, full_type): """Infer the full type based on the current namespace path and the given full type of the leaf.""" from aiida.common.utils import strip_prefix if full_type or self._path is None: return full_type full_type = strip_prefix(self._path, 'node.') if full_type.startswith('process.'): for basepath, full_type_template in self.process_full_type_mapping.items(): if full_type.startswith(basepath): plugin_name = strip_prefix(full_type, basepath) full_type = full_type_template.format(plugin_name=plugin_name) return full_type full_type += '.{}{}'.format(LIKE_OPERATOR_CHARACTER, FULL_TYPE_CONCATENATOR) if full_type.startswith('process.'): full_type += LIKE_OPERATOR_CHARACTER return full_type
[docs] def __iter__(self): return self._subspaces.__iter__()
[docs] def __len__(self): return len(self._subspaces)
[docs] def __delitem__(self, key): del self._subspaces[key]
[docs] def __getitem__(self, key): return self._subspaces[key]
[docs] def __setitem__(self, key, port): self._subspaces[key] = port
@property def is_leaf(self): return self._is_leaf
[docs] def get_description(self): """Return a dictionary with a description of the ports this namespace contains. Nested PortNamespaces will be properly recursed and Ports will print their properties in a list :returns: a dictionary of descriptions of the Ports contained within this PortNamespace """ result = { 'namespace': self._namespace, 'full_type': self._full_type, 'label': self._label, 'path': self._path, 'subspaces': [] } for _, port in self._subspaces.items(): result['subspaces'].append(port.get_description()) return result
[docs] def create_namespace(self, name, **kwargs): """Create and return a new `Namespace` in this `Namespace`. If the name is namespaced, the sub `Namespaces` will be created recursively, except if one of the namespaces is already occupied at any level by a Port in which case a ValueError will be thrown :param name: name (potentially namespaced) of the port to create and return :param kwargs: constructor arguments that will be used *only* for the construction of the terminal Namespace :returns: Namespace :raises: ValueError if any sub namespace is occupied by a non-Namespace port """ if not isinstance(name, str): raise ValueError('name has to be a string type, not {}'.format(type(name))) if not name: raise ValueError('name cannot be an empty string') namespace = name.split(self.namespace_separator) port_name = namespace.pop(0) path = '{}{}{}'.format(self._path, self.namespace_separator, port_name) # If this is True, the (sub) port namespace does not yet exist, so we create it if port_name not in self: # If there still is a `namespace`, we create a sub namespace, *without* the constructor arguments if namespace: self[port_name] = self.__class__(port_name, path=path, is_leaf=False) # Otherwise it is the terminal port and we construct *with* the keyword arugments else: kwargs['is_leaf'] = True self[port_name] = self.__class__(port_name, path=path, **kwargs) else: # The port does already exist: if it is a leaf and `namespace` is not empty, then the current leaf node is # also a namespace itself, so create a namespace with the same name and put the leaf within itself if self[port_name].is_leaf and namespace: clone = self[port_name] self[port_name] = self.__class__(port_name, path=path, is_leaf=False) self[port_name][port_name] = clone # If the current existing port is not a leaf and we do not have remaining namespace, that means the current # namespace is the "concrete" version of the namespace, so we add the leaf version to the namespace. elif not self[port_name].is_leaf and not namespace: kwargs['is_leaf'] = True self[port_name][port_name] = self.__class__(port_name, path='{}.{}'.format(path, port_name), **kwargs) # If there is still `namespace` left, we create the next namespace if namespace: kwargs['is_leaf'] = True return self[port_name].create_namespace(self.namespace_separator.join(namespace), **kwargs) return self[port_name]
[docs]def get_node_namespace(): """Return the full namespace of all available nodes in the current database. :return: complete node `Namespace` """ from aiida import orm from aiida.plugins.entry_point import is_valid_entry_point_string, parse_entry_point_string builder = orm.QueryBuilder().append(orm.Node, project=['node_type', 'process_type']).distinct() unique_types = {(node_type, process_type if process_type else '') for node_type, process_type in builder.all()} # First we create a flat list of all "leaf" node types. namespaces = [] for node_type, process_type in unique_types: label = None namespace = None if process_type: # Process nodes parts = node_type.rsplit('.', 2) if is_valid_entry_point_string(process_type): _, entry_point_name = parse_entry_point_string(process_type) label = entry_point_name.rpartition('.')[-1] namespace = '.'.join(parts[:-2] + [entry_point_name]) else: label = process_type.rsplit('.', 1)[-1] namespace = '.'.join(parts[:-2] + [DEFAULT_NAMESPACE_LABEL, label]) else: # Data nodes parts = node_type.rsplit('.', 2) try: label = parts[-2] namespace = '.'.join(parts[:-2]) except IndexError: continue full_type = construct_full_type(node_type, process_type) namespaces.append((namespace, label, full_type)) node_namespace = Namespace('node') for namespace, label, full_type in sorted(namespaces, key=lambda x: x[0], reverse=False): node_namespace.create_namespace(namespace, label=label, full_type=full_type) return node_namespace