diff --git a/oslo_messaging/_utils.py b/oslo_messaging/_utils.py index cec94bb48..021fea26c 100644 --- a/oslo_messaging/_utils.py +++ b/oslo_messaging/_utils.py @@ -46,57 +46,6 @@ def version_is_compatible(imp_version, version): return True -class DispatcherExecutorContext(object): - """Dispatcher executor context helper - - A dispatcher can have work to do before and after the dispatch of the - request in the main server thread while the dispatcher itself can be - done in its own thread. - - The executor can use the helper like this: - - callback = dispatcher(incoming) - callback.prepare() - thread = MyWhateverThread() - thread.on_done(callback.done) - thread.run(callback.run) - - """ - def __init__(self, incoming, dispatch, executor_callback=None, - post=None): - self._result = None - self._incoming = incoming - self._dispatch = dispatch - self._post = post - self._executor_callback = executor_callback - - def run(self): - """The incoming message dispath itself - - Can be run in an other thread/greenlet/corotine if the executor is - able to do it. - """ - try: - self._result = self._dispatch(self._incoming, - self._executor_callback) - except Exception: - msg = 'The dispatcher method must catches all exceptions' - LOG.exception(msg) - raise RuntimeError(msg) - - def done(self): - """Callback after the incoming message have been dispathed - - Should be ran in the main executor thread/greenlet/corotine - """ - # FIXME(sileht): this is not currently true, this works only because - # the driver connection used for polling write on the wire only to - # ack/requeue message, but what if one day, the driver do something - # else - if self._post is not None: - self._post(self._incoming, self._result) - - def fetch_current_thread_functor(): # Until https://github.com/eventlet/eventlet/issues/172 is resolved # or addressed we have to use complicated workaround to get a object diff --git a/oslo_messaging/dispatcher.py b/oslo_messaging/dispatcher.py new file mode 100644 index 000000000..5cdd14748 --- /dev/null +++ b/oslo_messaging/dispatcher.py @@ -0,0 +1,105 @@ + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import logging + +import six + +__all__ = [ + "DispatcherBase", + "DispatcherExecutorContext" +] + +LOG = logging.getLogger(__name__) + + +class DispatcherExecutorContext(object): + """Dispatcher executor context helper + + A dispatcher can have work to do before and after the dispatch of the + request in the main server thread while the dispatcher itself can be + done in its own thread. + + The executor can use the helper like this: + + callback = dispatcher(incoming) + callback.prepare() + thread = MyWhateverThread() + thread.on_done(callback.done) + thread.run(callback.run) + + """ + def __init__(self, incoming, dispatch, executor_callback=None, + post=None): + self._result = None + self._incoming = incoming + self._dispatch = dispatch + self._post = post + self._executor_callback = executor_callback + + def run(self): + """The incoming message dispath itself + + Can be run in an other thread/greenlet/corotine if the executor is + able to do it. + """ + try: + self._result = self._dispatch(self._incoming, + self._executor_callback) + except Exception: + msg = 'The dispatcher method must catches all exceptions' + LOG.exception(msg) + raise RuntimeError(msg) + + def done(self): + """Callback after the incoming message have been dispathed + + Should be ran in the main executor thread/greenlet/corotine + """ + # FIXME(sileht): this is not currently true, this works only because + # the driver connection used for polling write on the wire only to + # ack/requeue message, but what if one day, the driver do something + # else + if self._post is not None: + self._post(self._incoming, self._result) + + +@six.add_metaclass(abc.ABCMeta) +class DispatcherBase(object): + "Base class for dispatcher" + + @abc.abstractmethod + def _listen(self, transport): + """Initiate the driver Listener + + Usualy the driver Listener is start with the transport helper methods: + + * transport._listen() + * transport._listen_for_notifications() + + :param transport: the transport object + :type transport: oslo_messaging.transport.Transport + :returns: a driver Listener object + :rtype: oslo_messaging._drivers.base.Listener + """ + + @abc.abstractmethod + def __call__(self, incoming, executor_callback=None): + """Called by the executor to get the DispatcherExecutorContext + + :param incoming: message or list of messages + :type incoming: oslo_messging._drivers.base.IncomingMessage + :returns: DispatcherExecutorContext + :rtype: DispatcherExecutorContext + """ diff --git a/oslo_messaging/notify/dispatcher.py b/oslo_messaging/notify/dispatcher.py index 46d53035e..33aeea8ff 100644 --- a/oslo_messaging/notify/dispatcher.py +++ b/oslo_messaging/notify/dispatcher.py @@ -18,7 +18,7 @@ import itertools import logging import sys -from oslo_messaging import _utils as utils +from oslo_messaging import dispatcher from oslo_messaging import localcontext from oslo_messaging import serializer as msg_serializer @@ -33,7 +33,7 @@ class NotificationResult(object): REQUEUE = 'requeue' -class NotificationDispatcher(object): +class NotificationDispatcher(dispatcher.DispatcherBase): """A message dispatcher which understands Notification messages. A MessageHandlingServer is constructed by passing a callable dispatcher @@ -69,7 +69,7 @@ class NotificationDispatcher(object): pool=self.pool) def __call__(self, incoming, executor_callback=None): - return utils.DispatcherExecutorContext( + return dispatcher.DispatcherExecutorContext( incoming, self._dispatch_and_handle_error, executor_callback=executor_callback, post=self._post_dispatch) diff --git a/oslo_messaging/rpc/dispatcher.py b/oslo_messaging/rpc/dispatcher.py index 6913e7afe..9d640c425 100644 --- a/oslo_messaging/rpc/dispatcher.py +++ b/oslo_messaging/rpc/dispatcher.py @@ -31,6 +31,7 @@ import six from oslo_messaging._i18n import _LE from oslo_messaging import _utils as utils +from oslo_messaging import dispatcher from oslo_messaging import localcontext from oslo_messaging import serializer as msg_serializer from oslo_messaging import server as msg_server @@ -75,7 +76,7 @@ class UnsupportedVersion(RPCDispatcherError): self.method = method -class RPCDispatcher(object): +class RPCDispatcher(dispatcher.DispatcherBase): """A message dispatcher which understands RPC messages. A MessageHandlingServer is constructed by passing a callable dispatcher @@ -131,7 +132,7 @@ class RPCDispatcher(object): def __call__(self, incoming, executor_callback=None): incoming.acknowledge() - return utils.DispatcherExecutorContext( + return dispatcher.DispatcherExecutorContext( incoming, self._dispatch_and_reply, executor_callback=executor_callback) diff --git a/oslo_messaging/tests/executors/test_executor.py b/oslo_messaging/tests/executors/test_executor.py index 1e175fdf8..cd338a4e3 100644 --- a/oslo_messaging/tests/executors/test_executor.py +++ b/oslo_messaging/tests/executors/test_executor.py @@ -44,7 +44,7 @@ try: except ImportError: impl_eventlet = None from oslo_messaging._executors import impl_thread -from oslo_messaging import _utils as utils +from oslo_messaging import dispatcher as dispatcher_base from oslo_messaging.tests import utils as test_utils from six.moves import mock @@ -151,9 +151,8 @@ class TestExecutor(test_utils.BaseTestCase): return result def __call__(self, incoming, executor_callback=None): - return utils.DispatcherExecutorContext(incoming, - self.callback, - executor_callback) + return dispatcher_base.DispatcherExecutorContext( + incoming, self.callback, executor_callback) return Dispatcher(endpoint), endpoint, event, run_executor