Don't bother scanning for workers if no new messages arrived

If the worker finder has not gotten any new notification messages
letting it know about new (or updated) workers we can just skip
trying to match existing waiting work to workers as without messages
being processed the match will still not work (as the worker data
doesn't change without those messages being processed).

Change-Id: I41d50c676f04f85c49a03d9d503da1955af45f7d
This commit is contained in:
Joshua Harlow
2016-02-05 10:55:39 -08:00
committed by Joshua Harlow
parent cea71f2799
commit 8c1172b8b4
2 changed files with 26 additions and 7 deletions

View File

@@ -74,6 +74,9 @@ class WorkerTaskExecutor(executor.TaskExecutor):
before_join=lambda t: p_worker.stop(),
after_join=lambda t: p_worker.reset(),
before_start=lambda t: p_worker.reset())
self._messages_processed = {
'finder': self._finder.messages_processed,
}
def _process_response(self, response, message):
"""Process response from remote side."""
@@ -151,9 +154,7 @@ class WorkerTaskExecutor(executor.TaskExecutor):
if request.expired:
expired_requests[request_uuid] = request
elif request.state == pr.WAITING:
worker = self._finder.get_worker_for_task(request.task)
if worker is not None:
waiting_requests[request_uuid] = (request, worker)
waiting_requests[request_uuid] = request
if expired_requests:
with self._ongoing_requests_lock:
while expired_requests:
@@ -161,10 +162,21 @@ class WorkerTaskExecutor(executor.TaskExecutor):
if self._handle_expired_request(request):
del self._ongoing_requests[request_uuid]
if waiting_requests:
while waiting_requests:
request_uuid, (request, worker) = waiting_requests.popitem()
if request.transition_and_log_error(pr.PENDING, logger=LOG):
self._publish_request(request, worker)
finder = self._finder
new_messages_processed = finder.messages_processed
last_messages_processed = self._messages_processed['finder']
if new_messages_processed > last_messages_processed:
# Some new message got to the finder, so we can see
# if any new workers match (if no new messages have been
# processed we might as well not do anything).
while waiting_requests:
_request_uuid, request = waiting_requests.popitem()
worker = finder.get_worker_for_task(request.task)
if (worker is not None and
request.transition_and_log_error(pr.PENDING,
logger=LOG)):
self._publish_request(request, worker)
self._messages_processed['finder'] = new_messages_processed
def _submit_task(self, task, task_uuid, action, arguments,
progress_callback=None, **kwargs):
@@ -248,3 +260,4 @@ class WorkerTaskExecutor(executor.TaskExecutor):
_request_uuid, request = self._ongoing_requests.popitem()
self._handle_expired_request(request)
self._finder.clear()
self._messages_processed['finder'] = self._finder.messages_processed

View File

@@ -152,6 +152,11 @@ class ProxyWorkerFinder(WorkerFinder):
response=True)),
})
self._counter = itertools.count()
self._messages_processed = 0
@property
def messages_processed(self):
return self._messages_processed
def _next_worker(self, topic, tasks, temporary=False):
if not temporary:
@@ -199,6 +204,7 @@ class ProxyWorkerFinder(WorkerFinder):
LOG.debug("Updated worker '%s' (%s total workers are"
" currently known)", worker, self._total_workers())
self._cond.notify_all()
self._messages_processed += 1
def clear(self):
with self._cond: