diff --git a/taskflow/job.py b/taskflow/job.py index 02e531ec..50a6307e 100644 --- a/taskflow/job.py +++ b/taskflow/job.py @@ -17,6 +17,7 @@ # under the License. import abc +import time import uuid from taskflow import exceptions as exc @@ -36,6 +37,13 @@ class Claimer(object): be claimed.""" raise NotImplementedError() + @abc.abstractmethod + def unclaim(self, job, owner): + """This method will attempt to unclaim said job and must + either succeed at this or throw an exception signaling the job can not + be claimed.""" + raise NotImplementedError() + class Job(object): """A job is connection to some set of work to be done by some agent. Basic @@ -55,10 +63,17 @@ class Job(object): self._id = str(uuid.uuid4().hex) self._state = states.UNCLAIMED + def __str__(self): + return "Job (%s, %s): %s" % (self.name, self.tracking_id, self.state) + @property def state(self): return self._state + @state.setter + def state(self, new_state): + self._change_state(new_state) + def _change_state(self, new_state): if self.state != new_state: self._state = new_state @@ -72,7 +87,7 @@ class Job(object): return self._logbook def claim(self, owner): - """ This can be used to attempt transition this job from unclaimed + """This can be used to attempt transition this job from unclaimed to claimed. This must be done in a way that likely uses some type of locking or @@ -83,15 +98,45 @@ class Job(object): raise exc.UnclaimableJobException("Unable to claim job when job is" " in state %s" % (self.state)) self._claimer.claim(self, owner) - self.owner = owner self._change_state(states.CLAIMED) + def unclaim(self): + """Atomically transitions this job from claimed to unclaimed.""" + if self.state == states.UNCLAIMED: + return + self._claimer.unclaim(self, self.owner) + self._change_state(states.UNCLAIMED) + def erase(self): """Erases any traces of this job from its associated resources.""" for b in self.posted_on: b.erase(self) self._catalog.erase(self) - self._logbook = None + if self._logbook is not None: + self._logbook.close() + self._logbook = None + if self.state != states.UNCLAIMED: + self._claimer.unclaim(self, self.owner) + + def await(self, timeout=None): + """Awaits until either the job fails or succeeds or the provided + timeout is reached.""" + if timeout is not None: + end_time = time.time() + max(0, timeout) + else: + end_time = None + # Use the same/similar scheme that the python condition class uses. + delay = 0.0005 + while self.state not in (states.FAILURE, states.SUCCESS): + time.sleep(delay) + if end_time is not None: + remaining = end_time - time.time() + if remaining <= 0: + return False + delay = min(delay * 2, remaining, 0.05) + else: + delay = min(delay * 2, 0.05) + return True @property def tracking_id(self):