Merge "Add a new wait() method that waits for jobs to arrive"
This commit is contained in:
@@ -18,6 +18,7 @@ import collections
|
|||||||
import contextlib
|
import contextlib
|
||||||
import functools
|
import functools
|
||||||
import logging
|
import logging
|
||||||
|
import threading
|
||||||
|
|
||||||
from concurrent import futures
|
from concurrent import futures
|
||||||
from kazoo import exceptions as k_exceptions
|
from kazoo import exceptions as k_exceptions
|
||||||
@@ -230,12 +231,6 @@ class ZookeeperJobBoardIterator(six.Iterator):
|
|||||||
def __iter__(self):
|
def __iter__(self):
|
||||||
return self
|
return self
|
||||||
|
|
||||||
def _fetch_jobs(self):
|
|
||||||
if self.ensure_fresh:
|
|
||||||
self._board._force_refresh()
|
|
||||||
with self._board._job_mutate:
|
|
||||||
return sorted(six.itervalues(self._board._known_jobs))
|
|
||||||
|
|
||||||
def _next_job(self):
|
def _next_job(self):
|
||||||
if self.only_unclaimed:
|
if self.only_unclaimed:
|
||||||
allowed_states = UNCLAIMED_JOB_STATES
|
allowed_states = UNCLAIMED_JOB_STATES
|
||||||
@@ -251,14 +246,14 @@ class ZookeeperJobBoardIterator(six.Iterator):
|
|||||||
LOG.warn("Failed determining the state of job: %s (%s)",
|
LOG.warn("Failed determining the state of job: %s (%s)",
|
||||||
maybe_job.uuid, maybe_job.path, exc_info=True)
|
maybe_job.uuid, maybe_job.path, exc_info=True)
|
||||||
except excp.NotFound:
|
except excp.NotFound:
|
||||||
with self._board._job_mutate:
|
self._board._remove_job(maybe_job.path)
|
||||||
self._board._remove_job(maybe_job.path)
|
|
||||||
return job
|
return job
|
||||||
|
|
||||||
def __next__(self):
|
def __next__(self):
|
||||||
if not self._jobs:
|
if not self._jobs:
|
||||||
if not self._fetched:
|
if not self._fetched:
|
||||||
self._jobs.extend(self._fetch_jobs())
|
jobs = self._board._fetch_jobs(ensure_fresh=self.ensure_fresh)
|
||||||
|
self._jobs.extend(jobs)
|
||||||
self._fetched = True
|
self._fetched = True
|
||||||
job = self._next_job()
|
job = self._next_job()
|
||||||
if job is None:
|
if job is None:
|
||||||
@@ -291,8 +286,9 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
|
|||||||
self._persistence = persistence
|
self._persistence = persistence
|
||||||
# Misc. internal details
|
# Misc. internal details
|
||||||
self._known_jobs = {}
|
self._known_jobs = {}
|
||||||
self._job_mutate = self._client.handler.rlock_object()
|
self._job_lock = threading.RLock()
|
||||||
self._open_close_lock = self._client.handler.rlock_object()
|
self._job_cond = threading.Condition(self._job_lock)
|
||||||
|
self._open_close_lock = threading.RLock()
|
||||||
self._client.add_listener(self._state_change_listener)
|
self._client.add_listener(self._state_change_listener)
|
||||||
self._bad_paths = frozenset([path])
|
self._bad_paths = frozenset([path])
|
||||||
self._job_watcher = None
|
self._job_watcher = None
|
||||||
@@ -313,9 +309,15 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def job_count(self):
|
def job_count(self):
|
||||||
with self._job_mutate:
|
with self._job_lock:
|
||||||
return len(self._known_jobs)
|
return len(self._known_jobs)
|
||||||
|
|
||||||
|
def _fetch_jobs(self, ensure_fresh=False):
    """Return the currently known jobs as a sorted list.

    :param ensure_fresh: when true, ``_force_refresh()`` is invoked before
                         the known jobs are read.
    :returns: a new sorted list built from the values of ``_known_jobs``.
    """
    if ensure_fresh:
        self._force_refresh()
    with self._job_lock:
        # Read (and order) the known jobs while holding the job lock, the
        # same lock the other accessors of ``_known_jobs`` take.
        snapshot = list(six.itervalues(self._known_jobs))
        snapshot.sort()
        return snapshot
