Browse Source

Add methods for firewall section and rule

1. get logical port applyto reference
2. get rule address
3. get l4 portset nsservice
4. create section with rules

Change-Id: I02003b64f6937f1200572cb07accd8b59be19544
changes/12/438812/4 0.7.3
Danting Liu 5 years ago
parent
commit
576bac2ae0
  1. 83
      vmware_nsxlib/tests/unit/v3/test_security.py
  2. 40
      vmware_nsxlib/v3/security.py

83
vmware_nsxlib/tests/unit/v3/test_security.py

@ -0,0 +1,83 @@
# Copyright (c) 2015 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from vmware_nsxlib.tests.unit.v3 import nsxlib_testcase
class TestNsxLibFirewallSection(nsxlib_testcase.NsxLibTestCase):
"""Tests for vmware_nsxlib.v3.security.NsxLibFirewallSection"""
def test_get_logicalport_reference(self):
mock_port = '3ed55c9f-f879-4048-bdd3-eded92465252'
result = self.nsxlib.firewall_section.get_logicalport_reference(
mock_port)
expected = {
'target_id': '3ed55c9f-f879-4048-bdd3-eded92465252',
'target_type': 'LogicalPort'
}
self.assertEqual(expected, result)
def test_get_rule_address(self):
result = self.nsxlib.firewall_section.get_rule_address(
'target-id', 'display-name')
expected = {
'target_display_name': 'display-name',
'target_id': 'target-id',
'is_valid': True,
'target_type': 'IPv4Address'
}
self.assertEqual(expected, result)
def test_get_l4portset_nsservice(self):
result = self.nsxlib.firewall_section.get_l4portset_nsservice()
expected = {
'service': {
'resource_type': 'L4PortSetNSService',
'source_ports': [],
'destination_ports': [],
'l4_protocol': 'TCP'
}
}
self.assertEqual(expected, result)
def test_create_with_rules(self):
expected_body = {
'display_name': 'display-name',
'description': 'section-description',
'stateful': True,
'section_type': "LAYER3",
'applied_tos': [],
'rules': [{
'display_name': 'rule-name',
'direction': 'IN_OUT',
'ip_protocol': "IPV4_IPV6",
'action': "ALLOW",
'logged': False,
'disabled': False,
'sources': [],
'destinations': [],
'services': []
}],
'tags': []
}
with mock.patch.object(self.nsxlib.client, 'create') as create:
rule = self.nsxlib.firewall_section.get_rule_dict('rule-name')
self.nsxlib.firewall_section.create_with_rules(
'display-name', 'section-description', rules=[rule])
resource = 'firewall/sections?operation=insert_bottom' \
'&action=create_with_rules'
create.assert_called_with(resource, expected_body)

40
vmware_nsxlib/v3/security.py

@ -316,6 +316,25 @@ class NsxLibFirewallSection(utils.NsxLibApiBase):
resource += '&id=%s' % other_section
return self.client.create(resource, body)
def create_with_rules(self, display_name, description, applied_tos=None,
tags=None, operation=consts.FW_INSERT_BOTTOM,
other_section=None, rules=None):
resource = 'firewall/sections?operation=%s' % operation
body = {
'display_name': display_name,
'description': description,
'stateful': True,
'section_type': consts.FW_SECTION_LAYER3,
'applied_tos': applied_tos or [],
'rules': rules or [],
'tags': tags or []
}
if rules:
resource += '&action=create_with_rules'
if other_section:
resource += '&id=%s' % other_section
return self.client.create(resource, body)
def update(self, section_id, display_name=None, description=None,
applied_tos=None, rules=None, tags_update=None):
# Using internal method so we can access max_attempts in the decorator
@ -363,6 +382,10 @@ class NsxLibFirewallSection(utils.NsxLibApiBase):
return {'target_id': nsgroup_id,
'target_type': consts.NSGROUP}
def get_logicalport_reference(self, port_id):
return {'target_id': port_id,
'target_type': consts.TARGET_TYPE_LOGICAL_PORT}
def get_ip_cidr_reference(self, ip_cidr_block, ip_protocol):
target_type = (consts.TARGET_TYPE_IPV4ADDRESS
if ip_protocol == consts.IPV4
@ -370,6 +393,23 @@ class NsxLibFirewallSection(utils.NsxLibApiBase):
return {'target_id': ip_cidr_block,
'target_type': target_type}
def get_rule_address(self, target_id, display_name=None, is_valid=True,
target_type=consts.TARGET_TYPE_IPV4ADDRESS):
return {'target_display_name': display_name or '',
'target_id': target_id,
'is_valid': is_valid,
'target_type': target_type}
def get_l4portset_nsservice(self, sources=None, destinations=None,
protocol=consts.TCP):
return {
'service': {
'resource_type': 'L4PortSetNSService',
'source_ports': sources or [],
'destination_ports': destinations or [],
'l4_protocol': protocol}
}
def get_rule_dict(self, display_name, sources=None, destinations=None,
direction=consts.IN_OUT, ip_protocol=consts.IPV4_IPV6,
services=None, action=consts.FW_ACTION_ALLOW,

Loading…
Cancel
Save