Merge "creates a dispatcher abstraction"

This commit is contained in:
Jenkins 2015-12-10 11:15:48 +00:00 committed by Gerrit Code Review
commit 4b6144a3db
5 changed files with 114 additions and 60 deletions

View File

@ -46,57 +46,6 @@ def version_is_compatible(imp_version, version):
return True
class DispatcherExecutorContext(object):
"""Dispatcher executor context helper
A dispatcher can have work to do before and after the dispatch of the
request in the main server thread while the dispatcher itself can be
done in its own thread.
The executor can use the helper like this:
callback = dispatcher(incoming)
callback.prepare()
thread = MyWhateverThread()
thread.on_done(callback.done)
thread.run(callback.run)
"""
def __init__(self, incoming, dispatch, executor_callback=None,
post=None):
self._result = None
self._incoming = incoming
self._dispatch = dispatch
self._post = post
self._executor_callback = executor_callback
def run(self):
"""The incoming message dispath itself
Can be run in an other thread/greenlet/corotine if the executor is
able to do it.
"""
try:
self._result = self._dispatch(self._incoming,
self._executor_callback)
except Exception:
msg = 'The dispatcher method must catches all exceptions'
LOG.exception(msg)
raise RuntimeError(msg)
def done(self):
"""Callback after the incoming message have been dispathed
Should be ran in the main executor thread/greenlet/corotine
"""
# FIXME(sileht): this is not currently true, this works only because
# the driver connection used for polling write on the wire only to
# ack/requeue message, but what if one day, the driver do something
# else
if self._post is not None:
self._post(self._incoming, self._result)
def fetch_current_thread_functor():
# Until https://github.com/eventlet/eventlet/issues/172 is resolved
# or addressed we have to use complicated workaround to get a object

View File

@ -0,0 +1,105 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import logging
import six
__all__ = [
"DispatcherBase",
"DispatcherExecutorContext"
]
LOG = logging.getLogger(__name__)
class DispatcherExecutorContext(object):
"""Dispatcher executor context helper
A dispatcher can have work to do before and after the dispatch of the
request in the main server thread while the dispatcher itself can be
done in its own thread.
The executor can use the helper like this:
callback = dispatcher(incoming)
callback.prepare()
thread = MyWhateverThread()
thread.on_done(callback.done)
thread.run(callback.run)
"""
def __init__(self, incoming, dispatch, executor_callback=None,
post=None):
self._result = None
self._incoming = incoming
self._dispatch = dispatch
self._post = post
self._executor_callback = executor_callback
def run(self):
"""The incoming message dispath itself
Can be run in an other thread/greenlet/corotine if the executor is
able to do it.
"""
try:
self._result = self._dispatch(self._incoming,
self._executor_callback)
except Exception:
msg = 'The dispatcher method must catches all exceptions'
LOG.exception(msg)
raise RuntimeError(msg)
def done(self):
"""Callback after the incoming message have been dispathed
Should be ran in the main executor thread/greenlet/corotine
"""
# FIXME(sileht): this is not currently true, this works only because
# the driver connection used for polling write on the wire only to
# ack/requeue message, but what if one day, the driver do something
# else
if self._post is not None:
self._post(self._incoming, self._result)
@six.add_metaclass(abc.ABCMeta)
class DispatcherBase(object):
"Base class for dispatcher"
@abc.abstractmethod
def _listen(self, transport):
"""Initiate the driver Listener
Usualy the driver Listener is start with the transport helper methods:
* transport._listen()
* transport._listen_for_notifications()
:param transport: the transport object
:type transport: oslo_messaging.transport.Transport
:returns: a driver Listener object
:rtype: oslo_messaging._drivers.base.Listener
"""
@abc.abstractmethod
def __call__(self, incoming, executor_callback=None):
"""Called by the executor to get the DispatcherExecutorContext
:param incoming: message or list of messages
:type incoming: oslo_messging._drivers.base.IncomingMessage
:returns: DispatcherExecutorContext
:rtype: DispatcherExecutorContext
"""

View File

@ -18,7 +18,7 @@ import itertools
import logging
import sys
from oslo_messaging import _utils as utils
from oslo_messaging import dispatcher
from oslo_messaging import localcontext
from oslo_messaging import serializer as msg_serializer
@ -33,7 +33,7 @@ class NotificationResult(object):
REQUEUE = 'requeue'
class NotificationDispatcher(object):
class NotificationDispatcher(dispatcher.DispatcherBase):
"""A message dispatcher which understands Notification messages.
A MessageHandlingServer is constructed by passing a callable dispatcher
@ -69,7 +69,7 @@ class NotificationDispatcher(object):
pool=self.pool)
def __call__(self, incoming, executor_callback=None):
return utils.DispatcherExecutorContext(
return dispatcher.DispatcherExecutorContext(
incoming, self._dispatch_and_handle_error,
executor_callback=executor_callback,
post=self._post_dispatch)

View File

@ -31,6 +31,7 @@ import six
from oslo_messaging._i18n import _LE
from oslo_messaging import _utils as utils
from oslo_messaging import dispatcher
from oslo_messaging import localcontext
from oslo_messaging import serializer as msg_serializer
from oslo_messaging import server as msg_server
@ -75,7 +76,7 @@ class UnsupportedVersion(RPCDispatcherError):
self.method = method
class RPCDispatcher(object):
class RPCDispatcher(dispatcher.DispatcherBase):
"""A message dispatcher which understands RPC messages.
A MessageHandlingServer is constructed by passing a callable dispatcher
@ -131,7 +132,7 @@ class RPCDispatcher(object):
def __call__(self, incoming, executor_callback=None):
incoming.acknowledge()
return utils.DispatcherExecutorContext(
return dispatcher.DispatcherExecutorContext(
incoming, self._dispatch_and_reply,
executor_callback=executor_callback)

View File

@ -44,7 +44,7 @@ try:
except ImportError:
impl_eventlet = None
from oslo_messaging._executors import impl_thread
from oslo_messaging import _utils as utils
from oslo_messaging import dispatcher as dispatcher_base
from oslo_messaging.tests import utils as test_utils
from six.moves import mock
@ -151,9 +151,8 @@ class TestExecutor(test_utils.BaseTestCase):
return result
def __call__(self, incoming, executor_callback=None):
return utils.DispatcherExecutorContext(incoming,
self.callback,
executor_callback)
return dispatcher_base.DispatcherExecutorContext(
incoming, self.callback, executor_callback)
return Dispatcher(endpoint), endpoint, event, run_executor