Increase/adjust the logging of the WBE response/send activities
Change-Id: I1d8309ce87114a0890dfc93a0a2c4b68f80ef828
This commit is contained in:
@@ -98,11 +98,14 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
|
||||
|
||||
def _process_notify(self, notify, message):
|
||||
"""Process notify message from remote side."""
|
||||
LOG.debug("Start processing notify message.")
|
||||
LOG.debug("Started processing notify message '%s'",
|
||||
message.delivery_tag)
|
||||
topic = notify['topic']
|
||||
tasks = notify['tasks']
|
||||
|
||||
# add worker info to the cache
|
||||
# Add worker info to the cache
|
||||
LOG.debug("Received that tasks %s can be processed by topic '%s'",
|
||||
tasks, topic)
|
||||
self._workers_arrival.acquire()
|
||||
try:
|
||||
self._workers_cache[topic] = tasks
|
||||
@@ -110,22 +113,25 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
|
||||
finally:
|
||||
self._workers_arrival.release()
|
||||
|
||||
# publish waiting requests
|
||||
# Publish waiting requests
|
||||
for request in self._requests_cache.get_waiting_requests(tasks):
|
||||
if request.transition_and_log_error(pr.PENDING, logger=LOG):
|
||||
self._publish_request(request, topic)
|
||||
|
||||
def _process_response(self, response, message):
|
||||
"""Process response from remote side."""
|
||||
LOG.debug("Start processing response message.")
|
||||
LOG.debug("Started processing response message '%s'",
|
||||
message.delivery_tag)
|
||||
try:
|
||||
task_uuid = message.properties['correlation_id']
|
||||
except KeyError:
|
||||
LOG.warning("The 'correlation_id' message property is missing.")
|
||||
LOG.warning("The 'correlation_id' message property is missing")
|
||||
else:
|
||||
request = self._requests_cache.get(task_uuid)
|
||||
if request is not None:
|
||||
response = pr.Response.from_dict(response)
|
||||
LOG.debug("Response with state '%s' received for '%s'",
|
||||
response.state, request)
|
||||
if response.state == pr.RUNNING:
|
||||
request.transition_and_log_error(pr.RUNNING, logger=LOG)
|
||||
elif response.state == pr.PROGRESS:
|
||||
@@ -144,7 +150,7 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
|
||||
LOG.warning("Unexpected response status: '%s'",
|
||||
response.state)
|
||||
else:
|
||||
LOG.debug("Request with id='%s' not found.", task_uuid)
|
||||
LOG.debug("Request with id='%s' not found", task_uuid)
|
||||
|
||||
@staticmethod
|
||||
def _handle_expired_request(request):
|
||||
@@ -191,12 +197,18 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
|
||||
self._requests_cache[request.uuid] = request
|
||||
self._publish_request(request, topic)
|
||||
else:
|
||||
LOG.debug("Delaying submission of '%s', no currently known"
|
||||
" worker/s available to process it", request)
|
||||
self._requests_cache[request.uuid] = request
|
||||
|
||||
return request.result
|
||||
|
||||
def _publish_request(self, request, topic):
|
||||
"""Publish request to a given topic."""
|
||||
LOG.debug("Submitting execution of '%s' to topic '%s' (expecting"
|
||||
" response identified by reply_to=%s and"
|
||||
" correlation_id=%s)", request, topic, self._uuid,
|
||||
request.uuid)
|
||||
try:
|
||||
self._proxy.publish(msg=request,
|
||||
routing_key=topic,
|
||||
@@ -204,7 +216,8 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
|
||||
correlation_id=request.uuid)
|
||||
except Exception:
|
||||
with misc.capture_failure() as failure:
|
||||
LOG.exception("Failed to submit the '%s' request.", request)
|
||||
LOG.warn("Failed to submit '%s' (transitioning it to"
|
||||
" %s)", request, pr.FAILURE, exc_info=True)
|
||||
if request.transition_and_log_error(pr.FAILURE, logger=LOG):
|
||||
del self._requests_cache[request.uuid]
|
||||
request.set_result(failure)
|
||||
|
||||
@@ -95,11 +95,11 @@ class Proxy(object):
|
||||
|
||||
def publish(self, msg, routing_key, **kwargs):
|
||||
"""Publish message to the named exchange with given routing key."""
|
||||
LOG.debug("Sending %s", msg)
|
||||
if isinstance(routing_key, six.string_types):
|
||||
routing_keys = [routing_key]
|
||||
else:
|
||||
routing_keys = routing_key
|
||||
LOG.debug("Sending '%s' using routing keys %s", msg, routing_keys)
|
||||
with kombu.producers[self._conn].acquire(block=True) as producer:
|
||||
for routing_key in routing_keys:
|
||||
queue = self._make_queue(routing_key, self._exchange)
|
||||
|
||||
Reference in New Issue
Block a user