Merge "Don't bother scanning for workers if no new messages arrived"
This commit is contained in:
@@ -74,6 +74,9 @@ class WorkerTaskExecutor(executor.TaskExecutor):
|
||||
before_join=lambda t: p_worker.stop(),
|
||||
after_join=lambda t: p_worker.reset(),
|
||||
before_start=lambda t: p_worker.reset())
|
||||
self._messages_processed = {
|
||||
'finder': self._finder.messages_processed,
|
||||
}
|
||||
|
||||
def _process_response(self, response, message):
|
||||
"""Process response from remote side."""
|
||||
@@ -151,9 +154,7 @@ class WorkerTaskExecutor(executor.TaskExecutor):
|
||||
if request.expired:
|
||||
expired_requests[request_uuid] = request
|
||||
elif request.state == pr.WAITING:
|
||||
worker = self._finder.get_worker_for_task(request.task)
|
||||
if worker is not None:
|
||||
waiting_requests[request_uuid] = (request, worker)
|
||||
waiting_requests[request_uuid] = request
|
||||
if expired_requests:
|
||||
with self._ongoing_requests_lock:
|
||||
while expired_requests:
|
||||
@@ -161,10 +162,21 @@ class WorkerTaskExecutor(executor.TaskExecutor):
|
||||
if self._handle_expired_request(request):
|
||||
del self._ongoing_requests[request_uuid]
|
||||
if waiting_requests:
|
||||
finder = self._finder
|
||||
new_messages_processed = finder.messages_processed
|
||||
last_messages_processed = self._messages_processed['finder']
|
||||
if new_messages_processed > last_messages_processed:
|
||||
# Some new message got to the finder, so we can see
|
||||
# if any new workers match (if no new messages have been
|
||||
# processed we might as well not do anything).
|
||||
while waiting_requests:
|
||||
request_uuid, (request, worker) = waiting_requests.popitem()
|
||||
if request.transition_and_log_error(pr.PENDING, logger=LOG):
|
||||
_request_uuid, request = waiting_requests.popitem()
|
||||
worker = finder.get_worker_for_task(request.task)
|
||||
if (worker is not None and
|
||||
request.transition_and_log_error(pr.PENDING,
|
||||
logger=LOG)):
|
||||
self._publish_request(request, worker)
|
||||
self._messages_processed['finder'] = new_messages_processed
|
||||
|
||||
def _submit_task(self, task, task_uuid, action, arguments,
|
||||
progress_callback=None, **kwargs):
|
||||
@@ -248,3 +260,4 @@ class WorkerTaskExecutor(executor.TaskExecutor):
|
||||
_request_uuid, request = self._ongoing_requests.popitem()
|
||||
self._handle_expired_request(request)
|
||||
self._finder.clear()
|
||||
self._messages_processed['finder'] = self._finder.messages_processed
|
||||
|
||||
@@ -152,6 +152,11 @@ class ProxyWorkerFinder(WorkerFinder):
|
||||
response=True)),
|
||||
})
|
||||
self._counter = itertools.count()
|
||||
self._messages_processed = 0
|
||||
|
||||
@property
|
||||
def messages_processed(self):
|
||||
return self._messages_processed
|
||||
|
||||
def _next_worker(self, topic, tasks, temporary=False):
|
||||
if not temporary:
|
||||
@@ -199,6 +204,7 @@ class ProxyWorkerFinder(WorkerFinder):
|
||||
LOG.debug("Updated worker '%s' (%s total workers are"
|
||||
" currently known)", worker, self._total_workers())
|
||||
self._cond.notify_all()
|
||||
self._messages_processed += 1
|
||||
|
||||
def clear(self):
|
||||
with self._cond:
|
||||
|
||||
Reference in New Issue
Block a user