Pin RPC and object version to lowest running

This commit adds detection and pinning to lowest RPC API version to all
rpcapi modules. The version pin is determined by a DB call done once per
service lifetime thanks to caching. Handling the compatibility is
guaranteed by oslo.messaging and our shims in rpcapi modules.

To achieve o.vo compatibility, a similar approach is implemented. Custom
oslo.messaging serializer is implemented that backports objects to the
lowest running version on sending.

During the process of upgrade it may happen that manager receives an
object in the version lower than current one. Handling of such
situations is up to the manager and it should do that explicitely by
checking obj.VERSION.

The patch also adds required methods to db.api and Service object.

Co-Authored-By: Thang Pham <thang.g.pham@gmail.com>

Change-Id: I649892da64f9734928a6cf0f004a369aa7aa375f
Partial-Implements: blueprint rpc-object-compatibility
This commit is contained in:
Michał Dulko 2016-01-14 20:57:12 +01:00
parent 4226d37272
commit e6d525e56d
15 changed files with 339 additions and 50 deletions

View File

@ -20,18 +20,15 @@ Client side of the volume backup RPC API.
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from cinder.objects import base as objects_base
from cinder import rpc
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class BackupAPI(object):
class BackupAPI(rpc.RPCAPI):
"""Client side of the volume rpc API.
API version history:
@ -40,16 +37,9 @@ class BackupAPI(object):
1.1 - Changed methods to accept backup objects instead of IDs.
"""
BASE_RPC_API_VERSION = '1.0'
RPC_API_VERSION = '1.1'
def __init__(self):
super(BackupAPI, self).__init__()
target = messaging.Target(topic=CONF.backup_topic,
version=self.BASE_RPC_API_VERSION)
serializer = objects_base.CinderObjectSerializer()
self.client = rpc.get_client(target, self.RPC_API_VERSION,
serializer=serializer)
TOPIC = CONF.backup_topic
BINARY = 'cinder-backup'
def create_backup(self, ctxt, backup):
LOG.debug("create_backup in rpcapi backup_id %s", backup.id)

View File

@ -116,6 +116,11 @@ def service_get_all_by_topic(context, topic, disabled=None):
return IMPL.service_get_all_by_topic(context, topic, disabled=disabled)
def service_get_all_by_binary(context, binary, disabled=None):
"""Get all services for a given binary."""
return IMPL.service_get_all_by_binary(context, binary, disabled)
def service_get_by_args(context, host, binary):
"""Get the state of a service by node name and binary."""
return IMPL.service_get_by_args(context, host, binary)

View File

@ -393,6 +393,17 @@ def service_get_all_by_topic(context, topic, disabled=None):
return query.all()
@require_admin_context
def service_get_all_by_binary(context, binary, disabled=None):
query = model_query(
context, models.Service, read_deleted="no").filter_by(binary=binary)
if disabled is not None:
query = query.filter_by(disabled=disabled)
return query.all()
@require_admin_context
def service_get_by_host_and_topic(context, host, topic):
result = model_query(

View File

@ -96,6 +96,7 @@ OBJ_VERSIONS.add('1.0', {'Backup': '1.3', 'BackupImport': '1.3',
'ConsistencyGroup': '1.2',
'ConsistencyGroupList': '1.1', 'Service': '1.1',
'Volume': '1.3', 'VolumeTypeList': '1.1'})
OBJ_VERSIONS.add('1.1', {'Service': '1.2', 'ServiceList': '1.1'})
class CinderObjectRegistry(base.VersionedObjectRegistry):
@ -385,3 +386,23 @@ class ObjectListBase(base.ObjectListBase):
class CinderObjectSerializer(base.VersionedObjectSerializer):
OBJ_BASE_CLASS = CinderObject
def __init__(self, version_cap=None):
super(CinderObjectSerializer, self).__init__()
self.version_cap = version_cap
def _get_capped_obj_version(self, obj):
objname = obj.obj_name()
objver = OBJ_VERSIONS.get(self.version_cap, {})
return objver.get(objname, None)
def serialize_entity(self, context, entity):
if isinstance(entity, (tuple, list, set, dict)):
entity = self._process_iterable(context, self.serialize_entity,
entity)
elif (hasattr(entity, 'obj_to_primitive') and
callable(entity.obj_to_primitive)):
# NOTE(dulek): Backport outgoing object to the capped version.
backport_ver = self._get_capped_obj_version(entity)
entity = entity.obj_to_primitive(backport_ver)
return entity

View File

@ -14,6 +14,7 @@
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import versionutils
from oslo_versionedobjects import fields
from cinder import db
@ -33,7 +34,8 @@ class Service(base.CinderPersistentObject, base.CinderObject,
base.CinderComparableObject):
# Version 1.0: Initial version
# Version 1.1: Add rpc_current_version and object_current_version fields
VERSION = '1.1'
# Version 1.2: Add get_minimum_rpc_version() and get_minimum_obj_version()
VERSION = '1.2'
fields = {
'id': fields.IntegerField(),
@ -100,16 +102,49 @@ class Service(base.CinderPersistentObject, base.CinderObject,
with self.obj_as_admin():
db.service_destroy(self._context, self.id)
@classmethod
def _get_minimum_version(cls, attribute, context, binary):
services = ServiceList.get_all_by_binary(context, binary)
min_ver = None
min_ver_str = None
for s in services:
ver_str = getattr(s, attribute)
if ver_str is None:
# FIXME(dulek) None in *_current_version means that this
# service is in Liberty version, so we must assume this is the
# lowest one. We use handy and easy to remember token to
# indicate that. This may go away as soon as we drop
# compatibility with Liberty, possibly in early N.
return 'liberty'
ver = versionutils.convert_version_to_int(ver_str)
if min_ver is None or ver < min_ver:
min_ver = ver
min_ver_str = ver_str
return min_ver_str
@base.remotable_classmethod
def get_minimum_rpc_version(cls, context, binary):
return cls._get_minimum_version('rpc_current_version', context, binary)
@base.remotable_classmethod
def get_minimum_obj_version(cls, context, binary):
return cls._get_minimum_version('object_current_version', context,
binary)
@base.CinderObjectRegistry.register
class ServiceList(base.ObjectListBase, base.CinderObject):
VERSION = '1.0'
# Version 1.0: Initial version
# Version 1.1: Service object 1.2
VERSION = '1.1'
fields = {
'objects': fields.ListOfObjectsField('Service'),
}
child_versions = {
'1.0': '1.0'
'1.0': '1.0',
'1.1': '1.2',
}
@base.remotable_classmethod
@ -124,3 +159,10 @@ class ServiceList(base.ObjectListBase, base.CinderObject):
disabled=disabled)
return base.obj_make_list(context, cls(context), objects.Service,
services)
@base.remotable_classmethod
def get_all_by_binary(cls, context, binary, disabled=None):
services = db.service_get_all_by_binary(context, binary,
disabled=disabled)
return base.obj_make_list(context, cls(context), objects.Service,
services)

View File

@ -27,14 +27,19 @@ __all__ = [
]
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_serialization import jsonutils
from osprofiler import profiler
import cinder.context
import cinder.exception
from cinder.i18n import _LI
from cinder import objects
from cinder.objects import base
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
TRANSPORT = None
NOTIFIER = None
@ -160,3 +165,70 @@ def get_notifier(service=None, host=None, publisher_id=None):
if not publisher_id:
publisher_id = "%s.%s" % (service, host or CONF.host)
return NOTIFIER.prepare(publisher_id=publisher_id)
LAST_RPC_VERSIONS = {}
LAST_OBJ_VERSIONS = {}
class RPCAPI(object):
"""Mixin class aggregating methods related to RPC API compatibility."""
RPC_API_VERSION = '1.0'
TOPIC = ''
BINARY = ''
def __init__(self):
target = messaging.Target(topic=self.TOPIC,
version=self.RPC_API_VERSION)
obj_version_cap = self._determine_obj_version_cap()
serializer = base.CinderObjectSerializer(obj_version_cap)
rpc_version_cap = self._determine_rpc_version_cap()
self.client = get_client(target, version_cap=rpc_version_cap,
serializer=serializer)
def _determine_rpc_version_cap(self):
global LAST_RPC_VERSIONS
if self.BINARY in LAST_RPC_VERSIONS:
return LAST_RPC_VERSIONS[self.BINARY]
version_cap = objects.Service.get_minimum_rpc_version(
cinder.context.get_admin_context(), self.BINARY)
if version_cap == 'liberty':
# NOTE(dulek): This means that one of the services is Liberty,
# we should cap to it's RPC version.
version_cap = LIBERTY_RPC_VERSIONS[self.BINARY]
LOG.info(_LI('Automatically selected %(binary)s RPC version '
'%(version)s as minimum service version.'),
{'binary': self.BINARY, 'version': version_cap})
LAST_RPC_VERSIONS[self.BINARY] = version_cap
return version_cap
def _determine_obj_version_cap(self):
global LAST_OBJ_VERSIONS
if self.BINARY in LAST_OBJ_VERSIONS:
return LAST_OBJ_VERSIONS[self.BINARY]
version_cap = objects.Service.get_minimum_obj_version(
cinder.context.get_admin_context(), self.BINARY)
LOG.info(_LI('Automatically selected %(binary)s objects version '
'%(version)s as minimum service version.'),
{'binary': self.BINARY, 'version': version_cap})
LAST_OBJ_VERSIONS[self.BINARY] = version_cap
return version_cap
# FIXME(dulek): Liberty haven't reported its RPC versions, so we need to have
# them hardcoded. This dict may go away as soon as we drop compatibility with
# L, which should be in early N.
#
# This is the only time we need to have such dictionary. We don't need to add
# similar ones for any release following Liberty.
LIBERTY_RPC_VERSIONS = {
'cinder-volume': '1.30',
'cinder-scheduler': '1.8',
# NOTE(dulek) backup.manager had specified version '1.2', but backup.rpcapi
# was really only sending messages up to '1.1'.
'cinder-backup': '1.1',
}

View File

@ -17,17 +17,15 @@ Client side of the scheduler manager RPC API.
"""
from oslo_config import cfg
import oslo_messaging as messaging
from oslo_serialization import jsonutils
from cinder.objects import base as objects_base
from cinder import rpc
CONF = cfg.CONF
class SchedulerAPI(object):
class SchedulerAPI(rpc.RPCAPI):
"""Client side of the scheduler rpc API.
API version history:
@ -48,18 +46,9 @@ class SchedulerAPI(object):
migrate_volume_to_host()
"""
RPC_API_VERSION = '1.0'
def __init__(self):
super(SchedulerAPI, self).__init__()
target = messaging.Target(topic=CONF.scheduler_topic,
version=self.RPC_API_VERSION)
serializer = objects_base.CinderObjectSerializer()
# NOTE(thangp): Until version pinning is impletemented, set the client
# version_cap to None
self.client = rpc.get_client(target, version_cap=None,
serializer=serializer)
RPC_API_VERSION = '1.11'
TOPIC = CONF.scheduler_topic
BINARY = 'cinder-scheduler'
def create_consistencygroup(self, ctxt, topic, group,
request_spec_list=None,

View File

@ -45,7 +45,7 @@ class BackupRpcAPITestCase(test.TestCase):
target = {
"server": server,
"fanout": fanout,
"version": kwargs.pop('version', rpcapi.BASE_RPC_API_VERSION)
"version": kwargs.pop('version', rpcapi.RPC_API_VERSION)
}
expected_msg = copy.deepcopy(kwargs)

View File

@ -17,6 +17,7 @@ import mock
import uuid
from iso8601 import iso8601
from oslo_utils import versionutils
from oslo_versionedobjects import fields
from sqlalchemy import sql
@ -30,12 +31,21 @@ from cinder.tests.unit import objects as test_objects
@objects.base.CinderObjectRegistry.register_if(False)
class TestObject(objects.base.CinderObject):
VERSION = '1.1'
fields = {
'scheduled_at': objects.base.fields.DateTimeField(nullable=True),
'uuid': objects.base.fields.UUIDField(),
'text': objects.base.fields.StringField(nullable=True),
}
def obj_make_compatible(self, primitive, target_version):
super(TestObject, self).obj_make_compatible(primitive,
target_version)
target_version = versionutils.convert_version_to_tuple(target_version)
if target_version < (1, 1):
primitive.pop('text', None)
class TestCinderObject(test_objects.BaseObjectsTestCase):
"""Tests methods from CinderObject."""
@ -595,3 +605,23 @@ class TestCinderDictObject(test_objects.BaseObjectsTestCase):
self.assertTrue('foo' in obj)
self.assertTrue('abc' in obj)
self.assertFalse('def' in obj)
@mock.patch('cinder.objects.base.OBJ_VERSIONS', {'1.0': {'TestObject': '1.0'},
'1.1': {'TestObject': '1.1'},
})
class TestCinderObjectSerializer(test_objects.BaseObjectsTestCase):
def setUp(self):
super(TestCinderObjectSerializer, self).setUp()
self.obj = TestObject(scheduled_at=None, uuid=uuid.uuid4(),
text='text')
def test_serialize_entity_backport(self):
serializer = objects.base.CinderObjectSerializer('1.0')
primitive = serializer.serialize_entity(self.context, self.obj)
self.assertEqual('1.0', primitive['versioned_object.version'])
def test_serialize_entity_unknown_version(self):
serializer = objects.base.CinderObjectSerializer('0.9')
primitive = serializer.serialize_entity(self.context, self.obj)
self.assertEqual('1.1', primitive['versioned_object.version'])

View File

@ -28,8 +28,8 @@ object_data = {
'CGSnapshotList': '1.0-e8c3f4078cd0ee23487b34d173eec776',
'ConsistencyGroup': '1.2-ed7f90a6871991a19af716ade7337fc9',
'ConsistencyGroupList': '1.1-73916823b697dfa0c7f02508d87e0f28',
'Service': '1.1-9eb00cbd8e2bfb7371343429af54d6e8',
'ServiceList': '1.0-d242d3384b68e5a5a534e090ff1d5161',
'Service': '1.2-4d3dd6c9906da364739fbf3f90c80505',
'ServiceList': '1.1-cb758b200f0a3a90efabfc5aa2ffb627',
'Snapshot': '1.0-a6c33eefeadefb324d79f72f66c54e9a',
'SnapshotList': '1.0-71661e7180ef6cc51501704a9bea4bf1',
'Volume': '1.3-97c3977846dae6588381e7bd3e6e6558',

View File

@ -97,6 +97,39 @@ class TestService(test_objects.BaseObjectsTestCase):
mock.call.__nonzero__(),
mock.call(self.context, 123)])
@mock.patch('cinder.db.service_get_all_by_binary')
def _test_get_minimum_version(self, services_update, expected,
service_get_all_by_binary):
services = [fake_service.fake_db_service(**s) for s in services_update]
service_get_all_by_binary.return_value = services
min_rpc = objects.Service.get_minimum_rpc_version(self.context, 'foo')
self.assertEqual(expected[0], min_rpc)
min_obj = objects.Service.get_minimum_obj_version(self.context, 'foo')
self.assertEqual(expected[1], min_obj)
service_get_all_by_binary.assert_has_calls(
[mock.call(self.context, 'foo', disabled=None)] * 2)
@mock.patch('cinder.db.service_get_all_by_binary')
def test_get_minimum_version(self, service_get_all_by_binary):
services_update = [
{'rpc_current_version': '1.0', 'object_current_version': '1.3'},
{'rpc_current_version': '1.1', 'object_current_version': '1.2'},
{'rpc_current_version': '2.0', 'object_current_version': '2.5'},
]
expected = ('1.0', '1.2')
self._test_get_minimum_version(services_update, expected)
@mock.patch('cinder.db.service_get_all_by_binary')
def test_get_minimum_version_liberty(self, service_get_all_by_binary):
services_update = [
{'rpc_current_version': '1.0', 'object_current_version': '1.3'},
{'rpc_current_version': '1.1', 'object_current_version': None},
{'rpc_current_version': None, 'object_current_version': '2.5'},
]
expected = ('liberty', 'liberty')
self._test_get_minimum_version(services_update, expected)
class TestServiceList(test_objects.BaseObjectsTestCase):
@mock.patch('cinder.db.service_get_all')
@ -120,3 +153,15 @@ class TestServiceList(test_objects.BaseObjectsTestCase):
self.context, 'foo', disabled='bar')
self.assertEqual(1, len(services))
TestService._compare(self, db_service, services[0])
@mock.patch('cinder.db.service_get_all_by_binary')
def test_get_all_by_binary(self, service_get_all_by_binary):
db_service = fake_service.fake_db_service()
service_get_all_by_binary.return_value = [db_service]
services = objects.ServiceList.get_all_by_binary(
self.context, 'foo', 'bar')
service_get_all_by_binary.assert_called_once_with(
self.context, 'foo', disabled='bar')
self.assertEqual(1, len(services))
TestService._compare(self, db_service, services[0])

View File

@ -209,6 +209,18 @@ class DBAPIServiceTestCase(BaseTest):
real = db.service_get_all_by_topic(self.ctxt, 't1')
self._assertEqualListsOfObjects(expected, real)
def test_service_get_all_by_binary(self):
values = [
{'host': 'host1', 'binary': 'b1'},
{'host': 'host2', 'binary': 'b1'},
{'host': 'host4', 'disabled': True, 'binary': 'b1'},
{'host': 'host3', 'binary': 'b2'}
]
services = [self._create_service(vals) for vals in values]
expected = services[:3]
real = db.service_get_all_by_binary(self.ctxt, 'b1')
self._assertEqualListsOfObjects(expected, real)
def test_service_get_by_args(self):
values = [
{'host': 'host1', 'binary': 'a'},

View File

@ -0,0 +1,83 @@
# Copyright 2015 Intel Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from cinder import rpc
from cinder import test
class FakeAPI(rpc.RPCAPI):
RPC_API_VERSION = '1.5'
TOPIC = 'cinder-scheduler-topic'
BINARY = 'cinder-scheduler'
class RPCAPITestCase(test.TestCase):
"""Tests RPCAPI mixin aggregating stuff related to RPC compatibility."""
def setUp(self):
super(RPCAPITestCase, self).setUp()
# Reset cached version pins
rpc.LAST_RPC_VERSIONS = {}
rpc.LAST_OBJ_VERSIONS = {}
@mock.patch('cinder.objects.Service.get_minimum_rpc_version',
return_value='1.2')
@mock.patch('cinder.objects.Service.get_minimum_obj_version',
return_value='1.7')
@mock.patch('cinder.rpc.get_client')
def test_init(self, get_client, get_min_obj, get_min_rpc):
def fake_get_client(target, version_cap, serializer):
self.assertEqual(FakeAPI.TOPIC, target.topic)
self.assertEqual(FakeAPI.RPC_API_VERSION, target.version)
self.assertEqual('1.2', version_cap)
self.assertEqual('1.7', serializer.version_cap)
get_client.side_effect = fake_get_client
FakeAPI()
@mock.patch('cinder.objects.Service.get_minimum_rpc_version',
return_value='liberty')
@mock.patch('cinder.objects.Service.get_minimum_obj_version',
return_value='liberty')
@mock.patch('cinder.rpc.get_client')
def test_init_liberty_caps(self, get_client, get_min_obj, get_min_rpc):
def fake_get_client(target, version_cap, serializer):
self.assertEqual(FakeAPI.TOPIC, target.topic)
self.assertEqual(FakeAPI.RPC_API_VERSION, target.version)
self.assertEqual(rpc.LIBERTY_RPC_VERSIONS[FakeAPI.BINARY],
version_cap)
self.assertEqual('liberty', serializer.version_cap)
get_client.side_effect = fake_get_client
FakeAPI()
@mock.patch('cinder.objects.Service.get_minimum_rpc_version')
@mock.patch('cinder.objects.Service.get_minimum_obj_version')
@mock.patch('cinder.rpc.get_client')
@mock.patch('cinder.rpc.LAST_RPC_VERSIONS', {'cinder-scheduler': '1.4'})
@mock.patch('cinder.rpc.LAST_OBJ_VERSIONS', {'cinder-scheduler': '1.3'})
def test_init_cached_caps(self, get_client, get_min_obj, get_min_rpc):
def fake_get_client(target, version_cap, serializer):
self.assertEqual(FakeAPI.TOPIC, target.topic)
self.assertEqual(FakeAPI.RPC_API_VERSION, target.version)
self.assertEqual('1.4', version_cap)
self.assertEqual('1.3', serializer.version_cap)
get_client.side_effect = fake_get_client
FakeAPI()
self.assertFalse(get_min_obj.called)
self.assertFalse(get_min_rpc.called)

View File

@ -110,7 +110,7 @@ class VolumeRpcAPITestCase(test.TestCase):
expected_retval = 'foo' if method == 'call' else None
target = {
"version": kwargs.pop('version', rpcapi.BASE_RPC_API_VERSION)
"version": kwargs.pop('version', rpcapi.RPC_API_VERSION)
}
if 'request_spec' in kwargs:

View File

@ -17,10 +17,8 @@ Client side of the volume RPC API.
"""
from oslo_config import cfg
import oslo_messaging as messaging
from oslo_serialization import jsonutils
from cinder.objects import base as objects_base
from cinder import rpc
from cinder.volume import utils
@ -28,7 +26,7 @@ from cinder.volume import utils
CONF = cfg.CONF
class VolumeAPI(object):
class VolumeAPI(rpc.RPCAPI):
"""Client side of the volume rpc API.
API version history:
@ -89,18 +87,9 @@ class VolumeAPI(object):
checks in the API.
"""
BASE_RPC_API_VERSION = '1.0'
def __init__(self, topic=None):
super(VolumeAPI, self).__init__()
target = messaging.Target(topic=CONF.volume_topic,
version=self.BASE_RPC_API_VERSION)
serializer = objects_base.CinderObjectSerializer()
# NOTE(thangp): Until version pinning is impletemented, set the client
# version_cap to None
self.client = rpc.get_client(target, version_cap=None,
serializer=serializer)
RPC_API_VERSION = '1.37'
TOPIC = CONF.volume_topic
BINARY = 'cinder-volume'
def create_consistencygroup(self, ctxt, group, host):
new_host = utils.extract_host(host)