Remove need for separate notify thread

Instead of having a periodic notification thread that
will drop messages to try to find workers we can just
have this same work be done in the periodically called
on_wait callback that is already used for expiring and
matching workers to new/updated workers.

This avoids having one more thread that doesn't do all
that much (and activating it during waiting calls will
be often enough to achieve its goal in life).

Change-Id: If80233d13d914f2ed3665001a27627b78e6ee780
This commit is contained in:
Joshua Harlow
2016-02-05 12:44:27 -08:00
committed by Joshua Harlow
parent 6cff7b2721
commit a70bd8a7e5
3 changed files with 69 additions and 49 deletions

View File

@@ -17,7 +17,6 @@
import functools
import threading
from futurist import periodics
from oslo_utils import timeutils
import six
@@ -47,12 +46,7 @@ class WorkerTaskExecutor(executor.TaskExecutor):
self._ongoing_requests = {}
self._ongoing_requests_lock = threading.RLock()
self._transition_timeout = transition_timeout
type_handlers = {
pr.RESPONSE: dispatcher.Handler(self._process_response,
validator=pr.Response.validate),
}
self._proxy = proxy.Proxy(uuid, exchange,
type_handlers=type_handlers,
on_wait=self._on_wait, url=url,
transport=transport,
transport_options=transport_options,
@@ -64,16 +58,17 @@ class WorkerTaskExecutor(executor.TaskExecutor):
# pre-existing knowledge of the topics those workers are on to gather
# and update this information).
self._finder = wt.ProxyWorkerFinder(uuid, self._proxy, topics)
self._helpers = tu.ThreadBundle()
self._helpers.bind(lambda: tu.daemon_thread(self._proxy.start),
after_start=lambda t: self._proxy.wait(),
before_join=lambda t: self._proxy.stop())
p_worker = periodics.PeriodicWorker.create([self._finder])
if p_worker:
self._helpers.bind(lambda: tu.daemon_thread(p_worker.start),
before_join=lambda t: p_worker.stop(),
after_join=lambda t: p_worker.reset(),
before_start=lambda t: p_worker.reset())
self._proxy.dispatcher.type_handlers.update({
pr.RESPONSE: dispatcher.Handler(self._process_response,
validator=pr.Response.validate),
pr.NOTIFY: dispatcher.Handler(
self._finder.process_response,
validator=functools.partial(pr.Notify.validate,
response=True)),
})
# Thread that will run the message dispatching (and periodically
# call the on_wait callback to do various things) loop...
self._helper = None
self._messages_processed = {
'finder': self._finder.messages_processed,
}
@@ -138,8 +133,9 @@ class WorkerTaskExecutor(executor.TaskExecutor):
return True
return False
def _on_wait(self):
"""This function is called cyclically between draining events."""
def _clean(self):
if not self._ongoing_requests:
return
with self._ongoing_requests_lock:
ongoing_requests_uuids = set(six.iterkeys(self._ongoing_requests))
waiting_requests = {}
@@ -178,6 +174,15 @@ class WorkerTaskExecutor(executor.TaskExecutor):
self._publish_request(request, worker)
self._messages_processed['finder'] = new_messages_processed
def _on_wait(self):
"""This function is called cyclically between draining events."""
# Publish any finding messages (used to locate workers).
self._finder.maybe_publish()
# Process any expired requests or requests that have no current
# worker located (publish messages for those if we now do have
# a worker located).
self._clean()
def _submit_task(self, task, task_uuid, action, arguments,
progress_callback=None, **kwargs):
"""Submit task request to a worker."""
@@ -249,15 +254,23 @@ class WorkerTaskExecutor(executor.TaskExecutor):
timeout=timeout)
def start(self):
"""Starts proxy thread and associated topic notification thread."""
self._helpers.start()
"""Starts message processing thread."""
if self._helper is not None:
raise RuntimeError("Worker executor must be stopped before"
" it can be started")
self._helper = tu.daemon_thread(self._proxy.start)
self._helper.start()
self._proxy.wait()
def stop(self):
"""Stops proxy thread and associated topic notification thread."""
self._helpers.stop()
"""Stops message processing thread."""
if self._helper is not None:
self._proxy.stop()
self._helper.join()
self._helper = None
with self._ongoing_requests_lock:
while self._ongoing_requests:
_request_uuid, request = self._ongoing_requests.popitem()
self._handle_expired_request(request)
self._finder.clear()
self._finder.reset()
self._messages_processed['finder'] = self._finder.messages_processed

