diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index 6ad86d7b..1fb688b0 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -95,8 +95,8 @@ class WorkerTaskExecutor(executor.TaskExecutor): request = self._requests_cache.get(task_uuid) if request is not None: response = pr.Response.from_dict(response) - LOG.debug("Response with state '%s' received for '%s'", - response.state, request) + LOG.debug("Extracted response '%s' and matched it to" + " request '%s'", response, request) if response.state == pr.RUNNING: request.transition_and_log_error(pr.RUNNING, logger=LOG) elif response.state == pr.EVENT: diff --git a/taskflow/engines/worker_based/protocol.py b/taskflow/engines/worker_based/protocol.py index 44913064..63556c25 100644 --- a/taskflow/engines/worker_based/protocol.py +++ b/taskflow/engines/worker_based/protocol.py @@ -104,9 +104,10 @@ LOG = logging.getLogger(__name__) class Message(object): """Base class for all message types.""" - def __str__(self): - cls_name = reflection.get_class_name(self, fully_qualified=False) - return "<%s> %s" % (cls_name, self.to_dict()) + def __repr__(self): + return ("<%s object at 0x%x with contents %s>" + % (reflection.get_class_name(self, fully_qualified=False), + id(self), self.to_dict())) @abc.abstractmethod def to_dict(self): @@ -150,6 +151,14 @@ class Notify(Message): def __init__(self, **data): self._data = data + @property + def topic(self): + return self._data.get('topic') + + @property + def tasks(self): + return self._data.get('tasks') + def to_dict(self): return self._data diff --git a/taskflow/engines/worker_based/types.py b/taskflow/engines/worker_based/types.py index 09a41ab3..2a049aa1 100644 --- a/taskflow/engines/worker_based/types.py +++ b/taskflow/engines/worker_based/types.py @@ -206,18 +206,18 @@ class ProxyWorkerFinder(WorkerFinder): self._workers[topic] = worker return (worker, True) - def _process_response(self, response, message): - """Process notify message from remote side.""" - LOG.debug("Started processing notify message '%s'", + def _process_response(self, data, message): + """Process notify message sent from remote side.""" + LOG.debug("Started processing notify response message '%s'", ku.DelayedPretty(message)) - topic = response['topic'] - tasks = response['tasks'] + response = pr.Notify(**data) + LOG.debug("Extracted notify response '%s'", response) with self._cond: - worker, new_or_updated = self._add(topic, tasks) + worker, new_or_updated = self._add(response.topic, + response.tasks) if new_or_updated: - LOG.debug("Received notification about worker '%s' (%s" - " total workers are currently known)", worker, - self._total_workers()) + LOG.debug("Updated worker '%s' (%s total workers are" + " currently known)", worker, self._total_workers()) self._cond.notify_all() if new_or_updated: self.notifier.notify(self.WORKER_ARRIVED, {'worker': worker})