Merge "Added attribute object and its unit tests"
This commit is contained in:
commit
2f6f71849e
@ -95,6 +95,10 @@ class Connection(object):
|
||||
marker=None, columns_to_join=None):
|
||||
"""Get requested deployable by filters."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def deployable_get_by_filters_with_attributes(self, context,
|
||||
filters):
|
||||
"""Get requested deployable by filters with attributes."""
|
||||
# attributes
|
||||
@abc.abstractmethod
|
||||
def attribute_create(self, context, key, value):
|
||||
@ -105,8 +109,12 @@ class Connection(object):
|
||||
"""Get requested attribute."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def attribute_get_by_deployable_uuid(self, context, deployable_uuid):
|
||||
"""Get requested deployable by deployable uuid."""
|
||||
def attribute_get_by_deployable_id(self, context, deployable_id):
|
||||
"""Get requested attribute by attribute id."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def attribute_get_by_filter(self, context, filters):
|
||||
"""Get requested attribute by kv pair and attribute id."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def attribute_update(self, context, uuid, key, value):
|
||||
|
@ -31,7 +31,8 @@ from cyborg.common import exception
|
||||
from cyborg.common.i18n import _
|
||||
from cyborg.db import api
|
||||
from cyborg.db.sqlalchemy import models
|
||||
|
||||
from sqlalchemy import or_
|
||||
from sqlalchemy import and_
|
||||
|
||||
_CONTEXT = threading.local()
|
||||
LOG = log.getLogger(__name__)
|
||||
@ -248,6 +249,38 @@ class Connection(api.Connection):
|
||||
if count != 1:
|
||||
raise exception.DeployableNotFound(uuid=uuid)
|
||||
|
||||
def deployable_get_by_filters_with_attributes(self, context,
|
||||
filters):
|
||||
|
||||
exact_match_filter_names = ['uuid', 'name',
|
||||
'parent_uuid', 'root_uuid',
|
||||
'pcie_address', 'host',
|
||||
'board', 'vendor', 'version',
|
||||
'type', 'assignable', 'instance_uuid',
|
||||
'availability', 'accelerator_id']
|
||||
attribute_filters = {}
|
||||
filters_copy = copy.deepcopy(filters)
|
||||
for key, value in filters_copy.iteritems():
|
||||
if key not in exact_match_filter_names:
|
||||
# This key is not in the deployable regular fields
|
||||
value = filters.pop(key)
|
||||
attribute_filters.update({key: value})
|
||||
|
||||
query_prefix = model_query(context, models.Deployable)
|
||||
filters = copy.deepcopy(filters)
|
||||
|
||||
# Filter the query
|
||||
query_prefix = self._exact_deployable_filter_with_attributes(
|
||||
query_prefix,
|
||||
filters,
|
||||
exact_match_filter_names,
|
||||
attribute_filters
|
||||
)
|
||||
if query_prefix is None:
|
||||
return []
|
||||
deployables = query_prefix.all()
|
||||
return deployables
|
||||
|
||||
def deployable_get_by_filters(self, context,
|
||||
filters, sort_key='created_at',
|
||||
sort_dir='desc', limit=None,
|
||||
@ -262,6 +295,52 @@ class Connection(api.Connection):
|
||||
sort_keys=[sort_key],
|
||||
sort_dirs=[sort_dir])
|
||||
|
||||
def _exact_deployable_filter_with_attributes(self, query,
|
||||
dpl_filters, legal_keys,
|
||||
attribute_filters):
|
||||
"""Applies exact match filtering to a deployable query.
|
||||
Returns the updated query. Modifies dpl_filters argument to remove
|
||||
dpl_filters consumed.
|
||||
:param query: query to apply dpl_filters and attribute_filters to
|
||||
:param dpl_filters: dictionary of filters; values that are lists,
|
||||
tuples, sets, or frozensets cause an 'IN' test to
|
||||
be performed, while exact matching ('==' operator)
|
||||
is used for other values
|
||||
:param legal_keys: list of keys to apply exact filtering to
|
||||
:param attribute_filters: dictionary of attribute filters
|
||||
"""
|
||||
|
||||
filter_dict = {}
|
||||
model = models.Deployable
|
||||
|
||||
# Walk through all the keys
|
||||
for key in legal_keys:
|
||||
# Skip ones we're not filtering on
|
||||
if key not in dpl_filters:
|
||||
continue
|
||||
|
||||
# OK, filtering on this key; what value do we search for?
|
||||
value = dpl_filters.pop(key)
|
||||
|
||||
if isinstance(value, (list, tuple, set, frozenset)):
|
||||
if not value:
|
||||
return None
|
||||
# Looking for values in a list; apply to query directly
|
||||
column_attr = getattr(model, key)
|
||||
query = query.filter(column_attr.in_(value))
|
||||
else:
|
||||
filter_dict[key] = value
|
||||
# Apply simple exact matches
|
||||
if filter_dict:
|
||||
query = query.filter(*[getattr(models.Deployable, k) == v
|
||||
for k, v in filter_dict.items()])
|
||||
if attribute_filters:
|
||||
query = query.outerjoin(models.Attribute)
|
||||
query = query.filter(or_(*[and_(models.Attribute.key == k,
|
||||
models.Attribute.value == v)
|
||||
for k, v in attribute_filters.items()]))
|
||||
return query
|
||||
|
||||
def _exact_deployable_filter(self, query, filters, legal_keys):
|
||||
"""Applies exact match filtering to a deployable query.
|
||||
Returns the updated query. Modifies filters argument to remove
|
||||
@ -334,12 +413,13 @@ class Connection(api.Connection):
|
||||
deployables = query_prefix.all()
|
||||
return deployables
|
||||
|
||||
def attribute_create(self, context, key, value):
|
||||
update_fields = {'key': key, 'value': value}
|
||||
update_fields['uuid'] = uuidutils.generate_uuid()
|
||||
|
||||
def attribute_create(self, context, values):
|
||||
if not values.get('uuid'):
|
||||
values['uuid'] = uuidutils.generate_uuid()
|
||||
if values.get('id'):
|
||||
values.pop('id', None)
|
||||
attribute = models.Attribute()
|
||||
attribute.update(update_fields)
|
||||
attribute.update(values)
|
||||
|
||||
with _session_for_write() as session:
|
||||
try:
|
||||
@ -359,15 +439,42 @@ class Connection(api.Connection):
|
||||
except NoResultFound:
|
||||
raise exception.AttributeNotFound(uuid=uuid)
|
||||
|
||||
def attribute_get_by_deployable_uuid(self, context, deployable_uuid):
|
||||
def attribute_get_by_deployable_id(self, context, deployable_id):
|
||||
query = model_query(
|
||||
context,
|
||||
models.Attribute).filter_by(deployable_uuid=deployable_uuid)
|
||||
models.Attribute).filter_by(deployable_id=deployable_id)
|
||||
try:
|
||||
return query.all()
|
||||
except NoResultFound:
|
||||
raise exception.AttributeNotFound(uuid=uuid)
|
||||
|
||||
def attribute_get_by_filter(self, context, filters):
|
||||
"""Return attributes that matches the filters
|
||||
"""
|
||||
query_prefix = model_query(context, models.Attribute)
|
||||
|
||||
# Filter the query
|
||||
query_prefix = self._exact_attribute_by_filter(query_prefix,
|
||||
filters)
|
||||
if query_prefix is None:
|
||||
return []
|
||||
|
||||
return query_prefix.all()
|
||||
|
||||
def _exact_attribute_by_filter(self, query, filters):
|
||||
"""Applies exact match filtering to a atrtribute query.
|
||||
Returns the updated query.
|
||||
:param filters: The filters specified by a dict of kv pairs
|
||||
"""
|
||||
|
||||
model = models.Attribute
|
||||
filter_dict = filters
|
||||
|
||||
# Apply simple exact matches
|
||||
query = query.filter(*[getattr(models.Attribute, k) == v
|
||||
for k, v in filter_dict.items()])
|
||||
return query
|
||||
|
||||
def attribute_update(self, context, uuid, key, value):
|
||||
return self._do_update_attribute(context, uuid, key, value)
|
||||
|
||||
|
@ -27,3 +27,4 @@ def register_all():
|
||||
# need to receive it via RPC.
|
||||
__import__('cyborg.objects.accelerator')
|
||||
__import__('cyborg.objects.deployable')
|
||||
__import__('cyborg.objects.attribute')
|
||||
|
@ -33,9 +33,9 @@ class Attribute(base.CyborgObject, object_base.VersionedObjectDictCompat):
|
||||
dbapi = dbapi.get_instance()
|
||||
|
||||
fields = {
|
||||
'id': fields.IntegerField(nullable=False),
|
||||
'id': object_fields.IntegerField(nullable=False),
|
||||
'uuid': object_fields.UUIDField(nullable=False),
|
||||
'deployable_id': fields.IntegerField(nullable=False),
|
||||
'deployable_id': object_fields.IntegerField(nullable=False),
|
||||
'key': object_fields.StringField(nullable=False),
|
||||
'value': object_fields.StringField(nullable=False)
|
||||
}
|
||||
@ -47,26 +47,31 @@ class Attribute(base.CyborgObject, object_base.VersionedObjectDictCompat):
|
||||
|
||||
values = self.obj_get_changes()
|
||||
db_attr = self.dbapi.attribute_create(context,
|
||||
self.key,
|
||||
self.value)
|
||||
values)
|
||||
self._from_db_object(self, db_attr)
|
||||
|
||||
@classmethod
|
||||
def get(cls, context, uuid):
|
||||
"""Find a DB Deployable and return an Obj Deployable."""
|
||||
"""Find a DB attribute and return an Obj Deployable."""
|
||||
db_attr = cls.dbapi.attribute_get(context, uuid)
|
||||
obj_attr = cls._from_db_object(cls(context), db_attr)
|
||||
return obj_attr
|
||||
|
||||
@classmethod
|
||||
def attribute_get_by_deployable_uuid(cls, context, deployable_uuid):
|
||||
"""Get a Deployable by host."""
|
||||
db_attr = cls.dbapi.attribute_get_by_deployable_uuid(context,
|
||||
deployable_uuid)
|
||||
def get_by_deployable_id(cls, context, deployable_id):
|
||||
"""Get a attribute by deployable_id"""
|
||||
db_attr = cls.dbapi.attribute_get_by_deployable_id(context,
|
||||
deployable_id)
|
||||
return cls._from_db_object_list(db_attr, context)
|
||||
|
||||
@classmethod
|
||||
def get_by_filter(cls, context, filters):
|
||||
"""Get a attribute by specified filters"""
|
||||
db_attr = cls.dbapi.attribute_get_by_filter(context, filters)
|
||||
return cls._from_db_object_list(db_attr, context)
|
||||
|
||||
def save(self, context):
|
||||
"""Update a Deployable record in the DB."""
|
||||
"""Update a attribute record in the DB."""
|
||||
updates = self.obj_get_changes()
|
||||
db_attr = self.dbapi.attribute_update(context,
|
||||
self.uuid,
|
||||
@ -75,7 +80,7 @@ class Attribute(base.CyborgObject, object_base.VersionedObjectDictCompat):
|
||||
self._from_db_object(self, db_attr)
|
||||
|
||||
def destroy(self, context):
|
||||
"""Delete a Deployable from the DB."""
|
||||
"""Delete a attribute from the DB."""
|
||||
self.dbapi.attribute_delete(context, self.uuid)
|
||||
self.obj_reset_changes()
|
||||
|
||||
|
@ -13,6 +13,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
from oslo_log import log as logging
|
||||
from oslo_versionedobjects import base as object_base
|
||||
|
||||
@ -20,7 +21,7 @@ from cyborg.common import exception
|
||||
from cyborg.db import api as dbapi
|
||||
from cyborg.objects import base
|
||||
from cyborg.objects import fields as object_fields
|
||||
|
||||
from cyborg.objects.attribute import Attribute
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -78,25 +79,42 @@ class Deployable(base.CyborgObject, object_base.VersionedObjectDictCompat):
|
||||
|
||||
db_dep = self.dbapi.deployable_create(context, values)
|
||||
self._from_db_object(self, db_dep)
|
||||
del self.attributes_list[:]
|
||||
|
||||
@classmethod
|
||||
def get(cls, context, uuid):
|
||||
"""Find a DB Deployable and return an Obj Deployable."""
|
||||
db_dep = cls.dbapi.deployable_get(context, uuid)
|
||||
obj_dep = cls._from_db_object(cls(context), db_dep)
|
||||
# retrieve all the attrobutes for this deployable
|
||||
query = {"deployable_id": obj_dep.id}
|
||||
attr_get_list = Attribute.get_by_filter(context,
|
||||
query)
|
||||
obj_dep.attributes_list = attr_get_list
|
||||
return obj_dep
|
||||
|
||||
@classmethod
|
||||
def get_by_host(cls, context, host):
|
||||
"""Get a Deployable by host."""
|
||||
db_deps = cls.dbapi.deployable_get_by_host(context, host)
|
||||
query = {"deployable_id": db_deps.id}
|
||||
attr_get_list = Attribute.get_by_filter(context,
|
||||
query)
|
||||
db_deps.attributes_list = attr_get_list
|
||||
return cls._from_db_object_list(db_deps, context)
|
||||
|
||||
@classmethod
|
||||
def list(cls, context):
|
||||
"""Return a list of Deployable objects."""
|
||||
db_deps = cls.dbapi.deployable_list(context)
|
||||
return cls._from_db_object_list(db_deps, context)
|
||||
|
||||
obj_dpl_list = cls._from_db_object_list(db_deps, context)
|
||||
for obj_dpl in obj_dpl_list:
|
||||
query = {"deployable_id": obj_dpl.id}
|
||||
attr_get_list = Attribute.get_by_filter(context,
|
||||
query)
|
||||
obj_dpl.attributes_list = attr_get_list
|
||||
return obj_dpl_list
|
||||
|
||||
def save(self, context):
|
||||
"""Update a Deployable record in the DB."""
|
||||
@ -106,6 +124,7 @@ class Deployable(base.CyborgObject, object_base.VersionedObjectDictCompat):
|
||||
|
||||
def destroy(self, context):
|
||||
"""Delete a Deployable from the DB."""
|
||||
del self.attributes_list[:]
|
||||
self.dbapi.deployable_delete(context, self.uuid)
|
||||
self.obj_reset_changes()
|
||||
|
||||
@ -114,26 +133,57 @@ class Deployable(base.CyborgObject, object_base.VersionedObjectDictCompat):
|
||||
If the attribute already exists, it will update the value,
|
||||
otherwise, the vf will be appended to the list
|
||||
"""
|
||||
if not isinstance(vf, VirtualFunction) or vf.type != 'vf':
|
||||
raise exception.InvalidDeployType()
|
||||
for exist_vf in self.virtual_function_list:
|
||||
if base.obj_equal_prims(vf, exist_vf):
|
||||
LOG.warning("The vf already exists")
|
||||
|
||||
for exist_attr in self.attributes_list:
|
||||
if base.obj_equal_prims(attribute, exist_attr):
|
||||
LOG.warning("The attribute already exists")
|
||||
return None
|
||||
attribute.deployable_id = self.id
|
||||
attribute_copy = copy.deepcopy(attribute)
|
||||
self.attributes_list.append(attribute_copy)
|
||||
|
||||
def delete_attribute(self, context, attribute):
|
||||
"""remove an attribute from the attributes_list
|
||||
if the attribute does not exist, ignore it
|
||||
"""
|
||||
|
||||
idx = 0
|
||||
for exist_attribute in self.attributes_list:
|
||||
if base.obj_equal_prims(attribute, exist_attribute):
|
||||
removed_attribute = self.attributes_list.pop(idx)
|
||||
removed_attribute.destroy(context)
|
||||
return
|
||||
idx = idx + 1
|
||||
LOG.warning("The removing attribute does not exist!")
|
||||
|
||||
@classmethod
|
||||
def get_by_filter(cls, context,
|
||||
filters, sort_key='created_at',
|
||||
sort_dir='desc', limit=None,
|
||||
marker=None, join=None):
|
||||
filters):
|
||||
obj_dpl_list = []
|
||||
db_dpl_list = cls.dbapi.deployable_get_by_filters(context, filters,
|
||||
sort_key=sort_key,
|
||||
sort_dir=sort_dir,
|
||||
limit=limit,
|
||||
marker=marker,
|
||||
join_columns=join)
|
||||
for db_dpl in db_dpl_list:
|
||||
obj_dpl = cls._from_db_object(cls(context), db_dpl)
|
||||
obj_dpl_list.append(obj_dpl)
|
||||
db_dpl_list = cls.dbapi.deployable_get_by_filters_with_attributes(
|
||||
context,
|
||||
filters)
|
||||
|
||||
if db_dpl_list:
|
||||
for db_dpl in db_dpl_list:
|
||||
obj_dpl = cls._from_db_object(cls(context), db_dpl)
|
||||
query = {"deployable_id": obj_dpl.id}
|
||||
attr_get_list = Attribute.get_by_filter(context,
|
||||
query)
|
||||
obj_dpl.attributes_list = attr_get_list
|
||||
obj_dpl_list.append(obj_dpl)
|
||||
|
||||
return obj_dpl_list
|
||||
|
||||
@classmethod
|
||||
def _from_db_object(cls, obj, db_obj):
|
||||
"""Converts a deployable to a formal object.
|
||||
|
||||
:param obj: An object of the class.
|
||||
:param db_obj: A DB model of the object
|
||||
:return: The object of the class with the database entity added
|
||||
"""
|
||||
obj = base.CyborgObject._from_db_object(obj, db_obj)
|
||||
obj.attributes_list = []
|
||||
|
||||
return obj
|
||||
|
59
cyborg/tests/unit/fake_attribute.py
Normal file
59
cyborg/tests/unit/fake_attribute.py
Normal file
@ -0,0 +1,59 @@
|
||||
# Copyright 2018 Huawei Technologies Co.,LTD.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import datetime
|
||||
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from cyborg import objects
|
||||
from cyborg.objects import fields
|
||||
|
||||
|
||||
def fake_db_attribute(**updates):
|
||||
attr_uuid = uuidutils.generate_uuid()
|
||||
db_attribute = {
|
||||
'id': 0,
|
||||
'uuid': attr_uuid,
|
||||
'deployable_id': 1,
|
||||
'key': 'attr_key',
|
||||
'value': 'attr_val'
|
||||
}
|
||||
|
||||
for name, field in objects.Attribute.fields.items():
|
||||
if name in db_attribute:
|
||||
continue
|
||||
if field.nullable:
|
||||
db_attribute[name] = None
|
||||
elif field.default != fields.UnspecifiedDefault:
|
||||
db_attribute[name] = field.default
|
||||
else:
|
||||
raise Exception('fake_db_attribute needs help with %s' % name)
|
||||
|
||||
if updates:
|
||||
db_attribute.update(updates)
|
||||
|
||||
return db_attribute
|
||||
|
||||
|
||||
def fake_attribute_obj(context, obj_attr_class=None, **updates):
|
||||
if obj_attr_class is None:
|
||||
obj_attr_class = objects.Attribute
|
||||
expected_attrs = updates.pop('expected_attrs', None)
|
||||
attribute = obj_attr_class._from_db_object(context,
|
||||
obj_attr_class(),
|
||||
fake_db_deployable(**updates),
|
||||
expected_attrs=expected_attrs)
|
||||
attribute.obj_reset_changes()
|
||||
return attribute
|
207
cyborg/tests/unit/objects/test_attribute.py
Normal file
207
cyborg/tests/unit/objects/test_attribute.py
Normal file
@ -0,0 +1,207 @@
|
||||
# Copyright 2018 Huawei Technologies Co.,LTD.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import datetime
|
||||
|
||||
import mock
|
||||
import netaddr
|
||||
from oslo_db import exception as db_exc
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import timeutils
|
||||
from oslo_context import context
|
||||
|
||||
from cyborg import db
|
||||
from cyborg.common import exception
|
||||
from cyborg import objects
|
||||
from cyborg.objects import base
|
||||
from cyborg import tests as test
|
||||
from cyborg.tests.unit import fake_attribute
|
||||
from cyborg.tests.unit import fake_deployable
|
||||
from cyborg.tests.unit import fake_accelerator
|
||||
from cyborg.tests.unit.objects import test_objects
|
||||
from cyborg.tests.unit.db.base import DbTestCase
|
||||
|
||||
|
||||
class _TestDeployableObject(DbTestCase):
|
||||
@property
|
||||
def fake_deployable(self):
|
||||
db_deploy = fake_deployable.fake_db_deployable(id=1)
|
||||
return db_deploy
|
||||
|
||||
@property
|
||||
def fake_accelerator(self):
|
||||
db_acc = fake_accelerator.fake_db_accelerator(id=2)
|
||||
return db_acc
|
||||
|
||||
@property
|
||||
def fake_attribute(self):
|
||||
db_attr = fake_attribute.fake_db_attribute(id=2)
|
||||
return db_attr
|
||||
|
||||
def test_create(self):
|
||||
db_acc = self.fake_accelerator
|
||||
acc = objects.Accelerator(context=self.context,
|
||||
**db_acc)
|
||||
acc.create(self.context)
|
||||
acc_get = objects.Accelerator.get(self.context, acc.uuid)
|
||||
|
||||
db_dpl = self.fake_deployable
|
||||
dpl = objects.Deployable(context=self.context,
|
||||
**db_dpl)
|
||||
|
||||
dpl.accelerator_id = acc_get.id
|
||||
dpl.create(self.context)
|
||||
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
|
||||
|
||||
db_attr = self.fake_attribute
|
||||
attr = objects.Attribute(context=self.context,
|
||||
**db_attr)
|
||||
attr.deployable_id = dpl_get.id
|
||||
attr.create(self.context)
|
||||
|
||||
self.assertEqual(db_attr['uuid'], attr.uuid)
|
||||
|
||||
def test_get(self):
|
||||
db_acc = self.fake_accelerator
|
||||
acc = objects.Accelerator(context=self.context,
|
||||
**db_acc)
|
||||
acc.create(self.context)
|
||||
acc_get = objects.Accelerator.get(self.context, acc.uuid)
|
||||
|
||||
db_dpl = self.fake_deployable
|
||||
dpl = objects.Deployable(context=self.context,
|
||||
**db_dpl)
|
||||
|
||||
dpl.accelerator_id = acc_get.id
|
||||
dpl.create(self.context)
|
||||
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
|
||||
|
||||
db_attr = self.fake_attribute
|
||||
attr = objects.Attribute(context=self.context,
|
||||
**db_attr)
|
||||
attr.deployable_id = dpl_get.id
|
||||
attr.create(self.context)
|
||||
attr_get = objects.Attribute.get(self.context, attr.uuid)
|
||||
|
||||
self.assertEqual(db_attr['uuid'], attr_get.uuid)
|
||||
|
||||
def test_get_by_deployable_uuid(self):
|
||||
db_acc = self.fake_accelerator
|
||||
acc = objects.Accelerator(context=self.context,
|
||||
**db_acc)
|
||||
acc.create(self.context)
|
||||
acc_get = objects.Accelerator.get(self.context, acc.uuid)
|
||||
|
||||
db_dpl = self.fake_deployable
|
||||
dpl = objects.Deployable(context=self.context,
|
||||
**db_dpl)
|
||||
|
||||
dpl.accelerator_id = acc_get.id
|
||||
dpl.create(self.context)
|
||||
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
|
||||
|
||||
db_attr = self.fake_attribute
|
||||
attr = objects.Attribute(context=self.context,
|
||||
**db_attr)
|
||||
attr.deployable_id = dpl_get.id
|
||||
attr.create(self.context)
|
||||
attr_get = objects.Attribute.get_by_deployable_id(
|
||||
self.context, dpl_get.id)[0]
|
||||
|
||||
self.assertEqual(db_attr['uuid'], attr_get.uuid)
|
||||
|
||||
def test_save(self):
|
||||
db_acc = self.fake_accelerator
|
||||
acc = objects.Accelerator(context=self.context,
|
||||
**db_acc)
|
||||
acc.create(self.context)
|
||||
acc_get = objects.Accelerator.get(self.context, acc.uuid)
|
||||
|
||||
db_dpl = self.fake_deployable
|
||||
dpl = objects.Deployable(context=self.context,
|
||||
**db_dpl)
|
||||
|
||||
dpl.accelerator_id = acc_get.id
|
||||
dpl.create(self.context)
|
||||
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
|
||||
|
||||
db_attr = self.fake_attribute
|
||||
attr = objects.Attribute(context=self.context,
|
||||
**db_attr)
|
||||
attr.deployable_id = dpl_get.id
|
||||
attr.create(self.context)
|
||||
attr_get = objects.Attribute.get(self.context, attr.uuid)
|
||||
attr_get.set_key_value_pair("test_key", "test_val")
|
||||
attr_get.save(self.context)
|
||||
attr_get_2 = objects.Attribute.get(self.context, attr_get.uuid)
|
||||
self.assertEqual(attr_get_2.key, "test_key")
|
||||
self.assertEqual(attr_get_2.value, "test_val")
|
||||
|
||||
def test_destroy(self):
|
||||
db_acc = self.fake_accelerator
|
||||
acc = objects.Accelerator(context=self.context,
|
||||
**db_acc)
|
||||
acc.create(self.context)
|
||||
acc_get = objects.Accelerator.get(self.context, acc.uuid)
|
||||
|
||||
db_dpl = self.fake_deployable
|
||||
dpl = objects.Deployable(context=self.context,
|
||||
**db_dpl)
|
||||
|
||||
dpl.accelerator_id = acc_get.id
|
||||
dpl.create(self.context)
|
||||
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
|
||||
|
||||
db_attr = self.fake_attribute
|
||||
attr = objects.Attribute(context=self.context,
|
||||
**db_attr)
|
||||
attr.deployable_id = dpl_get.id
|
||||
attr.create(self.context)
|
||||
self.assertEqual(db_attr['uuid'], attr.uuid)
|
||||
|
||||
attr.destroy(self.context)
|
||||
self.assertRaises(exception.AttributeNotFound,
|
||||
objects.Attribute.get, self.context,
|
||||
attr.uuid)
|
||||
|
||||
def test_get_by_filter(self):
|
||||
db_acc = self.fake_accelerator
|
||||
acc = objects.Accelerator(context=self.context,
|
||||
**db_acc)
|
||||
acc.create(self.context)
|
||||
acc_get = objects.Accelerator.get(self.context, acc.uuid)
|
||||
|
||||
db_dpl = self.fake_deployable
|
||||
dpl = objects.Deployable(context=self.context,
|
||||
**db_dpl)
|
||||
|
||||
dpl.accelerator_id = acc_get.id
|
||||
dpl.create(self.context)
|
||||
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
|
||||
|
||||
db_attr = self.fake_attribute
|
||||
attr = objects.Attribute(context=self.context,
|
||||
**db_attr)
|
||||
attr.deployable_id = dpl_get.id
|
||||
attr.create(self.context)
|
||||
attr_filter = {"key": "attr_key", "value": "attr_val"}
|
||||
attr_get = objects.Attribute.get_by_filter(
|
||||
self.context, attr_filter)[0]
|
||||
|
||||
self.assertEqual(db_attr['uuid'], attr_get.uuid)
|
||||
|
||||
attr_filter = {"key": "attr_key", "value": "attr_val2"}
|
||||
attr_get_list = objects.Attribute.get_by_filter(
|
||||
self.context, attr_filter)
|
||||
self.assertEqual(len(attr_get_list), 0)
|
@ -28,6 +28,7 @@ from cyborg.objects import base
|
||||
from cyborg import tests as test
|
||||
from cyborg.tests.unit import fake_accelerator
|
||||
from cyborg.tests.unit import fake_deployable
|
||||
from cyborg.tests.unit import fake_attribute
|
||||
from cyborg.tests.unit.objects import test_objects
|
||||
from cyborg.tests.unit.db.base import DbTestCase
|
||||
|
||||
@ -38,11 +39,31 @@ class _TestDeployableObject(DbTestCase):
|
||||
db_deploy = fake_deployable.fake_db_deployable(id=1)
|
||||
return db_deploy
|
||||
|
||||
@property
|
||||
def fake_deployable2(self):
|
||||
db_deploy = fake_deployable.fake_db_deployable(id=2)
|
||||
return db_deploy
|
||||
|
||||
@property
|
||||
def fake_accelerator(self):
|
||||
db_acc = fake_accelerator.fake_db_accelerator(id=2)
|
||||
return db_acc
|
||||
|
||||
@property
|
||||
def fake_attribute(self):
|
||||
db_attr = fake_attribute.fake_db_attribute(id=2)
|
||||
return db_attr
|
||||
|
||||
@property
|
||||
def fake_attribute2(self):
|
||||
db_attr = fake_attribute.fake_db_attribute(id=3)
|
||||
return db_attr
|
||||
|
||||
@property
|
||||
def fake_attribute3(self):
|
||||
db_attr = fake_attribute.fake_db_attribute(id=4)
|
||||
return db_attr
|
||||
|
||||
def test_create(self):
|
||||
db_acc = self.fake_accelerator
|
||||
acc = objects.Accelerator(context=self.context,
|
||||
@ -125,6 +146,138 @@ class _TestDeployableObject(DbTestCase):
|
||||
objects.Deployable.get, self.context,
|
||||
dpl.uuid)
|
||||
|
||||
def test_add_attribute(self):
|
||||
db_acc = self.fake_accelerator
|
||||
acc = objects.Accelerator(context=self.context,
|
||||
**db_acc)
|
||||
acc.create(self.context)
|
||||
acc_get = objects.Accelerator.get(self.context, acc.uuid)
|
||||
|
||||
db_dpl = self.fake_deployable
|
||||
dpl = objects.Deployable(context=self.context,
|
||||
**db_dpl)
|
||||
dpl.accelerator_id = acc_get.id
|
||||
dpl.create(self.context)
|
||||
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
|
||||
|
||||
db_attr = self.fake_attribute
|
||||
attr = objects.Attribute(context=self.context,
|
||||
**db_attr)
|
||||
attr.deployable_id = dpl_get.id
|
||||
attr.create(self.context)
|
||||
|
||||
dpl.add_attribute(attr)
|
||||
dpl.save(self.context)
|
||||
|
||||
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
|
||||
self.assertEqual(len(dpl_get.attributes_list), 1)
|
||||
self.assertEqual(dpl_get.attributes_list[0].id, attr.id)
|
||||
|
||||
def test_delete_attribute(self):
|
||||
db_acc = self.fake_accelerator
|
||||
acc = objects.Accelerator(context=self.context,
|
||||
**db_acc)
|
||||
acc.create(self.context)
|
||||
acc_get = objects.Accelerator.get(self.context, acc.uuid)
|
||||
|
||||
db_dpl = self.fake_deployable
|
||||
dpl = objects.Deployable(context=self.context,
|
||||
**db_dpl)
|
||||
dpl.accelerator_id = acc_get.id
|
||||
dpl.create(self.context)
|
||||
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
|
||||
db_attr = self.fake_attribute
|
||||
attr = objects.Attribute(context=self.context,
|
||||
**db_attr)
|
||||
attr.deployable_id = dpl_get.id
|
||||
attr.create(self.context)
|
||||
dpl_get.add_attribute(attr)
|
||||
dpl_get.save(self.context)
|
||||
dpl_get = objects.Deployable.get(self.context, dpl_get.uuid)
|
||||
self.assertEqual(len(dpl_get.attributes_list), 1)
|
||||
self.assertEqual(dpl_get.attributes_list[0].id, attr.id)
|
||||
|
||||
dpl_get.delete_attribute(self.context, dpl_get.attributes_list[0])
|
||||
self.assertEqual(len(dpl_get.attributes_list), 0)
|
||||
self.assertRaises(exception.AttributeNotFound,
|
||||
objects.Attribute.get, self.context,
|
||||
attr.uuid)
|
||||
|
||||
def test_get_by_filter_with_attributes(self):
|
||||
db_acc = self.fake_accelerator
|
||||
acc = objects.Accelerator(context=self.context,
|
||||
**db_acc)
|
||||
acc.create(self.context)
|
||||
acc_get = objects.Accelerator.get(self.context, acc.uuid)
|
||||
|
||||
db_dpl = self.fake_deployable
|
||||
dpl = objects.Deployable(context=self.context,
|
||||
**db_dpl)
|
||||
dpl.accelerator_id = acc_get.id
|
||||
dpl.create(self.context)
|
||||
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
|
||||
|
||||
db_dpl2 = self.fake_deployable2
|
||||
dpl2 = objects.Deployable(context=self.context,
|
||||
**db_dpl2)
|
||||
dpl2.accelerator_id = acc_get.id
|
||||
dpl2.create(self.context)
|
||||
dpl2_get = objects.Deployable.get(self.context, dpl2.uuid)
|
||||
|
||||
db_attr = self.fake_attribute
|
||||
attr = objects.Attribute(context=self.context,
|
||||
**db_attr)
|
||||
attr.deployable_id = dpl_get.id
|
||||
attr.create(self.context)
|
||||
|
||||
db_attr2 = self.fake_attribute2
|
||||
attr2 = objects.Attribute(context=self.context,
|
||||
**db_attr2)
|
||||
attr2.deployable_id = dpl2_get.id
|
||||
attr2.create(self.context)
|
||||
|
||||
db_attr3 = self.fake_attribute3
|
||||
attr3 = objects.Attribute(context=self.context,
|
||||
**db_attr3)
|
||||
attr3.deployable_id = dpl2_get.id
|
||||
attr3.create(self.context)
|
||||
|
||||
dpl.add_attribute(attr)
|
||||
dpl.save(self.context)
|
||||
|
||||
dpl2.add_attribute(attr2)
|
||||
dpl2.save(self.context)
|
||||
|
||||
dpl2.add_attribute(attr3)
|
||||
dpl2.save(self.context)
|
||||
|
||||
query = {"attr_key": "attr_val"}
|
||||
|
||||
dpl_get_list = objects.Deployable.get_by_filter(self.context, query)
|
||||
self.assertEqual(len(dpl_get_list), 2)
|
||||
self.assertEqual(dpl_get_list[0].uuid, dpl.uuid)
|
||||
|
||||
attr2.set_key_value_pair("test_key", "test_val")
|
||||
attr2.save(self.context)
|
||||
|
||||
attr3.set_key_value_pair("test_key3", "test_val3")
|
||||
attr3.save(self.context)
|
||||
|
||||
query = {"test_key": "test_val"}
|
||||
dpl_get_list = objects.Deployable.get_by_filter(self.context, query)
|
||||
self.assertEqual(len(dpl_get_list), 1)
|
||||
self.assertEqual(dpl_get_list[0].uuid, dpl2.uuid)
|
||||
|
||||
query = {"test_key": "test_val", "test_key3": "test_val3"}
|
||||
dpl_get_list = objects.Deployable.get_by_filter(self.context, query)
|
||||
self.assertEqual(len(dpl_get_list), 1)
|
||||
self.assertEqual(dpl_get_list[0].uuid, dpl2.uuid)
|
||||
|
||||
query = {"host": "host_name", "test_key3": "test_val3"}
|
||||
dpl_get_list = objects.Deployable.get_by_filter(self.context, query)
|
||||
self.assertEqual(len(dpl_get_list), 1)
|
||||
self.assertEqual(dpl_get_list[0].uuid, dpl2.uuid)
|
||||
|
||||
|
||||
class TestDeployableObject(test_objects._LocalTest,
|
||||
_TestDeployableObject):
|
||||
|
Loading…
Reference in New Issue
Block a user