diff --git a/vmware_nsx/plugins/nsx_v3/plugin.py b/vmware_nsx/plugins/nsx_v3/plugin.py index 5584ab9b3a..06436a6db5 100644 --- a/vmware_nsx/plugins/nsx_v3/plugin.py +++ b/vmware_nsx/plugins/nsx_v3/plugin.py @@ -30,6 +30,7 @@ from neutron.common import topics from neutron.common import utils as neutron_utils from neutron.db import agents_db from neutron.db import agentschedulers_db +from neutron.db import allowedaddresspairs_db as addr_pair_db from neutron.db import db_base_plugin_v2 from neutron.db import external_net_db from neutron.db import extradhcpopt_db @@ -38,11 +39,14 @@ from neutron.db import l3_db from neutron.db import l3_gwmode_db from neutron.db import models_v2 from neutron.db import portbindings_db +from neutron.db import portsecurity_db from neutron.db import securitygroups_db +from neutron.extensions import allowedaddresspairs as addr_pair from neutron.extensions import external_net as ext_net_extn from neutron.extensions import extra_dhcp_opt as ext_edo from neutron.extensions import l3 from neutron.extensions import portbindings as pbin +from neutron.extensions import portsecurity as psec from neutron.extensions import providernet as pnet from neutron.extensions import securitygroup as ext_sg from neutron.i18n import _LE, _LI, _LW @@ -67,14 +71,17 @@ from vmware_nsx.nsxlib.v3 import security LOG = log.getLogger(__name__) +NSX_V3_PSEC_PROFILE_NAME = 'neutron_port_spoof_guard_profile' -class NsxV3Plugin(db_base_plugin_v2.NeutronDbPluginV2, +class NsxV3Plugin(addr_pair_db.AllowedAddressPairsMixin, + db_base_plugin_v2.NeutronDbPluginV2, securitygroups_db.SecurityGroupDbMixin, external_net_db.External_net_db_mixin, extraroute_db.ExtraRoute_db_mixin, l3_gwmode_db.L3_NAT_db_mixin, portbindings_db.PortBindingMixin, + portsecurity_db.PortSecurityDbMixin, agentschedulers_db.DhcpAgentSchedulerDbMixin, extradhcpopt_db.ExtraDhcpOptMixin): @@ -82,11 +89,13 @@ class NsxV3Plugin(db_base_plugin_v2.NeutronDbPluginV2, __native_pagination_support = True __native_sorting_support = True - supported_extension_aliases = ["quotas", + supported_extension_aliases = ["allowed-address-pairs", + "quotas", "binding", "extra_dhcp_opt", "ext-gw-mode", "security-group", + "port-security", "provider", "external-net", "extraroute", @@ -109,6 +118,45 @@ class NsxV3Plugin(db_base_plugin_v2.NeutronDbPluginV2, self.nsgroup_container, self.default_section = ( security.init_nsgroup_container_and_default_section_rules()) + LOG.debug("Initializing NSX v3 port spoofguard switching profile") + self._switching_profiles = nsx_resources.SwitchingProfile( + self._nsx_client) + self._psec_profile = None + self._psec_profile = self._init_port_security_profile() + if not self._psec_profile: + msg = (_("Unable to initialize NSX v3 port spoofguard " + "switching profile: %s") % NSX_V3_PSEC_PROFILE_NAME) + raise nsx_exc.NsxPluginException(msg) + + def _get_port_security_profile_id(self): + return nsx_resources.SwitchingProfile.build_switch_profile_ids( + self._switching_profiles, self._get_port_security_profile())[0] + + def _get_port_security_profile(self): + if self._psec_profile: + return self._psec_profile + profile = self._switching_profiles.find_by_display_name( + NSX_V3_PSEC_PROFILE_NAME) + return profile[0] if profile else None + + @utils.retry_upon_exception_nsxv3(Exception) + def _init_port_security_profile(self): + # NOTE(boden): potential race cond with distributed plugins + # whereupon a different plugin could create the profile + # after we don't find an existing one and create another + profile = self._get_port_security_profile() + if profile: + return profile + + self._switching_profiles.create_spoofguard_profile( + NSX_V3_PSEC_PROFILE_NAME, 'Neutron Port Security Profile', + whitelist_ports=True, whitelist_switches=False, + tags=utils.build_v3_tags_payload({ + 'id': NSX_V3_PSEC_PROFILE_NAME, + 'tenant_id': 'neutron-nsx-plugin'})) + + return self._get_port_security_profile() + def _setup_rpc(self): self.topic = topics.PLUGIN self.conn = n_rpc.create_connection(new=True) @@ -259,12 +307,16 @@ class NsxV3Plugin(db_base_plugin_v2.NeutronDbPluginV2, self._create_network_at_the_backend(context, net_data)) tenant_id = self._get_tenant_id_for_create( context, net_data) + self._ensure_default_security_group(context, tenant_id) with context.session.begin(subtransactions=True): # Create network in Neutron try: created_net = super(NsxV3Plugin, self).create_network(context, network) + + self._process_network_port_security_create( + context, net_data, created_net) self._process_l3_create(context, created_net, net_data) except Exception: with excutils.save_and_reraise_exception(): @@ -308,6 +360,10 @@ class NsxV3Plugin(db_base_plugin_v2.NeutronDbPluginV2, pnet._raise_if_updates_provider_attributes(net_data) updated_net = super(NsxV3Plugin, self).update_network(context, id, network) + + if psec.PORTSECURITY in network['network']: + self._process_network_port_security_update( + context, network['network'], updated_net) self._process_l3_update(context, updated_net, network['network']) self._extend_network_dict_provider(context, updated_net) @@ -343,10 +399,18 @@ class NsxV3Plugin(db_base_plugin_v2.NeutronDbPluginV2, # NOTE(arosen): nsx-v3 doesn't seem to handle ipv6 addresses # currently so for now we remove them here and do not pass # them to the backend which would raise an error. - if(netaddr.IPNetwork(fixed_ip['ip_address']).version == 6): + if netaddr.IPNetwork(fixed_ip['ip_address']).version == 6: continue address_bindings.append(nsx_resources.PacketAddressClassifier( fixed_ip['ip_address'], port['mac_address'], None)) + + for pair in port.get(addr_pair.ADDRESS_PAIRS): + address_bindings.append(nsx_resources.PacketAddressClassifier( + pair['ip_address'], pair['mac_address'], None)) + + # TODO(boden): this default pair is not needed with nsxv3 for dhcp + address_bindings.append(nsx_resources.PacketAddressClassifier( + '0.0.0.0', port['mac_address'], None)) return address_bindings def get_network(self, context, id, fields=None): @@ -412,7 +476,8 @@ class NsxV3Plugin(db_base_plugin_v2.NeutronDbPluginV2, return parent_name, tag def _create_port_at_the_backend(self, context, neutron_db, - port_data, l2gw_port_check): + port_data, l2gw_port_check, + psec_is_on): tags = utils.build_v3_tags_payload(port_data) parent_name, tag = self._get_data_from_binding_profile( context, port_data) @@ -427,6 +492,11 @@ class NsxV3Plugin(db_base_plugin_v2.NeutronDbPluginV2, # for ports plugged into a Bridge Endpoint. vif_uuid = port_data.get('device_id') attachment_type = port_data.get('device_owner') + + profiles = None + if psec_is_on: + profiles = [self._get_port_security_profile_id()] + result = self._port_client.create( port_data['network_id'], vif_uuid, tags=tags, @@ -434,7 +504,8 @@ class NsxV3Plugin(db_base_plugin_v2.NeutronDbPluginV2, admin_state=port_data['admin_state_up'], address_bindings=address_bindings, attachment_type=attachment_type, - parent_name=parent_name, parent_tag=tag) + parent_name=parent_name, parent_tag=tag, + switch_profile_ids=profiles) # TODO(salv-orlando): The logical switch identifier in the # mapping object is not necessary anymore. @@ -443,43 +514,73 @@ class NsxV3Plugin(db_base_plugin_v2.NeutronDbPluginV2, neutron_db['network_id'], result['id']) return result - def create_port(self, context, port, l2gw_port_check=False): - dhcp_opts = port['port'].get(ext_edo.EXTRADHCPOPTS, []) - port_id = uuidutils.generate_uuid() - port['port']['id'] = port_id + def _create_port_preprocess_security( + self, context, port, port_data, neutron_db): + (port_security, has_ip) = self._determine_port_security_and_has_ip( + context, port_data) + port_data[psec.PORTSECURITY] = port_security + self._process_port_port_security_create( + context, port_data, neutron_db) + # allowed address pair checks + if attributes.is_attr_set(port_data.get(addr_pair.ADDRESS_PAIRS)): + if not port_security: + raise addr_pair.AddressPairAndPortSecurityRequired() + else: + self._process_create_allowed_address_pairs( + context, neutron_db, + port_data[addr_pair.ADDRESS_PAIRS]) + else: + # remove ATTR_NOT_SPECIFIED + port_data[addr_pair.ADDRESS_PAIRS] = [] + + if port_security and has_ip: + self._ensure_default_security_group_on_port(context, port) + elif self._check_update_has_security_groups( + {'port': port_data}): + raise psec.PortSecurityAndIPRequiredForSecurityGroups() + port_data[ext_sg.SECURITYGROUPS] = ( + self._get_security_groups_on_port(context, port)) + return port_security, has_ip + + def create_port(self, context, port, l2gw_port_check=False): + port_data = port['port'] + dhcp_opts = port_data.get(ext_edo.EXTRADHCPOPTS, []) - self._ensure_default_security_group_on_port(context, port) # TODO(salv-orlando): Undo logical switch creation on failure with context.session.begin(subtransactions=True): neutron_db = super(NsxV3Plugin, self).create_port(context, port) - self._process_portbindings_create_and_update(context, - port['port'], - neutron_db) - port["port"].update(neutron_db) - if not self._network_is_external( - context, port['port']['network_id']): - lport = self._create_port_at_the_backend( - context, neutron_db, port['port'], l2gw_port_check) - self._process_portbindings_create_and_update(context, - port['port'], - neutron_db) + (is_psec_on, has_ip) = self._create_port_preprocess_security( + context, port, port_data, neutron_db) + self._process_portbindings_create_and_update( + context, port['port'], port_data) + self._process_port_create_extra_dhcp_opts( + context, port_data, dhcp_opts) - neutron_db[pbin.VNIC_TYPE] = pbin.VNIC_NORMAL - if (pbin.PROFILE in port['port'] and - attributes.is_attr_set(port['port'][pbin.PROFILE])): - neutron_db[pbin.PROFILE] = port['port'][pbin.PROFILE] - self._process_port_create_extra_dhcp_opts(context, neutron_db, - dhcp_opts) + context.session.flush() + + if not self._network_is_external(context, port_data['network_id']): + lport = self._create_port_at_the_backend( + context, neutron_db, port_data, + l2gw_port_check, is_psec_on) + + # For some reason the port bindings DB mixin does not handle + # the VNIC_TYPE attribute, which is required by nova for + # setting up VIFs. + port_data[pbin.VNIC_TYPE] = pbin.VNIC_NORMAL sgids = self._get_security_groups_on_port(context, port) if sgids is not None: self._process_port_create_security_group( - context, neutron_db, sgids) + context, port_data, sgids) + #FIXME(abhiraut): Security group should not be processed for + # a port belonging to an external network. + # Below call will fail since there is no lport + # in the backend. security.update_lport_with_security_groups( context, lport['id'], [], sgids) - return neutron_db + return port_data def _pre_delete_port_check(self, context, port_id, l2gw_port_check): """Perform checks prior to deleting a port.""" @@ -521,21 +622,100 @@ class NsxV3Plugin(db_base_plugin_v2.NeutronDbPluginV2, return ret_val + def _update_port_preprocess_security( + self, context, port, id, updated_port): + delete_addr_pairs = self._check_update_deletes_allowed_address_pairs( + port) + has_addr_pairs = self._check_update_has_allowed_address_pairs(port) + has_security_groups = self._check_update_has_security_groups(port) + delete_security_groups = self._check_update_deletes_security_groups( + port) + + # populate port_security setting + if psec.PORTSECURITY not in port['port']: + updated_port[psec.PORTSECURITY] = \ + self._get_port_security_binding(context, id) + has_ip = self._ip_on_port(updated_port) + # validate port security and allowed address pairs + if not updated_port[psec.PORTSECURITY]: + # has address pairs in request + if has_addr_pairs: + raise addr_pair.AddressPairAndPortSecurityRequired() + elif not delete_addr_pairs: + # check if address pairs are in db + updated_port[addr_pair.ADDRESS_PAIRS] = ( + self.get_allowed_address_pairs(context, id)) + if updated_port[addr_pair.ADDRESS_PAIRS]: + raise addr_pair.AddressPairAndPortSecurityRequired() + + if delete_addr_pairs or has_addr_pairs: + # delete address pairs and read them in + self._delete_allowed_address_pairs(context, id) + self._process_create_allowed_address_pairs( + context, updated_port, + updated_port[addr_pair.ADDRESS_PAIRS]) + + # checks if security groups were updated adding/modifying + # security groups, port security is set and port has ip + if not (has_ip and updated_port[psec.PORTSECURITY]): + if has_security_groups: + raise psec.PortSecurityAndIPRequiredForSecurityGroups() + # Update did not have security groups passed in. Check + # that port does not have any security groups already on it. + filters = {'port_id': [id]} + security_groups = ( + super(NsxV3Plugin, self)._get_port_security_group_bindings( + context, filters) + ) + if security_groups and not delete_security_groups: + raise psec.PortSecurityPortHasSecurityGroup() + + if delete_security_groups or has_security_groups: + # delete the port binding and read it with the new rules. + self._delete_port_security_group_bindings(context, id) + sgids = self._get_security_groups_on_port(context, port) + self._process_port_create_security_group(context, updated_port, + sgids) + + if psec.PORTSECURITY in port['port']: + self._process_port_port_security_update( + context, port['port'], updated_port) + + return updated_port + def update_port(self, context, id, port): original_port = super(NsxV3Plugin, self).get_port(context, id) _, nsx_lport_id = nsx_db.get_nsx_switch_and_port_id( context.session, id) + with context.session.begin(subtransactions=True): updated_port = super(NsxV3Plugin, self).update_port(context, id, port) + + # copy values over - except fixed_ips as + # they've already been processed + port['port'].pop('fixed_ips', None) + updated_port.update(port['port']) + self._update_extra_dhcp_opts_on_port( + context, id, port, updated_port) + + updated_port = self._update_port_preprocess_security( + context, port, id, updated_port) + self._update_extra_dhcp_opts_on_port(context, id, port, updated_port) sec_grp_updated = self.update_security_group_on_port( context, id, port, original_port, updated_port) + (port_security, has_ip) = self._determine_port_security_and_has_ip( + context, updated_port) try: self._port_client.update( - nsx_lport_id, name=port['port'].get('name'), - admin_state=port['port'].get('admin_state_up')) + nsx_lport_id, name=updated_port.get('name'), + admin_state=updated_port.get('admin_state_up'), + address_bindings=self._build_address_bindings(updated_port), + switch_profile_ids=[self._get_port_security_profile_id()] + if port_security else None) + security.update_lport_with_security_groups( context, nsx_lport_id, original_port.get(ext_sg.SECURITYGROUPS, []), diff --git a/vmware_nsx/tests/unit/extensions/test_addresspairs.py b/vmware_nsx/tests/unit/extensions/test_addresspairs.py index 74767f33b4..c5ef6d507c 100644 --- a/vmware_nsx/tests/unit/extensions/test_addresspairs.py +++ b/vmware_nsx/tests/unit/extensions/test_addresspairs.py @@ -17,10 +17,12 @@ from neutron.extensions import allowedaddresspairs as addr_pair from neutron.tests.unit.db import test_allowedaddresspairs_db as ext_pairs from vmware_nsx.tests.unit.nsx_mh import test_plugin as test_nsx_plugin +from vmware_nsx.tests.unit.nsx_v3 import test_constants as v3_constants +from vmware_nsx.tests.unit.nsx_v3 import test_plugin as test_v3_plugin -class TestAllowedAddressPairs(test_nsx_plugin.NsxPluginV2TestCase, - ext_pairs.TestAllowedAddressPairs): +class TestAllowedAddressPairsNSXv2(test_nsx_plugin.NsxPluginV2TestCase, + ext_pairs.TestAllowedAddressPairs): # TODO(arosen): move to ext_pairs.TestAllowedAddressPairs once all # plugins do this correctly. @@ -33,3 +35,16 @@ class TestAllowedAddressPairs(test_nsx_plugin.NsxPluginV2TestCase, def test_create_port_security_false_allowed_address_pairs(self): self.skipTest('TBD') + + +class TestAllowedAddressPairsNSXv3(test_v3_plugin.NsxV3PluginTestCaseMixin, + ext_pairs.TestAllowedAddressPairs): + + def setUp(self, plugin=v3_constants.PLUGIN_NAME, + ext_mgr=None, + service_plugins=None): + super(TestAllowedAddressPairsNSXv3, self).setUp( + plugin=plugin, ext_mgr=ext_mgr, service_plugins=service_plugins) + + def test_create_port_security_false_allowed_address_pairs(self): + self.skipTest('TBD') diff --git a/vmware_nsx/tests/unit/extensions/test_portsecurity.py b/vmware_nsx/tests/unit/extensions/test_portsecurity.py index 441bfe928a..9865a38274 100644 --- a/vmware_nsx/tests/unit/extensions/test_portsecurity.py +++ b/vmware_nsx/tests/unit/extensions/test_portsecurity.py @@ -19,10 +19,12 @@ from neutron.tests.unit.extensions import test_portsecurity as psec from vmware_nsx.common import sync from vmware_nsx.tests import unit as vmware from vmware_nsx.tests.unit.nsx_mh.apiclient import fake +from vmware_nsx.tests.unit.nsx_v3 import test_constants as v3_constants +from vmware_nsx.tests.unit.nsxlib.v3 import nsxlib_testcase from vmware_nsx.tests.unit import test_utils -class PortSecurityTestCase(psec.PortSecurityDBTestCase): +class PortSecurityTestCaseNSXv2(psec.PortSecurityDBTestCase): def setUp(self): test_utils.override_nsx_ini_test() @@ -36,11 +38,23 @@ class PortSecurityTestCase(psec.PortSecurityDBTestCase): patch_sync.start() instance.return_value.request.side_effect = self.fc.fake_request - super(PortSecurityTestCase, self).setUp(vmware.PLUGIN_NAME) + super(PortSecurityTestCaseNSXv2, self).setUp(vmware.PLUGIN_NAME) self.addCleanup(self.fc.reset_all) self.addCleanup(self.mock_nsx.stop) self.addCleanup(patch_sync.stop) -class TestPortSecurity(PortSecurityTestCase, psec.TestPortSecurity): +class TestPortSecurityNSXv2(PortSecurityTestCaseNSXv2, psec.TestPortSecurity): pass + + +class PortSecurityTestCaseNSXv3(nsxlib_testcase.NsxClientTestCase, + psec.PortSecurityDBTestCase): + def setUp(self, *args, **kwargs): + super(PortSecurityTestCaseNSXv3, self).setUp( + plugin=v3_constants.PLUGIN_NAME) + + +class TestPortSecurityNSXv3(PortSecurityTestCaseNSXv3, + psec.TestPortSecurity): + pass diff --git a/vmware_nsx/tests/unit/nsx_v3/test_constants.py b/vmware_nsx/tests/unit/nsx_v3/test_constants.py index 1a28f40763..086218c568 100644 --- a/vmware_nsx/tests/unit/nsx_v3/test_constants.py +++ b/vmware_nsx/tests/unit/nsx_v3/test_constants.py @@ -16,6 +16,7 @@ from oslo_utils import uuidutils +PLUGIN_NAME = 'vmware_nsx.plugins.nsx_v3.plugin.NsxV3Plugin' FAKE_NAME = "fake_name" FAKE_SWITCH_UUID = uuidutils.generate_uuid() diff --git a/vmware_nsx/tests/unit/nsx_v3/test_plugin.py b/vmware_nsx/tests/unit/nsx_v3/test_plugin.py index e4025b78f1..25f9ec5eca 100644 --- a/vmware_nsx/tests/unit/nsx_v3/test_plugin.py +++ b/vmware_nsx/tests/unit/nsx_v3/test_plugin.py @@ -23,6 +23,7 @@ from neutron.extensions import extraroute from neutron.extensions import l3 from neutron.extensions import l3_ext_gw_mode from neutron.extensions import providernet as pnet +from neutron.extensions import securitygroup as secgrp from neutron import manager from neutron.tests.unit.db import test_db_base_plugin_v2 as test_plugin from neutron.tests.unit.extensions import test_extra_dhcp_opt as test_dhcpopts @@ -58,17 +59,17 @@ class NsxV3PluginTestCaseMixin(test_plugin.NeutronDbPluginV2TestCase, self.mock_api = nsx_v3_mocks.MockRequestSessionApi() self.client = nsx_client.NSX3Client() - mocked = nsxlib_testcase.NsxClientTestCase.mocked_session_module( - nsx_plugin.security.firewall, self.client, - mock_session=self.mock_api) - mocked.start() - self._patchers.append(mocked) + def mock_client_module(mod): + mocked = nsxlib_testcase.NsxClientTestCase.mocked_session_module( + mod, self.client, + mock_session=self.mock_api) + mocked.start() + self._patchers.append(mocked) + + mock_client_module(nsx_plugin.security.firewall) + mock_client_module(nsx_plugin.routerlib.nsxlib) + mock_client_module(nsx_plugin) - mocked = nsxlib_testcase.NsxClientTestCase.mocked_session_module( - nsx_plugin.routerlib.nsxlib, self.client, - mock_session=self.mock_api) - mocked.start() - self._patchers.append(mocked) super(NsxV3PluginTestCaseMixin, self).setUp(plugin=plugin, ext_mgr=ext_mgr) @@ -117,7 +118,24 @@ class TestNetworksV2(test_plugin.TestNetworksV2, NsxV3PluginTestCaseMixin): class TestPortsV2(test_plugin.TestPortsV2, NsxV3PluginTestCaseMixin): - pass + + def test_update_port_delete_ip(self): + # This test case overrides the default because the nsx plugin + # implements port_security/security groups and it is not allowed + # to remove an ip address from a port unless the security group + # is first removed. + with self.subnet() as subnet: + with self.port(subnet=subnet) as port: + data = {'port': {'admin_state_up': False, + 'fixed_ips': [], + secgrp.SECURITYGROUPS: []}} + req = self.new_update_request('ports', + data, port['port']['id']) + res = self.deserialize('json', req.get_response(self.api)) + self.assertEqual(res['port']['admin_state_up'], + data['port']['admin_state_up']) + self.assertEqual(res['port']['fixed_ips'], + data['port']['fixed_ips']) class TestSecurityGroups(ext_sg.TestSecurityGroups, NsxV3PluginTestCaseMixin): diff --git a/vmware_nsx/tests/unit/nsxlib/v3/nsxlib_testcase.py b/vmware_nsx/tests/unit/nsxlib/v3/nsxlib_testcase.py index 516e3b77b0..bab4d3d674 100644 --- a/vmware_nsx/tests/unit/nsxlib/v3/nsxlib_testcase.py +++ b/vmware_nsx/tests/unit/nsxlib/v3/nsxlib_testcase.py @@ -19,6 +19,7 @@ import types import unittest from oslo_config import cfg +from oslo_utils import uuidutils from vmware_nsx.nsxlib.v3 import client as nsx_client from vmware_nsx.tests.unit.nsx_v3 import mocks @@ -38,6 +39,9 @@ class NsxLibTestCase(unittest.TestCase): super(NsxLibTestCase, self).setUp() cfg.CONF.set_override('nsx_user', NSX_USER) cfg.CONF.set_override('nsx_password', NSX_PASSWORD) + cfg.CONF.set_override('default_tz_uuid', + uuidutils.generate_uuid()) + cfg.CONF.set_override('nsx_controllers', ['11.9.8.7', '11.9.8.77']) cfg.CONF.set_override('nsx_user', NSX_USER, 'nsx_v3') cfg.CONF.set_override('nsx_password', NSX_PASSWORD, 'nsx_v3') @@ -148,8 +152,17 @@ class NsxClientTestCase(NsxLibTestCase): return client_fn(*args, **kwargs) return _client + def _mock_client_init(*args, **kwargs): + return with_client + fn_map = {} for fn in BRIDGE_FNS: fn_map[fn] = _call_client(fn) - fn_map['NSX3Client'] = nsx_client.NSX3Client + + fn_map['NSX3Client'] = _mock_client_init + fn_map['JSONRESTClient'] = _mock_client_init + fn_map['RESTClient'] = _mock_client_init + + with_client.new_client_for = _mock_client_init + return cls.patch_client_module(in_module, fn_map) diff --git a/vmware_nsx/tests/unit/services/l2gateway/test_nsxv3_driver.py b/vmware_nsx/tests/unit/services/l2gateway/test_nsxv3_driver.py index 145ca241ca..2957954228 100644 --- a/vmware_nsx/tests/unit/services/l2gateway/test_nsxv3_driver.py +++ b/vmware_nsx/tests/unit/services/l2gateway/test_nsxv3_driver.py @@ -55,6 +55,11 @@ class TestNsxV3L2GatewayDriver(test_l2gw_db.L2GWTestCase, self.l2gw_plugin = l2gw_plugin.NsxL2GatewayPlugin() self.context = context.get_admin_context() + def _get_nw_data(self): + net_data = super(TestNsxV3L2GatewayDriver, self)._get_nw_data() + net_data['network']['port_security_enabled'] = True + return net_data + def test_nsxl2gw_driver_init(self): with mock.patch.object(nsx_v3_driver.NsxV3Driver, '_ensure_default_l2_gateway') as def_gw: