diff --git a/neutronclient/osc/v2/fwaas/constants.py b/neutronclient/osc/v2/fwaas/constants.py new file mode 100644 index 0000000..29f3ce0 --- /dev/null +++ b/neutronclient/osc/v2/fwaas/constants.py @@ -0,0 +1,26 @@ +# Copyright 2016 FUJITSU LIMITED +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +FWG = 'firewall_group' +FWGS = 'firewall_groups' +FWP = 'firewall_policy' +FWPS = 'firewall_policies' +FWR = 'firewall_rule' +FWRS = 'firewall_rules' + +CMD_RESOURCE_MAP = dict( + (res, 'fwaas_' + res) for res in [FWG, FWGS, FWP, FWPS, FWR, FWRS] +) diff --git a/neutronclient/osc/v2/fwaas/firewallgroup.py b/neutronclient/osc/v2/fwaas/firewallgroup.py new file mode 100644 index 0000000..3aa569f --- /dev/null +++ b/neutronclient/osc/v2/fwaas/firewallgroup.py @@ -0,0 +1,377 @@ +# Copyright 2016 FUJITSU LIMITED +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from osc_lib.command import command +from osc_lib import exceptions +from osc_lib import utils +from oslo_log import log as logging + +from neutronclient._i18n import _ +from neutronclient.osc import utils as osc_utils +from neutronclient.osc.v2.fwaas import constants as const +from neutronclient.osc.v2 import utils as v2_utils + + +LOG = logging.getLogger(__name__) + +_formatters = { + 'admin_state_up': v2_utils.format_admin_state, +} + +_attr_map = ( + ('id', 'ID', osc_utils.LIST_BOTH), + ('name', 'Name', osc_utils.LIST_BOTH), + ('ingress_firewall_policy_id', 'Ingress Policy ID', osc_utils.LIST_BOTH), + ('egress_firewall_policy_id', 'Egress Policy ID', osc_utils.LIST_BOTH), + ('description', 'Description', osc_utils.LIST_LONG_ONLY), + ('status', 'Status', osc_utils.LIST_LONG_ONLY), + ('ports', 'Ports', osc_utils.LIST_LONG_ONLY), + ('admin_state_up', 'State', osc_utils.LIST_LONG_ONLY), + ('public', 'Public', osc_utils.LIST_LONG_ONLY), + ('tenant_id', 'Project', osc_utils.LIST_LONG_ONLY), +) + + +def _get_common_parser(parser): + parser.add_argument( + '--name', + help=_('Name for the firewall group')) + parser.add_argument( + '--description', + metavar='', + help=_('Description of the firewall group')) + ingress_group = parser.add_mutually_exclusive_group() + ingress_group.add_argument( + '--ingress-firewall-policy', + metavar='', + dest='ingress_firewall_policy', + help=_('Ingress firewall policy (name or ID)')) + ingress_group.add_argument( + '--no-ingress-firewall-policy', + dest='no_ingress_firewall_policy', + action='store_true', + help=_('Detach ingress firewall policy from the firewall group')) + egress_group = parser.add_mutually_exclusive_group() + egress_group.add_argument( + '--egress-firewall-policy', + metavar='', + dest='egress_firewall_policy', + help=_('Egress firewall policy (name or ID)')) + egress_group.add_argument( + '--no-egress-firewall-policy', + dest='no_egress_firewall_policy', + action='store_true', + help=_('Detach egress firewall policy from the firewall group')) + public_group = parser.add_mutually_exclusive_group() + public_group.add_argument( + '--public', + action='store_true', + help=_('Make the firewall group public, which allows it to be ' + 'used in all projects (as opposed to the default, ' + 'which is to restrict its use to the current project)')) + public_group.add_argument( + '--private', + action='store_true', + help=_('Restrict use of the firewall group to the ' + 'current project')) + admin_group = parser.add_mutually_exclusive_group() + admin_group.add_argument( + '--enable', + action='store_true', + help=_('Enable firewall group')) + admin_group.add_argument( + '--disable', + action='store_true', + help=_('Disable firewall group')) + return parser + + +def _get_common_attrs(client_manager, parsed_args, is_create=True): + attrs = {} + client = client_manager.neutronclient + + if is_create: + if 'project' in parsed_args and parsed_args.project is not None: + attrs['tenant_id'] = osc_utils.find_project( + client_manager.identity, + parsed_args.project, + parsed_args.project_domain, + ).id + if (parsed_args.ingress_firewall_policy and + parsed_args.no_ingress_firewall_policy): + attrs['ingress_firewall_policy_id'] = client.find_resource( + const.FWP, parsed_args.ingress_firewall_policy, + cmd_resource='fwaas_' + const.FWP)['id'] + elif parsed_args.ingress_firewall_policy: + attrs['ingress_firewall_policy_id'] = client.find_resource( + const.FWP, parsed_args.ingress_firewall_policy, + cmd_resource='fwaas_' + const.FWP)['id'] + elif parsed_args.no_ingress_firewall_policy: + attrs['ingress_firewall_policy_id'] = None + if (parsed_args.egress_firewall_policy and + parsed_args.no_egress_firewall_policy): + attrs['egress_firewall_policy_id'] = client.find_resource( + const.FWP, parsed_args.egress_firewall_policy, + cmd_resource='fwaas_' + const.FWP)['id'] + elif parsed_args.egress_firewall_policy: + attrs['egress_firewall_policy_id'] = client.find_resource( + const.FWP, parsed_args.egress_firewall_policy, + cmd_resource='fwaas_' + const.FWP)['id'] + elif parsed_args.no_egress_firewall_policy: + attrs['egress_firewall_policy_id'] = None + if parsed_args.public: + attrs['public'] = True + if parsed_args.private: + attrs['public'] = False + if parsed_args.enable: + attrs['admin_state_up'] = True + if parsed_args.disable: + attrs['admin_state_up'] = False + if parsed_args.name: + attrs['name'] = str(parsed_args.name) + if parsed_args.description: + attrs['description'] = str(parsed_args.description) + if parsed_args.port and parsed_args.no_port: + attrs['ports'] = sorted([client.find_resource( + 'port', p)['id'] for p in set(parsed_args.port)]) + elif parsed_args.port: + ports = [] + for p in set(parsed_args.port): + ports.append(client.find_resource('port', p)['id']) + if not is_create: + ports += client.find_resource( + const.FWG, parsed_args.firewall_group)['ports'] + attrs['ports'] = sorted(set(ports)) + elif parsed_args.no_port: + attrs['ports'] = [] + return attrs + + +class CreateFirewallGroup(command.ShowOne): + _description = _("Create a new firewall group") + + def get_parser(self, prog_name): + parser = super(CreateFirewallGroup, self).get_parser(prog_name) + _get_common_parser(parser) + osc_utils.add_project_owner_option_to_parser(parser) + port_group = parser.add_mutually_exclusive_group() + port_group.add_argument( + '--port', + metavar='', + action='append', + help=_('Port(s) (name or ID) to apply firewall group. This ' + 'option can be repeated')) + port_group.add_argument( + '--no-port', + dest='no_port', + action='store_true', + help=_('Detach all port from the firewall group')) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + attrs = _get_common_attrs(self.app.client_manager, parsed_args) + obj = client.create_fwaas_firewall_group( + {const.FWG: attrs})[const.FWG] + columns, display_columns = osc_utils.get_columns(obj, _attr_map) + data = utils.get_dict_properties(obj, columns, formatters=_formatters) + return (display_columns, data) + + +class DeleteFirewallGroup(command.Command): + _description = _("Delete firewall group(s)") + + def get_parser(self, prog_name): + parser = super(DeleteFirewallGroup, self).get_parser(prog_name) + parser.add_argument( + const.FWG, + metavar='', + nargs='+', + help=_('Firewall group(s) to delete (name or ID)')) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + result = 0 + for fwg in parsed_args.firewall_group: + try: + fwg_id = client.find_resource( + const.FWG, fwg, cmd_resource='fwaas_' + const.FWG)['id'] + client.delete_fwaas_firewall_group(fwg_id) + except Exception as e: + result += 1 + LOG.error(_("Failed to delete firewall group with " + "name or ID '%(firewall_group)s': %(e)s") % { + const.FWG: fwg, 'e': e}) + + if result > 0: + total = len(parsed_args.firewall_groups) + msg = (_("%(result)s of %(total)s firewall group(s) " + "failed to delete.") % {'result': result, 'total': total}) + raise exceptions.CommandError(msg) + + +class ListFirewallGroup(command.Lister): + _description = _("List firewall groups") + + def get_parser(self, prog_name): + parser = super(ListFirewallGroup, self).get_parser(prog_name) + parser.add_argument( + '--long', + action='store_true', + help=_("List additional fields in output") + ) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + obj = client.list_fwaas_firewall_groups()[const.FWGS] + headers, columns = osc_utils.get_column_definitions( + _attr_map, long_listing=parsed_args.long) + return (headers, (utils.get_dict_properties( + s, columns, formatters=_formatters) for s in obj)) + + +class SetFirewallGroup(command.Command): + _description = _("Set firewall group properties") + + def get_parser(self, prog_name): + parser = super(SetFirewallGroup, self).get_parser(prog_name) + _get_common_parser(parser) + parser.add_argument( + const.FWG, + metavar='', + help=_('Firewall group to update (name or ID)')) + parser.add_argument( + '--port', + metavar='', + action='append', + help=_('Port(s) (name or ID) to apply firewall group. This ' + 'option can be repeated')) + parser.add_argument( + '--no-port', + dest='no_port', + action='store_true', + help=_('Detach all port from the firewall group')) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + fwg_id = client.find_resource(const.FWG, parsed_args.firewall_group, + cmd_resource='fwaas_' + const.FWG)['id'] + attrs = _get_common_attrs(self.app.client_manager, parsed_args, + is_create=False) + try: + client.update_fwaas_firewall_group(fwg_id, {const.FWG: attrs}) + except Exception as e: + msg = (_("Failed to set firewall group '%(group)s': %(e)s") + % {'group': parsed_args.firewall_group, 'e': e}) + raise exceptions.CommandError(msg) + + +class ShowFirewallGroup(command.ShowOne): + _description = _("Display firewall group details") + + def get_parser(self, prog_name): + parser = super(ShowFirewallGroup, self).get_parser(prog_name) + parser.add_argument( + const.FWG, + metavar='', + help=_('Firewall group to show (name or ID)')) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + fwg_id = client.find_resource(const.FWG, parsed_args.firewall_group, + cmd_resource='fwaas_' + const.FWG)['id'] + obj = client.show_fwaas_firewall_group(fwg_id)[const.FWG] + columns, display_columns = osc_utils.get_columns(obj, _attr_map) + data = utils.get_dict_properties(obj, columns, formatters=_formatters) + return (display_columns, data) + + +class UnsetFirewallGroup(command.Command): + _description = _("Unset firewall group properties") + + def get_parser(self, prog_name): + parser = super(UnsetFirewallGroup, self).get_parser(prog_name) + parser.add_argument( + const.FWG, + metavar='', + help=_('Firewall group to unset (name or ID)')) + port_group = parser.add_mutually_exclusive_group() + port_group.add_argument( + '--port', + metavar='', + action='append', + help=_('Port(s) (name or ID) to apply firewall group. This ' + 'option can be repeated')) + port_group.add_argument( + '--all-port', + action='store_true', + help=_('Remove all ports for this firewall group')) + parser.add_argument( + '--ingress-firewall-policy', + action='store_true', + help=_('Ingress firewall policy (name or ID) to delete')) + parser.add_argument( + '--egress-firewall-policy', + action='store_true', + dest='egress_firewall_policy', + help=_('Egress firewall policy (name or ID) to delete')) + parser.add_argument( + '--public', + action='store_true', + help=_('Restrict use of the firewall group to the ' + 'current project')) + parser.add_argument( + '--enable', + action='store_true', + help=_('Disable firewall group')) + return parser + + def _get_attrs(self, client_manager, parsed_args): + attrs = {} + client = client_manager.neutronclient + if parsed_args.ingress_firewall_policy: + attrs['ingress_firewall_policy_id'] = None + if parsed_args.egress_firewall_policy: + attrs['egress_firewall_policy_id'] = None + if parsed_args.public: + attrs['public'] = False + if parsed_args.enable: + attrs['admin_state_up'] = False + if parsed_args.port: + old = client.find_resource( + const.FWG, parsed_args.firewall_group)['ports'] + new = [client.find_resource( + 'port', r)['id'] for r in parsed_args.port] + attrs['ports'] = sorted(list(set(old) - set(new))) + if parsed_args.all_port: + attrs['ports'] = [] + return attrs + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + fwg_id = client.find_resource(const.FWG, parsed_args.firewall_group, + cmd_resource='fwaas_' + const.FWG)['id'] + attrs = self._get_attrs(self.app.client_manager, parsed_args) + try: + client.update_fwaas_firewall_group(fwg_id, {const.FWG: attrs}) + except Exception as e: + msg = (_("Failed to unset firewall group '%(group)s': %(e)s") + % {'group': parsed_args.firewall_group, 'e': e}) + raise exceptions.CommandError(msg) diff --git a/neutronclient/osc/v2/fwaas/firewallpolicy.py b/neutronclient/osc/v2/fwaas/firewallpolicy.py new file mode 100644 index 0000000..9a5a0d7 --- /dev/null +++ b/neutronclient/osc/v2/fwaas/firewallpolicy.py @@ -0,0 +1,423 @@ +# Copyright 2016 FUJITSU LIMITED +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from __future__ import print_function + +from osc_lib.command import command +from osc_lib import exceptions +from osc_lib import utils +from oslo_log import log as logging + +from neutronclient._i18n import _ +from neutronclient.osc import utils as osc_utils +from neutronclient.osc.v2.fwaas import constants as const + + +LOG = logging.getLogger(__name__) + +_formatters = {} + +_attr_map = ( + ('id', 'ID', osc_utils.LIST_BOTH), + ('name', 'Name', osc_utils.LIST_BOTH), + ('firewall_rules', 'Firewall Rules', osc_utils.LIST_BOTH), + ('description', 'Description', osc_utils.LIST_LONG_ONLY), + ('audited', 'Audited', osc_utils.LIST_LONG_ONLY), + ('public', 'Public', osc_utils.LIST_LONG_ONLY), + ('tenant_id', 'Project', osc_utils.LIST_LONG_ONLY), +) + + +def _get_common_attrs(client_manager, parsed_args, is_create=True): + attrs = {} + client = client_manager.neutronclient + + if is_create: + if 'project' in parsed_args and parsed_args.project is not None: + attrs['tenant_id'] = osc_utils.find_project( + client_manager.identity, + parsed_args.project, + parsed_args.project_domain, + ).id + if parsed_args.firewall_rule and parsed_args.no_firewall_rule: + _firewall_rules = [] + for f in set(parsed_args.firewall_rule): + _firewall_rules.append(client.find_resource( + const.FWR, f, cmd_resource='fwaas_' + const.FWR)['id']) + attrs[const.FWRS] = sorted(_firewall_rules) + elif parsed_args.firewall_rule: + rules = [] + for f in set(parsed_args.firewall_rule): + rules.append(client.find_resource( + const.FWR, f, cmd_resource='fwaas_' + const.FWR)['id']) + if not is_create: + rules += client.find_resource( + const.FWP, parsed_args.firewall_policy)[const.FWRS] + attrs[const.FWRS] = sorted(set(rules)) + elif parsed_args.no_firewall_rule: + attrs[const.FWRS] = [] + if parsed_args.audited: + attrs['audited'] = True + if parsed_args.no_audited: + attrs['audited'] = False + if parsed_args.name: + attrs['name'] = str(parsed_args.name) + if parsed_args.description: + attrs['description'] = str(parsed_args.description) + if parsed_args.public: + attrs['public'] = True + if parsed_args.private: + attrs['public'] = False + return attrs + + +def _get_common_parser(parser): + parser.add_argument( + '--description', + help=_('Description of the firewall policy')) + audited_group = parser.add_mutually_exclusive_group() + audited_group.add_argument( + '--audited', + action='store_true', + help=_('Enable auditing for the policy')) + audited_group.add_argument( + '--no-audited', + action='store_true', + help=_('Disable auditing for the policy')) + public_group = parser.add_mutually_exclusive_group() + public_group.add_argument( + '--public', + action='store_true', + help=_('Make the firewall policy public, which allows it to be ' + 'used in all projects (as opposed to the default, ' + 'which is to restrict its use to the current project)')) + public_group.add_argument( + '--private', + action='store_true', + help=_('Restrict use of the firewall policy to the ' + 'current project')) + return parser + + +class CreateFirewallPolicy(command.ShowOne): + _description = _("Create a new firewall policy") + + def get_parser(self, prog_name): + parser = super(CreateFirewallPolicy, self).get_parser(prog_name) + _get_common_parser(parser) + osc_utils.add_project_owner_option_to_parser(parser) + parser.add_argument( + 'name', + metavar='', + help=_('Name for the firewall policy')) + fwr_group = parser.add_mutually_exclusive_group() + fwr_group.add_argument( + '--firewall-rule', + action='append', + metavar='', + help=_('Firewall rule(s) to apply (name or ID)')) + fwr_group.add_argument( + '--no-firewall-rule', + action='store_true', + help=_('Unset all firewall rules from firewall policy')) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + attrs = _get_common_attrs(self.app.client_manager, parsed_args) + obj = client.create_fwaas_firewall_policy( + {const.FWP: attrs})[const.FWP] + columns, display_columns = osc_utils.get_columns(obj, _attr_map) + data = utils.get_dict_properties(obj, columns, formatters=_formatters) + return (display_columns, data) + + +class DeleteFirewallPolicy(command.Command): + _description = _("Delete firewall policy(s)") + + def get_parser(self, prog_name): + parser = super(DeleteFirewallPolicy, self).get_parser(prog_name) + parser.add_argument( + const.FWP, + metavar='', + nargs='+', + help=_('Firewall policy(s) to delete (name or ID)')) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + result = 0 + for fwp in parsed_args.firewall_policy: + try: + fwp_id = client.find_resource( + const.FWP, fwp, cmd_resource='fwaas_' + const.FWP)['id'] + client.delete_fwaas_firewall_policy(fwp_id) + except Exception as e: + result += 1 + LOG.error(_("Failed to delete Firewall policy with " + "name or ID '%(firewall_policy)s': %(e)s") % { + const.FWP: fwp, 'e': e}) + + if result > 0: + total = len(parsed_args.firewall_policy) + msg = (_("%(result)s of %(total)s Firewall policy(s) " + "failed to delete.") % {'result': result, 'total': total}) + raise exceptions.CommandError(msg) + + +class FirewallPolicyInsertRule(command.Command): + _description = _("Insert a rule into a given firewall policy") + + def get_parser(self, prog_name): + parser = super(FirewallPolicyInsertRule, self).get_parser(prog_name) + parser.add_argument( + const.FWP, + metavar='', + help=_('Firewall policy to insert rule (name or ID)')) + parser.add_argument( + '--insert-before', + metavar='', + help=_('Insert the new rule before this existing rule ' + '(name or ID)')) + parser.add_argument( + '--insert-after', + metavar='', + help=_('Insert the new rule after this existing rule ' + '(name or ID)')) + parser.add_argument( + const.FWR, + metavar='', + help=_('Firewall rule to be inserted (name or ID)')) + return parser + + def args2body(self, parsed_args): + client = self.app.client_manager.neutronclient + _rule_id = _get_required_firewall_rule(client, parsed_args) + _insert_before = '' + if 'insert_before' in parsed_args: + if parsed_args.insert_before: + _insert_before = client.find_resource( + const.FWR, parsed_args.insert_before, + cmd_resource='fwaas_' + const.FWR)['id'] + _insert_after = '' + if 'insert_after' in parsed_args: + if parsed_args.insert_after: + _insert_after = client.find_resource( + const.FWR, parsed_args.insert_after, + cmd_resource='fwaas_' + const.FWR)['id'] + return {'firewall_rule_id': _rule_id, + 'insert_before': _insert_before, + 'insert_after': _insert_after} + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + body = self.args2body(parsed_args) + policy_id = client.find_resource( + const.FWP, parsed_args.firewall_policy, + cmd_resource='fwaas_' + const.FWP)['id'] + client.insert_rule_fwaas_firewall_policy(policy_id, body) + rule_id = body['firewall_rule_id'] + policy = parsed_args.firewall_policy + print((_('Inserted firewall rule %(rule)s in firewall policy ' + '%(policy)s') % {'rule': rule_id, 'policy': policy}), + file=self.app.stdout) + + +class FirewallPolicyRemoveRule(command.Command): + _description = _("Remove a rule from a given firewall policy") + + def get_parser(self, prog_name): + parser = super(FirewallPolicyRemoveRule, self).get_parser(prog_name) + parser.add_argument( + const.FWP, + metavar='', + help=_('Firewall policy to remove rule (name or ID)')) + parser.add_argument( + const.FWR, + metavar='', + help=_('Firewall rule to remove from policy (name or ID)')) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + fwr_id = _get_required_firewall_rule(client, parsed_args) + body = {'firewall_rule_id': fwr_id} + policy_id = client.find_resource( + const.FWP, parsed_args.firewall_policy, + cmd_resource='fwaas_' + const.FWP)['id'] + client.remove_rule_fwaas_firewall_policy(policy_id, body) + rule_id = body['firewall_rule_id'] + policy = parsed_args.firewall_policy + print((_('Removed firewall rule %(rule)s from firewall policy ' + '%(policy)s') % {'rule': rule_id, 'policy': policy}), + file=self.app.stdout) + + +class ListFirewallPolicy(command.Lister): + _description = _("List firewall policies") + + def get_parser(self, prog_name): + parser = super(ListFirewallPolicy, self).get_parser(prog_name) + parser.add_argument( + '--long', + action='store_true', + default=False, + help=_("List additional fields in output") + ) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + obj = client.list_fwaas_firewall_policies()[const.FWPS] + headers, columns = osc_utils.get_column_definitions( + _attr_map, long_listing=parsed_args.long) + return (headers, (utils.get_dict_properties( + s, columns, formatters=_formatters) for s in obj)) + + +class SetFirewallPolicy(command.Command): + _description = _("Set firewall policy properties") + + def get_parser(self, prog_name): + parser = super(SetFirewallPolicy, self).get_parser(prog_name) + _get_common_parser(parser) + parser.add_argument( + const.FWP, + metavar='', + help=_('Firewall policy to update (name or ID)')) + parser.add_argument( + '--name', + metavar='', + help=_('Name for the firewall policy')) + parser.add_argument( + '--firewall-rule', + action='append', + metavar='', + help=_('Firewall rule(s) to apply (name or ID)')) + parser.add_argument( + '--no-firewall-rule', + action='store_true', + help=_('Remove all firewall rules from firewall policy')) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + fwp_id = client.find_resource( + const.FWP, parsed_args.firewall_policy, + cmd_resource='fwaas_' + const.FWP)['id'] + attrs = _get_common_attrs(self.app.client_manager, + parsed_args, is_create=False) + try: + client.update_fwaas_firewall_policy(fwp_id, {const.FWP: attrs}) + except Exception as e: + msg = (_("Failed to set firewall policy '%(policy)s': %(e)s") + % {'policy': parsed_args.firewall_policy, 'e': e}) + raise exceptions.CommandError(msg) + + +class ShowFirewallPolicy(command.ShowOne): + _description = _("Display firewall policy details") + + def get_parser(self, prog_name): + parser = super(ShowFirewallPolicy, self).get_parser(prog_name) + parser.add_argument( + const.FWP, + metavar='', + help=_('Firewall policy to show (name or ID)')) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + fwp_id = client.find_resource(const.FWP, + parsed_args.firewall_policy, + cmd_resource='fwaas_' + const.FWP)['id'] + obj = client.show_fwaas_firewall_policy(fwp_id)[const.FWP] + columns, display_columns = osc_utils.get_columns(obj, _attr_map) + data = utils.get_dict_properties(obj, columns, formatters=_formatters) + return (display_columns, data) + + +def _get_required_firewall_rule(client, parsed_args): + if not parsed_args.firewall_rule: + msg = (_("Firewall rule (name or ID) is required.")) + raise exceptions.CommandError(msg) + return client.find_resource(const.FWR, parsed_args.firewall_rule, + cmd_resource='fwaas_' + const.FWR)['id'] + + +class UnsetFirewallPolicy(command.Command): + _description = _("Unset firewall policy properties") + + def get_parser(self, prog_name): + parser = super(UnsetFirewallPolicy, self).get_parser(prog_name) + parser.add_argument( + const.FWP, + metavar='', + help=_('Firewall policy to unset (name or ID)')) + firewall_rule_group = parser.add_mutually_exclusive_group() + firewall_rule_group.add_argument( + '--firewall-rule', + action='append', + metavar='', + help=_('Remove firewall rule(s) from the firewall policy ' + '(name or ID)')) + firewall_rule_group.add_argument( + '--all-firewall-rule', + action='store_true', + help=_('Remove all firewall rules from the firewall policy')) + parser.add_argument( + '--audited', + action='store_true', + help=_('Disable auditing for the policy')) + parser.add_argument( + '--public', + action='store_true', + help=_('Restrict use of the firewall policy to the ' + 'current project')) + return parser + + def _get_attrs(self, client_manager, parsed_args): + attrs = {} + client = client_manager.neutronclient + + if parsed_args.firewall_rule: + old = client.find_resource( + const.FWP, parsed_args.firewall_policy)[const.FWRS] + new = [] + for f in set(parsed_args.firewall_rule): + new.append(client.find_resource( + const.FWR, f, cmd_resource='fwaas_' + const.FWR)['id']) + attrs[const.FWRS] = sorted(list(set(old) - set(new))) + if parsed_args.all_firewall_rule: + attrs[const.FWRS] = [] + if parsed_args.audited: + attrs['audited'] = False + if parsed_args.public: + attrs['public'] = False + return attrs + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + fwp_id = client.find_resource( + const.FWP, parsed_args.firewall_policy, + cmd_resource='fwaas_' + const.FWP)['id'] + attrs = self._get_attrs(self.app.client_manager, parsed_args) + try: + client.update_fwaas_firewall_policy(fwp_id, {const.FWP: attrs}) + except Exception as e: + msg = (_("Failed to unset firewall policy '%(policy)s': %(e)s") + % {'policy': parsed_args.firewall_policy, 'e': e}) + raise exceptions.CommandError(msg) diff --git a/neutronclient/osc/v2/fwaas/firewallrule.py b/neutronclient/osc/v2/fwaas/firewallrule.py new file mode 100644 index 0000000..a7d9051 --- /dev/null +++ b/neutronclient/osc/v2/fwaas/firewallrule.py @@ -0,0 +1,399 @@ +# Copyright 2016 FUJITSU LIMITED +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +import copy + +from osc_lib.command import command +from osc_lib import exceptions +from osc_lib import utils +from oslo_log import log as logging + +from neutronclient._i18n import _ +from neutronclient.common import utils as nc_utils +from neutronclient.osc import utils as osc_utils +from neutronclient.osc.v2.fwaas import constants as const + + +LOG = logging.getLogger(__name__) + + +_attr_map = ( + ('id', 'ID', osc_utils.LIST_BOTH), + ('name', 'Name', osc_utils.LIST_BOTH), + ('enabled', 'Enabled', osc_utils.LIST_BOTH), + ('summary', 'Summary', osc_utils.LIST_SHORT_ONLY), + ('description', 'Description', osc_utils.LIST_LONG_ONLY), + ('ip_version', 'IP Version', osc_utils.LIST_LONG_ONLY), + ('action', 'Action', osc_utils.LIST_LONG_ONLY), + ('protocol', 'Protocol', osc_utils.LIST_LONG_ONLY), + ('source_ip_address', 'Source IP Address', osc_utils.LIST_LONG_ONLY), + ('source_port', 'Source Port', osc_utils.LIST_LONG_ONLY), + ('destination_ip_address', 'Destination IP Address', + osc_utils.LIST_LONG_ONLY), + ('destination_port', 'Destination Port', osc_utils.LIST_LONG_ONLY), + ('public', 'Public', osc_utils.LIST_LONG_ONLY), + ('tenant_id', 'Project', osc_utils.LIST_LONG_ONLY), +) + + +def _get_common_parser(parser): + parser.add_argument( + '--name', + metavar='', + help=_('Name of the firewall rule')) + parser.add_argument( + '--description', + metavar='', + help=_('Description of the firewall rule')) + parser.add_argument( + '--protocol', + choices=['tcp', 'udp', 'icmp', 'any'], + type=nc_utils.convert_to_lowercase, + help=_('Protocol for the firewall rule')) + parser.add_argument( + '--action', + choices=['allow', 'deny', 'reject'], + type=nc_utils.convert_to_lowercase, + help=_('Action for the firewall rule')) + parser.add_argument( + '--ip-version', + metavar='', + choices=['4', '6'], + help=_('Set IP version 4 or 6 (default is 4)')) + src_ip_group = parser.add_mutually_exclusive_group() + src_ip_group.add_argument( + '--source-ip-address', + metavar='', + help=_('Source IP address or subnet')) + src_ip_group.add_argument( + '--no-source-ip-address', + action='store_true', + help=_('Detach source IP address')) + dst_ip_group = parser.add_mutually_exclusive_group() + dst_ip_group.add_argument( + '--destination-ip-address', + metavar='', + help=_('Destination IP address or subnet')) + dst_ip_group.add_argument( + '--no-destination-ip-address', + action='store_true', + help=_('Detach destination IP address')) + src_port_group = parser.add_mutually_exclusive_group() + src_port_group.add_argument( + '--source-port', + metavar='', + help=_('Source port number or range' + '(integer in [1, 65535] or range like 123:456)')) + src_port_group.add_argument( + '--no-source-port', + action='store_true', + help=_('Detach source port number or range')) + dst_port_group = parser.add_mutually_exclusive_group() + dst_port_group.add_argument( + '--destination-port', + metavar='', + help=_('Destination port number or range' + '(integer in [1, 65535] or range like 123:456)')) + dst_port_group.add_argument( + '--no-destination-port', + action='store_true', + help=_('Detach destination port number or range')) + public_group = parser.add_mutually_exclusive_group() + public_group.add_argument( + '--public', + action='store_true', + help=_('Make the firewall rule public, which allows it to be ' + 'used in all projects (as opposed to the default, ' + 'which is to restrict its use to the current project)')) + public_group.add_argument( + '--private', + action='store_true', + help=_('Restrict use of the firewall rule to the current project')) + enable_group = parser.add_mutually_exclusive_group() + enable_group.add_argument( + '--enable-rule', + action='store_true', + help=_('Enable this rule (default is enabled)')) + enable_group.add_argument( + '--disable-rule', + action='store_true', + help=_('Disable this rule')) + return parser + + +def _get_common_attrs(client_manager, parsed_args, is_create=True): + attrs = {} + if is_create: + if 'project' in parsed_args and parsed_args.project is not None: + attrs['tenant_id'] = osc_utils.find_project( + client_manager.identity, + parsed_args.project, + parsed_args.project_domain, + ).id + if parsed_args.name: + attrs['name'] = str(parsed_args.name) + if parsed_args.description: + attrs['description'] = str(parsed_args.description) + if parsed_args.protocol: + attrs['protocol'] = parsed_args.protocol + if parsed_args.action: + attrs['action'] = parsed_args.action + if parsed_args.ip_version: + attrs['ip_version'] = str(parsed_args.ip_version) + if parsed_args.source_port: + attrs['source_port'] = parsed_args.source_port + if parsed_args.no_source_port: + attrs['source_port'] = None + if parsed_args.source_ip_address: + attrs['source_ip_address'] = parsed_args.source_ip_address + if parsed_args.no_source_ip_address: + attrs['source_ip_address'] = None + if parsed_args.destination_port: + attrs['destination_port'] = str(parsed_args.destination_port) + if parsed_args.no_destination_port: + attrs['destination_port'] = None + if parsed_args.destination_ip_address: + attrs['destination_ip_address'] = str( + parsed_args.destination_ip_address) + if parsed_args.no_destination_ip_address: + attrs['destination_ip_address'] = None + if parsed_args.enable_rule: + attrs['enabled'] = True + if parsed_args.disable_rule: + attrs['enabled'] = False + if parsed_args.public: + attrs['public'] = True + if parsed_args.private: + attrs['public'] = False + return attrs + + +def format_protocol(protocol): + return protocol if protocol else 'any' + +_formatters = {'protocol': format_protocol} + + +class CreateFirewallRule(command.ShowOne): + _description = _("Create a new firewall rule") + + def get_parser(self, prog_name): + parser = super(CreateFirewallRule, self).get_parser(prog_name) + _get_common_parser(parser) + osc_utils.add_project_owner_option_to_parser(parser) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + attrs = _get_common_attrs(self.app.client_manager, parsed_args) + obj = client.create_fwaas_firewall_rule( + {const.FWR: attrs})[const.FWR] + columns, display_columns = osc_utils.get_columns(obj, _attr_map) + data = utils.get_dict_properties(obj, columns, formatters=_formatters) + return display_columns, data + + +class DeleteFirewallRule(command.Command): + _description = _("Delete firewall rule(s)") + + def get_parser(self, prog_name): + parser = super(DeleteFirewallRule, self).get_parser(prog_name) + parser.add_argument( + const.FWR, + metavar='', + nargs='+', + help=_('Firewall rule(s) to delete (name or ID)')) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + result = 0 + for fwr in parsed_args.firewall_rule: + try: + fwr_id = client.find_resource( + const.FWR, fwr, cmd_resource='fwaas_' + const.FWR)['id'] + client.delete_fwaas_firewall_rule(fwr_id) + except Exception as e: + result += 1 + LOG.error(_("Failed to delete Firewall rule with " + "name or ID '%(firewall_rule)s': %(e)s") % { + const.FWR: fwr, 'e': e}) + + if result > 0: + total = len(parsed_args.firewall_rule) + msg = (_("%(result)s of %(total)s Firewall rule(s) failed " + "to delete.") % {'result': result, 'total': total}) + raise exceptions.CommandError(msg) + + +class ListFirewallRule(command.Lister): + _description = _("List firewall rules that belong to a given tenant") + + def get_parser(self, prog_name): + parser = super(ListFirewallRule, self).get_parser(prog_name) + parser.add_argument( + '--long', + action='store_true', + default=False, + help=_("List additional fields in output") + ) + return parser + + def extend_list(self, data, parsed_args): + ext_data = copy.deepcopy(data) + for d in ext_data: + protocol = d['protocol'].upper() if d['protocol'] else 'ANY' + src_ip = 'none specified' + dst_ip = 'none specified' + src_port = '(none specified)' + dst_port = '(none specified)' + if 'source_ip_address' in d and d['source_ip_address']: + src_ip = str(d['source_ip_address']).lower() + if 'source_port' in d and d['source_port']: + src_port = '(' + str(d['source_port']).lower() + ')' + if 'destination_ip_address' in d and d['destination_ip_address']: + dst_ip = str(d['destination_ip_address']).lower() + if 'destination_port' in d and d['destination_port']: + dst_port = '(' + str(d['destination_port']).lower() + ')' + action = d['action'] if d.get('action') else 'no-action' + src = 'source(port): ' + src_ip + src_port + dst = 'dest(port): ' + dst_ip + dst_port + d['summary'] = ',\n '.join([protocol, src, dst, action]) + return ext_data + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + obj = client.list_fwaas_firewall_rules()[const.FWRS] + obj_extend = self.extend_list(obj, parsed_args) + headers, columns = osc_utils.get_column_definitions( + _attr_map, long_listing=parsed_args.long) + return (headers, (utils.get_dict_properties( + s, columns, formatters=_formatters) for s in obj_extend)) + + +class SetFirewallRule(command.Command): + _description = _("Set firewall rule properties") + + def get_parser(self, prog_name): + parser = super(SetFirewallRule, self).get_parser(prog_name) + _get_common_parser(parser) + parser.add_argument( + const.FWR, + metavar='', + help=_('Firewall rule to set (name or ID)')) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + attrs = _get_common_attrs(self.app.client_manager, + parsed_args, is_create=False) + fwr_id = client.find_resource( + const.FWR, parsed_args.firewall_rule, + cmd_resource='fwaas_' + const.FWR)['id'] + try: + client.update_fwaas_firewall_rule(fwr_id, {const.FWR: attrs}) + except Exception as e: + msg = (_("Failed to set firewall rule '%(rule)s': %(e)s") + % {'rule': parsed_args.firewall_rule, 'e': e}) + raise exceptions.CommandError(msg) + + +class ShowFirewallRule(command.ShowOne): + _description = _("Display firewall rule details") + + def get_parser(self, prog_name): + parser = super(ShowFirewallRule, self).get_parser(prog_name) + parser.add_argument( + const.FWR, + metavar='', + help=_('Firewall rule to display (name or ID)')) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + fwr_id = client.find_resource( + const.FWR, parsed_args.firewall_rule, + cmd_resource='fwaas_' + const.FWR)['id'] + obj = client.show_fwaas_firewall_rule(fwr_id)[const.FWR] + columns, display_columns = osc_utils.get_columns(obj, _attr_map) + data = utils.get_dict_properties(obj, columns, formatters=_formatters) + return (display_columns, data) + + +class UnsetFirewallRule(command.Command): + _description = _("Unset firewall rule properties") + + def get_parser(self, prog_name): + parser = super(UnsetFirewallRule, self).get_parser(prog_name) + parser.add_argument( + const.FWR, + metavar='', + help=_('Firewall rule to unset (name or ID)')) + parser.add_argument( + '--source-ip-address', + action='store_true', + help=_('Source IP address or subnet')) + parser.add_argument( + '--destination-ip-address', + action='store_true', + help=_('Destination IP address or subnet')) + parser.add_argument( + '--source-port', + action='store_true', + help=_('Source port number or range' + '(integer in [1, 65535] or range like 123:456)')) + parser.add_argument( + '--destination-port', + action='store_true', + help=_('Destination port number or range' + '(integer in [1, 65535] or range like 123:456)')) + parser.add_argument( + '--public', + action='store_true', + help=_('Restrict use of the firewall rule to the current project')) + parser.add_argument( + '--enable-rule', + action='store_true', + help=_('Disable this rule')) + return parser + + def _get_attrs(self, client_manager, parsed_args): + attrs = {} + if parsed_args.source_ip_address: + attrs['source_ip_address'] = None + if parsed_args.source_port: + attrs['source_port'] = None + if parsed_args.destination_ip_address: + attrs['destination_ip_address'] = None + if parsed_args.destination_port: + attrs['destination_port'] = None + if parsed_args.public: + attrs['public'] = False + if parsed_args.enable_rule: + attrs['enabled'] = False + return attrs + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + attrs = self._get_attrs(self.app.client_manager, parsed_args) + fwr_id = client.find_resource( + const.FWR, parsed_args.firewall_rule, + cmd_resource='fwaas_' + const.FWR)['id'] + try: + client.update_fwaas_firewall_rule(fwr_id, {const.FWR: attrs}) + except Exception as e: + msg = (_("Failed to unset firewall rule '%(rule)s': %(e)s") + % {'rule': parsed_args.firewall_rule, 'e': e}) + raise exceptions.CommandError(msg) diff --git a/neutronclient/tests/unit/osc/v2/fwaas/common.py b/neutronclient/tests/unit/osc/v2/fwaas/common.py new file mode 100644 index 0000000..9159884 --- /dev/null +++ b/neutronclient/tests/unit/osc/v2/fwaas/common.py @@ -0,0 +1,270 @@ +# Copyright 2016 FUJITSU LIMITED +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from osc_lib.tests import utils + +from neutronclient.tests.unit.osc.v2 import fakes as test_fakes + + +class TestListFWaaS(test_fakes.TestNeutronClientOSCV2): + + def test_list_with_no_option(self): + arglist = [] + verifylist = [] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + headers, data = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with() + self.assertEqual(list(self.list_headers), headers) + self.assertEqual([self.list_data], list(data)) + + def test_list_with_long_option(self): + arglist = ['--long'] + verifylist = [('long', True)] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + headers, data = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with() + self.assertEqual(list(self.headers), headers) + self.assertEqual([self.data], list(data)) + + +class TestShowFWaaS(test_fakes.TestNeutronClientOSCV2): + + def test_show_filtered_by_id_or_name(self): + target = self.resource['id'] + arglist = [target] + verifylist = [(self.res, target)] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + headers, data = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with(target) + self.assertEqual(self.ordered_headers, headers) + self.assertEqual(self.ordered_data, data) + + +class TestCreateFWaaS(test_fakes.TestNeutronClientOSCV2): + pass + + +class TestSetFWaaS(test_fakes.TestNeutronClientOSCV2): + + def test_set_name(self): + target = self.resource['id'] + update = 'change' + arglist = [target, '--name', update] + verifylist = [ + (self.res, target), + ('name', update), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with( + target, {self.res: {'name': update}}) + self.assertIsNone(result) + + def test_set_description(self): + target = self.resource['id'] + update = 'change-desc' + arglist = [target, '--description', update] + verifylist = [ + (self.res, target), + ('description', update), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with( + target, {self.res: {'description': update}}) + self.assertIsNone(result) + + def test_set_public(self): + target = self.resource['id'] + arglist = [target, '--public'] + verifylist = [ + (self.res, target), + ('public', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with( + target, {self.res: {'public': True}}) + self.assertIsNone(result) + + def test_set_duplicate_public(self): + target = self.resource['id'] + arglist = [target, '--public', '--public'] + verifylist = [ + (self.res, target), + ('public', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with( + target, {self.res: {'public': True}}) + self.assertIsNone(result) + + def test_set_private(self): + target = self.resource['id'] + arglist = [target, '--private'] + verifylist = [ + (self.res, target), + ('public', False), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with( + target, {self.res: {'public': False}}) + self.assertIsNone(result) + + def test_set_duplicate_private(self): + target = self.resource['id'] + arglist = [target, '--private', '--private'] + verifylist = [ + (self.res, target), + ('public', False), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with( + target, {self.res: {'public': False}}) + self.assertIsNone(result) + + def test_set_private_and_public(self): + target = self.resource['id'] + arglist = [target, '--private', '--public'] + verifylist = [ + (self.res, target), + ('private', True), + ('public', True), + ] + self.assertRaises( + utils.ParserException, + self.check_parser, self.cmd, arglist, verifylist) + + def test_set_public_and_priavte(self): + target = self.resource['id'] + arglist = [target, '--public', '--private'] + verifylist = [ + (self.res, target), + ('public', True), + ('private', True), + ] + self.assertRaises( + utils.ParserException, + self.check_parser, self.cmd, arglist, verifylist) + + def test_set_project(self): + target = self.resource['id'] + project_id = 'b14ce3b699594d13819a859480286489' + arglist = [target, '--project', project_id] + verifylist = [ + (self.res, target), + ('tenant_id', project_id), + ] + self.assertRaises( + utils.ParserException, + self.check_parser, self.cmd, arglist, verifylist) + + def test_set_project_domain(self): + target = self.resource['id'] + project_domain = 'mydomain.com' + arglist = [target, '--project-domain', project_domain] + verifylist = [ + (self.res, target), + ('project_domain', project_domain), + ] + self.assertRaises( + utils.ParserException, + self.check_parser, self.cmd, arglist, verifylist) + + +class TestDeleteFWaaS(test_fakes.TestNeutronClientOSCV2): + + def test_delete_with_one_resource(self): + target = self.resource['id'] + arglist = [target] + verifylist = [(self.res, [target])] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with(target) + self.assertIsNone(result) + + def test_delete_with_multiple_resources(self): + target1 = 'target1' + target2 = 'target2' + arglist = [target1, target2] + verifylist = [(self.res, [target1, target2])] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + self.assertIsNone(result) + + self.assertEqual(2, self.mocked.call_count) + for idx, reference in enumerate([target1, target2]): + actual = ''.join(self.mocked.call_args_list[idx][0]) + self.assertEqual(reference, actual) + + +class TestUnsetFWaaS(test_fakes.TestNeutronClientOSCV2): + + def test_unset_public(self): + target = self.resource['id'] + arglist = [ + target, + '--public', + ] + verifylist = [ + (self.res, target), + ('public', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + self.mocked.assert_called_once_with( + target, {self.res: {'public': False}}) + self.assertIsNone(result) + + def test_set_public_and_priavte(self): + target = self.resource['id'] + arglist = [target, '--public', '--private'] + verifylist = [ + (self.res, target), + ('public', True), + ('private', True), + ] + # check_parser: error: unrecognized arguments: --private + self.assertRaises( + utils.ParserException, + self.check_parser, self.cmd, arglist, verifylist) + + def test_set_duplicate_public(self): + target = self.resource['id'] + arglist = [target, '--public', '--public'] + verifylist = [ + (self.res, target), + ('public', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with( + target, {self.res: {'public': False}}) + self.assertIsNone(result) diff --git a/neutronclient/tests/unit/osc/v2/fwaas/fakes.py b/neutronclient/tests/unit/osc/v2/fwaas/fakes.py new file mode 100644 index 0000000..0eb3fe5 --- /dev/null +++ b/neutronclient/tests/unit/osc/v2/fwaas/fakes.py @@ -0,0 +1,117 @@ +# Copyright 2016 FUJITSU LIMITED +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import collections +import copy +import mock +import uuid + + +class FakeFWaaS(object): + + def create(self, attrs={}): + """Create a fake fwaas resources + + :param Dictionary attrs: + A dictionary with all attributes + :return: + A OrderedDict faking the fwaas resource + """ + self.ordered.update(attrs) + return copy.deepcopy(self.ordered) + + def bulk_create(self, attrs=None, count=2): + """Create multiple fake fwaas resources + + :param Dictionary attrs: + A dictionary with all attributes + :param int count: + The number of fwaas resources to fake + :return: + A list of dictionaries faking the fwaas resources + """ + return [self.create(attrs=attrs) for i in range(0, count)] + + def get(self, attrs=None, count=2): + """Create multiple fake fwaas resources + + :param Dictionary attrs: + A dictionary with all attributes + :param int count: + The number of fwaas resources to fake + :return: + A list of dictionaries faking the fwaas resource + """ + if attrs is None: + self.attrs = self.bulk_create(count=count) + return mock.Mock(side_effect=attrs) + + +class FirewallGroup(FakeFWaaS): + """Fake one or more firewall group""" + + def __init__(self): + super(FirewallGroup, self).__init__() + self.ordered = collections.OrderedDict(( + ('id', 'firewall-group-id-' + uuid.uuid4().hex), + ('name', 'my-group-' + uuid.uuid4().hex), + ('ingress_firewall_policy_id', None), + ('egress_firewall_policy_id', None), + ('description', 'my-desc-' + uuid.uuid4().hex), + ('status', 'INACTIVE'), + ('ports', []), + ('admin_state_up', True), + ('public', False), + ('tenant_id', 'tenant-id-' + uuid.uuid4().hex), + )) + + +class FirewallPolicy(FakeFWaaS): + """Fake one or more firewall policy""" + + def __init__(self): + super(FirewallPolicy, self).__init__() + self.ordered = collections.OrderedDict(( + ('id', 'firewall-policy-' + uuid.uuid4().hex), + ('name', 'my-policy-' + uuid.uuid4().hex), + ('firewall_rules', []), + ('description', 'my-desc-' + uuid.uuid4().hex), + ('audited', True), + ('public', False), + ('tenant_id', 'tenant-id-' + uuid.uuid4().hex), + )) + + +class FirewallRule(FakeFWaaS): + """Fake one or more firewall rule""" + + def __init__(self): + super(FirewallRule, self).__init__() + self.ordered = collections.OrderedDict(( + ('id', 'firewall-rule-id-' + uuid.uuid4().hex), + ('name', 'my-rule-' + uuid.uuid4().hex), + ('enabled', False), + ('description', 'my-desc-' + uuid.uuid4().hex), + ('ip_version', 4), + ('action', 'deny'), + ('protocol', 'any'), + ('source_ip_address', '192.168.1.0/24'), + ('source_port', '1:11111'), + ('destination_ip_address', '192.168.2.2'), + ('destination_port', '2:22222'), + ('public', False), + ('tenant_id', 'tenant-id-' + uuid.uuid4().hex), + )) diff --git a/neutronclient/tests/unit/osc/v2/fwaas/test_firewallgroup.py b/neutronclient/tests/unit/osc/v2/fwaas/test_firewallgroup.py new file mode 100644 index 0000000..44bcc70 --- /dev/null +++ b/neutronclient/tests/unit/osc/v2/fwaas/test_firewallgroup.py @@ -0,0 +1,613 @@ +# Copyright 2016 FUJITSU LIMITED +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import copy +import mock +import re + +from osc_lib import exceptions +from osc_lib.tests import utils + +from neutronclient.osc import utils as osc_utils +from neutronclient.osc.v2.fwaas import firewallgroup +from neutronclient.tests.unit.osc.v2 import fakes as test_fakes +from neutronclient.tests.unit.osc.v2.fwaas import common +from neutronclient.tests.unit.osc.v2.fwaas import fakes + + +_fwg = fakes.FirewallGroup().create() +CONVERT_MAP = { + 'ingress_firewall_policy': 'ingress_firewall_policy_id', + 'egress_firewall_policy': 'egress_firewall_policy_id', + 'no_ingress_firewall_policy': 'ingress_firewall_policy_id', + 'no_egress_firewall_policy': 'egress_firewall_policy_id', + 'public': 'public', + 'private': 'public', + 'project': 'tenant_id', + 'enable': 'admin_state_up', + 'disable': 'admin_state_up', + 'port': 'ports', +} + + +def _generate_response(ordered_dict=None, data=None): + source = ordered_dict if ordered_dict else _fwg + up = {'admin_state_up': 'UP' if source['admin_state_up'] else 'DOWN'} + if data: + up.append(data) + source.update(up) + return tuple(source[key] for key in source) + + +def _generate_req_and_res(verifylist): + request = dict(verifylist) + response = copy.deepcopy(_fwg) + for key, val in verifylist: + del request[key] + if re.match('^no_', key) and val is True: + new_value = None + elif key == 'enable' and val: + new_value = True + elif key == 'disable' and val: + new_value = False + else: + new_value = val + converted = CONVERT_MAP.get(key, key) + request[converted] = new_value + response[converted] = new_value + return request, response + + +class TestFirewallGroup(test_fakes.TestNeutronClientOSCV2): + + def check_results(self, headers, data, exp_req, is_list=False): + if is_list: + req_body = {self.res_plural: [exp_req]} + else: + req_body = {self.res: exp_req} + self.mocked.assert_called_once_with(req_body) + self.assertEqual(self.ordered_headers, headers) + self.assertEqual(self.ordered_data, data) + + def setUp(self): + super(TestFirewallGroup, self).setUp() + + def side_effect_for_find_resource(*args, **kwargs): + port_id = args[1] + ports = _fwg['ports'] + if self.res in args[0]: + ports = _fwg['ports'] + return {'id': port_id, 'ports': ports} + + def side_effect_for_list_ports(*args): + port_name = 'id_for_port' + return {'ports': [{'id': port_name}]} + + self.neutronclient.find_resource.side_effect = mock.Mock( + side_effect=side_effect_for_find_resource) + self.neutronclient.list_ports = mock.Mock( + side_effect=side_effect_for_list_ports) + osc_utils.find_project = mock.Mock() + osc_utils.find_project.id = _fwg['tenant_id'] + self.res = 'firewall_group' + self.res_plural = 'firewall_groups' + self.resource = _fwg + self.list_headers = ( + 'ID', + 'Name', + 'Ingress Policy ID', + 'Egress Policy ID', + ) + self.list_data = ( + _fwg['id'], + _fwg['name'], + _fwg['ingress_firewall_policy_id'], + _fwg['egress_firewall_policy_id'], + ) + self.headers = tuple(self.list_headers + ( + 'Description', + 'Status', + 'Ports', + 'State', + 'Public', + 'Project', + )) + self.data = _generate_response() + self.ordered_headers = copy.deepcopy(tuple(sorted(self.headers))) + self.ordered_data = ( + _fwg['description'], + _fwg['egress_firewall_policy_id'], + _fwg['id'], + _fwg['ingress_firewall_policy_id'], + _fwg['name'], + _fwg['ports'], + _fwg['tenant_id'], + _fwg['public'], + 'UP' if _fwg['admin_state_up'] else 'DOWN', + _fwg['status'], + ) + self.ordered_columns = ( + 'description', + 'egress_firewall_policy_id', + 'id', + 'ingress_firewall_policy_id', + 'name', + 'ports', + 'tenant_id', + 'public', + 'admin_state_up', + 'status', + ) + + +class TestCreateFirewallGroup(TestFirewallGroup, common.TestCreateFWaaS): + + def setUp(self): + # Mock objects + super(TestCreateFirewallGroup, self).setUp() + self.neutronclient.create_fwaas_firewall_group = mock.Mock( + return_value={self.res: _fwg}) + self.mocked = self.neutronclient.create_fwaas_firewall_group + self.cmd = firewallgroup.CreateFirewallGroup(self.app, self.namespace) + + def _update_expect_response(self, request, response): + """Set expected request and response + + :param request + A dictionary of request body(dict of verifylist) + :param response + A OrderedDict of request body + """ + # Update response body + self.neutronclient.create_fwaas_firewall_group.return_value = \ + {self.res: dict(response)} + osc_utils.find_project.return_value.id = response['tenant_id'] + # Update response(finally returns 'data') + self.data = _generate_response(ordered_dict=response) + self.ordered_data = tuple( + response[column] for column in self.ordered_columns + ) + + def test_create_with_no_option(self): + # firewall_group-create with mandatory (none) params. + arglist = [] + verifylist = [] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + headers, data = self.cmd.take_action(parsed_args) + + self.assertEqual(self.ordered_headers, headers) + self.assertEqual(self.ordered_data, data) + + def test_create_with_port(self): + # firewall_group-create with 'port' + port_id = 'id_for_port' + arglist = ['--port', port_id] + verifylist = [('port', [port_id])] + request, response = _generate_req_and_res(verifylist) + self._update_expect_response(request, response) + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + headers, data = self.cmd.take_action(parsed_args) + + self.check_results(headers, data, request) + + def test_create_with_all_params(self): + name = 'my-name' + description = 'my-desc' + ingress_policy = 'my-ingress-policy' + egress_policy = 'my-egress-policy' + port = 'port' + tenant_id = 'my-tenant' + arglist = [ + '--name', name, + '--description', description, + '--ingress-firewall-policy', ingress_policy, + '--egress-firewall-policy', egress_policy, + '--port', port, + '--project', tenant_id, + '--public', + '--disable', + ] + verifylist = [ + ('name', name), + ('description', description), + ('ingress_firewall_policy', ingress_policy), + ('egress_firewall_policy', egress_policy), + ('port', [port]), + ('public', True), + ('project', tenant_id), + ('disable', True), + ] + request, response = _generate_req_and_res(verifylist) + self._update_expect_response(request, response) + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + headers, data = self.cmd.take_action(parsed_args) + + self.check_results(headers, data, request) + + def test_create_with_public_and_private(self): + arglist = [ + '--public', + '--private', + ] + verifylist = [ + ('public', True), + ('private', True), + ] + self.assertRaises( + utils.ParserException, + self.check_parser, self.cmd, arglist, verifylist) + + def test_create_with_ports_and_no(self): + port = 'my-port' + arglist = [ + '--port', port, + '--no-port', + ] + verifylist = [ + ('port', [port]), + ('no_port', True), + ] + self.assertRaises( + utils.ParserException, + self.check_parser, self.cmd, arglist, verifylist) + + def test_create_with_ingress_policy_and_no(self): + policy = 'my-policy' + arglist = [ + '--ingress-firewall-policy', policy, + '--no-ingress-firewall-policy', + ] + verifylist = [ + ('ingress_firewall_policy', policy), + ('no_ingress_firewall_policy', True), + ] + self.assertRaises( + utils.ParserException, + self.check_parser, self.cmd, arglist, verifylist) + + def test_create_with_egress_policy_and_no(self): + policy = 'my-policy' + arglist = [ + '--egress-firewall-policy', policy, + '--no-egress-firewall-policy', + ] + verifylist = [ + ('egress_firewall_policy', policy), + ('no_egress_firewall_policy', True), + ] + self.assertRaises( + utils.ParserException, + self.check_parser, self.cmd, arglist, verifylist) + + +class TestListFirewallGroup(TestFirewallGroup, common.TestListFWaaS): + + def setUp(self): + super(TestListFirewallGroup, self).setUp() + # Mock objects + self.neutronclient.list_fwaas_firewall_groups = mock.Mock( + return_value={self.res_plural: [_fwg]}) + self.mocked = self.neutronclient.list_fwaas_firewall_groups + self.cmd = firewallgroup.ListFirewallGroup(self.app, self.namespace) + + +class TestShowFirewallGroup(TestFirewallGroup, common.TestShowFWaaS): + + def setUp(self): + super(TestShowFirewallGroup, self).setUp() + # Mock objects + self.neutronclient.show_fwaas_firewall_group = mock.Mock( + return_value={self.res: _fwg}) + self.mocked = self.neutronclient.show_fwaas_firewall_group + self.cmd = firewallgroup.ShowFirewallGroup(self.app, self.namespace) + + +class TestSetFirewallGroup(TestFirewallGroup, common.TestSetFWaaS): + + def setUp(self): + super(TestSetFirewallGroup, self).setUp() + # Mock objects + _fwg['ports'] = ['old_port'] + self.neutronclient.update_fwaas_firewall_group = mock.Mock( + return_value={self.res: _fwg}) + self.mocked = self.neutronclient.update_fwaas_firewall_group + self.cmd = firewallgroup.SetFirewallGroup(self.app, self.namespace) + + def _update_expect_response(self, request, response): + """Set expected request and response + + :param request + A dictionary of request body(dict of verifylist) + :param response + A OrderedDict of request body + """ + osc_utils.find_project.return_value.id = response['tenant_id'] + # Update response(finally returns 'data') + self.data = _generate_response(ordered_dict=response) + self.ordered_data = tuple( + response[column] for column in self.ordered_columns + ) + + def test_set_ingress_policy(self): + # firewall_group-update myid --policy newpolicy. + target = self.resource['id'] + policy = 'ingress_policy' + arglist = [target, '--ingress-firewall-policy', policy] + verifylist = [ + (self.res, target), + ('ingress_firewall_policy', policy), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with( + target, {self.res: {'ingress_firewall_policy_id': policy}}) + self.assertIsNone(result) + + def test_set_port(self): + target = self.resource['id'] + port1 = 'additional_port1' + port2 = 'additional_port2' + arglist = [ + target, + '--port', port1, + '--port', port2, + ] + verifylist = [ + (self.res, target), + ('port', [port1, port2]), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + expect = {'ports': sorted(_fwg['ports'] + [port1, port2])} + self.mocked.assert_called_once_with(target, {self.res: expect}) + self.assertIsNone(result) + + def test_set_no_port(self): + # firewall_group-update myid --policy newpolicy. + target = self.resource['id'] + arglist = [target, '--no-port'] + verifylist = [ + (self.res, target), + ('no_port', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with( + target, {self.res: {'ports': []}}) + self.assertIsNone(result) + + def test_set_admin_state(self): + target = self.resource['id'] + arglist = [target, '--enable'] + verifylist = [ + (self.res, target), + ('enable', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with( + target, {self.res: {'admin_state_up': True}}) + self.assertIsNone(result) + + def test_set_egress_policy(self): + target = self.resource['id'] + policy = 'egress_policy' + arglist = [target, '--egress-firewall-policy', policy] + verifylist = [ + (self.res, target), + ('egress_firewall_policy', policy), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with( + target, {self.res: {'egress_firewall_policy_id': policy}}) + self.assertIsNone(result) + + def test_set_no_ingress_policies(self): + target = self.resource['id'] + arglist = [target, '--no-ingress-firewall-policy'] + verifylist = [ + (self.res, target), + ('no_ingress_firewall_policy', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with( + target, {self.res: {'ingress_firewall_policy_id': None}}) + self.assertIsNone(result) + + def test_set_no_egress_policies(self): + target = self.resource['id'] + arglist = [target, '--no-egress-firewall-policy'] + verifylist = [ + (self.res, target), + ('no_egress_firewall_policy', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with( + target, {self.res: {'egress_firewall_policy_id': None}}) + self.assertIsNone(result) + + def test_set_port_and_no_port(self): + target = self.resource['id'] + port = 'my-port' + arglist = [ + target, + '--port', port, + '--no-port', + ] + verifylist = [ + (self.res, target), + ('port', [port]), + ('no_port', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + self.mocked.assert_called_once_with( + target, {self.res: {'ports': [port]}}) + self.assertIsNone(result) + + def test_set_ingress_policy_and_no_ingress_policy(self): + target = self.resource['id'] + arglist = [ + target, + '--ingress-firewall-policy', 'my-ingress', + '--no-ingress-firewall-policy', + ] + verifylist = [ + (self.res, target), + ('ingress_firewall_policy', 'my-ingress'), + ('no_ingress_firewall_policy', True), + ] + self.assertRaises( + utils.ParserException, + self.check_parser, self.cmd, arglist, verifylist) + + def test_set_egress_policy_and_no_egress_policy(self): + target = self.resource['id'] + arglist = [ + target, + '--egress-firewall-policy', 'my-egress', + '--no-egress-firewall-policy', + ] + verifylist = [ + (self.res, target), + ('egress_firewall_policy', 'my-egress'), + ('no_egress_firewall_policy', True), + ] + self.assertRaises( + utils.ParserException, + self.check_parser, self.cmd, arglist, verifylist) + + def test_set_and_raises(self): + self.neutronclient.update_fwaas_firewall_group = mock.Mock( + side_effect=Exception) + target = self.resource['id'] + arglist = [target, '--name', 'my-name'] + verifylist = [(self.res, target), ('name', 'my-name')] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + self.assertRaises( + exceptions.CommandError, self.cmd.take_action, parsed_args) + + +class TestDeleteFirewallGroup(TestFirewallGroup, common.TestDeleteFWaaS): + + def setUp(self): + super(TestDeleteFirewallGroup, self).setUp() + # Mock objects + self.neutronclient.delete_fwaas_firewall_group = mock.Mock() + self.mocked = self.neutronclient.delete_fwaas_firewall_group + self.cmd = firewallgroup.DeleteFirewallGroup(self.app, self.namespace) + + +class TestUnsetFirewallGroup(TestFirewallGroup, common.TestUnsetFWaaS): + + def setUp(self): + super(TestUnsetFirewallGroup, self).setUp() + _fwg['ports'] = ['old_port'] + # Mock objects + self.neutronclient.update_fwaas_firewall_group = mock.Mock() + self.mocked = self.neutronclient.update_fwaas_firewall_group + self.cmd = firewallgroup.UnsetFirewallGroup(self.app, self.namespace) + + def test_unset_ingress_policy(self): + target = self.resource['id'] + arglist = [ + target, + '--ingress-firewall-policy', + ] + verifylist = [ + (self.res, target), + ('ingress_firewall_policy', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + self.mocked.assert_called_once_with( + target, {self.res: {'ingress_firewall_policy_id': None}}) + self.assertIsNone(result) + + def test_unset_egress_policy(self): + target = self.resource['id'] + arglist = [ + target, + '--egress-firewall-policy', + ] + verifylist = [ + (self.res, target), + ('egress_firewall_policy', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + self.mocked.assert_called_once_with( + target, {self.res: {'egress_firewall_policy_id': None}}) + self.assertIsNone(result) + + def test_unset_enable(self): + target = self.resource['id'] + arglist = [ + target, + '--enable', + ] + verifylist = [ + (self.res, target), + ('enable', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + self.mocked.assert_called_once_with( + target, {self.res: {'admin_state_up': False}}) + self.assertIsNone(result) + + def test_unset_port(self): + target = self.resource['id'] + port = 'old_port' + arglist = [ + target, + '--port', port, + ] + verifylist = [ + (self.res, target), + ('port', [port]), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + self.mocked.assert_called_once_with(target, {self.res: {'ports': []}}) + self.assertIsNone(result) + + def test_unset_all_port(self): + target = self.resource['id'] + arglist = [ + target, + '--all-port', + ] + verifylist = [ + (self.res, target), + ('all_port', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + self.mocked.assert_called_once_with(target, {self.res: {'ports': []}}) + self.assertIsNone(result) diff --git a/neutronclient/tests/unit/osc/v2/fwaas/test_firewallpolicy.py b/neutronclient/tests/unit/osc/v2/fwaas/test_firewallpolicy.py new file mode 100644 index 0000000..fff4e91 --- /dev/null +++ b/neutronclient/tests/unit/osc/v2/fwaas/test_firewallpolicy.py @@ -0,0 +1,599 @@ +# Copyright 2016 FUJITSU LIMITED +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import copy +import mock +import re + +from osc_lib import exceptions +from osc_lib.tests import utils + +from neutronclient.osc import utils as osc_utils +from neutronclient.osc.v2.fwaas import firewallpolicy +from neutronclient.tests.unit.osc.v2 import fakes as test_fakes +from neutronclient.tests.unit.osc.v2.fwaas import common +from neutronclient.tests.unit.osc.v2.fwaas import fakes + + +_fwp = fakes.FirewallPolicy().create() +CONVERT_MAP = { + 'public': 'public', + 'private': 'public', + 'project': 'tenant_id', + 'port': 'ports', + 'name': 'name', + 'id': 'id', + 'firewall_rule': 'firewall_rules', + 'description': 'description' +} + + +def _generate_data(ordered_dict=None, data=None): + source = ordered_dict if ordered_dict else _fwp + if data: + source.update(data) + return tuple(source[key] for key in source) + + +def _generate_req_and_res(verifylist): + request = dict(verifylist) + response = copy.deepcopy(_fwp) + for key, val in verifylist: + converted = CONVERT_MAP.get(key, key) + del request[key] + if re.match('^no_', key) and val is True: + new_value = None + elif key == 'enable' and val: + new_value = True + elif key == 'disable' and val: + new_value = False + else: + new_value = val + request[converted] = new_value + response[converted] = new_value + return request, response + + +class TestFirewallPolicy(test_fakes.TestNeutronClientOSCV2): + + def check_results(self, headers, data, exp_req, is_list=False): + if is_list: + req_body = {self.res_plural: [exp_req]} + else: + req_body = {self.res: exp_req} + self.mocked.assert_called_once_with(req_body) + self.assertEqual(self.ordered_headers, headers) + self.assertEqual(self.ordered_data, data) + + def setUp(self): + super(TestFirewallPolicy, self).setUp() + + def _find_resource(*args, **kwargs): + rule_id = args[1] + rules = [] + if self.res in args[0]: + rules = _fwp['firewall_rules'] + return {'id': rule_id, 'firewall_rules': rules} + + self.neutronclient.find_resource = mock.Mock( + side_effect=_find_resource) + # fw_common.get_id.side_effect = lambda x, y, z: z + osc_utils.find_project = mock.Mock() + osc_utils.find_project.id = _fwp['tenant_id'] + self.res = 'firewall_policy' + self.res_plural = 'firewall_policies' + self.resource = _fwp + self.list_headers = ( + 'ID', + 'Name', + 'Firewall Rules', + ) + self.list_data = ( + _fwp['id'], + _fwp['name'], + _fwp['firewall_rules'], + ) + self.headers = tuple(self.list_headers + ( + 'Description', + 'Audited', + 'Public', + 'Project') + ) + self.data = _generate_data() + self.ordered_headers = ( + 'Audited', + 'Description', + 'Firewall Rules', + 'ID', + 'Name', + 'Project', + 'Public', + ) + self.ordered_data = ( + _fwp['audited'], + _fwp['description'], + _fwp['firewall_rules'], + _fwp['id'], + _fwp['name'], + _fwp['tenant_id'], + _fwp['public'], + ) + self.ordered_columns = ( + 'audited', + 'description', + 'firewall_rules', + 'id', + 'name', + 'tenant_id', + 'public', + ) + + +class TestCreateFirewallPolicy(TestFirewallPolicy, common.TestCreateFWaaS): + + def setUp(self): + super(TestCreateFirewallPolicy, self).setUp() + self.neutronclient.create_fwaas_firewall_policy = mock.Mock( + return_value={self.res: _fwp}) + self.mocked = self.neutronclient.create_fwaas_firewall_policy + self.cmd = firewallpolicy.CreateFirewallPolicy(self.app, + self.namespace) + + def _update_expect_response(self, request, response): + """Set expected request and response + + :param request + A dictionary of request body(dict of verifylist) + :param response + A OrderedDict of request body + """ + # Update response body + self.neutronclient.create_fwaas_firewall_policy.return_value = \ + {self.res: dict(response)} + osc_utils.find_project.return_value.id = response['tenant_id'] + # Update response(finally returns 'data') + self.data = _generate_data(data=response) + self.ordered_data = tuple( + response[column] for column in self.ordered_columns + ) + + def test_create_with_no_options(self): + arglist = [] + verifylist = [] + + self.assertRaises( + utils.ParserException, + self.check_parser, self.cmd, arglist, verifylist) + + def test_create_with_mandatory_param(self): + name = 'my-fwg' + arglist = [ + name, + ] + verifylist = [ + ('name', name), + ] + request, response = _generate_req_and_res(verifylist) + self._update_expect_response(request, response) + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + headers, data = self.cmd.take_action(parsed_args) + + self.check_results(headers, data, request) + + def test_create_with_all_params(self): + name = 'my-fwp' + desc = 'my-desc' + rule1 = 'rule1' + rule2 = 'rule2' + project = 'my-tenant' + arglist = [ + name, + '--description', desc, + '--firewall-rule', rule1, + '--firewall-rule', rule2, + '--project', project, + '--public', + '--audited', + ] + verifylist = [ + ('name', name), + ('description', desc), + ('firewall_rule', [rule1, rule2]), + ('project', project), + ('public', True), + ('audited', True), + ] + request, response = _generate_req_and_res(verifylist) + self._update_expect_response(request, response) + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + headers, data = self.cmd.take_action(parsed_args) + + self.check_results(headers, data, request) + + def test_create_with_firewall_rule_and_no(self): + name = 'my-fwp' + rule1 = 'rule1' + rule2 = 'rule2' + arglist = [ + name, + '--firewall-rule', rule1, + '--firewall-rule', rule2, + '--no-firewall-rule', + ] + verifylist = [ + ('name', name), + ('firewall_rule', [rule1, rule2]), + ('no_firewall_rule', True), + ] + self.assertRaises( + utils.ParserException, + self.check_parser, self.cmd, arglist, verifylist) + + def test_create_with_public_and_private(self): + name = 'my-fwp' + arglist = [ + name, + '--public', + '--private', + ] + verifylist = [ + ('name', name), + ('public', True), + ('private', True), + ] + self.assertRaises( + utils.ParserException, + self.check_parser, self.cmd, arglist, verifylist) + + def test_create_with_audited_and_no(self): + name = 'my-fwp' + arglist = [ + name, + '--audited', + '--no-audited', + ] + verifylist = [ + ('name', name), + ('audited', True), + ('no_audited', True), + ] + self.assertRaises( + utils.ParserException, + self.check_parser, self.cmd, arglist, verifylist) + + +class TestListFirewallPolicy(TestFirewallPolicy, common.TestListFWaaS): + + def setUp(self): + super(TestListFirewallPolicy, self).setUp() + self.neutronclient.list_fwaas_firewall_policies = mock.Mock( + return_value={'firewall_policies': [_fwp]}) + self.mocked = self.neutronclient.list_fwaas_firewall_policies + self.cmd = firewallpolicy.ListFirewallPolicy(self.app, self.namespace) + + +class TestShowFirewallPolicy(TestFirewallPolicy, common.TestShowFWaaS): + + def setUp(self): + super(TestShowFirewallPolicy, self).setUp() + self.neutronclient.show_fwaas_firewall_policy = mock.Mock( + return_value={self.res: _fwp}) + self.mocked = self.neutronclient.show_fwaas_firewall_policy + self.cmd = firewallpolicy.ShowFirewallPolicy(self.app, self.namespace) + + +class TestSetFirewallPolicy(TestFirewallPolicy, common.TestSetFWaaS): + + def setUp(self): + super(TestSetFirewallPolicy, self).setUp() + self.neutronclient.update_fwaas_firewall_policy = mock.Mock( + return_value={self.res: _fwp}) + self.mocked = self.neutronclient.update_fwaas_firewall_policy + self.cmd = firewallpolicy.SetFirewallPolicy(self.app, self.namespace) + + def test_set_rules(self): + target = self.resource['id'] + rule1 = 'new_rule1' + rule2 = 'new_rule2' + arglist = [ + target, + '--firewall-rule', rule1, + '--firewall-rule', rule2, + ] + verifylist = [ + (self.res, target), + ('firewall_rule', [rule1, rule2]), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + expect = sorted(set(_fwp['firewall_rules'] + [rule1, rule2])) + body = {self.res: {'firewall_rules': expect}} + self.mocked.assert_called_once_with(target, body) + self.assertIsNone(result) + + def test_set_no_rules(self): + target = self.resource['id'] + arglist = [target, '--no-firewall-rule'] + verifylist = [ + (self.res, target), + ('no_firewall_rule', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + body = {self.res: {'firewall_rules': []}} + self.mocked.assert_called_once_with(target, body) + self.assertIsNone(result) + + def test_set_rules_and_no_rules(self): + target = self.resource['id'] + rule1 = 'rule1' + arglist = [ + target, + '--firewall-rule', rule1, + '--no-firewall-rule', + ] + verifylist = [ + (self.res, target), + ('firewall_rule', [rule1]), + ('no_firewall_rule', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + body = {self.res: {'firewall_rules': [rule1]}} + self.mocked.assert_called_once_with(target, body) + self.assertIsNone(result) + + def test_set_audited(self): + target = self.resource['id'] + arglist = [target, '--audited'] + verifylist = [ + (self.res, target), + ('audited', True), + ] + body = {'audited': True} + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with(target, {self.res: body}) + self.assertIsNone(result) + + def test_set_no_audited(self): + target = self.resource['id'] + arglist = [target, '--no-audited'] + verifylist = [ + (self.res, target), + ('no_audited', True), + ] + body = {'audited': False} + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with(target, {self.res: body}) + self.assertIsNone(result) + + def test_set_audited_and_no_audited(self): + target = self.resource['id'] + arglist = [ + target, + '--audited', + '--no-audited', + ] + verifylist = [ + (self.res, target), + ('audited', True), + ('no_audited', True), + ] + self.assertRaises( + utils.ParserException, + self.check_parser, self.cmd, arglist, verifylist) + + def test_set_and_raises(self): + self.neutronclient.update_fwaas_firewall_policy = mock.Mock( + side_effect=Exception) + target = self.resource['id'] + arglist = [target, '--name', 'my-name'] + verifylist = [(self.res, target), ('name', 'my-name')] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + self.assertRaises( + exceptions.CommandError, self.cmd.take_action, parsed_args) + + +class TestDeleteFirewallPolicy(TestFirewallPolicy, common.TestDeleteFWaaS): + + def setUp(self): + super(TestDeleteFirewallPolicy, self).setUp() + self.neutronclient.delete_fwaas_firewall_policy = mock.Mock( + return_value={self.res: _fwp}) + self.mocked = self.neutronclient.delete_fwaas_firewall_policy + self.cmd = firewallpolicy.DeleteFirewallPolicy( + self.app, self.namespace) + + +class TestFirewallPolicyInsertRule(TestFirewallPolicy): + + def setUp(self): + super(TestFirewallPolicyInsertRule, self).setUp() + self.neutronclient.insert_rule_fwaas_firewall_policy = mock.Mock( + return_value={self.res: _fwp}) + self.mocked = self.neutronclient.insert_rule_fwaas_firewall_policy + self.cmd = firewallpolicy.FirewallPolicyInsertRule(self.app, + self.namespace) + + def test_insert_firewall_rule(self): + target = self.resource['id'] + rule = 'new-rule' + before = 'before' + after = 'after' + arglist = [ + target, + rule, + '--insert-before', before, + '--insert-after', after, + ] + verifylist = [ + (self.res, target), + ('firewall_rule', rule), + ('insert_before', before), + ('insert_after', after), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with( + target, { + 'firewall_rule_id': rule, + 'insert_before': before, + 'insert_after': after + }) + self.assertIsNone(result) + + def test_insert_with_no_firewall_rule(self): + target = self.resource['id'] + arglist = [ + target, + ] + verifylist = [ + (self.res, target), + ] + self.assertRaises( + utils.ParserException, + self.check_parser, self.cmd, arglist, verifylist) + + +class TestFirewallPolicyRemoveRule(TestFirewallPolicy): + + def setUp(self): + super(TestFirewallPolicyRemoveRule, self).setUp() + self.neutronclient.remove_rule_fwaas_firewall_policy = mock.Mock( + return_value={self.res: _fwp}) + self.mocked = self.neutronclient.remove_rule_fwaas_firewall_policy + self.cmd = firewallpolicy.FirewallPolicyRemoveRule(self.app, + self.namespace) + + def test_remove_firewall_rule(self): + target = self.resource['id'] + rule = 'remove-rule' + arglist = [ + target, + rule, + ] + verifylist = [ + (self.res, target), + ('firewall_rule', rule), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + self.mocked.assert_called_once_with( + target, {'firewall_rule_id': rule}) + self.assertIsNone(result) + + def test_remove_with_no_firewall_rule(self): + target = self.resource['id'] + arglist = [ + target, + ] + verifylist = [ + (self.res, target), + ] + self.assertRaises( + utils.ParserException, + self.check_parser, self.cmd, arglist, verifylist) + + +class TestUnsetFirewallPolicy(TestFirewallPolicy, common.TestUnsetFWaaS): + + def setUp(self): + super(TestUnsetFirewallPolicy, self).setUp() + self.neutronclient.update_fwaas_firewall_policy = mock.Mock( + return_value={self.res: _fwp}) + self.mocked = self.neutronclient.update_fwaas_firewall_policy + self.cmd = firewallpolicy.UnsetFirewallPolicy(self.app, self.namespace) + + def test_unset_audited(self): + target = self.resource['id'] + arglist = [ + target, + '--audited', + ] + verifylist = [ + (self.res, target), + ('audited', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + body = {self.res: {'audited': False}} + self.mocked.assert_called_once_with(target, body) + self.assertIsNone(result) + + def test_unset_firewall_rule_not_matched(self): + _fwp['firewall_rules'] = ['old_rule'] + target = self.resource['id'] + rule = 'new_rule' + arglist = [ + target, + '--firewall-rule', rule, + ] + verifylist = [ + (self.res, target), + ('firewall_rule', [rule]), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + body = {self.res: {'firewall_rules': _fwp['firewall_rules']}} + self.mocked.assert_called_once_with(target, body) + self.assertIsNone(result) + + def test_unset_firewall_rule_matched(self): + _fwp['firewall_rules'] = ['rule1', 'rule2'] + target = self.resource['id'] + rule = 'rule1' + arglist = [ + target, + '--firewall-rule', rule, + ] + verifylist = [ + (self.res, target), + ('firewall_rule', [rule]), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + body = {self.res: {'firewall_rules': ['rule2']}} + self.mocked.assert_called_once_with(target, body) + self.assertIsNone(result) + + def test_unset_all_firewall_rule(self): + target = self.resource['id'] + arglist = [ + target, + '--all-firewall-rule', + ] + verifylist = [ + (self.res, target), + ('all_firewall_rule', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + body = {self.res: {'firewall_rules': []}} + self.mocked.assert_called_once_with(target, body) + self.assertIsNone(result) diff --git a/neutronclient/tests/unit/osc/v2/fwaas/test_firewallrule.py b/neutronclient/tests/unit/osc/v2/fwaas/test_firewallrule.py new file mode 100644 index 0000000..dd730ee --- /dev/null +++ b/neutronclient/tests/unit/osc/v2/fwaas/test_firewallrule.py @@ -0,0 +1,736 @@ +# Copyright 2016 FUJITSU LIMITED +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import copy +import mock +import re +import testtools + +from osc_lib import exceptions +from osc_lib.tests import utils + +from neutronclient.osc import utils as osc_utils +from neutronclient.osc.v2.fwaas import firewallrule +from neutronclient.tests.unit.osc.v2 import fakes as test_fakes +from neutronclient.tests.unit.osc.v2.fwaas import common +from neutronclient.tests.unit.osc.v2.fwaas import fakes + + +_fwr = fakes.FirewallRule().create() +CONVERT_MAP = { + 'project': 'tenant_id', + 'enable_rule': 'enabled', + 'disable_rule': 'enabled', +} + + +def _generate_data(ordered_dict=None, data=None): + source = ordered_dict if ordered_dict else _fwr + if data: + source.update(data) + return tuple(source[key] for key in source) + + +def _generate_req_and_res(verifylist): + request = dict(verifylist) + response = copy.deepcopy(_fwr) + for key, val in verifylist: + converted = CONVERT_MAP.get(key, key) + del request[key] + if re.match('^no_', key) and val is True: + new_value = None + elif (key == 'enable' or key == 'enable_rule') and val: + new_value = True + elif (key == 'disable' or key == 'disable_rule') and val: + new_value = False + else: + new_value = val + request[converted] = new_value + response[converted] = new_value + return request, response + + +class TestFirewallRule(test_fakes.TestNeutronClientOSCV2): + + def check_results(self, headers, data, exp_req, is_list=False): + if is_list: + req_body = {self.res_plural: [exp_req]} + else: + req_body = {self.res: exp_req} + self.mocked.assert_called_once_with(req_body) + self.assertEqual(self.ordered_headers, headers) + self.assertEqual(self.ordered_data, data) + + def setUp(self): + super(TestFirewallRule, self).setUp() + self.neutronclient.find_resource = mock.Mock() + self.neutronclient.find_resource.side_effect = \ + lambda x, y, **k: {'id': y} + # fw_common.get_id.side_effect = lambda x, y, z: z + osc_utils.find_project = mock.Mock() + osc_utils.find_project.id = _fwr['tenant_id'] + self.res = 'firewall_rule' + self.res_plural = 'firewall_rules' + self.resource = _fwr + self.headers = ( + 'ID', + 'Name', + 'Enabled', + 'Description', + 'IP Version', + 'Action', + 'Protocol', + 'Source IP Address', + 'Source Port', + 'Destination IP Address', + 'Destination Port', + 'Public', + 'Project', + ) + self.data = _generate_data() + self.ordered_headers = ( + 'Action', + 'Description', + 'Destination IP Address', + 'Destination Port', + 'Enabled', + 'ID', + 'IP Version', + 'Name', + 'Project', + 'Protocol', + 'Public', + 'Source IP Address', + 'Source Port', + ) + self.ordered_data = ( + _fwr['action'], + _fwr['description'], + _fwr['destination_ip_address'], + _fwr['destination_port'], + _fwr['enabled'], + _fwr['id'], + _fwr['ip_version'], + _fwr['name'], + _fwr['tenant_id'], + _fwr['protocol'], + _fwr['public'], + _fwr['source_ip_address'], + _fwr['source_port'], + ) + self.ordered_columns = ( + 'action', + 'description', + 'destination_ip_address', + 'destination_port', + 'enabled', + 'id', + 'ip_version', + 'name', + 'tenant_id', + 'protocol', + 'public', + 'source_ip_address', + 'source_port', + ) + + +class TestCreateFirewallRule(TestFirewallRule, common.TestCreateFWaaS): + + def setUp(self): + super(TestCreateFirewallRule, self).setUp() + self.neutronclient.create_fwaas_firewall_rule = mock.Mock( + return_value={self.res: _fwr}) + self.mocked = self.neutronclient.create_fwaas_firewall_rule + self.cmd = firewallrule.CreateFirewallRule(self.app, self.namespace) + + def _update_expect_response(self, request, response): + """Set expected request and response + + :param request + A dictionary of request body(dict of verifylist) + :param response + A OrderedDict of request body + """ + # Update response body + self.neutronclient.create_fwaas_firewall_rule.return_value = \ + {self.res: dict(response)} + osc_utils.find_project.return_value.id = response['tenant_id'] + # Update response(finally returns 'data') + self.data = _generate_data(ordered_dict=response) + self.ordered_data = tuple( + response[column] for column in self.ordered_columns + ) + + def _set_all_params(self, args={}): + name = args.get('name') or 'my-name' + description = args.get('description') or 'my-desc' + source_ip = args.get('source_ip_address') or '192.168.1.0/24' + destination_ip = args.get('destination_ip_address') or '192.168.2.0/24' + source_port = args.get('source_port') or '0:65535' + protocol = args.get('protocol') or 'udp' + action = args.get('action') or 'deny' + ip_version = args.get('ip_version') or '4' + destination_port = args.get('destination_port') or '0:65535' + tenant_id = args.get('tenant_id') or 'my-tenant' + arglist = [ + '--description', description, + '--name', name, + '--protocol', protocol, + '--ip-version', ip_version, + '--source-ip-address', source_ip, + '--destination-ip-address', destination_ip, + '--source-port', source_port, + '--destination-port', destination_port, + '--action', action, + '--project', tenant_id, + '--disable-rule', + '--public', + ] + verifylist = [ + ('name', name), + ('description', description), + ('public', True), + ('protocol', protocol), + ('ip_version', ip_version), + ('source_ip_address', source_ip), + ('destination_ip_address', destination_ip), + ('source_port', source_port), + ('destination_port', destination_port), + ('action', action), + ('disable_rule', True), + ('project', tenant_id), + ] + return arglist, verifylist + + def _test_create_with_all_params(self, args={}): + arglist, verifylist = self._set_all_params(args) + request, response = _generate_req_and_res(verifylist) + self._update_expect_response(request, response) + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + headers, data = self.cmd.take_action(parsed_args) + + self.check_results(headers, data, request) + + def test_create_with_no_options(self): + arglist = [] + verifylist = [] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + headers, data = self.cmd.take_action(parsed_args) + self.check_results(headers, data, {}) + + def test_create_with_all_params(self): + self._test_create_with_all_params() + + def test_create_with_all_params_protocol_any(self): + self._test_create_with_all_params({'protocol': 'any'}) + + def test_create_with_all_params_ip_version_6(self): + self._test_create_with_all_params({'ip_version': '6'}) + + def test_create_with_all_params_invalid_ip_version(self): + arglist, verifylist = self._set_all_params({'ip_version': '128'}) + self.assertRaises( + utils.ParserException, + self.check_parser, self.cmd, arglist, verifylist) + + def test_create_with_all_params_action_upper_capitalized(self): + for action in ('Allow', 'DENY', 'Reject'): + arglist, verifylist = self._set_all_params({'action': action}) + self.assertRaises( + testtools.matchers._impl.MismatchError, + self.check_parser, self.cmd, arglist, verifylist) + + def test_create_with_all_params_protocol_upper_capitalized(self): + for protocol in ('TCP', 'Tcp', 'ANY', 'AnY'): + arglist, verifylist = self._set_all_params({'protocol': protocol}) + self.assertRaises( + testtools.matchers._impl.MismatchError, + self.check_parser, self.cmd, arglist, verifylist) + + +class TestListFirewallRule(TestFirewallRule): + + def _setup_summary(self, expect=None): + protocol = _fwr['protocol'].upper() + src = 'source(port): 192.168.1.0/24(1:11111)' + dst = 'dest(port): 192.168.2.2(2:22222)' + action = 'deny' + if expect: + if expect.get('protocol'): + protocol = expect['protocol'] + if expect.get('source'): + src = expect['source'] + if expect.get('dest'): + dst = expect['dest'] + if expect.get('action'): + action = expect['action'] + summary = ',\n '.join([protocol, src, dst, action]) + self.short_data = ( + _fwr['id'], + _fwr['name'], + _fwr['enabled'], + summary + ) + + def setUp(self): + super(TestListFirewallRule, self).setUp() + self.cmd = firewallrule.ListFirewallRule(self.app, self.namespace) + + self.short_header = ( + 'ID', + 'Name', + 'Enabled', + 'Summary', + ) + self._setup_summary() + self.neutronclient.list_fwaas_firewall_rules = mock.Mock( + return_value={self.res_plural: [_fwr]}) + self.mocked = self.neutronclient.list_fwaas_firewall_rules + + def test_list_with_long_option(self): + arglist = ['--long'] + verifylist = [('long', True)] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + headers, data = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with() + self.assertEqual(list(self.headers), headers) + self.assertEqual([self.data], list(data)) + + def test_list_with_no_option(self): + arglist = [] + verifylist = [] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + headers, data = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with() + self.assertEqual(list(self.short_header), headers) + self.assertEqual([self.short_data], list(data)) + + +class TestShowFirewallRule(TestFirewallRule, common.TestShowFWaaS): + + def setUp(self): + super(TestShowFirewallRule, self).setUp() + self.neutronclient.show_fwaas_firewall_rule = mock.Mock( + return_value={self.res: _fwr}) + self.mocked = self.neutronclient.show_fwaas_firewall_rule + self.cmd = firewallrule.ShowFirewallRule(self.app, self.namespace) + + +class TestSetFirewallRule(TestFirewallRule, common.TestSetFWaaS): + + def setUp(self): + super(TestSetFirewallRule, self).setUp() + self.neutronclient.update_fwaas_firewall_rule = mock.Mock( + return_value={self.res: _fwr}) + self.mocked = self.neutronclient.update_fwaas_firewall_rule + self.cmd = firewallrule.SetFirewallRule(self.app, self.namespace) + + def test_set_protocol_with_any(self): + target = self.resource['id'] + protocol = 'any' + arglist = [target, '--protocol', protocol] + verifylist = [ + (self.res, target), + ('protocol', protocol), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with( + target, {self.res: {'protocol': protocol}}) + self.assertIsNone(result) + + def test_set_protocol_with_udp(self): + target = self.resource['id'] + protocol = 'udp' + arglist = [target, '--protocol', protocol] + verifylist = [ + (self.res, target), + ('protocol', protocol), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with( + target, {self.res: {'protocol': protocol}}) + self.assertIsNone(result) + + def test_set_source_ip_address(self): + target = self.resource['id'] + src_ip = '192.192.192.192' + arglist = [target, '--source-ip-address', src_ip] + verifylist = [ + (self.res, target), + ('source_ip_address', src_ip), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with( + target, {self.res: {'source_ip_address': src_ip}}) + self.assertIsNone(result) + + def test_set_source_port(self): + target = self.resource['id'] + src_port = '32678' + arglist = [target, '--source-port', src_port] + verifylist = [ + (self.res, target), + ('source_port', src_port), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with( + target, {self.res: {'source_port': src_port}}) + self.assertIsNone(result) + + def test_set_destination_ip_address(self): + target = self.resource['id'] + dst_ip = '0.1.0.1' + arglist = [target, '--destination-ip-address', dst_ip] + verifylist = [ + (self.res, target), + ('destination_ip_address', dst_ip), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with( + target, {self.res: {'destination_ip_address': dst_ip}}) + self.assertIsNone(result) + + def test_set_destination_port(self): + target = self.resource['id'] + dst_port = '65432' + arglist = [target, '--destination-port', dst_port] + verifylist = [ + (self.res, target), + ('destination_port', dst_port), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with( + target, {self.res: {'destination_port': dst_port}}) + self.assertIsNone(result) + + def test_set_enable_rule(self): + target = self.resource['id'] + arglist = [target, '--enable-rule'] + verifylist = [ + (self.res, target), + ('enable_rule', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with( + target, {self.res: {'enabled': True}}) + self.assertIsNone(result) + + def test_set_disable_rule(self): + target = self.resource['id'] + arglist = [target, '--disable-rule'] + verifylist = [ + (self.res, target), + ('disable_rule', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with( + target, {self.res: {'enabled': False}}) + self.assertIsNone(result) + + def test_set_action(self): + target = self.resource['id'] + action = 'reject' + arglist = [target, '--action', action] + verifylist = [ + (self.res, target), + ('action', action), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with( + target, {self.res: {'action': action}}) + self.assertIsNone(result) + + def test_set_enable_rule_and_disable_rule(self): + target = self.resource['id'] + arglist = [target, '--enable-rule', '--disable-rule'] + verifylist = [ + (self.res, target), + ('enable_rule', True), + ('disable_rule', True), + ] + self.assertRaises( + utils.ParserException, + self.check_parser, self.cmd, arglist, verifylist) + + def test_set_no_source_ip_address(self): + target = self.resource['id'] + arglist = [ + target, + '--no-source-ip-address', + ] + verifylist = [ + (self.res, target), + ('no_source_ip_address', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with( + target, {self.res: {'source_ip_address': None}}) + self.assertIsNone(result) + + def test_set_no_source_port(self): + target = self.resource['id'] + arglist = [ + target, + '--no-source-port', + ] + verifylist = [ + (self.res, target), + ('no_source_port', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with( + target, {self.res: {'source_port': None}}) + self.assertIsNone(result) + + def test_set_no_destination_ip_address(self): + target = self.resource['id'] + arglist = [ + target, + '--no-destination-ip-address', + ] + verifylist = [ + (self.res, target), + ('no_destination_ip_address', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with( + target, {self.res: {'destination_ip_address': None}}) + self.assertIsNone(result) + + def test_set_no_destination_port(self): + target = self.resource['id'] + arglist = [ + target, + '--no-destination-port', + ] + verifylist = [ + (self.res, target), + ('no_destination_port', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with( + target, {self.res: {'destination_port': None}}) + self.assertIsNone(result) + + def test_set_source_ip_address_and_no(self): + target = self.resource['id'] + arglist = [ + target, + '--source-ip-address', '192.168.1.0/24', + '--no-source-ip-address', + ] + verifylist = [ + (self.res, target), + ('source_ip_address', '192.168.1.0/24'), + ('no_source_ip_address', True), + ] + self.assertRaises( + utils.ParserException, + self.check_parser, self.cmd, arglist, verifylist) + + def test_set_destination_ip_address_and_no(self): + target = self.resource['id'] + arglist = [ + target, + '--destination-ip-address', '192.168.2.0/24', + '--no-destination-ip-address', + ] + verifylist = [ + (self.res, target), + ('destination_ip_address', '192.168.2.0/24'), + ('no_destination_ip_address', True), + ] + self.assertRaises( + utils.ParserException, + self.check_parser, self.cmd, arglist, verifylist) + + def test_set_source_port_and_no(self): + target = self.resource['id'] + arglist = [ + target, + '--source-port', '1:12345', + '--no-source-port', + ] + verifylist = [ + (self.res, target), + ('source_port', '1:12345'), + ('no_source_port', True), + ] + self.assertRaises( + utils.ParserException, + self.check_parser, self.cmd, arglist, verifylist) + + def test_set_destination_port_and_no(self): + target = self.resource['id'] + arglist = [ + target, + '--destination-port', '1:54321', + '--no-destination-port', + ] + verifylist = [ + (self.res, target), + ('destination_port', '1:54321'), + ('no_destination_port', True), + ] + self.assertRaises( + utils.ParserException, + self.check_parser, self.cmd, arglist, verifylist) + + def test_set_and_raises(self): + self.neutronclient.update_fwaas_firewall_rule = mock.Mock( + side_effect=Exception) + target = self.resource['id'] + arglist = [target, '--name', 'my-name'] + verifylist = [(self.res, target), ('name', 'my-name')] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + self.assertRaises( + exceptions.CommandError, self.cmd.take_action, parsed_args) + + +class TestUnsetFirewallRule(TestFirewallRule, common.TestUnsetFWaaS): + + def setUp(self): + super(TestUnsetFirewallRule, self).setUp() + self.neutronclient.update_fwaas_firewall_rule = mock.Mock( + return_value={self.res: _fwr}) + self.mocked = self.neutronclient.update_fwaas_firewall_rule + self.cmd = firewallrule.UnsetFirewallRule(self.app, self.namespace) + + def test_unset_source_port(self): + target = self.resource['id'] + arglist = [ + target, + '--source-port', + ] + verifylist = [ + (self.res, target), + ('source_port', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with( + target, {self.res: {'source_port': None}}) + self.assertIsNone(result) + + def test_unset_destination_port(self): + target = self.resource['id'] + arglist = [ + target, + '--destination-port', + ] + verifylist = [ + (self.res, target), + ('destination_port', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with( + target, {self.res: {'destination_port': None}}) + self.assertIsNone(result) + + def test_unset_source_ip_address(self): + target = self.resource['id'] + arglist = [ + target, + '--source-ip-address', + ] + verifylist = [ + (self.res, target), + ('source_ip_address', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with( + target, {self.res: {'source_ip_address': None}}) + self.assertIsNone(result) + + def test_unset_destination_ip_address(self): + target = self.resource['id'] + arglist = [ + target, + '--destination-ip-address', + ] + verifylist = [ + (self.res, target), + ('destination_ip_address', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with( + target, {self.res: {'destination_ip_address': None}}) + self.assertIsNone(result) + + def test_unset_enable_rule(self): + target = self.resource['id'] + arglist = [ + target, + '--enable-rule', + ] + verifylist = [ + (self.res, target), + ('enable_rule', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.mocked.assert_called_once_with( + target, {self.res: {'enabled': False}}) + self.assertIsNone(result) + + +class TestDeleteFirewallRule(TestFirewallRule, common.TestDeleteFWaaS): + + def setUp(self): + super(TestDeleteFirewallRule, self).setUp() + self.neutronclient.delete_fwaas_firewall_rule = mock.Mock( + return_value={self.res: _fwr}) + self.mocked = self.neutronclient.delete_fwaas_firewall_rule + self.cmd = firewallrule.DeleteFirewallRule(self.app, self.namespace) diff --git a/neutronclient/v2_0/client.py b/neutronclient/v2_0/client.py index 374f873..e20e94a 100644 --- a/neutronclient/v2_0/client.py +++ b/neutronclient/v2_0/client.py @@ -588,6 +588,16 @@ class Client(ClientBase): firewall_policy_remove_path = "/fw/firewall_policies/%s/remove_rule" firewalls_path = "/fw/firewalls" firewall_path = "/fw/firewalls/%s" + fwaas_firewall_groups_path = "/fwaas/firewall_groups" + fwaas_firewall_group_path = "/fwaas/firewall_groups/%s" + fwaas_firewall_rules_path = "/fwaas/firewall_rules" + fwaas_firewall_rule_path = "/fwaas/firewall_rules/%s" + fwaas_firewall_policies_path = "/fwaas/firewall_policies" + fwaas_firewall_policy_path = "/fwaas/firewall_policies/%s" + fwaas_firewall_policy_insert_path = \ + "/fwaas/firewall_policies/%s/insert_rule" + fwaas_firewall_policy_remove_path = \ + "/fwaas/firewall_policies/%s/remove_rule" rbac_policies_path = "/rbac-policies" rbac_policy_path = "/rbac-policies/%s" qos_policies_path = "/qos/policies" @@ -649,6 +659,9 @@ class Client(ClientBase): 'firewall_rules': 'firewall_rule', 'firewall_policies': 'firewall_policy', 'firewalls': 'firewall', + 'fwaas_firewall_rules': 'fwaas_firewall_rule', + 'fwaas_firewall_policies': 'fwaas_firewall_policy', + 'fwaas_firewall_groups': 'fwaas_firewall_group', 'metering_labels': 'metering_label', 'metering_label_rules': 'metering_label_rule', 'loadbalancers': 'loadbalancer', @@ -1572,6 +1585,87 @@ class Client(ClientBase): """Deletes the specified firewall.""" return self.delete(self.firewall_path % (firewall)) + def list_fwaas_firewall_groups(self, retrieve_all=True, **_params): + """Fetches a list of all firewall groups for a project""" + return self.list('firewall_groups', self.fwaas_firewall_groups_path, + retrieve_all, **_params) + + def show_fwaas_firewall_group(self, fwg, **_params): + """Fetches information of a certain firewall group""" + return self.get(self.fwaas_firewall_group_path % (fwg), params=_params) + + def create_fwaas_firewall_group(self, body=None): + """Creates a new firewall group""" + return self.post(self.fwaas_firewall_groups_path, body=body) + + def update_fwaas_firewall_group(self, fwg, body=None): + """Updates a firewall group""" + return self.put(self.fwaas_firewall_group_path % (fwg), body=body) + + def delete_fwaas_firewall_group(self, fwg): + """Deletes the specified firewall group""" + return self.delete(self.fwaas_firewall_group_path % (fwg)) + + def list_fwaas_firewall_rules(self, retrieve_all=True, **_params): + """Fetches a list of all firewall rules for a project""" + # Pass filters in "params" argument to do_request + return self.list('firewall_rules', self.fwaas_firewall_rules_path, + retrieve_all, **_params) + + def show_fwaas_firewall_rule(self, firewall_rule, **_params): + """Fetches information of a certain firewall rule""" + return self.get(self.fwaas_firewall_rule_path % (firewall_rule), + params=_params) + + def create_fwaas_firewall_rule(self, body=None): + """Creates a new firewall rule""" + return self.post(self.fwaas_firewall_rules_path, body=body) + + def update_fwaas_firewall_rule(self, firewall_rule, body=None): + """Updates a firewall rule""" + return self.put(self.fwaas_firewall_rule_path % (firewall_rule), + body=body) + + def delete_fwaas_firewall_rule(self, firewall_rule): + """Deletes the specified firewall rule""" + return self.delete(self.fwaas_firewall_rule_path % (firewall_rule)) + + def list_fwaas_firewall_policies(self, retrieve_all=True, **_params): + """Fetches a list of all firewall policies for a project""" + # Pass filters in "params" argument to do_request + + return self.list('firewall_policies', + self.fwaas_firewall_policies_path, + retrieve_all, **_params) + + def show_fwaas_firewall_policy(self, firewall_policy, **_params): + """Fetches information of a certain firewall policy""" + return self.get(self.fwaas_firewall_policy_path % (firewall_policy), + params=_params) + + def create_fwaas_firewall_policy(self, body=None): + """Creates a new firewall policy""" + return self.post(self.fwaas_firewall_policies_path, body=body) + + def update_fwaas_firewall_policy(self, firewall_policy, body=None): + """Updates a firewall policy""" + return self.put(self.fwaas_firewall_policy_path % (firewall_policy), + body=body) + + def delete_fwaas_firewall_policy(self, firewall_policy): + """Deletes the specified firewall policy""" + return self.delete(self.fwaas_firewall_policy_path % (firewall_policy)) + + def insert_rule_fwaas_firewall_policy(self, firewall_policy, body=None): + """Inserts specified rule into firewall policy""" + return self.put((self.fwaas_firewall_policy_insert_path % + (firewall_policy)), body=body) + + def remove_rule_fwaas_firewall_policy(self, firewall_policy, body=None): + """Removes specified rule from firewall policy""" + return self.put((self.fwaas_firewall_policy_remove_path % + (firewall_policy)), body=body) + def remove_router_from_l3_agent(self, l3_agent, router_id): """Remove a router from l3 agent.""" return self.delete((self.agent_path + self.L3_ROUTERS + "/%s") % ( diff --git a/releasenotes/notes/support-fwaasv2-cli-7f21676c551f8ae0.yaml b/releasenotes/notes/support-fwaasv2-cli-7f21676c551f8ae0.yaml new file mode 100644 index 0000000..2032bfa --- /dev/null +++ b/releasenotes/notes/support-fwaasv2-cli-7f21676c551f8ae0.yaml @@ -0,0 +1,4 @@ +--- +features: + - CLI support for the "Firewall as a Service v2" feature, which is enhanced + FWaaS functionality, as OSC plugin commands. diff --git a/setup.cfg b/setup.cfg index 73c1e65..041ff1a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -44,6 +44,27 @@ openstack.neutronclient.v2 = network_trunk_show = neutronclient.osc.v2.trunk.network_trunk:ShowNetworkTrunk network_trunk_unset = neutronclient.osc.v2.trunk.network_trunk:UnsetNetworkTrunk + firewall_group_create = neutronclient.osc.v2.fwaas.firewallgroup:CreateFirewallGroup + firewall_group_delete = neutronclient.osc.v2.fwaas.firewallgroup:DeleteFirewallGroup + firewall_group_list = neutronclient.osc.v2.fwaas.firewallgroup:ListFirewallGroup + firewall_group_set = neutronclient.osc.v2.fwaas.firewallgroup:SetFirewallGroup + firewall_group_show = neutronclient.osc.v2.fwaas.firewallgroup:ShowFirewallGroup + firewall_group_unset = neutronclient.osc.v2.fwaas.firewallgroup:UnsetFirewallGroup + firewall_group_policy_create = neutronclient.osc.v2.fwaas.firewallpolicy:CreateFirewallPolicy + firewall_group_policy_delete = neutronclient.osc.v2.fwaas.firewallpolicy:DeleteFirewallPolicy + firewall_group_policy_add_rule = neutronclient.osc.v2.fwaas.firewallpolicy:FirewallPolicyInsertRule + firewall_group_policy_list = neutronclient.osc.v2.fwaas.firewallpolicy:ListFirewallPolicy + firewall_group_policy_remove_rule = neutronclient.osc.v2.fwaas.firewallpolicy:FirewallPolicyRemoveRule + firewall_group_policy_set = neutronclient.osc.v2.fwaas.firewallpolicy:SetFirewallPolicy + firewall_group_policy_show = neutronclient.osc.v2.fwaas.firewallpolicy:ShowFirewallPolicy + firewall_group_policy_unset = neutronclient.osc.v2.fwaas.firewallpolicy:UnsetFirewallPolicy + firewall_group_rule_create = neutronclient.osc.v2.fwaas.firewallrule:CreateFirewallRule + firewall_group_rule_delete = neutronclient.osc.v2.fwaas.firewallrule:DeleteFirewallRule + firewall_group_rule_list = neutronclient.osc.v2.fwaas.firewallrule:ListFirewallRule + firewall_group_rule_set = neutronclient.osc.v2.fwaas.firewallrule:SetFirewallRule + firewall_group_rule_show = neutronclient.osc.v2.fwaas.firewallrule:ShowFirewallRule + firewall_group_rule_unset = neutronclient.osc.v2.fwaas.firewallrule:UnsetFirewallRule + [build_sphinx] all_files = 1 build-dir = doc/build