# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida_core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
"""
Base classes for PBSPro and PBS/Torque plugins.
"""
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
import abc
import logging
import six
from aiida.common.escaping import escape_for_bash
from aiida.schedulers import Scheduler, SchedulerError, SchedulerParsingError
from aiida.schedulers.datastructures import (JobInfo, JobState, MachineInfo, NodeNumberJobResource)
_LOGGER = logging.getLogger(__name__)

# This maps PBS-family single-letter status codes to our own JobState list.
## List of states from the man page of qstat (PBSPro)
# B  Array job has at least one subjob running.
# E  Job is exiting after having run.
# F  Job is finished.
# H  Job is held.
# M  Job was moved to another server.
# Q  Job is queued.
# R  Job is running.
# S  Job is suspended.
# T  Job is being moved to new location.
# U  Cycle-harvesting job is suspended due to keyboard activity.
# W  Job is waiting for its submitter-assigned start time to be reached.
# X  Subjob has completed execution or has been deleted.
## These are instead the states from PBS/Torque v.2.4.16 (from Ubuntu)
# C - Job is completed after having run [different from above, but not clashing]
# E - Job is exiting after having run. [same as above]
# H - Job is held. [same as above]
# Q - Job is queued, eligible to run or routed. [same as above]
# R - Job is running. [same as above]
# T - Job is being moved to new location. [same as above]
# W - Job is waiting for its execution time
#     (-a option) to be reached. [similar to above]
# S - (Unicos only) job is suspended. [as above]
_MAP_STATUS_PBS_COMMON = {
'B': JobState.RUNNING,
'E': JobState.RUNNING,  # If exiting, for our purposes it is still running
'F': JobState.DONE,
'H': JobState.QUEUED_HELD,
'M': JobState.UNDETERMINED,  # TODO: check if this is ok?
'Q': JobState.QUEUED,
'R': JobState.RUNNING,
'S': JobState.SUSPENDED,
'T': JobState.QUEUED,  # We assume that from the AiiDA point of view
# it is still queued
'U': JobState.SUSPENDED,
'W': JobState.QUEUED,
'X': JobState.DONE,
'C': JobState.DONE,  # This is the completed state of PBS/Torque
}
class PbsJobResource(NodeNumberJobResource):
    """
    Base class for PBS-like job resources.

    On top of the node-number resource base class, it computes (and
    validates) the ``num_cores_per_machine`` field that PBS-like
    schedulers need.
    """

    def __init__(self, **kwargs):
        """
        Extend the base class init method and calculate the
        num_cores_per_machine field to pass to PBS-like schedulers.

        Checks that num_cores_per_machine is a multiple of
        num_cores_per_mpiproc and/or num_mpiprocs_per_machine.

        Check sequence:

        1. If num_cores_per_mpiproc and num_cores_per_machine both are
           specified, check that they are consistent.
        2. If only num_cores_per_mpiproc is passed, calculate
           num_cores_per_machine.
        3. If only num_cores_per_machine is passed, use it as given.

        :raises ValueError: if the specified values are inconsistent.
        """
        super(PbsJobResource, self).__init__(**kwargs)

        value_error = ("num_cores_per_machine must be equal to "
                       "num_cores_per_mpiproc * num_mpiprocs_per_machine, "
                       "and in particular it should be a multiple of "
                       "num_cores_per_mpiproc and/or num_mpiprocs_per_machine")

        if self.num_cores_per_machine is not None and self.num_cores_per_mpiproc is not None:
            # If the user specified both values, check that they are
            # consistent with each other.
            if self.num_cores_per_machine != self.num_cores_per_mpiproc * self.num_mpiprocs_per_machine:
                raise ValueError(value_error)
        elif self.num_cores_per_mpiproc is not None:
            if self.num_cores_per_mpiproc <= 0:
                raise ValueError("num_cores_per_mpiproc must be >=1")
            # Calculate num_cores_per_machine.
            # In this plugin we never use num_cores_per_mpiproc directly,
            # so if it is not defined it is OK.
            self.num_cores_per_machine = self.num_cores_per_mpiproc * self.num_mpiprocs_per_machine
