Initial framework

This commit is contained in:
Mark McLoughlin 2013-04-30 06:49:56 +01:00
commit 20c3674021
11 changed files with 703 additions and 0 deletions

View File

@ -0,0 +1,14 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,67 @@
Projects will need to do e.g.
-- nova.config:
from nova.openstack.common.messaging import transport
TRANSPORT_DRIVER = None
def parse_args(argv, ...):
transport.set_defaults(control_exchange='nova')
cfg.CONF(...)
TRANSPORT_DRIVER = transport.get_transport(cfg.CONF)
-- nova.scheduler.rpcapi:
from oslo.config import cfg
from nova import config
from nova.openstack.common.messaging.rpc import client
from nova.openstack.common.messaging import target
CONF = cfg.CONF
class SchedulerAPI(client.RPCClient):
def __init__(self):
target = target.Target(topic=CONF.scheduler_topic, version='2.0')
super(SchedulerAPI, self).__init__(config.TRANSPORT_DRIVER, target)
....
def select_hosts(self, ctxt, request_spec, filter_properties):
# FIXME(markmc): ctxt
cctxt = self.prepare(version='2.6')
return ctxt.call('select_hosts',
request_spec=request_spec,
filter_properties=filter_properties)
-- nova.service:
from nova import baserpc
from nova import config
from nova.openstack.common.messaging.rpc import server as rpcserver
from nova.openstack.common.messaging.rpc import target
def start(self):
...
target = target.Target(topic=self.topic, self.host)
base_rpc = baserpc.BaseRPCAPI(self.service_name, backdoor_port)
self.rpcserver = rpcserver.EventletRPCServer(config.TRANSPORT_DRIVER,
target,
[self.manager, base_rpc])
LOG.debug(_("Starting RPC server for %(topic)s on %(host)s") %
dict(topic=self.topic, host=self.host))
self.rpcserver.start()
...
self.rpcserver.stop()
self.rpcserver.wait()

View File

@ -0,0 +1,37 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
NAMESPACE = 'openstack.common.messaging.drivers'
def _driver(module, name):
return '%s.%s.%s' % (NAMESPACE, module, name)
_RABBIT_DRIVER = _driver('rabbit', RabbitDriver)
_QPID_DRIVER = _driver('qpid', QpidDriver)
_ZMQ_DRIVER = _driver('zmq', ZmqDriver)
TRANSPORT_DRIVERS = [
'rabbit = ' + _RABBIT_DRIVER,
'qpid = ' + _QPID_DRIVER,
'zmq = ' + _ZMQ_DRIVER,
# To avoid confusion
'kombu = ' + _RABBIT_DRIVER,
# For backwards compat
'openstack.common.rpc.impl_kombu = ' + _RABBIT_DRIVER,
'openstack.common.rpc.impl_qpid = ' + _QPID_DRIVER,
'openstack.common.rpc.impl_zmq = ' + _ZMQ_DRIVER,
]

View File

@ -0,0 +1,57 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
class Listener(object):
__metaclass__ = abc.ABCMeta
def __init__(self, conf, target):
self.conf = conf
self.target = target
@abc.abstractmethod
def poll(self):
# returns (target, message)
# target includes the (exchange, topic, namespace, version) which the
# message was sent to
pass
@abc.abstractmethod
def done(self, message):
# so the transport can ack the message
pass
class BaseDriver(object):
__metaclass__ = abc.ABCMeta
def __init__(self, conf, url=None, default_exchange=None):
self.conf = conf
self._url = url
self._default_exchange = _default_exchange
@abc.abstractmethod
def _send(self, target, message, wait_for_reply=None, timeout=None):
"""Send a message to the given target."""
return None
@abc.abstractmethod
def _listen(self, target):
"""Construct a Listener for the given target."""
return None

View File

@ -0,0 +1,33 @@
TODO:
- contexts!
- notifications!
- @expose decorator
- when shutting down a dispatcher, do we need to invoke
a cleanup method on the listener?
Things I don't like:
- CallContext - we already abuse the term "context" enough
- There's something about using a context manager for prepare() that
I like:
with client.prepare(version='2.6') as cctxt:
cctxt.call('select_host',
request_spec=request_spec,
filter_properties=filter_properties)
but it seems a bit nonsensical
- "endpoints" - better than api_objs, callbacks, proxyobj, etc.
- we probably won't use BlockingRPCDispatcher anywhere, but I think
it does a good job of showing the basic job of a dispatcher
implementation

View File

