Merge "Add WBE worker expiry"

This commit is contained in:
Jenkins
2016-02-15 05:22:43 +00:00
committed by Gerrit Code Review
6 changed files with 78 additions and 12 deletions

View File

@@ -44,6 +44,12 @@ class WorkerBasedActionEngine(engine.ActionEngine):
options imply and are expected to be)
:param retry_options: retry specific options
(see: :py:attr:`~.proxy.Proxy.DEFAULT_RETRY_OPTIONS`)
:param worker_expiry: numeric value (or negative/zero/None for
infinite) that defines the number of seconds to
continue to send messages to workers that
have **not** responded back to a prior
notification/ping request (this defaults
to 60 seconds).
"""
def __init__(self, flow, flow_detail, backend, options):
@@ -73,4 +79,7 @@ class WorkerBasedActionEngine(engine.ActionEngine):
transport=options.get('transport'),
transport_options=options.get('transport_options'),
transition_timeout=options.get('transition_timeout',
pr.REQUEST_TIMEOUT))
pr.REQUEST_TIMEOUT),
worker_expiry=options.get('worker_expiry',
pr.EXPIRES_AFTER),
)

View File

@@ -41,7 +41,7 @@ class WorkerTaskExecutor(executor.TaskExecutor):
def __init__(self, uuid, exchange, topics,
transition_timeout=pr.REQUEST_TIMEOUT,
url=None, transport=None, transport_options=None,
retry_options=None):
retry_options=None, worker_expiry=pr.EXPIRES_AFTER):
self._uuid = uuid
self._ongoing_requests = {}
self._ongoing_requests_lock = threading.RLock()
@@ -57,7 +57,8 @@ class WorkerTaskExecutor(executor.TaskExecutor):
# to workers to 'learn' of the tasks they can perform (and requires
# pre-existing knowledge of the topics those workers are on to gather
# and update this information).
self._finder = wt.ProxyWorkerFinder(uuid, self._proxy, topics)
self._finder = wt.ProxyWorkerFinder(uuid, self._proxy, topics,
worker_expiry=worker_expiry)
self._proxy.dispatcher.type_handlers.update({
pr.RESPONSE: dispatcher.Handler(self._process_response,
validator=pr.Response.validate),
@@ -181,6 +182,9 @@ class WorkerTaskExecutor(executor.TaskExecutor):
"""This function is called cyclically between draining events."""
# Publish any finding messages (used to locate workers).
self._finder.maybe_publish()
# If the finder hasn't heard from workers in a given amount
# of time, then those workers are likely dead, so clean them out...
self._finder.clean()
# Process any expired requests or requests that have no current
# worker located (publish messages for those if we now do have
# a worker located).

View File

@@ -93,6 +93,10 @@ QUEUE_EXPIRE_TIMEOUT = REQUEST_TIMEOUT
# Workers notify period.
NOTIFY_PERIOD = 5
# When a worker has not sent a notification in this many seconds, it is
# expired and no longer used/targeted for further work.
EXPIRES_AFTER = 60
# Message types.
NOTIFY = 'NOTIFY'
REQUEST = 'REQUEST'

View File

@@ -44,6 +44,7 @@ class TopicWorker(object):
self.tasks.append(task)
self.topic = topic
self.identity = identity
self.last_seen = None
def performs(self, task):
if not isinstance(task, six.string_types):
@@ -80,7 +81,8 @@ class ProxyWorkerFinder(object):
"""Requests and receives responses about workers topic+task details."""
def __init__(self, uuid, proxy, topics,
beat_periodicity=pr.NOTIFY_PERIOD):
beat_periodicity=pr.NOTIFY_PERIOD,
worker_expiry=pr.EXPIRES_AFTER):
self._cond = threading.Condition()
self._proxy = proxy
self._topics = topics
@@ -89,8 +91,10 @@ class ProxyWorkerFinder(object):
self._seen_workers = 0
self._messages_processed = 0
self._messages_published = 0
self._worker_expiry = worker_expiry
self._watch = timeutils.StopWatch(duration=beat_periodicity)
@property
def total_workers(self):
    """Return how many workers this finder presently knows about."""
    known = self._workers
    return len(known)
@@ -109,9 +113,9 @@ class ProxyWorkerFinder(object):
watch = timeutils.StopWatch(duration=timeout)
watch.start()
with self._cond:
while len(self._workers) < workers:
while self.total_workers < workers:
if watch.expired():
return max(0, workers - len(self._workers))
return max(0, workers - self.total_workers)
self._cond.wait(watch.leftover(return_none=True))
return 0
@@ -192,11 +196,41 @@ class ProxyWorkerFinder(object):
response.tasks)
if new_or_updated:
LOG.debug("Updated worker '%s' (%s total workers are"
" currently known)", worker, len(self._workers))
" currently known)", worker, self.total_workers)
self._cond.notify_all()
worker.last_seen = timeutils.now()
self._messages_processed += 1
def clean(self):
    """Drop workers that have not been heard from for too long.

    A worker is treated as dead once the seconds elapsed since it was
    last seen reach the configured worker expiry. Workers whose
    ``last_seen`` is ``None`` are skipped. Nothing is removed when no
    workers are known, or when the expiry is ``None``, zero or negative.

    Returns the number of workers that were removed.
    """
    # Nothing known, nothing to expire.
    if not self._workers:
        return 0
    # A None/zero/negative expiry disables cleaning.
    if self._worker_expiry is None or self._worker_expiry <= 0:
        return 0
    expired = {}
    with self._cond:
        now = timeutils.now()
        for topic, worker in six.iteritems(self._workers):
            if worker.last_seen is not None:
                # Clamp at zero in case the clock reads earlier than
                # the recorded last-seen time.
                idle_secs = max(0, now - worker.last_seen)
                if idle_secs >= self._worker_expiry:
                    expired[topic] = (worker, idle_secs)
        # Removal happens after the scan so the mapping is not mutated
        # while it is being iterated.
        for topic in six.iterkeys(expired):
            self._workers.pop(topic)
        if expired:
            # Wake any threads waiting on the condition.
            self._cond.notify_all()
    # Logging is done after the condition has been released.
    if expired and LOG.isEnabledFor(logging.INFO):
        for worker, idle_secs in six.itervalues(expired):
            LOG.info("Removed worker '%s' as it has not responded to"
                     " notification requests in %0.3f seconds",
                     worker, idle_secs)
    return len(expired)
def reset(self):
"""Resets finders internal state."""
with self._cond:
self._workers.clear()
self._messages_processed = 0

View File

@@ -50,7 +50,8 @@ class TestWorkerBasedActionEngine(test.MockTestCase):
transport=None,
transport_options=None,
transition_timeout=mock.ANY,
retry_options=None)
retry_options=None,
worker_expiry=mock.ANY)
]
self.assertEqual(expected_calls, self.master_mock.mock_calls)
@@ -66,7 +67,8 @@ class TestWorkerBasedActionEngine(test.MockTestCase):
transport_options={},
transition_timeout=200,
topics=topics,
retry_options={})
retry_options={},
worker_expiry=1)
expected_calls = [
mock.call.executor_class(uuid=eng.storage.flow_uuid,
url=broker_url,
@@ -75,7 +77,8 @@ class TestWorkerBasedActionEngine(test.MockTestCase):
transport='memory',
transport_options={},
transition_timeout=200,
retry_options={})
retry_options={},
worker_expiry=1)
]
self.assertEqual(expected_calls, self.master_mock.mock_calls)

View File

@@ -33,12 +33,24 @@ class TestTopicWorker(test.TestCase):
class TestProxyFinder(test.TestCase):
@mock.patch("oslo_utils.timeutils.now")
def test_expiry(self, mock_now):
    """A worker last seen longer ago than the expiry is cleaned out."""
    finder = worker_types.ProxyWorkerFinder(
        'me', mock.MagicMock(), [], worker_expiry=60)
    worker, _emit = finder._add('dummy-topic', [utils.DummyTask])
    # Mark the worker as last seen at time zero; the patched clock then
    # returns 120, which is beyond the 60 second expiry set above.
    worker.last_seen = 0
    mock_now.side_effect = [120]
    removed = finder.clean()
    self.assertEqual(0, finder.total_workers)
    self.assertEqual(1, removed)
def test_single_topic_worker(self):
finder = worker_types.ProxyWorkerFinder('me', mock.MagicMock(), [])
w, emit = finder._add('dummy-topic', [utils.DummyTask])
self.assertIsNotNone(w)
self.assertTrue(emit)
self.assertEqual(1, finder.total_workers())
self.assertEqual(1, finder.total_workers)
w2 = finder.get_worker_for_task(utils.DummyTask)
self.assertEqual(w.identity, w2.identity)
@@ -60,7 +72,7 @@ class TestProxyFinder(test.TestCase):
added.append(finder._add('dummy-topic', [utils.DummyTask]))
added.append(finder._add('dummy-topic-2', [utils.DummyTask]))
added.append(finder._add('dummy-topic-3', [utils.NastyTask]))
self.assertEqual(3, finder.total_workers())
self.assertEqual(3, finder.total_workers)
w = finder.get_worker_for_task(utils.NastyTask)
self.assertEqual(added[-1][0].identity, w.identity)
w = finder.get_worker_for_task(utils.DummyTask)