Introduce ovo objects for networks

Those objects are intentionally not integrated into the database code so
far. This is to quicken access to their definitions to implement
push-notifications for networks.

Segments are part of the network object. Since we need to reduce the
number of SQL queries executed per resource, and we want to include
segmentation information for networks, Network model was extended with
segments relationship that makes the information available on every
network resource fetch from the database. This change required some
change in test_dhcp_agent_scheduler to expire a session used in the
tests to avoid obsolete segment state to be reused to validate
scheduling.

This implementation of the object is not complete for the job of
updating the resource in database. For example, tags are not yet exposed
on the object; also attributes like availability_zones, or external
network attributes, are not covered. Those attributes are hopefully
needed on server side only, so until we adopt the object for server
side, it should be ok to live without them.

Another database related thing still missing in this patch is lack of
RBAC support for the object. To complete this support,
a real get_bound_tenant_ids should be put onto the object, instead of
the current no-op stub.

This patch also includes some rearrangements that simplified the work.
Specifically, all network related objects are consolidated in the
neutron.objects.network module, instead of being scattered through the
code base. Also, some setup code from test_policy relevant to RBAC was
moved into the base test class so that it can be utilized by other RBAC
enabled objects, like network.

Partially-Implements: blueprint adopt-oslo-versioned-objects-for-db
Partially-Implements: blueprint push-notifications

Co-Authored-By: Victor Morales <victor.morales@intel.com>
Change-Id: I5160d0ab9e8042c356229420739db0ce42842368
This commit is contained in:
Artur Korzeniewski 2016-09-19 18:49:55 +02:00 committed by Ihar Hrachyshka
parent dcd78423aa
commit 94ee8bc306
19 changed files with 449 additions and 187 deletions

View File

