Merge "Clean up job/jobboard code"

This commit is contained in:
Jenkins
2013-09-24 04:02:20 +00:00
committed by Gerrit Code Review
2 changed files with 96 additions and 48 deletions

View File

@@ -3,6 +3,7 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved.
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -16,44 +17,70 @@
# License for the specific language governing permissions and limitations
# under the License.
# from taskflow.backends import api as b_api
import threading
from taskflow import decorators
from taskflow.openstack.common import uuidutils
class Job(object):
"""A job is a higher level abstraction over a set of flows as well as the
*ownership* of those flows, it is the highest piece of work that can be
owned by an entity performing those flows.
Only one entity will be operating on the flows contained in a job at a
given time (for the foreseeable future).
It is the object that should be transferred to another entity on failure of
so that the contained flows ownership can be transferred to the secondary
entity for resumption/continuation/reverting.
"""
def __init__(self, name, uuid=None):
if uuid:
self._uuid = uuid
else:
self._uuid = uuidutils.generate_uuid()
self._name = name
self._lock = threading.RLock()
self._flows = []
self.owner = None
self.state = None
self._flows = []
self.logbook = None
self.book = None
def add_flow(self, wf):
self._flows.append(wf)
@decorators.locked
def add(self, *flows):
self._flows.extend(flows)
def remove_flow(self, wf):
self._flows = [f for f in self._flows
if f.uuid != wf.uuid]
@decorators.locked
def remove(self, flow):
j = -1
for i, f in enumerate(self._flows):
if f.uuid == flow.uuid:
j = i
break
if j == -1:
raise ValueError("Could not find %r to remove" % (flow))
self._flows.pop(j)
def __contains__(self, wf):
for self_wf in self.flows:
if self_wf.flow_id == wf.flow_id:
def __contains__(self, flow):
for f in self:
if f.uuid == flow.uuid:
return True
return False
@property
def uuid(self):
"""The uuid of this job"""
return self._uuid
@property
def name(self):
"""The non-uniquely identifying name of this job"""
return self._name
@property
def flows(self):
return self._flows
def __iter__(self):
# Don't iterate while holding the lock
with self._lock:
flows = list(self._flows)
for f in flows:
yield f

View File

@@ -3,6 +3,7 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved.
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -16,47 +17,67 @@
# License for the specific language governing permissions and limitations
# under the License.
# from taskflow.backends import api as b_api
from taskflow.openstack.common import uuidutils
# TODO(kchenweijie): THIS ENTIRE CLASS
import abc
class JobBoard(object):
def __init__(self, name, jb_id=None):
if jb_id:
self._uuid = jb_id
else:
self._uuid = uuidutils.generate_uuid()
"""A jobboard is an abstract representation of a place where jobs
can be posted, reposted, claimed and transferred. There can be multiple
implementations of this job board, depending on the desired semantics and
capabilities of the underlying jobboard implementation.
"""
__metaclass__ = abc.ABCMeta
def __init__(self, name):
self._name = name
self._jobs = []
def add_job(self, job):
self._jobs.append(job)
def get_job(self, job_id):
return self._jobs[job_id]
def remove_job(self, job):
self._jobs = [j for j in self._jobs
if j.uuid != job.uuid]
def __contains__(self, job):
for self_job in self.jobs:
if self_job.job_id == job.job_id:
return True
return False
@property
def name(self):
"""The non-uniquely identifying name of this jobboard"""
return self._name
@property
def uuid(self):
return self._uuid
@abc.abstractmethod
def consume(self, job):
"""Permanently (and atomically) removes a job from the jobboard,
signaling that this job has been completed by the entity assigned
to that job.
@property
def jobs(self):
return self._jobs
Only the entity that has claimed that job is able to consume a job.
A job that has been consumed can not be reclaimed or reposted by
another entity (job postings are immutable). Any entity consuming
a unclaimed job (or a job they do not own) will cause an exception.
"""
@abc.abstractmethod
def post(self, job):
"""Atomically posts a given job to the jobboard, allowing others to
attempt to claim that job (and subsequently work on that job).
Once a job has been posted it can only be removed by consuming that
job (after that job is claimed). Any entity can post or propose jobs
to the jobboard (in the future this may be restricted).
"""
@abc.abstractmethod
def claim(self, job, who):
"""Atomically attempts to claim the given job for the entity and either
succeeds or fails at claiming by throwing corresponding exceptions.
If a job is claimed it is expected that the entity that claims that job
will at sometime in the future work on that jobs flows and either fail
at completing them (resulting in a reposting) or consume that job from
the jobboard (signaling its completion).
"""
@abc.abstractmethod
def repost(self, job):
"""Atomically reposts the given job on the jobboard, allowing that job
to be reclaimed by others. This would typically occur if the entity
that has claimed the job has failed or is unable to complete the job
or jobs it has claimed.
Only the entity that has claimed that job can repost a job. Any entity
reposting a unclaimed job (or a job they do not own) will cause an
exception.
"""