Make Serializer/Conductor able to backlevel objects
This improves handling the situation where nova-api sends newer objects to an older compute node. In this case, the serializer detects the IncompatibleObjectVersion error and asks conductor to backlevel it. This means we call into a new method in conductor with the primitive object that we can't handle, and basically ask it to hydrate it and call obj_make_compatible() on our behalf and pass it back to us. This eliminates the need for API to know whether it's talking to an older compute and allows a newer conductor to assist older compute nodes during upgrades to a significant degree. Related-Bug: #1258256 Change-Id: I4b17c8382619e4f73b83c026bf68549ac67a68a2
This commit is contained in:
parent
b32d01d44c
commit
0eb7e35fbf
|
@ -304,6 +304,9 @@ class LocalAPI(object):
|
||||||
def compute_unrescue(self, context, instance):
|
def compute_unrescue(self, context, instance):
|
||||||
return self._manager.compute_unrescue(context, instance)
|
return self._manager.compute_unrescue(context, instance)
|
||||||
|
|
||||||
|
def object_backport(self, context, objinst, target_version):
|
||||||
|
return self._manager.object_backport(context, objinst, target_version)
|
||||||
|
|
||||||
|
|
||||||
class LocalComputeTaskAPI(object):
|
class LocalComputeTaskAPI(object):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
|
|
@ -76,7 +76,7 @@ class ConductorManager(manager.Manager):
|
||||||
namespace. See the ComputeTaskManager class for details.
|
namespace. See the ComputeTaskManager class for details.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
RPC_API_VERSION = '1.61'
|
RPC_API_VERSION = '1.62'
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
super(ConductorManager, self).__init__(service_name='conductor',
|
super(ConductorManager, self).__init__(service_name='conductor',
|
||||||
|
@ -616,6 +616,9 @@ class ConductorManager(manager.Manager):
|
||||||
def compute_reboot(self, context, instance, reboot_type):
|
def compute_reboot(self, context, instance, reboot_type):
|
||||||
self.compute_api.reboot(context, instance, reboot_type)
|
self.compute_api.reboot(context, instance, reboot_type)
|
||||||
|
|
||||||
|
def object_backport(self, context, objinst, target_version):
|
||||||
|
return objinst.obj_to_primitive(target_version=target_version)
|
||||||
|
|
||||||
|
|
||||||
class ComputeTaskManager(base.Base):
|
class ComputeTaskManager(base.Base):
|
||||||
"""Namespace for compute methods.
|
"""Namespace for compute methods.
|
||||||
|
|
|
@ -120,6 +120,7 @@ class ConductorAPI(rpcclient.RpcProxy):
|
||||||
... - Remove security_group_get_by_instance() and
|
... - Remove security_group_get_by_instance() and
|
||||||
security_group_rule_get_by_security_group()
|
security_group_rule_get_by_security_group()
|
||||||
1.61 - Return deleted instance from instance_destroy()
|
1.61 - Return deleted instance from instance_destroy()
|
||||||
|
1.62 - Added object_backport()
|
||||||
"""
|
"""
|
||||||
|
|
||||||
BASE_RPC_API_VERSION = '1.0'
|
BASE_RPC_API_VERSION = '1.0'
|
||||||
|
@ -473,6 +474,11 @@ class ConductorAPI(rpcclient.RpcProxy):
|
||||||
return cctxt.call(context, 'object_action', objinst=objinst,
|
return cctxt.call(context, 'object_action', objinst=objinst,
|
||||||
objmethod=objmethod, args=args, kwargs=kwargs)
|
objmethod=objmethod, args=args, kwargs=kwargs)
|
||||||
|
|
||||||
|
def object_backport(self, context, objinst, target_version):
|
||||||
|
cctxt = self.client.prepare(version='1.62')
|
||||||
|
return cctxt.call(context, 'object_backport', objinst=objinst,
|
||||||
|
target_version=target_version)
|
||||||
|
|
||||||
|
|
||||||
class ComputeTaskAPI(rpcclient.RpcProxy):
|
class ComputeTaskAPI(rpcclient.RpcProxy):
|
||||||
"""Client side of the conductor 'compute' namespaced RPC API
|
"""Client side of the conductor 'compute' namespaced RPC API
|
||||||
|
|
|
@ -201,19 +201,28 @@ class NovaObject(object):
|
||||||
'%(objtype)s') % dict(objtype=objname))
|
'%(objtype)s') % dict(objtype=objname))
|
||||||
raise exception.UnsupportedObjectError(objtype=objname)
|
raise exception.UnsupportedObjectError(objtype=objname)
|
||||||
|
|
||||||
|
latest = None
|
||||||
compatible_match = None
|
compatible_match = None
|
||||||
for objclass in cls._obj_classes[objname]:
|
for objclass in cls._obj_classes[objname]:
|
||||||
if objclass.VERSION == objver:
|
if objclass.VERSION == objver:
|
||||||
return objclass
|
return objclass
|
||||||
|
|
||||||
|
version_bits = tuple([int(x) for x in objclass.VERSION.split(".")])
|
||||||
|
if latest is None:
|
||||||
|
latest = version_bits
|
||||||
|
elif latest < version_bits:
|
||||||
|
latest = version_bits
|
||||||
|
|
||||||
if versionutils.is_compatible(objver, objclass.VERSION):
|
if versionutils.is_compatible(objver, objclass.VERSION):
|
||||||
compatible_match = objclass
|
compatible_match = objclass
|
||||||
|
|
||||||
if compatible_match:
|
if compatible_match:
|
||||||
return compatible_match
|
return compatible_match
|
||||||
|
|
||||||
|
latest_ver = '%i.%i' % latest
|
||||||
raise exception.IncompatibleObjectVersion(objname=objname,
|
raise exception.IncompatibleObjectVersion(objname=objname,
|
||||||
objver=objver)
|
objver=objver,
|
||||||
|
supported=latest_ver)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def obj_from_primitive(cls, primitive, context=None):
|
def obj_from_primitive(cls, primitive, context=None):
|
||||||
|
@ -501,6 +510,22 @@ class NovaObjectSerializer(nova.openstack.common.rpc.serializer.Serializer):
|
||||||
that needs to accept or return NovaObjects as arguments or result values
|
that needs to accept or return NovaObjects as arguments or result values
|
||||||
should pass this to its RpcProxy and RpcDispatcher objects.
|
should pass this to its RpcProxy and RpcDispatcher objects.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
@property
|
||||||
|
def conductor(self):
|
||||||
|
if not hasattr(self, '_conductor'):
|
||||||
|
from nova.conductor import api as conductor_api
|
||||||
|
self._conductor = conductor_api.API()
|
||||||
|
return self._conductor
|
||||||
|
|
||||||
|
def _process_object(self, context, objprim):
|
||||||
|
try:
|
||||||
|
objinst = NovaObject.obj_from_primitive(objprim, context=context)
|
||||||
|
except exception.IncompatibleObjectVersion as e:
|
||||||
|
objinst = self.conductor.object_backport(context, objprim,
|
||||||
|
e.kwargs['supported'])
|
||||||
|
return objinst
|
||||||
|
|
||||||
def _process_iterable(self, context, action_fn, values):
|
def _process_iterable(self, context, action_fn, values):
|
||||||
"""Process an iterable, taking an action on each value.
|
"""Process an iterable, taking an action on each value.
|
||||||
:param:context: Request context
|
:param:context: Request context
|
||||||
|
@ -528,7 +553,7 @@ class NovaObjectSerializer(nova.openstack.common.rpc.serializer.Serializer):
|
||||||
|
|
||||||
def deserialize_entity(self, context, entity):
|
def deserialize_entity(self, context, entity):
|
||||||
if isinstance(entity, dict) and 'nova_object.name' in entity:
|
if isinstance(entity, dict) and 'nova_object.name' in entity:
|
||||||
entity = NovaObject.obj_from_primitive(entity, context=context)
|
entity = self._process_object(context, entity)
|
||||||
elif isinstance(entity, (tuple, list, set)):
|
elif isinstance(entity, (tuple, list, set)):
|
||||||
entity = self._process_iterable(context, self.deserialize_entity,
|
entity = self._process_iterable(context, self.deserialize_entity,
|
||||||
entity)
|
entity)
|
||||||
|
|
|
@ -16,6 +16,7 @@ import contextlib
|
||||||
import datetime
|
import datetime
|
||||||
import iso8601
|
import iso8601
|
||||||
|
|
||||||
|
import mock
|
||||||
import netaddr
|
import netaddr
|
||||||
import six
|
import six
|
||||||
from testtools import matchers
|
from testtools import matchers
|
||||||
|
@ -498,6 +499,16 @@ class _TestObject(object):
|
||||||
self.assertRaises(exception.UnsupportedObjectError,
|
self.assertRaises(exception.UnsupportedObjectError,
|
||||||
base.NovaObject.obj_class_from_name, 'foo', '1.0')
|
base.NovaObject.obj_class_from_name, 'foo', '1.0')
|
||||||
|
|
||||||
|
def test_obj_class_from_name_supported_version(self):
|
||||||
|
error = None
|
||||||
|
try:
|
||||||
|
base.NovaObject.obj_class_from_name('MyObj', '1.25')
|
||||||
|
except exception.IncompatibleObjectVersion as error:
|
||||||
|
pass
|
||||||
|
|
||||||
|
self.assertIsNotNone(error)
|
||||||
|
self.assertEqual('1.5', error.kwargs['supported'])
|
||||||
|
|
||||||
def test_with_alternate_context(self):
|
def test_with_alternate_context(self):
|
||||||
ctxt1 = context.RequestContext('foo', 'foo')
|
ctxt1 = context.RequestContext('foo', 'foo')
|
||||||
ctxt2 = context.RequestContext('bar', 'alternate')
|
ctxt2 = context.RequestContext('bar', 'alternate')
|
||||||
|
@ -754,6 +765,19 @@ class TestObjectSerializer(_BaseTestCase):
|
||||||
for thing in (1, 'foo', [1, 2], {'foo': 'bar'}):
|
for thing in (1, 'foo', [1, 2], {'foo': 'bar'}):
|
||||||
self.assertEqual(thing, ser.deserialize_entity(None, thing))
|
self.assertEqual(thing, ser.deserialize_entity(None, thing))
|
||||||
|
|
||||||
|
def test_deserialize_entity_newer_version(self):
|
||||||
|
ser = base.NovaObjectSerializer()
|
||||||
|
ser._conductor = mock.Mock()
|
||||||
|
ser._conductor.object_backport.return_value = 'backported'
|
||||||
|
obj = MyObj()
|
||||||
|
obj.VERSION = '1.25'
|
||||||
|
primitive = obj.obj_to_primitive()
|
||||||
|
result = ser.deserialize_entity(self.context, primitive)
|
||||||
|
self.assertEqual('backported', result)
|
||||||
|
ser._conductor.object_backport.assert_called_with(self.context,
|
||||||
|
primitive,
|
||||||
|
'1.5')
|
||||||
|
|
||||||
def test_object_serialization(self):
|
def test_object_serialization(self):
|
||||||
ser = base.NovaObjectSerializer()
|
ser = base.NovaObjectSerializer()
|
||||||
obj = MyObj()
|
obj = MyObj()
|
||||||
|
|
Loading…
Reference in New Issue