Merge "Introduce ovo objects for networks"

This commit is contained in:
Jenkins 2016-09-30 11:41:06 +00:00 committed by Gerrit Code Review
commit 47601b38ba
19 changed files with 449 additions and 187 deletions

View File

@ -13,7 +13,7 @@
# under the License.
from neutron.extensions import portsecurity as psec
from neutron.objects.network.extensions import port_security as n_ps
from neutron.objects import network
from neutron.objects.port.extensions import port_security as p_ps
@ -47,7 +47,7 @@ class PortSecurityDbCommon(object):
def _process_network_port_security_create(
self, context, network_req, network_res):
self._process_port_security_create(
context, n_ps.NetworkPortSecurity, 'network',
context, network.NetworkPortSecurity, 'network',
network_req, network_res)
def _get_security_binding(self, context, obj_cls, res_id):
@ -58,7 +58,7 @@ class PortSecurityDbCommon(object):
def _get_network_security_binding(self, context, network_id):
return self._get_security_binding(
context, n_ps.NetworkPortSecurity, network_id)
context, network.NetworkPortSecurity, network_id)
def _get_port_security_binding(self, context, port_id):
return self._get_security_binding(context, p_ps.PortSecurity, port_id)
@ -71,7 +71,7 @@ class PortSecurityDbCommon(object):
def _process_network_port_security_update(
self, context, network_req, network_res):
self._process_port_security_update(
context, n_ps.NetworkPortSecurity, 'network',
context, network.NetworkPortSecurity, 'network',
network_req, network_res)
def _process_port_security_update(

View File

@ -14,6 +14,7 @@ from neutron_lib.db import model_base
from oslo_log import log as logging
from oslo_utils import uuidutils
import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy.orm import exc
from neutron._i18n import _LI
@ -21,6 +22,7 @@ from neutron.api.v2 import attributes
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron.db import models_v2
from neutron.db import standard_attr
from neutron.extensions import segment
@ -54,6 +56,10 @@ class NetworkSegment(standard_attr.HasStandardAttributes,
segment_index = sa.Column(sa.Integer, nullable=False, server_default='0')
name = sa.Column(sa.String(attributes.NAME_MAX_LEN),
nullable=True)
network = orm.relationship(models_v2.Network,
backref=orm.backref("segments",
lazy='joined',
cascade='delete'))
api_collections = [segment.SEGMENTS]

221
neutron/objects/network.py Normal file
View File

@ -0,0 +1,221 @@
# Copyright (c) 2016 OpenStack Foundation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_versionedobjects import base as obj_base
from oslo_versionedobjects import fields as obj_fields
from neutron.db import api as db_api
from neutron.db import dns_db
from neutron.db import models_v2
from neutron.db.port_security import models as ps_models
from neutron.db.qos import models as qos_models
from neutron.db import rbac_db_models
from neutron.db import segments_db
from neutron.extensions import availability_zone as az_ext
from neutron.objects import base
from neutron.objects import common_types
from neutron.objects.db import api as obj_db_api
from neutron.objects.extensions import port_security as base_ps
from neutron.objects import rbac_db
@obj_base.VersionedObjectRegistry.register
class NetworkSegment(base.NeutronDbObject):
# Version 1.0: Initial version
VERSION = '1.0'
db_model = segments_db.NetworkSegment
fields = {
'id': obj_fields.UUIDField(),
'network_id': obj_fields.UUIDField(),
'name': obj_fields.StringField(),
'network_type': obj_fields.StringField(),
'physical_network': obj_fields.StringField(nullable=True),
'segmentation_id': obj_fields.IntegerField(nullable=True),
'is_dynamic': obj_fields.BooleanField(default=False),
'segment_index': obj_fields.IntegerField(default=0)
}
foreign_keys = {
'Network': {'network_id': 'id'},
'PortBindingLevel': {'id': 'segment_id'},
}
@classmethod
def get_objects(cls, context, _pager=None, **kwargs):
if not _pager:
_pager = base.Pager()
if not _pager.sorts:
# (NOTE) True means ASC, False is DESC
_pager.sorts = [
(field, True) for field in ('network_id', 'segment_index')
]
return super(NetworkSegment, cls).get_objects(context, _pager,
**kwargs)
@obj_base.VersionedObjectRegistry.register
class NetworkPortSecurity(base_ps._PortSecurity):
# Version 1.0: Initial version
VERSION = "1.0"
db_model = ps_models.NetworkSecurityBinding
fields_need_translation = {'id': 'network_id'}
@obj_base.VersionedObjectRegistry.register
class Network(rbac_db.NeutronRbacObject):
# Version 1.0: Initial version
VERSION = '1.0'
rbac_db_model = rbac_db_models.NetworkRBAC
db_model = models_v2.Network
fields = {
'id': obj_fields.UUIDField(),
'project_id': obj_fields.StringField(nullable=True),
'name': obj_fields.StringField(nullable=True),
'status': obj_fields.StringField(nullable=True),
'admin_state_up': obj_fields.BooleanField(nullable=True),
'vlan_transparent': obj_fields.BooleanField(nullable=True),
# TODO(ihrachys): consider converting to a field of stricter type
'availability_zone_hints': obj_fields.ListOfStringsField(
nullable=True),
'shared': obj_fields.BooleanField(default=False),
'mtu': obj_fields.IntegerField(nullable=True),
# TODO(ihrachys): consider exposing availability zones
# TODO(ihrachys): consider converting to boolean
'security': obj_fields.ObjectField(
'NetworkPortSecurity', nullable=True),
'segments': obj_fields.ListOfObjectsField(
'NetworkSegment', nullable=True),
'dns_domain': common_types.DomainNameField(nullable=True),
'qos_policy_id': obj_fields.UUIDField(nullable=True, default=None),
# TODO(ihrachys): add support for tags, probably through a base class
# since it's a feature that will probably later be added for other
# resources too
# TODO(ihrachys): expose external network attributes
}
synthetic_fields = [
'dns_domain',
# MTU is not stored in the database any more, it's a synthetic field
# that may be used by plugins to provide a canonical representation for
# the resource
'mtu',
'qos_policy_id',
'security',
'segments',
]
fields_need_translation = {
'security': 'port_security',
}
def create(self):
fields = self.obj_get_changes()
with db_api.autonested_transaction(self.obj_context.session):
dns_domain = self.dns_domain
qos_policy_id = self.qos_policy_id
super(Network, self).create()
if 'dns_domain' in fields:
self._set_dns_domain(dns_domain)
if 'qos_policy_id' in fields:
self._attach_qos_policy(qos_policy_id)
def update(self):
fields = self.obj_get_changes()
with db_api.autonested_transaction(self.obj_context.session):
super(Network, self).update()
if 'dns_domain' in fields:
self._set_dns_domain(fields['dns_domain'])
if 'qos_policy_id' in fields:
self._attach_qos_policy(fields['qos_policy_id'])
def _attach_qos_policy(self, qos_policy_id):
# TODO(ihrachys): introduce an object for the binding to isolate
# database access in a single place, currently scattered between port
# and policy objects
obj_db_api.delete_objects(
self.obj_context, qos_models.QosNetworkPolicyBinding,
network_id=self.id,
)
if qos_policy_id:
obj_db_api.create_object(
self.obj_context, qos_models.QosNetworkPolicyBinding,
{'network_id': self.id, 'policy_id': qos_policy_id}
)
self.qos_policy_id = qos_policy_id
self.obj_reset_changes(['qos_policy_id'])
def _set_dns_domain(self, dns_domain):
obj_db_api.delete_objects(
self.obj_context, dns_db.NetworkDNSDomain, network_id=self.id,
)
if dns_domain:
obj_db_api.create_object(
self.obj_context, dns_db.NetworkDNSDomain,
{'network_id': self.id, 'dns_domain': dns_domain}
)
self.dns_domain = dns_domain
self.obj_reset_changes(['dns_domain'])
@classmethod
def modify_fields_from_db(cls, db_obj):
result = super(Network, cls).modify_fields_from_db(db_obj)
if az_ext.AZ_HINTS in result:
result[az_ext.AZ_HINTS] = (
az_ext.convert_az_string_to_list(result[az_ext.AZ_HINTS]))
return result
@classmethod
def modify_fields_to_db(cls, fields):
result = super(Network, cls).modify_fields_to_db(fields)
if az_ext.AZ_HINTS in result:
result[az_ext.AZ_HINTS] = (
az_ext.convert_az_list_to_string(result[az_ext.AZ_HINTS]))
return result
def from_db_object(self, *objs):
super(Network, self).from_db_object(*objs)
for db_obj in objs:
# extract domain name
if db_obj.get('dns_domain'):
self.dns_domain = (
db_obj.dns_domain.dns_domain
)
else:
self.dns_domain = None
self.obj_reset_changes(['dns_domain'])
# extract qos policy binding
if db_obj.get('qos_policy_binding'):
self.qos_policy_id = (
db_obj.qos_policy_binding.policy_id
)
else:
self.qos_policy_id = None
self.obj_reset_changes(['qos_policy_id'])
@classmethod
def get_bound_tenant_ids(cls, context, policy_id):
# TODO(ihrachys): provide actual implementation
return set()

View File

@ -1,26 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_versionedobjects import base as obj_base
from neutron.db.port_security import models
from neutron.objects.extensions import port_security as base_ps
@obj_base.VersionedObjectRegistry.register
class NetworkPortSecurity(base_ps._PortSecurity):
# Version 1.0: Initial version
VERSION = "1.0"
fields_need_translation = {'id': 'network_id'}
db_model = models.NetworkSecurityBinding

View File

@ -1,41 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_versionedobjects import base as obj_base
from oslo_versionedobjects import fields as obj_fields
from neutron.db import segments_db as segment_model
from neutron.objects import base
@obj_base.VersionedObjectRegistry.register
class NetworkSegment(base.NeutronDbObject):
# Version 1.0: Initial version
VERSION = '1.0'
db_model = segment_model.NetworkSegment
fields = {
'id': obj_fields.UUIDField(),
'network_id': obj_fields.UUIDField(),
'name': obj_fields.StringField(),
'network_type': obj_fields.StringField(),
'physical_network': obj_fields.StringField(nullable=True),
'segmentation_id': obj_fields.IntegerField(nullable=True),
'is_dynamic': obj_fields.BooleanField(default=False),
'segment_index': obj_fields.IntegerField(default=0)
}
foreign_keys = {
'Network': {'network_id': 'id'},
'PortBindingLevel': {'id': 'segment_id'},
}

View File

@ -204,6 +204,10 @@ def get_random_string(n=10):
return ''.join(random.choice(string.ascii_lowercase) for _ in range(n))
def get_random_string_list(i=3, n=5):
return [get_random_string(n) for _ in range(0, i)]
def get_random_boolean():
return bool(random.getrandbits(1))

View File

@ -16,7 +16,7 @@ from neutron.db import common_db_mixin
from neutron.db import portsecurity_db_common as pdc
from neutron.extensions import portsecurity as psec
from neutron.objects import base as objects_base
from neutron.objects.network.extensions import port_security as n_ps
from neutron.objects import network
from neutron.objects.port.extensions import port_security as p_ps
from neutron.tests import base
@ -69,7 +69,7 @@ class PortSecurityDbCommonTestCase(base.BaseTestCase):
def test__process_network_port_security_update_no_binding(self):
self._test__process_security_update_no_binding(
'network', n_ps.NetworkPortSecurity,
'network', network.NetworkPortSecurity,
self.plugin._process_network_port_security_update)
def test__extend_port_security_dict_no_port_security(self):

View File

@ -1,38 +0,0 @@
# Copyright 2013 VMware, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.objects.network.extensions import port_security
from neutron.tests.unit.objects import test_base as obj_test_base
from neutron.tests.unit import testlib_api
class NetworkPortSecurityIfaceObjTestCase(
obj_test_base.BaseObjectIfaceTestCase):
_test_class = port_security.NetworkPortSecurity
class NetworkPortSecurityDbObjTestCase(obj_test_base.BaseDbObjectTestCase,
testlib_api.SqlTestCase):
_test_class = port_security.NetworkPortSecurity
def setUp(self):
super(NetworkPortSecurityDbObjTestCase, self).setUp()
for db_obj, obj_field, obj in zip(
self.db_objs, self.obj_fields, self.objs):
network = self._create_network()
db_obj['network_id'] = network['id']
obj_field['id'] = network['id']
obj['id'] = network['id']

View File

@ -1,36 +0,0 @@
# Copyright (c) 2016 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
from neutron.objects.network import network_segment
from neutron.tests.unit.objects import test_base as obj_test_base
from neutron.tests.unit import testlib_api
class NetworkSegmentIfaceObjectTestCase(obj_test_base.BaseObjectIfaceTestCase):
_test_class = network_segment.NetworkSegment
class NetworkSegmentDbObjectTestCase(obj_test_base.BaseDbObjectTestCase,
testlib_api.SqlTestCase):
_test_class = network_segment.NetworkSegment
def setUp(self):
super(NetworkSegmentDbObjectTestCase, self).setUp()
self._create_test_network()
for obj in itertools.chain(self.db_objs, self.obj_fields, self.objs):
obj['network_id'] = self._network['id']

View File

@ -50,37 +50,24 @@ class QosPolicyObjectTestCase(test_base.BaseObjectIfaceTestCase):
self.get_random_fields(rule.QosMinimumBandwidthRule)
for _ in range(3)]
self.model_map = {
self.model_map.update({
self._test_class.db_model: self.db_objs,
self._test_class.rbac_db_model: [],
self._test_class.port_binding_model: [],
self._test_class.network_binding_model: [],
rule.QosBandwidthLimitRule.db_model: self.db_qos_bandwidth_rules,
rule.QosDscpMarkingRule.db_model: self.db_qos_dscp_rules,
rule.QosMinimumBandwidthRule.db_model:
self.db_qos_minimum_bandwidth_rules}
self.get_object = mock.patch.object(
db_api, 'get_object', side_effect=self.fake_get_object).start()
self.get_objects = mock.patch.object(
db_api, 'get_objects', side_effect=self.fake_get_objects).start()
def fake_get_objects(self, context, model, **kwargs):
return self.model_map[model]
def fake_get_object(self, context, model, **kwargs):
objects = self.model_map[model]
if not objects:
return None
return [obj for obj in objects if obj['id'] == kwargs['id']][0]
self.db_qos_minimum_bandwidth_rules})
# TODO(ihrachys): stop overriding those test cases, instead base test cases
# should be expanded if there are missing bits there to support QoS objects
def test_get_objects(self):
admin_context = self.context.elevated()
with mock.patch.object(self.context, 'elevated',
return_value=admin_context) as context_mock:
objs = self._test_class.get_objects(self.context)
context_mock.assert_called_once_with()
self.get_objects.assert_any_call(
self.get_objects_mock.assert_any_call(
admin_context, self._test_class.db_model, _pager=None)
self.assertItemsEqual(
[test_base.get_obj_db_fields(obj) for obj in self.objs],
@ -137,14 +124,6 @@ class QosPolicyDbObjectTestCase(test_base.BaseDbObjectTestCase,
def setUp(self):
super(QosPolicyDbObjectTestCase, self).setUp()
self.db_qos_bandwidth_rules = [
self.get_random_fields(rule.QosBandwidthLimitRule)
for _ in range(3)]
self.model_map.update({
rule.QosBandwidthLimitRule.db_model: self.db_qos_bandwidth_rules
})
self._create_test_network()
self._create_test_port(self._network)

View File

@ -39,6 +39,7 @@ from neutron.objects import base
from neutron.objects import common_types
from neutron.objects.db import api as obj_db_api
from neutron.objects import ports
from neutron.objects import rbac_db
from neutron.objects import subnet
from neutron.tests import base as test_base
from neutron.tests import tools
@ -397,6 +398,7 @@ FIELD_TYPE_VALUE_GENERATOR_MAP = {
obj_fields.ObjectField: lambda: None,
obj_fields.ListOfObjectsField: lambda: [],
obj_fields.DictOfStringsField: get_random_dict_of_strings,
obj_fields.ListOfStringsField: tools.get_random_string_list,
common_types.DomainNameField: get_random_domain_name,
common_types.DscpMarkField: get_random_dscp_mark,
obj_fields.IPNetworkField: tools.get_random_ip_network,
@ -541,6 +543,31 @@ class BaseObjectIfaceTestCase(_BaseObjectTestCase, test_base.BaseTestCase):
mock.patch.object(self.context.session, 'refresh').start()
mock.patch.object(self.context.session, 'expunge').start()
self.get_objects_mock = mock.patch.object(
obj_db_api, 'get_objects',
side_effect=self.fake_get_objects).start()
self.get_object_mock = mock.patch.object(
obj_db_api, 'get_object',
side_effect=self.fake_get_object).start()
# NOTE(ihrachys): for matters of basic object behaviour validation,
# mock out rbac code accessing database. There are separate tests that
# cover RBAC, per object type.
if getattr(self._test_class, 'rbac_db_model', None):
mock.patch.object(
rbac_db.RbacNeutronDbObjectMixin,
'is_shared_with_tenant', return_value=False).start()
def fake_get_object(self, context, model, **kwargs):
objects = self.model_map[model]
if not objects:
return None
return [obj for obj in objects if obj['id'] == kwargs['id']][0]
def fake_get_objects(self, context, model, **kwargs):
return self.model_map[model]
# TODO(ihrachys) document the intent of all common test cases in docstrings
def test_get_object(self):
with mock.patch.object(
@ -1247,7 +1274,10 @@ class BaseDbObjectTestCase(_BaseObjectTestCase,
obj.create()
for field in remove_timestamps_from_fields(get_obj_db_fields(obj)):
filters = {field: [self.objs[0][field]]}
if not isinstance(self.objs[0][field], list):
filters = {field: [self.objs[0][field]]}
else:
filters = {field: self.objs[0][field]}
new = self._test_class.get_objects(self.context, **filters)
self.assertItemsEqual(
[obj._get_composite_keys()],

View File

@ -0,0 +1,156 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
from neutron.objects import base as obj_base
from neutron.objects import network
from neutron.objects.qos import policy
from neutron.tests.unit.objects import test_base as obj_test_base
from neutron.tests.unit import testlib_api
class NetworkPortSecurityIfaceObjTestCase(
obj_test_base.BaseObjectIfaceTestCase):
_test_class = network.NetworkPortSecurity
class NetworkPortSecurityDbObjTestCase(obj_test_base.BaseDbObjectTestCase,
testlib_api.SqlTestCase):
_test_class = network.NetworkPortSecurity
def setUp(self):
super(NetworkPortSecurityDbObjTestCase, self).setUp()
for db_obj, obj_field, obj in zip(
self.db_objs, self.obj_fields, self.objs):
network = self._create_network()
db_obj['network_id'] = network.id
obj_field['id'] = network.id
obj['id'] = network['id']
class NetworkSegmentIfaceObjTestCase(obj_test_base.BaseObjectIfaceTestCase):
_test_class = network.NetworkSegment
def setUp(self):
super(NetworkSegmentIfaceObjTestCase, self).setUp()
# TODO(ihrachys): we should not need to duplicate that in every single
# place, instead we should move the default pager into the base class
# attribute and pull it from there for testing matters. Leaving it for
# a follow up.
self.pager_map[self._test_class.obj_name()] = (
obj_base.Pager(
sorts=[('network_id', True), ('segment_index', True)]))
class NetworkSegmentDbObjTestCase(obj_test_base.BaseDbObjectTestCase,
testlib_api.SqlTestCase):
_test_class = network.NetworkSegment
def setUp(self):
super(NetworkSegmentDbObjTestCase, self).setUp()
network = self._create_network()
for obj in itertools.chain(self.db_objs, self.obj_fields, self.objs):
obj['network_id'] = network.id
class NetworkObjectIfaceTestCase(obj_test_base.BaseObjectIfaceTestCase):
_test_class = network.Network
def setUp(self):
super(NetworkObjectIfaceTestCase, self).setUp()
self.pager_map[network.NetworkSegment.obj_name()] = (
obj_base.Pager(
sorts=[('network_id', True), ('segment_index', True)]))
class NetworkDbObjectTestCase(obj_test_base.BaseDbObjectTestCase,
testlib_api.SqlTestCase):
_test_class = network.Network
def test_qos_policy_id(self):
policy_obj = policy.QosPolicy(self.context)
policy_obj.create()
obj = self._make_object(self.obj_fields[0])
obj.qos_policy_id = policy_obj.id
obj.create()
obj = network.Network.get_object(self.context, id=obj.id)
self.assertEqual(policy_obj.id, obj.qos_policy_id)
policy_obj2 = policy.QosPolicy(self.context)
policy_obj2.create()
obj.qos_policy_id = policy_obj2.id
obj.update()
obj = network.Network.get_object(self.context, id=obj.id)
self.assertEqual(policy_obj2.id, obj.qos_policy_id)
obj.qos_policy_id = None
obj.update()
obj = network.Network.get_object(self.context, id=obj.id)
self.assertIsNone(obj.qos_policy_id)
def test__attach_qos_policy(self):
obj = self._make_object(self.obj_fields[0])
obj.create()
policy_obj = policy.QosPolicy(self.context)
policy_obj.create()
obj._attach_qos_policy(policy_obj.id)
obj = network.Network.get_object(self.context, id=obj.id)
self.assertEqual(policy_obj.id, obj.qos_policy_id)
policy_obj2 = policy.QosPolicy(self.context)
policy_obj2.create()
obj._attach_qos_policy(policy_obj2.id)
obj = network.Network.get_object(self.context, id=obj.id)
self.assertEqual(policy_obj2.id, obj.qos_policy_id)
def test_dns_domain(self):
obj = self._make_object(self.obj_fields[0])
obj.dns_domain = 'foo.com'
obj.create()
obj = network.Network.get_object(self.context, id=obj.id)
self.assertEqual('foo.com', obj.dns_domain)
obj.dns_domain = 'bar.com'
obj.update()
obj = network.Network.get_object(self.context, id=obj.id)
self.assertEqual('bar.com', obj.dns_domain)
obj.dns_domain = None
obj.update()
obj = network.Network.get_object(self.context, id=obj.id)
self.assertIsNone(obj.dns_domain)
def test__set_dns_domain(self):
obj = self._make_object(self.obj_fields[0])
obj.create()
obj._set_dns_domain('foo.com')
obj = network.Network.get_object(self.context, id=obj.id)
self.assertEqual('foo.com', obj.dns_domain)
obj._set_dns_domain('bar.com')
obj = network.Network.get_object(self.context, id=obj.id)
self.assertEqual('bar.com', obj.dns_domain)

View File

@ -24,7 +24,8 @@ from neutron.tests import base as test_base
# NOTE: The hashes in this list should only be changed if they come with a
# corresponding version bump in the affected objects.
# corresponding version bump in the affected objects. Please keep the list in
# alphabetic order.
object_data = {
'_DefaultSecurityGroup': '1.0-971520cb2e0ec06d747885a0cf78347f',
'AddressScope': '1.0-25560799db384acfe1549634959a82b4',
@ -34,6 +35,7 @@ object_data = {
'ExtraDhcpOpt': '1.0-632f689cbeb36328995a7aed1d0a78d3',
'IPAllocation': '1.0-47251b4c6d45c3b5feb0297fe5c461f2',
'IPAllocationPool': '1.0-371016a6480ed0b4299319cb46d9215d',
'Network': '1.0-f2f6308f79731a767b92b26b0f4f3849',
'NetworkPortSecurity': '1.0-b30802391a87945ee9c07582b4ff95e3',
'NetworkSegment': '1.0-40707ef6bd9a0bf095038158d995cc7d',
'Port': '1.0-638f6b09a3809ebd8b2b46293f56871b',

View File

@ -19,6 +19,7 @@ from neutron.db.models import securitygroup as sg_models
from neutron.db import models_v2
from neutron.objects import base as obj_base
from neutron.objects.db import api as obj_db_api
from neutron.objects import network
from neutron.objects import ports
from neutron.objects.qos import policy
from neutron.tests import tools
@ -195,6 +196,9 @@ class PortBindingLevelIfaceObjTestCase(
obj['segment_id'] = None
self.pager_map[self._test_class.obj_name()] = (
obj_base.Pager(sorts=[('port_id', True), ('level', True)]))
self.pager_map[network.NetworkSegment.obj_name()] = (
obj_base.Pager(
sorts=[('network_id', True), ('segment_index', True)]))
class PortBindingLevelDbObjectTestCase(

View File

@ -588,6 +588,15 @@ class DHCPAgentWeightSchedulerTestCase(test_plugin.Ml2PluginV2TestCase):
self.assertEqual('host-c', agent2[0]['host'])
self.assertEqual('host-d', agent3[0]['host'])
def _get_network_with_candidate_hosts(self, net_id, seg_id):
# expire the session so that the segment is fully reloaded on fetch,
# including its new host mapping
self.ctx.session.expire_all()
net = self.plugin.get_network(self.ctx, net_id)
seg = self.segments_plugin.get_segment(self.ctx, seg_id)
net['candidate_hosts'] = seg['hosts']
return net
def test_schedule_segment_one_hostable_agent(self):
net_id = self._create_network()
seg_id = self._create_segment(net_id)
@ -595,9 +604,7 @@ class DHCPAgentWeightSchedulerTestCase(test_plugin.Ml2PluginV2TestCase):
helpers.register_dhcp_agent(HOST_D)
segments_service_db.update_segment_host_mapping(
self.ctx, HOST_C, {seg_id})
net = self.plugin.get_network(self.ctx, net_id)
seg = self.segments_plugin.get_segment(self.ctx, seg_id)
net['candidate_hosts'] = seg['hosts']
net = self._get_network_with_candidate_hosts(net_id, seg_id)
agents = self.plugin.network_scheduler.schedule(
self.plugin, self.ctx, net)
self.assertEqual(1, len(agents))
@ -612,9 +619,7 @@ class DHCPAgentWeightSchedulerTestCase(test_plugin.Ml2PluginV2TestCase):
self.ctx, HOST_C, {seg_id})
segments_service_db.update_segment_host_mapping(
self.ctx, HOST_D, {seg_id})
net = self.plugin.get_network(self.ctx, net_id)
seg = self.segments_plugin.get_segment(self.ctx, seg_id)
net['candidate_hosts'] = seg['hosts']
net = self._get_network_with_candidate_hosts(net_id, seg_id)
agents = self.plugin.network_scheduler.schedule(
self.plugin, self.ctx, net)
self.assertEqual(1, len(agents))
@ -642,9 +647,7 @@ class DHCPAgentWeightSchedulerTestCase(test_plugin.Ml2PluginV2TestCase):
self.ctx, HOST_C, {seg_id})
segments_service_db.update_segment_host_mapping(
self.ctx, HOST_D, {seg_id})
net = self.plugin.get_network(self.ctx, net_id)
seg = self.segments_plugin.get_segment(self.ctx, seg_id)
net['candidate_hosts'] = seg['hosts']
net = self._get_network_with_candidate_hosts(net_id, seg_id)
agents = self.plugin.network_scheduler.schedule(
self.plugin, self.ctx, net)
self.assertEqual(2, len(agents))
@ -659,9 +662,7 @@ class DHCPAgentWeightSchedulerTestCase(test_plugin.Ml2PluginV2TestCase):
helpers.register_dhcp_agent(HOST_D)
segments_service_db.update_segment_host_mapping(
self.ctx, HOST_C, {seg_id})
net = self.plugin.get_network(self.ctx, net_id)
seg = self.segments_plugin.get_segment(self.ctx, seg_id)
net['candidate_hosts'] = seg['hosts']
net = self._get_network_with_candidate_hosts(net_id, seg_id)
agents = self.plugin.network_scheduler.schedule(
self.plugin, self.ctx, net)
self.assertEqual(1, len(agents))