From a401b8abe75ba9c0f3d690cb372fe6f2924aed0c Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Thu, 16 May 2013 12:21:28 -0700 Subject: [PATCH] Various reworkings. Add a function to release/unclaim a job on the claimer object and expose that functionality in the job object. Add an await() method on the job which will do basic spinning (later functionality can do more advanced types of waiting). Add a useful str() function to the job so that it can be easily printed with some useful info about it. --- taskflow/job.py | 51 ++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 48 insertions(+), 3 deletions(-) diff --git a/taskflow/job.py b/taskflow/job.py index 02e531ec..50a6307e 100644 --- a/taskflow/job.py +++ b/taskflow/job.py @@ -17,6 +17,7 @@ # under the License. import abc +import time import uuid from taskflow import exceptions as exc @@ -36,6 +37,13 @@ class Claimer(object): be claimed.""" raise NotImplementedError() + @abc.abstractmethod + def unclaim(self, job, owner): + """This method will attempt to unclaim said job and must + either succeed at this or throw an exception signaling the job can not + be claimed.""" + raise NotImplementedError() + class Job(object): """A job is connection to some set of work to be done by some agent. Basic @@ -55,10 +63,17 @@ class Job(object): self._id = str(uuid.uuid4().hex) self._state = states.UNCLAIMED + def __str__(self): + return "Job (%s, %s): %s" % (self.name, self.tracking_id, self.state) + @property def state(self): return self._state + @state.setter + def state(self, new_state): + self._change_state(new_state) + def _change_state(self, new_state): if self.state != new_state: self._state = new_state @@ -72,7 +87,7 @@ class Job(object): return self._logbook def claim(self, owner): - """ This can be used to attempt transition this job from unclaimed + """This can be used to attempt transition this job from unclaimed to claimed. This must be done in a way that likely uses some type of locking or @@ -83,15 +98,45 @@ class Job(object): raise exc.UnclaimableJobException("Unable to claim job when job is" " in state %s" % (self.state)) self._claimer.claim(self, owner) - self.owner = owner self._change_state(states.CLAIMED) + def unclaim(self): + """Atomically transitions this job from claimed to unclaimed.""" + if self.state == states.UNCLAIMED: + return + self._claimer.unclaim(self, self.owner) + self._change_state(states.UNCLAIMED) + def erase(self): """Erases any traces of this job from its associated resources.""" for b in self.posted_on: b.erase(self) self._catalog.erase(self) - self._logbook = None + if self._logbook is not None: + self._logbook.close() + self._logbook = None + if self.state != states.UNCLAIMED: + self._claimer.unclaim(self, self.owner) + + def await(self, timeout=None): + """Awaits until either the job fails or succeeds or the provided + timeout is reached.""" + if timeout is not None: + end_time = time.time() + max(0, timeout) + else: + end_time = None + # Use the same/similar scheme that the python condition class uses. + delay = 0.0005 + while self.state not in (states.FAILURE, states.SUCCESS): + time.sleep(delay) + if end_time is not None: + remaining = end_time - time.time() + if remaining <= 0: + return False + delay = min(delay * 2, remaining, 0.05) + else: + delay = min(delay * 2, 0.05) + return True @property def tracking_id(self):