Source code for aiida.schedulers.plugins.test_lsf

# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved.                     #
# This file is part of the AiiDA code.                                    #
#                                                                         #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida_core #
# For further information on the license, see the LICENSE.txt file        #
# For further information please visit http://www.aiida.net               #
###########################################################################

from __future__ import division
from __future__ import print_function
from __future__ import absolute_import
import unittest
import logging
import uuid

from aiida.schedulers.plugins.lsf import *

BJOBS_STDOUT_TO_TEST = "764213236|EXIT|TERM_RUNLIMIT: job killed after reaching LSF run time limit" \
                       "|b681e480bd|inewton|1|-|b681e480bd|test|Feb  2 00:46|Feb  2 00:45|-|Feb  2 00:44|aiida-1033269\n" \
                       "764220165|PEND|-|-|inewton|-|-|-|8nm|-|-|-|Feb  2 01:46|aiida-1033444\n" \
                       "764220167|PEND|-|-|fchopin|-|-|-|test|-|-|-|Feb  2 01:53 L|aiida-1033449\n" \
                       "764254593|RUN|-|lxbsu2710|inewton|1|-|lxbsu2710|test|Feb  2 07:40|Feb  2 07:39|-|Feb  2 07:39|test\n" \
                       "764255172|RUN|-|b68ac74822|inewton|1|-|b68ac74822|test|Feb  2 07:48 L|Feb  2 07:47|15.00% L|Feb  2 07:47|test\n" \
                       "764245175|RUN|-|b68ac74822|dbowie|1|-|b68ac74822|test|Jan  1 05:07|Dec  31 23:48 L|25.00%|Dec  31 23:40|test\n" \
                       "764399747|DONE|-|p05496706j68144|inewton|1|-|p05496706j68144|test|Feb  2 14:56 L|Feb  2 14:54|38.33% L|Feb  2 14:54|test"
BJOBS_STDERR_TO_TEST = "Job <864220165> is not found"

SUBMIT_STDOUT_TO_TEST = "Job <764254593> is submitted to queue <test>."
BKILL_STDOUT_TO_TEST = "Job <764254593> is being terminated"


class TestParserBjobs(unittest.TestCase):
    """
    Tests verifying that ``_parse_joblist_output`` behaves correctly.

    The tests parse the ``bjobs`` output hard-coded in ``BJOBS_STDOUT_TO_TEST``,
    so they can be run offline.
    """

    @staticmethod
    def _jobs_in_state(job_list, state):
        """
        Return, in their original order, the jobs whose ``job_state`` is set
        and equal to ``state``.
        """
        return [job for job in job_list if job.job_state and job.job_state == state]

    def test_parse_common_joblist_output(self):
        """
        Test whether _parse_joblist_output can parse the bjobs output
        """
        import datetime

        scheduler = LsfScheduler()

        # Disable logging to avoid excessive output during the test. The
        # cleanup is registered right away so that logging is re-enabled even
        # if one of the assertions below fails.
        logging.disable(logging.ERROR)
        self.addCleanup(logging.disable, logging.NOTSET)

        # Return value 255 together with a non-empty stderr must raise
        with self.assertRaises(SchedulerError):
            scheduler._parse_joblist_output(255, BJOBS_STDOUT_TO_TEST, BJOBS_STDERR_TO_TEST)

        job_list = scheduler._parse_joblist_output(0, BJOBS_STDOUT_TO_TEST, '')

        # All expected values below are hard coded in the text to parse
        self.assertEqual(len(job_list), 7)

        queued = self._jobs_in_state(job_list, JobState.QUEUED)
        self.assertEqual(len(queued), 2)
        self.assertEqual([job.queue_name for job in queued], ['8nm', 'test'])

        done = self._jobs_in_state(job_list, JobState.DONE)
        self.assertEqual(len(done), 2)
        self.assertEqual([job.title for job in done], ['aiida-1033269', 'test'])
        self.assertEqual([job.annotation for job in done],
                         ['TERM_RUNLIMIT: job killed after reaching LSF run time limit', '-'])

        running = self._jobs_in_state(job_list, JobState.RUNNING)
        self.assertEqual(len(running), 3)
        self.assertEqual([job.job_owner for job in running], ['inewton', 'inewton', 'dbowie'])
        self.assertEqual([job.job_id for job in running], ['764254593', '764255172', '764245175'])
        self.assertEqual([job.num_machines for job in running], [1, 1, 1])
        self.assertEqual([job.allocated_machines_raw for job in running],
                         ['lxbsu2710', 'b68ac74822', 'b68ac74822'])

        # Job ids are unique in the fixture, so a plain lookup table is enough
        jobs_by_id = {job.job_id: job for job in job_list}
        self.assertEqual(jobs_by_id['764254593'].requested_wallclock_time_seconds, 60)
        self.assertEqual(jobs_by_id['764255172'].wallclock_time_seconds, 9)
        self.assertEqual(jobs_by_id['764245175'].wallclock_time_seconds, 4785)

        # The fixture dates carry no year; the expected value uses the current one
        current_year = datetime.datetime.now().year
        self.assertEqual(jobs_by_id['764245175'].submission_time,
                         datetime.datetime(current_year, 12, 31, 23, 40))


