populate port security default into network

Previously, the default value would be populated into attr by API
controller, but some codes in plugin or service plugins call plugin
to create network directly, such as l3, which will have no default
value populated.
This patch fixes it by populating default port_security value into
network data.

In addition, for network without port-security set, we also give the
default value to populate the return network dict object, which will
let the extension construct the response dictionary gracefully for
those existing network.

Co-Authored-By: gong yong sheng <gong.yongsheng@99cloud.net>

Change-Id: I73abc98d83372f6259f17680806e6541458e2077
Closes-bug: #1461519
Closes-Bug: #1461647
Closes-Bug: #1468588
This commit is contained in:
Kahou Lei 2015-06-03 16:39:11 -07:00 committed by gong yong sheng
parent c24c3f4c90
commit f4e1289d8d
3 changed files with 44 additions and 3 deletions

View File

@ -38,7 +38,9 @@ class PortSecurityExtensionDriver(api.ExtensionDriver,
def process_create_network(self, context, data, result): def process_create_network(self, context, data, result):
# Create the network extension attributes. # Create the network extension attributes.
if psec.PORTSECURITY in data: if psec.PORTSECURITY not in data:
data[psec.PORTSECURITY] = (psec.EXTENDED_ATTRIBUTES_2_0['networks']
[psec.PORTSECURITY]['default'])
self._process_network_port_security_create(context, data, result) self._process_network_port_security_create(context, data, result)
def process_update_network(self, context, data, result): def process_update_network(self, context, data, result):
@ -63,6 +65,11 @@ class PortSecurityExtensionDriver(api.ExtensionDriver,
self._extend_port_security_dict(result, db_data) self._extend_port_security_dict(result, db_data)
def _extend_port_security_dict(self, response_data, db_data): def _extend_port_security_dict(self, response_data, db_data):
if db_data.get('port_security') is None:
response_data[psec.PORTSECURITY] = (
psec.EXTENDED_ATTRIBUTES_2_0['networks']
[psec.PORTSECURITY]['default'])
else:
response_data[psec.PORTSECURITY] = ( response_data[psec.PORTSECURITY] = (
db_data['port_security'][psec.PORTSECURITY]) db_data['port_security'][psec.PORTSECURITY])

View File

@ -23,6 +23,7 @@ from neutron.db import securitygroups_db
from neutron.extensions import portsecurity as psec from neutron.extensions import portsecurity as psec
from neutron.extensions import securitygroup as ext_sg from neutron.extensions import securitygroup as ext_sg
from neutron import manager from neutron import manager
from neutron.plugins.ml2.extensions import port_security
from neutron.tests.unit.db import test_db_base_plugin_v2 from neutron.tests.unit.db import test_db_base_plugin_v2
from neutron.tests.unit.extensions import test_securitygroup from neutron.tests.unit.extensions import test_securitygroup
@ -399,3 +400,15 @@ class TestPortSecurity(PortSecurityDBTestCase):
'', 'not_network_owner') '', 'not_network_owner')
res = req.get_response(self.api) res = req.get_response(self.api)
self.assertEqual(res.status_int, exc.HTTPForbidden.code) self.assertEqual(res.status_int, exc.HTTPForbidden.code)
def test_extend_port_dict_no_port_security(self):
"""Test _extend_port_security_dict won't crash
if port_security item is None
"""
for db_data in ({'port_security': None, 'name': 'net1'}, {}):
response_data = {}
driver = port_security.PortSecurityExtensionDriver()
driver._extend_port_security_dict(response_data, db_data)
self.assertTrue(response_data[psec.PORTSECURITY])

View File

@ -13,7 +13,9 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from neutron import context
from neutron.extensions import portsecurity as psec from neutron.extensions import portsecurity as psec
from neutron import manager
from neutron.plugins.ml2 import config from neutron.plugins.ml2 import config
from neutron.tests.unit.extensions import test_portsecurity as test_psec from neutron.tests.unit.extensions import test_portsecurity as test_psec
from neutron.tests.unit.plugins.ml2 import test_plugin from neutron.tests.unit.plugins.ml2 import test_plugin
@ -29,6 +31,25 @@ class PSExtDriverTestCase(test_plugin.Ml2PluginV2TestCase,
group='ml2') group='ml2')
super(PSExtDriverTestCase, self).setUp() super(PSExtDriverTestCase, self).setUp()
def test_create_net_port_security_default(self):
_core_plugin = manager.NeutronManager.get_plugin()
admin_ctx = context.get_admin_context()
_default_value = (psec.EXTENDED_ATTRIBUTES_2_0['networks']
[psec.PORTSECURITY]['default'])
args = {'network':
{'name': 'test',
'tenant_id': '',
'shared': False,
'admin_state_up': True,
'status': 'ACTIVE'}}
try:
network = _core_plugin.create_network(admin_ctx, args)
_value = network[psec.PORTSECURITY]
finally:
if network:
_core_plugin.delete_network(admin_ctx, network['id'])
self.assertEqual(_default_value, _value)
def test_create_port_with_secgroup_none_and_port_security_false(self): def test_create_port_with_secgroup_none_and_port_security_false(self):
if self._skip_security_group: if self._skip_security_group:
self.skipTest("Plugin does not support security groups") self.skipTest("Plugin does not support security groups")