# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
"""
Plugin for direct execution.
"""
import aiida.schedulers
from aiida.common.escaping import escape_for_bash
from aiida.schedulers import SchedulerError
from aiida.schedulers.datastructures import (JobInfo, JobState, NodeNumberJobResource)
## From the ps man page on Mac OS X 10.12
# state The state is given by a sequence of characters, for example,
# ``RWNA''. The first character indicates the run state of the
# process:
#
# I Marks a process that is idle (sleeping for longer than
# about 20 seconds).
# R Marks a runnable process.
# S Marks a process that is sleeping for less than about 20
# seconds.
# T Marks a stopped process.
# U Marks a process in uninterruptible wait.
# Z Marks a dead process (a ``zombie'').
# From the man page of ps on Ubuntu 14.04:
# Here are the different values that the s, stat and state output
# specifiers (header "STAT" or "S") will display to describe the state of
# a process:
#
# D uninterruptible sleep (usually IO)
# R running or runnable (on run queue)
# S interruptible sleep (waiting for an event to complete)
# T stopped, either by a job control signal or because it is
# being traced
# W paging (not valid since the 2.6.xx kernel)
# X dead (should never be seen)
# Z defunct ("zombie") process, terminated but not reaped by
# its parent
# Map from the FIRST character of the `ps` STAT/state column to an AiiDA
# JobState (see the Mac OS X and Ubuntu man-page excerpts above for the
# per-platform meaning of each character).
_MAP_STATUS_PS = {
    'D': JobState.RUNNING,
    'I': JobState.RUNNING,
    'R': JobState.RUNNING,
    'S': JobState.RUNNING,
    'T': JobState.SUSPENDED,
    'U': JobState.RUNNING,
    'W': JobState.RUNNING,
    'X': JobState.DONE,
    'Z': JobState.DONE,
    # Not sure about these three, I comment them out (they used to be in
    # here, but they appear neither on Ubuntu nor on Mac)
    # 'F': JobState.DONE,
    # 'H': JobState.QUEUED_HELD,
    # 'Q': JobState.QUEUED,
}
class DirectJobResource(NodeNumberJobResource):
    """Job resource for the direct scheduler, expressed in terms of nodes and CPUs."""
class DirectScheduler(aiida.schedulers.Scheduler):
    """
    Support for the direct execution bypassing schedulers.
    """
    _logger = aiida.schedulers.Scheduler._logger.getChild('direct')

    # This scheduler supports querying jobs by user as well as by job id list
    # (the joblist command simply lists all visible processes).
    _features = {
        'can_query_by_user': True,
    }

    # The class to be used for the job resource.
    _job_resource_class = DirectJobResource
