Merge "Remove delayed decorator and replace with nicer method"
This commit is contained in:
@@ -16,6 +16,7 @@
|
||||
|
||||
import functools
|
||||
|
||||
from oslo_utils import reflection
|
||||
import six
|
||||
|
||||
from taskflow.engines.worker_based import protocol as pr
|
||||
@@ -23,26 +24,13 @@ from taskflow.engines.worker_based import proxy
|
||||
from taskflow import logging
|
||||
from taskflow.types import failure as ft
|
||||
from taskflow.types import notifier as nt
|
||||
from taskflow.types import timing as tt
|
||||
from taskflow.utils import kombu_utils as ku
|
||||
from taskflow.utils import misc
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def delayed(executor):
|
||||
"""Wraps & runs the function using a futures compatible executor."""
|
||||
|
||||
def decorator(f):
|
||||
|
||||
@six.wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
return executor.submit(f, *args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
class Server(object):
|
||||
"""Server implementation that waits for incoming tasks requests."""
|
||||
|
||||
@@ -51,14 +39,15 @@ class Server(object):
|
||||
retry_options=None):
|
||||
type_handlers = {
|
||||
pr.NOTIFY: [
|
||||
delayed(executor)(self._process_notify),
|
||||
self._delayed_process(self._process_notify),
|
||||
functools.partial(pr.Notify.validate, response=False),
|
||||
],
|
||||
pr.REQUEST: [
|
||||
delayed(executor)(self._process_request),
|
||||
self._delayed_process(self._process_request),
|
||||
pr.Request.validate,
|
||||
],
|
||||
}
|
||||
self._executor = executor
|
||||
self._proxy = proxy.Proxy(topic, exchange,
|
||||
type_handlers=type_handlers,
|
||||
url=url, transport=transport,
|
||||
@@ -68,6 +57,39 @@ class Server(object):
|
||||
self._endpoints = dict([(endpoint.name, endpoint)
|
||||
for endpoint in endpoints])
|
||||
|
||||
def _delayed_process(self, func):
|
||||
"""Runs the function using the instances executor (eventually).
|
||||
|
||||
This adds a *nice* benefit on showing how long it took for the
|
||||
function to finally be executed from when the message was received
|
||||
to when it was finally ran (which can be a nice thing to know
|
||||
to determine bottle-necks...).
|
||||
"""
|
||||
func_name = reflection.get_callable_name(func)
|
||||
|
||||
def _on_run(watch, content, message):
|
||||
LOG.blather("It took %s seconds to get around to running"
|
||||
" function/method '%s' with"
|
||||
" message '%s'", watch.elapsed(), func_name,
|
||||
ku.DelayedPretty(message))
|
||||
return func(content, message)
|
||||
|
||||
def _on_receive(content, message):
|
||||
LOG.debug("Submitting message '%s' for execution in the"
|
||||
" future to '%s'", ku.DelayedPretty(message), func_name)
|
||||
watch = tt.StopWatch()
|
||||
watch.start()
|
||||
try:
|
||||
self._executor.submit(_on_run, watch, content, message)
|
||||
except RuntimeError:
|
||||
LOG.error("Unable to continue processing message '%s',"
|
||||
" submission to instance executor (with later"
|
||||
" execution by '%s') was unsuccessful",
|
||||
ku.DelayedPretty(message), func_name,
|
||||
exc_info=True)
|
||||
|
||||
return _on_receive
|
||||
|
||||
@property
|
||||
def connection_details(self):
|
||||
return self._proxy.connection_details
|
||||
@@ -144,8 +166,6 @@ class Server(object):
|
||||
|
||||
def _process_notify(self, notify, message):
|
||||
"""Process notify message and reply back."""
|
||||
LOG.debug("Started processing notify message '%s'",
|
||||
ku.DelayedPretty(message))
|
||||
try:
|
||||
reply_to = message.properties['reply_to']
|
||||
except KeyError:
|
||||
@@ -164,8 +184,6 @@ class Server(object):
|
||||
|
||||
def _process_request(self, request, message):
|
||||
"""Process request message and reply back."""
|
||||
LOG.debug("Started processing request message '%s'",
|
||||
ku.DelayedPretty(message))
|
||||
try:
|
||||
# NOTE(skudriashev): parse broker message first to get
|
||||
# the `reply_to` and the `task_uuid` parameters to have
|
||||
|
||||
Reference in New Issue
Block a user