Source code for aiida.tools.importexport.dbexport.zip

# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved.                     #
# This file is part of the AiiDA code.                                    #
#                                                                         #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file        #
# For further information please visit http://www.aiida.net               #
###########################################################################
"""Export a zip-file."""
# pylint: disable=missing-docstring,redefined-builtin
import io
import os
import zipfile


class MyWritingZipFile:
    """Minimal write-only, file-like wrapper around a single member of a zip archive.

    Text passed to :meth:`write` is accumulated in an in-memory ``io.StringIO``
    buffer; the member is only added to the archive, through ``writestr``, when
    :meth:`close` is called (or the ``with`` block is left).
    """

    def __init__(self, zip_file, fname):
        """
        :param zip_file: the archive object to write into; it must provide a
            ``writestr(name, data)`` method (e.g. a ``zipfile.ZipFile``).
        :param fname: the member name under which the content will be stored.
        """
        self._zipfile = zip_file
        self._fname = fname
        # ``None`` means "not open"; an ``io.StringIO`` instance means "open".
        self._buffer = None

    def open(self):
        """Start buffering content.

        :raises IOError: if the writer is already open.
        """
        if self._buffer is not None:
            raise IOError('Cannot open again!')
        self._buffer = io.StringIO()

    def write(self, data):
        """Append ``data`` (a string) to the in-memory buffer.

        :raises IOError: if the writer has not been opened (or was already closed).
        """
        if self._buffer is None:
            raise IOError('Cannot write: the file is not open')
        self._buffer.write(data)

    def close(self):
        """Store the buffered content in the archive and release the buffer.

        Calling ``close`` on a writer that is not open is a no-op, so that an
        explicit ``close()`` inside a ``with`` block does not make ``__exit__`` fail.
        """
        if self._buffer is None:
            return
        content = self._buffer.getvalue()
        # Drop the buffer first so that the writer is left closed even if
        # ``writestr`` raises.
        self._buffer = None
        self._zipfile.writestr(self._fname, content)

    def __enter__(self):
        """Open the writer and return it."""
        self.open()
        return self

    def __exit__(self, type, value, traceback):
        """Close the writer; exceptions raised in the ``with`` body are not suppressed."""
        self.close()
class ZipFolder:
    """Folder-like view on a zip archive, rooted at a "current working directory".

    To improve: if zipfile is closed, do something
    (e.g. add explicit open method, rename open to openfile,
    set _zipfile to None, ...)
    """

    def __init__(self, zipfolder_or_fname, mode=None, subfolder='.', use_compression=True, allowZip64=True):
        """
        :param zipfolder_or_fname: either another ZipFolder instance,
          of which you want to get a subfolder, or a filename
          (a string or an ``os.PathLike`` object) to create.
        :param mode: the file mode; see the zipfile.ZipFile docs for valid
          strings. Defaults to ``'r'`` when a filename is given.
          Note: can be specified only if zipfolder_or_fname is a filename
          (the filename to generate)
        :param subfolder: the subfolder that specified the "current working
          directory" in the zip file. If zipfolder_or_fname is a ZipFolder,
          subfolder is a relative path from zipfolder_or_fname.subfolder
        :param use_compression: either True, to compress files in the Zip, or
          False if you just want to pack them together without compressing.
          It is ignored if zipfolder_or_fname is a ZipFolder instance.
        :param allowZip64: forwarded to ``zipfile.ZipFile``. It is ignored if
          zipfolder_or_fname is a ZipFolder instance.
        :raises ValueError: if ``mode`` is given together with a ZipFolder instance.
        """
        if isinstance(zipfolder_or_fname, (str, os.PathLike)):
            compression = zipfile.ZIP_DEFLATED if use_compression else zipfile.ZIP_STORED
            self._zipfile = zipfile.ZipFile(
                zipfolder_or_fname,
                mode='r' if mode is None else mode,
                compression=compression,
                allowZip64=allowZip64,
            )
            self._pwd = subfolder
        else:
            if mode is not None:
                raise ValueError("Cannot specify 'mode' when passing a ZipFolder")
            # Share the underlying archive with the parent folder.
            self._zipfile = zipfolder_or_fname._zipfile  # pylint: disable=protected-access
            self._pwd = os.path.join(zipfolder_or_fname.pwd, subfolder)

    def __enter__(self):
        """Return the folder itself; the archive is already open."""
        return self

    def __exit__(self, type, value, traceback):
        """Close the underlying archive when leaving the ``with`` block."""
        self.close()

    def close(self):
        """Close the underlying zip archive (shared with folders derived from this one)."""
        self._zipfile.close()

    @property
    def pwd(self):
        """The "current working directory" of this folder inside the archive."""
        return self._pwd

    def open(self, fname, mode='r'):
        """Open a member of the archive, addressed relative to :attr:`pwd`.

        :param fname: path of the member, relative to the current folder.
        :param mode: ``'w'`` returns a :class:`MyWritingZipFile` buffering text
            until it is closed; any other value is forwarded to ``ZipFile.open``.
        :return: a file-like object.
        """
        internal_path = self._get_internal_path(fname)
        if mode == 'w':
            return MyWritingZipFile(zip_file=self._zipfile, fname=internal_path)
        return self._zipfile.open(internal_path, mode)

    def _get_internal_path(self, filename):
        """Return the normalised path obtained by joining :attr:`pwd` and ``filename``."""
        return os.path.normpath(os.path.join(self.pwd, filename))

    @staticmethod
    def _archive_name(path, is_dir=False):
        """Return ``path`` spelled the way ``ZipFile.write`` stores member names.

        ``ZipFile.write`` strips the drive and leading separators, normalises the
        path, uses ``/`` as separator and appends ``/`` to directories. Existence
        checks have to use this same spelling to find members already written.
        """
        name = os.path.normpath(os.path.splitdrive(path)[1])
        name = name.lstrip(os.sep + (os.altsep or ''))
        name = name.replace(os.sep, '/')
        return name + '/' if is_dir else name

    def _write_if_missing(self, real_src, real_dest):
        """Add ``real_src`` to the archive as ``real_dest`` unless that member is already present."""
        name = self._archive_name(real_dest, is_dir=os.path.isdir(real_src))
        if not self.exists(name):
            self._zipfile.write(real_src, name)

    # pylint: disable=unused-argument
    def get_subfolder(self, subfolder, create=False, reset_limit=False):
        """Return a ZipFolder for ``subfolder`` (relative to this one) sharing the same archive.

        :param subfolder: path of the subfolder, relative to :attr:`pwd`.
        :param create: ignored, for the time being.
        :param reset_limit: ignored.
        """
        return ZipFolder(self, subfolder=subfolder)

    def exists(self, path):
        """Check whether path already exists in the ZipFolder

        :param path: the exact member name as stored in the archive; it is NOT
            resolved relative to :attr:`pwd`.
        :return: True if the archive has a member with that name, False otherwise.
        """
        try:
            self._zipfile.getinfo(path)
        except KeyError:
            return False
        return True

    def insert_path(self, src, dest_name=None, overwrite=True):
        """Add a file or a whole directory tree from disk to the archive.

        :param src: absolute path of the file or directory to add.
        :param dest_name: name of the destination relative to :attr:`pwd`;
            defaults to the basename of ``src``.
        :param overwrite: if False, raise when the destination name is already
            a member of the archive.
        :raises ValueError: if ``src`` is not an absolute path.
        :raises IOError: if ``overwrite`` is False and the destination exists.

        When ``src`` is a directory, members already present in the archive are
        left untouched, so inserting the same tree twice adds no duplicates.
        """
        base_name = os.path.basename(src) if dest_name is None else dest_name
        base_filename = self._get_internal_path(str(base_name))

        src = str(src)
        if not os.path.isabs(src):
            raise ValueError('src must be an absolute path in insert_file')

        if not overwrite and self.exists(self._archive_name(base_filename)):
            raise IOError('destination already exists: {}'.format(base_filename))

        if not os.path.isdir(src):
            self._zipfile.write(src, base_filename)
            return

        for dirpath, dirnames, filenames in os.walk(src):
            dest_dir = os.path.join(base_filename, os.path.relpath(dirpath, src))
            if not dirnames and not filenames:
                # Empty directory: store an explicit directory entry for it.
                self._write_if_missing(dirpath, dest_dir)
                continue
            for fname in dirnames + filenames:
                self._write_if_missing(os.path.join(dirpath, fname), os.path.join(dest_dir, fname))