objects: introduce NetworkPortSecurity object

Since there are two models for binding, and it's easier if they behave
identically from objects POV, renamed port_id field in PortSecurity into
id, and kept the same fields in NetworkPortSecurity.

Switched db code to using those objects. Some refactoring took place to
reduce code duplication.

Partially-Implements: blueprint adopt-oslo-versioned-objects-for-db
Change-Id: Ic104f113c4aa3f0c1448f83fe5128feed7b4444b
This commit is contained in:
Ihar Hrachyshka 2016-06-08 19:45:18 +02:00
parent 4449d1b825
commit 9fcc8c52c9
15 changed files with 225 additions and 148 deletions

View File

@ -40,8 +40,8 @@ from neutron.db import l3_hamode_db # noqa
from neutron.db.metering import metering_db # noqa
from neutron.db import model_base
from neutron.db import models_v2 # noqa
from neutron.db.port_security import models # noqa
from neutron.db import portbindings_db # noqa
from neutron.db import portsecurity_db # noqa
from neutron.db import provisioning_blocks # noqa
from neutron.db.qos import models as qos_models # noqa
from neutron.db.quota import models # noqa

View File

View File

@ -0,0 +1,46 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sqlalchemy as sa
from sqlalchemy import orm
from neutron.db import model_base
from neutron.db import models_v2
class PortSecurityBinding(model_base.BASEV2):
port_id = sa.Column(sa.String(36),
sa.ForeignKey('ports.id', ondelete="CASCADE"),
primary_key=True)
port_security_enabled = sa.Column(sa.Boolean(), nullable=False)
# Add a relationship to the Port model in order to be to able to
# instruct SQLAlchemy to eagerly load port security binding
port = orm.relationship(
models_v2.Port,
backref=orm.backref("port_security", uselist=False,
cascade='delete', lazy='joined'))
class NetworkSecurityBinding(model_base.BASEV2):
network_id = sa.Column(sa.String(36),
sa.ForeignKey('networks.id', ondelete="CASCADE"),
primary_key=True)
port_security_enabled = sa.Column(sa.Boolean(), nullable=False)
# Add a relationship to the Port model in order to be able to instruct
# SQLAlchemy to eagerly load default port security setting for ports
# on this network
network = orm.relationship(
models_v2.Network,
backref=orm.backref("port_security", uselist=False,
cascade='delete', lazy='joined'))

View File

@ -12,42 +12,9 @@
# License for the specific language governing permissions and limitations
# under the License.
import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy.orm import exc
from neutron.db import model_base
from neutron.db import models_v2
from neutron.extensions import portsecurity as psec
class PortSecurityBinding(model_base.BASEV2):
port_id = sa.Column(sa.String(36),
sa.ForeignKey('ports.id', ondelete="CASCADE"),
primary_key=True)
port_security_enabled = sa.Column(sa.Boolean(), nullable=False)
# Add a relationship to the Port model in order to be to able to
# instruct SQLAlchemy to eagerly load port security binding
port = orm.relationship(
models_v2.Port,
backref=orm.backref("port_security", uselist=False,
cascade='delete', lazy='joined'))
class NetworkSecurityBinding(model_base.BASEV2):
network_id = sa.Column(sa.String(36),
sa.ForeignKey('networks.id', ondelete="CASCADE"),
primary_key=True)
port_security_enabled = sa.Column(sa.Boolean(), nullable=False)
# Add a relationship to the Port model in order to be able to instruct
# SQLAlchemy to eagerly load default port security setting for ports
# on this network
network = orm.relationship(
models_v2.Network,
backref=orm.backref("port_security", uselist=False,
cascade='delete', lazy='joined'))
from neutron.objects.network.extensions import port_security as n_ps
from neutron.objects.port.extensions import port_security as p_ps
class PortSecurityDbCommon(object):
@ -60,90 +27,71 @@ class PortSecurityDbCommon(object):
response_data[psec.PORTSECURITY] = (
db_data['port_security'][psec.PORTSECURITY])
def _process_network_port_security_create(
self, context, network_req, network_res):
with context.session.begin(subtransactions=True):
db = NetworkSecurityBinding(
network_id=network_res['id'],
port_security_enabled=network_req[psec.PORTSECURITY])
context.session.add(db)
network_res[psec.PORTSECURITY] = network_req[psec.PORTSECURITY]
return self._make_network_port_security_dict(db)
def _process_port_security_create(
self, context, obj_cls, res_name, req, res):
obj = obj_cls(
context,
id=res['id'],
port_security_enabled=req[psec.PORTSECURITY]
)
obj.create()
res[psec.PORTSECURITY] = req[psec.PORTSECURITY]
return self._make_port_security_dict(obj, res_name)
def _process_port_port_security_create(
self, context, port_req, port_res):
with context.session.begin(subtransactions=True):
db = PortSecurityBinding(
port_id=port_res['id'],
port_security_enabled=port_req[psec.PORTSECURITY])
context.session.add(db)
port_res[psec.PORTSECURITY] = port_req[psec.PORTSECURITY]
return self._make_port_security_dict(db)
self._process_port_security_create(
context, p_ps.PortSecurity, 'port',
port_req, port_res)
def _process_network_port_security_create(
self, context, network_req, network_res):
self._process_port_security_create(
context, n_ps.NetworkPortSecurity, 'network',
network_req, network_res)
def _get_security_binding(self, context, obj_cls, res_id):
obj = obj_cls.get_object(context, id=res_id)
# NOTE(ihrachys) the resource may have been created before port
# security extension was enabled; return default value
return obj.port_security_enabled if obj else psec.DEFAULT_PORT_SECURITY
def _get_network_security_binding(self, context, network_id):
try:
query = self._model_query(context, NetworkSecurityBinding)
binding = query.filter(
NetworkSecurityBinding.network_id == network_id).one()
return binding.port_security_enabled
except exc.NoResultFound:
# NOTE(ihrachys) the resource may have been created before port
# security extension was enabled; return default value
return psec.DEFAULT_PORT_SECURITY
return self._get_security_binding(
context, n_ps.NetworkPortSecurity, network_id)
def _get_port_security_binding(self, context, port_id):
try:
query = self._model_query(context, PortSecurityBinding)
binding = query.filter(
PortSecurityBinding.port_id == port_id).one()
return binding.port_security_enabled
except exc.NoResultFound:
# NOTE(ihrachys) the resource may have been created before port
# security extension was enabled; return default value
return psec.DEFAULT_PORT_SECURITY
return self._get_security_binding(context, p_ps.PortSecurity, port_id)
def _process_port_port_security_update(
self, context, port_req, port_res):
if psec.PORTSECURITY not in port_req:
return
port_security_enabled = port_req[psec.PORTSECURITY]
try:
query = self._model_query(context, PortSecurityBinding)
port_id = port_res['id']
binding = query.filter(
PortSecurityBinding.port_id == port_id).one()
binding.port_security_enabled = port_security_enabled
port_res[psec.PORTSECURITY] = port_security_enabled
except exc.NoResultFound:
# NOTE(ihrachys) the resource may have been created before port
# security extension was enabled; create the binding model
self._process_port_port_security_create(
context, port_req, port_res)
self._process_port_security_update(
context, p_ps.PortSecurity, 'port', port_req, port_res)
def _process_network_port_security_update(
self, context, network_req, network_res):
if psec.PORTSECURITY not in network_req:
self._process_port_security_update(
context, n_ps.NetworkPortSecurity, 'network',
network_req, network_res)
def _process_port_security_update(
self, context, obj_cls, res_name, req, res):
if psec.PORTSECURITY not in req:
return
port_security_enabled = network_req[psec.PORTSECURITY]
try:
query = self._model_query(context, NetworkSecurityBinding)
network_id = network_res['id']
binding = query.filter(
NetworkSecurityBinding.network_id == network_id).one()
binding.port_security_enabled = port_security_enabled
network_res[psec.PORTSECURITY] = port_security_enabled
except exc.NoResultFound:
port_security_enabled = req[psec.PORTSECURITY]
obj = obj_cls.get_object(context, id=res['id'])
if obj:
obj.port_security_enabled = port_security_enabled
obj.update()
res[psec.PORTSECURITY] = port_security_enabled
else:
# NOTE(ihrachys) the resource may have been created before port
# security extension was enabled; create the binding model
self._process_network_port_security_create(
context, network_req, network_res)
self._process_port_security_create(
context, obj_cls, res_name, req, res)
def _make_network_port_security_dict(self, port_security, fields=None):
res = {'network_id': port_security['network_id'],
psec.PORTSECURITY: port_security.port_security_enabled}
return self._fields(res, fields)
def _make_port_security_dict(self, port, fields=None):
res = {'port_id': port['port_id'],
psec.PORTSECURITY: port.port_security_enabled}
return self._fields(res, fields)
def _make_port_security_dict(self, res, res_name, fields=None):
res_ = {'%s_id' % res_name: res.id,
psec.PORTSECURITY: res.port_security_enabled}
return self._fields(res_, fields)