|
||||||
|
|
||||||
def _force_refresh(self):
|
def _force_refresh(self):
|
||||||
try:
|
try:
|
||||||
children = self._client.get_children(self.path)
|
children = self._client.get_children(self.path)
|
||||||
@@ -338,32 +340,18 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
|
|||||||
|
|
||||||
def _remove_job(self, path):
|
def _remove_job(self, path):
|
||||||
LOG.debug("Removing job that was at path: %s", path)
|
LOG.debug("Removing job that was at path: %s", path)
|
||||||
job = self._known_jobs.pop(path, None)
|
with self._job_lock:
|
||||||
|
job = self._known_jobs.pop(path, None)
|
||||||
if job is not None:
|
if job is not None:
|
||||||
self._emit(jobboard.REMOVAL,
|
self._emit(jobboard.REMOVAL, details={'job': job})
|
||||||
details={
|
|
||||||
'job': job,
|
|
||||||
})
|
|
||||||
|
|
||||||
def _process_child(self, path, request):
|
def _process_child(self, path, request):
|
||||||
"""Receives the result of a child data fetch request."""
|
"""Receives the result of a child data fetch request."""
|
||||||
|
job = None
|
||||||
try:
|
try:
|
||||||
raw_data, node_stat = request.get()
|
raw_data, node_stat = request.get()
|
||||||
job_data = misc.decode_json(raw_data)
|
job_data = misc.decode_json(raw_data)
|
||||||
created_on = misc.millis_to_datetime(node_stat.ctime)
|
created_on = misc.millis_to_datetime(node_stat.ctime)
|
||||||
with self._job_mutate:
|
|
||||||
if path not in self._known_jobs:
|
|
||||||
job = ZookeeperJob(job_data['name'], self,
|
|
||||||
self._client, self._persistence, path,
|
|
||||||
uuid=job_data['uuid'],
|
|
||||||
book_data=job_data.get("book"),
|
|
||||||
details=job_data.get("details", {}),
|
|
||||||
created_on=created_on)
|
|
||||||
self._known_jobs[path] = job
|
|
||||||
self._emit(jobboard.POSTED,
|
|
||||||
details={
|
|
||||||
'job': job,
|
|
||||||
})
|
|
||||||
except (ValueError, TypeError, KeyError):
|
except (ValueError, TypeError, KeyError):
|
||||||
LOG.warn("Incorrectly formatted job data found at path: %s",
|
LOG.warn("Incorrectly formatted job data found at path: %s",
|
||||||
path, exc_info=True)
|
path, exc_info=True)
|
||||||
@@ -379,6 +367,22 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
|
|||||||
except k_exceptions.KazooException:
|
except k_exceptions.KazooException:
|
||||||
LOG.warn("Internal error fetching job data from path: %s",
|
LOG.warn("Internal error fetching job data from path: %s",
|
||||||
path, exc_info=True)
|
path, exc_info=True)
|
||||||
|
else:
|
||||||
|
self._job_cond.acquire()
|
||||||
|
try:
|
||||||
|
if path not in self._known_jobs:
|
||||||
|
job = ZookeeperJob(job_data['name'], self,
|
||||||
|
self._client, self._persistence, path,
|
||||||
|
uuid=job_data['uuid'],
|
||||||
|
book_data=job_data.get("book"),
|
||||||
|
details=job_data.get("details", {}),
|
||||||
|
created_on=created_on)
|
||||||
|
self._known_jobs[path] = job
|
||||||
|
self._job_cond.notify_all()
|
||||||
|
finally:
|
||||||
|
self._job_cond.release()
|
||||||
|
if job is not None:
|
||||||
|
self._emit(jobboard.POSTED, details={'job': job})
|
||||||
|
|
||||||
def _on_job_posting(self, children, delayed=True):
|
def _on_job_posting(self, children, delayed=True):
|
||||||
LOG.debug("Got children %s under path %s", children, self.path)
|
LOG.debug("Got children %s under path %s", children, self.path)
|
||||||
@@ -390,7 +394,7 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
|
|||||||
child_paths.append(k_paths.join(self.path, c))
|
child_paths.append(k_paths.join(self.path, c))
|
||||||
|
|
||||||
# Remove jobs that we know about but which are no longer children
|
# Remove jobs that we know about but which are no longer children
|
||||||
with self._job_mutate:
|
with self._job_lock:
|
||||||
removals = set()
|
removals = set()
|
||||||
for path, _job in six.iteritems(self._known_jobs):
|
for path, _job in six.iteritems(self._known_jobs):
|
||||||
if path not in child_paths:
|
if path not in child_paths:
|
||||||
@@ -402,7 +406,7 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
|
|||||||
for path in child_paths:
|
for path in child_paths:
|
||||||
if path in self._bad_paths:
|
if path in self._bad_paths:
|
||||||
continue
|
continue
|
||||||
with self._job_mutate:
|
with self._job_lock:
|
||||||
if path not in self._known_jobs:
|
if path not in self._known_jobs:
|
||||||
# Fire off the request to populate this job asynchronously.
|
# Fire off the request to populate this job asynchronously.
|
||||||
#
|
#
|
||||||
@@ -450,8 +454,13 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
|
|||||||
self._persistence, job_path,
|
self._persistence, job_path,
|
||||||
book=book, details=details,
|
book=book, details=details,
|
||||||
uuid=job_uuid)
|
uuid=job_uuid)
|
||||||
with self._job_mutate:
|
self._job_cond.acquire()
|
||||||
|
try:
|
||||||
self._known_jobs[job_path] = job
|
self._known_jobs[job_path] = job
|
||||||
|
self._job_cond.notify_all()
|
||||||
|
finally:
|
||||||
|
self._job_cond.release()
|
||||||
|
self._emit(jobboard.POSTED, details={'job': job})
|
||||||
return job
|
return job
|
||||||
|
|
||||||
def claim(self, job, who):
|
def claim(self, job, who):
|
||||||
@@ -487,7 +496,7 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
|
|||||||
if not job_path:
|
if not job_path:
|
||||||
raise ValueError("Unable to check if %r is a known path"
|
raise ValueError("Unable to check if %r is a known path"
|
||||||
% (job_path))
|
% (job_path))
|
||||||
with self._job_mutate:
|
with self._job_lock:
|
||||||
if job_path not in self._known_jobs:
|
if job_path not in self._known_jobs:
|
||||||
fail_msg_tpl += ", unknown job"
|
fail_msg_tpl += ", unknown job"
|
||||||
raise excp.NotFound(fail_msg_tpl % (job_uuid))
|
raise excp.NotFound(fail_msg_tpl % (job_uuid))
|
||||||
@@ -540,8 +549,7 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
|
|||||||
with self._client.transaction() as txn:
|
with self._client.transaction() as txn:
|
||||||
txn.delete(job.lock_path, version=lock_stat.version)
|
txn.delete(job.lock_path, version=lock_stat.version)
|
||||||
txn.delete(job.path, version=data_stat.version)
|
txn.delete(job.path, version=data_stat.version)
|
||||||
with self._job_mutate:
|
self._remove_job(job.path)
|
||||||
self._remove_job(job.path)
|
|
||||||
|
|
||||||
def abandon(self, job, who):
|
def abandon(self, job, who):
|
||||||
_check_who(who)
|
_check_who(who)
|
||||||
@@ -564,9 +572,40 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
|
|||||||
LOG.debug("Kazoo client has changed to state: %s", state)
|
LOG.debug("Kazoo client has changed to state: %s", state)
|
||||||
|
|
||||||
def _clear(self):
|
def _clear(self):
|
||||||
with self._job_mutate:
|
with self._job_lock:
|
||||||
self._known_jobs = {}
|
self._known_jobs.clear()
|
||||||
self._job_watcher = None
|
self._job_watcher = None
|
||||||
|
|
||||||
|
def wait(self, timeout=None):
    """Block until at least one job is known, then return an iterator.

    :param timeout: how long to wait for jobs to arrive (converted with
                    ``float()``); ``None`` waits without a deadline.
    :returns: a ``ZookeeperJobBoardIterator`` pre-populated with the jobs
              fetched while the job condition is still held.
    :raises excp.NotFound: if no jobs are known once the timeout expires.
    """
    watch = None
    if timeout is not None:
        watch = misc.StopWatch(duration=float(timeout))
        watch.start()
    # The condition is a context manager; entering it acquires the
    # underlying job lock and leaving it (normally or via an exception)
    # releases it again.
    with self._job_cond:
        while not self._known_jobs:
            if watch is not None and watch.expired():
                raise excp.NotFound("Expired waiting for jobs to"
                                    " arrive; waited %s seconds"
                                    % watch.elapsed())
            # The caller's timeout can not be handed to the condition
            # variable unchanged: being woken up (possibly spuriously)
            # does not guarantee that jobs actually exist, so the amount
            # of time really left has to be recalculated on every pass.
            remaining = None
            if watch is not None:
                remaining = watch.leftover()
            self._job_cond.wait(remaining)
        it = ZookeeperJobBoardIterator(self)
        it._jobs.extend(self._fetch_jobs())
        # Mark the iterator as fetched so it does not fetch again itself.
        it._fetched = True
        return it
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def connected(self):
|
def connected(self):
|
||||||
|
@@ -58,6 +58,21 @@ class JobBoard(object):
|
|||||||
support this argument.
|
support this argument.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
@abc.abstractmethod
def wait(self, timeout=None):
    """Waits a given amount of time for job/s to be posted.

    When jobs are found an iterator is returned that contains the jobs
    present at that point in time.

    NOTE(harlowja): since a jobboard can be mutated by multiple external
    entities at the *same* time the iterator that is returned *may* still
    be empty, due to other entities removing those jobs after the iterator
    has been created (be aware of this when using it).

    :param timeout: float that indicates how long to wait for a job to
                    appear (if None then waits forever).
    :returns: an iterator over the jobs found at the time it was created.
    """
