Merge "Indirection API implementation"
This commit is contained in:
commit
82d961831f
|
@ -26,6 +26,7 @@ from oslo_reports import guru_meditation_report as gmr
|
|||
from magnum.api import app as api_app
|
||||
from magnum.common import service
|
||||
from magnum.i18n import _LI
|
||||
from magnum.objects import base
|
||||
from magnum import version
|
||||
|
||||
|
||||
|
@ -37,6 +38,9 @@ def main():
|
|||
|
||||
gmr.TextGuruMeditation.setup_autorun(version)
|
||||
|
||||
# Enable object backporting via the conductor
|
||||
base.MagnumObject.indirection_api = base.MagnumObjectIndirectionAPI()
|
||||
|
||||
app = api_app.setup_app()
|
||||
|
||||
# Create the WSGI server and start it
|
||||
|
|
|
@ -30,6 +30,7 @@ from magnum.conductor.handlers import bay_conductor
|
|||
from magnum.conductor.handlers import ca_conductor
|
||||
from magnum.conductor.handlers import conductor_listener
|
||||
from magnum.conductor.handlers import docker_conductor
|
||||
from magnum.conductor.handlers import indirection_api
|
||||
from magnum.conductor.handlers import k8s_conductor
|
||||
from magnum.conductor.handlers import x509keypair_conductor
|
||||
from magnum.i18n import _LE
|
||||
|
@ -52,6 +53,7 @@ def main():
|
|||
|
||||
conductor_id = short_id.generate_id()
|
||||
endpoints = [
|
||||
indirection_api.Handler(),
|
||||
docker_conductor.Handler(),
|
||||
k8s_conductor.Handler(),
|
||||
bay_conductor.Handler(),
|
||||
|
|
|
@ -153,6 +153,7 @@ class API(rpc_service.API):
|
|||
command=command)
|
||||
|
||||
# X509KeyPair Operations
|
||||
|
||||
def x509keypair_create(self, x509keypair):
|
||||
return self._call('x509keypair_create', x509keypair=x509keypair)
|
||||
|
||||
|
@ -170,6 +171,25 @@ class API(rpc_service.API):
|
|||
def get_ca_certificate(self, bay):
|
||||
return self._call('get_ca_certificate', bay=bay)
|
||||
|
||||
# Versioned Objects indirection API
|
||||
|
||||
def object_class_action(self, context, objname, objmethod, objver,
|
||||
args, kwargs):
|
||||
"Indirection API callback"
|
||||
return self._client.call(context, 'object_class_action',
|
||||
objname=objname, objmethod=objmethod,
|
||||
objver=objver, args=args, kwargs=kwargs)
|
||||
|
||||
def object_action(self, context, objinst, objmethod, args, kwargs):
|
||||
"Indirection API callback"
|
||||
return self._client.call(context, 'object_action', objinst=objinst,
|
||||
objmethod=objmethod, args=args, kwargs=kwargs)
|
||||
|
||||
def object_backport(self, context, objinst, target_version):
|
||||
"Indirection API callback"
|
||||
return self._client.call(context, 'object_backport', objinst=objinst,
|
||||
target_version=target_version)
|
||||
|
||||
|
||||
class ListenerAPI(rpc_service.API):
|
||||
def __init__(self, context=None, topic=None, server=None, timeout=None):
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import oslo_messaging as messaging
|
||||
|
||||
from magnum.objects import base
|
||||
|
||||
|
||||
class Handler(object):
|
||||
"Indirection API callbacks"
|
||||
|
||||
def _object_dispatch(self, target, method, context, args, kwargs):
|
||||
"""Dispatch a call to an object method.
|
||||
|
||||
This ensures that object methods get called and any exception
|
||||
that is raised gets wrapped in an ExpectedException for forwarding
|
||||
back to the caller (without spamming the conductor logs).
|
||||
"""
|
||||
try:
|
||||
# NOTE(danms): Keep the getattr inside the try block since
|
||||
# a missing method is really a client problem
|
||||
return getattr(target, method)(context, *args, **kwargs)
|
||||
except Exception:
|
||||
raise messaging.ExpectedException()
|
||||
|
||||
def object_class_action(self, context, objname, objmethod,
|
||||
objver, args, kwargs):
|
||||
"""Perform a classmethod action on an object."""
|
||||
objclass = base.MagnumObject.obj_class_from_name(objname, objver)
|
||||
result = self._object_dispatch(objclass, objmethod, context,
|
||||
args, kwargs)
|
||||
# NOTE(danms): The RPC layer will convert to primitives for us,
|
||||
# but in this case, we need to honor the version the client is
|
||||
# asking for, so we do it before returning here.
|
||||
return (result.obj_to_primitive(target_version=objver)
|
||||
if isinstance(result, base.MagnumObject) else result)
|
||||
|
||||
def object_action(self, context, objinst, objmethod, args, kwargs):
|
||||
"""Perform an action on an object."""
|
||||
old_objinst = objinst.obj_clone()
|
||||
result = self._object_dispatch(objinst, objmethod, context,
|
||||
args, kwargs)
|
||||
updates = dict()
|
||||
# NOTE(danms): Diff the object with the one passed to us and
|
||||
# generate a list of changes to forward back
|
||||
for name, field in objinst.fields.items():
|
||||
if not objinst.obj_attr_is_set(name):
|
||||
# Avoid demand-loading anything
|
||||
continue
|
||||
if (not old_objinst.obj_attr_is_set(name) or
|
||||
getattr(old_objinst, name) != getattr(objinst, name)):
|
||||
updates[name] = field.to_primitive(objinst, name,
|
||||
getattr(objinst, name))
|
||||
updates['obj_what_changed'] = objinst.obj_what_changed()
|
||||
return updates, result
|
||||
|
||||
def object_backport(self, context, objinst, target_version):
|
||||
return objinst.obj_to_primitive(target_version=target_version)
|
|
@ -62,6 +62,26 @@ class MagnumPersistentObject(object):
|
|||
}
|
||||
|
||||
|
||||
class MagnumObjectIndirectionAPI(ovoo_base.VersionedObjectIndirectionAPI):
|
||||
def __init__(self):
|
||||
super(MagnumObjectIndirectionAPI, self).__init__()
|
||||
from magnum.conductor import api as conductor_api
|
||||
self._conductor = conductor_api.API()
|
||||
|
||||
def object_action(self, context, objinst, objmethod, args, kwargs):
|
||||
return self._conductor.object_action(context, objinst, objmethod,
|
||||
args, kwargs)
|
||||
|
||||
def object_class_action(self, context, objname, objmethod, objver,
|
||||
args, kwargs):
|
||||
return self._conductor.object_class_action(context, objname, objmethod,
|
||||
objver, args, kwargs)
|
||||
|
||||
def object_backport(self, context, objinst, target_version):
|
||||
return self._conductor.object_backport(context, objinst,
|
||||
target_version)
|
||||
|
||||
|
||||
class MagnumObjectSerializer(ovoo_base.VersionedObjectSerializer):
|
||||
# Base class to use for object hydration
|
||||
OBJ_BASE_CLASS = MagnumObject
|
||||
|
|
|
@ -0,0 +1,87 @@
|
|||
# Copyright 2015 NEC Corporation. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import oslo_messaging as messaging
|
||||
from oslo_versionedobjects import fields
|
||||
|
||||
from magnum.conductor.handlers import indirection_api
|
||||
from magnum.objects import base as obj_base
|
||||
from magnum.tests import base
|
||||
|
||||
|
||||
class TestIndirectionApiConductor(base.TestCase):
|
||||
def setUp(self):
|
||||
super(TestIndirectionApiConductor, self).setUp()
|
||||
self.conductor = indirection_api.Handler()
|
||||
|
||||
def _test_object_action(self, is_classmethod, raise_exception):
|
||||
@obj_base.MagnumObjectRegistry.register
|
||||
class TestObject(obj_base.MagnumObject):
|
||||
def foo(self, context, raise_exception=False):
|
||||
if raise_exception:
|
||||
raise Exception('test')
|
||||
else:
|
||||
return 'test'
|
||||
|
||||
@classmethod
|
||||
def bar(cls, context, raise_exception=False):
|
||||
if raise_exception:
|
||||
raise Exception('test')
|
||||
else:
|
||||
return 'test'
|
||||
|
||||
obj = TestObject()
|
||||
if is_classmethod:
|
||||
result = self.conductor.object_class_action(
|
||||
self.context, TestObject.obj_name(), 'bar', '1.0',
|
||||
tuple(), {'raise_exception': raise_exception})
|
||||
else:
|
||||
updates, result = self.conductor.object_action(
|
||||
self.context, obj, 'foo', tuple(),
|
||||
{'raise_exception': raise_exception})
|
||||
self.assertEqual('test', result)
|
||||
|
||||
def test_object_action(self):
|
||||
self._test_object_action(False, False)
|
||||
|
||||
def test_object_action_on_raise(self):
|
||||
self.assertRaises(messaging.ExpectedException,
|
||||
self._test_object_action, False, True)
|
||||
|
||||
def test_object_class_action(self):
|
||||
self._test_object_action(True, False)
|
||||
|
||||
def test_object_class_action_on_raise(self):
|
||||
self.assertRaises(messaging.ExpectedException,
|
||||
self._test_object_action, True, True)
|
||||
|
||||
def test_object_action_copies_object(self):
|
||||
@obj_base.MagnumObjectRegistry.register
|
||||
class TestObject(obj_base.MagnumObject):
|
||||
fields = {'dict': fields.DictOfStringsField()}
|
||||
|
||||
def touch_dict(self, context):
|
||||
self.dict['foo'] = 'bar'
|
||||
self.obj_reset_changes()
|
||||
|
||||
obj = TestObject()
|
||||
obj.dict = {}
|
||||
obj.obj_reset_changes()
|
||||
updates, result = self.conductor.object_action(
|
||||
self.context, obj, 'touch_dict', tuple(), {})
|
||||
# NOTE(danms): If conductor did not properly copy the object, then
|
||||
# the new and reference copies of the nested dict object will be
|
||||
# the same, and thus 'dict' will not be reported as changed
|
||||
self.assertIn('dict', updates)
|
||||
self.assertEqual({'foo': 'bar'}, updates['dict'])
|
|
@ -16,6 +16,7 @@ import datetime
|
|||
import gettext
|
||||
|
||||
import iso8601
|
||||
import mock
|
||||
import netaddr
|
||||
from oslo_utils import timeutils
|
||||
from oslo_versionedobjects import fields
|
||||
|
@ -477,3 +478,51 @@ class TestObjectSerializer(test_base.TestCase):
|
|||
self.assertEqual(1, len(thing2))
|
||||
for item in thing2:
|
||||
self.assertIsInstance(item, MyObj)
|
||||
|
||||
@mock.patch('magnum.objects.base.MagnumObject.indirection_api')
|
||||
def _test_deserialize_entity_newer(self, obj_version, backported_to,
|
||||
mock_indirection_api,
|
||||
my_version='1.6'):
|
||||
ser = base.MagnumObjectSerializer()
|
||||
mock_indirection_api.object_backport_versions.side_effect \
|
||||
= NotImplementedError()
|
||||
mock_indirection_api.object_backport.return_value = 'backported'
|
||||
|
||||
@base.MagnumObjectRegistry.register
|
||||
class MyTestObj(MyObj):
|
||||
VERSION = my_version
|
||||
|
||||
obj = MyTestObj()
|
||||
obj.VERSION = obj_version
|
||||
primitive = obj.obj_to_primitive()
|
||||
result = ser.deserialize_entity(self.context, primitive)
|
||||
if backported_to is None:
|
||||
self.assertFalse(mock_indirection_api.object_backport.called)
|
||||
else:
|
||||
self.assertEqual('backported', result)
|
||||
mock_indirection_api.object_backport.assert_called_with(
|
||||
self.context, primitive, backported_to)
|
||||
|
||||
def test_deserialize_entity_newer_version_backports_level1(self):
|
||||
"Test object with unsupported (newer) version"
|
||||
self._test_deserialize_entity_newer('11.5', '1.6')
|
||||
|
||||
def test_deserialize_entity_newer_version_backports_level2(self):
|
||||
"Test object with unsupported (newer) version"
|
||||
self._test_deserialize_entity_newer('1.25', '1.6')
|
||||
|
||||
def test_deserialize_entity_same_revision_does_not_backport(self):
|
||||
"Test object with supported revision"
|
||||
self._test_deserialize_entity_newer('1.6', None)
|
||||
|
||||
def test_deserialize_entity_newer_revision_does_not_backport_zero(self):
|
||||
"Test object with supported revision"
|
||||
self._test_deserialize_entity_newer('1.6.0', None)
|
||||
|
||||
def test_deserialize_entity_newer_revision_does_not_backport(self):
|
||||
"Test object with supported (newer) revision"
|
||||
self._test_deserialize_entity_newer('1.6.1', None)
|
||||
|
||||
def test_deserialize_entity_newer_version_passes_revision(self):
|
||||
"Test object with unsupported (newer) version and revision"
|
||||
self._test_deserialize_entity_newer('1.7', '1.6.1', my_version='1.6.1')
|
||||
|
|
Loading…
Reference in New Issue