Source code for aiida.schedulers.plugins.sge

# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved.                     #
# This file is part of the AiiDA code.                                    #
#                                                                         #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file        #
# For further information please visit http://www.aiida.net               #
###########################################################################
"""
Plugin for SGE.
This has been tested on GE 6.2u3.

Plugin originally written by Marco Dorigo.
Email: marco(DOT)dorigo(AT)rub(DOT)de
"""
import xml.parsers.expat
import xml.dom.minidom

from aiida.common.escaping import escape_for_bash
import aiida.schedulers
from aiida.schedulers import SchedulerError, SchedulerParsingError
from aiida.schedulers.datastructures import (JobInfo, JobState, ParEnvJobResource)

# 'http://www.loni.ucla.edu/twiki/bin/view/Infrastructure/GridComputing?skin=plain':
# Jobs Status:
#     'qw' - Queued and waiting,
#     'w' - Job waiting,
#     's' - Job suspended,
#     't' - Job transferring and about to start,
#     'r' - Job running,
#     'h' - Job hold,
#     'R' - Job restarted,
#     'd' - Job has been marked for deletion,
#     'Eqw' - An error occurred with the job.
#
# 'http://confluence.rcs.griffith.edu.au:8080/display/v20zCluster/
# Sun+Grid+Engine+SGE+state+letter+symbol+codes+meanings':
#
# Category     State     SGE Letter Code
# Pending:     pending     qw
# Pending:     pending, user hold     qw
# Pending:     pending, system hold     hqw
# Pending:     pending, user and system hold     hqw
# Pending:     pending, user hold, re-queue     hRwq
# Pending:     pending, system hold, re-queue     hRwq
# Pending:     pending, user and system hold, re-queue     hRwq
# Pending:     pending, user hold     qw
# Pending:     pending, user hold     qw
# Running     running     r
# Running     transferring     t
# Running     running, re-submit     Rr
# Running     transferring, re-submit     Rt
# Suspended     job suspended     s, ts
# Suspended     queue suspended     S, tS
# Suspended     queue suspended by alarm     T, tT
# Suspended     all suspended with re-submit     Rs, Rts, RS, RtS, RT, RtT
# Error     all pending states with error     Eqw, Ehqw, EhRqw
# Deleted     all running and suspended states with deletion     dr, dt, dRr, dRt,
#                                                                ds, dS, dT, dRs,
#                                                                dRS, dRT
_MAP_STATUS_SGE = {
    'qw': JobState.QUEUED,
    'w': JobState.QUEUED,
    'hqw': JobState.QUEUED_HELD,
    'hRwq': JobState.QUEUED_HELD,
    'r': JobState.RUNNING,
    't': JobState.RUNNING,
    'R': JobState.RUNNING,
    'Rr': JobState.RUNNING,
    'Rt': JobState.RUNNING,
    's': JobState.SUSPENDED,
    'st': JobState.SUSPENDED,
    'Rs': JobState.SUSPENDED,
    'Rts': JobState.SUSPENDED,
    'dr': JobState.UNDETERMINED,
    'dt': JobState.UNDETERMINED,
    'ds': JobState.UNDETERMINED,
    'dRr': JobState.UNDETERMINED,
    'dRt': JobState.UNDETERMINED,
    'dRs': JobState.UNDETERMINED,
    'Eqw': JobState.UNDETERMINED,
    'Ehqw': JobState.UNDETERMINED,
    'EhRqw': JobState.UNDETERMINED
}
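# Note: state strings not listed in this map are treated as
# JobState.UNDETERMINED by _parse_joblist_output below. Illustrative lookup
# (a sketch mirroring that fallback; 'dRT' is a deletion state from the table above):
#     _MAP_STATUS_SGE.get('dRT', JobState.UNDETERMINED)  # -> JobState.UNDETERMINED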


class SgeJobResource(ParEnvJobResource):
    pass

class SgeScheduler(aiida.schedulers.Scheduler):
    """
    Support for the Sun Grid Engine scheduler and its variants/forks
    (Son of Grid Engine, Oracle Grid Engine, ...)
    """
    _logger = aiida.schedulers.Scheduler._logger.getChild('sge')

    # For SGE, we can have a good qstat xml output by querying by
    # user, but not by job id
    _features = {
        'can_query_by_user': True,
    }

    # The class to be used for the job resource.
    _job_resource_class = SgeJobResource

    def _get_joblist_command(self, jobs=None, user=None):
        """
        The command to report full information on existing jobs.

        TODO: in the case of job arrays, decide what to do (i.e., if we want
              to pass the -t options to list each subjob).

        !!!ALL COPIED FROM PBSPRO!!!
        TODO: understand if it is worth escaping the username,
        or rather leave it unescaped to allow to pass $USER
        """
        from aiida.common.exceptions import FeatureNotAvailable

        if jobs:
            raise FeatureNotAvailable('Cannot query by jobid in SGE')

        command = 'qstat -ext -urg -xml '

        if user:
            command += '-u {}'.format(str(user))
        else:
            # All users if no user is specified
            command += "-u '*'"

        self.logger.debug('qstat command: {}'.format(command))

        return command

        # raise NotImplementedError

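    # Illustrative return values of _get_joblist_command (a sketch following the
    # string building above; 'jdoe' is a hypothetical username used only for
    # illustration):
    #     _get_joblist_command()            -> "qstat -ext -urg -xml -u '*'"
    #     _get_joblist_command(user='jdoe') -> "qstat -ext -urg -xml -u jdoe"
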
    def _get_detailed_job_info_command(self, job_id):
        command = 'qacct -j {}'.format(escape_for_bash(job_id))
        return command

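    # Example (sketch): for a hypothetical job id '123' this returns
    # "qacct -j '123'" (escape_for_bash quotes the id for the shell).
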
    def _get_submit_script_header(self, job_tmpl):
        """
        Return the submit script header, using the parameters from the
        job_tmpl.

        Args:
            job_tmpl: a JobTemplate instance with relevant parameters set.

        TODO: truncate the title if too long
        """
        # pylint: disable=too-many-statements,too-many-branches
        import re
        import string

        empty_line = ''

        lines = []

        # SGE provides flags for wd and cwd
        if job_tmpl.working_directory:
            lines.append('#$ -wd {}'.format(job_tmpl.working_directory))
        else:
            lines.append('#$ -cwd')

        # Enforce bash shell
        lines.append('#$ -S /bin/bash')

        if job_tmpl.submit_as_hold:
            # if isinstance(job_tmpl.submit_as_hold, str):
            lines.append('#$ -h {}'.format(job_tmpl.submit_as_hold))

        if job_tmpl.rerunnable:
            # if isinstance(job_tmpl.rerunnable, str):
            lines.append('#$ -r {}'.format(job_tmpl.rerunnable))

        if job_tmpl.email:
            # If not specified, but email events are set, PBSPro
            # sends the mail to the job owner by default
            lines.append('#$ -M {}'.format(job_tmpl.email))

        email_events = ''
        if job_tmpl.email_on_started:
            email_events += 'b'
        if job_tmpl.email_on_terminated:
            email_events += 'ea'
        if email_events:
            lines.append('#$ -m {}'.format(email_events))
            if not job_tmpl.email:
                self.logger.info(
                    'Email triggers provided to SGE script for job, '
                    'but no email field set; will send emails to '
                    'the job owner as set in the scheduler'
                )
        else:
            lines.append('#$ -m n')

        # From the qsub man page:
        # "The name may be any arbitrary alphanumeric ASCII string, but
        # may not contain "\n", "\t", "\r", "/", ":", "@", "\", "*",
        # or "?"."
        if job_tmpl.job_name:
            job_title = re.sub(r'[^a-zA-Z0-9_.-]+', '', job_tmpl.job_name)

            # prepend a 'j' (for 'job') before the string if the string
            # is now empty or does not start with a valid character
            # (the first symbol cannot be a digit, at least in some versions
            # of the scheduler)
            if not job_title or (job_title[0] not in string.ascii_letters):
                job_title = 'j' + job_title

            lines.append('#$ -N {}'.format(job_title))

        if job_tmpl.import_sys_environment:
            lines.append('#$ -V')

        if job_tmpl.sched_output_path:
            lines.append('#$ -o {}'.format(job_tmpl.sched_output_path))

        if job_tmpl.sched_join_files:
            # from qsub man page:
            # 'y': Standard error and standard output are merged into
            #      standard output
            # 'n': Standard error and standard output are not merged (default)
            lines.append('#$ -j y')
            if job_tmpl.sched_error_path:
                self.logger.info(
                    'sched_join_files is True, but sched_error_path is set in '
                    'SGE script; ignoring sched_error_path'
                )
        else:
            if job_tmpl.sched_error_path:
                lines.append('#$ -e {}'.format(job_tmpl.sched_error_path))

        if job_tmpl.queue_name:
            lines.append('#$ -q {}'.format(job_tmpl.queue_name))

        if job_tmpl.account:
            lines.append('#$ -P {}'.format(job_tmpl.account))

        if job_tmpl.priority:
            # Priority of the job. Format: host-dependent integer. Default:
            # zero. Range: [-1023, +1024]. Sets the job's Priority
            # attribute to priority.
            lines.append('#$ -p {}'.format(job_tmpl.priority))

        if not job_tmpl.job_resource:
            raise ValueError('Job resources (as the tot_num_mpiprocs) are required for the SGE scheduler plugin')

        # Setting up the parallel environment
        lines.append('#$ -pe {} {}'.format(
            str(job_tmpl.job_resource.parallel_env),
            int(job_tmpl.job_resource.tot_num_mpiprocs)
        ))

        if job_tmpl.max_wallclock_seconds is not None:
            try:
                tot_secs = int(job_tmpl.max_wallclock_seconds)
                if tot_secs <= 0:
                    raise ValueError
            except ValueError:
                raise ValueError(
                    'max_wallclock_seconds must be '
                    "a positive integer (in seconds)! It is instead '{}'"
                    ''.format(job_tmpl.max_wallclock_seconds)
                )
            hours = tot_secs // 3600
            tot_minutes = tot_secs % 3600
            minutes = tot_minutes // 60
            seconds = tot_minutes % 60
            lines.append('#$ -l h_rt={:02d}:{:02d}:{:02d}'.format(hours, minutes, seconds))

        if job_tmpl.custom_scheduler_commands:
            lines.append(job_tmpl.custom_scheduler_commands)

        # TAKEN FROM PBSPRO:
        # Job environment variables are to be set on one single line.
        # This is a tough job due to the escaping of commas, etc.
        # Moreover, I am having issues making it work.
        # Therefore, I assume that this is bash and export variables by
        # hand.
        if job_tmpl.job_environment:
            lines.append(empty_line)
            lines.append('# ENVIRONMENT VARIABLES BEGIN ###')
            if not isinstance(job_tmpl.job_environment, dict):
                raise ValueError('If you provide job_environment, it must be a dictionary')
            for key, value in job_tmpl.job_environment.items():
                lines.append('export {}={}'.format(key.strip(), escape_for_bash(value)))
            lines.append('# ENVIRONMENT VARIABLES END ###')
            lines.append(empty_line)

        return '\n'.join(lines)

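    # A minimal sketch of the header this method can produce, assuming a
    # JobTemplate with working_directory='/scratch/run1', job_name='test',
    # a parallel environment 'mpi' with tot_num_mpiprocs=8, and
    # max_wallclock_seconds=3600 (all values hypothetical):
    #
    #     #$ -wd /scratch/run1
    #     #$ -S /bin/bash
    #     #$ -m n
    #     #$ -N test
    #     #$ -pe mpi 8
    #     #$ -l h_rt=01:00:00
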
    def _get_submit_command(self, submit_script):
        """
        Return the string to execute to submit a given script.

        Args:
            submit_script: the path of the submit script relative to the
                working directory.
                IMPORTANT: submit_script should be already escaped.
        """
        # pylint: disable=too-many-statements,too-many-branches
        submit_command = 'qsub -terse {}'.format(submit_script)

        self.logger.info('submitting with: ' + submit_command)

        return submit_command

    def _parse_joblist_output(self, retval, stdout, stderr):
        # pylint: disable=too-many-statements,too-many-branches
        if retval != 0:
            self.logger.error(
                'Error in _parse_joblist_output: retval={}; '
                'stdout={}; stderr={}'.format(retval, stdout, stderr)
            )
            raise SchedulerError('Error during joblist retrieval, retval={}'.format(retval))

        if stderr.strip():
            self.logger.warning(
                'in _parse_joblist_output for {}: '
                'there was some text in stderr: {}'.format(str(self.transport), stderr)
            )

        if stdout:
            try:
                xmldata = xml.dom.minidom.parseString(stdout)
            except xml.parsers.expat.ExpatError:
                self.logger.error('in sge._parse_joblist_output: xml parsing of stdout failed: {}'.format(stdout))
                raise SchedulerParsingError('Error during joblist retrieval, xml parsing of stdout failed')
        else:
            self.logger.error(
                'Error in sge._parse_joblist_output: retval={}; '
                'stdout={}; stderr={}'.format(retval, stdout, stderr)
            )
            raise SchedulerError('Error during joblist retrieval, no stdout produced')

        try:
            first_child = xmldata.firstChild
            second_childs = first_child.childNodes
            tag_names_sec = [elem.tagName for elem in second_childs if elem.nodeType == 1]
            if 'queue_info' not in tag_names_sec:
                self.logger.error('Error in sge._parse_joblist_output: no queue_info: {}'.format(stdout))
                raise SchedulerError
            if 'job_info' not in tag_names_sec:
                self.logger.error('Error in sge._parse_joblist_output: no job_info: {}'.format(stdout))
                raise SchedulerError
        except SchedulerError:
            self.logger.error('Error in sge._parse_joblist_output: stdout={}'.format(stdout))
            raise SchedulerError(
                'Error during xml processing of stdout: '
                "there is no 'job_info' or no 'queue_info' "
                'element, or there are no jobs!'
            )
        # If something weird happens while firstChild, pop, etc:
        except Exception:
            self.logger.error('Error in sge._parse_joblist_output: stdout={}'.format(stdout))
            raise SchedulerError('Error during xml processing of stdout')

        jobs = list(first_child.getElementsByTagName('job_list'))
        # jobs = [i for i in jobinfo.getElementsByTagName('job_list')]
        # print [i[0].childNodes[0].data for i in job_numbers if i]
        joblist = []
        for job in jobs:
            this_job = JobInfo()

            # In case the user needs more information, the xml data for
            # each job is stored:
            this_job.raw_data = job.toxml()

            try:
                job_element = job.getElementsByTagName('JB_job_number').pop(0)
                element_child = job_element.childNodes.pop(0)
                this_job.job_id = str(element_child.data).strip()
                if not this_job.job_id:
                    raise SchedulerError
            except SchedulerError:
                self.logger.error('Error in sge._parse_joblist_output: no job id is given, stdout={}'.format(stdout))
                raise SchedulerError('Error in sge._parse_joblist_output: no job id is given')
            except IndexError:
                self.logger.error(
                    "No 'job_number' given for job index {} in job list, stdout={}".format(jobs.index(job), stdout)
                )
                raise IndexError('Error in sge._parse_joblist_output: no job id is given')

            try:
                job_element = job.getElementsByTagName('state').pop(0)
                element_child = job_element.childNodes.pop(0)
                job_state_string = str(element_child.data).strip()
                try:
                    this_job.job_state = _MAP_STATUS_SGE[job_state_string]
                except KeyError:
                    self.logger.warning(
                        "Unrecognized job_state '{}' for job id {}".format(job_state_string, this_job.job_id)
                    )
                    this_job.job_state = JobState.UNDETERMINED
            except IndexError:
                self.logger.warning("No 'job_state' field for job id {} in stdout={}".format(this_job.job_id, stdout))
                this_job.job_state = JobState.UNDETERMINED

            try:
                job_element = job.getElementsByTagName('JB_owner').pop(0)
                element_child = job_element.childNodes.pop(0)
                this_job.job_owner = str(element_child.data).strip()
            except IndexError:
                self.logger.warning("No 'job_owner' field for job id {}".format(this_job.job_id))

            try:
                job_element = job.getElementsByTagName('JB_name').pop(0)
                element_child = job_element.childNodes.pop(0)
                this_job.title = str(element_child.data).strip()
            except IndexError:
                self.logger.warning("No 'title' field for job id {}".format(this_job.job_id))

            try:
                job_element = job.getElementsByTagName('queue_name').pop(0)
                element_child = job_element.childNodes.pop(0)
                this_job.queue_name = str(element_child.data).strip()
            except IndexError:
                if this_job.job_state == JobState.RUNNING:
                    self.logger.warning("No 'queue_name' field for job id {}".format(this_job.job_id))

            try:
                job_element = job.getElementsByTagName('JB_submission_time').pop(0)
                element_child = job_element.childNodes.pop(0)
                time_string = str(element_child.data).strip()
                try:
                    this_job.submission_time = self._parse_time_string(time_string)
                except ValueError:
                    self.logger.warning(
                        "Error parsing 'JB_submission_time' for job id {} ('{}')".format(this_job.job_id, time_string)
                    )
            except IndexError:
                try:
                    job_element = job.getElementsByTagName('JAT_start_time').pop(0)
                    element_child = job_element.childNodes.pop(0)
                    time_string = str(element_child.data).strip()
                    try:
                        this_job.dispatch_time = self._parse_time_string(time_string)
                    except ValueError:
                        self.logger.warning(
                            "Error parsing 'JAT_start_time' for job id {} ('{}')".format(this_job.job_id, time_string)
                        )
                except IndexError:
                    self.logger.warning(
                        "No 'JB_submission_time' and no 'JAT_start_time' field for job id {}".format(this_job.job_id)
                    )

            # There is also cpu_usage, mem_usage, io_usage information available:
            if this_job.job_state == JobState.RUNNING:
                try:
                    job_element = job.getElementsByTagName('slots').pop(0)
                    element_child = job_element.childNodes.pop(0)
                    this_job.num_mpiprocs = str(element_child.data).strip()
                except IndexError:
                    self.logger.warning("No 'slots' field for job id {}".format(this_job.job_id))

            joblist.append(this_job)
        # self.logger.debug("joblist final: {}".format(joblist))
        return joblist

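    # For reference, a trimmed sketch of the `qstat -ext -urg -xml` output the
    # parser above expects (element names are taken from the lookups in the
    # method; the values shown are hypothetical):
    #
    #     <job_info>
    #       <queue_info>
    #         <job_list>
    #           <JB_job_number>1234</JB_job_number>
    #           <state>r</state>
    #           <JB_owner>jdoe</JB_owner>
    #           <JB_name>test</JB_name>
    #           <queue_name>all.q@node01</queue_name>
    #           <JAT_start_time>2013-06-13T11:53:11</JAT_start_time>
    #           <slots>8</slots>
    #         </job_list>
    #       </queue_info>
    #       <job_info>
    #         <job_list>...</job_list>
    #       </job_info>
    #     </job_info>
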
    def _parse_submit_output(self, retval, stdout, stderr):
        """
        Parse the output of the submit command, as returned by executing the
        command returned by _get_submit_command command.

        To be implemented by the plugin.

        Return a string with the JobID.
        """
        if retval != 0:
            self.logger.error(
                'Error in _parse_submit_output: retval={}; '
                'stdout={}; stderr={}'.format(retval, stdout, stderr)
            )
            raise SchedulerError(
                'Error during submission, retval={}\n'
                'stdout={}\nstderr={}'.format(retval, stdout, stderr)
            )

        if stderr.strip():
            self.logger.warning(
                'in _parse_submit_output for {}: '
                'there was some text in stderr: {}'.format(str(self.transport), stderr)
            )

        return stdout.strip()

    def _parse_time_string(self, string, fmt='%Y-%m-%dT%H:%M:%S'):
        """
        Parse a time string in the format returned from qstat -xml -ext and
        return a datetime object.

        Example format: 2013-06-13T11:53:11
        """
        import time
        import datetime

        try:
            time_struct = time.strptime(string, fmt)
        except Exception as exc:
            self.logger.debug('Unable to parse time string {}, the message was {}'.format(string, exc))
            raise ValueError('Problem parsing the time string.')

        # I convert from a time_struct to a datetime object going through
        # the seconds since epoch, as suggested on stackoverflow:
        # http://stackoverflow.com/questions/1697815
        return datetime.datetime.fromtimestamp(time.mktime(time_struct))

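    # Example (sketch): _parse_time_string('2013-06-13T11:53:11') returns the
    # naive datetime.datetime(2013, 6, 13, 11, 53, 11), interpreted as local time.
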
    def _get_kill_command(self, jobid):
        """
        Return the command to kill the job with specified jobid.
        """
        submit_command = 'qdel {}'.format(jobid)

        self.logger.info('killing job {}'.format(jobid))

        return submit_command

    def _parse_kill_output(self, retval, stdout, stderr):
        """
        Parse the output of the kill command.

        To be implemented by the plugin.

        :return: True if everything seems ok, False otherwise.
        """
        if retval != 0:
            self.logger.error(
                'Error in _parse_kill_output: retval={}; '
                'stdout={}; stderr={}'.format(retval, stdout, stderr)
            )
            return False

        if stderr.strip():
            self.logger.warning(
                'in _parse_kill_output for {}: '
                'there was some text in stderr: {}'.format(str(self.transport), stderr)
            )

        if stdout.strip():
            self.logger.info(
                'in _parse_kill_output for {}: '
                'there was some text in stdout: {}'.format(str(self.transport), stdout)
            )

        return True