Support capping message versions in the client.

When doing a rolling upgrade, we need to be able to tell all rpc clients
to hold off on sending newer versions of messages until all nodes
understand the new message version.  This patch adds the oslo component
of this.

It's quite simple.  The rpc proxy just stores the version cap and will
raise an exception if code ever tries to send a message that exceeds
this cap.

Allowing the cap to be configured and generating different types of
messages based on the configured value is the hard part here, but that
is left up to the project using the rpc library.

Implements blueprint rpc-version-control.
This commit is contained in:
Russell Bryant 2013-05-15 16:39:04 -04:00
parent 704f2c3319
commit ab26987133
1 changed files with 31 additions and 4 deletions

View File

@ -24,6 +24,7 @@ from openstack.common.gettextutils import _
from openstack.common import local
from openstack.common import log as logging
from openstack.common.messaging import target
from openstack.common.messaging import _utils as utils
_client_opts = [
cfg.IntOpt('rpc_response_timeout',
@ -34,15 +35,29 @@ _client_opts = [
_LOG = logging.getLogger(__name__)
class RpcVersionCapError(Exception):
def __init__(self, version, version_cap):
self.version = version
self.version_cap = version_cap
def __str__(self):
return (_("Specified RPC version cap, %(version_cap)s, is too low. "
"Needs to be higher than %(version)s.") %
dict(version=self.version, version_cap=self.version_cap))
class _CallContext(object):
def __init__(self, transport, target, timeout=None, check_for_lock=None):
def __init__(self, transport, target,
timeout=None, check_for_lock=None, version_cap=None):
self.conf = transport.conf
self.transport = transport
self.target = target
self.timeout = timeout
self.check_for_lock = check_for_lock
self.version_cap = version_cap
super(_CallContext, self).__init__()
@ -70,6 +85,11 @@ class _CallContext(object):
'Please report it. Include the following: [%(stack)s].'),
{'locks': local.strong_store.locks_held, 'stack': stack})
def _check_version_cap(self, version):
if not utils.version_is_compatible(self.version_cap, v):
raise RpcVersionCapError(version=version,
version_cap=self.version_cap)
def call(self, ctxt, method, **kwargs):
msg = self._make_message(method, kwargs)
@ -79,6 +99,8 @@ class _CallContext(object):
if self.check_for_lock:
self._check_for_lock()
if self.version_cap:
self._check_version_cap(msg.get('version'))
return self.transport._send(self.target, ctxt, msg,
wait_for_reply=True, timeout=timeout)
@ -86,7 +108,8 @@ class _CallContext(object):
class RPCClient(object):
def __init__(self, transport, target, timeout=None, check_for_lock=None):
def __init__(self, transport, target,
timeout=None, check_for_lock=None, version_cap=None):
self.conf = transport.conf
self.conf.register_opts(_client_opts)
@ -94,19 +117,23 @@ class RPCClient(object):
self.target = target
self.timeout = timeout
self.check_for_lock = check_for_lock
self.version_cap = version_cap
super(RPCClient, self).__init__()
def prepare(self, server=None, version=None,
timeout=None, check_for_lock=None):
timeout=None, check_for_lock=None, version_cap=None):
target = self.target(server=server, version=version)
if timeout is None:
timeout = self.timeout
if check_for_lock is None:
check_for_lock = self.check_for_lock
if version_cap is None:
version_cap = self.version_cap
return _CallContext(self.transport, target, timeout, check_for_lock)
return _CallContext(self.transport, target,
timeout, check_for_lock, version_cap)
def cast(self, ctxt, method, **kwargs):
self.prepare().cast(ctxt, method, **kwargs)