Source code for aiida.tools.dbexporters.tcod

# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved.                     #
# This file is part of the AiiDA code.                                    #
#                                                                         #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida_core #
# For further information on the license, see the LICENSE.txt file        #
# For further information please visit http://www.aiida.net               #
###########################################################################


from aiida.orm import DataFactory
from aiida.orm.calculation.inline import optional_inline

aiida_executable_name = '_aiidasubmit.sh'
inline_executable_name = 'aiidainline.py'

tcod_loops = {
    '_tcod_file': [
        '_tcod_file_id',
        '_tcod_file_name',
        '_tcod_file_md5sum',
        '_tcod_file_sha1sum',
        '_tcod_file_URI',
        '_tcod_file_role',
        '_tcod_file_contents',
        '_tcod_file_content_encoding',
    ],
    '_tcod_computation': [
        '_tcod_computation_step',
        '_tcod_computation_command',
        '_tcod_computation_reference_uuid',
        '_tcod_computation_environment',
        '_tcod_computation_stdout',
        '_tcod_computation_stderr',
    ],
    '_tcod_content_encoding': [
        '_tcod_content_encoding_id',
        '_tcod_content_encoding_layer_id',
        '_tcod_content_encoding_layer_type',
    ],
    '_audit_conform': [
        '_audit_conform_dict_location',
        '_audit_conform_dict_name',
        '_audit_conform_dict_version',
    ],
    '_dft_atom_basisset': [
        '_atom_type_symbol',
        '_dft_atom_basisset',
        '_dft_atom_basisset_type',
        '_dft_atom_basisset_energy_conv',
        '_dft_atom_basisset_citation_id',
        '_dft_atom_type_valence_configuration',
    ],
    '_tcod_atom_site_resid_force_Cartn_': [
        '_tcod_atom_site_resid_force_Cartn_x',
        '_tcod_atom_site_resid_force_Cartn_y',
        '_tcod_atom_site_resid_force_Cartn_z',
    ],
    '_dft_pseudopotential_': [
        '_dft_pseudopotential_atom_type',
        '_dft_pseudopotential_type',
        '_dft_pseudopotential_type_other_name',
    ],
}

conforming_dictionaries = [
    {
        'name': 'cif_tcod.dic',
        'version': '0.010',
        'url': 'http://www.crystallography.net/tcod/cif/dictionaries/cif_tcod.dic'
    },
    {
        'name': 'cif_dft.dic',
        'version': '0.020',
        'url': 'http://www.crystallography.net/tcod/cif/dictionaries/cif_dft.dic'
    }
]

default_options = {
    'code': 'cif_cod_deposit',
    'dump_aiida_database': True,
    'exclude_external_contents': False,
    'gzip': False,
    'gzip_threshold': 1024,
    'reduce_symmetry': True,
}