|
||||||
|
|
||||||
@abc.abstractproperty
|
@abc.abstractproperty
|
||||||
def job_count(self):
|
def job_count(self):
|
||||||
"""Returns how many jobs are on this jobboard (this count may change as
|
"""Returns how many jobs are on this jobboard (this count may change as
|
||||||
|
@@ -16,6 +16,8 @@
|
|||||||
|
|
||||||
import contextlib
|
import contextlib
|
||||||
import mock
|
import mock
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
|
||||||
import six
|
import six
|
||||||
|
|
||||||
@@ -102,6 +104,36 @@ class TestZookeeperJobs(test.TestCase):
|
|||||||
jobs = list(self.board.iterjobs(ensure_fresh=True))
|
jobs = list(self.board.iterjobs(ensure_fresh=True))
|
||||||
self.assertEqual(1, len(jobs))
|
self.assertEqual(1, len(jobs))
|
||||||
|
|
||||||
|
def test_wait_timeout(self):
    """``wait()`` with a short timeout must raise ``NotFound``.

    No job is posted by this test before waiting.
    """
    with connect_close(self.board):
        blocking_wait = self.board.wait
        self.assertRaises(excp.NotFound, blocking_wait, timeout=0.1)
|
||||||
|
|
||||||
|
def test_wait_arrival(self):
    """A thread blocked in ``wait()`` receives a job posted by another."""
    waiter_active = threading.Event()
    received = []

    def poster(wait_post=0.2):
        # Hold off until the waiter thread has signalled it is running,
        # then give it a moment before posting.
        waiter_active.wait()
        time.sleep(wait_post)
        self.board.post('test', p_utils.temporary_log_book())

    def waiter():
        waiter_active.set()
        received.extend(self.board.wait())

    with connect_close(self.board):
        # Poster first, waiter second (same start order as before).
        workers = [threading.Thread(target=target)
                   for target in (poster, waiter)]
        for worker in workers:
            worker.daemon = True
            worker.start()
        for worker in workers:
            worker.join()

    self.assertEqual(1, len(received))
|
||||||
|
|
||||||
def test_posting_received_raw(self):
|
def test_posting_received_raw(self):
|
||||||
book = p_utils.temporary_log_book()
|
book = p_utils.temporary_log_book()
|
||||||
|
|
||||||
|
@@ -460,6 +460,16 @@ class StopWatch(object):
|
|||||||
# NOTE(harlowja): don't silence the exception.
|
# NOTE(harlowja): don't silence the exception.
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
def leftover(self):
    """Return how much of the watch's duration is left.

    The value is the end time (start time plus duration) minus the
    current ``wallclock()`` reading, clamped so it is never below ``0.0``.

    :raises RuntimeError: if the watch has no duration, or if it is not
                          in the started state.
    """
    if self._duration is None:
        raise RuntimeError("Can not get the leftover time of a watch"
                           " that has no duration")
    if self._state != self._STARTED:
        raise RuntimeError("Can not get the leftover time of a"
                           " stopwatch that has not been started")
    remaining = (self._started_at + self._duration) - wallclock()
    return max(0.0, remaining)
|
||||||
|
|
||||||
def expired(self):
|
def expired(self):
|
||||||
if self._duration is None:
|
if self._duration is None:
|
||||||
return False
|
return False
|
||||||
|
Reference in New Issue
Block a user