class TestSubmitScript(unittest.TestCase):
    """
    Tests for the creation of job resources and submission scripts by ``LsfScheduler``.
    """

    def test_submit_script(self):
        """
        Test the creation of a simple submission script.
        """
        from aiida.schedulers.datastructures import JobTemplate
        from aiida.common.datastructures import CodeInfo, CodeRunMode

        scheduler = LsfScheduler()

        job_tmpl = JobTemplate()
        job_tmpl.shebang = '#!/bin/bash'
        job_tmpl.uuid = str(uuid.uuid4())
        job_tmpl.job_resource = scheduler.create_job_resource(tot_num_mpiprocs=2, parallel_env='b681e480bd.cern.ch')
        job_tmpl.max_wallclock_seconds = 24 * 3600

        code_info = CodeInfo()
        code_info.cmdline_params = ["mpirun", "-np", "2", "pw.x", "-npool", "1"]
        code_info.stdin_name = 'aiida.in'
        job_tmpl.codes_info = [code_info]
        job_tmpl.codes_run_mode = CodeRunMode.SERIAL

        submit_script_text = scheduler.get_submit_script(job_tmpl)

        # assertIn reports both operands on failure, unlike assertTrue(x in y)
        self.assertTrue(submit_script_text.startswith('#!/bin/bash'))
        self.assertIn('#BSUB -rn', submit_script_text)
        self.assertIn('#BSUB -W 24:00', submit_script_text)
        self.assertIn('#BSUB -n 2', submit_script_text)
        self.assertIn("'mpirun' '-np' '2' 'pw.x' '-npool' '1' < 'aiida.in'", submit_script_text)

    def test_submit_script_with_num_machines(self):
        """
        Test that creating a job resource from ``num_machines`` and
        ``num_mpiprocs_per_machine`` (without ``tot_num_mpiprocs``) raises a TypeError.
        """
        from aiida.schedulers.datastructures import JobTemplate

        scheduler = LsfScheduler()
        job_tmpl = JobTemplate()

        with self.assertRaises(TypeError):
            job_tmpl.job_resource = scheduler.create_job_resource(
                num_machines=1,
                num_mpiprocs_per_machine=1,
            )


class TestParserSubmit(unittest.TestCase):
    """
    Tests for ``LsfScheduler._parse_submit_output``.
    """

    def test_submit_output(self):
        """
        Test the parsing of the output of the submission command
        """
        scheduler = LsfScheduler()
        retval = 0
        stdout = SUBMIT_STDOUT_TO_TEST
        stderr = ''

        # The expected job id is the one embedded in SUBMIT_STDOUT_TO_TEST
        self.assertEqual(scheduler._parse_submit_output(retval, stdout, stderr), '764254593')


class TestParserBkill(unittest.TestCase):
    """
    Tests for ``LsfScheduler._parse_kill_output``.
    """

    def test_kill_output(self):
        """
        Test the parsing of the output of the kill command
        """
        scheduler = LsfScheduler()
        retval = 0
        stdout = BKILL_STDOUT_TO_TEST
        stderr = ''

        # Zero return value and empty stderr: the parser must return a truthy value
        self.assertTrue(scheduler._parse_kill_output(retval, stdout, stderr))


# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()