diff --git a/neutron/tests/unit/test_extension_allowedaddresspairs.py b/neutron/tests/unit/test_extension_allowedaddresspairs.py index a4cd042e9cb..753523ad6b8 100644 --- a/neutron/tests/unit/test_extension_allowedaddresspairs.py +++ b/neutron/tests/unit/test_extension_allowedaddresspairs.py @@ -20,6 +20,7 @@ from neutron.db import db_base_plugin_v2 from neutron.db import portsecurity_db from neutron.extensions import allowedaddresspairs as addr_pair from neutron.extensions import portsecurity as psec +from neutron.extensions import securitygroup as secgroup from neutron import manager from neutron.tests.unit import test_db_plugin from oslo.config import cfg @@ -247,26 +248,23 @@ class TestAllowedAddressPairs(AllowedAddressPairDBTestCase): if self._skip_port_security: self.skipTest("Plugin does not implement port-security extension") with self.network() as net: - with self.subnet(network=net): + with self.subnet(network=net) as subnet: address_pairs = [{'mac_address': '00:00:00:00:00:01', 'ip_address': '10.0.0.1'}] - res = self._create_port(self.fmt, net['network']['id'], - arg_list=('port_security_enabled', - addr_pair.ADDRESS_PAIRS,), - port_security_enabled=True, - allowed_address_pairs=address_pairs) - port = self.deserialize(self.fmt, res) - update_port = {'port': {psec.PORTSECURITY: False}} - # If plugin implements security groups we also need to remove - # the security group on port. - plugin_obj = manager.NeutronManager.get_plugin() - if 'security-groups' in plugin_obj.supported_extension_aliases: - update_port['port']['security_groups'] = [] - req = self.new_update_request('ports', update_port, - port['port']['id']) - res = req.get_response(self.api) - self.assertEqual(res.status_int, 409) - self._delete('ports', port['port']['id']) + # The port should not have any security-groups associated to it + with self.port(subnet=subnet, + arg_list=(psec.PORTSECURITY, + addr_pair.ADDRESS_PAIRS, + secgroup.SECURITYGROUPS), + port_security_enabled=True, + allowed_address_pairs=address_pairs, + security_groups=[]) as port: + + update_port = {'port': {psec.PORTSECURITY: False}} + req = self.new_update_request('ports', update_port, + port['port']['id']) + res = req.get_response(self.api) + self.assertEqual(409, res.status_int) def test_create_port_remove_allowed_address_pairs(self): with self.network() as net: