# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida_core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
"""
This module contains a set of unittest test classes that can be loaded from
the plugin.
Every transport plugin should be able to pass all of these common tests.
Plugin specific tests will be written in the plugin itself.
"""
# TODO : test for copy with pattern
# TODO : test for copy with/without patterns, overwriting folder
# TODO : test for exotic cases of copy with source = destination
# TODO : silly cases of copy/put/get from self to self
import unittest
[docs]def get_all_custom_transports():
"""
Autodiscover all custom transports defined in the variable
plugin_transpors inside each test_* file in this folder.
Therefore, do not move this function out of this file.
:return: a dictionary of objects as defined in the various plugin_transport
variables of the different files (the key is the module in which
it was found)
"""
import importlib
import os
modulename = __name__.rpartition('.')[0]
this_full_fname = __file__
thisdir, thisfname = os.path.split(this_full_fname)
test_modules = [os.path.split(f)[1][:-3]
for f in os.listdir(thisdir)
if f.endswith('.py') and f.startswith('test_')]
# Remove this module: note that I should be careful because __file__, from
# the second time on, is the pyc file rather than the py file
thisbasename = os.path.splitext(thisfname)[0]
try:
test_modules.remove(thisbasename)
except IndexError:
print "Warning, this module ({}) was not found!".format(thisbasename)
all_custom_transports = {}
for m in test_modules:
module = importlib.import_module(".".join([modulename, m]))
custom_transport = module.__dict__.get('plugin_transport', None)
if custom_transport is None:
print ("Define the plugin_transport variable inside the {} module!"
"".format(m))
else:
all_custom_transports[m] = custom_transport
return all_custom_transports
[docs]def run_for_all_plugins(actual_test_method):
"""
Decorator method that actually run the methods with an additional
parameter (custom_transport), once for every custom_transport defined
in the test_* files [except this one].
"""
class CollectiveException(Exception):
pass
all_custom_transports = get_all_custom_transports()
def test_all_plugins(self):
"""
The wrapper function that calls the subfunction for each transport.
"""
exceptions = []
for tr_name, custom_transport in all_custom_transports.iteritems():
try:
actual_test_method(self, custom_transport)
except Exception as e:
import traceback
exceptions.append((e, traceback.format_exc(), tr_name))
if exceptions:
if all(isinstance(exc[0], AssertionError) for exc in exceptions):
exception_to_raise = AssertionError
else:
exception_to_raise = CollectiveException
messages = ["*** At least one test for a subplugin failed. "
"See below ***", ""]
for exc in exceptions:
if hasattr(exc[0], "message"):
messages.append("*** [For plugin {}]: Exception '{}': {}"
"".format(exc[2], type(exc[0]).__name__,
exc[0].message))
messages.append(exc[1])
else:
messages.append("*** [For plugin {}]: Exception '{}'".format(
exc[2], type(exc[0]).__name__))
messages.append(exc[1])
raise exception_to_raise("\n".join(messages))
return test_all_plugins
# class TestDebugTests(unittest.TestCase):
# """
# This is only meant to check if the tests for each plugin are performed.
# """
# @run_for_all_plugins
# def test_check_custom_transport(self, custom_transport):
# print custom_transport, type(custom_transport)
[docs]class TestDirectoryManipulation(unittest.TestCase):
"""
Tests to check, create and delete folders.
"""
[docs] @run_for_all_plugins
def test_makedirs(self, custom_transport):
"""
Verify the functioning of makedirs command
"""
# Imports required later
import random
import string
import os
with custom_transport as t:
location = t.normalize(os.path.join('/', 'tmp'))
directory = 'temp_dir_test'
t.chdir(location)
self.assertEquals(location, t.getcwd())
while t.isdir(directory):
# I append a random letter/number until it is unique
directory += random.choice(
string.ascii_uppercase + string.digits)
t.mkdir(directory)
t.chdir(directory)
# define folder structure
dir_tree = os.path.join('1', '2')
# I create the tree
t.makedirs(dir_tree)
# verify the existence
self.assertTrue(t.isdir('1'))
self.assertTrue(dir_tree)
# try to recreate the same folder
with self.assertRaises(OSError):
t.makedirs(dir_tree)
# recreate but with ignore flag
t.makedirs(dir_tree, True)
t.rmdir(dir_tree)
t.rmdir('1')
t.chdir('..')
t.rmdir(directory)
[docs] @run_for_all_plugins
def test_rmtree(self, custom_transport):
"""
Verify the functioning of rmtree command
"""
# Imports required later
import random
import string
import os
with custom_transport as t:
location = t.normalize(os.path.join('/', 'tmp'))
directory = 'temp_dir_test'
t.chdir(location)
self.assertEquals(location, t.getcwd())
while t.isdir(directory):
# I append a random letter/number until it is unique
directory += random.choice(
string.ascii_uppercase + string.digits)
t.mkdir(directory)
t.chdir(directory)
# define folder structure
dir_tree = os.path.join('1', '2')
# I create the tree
t.makedirs(dir_tree)
# remove it
t.rmtree('1')
# verify the removal
self.assertFalse(t.isdir('1'))
# also tests that it works with a single file
# create file
local_file_name = 'file.txt'
text = 'Viva Verdi\n'
with open(os.path.join(t.getcwd(), local_file_name), 'w') as f:
f.write(text)
# remove it
t.rmtree(local_file_name)
# verify the removal
self.assertFalse(t.isfile(local_file_name))
t.chdir('..')
t.rmdir(directory)
[docs] @run_for_all_plugins
def test_listdir(self, custom_transport):
"""
create directories, verify listdir, delete a folder with subfolders
"""
# Imports required later
import tempfile
import random
import string
import os
with custom_transport as t:
# We cannot use tempfile.mkdtemp because we're on a remote folder
location = t.normalize(os.path.join('/', 'tmp'))
directory = 'temp_dir_test'
t.chdir(location)
self.assertEquals(location, t.getcwd())
while t.isdir(directory):
# I append a random letter/number until it is unique
directory += random.choice(
string.ascii_uppercase + string.digits)
t.mkdir(directory)
t.chdir(directory)
list_of_dir = ['1', '-f a&', 'as', 'a2', 'a4f']
list_of_files = ['a', 'b']
for this_dir in list_of_dir:
t.mkdir(this_dir)
for fname in list_of_files:
with tempfile.NamedTemporaryFile() as f:
# Just put an empty file there at the right file name
t.putfile(f.name, fname)
list_found = t.listdir('.')
self.assertTrue(sorted(list_found) == sorted(list_of_dir + list_of_files))
self.assertTrue(sorted(t.listdir('.', 'a*')), sorted(['as', 'a2', 'a4f']))
self.assertTrue(sorted(t.listdir('.', 'a?')), sorted(['as', 'a2']))
self.assertTrue(sorted(t.listdir('.', 'a[2-4]*')), sorted(['a2', 'a4f']))
for this_dir in list_of_dir:
t.rmdir(this_dir)
for this_file in list_of_files:
t.remove(this_file)
t.chdir('..')
t.rmdir(directory)
[docs] @run_for_all_plugins
def test_listdir_withattributes(self, custom_transport):
"""
create directories, verify listdir_withattributes, delete a folder with subfolders
"""
# Imports required later
import tempfile
import random
import string
import os
def simplify_attributes(data):
"""
Take data from listdir_withattributes and return a dictionary
{fname: isdir}
:param data: the output of listdir_withattributes
:return: dictionary: the key is a filename, the value is True if it's a directory, False otherwise
"""
return {_['name']: _['isdir'] for _ in data}
with custom_transport as t:
# We cannot use tempfile.mkdtemp because we're on a remote folder
location = t.normalize(os.path.join('/', 'tmp'))
directory = 'temp_dir_test'
t.chdir(location)
self.assertEquals(location, t.getcwd())
while t.isdir(directory):
# I append a random letter/number until it is unique
directory += random.choice(
string.ascii_uppercase + string.digits)
t.mkdir(directory)
t.chdir(directory)
list_of_dir = ['1', '-f a&', 'as', 'a2', 'a4f']
list_of_files = ['a', 'b']
for this_dir in list_of_dir:
t.mkdir(this_dir)
for fname in list_of_files:
with tempfile.NamedTemporaryFile() as f:
# Just put an empty file there at the right file name
t.putfile(f.name, fname)
comparison_list = {k: True for k in list_of_dir}
for k in list_of_files:
comparison_list[k] = False
self.assertTrue(simplify_attributes(t.listdir_withattributes('.')),
comparison_list)
self.assertTrue(simplify_attributes(t.listdir_withattributes('.', 'a*')),
{'as': True, 'a2': True, 'a4f': True, 'a': False})
self.assertTrue(simplify_attributes(t.listdir_withattributes('.', 'a?')),
{'as': True, 'a2': True})
self.assertTrue(simplify_attributes(t.listdir_withattributes('.', 'a[2-4]*')),
{'a2': True, 'a4f': True})
for this_dir in list_of_dir:
t.rmdir(this_dir)
for this_file in list_of_files:
t.remove(this_file)
t.chdir('..')
t.rmdir(directory)
[docs] @run_for_all_plugins
def test_dir_creation_deletion(self, custom_transport):
# Imports required later
import random
import string
import os
with custom_transport as t:
location = t.normalize(os.path.join('/', 'tmp'))
directory = 'temp_dir_test'
t.chdir(location)
self.assertEquals(location, t.getcwd())
while t.isdir(directory):
# I append a random letter/number until it is unique
directory += random.choice(
string.ascii_uppercase + string.digits)
t.mkdir(directory)
with self.assertRaises(OSError):
# I create twice the same directory
t.mkdir(directory)
t.isdir(directory)
self.assertFalse(t.isfile(directory))
t.rmdir(directory)
[docs] @run_for_all_plugins
def test_dir_copy(self, custom_transport):
"""
Verify if in the copy of a directory also the protection bits
are carried over
"""
# Imports required later
import random
import string
import os
with custom_transport as t:
location = t.normalize(os.path.join('/', 'tmp'))
directory = 'temp_dir_test'
t.chdir(location)
while t.isdir(directory):
# I append a random letter/number until it is unique
directory += random.choice(
string.ascii_uppercase + string.digits)
t.mkdir(directory)
dest_directory = directory + '_copy'
t.copy(directory, dest_directory)
with self.assertRaises(ValueError):
t.copy(directory, '')
with self.assertRaises(ValueError):
t.copy('', directory)
t.rmdir(directory)
t.rmdir(dest_directory)
[docs] @run_for_all_plugins
def test_dir_permissions_creation_modification(self, custom_transport):
"""
verify if chmod raises IOError when trying to change bits on a
non-existing folder
"""
# Imports required later
import random
import string
import os
with custom_transport as t:
location = t.normalize(os.path.join('/', 'tmp'))
directory = 'temp_dir_test'
t.chdir(location)
while t.isdir(directory):
# I append a random letter/number until it is unique
directory += random.choice(
string.ascii_uppercase + string.digits)
# create directory with non default permissions
t.mkdir(directory)
# change permissions
t.chmod(directory, 0777)
# test if the security bits have changed
self.assertEquals(t.get_mode(directory), 0777)
# change permissions
t.chmod(directory, 0511)
# test if the security bits have changed
self.assertEquals(t.get_mode(directory), 0511)
# TODO : bug in paramiko. When changing the directory to very low \
# I cannot set it back to higher permissions
## TODO: probably here we should then check for
## the new directory modes. To see if we want a higher
## level function to ask for the mode, or we just
## use get_attribute
t.chdir(directory)
# change permissions of an empty string, non existing folder.
fake_dir = ''
with self.assertRaises(IOError):
t.chmod(fake_dir, 0777)
fake_dir = 'pippo'
with self.assertRaises(IOError):
# chmod to a non existing folder
t.chmod(fake_dir, 0777)
t.chdir('..')
t.rmdir(directory)
[docs] @run_for_all_plugins
def test_dir_reading_permissions(self, custom_transport):
"""
Try to enter a directory with no read permissions.
Verify that the cwd has not changed after failed try.
"""
# Imports required later
import random
import string
import os
with custom_transport as t:
location = t.normalize(os.path.join('/', 'tmp'))
directory = 'temp_dir_test'
t.chdir(location)
while t.isdir(directory):
# I append a random letter/number until it is unique
directory += random.choice(
string.ascii_uppercase + string.digits)
# create directory with non default permissions
t.mkdir(directory)
# change permissions to low ones
t.chmod(directory, 0)
# test if the security bits have changed
self.assertEquals(t.get_mode(directory), 0)
old_cwd = t.getcwd()
with self.assertRaises(IOError):
t.chdir(directory)
new_cwd = t.getcwd()
self.assertEquals(old_cwd, new_cwd)
# TODO : the test leaves a directory even if it is successful
# The bug is in paramiko. After lowering the permissions,
# I cannot restore them to higher values
#t.rmdir(directory)
[docs] @run_for_all_plugins
def test_isfile_isdir_to_empty_string(self, custom_transport):
"""
I check that isdir or isfile return False when executed on an
empty string
"""
import os
with custom_transport as t:
location = t.normalize(os.path.join('/', 'tmp'))
t.chdir(location)
self.assertFalse(t.isdir(""))
self.assertFalse(t.isfile(""))
[docs] @run_for_all_plugins
def test_isfile_isdir_to_non_existing_string(self, custom_transport):
"""
I check that isdir or isfile return False when executed on an
empty string
"""
import os
with custom_transport as t:
location = t.normalize(os.path.join('/', 'tmp'))
t.chdir(location)
fake_folder = 'pippo'
self.assertFalse(t.isfile(fake_folder))
self.assertFalse(t.isdir(fake_folder))
with self.assertRaises(IOError):
t.chdir(fake_folder)
[docs] @run_for_all_plugins
def test_chdir_to_empty_string(self, custom_transport):
"""
I check that if I pass an empty string to chdir, the cwd does
not change (this is a paramiko default behavior), but getcwd()
is still correctly defined.
"""
import os
with custom_transport as t:
new_dir = t.normalize(os.path.join('/', 'tmp'))
t.chdir(new_dir)
t.chdir("")
self.assertEquals(new_dir, t.getcwd())
[docs]class TestPutGetFile(unittest.TestCase):
"""
Test to verify whether the put and get functions behave correctly on files.
1) they work
2) they need abs paths where necessary, i.e. for local paths
3) they reject empty strings
"""
[docs] @run_for_all_plugins
def test_put_and_get(self, custom_transport):
import os
import random
import string
local_dir = os.path.join('/', 'tmp')
remote_dir = local_dir
directory = 'tmp_try'
with custom_transport as t:
t.chdir(remote_dir)
while t.isdir(directory):
# I append a random letter/number until it is unique
directory += random.choice(
string.ascii_uppercase + string.digits)
t.mkdir(directory)
t.chdir(directory)
local_file_name = os.path.join(local_dir, directory, 'file.txt')
remote_file_name = 'file_remote.txt'
retrieved_file_name = os.path.join(local_dir, directory, 'file_retrieved.txt')
text = 'Viva Verdi\n'
with open(local_file_name, 'w') as f:
f.write(text)
# here use full path in src and dst
t.put(local_file_name, remote_file_name)
t.get(remote_file_name, retrieved_file_name)
t.putfile(local_file_name, remote_file_name)
t.getfile(remote_file_name, retrieved_file_name)
list_of_files = t.listdir('.')
# it is False because local_file_name has the full path,
# while list_of_files has not
self.assertFalse(local_file_name in list_of_files)
self.assertTrue(remote_file_name in list_of_files)
self.assertFalse(retrieved_file_name in list_of_files)
os.remove(local_file_name)
t.remove(remote_file_name)
os.remove(retrieved_file_name)
t.chdir('..')
t.rmdir(directory)
[docs] @run_for_all_plugins
def test_put_get_abs_path(self, custom_transport):
"""
test of exception for non existing files and abs path
"""
import os
import random
import string
local_dir = os.path.join('/', 'tmp')
remote_dir = local_dir
directory = 'tmp_try'
with custom_transport as t:
t.chdir(remote_dir)
while t.isdir(directory):
# I append a random letter/number until it is unique
directory += random.choice(
string.ascii_uppercase + string.digits)
t.mkdir(directory)
t.chdir(directory)
partial_file_name = 'file.txt'
local_file_name = os.path.join(local_dir, directory, 'file.txt')
remote_file_name = 'file_remote.txt'
retrieved_file_name = os.path.join(local_dir, directory, 'file_retrieved.txt')
f = open(local_file_name, 'w')
f.close()
# partial_file_name is not an abs path
with self.assertRaises(ValueError):
t.put(partial_file_name, remote_file_name)
with self.assertRaises(ValueError):
t.putfile(partial_file_name, remote_file_name)
# retrieved_file_name does not exist
with self.assertRaises(OSError):
t.put(retrieved_file_name, remote_file_name)
with self.assertRaises(OSError):
t.putfile(retrieved_file_name, remote_file_name)
# remote_file_name does not exist
with self.assertRaises(IOError):
t.get(remote_file_name, retrieved_file_name)
with self.assertRaises(IOError):
t.getfile(remote_file_name, retrieved_file_name)
t.put(local_file_name, remote_file_name)
t.putfile(local_file_name, remote_file_name)
# local filename is not an abs path
with self.assertRaises(ValueError):
t.get(remote_file_name, 'delete_me.txt')
with self.assertRaises(ValueError):
t.getfile(remote_file_name, 'delete_me.txt')
t.remove(remote_file_name)
os.remove(local_file_name)
t.chdir('..')
t.rmdir(directory)
[docs] @run_for_all_plugins
def test_put_get_empty_string(self, custom_transport):
"""
test of exception put/get of empty strings
"""
# TODO : verify the correctness of \n at the end of a file
import os
import random
import string
local_dir = os.path.join('/', 'tmp')
remote_dir = local_dir
directory = 'tmp_try'
with custom_transport as t:
t.chdir(remote_dir)
while t.isdir(directory):
# I append a random letter/number until it is unique
directory += random.choice(
string.ascii_uppercase + string.digits)
t.mkdir(directory)
t.chdir(directory)
local_file_name = os.path.join(local_dir, directory, 'file_local.txt')
remote_file_name = 'file_remote.txt'
retrieved_file_name = os.path.join(local_dir, directory, 'file_retrieved.txt')
text = 'Viva Verdi\n'
with open(local_file_name, 'w') as f:
f.write(text)
# localpath is an empty string
# ValueError because it is not an abs path
with self.assertRaises(ValueError):
t.put('', remote_file_name)
with self.assertRaises(ValueError):
t.putfile('', remote_file_name)
# remote path is an empty string
with self.assertRaises(IOError):
t.put(local_file_name, '')
with self.assertRaises(IOError):
t.putfile(local_file_name, '')
t.put(local_file_name, remote_file_name)
# overwrite the remote_file_name
t.putfile(local_file_name, remote_file_name)
# remote path is an empty string
with self.assertRaises(IOError):
t.get('', retrieved_file_name)
with self.assertRaises(IOError):
t.getfile('', retrieved_file_name)
# local path is an empty string
# ValueError because it is not an abs path
with self.assertRaises(ValueError):
t.get(remote_file_name, '')
with self.assertRaises(ValueError):
t.getfile(remote_file_name, '')
# TODO : get doesn't retrieve empty files.
# Is it what we want?
t.get(remote_file_name, retrieved_file_name)
# overwrite retrieved_file_name
t.getfile(remote_file_name, retrieved_file_name)
os.remove(local_file_name)
t.remove(remote_file_name)
# If it couldn't end the copy, it leaves what he did on
# local file
self.assertTrue('file_retrieved.txt' in t.listdir('.'))
os.remove(retrieved_file_name)
t.chdir('..')
t.rmdir(directory)
[docs]class TestPutGetTree(unittest.TestCase):
"""
Test to verify whether the put and get functions behave correctly on folders.
1) they work
2) they need abs paths where necessary, i.e. for local paths
3) they reject empty strings
"""
[docs] @run_for_all_plugins
def test_put_and_get(self, custom_transport):
import os
import random
import string
local_dir = os.path.join('/', 'tmp')
remote_dir = local_dir
directory = 'tmp_try'
with custom_transport as t:
t.chdir(remote_dir)
while os.path.exists(os.path.join(local_dir, directory)):
# I append a random letter/number until it is unique
directory += random.choice(
string.ascii_uppercase + string.digits)
local_subfolder = os.path.join(local_dir, directory, 'tmp1')
remote_subfolder = 'tmp2'
retrieved_subfolder = os.path.join(local_dir, directory, 'tmp3')
os.mkdir(os.path.join(local_dir, directory))
os.mkdir(os.path.join(local_dir, directory, local_subfolder))
t.chdir(directory)
local_file_name = os.path.join(local_subfolder, 'file.txt')
text = 'Viva Verdi\n'
with open(local_file_name, 'w') as f:
f.write(text)
# here use full path in src and dst
for i in range(2):
if i == 0:
t.put(local_subfolder, remote_subfolder)
t.get(remote_subfolder, retrieved_subfolder)
else:
t.puttree(local_subfolder, remote_subfolder)
t.gettree(remote_subfolder, retrieved_subfolder)
# Here I am mixing the local with the remote fold
list_of_dirs = t.listdir('.')
# # it is False because local_file_name has the full path,
# # while list_of_files has not
self.assertFalse(local_subfolder in list_of_dirs)
self.assertTrue(remote_subfolder in list_of_dirs)
self.assertFalse(retrieved_subfolder in list_of_dirs)
self.assertTrue('tmp1' in list_of_dirs)
self.assertTrue('tmp3' in list_of_dirs)
list_pushed_file = t.listdir('tmp2')
list_retrieved_file = t.listdir('tmp3')
self.assertTrue('file.txt' in list_pushed_file)
self.assertTrue('file.txt' in list_retrieved_file)
import shutil
shutil.rmtree(local_subfolder)
shutil.rmtree(retrieved_subfolder)
t.rmtree(remote_subfolder)
t.chdir('..')
t.rmdir(directory)
[docs] @run_for_all_plugins
def test_put_and_get_overwrite(self, custom_transport):
import os, shutil
import random
import string
local_dir = os.path.join('/', 'tmp')
remote_dir = local_dir
directory = 'tmp_try'
with custom_transport as t:
t.chdir(remote_dir)
while os.path.exists(os.path.join(local_dir, directory)):
# I append a random letter/number until it is unique
directory += random.choice(
string.ascii_uppercase + string.digits)
local_subfolder = os.path.join(local_dir, directory, 'tmp1')
remote_subfolder = 'tmp2'
retrieved_subfolder = os.path.join(local_dir, directory, 'tmp3')
os.mkdir(os.path.join(local_dir, directory))
os.mkdir(os.path.join(local_dir, directory, local_subfolder))
t.chdir(directory)
local_file_name = os.path.join(local_subfolder, 'file.txt')
text = 'Viva Verdi\n'
with open(local_file_name, 'w') as f:
f.write(text)
t.put(local_subfolder, remote_subfolder)
t.get(remote_subfolder, retrieved_subfolder)
# by defaults rewrite everything
t.put(local_subfolder, remote_subfolder)
t.get(remote_subfolder, retrieved_subfolder)
with self.assertRaises(OSError):
t.put(local_subfolder, remote_subfolder, overwrite=False)
with self.assertRaises(OSError):
t.get(remote_subfolder, retrieved_subfolder, overwrite=False)
with self.assertRaises(OSError):
t.puttree(local_subfolder, remote_subfolder, overwrite=False)
with self.assertRaises(OSError):
t.gettree(remote_subfolder, retrieved_subfolder, overwrite=False)
shutil.rmtree(local_subfolder)
shutil.rmtree(retrieved_subfolder)
t.rmtree(remote_subfolder)
# t.rmtree(remote_subfolder)
# here I am mixing inevitably the local and the remote folder
t.chdir('..')
t.rmtree(directory)
[docs] @run_for_all_plugins
def test_copy(self, custom_transport):
import os
import random
import string
local_dir = os.path.join('/', 'tmp')
remote_dir = local_dir
directory = 'tmp_try'
with custom_transport as t:
t.chdir(remote_dir)
while os.path.exists(os.path.join(local_dir, directory)):
# I append a random letter/number until it is unique
directory += random.choice(
string.ascii_uppercase + string.digits)
t.mkdir(directory)
t.chdir(directory)
local_base_dir = os.path.join(local_dir, directory, 'local')
os.mkdir(local_base_dir)
# first test put: I create three files in local
file_1 = os.path.join(local_base_dir, 'a.txt')
file_2 = os.path.join(local_base_dir, 'b.tmp')
file_3 = os.path.join(local_base_dir, 'c.txt')
text = 'Viva Verdi\n'
for filename in [file_1, file_2, file_3]:
with open(filename, 'w') as f:
f.write(text)
# first test the copy. Copy of two files matching patterns, into a folder
t.copy(os.path.join('local', '*.txt'), '.')
self.assertEquals(set(['a.txt', 'c.txt', 'local']),
set(t.listdir('.')))
t.remove('a.txt')
t.remove('c.txt')
# second test copy. Copy of two folders
t.copy('local', 'prova')
self.assertEquals(set(['prova', 'local']),
set(t.listdir('.')))
self.assertEquals(set(['a.txt', 'b.tmp', 'c.txt']),
set(t.listdir('prova')))
t.rmtree('prova')
# third test copy. Can copy one file into a new file
t.copy(os.path.join('local', '*.tmp'), 'prova')
self.assertEquals(set(['prova', 'local']),
set(t.listdir('.')))
t.remove('prova')
# fourth test copy: can't copy more than one file on the same file,
# i.e., the destination should be a folder
with self.assertRaises(OSError):
t.copy(os.path.join('local', '*.txt'), 'prova')
# fifth test, copying one file into a folder
t.mkdir('prova')
t.copy(os.path.join('local', 'a.txt'), 'prova')
self.assertEquals(set(t.listdir('prova')),
set(['a.txt']))
t.rmtree('prova')
# sixth test, copying one file into a file
t.copy(os.path.join('local', 'a.txt'), 'prova')
self.assertTrue(t.isfile('prova'))
t.remove('prova')
# copy of folder into an existing folder
#NOTE: the command cp has a different behavior on Mac vs Ubuntu
#tests performed locally on a Mac may result in a failure.
t.mkdir('prova')
t.copy('local', 'prova')
self.assertEquals(set(['local']),
set(t.listdir('prova')))
self.assertEquals(set(['a.txt', 'b.tmp', 'c.txt']),
set(t.listdir(os.path.join('prova', 'local'))))
t.rmtree('prova')
# exit
t.chdir('..')
t.rmtree(directory)
[docs] @run_for_all_plugins
def test_put(self, custom_transport):
# exactly the same tests of copy, just with the put function
# and therefore the local path must be absolute
import os
import random
import string
local_dir = os.path.join('/', 'tmp')
remote_dir = local_dir
directory = 'tmp_try'
with custom_transport as t:
t.chdir(remote_dir)
while os.path.exists(os.path.join(local_dir, directory)):
# I append a random letter/number until it is unique
directory += random.choice(
string.ascii_uppercase + string.digits)
t.mkdir(directory)
t.chdir(directory)
local_base_dir = os.path.join(local_dir, directory, 'local')
os.mkdir(local_base_dir)
# first test put: I create three files in local
file_1 = os.path.join(local_base_dir, 'a.txt')
file_2 = os.path.join(local_base_dir, 'b.tmp')
file_3 = os.path.join(local_base_dir, 'c.txt')
text = 'Viva Verdi\n'
for filename in [file_1, file_2, file_3]:
with open(filename, 'w') as f:
f.write(text)
# first test put. Copy of two files matching patterns, into a folder
t.put(os.path.join(local_base_dir, '*.txt'), '.')
self.assertEquals(set(['a.txt', 'c.txt', 'local']),
set(t.listdir('.')))
t.remove('a.txt')
t.remove('c.txt')
# second. Copy of folder into a non existing folder
t.put(local_base_dir, 'prova')
self.assertEquals(set(['prova', 'local']),
set(t.listdir('.')))
self.assertEquals(set(['a.txt', 'b.tmp', 'c.txt']),
set(t.listdir('prova')))
t.rmtree('prova')
# third. copy of folder into an existing folder
t.mkdir('prova')
t.put(local_base_dir, 'prova')
self.assertEquals(set(['prova', 'local']),
set(t.listdir('.')))
self.assertEquals(set(['local']),
set(t.listdir('prova')))
self.assertEquals(set(['a.txt', 'b.tmp', 'c.txt']),
set(t.listdir(os.path.join('prova', 'local'))))
t.rmtree('prova')
# third test copy. Can copy one file into a new file
t.put(os.path.join(local_base_dir, '*.tmp'), 'prova')
self.assertEquals(set(['prova', 'local']),
set(t.listdir('.')))
t.remove('prova')
# fourth test copy: can't copy more than one file on the same file,
# i.e., the destination should be a folder
with self.assertRaises(OSError):
t.put(os.path.join(local_base_dir, '*.txt'), 'prova')
# copy of folder into file
with open(os.path.join(local_dir, directory, 'existing.txt'), 'w') as f:
f.write(text)
with self.assertRaises(OSError):
t.put(os.path.join(local_base_dir), 'existing.txt')
t.remove('existing.txt')
# fifth test, copying one file into a folder
t.mkdir('prova')
t.put(os.path.join(local_base_dir, 'a.txt'), 'prova')
self.assertEquals(set(t.listdir('prova')),
set(['a.txt']))
t.rmtree('prova')
# sixth test, copying one file into a file
t.put(os.path.join(local_base_dir, 'a.txt'), 'prova')
self.assertTrue(t.isfile('prova'))
t.remove('prova')
# exit
t.chdir('..')
t.rmtree(directory)
[docs] @run_for_all_plugins
def test_get(self, custom_transport):
# exactly the same tests of copy, just with the put function
# and therefore the local path must be absolute
import os
import random
import string, shutil
local_dir = os.path.join('/', 'tmp')
remote_dir = local_dir
directory = 'tmp_try'
with custom_transport as t:
t.chdir(remote_dir)
while os.path.exists(os.path.join(local_dir, directory)):
# I append a random letter/number until it is unique
directory += random.choice(
string.ascii_uppercase + string.digits)
t.mkdir(directory)
t.chdir(directory)
local_base_dir = os.path.join(local_dir, directory, 'local')
local_destination = os.path.join(local_dir, directory)
os.mkdir(local_base_dir)
# first test put: I create three files in local
file_1 = os.path.join(local_base_dir, 'a.txt')
file_2 = os.path.join(local_base_dir, 'b.tmp')
file_3 = os.path.join(local_base_dir, 'c.txt')
text = 'Viva Verdi\n'
for filename in [file_1, file_2, file_3]:
with open(filename, 'w') as f:
f.write(text)
# first test put. Copy of two files matching patterns, into a folder
t.get(os.path.join('local', '*.txt'), local_destination)
self.assertEquals(set(['a.txt', 'c.txt', 'local']),
set(os.listdir(local_destination)))
os.remove(os.path.join(local_destination, 'a.txt'))
os.remove(os.path.join(local_destination, 'c.txt'))
# second. Copy of folder into a non existing folder
t.get('local', os.path.join(local_destination, 'prova'))
self.assertEquals(set(['prova', 'local']),
set(os.listdir(local_destination)))
self.assertEquals(set(['a.txt', 'b.tmp', 'c.txt']),
set(os.listdir(os.path.join(local_destination, 'prova'))))
shutil.rmtree(os.path.join(local_destination, 'prova'))
# third. copy of folder into an existing folder
os.mkdir(os.path.join(local_destination, 'prova'))
t.get('local', os.path.join(local_destination, 'prova'))
self.assertEquals(set(['prova', 'local']),
set(os.listdir(local_destination)))
self.assertEquals(set(['local']),
set(os.listdir(os.path.join(local_destination, 'prova'))))
self.assertEquals(set(['a.txt', 'b.tmp', 'c.txt']),
set(os.listdir(os.path.join(local_destination, 'prova', 'local'))))
shutil.rmtree(os.path.join(local_destination, 'prova'))
# third test copy. Can copy one file into a new file
t.get(os.path.join('local', '*.tmp'), os.path.join(local_destination, 'prova'))
self.assertEquals(set(['prova', 'local']),
set(os.listdir(local_destination)))
os.remove(os.path.join(local_destination, 'prova'))
# fourth test copy: can't copy more than one file on the same file,
# i.e., the destination should be a folder
with self.assertRaises(OSError):
t.get(os.path.join('local', '*.txt'), os.path.join(local_destination, 'prova'))
# copy of folder into file
with open(os.path.join(local_destination, 'existing.txt'), 'w') as f:
f.write(text)
with self.assertRaises(OSError):
t.get('local', os.path.join(local_destination, 'existing.txt'))
os.remove(os.path.join(local_destination, 'existing.txt'))
# fifth test, copying one file into a folder
os.mkdir(os.path.join(local_destination, 'prova'))
t.get(os.path.join('local', 'a.txt'), os.path.join(local_destination, 'prova'))
self.assertEquals(set(os.listdir(os.path.join(local_destination, 'prova'))),
set(['a.txt']))
shutil.rmtree(os.path.join(local_destination, 'prova'))
# sixth test, copying one file into a file
t.get(os.path.join('local', 'a.txt'), os.path.join(local_destination, 'prova'))
self.assertTrue(os.path.isfile(os.path.join(local_destination, 'prova')))
os.remove(os.path.join(local_destination, 'prova'))
# exit
t.chdir('..')
t.rmtree(directory)
[docs] @run_for_all_plugins
def test_put_get_abs_path(self, custom_transport):
"""
test of exception for non existing files and abs path
"""
import os
import random
import string
local_dir = os.path.join('/', 'tmp')
remote_dir = local_dir
directory = 'tmp_try'
with custom_transport as t:
t.chdir(remote_dir)
while os.path.exists(os.path.join(local_dir, directory)):
# I append a random letter/number until it is unique
directory += random.choice(
string.ascii_uppercase + string.digits)
local_subfolder = os.path.join(local_dir, directory, 'tmp1')
remote_subfolder = 'tmp2'
retrieved_subfolder = os.path.join(local_dir, directory, 'tmp3')
os.mkdir(os.path.join(local_dir, directory))
os.mkdir(os.path.join(local_dir, directory, local_subfolder))
t.chdir(directory)
local_file_name = os.path.join(local_subfolder, 'file.txt')
f = open(local_file_name, 'w')
f.close()
# 'tmp1' is not an abs path
with self.assertRaises(ValueError):
t.put('tmp1', remote_subfolder)
with self.assertRaises(ValueError):
t.putfile('tmp1', remote_subfolder)
with self.assertRaises(ValueError):
t.puttree('tmp1', remote_subfolder)
# 'tmp3' does not exist
with self.assertRaises(OSError):
t.put(retrieved_subfolder, remote_subfolder)
with self.assertRaises(OSError):
t.putfile(retrieved_subfolder, remote_subfolder)
with self.assertRaises(OSError):
t.puttree(retrieved_subfolder, remote_subfolder)
# remote_file_name does not exist
with self.assertRaises(IOError):
t.get('non_existing', retrieved_subfolder)
with self.assertRaises(IOError):
t.getfile('non_existing', retrieved_subfolder)
with self.assertRaises(IOError):
t.gettree('non_existing', retrieved_subfolder)
t.put(local_subfolder, remote_subfolder)
# local filename is not an abs path
with self.assertRaises(ValueError):
t.get(remote_subfolder, 'delete_me_tree')
with self.assertRaises(ValueError):
t.getfile(remote_subfolder, 'delete_me_tree')
with self.assertRaises(ValueError):
t.gettree(remote_subfolder, 'delete_me_tree')
os.remove(os.path.join(local_subfolder, 'file.txt'))
os.rmdir(local_subfolder)
t.rmtree(remote_subfolder)
t.chdir('..')
t.rmdir(directory)
[docs] @run_for_all_plugins
def test_put_get_empty_string(self, custom_transport):
"""
test of exception put/get of empty strings
"""
# TODO : verify the correctness of \n at the end of a file
import os
import random
import string
local_dir = os.path.join('/', 'tmp')
remote_dir = local_dir
directory = 'tmp_try'
with custom_transport as t:
t.chdir(remote_dir)
while os.path.exists(os.path.join(local_dir, directory)):
# I append a random letter/number until it is unique
directory += random.choice(
string.ascii_uppercase + string.digits)
local_subfolder = os.path.join(local_dir, directory, 'tmp1')
remote_subfolder = 'tmp2'
retrieved_subfolder = os.path.join(local_dir, directory, 'tmp3')
os.mkdir(os.path.join(local_dir, directory))
os.mkdir(os.path.join(local_dir, directory, local_subfolder))
t.chdir(directory)
local_file_name = os.path.join(local_subfolder, 'file.txt')
text = 'Viva Verdi\n'
with open(local_file_name, 'w') as f:
f.write(text)
# localpath is an empty string
# ValueError because it is not an abs path
with self.assertRaises(ValueError):
t.puttree('', remote_subfolder)
# remote path is an empty string
with self.assertRaises(IOError):
t.puttree(local_subfolder, '')
t.puttree(local_subfolder, remote_subfolder)
# remote path is an empty string
with self.assertRaises(IOError):
t.gettree('', retrieved_subfolder)
# local path is an empty string
# ValueError because it is not an abs path
with self.assertRaises(ValueError):
t.gettree(remote_subfolder, '')
# TODO : get doesn't retrieve empty files.
# Is it what we want?
t.gettree(remote_subfolder, retrieved_subfolder)
os.remove(os.path.join(local_subfolder, 'file.txt'))
os.rmdir(local_subfolder)
t.remove(os.path.join(remote_subfolder, 'file.txt'))
t.rmdir(remote_subfolder)
# If it couldn't end the copy, it leaves what he did on local file
# here I am mixing local with remote
self.assertTrue('file.txt' in t.listdir('tmp3'))
os.remove(os.path.join(retrieved_subfolder, 'file.txt'))
os.rmdir(retrieved_subfolder)
t.chdir('..')
t.rmdir(directory)
[docs]class TestExecuteCommandWait(unittest.TestCase):
"""
Test some simple command executions and stdin/stdout management.
It also checks for escaping of the folder names.
"""
[docs] @run_for_all_plugins
def test_exec_pwd(self, custom_transport):
"""
I create a strange subfolder with a complicated name and
then see if I can run pwd. This also checks the correct
escaping of funny characters, both in the directory
creation (which should be done by paramiko) and in the command
execution (done in this module, in the _exec_command_internal function).
"""
import os
# Start value
delete_at_end = False
with custom_transport as t:
# To compare with: getcwd uses the normalized ('realpath') path
location = t.normalize('/tmp')
subfolder = """_'s f"#""" # A folder with characters to escape
subfolder_fullpath = os.path.join(location, subfolder)
t.chdir(location)
if not t.isdir(subfolder):
# Since I created the folder, I will remember to
# delete it at the end of this test
delete_at_end = True
t.mkdir(subfolder)
self.assertTrue(t.isdir(subfolder))
t.chdir(subfolder)
self.assertEquals(subfolder_fullpath, t.getcwd())
retcode, stdout, stderr = t.exec_command_wait('pwd')
self.assertEquals(retcode, 0)
# I have to strip it because 'pwd' returns a trailing \n
self.assertEquals(stdout.strip(), subfolder_fullpath)
self.assertEquals(stderr, "")
if delete_at_end:
t.chdir(location)
t.rmdir(subfolder)
[docs] @run_for_all_plugins
def test_exec_with_stdin_string(self, custom_transport):
test_string = str("some_test String")
with custom_transport as t:
retcode, stdout, stderr = t.exec_command_wait(
'cat', stdin=test_string)
self.assertEquals(retcode, 0)
self.assertEquals(stdout, test_string)
self.assertEquals(stderr, "")
[docs] @run_for_all_plugins
def test_exec_with_stdin_unicode(self, custom_transport):
test_string = u"some_test String"
with custom_transport as t:
retcode, stdout, stderr = t.exec_command_wait(
'cat', stdin=test_string)
self.assertEquals(retcode, 0)
self.assertEquals(stdout, test_string)
self.assertEquals(stderr, "")
[docs] @run_for_all_plugins
def test_exec_with_stdin_filelike(self, custom_transport):
import StringIO
test_string = "some_test String"
stdin = StringIO.StringIO(test_string)
with custom_transport as t:
retcode, stdout, stderr = t.exec_command_wait(
'cat', stdin=stdin)
self.assertEquals(retcode, 0)
self.assertEquals(stdout, test_string)
self.assertEquals(stderr, "")
[docs] @run_for_all_plugins
def test_exec_with_wrong_stdin(self, custom_transport):
# I pass a number
with custom_transport as t:
with self.assertRaises(ValueError):
_ = t.exec_command_wait(
'cat', stdin=1)