Migrate from neutronclient to SDK

Related-Bug: #1999774
Change-Id: I6a1d643c7344ceb2e52e52089bc8a8b730a75fd0
Signed-off-by: lajoskatona <lajos.katona@est.tech>
This commit is contained in:
lajoskatona
2025-09-09 17:58:38 +02:00
committed by Lajos Katona
parent 9a44be6633
commit 2b2f018082
4 changed files with 246 additions and 302 deletions

View File

@@ -17,7 +17,7 @@ import openstack_dashboard.api.nova as nova
from openstack_dashboard.contrib.developer.profiler import api as profiler
neutronclient = neutron.neutronclient
networkclient = neutron.networkclient
class Port(neutron.NeutronAPIDictWrapper):
@@ -52,25 +52,21 @@ def rule_create(request, **kwargs):
:param enabled: boolean (default true)
:return: Rule object
"""
body = {'firewall_rule': kwargs}
rule = neutronclient(request).create_fwaas_firewall_rule(
body).get('firewall_rule')
return Rule(rule)
rule = networkclient(request).create_firewall_rule(**kwargs)
return Rule(rule.to_dict())
@profiler.trace
def get_network_names(request):
networks = neutronclient(request).list_networks(fields=["name", "id"])\
.get('networks', [])
mapped = {n['id']: neutron.Network(n) for n in networks}
networks = networkclient(request).networks()
mapped = {n.id: neutron.Network(n.to_dict()) for n in networks}
return mapped
@profiler.trace
def get_router_names(request):
routers = neutronclient(request).list_routers(fields=["name", "id"])\
.get('routers', [])
mapped = {r['id']: neutron.Router(r) for r in routers}
routers = networkclient(request).routers()
mapped = {r.id: neutron.Router(r.to_dict()) for r in routers}
return mapped
@@ -89,39 +85,37 @@ def rule_list(request, **kwargs):
@profiler.trace
def port_list(request, tenant_id, **kwargs):
kwargs['tenant_id'] = tenant_id
ports = neutronclient(request).list_ports(**kwargs).get('ports')
ports = networkclient(request).ports(**kwargs)
return {
p['id']: Port(p) for p in ports if _is_target(p)
p.id: Port(p.to_dict()) for p in ports if _is_target(p.to_dict())
}
# Gets ids of all ports assigned to firewall groups
@profiler.trace
def fwg_port_list(request, **kwargs):
fwgs = neutronclient(request).list_fwaas_firewall_groups(
**kwargs).get('firewall_groups')
fwgs = networkclient(request).firewall_groups(**kwargs)
ports = set()
for fwg in fwgs:
if fwg['ports']:
ports.update(fwg['ports'])
if fwg.ports:
ports.update(fwg.ports)
return ports
@profiler.trace
def fwg_port_list_for_tenant(request, tenant_id, **kwargs):
kwargs['tenant_id'] = tenant_id
ports = neutronclient(request).list_ports(**kwargs).get('ports')
ports = networkclient(request).ports(**kwargs)
# TODO(SarathMekala): Remove ports which are already associated with a FWG
fwgs = neutronclient(request).list_fwaas_firewall_groups(
**kwargs).get('firewall_groups')
fwgs = networkclient(request).firewall_groups(**kwargs)
fwg_ports = []
for fwg in fwgs:
if not fwg['ports']:
if not fwg.ports:
continue
fwg_ports += fwg['ports']
return [Port(p) for p in ports
if _is_target(p) and p['id'] not in fwg_ports]
fwg_ports += fwg.ports
return [Port(p.to_dict()) for p in ports
if _is_target(p.to_dict()) and p.id not in fwg_ports]
def _is_target(port):
@@ -145,9 +139,8 @@ def rule_list_for_tenant(request, tenant_id, **kwargs):
def _rule_list(request, **kwargs):
rules = neutronclient(request).list_fwaas_firewall_rules(
**kwargs).get('firewall_rules')
return [Rule(r) for r in rules]
rules = networkclient(request).firewall_rules(**kwargs)
return [Rule(r.to_dict()) for r in rules]
@profiler.trace
@@ -156,22 +149,20 @@ def rule_get(request, rule_id):
def _rule_get(request, rule_id):
rule = neutronclient(request).show_fwaas_firewall_rule(
rule_id).get('firewall_rule')
return Rule(rule)
rule = networkclient(request).get_firewall_rule(rule_id)
return Rule(rule.to_dict())
@profiler.trace
def rule_delete(request, rule_id):
neutronclient(request).delete_fwaas_firewall_rule(rule_id)
networkclient(request).delete_firewall_rule(rule_id)
@profiler.trace
def rule_update(request, rule_id, **kwargs):
body = {'firewall_rule': kwargs}
rule = neutronclient(request).update_fwaas_firewall_rule(
rule_id, body).get('firewall_rule')
return Rule(rule)
rule = networkclient(request).update_firewall_rule(
rule_id, **kwargs)
return Rule(rule.to_dict())
@profiler.trace
@@ -186,10 +177,8 @@ def policy_create(request, **kwargs):
:param audited: boolean (default false)
:return: Policy object
"""
body = {'firewall_policy': kwargs}
policy = neutronclient(request).create_fwaas_firewall_policy(
body).get('firewall_policy')
return Policy(policy)
policy = networkclient(request).create_firewall_policy(**kwargs)
return Policy(policy.to_dict())
@profiler.trace
@@ -212,8 +201,8 @@ def policy_list_for_tenant(request, tenant_id, **kwargs):
def _policy_list(request, expand_rule, **kwargs):
policies = neutronclient(request).list_fwaas_firewall_policies(
**kwargs).get('firewall_policies')
policies = [p.to_dict() for p in
networkclient(request).firewall_policies(**kwargs)]
if expand_rule and policies:
rules = _rule_list(request)
rule_dict = collections.OrderedDict((rule.id, rule) for rule in rules)
@@ -228,8 +217,7 @@ def policy_get(request, policy_id):
def _policy_get(request, policy_id, expand_rule):
policy = neutronclient(request).show_fwaas_firewall_policy(
policy_id).get('firewall_policy')
policy = networkclient(request).get_firewall_policy(policy_id).to_dict()
if expand_rule:
policy_rules = policy['firewall_rules']
if policy_rules:
@@ -244,29 +232,28 @@ def _policy_get(request, policy_id, expand_rule):
@profiler.trace
def policy_delete(request, policy_id):
neutronclient(request).delete_fwaas_firewall_policy(policy_id)
networkclient(request).delete_firewall_policy(policy_id)
@profiler.trace
def policy_update(request, policy_id, **kwargs):
body = {'firewall_policy': kwargs}
policy = neutronclient(request).update_fwaas_firewall_policy(
policy_id, body).get('firewall_policy')
return Policy(policy)
policy = networkclient(request).update_firewall_policy(
policy_id, **kwargs)
return Policy(policy.to_dict())
@profiler.trace
def policy_insert_rule(request, policy_id, **kwargs):
policy = neutronclient(request).insert_rule_fwaas_firewall_policy(
policy_id, kwargs)
return Policy(policy)
policy = networkclient(request).insert_rule_into_policy(
policy_id, **kwargs)
return Policy(policy.to_dict())
@profiler.trace
def policy_remove_rule(request, policy_id, **kwargs):
policy = neutronclient(request).remove_rule_fwaas_firewall_policy(
policy_id, kwargs)
return Policy(policy)
policy = networkclient(request).remove_rule_from_policy(
policy_id, **kwargs)
return Policy(policy.to_dict())
@profiler.trace
@@ -281,9 +268,8 @@ def firewall_group_create(request, **kwargs):
:param admin_state_up: boolean (default true)
:return: Firewall group object
"""
body = {'firewall_group': kwargs}
firewall_group = neutronclient(request).create_fwaas_firewall_group(body)
return FirewallGroup(firewall_group['firewall_group'])
firewall_group = networkclient(request).create_firewall_group(**kwargs)
return FirewallGroup(firewall_group.to_dict())
@profiler.trace
@@ -307,9 +293,8 @@ def firewall_group_list_for_tenant(request, tenant_id, **kwargs):
# TODO(SarathMekala): Support expand_policy for _firewall_group_list
def _firewall_group_list(request, **kwargs):
firewall_groups = neutronclient(request).list_fwaas_firewall_groups(
**kwargs).get('firewall_groups')
return [FirewallGroup(f) for f in firewall_groups]
firewall_groups = networkclient(request).firewall_groups(**kwargs)
return [FirewallGroup(f.to_dict()) for f in firewall_groups]
@profiler.trace
@@ -318,8 +303,8 @@ def firewall_group_get(request, firewallgroup_id):
def _firewall_group_get(request, firewallgroup_id, expand_policy):
firewall_group = neutronclient(request).show_fwaas_firewall_group(
firewallgroup_id).get('firewall_group')
firewall_group = networkclient(request).get_firewall_group(
firewallgroup_id).to_dict()
if expand_policy:
ingress_policy_id = firewall_group['ingress_firewall_policy_id']
if ingress_policy_id:
@@ -339,12 +324,11 @@ def _firewall_group_get(request, firewallgroup_id, expand_policy):
@profiler.trace
def firewall_group_delete(request, firewallgroup_id):
neutronclient(request).delete_fwaas_firewall_group(firewallgroup_id)
networkclient(request).delete_firewall_group(firewallgroup_id)
@profiler.trace
def firewall_group_update(request, firewallgroup_id, **kwargs):
body = {'firewall_group': kwargs}
firewall_group = neutronclient(request).update_fwaas_firewall_group(
firewallgroup_id, body).get('firewall_group')
return FirewallGroup(firewall_group)
firewall_group = networkclient(request).update_firewall_group(
firewallgroup_id, **kwargs)
return FirewallGroup(firewall_group.to_dict())

View File

@@ -14,7 +14,12 @@
from unittest import mock
from neutronclient.v2_0.client import Client as neutronclient
from openstack.network.v2._proxy import Proxy as networkclient
from openstack.network.v2 import firewall_group as sdk_fw_group
from openstack.network.v2 import firewall_policy as sdk_fw_policy
from openstack.network.v2 import network as sdk_net
from openstack.network.v2 import port as sdk_port
from openstack.network.v2 import router as sdk_router
import openstack_dashboard.api.nova as nova
from openstack_dashboard.test import helpers
@@ -70,100 +75,68 @@ class FwaasV2ApiTests(test.APITestCase):
setattr(server, key, val)
return server
@helpers.create_mocks({neutronclient: ('list_networks',)})
@helpers.create_mocks({networkclient: ('networks',)})
def test_get_networks(self):
fields = ['name', 'id']
mock_networks = {
'64e8c993-1c99-40fb-a8bc-42d3fd487a97': {
'64e8c993-1c99-40fb-a8bc-42d3fd487a97': sdk_net.Network(**{
'name': 'mock-network-1',
'id': '64e8c993-1c99-40fb-a8bc-42d3fd487a97'
},
'f1bd4bb5-2bf3-4e0e-9c8d-9a1a500eaece': {
}),
'f1bd4bb5-2bf3-4e0e-9c8d-9a1a500eaece': sdk_net.Network(**{
'name': 'mock-network-2',
'id': 'f1bd4bb5-2bf3-4e0e-9c8d-9a1a500eaece'
},
'74173cf1-461e-4fd0-881e-2a0cc4a94e14': {
}),
'74173cf1-461e-4fd0-881e-2a0cc4a94e14': sdk_net.Network(**{
'name': 'mock-network-3',
'id': '74173cf1-461e-4fd0-881e-2a0cc4a94e14'
}
})
}
mock_network_ids = sorted(mock_networks.keys())
self.mock_list_networks.return_value = {
'networks': list(mock_networks.values())
}
self.mock_networks.return_value = list(mock_networks.values())
network_names = api_fwaas_v2.get_network_names(self.request)
self.mock_list_networks.assert_called_once_with(fields=fields)
self.mock_networks.assert_called_once_with()
network_ids = sorted(network_names.keys())
self.assertEqual(network_ids, mock_network_ids)
for key in mock_network_ids:
self._assert_api_dict(
network_names[key]._apidict,
mock_networks[key],
fields
)
@helpers.create_mocks({neutronclient: ('list_routers',)})
@helpers.create_mocks({networkclient: ('routers',)})
def test_get_router_names(self):
fields = ['name', 'id']
mock_routers = {
'9d143b82-bd74-4ccf-81ba-9b7e02f3f7b2': {
'9d143b82-bd74-4ccf-81ba-9b7e02f3f7b2': sdk_router.Router(**{
'name': 'mock-router-1',
'id': '9d143b82-bd74-4ccf-81ba-9b7e02f3f7b2'
},
'84d72522-1c26-4d28-83ed-b8653ac5d38c': {
}),
'84d72522-1c26-4d28-83ed-b8653ac5d38c': sdk_router.Router(**{
'name': 'mock-router-2',
'id': '84d72522-1c26-4d28-83ed-b8653ac5d38c'
},
'2149de19-840a-4b41-8a44-4755ce8a881b': {
}),
'2149de19-840a-4b41-8a44-4755ce8a881b': sdk_router.Router(**{
'name': 'mock-router-3',
'id': '2149de19-840a-4b41-8a44-4755ce8a881b'
}
})
}
mock_router_ids = sorted(mock_routers.keys())
sdk_routers = [sdk_router.Router(**r) for r in mock_routers.values()]
# Mock API call
self.mock_list_routers.return_value = {
'routers': list(mock_routers.values())
}
self.mock_routers.return_value = sdk_routers
# call results
router_names = api_fwaas_v2.get_router_names(self.request)
# Check that the correct filters were applied for the API call
self.mock_list_routers.assert_called_once_with(fields=fields)
self.mock_routers.assert_called_once_with()
# Ensure that exactly the expected mock data ids have been retrieved
router_ids = sorted(router_names.keys())
self.assertEqual(router_ids, mock_router_ids)
# Check that the returned values correspond to the (mocked) API data
for key in mock_router_ids:
# Note that _apidict is being checked
self._assert_api_dict(
router_names[key]._apidict,
mock_routers[key],
fields
)
def _assert_api_dict(self, actual, expected, fields):
# Ensure exactly the required fields have been retrieved
actual_fields = sorted(actual.keys())
self.assertEqual(actual_fields, sorted(fields))
# Ensure expected datum was returned in each field
for field in fields:
self.assertEqual(actual[field], expected[field])
@helpers.create_mocks({neutronclient: ('create_fwaas_firewall_rule',)})
@helpers.create_mocks({networkclient: ('create_firewall_rule',)})
def test_rule_create(self):
rule1 = self.fw_rules_v2.first()
rule1_dict = self.api_fw_rules_v2.first()
rule1_dict = self.api_fw_rules_v2_sdk[0]
form_data = {'name': rule1.name,
'description': rule1.description,
'protocol': rule1.protocol,
@@ -175,40 +148,38 @@ class FwaasV2ApiTests(test.APITestCase):
'shared': rule1.shared,
'enabled': rule1.enabled
}
form_dict = {'firewall_rule': form_data}
ret_dict = {'firewall_rule': rule1_dict}
self.mock_create_fwaas_firewall_rule.return_value = ret_dict
self.mock_create_firewall_rule.return_value = rule1_dict
ret_val = api_fwaas_v2.rule_create(self.request, **form_data)
self._assert_rule_return_value(ret_val, rule1)
self.mock_create_fwaas_firewall_rule.assert_called_once_with(form_dict)
self.mock_create_firewall_rule.assert_called_once_with(**form_data)
def _assert_rule_return_value(self, ret_val, exp_rule):
self.assertIsInstance(ret_val, api_fwaas_v2.Rule)
self.assertEqual(exp_rule.name, ret_val.name)
self.assertTrue(ret_val.id)
@helpers.create_mocks({neutronclient: ('list_fwaas_firewall_rules',)})
@helpers.create_mocks({networkclient: ('firewall_rules',)})
def test_rule_list(self):
exp_rules = self.fw_rules_v2.list()
api_rules = {'firewall_rules': self.api_fw_rules_v2.list()}
api_rules = self.api_fw_rules_v2_sdk
self.mock_list_fwaas_firewall_rules.return_value = api_rules
self.mock_firewall_rules.return_value = api_rules
ret_val = api_fwaas_v2.rule_list(self.request)
for (v, d) in zip(ret_val, exp_rules):
self._assert_rule_return_value(v, d)
self.mock_list_fwaas_firewall_rules.assert_called_once_with()
self.mock_firewall_rules.assert_called_once_with()
@helpers.create_mocks({neutronclient: ('list_fwaas_firewall_rules',)})
@helpers.create_mocks({networkclient: ('firewall_rules',)})
def test_rule_list_for_tenant(self):
tenant_id = self.request.user.project_id
exp_rules = self.fw_rules_v2.list()
api_rules = {'firewall_rules': self.api_fw_rules_v2.list()}
api_rules = self.api_fw_rules_v2_sdk
self.mock_list_fwaas_firewall_rules.side_effect = [
{'firewall_rules': []},
self.mock_firewall_rules.side_effect = [
[],
api_rules,
]
@@ -216,28 +187,28 @@ class FwaasV2ApiTests(test.APITestCase):
for (v, d) in zip(ret_val, exp_rules):
self._assert_rule_return_value(v, d)
self.assertEqual(2, self.mock_list_fwaas_firewall_rules.call_count)
self.mock_list_fwaas_firewall_rules.assert_has_calls([
self.assertEqual(2, self.mock_firewall_rules.call_count)
self.mock_firewall_rules.assert_has_calls([
mock.call(tenant_id=tenant_id, shared=False),
mock.call(shared=True),
])
@helpers.create_mocks({neutronclient: ('show_fwaas_firewall_rule',)})
@helpers.create_mocks({networkclient: ('get_firewall_rule',)})
def test_rule_get(self):
exp_rule = self.fw_rules_v2.first()
ret_dict = {'firewall_rule': self.api_fw_rules_v2.first()}
rule_dict = self.api_fw_rules_v2_sdk[0]
self.mock_show_fwaas_firewall_rule.return_value = ret_dict
self.mock_get_firewall_rule.return_value = rule_dict
ret_val = api_fwaas_v2.rule_get(self.request, exp_rule.id)
self._assert_rule_return_value(ret_val, exp_rule)
self.mock_show_fwaas_firewall_rule.assert_called_once_with(exp_rule.id)
self.mock_get_firewall_rule.assert_called_once_with(exp_rule.id)
@helpers.create_mocks({neutronclient: ('update_fwaas_firewall_rule',)})
@helpers.create_mocks({networkclient: ('update_firewall_rule',)})
def test_rule_update(self):
rule = self.fw_rules_v2.first()
rule_dict = self.api_fw_rules_v2.first()
rule_dict = self.api_fw_rules_v2_sdk[0]
rule.name = 'new name'
rule.description = 'new desc'
@@ -260,22 +231,20 @@ class FwaasV2ApiTests(test.APITestCase):
'shared': rule.shared,
'enabled': rule.enabled
}
form_dict = {'firewall_rule': form_data}
ret_dict = {'firewall_rule': rule_dict}
self.mock_update_fwaas_firewall_rule.return_value = ret_dict
self.mock_update_firewall_rule.return_value = rule_dict
ret_val = api_fwaas_v2.rule_update(self.request,
rule.id, **form_data)
self._assert_rule_return_value(ret_val, rule)
self.mock_update_fwaas_firewall_rule.assert_called_once_with(
rule.id, form_dict)
self.mock_update_firewall_rule.assert_called_once_with(
rule.id, **form_data)
@helpers.create_mocks({neutronclient: ('create_fwaas_firewall_policy', )})
@helpers.create_mocks({networkclient: ('create_firewall_policy', )})
def test_policy_create(self):
policy1 = self.fw_policies_v2.first()
policy1_dict = self.api_fw_policies_v2.first()
policy1_dict = self.api_fw_policies_v2_sdk[0]
print(policy1_dict)
form_data = {'name': policy1.name,
'description': policy1.description,
@@ -283,18 +252,15 @@ class FwaasV2ApiTests(test.APITestCase):
'shared': policy1.shared,
'audited': policy1.audited
}
form_dict = {'firewall_policy': form_data}
ret_dict = {'firewall_policy': policy1_dict}
self.mock_create_fwaas_firewall_policy.return_value = ret_dict
self.mock_create_firewall_policy.return_value = policy1_dict
ret_val = api_fwaas_v2.policy_create(self.request, **form_data)
self.assertIsInstance(ret_val, api_fwaas_v2.Policy)
self.assertEqual(policy1.name, ret_val.name)
self.assertTrue(ret_val.id)
self.mock_create_fwaas_firewall_policy.assert_called_once_with(
form_dict)
self.mock_create_firewall_policy.assert_called_once_with(
**form_data)
def _assert_policy_return_value(self, ret_val, exp_policy):
self.assertIsInstance(ret_val, api_fwaas_v2.Policy)
@@ -306,78 +272,75 @@ class FwaasV2ApiTests(test.APITestCase):
for (r, exp_r) in zip(ret_val.rules, exp_policy.rules):
self.assertEqual(exp_r.id, r.id)
@helpers.create_mocks({neutronclient: ('list_fwaas_firewall_policies',
'list_fwaas_firewall_rules')})
@helpers.create_mocks({networkclient: ('firewall_policies',
'firewall_rules')})
def test_policy_list(self):
exp_policies = self.fw_policies_v2.list()
policies_dict = {'firewall_policies': self.api_fw_policies_v2.list()}
rules_dict = {'firewall_rules': self.api_fw_rules_v2.list()}
policies_dict = self.api_fw_policies_v2_sdk
rules_dict = self.api_fw_rules_v2_sdk
self.mock_list_fwaas_firewall_policies.return_value = policies_dict
self.mock_list_fwaas_firewall_rules.return_value = rules_dict
self.mock_firewall_policies.return_value = policies_dict
self.mock_firewall_rules.return_value = rules_dict
ret_val = api_fwaas_v2.policy_list(self.request)
for (v, d) in zip(ret_val, exp_policies):
self._assert_policy_return_value(v, d)
self.mock_list_fwaas_firewall_policies.assert_called_once_with()
self.mock_list_fwaas_firewall_rules.assert_called_once_with()
self.mock_firewall_policies.assert_called_once_with()
self.mock_firewall_rules.assert_called_once_with()
@helpers.create_mocks({neutronclient: ('list_fwaas_firewall_policies',
'list_fwaas_firewall_rules')})
@helpers.create_mocks({networkclient: ('firewall_policies',
'firewall_rules')})
def test_policy_list_for_tenant(self):
tenant_id = self.request.user.project_id
exp_policies = self.fw_policies_v2.list()
policies_dict = {'firewall_policies': self.api_fw_policies_v2.list()}
rules_dict = {'firewall_rules': self.api_fw_rules_v2.list()}
policies_dict = self.api_fw_policies_v2_sdk
rules_dict = self.api_fw_rules_v2_sdk
self.mock_list_fwaas_firewall_policies.side_effect = [
{'firewall_policies': []},
self.mock_firewall_policies.side_effect = [
[],
policies_dict,
]
self.mock_list_fwaas_firewall_rules.return_value = rules_dict
self.mock_firewall_rules.return_value = rules_dict
ret_val = api_fwaas_v2.policy_list_for_tenant(self.request, tenant_id)
for (v, d) in zip(ret_val, exp_policies):
self._assert_policy_return_value(v, d)
self.assertEqual(2, self.mock_list_fwaas_firewall_policies.call_count)
self.mock_list_fwaas_firewall_policies.assert_has_calls([
self.assertEqual(2, self.mock_firewall_policies.call_count)
self.mock_firewall_policies.assert_has_calls([
mock.call(tenant_id=tenant_id, shared=False),
mock.call(shared=True),
])
self.mock_list_fwaas_firewall_rules.assert_called_once_with()
self.mock_firewall_rules.assert_called_once_with()
@helpers.create_mocks({neutronclient: ('show_fwaas_firewall_policy',
'list_fwaas_firewall_rules')})
@helpers.create_mocks({networkclient: ('get_firewall_policy',
'firewall_rules')})
def test_policy_get(self):
exp_policy = self.fw_policies_v2.first()
policy_dict = self.api_fw_policies_v2.first()
policy_dict = self.api_fw_policies_v2_sdk[0]
# The first two rules are associated with the first policy.
api_rules = self.api_fw_rules_v2.list()[:2]
api_rules = self.api_fw_rules_v2_sdk[:2]
ret_dict = {'firewall_policy': policy_dict}
self.mock_show_fwaas_firewall_policy.return_value = ret_dict
self.mock_get_firewall_policy.return_value = policy_dict
filters = {'firewall_policy_id': exp_policy.id}
ret_dict = {'firewall_rules': api_rules}
self.mock_list_fwaas_firewall_rules.return_value = ret_dict
self.mock_firewall_rules.return_value = api_rules
ret_val = api_fwaas_v2.policy_get(self.request, exp_policy.id)
self._assert_policy_return_value(ret_val, exp_policy)
self.mock_show_fwaas_firewall_policy.assert_called_once_with(
self.mock_get_firewall_policy.assert_called_once_with(
exp_policy.id)
self.mock_list_fwaas_firewall_rules.assert_called_once_with(**filters)
self.mock_firewall_rules.assert_called_once_with(**filters)
@helpers.create_mocks({neutronclient: ('show_fwaas_firewall_policy',)})
@helpers.create_mocks({networkclient: ('get_firewall_policy',)})
def test_policy_get_no_rule(self):
# 2nd policy is not associated with any rules.
exp_policy = self.fw_policies_v2.list()[1]
policy_dict = self.api_fw_policies_v2.list()[1]
policy_dict = self.api_fw_policies_v2_sdk[1]
ret_dict = {'firewall_policy': policy_dict}
self.mock_show_fwaas_firewall_policy.return_value = ret_dict
self.mock_get_firewall_policy.return_value = policy_dict
ret_val = api_fwaas_v2.policy_get(self.request, exp_policy.id)
self.assertIsInstance(ret_val, api_fwaas_v2.Policy)
@@ -385,13 +348,13 @@ class FwaasV2ApiTests(test.APITestCase):
self.assertTrue(ret_val.id)
self.assertFalse(len(ret_val.rules))
self.mock_show_fwaas_firewall_policy.assert_called_once_with(
self.mock_get_firewall_policy.assert_called_once_with(
exp_policy.id)
@helpers.create_mocks({neutronclient: ('update_fwaas_firewall_policy',)})
@helpers.create_mocks({networkclient: ('update_firewall_policy',)})
def test_policy_update(self):
policy = self.fw_policies_v2.first()
policy_dict = self.api_fw_policies_v2.first()
policy_dict = self.api_fw_policies_v2_sdk[0]
policy.name = 'new name'
policy.description = 'new desc'
@@ -409,10 +372,7 @@ class FwaasV2ApiTests(test.APITestCase):
'audited': policy.audited
}
form_dict = {'firewall_policy': form_data}
ret_dict = {'firewall_policy': policy_dict}
self.mock_update_fwaas_firewall_policy.return_value = ret_dict
self.mock_update_firewall_policy.return_value = policy_dict
ret_val = api_fwaas_v2.policy_update(self.request,
policy.id, **form_data)
@@ -420,14 +380,14 @@ class FwaasV2ApiTests(test.APITestCase):
self.assertEqual(policy.name, ret_val.name)
self.assertTrue(ret_val.id)
self.mock_update_fwaas_firewall_policy.assert_called_once_with(
policy.id, form_dict)
self.mock_update_firewall_policy.assert_called_once_with(
policy.id, **form_data)
@helpers.create_mocks(
{neutronclient: ('insert_rule_fwaas_firewall_policy',)})
{networkclient: ('insert_rule_into_policy',)})
def test_policy_insert_rule(self):
policy = self.fw_policies_v2.first()
policy_dict = self.api_fw_policies_v2.first()
policy_dict = self.api_fw_policies_v2_sdk[0]
new_rule_id = 'h0881d38-c3eb-4fee-9763-12de3338041d'
policy.firewall_rules.append(new_rule_id)
@@ -437,37 +397,37 @@ class FwaasV2ApiTests(test.APITestCase):
'insert_before': policy.firewall_rules[1],
'insert_after': policy.firewall_rules[0]}
self.mock_insert_rule_fwaas_firewall_policy.return_value = policy_dict
self.mock_insert_rule_into_policy.return_value = policy_dict
ret_val = api_fwaas_v2.policy_insert_rule(self.request,
policy.id, **body)
self.assertIn(new_rule_id, ret_val.firewall_rules)
self.mock_insert_rule_fwaas_firewall_policy.assert_called_once_with(
policy.id, body)
self.mock_insert_rule_into_policy.assert_called_once_with(
policy.id, **body)
@helpers.create_mocks(
{neutronclient: ('remove_rule_fwaas_firewall_policy',)})
{networkclient: ('remove_rule_from_policy',)})
def test_policy_remove_rule(self):
policy = self.fw_policies_v2.first()
policy_dict = self.api_fw_policies_v2.first()
policy_dict = self.api_fw_policies_v2_sdk[0]
remove_rule_id = policy.firewall_rules[0]
policy_dict['firewall_rules'].remove(remove_rule_id)
body = {'firewall_rule_id': remove_rule_id}
self.mock_remove_rule_fwaas_firewall_policy.return_value = policy_dict
self.mock_remove_rule_from_policy.return_value = policy_dict
ret_val = api_fwaas_v2.policy_remove_rule(self.request,
policy.id, **body)
self.assertNotIn(remove_rule_id, ret_val.firewall_rules)
self.mock_remove_rule_fwaas_firewall_policy.assert_called_once_with(
policy.id, body)
self.mock_remove_rule_from_policy.assert_called_once_with(
policy.id, **body)
@helpers.create_mocks({neutronclient: ('create_fwaas_firewall_group', )})
@helpers.create_mocks({networkclient: ('create_firewall_group', )})
def test_firewall_group_create(self):
firewall_group = self.firewall_groups_v2.first()
firewall_group_dict = self.api_firewall_groups_v2.first()
firewall_group_dict = self.api_firewall_groups_v2_sdk[0]
form_data = {
'name': firewall_group.name,
@@ -479,17 +439,15 @@ class FwaasV2ApiTests(test.APITestCase):
'admin_state_up': firewall_group.admin_state_up
}
form_dict = {'firewall_group': form_data}
ret_dict = {'firewall_group': firewall_group_dict}
self.mock_create_fwaas_firewall_group.return_value = ret_dict
self.mock_create_firewall_group.return_value = firewall_group_dict
ret_val = api_fwaas_v2.firewall_group_create(self.request, **form_data)
self.assertIsInstance(ret_val, api_fwaas_v2.FirewallGroup)
self.assertEqual(firewall_group.name, ret_val.name)
self.assertEqual(firewall_group.id, ret_val.id)
self.mock_create_fwaas_firewall_group.assert_called_once_with(
form_dict)
self.mock_create_firewall_group.assert_called_once_with(
**form_data)
def _assert_firewall_return_value(self, ret_val, exp_firewall,
expand_policy=True):
@@ -517,29 +475,25 @@ class FwaasV2ApiTests(test.APITestCase):
# TODO(Sarath Mekala) : Add API tests for firewall_group_create with ports,
# add port to firewall and remove port from fw.
@helpers.create_mocks({neutronclient: ('list_fwaas_firewall_groups',
'list_fwaas_firewall_policies')})
@helpers.create_mocks({networkclient: ('firewall_groups',)})
def test_firewall_group_list(self):
exp_firewalls = self.firewall_groups_v2.list()
firewalls_dict = {
'firewall_groups': self.api_firewall_groups_v2.list()}
firewalls_dict = self.api_firewall_groups_v2_sdk
self.mock_list_fwaas_firewall_groups.return_value = firewalls_dict
self.mock_firewall_groups.return_value = firewalls_dict
ret_val = api_fwaas_v2.firewall_group_list(self.request)
for (v, d) in zip(ret_val, exp_firewalls):
self._assert_firewall_return_value(v, d, expand_policy=False)
self.mock_list_fwaas_firewall_groups.assert_called_once_with()
self.mock_firewall_groups.assert_called_once_with()
@helpers.create_mocks({neutronclient: ('list_fwaas_firewall_groups',
'list_fwaas_firewall_policies')})
@helpers.create_mocks({networkclient: ('firewall_groups',)})
def test_firewall_group_list_for_tenant(self):
tenant_id = self.request.user.project_id
exp_firewalls = self.firewall_groups_v2.list()
firewalls_dict = {
'firewall_groups': self.api_firewall_groups_v2.list()}
firewalls_dict = self.api_firewall_groups_v2_sdk
self.mock_list_fwaas_firewall_groups.side_effect = [
self.mock_firewall_groups.side_effect = [
firewalls_dict,
firewalls_dict,
]
@@ -549,34 +503,33 @@ class FwaasV2ApiTests(test.APITestCase):
for (v, d) in zip(ret_val, exp_firewalls):
self._assert_firewall_return_value(v, d, expand_policy=False)
self.assertEqual(2, self.mock_list_fwaas_firewall_groups.call_count)
self.mock_list_fwaas_firewall_groups.assert_has_calls([
self.assertEqual(2, self.mock_firewall_groups.call_count)
self.mock_firewall_groups.assert_has_calls([
mock.call(shared=False, tenant_id=tenant_id),
mock.call(shared=True),
])
@helpers.create_mocks({neutronclient: ('list_fwaas_firewall_groups', )})
@helpers.create_mocks({networkclient: ('firewall_groups', )})
def test_fwg_port_list(self):
mock_port_id_1 = '62b974c5-48fb-4fd1-946f-5ace1d970dd4'
mock_port_id_2 = 'da012bb6-c350-4a72-b6c9-69c4f2008aa4'
mock_port_id_3 = 'c2a2ce11-71dd-49a5-84ec-2407ecb42106'
mock_groups = [
{'ports': [mock_port_id_1, mock_port_id_2]},
{'ports': []},
{'ports': [mock_port_id_3]}
sdk_fw_group.FirewallGroup(
**{'ports': [mock_port_id_1, mock_port_id_2]}),
sdk_fw_group.FirewallGroup(**{'ports': []}),
sdk_fw_group.FirewallGroup(**{'ports': [mock_port_id_3]})
]
self.mock_list_fwaas_firewall_groups.return_value = {
'firewall_groups': mock_groups
}
self.mock_firewall_groups.return_value = mock_groups
expected_set = {mock_port_id_1, mock_port_id_2, mock_port_id_3}
retrieved_set = api_fwaas_v2.fwg_port_list(self.request)
self.assertEqual(expected_set, retrieved_set)
@helpers.create_mocks({neutronclient: ('list_ports',
'list_fwaas_firewall_groups')})
@helpers.create_mocks({networkclient: ('ports',
'firewall_groups')})
def test_fwg_port_list_for_tenant(self):
tenant_id = self.request.user.project_id
router_port = {
@@ -609,18 +562,18 @@ class FwaasV2ApiTests(test.APITestCase):
'name': 'port-5',
'device_owner': 'network:ha_router_replicated_interface'
}
dummy_ports = {'ports': [
dummy_ports = [
router_port,
vm_port1,
vm_port2,
gateway_port,
dhcp_port,
ha_router_port
]}
]
sdk_ports = [sdk_port.Port(**p) for p in dummy_ports]
self.mock_list_ports.return_value = dummy_ports
self.mock_list_fwaas_firewall_groups.return_value = \
{'firewall_groups': []}
self.mock_ports.return_value = sdk_ports
self.mock_firewall_groups.return_value = []
ports = api_fwaas_v2.fwg_port_list_for_tenant(self.request, tenant_id)
self.assertEqual(router_port['id'], ports[0]['id'])
@@ -629,12 +582,12 @@ class FwaasV2ApiTests(test.APITestCase):
self.assertEqual(ha_router_port['id'], ports[3]['id'])
self.assertEqual(4, len(ports))
self.mock_list_ports.assert_called_once_with(tenant_id=tenant_id)
self.mock_list_fwaas_firewall_groups.assert_called_once_with(
self.mock_ports.assert_called_once_with(tenant_id=tenant_id)
self.mock_firewall_groups.assert_called_once_with(
tenant_id=tenant_id)
@helpers.create_mocks({neutronclient: ('list_ports',
'list_fwaas_firewall_groups')})
@helpers.create_mocks({networkclient: ('ports',
'firewall_groups')})
def test_fwg_port_list_for_tenant_with_used_port(self):
tenant_id = self.request.user.project_id
router_port = {
@@ -662,99 +615,102 @@ class FwaasV2ApiTests(test.APITestCase):
'name': 'port-5',
'device_owner': 'network:ha_router_replicated_interface'
}
dummy_ports = {'ports': [
dummy_ports = [
router_port,
vm_port1,
gateway_port,
dhcp_port,
ha_router_port
]}
]
sdk_ports = [sdk_port.Port(**p) for p in dummy_ports]
used_ports = {'firewall_groups': [{'ports': [router_port['id'],
ha_router_port['id']]}]}
used_ports = [{'ports': [router_port['id'],
ha_router_port['id']]}]
sdk_fwgs = [sdk_fw_group.FirewallGroup(**g) for g in used_ports]
self.mock_list_ports.return_value = dummy_ports
self.mock_list_fwaas_firewall_groups.return_value = used_ports
self.mock_ports.return_value = sdk_ports
self.mock_firewall_groups.return_value = sdk_fwgs
ports = api_fwaas_v2.fwg_port_list_for_tenant(self.request, tenant_id)
self.assertEqual(vm_port1['id'], ports[0]['id'])
self.assertEqual(1, len(ports))
self.mock_list_ports.assert_called_once_with(tenant_id=tenant_id)
self.mock_list_fwaas_firewall_groups.assert_called_once_with(
self.mock_ports.assert_called_once_with(tenant_id=tenant_id)
self.mock_firewall_groups.assert_called_once_with(
tenant_id=tenant_id)
@helpers.create_mocks({neutronclient: ('list_ports',
'list_fwaas_firewall_groups')})
@helpers.create_mocks({networkclient: ('ports',
'firewall_groups')})
def test_fwg_port_list_for_tenant_no_match(self):
tenant_id = self.request.user.project_id
dummy_ports = {'ports': [
dummy_ports = [
{'name': 'port-3', 'device_owner': 'network:router_gateway'},
{'name': 'port-4', 'device_owner': 'network:dhcp'},
]}
]
sdk_ports = [sdk_port.Port(**p) for p in dummy_ports]
self.mock_list_ports.return_value = dummy_ports
self.mock_list_fwaas_firewall_groups.return_value = \
{'firewall_groups': []}
self.mock_ports.return_value = sdk_ports
self.mock_firewall_groups.return_value = []
ports = api_fwaas_v2.fwg_port_list_for_tenant(self.request, tenant_id)
self.assertEqual([], ports)
self.mock_list_ports.assert_called_once_with(tenant_id=tenant_id)
self.mock_list_fwaas_firewall_groups.assert_called_once_with(
self.mock_ports.assert_called_once_with(tenant_id=tenant_id)
self.mock_firewall_groups.assert_called_once_with(
tenant_id=tenant_id)
@helpers.create_mocks({neutronclient: ('list_ports',
'list_fwaas_firewall_groups')})
@helpers.create_mocks({networkclient: ('ports',
'firewall_groups')})
def test_fwg_port_list_for_tenant_no_ports(self):
tenant_id = self.request.user.project_id
self.mock_list_ports.return_value = {'ports': []}
self.mock_list_fwaas_firewall_groups.return_value = \
{'firewall_groups': []}
self.mock_ports.return_value = []
self.mock_firewall_groups.return_value = []
ports = api_fwaas_v2.fwg_port_list_for_tenant(self.request, tenant_id)
self.assertEqual([], ports)
self.mock_list_ports.assert_called_once_with(tenant_id=tenant_id)
self.mock_list_fwaas_firewall_groups.assert_called_once_with(
self.mock_ports.assert_called_once_with(tenant_id=tenant_id)
self.mock_firewall_groups.assert_called_once_with(
tenant_id=tenant_id)
@helpers.create_mocks({neutronclient: ('show_fwaas_firewall_group',
'show_fwaas_firewall_policy')})
@helpers.create_mocks({networkclient: ('get_firewall_group',
'get_firewall_policy')})
def test_firewall_group_get(self):
exp_firewall = self.firewall_groups_v2.first()
ret_dict = {'firewall_group': self.api_firewall_groups_v2.first()}
fwg_dict = self.api_firewall_groups_v2_sdk[0]
ingress_policy_id = exp_firewall.ingress_firewall_policy_id
ingress_policy = [p for p in self.api_fw_policies_v2.list()
if p['id'] == ingress_policy_id][0]
ingress_policy = [p for p in self.api_fw_policies_v2_sdk
if p.id == ingress_policy_id][0]
sdk_ingress_policy = sdk_fw_policy.FirewallPolicy(**ingress_policy)
egress_policy_id = exp_firewall.egress_firewall_policy_id
egress_policy = [p for p in self.api_fw_policies_v2.list()
if p['id'] == egress_policy_id][0]
egress_policy = [p for p in self.api_fw_policies_v2_sdk
if p.id == egress_policy_id][0]
sdk_egress_policy = sdk_fw_policy.FirewallPolicy(**egress_policy)
self.mock_show_fwaas_firewall_group.return_value = ret_dict
self.mock_show_fwaas_firewall_policy.side_effect = [
{'firewall_policy': ingress_policy},
{'firewall_policy': egress_policy}
self.mock_get_firewall_group.return_value = fwg_dict
self.mock_get_firewall_policy.side_effect = [
sdk_ingress_policy,
sdk_egress_policy
]
ret_val = api_fwaas_v2.firewall_group_get(self.request,
exp_firewall.id)
self._assert_firewall_return_value(ret_val, exp_firewall)
self.mock_show_fwaas_firewall_group.assert_called_once_with(
self.mock_get_firewall_group.assert_called_once_with(
exp_firewall.id)
self.assertEqual(2, self.mock_show_fwaas_firewall_policy.call_count)
self.mock_show_fwaas_firewall_policy.assert_has_calls([
self.assertEqual(2, self.mock_get_firewall_policy.call_count)
self.mock_get_firewall_policy.assert_has_calls([
mock.call(ingress_policy_id),
mock.call(egress_policy_id),
])
@helpers.create_mocks({neutronclient: ('update_fwaas_firewall_group',)})
@helpers.create_mocks({networkclient: ('update_firewall_group',)})
def test_firewall_group_update(self):
firewall = self.firewall_groups_v2.first()
firewall_dict = self.api_firewall_groups_v2.first()
firewall_dict = self.api_firewall_groups_v2_sdk[0]
firewall.name = 'new name'
firewall.description = 'new desc'
@@ -769,10 +725,7 @@ class FwaasV2ApiTests(test.APITestCase):
'admin_state_up': firewall.admin_state_up
}
form_dict = {'firewall_group': form_data}
ret_dict = {'firewall_group': firewall_dict}
self.mock_update_fwaas_firewall_group.return_value = ret_dict
self.mock_update_firewall_group.return_value = firewall_dict
ret_val = api_fwaas_v2.firewall_group_update(self.request,
firewall.id, **form_data)
@@ -780,5 +733,5 @@ class FwaasV2ApiTests(test.APITestCase):
self.assertEqual(firewall.name, ret_val.name)
self.assertTrue(ret_val.id)
self.mock_update_fwaas_firewall_group.assert_called_once_with(
firewall.id, form_dict)
self.mock_update_firewall_group.assert_called_once_with(
firewall.id, **form_data)

View File

@@ -14,6 +14,9 @@
import copy
from openstack.network.v2 import firewall_group as sdk_fw_group
from openstack.network.v2 import firewall_policy as sdk_fw_policy
from openstack.network.v2 import firewall_rule as sdk_fw_rule
from openstack_dashboard.test.test_data import utils
from neutron_fwaas_dashboard.api import fwaas_v2 as fwaas
@@ -25,10 +28,10 @@ def data(TEST):
TEST.fw_policies_v2 = utils.TestDataContainer()
TEST.fw_rules_v2 = utils.TestDataContainer()
# Data return by neutronclient.
TEST.api_firewall_groups_v2 = utils.TestDataContainer()
TEST.api_fw_policies_v2 = utils.TestDataContainer()
TEST.api_fw_rules_v2 = utils.TestDataContainer()
# Data return by openstacksdk.
TEST.api_firewall_groups_v2_sdk = list()
TEST.api_fw_policies_v2_sdk = list()
TEST.api_fw_rules_v2_sdk = list()
# 1st rule (used by 1st policy)
rule1_dict = {
@@ -46,7 +49,7 @@ def data(TEST):
'source_port': '80',
'tenant_id': '1',
}
TEST.api_fw_rules_v2.add(rule1_dict)
TEST.api_fw_rules_v2_sdk.append(sdk_fw_rule.FirewallRule(**rule1_dict))
rule1 = fwaas.Rule(copy.deepcopy(rule1_dict))
TEST.fw_rules_v2.add(rule1)
@@ -67,7 +70,7 @@ def data(TEST):
'source_port': '80',
'tenant_id': '1',
}
TEST.api_fw_rules_v2.add(rule2_dict)
TEST.api_fw_rules_v2_sdk.append(sdk_fw_rule.FirewallRule(**rule2_dict))
rule2 = fwaas.Rule(copy.deepcopy(rule2_dict))
TEST.fw_rules_v2.add(rule2)
@@ -88,7 +91,7 @@ def data(TEST):
'source_port': '80',
'tenant_id': '1',
}
TEST.api_fw_rules_v2.add(rule3_dict)
TEST.api_fw_rules_v2_sdk.append(sdk_fw_rule.FirewallRule(**rule3_dict))
rule3 = fwaas.Rule(copy.deepcopy(rule3_dict))
TEST.fw_rules_v2.add(rule3)
@@ -103,7 +106,8 @@ def data(TEST):
'shared': True,
'tenant_id': '1',
}
TEST.api_fw_policies_v2.add(policy1_dict)
TEST.api_fw_policies_v2_sdk.append(
sdk_fw_policy.FirewallPolicy(**policy1_dict))
policy1 = fwaas.Policy(copy.deepcopy(policy1_dict))
policy1._apidict['rules'] = [rule1, rule2]
@@ -119,7 +123,8 @@ def data(TEST):
'shared': False,
'tenant_id': '1',
}
TEST.api_fw_policies_v2.add(policy2_dict)
TEST.api_fw_policies_v2_sdk.append(
sdk_fw_policy.FirewallPolicy(**policy2_dict))
policy2 = fwaas.Policy(copy.deepcopy(policy2_dict))
policy2._apidict['rules'] = []
@@ -138,7 +143,8 @@ def data(TEST):
'status': 'PENDING_CREATE',
'tenant_id': '1',
}
TEST.api_firewall_groups_v2.add(fwg1_dict)
TEST.api_firewall_groups_v2_sdk.append(
sdk_fw_group.FirewallGroup(**fwg1_dict))
fwg1 = fwaas.FirewallGroup(copy.deepcopy(fwg1_dict))
fwg1._apidict['ingress_policy'] = policy1
@@ -159,7 +165,8 @@ def data(TEST):
'status': 'INACTIVE',
'tenant_id': '1',
}
TEST.api_firewall_groups_v2.add(fwg2_dict)
TEST.api_firewall_groups_v2_sdk.append(
sdk_fw_group.FirewallGroup(**fwg2_dict))
fwg2 = fwaas.FirewallGroup(copy.deepcopy(fwg2_dict))
fwg2._apidict['ingress_policy'] = None

View File

@@ -4,4 +4,4 @@
pbr!=2.1.0,>=2.0.0 # Apache-2.0
horizon>=17.1.0 # Apache-2.0
python-neutronclient>=6.7.0 # Apache-2.0
openstacksdk>=4.5.0 # Apache-2.0