Source code for aiida.backends.testbase

# -*- coding: utf-8 -*-
# Copyright (c), The AiiDA team. All rights reserved.                     #
# This file is part of the AiiDA code.                                    #
#                                                                         #
# The code is hosted on GitHub at #
# For further information on the license, see the LICENSE.txt file        #
# For further information please visit               #
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function

import os
import sys
import unittest
import traceback

from tornado import ioloop

from aiida.backends.tests import get_db_test_list
from aiida.common.exceptions import ConfigurationError, TestsNotAllowedError, InternalError
from aiida.common.lang import classproperty
from aiida.manage import configuration
from aiida.manage.manager import get_manager, reset_manager

[docs]def check_if_tests_can_run(): """Verify that the currently loaded profile is a test profile, otherwise raise `TestsNotAllowedError`.""" profile = configuration.PROFILE if not profile.is_test_profile: raise TestsNotAllowedError('currently loaded profile {} is not a valid test profile'.format(
[docs]class AiidaTestCase(unittest.TestCase): """ This is the base class for AiiDA tests, independent of the backend. Internally it loads the AiidaTestImplementation subclass according to the current backend """ _class_was_setup = False __backend_instance = None backend = None # type: aiida.orm.implementation.Backend
[docs] @classmethod def get_backend_class(cls): from aiida.backends.testimplbase import AiidaTestImplementation from aiida.backends import BACKEND_SQLA, BACKEND_DJANGO from aiida.manage.configuration import PROFILE # Freeze the __impl_class after the first run if not hasattr(cls, '__impl_class'): if PROFILE.database_backend == BACKEND_SQLA: from aiida.backends.sqlalchemy.tests.testbase import ( SqlAlchemyTests) cls.__impl_class = SqlAlchemyTests elif PROFILE.database_backend == BACKEND_DJANGO: from aiida.backends.djsite.db.testbase import DjangoTests cls.__impl_class = DjangoTests else: raise ConfigurationError('Unknown backend type') # Check that it is of the right class if not issubclass(cls.__impl_class, AiidaTestImplementation): raise InternalError( 'The AiiDA test implementation is not of type ' '{}, that is not a subclass of AiidaTestImplementation'.format( cls.__impl_class.__name__ ) ) return cls.__impl_class
[docs] @classmethod def setUpClass(cls, *args, **kwargs): # Note: this will raise an exception, that will be seen as a test # failure. To be safe, you should do the same check also in the tearDownClass # to avoid that it is run check_if_tests_can_run() # Force the loading of the backend which will load the required database environment get_manager().get_backend() cls.__backend_instance = cls.get_backend_class()() cls.__backend_instance.setUpClass_method(*args, **kwargs) cls.backend = cls.__backend_instance.backend cls.insert_data() cls._class_was_setup = True
[docs] def setUp(self): # Install a new IOLoop so that any messing up of the state of the loop is not propagated # to subsequent tests. # This call should come before the backend instance setup call just in case it uses the loop ioloop.IOLoop().make_current() self.__backend_instance.setUp_method()
[docs] def tearDown(self): self.__backend_instance.tearDown_method() # Clean up the loop we created in set up. # Call this after the instance tear down just in case it uses the loop reset_manager() loop = ioloop.IOLoop.current() if not loop._closing: loop.close()
[docs] def reset_database(self): """Reset the database to the default state deleting any content currently stored""" self.clean_db() self.insert_data()
[docs] @classmethod def insert_data(cls): """ This method setups the database (by creating a default user) and inserts default data into the database (which is for the moment a default computer). """ cls.create_user() cls.create_computer()
[docs] @classmethod def create_user(cls): cls.__backend_instance.create_user()
[docs] @classmethod def create_computer(cls): cls.__backend_instance.create_computer()
[docs] @classmethod def clean_db(cls): """Clean up database and reset caches. Resets AiiDA manager cache, which could otherwise be left in an inconsistent state when cleaning the database. """ from aiida.common.exceptions import InvalidOperation # Note: this will raise an exception, that will be seen as a test # failure. To be safe, you should do the same check also in the tearDownClass # to avoid that it is run check_if_tests_can_run() if not cls._class_was_setup: raise InvalidOperation('You cannot call clean_db before running the setUpClass') cls.__backend_instance.clean_db() reset_manager()
[docs] @classmethod def clean_repository(cls): """ Cleans up file repository. """ from aiida.manage.configuration import get_profile from aiida.common.exceptions import InvalidOperation import shutil dirpath_repository = get_profile().repository_path TEST_KEYWORD = 'test_' base_repo_path = os.path.basename(os.path.normpath(dirpath_repository)) if TEST_KEYWORD not in base_repo_path: raise InvalidOperation('Warning: The repository folder {} does not ' 'seem to belong to a test profile and will therefore not be deleted.\n' 'Full repository path: ' '{}'.format(base_repo_path, dirpath_repository)) # Clean the test repository shutil.rmtree(dirpath_repository, ignore_errors=True) os.makedirs(dirpath_repository)
@classproperty def computer(cls): """ Get the default computer for this test :return: the test computer :rtype: :class:`aiida.orm.Computer` """ return cls.__backend_instance.get_computer() @classproperty def user_email(cls): return cls.__backend_instance.get_user_email()
[docs] @classmethod def tearDownClass(cls, *args, **kwargs): # Double check for double security to avoid to run the tearDown # if this is not a test profile check_if_tests_can_run() cls.clean_db() cls.clean_repository() cls.__backend_instance.tearDownClass_method(*args, **kwargs)
[docs] def assertClickSuccess(self, cli_result): self.assertEqual(cli_result.exit_code, 0, cli_result.output) self.assertClickResultNoException(cli_result)
[docs] def assertClickResultNoException(self, cli_result): self.assertIsNone(cli_result.exception, ''.join(traceback.format_exception(*cli_result.exc_info)))
[docs]class AiidaPostgresTestCase(AiidaTestCase):
[docs] @classmethod def setUpClass(cls, *args, **kwargs): """Setup the PGTest postgres test cluster.""" from pgtest.pgtest import PGTest cls.pg_test = PGTest() super(AiidaPostgresTestCase, cls).setUpClass(*args, **kwargs)
[docs] @classmethod def tearDownClass(cls, *args, **kwargs): """Close the PGTest postgres test cluster.""" super(AiidaPostgresTestCase, cls).tearDownClass(*args, **kwargs) cls.pg_test.close()
[docs]def run_aiida_db_tests(tests_to_run, verbose=False): """ Run all tests specified in tests_to_run. Return the list of test results. """ # Empty test suite that will be populated test_suite = unittest.TestSuite() actually_run_tests = [] num_tests_expected = 0 # To avoid adding more than once the same test # (e.g. if you type both db and found_modulenames = set() for test in set(tests_to_run): try: modulenames = get_db_test_list()[test] except KeyError: if verbose: print('Unknown DB test {}... skipping' .format(test), file=sys.stderr) continue actually_run_tests.append(test) for modulename in modulenames: if modulename not in found_modulenames: try: test_suite.addTest(unittest.defaultTestLoader.loadTestsFromName(modulename)) except AttributeError as exception: try: import importlib importlib.import_module(modulename) except ImportError as exception: print("[CRITICAL] The module '{}' has an import error and the tests cannot be run:\n{}" .format(modulename, traceback.format_exc(exception)), file=sys.stderr) sys.exit(1) found_modulenames.add(modulename) num_tests_expected = test_suite.countTestCases() if verbose: print('DB tests that will be run: {} (expecting {} tests)' .format(','.join(actually_run_tests), num_tests_expected), file=sys.stderr) results = unittest.TextTestRunner(failfast=False, verbosity=2).run(test_suite) else: results = unittest.TextTestRunner(failfast=False).run(test_suite) if verbose: print('Run tests: {}'.format(results.testsRun)) return results