Various reworkings.

Add a function to release/unclaim a job on the claimer object and
expose that functionality in the job object. Add an await() method
on the job which will do basic spinning (later functionality can
do more advanced types of waiting). Add a useful str() function to
the job so that it can be easily printed with some useful info about
it.
This commit is contained in:
Joshua Harlow
2013-05-16 12:21:28 -07:00
parent 05af861b82
commit a401b8abe7

View File

@@ -17,6 +17,7 @@
# under the License.
import abc
import time
import uuid
from taskflow import exceptions as exc
@@ -36,6 +37,13 @@ class Claimer(object):
be claimed."""
raise NotImplementedError()
@abc.abstractmethod
def unclaim(self, job, owner):
"""This method will attempt to unclaim said job and must
either succeed at this or throw an exception signaling the job can not
be claimed."""
raise NotImplementedError()
class Job(object):
"""A job is connection to some set of work to be done by some agent. Basic
@@ -55,10 +63,17 @@ class Job(object):
self._id = str(uuid.uuid4().hex)
self._state = states.UNCLAIMED
def __str__(self):
return "Job (%s, %s): %s" % (self.name, self.tracking_id, self.state)
@property
def state(self):
return self._state
@state.setter
def state(self, new_state):
self._change_state(new_state)
def _change_state(self, new_state):
if self.state != new_state:
self._state = new_state
@@ -72,7 +87,7 @@ class Job(object):
return self._logbook
def claim(self, owner):
""" This can be used to attempt transition this job from unclaimed
"""This can be used to attempt transition this job from unclaimed
to claimed.
This must be done in a way that likely uses some type of locking or
@@ -83,15 +98,45 @@ class Job(object):
raise exc.UnclaimableJobException("Unable to claim job when job is"
" in state %s" % (self.state))
self._claimer.claim(self, owner)
self.owner = owner
self._change_state(states.CLAIMED)
def unclaim(self):
"""Atomically transitions this job from claimed to unclaimed."""
if self.state == states.UNCLAIMED:
return
self._claimer.unclaim(self, self.owner)
self._change_state(states.UNCLAIMED)
def erase(self):
"""Erases any traces of this job from its associated resources."""
for b in self.posted_on:
b.erase(self)
self._catalog.erase(self)
self._logbook = None
if self._logbook is not None:
self._logbook.close()
self._logbook = None
if self.state != states.UNCLAIMED:
self._claimer.unclaim(self, self.owner)
def await(self, timeout=None):
"""Awaits until either the job fails or succeeds or the provided
timeout is reached."""
if timeout is not None:
end_time = time.time() + max(0, timeout)
else:
end_time = None
# Use the same/similar scheme that the python condition class uses.
delay = 0.0005
while self.state not in (states.FAILURE, states.SUCCESS):
time.sleep(delay)
if end_time is not None:
remaining = end_time - time.time()
if remaining <= 0:
return False
delay = min(delay * 2, remaining, 0.05)
else:
delay = min(delay * 2, 0.05)
return True
@property
def tracking_id(self):