@six.add_metaclass(abc.ABCMeta)
class PbsBaseClass(Scheduler):
    """
    Base class with support for the PBSPro scheduler
    (http://www.pbsworks.com/) and for PBS and Torque
    (http://www.adaptivecomputing.com/products/open-source/torque/).

    Only a few properties need to be redefined, see examples of the pbspro
    and torque plugins.
    """
    # PBS cannot reliably query by user: query only by explicit list of jobs.
    _features = {
        'can_query_by_user': False,
    }

    # The class to be used for the job resource.
    _job_resource_class = PbsJobResource

    # Map from the scheduler's single-letter state to AiiDA's JobState;
    # subclasses may override it if their qstat output differs.
    _map_status = _MAP_STATUS_PBS_COMMON

    def _get_resource_lines(self, num_machines, num_mpiprocs_per_machine, num_cores_per_machine, max_memory_kb,
                            max_wallclock_seconds):
        """
        Return a list of submission-script header lines (possibly empty)
        relative to:

        * num_machines
        * num_mpiprocs_per_machine
        * num_cores_per_machine
        * max_memory_kb
        * max_wallclock_seconds

        This is done in an external function because it may change in
        different subclasses.
        """
        raise NotImplementedError("Implement the _get_resource_lines in each subclass!")

    def _get_joblist_command(self, jobs=None, user=None):
        """
        Return the command to report full information on existing jobs.

        :param jobs: a job-id string, or an iterable of job-id strings.
        :param user: not supported; passing both ``jobs`` and ``user``
            raises ``FeatureNotAvailable``.
        :return: the full ``qstat -f`` command line as a single string.

        TODO: in the case of job arrays, decide what to do (i.e., if we want
        to pass the -t options to list each subjob).
        """
        from aiida.common.exceptions import FeatureNotAvailable

        command = ['qstat', '-f']

        if jobs and user:
            raise FeatureNotAvailable("Cannot query by user and job(s) in PBS")

        if user:
            command.append('-u{}'.format(user))

        if jobs:
            if isinstance(jobs, six.string_types):
                command.append('{}'.format(escape_for_bash(jobs)))
            else:
                try:
                    command.append('{}'.format(' '.join(escape_for_bash(j) for j in jobs)))
                except TypeError:
                    raise TypeError("If provided, the 'jobs' variable must be a string or an iterable of strings")

        comm = ' '.join(command)
        _LOGGER.debug("qstat command: {}".format(comm))
        return comm

    def _get_detailed_jobinfo_command(self, jobid):
        """
        Return the command to run to get the detailed information on a job,
        even after the job has finished.

        The output text is just retrieved, and returned for logging purposes.
        """
        return "tracejob -v {}".format(escape_for_bash(jobid))

    def _get_submit_command(self, submit_script):
        """
        Return the string to execute to submit a given script.

        :param submit_script: the path of the submit script relative to the
            working directory.
            IMPORTANT: submit_script should be already escaped.
        """
        submit_command = 'qsub {}'.format(submit_script)

        _LOGGER.info("submitting with: {}".format(submit_command))

        return submit_command

    def _parse_joblist_output(self, retval, stdout, stderr):
        """
        Parse the queue output string, as returned by executing the
        command returned by _get_joblist_command command (qstat -f).

        Return a list of JobInfo objects, one for each job, with the
        relevant parameters filled in where available.

        Note: depending on the scheduler configuration, finished jobs may
        either appear here, or not.
        This function will only return one element for each job found
        in the qstat output; missing jobs (for whatever reason) simply
        will not appear here.
        """
        # I don't raise on retval alone because if I pass a list of jobs,
        # I get a non-zero status if one of the jobs is not in the list
        # anymore.

        # Issue a warning if there is any stderr output, but strip lines
        # containing "Unknown Job Id" (that happens also when asking for a
        # calculation that has finished) and "Job has finished" (that
        # happens for schedulers configured to leave the job in the qstat
        # output for some time after job completion).
        filtered_stderr = '\n'.join(
            l for l in stderr.split('\n') if "Unknown Job Id" not in l and "Job has finished" not in l)
        if filtered_stderr.strip():
            _LOGGER.warning("Warning in _parse_joblist_output, non-empty "
                            "(filtered) stderr='{}'".format(filtered_stderr))
            if retval != 0:
                raise SchedulerError("Error during qstat parsing (_parse_joblist_output function)")

        jobdata_raw = []  # will contain raw data parsed from qstat output

        # Get raw data and split in lines
        for line_num, line in enumerate(stdout.split('\n'), start=1):
            # Each new job stanza starts with the string 'Job Id:': I
            # create a new item in the jobdata_raw list
            if line.startswith('Job Id:'):
                jobdata_raw.append({'id': line.split(':', 1)[1].strip(), 'lines': [], 'warning_lines_idx': []})
                # warning_lines_idx: lines that do not start either with
                # tab or space
            else:
                if line.strip():
                    # This is a non-empty line, therefore it is an attribute
                    # of the last job found
                    if not jobdata_raw:
                        # The list is still empty! (This means that I found a
                        # non-empty line, before finding the first 'Job Id:'
                        # string: it is an error. However this may happen
                        # only before the first job.
                        raise SchedulerParsingError("I did not find the header for the first job")
                    else:
                        if line.startswith(' '):
                            # If it starts with a space, it is a new field
                            jobdata_raw[-1]['lines'].append(line)
                        elif line.startswith('\t'):
                            # If a line starts with a TAB, I append to the
                            # previous string stripping the TAB
                            if not jobdata_raw[-1]['lines']:
                                raise SchedulerParsingError("Line {} is the first line of the job, but it "
                                                            "starts with a TAB! ({})".format(line_num, line))
                            jobdata_raw[-1]['lines'][-1] += line[1:]
                        else:
                            # For some reasons, the output of 'comment' and
                            # 'Variable_List', for instance, can have
                            # newlines if they are included... I do a
                            # workaround: glue the line onto the previous
                            # field and remember its index as problematic.
                            jobdata_raw[-1]['lines'][-1] += "\n{}".format(line)
                            jobdata_raw[-1]['warning_lines_idx'].append(len(jobdata_raw[-1]['lines']) - 1)

        # Create dictionary and parse specific fields
        job_list = []
        for job in jobdata_raw:
            this_job = JobInfo()
            this_job.job_id = job['id']

            lines_without_equals_sign = [i for i in job['lines'] if '=' not in i]

            # There are lines without equals sign: this is bad
            if lines_without_equals_sign:
                # Should I only warn?
                _LOGGER.error("There are lines without equals sign! {}".format(lines_without_equals_sign))
                raise SchedulerParsingError("There are lines without equals sign.")

            raw_data = {
                i.split('=', 1)[0].strip().lower(): i.split('=', 1)[1].lstrip()
                for i in job['lines']
                if '=' in i
            }

            # I ignore the errors for the fields with unexpected newlines
            # for the time being - this seems to be a problem if there are
            # \n in the content of some variables; I consider this a
            # workaround and only record which fields were affected.
            problematic_fields = []
            for line_with_warning in set(job['warning_lines_idx']):
                problematic_fields.append(job['lines'][line_with_warning].split('=', 1)[0].strip().lower())
            if problematic_fields:
                # These are the fields that contain unexpected newlines
                raw_data['warning_fields_with_newlines'] = problematic_fields

            # I believe that exit_status and terminating_signal cannot be
            # retrieved from the qstat -f output.

            # I wrap calls in try-except clauses to avoid errors if a field
            # is missing
            try:
                this_job.title = raw_data['job_name']
            except KeyError:
                _LOGGER.debug("No 'job_name' field for job id {}".format(this_job.job_id))

            try:
                this_job.annotation = raw_data['comment']
            except KeyError:
                # Many jobs do not have a comment; I do not complain about it.
                pass

            try:
                job_state_string = raw_data['job_state']
                try:
                    this_job.job_state = self._map_status[job_state_string]
                except KeyError:
                    _LOGGER.warning("Unrecognized job_state '{}' for job "
                                    "id {}".format(job_state_string, this_job.job_id))
                    this_job.job_state = JobState.UNDETERMINED
            except KeyError:
                _LOGGER.debug("No 'job_state' field for job id {}".format(this_job.job_id))
                this_job.job_state = JobState.UNDETERMINED

            try:
                this_job.job_substate = raw_data['substate']
            except KeyError:
                _LOGGER.debug("No 'substate' field for job id {}".format(this_job.job_id))

            try:
                exec_hosts = raw_data['exec_host'].split('+')
            except KeyError:
                # No exec_host information found (it may be ok, if the job
                # is not running)
                pass
            else:
                # parse each host; syntax, from the man page:
                # hosta/J1+hostb/J2*P+...
                # where J1 and J2 are an index of the job
                # on the named host and P is the number of
                # processors allocated from that host to this job.
                # P does not appear if it is 1.
                try:
                    exec_host_list = []
                    for exec_host in exec_hosts:
                        node = MachineInfo()
                        node.name, data = exec_host.split('/')
                        data = data.split('*')
                        if len(data) == 1:
                            node.jobIndex = int(data[0])
                            node.num_cpus = 1
                        elif len(data) == 2:
                            node.jobIndex = int(data[0])
                            node.num_cpus = int(data[1])
                        else:
                            raise ValueError("Wrong number of pieces: {} "
                                             "instead of 1 or 2 in exec_hosts: "
                                             "{}".format(len(data), exec_hosts))
                        exec_host_list.append(node)
                    this_job.allocated_machines = exec_host_list
                except Exception as exc:
                    _LOGGER.debug("Problem parsing the node names, I "
                                  "got Exception {} with message {}; "
                                  "exec_hosts was {}".format(str(type(exc)), exc, exec_hosts))

            try:
                # I strip the part after the @: is this always ok?
                this_job.job_owner = raw_data['job_owner'].split('@')[0]
            except KeyError:
                _LOGGER.debug("No 'job_owner' field for job id {}".format(this_job.job_id))

            try:
                this_job.num_cpus = int(raw_data['resource_list.ncpus'])
                # TODO: understand if this is the correct field also for
                # multithreaded (OpenMP) jobs.
            except KeyError:
                _LOGGER.debug("No 'resource_list.ncpus' field for job id {}".format(this_job.job_id))
            except ValueError:
                _LOGGER.warning("'resource_list.ncpus' is not an integer "
                                "({}) for job id {}!".format(raw_data['resource_list.ncpus'], this_job.job_id))

            try:
                this_job.num_mpiprocs = int(raw_data['resource_list.mpiprocs'])
                # TODO: understand if this is the correct field also for
                # multithreaded (OpenMP) jobs.
            except KeyError:
                _LOGGER.debug("No 'resource_list.mpiprocs' field for job id {}".format(this_job.job_id))
            except ValueError:
                _LOGGER.warning("'resource_list.mpiprocs' is not an integer "
                                "({}) for job id {}!".format(raw_data['resource_list.mpiprocs'], this_job.job_id))

            try:
                this_job.num_machines = int(raw_data['resource_list.nodect'])
            except KeyError:
                _LOGGER.debug("No 'resource_list.nodect' field for job id {}".format(this_job.job_id))
            except ValueError:
                _LOGGER.warning("'resource_list.nodect' is not an integer "
                                "({}) for job id {}!".format(raw_data['resource_list.nodect'], this_job.job_id))

            # Double check of redundant info
            if (this_job.allocated_machines is not None and this_job.num_machines is not None):
                if len(set(machine.name for machine in this_job.allocated_machines)) != this_job.num_machines:
                    _LOGGER.error("The length of the list of allocated "
                                  "nodes ({}) is different from the "
                                  "expected number of nodes ({})!".format(
                                      len(this_job.allocated_machines), this_job.num_machines))

            try:
                this_job.queue_name = raw_data['queue']
            except KeyError:
                _LOGGER.debug("No 'queue' field for job id {}".format(this_job.job_id))

            try:
                this_job.RequestedWallclockTime = (self._convert_time(raw_data['resource_list.walltime']))
            except KeyError:
                _LOGGER.debug("No 'resource_list.walltime' field for job id {}".format(this_job.job_id))
            except ValueError:
                _LOGGER.warning("Error parsing 'resource_list.walltime' for job id {}".format(this_job.job_id))

            try:
                this_job.wallclock_time_seconds = (self._convert_time(raw_data['resources_used.walltime']))
            except KeyError:
                # May not have started yet
                pass
            except ValueError:
                _LOGGER.warning("Error parsing 'resources_used.walltime' for job id {}".format(this_job.job_id))

            try:
                this_job.cpu_time = (self._convert_time(raw_data['resources_used.cput']))
            except KeyError:
                # May not have started yet
                pass
            except ValueError:
                _LOGGER.warning("Error parsing 'resources_used.cput' for job id {}".format(this_job.job_id))

            # Timestamp fields in the qstat -f output:
            # ctime: The time that the job was created
            # mtime: The time that the job was last modified, changed state,
            #        or changed locations.
            # qtime: The time that the job entered the current queue
            # stime: The time when the job started execution.
            # etime: The time that the job became eligible to run, i.e. in a
            #        queued state while residing in an execution queue.

            try:
                this_job.submission_time = self._parse_time_string(raw_data['ctime'])
            except KeyError:
                _LOGGER.debug("No 'ctime' field for job id {}".format(this_job.job_id))
            except ValueError:
                _LOGGER.warning("Error parsing 'ctime' for job id {}".format(this_job.job_id))

            try:
                this_job.dispatch_time = self._parse_time_string(raw_data['stime'])
            except KeyError:
                # The job may not have been started yet
                pass
            except ValueError:
                _LOGGER.warning("Error parsing 'stime' for job id {}".format(this_job.job_id))

            # TODO: see if we want to set also finish_time for finished jobs,
            # if there are any

            # Everything goes here anyway for debugging purposes
            this_job.raw_data = raw_data

            # I append to the list of jobs to return
            job_list.append(this_job)

        return job_list

    @staticmethod
    def _convert_time(string):
        """
        Convert a string in the format HH:MM:SS to a number of seconds.

        :raises ValueError: if the string does not have three colon-separated
            pieces, or any piece is not a non-negative integer.
        """
        pieces = string.split(':')
        if len(pieces) != 3:
            _LOGGER.warning("Wrong number of pieces (expected 3) for time string {}".format(string))
            raise ValueError("Wrong number of pieces for time string.")

        try:
            hours = int(pieces[0])
            if hours < 0:
                raise ValueError
        except ValueError:
            _LOGGER.warning("Not a valid number of hours: {}".format(pieces[0]))
            raise ValueError("Not a valid number of hours.")

        try:
            mins = int(pieces[1])
            if mins < 0:
                raise ValueError
        except ValueError:
            _LOGGER.warning("Not a valid number of minutes: {}".format(pieces[1]))
            raise ValueError("Not a valid number of minutes.")

        try:
            secs = int(pieces[2])
            if secs < 0:
                raise ValueError
        except ValueError:
            _LOGGER.warning("Not a valid number of seconds: {}".format(pieces[2]))
            raise ValueError("Not a valid number of seconds.")

        return hours * 3600 + mins * 60 + secs

    @staticmethod
    def _parse_time_string(string, fmt='%a %b %d %H:%M:%S %Y'):
        """
        Parse a time string in the format returned from qstat -f and
        return a datetime object.

        :raises ValueError: if the string cannot be parsed with ``fmt``.
        """
        import time
        import datetime

        try:
            time_struct = time.strptime(string, fmt)
        except Exception as exc:
            _LOGGER.debug("Unable to parse time string {}, the message was {}".format(string, exc))
            raise ValueError("Problem parsing the time string.")

        # I convert from a time_struct to a datetime object going through
        # the seconds since epoch, as suggested on stackoverflow:
        # http://stackoverflow.com/questions/1697815
        return datetime.datetime.fromtimestamp(time.mktime(time_struct))

    def _parse_submit_output(self, retval, stdout, stderr):
        """
        Parse the output of the submit command, as returned by executing the
        command returned by _get_submit_command command.

        :return: a string with the job ID.
        :raises SchedulerError: if the submission returned a non-zero exit
            code.
        """
        if retval != 0:
            _LOGGER.error("Error in _parse_submit_output: retval={}; "
                          "stdout={}; stderr={}".format(retval, stdout, stderr))
            raise SchedulerError("Error during submission, retval={}\n"
                                 "stdout={}\nstderr={}".format(retval, stdout, stderr))

        if stderr.strip():
            _LOGGER.warning("in _parse_submit_output there was some text in stderr: {}".format(stderr))

        return stdout.strip()

    def _get_kill_command(self, jobid):
        """
        Return the command to kill the job with specified jobid.
        """
        submit_command = 'qdel {}'.format(jobid)

        _LOGGER.info("killing job {}".format(jobid))

        return submit_command

    def _parse_kill_output(self, retval, stdout, stderr):
        """
        Parse the output of the kill command.

        :return: True if everything seems ok, False otherwise.
        """
        if retval != 0:
            _LOGGER.error("Error in _parse_kill_output: retval={}; "
                          "stdout={}; stderr={}".format(retval, stdout, stderr))
            return False

        if stderr.strip():
            _LOGGER.warning("in _parse_kill_output there was some text in stderr: {}".format(stderr))

        if stdout.strip():
            _LOGGER.warning("in _parse_kill_output there was some text in stdout: {}".format(stdout))

        return True