from __future__ import absolute_import
from enum import Enum
import sys
import traceback
try:
import tblib
_HAS_TBLIB = True
except ImportError:
_HAS_TBLIB = False
from tornado.gen import coroutine, Return
import yaml
from . import futures
from .base import state_machine
from . import persistence
from .persistence import auto_persist
from .lang import NULL
from . import utils
from . import exceptions
__all__ = [
'ProcessState',
'Created',
'Running',
'Waiting',
'Finished',
'Excepted',
'Killed',
# Commands
'Kill',
'Stop',
'Wait',
'Continue',
'Interruption',
'KillInterruption',
'PauseInterruption',
]
class Interruption(Exception):
pass
class KillInterruption(Interruption):
pass
class PauseInterruption(Interruption):
pass
# region Commands
class Command(persistence.Savable):
pass
@auto_persist('msg')
class Kill(Command):
def __init__(self, msg=None):
self.msg = msg
class Pause(Command):
pass
@auto_persist('msg', 'data')
class Wait(Command):
def __init__(self, continue_fn=None, msg=None, data=None):
self.continue_fn = continue_fn
self.msg = msg
self.data = data
@auto_persist('result')
class Stop(Command):
def __init__(self, result, successful):
self.result = result
self.successful = successful
@auto_persist('args', 'kwargs')
class Continue(Command):
CONTINUE_FN = 'continue_fn'
def __init__(self, continue_fn, *args, **kwargs):
self.continue_fn = continue_fn
self.args = args
self.kwargs = kwargs
def save_instance_state(self, out_state, save_context):
super(Continue, self).save_instance_state(out_state, save_context)
out_state[self.CONTINUE_FN] = self.continue_fn.__name__
def load_instance_state(self, saved_state, load_context):
super(Continue, self).load_instance_state(saved_state, load_context)
try:
self.continue_fn = utils.load_function(saved_state[self.CONTINUE_FN])
except ValueError:
process = load_context.process
self.continue_fn = getattr(process, saved_state[self.CONTINUE_FN])
# endregion
# region States
[docs]class ProcessState(Enum):
"""
The possible states that a :class:`Process` can be in.
"""
CREATED = 'created'
RUNNING = 'running'
WAITING = 'waiting'
FINISHED = 'finished'
EXCEPTED = 'excepted'
KILLED = 'killed'
@auto_persist('in_state')
class State(state_machine.State, persistence.Savable):
@property
def process(self):
"""
:return: The process
:rtype: :class:`ProcessStateMachine`
"""
return self.state_machine
def load_instance_state(self, saved_state, load_context):
super(State, self).load_instance_state(saved_state, load_context)
self.state_machine = load_context.process
def interrupt(self, reason):
return False
@auto_persist('args', 'kwargs')
class Created(State):
LABEL = ProcessState.CREATED
ALLOWED = {ProcessState.RUNNING, ProcessState.KILLED, ProcessState.EXCEPTED}
RUN_FN = 'run_fn'
def __init__(self, process, run_fn, *args, **kwargs):
super(Created, self).__init__(process)
assert run_fn is not None
self.run_fn = run_fn
self.args = args
self.kwargs = kwargs
def save_instance_state(self, out_state, save_context):
super(Created, self).save_instance_state(out_state, save_context)
out_state[self.RUN_FN] = self.run_fn.__name__
def load_instance_state(self, saved_state, load_context):
super(Created, self).load_instance_state(saved_state, load_context)
self.run_fn = getattr(self.process, saved_state[self.RUN_FN])
def execute(self):
return self.create_state(ProcessState.RUNNING, self.run_fn, *self.args, **self.kwargs)
@auto_persist('args', 'kwargs')
class Running(State):
LABEL = ProcessState.RUNNING
ALLOWED = {
ProcessState.RUNNING, ProcessState.WAITING, ProcessState.FINISHED, ProcessState.KILLED, ProcessState.EXCEPTED
}
RUN_FN = 'run_fn' # The key used to store the function to run
COMMAND = 'command' # The key used to store an upcoming command
# Class level defaults
_command = None
_running = False
_run_handle = None
def __init__(self, process, run_fn, *args, **kwargs):
super(Running, self).__init__(process)
assert run_fn is not None
self.run_fn = run_fn
self.args = args
self.kwargs = kwargs
self._run_handle = None
def save_instance_state(self, out_state, save_context):
super(Running, self).save_instance_state(out_state, save_context)
out_state[self.RUN_FN] = self.run_fn.__name__
if self._command is not None:
out_state[self.COMMAND] = self._command.save()
def load_instance_state(self, saved_state, load_context):
super(Running, self).load_instance_state(saved_state, load_context)
self.run_fn = getattr(self.process, saved_state[self.RUN_FN])
if self.COMMAND in saved_state:
self._command = persistence.Savable.load(saved_state[self.COMMAND], load_context)
def interrupt(self, reason):
return False
@coroutine
def execute(self):
if self._command is not None:
command = self._command
else:
try:
try:
self._running = True
result = self.run_fn(*self.args, **self.kwargs)
finally:
self._running = False
except Interruption:
# Let this bubble up to the caller
raise
except Exception:
excepted = self.create_state(ProcessState.EXCEPTED, *sys.exc_info()[1:])
raise Return(excepted)
else:
if not isinstance(result, Command):
if isinstance(result, exceptions.UnsuccessfulResult):
result = Stop(result.result, False)
else:
# Got passed a basic return type
result = Stop(result, True)
command = result
next_state = self._action_command(command)
raise Return(next_state)
def _action_command(self, command):
if isinstance(command, Kill):
return self.create_state(ProcessState.FINISHED, command.result, command.successful)
# elif isinstance(command, Pause):
# self.pause()
elif isinstance(command, Stop):
return self.create_state(ProcessState.FINISHED, command.result, command.successful)
elif isinstance(command, Wait):
return self.create_state(ProcessState.WAITING, command.continue_fn, command.msg, command.data)
elif isinstance(command, Continue):
return self.create_state(ProcessState.RUNNING, command.continue_fn, *command.args)
else:
raise ValueError("Unrecognised command")
@auto_persist('msg', 'data')
class Waiting(State):
LABEL = ProcessState.WAITING
ALLOWED = {
ProcessState.RUNNING, ProcessState.WAITING, ProcessState.KILLED, ProcessState.EXCEPTED, ProcessState.FINISHED
}
DONE_CALLBACK = 'DONE_CALLBACK'
_interruption = None
def __str__(self):
state_info = super(Waiting, self).__str__()
if self.msg is not None:
state_info += " ({})".format(self.msg)
return state_info
def __init__(self, process, done_callback, msg=None, data=None):
super(Waiting, self).__init__(process)
self.done_callback = done_callback
self.msg = msg
self.data = data
self._waiting_future = futures.Future()
def save_instance_state(self, out_state, save_context):
super(Waiting, self).save_instance_state(out_state, save_context)
if self.done_callback is not None:
out_state[self.DONE_CALLBACK] = self.done_callback.__name__
def load_instance_state(self, saved_state, load_context):
super(Waiting, self).load_instance_state(saved_state, load_context)
callback_name = saved_state.get(self.DONE_CALLBACK, None)
if callback_name is not None:
self.done_callback = getattr(self.process, callback_name)
else:
self.done_callback = None
self._waiting_future = futures.Future()
def interrupt(self, reason):
# This will cause the future in execute() to raise the exception
self._waiting_future.set_exception(reason)
@coroutine
def execute(self):
try:
result = yield self._waiting_future
except Interruption:
# Deal with the interruption (by raising) but make sure our internal
# state is back to how it was before the interruption so that we can be
# re-executed
self._waiting_future = futures.Future()
raise
if result == NULL:
next_state = self.create_state(ProcessState.RUNNING, self.done_callback)
else:
next_state = self.create_state(ProcessState.RUNNING, self.done_callback, result)
raise Return(next_state)
def resume(self, value=NULL):
assert self._waiting_future is not None, "Not yet waiting"
self._waiting_future.set_result(value)
class Excepted(State):
LABEL = ProcessState.EXCEPTED
EXC_VALUE = 'ex_value'
TRACEBACK = 'traceback'
def __init__(self, process, exception, trace_back=None):
"""
:param process: The associated process
:param exception: The exception instance
:param trace_back: An optional exception traceback
"""
super(Excepted, self).__init__(process)
self.exception = exception
self.traceback = trace_back
def __str__(self):
return "{} ({})".format(
super(Excepted, self).__str__(),
traceback.format_exception_only(type(self.exception), self.exception)[0])
def save_instance_state(self, out_state, save_context):
super(Excepted, self).save_instance_state(out_state, save_context)
out_state[self.EXC_VALUE] = yaml.dump(self.exception)
if self.traceback is not None:
out_state[self.TRACEBACK] = "".join(traceback.format_tb(self.traceback))
def load_instance_state(self, saved_state, load_context):
super(Excepted, self).load_instance_state(saved_state, load_context)
self.exception = yaml.load(saved_state[self.EXC_VALUE], Loader=yaml.FullLoader)
if _HAS_TBLIB:
try:
self.traceback = \
tblib.Traceback.from_string(saved_state[self.TRACEBACK],
strict=False)
except KeyError:
self.traceback = None
else:
self.traceback = None
def get_exc_info(self):
"""
Recreate the exc_info tuple and return it
"""
return type(self.exception), self.exception, self.traceback
@auto_persist('result', 'successful')
class Finished(State):
LABEL = ProcessState.FINISHED
def __init__(self, process, result, successful):
super(Finished, self).__init__(process)
self.result = result
self.successful = successful
@auto_persist('msg')
class Killed(State):
LABEL = ProcessState.KILLED
def __init__(self, process, msg):
"""
:param process: The associated process
:param msg: Optional kill message
:type msg: str
"""
super(Killed, self).__init__(process)
self.msg = msg
# endregion