From bc2f975d6ccdff0c4ddbf57242bab905a572bacf Mon Sep 17 00:00:00 2001 From: Yushiro FURUKAWA Date: Wed, 18 Jan 2017 01:01:28 +0900 Subject: [PATCH] FWaaS - Adds an argument into find_resource This commit adds an additional argument 'cmd_resource' for find_resource in order to call client method correctly and fixes typo. Change-Id: Ib079f9fda9d7c190ef9a2f152b5a5af1853810a6 Closes-Bug: #1657180 --- neutronclient/osc/v2/fwaas/constants.py | 9 +- neutronclient/osc/v2/fwaas/firewallgroup.py | 26 ++-- neutronclient/osc/v2/fwaas/firewallpolicy.py | 40 ++--- neutronclient/osc/v2/fwaas/firewallrule.py | 12 +- .../tests/unit/osc/v2/fwaas/common.py | 41 ++++- .../unit/osc/v2/fwaas/test_firewallgroup.py | 143 +++++++++++++++--- .../unit/osc/v2/fwaas/test_firewallpolicy.py | 115 +++++++++++++- .../unit/osc/v2/fwaas/test_firewallrule.py | 13 +- 8 files changed, 331 insertions(+), 68 deletions(-) diff --git a/neutronclient/osc/v2/fwaas/constants.py b/neutronclient/osc/v2/fwaas/constants.py index 29f3ce09f..4c8ee6a2d 100644 --- a/neutronclient/osc/v2/fwaas/constants.py +++ b/neutronclient/osc/v2/fwaas/constants.py @@ -1,4 +1,4 @@ -# Copyright 2016 FUJITSU LIMITED +# Copyright 2016-2017 FUJITSU LIMITED # All Rights Reserved # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -20,7 +20,6 @@ FWP = 'firewall_policy' FWPS = 'firewall_policies' FWR = 'firewall_rule' FWRS = 'firewall_rules' - -CMD_RESOURCE_MAP = dict( - (res, 'fwaas_' + res) for res in [FWG, FWGS, FWP, FWPS, FWR, FWRS] -) +CMD_FWG = 'fwaas_' + FWG +CMD_FWP = 'fwaas_' + FWP +CMD_FWR = 'fwaas_' + FWR diff --git a/neutronclient/osc/v2/fwaas/firewallgroup.py b/neutronclient/osc/v2/fwaas/firewallgroup.py index 3aa569f8f..b5b388879 100644 --- a/neutronclient/osc/v2/fwaas/firewallgroup.py +++ b/neutronclient/osc/v2/fwaas/firewallgroup.py @@ -1,4 +1,4 @@ -# Copyright 2016 FUJITSU LIMITED +# Copyright 2016-2017 FUJITSU LIMITED # All Rights Reserved # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -114,22 +114,22 @@ def _get_common_attrs(client_manager, parsed_args, is_create=True): parsed_args.no_ingress_firewall_policy): attrs['ingress_firewall_policy_id'] = client.find_resource( const.FWP, parsed_args.ingress_firewall_policy, - cmd_resource='fwaas_' + const.FWP)['id'] + cmd_resource=const.CMD_FWP)['id'] elif parsed_args.ingress_firewall_policy: attrs['ingress_firewall_policy_id'] = client.find_resource( const.FWP, parsed_args.ingress_firewall_policy, - cmd_resource='fwaas_' + const.FWP)['id'] + cmd_resource=const.CMD_FWP)['id'] elif parsed_args.no_ingress_firewall_policy: attrs['ingress_firewall_policy_id'] = None if (parsed_args.egress_firewall_policy and parsed_args.no_egress_firewall_policy): attrs['egress_firewall_policy_id'] = client.find_resource( const.FWP, parsed_args.egress_firewall_policy, - cmd_resource='fwaas_' + const.FWP)['id'] + cmd_resource=const.CMD_FWP)['id'] elif parsed_args.egress_firewall_policy: attrs['egress_firewall_policy_id'] = client.find_resource( const.FWP, parsed_args.egress_firewall_policy, - cmd_resource='fwaas_' + const.FWP)['id'] + cmd_resource=const.CMD_FWP)['id'] elif parsed_args.no_egress_firewall_policy: attrs['egress_firewall_policy_id'] = None if parsed_args.public: @@ -153,7 +153,8 @@ def _get_common_attrs(client_manager, parsed_args, is_create=True): ports.append(client.find_resource('port', p)['id']) if not is_create: ports += client.find_resource( - const.FWG, parsed_args.firewall_group)['ports'] + const.FWG, parsed_args.firewall_group, + cmd_resource=const.CMD_FWG)['ports'] attrs['ports'] = sorted(set(ports)) elif parsed_args.no_port: attrs['ports'] = [] @@ -209,7 +210,7 @@ class DeleteFirewallGroup(command.Command): for fwg in parsed_args.firewall_group: try: fwg_id = client.find_resource( - const.FWG, fwg, cmd_resource='fwaas_' + const.FWG)['id'] + const.FWG, fwg, cmd_resource=const.CMD_FWG)['id'] client.delete_fwaas_firewall_group(fwg_id) except Exception as e: result += 1 @@ -218,7 +219,7 @@ class DeleteFirewallGroup(command.Command): const.FWG: fwg, 'e': e}) if result > 0: - total = len(parsed_args.firewall_groups) + total = len(parsed_args.firewall_group) msg = (_("%(result)s of %(total)s firewall group(s) " "failed to delete.") % {'result': result, 'total': total}) raise exceptions.CommandError(msg) @@ -271,7 +272,7 @@ class SetFirewallGroup(command.Command): def take_action(self, parsed_args): client = self.app.client_manager.neutronclient fwg_id = client.find_resource(const.FWG, parsed_args.firewall_group, - cmd_resource='fwaas_' + const.FWG)['id'] + cmd_resource=const.CMD_FWG)['id'] attrs = _get_common_attrs(self.app.client_manager, parsed_args, is_create=False) try: @@ -296,7 +297,7 @@ class ShowFirewallGroup(command.ShowOne): def take_action(self, parsed_args): client = self.app.client_manager.neutronclient fwg_id = client.find_resource(const.FWG, parsed_args.firewall_group, - cmd_resource='fwaas_' + const.FWG)['id'] + cmd_resource=const.CMD_FWG)['id'] obj = client.show_fwaas_firewall_group(fwg_id)[const.FWG] columns, display_columns = osc_utils.get_columns(obj, _attr_map) data = utils.get_dict_properties(obj, columns, formatters=_formatters) @@ -356,7 +357,8 @@ class UnsetFirewallGroup(command.Command): attrs['admin_state_up'] = False if parsed_args.port: old = client.find_resource( - const.FWG, parsed_args.firewall_group)['ports'] + const.FWG, parsed_args.firewall_group, + cmd_resource=const.CMD_FWG)['ports'] new = [client.find_resource( 'port', r)['id'] for r in parsed_args.port] attrs['ports'] = sorted(list(set(old) - set(new))) @@ -367,7 +369,7 @@ class UnsetFirewallGroup(command.Command): def take_action(self, parsed_args): client = self.app.client_manager.neutronclient fwg_id = client.find_resource(const.FWG, parsed_args.firewall_group, - cmd_resource='fwaas_' + const.FWG)['id'] + cmd_resource=const.CMD_FWG)['id'] attrs = self._get_attrs(self.app.client_manager, parsed_args) try: client.update_fwaas_firewall_group(fwg_id, {const.FWG: attrs}) diff --git a/neutronclient/osc/v2/fwaas/firewallpolicy.py b/neutronclient/osc/v2/fwaas/firewallpolicy.py index 9a5a0d716..5ac097145 100644 --- a/neutronclient/osc/v2/fwaas/firewallpolicy.py +++ b/neutronclient/osc/v2/fwaas/firewallpolicy.py @@ -1,4 +1,4 @@ -# Copyright 2016 FUJITSU LIMITED +# Copyright 2016-2017 FUJITSU LIMITED # All Rights Reserved # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -56,16 +56,17 @@ def _get_common_attrs(client_manager, parsed_args, is_create=True): _firewall_rules = [] for f in set(parsed_args.firewall_rule): _firewall_rules.append(client.find_resource( - const.FWR, f, cmd_resource='fwaas_' + const.FWR)['id']) + const.FWR, f, cmd_resource=const.CMD_FWR)['id']) attrs[const.FWRS] = sorted(_firewall_rules) elif parsed_args.firewall_rule: rules = [] for f in set(parsed_args.firewall_rule): rules.append(client.find_resource( - const.FWR, f, cmd_resource='fwaas_' + const.FWR)['id']) + const.FWR, f, cmd_resource=const.CMD_FWR)['id']) if not is_create: rules += client.find_resource( - const.FWP, parsed_args.firewall_policy)[const.FWRS] + const.FWP, parsed_args.firewall_policy, + cmd_resource=const.CMD_FWP)[const.FWRS] attrs[const.FWRS] = sorted(set(rules)) elif parsed_args.no_firewall_rule: attrs[const.FWRS] = [] @@ -173,7 +174,7 @@ class DeleteFirewallPolicy(command.Command): if result > 0: total = len(parsed_args.firewall_policy) - msg = (_("%(result)s of %(total)s Firewall policy(s) " + msg = (_("%(result)s of %(total)s firewall policy(s) " "failed to delete.") % {'result': result, 'total': total}) raise exceptions.CommandError(msg) @@ -211,23 +212,23 @@ class FirewallPolicyInsertRule(command.Command): if parsed_args.insert_before: _insert_before = client.find_resource( const.FWR, parsed_args.insert_before, - cmd_resource='fwaas_' + const.FWR)['id'] + cmd_resource=const.CMD_FWR)['id'] _insert_after = '' if 'insert_after' in parsed_args: if parsed_args.insert_after: _insert_after = client.find_resource( const.FWR, parsed_args.insert_after, - cmd_resource='fwaas_' + const.FWR)['id'] + cmd_resource=const.CMD_FWR)['id'] return {'firewall_rule_id': _rule_id, 'insert_before': _insert_before, 'insert_after': _insert_after} def take_action(self, parsed_args): client = self.app.client_manager.neutronclient - body = self.args2body(parsed_args) policy_id = client.find_resource( const.FWP, parsed_args.firewall_policy, - cmd_resource='fwaas_' + const.FWP)['id'] + cmd_resource=const.CMD_FWP)['id'] + body = self.args2body(parsed_args) client.insert_rule_fwaas_firewall_policy(policy_id, body) rule_id = body['firewall_rule_id'] policy = parsed_args.firewall_policy @@ -253,11 +254,11 @@ class FirewallPolicyRemoveRule(command.Command): def take_action(self, parsed_args): client = self.app.client_manager.neutronclient - fwr_id = _get_required_firewall_rule(client, parsed_args) - body = {'firewall_rule_id': fwr_id} policy_id = client.find_resource( const.FWP, parsed_args.firewall_policy, - cmd_resource='fwaas_' + const.FWP)['id'] + cmd_resource=const.CMD_FWP)['id'] + fwr_id = _get_required_firewall_rule(client, parsed_args) + body = {'firewall_rule_id': fwr_id} client.remove_rule_fwaas_firewall_policy(policy_id, body) rule_id = body['firewall_rule_id'] policy = parsed_args.firewall_policy @@ -317,7 +318,7 @@ class SetFirewallPolicy(command.Command): client = self.app.client_manager.neutronclient fwp_id = client.find_resource( const.FWP, parsed_args.firewall_policy, - cmd_resource='fwaas_' + const.FWP)['id'] + cmd_resource=const.CMD_FWP)['id'] attrs = _get_common_attrs(self.app.client_manager, parsed_args, is_create=False) try: @@ -343,7 +344,7 @@ class ShowFirewallPolicy(command.ShowOne): client = self.app.client_manager.neutronclient fwp_id = client.find_resource(const.FWP, parsed_args.firewall_policy, - cmd_resource='fwaas_' + const.FWP)['id'] + cmd_resource=const.CMD_FWP)['id'] obj = client.show_fwaas_firewall_policy(fwp_id)[const.FWP] columns, display_columns = osc_utils.get_columns(obj, _attr_map) data = utils.get_dict_properties(obj, columns, formatters=_formatters) @@ -354,8 +355,8 @@ def _get_required_firewall_rule(client, parsed_args): if not parsed_args.firewall_rule: msg = (_("Firewall rule (name or ID) is required.")) raise exceptions.CommandError(msg) - return client.find_resource(const.FWR, parsed_args.firewall_rule, - cmd_resource='fwaas_' + const.FWR)['id'] + return client.find_resource( + const.FWR, parsed_args.firewall_rule, cmd_resource=const.CMD_FWR)['id'] class UnsetFirewallPolicy(command.Command): @@ -395,11 +396,12 @@ class UnsetFirewallPolicy(command.Command): if parsed_args.firewall_rule: old = client.find_resource( - const.FWP, parsed_args.firewall_policy)[const.FWRS] + const.FWP, parsed_args.firewall_policy, + cmd_resource=const.CMD_FWP)[const.FWRS] new = [] for f in set(parsed_args.firewall_rule): new.append(client.find_resource( - const.FWR, f, cmd_resource='fwaas_' + const.FWR)['id']) + const.FWR, f, cmd_resource=const.CMD_FWR)['id']) attrs[const.FWRS] = sorted(list(set(old) - set(new))) if parsed_args.all_firewall_rule: attrs[const.FWRS] = [] @@ -413,7 +415,7 @@ class UnsetFirewallPolicy(command.Command): client = self.app.client_manager.neutronclient fwp_id = client.find_resource( const.FWP, parsed_args.firewall_policy, - cmd_resource='fwaas_' + const.FWP)['id'] + cmd_resource=const.CMD_FWP)['id'] attrs = self._get_attrs(self.app.client_manager, parsed_args) try: client.update_fwaas_firewall_policy(fwp_id, {const.FWP: attrs}) diff --git a/neutronclient/osc/v2/fwaas/firewallrule.py b/neutronclient/osc/v2/fwaas/firewallrule.py index a7d9051f3..4b10e3bc5 100644 --- a/neutronclient/osc/v2/fwaas/firewallrule.py +++ b/neutronclient/osc/v2/fwaas/firewallrule.py @@ -1,4 +1,4 @@ -# Copyright 2016 FUJITSU LIMITED +# Copyright 2016-2017 FUJITSU LIMITED # All Rights Reserved # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -223,7 +223,7 @@ class DeleteFirewallRule(command.Command): for fwr in parsed_args.firewall_rule: try: fwr_id = client.find_resource( - const.FWR, fwr, cmd_resource='fwaas_' + const.FWR)['id'] + const.FWR, fwr, cmd_resource=const.CMD_FWR)['id'] client.delete_fwaas_firewall_rule(fwr_id) except Exception as e: result += 1 @@ -233,7 +233,7 @@ class DeleteFirewallRule(command.Command): if result > 0: total = len(parsed_args.firewall_rule) - msg = (_("%(result)s of %(total)s Firewall rule(s) failed " + msg = (_("%(result)s of %(total)s firewall rule(s) failed " "to delete.") % {'result': result, 'total': total}) raise exceptions.CommandError(msg) @@ -301,7 +301,7 @@ class SetFirewallRule(command.Command): parsed_args, is_create=False) fwr_id = client.find_resource( const.FWR, parsed_args.firewall_rule, - cmd_resource='fwaas_' + const.FWR)['id'] + cmd_resource=const.CMD_FWR)['id'] try: client.update_fwaas_firewall_rule(fwr_id, {const.FWR: attrs}) except Exception as e: @@ -325,7 +325,7 @@ class ShowFirewallRule(command.ShowOne): client = self.app.client_manager.neutronclient fwr_id = client.find_resource( const.FWR, parsed_args.firewall_rule, - cmd_resource='fwaas_' + const.FWR)['id'] + cmd_resource=const.CMD_FWR)['id'] obj = client.show_fwaas_firewall_rule(fwr_id)[const.FWR] columns, display_columns = osc_utils.get_columns(obj, _attr_map) data = utils.get_dict_properties(obj, columns, formatters=_formatters) @@ -390,7 +390,7 @@ class UnsetFirewallRule(command.Command): attrs = self._get_attrs(self.app.client_manager, parsed_args) fwr_id = client.find_resource( const.FWR, parsed_args.firewall_rule, - cmd_resource='fwaas_' + const.FWR)['id'] + cmd_resource=const.CMD_FWR)['id'] try: client.update_fwaas_firewall_rule(fwr_id, {const.FWR: attrs}) except Exception as e: diff --git a/neutronclient/tests/unit/osc/v2/fwaas/common.py b/neutronclient/tests/unit/osc/v2/fwaas/common.py index 915988488..b9ea7841b 100644 --- a/neutronclient/tests/unit/osc/v2/fwaas/common.py +++ b/neutronclient/tests/unit/osc/v2/fwaas/common.py @@ -1,4 +1,4 @@ -# Copyright 2016 FUJITSU LIMITED +# Copyright 2016-2017 FUJITSU LIMITED # All Rights Reserved # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -14,6 +14,9 @@ # under the License. # +import testtools + +from osc_lib import exceptions from osc_lib.tests import utils from neutronclient.tests.unit.osc.v2 import fakes as test_fakes @@ -46,6 +49,17 @@ class TestShowFWaaS(test_fakes.TestNeutronClientOSCV2): def test_show_filtered_by_id_or_name(self): target = self.resource['id'] + + def _mock_fwaas(*args, **kwargs): + # Find specified ingress_firewall_policy + if self.neutronclient.find_resource.call_count == 1: + self.assertEqual(self.res, args[0]) + self.assertEqual(self.resource['id'], args[1]) + self.assertEqual({'cmd_resource': 'fwaas_' + self.res}, kwargs) + return {'id': args[1]} + + self.neutronclient.find_resource.side_effect = _mock_fwaas + arglist = [target] verifylist = [(self.res, target)] parsed_args = self.check_parser(self.cmd, arglist, verifylist) @@ -210,10 +224,20 @@ class TestDeleteFWaaS(test_fakes.TestNeutronClientOSCV2): self.assertIsNone(result) def test_delete_with_multiple_resources(self): + + def _mock_fwaas(*args, **kwargs): + self.assertEqual(self.res, args[0]) + self.assertIsNotNone(args[1]) + self.assertEqual({'cmd_resource': 'fwaas_' + self.res}, kwargs) + return {'id': args[1]} + + self.neutronclient.find_resource.side_effect = _mock_fwaas + target1 = 'target1' target2 = 'target2' arglist = [target1, target2] verifylist = [(self.res, [target1, target2])] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) self.assertIsNone(result) @@ -223,6 +247,21 @@ class TestDeleteFWaaS(test_fakes.TestNeutronClientOSCV2): actual = ''.join(self.mocked.call_args_list[idx][0]) self.assertEqual(reference, actual) + def test_delete_multiple_with_exception(self): + target1 = 'target' + arglist = [target1] + verifylist = [(self.res, [target1])] + + self.neutronclient.find_resource.side_effect = [ + target1, exceptions.CommandError + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + resource_name = self.res.replace('_', ' ') + msg = "1 of 2 %s(s) failed to delete." % resource_name + with testtools.ExpectedException(exceptions.CommandError) as e: + self.cmd.take_action(parsed_args) + self.assertEqual(msg, str(e)) + class TestUnsetFWaaS(test_fakes.TestNeutronClientOSCV2): diff --git a/neutronclient/tests/unit/osc/v2/fwaas/test_firewallgroup.py b/neutronclient/tests/unit/osc/v2/fwaas/test_firewallgroup.py index 44bcc708f..cc074ff2f 100644 --- a/neutronclient/tests/unit/osc/v2/fwaas/test_firewallgroup.py +++ b/neutronclient/tests/unit/osc/v2/fwaas/test_firewallgroup.py @@ -22,6 +22,7 @@ from osc_lib import exceptions from osc_lib.tests import utils from neutronclient.osc import utils as osc_utils +from neutronclient.osc.v2.fwaas import constants as const from neutronclient.osc.v2.fwaas import firewallgroup from neutronclient.tests.unit.osc.v2 import fakes as test_fakes from neutronclient.tests.unit.osc.v2.fwaas import common @@ -85,21 +86,11 @@ class TestFirewallGroup(test_fakes.TestNeutronClientOSCV2): def setUp(self): super(TestFirewallGroup, self).setUp() - def side_effect_for_find_resource(*args, **kwargs): - port_id = args[1] - ports = _fwg['ports'] - if self.res in args[0]: - ports = _fwg['ports'] - return {'id': port_id, 'ports': ports} + def _find_resource(*args, **kwargs): + return {'id': args[1], 'ports': _fwg['ports']} - def side_effect_for_list_ports(*args): - port_name = 'id_for_port' - return {'ports': [{'id': port_name}]} - - self.neutronclient.find_resource.side_effect = mock.Mock( - side_effect=side_effect_for_find_resource) - self.neutronclient.list_ports = mock.Mock( - side_effect=side_effect_for_list_ports) + self.neutronclient.find_resource = mock.Mock( + side_effect=_find_resource) osc_utils.find_project = mock.Mock() osc_utils.find_project.id = _fwg['tenant_id'] self.res = 'firewall_group' @@ -204,6 +195,45 @@ class TestCreateFirewallGroup(TestFirewallGroup, common.TestCreateFWaaS): self.check_results(headers, data, request) + def test_create_with_ingress_policy(self): + ingress_policy = 'my-ingress-policy' + + def _mock_port_fwg(*args, **kwargs): + return {'id': args[1]} + + self.neutronclient.find_resource.side_effect = _mock_port_fwg + + arglist = ['--ingress-firewall-policy', ingress_policy] + verifylist = [('ingress_firewall_policy', ingress_policy)] + request, response = _generate_req_and_res(verifylist) + self._update_expect_response(request, response) + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + headers, data = self.cmd.take_action(parsed_args) + self.neutronclient.find_resource.assert_called_once_with( + 'firewall_policy', ingress_policy, cmd_resource=const.CMD_FWP) + + self.check_results(headers, data, request) + + def test_create_with_egress_policy(self): + egress_policy = 'my-egress-policy' + + def _mock_port_fwg(*args, **kwargs): + return {'id': args[1]} + + self.neutronclient.find_resource.side_effect = _mock_port_fwg + + arglist = ['--egress-firewall-policy', egress_policy] + verifylist = [('egress_firewall_policy', egress_policy)] + request, response = _generate_req_and_res(verifylist) + self._update_expect_response(request, response) + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + headers, data = self.cmd.take_action(parsed_args) + + self.neutronclient.find_resource.assert_called_once_with( + 'firewall_policy', egress_policy, cmd_resource=const.CMD_FWP) + self.check_results(headers, data, request) + def test_create_with_all_params(self): name = 'my-name' description = 'my-desc' @@ -342,26 +372,77 @@ class TestSetFirewallGroup(TestFirewallGroup, common.TestSetFWaaS): response[column] for column in self.ordered_columns ) - def test_set_ingress_policy(self): - # firewall_group-update myid --policy newpolicy. + def test_set_ingress_policy_and_egress_policy(self): target = self.resource['id'] - policy = 'ingress_policy' - arglist = [target, '--ingress-firewall-policy', policy] + ingress_policy = 'ingress_policy' + egress_policy = 'egress_policy' + + def _mock_fwg_policy(*args, **kwargs): + # 1. Find specified firewall_group + if self.neutronclient.find_resource.call_count == 1: + self.neutronclient.find_resource.assert_called_with( + self.res, target, cmd_resource=const.CMD_FWG) + # 2. Find specified 'ingress_firewall_policy' + if self.neutronclient.find_resource.call_count == 2: + self.neutronclient.find_resource.assert_called_with( + 'firewall_policy', ingress_policy, + cmd_resource=const.CMD_FWP) + # 3. Find specified 'ingress_firewall_policy' + if self.neutronclient.find_resource.call_count == 3: + self.neutronclient.find_resource.assert_called_with( + 'firewall_policy', egress_policy, + cmd_resource=const.CMD_FWP) + return {'id': args[1]} + + self.neutronclient.find_resource.side_effect = _mock_fwg_policy + + arglist = [ + target, + '--ingress-firewall-policy', ingress_policy, + '--egress-firewall-policy', egress_policy, + ] verifylist = [ (self.res, target), - ('ingress_firewall_policy', policy), + ('ingress_firewall_policy', ingress_policy), + ('egress_firewall_policy', egress_policy), ] parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) self.mocked.assert_called_once_with( - target, {self.res: {'ingress_firewall_policy_id': policy}}) + target, {self.res: {'ingress_firewall_policy_id': ingress_policy, + 'egress_firewall_policy_id': egress_policy}}) self.assertIsNone(result) def test_set_port(self): target = self.resource['id'] port1 = 'additional_port1' port2 = 'additional_port2' + + def _mock_port_fwg(*args, **kwargs): + # 1. Find specified firewall_group + if self.neutronclient.find_resource.call_count == 1: + self.neutronclient.find_resource.assert_called_with( + self.res, target, cmd_resource=const.CMD_FWG) + return {'id': args[1]} + # 2. Find specified 'port' #1 + if self.neutronclient.find_resource.call_count == 2: + self.neutronclient.find_resource.assert_called_with( + 'port', args[1]) + return {'id': args[1]} + # 3. Find specified 'port' #2 + if self.neutronclient.find_resource.call_count == 3: + self.neutronclient.find_resource.assert_called_with( + 'port', args[1]) + return {'id': args[1]} + # 4. Find specified firewall_group and refer 'ports' attribute + if self.neutronclient.find_resource.call_count == 4: + self.neutronclient.find_resource.assert_called_with( + self.res, target, cmd_resource=const.CMD_FWG) + return {'ports': _fwg['ports']} + + self.neutronclient.find_resource.side_effect = _mock_port_fwg + arglist = [ target, '--port', port1, @@ -376,6 +457,7 @@ class TestSetFirewallGroup(TestFirewallGroup, common.TestSetFWaaS): expect = {'ports': sorted(_fwg['ports'] + [port1, port2])} self.mocked.assert_called_once_with(target, {self.res: expect}) + self.assertEqual(4, self.neutronclient.find_resource.call_count) self.assertIsNone(result) def test_set_no_port(self): @@ -584,6 +666,27 @@ class TestUnsetFirewallGroup(TestFirewallGroup, common.TestUnsetFWaaS): def test_unset_port(self): target = self.resource['id'] port = 'old_port' + + def _mock_port_fwg(*args, **kwargs): + # 1. Find specified firewall_group + if self.neutronclient.find_resource.call_count == 1: + self.neutronclient.find_resource.assert_called_with( + self.res, target, cmd_resource=const.CMD_FWG) + return {'id': args[1]} + # 2. Find specified firewall_group and refer 'ports' attribute + if self.neutronclient.find_resource.call_count == 2: + self.neutronclient.find_resource.assert_called_with( + self.res, target, cmd_resource=const.CMD_FWG) + return {'ports': _fwg['ports']} + # 3. Find specified 'port' + if self.neutronclient.find_resource.call_count == 3: + self.neutronclient.find_resource.assert_called_with( + 'port', port) + return {'id': args[1]} + + self.neutronclient.find_resource.side_effect = mock.Mock( + side_effect=_mock_port_fwg) + arglist = [ target, '--port', port, diff --git a/neutronclient/tests/unit/osc/v2/fwaas/test_firewallpolicy.py b/neutronclient/tests/unit/osc/v2/fwaas/test_firewallpolicy.py index fff4e912f..49e43212b 100644 --- a/neutronclient/tests/unit/osc/v2/fwaas/test_firewallpolicy.py +++ b/neutronclient/tests/unit/osc/v2/fwaas/test_firewallpolicy.py @@ -22,6 +22,7 @@ from osc_lib import exceptions from osc_lib.tests import utils from neutronclient.osc import utils as osc_utils +from neutronclient.osc.v2.fwaas import constants as const from neutronclient.osc.v2.fwaas import firewallpolicy from neutronclient.tests.unit.osc.v2 import fakes as test_fakes from neutronclient.tests.unit.osc.v2.fwaas import common @@ -90,7 +91,6 @@ class TestFirewallPolicy(test_fakes.TestNeutronClientOSCV2): self.neutronclient.find_resource = mock.Mock( side_effect=_find_resource) - # fw_common.get_id.side_effect = lambda x, y, z: z osc_utils.find_project = mock.Mock() osc_utils.find_project.id = _fwp['tenant_id'] self.res = 'firewall_policy' @@ -193,6 +193,35 @@ class TestCreateFirewallPolicy(TestFirewallPolicy, common.TestCreateFWaaS): self.check_results(headers, data, request) + def test_create_with_rules(self): + name = 'my-fwg' + rule1 = 'rule1' + rule2 = 'rule2' + + def _mock_policy(*args, **kwargs): + self.neutronclient.find_resource.assert_called_with( + 'firewall_rule', args[1], cmd_resource=const.CMD_FWR) + return {'id': args[1]} + + self.neutronclient.find_resource.side_effect = _mock_policy + + arglist = [ + name, + '--firewall-rule', rule1, + '--firewall-rule', rule2, + ] + verifylist = [ + ('name', name), + ('firewall_rule', [rule1, rule2]), + ] + request, response = _generate_req_and_res(verifylist) + self._update_expect_response(request, response) + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + headers, data = self.cmd.take_action(parsed_args) + self.assertEqual(2, self.neutronclient.find_resource.call_count) + + self.check_results(headers, data, request) + def test_create_with_all_params(self): name = 'my-fwp' desc = 'my-desc' @@ -308,6 +337,29 @@ class TestSetFirewallPolicy(TestFirewallPolicy, common.TestSetFWaaS): target = self.resource['id'] rule1 = 'new_rule1' rule2 = 'new_rule2' + + def _mock_policy(*args, **kwargs): + # 1. Find specified firewall_policy + if self.neutronclient.find_resource.call_count == 1: + self.neutronclient.find_resource.assert_called_with( + self.res, target, cmd_resource=const.CMD_FWP) + # 2. Find specified firewall_rule + if self.neutronclient.find_resource.call_count == 2: + self.neutronclient.find_resource.assert_called_with( + 'firewall_rule', args[1], cmd_resource=const.CMD_FWR) + # 3. Find specified firewall_rule + if self.neutronclient.find_resource.call_count == 3: + self.neutronclient.find_resource.assert_called_with( + 'firewall_rule', args[1], cmd_resource=const.CMD_FWR) + # 4. Find specified firewall_policy's 'firewall_rules' attribute + if self.neutronclient.find_resource.call_count == 4: + self.neutronclient.find_resource.assert_called_with( + self.res, target, cmd_resource=const.CMD_FWP) + return {'firewall_rules': _fwp['firewall_rules']} + return {'id': args[1]} + + self.neutronclient.find_resource.side_effect = _mock_policy + arglist = [ target, '--firewall-rule', rule1, @@ -323,6 +375,7 @@ class TestSetFirewallPolicy(TestFirewallPolicy, common.TestSetFWaaS): expect = sorted(set(_fwp['firewall_rules'] + [rule1, rule2])) body = {self.res: {'firewall_rules': expect}} self.mocked.assert_called_once_with(target, body) + self.assertEqual(4, self.neutronclient.find_resource.call_count) self.assertIsNone(result) def test_set_no_rules(self): @@ -357,6 +410,7 @@ class TestSetFirewallPolicy(TestFirewallPolicy, common.TestSetFWaaS): body = {self.res: {'firewall_rules': [rule1]}} self.mocked.assert_called_once_with(target, body) + self.assertEqual(2, self.neutronclient.find_resource.call_count) self.assertIsNone(result) def test_set_audited(self): @@ -441,6 +495,28 @@ class TestFirewallPolicyInsertRule(TestFirewallPolicy): rule = 'new-rule' before = 'before' after = 'after' + + def _mock_policy(*args, **kwargs): + # 1. Find specified firewall_policy + if self.neutronclient.find_resource.call_count == 1: + self.neutronclient.find_resource.assert_called_with( + self.res, target, cmd_resource=const.CMD_FWP) + # 2. Find specified firewall_rule + if self.neutronclient.find_resource.call_count == 2: + self.neutronclient.find_resource.assert_called_with( + 'firewall_rule', args[1], cmd_resource=const.CMD_FWR) + # 3. Find specified firewall_rule as 'before' + if self.neutronclient.find_resource.call_count == 3: + self.neutronclient.find_resource.assert_called_with( + 'firewall_rule', args[1], cmd_resource=const.CMD_FWR) + # 4. Find specified firewall_rule as 'after' + if self.neutronclient.find_resource.call_count == 4: + self.neutronclient.find_resource.assert_called_with( + 'firewall_rule', args[1], cmd_resource=const.CMD_FWR) + return {'id': args[1]} + + self.neutronclient.find_resource.side_effect = _mock_policy + arglist = [ target, rule, @@ -463,6 +539,7 @@ class TestFirewallPolicyInsertRule(TestFirewallPolicy): 'insert_after': after }) self.assertIsNone(result) + self.assertEqual(4, self.neutronclient.find_resource.call_count) def test_insert_with_no_firewall_rule(self): target = self.resource['id'] @@ -490,6 +567,21 @@ class TestFirewallPolicyRemoveRule(TestFirewallPolicy): def test_remove_firewall_rule(self): target = self.resource['id'] rule = 'remove-rule' + + def _mock_policy(*args, **kwargs): + # 1. Find specified firewall_policy + if self.neutronclient.find_resource.call_count == 1: + self.neutronclient.find_resource.assert_called_with( + self.res, target, cmd_resource=const.CMD_FWP) + # 2. Find specified firewall_rule + if self.neutronclient.find_resource.call_count == 2: + self.neutronclient.find_resource.assert_called_with( + 'firewall_rule', rule, cmd_resource=const.CMD_FWR) + return {'id': args[1]} + + self.neutronclient.find_resource.side_effect = mock.Mock( + side_effect=_mock_policy) + arglist = [ target, rule, @@ -503,6 +595,7 @@ class TestFirewallPolicyRemoveRule(TestFirewallPolicy): self.mocked.assert_called_once_with( target, {'firewall_rule_id': rule}) self.assertIsNone(result) + self.assertEqual(2, self.neutronclient.find_resource.call_count) def test_remove_with_no_firewall_rule(self): target = self.resource['id'] @@ -566,6 +659,25 @@ class TestUnsetFirewallPolicy(TestFirewallPolicy, common.TestUnsetFWaaS): _fwp['firewall_rules'] = ['rule1', 'rule2'] target = self.resource['id'] rule = 'rule1' + + def _mock_policy(*args, **kwargs): + # 1. Find specified firewall_policy + if self.neutronclient.find_resource.call_count == 1: + self.neutronclient.find_resource.assert_called_with( + self.res, target, cmd_resource=const.CMD_FWP) + # 2. Find 'firewall_rules' attribute from specified firewall_policy + if self.neutronclient.find_resource.call_count == 2: + self.neutronclient.find_resource.assert_called_with( + self.res, target, cmd_resource=const.CMD_FWP) + return {'firewall_rules': _fwp['firewall_rules']} + # 3. Find specified 'firewall_rule' + if self.neutronclient.find_resource.call_count == 3: + self.neutronclient.find_resource.assert_called_with( + 'firewall_rule', rule, cmd_resource=const.CMD_FWR) + return {'id': args[1]} + + self.neutronclient.find_resource.side_effect = _mock_policy + arglist = [ target, '--firewall-rule', rule, @@ -580,6 +692,7 @@ class TestUnsetFirewallPolicy(TestFirewallPolicy, common.TestUnsetFWaaS): body = {self.res: {'firewall_rules': ['rule2']}} self.mocked.assert_called_once_with(target, body) self.assertIsNone(result) + self.assertEqual(3, self.neutronclient.find_resource.call_count) def test_unset_all_firewall_rule(self): target = self.resource['id'] diff --git a/neutronclient/tests/unit/osc/v2/fwaas/test_firewallrule.py b/neutronclient/tests/unit/osc/v2/fwaas/test_firewallrule.py index dd730ee49..be1b3b171 100644 --- a/neutronclient/tests/unit/osc/v2/fwaas/test_firewallrule.py +++ b/neutronclient/tests/unit/osc/v2/fwaas/test_firewallrule.py @@ -23,6 +23,7 @@ from osc_lib import exceptions from osc_lib.tests import utils from neutronclient.osc import utils as osc_utils +from neutronclient.osc.v2.fwaas import constants as const from neutronclient.osc.v2.fwaas import firewallrule from neutronclient.tests.unit.osc.v2 import fakes as test_fakes from neutronclient.tests.unit.osc.v2.fwaas import common @@ -76,10 +77,14 @@ class TestFirewallRule(test_fakes.TestNeutronClientOSCV2): def setUp(self): super(TestFirewallRule, self).setUp() - self.neutronclient.find_resource = mock.Mock() - self.neutronclient.find_resource.side_effect = \ - lambda x, y, **k: {'id': y} - # fw_common.get_id.side_effect = lambda x, y, z: z + + def _mock_fwr(*args, **kwargs): + self.neutronclient.find_resource.assert_called_once_with( + self.res, self.resource['id'], cmd_resource=const.CMD_FWR) + return {'id': args[1]} + + self.neutronclient.find_resource.side_effect = mock.Mock( + side_effect=_mock_fwr) osc_utils.find_project = mock.Mock() osc_utils.find_project.id = _fwr['tenant_id'] self.res = 'firewall_rule'