@ -0,0 +1,14 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,14 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,118 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import inspect
from oslo.config import cfg
from openstack.common.gettextutils import _
from openstack.common import local
from openstack.common import log as logging
from openstack.common.messaging import target
_client_opts = [
cfg.IntOpt('rpc_response_timeout',
default=60,
help='Seconds to wait for a response from a call'),
]
_LOG = logging.getLogger(__name__)
class _CallContext(object):
def __init__(self, transport, target, timeout=None, check_for_lock=None):
self.conf = transport.conf
self.transport = transport
self.target = target
self.timeout = timeout
self.check_for_lock = check_for_lock
super(CallContext, self).__init__()
def _make_message(self, method, args):
msg = dict(method=method, args=args)
if self.target.namespace is not None:
msg['namespace'] = self.target.namespace
if self.target.version is not None:
msg['version'] = self.target.version
return msg
def cast(self, method, **kwargs):
msg = self._make_message(method, kwargs)
self.transport._send(target, msg)
def _check_for_lock():
if not self.conf.debug:
return None
if (hasattr(local.strong_store, 'locks_held') and
local.strong_store.locks_held):
stack = ' :: '.join([frame[3] for frame in inspect.stack()])
_LOG.warn(_('An RPC is being made while holding a lock. The locks '
'currently held are %(locks)s. This is probably a bug. '
'Please report it. Include the following: [%(stack)s].'),
{'locks': local.strong_store.locks_held, 'stack': stack})
return True
return False
def call(self, method, **kwargs):
msg = self._make_message(method, kwargs)
timeout = self.timeout
if self.timeout is None:
self.conf.rpc_response_timeout
if self.check_for_lock:
self._check_for_lock()
return self.transport._send(target, msg,
wait_for_reply=True, timeout=timeout)
class RPCClient(object):
def __init__(self, transport, target, timeout=None, check_for_lock=None):
self.conf = transport.conf
self.conf.register_opts(_client_opts)
self.transport = transport
self.target = target
self.timeout = timeout
self.check_for_lock = check_for_lock
super(RPCClient, self).__init__()
def prepare(self, host=None, version=None,
timeout=None, check_for_lock=None):
target = self.target(host=host, version=version)
if timeout is None:
timeout = self.timeout
if check_for_lock is None:
check_for_lock = self.check_for_lock
return _CallContext(self.transport, target, timeout, check_for_lock)
def cast(self, method, **kwargs):
self.prepare().cast(method, **kwargs)
def call(self, method, **kwargs):
return self.prepare().call(method, **kwargs)

View File

@ -0,0 +1,200 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import eventlet
import greenlet
from openstack.common.gettextutils import _
from openstack.common import log as logging
from openstack.common.messaging import _utils as utils
_LOG = logging.getLogger(__name__)
class RPCServerError(Exception):
pass
class NoSuchMethodError(RPCServerError, AttributeError):
def __init__(self, method):
self.method = method
def __str__(self):
return _("Endpoint does not support RPC method %s") % self.method
class UnsupportedVersion(RPCServerError):
def __init__(self, version):
self.version = version
def __str__(self):
return _("Endpoint does not support RPC version %s") % self.version
class RPCDispatcher(object):
__metaclass__ = abc.ABCMeta
def __init__(self, conf, listener, callback):
self.conf = conf
self.listener = listener
self.callback = callback
def _process_one_message(self):
(target, message) = self.listener.poll()
try:
self.callback(target, message)
except Exception:
_LOG.exception(_("Failed to process message... skipping it."))
finally:
self.listener.done(message)
@abc.abstractmethod
def start(self):
pass
@abc.abstractmethod
def stop(self):
pass
@abc.abstractmethod
def wait(self):
pass
class BlockingRPCDispatcher(RPCDispatcher):
def __init__(self, conf, listener, callback):
super(BlockingRPCDispatcher, self).__init__(conf, listener, callback)
self._running = False
def start(self):
self._running = True
while self._running:
self._process_one_message()
def stop(self):
self._running = False
def wait(self):
pass
class EventletRPCDispatcher(RPCDispatcher):
def __init__(self, conf, listener, callback):
super(EventletRPCDispatcher, self).__init__(conf, listener, callback)
self._thread = None
def start(self):
if self._thread is not None:
return
def _dispatcher_thread():
try:
while True:
except greenlet.GreenletExit:
return
self._thread = eventlet.spawn(_dispatcher_thread)
def stop(self):
if self._thread is None:
return
self._thread.kill()
def wait(self):
if self._thread is None:
return
try:
self._thread.wait()
except greenlet.GreenletExit:
pass
self._thread = None
class RPCServer(object):
def __init__(self, transport, target, endpoints, dispatcher_cls):
self.conf = transport.conf
self.transport = transport
self.target = target
self.endpoints = endpoints
self._dispatcher_cls = dispatcher_cls
self._dispatcher = None
super(RPCServer, self).__init__()
@staticmethod
def _is_compatible(endpoint, version):
endpoint_version = endpoint.target.version or '1.0'
return utils.version_is_compatible(endpoint_version, version)
def _dispatch(self, target, message):
method = message.get('method')
args = message.get('args', {})
version = target.version or '1.0'
found_compatible = False
for endpoint in self.endpoints:
if target.namespace != endpoint.target.namespace:
continue
is_compatible = self._is_compatible(endpoint, version)
if is_compatible and hasattr(endpoint, method):
return getattr(endpoint, method)(**kwargs)
found_compatible = found_compatible or is_compatible
if found_compatible:
raise NoSuchMethodError(method)
else:
raise UnsupportedVersion(version)
def start():
if self._dispatcher is not None:
return
listener = self.transport.listen(self.target)
self._dispatcher = self._dispatcher_cls(self.conf, listener,
self._dispatch)
self._dispatcher.start()
def stop(self):
if self._dispatcher is not None:
self._dispatcher.stop()
def wait(self):
if self._dispatcher is not None:
self._dispatcher.wait()
self._dispatcher = None
def get_blocking_server(transport, target, endpoints):
return RPCServer(transport, target, endpoints, BlockingRPCDispatcher)
def get_eventlet_server(transport, target, endpoints):
return RPCServer(transport, target, endpoints, EventletRPCDispatcher)

