diff --git a/vmware_nsxlib/tests/unit/v3/test_security.py b/vmware_nsxlib/tests/unit/v3/test_security.py
new file mode 100644
index 00000000..e46787dc
--- /dev/null
+++ b/vmware_nsxlib/tests/unit/v3/test_security.py
@@ -0,0 +1,83 @@
+# Copyright (c) 2015 OpenStack Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import mock
+
+from vmware_nsxlib.tests.unit.v3 import nsxlib_testcase
+
+
+class TestNsxLibFirewallSection(nsxlib_testcase.NsxLibTestCase):
+ """Tests for vmware_nsxlib.v3.security.NsxLibFirewallSection"""
+
+ def test_get_logicalport_reference(self):
+ mock_port = '3ed55c9f-f879-4048-bdd3-eded92465252'
+ result = self.nsxlib.firewall_section.get_logicalport_reference(
+ mock_port)
+ expected = {
+ 'target_id': '3ed55c9f-f879-4048-bdd3-eded92465252',
+ 'target_type': 'LogicalPort'
+ }
+ self.assertEqual(expected, result)
+
+ def test_get_rule_address(self):
+ result = self.nsxlib.firewall_section.get_rule_address(
+ 'target-id', 'display-name')
+ expected = {
+ 'target_display_name': 'display-name',
+ 'target_id': 'target-id',
+ 'is_valid': True,
+ 'target_type': 'IPv4Address'
+ }
+ self.assertEqual(expected, result)
+
+ def test_get_l4portset_nsservice(self):
+ result = self.nsxlib.firewall_section.get_l4portset_nsservice()
+ expected = {
+ 'service': {
+ 'resource_type': 'L4PortSetNSService',
+ 'source_ports': [],
+ 'destination_ports': [],
+ 'l4_protocol': 'TCP'
+ }
+ }
+ self.assertEqual(expected, result)
+
+ def test_create_with_rules(self):
+ expected_body = {
+ 'display_name': 'display-name',
+ 'description': 'section-description',
+ 'stateful': True,
+ 'section_type': "LAYER3",
+ 'applied_tos': [],
+ 'rules': [{
+ 'display_name': 'rule-name',
+ 
'direction': 'IN_OUT',
+ 'ip_protocol': "IPV4_IPV6",
+ 'action': "ALLOW",
+ 'logged': False,
+ 'disabled': False,
+ 'sources': [],
+ 'destinations': [],
+ 'services': []
+ }],
+ 'tags': []
+ }
+ with mock.patch.object(self.nsxlib.client, 'create') as create:
+ rule = self.nsxlib.firewall_section.get_rule_dict('rule-name')
+ self.nsxlib.firewall_section.create_with_rules(
+ 'display-name', 'section-description', rules=[rule])
+ resource = 'firewall/sections?operation=insert_bottom' \
+ '&action=create_with_rules'
+ create.assert_called_with(resource, expected_body)
diff --git a/vmware_nsxlib/v3/security.py b/vmware_nsxlib/v3/security.py
index 89dd5a0e..0b2c99a8 100644
--- a/vmware_nsxlib/v3/security.py
+++ b/vmware_nsxlib/v3/security.py
@@ -316,6 +316,25 @@ class NsxLibFirewallSection(utils.NsxLibApiBase): resource += '&id=%s' % other_section return self.client.create(resource, body)
+ def create_with_rules(self, display_name, description, applied_tos=None,
+ tags=None, operation=consts.FW_INSERT_BOTTOM,
+ other_section=None, rules=None):
+ resource = 'firewall/sections?operation=%s' % operation
+ body = {
+ 'display_name': display_name,
+ 'description': description,
+ 'stateful': True,
+ 'section_type': consts.FW_SECTION_LAYER3,
+ 'applied_tos': applied_tos or [],
+ 'rules': rules or [],
+ 'tags': tags or []
+ }
+ if rules:
+ resource += '&action=create_with_rules'
+ if other_section:
+ resource += '&id=%s' % other_section
+ return self.client.create(resource, body)
+
 def update(self, section_id, display_name=None, description=None, applied_tos=None, rules=None, tags_update=None): # Using internal method so we can access max_attempts in the decorator
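Review bot: test_create_with_rules exercises only the rules-present path, so the query-string branching in create_with_rules (`&action=create_with_rules` omitted when `rules` is empty, `&id=...` appended when `other_section` is given) is left untested. A second case along the lines of `self.nsxlib.firewall_section.create_with_rules('display-name', 'section-description', other_section='other-id')` asserting `create.assert_called_with('firewall/sections?operation=insert_bottom&id=other-id', expected_body)` with `'rules': []` in the body would pin both branches (`other-id` is a constructed sample value). The expected dicts also hard-code `"LAYER3"`, `'IPv4Address'`, `'TCP'` and `insert_bottom` rather than the `consts` values the implementation uses, which is acceptable as a black-box check but means a constant change would fail here rather than in the code under test.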
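Review bot: create_with_rules interpolates the caller-supplied `operation` and `other_section` straight into the query string with `%s`, so a value containing `&` or `=` would change the shape of the request; this mirrors the existing `create` (its `resource += '&id=%s' % other_section` is visible in the context lines), but the new code could encode both with `urllib.parse.quote(str(other_section), safe='')` before appending. The method body otherwise looks like a copy of `create` with a `rules` key and the `&action=create_with_rules` flag added — if that is the case, a shared private body builder would keep the two from drifting, though `create` itself lies outside this hunk so that is inferred. None of the added methods carries a docstring; create_with_rules, get_logicalport_reference (hunk @@ -363), get_rule_address and get_l4portset_nsservice (hunk @@ -370) would each benefit from a one-line summary such as `"""Create a LAYER3 firewall section, submitting *rules* in the same request."""`.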
@@ -363,6 +382,10 @@ class NsxLibFirewallSection(utils.NsxLibApiBase): return {'target_id': nsgroup_id, 'target_type': consts.NSGROUP}
+ def get_logicalport_reference(self, port_id):
+ return {'target_id': port_id,
+ 'target_type': consts.TARGET_TYPE_LOGICAL_PORT}
+
 def get_ip_cidr_reference(self, ip_cidr_block, ip_protocol): target_type = (consts.TARGET_TYPE_IPV4ADDRESS if ip_protocol == consts.IPV4
@@ -370,6 +393,23 @@ class NsxLibFirewallSection(utils.NsxLibApiBase): return {'target_id': ip_cidr_block, 'target_type': target_type}
+ def get_rule_address(self, target_id, display_name=None, is_valid=True,
+ target_type=consts.TARGET_TYPE_IPV4ADDRESS):
+ return {'target_display_name': display_name or '',
+ 'target_id': target_id,
+ 'is_valid': is_valid,
+ 'target_type': target_type}
+
+ def get_l4portset_nsservice(self, sources=None, destinations=None,
+ protocol=consts.TCP):
+ return {
+ 'service': {
+ 'resource_type': 'L4PortSetNSService',
+ 'source_ports': sources or [],
+ 'destination_ports': destinations or [],
+ 'l4_protocol': protocol}
+ }
+
 def get_rule_dict(self, display_name, sources=None, destinations=None, direction=consts.IN_OUT, ip_protocol=consts.IPV4_IPV6, services=None, action=consts.FW_ACTION_ALLOW,