Indirection API implementation
When an object is received in an incompatible version, IncompatibleObjectVersion is raised. Implementation of the indirection API allows the object to be backported to a supported version by the conductor. Related to blueprint versioned-objects-indirection-api Change-Id: I99fe686b4b4e497be6b5d35a1d2e41833865799a
This commit is contained in:
parent
4a6ac54bdd
commit
2ed6b128d4
|
@ -26,6 +26,7 @@ from oslo_reports import guru_meditation_report as gmr
|
||||||
from magnum.api import app as api_app
|
from magnum.api import app as api_app
|
||||||
from magnum.common import service
|
from magnum.common import service
|
||||||
from magnum.i18n import _LI
|
from magnum.i18n import _LI
|
||||||
|
from magnum.objects import base
|
||||||
from magnum import version
|
from magnum import version
|
||||||
|
|
||||||
|
|
||||||
|
@ -37,6 +38,9 @@ def main():
|
||||||
|
|
||||||
gmr.TextGuruMeditation.setup_autorun(version)
|
gmr.TextGuruMeditation.setup_autorun(version)
|
||||||
|
|
||||||
|
# Enable object backporting via the conductor
|
||||||
|
base.MagnumObject.indirection_api = base.MagnumObjectIndirectionAPI()
|
||||||
|
|
||||||
app = api_app.setup_app()
|
app = api_app.setup_app()
|
||||||
|
|
||||||
# Create the WSGI server and start it
|
# Create the WSGI server and start it
|
||||||
|
|
|
@ -29,6 +29,7 @@ from magnum.common import short_id
|
||||||
from magnum.conductor.handlers import bay_conductor
|
from magnum.conductor.handlers import bay_conductor
|
||||||
from magnum.conductor.handlers import conductor_listener
|
from magnum.conductor.handlers import conductor_listener
|
||||||
from magnum.conductor.handlers import docker_conductor
|
from magnum.conductor.handlers import docker_conductor
|
||||||
|
from magnum.conductor.handlers import indirection_api
|
||||||
from magnum.conductor.handlers import k8s_conductor
|
from magnum.conductor.handlers import k8s_conductor
|
||||||
from magnum.conductor.handlers import x509keypair_conductor
|
from magnum.conductor.handlers import x509keypair_conductor
|
||||||
from magnum.i18n import _LE
|
from magnum.i18n import _LE
|
||||||
|
@ -51,6 +52,7 @@ def main():
|
||||||
|
|
||||||
conductor_id = short_id.generate_id()
|
conductor_id = short_id.generate_id()
|
||||||
endpoints = [
|
endpoints = [
|
||||||
|
indirection_api.Handler(),
|
||||||
docker_conductor.Handler(),
|
docker_conductor.Handler(),
|
||||||
k8s_conductor.Handler(),
|
k8s_conductor.Handler(),
|
||||||
bay_conductor.Handler(),
|
bay_conductor.Handler(),
|
||||||
|
|
|
@ -153,6 +153,7 @@ class API(rpc_service.API):
|
||||||
command=command)
|
command=command)
|
||||||
|
|
||||||
# X509KeyPair Operations
|
# X509KeyPair Operations
|
||||||
|
|
||||||
def x509keypair_create(self, x509keypair):
|
def x509keypair_create(self, x509keypair):
|
||||||
return self._call('x509keypair_create', x509keypair=x509keypair)
|
return self._call('x509keypair_create', x509keypair=x509keypair)
|
||||||
|
|
||||||
|
@ -163,6 +164,25 @@ class API(rpc_service.API):
|
||||||
return objects.X509KeyPair.list(context, limit, marker,
|
return objects.X509KeyPair.list(context, limit, marker,
|
||||||
sort_key, sort_dir)
|
sort_key, sort_dir)
|
||||||
|
|
||||||
|
# Versioned Objects indirection API
|
||||||
|
|
||||||
|
def object_class_action(self, context, objname, objmethod, objver,
|
||||||
|
args, kwargs):
|
||||||
|
"Indirection API callback"
|
||||||
|
return self._client.call(context, 'object_class_action',
|
||||||
|
objname=objname, objmethod=objmethod,
|
||||||
|
objver=objver, args=args, kwargs=kwargs)
|
||||||
|
|
||||||
|
def object_action(self, context, objinst, objmethod, args, kwargs):
|
||||||
|
"Indirection API callback"
|
||||||
|
return self._client.call(context, 'object_action', objinst=objinst,
|
||||||
|
objmethod=objmethod, args=args, kwargs=kwargs)
|
||||||
|
|
||||||
|
def object_backport(self, context, objinst, target_version):
|
||||||
|
"Indirection API callback"
|
||||||
|
return self._client.call(context, 'object_backport', objinst=objinst,
|
||||||
|
target_version=target_version)
|
||||||
|
|
||||||
|
|
||||||
class ListenerAPI(rpc_service.API):
|
class ListenerAPI(rpc_service.API):
|
||||||
def __init__(self, context=None, topic=None, server=None, timeout=None):
|
def __init__(self, context=None, topic=None, server=None, timeout=None):
|
||||||
|
|
|
@ -0,0 +1,67 @@
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import oslo_messaging as messaging
|
||||||
|
|
||||||
|
from magnum.objects import base
|
||||||
|
|
||||||
|
|
||||||
|
class Handler(object):
|
||||||
|
"Indirection API callbacks"
|
||||||
|
|
||||||
|
def _object_dispatch(self, target, method, context, args, kwargs):
|
||||||
|
"""Dispatch a call to an object method.
|
||||||
|
|
||||||
|
This ensures that object methods get called and any exception
|
||||||
|
that is raised gets wrapped in an ExpectedException for forwarding
|
||||||
|
back to the caller (without spamming the conductor logs).
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# NOTE(danms): Keep the getattr inside the try block since
|
||||||
|
# a missing method is really a client problem
|
||||||
|
return getattr(target, method)(context, *args, **kwargs)
|
||||||
|
except Exception:
|
||||||
|
raise messaging.ExpectedException()
|
||||||
|
|
||||||
|
def object_class_action(self, context, objname, objmethod,
|
||||||
|
objver, args, kwargs):
|
||||||
|
"""Perform a classmethod action on an object."""
|
||||||
|
objclass = base.MagnumObject.obj_class_from_name(objname, objver)
|
||||||
|
result = self._object_dispatch(objclass, objmethod, context,
|
||||||
|
args, kwargs)
|
||||||
|
# NOTE(danms): The RPC layer will convert to primitives for us,
|
||||||
|
# but in this case, we need to honor the version the client is
|
||||||
|
# asking for, so we do it before returning here.
|
||||||
|
return (result.obj_to_primitive(target_version=objver)
|
||||||
|
if isinstance(result, base.MagnumObject) else result)
|
||||||
|
|
||||||
|
def object_action(self, context, objinst, objmethod, args, kwargs):
|
||||||
|
"""Perform an action on an object."""
|
||||||
|
old_objinst = objinst.obj_clone()
|
||||||
|
result = self._object_dispatch(objinst, objmethod, context,
|
||||||
|
args, kwargs)
|
||||||
|
updates = dict()
|
||||||
|
# NOTE(danms): Diff the object with the one passed to us and
|
||||||
|
# generate a list of changes to forward back
|
||||||
|
for name, field in objinst.fields.items():
|
||||||
|
if not objinst.obj_attr_is_set(name):
|
||||||
|
# Avoid demand-loading anything
|
||||||
|
continue
|
||||||
|
if (not old_objinst.obj_attr_is_set(name) or
|
||||||
|
getattr(old_objinst, name) != getattr(objinst, name)):
|
||||||
|
updates[name] = field.to_primitive(objinst, name,
|
||||||
|
getattr(objinst, name))
|
||||||
|
updates['obj_what_changed'] = objinst.obj_what_changed()
|
||||||
|
return updates, result
|
||||||
|
|
||||||
|
def object_backport(self, context, objinst, target_version):
|
||||||
|
return objinst.obj_to_primitive(target_version=target_version)
|
|
@ -62,6 +62,26 @@ class MagnumPersistentObject(object):
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class MagnumObjectIndirectionAPI(ovoo_base.VersionedObjectIndirectionAPI):
|
||||||
|
def __init__(self):
|
||||||
|
super(MagnumObjectIndirectionAPI, self).__init__()
|
||||||
|
from magnum.conductor import api as conductor_api
|
||||||
|
self._conductor = conductor_api.API()
|
||||||
|
|
||||||
|
def object_action(self, context, objinst, objmethod, args, kwargs):
|
||||||
|
return self._conductor.object_action(context, objinst, objmethod,
|
||||||
|
args, kwargs)
|
||||||
|
|
||||||
|
def object_class_action(self, context, objname, objmethod, objver,
|
||||||
|
args, kwargs):
|
||||||
|
return self._conductor.object_class_action(context, objname, objmethod,
|
||||||
|
objver, args, kwargs)
|
||||||
|
|
||||||
|
def object_backport(self, context, objinst, target_version):
|
||||||
|
return self._conductor.object_backport(context, objinst,
|
||||||
|
target_version)
|
||||||
|
|
||||||
|
|
||||||
class MagnumObjectSerializer(ovoo_base.VersionedObjectSerializer):
|
class MagnumObjectSerializer(ovoo_base.VersionedObjectSerializer):
|
||||||
# Base class to use for object hydration
|
# Base class to use for object hydration
|
||||||
OBJ_BASE_CLASS = MagnumObject
|
OBJ_BASE_CLASS = MagnumObject
|
||||||
|
|
|
@ -0,0 +1,87 @@
|
||||||
|
# Copyright 2015 NEC Corporation. All rights reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import oslo_messaging as messaging
|
||||||
|
from oslo_versionedobjects import fields
|
||||||
|
|
||||||
|
from magnum.conductor.handlers import indirection_api
|
||||||
|
from magnum.objects import base as obj_base
|
||||||
|
from magnum.tests import base
|
||||||
|
|
||||||
|
|
||||||
|
class TestIndirectionApiConductor(base.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
super(TestIndirectionApiConductor, self).setUp()
|
||||||
|
self.conductor = indirection_api.Handler()
|
||||||
|
|
||||||
|
def _test_object_action(self, is_classmethod, raise_exception):
|
||||||
|
@obj_base.MagnumObjectRegistry.register
|
||||||
|
class TestObject(obj_base.MagnumObject):
|
||||||
|
def foo(self, context, raise_exception=False):
|
||||||
|
if raise_exception:
|
||||||
|
raise Exception('test')
|
||||||
|
else:
|
||||||
|
return 'test'
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def bar(cls, context, raise_exception=False):
|
||||||
|
if raise_exception:
|
||||||
|
raise Exception('test')
|
||||||
|
else:
|
||||||
|
return 'test'
|
||||||
|
|
||||||
|
obj = TestObject()
|
||||||
|
if is_classmethod:
|
||||||
|
result = self.conductor.object_class_action(
|
||||||
|
self.context, TestObject.obj_name(), 'bar', '1.0',
|
||||||
|
tuple(), {'raise_exception': raise_exception})
|
||||||
|
else:
|
||||||
|
updates, result = self.conductor.object_action(
|
||||||
|
self.context, obj, 'foo', tuple(),
|
||||||
|
{'raise_exception': raise_exception})
|
||||||
|
self.assertEqual('test', result)
|
||||||
|
|
||||||
|
def test_object_action(self):
|
||||||
|
self._test_object_action(False, False)
|
||||||
|
|
||||||
|
def test_object_action_on_raise(self):
|
||||||
|
self.assertRaises(messaging.ExpectedException,
|
||||||
|
self._test_object_action, False, True)
|
||||||
|
|
||||||
|
def test_object_class_action(self):
|
||||||
|
self._test_object_action(True, False)
|
||||||
|
|
||||||
|
def test_object_class_action_on_raise(self):
|
||||||
|
self.assertRaises(messaging.ExpectedException,
|
||||||
|
self._test_object_action, True, True)
|
||||||
|
|
||||||
|
def test_object_action_copies_object(self):
|
||||||
|
@obj_base.MagnumObjectRegistry.register
|
||||||
|
class TestObject(obj_base.MagnumObject):
|
||||||
|
fields = {'dict': fields.DictOfStringsField()}
|
||||||
|
|
||||||
|
def touch_dict(self, context):
|
||||||
|
self.dict['foo'] = 'bar'
|
||||||
|
self.obj_reset_changes()
|
||||||
|
|
||||||
|
obj = TestObject()
|
||||||
|
obj.dict = {}
|
||||||
|
obj.obj_reset_changes()
|
||||||
|
updates, result = self.conductor.object_action(
|
||||||
|
self.context, obj, 'touch_dict', tuple(), {})
|
||||||
|
# NOTE(danms): If conductor did not properly copy the object, then
|
||||||
|
# the new and reference copies of the nested dict object will be
|
||||||
|
# the same, and thus 'dict' will not be reported as changed
|
||||||
|
self.assertIn('dict', updates)
|
||||||
|
self.assertEqual({'foo': 'bar'}, updates['dict'])
|
|
@ -16,6 +16,7 @@ import datetime
|
||||||
import gettext
|
import gettext
|
||||||
|
|
||||||
import iso8601
|
import iso8601
|
||||||
|
import mock
|
||||||
import netaddr
|
import netaddr
|
||||||
from oslo_utils import timeutils
|
from oslo_utils import timeutils
|
||||||
from oslo_versionedobjects import fields
|
from oslo_versionedobjects import fields
|
||||||
|
@ -438,3 +439,51 @@ class TestObjectSerializer(test_base.TestCase):
|
||||||
self.assertEqual(1, len(thing2))
|
self.assertEqual(1, len(thing2))
|
||||||
for item in thing2:
|
for item in thing2:
|
||||||
self.assertIsInstance(item, MyObj)
|
self.assertIsInstance(item, MyObj)
|
||||||
|
|
||||||
|
@mock.patch('magnum.objects.base.MagnumObject.indirection_api')
|
||||||
|
def _test_deserialize_entity_newer(self, obj_version, backported_to,
|
||||||
|
mock_indirection_api,
|
||||||
|
my_version='1.6'):
|
||||||
|
ser = base.MagnumObjectSerializer()
|
||||||
|
mock_indirection_api.object_backport_versions.side_effect \
|
||||||
|
= NotImplementedError()
|
||||||
|
mock_indirection_api.object_backport.return_value = 'backported'
|
||||||
|
|
||||||
|
@base.MagnumObjectRegistry.register
|
||||||
|
class MyTestObj(MyObj):
|
||||||
|
VERSION = my_version
|
||||||
|
|
||||||
|
obj = MyTestObj()
|
||||||
|
obj.VERSION = obj_version
|
||||||
|
primitive = obj.obj_to_primitive()
|
||||||
|
result = ser.deserialize_entity(self.context, primitive)
|
||||||
|
if backported_to is None:
|
||||||
|
self.assertFalse(mock_indirection_api.object_backport.called)
|
||||||
|
else:
|
||||||
|
self.assertEqual('backported', result)
|
||||||
|
mock_indirection_api.object_backport.assert_called_with(
|
||||||
|
self.context, primitive, backported_to)
|
||||||
|
|
||||||
|
def test_deserialize_entity_newer_version_backports_level1(self):
|
||||||
|
"Test object with unsupported (newer) version"
|
||||||
|
self._test_deserialize_entity_newer('11.5', '1.6')
|
||||||
|
|
||||||
|
def test_deserialize_entity_newer_version_backports_level2(self):
|
||||||
|
"Test object with unsupported (newer) version"
|
||||||
|
self._test_deserialize_entity_newer('1.25', '1.6')
|
||||||
|
|
||||||
|
def test_deserialize_entity_same_revision_does_not_backport(self):
|
||||||
|
"Test object with supported revision"
|
||||||
|
self._test_deserialize_entity_newer('1.6', None)
|
||||||
|
|
||||||
|
def test_deserialize_entity_newer_revision_does_not_backport_zero(self):
|
||||||
|
"Test object with supported revision"
|
||||||
|
self._test_deserialize_entity_newer('1.6.0', None)
|
||||||
|
|
||||||
|
def test_deserialize_entity_newer_revision_does_not_backport(self):
|
||||||
|
"Test object with supported (newer) revision"
|
||||||
|
self._test_deserialize_entity_newer('1.6.1', None)
|
||||||
|
|
||||||
|
def test_deserialize_entity_newer_version_passes_revision(self):
|
||||||
|
"Test object with unsupported (newer) version and revision"
|
||||||
|
self._test_deserialize_entity_newer('1.7', '1.6.1', my_version='1.6.1')
|
||||||
|
|
Loading…
Reference in New Issue