magnum/magnum/tests/objects/test_objects.py

595 lines
21 KiB
Python

# Copyright 2015 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import gettext
import iso8601
import netaddr
from oslo.utils import timeutils
import six
from magnum.common import context
from magnum.common import exception
from magnum.objects import base
from magnum.objects import utils
from magnum.tests import base as test_base
gettext.install('magnum')
class MyObj(base.MagnumObject):
VERSION = '1.0'
fields = {'foo': int,
'bar': str,
'missing': str,
}
def obj_load_attr(self, attrname):
setattr(self, attrname, 'loaded!')
@base.remotable_classmethod
def query(cls, context):
obj = cls(context)
obj.foo = 1
obj.bar = 'bar'
obj.obj_reset_changes()
return obj
@base.remotable
def marco(self, context):
return 'polo'
@base.remotable
def update_test(self, context):
if context.project_id == 'alternate':
self.bar = 'alternate-context'
else:
self.bar = 'updated'
@base.remotable
def save(self, context):
self.obj_reset_changes()
@base.remotable
def refresh(self, context):
self.foo = 321
self.bar = 'refreshed'
self.obj_reset_changes()
@base.remotable
def modify_save_modify(self, context):
self.bar = 'meow'
self.save()
self.foo = 42
class MyObj2(object):
@classmethod
def obj_name(cls):
return 'MyObj'
@base.remotable_classmethod
def get(cls, *args, **kwargs):
pass
class TestSubclassedObject(MyObj):
fields = {'new_field': str}
class TestMetaclass(test_base.TestCase):
def test_obj_tracking(self):
@six.add_metaclass(base.MagnumObjectMetaclass)
class NewBaseClass(object):
fields = {}
@classmethod
def obj_name(cls):
return cls.__name__
class Test1(NewBaseClass):
@staticmethod
def obj_name():
return 'fake1'
class Test2(NewBaseClass):
pass
class Test2v2(NewBaseClass):
@staticmethod
def obj_name():
return 'Test2'
expected = {'fake1': [Test1], 'Test2': [Test2, Test2v2]}
self.assertEqual(expected, NewBaseClass._obj_classes)
# The following should work, also.
self.assertEqual(expected, Test1._obj_classes)
self.assertEqual(expected, Test2._obj_classes)
class TestUtils(test_base.TestCase):
def test_datetime_or_none(self):
naive_dt = datetime.datetime.now()
dt = timeutils.parse_isotime(timeutils.isotime(naive_dt))
self.assertEqual(utils.datetime_or_none(dt), dt)
self.assertEqual(utils.datetime_or_none(dt),
naive_dt.replace(tzinfo=iso8601.iso8601.Utc(),
microsecond=0))
self.assertIsNone(utils.datetime_or_none(None))
self.assertRaises(ValueError, utils.datetime_or_none, 'foo')
def test_datetime_or_str_or_none(self):
dts = timeutils.isotime()
dt = timeutils.parse_isotime(dts)
self.assertEqual(utils.datetime_or_str_or_none(dt), dt)
self.assertIsNone(utils.datetime_or_str_or_none(None))
self.assertEqual(utils.datetime_or_str_or_none(dts), dt)
self.assertRaises(ValueError, utils.datetime_or_str_or_none, 'foo')
def test_int_or_none(self):
self.assertEqual(utils.int_or_none(1), 1)
self.assertEqual(utils.int_or_none('1'), 1)
self.assertIsNone(utils.int_or_none(None))
self.assertRaises(ValueError, utils.int_or_none, 'foo')
def test_str_or_none(self):
class Obj(object):
pass
self.assertEqual(utils.str_or_none('foo'), 'foo')
self.assertEqual(utils.str_or_none(1), '1')
self.assertIsNone(utils.str_or_none(None))
def test_ip_or_none(self):
ip4 = netaddr.IPAddress('1.2.3.4', 4)
ip6 = netaddr.IPAddress('1::2', 6)
self.assertEqual(utils.ip_or_none(4)('1.2.3.4'), ip4)
self.assertEqual(utils.ip_or_none(6)('1::2'), ip6)
self.assertIsNone(utils.ip_or_none(4)(None))
self.assertIsNone(utils.ip_or_none(6)(None))
self.assertRaises(netaddr.AddrFormatError, utils.ip_or_none(4), 'foo')
self.assertRaises(netaddr.AddrFormatError, utils.ip_or_none(6), 'foo')
def test_dt_serializer(self):
class Obj(object):
foo = utils.dt_serializer('bar')
obj = Obj()
obj.bar = timeutils.parse_isotime('1955-11-05T00:00:00Z')
self.assertEqual('1955-11-05T00:00:00Z', obj.foo())
obj.bar = None
self.assertIsNone(obj.foo())
obj.bar = 'foo'
self.assertRaises(AttributeError, obj.foo)
def test_dt_deserializer(self):
dt = timeutils.parse_isotime('1955-11-05T00:00:00Z')
self.assertEqual(utils.dt_deserializer(None, timeutils.isotime(dt)),
dt)
self.assertIsNone(utils.dt_deserializer(None, None))
self.assertRaises(ValueError, utils.dt_deserializer, None, 'foo')
def test_obj_to_primitive_list(self):
class MyList(base.ObjectListBase, base.MagnumObject):
pass
mylist = MyList(self.context)
mylist.objects = [1, 2, 3]
self.assertEqual([1, 2, 3], base.obj_to_primitive(mylist))
def test_obj_to_primitive_dict(self):
myobj = MyObj(self.context)
myobj.foo = 1
myobj.bar = 'foo'
self.assertEqual({'foo': 1, 'bar': 'foo'},
base.obj_to_primitive(myobj))
def test_obj_to_primitive_recursive(self):
class MyList(base.ObjectListBase, base.MagnumObject):
pass
mylist = MyList(self.context)
mylist.objects = [MyObj(self.context), MyObj(self.context)]
for i, value in enumerate(mylist):
value.foo = i
self.assertEqual([{'foo': 0}, {'foo': 1}],
base.obj_to_primitive(mylist))
class _TestObject(object):
def test_hydration_type_error(self):
primitive = {'magnum_object.name': 'MyObj',
'magnum_object.namespace': 'magnum',
'magnum_object.version': '1.5',
'magnum_object.data': {'foo': 'a'}}
self.assertRaises(ValueError, MyObj.obj_from_primitive, primitive)
def test_hydration(self):
primitive = {'magnum_object.name': 'MyObj',
'magnum_object.namespace': 'magnum',
'magnum_object.version': '1.5',
'magnum_object.data': {'foo': 1}}
obj = MyObj.obj_from_primitive(primitive)
self.assertEqual(1, obj.foo)
def test_hydration_bad_ns(self):
primitive = {'magnum_object.name': 'MyObj',
'magnum_object.namespace': 'foo',
'magnum_object.version': '1.5',
'magnum_object.data': {'foo': 1}}
self.assertRaises(exception.UnsupportedObjectError,
MyObj.obj_from_primitive, primitive)
def test_dehydration(self):
expected = {'magnum_object.name': 'MyObj',
'magnum_object.namespace': 'magnum',
'magnum_object.version': '1.5',
'magnum_object.data': {'foo': 1}}
obj = MyObj(self.context)
obj.foo = 1
obj.obj_reset_changes()
self.assertEqual(expected, obj.obj_to_primitive())
def test_get_updates(self):
obj = MyObj(self.context)
self.assertEqual({}, obj.obj_get_changes())
obj.foo = 123
self.assertEqual({'foo': 123}, obj.obj_get_changes())
obj.bar = 'test'
self.assertEqual({'foo': 123, 'bar': 'test'}, obj.obj_get_changes())
obj.obj_reset_changes()
self.assertEqual({}, obj.obj_get_changes())
def test_object_property(self):
obj = MyObj(self.context, foo=1)
self.assertEqual(1, obj.foo)
def test_object_property_type_error(self):
obj = MyObj(self.context)
def fail():
obj.foo = 'a'
self.assertRaises(ValueError, fail)
def test_object_dict_syntax(self):
obj = MyObj(self.context)
obj.foo = 123
obj.bar = 'bar'
self.assertEqual(123, obj['foo'])
self.assertEqual([('bar', 'bar'), ('foo', 123)],
sorted(obj.items(), key=lambda x: x[0]))
self.assertEqual([('bar', 'bar'), ('foo', 123)],
sorted(list(obj.iteritems()), key=lambda x: x[0]))
def test_load(self):
obj = MyObj(self.context)
self.assertEqual('loaded!', obj.bar)
def test_load_in_base(self):
class Foo(base.MagnumObject):
fields = {'foobar': int}
obj = Foo(self.context)
# NOTE(danms): Can't use assertRaisesRegexp() because of py26
raised = False
try:
obj.foobar
except NotImplementedError as ex:
raised = True
self.assertTrue(raised)
self.assertTrue('foobar' in str(ex))
def test_loaded_in_primitive(self):
obj = MyObj(self.context)
obj.foo = 1
obj.obj_reset_changes()
self.assertEqual('loaded!', obj.bar)
expected = {'magnum_object.name': 'MyObj',
'magnum_object.namespace': 'magnum',
'magnum_object.version': '1.0',
'magnum_object.changes': ['bar'],
'magnum_object.data': {'foo': 1,
'bar': 'loaded!'}}
self.assertEqual(expected, obj.obj_to_primitive())
def test_changes_in_primitive(self):
obj = MyObj(self.context)
obj.foo = 123
self.assertEqual(set(['foo']), obj.obj_what_changed())
primitive = obj.obj_to_primitive()
self.assertTrue('magnum_object.changes' in primitive)
obj2 = MyObj.obj_from_primitive(primitive)
self.assertEqual(set(['foo']), obj2.obj_what_changed())
obj2.obj_reset_changes()
self.assertEqual(set(), obj2.obj_what_changed())
def test_unknown_objtype(self):
self.assertRaises(exception.UnsupportedObjectError,
base.MagnumObject.obj_class_from_name, 'foo', '1.0')
def test_with_alternate_context(self):
ctxt1 = context.RequestContext('foo', 'foo')
ctxt2 = context.RequestContext('bar', project_id='alternate')
obj = MyObj.query(ctxt1)
obj.update_test(ctxt2)
self.assertEqual('alternate-context', obj.bar)
self.assertRemotes()
def test_orphaned_object(self):
obj = MyObj.query(self.context)
obj._context = None
self.assertRaises(exception.OrphanedObjectError,
obj.update_test)
self.assertRemotes()
def test_changed_1(self):
obj = MyObj.query(self.context)
obj.foo = 123
self.assertEqual(set(['foo']), obj.obj_what_changed())
obj.update_test(self.context)
self.assertEqual(set(['foo', 'bar']), obj.obj_what_changed())
self.assertEqual(123, obj.foo)
self.assertRemotes()
def test_changed_2(self):
obj = MyObj.query(self.context)
obj.foo = 123
self.assertEqual(set(['foo']), obj.obj_what_changed())
obj.save()
self.assertEqual(set([]), obj.obj_what_changed())
self.assertEqual(123, obj.foo)
self.assertRemotes()
def test_changed_3(self):
obj = MyObj.query(self.context)
obj.foo = 123
self.assertEqual(set(['foo']), obj.obj_what_changed())
obj.refresh()
self.assertEqual(set([]), obj.obj_what_changed())
self.assertEqual(321, obj.foo)
self.assertEqual('refreshed', obj.bar)
self.assertRemotes()
def test_changed_4(self):
obj = MyObj.query(self.context)
obj.bar = 'something'
self.assertEqual(set(['bar']), obj.obj_what_changed())
obj.modify_save_modify(self.context)
self.assertEqual(set(['foo']), obj.obj_what_changed())
self.assertEqual(42, obj.foo)
self.assertEqual('meow', obj.bar)
self.assertRemotes()
def test_static_result(self):
obj = MyObj.query(self.context)
self.assertEqual('bar', obj.bar)
result = obj.marco()
self.assertEqual('polo', result)
self.assertRemotes()
def test_updates(self):
obj = MyObj.query(self.context)
self.assertEqual(1, obj.foo)
obj.update_test()
self.assertEqual('updated', obj.bar)
self.assertRemotes()
def test_base_attributes(self):
dt = datetime.datetime(1955, 11, 5)
obj = MyObj(self.context)
obj.created_at = dt
obj.updated_at = dt
expected = {'magnum_object.name': 'MyObj',
'magnum_object.namespace': 'magnum',
'magnum_object.version': '1.0',
'magnum_object.changes':
['created_at', 'updated_at'],
'magnum_object.data':
{'created_at': timeutils.isotime(dt),
'updated_at': timeutils.isotime(dt),
}
}
actual = obj.obj_to_primitive()
# magnum_object.changes is built from a set and order is undefined
self.assertEqual(sorted(expected['magnum_object.changes']),
sorted(actual['magnum_object.changes']))
del expected['magnum_object.changes'], actual['magnum_object.changes']
self.assertEqual(expected, actual)
def test_contains(self):
obj = MyObj(self.context)
self.assertFalse('foo' in obj)
obj.foo = 1
self.assertTrue('foo' in obj)
self.assertFalse('does_not_exist' in obj)
def test_obj_attr_is_set(self):
obj = MyObj(self.context, foo=1)
self.assertTrue(obj.obj_attr_is_set('foo'))
self.assertFalse(obj.obj_attr_is_set('bar'))
self.assertRaises(AttributeError, obj.obj_attr_is_set, 'bang')
def test_get(self):
obj = MyObj(self.context, foo=1)
# Foo has value, should not get the default
self.assertEqual(obj.get('foo', 2), 1)
# Foo has value, should return the value without error
self.assertEqual(obj.get('foo'), 1)
# Bar is not loaded, so we should get the default
self.assertEqual(obj.get('bar', 'not-loaded'), 'not-loaded')
# Bar without a default should lazy-load
self.assertEqual(obj.get('bar'), 'loaded!')
# Bar now has a default, but loaded value should be returned
self.assertEqual(obj.get('bar', 'not-loaded'), 'loaded!')
# Invalid attribute should raise AttributeError
self.assertRaises(AttributeError, obj.get, 'nothing')
# ...even with a default
self.assertRaises(AttributeError, obj.get, 'nothing', 3)
def test_object_inheritance(self):
base_fields = base.MagnumObject.fields.keys()
myobj_fields = ['foo', 'bar', 'missing'] + base_fields
myobj3_fields = ['new_field']
self.assertTrue(issubclass(TestSubclassedObject, MyObj))
self.assertEqual(len(myobj_fields), len(MyObj.fields))
self.assertEqual(set(myobj_fields), set(MyObj.fields.keys()))
self.assertEqual(len(myobj_fields) + len(myobj3_fields),
len(TestSubclassedObject.fields))
self.assertEqual(set(myobj_fields) | set(myobj3_fields),
set(TestSubclassedObject.fields.keys()))
def test_get_changes(self):
obj = MyObj(self.context)
self.assertEqual({}, obj.obj_get_changes())
obj.foo = 123
self.assertEqual({'foo': 123}, obj.obj_get_changes())
obj.bar = 'test'
self.assertEqual({'foo': 123, 'bar': 'test'}, obj.obj_get_changes())
obj.obj_reset_changes()
self.assertEqual({}, obj.obj_get_changes())
def test_obj_fields(self):
class TestObj(base.MagnumObject):
fields = {'foo': int}
obj_extra_fields = ['bar']
@property
def bar(self):
return 'this is bar'
obj = TestObj(self.context)
self.assertEqual(set(['created_at', 'updated_at', 'foo', 'bar']),
set(obj.obj_fields))
def test_obj_constructor(self):
obj = MyObj(self.context, foo=123, bar='abc')
self.assertEqual(123, obj.foo)
self.assertEqual('abc', obj.bar)
self.assertEqual(set(['foo', 'bar']), obj.obj_what_changed())
class TestObjectListBase(test_base.TestCase):
def test_list_like_operations(self):
class Foo(base.ObjectListBase, base.MagnumObject):
pass
objlist = Foo(self.context)
objlist._context = 'foo'
objlist.objects = [1, 2, 3]
self.assertEqual(list(objlist), objlist.objects)
self.assertEqual(3, len(objlist))
self.assertIn(2, objlist)
self.assertEqual([1], list(objlist[:1]))
self.assertEqual('foo', objlist[:1]._context)
self.assertEqual(3, objlist[2])
self.assertEqual(1, objlist.count(1))
self.assertEqual(1, objlist.index(2))
def test_serialization(self):
class Foo(base.ObjectListBase, base.MagnumObject):
pass
class Bar(base.MagnumObject):
fields = {'foo': str}
obj = Foo(self.context)
obj.objects = []
for i in 'abc':
bar = Bar(self.context)
bar.foo = i
obj.objects.append(bar)
obj2 = base.MagnumObject.obj_from_primitive(obj.obj_to_primitive())
self.assertFalse(obj is obj2)
self.assertEqual([x.foo for x in obj],
[y.foo for y in obj2])
def _test_object_list_version_mappings(self, list_obj_class):
# Figure out what sort of object this list is for
list_field = list_obj_class.fields['objects']
item_obj_field = list_field._type._element_type
item_obj_name = item_obj_field._type._obj_name
# Look through all object classes of this type and make sure that
# the versions we find are covered by the parent list class
for item_class in base.MagnumObject._obj_classes[item_obj_name]:
self.assertIn(
item_class.VERSION,
list_obj_class.child_versions.values())
def test_object_version_mappings(self):
# Find all object list classes and make sure that they at least handle
# all the current object versions
for obj_classes in base.MagnumObject._obj_classes.values():
for obj_class in obj_classes:
if issubclass(obj_class, base.ObjectListBase):
self._test_object_list_version_mappings(obj_class)
def test_list_changes(self):
class Foo(base.ObjectListBase, base.MagnumObject):
pass
class Bar(base.MagnumObject):
fields = {'foo': str}
obj = Foo(self.context, objects=[])
self.assertEqual(set(['objects']), obj.obj_what_changed())
obj.objects.append(Bar(self.context, foo='test'))
self.assertEqual(set(['objects']), obj.obj_what_changed())
obj.obj_reset_changes()
# This should still look dirty because the child is dirty
self.assertEqual(set(['objects']), obj.obj_what_changed())
obj.objects[0].obj_reset_changes()
# This should now look clean because the child is clean
self.assertEqual(set(), obj.obj_what_changed())
class TestObjectSerializer(test_base.TestCase):
def test_serialize_entity_primitive(self):
ser = base.MagnumObjectSerializer()
for thing in (1, 'foo', [1, 2], {'foo': 'bar'}):
self.assertEqual(thing, ser.serialize_entity(None, thing))
def test_deserialize_entity_primitive(self):
ser = base.MagnumObjectSerializer()
for thing in (1, 'foo', [1, 2], {'foo': 'bar'}):
self.assertEqual(thing, ser.deserialize_entity(None, thing))
def test_object_serialization(self):
ser = base.MagnumObjectSerializer()
obj = MyObj(self.context)
primitive = ser.serialize_entity(self.context, obj)
self.assertTrue('magnum_object.name' in primitive)
obj2 = ser.deserialize_entity(self.context, primitive)
self.assertIsInstance(obj2, MyObj)
self.assertEqual(self.context, obj2._context)
def test_object_serialization_iterables(self):
ser = base.MagnumObjectSerializer()
obj = MyObj(self.context)
for iterable in (list, tuple, set):
thing = iterable([obj])
primitive = ser.serialize_entity(self.context, thing)
self.assertEqual(1, len(primitive))
for item in primitive:
self.assertFalse(isinstance(item, base.MagnumObject))
thing2 = ser.deserialize_entity(self.context, primitive)
self.assertEqual(1, len(thing2))
for item in thing2:
self.assertIsInstance(item, MyObj)