Source code for aiida.transport.plugins.local

# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved.                     #
# This file is part of the AiiDA code.                                    #
#                                                                         #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida_core #
# For further information on the license, see the LICENSE.txt file        #
# For further information please visit http://www.aiida.net               #
###########################################################################
###
### GP: a note on the local transport:
### I believe that we must not use os.chdir to keep track of the folder
### we are in, since this may have very nasty side effects in other
### parts of the code and makes things not thread-safe.
### We should instead keep track of the 'current working directory'
### internally, in exactly the same way as paramiko already does.

import errno
import glob
import os
import shutil
import subprocess
import StringIO

import aiida.transport
from aiida.common import aiidalogger
from aiida.transport import FileAttribute



class LocalTransport(aiida.transport.Transport):
    """
    Support copy and command execution on the same host on which AiiDA is
    running via direct file copy and execution commands.

    The transport never calls ``os.chdir``: the working directory is emulated
    through ``self._internal_dir`` and every relative path received by a
    method is joined to :py:attr:`curdir` before being handed to ``os`` or
    ``shutil``.  Absolute paths are left untouched by that join.
    """
    # There are no valid parameters for the local transport
    _valid_auth_params = []

    def __init__(self, **kwargs):
        """
        Create a closed transport.

        :param machine: optional machine name; anything different from
            ``'localhost'`` is only reported in the debug log
        :raise ValueError: if any other keyword argument is passed
        """
        super(LocalTransport, self).__init__()
        # _internal_dir will emulate the concept of working directory.
        # The real current working directory is not to be changed.
        self._is_open = False
        self._internal_dir = None
        # Just to avoid errors
        self._machine = kwargs.pop('machine', None)
        if self._machine and self._machine != 'localhost':
            # TODO: check if we want a different logic
            self.logger.debug('machine was passed, but it is not localhost')
        if kwargs:
            raise ValueError("Input parameters to LocalTransport"
                             " are not recognized")

    def open(self):
        """
        Opens a local transport channel and sets the emulated working
        directory to the home directory of the current user.

        :return: the transport itself
        :raise InvalidOperation: if the channel is already open
        """
        from aiida.common.exceptions import InvalidOperation

        if self._is_open:
            raise InvalidOperation("Cannot open the transport twice")

        self._internal_dir = os.path.expanduser("~")
        self._is_open = True
        return self

    def close(self):
        """
        Closes the local transport channel.

        :raise InvalidOperation: if the channel is already closed
        """
        from aiida.common.exceptions import InvalidOperation

        if not self._is_open:
            raise InvalidOperation("Cannot close the transport: "
                                   "it is already closed")
        self._is_open = False

    def __str__(self):
        """
        Return a description as a string.
        """
        return "local [{}]".format("OPEN" if self._is_open else "CLOSED")

    @property
    def curdir(self):
        """
        Returns the _internal_dir (with symbolic links resolved), if the
        channel is open.
        If possible, use getcwd() instead!

        :raise TransportInternalError: if the channel is not open
        """
        if not self._is_open:
            raise aiida.transport.TransportInternalError(
                "Error, local method called for LocalTransport "
                "without opening the channel first")
        return os.path.realpath(self._internal_dir)

    def chdir(self, path):
        """
        Changes directory to path, emulated internally.

        :param path: path to cd into
        :raise IOError: if path is not a directory or the directory does
            not have read attributes.
        """
        new_path = os.path.join(self.curdir, path)
        # A regular file must never become the working directory: every
        # later os.path.join(self.curdir, ...) would point below a file.
        if not os.path.isdir(new_path):
            raise IOError('Not a directory: {}'.format(new_path))
        if not os.access(new_path, os.R_OK):
            raise IOError('Not having read permissions')
        self._internal_dir = os.path.normpath(new_path)

    def normalize(self, path):
        """
        Normalizes path, eliminating double slashes, etc..

        :param path: path to normalize
        :return: the real absolute path obtained by joining path to the
            emulated working directory
        """
        return os.path.realpath(os.path.join(self.curdir, path))

    def getcwd(self):
        """
        Returns the current working directory, emulated by the transport
        """
        return self.curdir

    @staticmethod
    def _strip_prefix(text, prefix):
        """
        Return text without its leading prefix; text is returned unchanged
        if it does not start with prefix.
        """
        if text.startswith(prefix):
            return text[len(prefix):]
        return text

    def _os_path_split_asunder(self, path):
        """
        Takes path (a str) and returns a list deconcatenating the path
        into its components, root first.
        """
        # TODO : test it on windows (sigh!)
        parts = []
        while True:
            newpath, tail = os.path.split(path)
            if newpath == path:
                # os.path.split reached a fixed point: what is left is
                # either the root or an empty string.
                assert not tail
                if path:
                    parts.append(path)
                break
            parts.append(tail)
            path = newpath
        parts.reverse()
        return parts

    def makedirs(self, path, ignore_existing=False):
        """
        Super-mkdir; create a leaf directory and all intermediate ones.
        Works like mkdir, except that any intermediate path segment (not
        just the rightmost) will be created if it does not exist.

        :param path: directory to create
        :param ignore_existing: if set to true, it doesn't give any error
            if the leaf directory does already exist
        :raise OSError: If the directory already exists and is not
            ignore_existing, or if the leaf exists and is not a directory
        """
        the_path = os.path.normpath(
            os.path.join(self.curdir, os.path.normpath(path)))

        if ignore_existing and os.path.isdir(the_path):
            return

        # os.makedirs raises OSError if the leaf already exists, be it a
        # directory or a regular file.
        os.makedirs(the_path)

    def mkdir(self, path, ignore_existing=False):
        """
        Create a folder (directory) named path.

        :param path: name of the folder to create
        :param ignore_existing: if True, does not give any error if the
            directory already exists
        :raise OSError: If the directory already exists.
        """
        if ignore_existing and self.isdir(path):
            return

        os.mkdir(os.path.join(self.curdir, path))

    def rmdir(self, path):
        """
        Removes a folder at location path.

        :param path: path to remove
        """
        os.rmdir(os.path.join(self.curdir, path))

    def isdir(self, path):
        """
        Checks if 'path' is a directory.

        :return: a boolean; False for an empty path
        """
        if not path:
            return False
        return os.path.isdir(os.path.join(self.curdir, path))

    def chmod(self, path, mode):
        """
        Changes permission bits of object at path

        :param path: path to modify
        :param mode: permission bits
        :raise IOError: if path is empty or does not exist.
        """
        if not path:
            raise IOError("Directory not given in input")
        real_path = os.path.join(self.curdir, path)
        if not os.path.exists(real_path):
            raise IOError("Path {} does not exist".format(real_path))
        os.chmod(real_path, mode)

    def put(self, source, destination, dereference=True, overwrite=True,
            ignore_nonexisting=False):
        """
        Copies a file or a folder from source to destination.
        Automatically redirects to putfile or puttree.

        :param source: absolute path to local file; may contain glob
            patterns
        :param destination: path to remote file
        :param dereference: if True follows symbolic links.
            Default = True
        :param overwrite: if True overwrites destination.
            Default = True
        :param ignore_nonexisting: if True, a non-pattern source that does
            not exist is silently skipped. Default = False
        :raise IOError: if destination is not valid
        :raise ValueError: if source is not valid
        :raise OSError: if source does not exist, or several files must be
            copied onto something that is not an existing directory
        """
        if not destination:
            raise IOError("Input destination to put function "
                          "must be a non empty string")
        if not source:
            raise ValueError("Input source to put function "
                             "must be a non empty string")

        if not os.path.isabs(source):
            raise ValueError("Source must be an absolute path")

        if self.has_magic(source):
            if self.has_magic(destination):
                raise ValueError("Pathname patterns are not allowed in the "
                                 "destination")

            to_copy_list = glob.glob(source)  # using local glob here

            rename_remote = False
            if len(to_copy_list) > 1:
                # I can't scp more than one file on a single file
                if self.isfile(destination):
                    raise OSError("Remote destination is not a directory")
                # I can't scp more than one file in a non existing directory
                elif not self.path_exists(destination):
                    # this should hold only for files
                    raise OSError("Remote directory does not exist")
                else:  # the remote path is a directory
                    rename_remote = True

            for s in to_copy_list:
                if os.path.isfile(s):
                    if rename_remote or self.isdir(destination):
                        # copying into a directory: keep the file name
                        r = os.path.join(destination, os.path.split(s)[1])
                        self.putfile(s, r, overwrite)
                    else:
                        # one file to copy on one file
                        self.putfile(s, destination, overwrite)
                else:
                    self.puttree(s, destination, dereference, overwrite)

        elif os.path.isdir(source):
            self.puttree(source, destination, dereference, overwrite)
        elif os.path.isfile(source):
            if self.isdir(destination):
                r = os.path.join(destination, os.path.split(source)[1])
                self.putfile(source, r, overwrite)
            else:
                self.putfile(source, destination, overwrite)
        elif not ignore_nonexisting:
            raise OSError("The local path {} does not exist"
                          .format(source))

    def putfile(self, source, destination, overwrite=True):
        """
        Copies a single file from source to destination.

        :param source: absolute path to local file
        :param destination: path to remote file
        :param overwrite: if True overwrites destination.
            Default = True
        :raise IOError: if destination is not valid
        :raise ValueError: if source is not valid
        :raise OSError: if source does not exist, or destination exists
            and overwrite is False
        """
        if not destination:
            raise IOError("Input destination to putfile "
                          "must be a non empty string")
        if not source:
            raise ValueError("Input source to putfile "
                             "must be a non empty string")

        if not os.path.isabs(source):
            raise ValueError("Source must be an absolute path")

        if not os.path.exists(source):
            raise OSError("Source does not exists")

        the_destination = os.path.join(self.curdir, destination)
        if os.path.exists(the_destination) and not overwrite:
            raise OSError('Destination already exists: not overwriting it')

        shutil.copyfile(source, the_destination)

    def puttree(self, source, destination, dereference=True, overwrite=True):
        """
        Copies a folder recursively from source to destination.
        If destination is an existing directory, the tree is copied inside
        it, under the name of the source folder.

        :param source: absolute path to local folder
        :param destination: path to remote folder
        :param dereference: follow symbolic links. Default = True
        :param overwrite: if False, raises if destination exists.
            Default = True
        :raise IOError: if destination is not valid
        :raise ValueError: if source is not valid
        :raise OSError: if source does not exist, destination is a file,
            or destination exists and overwrite is False
        """
        if not destination:
            raise IOError("Input destination to puttree "
                          "must be a non empty string")
        if not source:
            raise ValueError("Input source to puttree "
                             "must be a non empty string")

        if not os.path.isabs(source):
            raise ValueError("Source must be an absolute path")

        if not os.path.exists(source):
            raise OSError("Source does not exists")

        if self.path_exists(destination) and not overwrite:
            raise OSError("Can't overwrite existing files")
        if self.isfile(destination):
            raise OSError("Cannot copy a directory into a file")

        if self.isdir(destination):
            destination = os.path.join(destination, os.path.split(source)[1])

        the_destination = os.path.join(self.curdir, destination)
        shutil.copytree(source, the_destination, symlinks=not dereference)

    def rmtree(self, path):
        """
        Remove tree as rm -r would do; a path that is not a directory is
        removed as a single file.

        :param path: a string to path
        """
        the_path = os.path.join(self.curdir, path)
        try:
            shutil.rmtree(the_path)
        except OSError as e:
            if e.errno == errno.ENOTDIR:
                os.remove(the_path)
            else:
                raise

    def get(self, source, destination, dereference=True, overwrite=True,
            ignore_nonexisting=False):
        """
        Copies a folder or a file recursively from 'remote' source to
        'local' destination.
        Automatically redirects to getfile or gettree.

        :param source: path to the 'remote' file or folder; may contain
            glob patterns
        :param destination: absolute path to the 'local' file or folder
        :param dereference: follow symbolic links
            default = True
        :param overwrite: if True overwrites destination
            default = True
        :param ignore_nonexisting: if True, a non-pattern source that does
            not exist is silently skipped. Default = False
        :raise IOError: if 'remote' source is not valid
        :raise ValueError: if 'local' destination is not valid
        """
        if not destination:
            raise ValueError("Input destination to get function "
                             "must be a non empty string")
        if not source:
            raise IOError("Input source to get function "
                          "must be a non empty string")

        if not os.path.isabs(destination):
            raise ValueError("Destination must be an absolute path")

        if self.has_magic(source):
            if self.has_magic(destination):
                raise ValueError("Pathname patterns are not allowed in the "
                                 "destination")
            to_copy_list = self.glob(source)

            rename_local = False
            if len(to_copy_list) > 1:
                # I can't scp more than one file on a single file
                if os.path.isfile(destination):
                    raise IOError("Remote destination is not a directory")
                # I can't scp more than one file in a non existing directory
                elif not os.path.exists(destination):
                    # this should hold only for files
                    raise OSError("Remote directory does not exist")
                else:  # the destination path is a directory
                    rename_local = True

            for s in to_copy_list:
                if self.isfile(s):
                    # A directory destination is handled here as in put()
                    # and as in the non-pattern branch below.
                    if rename_local or os.path.isdir(destination):
                        r = os.path.join(destination, os.path.split(s)[1])
                        self.getfile(s, r, overwrite)
                    else:
                        # one file to copy on one file
                        self.getfile(s, destination, overwrite)
                else:
                    self.gettree(s, destination, dereference, overwrite)

        elif self.isdir(source):
            self.gettree(source, destination, dereference, overwrite)
        elif self.isfile(source):
            if os.path.isdir(destination):
                r = os.path.join(destination, os.path.split(source)[1])
                self.getfile(source, r, overwrite)
            else:
                self.getfile(source, destination, overwrite)
        elif not ignore_nonexisting:
            # It is the source that is missing, so report the source.
            raise IOError("The remote path {} does not exist"
                          .format(source))

    def getfile(self, source, destination, overwrite=True):
        """
        Copies a single file from 'remote' source to 'local' destination.

        :param source: path to the 'remote' file
        :param destination: absolute path to the 'local' file
        :param overwrite: if True overwrites destination.
            Default = True
        :raise IOError: if 'remote' source is not valid or not found
        :raise ValueError: if 'local' destination is not valid
        :raise OSError: if unintentionally overwriting
        """
        if not destination:
            raise ValueError("Input destination to get function "
                             "must be a non empty string")
        if not source:
            raise IOError("Input source to get function "
                          "must be a non empty string")

        the_source = os.path.join(self.curdir, source)
        if not os.path.exists(the_source):
            raise IOError("Source not found")

        if not os.path.isabs(destination):
            raise ValueError("Destination must be an absolute path")

        if os.path.exists(destination) and not overwrite:
            raise OSError('Destination already exists: not overwriting it')

        shutil.copyfile(the_source, destination)

    def gettree(self, source, destination, dereference=True, overwrite=True):
        """
        Copies a folder recursively from 'remote' source to
        'local' destination.
        If destination is an existing directory, the tree is copied inside
        it, under the name of the source folder.

        :param source: path to the 'remote' folder
        :param destination: absolute path to the 'local' folder
        :param dereference: follow symbolic links. Default = True
        :param overwrite: if False, raises if destination exists.
            Default = True
        :raise IOError: if 'remote' source is not valid
        :raise ValueError: if 'local' destination is not valid
        :raise OSError: if unintentionally overwriting
        """
        if not source:
            raise IOError("Remotepath must be a non empty string")
        if not destination:
            raise ValueError("Localpaths must be a non empty string")

        if not os.path.isabs(destination):
            raise ValueError("Localpaths must be an absolute path")

        if not self.isdir(source):
            raise IOError("Input remotepath is not a folder: {}".format(source))

        if os.path.exists(destination) and not overwrite:
            raise OSError("Can't overwrite existing files")
        if os.path.isfile(destination):
            raise OSError("Cannot copy a directory into a file")

        if os.path.isdir(destination):
            destination = os.path.join(destination, os.path.split(source)[1])

        the_source = os.path.join(self.curdir, source)
        shutil.copytree(the_source, destination, symlinks=not dereference)

    def copy(self, source, destination, dereference=False):
        """
        Copies a file or a folder from 'remote' source to 'remote'
        destination.
        Files are copied with shutil.copy, folders through copytree.

        :param source: path to the source file or folder; may contain glob
            patterns
        :param destination: path to the destination file or folder
        :param dereference: follow symbolic links. Default = False
        :raise ValueError: if 'remote' source or destination is not valid
        :raise OSError: if source does not exist
        """
        if not source:
            raise ValueError("Input source to copy "
                             "must be a non empty object")
        if not destination:
            raise ValueError("Input destination to copy "
                             "must be a non empty object")

        if self.has_magic(source):
            if self.has_magic(destination):
                raise ValueError("Pathname patterns are not allowed in the "
                                 "destination")
            to_copy_list = self.glob(source)

            if len(to_copy_list) > 1:
                if not self.path_exists(destination) or self.isfile(destination):
                    raise OSError("Can't copy more than one file in the same "
                                  "destination file")
        else:
            if not os.path.exists(os.path.join(self.curdir, source)):
                raise OSError("Source not found")
            if self.normalize(source) == self.normalize(destination):
                raise ValueError("Cannot copy from itself to itself")
            to_copy_list = [source]

        the_destination = os.path.join(self.curdir, destination)

        for s in to_copy_list:
            if self.isfile(s):
                # With shutil, use the full path.
                # If s is an absolute path, the join returns s itself.
                shutil.copy(os.path.join(self.curdir, s), the_destination)
            else:
                # With self.copytree, the (possible) relative path is OK
                self.copytree(s, destination, dereference)

    def copyfile(self, source, destination):
        """
        Copies a file from 'remote' source to
        'remote' destination.

        :param source: path to the source file
        :param destination: path to the destination file
        :raise ValueError: if 'remote' source or destination is not valid
        :raise OSError: if source does not exist
        """
        if not source:
            raise ValueError("Input source to copyfile "
                             "must be a non empty object")
        if not destination:
            raise ValueError("Input destination to copyfile "
                             "must be a non empty object")

        the_source = os.path.join(self.curdir, source)
        the_destination = os.path.join(self.curdir, destination)
        if not os.path.exists(the_source):
            raise OSError("Source not found")

        shutil.copyfile(the_source, the_destination)

    def copytree(self, source, destination, dereference=False):
        """
        Copies a folder from 'remote' source to
        'remote' destination.
        If destination is an existing directory, the tree is copied inside
        it, under the name of the source folder.

        :param source: path to the source folder
        :param destination: path to the destination folder
        :param dereference: follow symbolic links. Default = False
        :raise ValueError: if 'remote' source or destination is not valid
        :raise OSError: if source does not exist
        """
        if not source:
            raise ValueError("Input source to copytree "
                             "must be a non empty object")
        if not destination:
            raise ValueError("Input destination to copytree "
                             "must be a non empty object")

        the_source = os.path.join(self.curdir, source)
        the_destination = os.path.join(self.curdir, destination)
        if not os.path.exists(the_source):
            raise OSError("Source not found")

        # Using the Ubuntu default behavior (different from Mac)
        if self.isdir(destination):
            the_destination = os.path.join(the_destination,
                                           os.path.split(source)[1])

        shutil.copytree(the_source, the_destination, symlinks=not dereference)

    def get_attribute(self, path):
        """
        Returns an object FileAttribute,
        as specified in aiida.transport.

        :param path: the path of the given file.
        """
        # lstat: a symbolic link is described itself, it is not followed
        os_attr = os.lstat(os.path.join(self.curdir, path))
        aiida_attr = FileAttribute()
        # copy from the stat result only the fields known to FileAttribute
        for key in aiida_attr._valid_fields:
            aiida_attr[key] = getattr(os_attr, key)
        return aiida_attr

    def _local_listdir(self, path, pattern=None):
        """
        Acts on the local folder (relative paths are resolved against the
        real working directory of the process, not the emulated one); for
        the rest, same as listdir.
        """
        if not pattern:
            return os.listdir(path)

        if path.startswith('/'):  # always this is the case in the local plugin
            base_dir = path
        else:
            base_dir = os.path.join(os.getcwd(), path)

        filtered_list = glob.glob(os.path.join(base_dir, pattern))
        if not base_dir.endswith(os.sep):
            base_dir += os.sep
        # Strip the directory literally: it must not be used as a regular
        # expression, since it may contain characters special to 're'.
        return [self._strip_prefix(i, base_dir) for i in filtered_list]

    def listdir(self, path='.', pattern=None):
        """
        :return: a list containing the names of the entries in the
            directory.
        :param path: default ='.'
        :param pattern: if set, returns the list of files matching pattern.
            Unix only. (Use to emulate ls * for example)
        :raise IOError: if the directory cannot be listed
        """
        the_path = os.path.join(self.curdir, path).strip()

        if not pattern:
            try:
                return os.listdir(the_path)
            except OSError as e:
                # callers expect IOError; keep the original errno
                exc = IOError(str(e))
                exc.errno = e.errno
                raise exc

        filtered_list = glob.glob(os.path.join(the_path, pattern))
        if not the_path.endswith('/'):
            the_path += '/'
        # Strip the directory literally: it must not be used as a regular
        # expression, since it may contain characters special to 're'.
        return [self._strip_prefix(i, the_path) for i in filtered_list]

    def remove(self, path):
        """
        Removes a file at position path.
        """
        os.remove(os.path.join(self.curdir, path))

    def isfile(self, path):
        """
        Checks if object at path is a file.
        Returns a boolean; False for an empty path.
        """
        if not path:
            return False
        return os.path.isfile(os.path.join(self.curdir, path))

    def _exec_command_internal(self, command):
        """
        Executes the specified command through the shell, using as working
        directory of the new process the one returned by self.getcwd().
        Does not wait for the calculation to finish.

        For a higher-level exec_command that automatically waits for the
        job to finish, use exec_command_wait.
        Otherwise, to end the process, use the proc.wait() method.

        :param command: the command to execute

        :return: a tuple with (stdin, stdout, stderr, proc),
            where stdin, stdout and stderr behave as file-like objects,
            proc is the process object as returned by the
            subprocess.Popen() class.
        """
        proc = subprocess.Popen(command, shell=True,
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                cwd=self.getcwd())
        return proc.stdin, proc.stdout, proc.stderr, proc

    def exec_command_wait(self, command, stdin=None):
        """
        Executes the specified command and waits for it to finish.

        :param command: the command to execute
        :param stdin: optional data for the standard input of the command,
            either a string or a file-like object providing readlines()
        :raise ValueError: if stdin is neither a string nor a file-like
            object

        :return: a tuple with (return_value, stdout, stderr) where stdout
            and stderr are strings.
        """
        # Validate and collect the input BEFORE spawning the process, so
        # that no process is left behind when stdin is invalid.
        if stdin is None:
            input_data = None
        elif isinstance(stdin, basestring):
            input_data = stdin
        else:
            try:
                input_data = ''.join(stdin.readlines())
            except AttributeError:
                raise ValueError("stdin can only be either a string of a "
                                 "file-like object!")

        _, _, _, local_proc = self._exec_command_internal(command)

        # communicate() feeds stdin while draining stdout and stderr, so
        # large inputs or outputs cannot fill a pipe and block.
        output_text, stderr_text = local_proc.communicate(input_data)

        retval = local_proc.returncode

        return retval, output_text, stderr_text

    def gotocomputer_command(self, remotedir):
        """
        Return a string to be run using os.system in order to connect
        via the transport to the remote directory.

        Expected behaviors:

        * A new bash session is opened

        * A reasonable error message is produced if the folder does not
          exist

        :param str remotedir: the full path of the remote directory
        """
        return """bash -c "if [ -d {escaped_remotedir} ] ; then cd {escaped_remotedir} ; bash ; else echo ' ** The directory' ; echo ' ** {remotedir}' ; echo ' ** seems to have been deleted, I logout...' ; fi" """.format(
            escaped_remotedir="'{}'".format(remotedir), remotedir=remotedir)

    def rename(self, src, dst):
        """
        Rename a file or folder from src to dst.
        Relative paths are resolved against the emulated working
        directory, as in all the other methods of the transport.
        If dst is an existing directory, src is moved inside it (this is
        the behavior of shutil.move).

        :param str src: existing name of the file or folder
        :param str dst: new name for the file or folder

        :raises IOError: if src, or the folder that should contain dst,
            is not found
        :raises ValueError: if src/dst is not a valid string
        """
        if not src:
            raise ValueError("Source {} is not a valid string".format(src))
        if not dst:
            raise ValueError("Destination {} is not a valid string".format(dst))

        the_src = os.path.join(self.curdir, src)
        the_dst = os.path.join(self.curdir, dst)

        if not os.path.exists(the_src):
            raise IOError("Source {} does not exist".format(src))
        # The new name itself does not have to exist yet: only the folder
        # that will contain it is required.
        if not os.path.isdir(os.path.dirname(os.path.normpath(the_dst))):
            raise IOError("Destination {} does not exist".format(dst))

        shutil.move(the_src, the_dst)

    def path_exists(self, path):
        """
        Check if path exists
        """
        return os.path.exists(os.path.join(self.curdir, path))