@ -13,7 +13,7 @@
# under the License.
from neutron.extensions import portsecurity as psec
from neutron.objects.network.extensions import port_security as n_ps
from neutron.objects import network
from neutron.objects.port.extensions import port_security as p_ps
@ -47,7 +47,7 @@ class PortSecurityDbCommon(object):
def _process_network_port_security_create(
self, context, network_req, network_res):
self._process_port_security_create(
context, n_ps.NetworkPortSecurity, 'network',
context, network.NetworkPortSecurity, 'network',
network_req, network_res)
def _get_security_binding(self, context, obj_cls, res_id):
@ -58,7 +58,7 @@ class PortSecurityDbCommon(object):
def _get_network_security_binding(self, context, network_id):
return self._get_security_binding(
context, n_ps.NetworkPortSecurity, network_id)
context, network.NetworkPortSecurity, network_id)
def _get_port_security_binding(self, context, port_id):
return self._get_security_binding(context, p_ps.PortSecurity, port_id)
@ -71,7 +71,7 @@ class PortSecurityDbCommon(object):
def _process_network_port_security_update(
self, context, network_req, network_res):
self._process_port_security_update(
context, n_ps.NetworkPortSecurity, 'network',
context, network.NetworkPortSecurity, 'network',
network_req, network_res)
def _process_port_security_update(

View File

@ -14,6 +14,7 @@ from neutron_lib.db import model_base
from oslo_log import log as logging
from oslo_utils import uuidutils
import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy.orm import exc
from neutron._i18n import _LI
@ -21,6 +22,7 @@ from neutron.api.v2 import attributes
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron.db import models_v2
from neutron.db import standard_attr
from neutron.extensions import segment
@ -54,6 +56,10 @@ class NetworkSegment(standard_attr.HasStandardAttributes,
segment_index = sa.Column(sa.Integer, nullable=False, server_default='0')
name = sa.Column(sa.String(attributes.NAME_MAX_LEN),
nullable=True)
network = orm.relationship(models_v2.Network,
backref=orm.backref("segments",
lazy='joined',
cascade='delete'))
api_collections = [segment.SEGMENTS]

221
neutron/objects/network.py Normal file
View File

@ -0,0 +1,221 @@
# Copyright (c) 2016 OpenStack Foundation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_versionedobjects import base as obj_base
from oslo_versionedobjects import fields as obj_fields
from neutron.db import api as db_api
from neutron.db import dns_db
from neutron.db import models_v2
from neutron.db.port_security import models as ps_models
from neutron.db.qos import models as qos_models
from neutron.db import rbac_db_models
from neutron.db import segments_db
from neutron.extensions import availability_zone as az_ext
from neutron.objects import base
from neutron.objects import common_types
from neutron.objects.db import api as obj_db_api
from neutron.objects.extensions import port_security as base_ps
from neutron.objects import rbac_db
@obj_base.VersionedObjectRegistry.register
class NetworkSegment(base.NeutronDbObject):
# Version 1.0: Initial version
VERSION = '1.0'
db_model = segments_db.NetworkSegment
fields = {
'id': obj_fields.UUIDField(),
'network_id': obj_fields.UUIDField(),
'name': obj_fields.StringField(),
'network_type': obj_fields.StringField(),
'physical_network': obj_fields.StringField(nullable=True),
'segmentation_id': obj_fields.IntegerField(nullable=True),
'is_dynamic': obj_fields.BooleanField(default=False),
'segment_index': obj_fields.IntegerField(default=0)
}
foreign_keys = {
'Network': {'network_id': 'id'},
'PortBindingLevel': {'id': 'segment_id'},
}
@classmethod
def get_objects(cls, context, _pager=None, **kwargs):
if not _pager:
_pager = base.Pager()
if not _pager.sorts:
# (NOTE) True means ASC, False is DESC
_pager.sorts = [
(field, True) for field in ('network_id', 'segment_index')
]
return super(NetworkSegment, cls).get_objects(context, _pager,
**kwargs)
@obj_base.VersionedObjectRegistry.register
class NetworkPortSecurity(base_ps._PortSecurity):
# Version 1.0: Initial version
VERSION = "1.0"
db_model = ps_models.NetworkSecurityBinding
fields_need_translation = {'id': 'network_id'}
@obj_base.VersionedObjectRegistry.register
class Network(rbac_db.NeutronRbacObject):
# Version 1.0: Initial version
VERSION = '1.0'
rbac_db_model = rbac_db_models.NetworkRBAC
db_model = models_v2.Network
fields = {
'id': obj_fields.UUIDField(),
'project_id': obj_fields.StringField(nullable=True),
'name': obj_fields.StringField(nullable=True),
'status': obj_fields.StringField(nullable=True),
'admin_state_up': obj_fields.BooleanField(nullable=True),
'vlan_transparent': obj_fields.BooleanField(nullable=True),
# TODO(ihrachys): consider converting to a field of stricter type
'availability_zone_hints': obj_fields.ListOfStringsField(
nullable=True),
'shared': obj_fields.BooleanField(default=False),
'mtu': obj_fields.IntegerField(nullable=True),
# TODO(ihrachys): consider exposing availability zones
# TODO(ihrachys): consider converting to boolean
'security': obj_fields.ObjectField(
'NetworkPortSecurity', nullable=True),
'segments': obj_fields.ListOfObjectsField(
'NetworkSegment', nullable=True),
'dns_domain': common_types.DomainNameField(nullable=True),
'qos_policy_id': obj_fields.UUIDField(nullable=True, default=None),
# TODO(ihrachys): add support for tags, probably through a base class
# since it's a feature that will probably later be added for other
# resources too
# TODO(ihrachys): expose external network attributes
}
synthetic_fields = [
'dns_domain',
# MTU is not stored in the database any more, it's a synthetic field
# that may be used by plugins to provide a canonical representation for
# the resource
'mtu',
'qos_policy_id',
'security',
'segments',
]
fields_need_translation = {
'security': 'port_security',
}
def create(self):
fields = self.obj_get_changes()
with db_api.autonested_transaction(self.obj_context.session):
dns_domain = self.dns_domain
qos_policy_id = self.qos_policy_id
super(Network, self).create()
if 'dns_domain' in fields:
self._set_dns_domain(dns_domain)
if 'qos_policy_id' in fields:
self._attach_qos_policy(qos_policy_id)
def update(self):
fields = self.obj_get_changes()
with db_api.autonested_transaction(self.obj_context.session):
super(Network, self).update()
if 'dns_domain' in fields:
self._set_dns_domain(fields['dns_domain'])
if 'qos_policy_id' in fields:
self._attach_qos_policy(fields['qos_policy_id'])
def _attach_qos_policy(self, qos_policy_id):
# TODO(ihrachys): introduce an object for the binding to isolate
# database access in a single place, currently scattered between port
# and policy objects
obj_db_api.delete_objects(
self.obj_context, qos_models.QosNetworkPolicyBinding,
network_id=self.id,
)
if qos_policy_id:
obj_db_api.create_object(
self.obj_context, qos_models.QosNetworkPolicyBinding,
{'network_id': self.id, 'policy_id': qos_policy_id}
)
self.qos_policy_id = qos_policy_id
self.obj_reset_changes(['qos_policy_id'])
def _set_dns_domain(self, dns_domain):
obj_db_api.delete_objects(
self.obj_context, dns_db.NetworkDNSDomain, network_id=self.id,
)
if dns_domain:
obj_db_api.create_object(
self.obj_context, dns_db.NetworkDNSDomain,
{'network_id': self.id, 'dns_domain': dns_domain}
)
self.dns_domain = dns_domain
self.obj_reset_changes(['dns_domain'])
@classmethod
def modify_fields_from_db(cls, db_obj):
result = super(Network, cls).modify_fields_from_db(db_obj)
if az_ext.AZ_HINTS in result:
result[az_ext.AZ_HINTS] = (
az_ext.convert_az_string_to_list(result[az_ext.AZ_HINTS]))
return result
@classmethod
def modify_fields_to_db(cls, fields):
result = super(Network, cls).modify_fields_to_db(fields)
if az_ext.AZ_HINTS in result:
result[az_ext.AZ_HINTS] = (
az_ext.convert_az_list_to_string(result[az_ext.AZ_HINTS]))
return result
def from_db_object(self, *objs):
super(Network, self).from_db_object(*objs)
for db_obj in objs:
# extract domain name
if db_obj.get('dns_domain'):
self.dns_domain = (
db_obj.dns_domain.dns_domain
)
else:
self.dns_domain = None
self.obj_reset_changes(['dns_domain'])
# extract qos policy binding
if db_obj.get('qos_policy_binding'):
self.qos_policy_id = (
db_obj.qos_policy_binding.policy_id
)
else:
self.qos_policy_id = None
self.obj_reset_changes(['qos_policy_id'])
@classmethod
def get_bound_tenant_ids(cls, context, policy_id):
# TODO(ihrachys): provide actual implementation
return set()

View File

@ -1,26 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_versionedobjects import base as obj_base
from neutron.db.port_security import models
from neutron.objects.extensions import port_security as base_ps
@obj_base.VersionedObjectRegistry.register
class NetworkPortSecurity(base_ps._PortSecurity):
# Version 1.0: Initial version
VERSION = "1.0"
fields_need_translation = {'id': 'network_id'}
db_model = models.NetworkSecurityBinding

View File

@ -1,41 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_versionedobjects import base as obj_base
from oslo_versionedobjects import fields as obj_fields
from neutron.db import segments_db as segment_model
from neutron.objects import base
@obj_base.VersionedObjectRegistry.register
class NetworkSegment(base.NeutronDbObject):
# Version 1.0: Initial version
VERSION = '1.0'
db_model = segment_model.NetworkSegment
fields = {
'id': obj_fields.UUIDField(),
'network_id': obj_fields.UUIDField(),
'name': obj_fields.StringField(),
'network_type': obj_fields.StringField(),
'physical_network': obj_fields.StringField(nullable=True),
'segmentation_id': obj_fields.IntegerField(nullable=True),
'is_dynamic': obj_fields.BooleanField(default=False),
'segment_index': obj_fields.IntegerField(default=0)
}
foreign_keys = {
'Network': {'network_id': 'id'},
'PortBindingLevel': {'id': 'segment_id'},
}

View File

@ -204,6 +204,10 @@ def get_random_string(n=10):
return ''.join(random.choice(string.ascii_lowercase) for _ in range(n))
def get_random_string_list(i=3, n=5):
return [get_random_string(n) for _ in range(0, i)]
def get_random_boolean():
return bool(random.getrandbits(1))

View File

@ -16,7 +16,7 @@ from neutron.db import common_db_mixin
from neutron.db import portsecurity_db_common as pdc
from neutron.extensions import portsecurity as psec
from neutron.objects import base as objects_base
from neutron.objects.network.extensions import port_security as n_ps
from neutron.objects import network
from neutron.objects.port.extensions import port_security as p_ps
from neutron.tests import base
@ -69,7 +69,7 @@ class PortSecurityDbCommonTestCase(base.BaseTestCase):
def test__process_network_port_security_update_no_binding(self):
self._test__process_security_update_no_binding(
'network', n_ps.NetworkPortSecurity,
'network', network.NetworkPortSecurity,
self.plugin._process_network_port_security_update)
def test__extend_port_security_dict_no_port_security(self):

View File

@ -1,38 +0,0 @@
# Copyright 2013 VMware, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.objects.network.extensions import port_security
from neutron.tests.unit.objects import test_base as obj_test_base
from neutron.tests.unit import testlib_api
class NetworkPortSecurityIfaceObjTestCase(
obj_test_base.BaseObjectIfaceTestCase):
_test_class = port_security.NetworkPortSecurity
class NetworkPortSecurityDbObjTestCase(obj_test_base.BaseDbObjectTestCase,
testlib_api.SqlTestCase):
_test_class = port_security.NetworkPortSecurity
def setUp(self):
super(NetworkPortSecurityDbObjTestCase, self).setUp()
for db_obj, obj_field, obj in zip(
self.db_objs, self.obj_fields, self.objs):
network = self._create_network()
db_obj['network_id'] = network['id']
obj_field['id'] = network['id']
obj['id'] = network['id']

View File

@ -1,36 +0,0 @@
# Copyright (c) 2016 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
from neutron.objects.network import network_segment
from neutron.tests.unit.objects import test_base as obj_test_base
from neutron.tests.unit import testlib_api
class NetworkSegmentIfaceObjectTestCase(obj_test_base.BaseObjectIfaceTestCase):
_test_class = network_segment.NetworkSegment
class NetworkSegmentDbObjectTestCase(obj_test_base.BaseDbObjectTestCase,
testlib_api.SqlTestCase):
_test_class = network_segment.NetworkSegment
def setUp(self):
super(NetworkSegmentDbObjectTestCase, self).setUp()
self._create_test_network()
for obj in itertools.chain(self.db_objs, self.obj_fields, self.objs):
obj['network_id'] = self._network['id']

View File

@ -50,37 +50,24 @@ class QosPolicyObjectTestCase(test_base.BaseObjectIfaceTestCase):
self.get_random_fields(rule.QosMinimumBandwidthRule)
for _ in range(3)]
self.model_map = {
self.model_map.update({
self._test_class.db_model: self.db_objs,
self._test_class.rbac_db_model: [],
self._test_class.port_binding_model: [],
self._test_class.network_binding_model: [],
rule.QosBandwidthLimitRule.db_model: self.db_qos_bandwidth_rules,
rule.QosDscpMarkingRule.db_model: self.db_qos_dscp_rules,
rule.QosMinimumBandwidthRule.db_model:
self.db_qos_minimum_bandwidth_rules}
self.get_object = mock.patch.object(
db_api, 'get_object', side_effect=self.fake_get_object).start()
self.get_objects = mock.patch.object(
db_api, 'get_objects', side_effect=self.fake_get_objects).start()
def fake_get_objects(self, context, model, **kwargs):
return self.model_map[model]
def fake_get_object(self, context, model, **kwargs):
objects = self.model_map[model]
if not objects:
return None
return [obj for obj in objects if obj['id'] == kwargs['id']][0]
self.db_qos_minimum_bandwidth_rules})
# TODO(ihrachys): stop overriding those test cases, instead base test cases
# should be expanded if there are missing bits there to support QoS objects
def test_get_objects(self):
admin_context = self.context.elevated()
with mock.patch.object(self.context, 'elevated',
return_value=admin_context) as context_mock:
objs = self._test_class.get_objects(self.context)
context_mock.assert_called_once_with()
self.get_objects.assert_any_call(
self.get_objects_mock.assert_any_call(
admin_context, self._test_class.db_model, _pager=None)
self.assertItemsEqual(
[test_base.get_obj_db_fields(obj) for obj in self.objs],
@ -137,14 +124,6 @@ class QosPolicyDbObjectTestCase(test_base.BaseDbObjectTestCase,
def setUp(self):
super(QosPolicyDbObjectTestCase, self).setUp()
self.db_qos_bandwidth_rules = [
self.get_random_fields(rule.QosBandwidthLimitRule)
for _ in range(3)]
self.model_map.update({
rule.QosBandwidthLimitRule.db_model: self.db_qos_bandwidth_rules
})
self._create_test_network()
self._create_test_port(self._network)

View File

@ -39,6 +39,7 @@ from neutron.objects import base
from neutron.objects import common_types
from neutron.objects.db import api as obj_db_api
from neutron.objects import ports
from neutron.objects import rbac_db
from neutron.objects import subnet
from neutron.tests import base as test_base
from neutron.tests import tools
@ -397,6 +398,7 @@ FIELD_TYPE_VALUE_GENERATOR_MAP = {
obj_fields.ObjectField: lambda: None,
obj_fields.ListOfObjectsField: lambda: [],
obj_fields.DictOfStringsField: get_random_dict_of_strings,
obj_fields.ListOfStringsField: tools.get_random_string_list,
common_types.DomainNameField: get_random_domain_name,
common_types.DscpMarkField: get_random_dscp_mark,
obj_fields.IPNetworkField: tools.get_random_ip_network,
@ -541,6 +543,31 @@ class BaseObjectIfaceTestCase(_BaseObjectTestCase, test_base.BaseTestCase):
mock.patch.object(self.context.session, 'refresh').start()
mock.patch.object(self.context.session, 'expunge').start()
self.get_objects_mock = mock.patch.object(
obj_db_api, 'get_objects',
side_effect=self.fake_get_objects).start()
self.get_object_mock = mock.patch.object(
obj_db_api, 'get_object',
side_effect=self.fake_get_object).start()
# NOTE(ihrachys): for matters of basic object behaviour validation,
# mock out rbac code accessing database. There are separate tests that
# cover RBAC, per object type.
if getattr(self._test_class, 'rbac_db_model', None):
mock.patch.object(
rbac_db.RbacNeutronDbObjectMixin,
'is_shared_with_tenant', return_value=False).start()
def fake_get_object(self, context, model, **kwargs):
objects = self.model_map[model]
if not objects:
return None
return [obj for obj in objects if obj['id'] == kwargs['id']][0]
def fake_get_objects(self, context, model, **kwargs):
return self.model_map[model]
# TODO(ihrachys) document the intent of all common test cases in docstrings
def test_get_object(self):
with mock.patch.object(
@ -1247,7 +1274,10 @@ class BaseDbObjectTestCase(_BaseObjectTestCase,
obj.create()
for field in remove_timestamps_from_fields(get_obj_db_fields(obj)):
filters = {field: [self.objs[0][field]]}
if not isinstance(self.objs[0][field], list):
filters = {field: [self.objs[0][field]]}
else:
filters = {field: self.objs[0][field]}
new = self._test_class.get_objects(self.context, **filters)
self.assertItemsEqual(
[obj._get_composite_keys()],

View File

@ -0,0 +1,156 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
from neutron.objects import base as obj_base
from neutron.objects import network
from neutron.objects.qos import policy
from neutron.tests.unit.objects import test_base as obj_test_base
from neutron.tests.unit import testlib_api
class NetworkPortSecurityIfaceObjTestCase(
obj_test_base.BaseObjectIfaceTestCase):
_test_class = network.NetworkPortSecurity
class NetworkPortSecurityDbObjTestCase(obj_test_base.BaseDbObjectTestCase,
testlib_api.SqlTestCase):
_test_class = network.NetworkPortSecurity
def setUp(self):
super(NetworkPortSecurityDbObjTestCase, self).setUp()
for db_obj, obj_field, obj in zip(
self.db_objs, self.obj_fields, self.objs):
network = self._create_network()
db_obj['network_id'] = network.id
obj_field['id'] = network.id
obj['id'] = network['id']
class NetworkSegmentIfaceObjTestCase(obj_test_base.BaseObjectIfaceTestCase):
_test_class = network.NetworkSegment
def setUp(self):
super(NetworkSegmentIfaceObjTestCase, self).setUp()
# TODO(ihrachys): we should not need to duplicate that in every single
# place, instead we should move the default pager into the base class
# attribute and pull it from there for testing matters. Leaving it for
# a follow up.
self.pager_map[self._test_class.obj_name()] = (
obj_base.Pager(
sorts=[('network_id', True), ('segment_index', True)]))
class NetworkSegmentDbObjTestCase(obj_test_base.BaseDbObjectTestCase,
testlib_api.SqlTestCase):
_test_class = network.NetworkSegment
def setUp(self):
super(NetworkSegmentDbObjTestCase, self).setUp()
network = self._create_network()
for obj in itertools.chain(self.db_objs, self.obj_fields, self.objs):
obj['network_id'] = network.id
class NetworkObjectIfaceTestCase(obj_test_base.BaseObjectIfaceTestCase):
_test_class = network.Network
def setUp(self):
super(NetworkObjectIfaceTestCase, self).setUp()
self.pager_map[network.NetworkSegment.obj_name()] = (
obj_base.Pager(
sorts=[('network_id', True), ('segment_index', True)]))
class NetworkDbObjectTestCase(obj_test_base.BaseDbObjectTestCase,
testlib_api.SqlTestCase):
_test_class = network.Network
def test_qos_policy_id(self):
policy_obj = policy.QosPolicy(self.context)
policy_obj.create()
obj = self._make_object(self.obj_fields[0])
obj.qos_policy_id = policy_obj.id
obj.create()
obj = network.Network.get_object(self.context, id=obj.id)
self.assertEqual(policy_obj.id, obj.qos_policy_id)
policy_obj2 = policy.QosPolicy(self.context)
policy_obj2.create()
obj.qos_policy_id = policy_obj2.id
obj.update()
obj = network.Network.get_object(self.context, id=obj.id)
self.assertEqual(policy_obj2.id, obj.qos_policy_id)
obj.qos_policy_id = None
obj.update()
obj = network.Network.get_object(self.context, id=obj.id)
self.assertIsNone(obj.qos_policy_id)
def test__attach_qos_policy(self):
obj = self._make_object(self.obj_fields[0])
obj.create()
policy_obj = policy.QosPolicy(self.context)
policy_obj.create()
obj._attach_qos_policy(policy_obj.id)
obj = network.Network.get_object(self.context, id=obj.id)
self.assertEqual(policy_obj.id, obj.qos_policy_id)
policy_obj2 = policy.QosPolicy(self.context)
policy_obj2.create()
obj._attach_qos_policy(policy_obj2.id)
obj = network.Network.get_object(self.context, id=obj.id)
self.assertEqual(policy_obj2.id, obj.qos_policy_id)
def test_dns_domain(self):
obj = self._make_object(self.obj_fields[0])
obj.dns_domain = 'foo.com'
obj.create()
obj = network.Network.get_object(self.context, id=obj.id)
self.assertEqual('foo.com', obj.dns_domain)
obj.dns_domain = 'bar.com'
obj.update()
obj = network.Network.get_object(self.context, id=obj.id)
self.assertEqual('bar.com', obj.dns_domain)
obj.dns_domain = None
obj.update()
obj = network.Network.get_object(self.context, id=obj.id)
self.assertIsNone(obj.dns_domain)
def test__set_dns_domain(self):
obj = self._make_object(self.obj_fields[0])
obj.create()
obj._set_dns_domain('foo.com')
obj = network.Network.get_object(self.context, id=obj.id)
self.assertEqual('foo.com', obj.dns_domain)
obj._set_dns_domain('bar.com')
obj = network.Network.get_object(self.context, id=obj.id)
self.assertEqual('bar.com', obj.dns_domain)

View File

@ -24,7 +24,8 @@ from neutron.tests import base as test_base
# NOTE: The hashes in this list should only be changed if they come with a
# corresponding version bump in the affected objects.
# corresponding version bump in the affected objects. Please keep the list in
# alphabetic order.
object_data = {
'_DefaultSecurityGroup': '1.0-971520cb2e0ec06d747885a0cf78347f',
'AddressScope': '1.0-25560799db384acfe1549634959a82b4',
@ -34,6 +35,7 @@ object_data = {
'ExtraDhcpOpt': '1.0-632f689cbeb36328995a7aed1d0a78d3',
'IPAllocation': '1.0-47251b4c6d45c3b5feb0297fe5c461f2',
'IPAllocationPool': '1.0-371016a6480ed0b4299319cb46d9215d',
'Network': '1.0-f2f6308f79731a767b92b26b0f4f3849',
'NetworkPortSecurity': '1.0-b30802391a87945ee9c07582b4ff95e3',
'NetworkSegment': '1.0-40707ef6bd9a0bf095038158d995cc7d',
'Port': '1.0-638f6b09a3809ebd8b2b46293f56871b',

View File

@ -19,6 +19,7 @@ from neutron.db.models import securitygroup as sg_models
from neutron.db import models_v2
from neutron.objects import base as obj_base
from neutron.objects.db import api as obj_db_api
from neutron.objects import network
from neutron.objects import ports
from neutron.objects.qos import policy
from neutron.tests import tools
@ -195,6 +196,9 @@ class PortBindingLevelIfaceObjTestCase(
obj['segment_id'] = None
self.pager_map[self._test_class.obj_name()] = (
obj_base.Pager(sorts=[('port_id', True), ('level', True)]))
self.pager_map[network.NetworkSegment.obj_name()] = (
obj_base.Pager(
sorts=[('network_id', True), ('segment_index', True)]))
class PortBindingLevelDbObjectTestCase(

View File

@ -588,6 +588,15 @@ class DHCPAgentWeightSchedulerTestCase(test_plugin.Ml2PluginV2TestCase):
self.assertEqual('host-c', agent2[0]['host'])
self.assertEqual('host-d', agent3[0]['host'])
def _get_network_with_candidate_hosts(self, net_id, seg_id):
# expire the session so that the segment is fully reloaded on fetch,
# including its new host mapping
self.ctx.session.expire_all()
net = self.plugin.get_network(self.ctx, net_id)
seg = self.segments_plugin.get_segment(self.ctx, seg_id)
net['candidate_hosts'] = seg['hosts']
return net
def test_schedule_segment_one_hostable_agent(self):
net_id = self._create_network()
seg_id = self._create_segment(net_id)
@ -595,9 +604,7 @@ class DHCPAgentWeightSchedulerTestCase(test_plugin.Ml2PluginV2TestCase):
helpers.register_dhcp_agent(HOST_D)
segments_service_db.update_segment_host_mapping(
self.ctx, HOST_C, {seg_id})
net = self.plugin.get_network(self.ctx, net_id)
seg = self.segments_plugin.get_segment(self.ctx, seg_id)
net['candidate_hosts'] = seg['hosts']
net = self._get_network_with_candidate_hosts(net_id, seg_id)
agents = self.plugin.network_scheduler.schedule(
self.plugin, self.ctx, net)
self.assertEqual(1, len(agents))
@ -612,9 +619,7 @@ class DHCPAgentWeightSchedulerTestCase(test_plugin.Ml2PluginV2TestCase):
self.ctx, HOST_C, {seg_id})
segments_service_db.update_segment_host_mapping(
self.ctx, HOST_D, {seg_id})
net = self.plugin.get_network(self.ctx, net_id)
seg = self.segments_plugin.get_segment(self.ctx, seg_id)
net['candidate_hosts'] = seg['hosts']
net = self._get_network_with_candidate_hosts(net_id, seg_id)
agents = self.plugin.network_scheduler.schedule(
self.plugin, self.ctx, net)
self.assertEqual(1, len(agents))
@ -642,9 +647,7 @@ class DHCPAgentWeightSchedulerTestCase(test_plugin.Ml2PluginV2TestCase):
self.ctx, HOST_C, {seg_id})
segments_service_db.update_segment_host_mapping(
self.ctx, HOST_D, {seg_id})
net = self.plugin.get_network(self.ctx, net_id)
seg = self.segments_plugin.get_segment(self.ctx, seg_id)
net['candidate_hosts'] = seg['hosts']
net = self._get_network_with_candidate_hosts(net_id, seg_id)
agents = self.plugin.network_scheduler.schedule(
self.plugin, self.ctx, net)
self.assertEqual(2, len(agents))
@ -659,9 +662,7 @@ class DHCPAgentWeightSchedulerTestCase(test_plugin.Ml2PluginV2TestCase):
helpers.register_dhcp_agent(HOST_D)
segments_service_db.update_segment_host_mapping(
self.ctx, HOST_C, {seg_id})
net = self.plugin.get_network(self.ctx, net_id)
seg = self.segments_plugin.get_segment(self.ctx, seg_id)
net['candidate_hosts'] = seg['hosts']
net = self._get_network_with_candidate_hosts(net_id, seg_id)
agents = self.plugin.network_scheduler.schedule(
self.plugin, self.ctx, net)
self.assertEqual(1, len(agents))