From ab269871338535419ca7e756ab734318b5e0aa34 Mon Sep 17 00:00:00 2001 From: Russell Bryant Date: Wed, 15 May 2013 16:39:04 -0400 Subject: [PATCH] Support capping message versions in the client. When doing a rolling upgrade, we need to be able to tell all rpc clients to hold off on sending newer versions of messages until all nodes understand the new message version. This patch adds the oslo component of this. It's quite simple. The rpc proxy just stores the version cap and will raise an exception if code ever tries to send a message that exceeds this cap. Allowing the cap to be configured and generating different types of messages based on the configured value is the hard part here, but that is left up to the project using the rpc library. Implements blueprint rpc-version-control. --- openstack/common/messaging/rpc/client.py | 35 +++++++++++++++++++++--- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/openstack/common/messaging/rpc/client.py b/openstack/common/messaging/rpc/client.py index ba5cfcdd6..7bdf46846 100644 --- a/openstack/common/messaging/rpc/client.py +++ b/openstack/common/messaging/rpc/client.py @@ -24,6 +24,7 @@ from openstack.common.gettextutils import _ from openstack.common import local from openstack.common import log as logging from openstack.common.messaging import target +from openstack.common.messaging import _utils as utils _client_opts = [ cfg.IntOpt('rpc_response_timeout', @@ -34,15 +35,29 @@ _client_opts = [ _LOG = logging.getLogger(__name__) +class RpcVersionCapError(Exception): + + def __init__(self, version, version_cap): + self.version = version + self.version_cap = version_cap + + def __str__(self): + return (_("Specified RPC version cap, %(version_cap)s, is too low. " + "Needs to be higher than %(version)s.") % + dict(version=self.version, version_cap=self.version_cap)) + + class _CallContext(object): - def __init__(self, transport, target, timeout=None, check_for_lock=None): + def __init__(self, transport, target, + timeout=None, check_for_lock=None, version_cap=None): self.conf = transport.conf self.transport = transport self.target = target self.timeout = timeout self.check_for_lock = check_for_lock + self.version_cap = version_cap super(_CallContext, self).__init__() @@ -70,6 +85,11 @@ class _CallContext(object): 'Please report it. Include the following: [%(stack)s].'), {'locks': local.strong_store.locks_held, 'stack': stack}) + def _check_version_cap(self, version): + if not utils.version_is_compatible(self.version_cap, v): + raise RpcVersionCapError(version=version, + version_cap=self.version_cap) + def call(self, ctxt, method, **kwargs): msg = self._make_message(method, kwargs) @@ -79,6 +99,8 @@ class _CallContext(object): if self.check_for_lock: self._check_for_lock() + if self.version_cap: + self._check_version_cap(msg.get('version')) return self.transport._send(self.target, ctxt, msg, wait_for_reply=True, timeout=timeout) @@ -86,7 +108,8 @@ class _CallContext(object): class RPCClient(object): - def __init__(self, transport, target, timeout=None, check_for_lock=None): + def __init__(self, transport, target, + timeout=None, check_for_lock=None, version_cap=None): self.conf = transport.conf self.conf.register_opts(_client_opts) @@ -94,19 +117,23 @@ class RPCClient(object): self.target = target self.timeout = timeout self.check_for_lock = check_for_lock + self.version_cap = version_cap super(RPCClient, self).__init__() def prepare(self, server=None, version=None, - timeout=None, check_for_lock=None): + timeout=None, check_for_lock=None, version_cap=None): target = self.target(server=server, version=version) if timeout is None: timeout = self.timeout if check_for_lock is None: check_for_lock = self.check_for_lock + if version_cap is None: + version_cap = self.version_cap - return _CallContext(self.transport, target, timeout, check_for_lock) + return _CallContext(self.transport, target, + timeout, check_for_lock, version_cap) def cast(self, ctxt, method, **kwargs): self.prepare().cast(ctxt, method, **kwargs)