[docs] def _get_joblist_command(self, jobs=None, user=None):
"""
The command to report full information on existing jobs.
TODO: in the case of job arrays, decide what to do (i.e., if we want
to pass the -t options to list each subjob).
"""
# Using subprocess.Popen with `start_new_session=True` (as done in both local and ssh transport) results in
# processes without a controlling terminal.
# The -x option tells ps to include processes which do not have a controlling terminal, which would not be
# listed otherwise (leading the direct scheduler to conclude that the process already completed).
command = 'ps -xo pid,stat,user,time'
if jobs:
if isinstance(jobs, str):
command += ' {}'.format(escape_for_bash(jobs))
else:
try:
command += ' {}'.format(' '.join(escape_for_bash(job) for job in jobs if job))
except TypeError:
raise TypeError("If provided, the 'jobs' variable must be a string or a list of strings")
command += '| tail -n +2' # -header, do not use 'h'
return command
[docs] def _get_submit_command(self, submit_script):
"""
Return the string to execute to submit a given script.
.. note:: One needs to redirect stdout and stderr to /dev/null
otherwise the daemon remains hanging for the script to run
:param submit_script: the path of the submit script relative to the working
directory.
IMPORTANT: submit_script should be already escaped.
"""
submit_command = 'bash -e {} > /dev/null 2>&1 & echo $!'.format(submit_script)
self.logger.info('submitting with: ' + submit_command)
return submit_command
[docs] def _parse_joblist_output(self, retval, stdout, stderr):
"""
Parse the queue output string, as returned by executing the
command returned by _get_joblist_command command (qstat -f).
Return a list of JobInfo objects, one of each job,
each relevant parameters implemented.
.. note:: depending on the scheduler configuration, finished jobs
may either appear here, or not.
This function will only return one element for each job find
in the qstat output; missing jobs (for whatever reason) simply
will not appear here.
"""
import re
filtered_stderr = '\n'.join(l for l in stderr.split('\n'))
if filtered_stderr.strip():
self.logger.warning(
'Warning in _parse_joblist_output, non-empty '
"(filtered) stderr='{}'".format(filtered_stderr)
)
if retval != 0:
raise SchedulerError('Error during direct execution parsing (_parse_joblist_output function)')
# Create dictionary and parse specific fields
job_list = []
for line in stdout.split('\n'):
if re.search(r'^\s*PID', line) or line == '':
# Skip the header if present
continue
line = re.sub(r'^\s+', '', line)
job = re.split(r'\s+', line)
this_job = JobInfo()
this_job.job_id = job[0]
if len(job) < 3:
raise SchedulerError(
'Unexpected output from the scheduler, '
"not enough fields in line '{}'".format(line)
)
try:
job_state_string = job[1][0] # I just check the first character
except IndexError:
self.logger.debug("No 'job_state' field for job id {}".format(this_job.job_id))
this_job.job_state = JobState.UNDETERMINED
else:
try:
this_job.job_state = \
_MAP_STATUS_PS[job_state_string]
except KeyError:
self.logger.warning(
"Unrecognized job_state '{}' for job "
'id {}'.format(job_state_string, this_job.job_id)
)
this_job.job_state = JobState.UNDETERMINED
try:
# I strip the part after the @: is this always ok?
this_job.job_owner = job[2]
except KeyError:
self.logger.debug("No 'job_owner' field for job id {}".format(this_job.job_id))
try:
this_job.wallclock_time_seconds = self._convert_time(job[3])
except KeyError:
# May not have started yet
pass
except ValueError:
self.logger.warning("Error parsing 'resources_used.walltime' for job id {}".format(this_job.job_id))
# I append to the list of jobs to return
job_list.append(this_job)
return job_list
[docs] def get_jobs(self, jobs=None, user=None, as_dict=False):
"""
Overrides original method from DirectScheduler in order to list
missing processes as DONE.
"""
job_stats = super().get_jobs(jobs=jobs, user=user, as_dict=as_dict)
found_jobs = []
# Get the list of known jobs
if as_dict:
found_jobs = job_stats.keys()
else:
found_jobs = [j.job_id for j in job_stats]
# Now check if there are any the user requested but were not found
not_found_jobs = list(set(jobs) - set(found_jobs)) if jobs else []
for job_id in not_found_jobs:
job = JobInfo()
job.job_id = job_id
job.job_state = JobState.DONE
# Owner and wallclock time is unknown
if as_dict:
job_stats[job_id] = job
else:
job_stats.append(job)
return job_stats
[docs] def _convert_time(self, string):
"""
Convert a string in the format HH:MM:SS to a number of seconds.
"""
import re
pieces = re.split('[:.]', string)
if len(pieces) != 3:
self.logger.warning('Wrong number of pieces (expected 3) for time string {}'.format(string))
raise ValueError('Wrong number of pieces for time string.')
days = 0
pieces_first = pieces[0].split('-')
if len(pieces_first) == 2:
days, pieces[0] = pieces_first
days = int(days)
try:
hours = int(pieces[0])
if hours < 0:
raise ValueError
except ValueError:
self.logger.warning('Not a valid number of hours: {}'.format(pieces[0]))
raise ValueError('Not a valid number of hours.')
try:
mins = int(pieces[1])
if mins < 0:
raise ValueError
except ValueError:
self.logger.warning('Not a valid number of minutes: {}'.format(pieces[1]))
raise ValueError('Not a valid number of minutes.')
try:
secs = int(pieces[2])
if secs < 0:
raise ValueError
except ValueError:
self.logger.warning('Not a valid number of seconds: {}'.format(pieces[2]))
raise ValueError('Not a valid number of seconds.')
return days * 86400 + hours * 3600 + mins * 60 + secs
[docs] def _parse_submit_output(self, retval, stdout, stderr):
"""
Parse the output of the submit command, as returned by executing the
command returned by _get_submit_command command.
To be implemented by the plugin.
Return a string with the JobID.
"""
if retval != 0:
self.logger.error(
'Error in _parse_submit_output: retval={}; '
'stdout={}; stderr={}'.format(retval, stdout, stderr)
)
raise SchedulerError(
'Error during submission, retval={}\n'
'stdout={}\nstderr={}'.format(retval, stdout, stderr)
)
if stderr.strip():
self.logger.warning(
'in _parse_submit_output for {}: '
'there was some text in stderr: {}'.format(str(self.transport), stderr)
)
if not stdout.strip():
self.logger.debug(
'Unable to get the PID: retval={}; '
'stdout={}; stderr={}'.format(retval, stdout, stderr)
)
raise SchedulerError(
'Unable to get the PID: retval={}; '
'stdout={}; stderr={}'.format(retval, stdout, stderr)
)
return stdout.strip()
[docs] def _get_kill_command(self, jobid):
"""
Return the command to kill the job with specified jobid.
"""
submit_command = 'kill {}'.format(jobid)
self.logger.info('killing job {}'.format(jobid))
return submit_command
[docs] def _parse_kill_output(self, retval, stdout, stderr):
"""
Parse the output of the kill command.
To be implemented by the plugin.
:return: True if everything seems ok, False otherwise.
"""
if retval != 0:
self.logger.error(
'Error in _parse_kill_output: retval={}; '
'stdout={}; stderr={}'.format(retval, stdout, stderr)
)
return False
if stderr.strip():
self.logger.warning(
'in _parse_kill_output for {}: '
'there was some text in stderr: {}'.format(str(self.transport), stderr)
)
if stdout.strip():
self.logger.warning(
'in _parse_kill_output for {}: '
'there was some text in stdout: {}'.format(str(self.transport), stdout)
)
return True