From 20c3674021680ed85173db1f7b62acde35ffd231 Mon Sep 17 00:00:00 2001 From: Mark McLoughlin Date: Tue, 30 Apr 2013 06:49:56 +0100 Subject: [PATCH] Initial framework --- openstack/common/messaging/__init__.py | 14 ++ openstack/common/messaging/changes.txt | 67 ++++++ .../common/messaging/drivers/__init__.py | 37 ++++ openstack/common/messaging/drivers/base.py | 57 +++++ openstack/common/messaging/notes.txt | 33 +++ openstack/common/messaging/notify/__init__.py | 14 ++ openstack/common/messaging/rpc/__init__.py | 14 ++ openstack/common/messaging/rpc/client.py | 118 +++++++++++ openstack/common/messaging/rpc/server.py | 200 ++++++++++++++++++ openstack/common/messaging/target.py | 67 ++++++ openstack/common/messaging/transport.py | 82 +++++++ 11 files changed, 703 insertions(+) create mode 100644 openstack/common/messaging/__init__.py create mode 100644 openstack/common/messaging/changes.txt create mode 100644 openstack/common/messaging/drivers/__init__.py create mode 100644 openstack/common/messaging/drivers/base.py create mode 100644 openstack/common/messaging/notes.txt create mode 100644 openstack/common/messaging/notify/__init__.py create mode 100644 openstack/common/messaging/rpc/__init__.py create mode 100644 openstack/common/messaging/rpc/client.py create mode 100644 openstack/common/messaging/rpc/server.py create mode 100644 openstack/common/messaging/target.py create mode 100644 openstack/common/messaging/transport.py diff --git a/openstack/common/messaging/__init__.py b/openstack/common/messaging/__init__.py new file mode 100644 index 000000000..b1541fdb7 --- /dev/null +++ b/openstack/common/messaging/__init__.py @@ -0,0 +1,14 @@ + +# Copyright 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/openstack/common/messaging/changes.txt b/openstack/common/messaging/changes.txt new file mode 100644 index 000000000..9c521bf2b --- /dev/null +++ b/openstack/common/messaging/changes.txt @@ -0,0 +1,67 @@ + + +Projects will need to do e.g. + + -- nova.config: + + from nova.openstack.common.messaging import transport + + TRANSPORT_DRIVER = None + + + def parse_args(argv, ...): + transport.set_defaults(control_exchange='nova') + cfg.CONF(...) + TRANSPORT_DRIVER = transport.get_transport(cfg.CONF) + + -- nova.scheduler.rpcapi: + + from oslo.config import cfg + + from nova import config + from nova.openstack.common.messaging.rpc import client + from nova.openstack.common.messaging import target + + CONF = cfg.CONF + + + class SchedulerAPI(client.RPCClient): + + def __init__(self): + target = target.Target(topic=CONF.scheduler_topic, version='2.0') + super(SchedulerAPI, self).__init__(config.TRANSPORT_DRIVER, target) + + .... + + def select_hosts(self, ctxt, request_spec, filter_properties): + # FIXME(markmc): ctxt + cctxt = self.prepare(version='2.6') + return ctxt.call('select_hosts', + request_spec=request_spec, + filter_properties=filter_properties) + + -- nova.service: + + from nova import baserpc + from nova import config + from nova.openstack.common.messaging.rpc import server as rpcserver + from nova.openstack.common.messaging.rpc import target + + def start(self): + ... + target = target.Target(topic=self.topic, self.host) + + base_rpc = baserpc.BaseRPCAPI(self.service_name, backdoor_port) + + self.rpcserver = rpcserver.EventletRPCServer(config.TRANSPORT_DRIVER, + target, + [self.manager, base_rpc]) + + LOG.debug(_("Starting RPC server for %(topic)s on %(host)s") % + dict(topic=self.topic, host=self.host)) + + self.rpcserver.start() + + ... + self.rpcserver.stop() + self.rpcserver.wait() diff --git a/openstack/common/messaging/drivers/__init__.py b/openstack/common/messaging/drivers/__init__.py new file mode 100644 index 000000000..99e1d9009 --- /dev/null +++ b/openstack/common/messaging/drivers/__init__.py @@ -0,0 +1,37 @@ + +# Copyright 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +NAMESPACE = 'openstack.common.messaging.drivers' + +def _driver(module, name): + return '%s.%s.%s' % (NAMESPACE, module, name) + +_RABBIT_DRIVER = _driver('rabbit', RabbitDriver) +_QPID_DRIVER = _driver('qpid', QpidDriver) +_ZMQ_DRIVER = _driver('zmq', ZmqDriver) + +TRANSPORT_DRIVERS = [ + 'rabbit = ' + _RABBIT_DRIVER, + 'qpid = ' + _QPID_DRIVER, + 'zmq = ' + _ZMQ_DRIVER, + + # To avoid confusion + 'kombu = ' + _RABBIT_DRIVER, + + # For backwards compat + 'openstack.common.rpc.impl_kombu = ' + _RABBIT_DRIVER, + 'openstack.common.rpc.impl_qpid = ' + _QPID_DRIVER, + 'openstack.common.rpc.impl_zmq = ' + _ZMQ_DRIVER, + ] diff --git a/openstack/common/messaging/drivers/base.py b/openstack/common/messaging/drivers/base.py new file mode 100644 index 000000000..a6148cdda --- /dev/null +++ b/openstack/common/messaging/drivers/base.py @@ -0,0 +1,57 @@ + +# Copyright 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc + + +class Listener(object): + + __metaclass__ = abc.ABCMeta + + def __init__(self, conf, target): + self.conf = conf + self.target = target + + @abc.abstractmethod + def poll(self): + # returns (target, message) + # target includes the (exchange, topic, namespace, version) which the + # message was sent to + pass + + @abc.abstractmethod + def done(self, message): + # so the transport can ack the message + pass + + +class BaseDriver(object): + + __metaclass__ = abc.ABCMeta + + def __init__(self, conf, url=None, default_exchange=None): + self.conf = conf + self._url = url + self._default_exchange = _default_exchange + + @abc.abstractmethod + def _send(self, target, message, wait_for_reply=None, timeout=None): + """Send a message to the given target.""" + return None + + @abc.abstractmethod + def _listen(self, target): + """Construct a Listener for the given target.""" + return None diff --git a/openstack/common/messaging/notes.txt b/openstack/common/messaging/notes.txt new file mode 100644 index 000000000..53b81e4df --- /dev/null +++ b/openstack/common/messaging/notes.txt @@ -0,0 +1,33 @@ + +TODO: + + - contexts! + + - notifications! + + - @expose decorator + + - when shutting down a dispatcher, do we need to invoke + a cleanup method on the listener? + +Things I don't like: + + - CallContext - we already abuse the term "context" enough + + - There's something about using a context manager for prepare() that + I like: + + with client.prepare(version='2.6') as cctxt: + cctxt.call('select_host', + request_spec=request_spec, + filter_properties=filter_properties) + + but it seems a bit nonsensical + + - "endpoints" - better than api_objs, callbacks, proxyobj, etc. + + - we probably won't use BlockingRPCDispatcher anywhere, but I think + it does a good job of showing the basic job of a dispatcher + implementation + + diff --git a/openstack/common/messaging/notify/__init__.py b/openstack/common/messaging/notify/__init__.py new file mode 100644 index 000000000..b1541fdb7 --- /dev/null +++ b/openstack/common/messaging/notify/__init__.py @@ -0,0 +1,14 @@ + +# Copyright 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/openstack/common/messaging/rpc/__init__.py b/openstack/common/messaging/rpc/__init__.py new file mode 100644 index 000000000..b1541fdb7 --- /dev/null +++ b/openstack/common/messaging/rpc/__init__.py @@ -0,0 +1,14 @@ + +# Copyright 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/openstack/common/messaging/rpc/client.py b/openstack/common/messaging/rpc/client.py new file mode 100644 index 000000000..e095a8333 --- /dev/null +++ b/openstack/common/messaging/rpc/client.py @@ -0,0 +1,118 @@ + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# Copyright 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import inspect + +from oslo.config import cfg + +from openstack.common.gettextutils import _ +from openstack.common import local +from openstack.common import log as logging +from openstack.common.messaging import target + +_client_opts = [ + cfg.IntOpt('rpc_response_timeout', + default=60, + help='Seconds to wait for a response from a call'), +] + +_LOG = logging.getLogger(__name__) + + +class _CallContext(object): + + def __init__(self, transport, target, timeout=None, check_for_lock=None): + self.conf = transport.conf + + self.transport = transport + self.target = target + self.timeout = timeout + self.check_for_lock = check_for_lock + + super(CallContext, self).__init__() + + def _make_message(self, method, args): + msg = dict(method=method, args=args) + if self.target.namespace is not None: + msg['namespace'] = self.target.namespace + if self.target.version is not None: + msg['version'] = self.target.version + return msg + + def cast(self, method, **kwargs): + msg = self._make_message(method, kwargs) + self.transport._send(target, msg) + + def _check_for_lock(): + if not self.conf.debug: + return None + + if (hasattr(local.strong_store, 'locks_held') and + local.strong_store.locks_held): + stack = ' :: '.join([frame[3] for frame in inspect.stack()]) + _LOG.warn(_('An RPC is being made while holding a lock. The locks ' + 'currently held are %(locks)s. This is probably a bug. ' + 'Please report it. Include the following: [%(stack)s].'), + {'locks': local.strong_store.locks_held, 'stack': stack}) + return True + + return False + + def call(self, method, **kwargs): + msg = self._make_message(method, kwargs) + + timeout = self.timeout + if self.timeout is None: + self.conf.rpc_response_timeout + + if self.check_for_lock: + self._check_for_lock() + + return self.transport._send(target, msg, + wait_for_reply=True, timeout=timeout) + + +class RPCClient(object): + + def __init__(self, transport, target, timeout=None, check_for_lock=None): + self.conf = transport.conf + self.conf.register_opts(_client_opts) + + self.transport = transport + self.target = target + self.timeout = timeout + self.check_for_lock = check_for_lock + + super(RPCClient, self).__init__() + + def prepare(self, host=None, version=None, + timeout=None, check_for_lock=None): + target = self.target(host=host, version=version) + + if timeout is None: + timeout = self.timeout + if check_for_lock is None: + check_for_lock = self.check_for_lock + + return _CallContext(self.transport, target, timeout, check_for_lock) + + def cast(self, method, **kwargs): + self.prepare().cast(method, **kwargs) + + def call(self, method, **kwargs): + return self.prepare().call(method, **kwargs) diff --git a/openstack/common/messaging/rpc/server.py b/openstack/common/messaging/rpc/server.py new file mode 100644 index 000000000..c48622fbe --- /dev/null +++ b/openstack/common/messaging/rpc/server.py @@ -0,0 +1,200 @@ + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# Copyright 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc + +import eventlet +import greenlet + +from openstack.common.gettextutils import _ +from openstack.common import log as logging +from openstack.common.messaging import _utils as utils + +_LOG = logging.getLogger(__name__) + + +class RPCServerError(Exception): + pass + + +class NoSuchMethodError(RPCServerError, AttributeError): + + def __init__(self, method): + self.method = method + + def __str__(self): + return _("Endpoint does not support RPC method %s") % self.method + + +class UnsupportedVersion(RPCServerError): + + def __init__(self, version): + self.version = version + + def __str__(self): + return _("Endpoint does not support RPC version %s") % self.version + + +class RPCDispatcher(object): + + __metaclass__ = abc.ABCMeta + + def __init__(self, conf, listener, callback): + self.conf = conf + self.listener = listener + self.callback = callback + + def _process_one_message(self): + (target, message) = self.listener.poll() + try: + self.callback(target, message) + except Exception: + _LOG.exception(_("Failed to process message... skipping it.")) + finally: + self.listener.done(message) + + @abc.abstractmethod + def start(self): + pass + + @abc.abstractmethod + def stop(self): + pass + + @abc.abstractmethod + def wait(self): + pass + + +class BlockingRPCDispatcher(RPCDispatcher): + + def __init__(self, conf, listener, callback): + super(BlockingRPCDispatcher, self).__init__(conf, listener, callback) + self._running = False + + def start(self): + self._running = True + while self._running: + self._process_one_message() + + def stop(self): + self._running = False + + def wait(self): + pass + + +class EventletRPCDispatcher(RPCDispatcher): + + def __init__(self, conf, listener, callback): + super(EventletRPCDispatcher, self).__init__(conf, listener, callback) + self._thread = None + + def start(self): + if self._thread is not None: + return + + def _dispatcher_thread(): + try: + while True: + except greenlet.GreenletExit: + return + + self._thread = eventlet.spawn(_dispatcher_thread) + + def stop(self): + if self._thread is None: + return + self._thread.kill() + + def wait(self): + if self._thread is None: + return + try: + self._thread.wait() + except greenlet.GreenletExit: + pass + self._thread = None + + +class RPCServer(object): + + def __init__(self, transport, target, endpoints, dispatcher_cls): + self.conf = transport.conf + + self.transport = transport + self.target = target + self.endpoints = endpoints + + self._dispatcher_cls = dispatcher_cls + self._dispatcher = None + + super(RPCServer, self).__init__() + + @staticmethod + def _is_compatible(endpoint, version): + endpoint_version = endpoint.target.version or '1.0' + return utils.version_is_compatible(endpoint_version, version) + + def _dispatch(self, target, message): + method = message.get('method') + args = message.get('args', {}) + + version = target.version or '1.0' + + found_compatible = False + for endpoint in self.endpoints: + if target.namespace != endpoint.target.namespace: + continue + + is_compatible = self._is_compatible(endpoint, version) + + if is_compatible and hasattr(endpoint, method): + return getattr(endpoint, method)(**kwargs) + + found_compatible = found_compatible or is_compatible + + if found_compatible: + raise NoSuchMethodError(method) + else: + raise UnsupportedVersion(version) + + def start(): + if self._dispatcher is not None: + return + listener = self.transport.listen(self.target) + self._dispatcher = self._dispatcher_cls(self.conf, listener, + self._dispatch) + self._dispatcher.start() + + def stop(self): + if self._dispatcher is not None: + self._dispatcher.stop() + + def wait(self): + if self._dispatcher is not None: + self._dispatcher.wait() + self._dispatcher = None + + +def get_blocking_server(transport, target, endpoints): + return RPCServer(transport, target, endpoints, BlockingRPCDispatcher) + + +def get_eventlet_server(transport, target, endpoints): + return RPCServer(transport, target, endpoints, EventletRPCDispatcher) diff --git a/openstack/common/messaging/target.py b/openstack/common/messaging/target.py new file mode 100644 index 000000000..1b401c4bc --- /dev/null +++ b/openstack/common/messaging/target.py @@ -0,0 +1,67 @@ + +# Copyright 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +class Target(object): + + """Identifies the destination of messages. + + A Target encapsulates all the information to identify where a message + should be sent or what messages a server is listening for. + + Its attributes are: + + :param exchange: A scope for topics. Leave unspecified to default to the + control_exchange configuration option. + :type exchange: str + :param topic: A name which identifies the set of interfaces exposed by a + server. Multiple servers may listen on a topic and messages will be + dispatched to one of the servers in a round-robin fashion. + :type topic: str + :param namespace: Identifies a particular interface (i.e. set of methods) + exposed by a server. The default interface has no namespace identifier + and is referred to as the null namespace. + :type topic: str + :param version: Interfaces have a major.minor version number associated + with them. A minor number increment indicates a backwards compatible + change and an incompatible change is indicated by a major number bump. + Servers may implement multiple major versions and clients may require + indicate that their message requires a particular minimum minor version. + :type topic: str + :param server: Clients can request that a message be directed to a specific + server, rather than just one of a pool of servers listening on the topic. + :type topic: str + :param fanout: Clients may request that a message be directed to all + servers listening on a topic by setting fanout to ``True``, rather than + just one of them. + :type topic: bool + """ + + def __init__(self, exchange=None, topic=None, namespace=None, + version=None, server=None, fanout=None): + self.exchange = exchange + self.topic = topic + self.namespace = namespace + self.version = version + self.server = server + self.fanout = fanout + + def __call__(self, **kwargs): + kwargs.setdefault('exchange', self.exchange) + kwargs.setdefault('topic', self.topic) + kwargs.setdefault('namespace', self.version) + kwargs.setdefault('server', self.server) + kwargs.setdefault('fanout', self.fanout) + return Target(**kwargs) diff --git a/openstack/common/messaging/transport.py b/openstack/common/messaging/transport.py new file mode 100644 index 000000000..15ec86297 --- /dev/null +++ b/openstack/common/messaging/transport.py @@ -0,0 +1,82 @@ + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# Copyright 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import urlparse + +from oslo.config import cfg +from stevedore import driver + +from openstack.common.messaging import drivers + +_transport_opts = [ + cfg.StrOpt('transport_url', + default=None, + help='A URL representing the messaging driver to use and its ' + 'full configuration. If not set, we fall back to the ' + 'rpc_backend option and driver specific configuration.'), + cfg.StrOpt('rpc_backend', + default='kombu', + help='The messaging driver to use, defaults to kombu. Other ' + 'drivers include qpid and zmq.'), + cfg.StrOpt('control_exchange', + default='openstack', + help='The default exchange under which topics are scoped. May ' + 'be overridden by an exchange name specified in the ' + 'transport_url option.'), +] + + +def set_defaults(control_exchange): + cfg.set_defaults(_transport_opts, + control_exchange=control_exchange) + + +class Transport(object): + + def __init__(self, driver): + self.conf = driver.conf + self._driver = driver + + def send(self, target, message, wait_for_reply=None, timeout=None): + return self._driver._send(target, message, + wait_for_reply=wait_for_reply, + timeout=timeout) + + def listen(self, target): + return self._driver._listen(target) + + +def get_transport(conf, url=None): + conf.register_opts(_transport_opts) + + url = url or conf.transport_url + if url is not None: + rpc_backend = urlparse.urlparse(url).scheme + else: + rpc_backend = conf.rpc_backend + + kwargs = dict(default_exchange=conf.control_exchange) + if url is not None: + kwargs['url'] = url + + mgr = driver.DriverManager(drivers.NAMESPACE, + rpc_backend, + invoke_on_load=True, + invoke_args=[conf], + invoke_kwds=kwargs) + return Transport(mgr.driver)