# Source code for the AiiDA `Code` data plugin module.

# -*- coding: utf-8 -*-
# Copyright (c), The AiiDA team. All rights reserved.                     #
# This file is part of the AiiDA code.                                    #
#                                                                         #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file        #
# For further information please visit http://www.aiida.net               #
"""Data plugin representing an executable code to be wrapped and called through a `CalcJob` plugin."""
import os
import warnings

from aiida.common import exceptions
from aiida.common.warnings import AiidaDeprecationWarning
from .data import Data

__all__ = ('Code',)

class Code(Data):
    """A code entity: an executable that is wrapped and called through a `CalcJob` plugin.

    A code is either 'local' or 'remote':

    * Local code: a collection of files (added using the :meth:`set_files` method), where one file is flagged
      as the executable (using the :meth:`set_local_executable` method).
    * Remote code: a pair (remote computer, absolute path of the executable) set using the
      :meth:`set_remote_computer_exec` method.

    For both kinds of code, one can set some text to be put in the scheduler script right before or right after
    the execution of the code, using the :meth:`set_prepend_text` and :meth:`set_append_text` methods (e.g., the
    prepend text can be used to load specific modules required for the code to be run).
    """

    # pylint: disable=too-many-public-methods

    # Name of the extra that stores whether the code is hidden.
    HIDDEN_KEY = 'hidden'

    def __init__(self, remote_computer_exec=None, local_executable=None, input_plugin_name=None, files=None, **kwargs):
        """Construct a new code, optionally configuring it as remote or local.

        :param remote_computer_exec: a tuple ``(computer, remote_exec_path)``, see :meth:`set_remote_computer_exec`
        :param local_executable: filename of the local executable, see :meth:`set_local_executable`
        :param input_plugin_name: name of the default input plugin, see :meth:`set_input_plugin_name`
        :param files: a filename or list of filenames to add to the code, see :meth:`set_files`
        :param kwargs: forwarded unchanged to the parent constructor
        :raise ValueError: if both `remote_computer_exec` and `local_executable` are given
        """
        super().__init__(**kwargs)

        if remote_computer_exec and local_executable:
            raise ValueError('cannot set `remote_computer_exec` and `local_executable` at the same time')

        if remote_computer_exec:
            self.set_remote_computer_exec(remote_computer_exec)

        if local_executable:
            self.set_local_executable(local_executable)

        if input_plugin_name:
            self.set_input_plugin_name(input_plugin_name)

        if files:
            self.set_files(files)

    def hide(self):
        """Hide the code (prevents from showing it in the verdi code list)."""
        self.set_extra(self.HIDDEN_KEY, True)

    def reveal(self):
        """Reveal the code (allows to show it in the verdi code list).

        By default, a code is revealed.
        """
        self.set_extra(self.HIDDEN_KEY, False)

    @property
    def hidden(self):
        """Determine whether the code is hidden or not (``False`` if the extra was never set)."""
        return self.get_extra(self.HIDDEN_KEY, False)

    def set_files(self, files):
        """Add the given files to the code.

        Given a list of filenames (or a single filename string), add each of them to the path (all at level zero,
        i.e. without folders). Therefore, be careful for files with the same name!

        .. note:: entries that are not existing regular files are skipped.

        :param files: a filename or a list of filenames

        :todo: decide whether to check if the Code must be a local executable to be able to call this function.
        """
        if isinstance(files, str):
            files = [files]

        for filename in files:
            if os.path.isfile(filename):
                with open(filename, 'rb') as handle:
                    # Only the basename is used as object name: everything ends up at level zero.
                    self.put_object_from_filelike(handle, os.path.split(filename)[1], 'wb', encoding=None)

    def __str__(self):
        """Return a one-line human-readable summary of the code."""
        local_str = 'Local' if self.is_local() else 'Remote'
        # Go through `get_computer_label` because a local code has no computer (it is set to `None`).
        computer_str = self.get_computer_label()
        return f"{local_str} code '{self.label}' on {computer_str}, pk: {self.pk}, uuid: {self.uuid}"

    def get_computer_name(self):
        """Get label of this code's computer.

        .. deprecated:: 1.4.0
            Will be removed in `v2.0.0`, use the `self.get_computer_label()` method instead.
        """
        return self.get_computer_label()

    def get_computer_label(self):
        """Get label of this code's computer.

        :return: ``'repository'`` for a local code, otherwise the label of the computer
        """
        return 'repository' if self.is_local() else self.computer.label

    @property
    def full_label(self):
        """Get full label of this code.

        Returns label of the form <code-label>@<computer-name>.
        """
        return f'{self.label}@{self.get_computer_label()}'

    @property
    def label(self):
        """Return the node label.

        :return: the label
        """
        return super().label

    @label.setter
    def label(self, value):
        """Set the label.

        :param value: the new value to set
        :raise aiida.common.InputValidationError: if the label contains the '@' symbol
        """
        # The '@' symbol is reserved to separate the code label from the computer in `full_label`.
        if '@' in str(value):
            msg = "Code labels must not contain the '@' symbol"
            raise exceptions.InputValidationError(msg)

        super(Code, self.__class__).label.fset(self, value)  # pylint: disable=no-member

    def relabel(self, new_label, raise_error=True):
        """Relabel this code.

        If `new_label` ends with ``@<computer-label>`` of this code, that suffix is stripped before it is set.

        :param new_label: new code label
        :param raise_error: Set to False in order to return a list of errors
            instead of raising them.

        .. deprecated:: 1.2.0
            Will remove raise_error in `v2.0.0`. Use `try/except` instead.
        """
        # pylint: disable=unused-argument
        suffix = f'@{self.get_computer_label()}'
        if new_label.endswith(suffix):
            new_label = new_label[:-len(suffix)]

        self.label = new_label

    def get_description(self):
        """Return a string description of this Code instance.

        :return: string description of this Code instance
        """
        return f'{self.description}'

    @classmethod
    def get_code_helper(cls, label, machinename=None):
        """Return the single code with the given label, optionally restricted to a machine.

        :param label: the code label identifying the code to load
        :param machinename: the machine name where code is setup

        :raise aiida.common.NotExistent: if no code identified by the given string is found
        :raise aiida.common.MultipleObjectsError: if the string cannot identify uniquely a code
        """
        from aiida.common.exceptions import NotExistent, MultipleObjectsError
        from aiida.orm.querybuilder import QueryBuilder
        from aiida.orm.computers import Computer

        query = QueryBuilder()
        query.append(cls, filters={'label': label}, project='*', tag='code')
        if machinename:
            query.append(Computer, filters={'name': machinename}, with_node='code')

        # Count once: every call to `count()` is a separate query.
        count = query.count()

        if count == 0:
            raise NotExistent(f"'{label}' is not a valid code name.")

        if count > 1:
            codes = query.all(flat=True)
            # Sort numerically before converting to string, so that e.g. 2 is listed before 10.
            pks = ', '.join(str(pk) for pk in sorted(code.pk for code in codes))
            retstr = f"There are multiple codes with label '{label}', having IDs: "
            retstr += f'{pks}.\n'
            retstr += ('Relabel them (using their ID), or refer to them with their ID.')
            raise MultipleObjectsError(retstr)

        return query.first()[0]

    @classmethod
    def get(cls, pk=None, label=None, machinename=None):
        """Get a Code object with given identifier.

        The identifier can either be the numeric ID (pk), or the label (and computername) (if unique). If a truthy
        `pk` is passed it takes precedence over `label`.

        :param pk: the numeric ID (pk) for code
        :param label: the code label identifying the code to load
        :param machinename: the machine name where code is setup

        :raise ValueError: if a pk was passed but no code with that pk exists
        :raise aiida.common.NotExistent: if no code identified by the given label is found
        :raise aiida.common.MultipleObjectsError: if the identifier cannot identify uniquely a code
        :raise aiida.common.InputValidationError: if neither a pk nor a label was passed in
        """
        # pylint: disable=arguments-differ
        from aiida.orm.utils import load_code

        # first check if code pk is provided
        if pk:
            code_int = int(pk)
            try:
                return load_code(pk=code_int)
            except exceptions.NotExistent as exception:
                raise ValueError(f'{pk} is not valid code pk') from exception
            except exceptions.MultipleObjectsError as exception:
                raise exceptions.MultipleObjectsError(f"More than one code in the DB with pk='{pk}'!") from exception

        # check if label (and machinename) is provided
        if label is not None:
            return cls.get_code_helper(label, machinename)

        raise exceptions.InputValidationError('Pass either pk or code label (and machinename)')

    @classmethod
    def get_from_string(cls, code_string):
        """Get a Code object with given identifier string in the format label@machinename.

        See the note below for details on the string detection algorithm.

        .. note:: the (leftmost) '@' symbol is always used to split code
            and computername. Therefore do not use
            '@' in the code name if you want to use this function
            ('@' in the computer name are instead valid).

        :param code_string: the code string identifying the code to load

        :raise aiida.common.NotExistent: if no code identified by the given string is found
        :raise aiida.common.MultipleObjectsError: if the string cannot identify uniquely a code
        :raise aiida.common.InputValidationError: if code_string is not of string type
        """
        from aiida.common.exceptions import NotExistent, MultipleObjectsError, InputValidationError

        try:
            # Without an '@' the machine name is the empty string, i.e. no restriction on the computer.
            label, _, machinename = code_string.partition('@')
        except AttributeError as exception:
            raise InputValidationError('the provided code_string is not of valid string type') from exception

        try:
            return cls.get_code_helper(label, machinename)
        except NotExistent as exception:
            raise NotExistent(f'{code_string} could not be resolved to a valid code label') from exception
        except MultipleObjectsError as exception:
            raise MultipleObjectsError(f'{code_string} could not be uniquely resolved') from exception

    @classmethod
    def list_for_plugin(cls, plugin, labels=True):
        """Return a list of valid code strings for a given plugin.

        :param plugin: The string of the plugin.
        :param labels: if True, return a list of code names, otherwise
            return the code PKs (integers).
        :return: a list of string, with the code names if labels is True,
            otherwise a list of integers with the code PKs.
        """
        from aiida.orm.querybuilder import QueryBuilder

        query = QueryBuilder()
        query.append(cls, filters={'attributes.input_plugin': {'==': plugin}})
        valid_codes = query.all(flat=True)

        if labels:
            return [c.label for c in valid_codes]

        return [c.pk for c in valid_codes]

    def _validate(self):
        """Validate the code, on top of the validation of the parent class.

        :raise aiida.common.ValidationError: if it was not set whether the code is local or remote, if a local code
            has no executable or the executable is not among its files, or if a remote code has files, no computer
            or no remote executable path.
        """
        super()._validate()

        if self.is_local() is None:
            raise exceptions.ValidationError('You did not set whether the code is local or remote')

        if self.is_local():
            local_executable = self.get_local_executable()
            if not local_executable:
                raise exceptions.ValidationError(
                    'You have to set which file is the local executable '
                    'using the set_local_executable() method'
                )
            if local_executable not in self.list_object_names():
                raise exceptions.ValidationError(
                    "The local executable '{}' is not in the list of "
                    'files of this code'.format(local_executable)
                )
        else:
            if self.list_object_names():
                raise exceptions.ValidationError('The code is remote but it has files inside')
            if not self.get_remote_computer():
                raise exceptions.ValidationError('You did not specify a remote computer')
            if not self.get_remote_exec_path():
                raise exceptions.ValidationError('You did not specify a remote executable')

    def set_prepend_text(self, code):
        """Pass a string of code that will be put in the scheduler script before the execution of the code."""
        self.set_attribute('prepend_text', str(code))

    def get_prepend_text(self):
        """Return the code that will be put in the scheduler script before the execution.

        :return: the prepend text, or an empty string if none was defined
        """
        return self.get_attribute('prepend_text', '')

    def set_input_plugin_name(self, input_plugin):
        """Set the name of the default input plugin, to be used for the automatic generation of a new calculation.

        :param input_plugin: the plugin name; it is converted to a string unless it is ``None``
        """
        if input_plugin is None:
            self.set_attribute('input_plugin', None)
        else:
            self.set_attribute('input_plugin', str(input_plugin))

    def get_input_plugin_name(self):
        """Return the name of the default input plugin (or None if no input plugin was set)."""
        return self.get_attribute('input_plugin', None)

    def set_append_text(self, code):
        """Pass a string of code that will be put in the scheduler script after the execution of the code."""
        self.set_attribute('append_text', str(code))

    def get_append_text(self):
        """Return the code that will be put in the scheduler script after the execution.

        :return: the append text, or an empty string if none was defined
        """
        return self.get_attribute('append_text', '')

    def set_local_executable(self, exec_name):
        """Set the filename of the local executable.

        Implicitly set the code as local.
        """
        self._set_local()
        self.set_attribute('local_executable', exec_name)

    def get_local_executable(self):
        """Return the filename of the local executable, or an empty string if it was not set."""
        return self.get_attribute('local_executable', '')

    def set_remote_computer_exec(self, remote_computer_exec):
        """Set the code as remote, and pass the computer on which it resides and the absolute path on that computer.

        :param remote_computer_exec: a tuple (computer, remote_exec_path), where computer is a aiida.orm.Computer and
            remote_exec_path is the absolute path of the main executable on remote computer.
        :raise ValueError: if the argument is not a list or tuple of length 2, or if the path is not absolute
        """
        from aiida import orm
        from aiida.common.lang import type_check

        if (not isinstance(remote_computer_exec, (list, tuple)) or len(remote_computer_exec) != 2):
            raise ValueError(
                'remote_computer_exec must be a list or tuple '
                'of length 2, with machine and executable '
                'name'
            )

        computer, remote_exec_path = tuple(remote_computer_exec)

        if not os.path.isabs(remote_exec_path):
            raise ValueError('exec_path must be an absolute path (on the remote machine)')

        type_check(computer, orm.Computer)

        self._set_remote()
        self.computer = computer
        self.set_attribute('remote_exec_path', remote_exec_path)

    def get_remote_exec_path(self):
        """Return the path of the executable on the remote computer.

        :return: the remote executable path, or an empty string if it was not set
        :raise ValueError: if the code is local
        """
        if self.is_local():
            raise ValueError('The code is local')
        return self.get_attribute('remote_exec_path', '')

    def get_remote_computer(self):
        """Return the computer on which the remote code resides.

        :raise ValueError: if the code is local
        """
        if self.is_local():
            raise ValueError('The code is local')

        return self.computer

    def _set_local(self):
        """Set the code as a 'local' code.

        This means that all the files belonging to the code will be copied to the cluster, and the file set with
        :meth:`set_local_executable` will be run.

        It also deletes the flags related to the remote case (if any): the computer and the remote executable path.
        """
        self.set_attribute('is_local', True)
        self.computer = None
        try:
            self.delete_attribute('remote_exec_path')
        except AttributeError:
            # The attribute was never set: nothing to clean up.
            pass

    def _set_remote(self):
        """Set the code as a 'remote' code.

        This means that the code itself has no files attached, but only a location on a remote computer (with an
        absolute path of the executable on the remote computer).

        It also deletes the flags related to the local case (if any).
        """
        self.set_attribute('is_local', False)
        try:
            self.delete_attribute('local_executable')
        except AttributeError:
            # The attribute was never set: nothing to clean up.
            pass

    def is_local(self):
        """Return whether the code is local.

        :return: True if the code is 'local', False if it is 'remote', or None if neither was set yet (see also
            the documentation of the `_set_local` and `_set_remote` methods).
        """
        return self.get_attribute('is_local', None)

    def can_run_on(self, computer):
        """Return True if this code can run on the given computer, False otherwise.

        Local codes can run on any machine; remote codes can run only on the machine
        on which they reside.

        TODO: add filters to mask the remote machines on which a local code can run.
        """
        from aiida import orm
        from aiida.common.lang import type_check

        if self.is_local():
            return True

        type_check(computer, orm.Computer)
        return computer.id == self.get_remote_computer().id

    def get_execname(self):
        """Return the executable string to be put in the script.

        For local codes, it is ./LOCAL_EXECUTABLE_NAME
        For remote codes, it is the absolute path to the executable.
        """
        if self.is_local():
            return f'./{self.get_local_executable()}'

        return self.get_remote_exec_path()

    def get_builder(self):
        """Create and return a new `ProcessBuilder` for the `CalcJob` class of the plugin configured for this code.

        The configured calculation plugin class is defined by the `get_input_plugin_name` method.

        .. note:: it also sets the ``builder.code`` value.

        :return: a `ProcessBuilder` instance with the `code` input already populated with ourselves
        :raise aiida.common.EntryPointError: if the specified plugin does not exist.
        :raise ValueError: if no default plugin was specified.
        """
        from aiida.plugins import CalculationFactory

        plugin_name = self.get_input_plugin_name()

        if plugin_name is None:
            raise ValueError('no default calculation input plugin specified for this code')

        try:
            process_class = CalculationFactory(plugin_name)
        except exceptions.EntryPointError as exception:
            raise exceptions.EntryPointError(
                f'the calculation entry point `{plugin_name}` could not be loaded'
            ) from exception

        builder = process_class.get_builder()
        builder.code = self

        return builder

    def get_full_text_info(self, verbose=False):
        """Return a list of lists with a human-readable detailed information on this code.

        .. deprecated:: 1.4.0
            Will be removed in `v2.0.0`.

        :param verbose: if True, also include the number of outgoing links under the key 'Calculations'
        :return: list of lists where each entry consists of two elements: a key and a value
        """
        warnings.warn('this property is deprecated', AiidaDeprecationWarning)  # pylint: disable=no-member
        from aiida.repository import FileType

        result = []
        result.append(['PK', self.pk])
        result.append(['UUID', self.uuid])
        result.append(['Label', self.label])
        result.append(['Description', self.description])
        result.append(['Default plugin', self.get_input_plugin_name()])

        if verbose:
            result.append(['Calculations', len(self.get_outgoing().all())])

        if self.is_local():
            result.append(['Type', 'local'])
            result.append(['Exec name', self.get_execname()])
            result.append(['List of files/folders:', ''])
            for obj in self.list_objects():
                if obj.file_type == FileType.DIRECTORY:
                    result.append(['directory', obj.name])
                else:
                    result.append(['file', obj.name])
        else:
            result.append(['Type', 'remote'])
            result.append(['Remote machine', self.get_remote_computer().label])
            result.append(['Remote absolute path', self.get_remote_exec_path()])

        if self.get_prepend_text().strip():
            result.append(['Prepend text', ''])
            for line in self.get_prepend_text().split('\n'):
                result.append(['', line])
        else:
            result.append(['Prepend text', 'No prepend text'])

        if self.get_append_text().strip():
            result.append(['Append text', ''])
            for line in self.get_append_text().split('\n'):
                result.append(['', line])
        else:
            result.append(['Append text', 'No append text'])

        return result