View File

@ -0,0 +1,24 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_versionedobjects import fields as obj_fields
from neutron.extensions import portsecurity
from neutron.objects import base
class _PortSecurity(base.NeutronDbObject):
fields = {
'id': obj_fields.UUIDField(),
'port_security_enabled': obj_fields.BooleanField(
default=portsecurity.DEFAULT_PORT_SECURITY),
}

View File

View File

@ -0,0 +1,26 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_versionedobjects import base as obj_base
from neutron.db.port_security import models
from neutron.objects.extensions import port_security as base_ps
@obj_base.VersionedObjectRegistry.register
class NetworkPortSecurity(base_ps._PortSecurity):
# Version 1.0: Initial version
VERSION = "1.0"
fields_need_translation = {'id': 'network_id'}
db_model = models.NetworkSecurityBinding

View File

@ -1,5 +1,3 @@
# Copyright 2013 VMware, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@ -13,22 +11,16 @@
# under the License.
from oslo_versionedobjects import base as obj_base
from oslo_versionedobjects import fields as obj_fields
from neutron.db import portsecurity_db_common as models
from neutron.objects import base
from neutron.db.port_security import models
from neutron.objects.extensions import port_security as base_ps
@obj_base.VersionedObjectRegistry.register
class PortSecurity(base.NeutronDbObject):
class PortSecurity(base_ps._PortSecurity):
# Version 1.0: Initial version
VERSION = "1.0"
fields_need_translation = {'id': 'port_id'}
db_model = models.PortSecurityBinding
primary_keys = ['port_id']
fields = {
'port_id': obj_fields.UUIDField(),
'port_security_enabled': obj_fields.BooleanField(default=True),
}

View File