View File

@@ -15,17 +15,13 @@
# under the License.
import abc
import functools
import itertools
import random
import threading
from futurist import periodics
from oslo_utils import reflection
from oslo_utils import timeutils
import six
from taskflow.engines.worker_based import dispatcher
from taskflow.engines.worker_based import protocol as pr
from taskflow import logging
from taskflow.utils import kombu_utils as ku
@@ -132,27 +128,21 @@ class WorkerFinder(object):
def get_worker_for_task(self, task):
"""Gets a worker that can perform a given task."""
def clear(self):
pass
class ProxyWorkerFinder(WorkerFinder):
"""Requests and receives responses about workers topic+task details."""
def __init__(self, uuid, proxy, topics):
def __init__(self, uuid, proxy, topics,
beat_periodicity=pr.NOTIFY_PERIOD):
super(ProxyWorkerFinder, self).__init__()
self._proxy = proxy
self._topics = topics
self._workers = {}
self._uuid = uuid
self._proxy.dispatcher.type_handlers.update({
pr.NOTIFY: dispatcher.Handler(
self._process_response,
validator=functools.partial(pr.Notify.validate,
response=True)),
})
self._counter = itertools.count()
self._seen_workers = 0
self._messages_processed = 0
self._messages_published = 0
self._watch = timeutils.StopWatch(duration=beat_periodicity)
@property
def messages_processed(self):
@@ -160,15 +150,30 @@ class ProxyWorkerFinder(WorkerFinder):
def _next_worker(self, topic, tasks, temporary=False):
if not temporary:
return TopicWorker(topic, tasks,
identity=six.next(self._counter))
w = TopicWorker(topic, tasks, identity=self._seen_workers)
self._seen_workers += 1
return w
else:
return TopicWorker(topic, tasks)
@periodics.periodic(pr.NOTIFY_PERIOD, run_immediately=True)
def beat(self):
"""Cyclically called to publish notify message to each topic."""
self._proxy.publish(pr.Notify(), self._topics, reply_to=self._uuid)
def maybe_publish(self):
"""Periodically called to publish notify message to each topic.
These messages (especially the responses) are how this find learns
about workers and what tasks they can perform (so that we can then
match workers to tasks to run).
"""
if self._messages_published == 0:
self._proxy.publish(pr.Notify(),
self._topics, reply_to=self._uuid)
self._messages_published += 1
self._watch.restart()
else:
if self._watch.expired():
self._proxy.publish(pr.Notify(),
self._topics, reply_to=self._uuid)
self._messages_published += 1
self._watch.restart()
def _total_workers(self):
return len(self._workers)
@@ -191,7 +196,7 @@ class ProxyWorkerFinder(WorkerFinder):
self._workers[topic] = worker
return (worker, True)
def _process_response(self, data, message):
def process_response(self, data, message):
"""Process notify message sent from remote side."""
LOG.debug("Started processing notify response message '%s'",
ku.DelayedPretty(message))
@@ -206,9 +211,12 @@ class ProxyWorkerFinder(WorkerFinder):
self._cond.notify_all()
self._messages_processed += 1
def clear(self):
def reset(self):
with self._cond:
self._workers.clear()
self._messages_processed = 0
self._messages_published = 0
self._seen_workers = 0
self._cond.notify_all()
def get_worker_for_task(self, task):

View File

@@ -85,8 +85,7 @@ class TestWorkerTaskExecutor(test.MockTestCase):
on_wait=ex._on_wait,
url=self.broker_url, transport=mock.ANY,
transport_options=mock.ANY,
retry_options=mock.ANY,
type_handlers=mock.ANY),
retry_options=mock.ANY),
mock.call.proxy.dispatcher.type_handlers.update(mock.ANY),
]
self.assertEqual(master_mock_calls, self.master_mock.mock_calls)
@@ -284,7 +283,7 @@ class TestWorkerTaskExecutor(test.MockTestCase):
self.assertTrue(self.proxy_started_event.wait(test_utils.WAIT_TIMEOUT))
# start executor again
ex.start()
self.assertRaises(RuntimeError, ex.start)
# stop executor
ex.stop()