From 22fea728ccc217b18ae1ae2172d1c80d41e62c28 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Thu, 18 Feb 2016 12:50:44 +0100 Subject: [PATCH] Remove executor callback Revert the change I556b112371bec2ec29cea4dc254bb3f9c6d2c07a: the executor callback API was only used by the aioeventlet executor which was just removed. Change-Id: I1223f32594d8c1be28cc43fdd9bf102c86d75325 --- oslo_messaging/_executors/base.py | 3 --- .../_executors/impl_pooledexecutor.py | 2 +- oslo_messaging/dispatcher.py | 9 +++----- oslo_messaging/notify/dispatcher.py | 23 ++++++++----------- oslo_messaging/rpc/dispatcher.py | 22 +++++++----------- .../tests/executors/test_executor.py | 15 ++++-------- 6 files changed, 26 insertions(+), 48 deletions(-) diff --git a/oslo_messaging/_executors/base.py b/oslo_messaging/_executors/base.py index 7749c0087..dc4719e6f 100644 --- a/oslo_messaging/_executors/base.py +++ b/oslo_messaging/_executors/base.py @@ -20,9 +20,6 @@ import six @six.add_metaclass(abc.ABCMeta) class ExecutorBase(object): - # Executor can override how we run the application callback - _executor_callback = None - def __init__(self, conf, listener, dispatcher): self.conf = conf self.listener = listener diff --git a/oslo_messaging/_executors/impl_pooledexecutor.py b/oslo_messaging/_executors/impl_pooledexecutor.py index 0934bb4e8..d3f73a104 100644 --- a/oslo_messaging/_executors/impl_pooledexecutor.py +++ b/oslo_messaging/_executors/impl_pooledexecutor.py @@ -99,7 +99,7 @@ class PooledExecutor(base.ExecutorBase): if not incoming: continue - callback = self.dispatcher(incoming, self._executor_callback) + callback = self.dispatcher(incoming) was_submitted = self._do_submit(callback) if not was_submitted: break diff --git a/oslo_messaging/dispatcher.py b/oslo_messaging/dispatcher.py index 9d4fdf831..9324715de 100644 --- a/oslo_messaging/dispatcher.py +++ b/oslo_messaging/dispatcher.py @@ -43,13 +43,11 @@ class DispatcherExecutorContext(object): thread.run(callback.run) """ - def __init__(self, incoming, dispatch, executor_callback=None, - post=None): + def __init__(self, incoming, dispatch, post=None): self._result = None self._incoming = incoming self._dispatch = dispatch self._post = post - self._executor_callback = executor_callback def run(self): """The incoming message dispath itself @@ -58,8 +56,7 @@ class DispatcherExecutorContext(object): able to do it. """ try: - self._result = self._dispatch(self._incoming, - self._executor_callback) + self._result = self._dispatch(self._incoming) except Exception: msg = _('The dispatcher method must catches all exceptions') LOG.exception(msg) @@ -104,7 +101,7 @@ class DispatcherBase(object): """ @abc.abstractmethod - def __call__(self, incoming, executor_callback=None): + def __call__(self, incoming): """Called by the executor to get the DispatcherExecutorContext :param incoming: list of messages diff --git a/oslo_messaging/notify/dispatcher.py b/oslo_messaging/notify/dispatcher.py index 5677cad67..926865162 100644 --- a/oslo_messaging/notify/dispatcher.py +++ b/oslo_messaging/notify/dispatcher.py @@ -61,10 +61,9 @@ class _NotificationDispatcherBase(dispatcher.DispatcherBase): return transport._listen_for_notifications(self._targets_priorities, pool=self.pool) - def __call__(self, incoming, executor_callback=None): + def __call__(self, incoming): return dispatcher.DispatcherExecutorContext( incoming, self._dispatch_and_handle_error, - executor_callback=executor_callback, post=self._post_dispatch) def _post_dispatch(self, incoming, requeues): @@ -77,18 +76,18 @@ class _NotificationDispatcherBase(dispatcher.DispatcherBase): except Exception: LOG.error(_LE("Fail to ack/requeue message"), exc_info=True) - def _dispatch_and_handle_error(self, incoming, executor_callback): + def _dispatch_and_handle_error(self, incoming): """Dispatch a notification message to the appropriate endpoint method. :param incoming: the incoming notification message :type ctxt: IncomingMessage """ try: - return self._dispatch(incoming, executor_callback) + return self._dispatch(incoming) except Exception: LOG.error(_LE('Exception during message handling'), exc_info=True) - def _dispatch(self, incoming, executor_callback=None): + def _dispatch(self, incoming): """Dispatch notification messages to the appropriate endpoint method. """ @@ -120,18 +119,14 @@ class _NotificationDispatcherBase(dispatcher.DispatcherBase): if not filtered_messages: continue - ret = self._exec_callback(executor_callback, callback, - filtered_messages) + ret = self._exec_callback(callback, filtered_messages) if self.allow_requeue and ret == NotificationResult.REQUEUE: requeues.update(raw_messages) break return requeues - def _exec_callback(self, executor_callback, callback, *args): - if executor_callback: - ret = executor_callback(callback, *args) - else: - ret = callback(*args) + def _exec_callback(self, callback, *args): + ret = callback(*args) return NotificationResult.HANDLED if ret is None else ret def _extract_user_message(self, incoming): @@ -161,12 +156,12 @@ class NotificationDispatcher(_NotificationDispatcherBase): which is invoked with context and message dictionaries each time a message is received. """ - def _exec_callback(self, executor_callback, callback, messages): + def _exec_callback(self, callback, messages): localcontext._set_local_context( messages[0]["ctxt"]) try: return super(NotificationDispatcher, self)._exec_callback( - executor_callback, callback, + callback, messages[0]["ctxt"], messages[0]["publisher_id"], messages[0]["event_type"], diff --git a/oslo_messaging/rpc/dispatcher.py b/oslo_messaging/rpc/dispatcher.py index 5ff0610ad..b3a299517 100644 --- a/oslo_messaging/rpc/dispatcher.py +++ b/oslo_messaging/rpc/dispatcher.py @@ -118,29 +118,24 @@ class RPCDispatcher(dispatcher.DispatcherBase): endpoint_version = target.version or '1.0' return utils.version_is_compatible(endpoint_version, version) - def _do_dispatch(self, endpoint, method, ctxt, args, executor_callback): + def _do_dispatch(self, endpoint, method, ctxt, args): ctxt = self.serializer.deserialize_context(ctxt) new_args = dict() for argname, arg in six.iteritems(args): new_args[argname] = self.serializer.deserialize_entity(ctxt, arg) func = getattr(endpoint, method) - if executor_callback: - result = executor_callback(func, ctxt, **new_args) - else: - result = func(ctxt, **new_args) + result = func(ctxt, **new_args) return self.serializer.serialize_entity(ctxt, result) - def __call__(self, incoming, executor_callback=None): + def __call__(self, incoming): incoming[0].acknowledge() return dispatcher.DispatcherExecutorContext( - incoming[0], self._dispatch_and_reply, - executor_callback=executor_callback) + incoming[0], self._dispatch_and_reply) - def _dispatch_and_reply(self, incoming, executor_callback): + def _dispatch_and_reply(self, incoming): try: incoming.reply(self._dispatch(incoming.ctxt, - incoming.message, - executor_callback)) + incoming.message)) except ExpectedException as e: LOG.debug(u'Expected exception during message handling (%s)', e.exc_info[1]) @@ -158,7 +153,7 @@ class RPCDispatcher(dispatcher.DispatcherBase): # exc_info. del exc_info - def _dispatch(self, ctxt, message, executor_callback=None): + def _dispatch(self, ctxt, message): """Dispatch an RPC message to the appropriate endpoint method. :param ctxt: the request context @@ -185,8 +180,7 @@ class RPCDispatcher(dispatcher.DispatcherBase): if hasattr(endpoint, method): localcontext._set_local_context(ctxt) try: - return self._do_dispatch(endpoint, method, ctxt, args, - executor_callback) + return self._do_dispatch(endpoint, method, ctxt, args) finally: localcontext._clear_local_context() diff --git a/oslo_messaging/tests/executors/test_executor.py b/oslo_messaging/tests/executors/test_executor.py index d6f7f2c4e..bba37b747 100644 --- a/oslo_messaging/tests/executors/test_executor.py +++ b/oslo_messaging/tests/executors/test_executor.py @@ -82,20 +82,15 @@ class TestExecutor(test_utils.BaseTestCase): def _listen(self, transport): pass - def callback(self, incoming, executor_callback): - if executor_callback is None: - result = self.endpoint(incoming.ctxt, - incoming.message) - else: - result = executor_callback(self.endpoint, - incoming.ctxt, - incoming.message) + def callback(self, incoming): + result = self.endpoint(incoming.ctxt, + incoming.message) self.result = result return result - def __call__(self, incoming, executor_callback=None): + def __call__(self, incoming): return dispatcher_base.DispatcherExecutorContext( - incoming[0], self.callback, executor_callback) + incoming[0], self.callback) return Dispatcher(endpoint), endpoint, event, run_executor