Use a collections.namedtuple for the request work unit

Instead of returning a tuple with hard to understand
and read contents a namedtuple can help understand what the
tuple is composed of and how it can be used/what it is,
so we should prefer to use it when we can for the wbe
requested work to perform.

Change-Id: I8147814449d04ba9a03730547ac113e4ef7b272a
This commit is contained in:
Joshua Harlow
2015-03-13 15:39:10 -07:00
parent 2232c5a88d
commit bf5164e36c
2 changed files with 23 additions and 17 deletions

View File

@@ -15,6 +15,7 @@
# under the License.
import abc
import collections
import threading
from concurrent import futures
@@ -171,6 +172,10 @@ class Notify(Message):
% (cls.TYPE, e.message), e)
_WorkUnit = collections.namedtuple('_WorkUnit', ['task_cls', 'task_name',
'action', 'arguments'])
class Request(Message):
"""Represents request with execution results.
@@ -371,7 +376,7 @@ class Request(Message):
@staticmethod
def from_dict(data, task_uuid=None):
"""Parses **validated** data before it can be further processed.
"""Parses **validated** data into a work unit.
All :py:class:`~taskflow.types.failure.Failure` objects that have been
converted to dict(s) on the remote side will now converted back
@@ -401,7 +406,7 @@ class Request(Message):
arguments['failures'] = {}
for task, fail_data in six.iteritems(failures):
arguments['failures'][task] = ft.Failure.from_dict(fail_data)
return (task_cls, task_name, action, arguments)
return _WorkUnit(task_cls, task_name, action, arguments)
class Response(Message):

View File

@@ -172,10 +172,9 @@ class Server(object):
reply_callback = functools.partial(self._reply, True, reply_to,
task_uuid)
# parse request to get task name, action and action arguments
# Parse the request to get the activity/work to perform.
try:
bundle = pr.Request.from_dict(request, task_uuid=task_uuid)
task_cls, task_name, action, arguments = bundle
work = pr.Request.from_dict(request, task_uuid=task_uuid)
except ValueError:
with misc.capture_failure() as failure:
LOG.warn("Failed to parse request contents from message '%s'",
@@ -183,34 +182,35 @@ class Server(object):
reply_callback(result=failure.to_dict())
return
# get task endpoint
# Now fetch the task endpoint (and action handler on it).
try:
endpoint = self._endpoints[task_cls]
endpoint = self._endpoints[work.task_cls]
except KeyError:
with misc.capture_failure() as failure:
LOG.warn("The '%s' task endpoint does not exist, unable"
" to continue processing request message '%s'",
task_cls, ku.DelayedPretty(message), exc_info=True)
work.task_cls, ku.DelayedPretty(message),
exc_info=True)
reply_callback(result=failure.to_dict())
return
else:
try:
handler = getattr(endpoint, action)
handler = getattr(endpoint, work.action)
except AttributeError:
with misc.capture_failure() as failure:
LOG.warn("The '%s' handler does not exist on task endpoint"
" '%s', unable to continue processing request"
" message '%s'", action, endpoint,
" message '%s'", work.action, endpoint,
ku.DelayedPretty(message), exc_info=True)
reply_callback(result=failure.to_dict())
return
else:
try:
task = endpoint.generate(name=task_name)
task = endpoint.generate(name=work.task_name)
except Exception:
with misc.capture_failure() as failure:
LOG.warn("The '%s' task '%s' generation for request"
" message '%s' failed", endpoint, action,
" message '%s' failed", endpoint, work.action,
ku.DelayedPretty(message), exc_info=True)
reply_callback(result=failure.to_dict())
return
@@ -218,7 +218,7 @@ class Server(object):
if not reply_callback(state=pr.RUNNING):
return
# associate *any* events this task emits with a proxy that will
# Associate *any* events this task emits with a proxy that will
# emit them back to the engine... for handling at the engine side
# of things...
if task.notifier.can_be_registered(nt.Notifier.ANY):
@@ -226,22 +226,23 @@ class Server(object):
functools.partial(self._on_event,
reply_to, task_uuid))
elif isinstance(task.notifier, nt.RestrictedNotifier):
# only proxy the allowable events then...
# Only proxy the allowable events then...
for event_type in task.notifier.events_iter():
task.notifier.register(event_type,
functools.partial(self._on_event,
reply_to, task_uuid))
# perform the task action
# Perform the task action.
try:
result = handler(task, **arguments)
result = handler(task, **work.arguments)
except Exception:
with misc.capture_failure() as failure:
LOG.warn("The '%s' endpoint '%s' execution for request"
" message '%s' failed", endpoint, action,
" message '%s' failed", endpoint, work.action,
ku.DelayedPretty(message), exc_info=True)
reply_callback(result=failure.to_dict())
else:
# And be done with it!
if isinstance(result, ft.Failure):
reply_callback(result=result.to_dict())
else: