Source code for aiida.common.archive

# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved.                     #
# This file is part of the AiiDA code.                                    #
#                                                                         #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida_core #
# For further information on the license, see the LICENSE.txt file        #
# For further information please visit http://www.aiida.net               #
###########################################################################

[docs]def extract_zip(infile, folder, nodes_export_subfolder="nodes", silent=False): """ Extract the nodes to be imported from a zip file. :param infile: file path :param folder: a SandboxFolder, used to extract the file tree :param nodes_export_subfolder: name of the subfolder for AiiDA nodes :param silent: suppress debug print """ import os import zipfile if not silent: print "READING DATA AND METADATA..." try: with zipfile.ZipFile(infile, "r") as zip: zip.extract(path=folder.abspath, member='metadata.json') zip.extract(path=folder.abspath, member='data.json') if not silent: print "EXTRACTING NODE DATA..." for membername in zip.namelist(): # Check that we are only exporting nodes within # the subfolder! # TODO: better check such that there are no .. in the # path; use probably the folder limit checks if not membername.startswith(nodes_export_subfolder+os.sep): continue zip.extract(path=folder.abspath, member=membername) except zipfile.BadZipfile: raise ValueError("The input file format for import is not valid (not" " a zip file)")
[docs]def extract_tar(infile, folder, nodes_export_subfolder="nodes", silent=False): """ Extract the nodes to be imported from a (possibly zipped) tar file. :param infile: file path :param folder: a SandboxFolder, used to extract the file tree :param nodes_export_subfolder: name of the subfolder for AiiDA nodes :param silent: suppress debug print """ import os import tarfile if not silent: print "READING DATA AND METADATA..." try: with tarfile.open(infile, "r:*", format=tarfile.PAX_FORMAT) as tar: tar.extract(path=folder.abspath, member=tar.getmember('metadata.json')) tar.extract(path=folder.abspath, member=tar.getmember('data.json')) if not silent: print "EXTRACTING NODE DATA..." for member in tar.getmembers(): if member.isdev(): # safety: skip if character device, block device or FIFO print >> sys.stderr, ("WARNING, device found inside the " "import file: {}".format(member.name)) continue if member.issym() or member.islnk(): # safety: in export, I set dereference=True therefore # there should be no symbolic or hard links. print >> sys.stderr, ("WARNING, link found inside the " "import file: {}".format(member.name)) continue # Check that we are only exporting nodes within # the subfolder! # TODO: better check such that there are no .. in the # path; use probably the folder limit checks if not member.name.startswith(nodes_export_subfolder+os.sep): continue tar.extract(path=folder.abspath, member=member) except tarfile.ReadError: raise ValueError("The input file format for import is not valid (1)")
[docs]def extract_tree(infile, folder, silent=False): """ Prepare to import nodes from plain file system tree. :param infile: path :param folder: a SandboxFolder, used to extract the file tree :param silent: suppress debug print """ import os def add_files(args,path,files): folder = args['folder'] root = args['root'] for f in files: fullpath = os.path.join(path,f) relpath = os.path.relpath(fullpath,root) if os.path.isdir(fullpath): if os.path.dirname(relpath) != '': folder.get_subfolder(os.path.dirname(relpath) + os.sep, create=True) elif not os.path.isfile(fullpath): continue if os.path.dirname(relpath) != '': folder.get_subfolder(os.path.dirname(relpath)+os.sep, create=True) folder.insert_path(os.path.abspath(fullpath),relpath) os.path.walk(infile,add_files,{'folder': folder, 'root': infile})
[docs]def extract_cif(infile, folder, nodes_export_subfolder="nodes", aiida_export_subfolder="aiida", silent=False): """ Extract the nodes to be imported from a TCOD CIF file. TCOD CIFs, exported by AiiDA, may contain an importable subset of AiiDA database, which can be imported. This function prepares SandboxFolder with files required for import. :param infile: file path :param folder: a SandboxFolder, used to extract the file tree :param nodes_export_subfolder: name of the subfolder for AiiDA nodes :param aiida_export_subfolder: name of the subfolder for AiiDA data inside the TCOD CIF internal file tree :param silent: suppress debug print """ import os import urllib2 import CifFile from aiida.common.exceptions import ValidationError from aiida.common.utils import md5_file, sha1_file from aiida.tools.dbexporters.tcod import decode_textfield values = CifFile.ReadCif(infile) values = values[values.keys()[0]] # taking the first datablock in CIF for i in range(0,len(values['_tcod_file_id'])-1): name = values['_tcod_file_name'][i] if not name.startswith(aiida_export_subfolder+os.sep): continue dest_path = os.path.relpath(name,aiida_export_subfolder) if name.endswith(os.sep): if not os.path.exists(folder.get_abs_path(dest_path)): folder.get_subfolder(folder.get_abs_path(dest_path),create=True) continue contents = values['_tcod_file_contents'][i] if contents == '?' or contents == '.': uri = values['_tcod_file_uri'][i] if uri is not None and uri != '?' and uri != '.': contents = urllib2.urlopen(uri).read() encoding = values['_tcod_file_content_encoding'][i] if encoding == '.': encoding = None contents = decode_textfield(contents,encoding) if os.path.dirname(dest_path) != '': folder.get_subfolder(os.path.dirname(dest_path)+os.sep,create=True) with open(folder.get_abs_path(dest_path),'w') as f: f.write(contents) f.flush() md5 = values['_tcod_file_md5sum'][i] if md5 is not None: if md5_file(folder.get_abs_path(dest_path)) != md5: raise ValidationError("MD5 sum for extracted file '{}' is " "different from given in the CIF " "file".format(dest_path)) sha1 = values['_tcod_file_sha1sum'][i] if sha1 is not None: if sha1_file(folder.get_abs_path(dest_path)) != sha1: raise ValidationError("SHA1 sum for extracted file '{}' is " "different from given in the CIF " "file".format(dest_path))