diff --git a/nova/manager.py b/nova/manager.py index 7a0b2888e..cbe67e008 100644 --- a/nova/manager.py +++ b/nova/manager.py @@ -56,6 +56,7 @@ This module provides Manager, a base class for managers. from nova.db import base from nova import flags from nova import log as logging +from nova.rpc import dispatcher as rpc_dispatcher from nova.scheduler import api from nova import version @@ -130,12 +131,23 @@ class ManagerMeta(type): class Manager(base.Base): __metaclass__ = ManagerMeta + # Set RPC API version to 1.0 by default. + RPC_API_VERSION = '1.0' + def __init__(self, host=None, db_driver=None): if not host: host = FLAGS.host self.host = host super(Manager, self).__init__(db_driver) + def create_rpc_dispatcher(self): + '''Get the rpc dispatcher for this manager. + + If a manager would like to set an rpc API version, or support more than + one class as the target of rpc messages, override this method. + ''' + return rpc_dispatcher.RpcDispatcher([self]) + def periodic_tasks(self, context, raise_on_error=False): """Tasks to be run at a periodic interval.""" for task_name, task in self._periodic_tasks: diff --git a/nova/rpc/__init__.py b/nova/rpc/__init__.py index acfc10e7e..b48e47610 100644 --- a/nova/rpc/__init__.py +++ b/nova/rpc/__init__.py @@ -17,6 +17,14 @@ # License for the specific language governing permissions and limitations # under the License. +""" +A remote procedure call (rpc) abstraction. + +For some wrappers that add message versioning to rpc, see: + rpc.dispatcher + rpc.proxy +""" + from nova.openstack.common import cfg from nova.openstack.common import importutils diff --git a/nova/rpc/amqp.py b/nova/rpc/amqp.py index b0a0d5d12..0e079f533 100644 --- a/nova/rpc/amqp.py +++ b/nova/rpc/amqp.py @@ -242,23 +242,26 @@ class ProxyCallback(object): ctxt = unpack_context(self.conf, message_data) method = message_data.get('method') args = message_data.get('args', {}) + version = message_data.get('version', None) if not method: LOG.warn(_('no method for message: %s') % message_data) ctxt.reply(_('No method for message: %s') % message_data, connection_pool=self.connection_pool) return - self.pool.spawn_n(self._process_data, ctxt, method, args) + self.pool.spawn_n(self._process_data, ctxt, version, method, args) - def _process_data(self, ctxt, method, args): - """Thread that magically looks for a method on the proxy - object and calls it. + def _process_data(self, ctxt, version, method, args): + """Process a message in a new thread. + + If the proxy object we have has a dispatch method + (see rpc.dispatcher.RpcDispatcher), pass it the version, + method, and args and let it dispatch as appropriate. If not, use + the old behavior of magically calling the specified method on the + proxy we have here. """ ctxt.update_store() try: - node_func = getattr(self.proxy, str(method)) - node_args = dict((str(k), v) for k, v in args.iteritems()) - # NOTE(vish): magic is fun! - rval = node_func(context=ctxt, **node_args) + rval = self.proxy.dispatch(ctxt, version, method, **args) # Check if the result was a generator if inspect.isgenerator(rval): for x in rval: diff --git a/nova/rpc/common.py b/nova/rpc/common.py index d15d1f3f1..aee243800 100644 --- a/nova/rpc/common.py +++ b/nova/rpc/common.py @@ -85,6 +85,11 @@ class InvalidRPCConnectionReuse(RPCException): message = _("Invalid reuse of an RPC connection.") +class UnsupportedRpcVersion(RPCException): + message = _("Specified RPC version, %(version)s, not supported by " + "this endpoint.") + + class Connection(object): """A connection, returned by rpc.create_connection(). diff --git a/nova/rpc/dispatcher.py b/nova/rpc/dispatcher.py new file mode 100644 index 000000000..3f46398a9 --- /dev/null +++ b/nova/rpc/dispatcher.py @@ -0,0 +1,105 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Code for rpc message dispatching. + +Messages that come in have a version number associated with them. RPC API +version numbers are in the form: + + Major.Minor + +For a given message with version X.Y, the receiver must be marked as able to +handle messages of version A.B, where: + + A = X + + B >= Y + +The Major version number would be incremented for an almost completely new API. +The Minor version number would be incremented for backwards compatible changes +to an existing API. A backwards compatible change could be something like +adding a new method, adding an argument to an existing method (but not +requiring it), or changing the type for an existing argument (but still +handling the old type as well). + +The conversion over to a versioned API must be done on both the client side and +server side of the API at the same time. However, as the code stands today, +there can be both versioned and unversioned APIs implemented in the same code +base. +""" + +from nova.rpc import common as rpc_common + + +class RpcDispatcher(object): + """Dispatch rpc messages according to the requested API version. + + This class can be used as the top level 'manager' for a service. It + contains a list of underlying managers that have an API_VERSION attribute. + """ + + def __init__(self, callbacks): + """Initialize the rpc dispatcher. + + :param callbacks: List of proxy objects that are an instance + of a class with rpc methods exposed. Each proxy + object should have an RPC_API_VERSION attribute. + """ + self.callbacks = callbacks + super(RpcDispatcher, self).__init__() + + @staticmethod + def _is_compatible(mversion, version): + """Determine whether versions are compatible. + + :param mversion: The API version implemented by a callback. + :param version: The API version requested by an incoming message. + """ + version_parts = version.split('.') + mversion_parts = mversion.split('.') + if int(version_parts[0]) != int(mversion_parts[0]): # Major + return False + if int(version_parts[1]) > int(mversion_parts[1]): # Minor + return False + return True + + def dispatch(self, ctxt, version, method, **kwargs): + """Dispatch a message based on a requested version. + + :param ctxt: The request context + :param version: The requested API version from the incoming message + :param method: The method requested to be called by the incoming + message. + :param kwargs: A dict of keyword arguments to be passed to the method. + + :returns: Whatever is returned by the underlying method that gets + called. + """ + if not version: + version = '1.0' + + for proxyobj in self.callbacks: + if hasattr(proxyobj, 'RPC_API_VERSION'): + rpc_api_version = proxyobj.RPC_API_VERSION + else: + rpc_api_version = '1.0' + if not hasattr(proxyobj, method): + continue + if self._is_compatible(rpc_api_version, version): + return getattr(proxyobj, method)(ctxt, **kwargs) + + raise rpc_common.UnsupportedRpcVersion(version=version) diff --git a/nova/rpc/impl_fake.py b/nova/rpc/impl_fake.py index 99c686901..70a8ca5f7 100644 --- a/nova/rpc/impl_fake.py +++ b/nova/rpc/impl_fake.py @@ -47,15 +47,13 @@ class Consumer(object): self.topic = topic self.proxy = proxy - def call(self, context, method, args, timeout): - node_func = getattr(self.proxy, method) - node_args = dict((str(k), v) for k, v in args.iteritems()) + def call(self, context, version, method, args, timeout): done = eventlet.event.Event() def _inner(): ctxt = RpcContext.from_dict(context.to_dict()) try: - rval = node_func(context=ctxt, **node_args) + rval = self.proxy.dispatch(context, version, method, **args) res = [] # Caller might have called ctxt.reply() manually for (reply, failure) in ctxt._response: @@ -129,13 +127,14 @@ def multicall(conf, context, topic, msg, timeout=None): if not method: return args = msg.get('args', {}) + version = msg.get('version', None) try: consumer = CONSUMERS[topic][0] except (KeyError, IndexError): return iter([None]) else: - return consumer.call(context, method, args, timeout) + return consumer.call(context, version, method, args, timeout) def call(conf, context, topic, msg, timeout=None): diff --git a/nova/rpc/proxy.py b/nova/rpc/proxy.py new file mode 100644 index 000000000..79a90dc3a --- /dev/null +++ b/nova/rpc/proxy.py @@ -0,0 +1,161 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +A helper class for proxy objects to remote APIs. + +For more information about rpc API version numbers, see: + rpc/dispatcher.py +""" + + +from nova import rpc + + +class RpcProxy(object): + """A helper class for rpc clients. + + This class is a wrapper around the RPC client API. It allows you to + specify the topic and API version in a single place. This is intended to + be used as a base class for a class that implements the client side of an + rpc API. + """ + + def __init__(self, topic, default_version): + """Initialize an RpcProxy. + + :param topic: The topic to use for all messages. + :param default_version: The default API version to request in all + outgoing messages. This can be overridden on a per-message + basis. + """ + self.topic = topic + self.default_version = default_version + super(RpcProxy, self).__init__() + + def _set_version(self, msg, vers): + """Helper method to set the version in a message. + + :param msg: The message having a version added to it. + :param vers: The version number to add to the message. + """ + msg['version'] = vers if vers else self.default_version + + def _get_topic(self, topic): + """Return the topic to use for a message.""" + return topic if topic else self.topic + + @staticmethod + def make_msg(method, **kwargs): + return {'method': method, 'args': kwargs} + + def call(self, context, msg, topic=None, version=None, timeout=None): + """rpc.call() a remote method. + + :param context: The request context + :param msg: The message to send, including the method and args. + :param topic: Override the topic for this message. + :param timeout: (Optional) A timeout to use when waiting for the + response. If no timeout is specified, a default timeout will be + used that is usually sufficient. + :param version: (Optional) Override the requested API version in this + message. + + :returns: The return value from the remote method. + """ + self._set_version(msg, version) + return rpc.call(context, self._get_topic(topic), msg, timeout) + + def multicall(self, context, msg, topic=None, version=None, timeout=None): + """rpc.multicall() a remote method. + + :param context: The request context + :param msg: The message to send, including the method and args. + :param topic: Override the topic for this message. + :param timeout: (Optional) A timeout to use when waiting for the + response. If no timeout is specified, a default timeout will be + used that is usually sufficient. + :param version: (Optional) Override the requested API version in this + message. + + :returns: An iterator that lets you process each of the returned values + from the remote method as they arrive. + """ + self._set_version(msg, version) + return rpc.multicall(context, self._get_topic(topic), msg, timeout) + + def cast(self, context, msg, topic=None, version=None): + """rpc.cast() a remote method. + + :param context: The request context + :param msg: The message to send, including the method and args. + :param topic: Override the topic for this message. + :param version: (Optional) Override the requested API version in this + message. + + :returns: None. rpc.cast() does not wait on any return value from the + remote method. + """ + self._set_version(msg, version) + rpc.cast(context, self._get_topic(topic), msg) + + def fanout_cast(self, context, msg, version=None): + """rpc.fanout_cast() a remote method. + + :param context: The request context + :param msg: The message to send, including the method and args. + :param version: (Optional) Override the requested API version in this + message. + + :returns: None. rpc.fanout_cast() does not wait on any return value + from the remote method. + """ + self._set_version(msg, version) + rpc.fanout_cast(context, self.topic, msg) + + def cast_to_server(self, context, server_params, msg, topic=None, + version=None): + """rpc.cast_to_server() a remote method. + + :param context: The request context + :param server_params: Server parameters. See rpc.cast_to_server() for + details. + :param msg: The message to send, including the method and args. + :param topic: Override the topic for this message. + :param version: (Optional) Override the requested API version in this + message. + + :returns: None. rpc.cast_to_server() does not wait on any + return values. + """ + self._set_version(msg, version) + rpc.cast_to_server(context, server_params, self._get_topic(topic), msg) + + def fanout_cast_to_server(self, context, server_params, msg, version=None): + """rpc.fanout_cast_to_server() a remote method. + + :param context: The request context + :param server_params: Server parameters. See rpc.cast_to_server() for + details. + :param msg: The message to send, including the method and args. + :param version: (Optional) Override the requested API version in this + message. + + :returns: None. rpc.fanout_cast_to_server() does not wait on any + return values. + """ + self._set_version(msg, version) + rpc.fanout_cast_to_server(context, server_params, self.topic, msg) diff --git a/nova/service.py b/nova/service.py index 7d5db5a0a..dcd1205f2 100644 --- a/nova/service.py +++ b/nova/service.py @@ -197,13 +197,15 @@ class Service(object): LOG.debug(_("Creating Consumer connection for Service %s") % self.topic) + rpc_dispatcher = self.manager.create_rpc_dispatcher() + # Share this same connection for these Consumers - self.conn.create_consumer(self.topic, self, fanout=False) + self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False) node_topic = '%s.%s' % (self.topic, self.host) - self.conn.create_consumer(node_topic, self, fanout=False) + self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False) - self.conn.create_consumer(self.topic, self, fanout=True) + self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True) # Consume from all consumers in a thread self.conn.consume_in_thread() diff --git a/nova/tests/rpc/common.py b/nova/tests/rpc/common.py index 0c8c11487..c07ddfa1a 100644 --- a/nova/tests/rpc/common.py +++ b/nova/tests/rpc/common.py @@ -30,6 +30,7 @@ from nova import flags from nova import log as logging from nova.rpc import amqp as rpc_amqp from nova.rpc import common as rpc_common +from nova.rpc import dispatcher as rpc_dispatcher from nova import test @@ -44,8 +45,9 @@ class BaseRpcTestCase(test.TestCase): self.context = context.get_admin_context() if self.rpc: self.conn = self.rpc.create_connection(FLAGS, True) - self.receiver = TestReceiver() - self.conn.create_consumer('test', self.receiver, False) + receiver = TestReceiver() + self.dispatcher = rpc_dispatcher.RpcDispatcher([receiver]) + self.conn.create_consumer('test', self.dispatcher, False) self.conn.consume_in_thread() def tearDown(self): @@ -145,8 +147,9 @@ class BaseRpcTestCase(test.TestCase): return value nested = Nested() + dispatcher = rpc_dispatcher.RpcDispatcher([nested]) conn = self.rpc.create_connection(FLAGS, True) - conn.create_consumer('nested', nested, False) + conn.create_consumer('nested', dispatcher, False) conn.consume_in_thread() value = 42 result = self.rpc.call(FLAGS, self.context, @@ -228,7 +231,6 @@ class TestReceiver(object): Uses static methods because we aren't actually storing any state. """ - @staticmethod def echo(context, value): """Simply returns whatever value is sent in.""" diff --git a/nova/tests/rpc/test_dispatcher.py b/nova/tests/rpc/test_dispatcher.py new file mode 100644 index 000000000..0c16c6a34 --- /dev/null +++ b/nova/tests/rpc/test_dispatcher.py @@ -0,0 +1,109 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012, Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Unit Tests for rpc.dispatcher +""" + +from nova import context +from nova.rpc import dispatcher +from nova.rpc import common as rpc_common +from nova import test + + +class RpcDispatcherTestCase(test.TestCase): + class API1(object): + RPC_API_VERSION = '1.0' + + def __init__(self): + self.test_method_ctxt = None + self.test_method_arg1 = None + + def test_method(self, ctxt, arg1): + self.test_method_ctxt = ctxt + self.test_method_arg1 = arg1 + + class API2(object): + RPC_API_VERSION = '2.1' + + def __init__(self): + self.test_method_ctxt = None + self.test_method_arg1 = None + + def test_method(self, ctxt, arg1): + self.test_method_ctxt = ctxt + self.test_method_arg1 = arg1 + + class API3(object): + RPC_API_VERSION = '3.5' + + def __init__(self): + self.test_method_ctxt = None + self.test_method_arg1 = None + + def test_method(self, ctxt, arg1): + self.test_method_ctxt = ctxt + self.test_method_arg1 = arg1 + + def setUp(self): + self.ctxt = context.RequestContext('fake_user', 'fake_project') + super(RpcDispatcherTestCase, self).setUp() + + def tearDown(self): + super(RpcDispatcherTestCase, self).tearDown() + + def _test_dispatch(self, version, expectations): + v2 = self.API2() + v3 = self.API3() + disp = dispatcher.RpcDispatcher([v2, v3]) + + disp.dispatch(self.ctxt, version, 'test_method', arg1=1) + + self.assertEqual(v2.test_method_ctxt, expectations[0]) + self.assertEqual(v2.test_method_arg1, expectations[1]) + self.assertEqual(v3.test_method_ctxt, expectations[2]) + self.assertEqual(v3.test_method_arg1, expectations[3]) + + def test_dispatch(self): + self._test_dispatch('2.1', (self.ctxt, 1, None, None)) + self._test_dispatch('3.5', (None, None, self.ctxt, 1)) + + def test_dispatch_lower_minor_version(self): + self._test_dispatch('2.0', (self.ctxt, 1, None, None)) + self._test_dispatch('3.1', (None, None, self.ctxt, 1)) + + def test_dispatch_higher_minor_version(self): + self.assertRaises(rpc_common.UnsupportedRpcVersion, + self._test_dispatch, '2.6', (None, None, None, None)) + self.assertRaises(rpc_common.UnsupportedRpcVersion, + self._test_dispatch, '3.6', (None, None, None, None)) + + def test_dispatch_lower_major_version(self): + self.assertRaises(rpc_common.UnsupportedRpcVersion, + self._test_dispatch, '1.0', (None, None, None, None)) + + def test_dispatch_higher_major_version(self): + self.assertRaises(rpc_common.UnsupportedRpcVersion, + self._test_dispatch, '4.0', (None, None, None, None)) + + def test_dispatch_no_version_uses_v1(self): + v1 = self.API1() + disp = dispatcher.RpcDispatcher([v1]) + + disp.dispatch(self.ctxt, None, 'test_method', arg1=1) + + self.assertEqual(v1.test_method_ctxt, self.ctxt) + self.assertEqual(v1.test_method_arg1, 1) diff --git a/nova/tests/rpc/test_proxy.py b/nova/tests/rpc/test_proxy.py new file mode 100644 index 000000000..9ef504a0d --- /dev/null +++ b/nova/tests/rpc/test_proxy.py @@ -0,0 +1,124 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012, Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Unit Tests for rpc.proxy +""" + +import copy + +from nova import context +from nova import rpc +from nova.rpc import proxy +from nova import test + + +class RpcProxyTestCase(test.TestCase): + + def setUp(self): + super(RpcProxyTestCase, self).setUp() + + def tearDown(self): + super(RpcProxyTestCase, self).tearDown() + + def _test_rpc_method(self, rpc_method, has_timeout=False, has_retval=False, + server_params=None, supports_topic_override=True): + topic = 'fake_topic' + timeout = 123 + rpc_proxy = proxy.RpcProxy(topic, '1.0') + ctxt = context.RequestContext('fake_user', 'fake_project') + msg = {'method': 'fake_method', 'args': {'x': 'y'}} + expected_msg = {'method': 'fake_method', 'args': {'x': 'y'}, + 'version': '1.0'} + + expected_retval = 'hi' if has_retval else None + + self.fake_args = None + self.fake_kwargs = None + + def _fake_rpc_method(*args, **kwargs): + self.fake_args = args + self.fake_kwargs = kwargs + if has_retval: + return expected_retval + + self.stubs.Set(rpc, rpc_method, _fake_rpc_method) + + args = [ctxt, msg] + if server_params: + args.insert(1, server_params) + + # Base method usage + retval = getattr(rpc_proxy, rpc_method)(*args) + self.assertEqual(retval, expected_retval) + expected_args = [ctxt, topic, expected_msg] + if server_params: + expected_args.insert(1, server_params) + for arg, expected_arg in zip(self.fake_args, expected_args): + self.assertEqual(arg, expected_arg) + + # overriding the version + retval = getattr(rpc_proxy, rpc_method)(*args, version='1.1') + self.assertEqual(retval, expected_retval) + new_msg = copy.deepcopy(expected_msg) + new_msg['version'] = '1.1' + expected_args = [ctxt, topic, new_msg] + if server_params: + expected_args.insert(1, server_params) + for arg, expected_arg in zip(self.fake_args, expected_args): + self.assertEqual(arg, expected_arg) + + if has_timeout: + # set a timeout + retval = getattr(rpc_proxy, rpc_method)(ctxt, msg, timeout=timeout) + self.assertEqual(retval, expected_retval) + expected_args = [ctxt, topic, expected_msg, timeout] + for arg, expected_arg in zip(self.fake_args, expected_args): + self.assertEqual(arg, expected_arg) + + if supports_topic_override: + # set a topic + new_topic = 'foo.bar' + retval = getattr(rpc_proxy, rpc_method)(*args, topic=new_topic) + self.assertEqual(retval, expected_retval) + expected_args = [ctxt, new_topic, expected_msg] + if server_params: + expected_args.insert(1, server_params) + for arg, expected_arg in zip(self.fake_args, expected_args): + self.assertEqual(arg, expected_arg) + + def test_call(self): + self._test_rpc_method('call', has_timeout=True, has_retval=True) + + def test_multicall(self): + self._test_rpc_method('multicall', has_timeout=True, has_retval=True) + + def test_cast(self): + self._test_rpc_method('cast') + + def test_fanout_cast(self): + self._test_rpc_method('fanout_cast', supports_topic_override=False) + + def test_cast_to_server(self): + self._test_rpc_method('cast_to_server', server_params={'blah': 1}) + + def test_fanout_cast_to_server(self): + self._test_rpc_method('fanout_cast_to_server', + server_params={'blah': 1}, supports_topic_override=False) + + def test_make_msg(self): + self.assertEqual(proxy.RpcProxy.make_msg('test_method', a=1, b=2), + {'method': 'test_method', 'args': {'a': 1, 'b': 2}})