[docs]def cif_encode_contents(content, gzip=False, gzip_threshold=1024): """ Encodes data for usage in CIF text field in a *best possible* way: binary data is encoded using Base64 encoding; text with non-ASCII symbols, too long lines or lines starting with semicolons (';') is encoded using Quoted-printable encoding. :param content: the content to be encoded :return content: encoded content :return encoding: a string specifying used encoding (None, 'base64', 'ncr', 'quoted-printable', 'gzip+base64') """ import re method = None if len(content) == 0: # content is empty method = None elif gzip and len(content) >= gzip_threshold: # content is larger than some arbitrary value and should be gzipped method = 'gzip+base64' elif float(len(re.findall('[^\x09\x0A\x0D\x20-\x7E]', content)))/len(content) > 0.25: # contents are assumed to be binary method = 'base64' elif re.search('^\s*data_',content) is not None or \ re.search('\n\s*data_',content) is not None: # contents have CIF datablock header-like lines, that may be # dangerous when parsed with primitive parsers method = 'base64' elif re.search('.{2048}.',content) is not None: # lines are too long method = 'quoted-printable' elif len(re.findall('[^\x09\x0A\x0D\x20-\x7E]', content)) > 0: # contents have non-ASCII symbols method = 'quoted-printable' elif re.search('^;', content) is not None or re.search('\n;', content) is not None: # content has lines starting with semicolon (';') method = 'quoted-printable' elif re.search('\t', content) is not None: # content has TAB symbols, which may be lost during the # parsing of TCOD CIF file method = 'quoted-printable' elif content == '.' or content == '?': method = 'quoted-printable' else: method = None if method == 'base64': content = encode_textfield_base64(content) elif method == 'quoted-printable': content = encode_textfield_quoted_printable(content) elif method == 'ncr': content = encode_textfield_ncr(content) elif method == 'gzip+base64': content = encode_textfield_gzip_base64(content) return content, method
[docs]def encode_textfield_base64(content, foldwidth=76): """ Encodes the contents for CIF textfield in Base64 using standard Python implementation (``base64.standard_b64encode()``). :param content: a string with contents :param foldwidth: maximum width of line (default is 76) :return: encoded string """ import base64 content = base64.standard_b64encode(content) content = "\n".join(list(content[i:i + foldwidth] for i in range(0, len(content), foldwidth))) return content
[docs]def decode_textfield_base64(content): """ Decodes the contents for CIF textfield from Base64 using standard Python implementation (``base64.standard_b64decode()``) :param content: a string with contents :return: decoded string """ import base64 return base64.standard_b64decode(content)
[docs]def encode_textfield_quoted_printable(content): """ Encodes the contents for CIF textfield in quoted-printable encoding. In addition to non-ASCII characters, that are encoded by Python function ``quopri.encodestring()``, following characters are encoded: * '``;``', if encountered on the beginning of the line; * '``\\t``' and '``\\r``'; * '``.``' and '``?``', if comprise the entire textfield. :param content: a string with contents :return: encoded string """ import re import quopri content = quopri.encodestring(content) def match2qp(m): prefix = '' postfix = '' if 'prefix' in m.groupdict().keys(): prefix = m.group('prefix') if 'postfix' in m.groupdict().keys(): postfix = m.group('postfix') h = hex(ord(m.group('chr')))[2:].upper() if len(h) == 1: h = "0{}".format(h) return "{}={}{}".format(prefix, h, postfix) content = re.sub('^(?P<chr>;)', match2qp, content) content = re.sub('(?P<chr>[\t\r])', match2qp, content) content = re.sub('(?P<prefix>\n)(?P<chr>;)', match2qp, content) content = re.sub('^(?P<chr>[\.\?])$', match2qp, content) return content
[docs]def decode_textfield_quoted_printable(content): """ Decodes the contents for CIF textfield from quoted-printable encoding. :param content: a string with contents :return: decoded string """ import quopri return quopri.decodestring(content)
[docs]def encode_textfield_ncr(content): """ Encodes the contents for CIF textfield in Numeric Character Reference. Encoded characters: * ``\\x09``, ``\\x0A``, ``\\x0D``, ``\\x20``--``\\x7E``; * '``;``', if encountered on the beginning of the line; * '``\\t``' * '``.``' and '``?``', if comprise the entire textfield. :param content: a string with contents :return: encoded string """ import re def match2ncr(m): prefix = '' postfix = '' if 'prefix' in m.groupdict().keys(): prefix = m.group('prefix') if 'postfix' in m.groupdict().keys(): postfix = m.group('postfix') return prefix + '&#' + str(ord(m.group('chr'))) + ';' + postfix content = re.sub('(?P<chr>[&\t])', match2ncr, content) content = re.sub('(?P<chr>[^\x09\x0A\x0D\x20-\x7E])', match2ncr, content) content = re.sub('^(?P<chr>;)', match2ncr, content) content = re.sub('(?P<prefix>\n)(?P<chr>;)', match2ncr, content) content = re.sub('^(?P<chr>[\.\?])$', match2ncr, content) return content
[docs]def decode_textfield_ncr(content): """ Decodes the contents for CIF textfield from Numeric Character Reference. :param content: a string with contents :return: decoded string """ import re def match2str(m): return chr(int(m.group(1))) return re.sub('&#(\d+);', match2str, content)
[docs]def encode_textfield_gzip_base64(content, **kwargs): """ Gzips the given string and encodes it in Base64. :param content: a string with contents :return: encoded string """ from aiida.common.utils import gzip_string return encode_textfield_base64(gzip_string(content), **kwargs)
[docs]def decode_textfield_gzip_base64(content): """ Decodes the contents for CIF textfield from Base64 and decompresses them with gzip. :param content: a string with contents :return: decoded string """ from aiida.common.utils import gunzip_string return gunzip_string(decode_textfield_base64(content))
[docs]def decode_textfield(content,method): """ Decodes the contents of encoded CIF textfield. :param content: the content to be decoded :param method: method, which was used for encoding the contents (None, 'base64', 'ncr', 'quoted-printable', 'gzip+base64') :return: decoded content :raises ValueError: if the encoding method is unknown """ if method == 'base64': content = decode_textfield_base64(content) elif method == 'quoted-printable': content = decode_textfield_quoted_printable(content) elif method == 'ncr': content = decode_textfield_ncr(content) elif method == 'gzip+base64': content = decode_textfield_gzip_base64(content) elif method is not None: raise ValueError("Unknown content encoding: '{}'".format(method)) return content
def _get_calculation(node): """ Gets the parent (immediate) calculation, attached as the input of the node. :param node: an instance of subclass of :py:class:`aiida.orm.node.Node` :return: an instance of subclass of :py:class:`aiida.orm.calculation.Calculation` :raises MultipleObjectsError: if the node has more than one calculation attached. """ from aiida.common.exceptions import MultipleObjectsError from aiida.orm.calculation import Calculation from aiida.common.links import LinkType if len(node.get_inputs(node_type=Calculation, link_type=LinkType.CREATE)) == 1: return node.get_inputs(node_type=Calculation, link_type=LinkType.CREATE)[0] elif len(node.get_inputs(node_type=Calculation, link_type=LinkType.CREATE)) == 0: return None else: raise MultipleObjectsError("Node {} seems to have more than one " "parent (immediate) calculation -- " "exporter does not know which one of " "them produced the node".format(node)) def _assert_same_parents(a, b): """ Checks whether two supplied nodes have the same immediate parent. Can be used to check whether two data nodes originate from the same calculation. :param a: an instance of subclass of :py:class:`aiida.orm.node.Node` :param b: an instance of subclass of :py:class:`aiida.orm.node.Node` :raises ValueError: if the condition is not met. """ if a is None or b is None: return if _get_calculation(a) is None or _get_calculation(b) is None: raise ValueError("Either the exported node or parameters does " "not originate from a calculation -- this is " "not allowed, as the proper relation between " "these two objects can not be traced") if _get_calculation(a).pk != _get_calculation(b).pk: raise ValueError("Exported node and parameters must " "originate from the same calculation") def _inline_to_standalone_script(calc): """ Create executable Python script for execution of inline script. .. note:: the output bash script may not always be correct, since it is simply formed from: * contents of the file, which contains the original ``\*_inline`` function; * call of the original ``\*_inline`` function with input nodes; * storing of the output nodes. Execution of generated bash script should result in ModificationNotAllowed exception, since the nodes, that are created by the ``\*_inline`` function, are already stored. """ input_dict = calc.get_inputs_dict() args = ["{}=load_node('{}')".format(x, input_dict[x].uuid) for x in input_dict.keys()] args_string = ",\n ".join(sorted(args)) code_string = calc.get_attr('source_file').encode('utf-8') if calc.get_attr('namespace', '__main__').startswith('aiida.'): code_string = "from {} import {}".format(calc.get_attr('namespace', '__main__'), calc.get_attr('function_name','f')) return """#!/usr/bin/env runaiida {} for key, value in {}( {} ).iteritems(): value.store() """.format(code_string, calc.get_attr('function_name','f'), args_string) def _collect_calculation_data(calc): """ Recursively collects calculations from the tree, starting at given calculation. """ from aiida.common.links import LinkType from aiida.orm.data import Data from aiida.orm.calculation import Calculation from aiida.orm.calculation.job import JobCalculation from aiida.orm.calculation.work import WorkCalculation from aiida.orm.calculation.inline import InlineCalculation import hashlib import os calcs_now = [] for d in calc.get_inputs(node_type=Data, link_type=LinkType.INPUT): for c in d.get_inputs(node_type=Calculation, link_type=LinkType.CREATE): calcs = _collect_calculation_data(c) calcs_now.extend(calcs) files_in = [] files_out = [] this_calc = { 'uuid' : calc.uuid, 'files': [], } if isinstance(calc, JobCalculation): retrieved_abspath = calc.get_retrieved_node().get_abs_path() files_in = _collect_files(calc._raw_input_folder.abspath) files_out = _collect_files(os.path.join(retrieved_abspath, 'path')) this_calc['env'] = calc.get_environment_variables() stdout_name = '{}.out'.format(aiida_executable_name) while stdout_name in [files_in,files_out]: stdout_name = '_{}'.format(stdout_name) stderr_name = '{}.err'.format(aiida_executable_name) while stderr_name in [files_in,files_out]: stderr_name = '_{}'.format(stderr_name) if calc.get_scheduler_output() is not None: files_out.append({ 'name' : stdout_name, 'contents': calc.get_scheduler_output(), 'md5' : hashlib.md5(calc.get_scheduler_output()).hexdigest(), 'sha1' : hashlib.sha1(calc.get_scheduler_output()).hexdigest(), 'role' : 'stdout', 'type' : 'file', }) this_calc['stdout'] = stdout_name if calc.get_scheduler_error() is not None: files_out.append({ 'name' : stderr_name, 'contents': calc.get_scheduler_error(), 'md5' : hashlib.md5(calc.get_scheduler_error()).hexdigest(), 'sha1' : hashlib.sha1(calc.get_scheduler_error()).hexdigest(), 'role' : 'stderr', 'type' : 'file', }) this_calc['stderr'] = stderr_name elif isinstance(calc, InlineCalculation): # Calculation is InlineCalculation python_script = _inline_to_standalone_script(calc) files_in.append({ 'name' : inline_executable_name, 'contents': python_script, 'md5' : hashlib.md5(python_script).hexdigest(), 'sha1' : hashlib.sha1(python_script).hexdigest(), 'type' : 'file', }) shell_script = '#!/bin/bash\n\nverdi run {}\n'.format(inline_executable_name) files_in.append({ 'name' : aiida_executable_name, 'contents': shell_script, 'md5' : hashlib.md5(shell_script).hexdigest(), 'sha1' : hashlib.sha1(shell_script).hexdigest(), 'type' : 'file', }) elif isinstance(calc, WorkCalculation): # We do not know how to recreate a WorkCalculation so we pass pass else: raise ValueError('calculation is of an unexpected type {}'.format(type(calc))) for f in files_in: if os.path.basename(f['name']) == aiida_executable_name: f['role'] = 'script' else: f['role'] = 'input' this_calc['files'].append(f) for f in files_out: if os.path.basename(f['name']) != calc._SCHED_OUTPUT_FILE and \ os.path.basename(f['name']) != calc._SCHED_ERROR_FILE: if 'role' not in f.keys(): f['role'] = 'output' this_calc['files'].append(f) calcs_now.append(this_calc) return calcs_now def _collect_files(base, path=''): """ Recursively collects files from the tree, starting at a given path. """ from aiida.common.folders import Folder from aiida.common.utils import md5_file,sha1_file import os def get_filename(file_dict): return file_dict['name'] if os.path.isdir(os.path.join(base,path)): folder = Folder(os.path.join(base,path)) files_now = [] if path != '': if not path.endswith(os.sep): path = "{}{}".format(path,os.sep) if path != '': files_now.append({ 'name': path, 'type': 'folder', }) for f in folder.get_content_list(): files = _collect_files(base,path=os.path.join(path,f)) files_now.extend(files) return sorted(files_now,key=get_filename) elif path == '.aiida/calcinfo.json': files = [] with open(os.path.join(base,path)) as f: files.append({ 'name': path, 'contents': f.read(), 'md5': md5_file(os.path.join(base,path)), 'sha1': sha1_file(os.path.join(base,path)), 'type': 'file', }) import json with open(os.path.join(base,path)) as f: calcinfo = json.load(f) if 'local_copy_list' in calcinfo: for local_copy in calcinfo['local_copy_list']: with open(local_copy[0]) as f: files.append({ 'name': os.path.normpath(local_copy[1]), 'contents': f.read(), 'md5': md5_file(local_copy[0]), 'sha1': sha1_file(local_copy[0]), 'type': 'file', }) return files else: with open(os.path.join(base,path)) as f: return [{ 'name': path, 'contents': f.read(), 'md5': md5_file(os.path.join(base,path)), 'sha1': sha1_file(os.path.join(base,path)), 'type': 'file', }]
[docs]def extend_with_cmdline_parameters(parser, expclass="Data"): """ Provides descriptions of command line options, that are used to control the process of exporting data to TCOD CIF files. :param parser: an argparse.Parser instance :param expclass: name of the exported class to be shown in help string for the command line options .. note:: This method must not set any default values for command line options in order not to clash with any other data export plugins. """ parser.add_argument('--reduce-symmetry', action='store_true', default=None, dest='reduce_symmetry', help="Perform symmetry reduction. " "Default option.") parser.add_argument('--no-reduce-symmetry', '--dont-reduce-symmetry', default=None, action='store_false', dest='reduce_symmetry', help="Do not perform symmetry reduction.") parser.add_argument('--parameter-data', type=int, default=None, help="ID of the ParameterData to be exported " "alongside the {} instance. " "By default, if {} originates from " "a calculation with single ParameterData " "in the output, aforementioned " "ParameterData is picked automatically. " "Instead, the option is used in the case " "the calculation produces more than a " "single instance of " "ParameterData.".format(expclass,expclass)) parser.add_argument('--dump-aiida-database', action='store_true', default=None, dest='dump_aiida_database', help="Export AiiDA database to the CIF file. " "Default option.") parser.add_argument('--no-dump-aiida-database', '--dont-dump-aiida-database', default=None, action='store_false', dest='dump_aiida_database', help="Do not export AiiDA database to the CIF " "file.") parser.add_argument('--exclude-external-contents', action='store_true', default=None, dest='exclude_external_contents', help="Do not save contents for external " "resources if URIs are provided. " "Default option.") parser.add_argument('--no-exclude-external-contents', '--dont-exclude-external-contents', default=None, action='store_false', dest='exclude_external_contents', help="Save contents for external resources " "even if URIs are provided.") parser.add_argument('--gzip', action='store_true', dest='gzip', default=None, help="Gzip large files.") parser.add_argument('--no-gzip', '--dont-gzip', action='store_false', default=None, dest='gzip', help="Do not gzip any files. Default option.") parser.add_argument('--gzip-threshold', type=int, default=None, help="Specify the minimum size of exported " "file which should be gzipped. " "Default {}.".format(default_options['gzip_threshold']))
def _collect_tags(node, calc,parameters=None, dump_aiida_database=default_options['dump_aiida_database'], exclude_external_contents=default_options['exclude_external_contents'], gzip=default_options['gzip'], gzip_threshold=default_options['gzip_threshold']): """ Retrieve metadata from attached calculation and pseudopotentials and prepare it to be saved in TCOD CIF. """ from aiida.common.links import LinkType import os, json import aiida tags = { '_audit_creation_method': "AiiDA version {}".format(aiida.__version__) } # Recording the dictionaries (if any) if len(conforming_dictionaries): for postfix in ['name', 'version', 'location']: key = '_audit_conform_dict_{}'.format(postfix) if key not in tags: tags[key] = [] for dictionary in conforming_dictionaries: tags['_audit_conform_dict_name'].append(dictionary['name']) tags['_audit_conform_dict_version'].append(dictionary['version']) tags['_audit_conform_dict_location'].append(dictionary['url']) # Collecting metadata from input files: calc_data = [] if calc is not None: calc_data = _collect_calculation_data(calc) for tag in tcod_loops['_tcod_computation'] + tcod_loops['_tcod_file']: tags[tag] = [] export_files = [] sn = 1 for step in calc_data: tags['_tcod_computation_step'].append(sn) tags['_tcod_computation_command'].append( 'cd {}; ./{}'.format(sn,aiida_executable_name)) tags['_tcod_computation_reference_uuid'].append(step['uuid']) if 'env' in step: tags['_tcod_computation_environment'].append( "\n".join(["%s=%s" % (key,step['env'][key]) for key in step['env']])) else: tags['_tcod_computation_environment'].append('') if 'stdout' in step and step['stdout'] is not None: tags['_tcod_computation_stdout'].append(step['stdout']) else: tags['_tcod_computation_stdout'].append('') if 'stderr' in step and step['stderr'] is not None: tags['_tcod_computation_stderr'].append(step['stderr']) else: tags['_tcod_computation_stderr'].append('') export_files.append( {'name': "{}{}".format(sn, os.sep), 'type': 'folder'} ) for f in step['files']: f['name'] = os.path.join(str(sn), f['name']) export_files.extend( step['files'] ) sn = sn + 1 # Creating importable AiiDA database dump in CIF tags if dump_aiida_database and node.is_stored: import json from aiida.common.exceptions import LicensingException from aiida.common.folders import SandboxFolder from aiida.orm.importexport import export_tree with SandboxFolder() as folder: try: export_tree([node.dbnode], folder=folder, silent=True, allowed_licenses=['CC0']) except LicensingException as e: raise LicensingException(e.message + \ ". Only CC0 license is accepted.") files = _collect_files(folder.abspath) with open(folder.get_abs_path('data.json')) as f: data = json.loads(f.read()) md5_to_url = {} if exclude_external_contents: for pk in data['node_attributes']: n = data['node_attributes'][pk] if 'md5' in n.keys() and 'source' in n.keys() and \ 'uri' in n['source'].keys(): md5_to_url[n['md5']] = n['source']['uri'] for f in files: f['name'] = os.path.join('aiida',f['name']) if f['type'] == 'file' and f['md5'] in md5_to_url.keys(): f['uri'] = md5_to_url[f['md5']] export_files.extend(files) # Describing seen files in _tcod_file_* loop encodings = list() fn = 0 for f in export_files: # ID and name tags['_tcod_file_id'].append(fn) tags['_tcod_file_name'].append(f['name']) # Checksums md5sum = None sha1sum = None if f['type'] == 'file': md5sum = f['md5'] sha1sum = f['sha1'] else: md5sum = '.' sha1sum = '.' tags['_tcod_file_md5sum'].append(md5sum) tags['_tcod_file_sha1sum'].append(sha1sum) # Content, encoding and URI contents = '?' encoding = None if 'uri' in f.keys(): contents = '.' tags['_tcod_file_URI'].append(f['uri']) else: tags['_tcod_file_URI'].append('?') if f['type'] == 'file': contents,encoding = \ cif_encode_contents(f['contents'], gzip=gzip, gzip_threshold=gzip_threshold) else: contents = '.' if encoding is None: encoding = '.' elif encoding not in encodings: encodings.append(encoding) tags['_tcod_file_contents'].append(contents) tags['_tcod_file_content_encoding'].append(encoding) # Role role = '?' if 'role' in f.keys(): role = f['role'] tags['_tcod_file_role'].append(role) fn = fn + 1 # Describing the encodings if encodings: for tag in tcod_loops['_tcod_content_encoding']: tags[tag] = [] for encoding in encodings: layers = encoding.split('+') for i in range(0, len(layers)): tags['_tcod_content_encoding_id'].append(encoding) tags['_tcod_content_encoding_layer_id'].append(i+1) tags['_tcod_content_encoding_layer_type'].append(layers[i]) # Describing Brillouin zone (if used) if calc is not None: from aiida.orm.data.array.kpoints import KpointsData kpoints_list = calc.get_inputs(KpointsData, link_type=LinkType.INPUT) # TODO: stop if more than one KpointsData is used? if len(kpoints_list) == 1: kpoints = kpoints_list[0] density, shift = kpoints.get_kpoints_mesh() tags['_dft_BZ_integration_grid_X'] = density[0] tags['_dft_BZ_integration_grid_Y'] = density[1] tags['_dft_BZ_integration_grid_Z'] = density[2] tags['_dft_BZ_integration_grid_shift_X'] = shift[0] tags['_dft_BZ_integration_grid_shift_Y'] = shift[1] tags['_dft_BZ_integration_grid_shift_Z'] = shift[2] from aiida.common.exceptions import MultipleObjectsError from aiida.common.pluginloader import all_plugins, get_plugin category = 'tools.dbexporters.tcod_plugins' plugins = list() if calc is not None: for entry_point in all_plugins(category): plugin = get_plugin(category, entry_point) if calc._plugin_type_string.endswith(plugin._plugin_type_string + '.'): plugins.append(plugin) if len(plugins) > 1: raise MultipleObjectsError('more than one plugin found for {}' .format(calc._plugin_type_string)) if len(plugins) == 1: plugin = plugins[0] translated_tags = translate_calculation_specific_values(calc, plugin) tags.update(translated_tags) return tags @optional_inline def add_metadata_inline(what, node=None, parameters=None, args=None): """ Add metadata of original exported node to the produced TCOD CIF. :param what: an original exported node. :param node: a :py:class:`aiida.orm.data.cif.CifData` instance. :param parameters: a :py:class:`aiida.orm.data.parameter.ParameterData` instance, produced by the same calculation as the original exported node. :param args: a :py:class:`aiida.orm.data.parameter.ParameterData` instance, containing parameters for the control of metadata collection and inclusion in the produced :py:class:`aiida.orm.data.cif.CifData`. :return: dict with :py:class:`aiida.orm.data.cif.CifData` :raises ValueError: if tags present in ``args.get_dict()['additional_tags']`` are not valid CIF tags. .. note:: can be used as inline calculation. """ from aiida.orm.data.cif import pycifrw_from_cif CifData = DataFactory('cif') if not node: node = what calc = _get_calculation(what) datablocks = [] loops = {} dataname = node.values.keys()[0] datablock = dict() for tag in node.values[dataname].keys(): datablock[tag] = node.values[dataname][tag] datablocks.append(datablock) for loop in node.values[dataname].loops: loops[loop.keys()[0]] = loop.keys() # Unpacking the kwargs from ParameterData kwargs = {} additional_tags = {} datablock_names = None if args: kwargs = args.get_dict() additional_tags = kwargs.pop('additional_tags',{}) datablock_names = kwargs.pop('datablock_names',None) tags = _collect_tags(what, calc, parameters=parameters, **kwargs) loops.update(tcod_loops) for datablock in datablocks: for k,v in dict(tags.items() + additional_tags.items()).iteritems(): if not k.startswith('_'): raise ValueError("Tag '{}' does not seem to start with " "an underscode ('_'): all CIF tags must " "start with underscores".format(k)) datablock[k] = v values = pycifrw_from_cif(datablocks, loops, names=datablock_names) cif = CifData(values=values) return {'cif': cif}
[docs]def export_cif(what, **kwargs): """ Exports given coordinate-containing \*Data node to string of CIF format. :return: string with contents of CIF file. """ cif = export_cifnode(what, **kwargs) return cif._exportstring('cif')[0]
[docs]def export_values(what, **kwargs): """ Exports given coordinate-containing \*Data node to PyCIFRW CIF data structure. :return: CIF data structure. .. note:: Requires PyCIFRW. """ cif = export_cifnode(what,**kwargs) return cif.values
[docs]def export_cifnode(what, parameters=None, trajectory_index=None, store=False, reduce_symmetry=default_options['reduce_symmetry'], **kwargs): """ The main exporter function. Exports given coordinate-containing \*Data node to :py:class:`aiida.orm.data.cif.CifData` node, ready to be exported to TCOD. All \*Data types, having method ``_get_cif()``, are supported in addition to :py:class:`aiida.orm.data.cif.CifData`. :param what: data node to be exported. :param parameters: a :py:class:`aiida.orm.data.parameter.ParameterData` instance, produced by the same calculation as the original exported node. :param trajectory_index: a step to be converted and exported in case a :py:class:`aiida.orm.data.array.trajectory.TrajectoryData` is exported. :param store: boolean indicating whether to store intermediate nodes or not. Default False. :param dump_aiida_database: boolean indicating whether to include the dump of AiiDA database (containing only transitive closure of the exported node). Default True. :param exclude_external_contents: boolean indicating whether to exclude nodes from AiiDA database dump, that are taken from external repositores and have a URL link allowing to refetch their contents. Default False. :param gzip: boolean indicating whether to Gzip large CIF text fields. Default False. :param gzip_threshold: integer indicating the maximum size (in bytes) of uncompressed CIF text fields when the **gzip** option is in action. Default 1024. :return: a :py:class:`aiida.orm.data.cif.CifData` node. """ from aiida.common.links import LinkType from aiida.common.exceptions import MultipleObjectsError from aiida.orm.calculation.inline import make_inline CifData = DataFactory('cif') StructureData = DataFactory('structure') TrajectoryData = DataFactory('array.trajectory') ParameterData = DataFactory('parameter') calc = _get_calculation(what) if parameters is not None: if not isinstance(parameters, ParameterData): raise ValueError("Supplied parameters are not an " "instance of ParameterData") elif calc is not None: params = calc.get_outputs(type=ParameterData, link_type=LinkType.CREATE) if len(params) == 1: parameters = params[0] elif len(params) > 0: raise MultipleObjectsError("Calculation {} has more than " "one ParameterData output, please " "specify which one to use with " "an option parameters='' when " "calling export_cif()".format(calc)) if parameters is not None: _assert_same_parents(what, parameters) node = what # Convert node to CifData (if required) if not isinstance(node, CifData) and getattr(node, '_get_cif'): function_args = { 'store': store } if trajectory_index is not None: function_args['index'] = trajectory_index node = node._get_cif(**function_args) if not isinstance(node,CifData): raise NotImplementedError("Exporter does not know how to " "export {}".format(type(node))) # Reduction of the symmetry if reduce_symmetry: from aiida.orm.data.cif import refine_inline ret_dict = refine_inline(node=node, store=store) node = ret_dict['cif'] # Addition of the metadata args = ParameterData(dict=kwargs) function_args = { 'what': what, 'args': args, 'store': store } if node != what: function_args['node'] = node if parameters is not None: function_args['parameters'] = parameters ret_dict = add_metadata_inline(**function_args) return ret_dict['cif']
[docs]def deposit(what, type, author_name=None, author_email=None, url=None, title=None, username=None, password=False, user_email=None, code_label=default_options['code'], computer_name=None, replace=None, message=None, **kwargs): """ Launches a :py:class:`aiida.orm.implementation.general.calculation.job.AbstractJobCalculation` to deposit data node to \*COD-type database. :return: launched :py:class:`aiida.orm.implementation.general.calculation.job.AbstractJobCalculation` instance. :raises ValueError: if any of the required parameters are not given. """ from aiida.common.setup import get_property parameters = {} if not what: raise ValueError("Node to be deposited is not supplied") if not type: raise ValueError("Deposition type is not supplied. Should be " "one of the following: 'published', " "'prepublication' or 'personal'") if not username: username = get_property('tcod.depositor_username') if not username: raise ValueError("Depositor username is not supplied") if not password: parameters['password'] = get_property('tcod.depositor_password') if not parameters['password']: raise ValueError("Depositor password is not supplied") if not user_email: user_email = get_property('tcod.depositor_email') if not user_email: raise ValueError("Depositor email is not supplied") parameters['deposition-type'] = type parameters['username'] = username parameters['user_email'] = user_email if type == 'published': pass elif type in ['prepublication','personal']: if not author_name: author_name = get_property('tcod.depositor_author_name') if not author_name: raise ValueError("Author name is not supplied") if not author_email: author_email = get_property('tcod.depositor_author_email') if not author_email: raise ValueError("Author email is not supplied") if not title: raise ValueError("Publication title is not supplied") else: raise ValueError("Unknown deposition type '{}' -- should be " "one of the following: 'published', " "'prepublication' or 'personal'".format(type)) if replace: if str(int(replace)) != replace or int(replace) < 10000000 \ or int(replace) > 99999999: raise ValueError("ID of the replaced structure ({}) does not " "seem to be valid TCOD ID: must be in " "range [10000000,99999999]".format(replace)) elif message: raise ValueError("Message is given while the structure is not " "redeposited -- log message is relevant to " "redeposition only") kwargs['additional_tags'] = {} if title: kwargs['additional_tags']['_publ_section_title'] = title if author_name: kwargs['additional_tags']['_publ_author_name'] = author_name if replace: kwargs['additional_tags']['_tcod_database_code'] = replace kwargs['datablock_names'] = [replace] cif = export_cifnode(what, store=True, **kwargs) from aiida.orm.code import Code from aiida.orm.computer import Computer from aiida.orm.data.parameter import ParameterData from aiida.common.exceptions import NotExistent code = Code.get_from_string(code_label) computer = None if computer_name: computer = Computer.get(computer_name) calc = code.new_calc(computer=computer) calc.set_resources({'num_machines': 1, 'num_mpiprocs_per_machine': 1}) if password: import getpass parameters['password'] = getpass.getpass("Password: ") if author_name: parameters['author_name'] = author_name if author_email: parameters['author_email'] = author_email if url: parameters['url'] = url if replace: parameters['replace'] = True if message: parameters['log-message'] = str(message) pd = ParameterData(dict=parameters) calc.use_cif(cif) calc.use_parameters(pd) calc.store_all() calc.submit() return calc
[docs]def deposition_cmdline_parameters(parser, expclass="Data"): """ Provides descriptions of command line options, that are used to control the process of deposition to TCOD. :param parser: an argparse.Parser instance :param expclass: name of the exported class to be shown in help string for the command line options .. note:: This method must not set any default values for command line options in order not to clash with any other data deposition plugins. """ parser.add_argument('--type', '--deposition-type', type=str, choices=['published','prepublication','personal'], help="Type of the deposition.") parser.add_argument('-u', '--username', type=str, default=None, dest='username', help="Depositor's username.") parser.add_argument('-p', '--password', action='store_true', dest='password', default=None, help="Depositor's password.") parser.add_argument('--user-email', type=str, default=None, help="Depositor's e-mail address.") parser.add_argument('--title', type=str, default=None, help="Title of the publication.") parser.add_argument('--author-name', type=str, default=None, help="Full name of the publication author.") parser.add_argument('--author-email', type=str, default=None, help="E-mail address of the publication author.") parser.add_argument('--url', type=str, help="URL of the deposition API.") parser.add_argument('--code', type=str, dest='code_label', default=None, help="Label of the code to be used for the " "deposition. Default: cif_cod_deposit.") parser.add_argument('--computer', type=str, dest='computer_name', help="Name of the computer to be used for " "deposition. Default computer is used if " "not specified.") parser.add_argument('--replace', type=str, dest='replace', help="ID of the structure to be redeposited " "(replaced), if any.") parser.add_argument('-m', '--message', type=str, dest='message', help="Description of the change (relevant for " "redepositions only.")
[docs]def translate_calculation_specific_values(calc, translator, **kwargs): """ Translates calculation-specific values from :py:class:`aiida.orm.implementation.general.calculation.job.AbstractJobCalculation` subclass to appropriate TCOD CIF tags. :param calc: an instance of :py:class:`aiida.orm.implementation.general.calculation.job.AbstractJobCalculation` subclass. :param translator: class, derived from :py:class:`aiida.tools.dbexporters.tcod_plugins.BaseTcodtranslator`. :raises ValueError: if **translator** is not derived from proper class. """ from aiida.tools.dbexporters.tcod_plugins import BaseTcodtranslator if not issubclass(translator, BaseTcodtranslator): raise ValueError("supplied translator is of class {}, while it " "must be derived from {} class".format(translator.__class__, BaseTcodtranslator.__class__)) translation_map = { '_tcod_software_package': 'get_software_package', '_tcod_software_package_version': 'get_software_package_version', '_tcod_software_package_compilation_date': 'get_software_package_compilation_timestamp', '_tcod_software_executable_path': 'get_software_executable_path', '_tcod_total_energy': 'get_total_energy', '_dft_1e_energy': 'get_one_electron_energy', '_dft_correlation_energy': 'get_exchange_correlation_energy', '_dft_ewald_energy': 'get_ewald_energy', '_dft_hartree_energy': 'get_hartree_energy', '_dft_fermi_energy': 'get_fermi_energy', '_dft_cell_valence_electrons': 'get_number_of_electrons', '_tcod_computation_wallclock_time': 'get_computation_wallclock_time', '_atom_type_symbol': 'get_atom_type_symbol', '_dft_atom_type_valence_configuration': 'get_atom_type_valence_configuration', '_dft_atom_basisset': 'get_atom_type_basisset', '_dft_BZ_integration_smearing_method': 'get_integration_smearing_method', '_dft_BZ_integration_smearing_method_other': 'get_integration_smearing_method_other', '_dft_BZ_integration_MP_order': 'get_integration_Methfessel_Paxton_order', '_dft_BZ_integration_grid_X': 'get_BZ_integration_grid_X', '_dft_BZ_integration_grid_Y': 'get_BZ_integration_grid_Y', '_dft_BZ_integration_grid_Z': 'get_BZ_integration_grid_Z', '_dft_BZ_integration_grid_shift_X': 'get_BZ_integration_grid_shift_X', '_dft_BZ_integration_grid_shift_Y': 'get_BZ_integration_grid_shift_Y', '_dft_BZ_integration_grid_shift_Z': 'get_BZ_integration_grid_shift_Z', '_dft_kinetic_energy_cutoff_wavefunctions': 'get_kinetic_energy_cutoff_wavefunctions', '_dft_kinetic_energy_cutoff_charge_density': 'get_kinetic_energy_cutoff_charge_density', '_dft_kinetic_energy_cutoff_EEX': 'get_kinetic_energy_cutoff_EEX', '_dft_pseudopotential_atom_type': 'get_pseudopotential_atom_type', '_dft_pseudopotential_type': 'get_pseudopotential_type', '_dft_pseudopotential_type_other_name': 'get_pseudopotential_type_other_name', ## Residual forces are no longer produced, as they should ## be in the same CIF loop with coordinates -- to be ## implemented later, since it's not yet clear how. # '_tcod_atom_site_resid_force_Cartn_x': 'get_atom_site_residual_force_Cartesian_x', # '_tcod_atom_site_resid_force_Cartn_y': 'get_atom_site_residual_force_Cartesian_y', # '_tcod_atom_site_resid_force_Cartn_z': 'get_atom_site_residual_force_Cartesian_z', } tags = dict() for tag, function in translation_map.iteritems(): value = None try: value = getattr(translator, function)(calc, **kwargs) except NotImplementedError as e: pass if value is not None: if isinstance(value,list): for i in range(0,len(value)): if value[i] is None: value[i] = '?' tags[tag] = value return tags