Merge "FWaaS - Adds an argument into find_resource"

This commit is contained in:
Jenkins 2017-01-23 21:09:02 +00:00 committed by Gerrit Code Review
commit a910bf78aa
8 changed files with 331 additions and 68 deletions

View File

@ -1,4 +1,4 @@
# Copyright 2016 FUJITSU LIMITED
# Copyright 2016-2017 FUJITSU LIMITED
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -20,7 +20,6 @@ FWP = 'firewall_policy'
FWPS = 'firewall_policies'
FWR = 'firewall_rule'
FWRS = 'firewall_rules'
CMD_RESOURCE_MAP = dict(
(res, 'fwaas_' + res) for res in [FWG, FWGS, FWP, FWPS, FWR, FWRS]
)
CMD_FWG = 'fwaas_' + FWG
CMD_FWP = 'fwaas_' + FWP
CMD_FWR = 'fwaas_' + FWR

View File

@ -1,4 +1,4 @@
# Copyright 2016 FUJITSU LIMITED
# Copyright 2016-2017 FUJITSU LIMITED
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -114,22 +114,22 @@ def _get_common_attrs(client_manager, parsed_args, is_create=True):
parsed_args.no_ingress_firewall_policy):
attrs['ingress_firewall_policy_id'] = client.find_resource(
const.FWP, parsed_args.ingress_firewall_policy,
cmd_resource='fwaas_' + const.FWP)['id']
cmd_resource=const.CMD_FWP)['id']
elif parsed_args.ingress_firewall_policy:
attrs['ingress_firewall_policy_id'] = client.find_resource(
const.FWP, parsed_args.ingress_firewall_policy,
cmd_resource='fwaas_' + const.FWP)['id']
cmd_resource=const.CMD_FWP)['id']
elif parsed_args.no_ingress_firewall_policy:
attrs['ingress_firewall_policy_id'] = None
if (parsed_args.egress_firewall_policy and
parsed_args.no_egress_firewall_policy):
attrs['egress_firewall_policy_id'] = client.find_resource(
const.FWP, parsed_args.egress_firewall_policy,
cmd_resource='fwaas_' + const.FWP)['id']
cmd_resource=const.CMD_FWP)['id']
elif parsed_args.egress_firewall_policy:
attrs['egress_firewall_policy_id'] = client.find_resource(
const.FWP, parsed_args.egress_firewall_policy,
cmd_resource='fwaas_' + const.FWP)['id']
cmd_resource=const.CMD_FWP)['id']
elif parsed_args.no_egress_firewall_policy:
attrs['egress_firewall_policy_id'] = None
if parsed_args.public:
@ -153,7 +153,8 @@ def _get_common_attrs(client_manager, parsed_args, is_create=True):
ports.append(client.find_resource('port', p)['id'])
if not is_create:
ports += client.find_resource(
const.FWG, parsed_args.firewall_group)['ports']
const.FWG, parsed_args.firewall_group,
cmd_resource=const.CMD_FWG)['ports']
attrs['ports'] = sorted(set(ports))
elif parsed_args.no_port:
attrs['ports'] = []
@ -209,7 +210,7 @@ class DeleteFirewallGroup(command.Command):
for fwg in parsed_args.firewall_group:
try:
fwg_id = client.find_resource(
const.FWG, fwg, cmd_resource='fwaas_' + const.FWG)['id']
const.FWG, fwg, cmd_resource=const.CMD_FWG)['id']
client.delete_fwaas_firewall_group(fwg_id)
except Exception as e:
result += 1
@ -218,7 +219,7 @@ class DeleteFirewallGroup(command.Command):
const.FWG: fwg, 'e': e})
if result > 0:
total = len(parsed_args.firewall_groups)
total = len(parsed_args.firewall_group)
msg = (_("%(result)s of %(total)s firewall group(s) "
"failed to delete.") % {'result': result, 'total': total})
raise exceptions.CommandError(msg)
@ -271,7 +272,7 @@ class SetFirewallGroup(command.Command):
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
fwg_id = client.find_resource(const.FWG, parsed_args.firewall_group,
cmd_resource='fwaas_' + const.FWG)['id']
cmd_resource=const.CMD_FWG)['id']
attrs = _get_common_attrs(self.app.client_manager, parsed_args,
is_create=False)
try:
@ -296,7 +297,7 @@ class ShowFirewallGroup(command.ShowOne):
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
fwg_id = client.find_resource(const.FWG, parsed_args.firewall_group,
cmd_resource='fwaas_' + const.FWG)['id']
cmd_resource=const.CMD_FWG)['id']
obj = client.show_fwaas_firewall_group(fwg_id)[const.FWG]
columns, display_columns = osc_utils.get_columns(obj, _attr_map)
data = utils.get_dict_properties(obj, columns, formatters=_formatters)
@ -356,7 +357,8 @@ class UnsetFirewallGroup(command.Command):
attrs['admin_state_up'] = False
if parsed_args.port:
old = client.find_resource(
const.FWG, parsed_args.firewall_group)['ports']
const.FWG, parsed_args.firewall_group,
cmd_resource=const.CMD_FWG)['ports']
new = [client.find_resource(
'port', r)['id'] for r in parsed_args.port]
attrs['ports'] = sorted(list(set(old) - set(new)))
@ -367,7 +369,7 @@ class UnsetFirewallGroup(command.Command):
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
fwg_id = client.find_resource(const.FWG, parsed_args.firewall_group,
cmd_resource='fwaas_' + const.FWG)['id']
cmd_resource=const.CMD_FWG)['id']
attrs = self._get_attrs(self.app.client_manager, parsed_args)
try:
client.update_fwaas_firewall_group(fwg_id, {const.FWG: attrs})

View File

@ -1,4 +1,4 @@
# Copyright 2016 FUJITSU LIMITED
# Copyright 2016-2017 FUJITSU LIMITED
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -56,16 +56,17 @@ def _get_common_attrs(client_manager, parsed_args, is_create=True):
_firewall_rules = []
for f in set(parsed_args.firewall_rule):
_firewall_rules.append(client.find_resource(
const.FWR, f, cmd_resource='fwaas_' + const.FWR)['id'])
const.FWR, f, cmd_resource=const.CMD_FWR)['id'])
attrs[const.FWRS] = sorted(_firewall_rules)
elif parsed_args.firewall_rule:
rules = []
for f in set(parsed_args.firewall_rule):
rules.append(client.find_resource(
const.FWR, f, cmd_resource='fwaas_' + const.FWR)['id'])
const.FWR, f, cmd_resource=const.CMD_FWR)['id'])
if not is_create:
rules += client.find_resource(
const.FWP, parsed_args.firewall_policy)[const.FWRS]
const.FWP, parsed_args.firewall_policy,
cmd_resource=const.CMD_FWP)[const.FWRS]
attrs[const.FWRS] = sorted(set(rules))
elif parsed_args.no_firewall_rule:
attrs[const.FWRS] = []
@ -173,7 +174,7 @@ class DeleteFirewallPolicy(command.Command):
if result > 0:
total = len(parsed_args.firewall_policy)
msg = (_("%(result)s of %(total)s Firewall policy(s) "
msg = (_("%(result)s of %(total)s firewall policy(s) "
"failed to delete.") % {'result': result, 'total': total})
raise exceptions.CommandError(msg)
@ -211,23 +212,23 @@ class FirewallPolicyInsertRule(command.Command):
if parsed_args.insert_before:
_insert_before = client.find_resource(
const.FWR, parsed_args.insert_before,
cmd_resource='fwaas_' + const.FWR)['id']
cmd_resource=const.CMD_FWR)['id']
_insert_after = ''
if 'insert_after' in parsed_args:
if parsed_args.insert_after:
_insert_after = client.find_resource(
const.FWR, parsed_args.insert_after,
cmd_resource='fwaas_' + const.FWR)['id']
cmd_resource=const.CMD_FWR)['id']
return {'firewall_rule_id': _rule_id,
'insert_before': _insert_before,
'insert_after': _insert_after}
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
body = self.args2body(parsed_args)
policy_id = client.find_resource(
const.FWP, parsed_args.firewall_policy,
cmd_resource='fwaas_' + const.FWP)['id']
cmd_resource=const.CMD_FWP)['id']
body = self.args2body(parsed_args)
client.insert_rule_fwaas_firewall_policy(policy_id, body)
rule_id = body['firewall_rule_id']
policy = parsed_args.firewall_policy
@ -253,11 +254,11 @@ class FirewallPolicyRemoveRule(command.Command):
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
fwr_id = _get_required_firewall_rule(client, parsed_args)
body = {'firewall_rule_id': fwr_id}
policy_id = client.find_resource(
const.FWP, parsed_args.firewall_policy,
cmd_resource='fwaas_' + const.FWP)['id']
cmd_resource=const.CMD_FWP)['id']
fwr_id = _get_required_firewall_rule(client, parsed_args)
body = {'firewall_rule_id': fwr_id}
client.remove_rule_fwaas_firewall_policy(policy_id, body)
rule_id = body['firewall_rule_id']
policy = parsed_args.firewall_policy
@ -317,7 +318,7 @@ class SetFirewallPolicy(command.Command):
client = self.app.client_manager.neutronclient
fwp_id = client.find_resource(
const.FWP, parsed_args.firewall_policy,
cmd_resource='fwaas_' + const.FWP)['id']
cmd_resource=const.CMD_FWP)['id']
attrs = _get_common_attrs(self.app.client_manager,
parsed_args, is_create=False)
try:
@ -343,7 +344,7 @@ class ShowFirewallPolicy(command.ShowOne):
client = self.app.client_manager.neutronclient
fwp_id = client.find_resource(const.FWP,
parsed_args.firewall_policy,
cmd_resource='fwaas_' + const.FWP)['id']
cmd_resource=const.CMD_FWP)['id']
obj = client.show_fwaas_firewall_policy(fwp_id)[const.FWP]
columns, display_columns = osc_utils.get_columns(obj, _attr_map)
data = utils.get_dict_properties(obj, columns, formatters=_formatters)
@ -354,8 +355,8 @@ def _get_required_firewall_rule(client, parsed_args):
if not parsed_args.firewall_rule:
msg = (_("Firewall rule (name or ID) is required."))
raise exceptions.CommandError(msg)
return client.find_resource(const.FWR, parsed_args.firewall_rule,
cmd_resource='fwaas_' + const.FWR)['id']
return client.find_resource(
const.FWR, parsed_args.firewall_rule, cmd_resource=const.CMD_FWR)['id']
class UnsetFirewallPolicy(command.Command):
@ -395,11 +396,12 @@ class UnsetFirewallPolicy(command.Command):
if parsed_args.firewall_rule:
old = client.find_resource(
const.FWP, parsed_args.firewall_policy)[const.FWRS]
const.FWP, parsed_args.firewall_policy,
cmd_resource=const.CMD_FWP)[const.FWRS]
new = []
for f in set(parsed_args.firewall_rule):
new.append(client.find_resource(
const.FWR, f, cmd_resource='fwaas_' + const.FWR)['id'])
const.FWR, f, cmd_resource=const.CMD_FWR)['id'])
attrs[const.FWRS] = sorted(list(set(old) - set(new)))
if parsed_args.all_firewall_rule:
attrs[const.FWRS] = []
@ -413,7 +415,7 @@ class UnsetFirewallPolicy(command.Command):
client = self.app.client_manager.neutronclient
fwp_id = client.find_resource(
const.FWP, parsed_args.firewall_policy,
cmd_resource='fwaas_' + const.FWP)['id']
cmd_resource=const.CMD_FWP)['id']
attrs = self._get_attrs(self.app.client_manager, parsed_args)
try:
client.update_fwaas_firewall_policy(fwp_id, {const.FWP: attrs})

View File

@ -1,4 +1,4 @@
# Copyright 2016 FUJITSU LIMITED
# Copyright 2016-2017 FUJITSU LIMITED
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -223,7 +223,7 @@ class DeleteFirewallRule(command.Command):
for fwr in parsed_args.firewall_rule:
try:
fwr_id = client.find_resource(
const.FWR, fwr, cmd_resource='fwaas_' + const.FWR)['id']
const.FWR, fwr, cmd_resource=const.CMD_FWR)['id']
client.delete_fwaas_firewall_rule(fwr_id)
except Exception as e:
result += 1
@ -233,7 +233,7 @@ class DeleteFirewallRule(command.Command):
if result > 0:
total = len(parsed_args.firewall_rule)
msg = (_("%(result)s of %(total)s Firewall rule(s) failed "
msg = (_("%(result)s of %(total)s firewall rule(s) failed "
"to delete.") % {'result': result, 'total': total})
raise exceptions.CommandError(msg)
@ -301,7 +301,7 @@ class SetFirewallRule(command.Command):
parsed_args, is_create=False)
fwr_id = client.find_resource(
const.FWR, parsed_args.firewall_rule,
cmd_resource='fwaas_' + const.FWR)['id']
cmd_resource=const.CMD_FWR)['id']
try:
client.update_fwaas_firewall_rule(fwr_id, {const.FWR: attrs})
except Exception as e:
@ -325,7 +325,7 @@ class ShowFirewallRule(command.ShowOne):
client = self.app.client_manager.neutronclient
fwr_id = client.find_resource(
const.FWR, parsed_args.firewall_rule,
cmd_resource='fwaas_' + const.FWR)['id']
cmd_resource=const.CMD_FWR)['id']
obj = client.show_fwaas_firewall_rule(fwr_id)[const.FWR]
columns, display_columns = osc_utils.get_columns(obj, _attr_map)
data = utils.get_dict_properties(obj, columns, formatters=_formatters)
@ -390,7 +390,7 @@ class UnsetFirewallRule(command.Command):
attrs = self._get_attrs(self.app.client_manager, parsed_args)
fwr_id = client.find_resource(
const.FWR, parsed_args.firewall_rule,
cmd_resource='fwaas_' + const.FWR)['id']
cmd_resource=const.CMD_FWR)['id']
try:
client.update_fwaas_firewall_rule(fwr_id, {const.FWR: attrs})
except Exception as e:

View File

@ -1,4 +1,4 @@
# Copyright 2016 FUJITSU LIMITED
# Copyright 2016-2017 FUJITSU LIMITED
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -14,6 +14,9 @@
# under the License.
#
import testtools
from osc_lib import exceptions
from osc_lib.tests import utils
from neutronclient.tests.unit.osc.v2 import fakes as test_fakes
@ -46,6 +49,17 @@ class TestShowFWaaS(test_fakes.TestNeutronClientOSCV2):
def test_show_filtered_by_id_or_name(self):
target = self.resource['id']
def _mock_fwaas(*args, **kwargs):
# Find specified ingress_firewall_policy
if self.neutronclient.find_resource.call_count == 1:
self.assertEqual(self.res, args[0])
self.assertEqual(self.resource['id'], args[1])
self.assertEqual({'cmd_resource': 'fwaas_' + self.res}, kwargs)
return {'id': args[1]}
self.neutronclient.find_resource.side_effect = _mock_fwaas
arglist = [target]
verifylist = [(self.res, target)]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
@ -210,10 +224,20 @@ class TestDeleteFWaaS(test_fakes.TestNeutronClientOSCV2):
self.assertIsNone(result)
def test_delete_with_multiple_resources(self):
def _mock_fwaas(*args, **kwargs):
self.assertEqual(self.res, args[0])
self.assertIsNotNone(args[1])
self.assertEqual({'cmd_resource': 'fwaas_' + self.res}, kwargs)
return {'id': args[1]}
self.neutronclient.find_resource.side_effect = _mock_fwaas
target1 = 'target1'
target2 = 'target2'
arglist = [target1, target2]
verifylist = [(self.res, [target1, target2])]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.assertIsNone(result)
@ -223,6 +247,21 @@ class TestDeleteFWaaS(test_fakes.TestNeutronClientOSCV2):
actual = ''.join(self.mocked.call_args_list[idx][0])
self.assertEqual(reference, actual)
def test_delete_multiple_with_exception(self):
target1 = 'target'
arglist = [target1]
verifylist = [(self.res, [target1])]
self.neutronclient.find_resource.side_effect = [
target1, exceptions.CommandError
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
resource_name = self.res.replace('_', ' ')
msg = "1 of 2 %s(s) failed to delete." % resource_name
with testtools.ExpectedException(exceptions.CommandError) as e:
self.cmd.take_action(parsed_args)
self.assertEqual(msg, str(e))
class TestUnsetFWaaS(test_fakes.TestNeutronClientOSCV2):

View File

@ -22,6 +22,7 @@ from osc_lib import exceptions
from osc_lib.tests import utils
from neutronclient.osc import utils as osc_utils
from neutronclient.osc.v2.fwaas import constants as const
from neutronclient.osc.v2.fwaas import firewallgroup
from neutronclient.tests.unit.osc.v2 import fakes as test_fakes
from neutronclient.tests.unit.osc.v2.fwaas import common
@ -85,21 +86,11 @@ class TestFirewallGroup(test_fakes.TestNeutronClientOSCV2):
def setUp(self):
super(TestFirewallGroup, self).setUp()
def side_effect_for_find_resource(*args, **kwargs):
port_id = args[1]
ports = _fwg['ports']
if self.res in args[0]:
ports = _fwg['ports']
return {'id': port_id, 'ports': ports}
def _find_resource(*args, **kwargs):
return {'id': args[1], 'ports': _fwg['ports']}
def side_effect_for_list_ports(*args):
port_name = 'id_for_port'
return {'ports': [{'id': port_name}]}
self.neutronclient.find_resource.side_effect = mock.Mock(
side_effect=side_effect_for_find_resource)
self.neutronclient.list_ports = mock.Mock(
side_effect=side_effect_for_list_ports)
self.neutronclient.find_resource = mock.Mock(
side_effect=_find_resource)
osc_utils.find_project = mock.Mock()
osc_utils.find_project.id = _fwg['tenant_id']
self.res = 'firewall_group'
@ -204,6 +195,45 @@ class TestCreateFirewallGroup(TestFirewallGroup, common.TestCreateFWaaS):
self.check_results(headers, data, request)
def test_create_with_ingress_policy(self):
ingress_policy = 'my-ingress-policy'
def _mock_port_fwg(*args, **kwargs):
return {'id': args[1]}
self.neutronclient.find_resource.side_effect = _mock_port_fwg
arglist = ['--ingress-firewall-policy', ingress_policy]
verifylist = [('ingress_firewall_policy', ingress_policy)]
request, response = _generate_req_and_res(verifylist)
self._update_expect_response(request, response)
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
headers, data = self.cmd.take_action(parsed_args)
self.neutronclient.find_resource.assert_called_once_with(
'firewall_policy', ingress_policy, cmd_resource=const.CMD_FWP)
self.check_results(headers, data, request)
def test_create_with_egress_policy(self):
egress_policy = 'my-egress-policy'
def _mock_port_fwg(*args, **kwargs):
return {'id': args[1]}
self.neutronclient.find_resource.side_effect = _mock_port_fwg
arglist = ['--egress-firewall-policy', egress_policy]
verifylist = [('egress_firewall_policy', egress_policy)]
request, response = _generate_req_and_res(verifylist)
self._update_expect_response(request, response)
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
headers, data = self.cmd.take_action(parsed_args)
self.neutronclient.find_resource.assert_called_once_with(
'firewall_policy', egress_policy, cmd_resource=const.CMD_FWP)
self.check_results(headers, data, request)
def test_create_with_all_params(self):
name = 'my-name'
description = 'my-desc'
@ -342,26 +372,77 @@ class TestSetFirewallGroup(TestFirewallGroup, common.TestSetFWaaS):
response[column] for column in self.ordered_columns
)
def test_set_ingress_policy(self):
# firewall_group-update myid --policy newpolicy.
def test_set_ingress_policy_and_egress_policy(self):
target = self.resource['id']
policy = 'ingress_policy'
arglist = [target, '--ingress-firewall-policy', policy]
ingress_policy = 'ingress_policy'
egress_policy = 'egress_policy'
def _mock_fwg_policy(*args, **kwargs):
# 1. Find specified firewall_group
if self.neutronclient.find_resource.call_count == 1:
self.neutronclient.find_resource.assert_called_with(
self.res, target, cmd_resource=const.CMD_FWG)
# 2. Find specified 'ingress_firewall_policy'
if self.neutronclient.find_resource.call_count == 2:
self.neutronclient.find_resource.assert_called_with(
'firewall_policy', ingress_policy,
cmd_resource=const.CMD_FWP)
# 3. Find specified 'ingress_firewall_policy'
if self.neutronclient.find_resource.call_count == 3:
self.neutronclient.find_resource.assert_called_with(
'firewall_policy', egress_policy,
cmd_resource=const.CMD_FWP)
return {'id': args[1]}
self.neutronclient.find_resource.side_effect = _mock_fwg_policy
arglist = [
target,
'--ingress-firewall-policy', ingress_policy,
'--egress-firewall-policy', egress_policy,
]
verifylist = [
(self.res, target),
('ingress_firewall_policy', policy),
('ingress_firewall_policy', ingress_policy),
('egress_firewall_policy', egress_policy),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'ingress_firewall_policy_id': policy}})
target, {self.res: {'ingress_firewall_policy_id': ingress_policy,
'egress_firewall_policy_id': egress_policy}})
self.assertIsNone(result)
def test_set_port(self):
target = self.resource['id']
port1 = 'additional_port1'
port2 = 'additional_port2'
def _mock_port_fwg(*args, **kwargs):
# 1. Find specified firewall_group
if self.neutronclient.find_resource.call_count == 1:
self.neutronclient.find_resource.assert_called_with(
self.res, target, cmd_resource=const.CMD_FWG)
return {'id': args[1]}
# 2. Find specified 'port' #1
if self.neutronclient.find_resource.call_count == 2:
self.neutronclient.find_resource.assert_called_with(
'port', args[1])
return {'id': args[1]}
# 3. Find specified 'port' #2
if self.neutronclient.find_resource.call_count == 3:
self.neutronclient.find_resource.assert_called_with(
'port', args[1])
return {'id': args[1]}
# 4. Find specified firewall_group and refer 'ports' attribute
if self.neutronclient.find_resource.call_count == 4:
self.neutronclient.find_resource.assert_called_with(
self.res, target, cmd_resource=const.CMD_FWG)
return {'ports': _fwg['ports']}
self.neutronclient.find_resource.side_effect = _mock_port_fwg
arglist = [
target,
'--port', port1,
@ -376,6 +457,7 @@ class TestSetFirewallGroup(TestFirewallGroup, common.TestSetFWaaS):
expect = {'ports': sorted(_fwg['ports'] + [port1, port2])}
self.mocked.assert_called_once_with(target, {self.res: expect})
self.assertEqual(4, self.neutronclient.find_resource.call_count)
self.assertIsNone(result)
def test_set_no_port(self):
@ -584,6 +666,27 @@ class TestUnsetFirewallGroup(TestFirewallGroup, common.TestUnsetFWaaS):
def test_unset_port(self):
target = self.resource['id']
port = 'old_port'
def _mock_port_fwg(*args, **kwargs):
# 1. Find specified firewall_group
if self.neutronclient.find_resource.call_count == 1:
self.neutronclient.find_resource.assert_called_with(
self.res, target, cmd_resource=const.CMD_FWG)
return {'id': args[1]}
# 2. Find specified firewall_group and refer 'ports' attribute
if self.neutronclient.find_resource.call_count == 2:
self.neutronclient.find_resource.assert_called_with(
self.res, target, cmd_resource=const.CMD_FWG)
return {'ports': _fwg['ports']}
# 3. Find specified 'port'
if self.neutronclient.find_resource.call_count == 3:
self.neutronclient.find_resource.assert_called_with(
'port', port)
return {'id': args[1]}
self.neutronclient.find_resource.side_effect = mock.Mock(
side_effect=_mock_port_fwg)
arglist = [
target,
'--port', port,

View File

@ -22,6 +22,7 @@ from osc_lib import exceptions
from osc_lib.tests import utils
from neutronclient.osc import utils as osc_utils
from neutronclient.osc.v2.fwaas import constants as const
from neutronclient.osc.v2.fwaas import firewallpolicy
from neutronclient.tests.unit.osc.v2 import fakes as test_fakes
from neutronclient.tests.unit.osc.v2.fwaas import common
@ -90,7 +91,6 @@ class TestFirewallPolicy(test_fakes.TestNeutronClientOSCV2):
self.neutronclient.find_resource = mock.Mock(
side_effect=_find_resource)
# fw_common.get_id.side_effect = lambda x, y, z: z
osc_utils.find_project = mock.Mock()
osc_utils.find_project.id = _fwp['tenant_id']
self.res = 'firewall_policy'
@ -193,6 +193,35 @@ class TestCreateFirewallPolicy(TestFirewallPolicy, common.TestCreateFWaaS):
self.check_results(headers, data, request)
def test_create_with_rules(self):
name = 'my-fwg'
rule1 = 'rule1'
rule2 = 'rule2'
def _mock_policy(*args, **kwargs):
self.neutronclient.find_resource.assert_called_with(
'firewall_rule', args[1], cmd_resource=const.CMD_FWR)
return {'id': args[1]}
self.neutronclient.find_resource.side_effect = _mock_policy
arglist = [
name,
'--firewall-rule', rule1,
'--firewall-rule', rule2,
]
verifylist = [
('name', name),
('firewall_rule', [rule1, rule2]),
]
request, response = _generate_req_and_res(verifylist)
self._update_expect_response(request, response)
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
headers, data = self.cmd.take_action(parsed_args)
self.assertEqual(2, self.neutronclient.find_resource.call_count)
self.check_results(headers, data, request)
def test_create_with_all_params(self):
name = 'my-fwp'
desc = 'my-desc'
@ -308,6 +337,29 @@ class TestSetFirewallPolicy(TestFirewallPolicy, common.TestSetFWaaS):
target = self.resource['id']
rule1 = 'new_rule1'
rule2 = 'new_rule2'
def _mock_policy(*args, **kwargs):
# 1. Find specified firewall_policy
if self.neutronclient.find_resource.call_count == 1:
self.neutronclient.find_resource.assert_called_with(
self.res, target, cmd_resource=const.CMD_FWP)
# 2. Find specified firewall_rule
if self.neutronclient.find_resource.call_count == 2:
self.neutronclient.find_resource.assert_called_with(
'firewall_rule', args[1], cmd_resource=const.CMD_FWR)
# 3. Find specified firewall_rule
if self.neutronclient.find_resource.call_count == 3:
self.neutronclient.find_resource.assert_called_with(
'firewall_rule', args[1], cmd_resource=const.CMD_FWR)
# 4. Find specified firewall_policy's 'firewall_rules' attribute
if self.neutronclient.find_resource.call_count == 4:
self.neutronclient.find_resource.assert_called_with(
self.res, target, cmd_resource=const.CMD_FWP)
return {'firewall_rules': _fwp['firewall_rules']}
return {'id': args[1]}
self.neutronclient.find_resource.side_effect = _mock_policy
arglist = [
target,
'--firewall-rule', rule1,
@ -323,6 +375,7 @@ class TestSetFirewallPolicy(TestFirewallPolicy, common.TestSetFWaaS):
expect = sorted(set(_fwp['firewall_rules'] + [rule1, rule2]))
body = {self.res: {'firewall_rules': expect}}
self.mocked.assert_called_once_with(target, body)
self.assertEqual(4, self.neutronclient.find_resource.call_count)
self.assertIsNone(result)
def test_set_no_rules(self):
@ -357,6 +410,7 @@ class TestSetFirewallPolicy(TestFirewallPolicy, common.TestSetFWaaS):
body = {self.res: {'firewall_rules': [rule1]}}
self.mocked.assert_called_once_with(target, body)
self.assertEqual(2, self.neutronclient.find_resource.call_count)
self.assertIsNone(result)
def test_set_audited(self):
@ -441,6 +495,28 @@ class TestFirewallPolicyInsertRule(TestFirewallPolicy):
rule = 'new-rule'
before = 'before'
after = 'after'
def _mock_policy(*args, **kwargs):
# 1. Find specified firewall_policy
if self.neutronclient.find_resource.call_count == 1:
self.neutronclient.find_resource.assert_called_with(
self.res, target, cmd_resource=const.CMD_FWP)
# 2. Find specified firewall_rule
if self.neutronclient.find_resource.call_count == 2:
self.neutronclient.find_resource.assert_called_with(
'firewall_rule', args[1], cmd_resource=const.CMD_FWR)
# 3. Find specified firewall_rule as 'before'
if self.neutronclient.find_resource.call_count == 3:
self.neutronclient.find_resource.assert_called_with(
'firewall_rule', args[1], cmd_resource=const.CMD_FWR)
# 4. Find specified firewall_rule as 'after'
if self.neutronclient.find_resource.call_count == 4:
self.neutronclient.find_resource.assert_called_with(
'firewall_rule', args[1], cmd_resource=const.CMD_FWR)
return {'id': args[1]}
self.neutronclient.find_resource.side_effect = _mock_policy
arglist = [
target,
rule,
@ -463,6 +539,7 @@ class TestFirewallPolicyInsertRule(TestFirewallPolicy):
'insert_after': after
})
self.assertIsNone(result)
self.assertEqual(4, self.neutronclient.find_resource.call_count)
def test_insert_with_no_firewall_rule(self):
target = self.resource['id']
@ -490,6 +567,21 @@ class TestFirewallPolicyRemoveRule(TestFirewallPolicy):
def test_remove_firewall_rule(self):
target = self.resource['id']
rule = 'remove-rule'
def _mock_policy(*args, **kwargs):
# 1. Find specified firewall_policy
if self.neutronclient.find_resource.call_count == 1:
self.neutronclient.find_resource.assert_called_with(
self.res, target, cmd_resource=const.CMD_FWP)
# 2. Find specified firewall_rule
if self.neutronclient.find_resource.call_count == 2:
self.neutronclient.find_resource.assert_called_with(
'firewall_rule', rule, cmd_resource=const.CMD_FWR)
return {'id': args[1]}
self.neutronclient.find_resource.side_effect = mock.Mock(
side_effect=_mock_policy)
arglist = [
target,
rule,
@ -503,6 +595,7 @@ class TestFirewallPolicyRemoveRule(TestFirewallPolicy):
self.mocked.assert_called_once_with(
target, {'firewall_rule_id': rule})
self.assertIsNone(result)
self.assertEqual(2, self.neutronclient.find_resource.call_count)
def test_remove_with_no_firewall_rule(self):
target = self.resource['id']
@ -566,6 +659,25 @@ class TestUnsetFirewallPolicy(TestFirewallPolicy, common.TestUnsetFWaaS):
_fwp['firewall_rules'] = ['rule1', 'rule2']
target = self.resource['id']
rule = 'rule1'
def _mock_policy(*args, **kwargs):
# 1. Find specified firewall_policy
if self.neutronclient.find_resource.call_count == 1:
self.neutronclient.find_resource.assert_called_with(
self.res, target, cmd_resource=const.CMD_FWP)
# 2. Find 'firewall_rules' attribute from specified firewall_policy
if self.neutronclient.find_resource.call_count == 2:
self.neutronclient.find_resource.assert_called_with(
self.res, target, cmd_resource=const.CMD_FWP)
return {'firewall_rules': _fwp['firewall_rules']}
# 3. Find specified 'firewall_rule'
if self.neutronclient.find_resource.call_count == 3:
self.neutronclient.find_resource.assert_called_with(
'firewall_rule', rule, cmd_resource=const.CMD_FWR)
return {'id': args[1]}
self.neutronclient.find_resource.side_effect = _mock_policy
arglist = [
target,
'--firewall-rule', rule,
@ -580,6 +692,7 @@ class TestUnsetFirewallPolicy(TestFirewallPolicy, common.TestUnsetFWaaS):
body = {self.res: {'firewall_rules': ['rule2']}}
self.mocked.assert_called_once_with(target, body)
self.assertIsNone(result)
self.assertEqual(3, self.neutronclient.find_resource.call_count)
def test_unset_all_firewall_rule(self):
target = self.resource['id']

View File

@ -23,6 +23,7 @@ from osc_lib import exceptions
from osc_lib.tests import utils
from neutronclient.osc import utils as osc_utils
from neutronclient.osc.v2.fwaas import constants as const
from neutronclient.osc.v2.fwaas import firewallrule
from neutronclient.tests.unit.osc.v2 import fakes as test_fakes
from neutronclient.tests.unit.osc.v2.fwaas import common
@ -76,10 +77,14 @@ class TestFirewallRule(test_fakes.TestNeutronClientOSCV2):
def setUp(self):
super(TestFirewallRule, self).setUp()
self.neutronclient.find_resource = mock.Mock()
self.neutronclient.find_resource.side_effect = \
lambda x, y, **k: {'id': y}
# fw_common.get_id.side_effect = lambda x, y, z: z
def _mock_fwr(*args, **kwargs):
self.neutronclient.find_resource.assert_called_once_with(
self.res, self.resource['id'], cmd_resource=const.CMD_FWR)
return {'id': args[1]}
self.neutronclient.find_resource.side_effect = mock.Mock(
side_effect=_mock_fwr)
osc_utils.find_project = mock.Mock()
osc_utils.find_project.id = _fwr['tenant_id']
self.res = 'firewall_rule'