###########################################################################
# Copyright (c), The AiiDA team. All rights reserved.                     #
# This file is part of the AiiDA code.                                    #
#                                                                         #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file        #
# For further information please visit http://www.aiida.net               #
###########################################################################
"""Base classes for PBSPro and PBS/Torque plugins."""
import logging
from aiida.common.escaping import escape_for_bash
from aiida.schedulers import SchedulerError, SchedulerParsingError
from aiida.schedulers.datastructures import JobInfo, JobState, MachineInfo, NodeNumberJobResource
from .bash import BashCliScheduler
_LOGGER = logging.getLogger(__name__)
# This maps PbsPro status letters to our own status list
# List of states from the man page of qstat
# B Array job has at least one subjob running.
# E Job is exiting after having run.
# F Job is finished.
# H Job is held.
# M Job was moved to another server.
# Q Job is queued.
# R Job is running.
# S Job is suspended.
# T Job is being moved to new location.
# U Cycle-harvesting job is suspended due to keyboard activity.
# W Job is waiting for its submitter-assigned start time to be reached.
# X Subjob has completed execution or has been deleted.
# These are instead the states from PBS/Torque v.2.4.16 (from Ubuntu)
# C - Job is completed after having run [different from above, but not clashing]
# E - Job is exiting after having run. [same as above]
# H - Job is held. [same as above]
# Q - job is queued, eligible to run or routed. [same as above]
# R - job is running. [same as above]
# T - job is being moved to new location. [same as above]
# W - job is waiting for its execution time
# (-a option) to be reached. [similar to above]
# S - (Unicos only) job is suspended. [as above]
_MAP_STATUS_PBS_COMMON = {
'B': JobState.RUNNING,
'E': JobState.RUNNING, # If exiting, for our purposes it is still running
'F': JobState.DONE,
'H': JobState.QUEUED_HELD,
'M': JobState.UNDETERMINED, # TODO: check if this is ok?
'Q': JobState.QUEUED,
'R': JobState.RUNNING,
'S': JobState.SUSPENDED,
'T': JobState.QUEUED, # We assume that from the AiiDA point of view
# it is still queued
'U': JobState.SUSPENDED,
'W': JobState.QUEUED,
'X': JobState.DONE,
'C': JobState.DONE, # This is the completed state of PBS/Torque
}
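# Illustrative mapping (state letters quoted from the man pages above): a job
# reported with job_state 'Q' maps to JobState.QUEUED, while the Torque-only
# 'C' state maps to JobState.DONE.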
class PbsJobResource(NodeNumberJobResource):
"""Class for PBS job resources."""
@classmethod
def validate_resources(cls, **kwargs):
"""Validate the resources against the job resource class of this scheduler.
This extends the base class validator and calculates the `num_cores_per_machine` fields to pass to PBSlike
schedulers. Checks that `num_cores_per_machine` is a multiple of `num_cores_per_mpiproc` and/or
`num_mpiprocs_per_machine`.
:param kwargs: dictionary of values to define the job resources
:return: attribute dictionary with the parsed parameters populated
:raises ValueError: if the resources are invalid or incomplete
"""
resources = super().validate_resources(**kwargs)
if resources.num_cores_per_machine is not None and resources.num_cores_per_mpiproc is not None:
if resources.num_cores_per_machine != resources.num_cores_per_mpiproc * resources.num_mpiprocs_per_machine:
raise ValueError(
'`num_cores_per_machine` must be equal to `num_cores_per_mpiproc * num_mpiprocs_per_machine` and in'
' particular it should be a multiple of `num_cores_per_mpiproc` and/or `num_mpiprocs_per_machine`'
)
elif resources.num_cores_per_mpiproc is not None:
if resources.num_cores_per_mpiproc < 1:
raise ValueError('num_cores_per_mpiproc must be greater than or equal to one.')
# This plugin never requires num_cores_per_mpiproc, so it is OK if it is not defined.
resources.num_cores_per_machine = resources.num_cores_per_mpiproc * resources.num_mpiprocs_per_machine
return resources
class PbsBaseClass(BashCliScheduler):
"""Base class with support for the PBSPro scheduler
(http://www.pbsworks.com/) and for PBS and Torque
(http://www.adaptivecomputing.com/products/open-source/torque/).
Only a few properties need to be redefined; see the pbspro and torque
plugins for examples.
"""
# Query only by list of jobs and not by user
_features = {
'can_query_by_user': False,
}
# The class to be used for the job resource.
_job_resource_class = PbsJobResource
_map_status = _MAP_STATUS_PBS_COMMON
def _get_resource_lines(
self, num_machines, num_mpiprocs_per_machine, num_cores_per_machine, max_memory_kb, max_wallclock_seconds
):
"""Return a set a list of lines (possibly empty) with the header
lines relative to:
* num_machines
* num_mpiprocs_per_machine
* num_cores_per_machine
* max_memory_kb
* max_wallclock_seconds
This is done in a separate method because it may differ
between subclasses.
"""
raise NotImplementedError('Implement the _get_resource_lines in each subclass!')
def _get_joblist_command(self, jobs=None, user=None):
"""The command to report full information on existing jobs.
TODO: in the case of job arrays, decide what to do (i.e., whether we want
to pass the -t option to list each subjob).
"""
from aiida.common.exceptions import FeatureNotAvailable
command = ['qstat', '-f']
if jobs and user:
raise FeatureNotAvailable('Cannot query by user and job(s) in PBS')
if user:
command.append(f'-u{user}')
if jobs:
if isinstance(jobs, str):
command.append(f'{escape_for_bash(jobs)}')
else:
try:
command.append(f"{' '.join(escape_for_bash(j) for j in jobs)}")
except TypeError:
raise TypeError("If provided, the 'jobs' variable must be a string or an iterable of strings")
comm = ' '.join(command)
_LOGGER.debug(f'qstat command: {comm}')
return comm
def _get_detailed_job_info_command(self, job_id):
"""Return the command to run to get the detailed information on a job,
even after the job has finished.
The output text is just retrieved, and returned for logging purposes.
"""
return f'tracejob -v {escape_for_bash(job_id)}'
def _get_submit_command(self, submit_script):
"""Return the string to execute to submit a given script.
Args:
-----
submit_script: the path of the submit script relative to the working
directory.
IMPORTANT: submit_script should be already escaped.
"""
submit_command = f'qsub {submit_script}'
_LOGGER.info(f'submitting with: {submit_command}')
return submit_command
def _parse_joblist_output(self, retval, stdout, stderr):
"""Parse the queue output string, as returned by executing the
command returned by _get_joblist_command command (qstat -f).
Return a list of JobInfo objects, one of each job,
each relevant parameters implemented.
Note: depending on the scheduler configuration, finished jobs may
either appear here, or not.
This function will only return one element for each job find
in the qstat output; missing jobs (for whatever reason) simply
will not appear here.
"""
# We do not raise immediately on a non-zero retval: when passing a list
# of jobs, qstat exits non-zero if one of the jobs is no longer in the
# list. Otherwise, retval should be zero.
# if retval != 0:
# _LOGGER.warning("Error in _parse_joblist_output: retval={}; "
# "stdout={}; stderr={}".format(retval, stdout, stderr))
# issue a warning if there is any stderr output
# but I strip lines containing "Unknown Job Id", that happens
# also when I ask for a calculation that has finished
#
# I also strip for "Job has finished" because this happens for
# those schedulers configured to leave the job in the output
# of qstat for some time after job completion.
filtered_stderr = '\n'.join(
line for line in stderr.split('\n') if 'Unknown Job Id' not in line and 'Job has finished' not in line
)
if filtered_stderr.strip():
_LOGGER.warning(f"Warning in _parse_joblist_output, non-empty (filtered) stderr='{filtered_stderr}'")
if retval != 0:
raise SchedulerError(f'Error during qstat parsing, retval={retval}\nstdout={stdout}\nstderr={stderr}')
jobdata_raw = [] # will contain raw data parsed from qstat output
# Get raw data and split in lines
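# For illustration, an (assumed, simplified) `qstat -f` stanza looks like:
#   Job Id: 123.pbshost
#       Job_Name = aiida-42
#       job_state = R
#       exec_host = node01/0*16+node02/0*16
#       Resource_List.walltime = 24:00:00
# Attribute lines start with spaces; continuation lines start with a TAB
# and are re-joined to the previous attribute line.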
for line_num, line in enumerate(stdout.split('\n'), start=1):
# Each new job stanza starts with the string 'Job Id:': I
# create a new item in the jobdata_raw list
if line.startswith('Job Id:'):
jobdata_raw.append({'id': line.split(':', 1)[1].strip(), 'lines': [], 'warning_lines_idx': []})
# warning_lines_idx: indices of lines that do not start with
# either a tab or a space
elif line.strip():
# This is a non-empty line, therefore it is an attribute
# of the last job found
if not jobdata_raw:
# The list is still empty! This means we found a non-empty
# line before the first 'Job Id:' string: it is an error.
# (This can only happen before the first job.)
raise SchedulerParsingError('I did not find the header for the first job')
# _LOGGER.warning("I found some text before the "
# "first job: {}".format(l))
elif line.startswith(' '):
# If it starts with a space, it is a new field
jobdata_raw[-1]['lines'].append(line)
elif line.startswith('\t'):
# If a line starts with a TAB,
# I append to the previous string
# stripping the TAB
if not jobdata_raw[-1]['lines']:
raise SchedulerParsingError(
f'Line {line_num} is the first line of the job, but it starts with a TAB! ({line})'
)
jobdata_raw[-1]['lines'][-1] += line[1:]
else:
# raise SchedulerParsingError(
# "Wrong starting character at line {}! ({})"
# "".format(line_num, l))
## For some reason, the output of fields such as 'comment'
## and 'Variable_List' can contain embedded newlines...
## We apply a workaround here.
jobdata_raw[-1]['lines'][-1] += f'\n{line}'
jobdata_raw[-1]['warning_lines_idx'].append(len(jobdata_raw[-1]['lines']) - 1)
# Create dictionary and parse specific fields
job_list = []
for job in jobdata_raw:
this_job = JobInfo()
this_job.job_id = job['id']
lines_without_equals_sign = [i for i in job['lines'] if '=' not in i]
# There are lines without equals sign: this is bad
if lines_without_equals_sign:
# Should I only warn?
_LOGGER.error(f'There are lines without equals sign! {lines_without_equals_sign}')
raise SchedulerParsingError('There are lines without equals sign.')
raw_data = {
i.split('=', 1)[0].strip().lower(): i.split('=', 1)[1].lstrip() for i in job['lines'] if '=' in i
}
## I ignore the errors for the time being - this seems to be
## a problem if there are \n in the content of some variables?
## I consider this a workaround...
# for line_with_warning in set(job['warning_lines_idx']):
# if job['lines'][line_with_warning].split(
# '=',1)[0].strip().lower() != "comment":
# raise SchedulerParsingError(
# "Wrong starting character in one of the lines "
# "of job {}, and it's not a comment! ({})"
# "".format(this_job.job_id,
# job['lines'][line_with_warning]))
problematic_fields = []
for line_with_warning in set(job['warning_lines_idx']):
problematic_fields.append(job['lines'][line_with_warning].split('=', 1)[0].strip().lower())
if problematic_fields:
# These are the fields that contain unexpected newlines
raw_data['warning_fields_with_newlines'] = problematic_fields
# I believe that exit_status and terminating_signal cannot be
# retrieved from the qstat -f output.
# I wrap calls in try-except clauses to avoid errors if a field
# is missing
try:
this_job.title = raw_data['job_name']
except KeyError:
_LOGGER.debug(f"No 'job_name' field for job id {this_job.job_id}")
try:
this_job.annotation = raw_data['comment']
except KeyError:
# Many jobs do not have a comment; I do not complain about it.
pass
# _LOGGER.debug("No 'comment' field for job id {}".format(
# this_job.job_id))
try:
job_state_string = raw_data['job_state']
try:
this_job.job_state = self._map_status[job_state_string]
except KeyError:
_LOGGER.warning(f"Unrecognized job_state '{job_state_string}' for job id {this_job.job_id}")
this_job.job_state = JobState.UNDETERMINED
except KeyError:
_LOGGER.debug(f"No 'job_state' field for job id {this_job.job_id}")
this_job.job_state = JobState.UNDETERMINED
try:
this_job.job_substate = raw_data['substate']
except KeyError:
_LOGGER.debug(f"No 'substate' field for job id {this_job.job_id}")
try:
exec_hosts = raw_data['exec_host'].split('+')
except KeyError:
# No exec_host information found (it may be ok, if the job
# is not running)
pass
else:
# parse each host; syntax, from the man page:
# hosta/J1+hostb/J2*P+...
# where J1 and J2 are an index of the job
# on the named host and P is the number of
# processors allocated from that host to this job.
# P does not appear if it is 1.
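# E.g. (illustrative) 'node01/0*16+node02/1' parses to node01 with
# 16 CPUs (job index 0) and node02 with 1 CPU (job index 1).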
try:
exec_host_list = []
for exec_host in exec_hosts:
node = MachineInfo()
node.name, data = exec_host.split('/')
data = data.split('*')
if len(data) == 1:
node.job_index = int(data[0])
node.num_cpus = 1
elif len(data) == 2:
node.job_index = int(data[0])
node.num_cpus = int(data[1])
else:
raise ValueError(
f'Wrong number of pieces: {len(data)} instead of 1 or 2 in exec_hosts: {exec_hosts}'
)
exec_host_list.append(node)
this_job.allocated_machines = exec_host_list
except Exception as exc:
_LOGGER.debug(
f'Problem parsing the node names, I got Exception {type(exc)!s} with message {exc}; '
f'exec_hosts was {exec_hosts}'
)
try:
# I strip the part after the @: is this always ok?
this_job.job_owner = raw_data['job_owner'].split('@')[0]
except KeyError:
_LOGGER.debug(f"No 'job_owner' field for job id {this_job.job_id}")
try:
this_job.num_cpus = int(raw_data['resource_list.ncpus'])
# TODO: understand if this is the correct field also for multithreaded (OpenMP) jobs.
except KeyError:
_LOGGER.debug(f"No 'resource_list.ncpus' field for job id {this_job.job_id}")
except ValueError:
_LOGGER.warning(
f"'resource_list.ncpus' is not an integer "
f"({raw_data['resource_list.ncpus']}) for job id {this_job.job_id}!"
)
try:
this_job.num_mpiprocs = int(raw_data['resource_list.mpiprocs'])
# TODO: understand if this is the correct field also for multithreaded (OpenMP) jobs.
except KeyError:
_LOGGER.debug(f"No 'resource_list.mpiprocs' field for job id {this_job.job_id}")
except ValueError:
_LOGGER.warning(
f"'resource_list.mpiprocs' is not an integer "
f"({raw_data['resource_list.mpiprocs']}) for job id {this_job.job_id}!"
)
try:
this_job.num_machines = int(raw_data['resource_list.nodect'])
except KeyError:
_LOGGER.debug(f"No 'resource_list.nodect' field for job id {this_job.job_id}")
except ValueError:
_LOGGER.warning(
f"'resource_list.nodect' is not an integer "
f"{raw_data['resource_list.nodect']}) for job id {this_job.job_id}!"
)
# Double check of redundant info
if this_job.allocated_machines is not None and this_job.num_machines is not None:
num_allocated = len(set(machine.name for machine in this_job.allocated_machines))
if num_allocated != this_job.num_machines:
_LOGGER.error(
f'The number of distinct allocated nodes ({num_allocated}) is different '
f'from the expected number of nodes ({this_job.num_machines})!'
)
try:
this_job.queue_name = raw_data['queue']
except KeyError:
_LOGGER.debug(f"No 'queue' field for job id {this_job.job_id}")
try:
this_job.requested_wallclock_time = self._convert_time(raw_data['resource_list.walltime'])
except KeyError:
_LOGGER.debug(f"No 'resource_list.walltime' field for job id {this_job.job_id}")
except ValueError:
_LOGGER.warning(f"Error parsing 'resource_list.walltime' for job id {this_job.job_id}")
try:
this_job.wallclock_time_seconds = self._convert_time(raw_data['resources_used.walltime'])
except KeyError:
# May not have started yet
pass
except ValueError:
_LOGGER.warning(f"Error parsing 'resources_used.walltime' for job id {this_job.job_id}")
try:
this_job.cpu_time = self._convert_time(raw_data['resources_used.cput'])
except KeyError:
# May not have started yet
pass
except ValueError:
_LOGGER.warning(f"Error parsing 'resources_used.cput' for job id {this_job.job_id}")
#
# ctime: The time that the job was created
# mtime: The time that the job was last modified, changed state,
# or changed locations.
# qtime: The time that the job entered the current queue
# stime: The time when the job started execution.
# etime: The time that the job became eligible to run, i.e. in a
# queued state while residing in an execution queue.
try:
this_job.submission_time = self._parse_time_string(raw_data['ctime'])
except KeyError:
_LOGGER.debug(f"No 'ctime' field for job id {this_job.job_id}")
except ValueError:
_LOGGER.warning(f"Error parsing 'ctime' for job id {this_job.job_id}")
try:
this_job.dispatch_time = self._parse_time_string(raw_data['stime'])
except KeyError:
# The job may not have been started yet
pass
except ValueError:
_LOGGER.warning(f"Error parsing 'stime' for job id {this_job.job_id}")
# TODO: see if we want to set also finish_time for finished jobs, if there are any
# Everything goes here anyway for debugging purposes
this_job.raw_data = raw_data
# I append to the list of jobs to return
job_list.append(this_job)
return job_list
@staticmethod
def _convert_time(string):
"""Convert a string in the format HH:MM:SS to a number of seconds."""
pieces = string.split(':')
if len(pieces) != 3:
_LOGGER.warning(f'Wrong number of pieces (expected 3) for time string {string}')
raise ValueError('Wrong number of pieces for time string.')
try:
hours = int(pieces[0])
if hours < 0:
raise ValueError
except ValueError:
_LOGGER.warning(f'Not a valid number of hours: {pieces[0]}')
raise ValueError('Not a valid number of hours.')
try:
mins = int(pieces[1])
if mins < 0:
raise ValueError
except ValueError:
_LOGGER.warning(f'Not a valid number of minutes: {pieces[1]}')
raise ValueError('Not a valid number of minutes.')
try:
secs = int(pieces[2])
if secs < 0:
raise ValueError
except ValueError:
_LOGGER.warning(f'Not a valid number of seconds: {pieces[2]}')
raise ValueError('Not a valid number of seconds.')
return hours * 3600 + mins * 60 + secs
@staticmethod
def _parse_time_string(string, fmt='%a %b %d %H:%M:%S %Y'):
"""Parse a time string in the format returned from qstat -f and
returns a datetime object.
"""
import datetime
import time
try:
time_struct = time.strptime(string, fmt)
except Exception as exc:
_LOGGER.debug(f'Unable to parse time string {string}, the message was {exc}')
raise ValueError('Problem parsing the time string.') from exc
# I convert from a time_struct to a datetime object going through
# the seconds since epoch, as suggested on stackoverflow:
# http://stackoverflow.com/questions/1697815
return datetime.datetime.fromtimestamp(time.mktime(time_struct))
def _parse_submit_output(self, retval, stdout, stderr):
"""Parse the output of the submit command, as returned by executing the
command returned by _get_submit_command command.
To be implemented by the plugin.
Return a string with the JobID.
"""
if retval != 0:
_LOGGER.error(f'Error in _parse_submit_output: retval={retval}; stdout={stdout}; stderr={stderr}')
raise SchedulerError(f'Error during submission, retval={retval}; stdout={stdout}; stderr={stderr}')
if stderr.strip():
_LOGGER.warning(f'in _parse_submit_output there was some text in stderr: {stderr}')
return stdout.strip()
def _get_kill_command(self, jobid):
"""Return the command to kill the job with specified jobid."""
kill_command = f'qdel {jobid}'
_LOGGER.info(f'killing job {jobid}')
return kill_command
def _parse_kill_output(self, retval, stdout, stderr):
"""Parse the output of the kill command.
To be implemented by the plugin.
:return: True if everything seems ok, False otherwise.
"""
if retval != 0:
_LOGGER.error(f'Error in _parse_kill_output: retval={retval}; stdout={stdout}; stderr={stderr}')
return False
if stderr.strip():
_LOGGER.warning(f'in _parse_kill_output there was some text in stderr: {stderr}')
if stdout.strip():
_LOGGER.warning(f'in _parse_kill_output there was some text in stdout: {stdout}')
return True