Introduce ovo objects for ports
Those objects are intentionally not integrated into the database code so far. This is to quicken access to their definitions to implement push-notifications for security groups and ports. The object embeds segmentation information in addition to what's available through the model. Specifically, binding_levels field exposes all ml2 binding levels, that from their side load corresponding network segment object. The order for level objects in binding_levels list field is guaranteed to be in the order of level. So the consumers can eg. access the bottom binding info with: port_obj.binding_levels[-1].segment For PortBindingLevel object, we want to expose segmentation info. This is achieved through a 'segment' ObjectField. The database model itself contains segment_id too. There is no reason though to expose it for Level object in two places (one as a model field, another one through the ObjectField), so we avoid adding ID field. The base class that handles loading for ObjectField based synthetic fields was assuming that objects always have a field per model attribute, so it needed a slight adjustment to support this case, where we extract foreign_keys attributes from the model itself if the field is not present on the object. Partially-Implements: blueprint adopt-oslo-versioned-objects-for-db Partially-Implements: blueprint push-notifications Change-Id: I25de14e42e345d9235dbf4097c298ef5d606de51 Co-Authored-By: Martin Hickey <martin.hickey@ie.ibm.com> Co-Authored-By: Rossella Sblendido <rsblendido@suse.com> Co-Authored-By: Manjeet Singh Bhatia <manjeet.s.bhatia@intel.com> Co-Authored-By: Brandon Logan <brandon.logan@rackspace.com> Co-Authored-By: Victor Morales <victor.morales@intel.com>
This commit is contained in:
parent
f703f34894
commit
dcd78423aa
@ -110,12 +110,13 @@ class Port(standard_attr.HasStandardAttributes, model_base.BASEV2,
|
||||
)
|
||||
api_collections = [attr.PORTS]
|
||||
|
||||
def __init__(self, id=None, tenant_id=None, name=None, network_id=None,
|
||||
mac_address=None, admin_state_up=None, status=None,
|
||||
device_id=None, device_owner=None, fixed_ips=None, **kwargs):
|
||||
def __init__(self, id=None, tenant_id=None, project_id=None, name=None,
|
||||
network_id=None, mac_address=None, admin_state_up=None,
|
||||
status=None, device_id=None, device_owner=None,
|
||||
fixed_ips=None, **kwargs):
|
||||
super(Port, self).__init__(**kwargs)
|
||||
self.id = id
|
||||
self.tenant_id = tenant_id
|
||||
self.project_id = project_id or tenant_id
|
||||
self.name = name
|
||||
self.network_id = network_id
|
||||
self.mac_address = mac_address
|
||||
|
@ -544,7 +544,7 @@ class NeutronDbObject(NeutronObject):
|
||||
else:
|
||||
synth_objs = objclass.get_objects(
|
||||
self.obj_context, **{
|
||||
k: getattr(self, v)
|
||||
k: getattr(self, v) if v in self else db_obj.get(v)
|
||||
for k, v in foreign_keys.items()})
|
||||
if isinstance(self.fields[field], obj_fields.ObjectField):
|
||||
setattr(self, field, synth_objs[0] if synth_objs else None)
|
||||
|
@ -21,6 +21,7 @@ import six
|
||||
|
||||
from neutron._i18n import _
|
||||
from neutron.common import constants
|
||||
from neutron.extensions import dns as dns_ext
|
||||
|
||||
|
||||
class NeutronRangeConstrainedIntegerInvalidLimit(exceptions.NeutronException):
|
||||
@ -78,6 +79,25 @@ class ListOfIPNetworksField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = obj_fields.List(obj_fields.IPNetwork())
|
||||
|
||||
|
||||
class SetOfUUIDsField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = obj_fields.Set(obj_fields.UUID())
|
||||
|
||||
|
||||
class DomainName(obj_fields.String):
|
||||
def coerce(self, obj, attr, value):
|
||||
if not isinstance(value, six.string_types):
|
||||
msg = _("Field value %s is not a string") % value
|
||||
raise ValueError(msg)
|
||||
if len(value) > dns_ext.FQDN_MAX_LEN:
|
||||
msg = _("Domain name %s is too long") % value
|
||||
raise ValueError(msg)
|
||||
return super(DomainName, self).coerce(obj, attr, value)
|
||||
|
||||
|
||||
class DomainNameField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = DomainName()
|
||||
|
||||
|
||||
class IntegerEnum(obj_fields.Integer):
|
||||
def __init__(self, valid_values=None, **kwargs):
|
||||
if not valid_values:
|
||||
|
@ -10,6 +10,9 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
# TODO(ihrachys): cover the module with functional tests targeting supported
|
||||
# backends
|
||||
|
||||
from neutron_lib import exceptions as n_exc
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
@ -83,3 +86,15 @@ def delete_object(context, model, **kwargs):
|
||||
with context.session.begin(subtransactions=True):
|
||||
db_obj = _safe_get_object(context, model, **kwargs)
|
||||
context.session.delete(db_obj)
|
||||
|
||||
|
||||
# TODO(ihrachys): expose through objects API
|
||||
def delete_objects(context, model, **kwargs):
|
||||
'''Delete matching objects, if any.
|
||||
|
||||
This function does not raise exceptions if nothing matches.
|
||||
'''
|
||||
with context.session.begin(subtransactions=True):
|
||||
db_objs = get_objects(context, model, **kwargs)
|
||||
for db_obj in db_objs:
|
||||
context.session.delete(db_obj)
|
||||
|
@ -22,3 +22,8 @@ class _PortSecurity(base.NeutronDbObject):
|
||||
'port_security_enabled': obj_fields.BooleanField(
|
||||
default=portsecurity.DEFAULT_PORT_SECURITY),
|
||||
}
|
||||
|
||||
foreign_keys = {
|
||||
'Port': {'id': 'id'},
|
||||
'Network': {'id': 'id'},
|
||||
}
|
||||
|
@ -35,4 +35,7 @@ class NetworkSegment(base.NeutronDbObject):
|
||||
'segment_index': obj_fields.IntegerField(default=0)
|
||||
}
|
||||
|
||||
foreign_keys = {'Network': {'network_id': 'id'}}
|
||||
foreign_keys = {
|
||||
'Network': {'network_id': 'id'},
|
||||
'PortBindingLevel': {'id': 'segment_id'},
|
||||
}
|
||||
|
@ -34,6 +34,10 @@ class AllowedAddressPair(base.NeutronDbObject):
|
||||
'ip_address': common_types.IPNetworkField(),
|
||||
}
|
||||
|
||||
foreign_keys = {
|
||||
'Port': {'port_id': 'id'},
|
||||
}
|
||||
|
||||
# TODO(mhickey): get rid of it once we switch the db model to using
|
||||
# custom types.
|
||||
@classmethod
|
||||
|
@ -31,3 +31,7 @@ class ExtraDhcpOpt(base.NeutronDbObject):
|
||||
'opt_value': obj_fields.StringField(),
|
||||
'ip_version': obj_fields.IntegerField(),
|
||||
}
|
||||
|
||||
foreign_keys = {
|
||||
'Port': {'port_id': 'id'},
|
||||
}
|
||||
|
384
neutron/objects/ports.py
Normal file
384
neutron/objects/ports.py
Normal file
@ -0,0 +1,384 @@
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
|
||||
import netaddr
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_versionedobjects import base as obj_base
|
||||
from oslo_versionedobjects import fields as obj_fields
|
||||
|
||||
from neutron.common import utils
|
||||
from neutron.db import api as db_api
|
||||
from neutron.db import dns_db as dns_models
|
||||
from neutron.db.models import securitygroup as sg_models
|
||||
from neutron.db import models_v2
|
||||
from neutron.db.qos import models as qos_models
|
||||
from neutron.objects import base
|
||||
from neutron.objects import common_types
|
||||
from neutron.objects.db import api as obj_db_api
|
||||
from neutron.plugins.ml2 import models as ml2_models
|
||||
|
||||
|
||||
class PortBindingBase(base.NeutronDbObject):
|
||||
|
||||
foreign_keys = {
|
||||
'Port': {'port_id': 'id'},
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def filter_to_json_str(value):
|
||||
def _dict_to_json(v):
|
||||
return jsonutils.dumps(
|
||||
collections.OrderedDict(
|
||||
sorted(v.items(), key=lambda t: t[0])
|
||||
) if v else {}
|
||||
)
|
||||
|
||||
if isinstance(value, list):
|
||||
return [_dict_to_json(val) for val in value]
|
||||
v = _dict_to_json(value)
|
||||
return v
|
||||
|
||||
@classmethod
|
||||
def modify_fields_to_db(cls, fields):
|
||||
result = super(PortBindingBase, cls).modify_fields_to_db(fields)
|
||||
if 'vif_details' in result:
|
||||
result['vif_details'] = (
|
||||
cls.filter_to_json_str(result['vif_details']))
|
||||
return result
|
||||
|
||||
@classmethod
|
||||
def modify_fields_from_db(cls, db_obj):
|
||||
fields = super(PortBindingBase, cls).modify_fields_from_db(db_obj)
|
||||
if 'vif_details' in fields:
|
||||
fields['vif_details'] = jsonutils.loads(fields['vif_details'])
|
||||
if not fields['vif_details']:
|
||||
fields['vif_details'] = None
|
||||
return fields
|
||||
|
||||
|
||||
@obj_base.VersionedObjectRegistry.register
|
||||
class PortBinding(PortBindingBase):
|
||||
# Version 1.0: Initial version
|
||||
VERSION = '1.0'
|
||||
|
||||
db_model = ml2_models.PortBinding
|
||||
|
||||
fields = {
|
||||
'port_id': obj_fields.UUIDField(),
|
||||
'host': obj_fields.StringField(),
|
||||
'profile': obj_fields.StringField(),
|
||||
'vif_type': obj_fields.StringField(),
|
||||
'vif_details': obj_fields.DictOfStringsField(nullable=True),
|
||||
'vnic_type': obj_fields.StringField(),
|
||||
}
|
||||
|
||||
primary_keys = ['port_id']
|
||||
|
||||
|
||||
@obj_base.VersionedObjectRegistry.register
|
||||
class DistributedPortBinding(PortBindingBase):
|
||||
# Version 1.0: Initial version
|
||||
VERSION = '1.0'
|
||||
|
||||
db_model = ml2_models.DistributedPortBinding
|
||||
|
||||
fields = {
|
||||
'port_id': obj_fields.UUIDField(),
|
||||
'host': obj_fields.StringField(),
|
||||
'profile': obj_fields.StringField(),
|
||||
'vif_type': obj_fields.StringField(),
|
||||
'vif_details': obj_fields.DictOfStringsField(nullable=True),
|
||||
'vnic_type': obj_fields.StringField(),
|
||||
# NOTE(ihrachys): Fields below are specific to this type of binding. In
|
||||
# the future, we could think of converging different types of bindings
|
||||
# into a single field
|
||||
'status': obj_fields.StringField(),
|
||||
'router_id': obj_fields.StringField(nullable=True),
|
||||
}
|
||||
|
||||
primary_keys = ['host', 'port_id']
|
||||
|
||||
|
||||
@obj_base.VersionedObjectRegistry.register
|
||||
class PortBindingLevel(base.NeutronDbObject):
|
||||
# Version 1.0: Initial version
|
||||
VERSION = '1.0'
|
||||
|
||||
db_model = ml2_models.PortBindingLevel
|
||||
|
||||
primary_keys = ['port_id', 'host', 'level']
|
||||
|
||||
fields = {
|
||||
'port_id': obj_fields.UUIDField(),
|
||||
'host': obj_fields.StringField(),
|
||||
'level': obj_fields.IntegerField(),
|
||||
'driver': obj_fields.StringField(nullable=True),
|
||||
'segment': obj_fields.ObjectField(
|
||||
'NetworkSegment', nullable=True
|
||||
),
|
||||
}
|
||||
|
||||
synthetic_fields = ['segment']
|
||||
|
||||
foreign_keys = {
|
||||
'Port': {'port_id': 'id'},
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def get_objects(cls, context, _pager=None, validate_filters=True,
|
||||
**kwargs):
|
||||
if not _pager:
|
||||
_pager = base.Pager()
|
||||
if not _pager.sorts:
|
||||
# (NOTE) True means ASC, False is DESC
|
||||
_pager.sorts = [('port_id', True), ('level', True)]
|
||||
return super(PortBindingLevel, cls).get_objects(
|
||||
context, _pager, validate_filters, **kwargs)
|
||||
|
||||
|
||||
@obj_base.VersionedObjectRegistry.register
|
||||
class IPAllocation(base.NeutronDbObject):
|
||||
# Version 1.0: Initial version
|
||||
VERSION = '1.0'
|
||||
|
||||
db_model = models_v2.IPAllocation
|
||||
|
||||
fields = {
|
||||
'port_id': obj_fields.UUIDField(nullable=True),
|
||||
'subnet_id': obj_fields.UUIDField(),
|
||||
'network_id': obj_fields.UUIDField(),
|
||||
'ip_address': obj_fields.IPAddressField(),
|
||||
}
|
||||
|
||||
primary_keys = ['subnet_id', 'network_id', 'ip_address']
|
||||
|
||||
foreign_keys = {
|
||||
'Port': {'port_id': 'id'},
|
||||
}
|
||||
|
||||
# TODO(rossella_s): get rid of it once we switch the db model to using
|
||||
# custom types.
|
||||
@classmethod
|
||||
def modify_fields_to_db(cls, fields):
|
||||
result = super(IPAllocation, cls).modify_fields_to_db(fields)
|
||||
if 'ip_address' in result:
|
||||
result['ip_address'] = cls.filter_to_str(result['ip_address'])
|
||||
return result
|
||||
|
||||
# TODO(rossella_s): get rid of it once we switch the db model to using
|
||||
# custom types.
|
||||
@classmethod
|
||||
def modify_fields_from_db(cls, db_obj):
|
||||
fields = super(IPAllocation, cls).modify_fields_from_db(db_obj)
|
||||
if 'ip_address' in fields:
|
||||
fields['ip_address'] = netaddr.IPAddress(fields['ip_address'])
|
||||
return fields
|
||||
|
||||
|
||||
@obj_base.VersionedObjectRegistry.register
|
||||
class PortDNS(base.NeutronDbObject):
|
||||
# Version 1.0: Initial version
|
||||
VERSION = '1.0'
|
||||
|
||||
db_model = dns_models.PortDNS
|
||||
|
||||
primary_keys = ['port_id']
|
||||
|
||||
foreign_keys = {
|
||||
'Port': {'port_id': 'id'},
|
||||
}
|
||||
|
||||
fields = {
|
||||
'port_id': obj_fields.UUIDField(),
|
||||
'current_dns_name': common_types.DomainNameField(),
|
||||
'current_dns_domain': common_types.DomainNameField(),
|
||||
'previous_dns_name': common_types.DomainNameField(),
|
||||
'previous_dns_domain': common_types.DomainNameField(),
|
||||
'dns_name': common_types.DomainNameField(),
|
||||
}
|
||||
|
||||
|
||||
@obj_base.VersionedObjectRegistry.register
|
||||
class Port(base.NeutronDbObject):
|
||||
# Version 1.0: Initial version
|
||||
VERSION = '1.0'
|
||||
|
||||
db_model = models_v2.Port
|
||||
|
||||
fields = {
|
||||
'id': obj_fields.UUIDField(),
|
||||
'project_id': obj_fields.StringField(nullable=True),
|
||||
'name': obj_fields.StringField(nullable=True),
|
||||
'network_id': obj_fields.UUIDField(),
|
||||
'mac_address': common_types.MACAddressField(),
|
||||
'admin_state_up': obj_fields.BooleanField(),
|
||||
'device_id': obj_fields.StringField(),
|
||||
'device_owner': obj_fields.StringField(),
|
||||
'status': obj_fields.StringField(),
|
||||
|
||||
'allowed_address_pairs': obj_fields.ListOfObjectsField(
|
||||
'AllowedAddressPair', nullable=True
|
||||
),
|
||||
'binding': obj_fields.ObjectField(
|
||||
'PortBinding', nullable=True
|
||||
),
|
||||
'dhcp_options': obj_fields.ListOfObjectsField(
|
||||
'ExtraDhcpOpt', nullable=True
|
||||
),
|
||||
'distributed_binding': obj_fields.ObjectField(
|
||||
'DistributedPortBinding', nullable=True
|
||||
),
|
||||
'dns': obj_fields.ObjectField('PortDNS', nullable=True),
|
||||
'fixed_ips': obj_fields.ListOfObjectsField(
|
||||
'IPAllocation', nullable=True
|
||||
),
|
||||
# TODO(ihrachys): consider converting to boolean
|
||||
'security': obj_fields.ObjectField(
|
||||
'PortSecurity', nullable=True
|
||||
),
|
||||
'security_group_ids': common_types.SetOfUUIDsField(
|
||||
nullable=True,
|
||||
# TODO(ihrachys): how do we safely pass a mutable default?
|
||||
default=None,
|
||||
),
|
||||
'qos_policy_id': obj_fields.UUIDField(nullable=True, default=None),
|
||||
|
||||
'binding_levels': obj_fields.ListOfObjectsField(
|
||||
'PortBindingLevel', nullable=True
|
||||
),
|
||||
|
||||
# TODO(ihrachys): consider adding a 'dns_assignment' fully synthetic
|
||||
# field in later object iterations
|
||||
}
|
||||
|
||||
synthetic_fields = [
|
||||
'allowed_address_pairs',
|
||||
'binding',
|
||||
'binding_levels',
|
||||
'dhcp_options',
|
||||
'distributed_binding',
|
||||
'dns',
|
||||
'fixed_ips',
|
||||
'qos_policy_id',
|
||||
'security',
|
||||
'security_group_ids',
|
||||
]
|
||||
|
||||
fields_need_translation = {
|
||||
'binding': 'port_binding',
|
||||
'dhcp_options': 'dhcp_opts',
|
||||
'distributed_binding': 'distributed_port_binding',
|
||||
'security': 'port_security',
|
||||
}
|
||||
|
||||
def create(self):
|
||||
fields = self.obj_get_changes()
|
||||
with db_api.autonested_transaction(self.obj_context.session):
|
||||
sg_ids = self.security_group_ids
|
||||
if sg_ids is None:
|
||||
sg_ids = set()
|
||||
qos_policy_id = self.qos_policy_id
|
||||
super(Port, self).create()
|
||||
if 'security_group_ids' in fields:
|
||||
self._attach_security_groups(sg_ids)
|
||||
if 'qos_policy_id' in fields:
|
||||
self._attach_qos_policy(qos_policy_id)
|
||||
|
||||
def update(self):
|
||||
fields = self.obj_get_changes()
|
||||
with db_api.autonested_transaction(self.obj_context.session):
|
||||
super(Port, self).update()
|
||||
if 'security_group_ids' in fields:
|
||||
self._attach_security_groups(fields['security_group_ids'])
|
||||
if 'qos_policy_id' in fields:
|
||||
self._attach_qos_policy(fields['qos_policy_id'])
|
||||
|
||||
def _attach_qos_policy(self, qos_policy_id):
|
||||
# TODO(ihrachys): introduce an object for the binding to isolate
|
||||
# database access in a single place, currently scattered between port
|
||||
# and policy objects
|
||||
obj_db_api.delete_objects(
|
||||
self.obj_context, qos_models.QosPortPolicyBinding, port_id=self.id)
|
||||
if qos_policy_id:
|
||||
obj_db_api.create_object(
|
||||
self.obj_context, qos_models.QosPortPolicyBinding,
|
||||
{'port_id': self.id, 'policy_id': qos_policy_id}
|
||||
)
|
||||
self.qos_policy_id = qos_policy_id
|
||||
self.obj_reset_changes(['qos_policy_id'])
|
||||
|
||||
def _attach_security_groups(self, sg_ids):
|
||||
# TODO(ihrachys): consider introducing an (internal) object for the
|
||||
# binding to decouple database operations a bit more
|
||||
obj_db_api.delete_objects(
|
||||
self.obj_context, sg_models.SecurityGroupPortBinding,
|
||||
port_id=self.id,
|
||||
)
|
||||
if sg_ids:
|
||||
for sg_id in sg_ids:
|
||||
self._attach_security_group(sg_id)
|
||||
self.security_group_ids = sg_ids
|
||||
self.obj_reset_changes(['security_group_ids'])
|
||||
|
||||
def _attach_security_group(self, sg_id):
|
||||
obj_db_api.create_object(
|
||||
self.obj_context, sg_models.SecurityGroupPortBinding,
|
||||
{'port_id': self.id, 'security_group_id': sg_id}
|
||||
)
|
||||
|
||||
# TODO(rossella_s): get rid of it once we switch the db model to using
|
||||
# custom types.
|
||||
@classmethod
|
||||
def modify_fields_to_db(cls, fields):
|
||||
result = super(Port, cls).modify_fields_to_db(fields)
|
||||
if 'mac_address' in result:
|
||||
result['mac_address'] = cls.filter_to_str(result['mac_address'])
|
||||
return result
|
||||
|
||||
# TODO(rossella_s): get rid of it once we switch the db model to using
|
||||
# custom types.
|
||||
@classmethod
|
||||
def modify_fields_from_db(cls, db_obj):
|
||||
fields = super(Port, cls).modify_fields_from_db(db_obj)
|
||||
if 'mac_address' in fields:
|
||||
fields['mac_address'] = utils.AuthenticEUI(fields['mac_address'])
|
||||
distributed_port_binding = fields.get('distributed_binding')
|
||||
if distributed_port_binding:
|
||||
fields['distributed_binding'] = fields['distributed_binding'][0]
|
||||
else:
|
||||
fields['distributed_binding'] = None
|
||||
return fields
|
||||
|
||||
def from_db_object(self, db_obj):
|
||||
super(Port, self).from_db_object(db_obj)
|
||||
# extract security group bindings
|
||||
if db_obj.get('security_groups', []):
|
||||
self.security_group_ids = {
|
||||
sg.security_group_id
|
||||
for sg in db_obj.security_groups
|
||||
}
|
||||
else:
|
||||
self.security_group_ids = set()
|
||||
self.obj_reset_changes(['security_group_ids'])
|
||||
|
||||
# extract qos policy binding
|
||||
if db_obj.get('qos_policy_binding'):
|
||||
self.qos_policy_id = (
|
||||
db_obj.qos_policy_binding.policy_id
|
||||
)
|
||||
else:
|
||||
self.qos_policy_id = None
|
||||
self.obj_reset_changes(['qos_policy_id'])
|
@ -77,6 +77,12 @@ class PortBindingLevel(model_base.BASEV2):
|
||||
sa.ForeignKey('networksegments.id',
|
||||
ondelete="SET NULL"))
|
||||
|
||||
# Add a relationship to the Port model in order to instruct SQLAlchemy to
|
||||
# eagerly load port bindings
|
||||
port = orm.relationship(
|
||||
models_v2.Port,
|
||||
backref=orm.backref("binding_levels", lazy='joined', cascade='delete'))
|
||||
|
||||
|
||||
class DistributedPortBinding(model_base.BASEV2):
|
||||
"""Represent binding-related state of a Distributed Router(DVR, HA) port.
|
||||
|
@ -11,12 +11,15 @@
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
from neutron_lib import exceptions as n_exc
|
||||
|
||||
from neutron import context
|
||||
from neutron.db import models_v2
|
||||
from neutron import manager
|
||||
from neutron.objects import base
|
||||
from neutron.objects.db import api
|
||||
from neutron.tests import base as test_base
|
||||
from neutron.tests.unit import testlib_api
|
||||
|
||||
|
||||
PLUGIN_NAME = 'neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
|
||||
@ -47,3 +50,72 @@ class GetObjectsTestCase(test_base.BaseTestCase):
|
||||
filters={},
|
||||
limit=limit,
|
||||
marker_obj=get_object.return_value)
|
||||
|
||||
|
||||
class CRUDScenarioTestCase(testlib_api.SqlTestCase):
|
||||
|
||||
CORE_PLUGIN = 'neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
|
||||
|
||||
def setUp(self):
|
||||
super(CRUDScenarioTestCase, self).setUp()
|
||||
# TODO(ihrachys): revisit plugin setup once we decouple
|
||||
# neutron.objects.db.api from core plugin instance
|
||||
self.setup_coreplugin(self.CORE_PLUGIN)
|
||||
# NOTE(ihrachys): nothing specific to networks in this test case, but
|
||||
# we needed to pick some real model, so we picked the network. Any
|
||||
# other model would work as well for our needs here.
|
||||
self.model = models_v2.Network
|
||||
self.ctxt = context.get_admin_context()
|
||||
|
||||
def test_get_object_create_update_delete(self):
|
||||
obj = api.create_object(self.ctxt, self.model, {'name': 'foo'})
|
||||
|
||||
new_obj = api.get_object(self.ctxt, self.model, id=obj.id)
|
||||
self.assertEqual(obj, new_obj)
|
||||
|
||||
obj = new_obj
|
||||
api.update_object(self.ctxt, self.model, {'name': 'bar'}, id=obj.id)
|
||||
|
||||
new_obj = api.get_object(self.ctxt, self.model, id=obj.id)
|
||||
self.assertEqual(obj, new_obj)
|
||||
|
||||
obj = new_obj
|
||||
api.delete_object(self.ctxt, self.model, id=obj.id)
|
||||
|
||||
new_obj = api.get_object(self.ctxt, self.model, id=obj.id)
|
||||
self.assertIsNone(new_obj)
|
||||
|
||||
# delete_object raises an exception on missing object
|
||||
self.assertRaises(
|
||||
n_exc.ObjectNotFound,
|
||||
api.delete_object, self.ctxt, self.model, id=obj.id)
|
||||
|
||||
# but delete_objects does not not
|
||||
api.delete_objects(self.ctxt, self.model, id=obj.id)
|
||||
|
||||
def test_delete_objects_removes_all_matching_objects(self):
|
||||
# create some objects with identical description
|
||||
for i in range(10):
|
||||
api.create_object(
|
||||
self.ctxt, self.model,
|
||||
{'name': 'foo%d' % i, 'description': 'bar'})
|
||||
# create some more objects with a different description
|
||||
descriptions = set()
|
||||
for i in range(10, 20):
|
||||
desc = 'bar%d' % i
|
||||
descriptions.add(desc)
|
||||
api.create_object(
|
||||
self.ctxt, self.model,
|
||||
{'name': 'foo%d' % i, 'description': desc})
|
||||
# make sure that all objects are in the database
|
||||
self.assertEqual(20, api.count(self.ctxt, self.model))
|
||||
# now delete just those with the 'bar' description
|
||||
api.delete_objects(self.ctxt, self.model, description='bar')
|
||||
|
||||
# check that half of objects are gone, and remaining have expected
|
||||
# descriptions
|
||||
objs = api.get_objects(self.ctxt, self.model)
|
||||
self.assertEqual(10, len(objs))
|
||||
self.assertEqual(
|
||||
descriptions,
|
||||
{obj.description for obj in objs})
|
||||
|
@ -385,7 +385,7 @@ class QosPolicyDbObjectTestCase(test_base.BaseDbObjectTestCase,
|
||||
obj = self._create_test_policy()
|
||||
obj.attach_port(self._port['id'])
|
||||
ids = self._test_class.get_bound_tenant_ids(self.context, obj['id'])
|
||||
self.assertEqual(ids.pop(), self._port['tenant_id'])
|
||||
self.assertEqual(ids.pop(), self._port.project_id)
|
||||
self.assertEqual(len(ids), 0)
|
||||
|
||||
obj.detach_port(self._port['id'])
|
||||
|
@ -13,10 +13,11 @@
|
||||
import collections
|
||||
import copy
|
||||
import itertools
|
||||
import netaddr
|
||||
import os.path
|
||||
import random
|
||||
|
||||
import mock
|
||||
import netaddr
|
||||
from neutron_lib import exceptions as n_exc
|
||||
from oslo_db import exception as obj_exc
|
||||
from oslo_utils import timeutils
|
||||
@ -27,14 +28,17 @@ from oslo_versionedobjects import fixture
|
||||
import testtools
|
||||
|
||||
from neutron.common import constants
|
||||
from neutron.common import utils
|
||||
from neutron import context
|
||||
from neutron.db import db_base_plugin_v2
|
||||
from neutron.db import model_base
|
||||
from neutron.db import models_v2
|
||||
from neutron.db import segments_db
|
||||
from neutron import objects
|
||||
from neutron.objects import base
|
||||
from neutron.objects import common_types
|
||||
from neutron.objects.db import api as obj_db_api
|
||||
from neutron.objects import ports
|
||||
from neutron.objects import subnet
|
||||
from neutron.tests import base as test_base
|
||||
from neutron.tests import tools
|
||||
@ -355,10 +359,6 @@ def get_random_dscp_mark():
|
||||
return random.choice(constants.VALID_DSCP_MARKS)
|
||||
|
||||
|
||||
def get_random_direction():
|
||||
return random.choice(constants.VALID_DIRECTIONS)
|
||||
|
||||
|
||||
def get_list_of_random_networks(num=10):
|
||||
for i in range(5):
|
||||
res = [tools.get_random_ip_network() for i in range(num)]
|
||||
@ -368,6 +368,27 @@ def get_list_of_random_networks(num=10):
|
||||
raise Exception('Failed to generate unique networks')
|
||||
|
||||
|
||||
def get_random_domain_name():
|
||||
return '.'.join([
|
||||
tools.get_random_string(62)[:random.choice(range(63))]
|
||||
for i in range(4)
|
||||
])
|
||||
|
||||
|
||||
def get_random_dict_of_strings():
|
||||
return {
|
||||
tools.get_random_string(): tools.get_random_string()
|
||||
for i in range(10)
|
||||
}
|
||||
|
||||
|
||||
def get_set_of_random_uuids():
|
||||
return {
|
||||
uuidutils.generate_uuid()
|
||||
for i in range(10)
|
||||
}
|
||||
|
||||
|
||||
FIELD_TYPE_VALUE_GENERATOR_MAP = {
|
||||
obj_fields.BooleanField: tools.get_random_boolean,
|
||||
obj_fields.IntegerField: tools.get_random_integer,
|
||||
@ -375,8 +396,9 @@ FIELD_TYPE_VALUE_GENERATOR_MAP = {
|
||||
obj_fields.UUIDField: uuidutils.generate_uuid,
|
||||
obj_fields.ObjectField: lambda: None,
|
||||
obj_fields.ListOfObjectsField: lambda: [],
|
||||
obj_fields.DictOfStringsField: get_random_dict_of_strings,
|
||||
common_types.DomainNameField: get_random_domain_name,
|
||||
common_types.DscpMarkField: get_random_dscp_mark,
|
||||
common_types.FlowDirectionEnumField: get_random_direction,
|
||||
obj_fields.IPNetworkField: tools.get_random_ip_network,
|
||||
common_types.IPNetworkField: tools.get_random_ip_network,
|
||||
common_types.IPNetworkPrefixLenField: tools.get_random_prefixlen,
|
||||
@ -390,6 +412,7 @@ FIELD_TYPE_VALUE_GENERATOR_MAP = {
|
||||
common_types.EtherTypeEnumField: tools.get_random_ether_type,
|
||||
common_types.IpProtocolEnumField: tools.get_random_ip_protocol,
|
||||
common_types.PortRangeField: tools.get_random_port,
|
||||
common_types.SetOfUUIDsField: get_set_of_random_uuids,
|
||||
}
|
||||
|
||||
|
||||
@ -427,6 +450,8 @@ class _BaseObjectTestCase(object):
|
||||
# TODO(ihrachys): revisit plugin setup once we decouple
|
||||
# neutron.objects.db.api from core plugin instance
|
||||
self.setup_coreplugin(self.CORE_PLUGIN)
|
||||
# make sure all objects are loaded and registered in the registry
|
||||
utils.import_modules_recursively(os.path.dirname(objects.__file__))
|
||||
self.context = context.get_admin_context()
|
||||
self.db_objs = [
|
||||
self._test_class.db_model(**self.get_random_fields())
|
||||
@ -457,9 +482,8 @@ class _BaseObjectTestCase(object):
|
||||
|
||||
# TODO(ihrachys): rename the method to explicitly reflect it returns db
|
||||
# attributes not object fields
|
||||
@classmethod
|
||||
def get_random_fields(cls, obj_cls=None):
|
||||
obj_cls = obj_cls or cls._test_class
|
||||
def get_random_fields(self, obj_cls=None):
|
||||
obj_cls = obj_cls or self._test_class
|
||||
fields = {}
|
||||
ip_version = tools.get_random_ip_version()
|
||||
for field, field_obj in obj_cls.fields.items():
|
||||
@ -577,14 +601,16 @@ class BaseObjectIfaceTestCase(_BaseObjectTestCase, test_base.BaseTestCase):
|
||||
if self._test_class.is_object_field(field):
|
||||
obj_class = self._get_ovo_object_class(self._test_class,
|
||||
field)
|
||||
foreign_keys = obj_class.foreign_keys.get(
|
||||
self._test_class.__name__)
|
||||
filter_kwargs = {
|
||||
obj_class.fields_need_translation.get(k, k): db_obj[v]
|
||||
for k, v in obj_class.foreign_keys.get(
|
||||
self._test_class.__name__).items()
|
||||
}
|
||||
mock_calls.append(
|
||||
mock.call(
|
||||
self.context, obj_class.db_model,
|
||||
_pager=self.pager_map[obj_class.obj_name()],
|
||||
**{k: db_obj[v]
|
||||
for k, v in foreign_keys.items()}))
|
||||
**filter_kwargs))
|
||||
return mock_calls
|
||||
|
||||
def test_get_objects(self):
|
||||
@ -1028,7 +1054,13 @@ class BaseDbObjectTestCase(_BaseObjectTestCase,
|
||||
continue
|
||||
for db_obj in self.db_objs:
|
||||
objclass_fields = self.get_random_fields(objclass)
|
||||
db_obj[synth_field] = [objclass.db_model(**objclass_fields)]
|
||||
if isinstance(self._test_class.fields[synth_field],
|
||||
obj_fields.ObjectField):
|
||||
db_obj[synth_field] = objclass.db_model(**objclass_fields)
|
||||
else:
|
||||
db_obj[synth_field] = [
|
||||
objclass.db_model(**objclass_fields)
|
||||
]
|
||||
|
||||
def _create_test_network(self):
|
||||
# TODO(ihrachys): replace with network.create() once we get an object
|
||||
@ -1060,14 +1092,16 @@ class BaseDbObjectTestCase(_BaseObjectTestCase,
|
||||
|
||||
def _create_port(self, **port_attrs):
|
||||
if not hasattr(self, '_mac_address_generator'):
|
||||
self._mac_address_generator = (":".join(["%02x" % i] * 6)
|
||||
for i in itertools.count())
|
||||
self._mac_address_generator = (
|
||||
netaddr.EUI(":".join(["%02x" % i] * 6))
|
||||
for i in itertools.count()
|
||||
)
|
||||
|
||||
if not hasattr(self, '_port_name_generator'):
|
||||
self._port_name_generator = ("test-port%d" % i
|
||||
for i in itertools.count(1))
|
||||
|
||||
attrs = {'tenant_id': 'fake_tenant_id',
|
||||
attrs = {'project_id': uuidutils.generate_uuid(),
|
||||
'admin_state_up': True,
|
||||
'status': 'ACTIVE',
|
||||
'device_id': 'fake_device',
|
||||
@ -1079,9 +1113,9 @@ class BaseDbObjectTestCase(_BaseObjectTestCase,
|
||||
if 'mac_address' not in attrs:
|
||||
attrs['mac_address'] = next(self._mac_address_generator)
|
||||
|
||||
# TODO(ihrachys): replace with port.create() once we get an object
|
||||
# implementation for ports
|
||||
return obj_db_api.create_object(self.context, models_v2.Port, attrs)
|
||||
port = ports.Port(self.context, **attrs)
|
||||
port.create()
|
||||
return port
|
||||
|
||||
def _create_test_segment(self, network):
|
||||
test_segment = {
|
||||
@ -1246,8 +1280,12 @@ class BaseDbObjectTestCase(_BaseObjectTestCase,
|
||||
dbattr = obj.fields_need_translation.get(field, field)
|
||||
self.assertFalse(getattr(obj.db_obj, dbattr, None))
|
||||
|
||||
objclass_fields = self._get_non_synth_fields(objclass,
|
||||
db_obj[field][0])
|
||||
if isinstance(cls_.fields[field], obj_fields.ObjectField):
|
||||
objclass_fields = self._get_non_synth_fields(objclass,
|
||||
db_obj[field])
|
||||
else:
|
||||
objclass_fields = self._get_non_synth_fields(objclass,
|
||||
db_obj[field][0])
|
||||
|
||||
# make sure children point to the base object
|
||||
foreign_keys = objclass.foreign_keys.get(obj.__class__.__name__)
|
||||
|
@ -18,6 +18,7 @@ import random
|
||||
from neutron_lib import constants as const
|
||||
|
||||
from neutron.common import constants
|
||||
from neutron.extensions import dns as dns_ext
|
||||
from neutron.objects import common_types
|
||||
from neutron.tests import base as test_base
|
||||
from neutron.tests import tools
|
||||
@ -170,6 +171,23 @@ class FlowDirectionEnumFieldTest(test_base.BaseTestCase, TestField):
|
||||
self.assertEqual("'%s'" % in_val, self.field.stringify(in_val))
|
||||
|
||||
|
||||
class DomainNameFieldTest(test_base.BaseTestCase, TestField):
|
||||
def setUp(self):
|
||||
super(DomainNameFieldTest, self).setUp()
|
||||
self.field = common_types.DomainNameField()
|
||||
self.coerce_good_values = [
|
||||
(val, val)
|
||||
for val in ('www.google.com', 'hostname', '1abc.com')
|
||||
]
|
||||
self.coerce_bad_values = ['x' * (dns_ext.FQDN_MAX_LEN + 1), 10, []]
|
||||
self.to_primitive_values = self.coerce_good_values
|
||||
self.from_primitive_values = self.coerce_good_values
|
||||
|
||||
def test_stringify(self):
|
||||
for in_val, out_val in self.coerce_good_values:
|
||||
self.assertEqual("'%s'" % in_val, self.field.stringify(in_val))
|
||||
|
||||
|
||||
class EtherTypeEnumFieldTest(test_base.BaseTestCase, TestField):
|
||||
def setUp(self):
|
||||
super(EtherTypeEnumFieldTest, self).setUp()
|
||||
|
@ -28,13 +28,19 @@ from neutron.tests import base as test_base
|
||||
object_data = {
|
||||
'_DefaultSecurityGroup': '1.0-971520cb2e0ec06d747885a0cf78347f',
|
||||
'AddressScope': '1.0-25560799db384acfe1549634959a82b4',
|
||||
'AllowedAddressPair': '1.0-9f9186b6f952fbf31d257b0458b852c0',
|
||||
'DistributedPortBinding': '1.0-4df058ae1aeae3ae1c15b8f6a4c692d9',
|
||||
'DNSNameServer': '1.0-bf87a85327e2d812d1666ede99d9918b',
|
||||
'ExtraDhcpOpt': '1.0-632f689cbeb36328995a7aed1d0a78d3',
|
||||
'IPAllocation': '1.0-47251b4c6d45c3b5feb0297fe5c461f2',
|
||||
'IPAllocationPool': '1.0-371016a6480ed0b4299319cb46d9215d',
|
||||
'NetworkPortSecurity': '1.0-b30802391a87945ee9c07582b4ff95e3',
|
||||
'NetworkSegment': '1.0-40707ef6bd9a0bf095038158d995cc7d',
|
||||
'Port': '1.0-638f6b09a3809ebd8b2b46293f56871b',
|
||||
'PortBinding': '1.0-f5d3048bec0ac58f08a758427581dff9',
|
||||
'PortBindingLevel': '1.0-de66a4c61a083b8f34319fa9dde5b060',
|
||||
'PortDNS': '1.0-201cf6d057fde75539c3d1f2bbf05902',
|
||||
'PortSecurity': '1.0-b30802391a87945ee9c07582b4ff95e3',
|
||||
'AllowedAddressPair': '1.0-9f9186b6f952fbf31d257b0458b852c0',
|
||||
'QosBandwidthLimitRule': '1.2-4e44a8f5c2895ab1278399f87b40a13d',
|
||||
'QosDscpMarkingRule': '1.2-0313c6554b34fd10c753cb63d638256c',
|
||||
'QosMinimumBandwidthRule': '1.2-314c3419f4799067cc31cc319080adff',
|
||||
|
322
neutron/tests/unit/objects/test_ports.py
Normal file
322
neutron/tests/unit/objects/test_ports.py
Normal file
@ -0,0 +1,322 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import itertools
|
||||
|
||||
from oslo_utils import uuidutils
|
||||
import testscenarios
|
||||
|
||||
from neutron.db.models import securitygroup as sg_models
|
||||
from neutron.db import models_v2
|
||||
from neutron.objects import base as obj_base
|
||||
from neutron.objects.db import api as obj_db_api
|
||||
from neutron.objects import ports
|
||||
from neutron.objects.qos import policy
|
||||
from neutron.tests import tools
|
||||
from neutron.tests.unit.objects import test_base as obj_test_base
|
||||
from neutron.tests.unit import testlib_api
|
||||
|
||||
|
||||
class BasePortBindingDbObjectTestCase(obj_test_base._BaseObjectTestCase,
|
||||
testlib_api.SqlTestCase):
|
||||
def setUp(self):
|
||||
super(BasePortBindingDbObjectTestCase, self).setUp()
|
||||
self._create_test_network()
|
||||
for db_obj, fields, obj in zip(
|
||||
self.db_objs, self.obj_fields, self.objs):
|
||||
port = self._create_port(network_id=self._network['id'])
|
||||
db_obj['port_id'] = port.id
|
||||
fields['port_id'] = port.id
|
||||
obj.port_id = port.id
|
||||
|
||||
|
||||
class PortBindingIfaceObjTestCase(obj_test_base.BaseObjectIfaceTestCase):
|
||||
_test_class = ports.PortBinding
|
||||
|
||||
|
||||
class PortBindingDbObjectTestCase(obj_test_base.BaseDbObjectTestCase,
|
||||
BasePortBindingDbObjectTestCase):
|
||||
_test_class = ports.PortBinding
|
||||
|
||||
|
||||
class DistributedPortBindingIfaceObjTestCase(
|
||||
obj_test_base.BaseObjectIfaceTestCase):
|
||||
_test_class = ports.DistributedPortBinding
|
||||
|
||||
|
||||
class DistributedPortBindingDbObjectTestCase(
|
||||
obj_test_base.BaseDbObjectTestCase,
|
||||
BasePortBindingDbObjectTestCase):
|
||||
_test_class = ports.DistributedPortBinding
|
||||
|
||||
|
||||
# TODO(ihrachys): this test case copies some functions from the base module.
|
||||
# This is because we currently cannot inherit from the base class that contains
|
||||
# those functions, because that same class provides test cases that we don't
|
||||
# want to execute. Ideally, we would need to copy paste, but that would require
|
||||
# some significant refactoring in the base test classes. Leaving it for a
|
||||
# follow up.
|
||||
class PortBindingVifDetailsTestCase(testscenarios.WithScenarios,
|
||||
obj_test_base._BaseObjectTestCase,
|
||||
testlib_api.SqlTestCase):
|
||||
|
||||
scenarios = [
|
||||
(cls.__name__, {'_test_class': cls})
|
||||
for cls in (ports.PortBinding, ports.DistributedPortBinding)
|
||||
]
|
||||
|
||||
def setUp(self):
|
||||
super(PortBindingVifDetailsTestCase, self).setUp()
|
||||
self._create_test_network()
|
||||
for db_obj, fields, obj in zip(
|
||||
self.db_objs, self.obj_fields, self.objs):
|
||||
port = self._create_port(network_id=self._network['id'])
|
||||
db_obj['port_id'] = port.id
|
||||
fields['port_id'] = port.id
|
||||
obj.port_id = port.id
|
||||
|
||||
def _create_port(self, **port_attrs):
|
||||
attrs = {'project_id': uuidutils.generate_uuid(),
|
||||
'admin_state_up': True,
|
||||
'status': 'ACTIVE',
|
||||
'device_id': 'fake_device',
|
||||
'device_owner': 'fake_owner',
|
||||
'mac_address': tools.get_random_EUI()}
|
||||
attrs.update(port_attrs)
|
||||
port = ports.Port(self.context, **attrs)
|
||||
port.create()
|
||||
return port
|
||||
|
||||
def _create_test_network(self):
|
||||
# TODO(ihrachys): replace with network.create() once we get an object
|
||||
# implementation for networks
|
||||
self._network = obj_db_api.create_object(self.context,
|
||||
models_v2.Network,
|
||||
{'name': 'test-network1'})
|
||||
|
||||
def _make_object(self, fields):
|
||||
fields = obj_test_base.get_non_synthetic_fields(
|
||||
self._test_class, fields
|
||||
)
|
||||
return self._test_class(
|
||||
self.context,
|
||||
**obj_test_base.remove_timestamps_from_fields(fields))
|
||||
|
||||
def test_vif_details(self):
|
||||
vif_details = {'item1': 'val1', 'item2': 'val2'}
|
||||
obj = self._make_object(self.obj_fields[0])
|
||||
obj.vif_details = vif_details
|
||||
obj.create()
|
||||
|
||||
obj = self._test_class.get_object(
|
||||
self.context, **obj._get_composite_keys())
|
||||
self.assertEqual(vif_details, obj.vif_details)
|
||||
|
||||
vif_details['item1'] = 'val2'
|
||||
del vif_details['item2']
|
||||
vif_details['item3'] = 'val3'
|
||||
|
||||
obj.vif_details = vif_details
|
||||
obj.update()
|
||||
|
||||
obj = self._test_class.get_object(
|
||||
self.context, **obj._get_composite_keys())
|
||||
self.assertEqual(vif_details, obj.vif_details)
|
||||
|
||||
obj.vif_details = None
|
||||
obj.update()
|
||||
|
||||
obj = self._test_class.get_object(
|
||||
self.context, **obj._get_composite_keys())
|
||||
self.assertIsNone(obj.vif_details)
|
||||
|
||||
|
||||
class IPAllocationIfaceObjTestCase(obj_test_base.BaseObjectIfaceTestCase):
|
||||
|
||||
_test_class = ports.IPAllocation
|
||||
|
||||
|
||||
class IPAllocationDbObjectTestCase(obj_test_base.BaseDbObjectTestCase,
|
||||
testlib_api.SqlTestCase):
|
||||
|
||||
_test_class = ports.IPAllocation
|
||||
|
||||
def setUp(self):
|
||||
super(IPAllocationDbObjectTestCase, self).setUp()
|
||||
self._create_test_network()
|
||||
self._create_test_subnet(self._network)
|
||||
self._create_test_port(self._network)
|
||||
for obj in itertools.chain(self.db_objs, self.obj_fields, self.objs):
|
||||
obj['port_id'] = self._port.id
|
||||
obj['network_id'] = self._network.id
|
||||
obj['subnet_id'] = self._subnet.id
|
||||
|
||||
|
||||
class PortDNSIfaceObjTestCase(obj_test_base.BaseObjectIfaceTestCase):
|
||||
|
||||
_test_class = ports.PortDNS
|
||||
|
||||
|
||||
class PortDNSDbObjectTestCase(obj_test_base.BaseDbObjectTestCase,
|
||||
testlib_api.SqlTestCase):
|
||||
|
||||
_test_class = ports.PortDNS
|
||||
|
||||
def setUp(self):
|
||||
super(PortDNSDbObjectTestCase, self).setUp()
|
||||
self._create_test_network()
|
||||
for db_obj, fields, obj in zip(
|
||||
self.db_objs, self.obj_fields, self.objs):
|
||||
port_ = self._create_port(network_id=self._network['id'])
|
||||
db_obj['port_id'] = port_.id
|
||||
fields['port_id'] = port_.id
|
||||
obj.port_id = port_.id
|
||||
|
||||
|
||||
class PortBindingLevelIfaceObjTestCase(
|
||||
obj_test_base.BaseObjectIfaceTestCase):
|
||||
|
||||
_test_class = ports.PortBindingLevel
|
||||
|
||||
def setUp(self):
|
||||
super(PortBindingLevelIfaceObjTestCase, self).setUp()
|
||||
# for this object, the model contains segment_id but we expose it
|
||||
# through an ObjectField that is loaded without a relationship
|
||||
for obj in self.db_objs:
|
||||
obj['segment_id'] = None
|
||||
self.pager_map[self._test_class.obj_name()] = (
|
||||
obj_base.Pager(sorts=[('port_id', True), ('level', True)]))
|
||||
|
||||
|
||||
class PortBindingLevelDbObjectTestCase(
|
||||
obj_test_base.BaseDbObjectTestCase):
|
||||
|
||||
_test_class = ports.PortBindingLevel
|
||||
|
||||
|
||||
class PortIfaceObjTestCase(obj_test_base.BaseObjectIfaceTestCase):
|
||||
|
||||
_test_class = ports.Port
|
||||
|
||||
def setUp(self):
|
||||
super(PortIfaceObjTestCase, self).setUp()
|
||||
self.pager_map[ports.PortBindingLevel.obj_name()] = (
|
||||
obj_base.Pager(sorts=[('port_id', True), ('level', True)]))
|
||||
|
||||
|
||||
class PortDbObjectTestCase(obj_test_base.BaseDbObjectTestCase,
|
||||
testlib_api.SqlTestCase):
|
||||
|
||||
_test_class = ports.Port
|
||||
|
||||
def _create_test_security_group(self):
|
||||
return obj_db_api.create_object(
|
||||
self.context, sg_models.SecurityGroup, {})
|
||||
|
||||
def setUp(self):
|
||||
super(PortDbObjectTestCase, self).setUp()
|
||||
self._create_test_network()
|
||||
self._create_test_subnet(self._network)
|
||||
for obj in itertools.chain(self.db_objs, self.obj_fields, self.objs):
|
||||
obj['network_id'] = self._network['id']
|
||||
for obj in self.db_objs:
|
||||
for ipalloc in obj['fixed_ips']:
|
||||
ipalloc['subnet_id'] = self._subnet.id
|
||||
ipalloc['network_id'] = self._network['id']
|
||||
|
||||
def test_security_group_ids(self):
|
||||
sg1 = self._create_test_security_group()
|
||||
sg2 = self._create_test_security_group()
|
||||
groups = {sg1.id, sg2.id}
|
||||
obj = self._make_object(self.obj_fields[0])
|
||||
obj.security_group_ids = groups
|
||||
obj.create()
|
||||
|
||||
obj = ports.Port.get_object(self.context, id=obj.id)
|
||||
self.assertEqual(groups, obj.security_group_ids)
|
||||
|
||||
sg3 = self._create_test_security_group()
|
||||
obj.security_group_ids = {sg3.id}
|
||||
obj.update()
|
||||
|
||||
obj = ports.Port.get_object(self.context, id=obj.id)
|
||||
self.assertEqual({sg3.id}, obj.security_group_ids)
|
||||
|
||||
obj.security_group_ids = set()
|
||||
obj.update()
|
||||
|
||||
obj = ports.Port.get_object(self.context, id=obj.id)
|
||||
self.assertFalse(obj.security_group_ids)
|
||||
|
||||
def test__attach_security_group(self):
|
||||
obj = self._make_object(self.obj_fields[0])
|
||||
obj.create()
|
||||
|
||||
sg = self._create_test_security_group()
|
||||
obj._attach_security_group(sg.id)
|
||||
|
||||
obj = ports.Port.get_object(self.context, id=obj.id)
|
||||
self.assertIn(sg.id, obj.security_group_ids)
|
||||
|
||||
sg2 = self._create_test_security_group()
|
||||
obj._attach_security_group(sg2.id)
|
||||
|
||||
obj = ports.Port.get_object(self.context, id=obj.id)
|
||||
self.assertIn(sg2.id, obj.security_group_ids)
|
||||
|
||||
def test_qos_policy_id(self):
|
||||
policy_obj = policy.QosPolicy(self.context)
|
||||
policy_obj.create()
|
||||
|
||||
obj = self._make_object(self.obj_fields[0])
|
||||
obj.qos_policy_id = policy_obj.id
|
||||
obj.create()
|
||||
|
||||
obj = ports.Port.get_object(self.context, id=obj.id)
|
||||
self.assertEqual(policy_obj.id, obj.qos_policy_id)
|
||||
|
||||
policy_obj2 = policy.QosPolicy(self.context)
|
||||
policy_obj2.create()
|
||||
|
||||
obj.qos_policy_id = policy_obj2.id
|
||||
obj.update()
|
||||
|
||||
obj = ports.Port.get_object(self.context, id=obj.id)
|
||||
self.assertEqual(policy_obj2.id, obj.qos_policy_id)
|
||||
|
||||
obj.qos_policy_id = None
|
||||
obj.update()
|
||||
|
||||
obj = ports.Port.get_object(self.context, id=obj.id)
|
||||
self.assertIsNone(obj.qos_policy_id)
|
||||
|
||||
def test__attach_qos_policy(self):
|
||||
obj = self._make_object(self.obj_fields[0])
|
||||
obj.create()
|
||||
|
||||
policy_obj = policy.QosPolicy(self.context)
|
||||
policy_obj.create()
|
||||
obj._attach_qos_policy(policy_obj.id)
|
||||
|
||||
obj = ports.Port.get_object(self.context, id=obj.id)
|
||||
self.assertEqual(policy_obj.id, obj.qos_policy_id)
|
||||
|
||||
policy_obj2 = policy.QosPolicy(self.context)
|
||||
policy_obj2.create()
|
||||
obj._attach_qos_policy(policy_obj2.id)
|
||||
|
||||
obj = ports.Port.get_object(self.context, id=obj.id)
|
||||
self.assertEqual(policy_obj2.id, obj.qos_policy_id)
|
||||
|
||||
def test_get_objects_queries_constant(self):
|
||||
self.skipTest(
|
||||
'Port object loads segment info without relationships')
|
Loading…
Reference in New Issue
Block a user