From 8c1172b8b4e543107f6d3aea4ce02fbb40c1e4db Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Fri, 5 Feb 2016 10:55:39 -0800 Subject: [PATCH] Don't bother scanning for workers if no new messages arrived If the worker finder has not gotten any new notification messages letting it know about new (or updated) workers we can just skip trying to match existing waiting work to workers as without messages being processed the match will still not work (as the worker data doesn't change without those messages being processed). Change-Id: I41d50c676f04f85c49a03d9d503da1955af45f7d --- taskflow/engines/worker_based/executor.py | 27 +++++++++++++++++------ taskflow/engines/worker_based/types.py | 6 +++++ 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index 6ba29ad7..d2749530 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -74,6 +74,9 @@ class WorkerTaskExecutor(executor.TaskExecutor): before_join=lambda t: p_worker.stop(), after_join=lambda t: p_worker.reset(), before_start=lambda t: p_worker.reset()) + self._messages_processed = { + 'finder': self._finder.messages_processed, + } def _process_response(self, response, message): """Process response from remote side.""" @@ -151,9 +154,7 @@ class WorkerTaskExecutor(executor.TaskExecutor): if request.expired: expired_requests[request_uuid] = request elif request.state == pr.WAITING: - worker = self._finder.get_worker_for_task(request.task) - if worker is not None: - waiting_requests[request_uuid] = (request, worker) + waiting_requests[request_uuid] = request if expired_requests: with self._ongoing_requests_lock: while expired_requests: @@ -161,10 +162,21 @@ class WorkerTaskExecutor(executor.TaskExecutor): if self._handle_expired_request(request): del self._ongoing_requests[request_uuid] if waiting_requests: - while waiting_requests: - request_uuid, (request, worker) = waiting_requests.popitem() - if request.transition_and_log_error(pr.PENDING, logger=LOG): - self._publish_request(request, worker) + finder = self._finder + new_messages_processed = finder.messages_processed + last_messages_processed = self._messages_processed['finder'] + if new_messages_processed > last_messages_processed: + # Some new message got to the finder, so we can see + # if any new workers match (if no new messages have been + # processed we might as well not do anything). + while waiting_requests: + _request_uuid, request = waiting_requests.popitem() + worker = finder.get_worker_for_task(request.task) + if (worker is not None and + request.transition_and_log_error(pr.PENDING, + logger=LOG)): + self._publish_request(request, worker) + self._messages_processed['finder'] = new_messages_processed def _submit_task(self, task, task_uuid, action, arguments, progress_callback=None, **kwargs): @@ -248,3 +260,4 @@ class WorkerTaskExecutor(executor.TaskExecutor): _request_uuid, request = self._ongoing_requests.popitem() self._handle_expired_request(request) self._finder.clear() + self._messages_processed['finder'] = self._finder.messages_processed diff --git a/taskflow/engines/worker_based/types.py b/taskflow/engines/worker_based/types.py index 2cbe298b..c8f11e28 100644 --- a/taskflow/engines/worker_based/types.py +++ b/taskflow/engines/worker_based/types.py @@ -152,6 +152,11 @@ class ProxyWorkerFinder(WorkerFinder): response=True)), }) self._counter = itertools.count() + self._messages_processed = 0 + + @property + def messages_processed(self): + return self._messages_processed def _next_worker(self, topic, tasks, temporary=False): if not temporary: @@ -199,6 +204,7 @@ class ProxyWorkerFinder(WorkerFinder): LOG.debug("Updated worker '%s' (%s total workers are" " currently known)", worker, self._total_workers()) self._cond.notify_all() + self._messages_processed += 1 def clear(self): with self._cond: