# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
"""Data plugin that models a folder on a remote computer."""
import errno
import os

from aiida.orm import AuthInfo

from ..data import Data
__all__ = ('RemoteData',)
class RemoteData(Data):
    """
    Store a link to a file or folder on a remote machine.

    Remember to pass a computer!
    """

    # Key of the extra that `_clean` sets to ``True`` after a successful clean.
    KEY_EXTRA_CLEANED = 'cleaned'

    def __init__(self, remote_path=None, **kwargs):
        """Construct the node, optionally setting the remote path right away.

        :param remote_path: if not ``None``, stored through :meth:`set_remote_path`.
        :param kwargs: forwarded unchanged to the parent constructor.
        """
        super().__init__(**kwargs)
        if remote_path is not None:
            self.set_remote_path(remote_path)

    def get_remote_path(self):
        """Return the value of the ``remote_path`` attribute of this node."""
        return self.get_attribute('remote_path')

    def set_remote_path(self, val):
        """Store ``val`` as the ``remote_path`` attribute of this node.

        :param val: the path on the remote computer.
        """
        self.set_attribute('remote_path', val)

    @property
    def is_empty(self):
        """
        Check if remote folder is empty.

        :return: ``True`` if changing into the remote path raises an ``IOError`` or if the directory listing is
            empty, ``False`` otherwise.
        """
        authinfo = self.get_authinfo()
        transport = authinfo.get_transport()

        with transport:
            try:
                transport.chdir(self.get_remote_path())
            except IOError:
                # An IOError from the transport is taken to mean that the directory no longer exists
                # (it was deleted), which is reported as "empty".
                return True

            return not transport.listdir()

    def getfile(self, relpath, destpath):
        """
        Connects to the remote folder and retrieves the content of a file.

        :param relpath: The relative path of the file on the remote to retrieve.
        :param destpath: The absolute path of where to store the file on the local machine.
        :raises IOError: with a descriptive message if the transport reports that the file does not exist
            (``errno.ENOENT``); any other ``IOError`` of the transport is re-raised unchanged.
        """
        authinfo = self.get_authinfo()

        with authinfo.get_transport() as transport:
            # Built before the ``try`` so that the name is always bound inside the exception handler.
            full_path = os.path.join(self.get_remote_path(), relpath)
            try:
                transport.getfile(full_path, destpath)
            except IOError as exception:
                if exception.errno == errno.ENOENT:  # file does not exist
                    raise IOError(
                        'The required remote file {} on {} does not exist or has been deleted.'.format(
                            full_path,
                            self.computer.label  # pylint: disable=no-member
                        )
                    ) from exception
                raise

    def listdir(self, relpath='.'):
        """
        Connects to the remote folder and lists the directory content.

        :param relpath: If 'relpath' is specified, lists the content of the given subfolder.
        :return: a flat list of file/directory names (as strings).
        :raises IOError: with a descriptive message (and the original ``errno``) if the folder does not exist or
            is not a directory; any other ``IOError`` of the transport is re-raised unchanged.
        """
        return self._list_remote_folder(relpath, with_attributes=False)

    def listdir_withattributes(self, path='.'):
        """
        Connects to the remote folder and lists the directory content.

        :param path: If 'path' is specified, lists the content of the given subfolder.
        :return: a list of dictionaries, where the documentation is in :py:class:Transport.listdir_withattributes.
        :raises IOError: with a descriptive message (and the original ``errno``) if the folder does not exist or
            is not a directory; any other ``IOError`` of the transport is re-raised unchanged.
        """
        return self._list_remote_folder(path, with_attributes=True)

    def _list_remote_folder(self, relpath, with_attributes):
        """Change into a subfolder of the remote path and return its listing.

        Shared implementation of :meth:`listdir` and :meth:`listdir_withattributes`.

        :param relpath: path that is joined onto the remote path of this node.
        :param with_attributes: if true, return the result of ``transport.listdir_withattributes()``, otherwise
            the result of ``transport.listdir()``.
        :return: whatever the selected transport listing method returns.
        :raises IOError: with a descriptive message (and the original ``errno``) if the transport reports
            ``errno.ENOENT`` or ``errno.ENOTDIR``, either while changing directory or while listing.
        """
        authinfo = self.get_authinfo()

        with authinfo.get_transport() as transport:
            full_path = os.path.join(self.get_remote_path(), relpath)
            try:
                transport.chdir(full_path)
                if with_attributes:
                    return transport.listdir_withattributes()
                return transport.listdir()
            except IOError as exception:
                # directory not existing or not a directory
                if exception.errno in (errno.ENOENT, errno.ENOTDIR):
                    exc = IOError(
                        'The required remote folder {} on {} does not exist, is not a directory or has been deleted.'.
                        format(full_path, self.computer.label)  # pylint: disable=no-member
                    )
                    # Keep the original error number available to callers that inspect it.
                    exc.errno = exception.errno
                    raise exc from exception
                raise

    def _clean(self, transport=None):
        """Remove all content of the remote folder on the remote computer.

        When the cleaning operation is successful, the extra with the key ``RemoteData.KEY_EXTRA_CLEANED`` is set.

        :param transport: Provide an optional transport that is already open. If not provided, a transport will be
            automatically opened, based on the current default user and the computer of this data node. Passing in the
            transport can be used for efficiency if a great number of nodes need to be cleaned for the same computer.
            Note that the user should take care that the correct transport is passed.
        :raises ValueError: If the hostname of the provided transport does not match that of the node's computer.
        """
        from aiida.orm.utils.remote import clean_remote

        remote_dir = self.get_remote_path()

        if transport is None:
            # No transport given: open one for the duration of the clean only.
            with self.get_authinfo().get_transport() as opened_transport:
                clean_remote(opened_transport, remote_dir)
        else:
            if transport.hostname != self.computer.hostname:
                raise ValueError(
                    f'Transport hostname `{transport.hostname}` does not equal `{self.computer.hostname}` of {self}.'
                )
            clean_remote(transport, remote_dir)

        self.set_extra(self.KEY_EXTRA_CLEANED, True)

    def _validate(self):
        """Validate the node on top of the checks of the parent class.

        :raises aiida.common.exceptions.ValidationError: if reading the remote path raises an ``AttributeError``
            or if the computer of the node is ``None``.
        """
        from aiida.common.exceptions import ValidationError

        super()._validate()

        try:
            self.get_remote_path()
        except AttributeError as exception:
            raise ValidationError("attribute 'remote_path' not set.") from exception

        computer = self.computer
        if computer is None:
            raise ValidationError('Remote computer not set.')

    def get_authinfo(self):
        """Return the ``AuthInfo`` looked up for the computer and the user of this node."""
        return AuthInfo.objects(self.backend).get(dbcomputer=self.computer, aiidauser=self.user)