View File

@ -0,0 +1,67 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class Target(object):
"""Identifies the destination of messages.
A Target encapsulates all the information to identify where a message
should be sent or what messages a server is listening for.
Its attributes are:
:param exchange: A scope for topics. Leave unspecified to default to the
control_exchange configuration option.
:type exchange: str
:param topic: A name which identifies the set of interfaces exposed by a
server. Multiple servers may listen on a topic and messages will be
dispatched to one of the servers in a round-robin fashion.
:type topic: str
:param namespace: Identifies a particular interface (i.e. set of methods)
exposed by a server. The default interface has no namespace identifier
and is referred to as the null namespace.
:type topic: str
:param version: Interfaces have a major.minor version number associated
with them. A minor number increment indicates a backwards compatible
change and an incompatible change is indicated by a major number bump.
Servers may implement multiple major versions and clients may require
indicate that their message requires a particular minimum minor version.
:type topic: str
:param server: Clients can request that a message be directed to a specific
server, rather than just one of a pool of servers listening on the topic.
:type topic: str
:param fanout: Clients may request that a message be directed to all
servers listening on a topic by setting fanout to ``True``, rather than
just one of them.
:type topic: bool
"""
def __init__(self, exchange=None, topic=None, namespace=None,
version=None, server=None, fanout=None):
self.exchange = exchange
self.topic = topic
self.namespace = namespace
self.version = version
self.server = server
self.fanout = fanout
def __call__(self, **kwargs):
kwargs.setdefault('exchange', self.exchange)
kwargs.setdefault('topic', self.topic)
kwargs.setdefault('namespace', self.version)
kwargs.setdefault('server', self.server)
kwargs.setdefault('fanout', self.fanout)
return Target(**kwargs)

View File

@ -0,0 +1,82 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import urlparse
from oslo.config import cfg
from stevedore import driver
from openstack.common.messaging import drivers
_transport_opts = [
cfg.StrOpt('transport_url',
default=None,
help='A URL representing the messaging driver to use and its '
'full configuration. If not set, we fall back to the '
'rpc_backend option and driver specific configuration.'),
cfg.StrOpt('rpc_backend',
default='kombu',
help='The messaging driver to use, defaults to kombu. Other '
'drivers include qpid and zmq.'),
cfg.StrOpt('control_exchange',
default='openstack',
help='The default exchange under which topics are scoped. May '
'be overridden by an exchange name specified in the '
'transport_url option.'),
]
def set_defaults(control_exchange):
cfg.set_defaults(_transport_opts,
control_exchange=control_exchange)
class Transport(object):
def __init__(self, driver):
self.conf = driver.conf
self._driver = driver
def send(self, target, message, wait_for_reply=None, timeout=None):
return self._driver._send(target, message,
wait_for_reply=wait_for_reply,
timeout=timeout)
def listen(self, target):
return self._driver._listen(target)
def get_transport(conf, url=None):
conf.register_opts(_transport_opts)
url = url or conf.transport_url
if url is not None:
rpc_backend = urlparse.urlparse(url).scheme
else:
rpc_backend = conf.rpc_backend
kwargs = dict(default_exchange=conf.control_exchange)
if url is not None:
kwargs['url'] = url
mgr = driver.DriverManager(drivers.NAMESPACE,
rpc_backend,
invoke_on_load=True,
invoke_args=[conf],
invoke_kwds=kwargs)
return Transport(mgr.driver)