Merge "Add rundimentary and/or non-optimized job priorities"

Jenkins
2016-04-28 20:58:49 +00:00
committed by Gerrit Code Review
4 changed files with 210 additions and 70 deletions

View File

@@ -24,6 +24,7 @@ import time
import fasteners
import msgpack
from oslo_serialization import msgpackutils
from oslo_utils import excutils
from oslo_utils import strutils
from oslo_utils import timeutils
from oslo_utils import uuidutils
@@ -66,7 +67,8 @@ class RedisJob(base.Job):
def __init__(self, board, name, sequence, key,
uuid=None, details=None,
created_on=None, backend=None,
book=None, book_data=None):
book=None, book_data=None,
priority=base.JobPriority.NORMAL):
super(RedisJob, self).__init__(board, name,
uuid=uuid, details=details,
backend=backend,
@@ -78,12 +80,17 @@ class RedisJob(base.Job):
self._key = key
self._last_modified_key = board.join(key + board.LAST_MODIFIED_POSTFIX)
self._owner_key = board.join(key + board.OWNED_POSTFIX)
self._priority = priority
@property
def key(self):
"""Key (in board listings/trash hash) the job data is stored under."""
return self._key
@property
def priority(self):
return self._priority
@property
def last_modified_key(self):
"""Key the job last modified data is stored under."""
@@ -131,21 +138,26 @@ class RedisJob(base.Job):
if not isinstance(other, RedisJob):
return NotImplemented
if self.board.listings_key == other.board.listings_key:
if self.created_on == other.created_on:
if self.priority == other.priority:
return self.sequence < other.sequence
else:
return self.created_on < other.created_on
ordered = base.JobPriority.reorder(
(self.priority, self), (other.priority, other))
if ordered[0] is self:
return False
return True
else:
# Different jobboards with different listing keys...
return self.board.listings_key < other.board.listings_key
def __eq__(self, other):
if not isinstance(other, RedisJob):
return NotImplemented
return ((self.board.listings_key, self.created_on, self.sequence) ==
(other.board.listings_key, other.created_on, other.sequence))
return ((self.board.listings_key, self.priority, self.sequence) ==
(other.board.listings_key, other.priority, other.sequence))
def __hash__(self):
return hash((self.board.listings_key, self.created_on, self.sequence))
return hash((self.board.listings_key, self.priority, self.sequence))
@property
def created_on(self):
@@ -202,13 +214,6 @@ class RedisJob(base.Job):
listings_key, owner_key,
value_from_callable=True)
def __str__(self):
"""Pretty formats the job into something *more* meaningful."""
tpl = "%s: %s (uuid=%s, owner_key=%s, sequence=%s, details=%s)"
return tpl % (type(self).__name__,
self.name, self.uuid, self.owner_key,
self.sequence, self.details)
class RedisJobBoard(base.JobBoard):
"""A jobboard backed by `redis`_.
@@ -727,11 +732,14 @@ return cmsgpack.pack(result)
raw_owner = self._client.get(owner_key)
return self._decode_owner(raw_owner)
def post(self, name, book=None, details=None):
def post(self, name, book=None, details=None,
priority=base.JobPriority.NORMAL):
job_uuid = uuidutils.generate_uuid()
job_priority = base.JobPriority.convert(priority)
posting = base.format_posting(job_uuid, name,
created_on=timeutils.utcnow(),
book=book, details=details)
book=book, details=details,
priority=job_priority)
with _translate_failures():
sequence = self._client.incr(self.sequence_key)
posting.update({
@@ -751,7 +759,8 @@ return cmsgpack.pack(result)
uuid=job_uuid, details=details,
created_on=posting['created_on'],
book=book, book_data=posting.get('book'),
backend=self._persistence)
backend=self._persistence,
priority=job_priority)
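A minimal usage sketch of the new parameter (the job name, the details and the already-connected board instance are illustrative assumptions, not part of this change); plain strings such as 'high' also work, since post() runs the value through JobPriority.convert():

    from taskflow.jobs import base

    # 'board' is assumed to be a connected RedisJobBoard instance.
    job = board.post('cleanup-tmp-files', details={'path': '/tmp'},
                     priority=base.JobPriority.HIGH)
    print(job.priority)   # JobPriority.HIGH
    print(job.sequence)   # taken from the Redis INCR on the sequence key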
def wait(self, timeout=None, initial_delay=0.005,
max_delay=1.0, sleep_func=time.sleep):
@@ -791,16 +800,32 @@ return cmsgpack.pack(result)
raw_postings = self._client.hgetall(self.listings_key)
postings = []
for raw_job_key, raw_posting in six.iteritems(raw_postings):
posting = self._loads(raw_posting)
details = posting.get('details', {})
job_uuid = posting['uuid']
job = RedisJob(self, posting['name'], posting['sequence'],
raw_job_key, uuid=job_uuid, details=details,
created_on=posting['created_on'],
book_data=posting.get('book'),
backend=self._persistence)
postings.append(job)
return sorted(postings)
try:
job_data = self._loads(raw_posting)
try:
job_priority = job_data['priority']
job_priority = base.JobPriority.convert(job_priority)
except KeyError:
job_priority = base.JobPriority.NORMAL
job_created_on = job_data['created_on']
job_uuid = job_data['uuid']
job_name = job_data['name']
job_sequence_id = job_data['sequence']
job_details = job_data.get('details', {})
except (ValueError, TypeError, KeyError):
with excutils.save_and_reraise_exception():
LOG.warn("Incorrectly formatted job data found at"
" key: %s[%s]", self.listings_key,
raw_job_key, exc_info=True)
else:
postings.append(RedisJob(self, job_name, job_sequence_id,
raw_job_key, uuid=job_uuid,
details=job_details,
created_on=job_created_on,
book_data=job_data.get('book'),
backend=self._persistence,
priority=job_priority))
return sorted(postings, reverse=True)
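A small sketch of the backwards-compatibility fallback above: postings written before this change carry no 'priority' field, so they are read back as NORMAL (the posting dict is made up for illustration):

    from taskflow.jobs import base

    legacy_posting = {'uuid': 'made-up-uuid', 'name': 'old-job', 'sequence': 7}
    try:
        priority = base.JobPriority.convert(legacy_posting['priority'])
    except KeyError:
        priority = base.JobPriority.NORMAL
    assert priority is base.JobPriority.NORMAL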
def iterjobs(self, only_unclaimed=False, ensure_fresh=False):
return base.JobBoardIterator(

View File

@@ -47,7 +47,8 @@ class ZookeeperJob(base.Job):
def __init__(self, board, name, client, path,
uuid=None, details=None, book=None, book_data=None,
created_on=None, backend=None):
created_on=None, backend=None,
priority=base.JobPriority.NORMAL):
super(ZookeeperJob, self).__init__(board, name,
uuid=uuid, details=details,
backend=backend,
@@ -60,12 +61,17 @@ class ZookeeperJob(base.Job):
basename = k_paths.basename(self._path)
self._root = self._path[0:-len(basename)]
self._sequence = int(basename[len(board.JOB_PREFIX):])
self._priority = priority
@property
def lock_path(self):
"""Path the job lock/claim and owner znode is stored."""
return self._lock_path
@property
def priority(self):
return self._priority
@property
def path(self):
"""Path the job data znode is stored."""
@@ -175,15 +181,23 @@ class ZookeeperJob(base.Job):
if not isinstance(other, ZookeeperJob):
return NotImplemented
if self.root == other.root:
return self.sequence < other.sequence
if self.priority == other.priority:
return self.sequence < other.sequence
else:
ordered = base.JobPriority.reorder(
(self.priority, self), (other.priority, other))
if ordered[0] is self:
return False
return True
else:
# Different jobboards with different roots...
return self.root < other.root
def __eq__(self, other):
if not isinstance(other, ZookeeperJob):
return NotImplemented
return ((self.root, self.sequence) ==
(other.root, other.sequence))
return ((self.root, self.sequence, self.priority) ==
(other.root, other.sequence, other.priority))
def __hash__(self):
return hash(self.path)
@@ -323,11 +337,12 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
if ensure_fresh:
self._force_refresh()
with self._job_cond:
return sorted(six.itervalues(self._known_jobs))
return sorted(six.itervalues(self._known_jobs), reverse=True)
def _force_refresh(self):
try:
children = self._client.get_children(self.path)
maybe_children = self._client.get_children(self.path)
self._on_job_posting(maybe_children, delayed=False)
except self._client.handler.timeout_exception:
excp.raise_with_cause(excp.JobFailure,
"Refreshing failure, operation timed out")
@@ -339,14 +354,13 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
except k_exceptions.KazooException:
excp.raise_with_cause(excp.JobFailure,
"Refreshing failure, internal error")
else:
self._on_job_posting(children, delayed=False)
def iterjobs(self, only_unclaimed=False, ensure_fresh=False):
board_removal_func = lambda job: self._remove_job(job.path)
return base.JobBoardIterator(
self, LOG, only_unclaimed=only_unclaimed,
ensure_fresh=ensure_fresh, board_fetch_func=self._fetch_jobs,
board_removal_func=lambda a_job: self._remove_job(a_job.path))
board_removal_func=board_removal_func)
def _remove_job(self, path):
if path not in self._known_jobs:
@@ -357,41 +371,53 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
LOG.debug("Removed job that was at path '%s'", path)
self._try_emit(base.REMOVAL, details={'job': job})
def _process_child(self, path, request):
def _process_child(self, path, request, quiet=True):
"""Receives the result of a child data fetch request."""
job = None
try:
raw_data, node_stat = request.get()
job_data = misc.decode_json(raw_data)
created_on = misc.millis_to_datetime(node_stat.ctime)
job_created_on = misc.millis_to_datetime(node_stat.ctime)
try:
job_priority = job_data['priority']
job_priority = base.JobPriority.convert(job_priority)
except KeyError:
job_priority = base.JobPriority.NORMAL
job_uuid = job_data['uuid']
job_name = job_data['name']
except (ValueError, TypeError, KeyError):
LOG.warn("Incorrectly formatted job data found at path: %s",
path, exc_info=True)
with excutils.save_and_reraise_exception(reraise=not quiet):
LOG.warn("Incorrectly formatted job data found at path: %s",
path, exc_info=True)
except self._client.handler.timeout_exception:
LOG.warn("Operation timed out fetching job data from path: %s",
path, exc_info=True)
with excutils.save_and_reraise_exception(reraise=not quiet):
LOG.warn("Operation timed out fetching job data from path: %s",
path, exc_info=True)
except k_exceptions.SessionExpiredError:
LOG.warn("Session expired fetching job data from path: %s", path,
exc_info=True)
with excutils.save_and_reraise_exception(reraise=not quiet):
LOG.warn("Session expired fetching job data from path: %s",
path, exc_info=True)
except k_exceptions.NoNodeError:
LOG.debug("No job node found at path: %s, it must have"
" disappeared or was removed", path)
except k_exceptions.KazooException:
LOG.warn("Internal error fetching job data from path: %s",
path, exc_info=True)
with excutils.save_and_reraise_exception(reraise=not quiet):
LOG.warn("Internal error fetching job data from path: %s",
path, exc_info=True)
else:
with self._job_cond:
# Now we can officially check if someone already placed this
# job's information into the known job set (if it already
# exists then just leave it alone).
if path not in self._known_jobs:
job = ZookeeperJob(self, job_data['name'],
job = ZookeeperJob(self, job_name,
self._client, path,
backend=self._persistence,
uuid=job_data['uuid'],
uuid=job_uuid,
book_data=job_data.get("book"),
details=job_data.get("details", {}),
created_on=created_on)
created_on=job_created_on,
priority=job_priority)
self._known_jobs[path] = job
self._job_cond.notify_all()
if job is not None:
@@ -442,15 +468,18 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
if delayed:
request.rawlink(functools.partial(self._process_child, path))
else:
self._process_child(path, request)
self._process_child(path, request, quiet=False)
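A short sketch of what the quiet flag controls above, using the save_and_reraise_exception helper this diff imports from oslo.utils (the process_result function and its fetch_data callable are hypothetical): the delayed rawlink() callback keeps the default quiet=True and only logs, while the synchronous path passes quiet=False so the failure is logged and then re-raised to the caller:

    import logging

    from oslo_utils import excutils

    LOG = logging.getLogger(__name__)

    def process_result(fetch_data, quiet=True):
        # Mirrors the pattern above: log-and-continue when quiet (delayed
        # callback path), log-and-reraise when not (synchronous path).
        try:
            return fetch_data()
        except (ValueError, TypeError, KeyError):
            with excutils.save_and_reraise_exception(reraise=not quiet):
                LOG.warning("Incorrectly formatted job data", exc_info=True)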
def post(self, name, book=None, details=None):
def post(self, name, book=None, details=None,
priority=base.JobPriority.NORMAL):
# NOTE(harlowja): Jobs are not ephemeral, they will persist until they
# are consumed (this may change later, but seems safer to do this until
# further notice).
job_priority = base.JobPriority.convert(priority)
job_uuid = uuidutils.generate_uuid()
job_posting = base.format_posting(job_uuid, name,
book=book, details=details)
book=book, details=details,
priority=job_priority)
raw_job_posting = misc.binary_encode(jsonutils.dumps(job_posting))
with self._wrap(job_uuid, None,
fail_msg_tpl="Posting failure: %s",
@@ -462,7 +491,8 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
job = ZookeeperJob(self, name, self._client, job_path,
backend=self._persistence,
book=book, details=details, uuid=job_uuid,
book_data=job_posting.get('book'))
book_data=job_posting.get('book'),
priority=job_priority)
with self._job_cond:
self._known_jobs[job_path] = job
self._job_cond.notify_all()

View File

@@ -20,6 +20,7 @@ import collections
import contextlib
import time
import enum
from oslo_utils import timeutils
from oslo_utils import uuidutils
import six
@@ -30,15 +31,89 @@ from taskflow.types import notifier
from taskflow.utils import iter_utils
class JobPriority(enum.Enum):
"""Enum of job priorities (modeled after hadoop job priorities)."""
#: Extremely urgent job priority.
VERY_HIGH = 'VERY_HIGH'
#: Mildly urgent job priority.
HIGH = 'HIGH'
#: Default job priority.
NORMAL = 'NORMAL'
#: Not needed anytime soon job priority.
LOW = 'LOW'
#: Very much not needed anytime soon job priority.
VERY_LOW = 'VERY_LOW'
@classmethod
def convert(cls, value):
if isinstance(value, cls):
return value
try:
return cls(value.upper())
except (ValueError, AttributeError):
valids = [cls.VERY_HIGH, cls.HIGH, cls.NORMAL,
cls.LOW, cls.VERY_LOW]
valids = [p.value for p in valids]
raise ValueError("'%s' is not a valid priority, valid"
" priorities are %s" % (value, valids))
@classmethod
def reorder(cls, *values):
"""Reorders (priority, value) tuples -> priority ordered values."""
if len(values) == 0:
raise ValueError("At least one (priority, value) pair is"
" required")
elif len(values) == 1:
v1 = values[0]
# Even though the result isn't used, we still do the conversion
# to stay consistent with the other branches in this function
# (it also raises on bad values, which is what we want)...
p1 = cls.convert(v1[0])
return v1[1]
else:
# Order very very much matters in this tuple...
priority_ordering = (cls.VERY_HIGH, cls.HIGH,
cls.NORMAL, cls.LOW, cls.VERY_LOW)
if len(values) == 2:
# It's common to use this with just two pairs, so
# avoid the extra bucketing complexity that is needed
# for more than two pairs.
v1 = values[0]
v2 = values[1]
p1 = cls.convert(v1[0])
p2 = cls.convert(v2[0])
p1_i = priority_ordering.index(p1)
p2_i = priority_ordering.index(p2)
if p1_i <= p2_i:
return v1[1], v2[1]
else:
return v2[1], v1[1]
else:
buckets = collections.defaultdict(list)
for (p, v) in values:
p = cls.convert(p)
buckets[p].append(v)
values = []
for p in priority_ordering:
values.extend(buckets[p])
return tuple(values)
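A short sketch of how the two helpers above behave (assuming the enum is importable from taskflow.jobs.base, which is where this diff appears to place it):

    from taskflow.jobs import base

    # convert() accepts either an enum member or a (case-insensitive) string
    # and raises ValueError for anything else.
    assert base.JobPriority.convert('high') is base.JobPriority.HIGH
    assert base.JobPriority.convert(base.JobPriority.LOW) is base.JobPriority.LOW

    # reorder() takes (priority, value) pairs and returns the values ordered
    # from most urgent to least urgent; both jobboard backends use it in
    # their __lt__ implementations.
    first, second = base.JobPriority.reorder(
        (base.JobPriority.NORMAL, 'job-a'),
        ('VERY_HIGH', 'job-b'))
    assert (first, second) == ('job-b', 'job-a')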
@six.add_metaclass(abc.ABCMeta)
class Job(object):
"""A abstraction that represents a named and trackable unit of work.
A job connects a logbook, a owner, last modified and created on dates and
any associated state that the job has. Since it is a connector to a
logbook, which are each associated with a set of factories that can create
set of flows, it is the current top-level container for a piece of work
that can be owned by an entity (typically that entity will read those
A job connects a logbook, an owner, a priority, last modified and created
on dates and any associated state that the job has. Since it is a connector
to a logbook, which are each associated with a set of factories that can
create sets of flows, it is the current top-level container for a piece of
work that can be owned by an entity (typically that entity will read those
logbooks and run any contained flows).
Only one entity will be allowed to own and operate on the flows contained
@@ -84,7 +159,10 @@ class Job(object):
@abc.abstractproperty
def state(self):
"""Access the current state of this job."""
pass
@abc.abstractproperty
def priority(self):
"""The :py:class:`~.JobPriority` of this job."""
def wait(self, timeout=None,
delay=0.01, delay_multiplier=2.0, max_delay=60.0,
@@ -185,9 +263,10 @@ class Job(object):
def __str__(self):
"""Pretty formats the job into something *more* meaningful."""
return "%s: %s (uuid=%s, details=%s)" % (type(self).__name__,
self.name, self.uuid,
self.details)
cls_name = type(self).__name__
return "%s: %s (priority=%s, uuid=%s, details=%s)" % (
cls_name, self.name, self.priority,
self.uuid, self.details)
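With the change above, the string form of a job now surfaces its priority; roughly, for a hypothetical redis-backed job (all values are made up):

    print(job)
    # RedisJob: cleanup-tmp-files (priority=JobPriority.HIGH,
    #           uuid=..., details={'path': '/tmp'})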
class JobBoardIterator(six.Iterator):
@@ -241,6 +320,9 @@ class JobBoardIterator(six.Iterator):
self._logger.warn("Failed determining the state of"
" job '%s'", maybe_job, exc_info=True)
except excp.NotFound:
# Attempt to clean this off the board now that we found
# it wasn't really there (this **must** gracefully handle
# removal already having happened).
if self._board_removal_func is not None:
self._board_removal_func(maybe_job)
return job
@@ -282,9 +364,9 @@ class JobBoard(object):
"""Returns an iterator of jobs that are currently on this board.
NOTE(harlowja): the ordering of this iteration should be by posting
order (oldest to newest) if possible, but it is left up to the backing
implementation to provide the order that best suits it (so don't depend
on it always being oldest to newest).
order (oldest to newest) with higher priority jobs
being provided before lower priority jobs, but it is left up to the
backing implementation to provide the order that best suits it.
NOTE(harlowja): the iterator that is returned may support other
attributes which can be used to further customize how iteration can
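A hedged usage sketch of the ordering contract described above (the board is assumed to be a connected jobboard of either backend; job names are made up):

    from taskflow.jobs import base

    board.post('routine-job', priority=base.JobPriority.NORMAL)
    board.post('urgent-job', priority=base.JobPriority.VERY_HIGH)
    for job in board.iterjobs(ensure_fresh=True):
        print(job.name, job.priority)
    # urgent-job JobPriority.VERY_HIGH
    # routine-job JobPriority.NORMAL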
@@ -302,15 +384,16 @@ class JobBoard(object):
@abc.abstractmethod
def wait(self, timeout=None):
"""Waits a given amount of time for jobs to be posted.
"""Waits a given amount of time for **any** jobs to be posted.
When jobs are found then an iterator will be returned that can be used
to iterate over those jobs.
NOTE(harlowja): since a jobboard can be mutated on by multiple external
entities at the *same* time the iterator that can be returned *may*
still be empty due to other entities removing those jobs after the
iterator has been created (be aware of this when using it).
entities at the **same** time the iterator that can be
returned **may** still be empty due to other entities removing those
jobs after the iterator has been created (be aware of this when
using it).
:param timeout: float that indicates how long to wait for a job to
appear (if None then waits forever).
@@ -356,7 +439,7 @@ class JobBoard(object):
"""
@abc.abstractmethod
def post(self, name, book=None, details=None):
def post(self, name, book=None, details=None, priority=JobPriority.NORMAL):
"""Atomically creates and posts a job to the jobboard.
This posting allows others to attempt to claim that job (and
@@ -488,10 +571,11 @@ def check_who(meth):
def format_posting(uuid, name, created_on=None, last_modified=None,
details=None, book=None):
details=None, book=None, priority=JobPriority.NORMAL):
posting = {
'uuid': uuid,
'name': name,
'priority': priority.value,
}
if created_on is not None:
posting['created_on'] = created_on
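A small sketch of the core of the dict produced by format_posting() above (the uuid is made up); note it is the enum's string value, not the enum member, that gets stored, which keeps the posting json/msgpack serializable:

    posting = format_posting('5a0c-made-up-uuid', 'cleanup-tmp-files',
                             priority=JobPriority.HIGH)
    # posting now contains at least:
    # {'uuid': '5a0c-made-up-uuid', 'name': 'cleanup-tmp-files',
    #  'priority': 'HIGH'}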

View File

@@ -260,6 +260,7 @@ class ZakeJobboardTest(test.TestCase, ZookeeperBoardTestMixin):
'name': book.name,
'uuid': book.uuid,
},
'priority': 'NORMAL',
'details': {},
}, jsonutils.loads(misc.binary_decode(paths[path_key]['data'])))