1a91aacb85
The oslo.messaging library takes the existing RPC code from oslo and wraps it in a sane API with well defined semantics around which we can make a commitment to retain compatibility in future. The patch is large, but the changes can be summarized as: * oslo.messaging>=1.3.0a4 is required; a proper 1.3.0 release will be pushed before the icehouse release candidates. * The new rpc module has init() and cleanup() methods which manage the global oslo.messaging transport state. The TRANSPORT and NOTIFIER globals are conceptually similar to the current RPCIMPL global, except we're free to create and use alternate Transport objects in e.g. the cells code. * The rpc.get_{client,server,notifier}() methods are just helpers which wrap the global messaging state, specifiy serializers and specify the use of the eventlet executor. * In oslo.messaging, a request context is expected to be a dict so we add a RequestContextSerializer which can serialize to and from dicts using RequestContext.{to,from}_dict() * The allowed_rpc_exception_modules configuration option is replaced by an allowed_remote_exmods get_transport() parameter. This is not something that users ever need to configure, but it is something each project using oslo.messaging needs to be able to customize. * The nova.rpcclient module is removed; it was only a helper class to allow us split a lot of the more tedious changes out of this patch. * Finalizing the port from RpcProxy to RPCClient is straightforward. We put the default topic, version and namespace into a Target and contstruct the client using that. * Porting endpoint classes (like ComputeManager) just involves setting a target attribute on the class. * The @client_exceptions() decorator has been renamed to @expected_exceptions since it's used on the server side to designate exceptions we expect the decorated method to raise. * We maintain a global NOTIFIER object and create specializations of it with specific publisher IDs in order to avoid notification driver loading overhead. * rpc.py contains transport aliases for backwards compatibility purposes. setup.cfg also contains notification driver aliases for backwards compat. * The messaging options are moved about in nova.conf.sample because the options are advertised via a oslo.config.opts entry point and picked up by the generator. * We use messaging.ConfFixture in tests to override oslo.messaging config options, rather than making assumptions about the options registered by the library. The porting of cells code is particularly tricky: * messaging.TransportURL parse() and str() replaces the [un]parse_transport_url() methods. Note the complication that an oslo.messaging transport URL can actually have multiple hosts in order to support message broker clustering. Also the complication of transport aliases in rpc.get_transport_url(). * proxy_rpc_to_manager() is fairly nasty. Right now, we're proxying the on-the-wire message format over this call, but you can't supply such messages to oslo.messaging's cast()/call() methods. Rather than change the inter-cell RPC API to suit oslo.messaging, we instead just unpack the topic, server, method and args from the message on the remote side. cells_api.RPCClientCellsProxy is a mock RPCClient implementation which allows us to wrap up a RPC in the message format currently used for inter-cell RPCs. * Similarly, proxy_rpc_to_manager uses the on-the-wire format for exception serialization, but this format is an implementation detail of oslo.messaging's transport drivers. So, we need to duplicate the exception serialization code in cells.messaging. We may find a way to reconcile this in future - for example a ExceptionSerializer class might work, but with the current format it might be difficult for the deserializer to generically detect a serialized exception. * CellsRPCDriver.start_servers() and InterCellRPCAPI._get_client() need close review, but they're pretty straightforward ports of code to listen on some specialized topics and connect to a remote cell using its transport URL. blueprint: oslo-messaging Change-Id: Ib613e6300f2c215be90f924afbd223a3da053a69
167 lines
6.4 KiB
Python
167 lines
6.4 KiB
Python
# Copyright (c) 2012 Rackspace Hosting
|
|
# All Rights Reserved.
|
|
# Copyright 2013 Red Hat, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
"""
|
|
Cells RPC Communication Driver
|
|
"""
|
|
from oslo.config import cfg
|
|
from oslo import messaging
|
|
|
|
from nova.cells import driver
|
|
from nova import rpc
|
|
|
|
cell_rpc_driver_opts = [
|
|
cfg.StrOpt('rpc_driver_queue_base',
|
|
default='cells.intercell',
|
|
help="Base queue name to use when communicating between "
|
|
"cells. Various topics by message type will be "
|
|
"appended to this.")]
|
|
|
|
CONF = cfg.CONF
|
|
CONF.register_opts(cell_rpc_driver_opts, group='cells')
|
|
CONF.import_opt('call_timeout', 'nova.cells.opts', group='cells')
|
|
|
|
rpcapi_cap_opt = cfg.StrOpt('intercell',
|
|
help='Set a version cap for messages sent between cells services')
|
|
CONF.register_opt(rpcapi_cap_opt, 'upgrade_levels')
|
|
|
|
|
|
class CellsRPCDriver(driver.BaseCellsDriver):
|
|
"""Driver for cell<->cell communication via RPC. This is used to
|
|
setup the RPC consumers as well as to send a message to another cell.
|
|
|
|
One instance of this class will be created for every neighbor cell
|
|
that we find in the DB and it will be associated with the cell in
|
|
its CellState.
|
|
|
|
One instance is also created by the cells manager for setting up
|
|
the consumers.
|
|
"""
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super(CellsRPCDriver, self).__init__(*args, **kwargs)
|
|
self.rpc_servers = []
|
|
self.intercell_rpcapi = InterCellRPCAPI()
|
|
|
|
def start_servers(self, msg_runner):
|
|
"""Start RPC servers.
|
|
|
|
Start up 2 separate servers for handling inter-cell
|
|
communication via RPC. Both handle the same types of
|
|
messages, but requests/replies are separated to solve
|
|
potential deadlocks. (If we used the same queue for both,
|
|
it's possible to exhaust the RPC thread pool while we wait
|
|
for replies.. such that we'd never consume a reply.)
|
|
"""
|
|
topic_base = CONF.cells.rpc_driver_queue_base
|
|
proxy_manager = InterCellRPCDispatcher(msg_runner)
|
|
for msg_type in msg_runner.get_message_types():
|
|
target = messaging.Target(topic='%s.%s' % (topic_base, msg_type),
|
|
server=CONF.host)
|
|
# NOTE(comstud): We do not need to use the object serializer
|
|
# on this because object serialization is taken care for us in
|
|
# the nova.cells.messaging module.
|
|
server = rpc.get_server(target, endpoints=[proxy_manager])
|
|
server.start()
|
|
self.rpc_servers.append(server)
|
|
|
|
def stop_servers(self):
|
|
"""Stop RPC servers.
|
|
|
|
NOTE: Currently there's no hooks when stopping services
|
|
to have managers cleanup, so this is not currently called.
|
|
"""
|
|
for server in self.rpc_servers:
|
|
server.stop()
|
|
|
|
def send_message_to_cell(self, cell_state, message):
|
|
"""Use the IntercellRPCAPI to send a message to a cell."""
|
|
self.intercell_rpcapi.send_message_to_cell(cell_state, message)
|
|
|
|
|
|
class InterCellRPCAPI(object):
|
|
"""Client side of the Cell<->Cell RPC API.
|
|
|
|
The CellsRPCDriver uses this to make calls to another cell.
|
|
|
|
API version history:
|
|
1.0 - Initial version.
|
|
|
|
... Grizzly supports message version 1.0. So, any changes to existing
|
|
methods in 2.x after that point should be done such that they can
|
|
handle the version_cap being set to 1.0.
|
|
"""
|
|
|
|
VERSION_ALIASES = {
|
|
'grizzly': '1.0',
|
|
}
|
|
|
|
def __init__(self):
|
|
super(InterCellRPCAPI, self).__init__()
|
|
self.version_cap = (
|
|
self.VERSION_ALIASES.get(CONF.upgrade_levels.intercell,
|
|
CONF.upgrade_levels.intercell))
|
|
|
|
def _get_client(self, next_hop, topic):
|
|
"""Turn the DB information for a cell into a messaging.RPCClient."""
|
|
transport_url = next_hop.db_info['transport_url']
|
|
transport = messaging.get_transport(cfg.CONF, transport_url,
|
|
rpc.TRANSPORT_ALIASES)
|
|
target = messaging.Target(topic=topic, version='1.0')
|
|
serializer = rpc.RequestContextSerializer(None)
|
|
return messaging.RPCClient(transport,
|
|
target,
|
|
version_cap=self.version_cap,
|
|
serializer=serializer)
|
|
|
|
def send_message_to_cell(self, cell_state, message):
|
|
"""Send a message to another cell by JSON-ifying the message and
|
|
making an RPC cast to 'process_message'. If the message says to
|
|
fanout, do it. The topic that is used will be
|
|
'CONF.rpc_driver_queue_base.<message_type>'.
|
|
"""
|
|
topic_base = CONF.cells.rpc_driver_queue_base
|
|
topic = '%s.%s' % (topic_base, message.message_type)
|
|
cctxt = self._get_client(cell_state, topic)
|
|
if message.fanout:
|
|
cctxt = cctxt.prepare(fanout=message.fanout)
|
|
return cctxt.cast(message.ctxt, 'process_message',
|
|
message=message.to_json())
|
|
|
|
|
|
class InterCellRPCDispatcher(object):
|
|
"""RPC Dispatcher to handle messages received from other cells.
|
|
|
|
All messages received here have come from a sibling cell. Depending
|
|
on the ultimate target and type of message, we may process the message
|
|
in this cell, relay the message to another sibling cell, or both. This
|
|
logic is defined by the message class in the nova.cells.messaging module.
|
|
"""
|
|
|
|
target = messaging.Target(version='1.0')
|
|
|
|
def __init__(self, msg_runner):
|
|
"""Init the Intercell RPC Dispatcher."""
|
|
self.msg_runner = msg_runner
|
|
|
|
def process_message(self, _ctxt, message):
|
|
"""We received a message from another cell. Use the MessageRunner
|
|
to turn this from JSON back into an instance of the correct
|
|
Message class. Then process it!
|
|
"""
|
|
message = self.msg_runner.message_from_json(message)
|
|
message.process()
|