@ -11,66 +11,69 @@
# under the License.
import mock
from sqlalchemy.orm import exc
from neutron.db import common_db_mixin
from neutron.db import portsecurity_db_common as pdc
from neutron.extensions import portsecurity as psec
from neutron.objects import base as objects_base
from neutron.objects.network.extensions import port_security as n_ps
from neutron.objects.port.extensions import port_security as p_ps
from neutron.tests import base
common = pdc.PortSecurityDbCommon
class FakePlugin(pdc.PortSecurityDbCommon, common_db_mixin.CommonDbMixin):
pass
class PortSecurityDbCommonTestCase(base.BaseTestCase):
def setUp(self):
super(PortSecurityDbCommonTestCase, self).setUp()
self.common = common()
self.plugin = FakePlugin()
def _test__get_security_binding_no_binding(self, getter):
port_sec_enabled = True
req = {psec.PORTSECURITY: port_sec_enabled}
res = {}
with mock.patch.object(
self.common, '_model_query',
create=True,
side_effect=exc.NoResultFound):
objects_base.NeutronDbObject, 'get_object',
return_value=None):
val = getter(req, res)
self.assertEqual(port_sec_enabled, val)
def test__get_port_security_binding_no_binding(self):
self._test__get_security_binding_no_binding(
self.common._get_port_security_binding)
self.plugin._get_port_security_binding)
def test__get_network_security_binding_no_binding(self):
self._test__get_security_binding_no_binding(
self.common._get_network_security_binding)
self.plugin._get_network_security_binding)
def _test__process_security_update_no_binding(self, creator, updater):
def _test__process_security_update_no_binding(self, res_name, obj_cls,
updater):
req = {psec.PORTSECURITY: False}
res = {}
context = mock.Mock()
res = {'id': 'fake-id'}
context = mock.MagicMock()
with mock.patch.object(
self.common, '_model_query',
create=True,
side_effect=exc.NoResultFound):
updater(context, req, res)
creator.assert_called_with(context, req, res)
self.plugin, '_process_port_security_create') as creator:
with mock.patch.object(
objects_base.NeutronDbObject, 'get_object',
return_value=None):
updater(context, req, res)
creator.assert_called_with(context, obj_cls, res_name, req, res)
@mock.patch.object(common, '_process_port_port_security_create')
def test__process_port_port_security_update_no_binding(self, creator):
self._test__process_security_update_no_binding(
creator,
self.common._process_port_port_security_update)
def test__process_port_port_security_update_no_binding(self):
self._test__process_security_update_no_binding(
'port', p_ps.PortSecurity,
self.plugin._process_port_port_security_update)
@mock.patch.object(common, '_process_network_port_security_create')
def test__process_network_port_security_update_no_binding(self, creator):
self._test__process_security_update_no_binding(
creator,
self.common._process_network_port_security_update)
def test__process_network_port_security_update_no_binding(self):
self._test__process_security_update_no_binding(
'network', n_ps.NetworkPortSecurity,
self.plugin._process_network_port_security_update)
def test__extend_port_security_dict_no_port_security(self):
for db_data in ({'port_security': None, 'name': 'net1'}, {}):
response_data = {}
self.common._extend_port_security_dict(response_data, db_data)
self.plugin._extend_port_security_dict(response_data, db_data)
self.assertTrue(response_data[psec.PORTSECURITY])

View File

@ -0,0 +1,37 @@
# Copyright 2013 VMware, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.objects.network.extensions import port_security
from neutron.tests.unit.objects import test_base as obj_test_base
from neutron.tests.unit import testlib_api
class NetworkPortSecurityIfaceObjTestCase(
obj_test_base.BaseObjectIfaceTestCase):
_test_class = port_security.NetworkPortSecurity
class NetworkPortSecurityDbObjTestCase(obj_test_base.BaseDbObjectTestCase,
testlib_api.SqlTestCase):
_test_class = port_security.NetworkPortSecurity
def setUp(self):
super(NetworkPortSecurityDbObjTestCase, self).setUp()
self._create_test_network()
for obj in self.db_objs:
obj['network_id'] = self._network['id']
for obj in self.obj_fields:
obj['id'] = self._network['id']

View File

@ -34,4 +34,4 @@ class PortSecurityDbObjTestCase(obj_test_base.BaseDbObjectTestCase,
for obj in self.db_objs:
obj['port_id'] = self._port['id']
for obj in self.obj_fields:
obj['port_id'] = self._port['id']
obj['id'] = self._port['id']

View File

@ -30,7 +30,8 @@ object_data = {
'DNSNameServer': '1.0-bf87a85327e2d812d1666ede99d9918b',
'ExtraDhcpOpt': '1.0-632f689cbeb36328995a7aed1d0a78d3',
'IPAllocationPool': '1.0-371016a6480ed0b4299319cb46d9215d',
'PortSecurity': '1.0-cf5b382a0112080ec4e0f23f697c7ab2',
'NetworkPortSecurity': '1.0-b30802391a87945ee9c07582b4ff95e3',
'PortSecurity': '1.0-b30802391a87945ee9c07582b4ff95e3',
'AllowedAddressPair': '1.0-9f9186b6f952fbf31d257b0458b852c0',
'QosBandwidthLimitRule': '1.1-4e44a8f5c2895ab1278399f87b40a13d',
'QosDscpMarkingRule': '1.1-0313c6554b34fd10c753cb63d638256c',