Add FWaaS V2 commands for OSC plugin

This commit supports FWaaS v2 CLI as OSC plugin[1].
[1] http://specs.openstack.org/openstack/neutron-specs/specs/newton/fwaas-api-2.0.html

Partial-Implements: blueprint fwaas-api-2.0
Depends-On: I7b4108772e8370e8f51971caf40ecd23e9f977e9
Change-Id: I57504f97ac39b5b7d301fd5cc88228a121f0677e
Closes-Bug: #1609686
This commit is contained in:
Yushiro FURUKAWA 2016-09-07 09:48:04 +09:00
parent f1ae9b7cb4
commit 0a721cf576
12 changed files with 3679 additions and 0 deletions

@ -0,0 +1,26 @@
# Copyright 2016 FUJITSU LIMITED
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
FWG = 'firewall_group'
FWGS = 'firewall_groups'
FWP = 'firewall_policy'
FWPS = 'firewall_policies'
FWR = 'firewall_rule'
FWRS = 'firewall_rules'
CMD_RESOURCE_MAP = dict(
(res, 'fwaas_' + res) for res in [FWG, FWGS, FWP, FWPS, FWR, FWRS]
)

@ -0,0 +1,377 @@
# Copyright 2016 FUJITSU LIMITED
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from osc_lib.command import command
from osc_lib import exceptions
from osc_lib import utils
from oslo_log import log as logging
from neutronclient._i18n import _
from neutronclient.osc import utils as osc_utils
from neutronclient.osc.v2.fwaas import constants as const
from neutronclient.osc.v2 import utils as v2_utils
LOG = logging.getLogger(__name__)
_formatters = {
'admin_state_up': v2_utils.format_admin_state,
}
_attr_map = (
('id', 'ID', osc_utils.LIST_BOTH),
('name', 'Name', osc_utils.LIST_BOTH),
('ingress_firewall_policy_id', 'Ingress Policy ID', osc_utils.LIST_BOTH),
('egress_firewall_policy_id', 'Egress Policy ID', osc_utils.LIST_BOTH),
('description', 'Description', osc_utils.LIST_LONG_ONLY),
('status', 'Status', osc_utils.LIST_LONG_ONLY),
('ports', 'Ports', osc_utils.LIST_LONG_ONLY),
('admin_state_up', 'State', osc_utils.LIST_LONG_ONLY),
('public', 'Public', osc_utils.LIST_LONG_ONLY),
('tenant_id', 'Project', osc_utils.LIST_LONG_ONLY),
)
def _get_common_parser(parser):
parser.add_argument(
'--name',
help=_('Name for the firewall group'))
parser.add_argument(
'--description',
metavar='<description>',
help=_('Description of the firewall group'))
ingress_group = parser.add_mutually_exclusive_group()
ingress_group.add_argument(
'--ingress-firewall-policy',
metavar='<ingress-firewall-policy>',
dest='ingress_firewall_policy',
help=_('Ingress firewall policy (name or ID)'))
ingress_group.add_argument(
'--no-ingress-firewall-policy',
dest='no_ingress_firewall_policy',
action='store_true',
help=_('Detach ingress firewall policy from the firewall group'))
egress_group = parser.add_mutually_exclusive_group()
egress_group.add_argument(
'--egress-firewall-policy',
metavar='<egress-firewall-policy>',
dest='egress_firewall_policy',
help=_('Egress firewall policy (name or ID)'))
egress_group.add_argument(
'--no-egress-firewall-policy',
dest='no_egress_firewall_policy',
action='store_true',
help=_('Detach egress firewall policy from the firewall group'))
public_group = parser.add_mutually_exclusive_group()
public_group.add_argument(
'--public',
action='store_true',
help=_('Make the firewall group public, which allows it to be '
'used in all projects (as opposed to the default, '
'which is to restrict its use to the current project)'))
public_group.add_argument(
'--private',
action='store_true',
help=_('Restrict use of the firewall group to the '
'current project'))
admin_group = parser.add_mutually_exclusive_group()
admin_group.add_argument(
'--enable',
action='store_true',
help=_('Enable firewall group'))
admin_group.add_argument(
'--disable',
action='store_true',
help=_('Disable firewall group'))
return parser
def _get_common_attrs(client_manager, parsed_args, is_create=True):
attrs = {}
client = client_manager.neutronclient
if is_create:
if 'project' in parsed_args and parsed_args.project is not None:
attrs['tenant_id'] = osc_utils.find_project(
client_manager.identity,
parsed_args.project,
parsed_args.project_domain,
).id
if (parsed_args.ingress_firewall_policy and
parsed_args.no_ingress_firewall_policy):
attrs['ingress_firewall_policy_id'] = client.find_resource(
const.FWP, parsed_args.ingress_firewall_policy,
cmd_resource='fwaas_' + const.FWP)['id']
elif parsed_args.ingress_firewall_policy:
attrs['ingress_firewall_policy_id'] = client.find_resource(
const.FWP, parsed_args.ingress_firewall_policy,
cmd_resource='fwaas_' + const.FWP)['id']
elif parsed_args.no_ingress_firewall_policy:
attrs['ingress_firewall_policy_id'] = None
if (parsed_args.egress_firewall_policy and
parsed_args.no_egress_firewall_policy):
attrs['egress_firewall_policy_id'] = client.find_resource(
const.FWP, parsed_args.egress_firewall_policy,
cmd_resource='fwaas_' + const.FWP)['id']
elif parsed_args.egress_firewall_policy:
attrs['egress_firewall_policy_id'] = client.find_resource(
const.FWP, parsed_args.egress_firewall_policy,
cmd_resource='fwaas_' + const.FWP)['id']
elif parsed_args.no_egress_firewall_policy:
attrs['egress_firewall_policy_id'] = None
if parsed_args.public:
attrs['public'] = True
if parsed_args.private:
attrs['public'] = False
if parsed_args.enable:
attrs['admin_state_up'] = True
if parsed_args.disable:
attrs['admin_state_up'] = False
if parsed_args.name:
attrs['name'] = str(parsed_args.name)
if parsed_args.description:
attrs['description'] = str(parsed_args.description)
if parsed_args.port and parsed_args.no_port:
attrs['ports'] = sorted([client.find_resource(
'port', p)['id'] for p in set(parsed_args.port)])
elif parsed_args.port:
ports = []
for p in set(parsed_args.port):
ports.append(client.find_resource('port', p)['id'])
if not is_create:
ports += client.find_resource(
const.FWG, parsed_args.firewall_group)['ports']
attrs['ports'] = sorted(set(ports))
elif parsed_args.no_port:
attrs['ports'] = []
return attrs
class CreateFirewallGroup(command.ShowOne):
_description = _("Create a new firewall group")
def get_parser(self, prog_name):
parser = super(CreateFirewallGroup, self).get_parser(prog_name)
_get_common_parser(parser)
osc_utils.add_project_owner_option_to_parser(parser)
port_group = parser.add_mutually_exclusive_group()
port_group.add_argument(
'--port',
metavar='<port>',
action='append',
help=_('Port(s) (name or ID) to apply firewall group. This '
'option can be repeated'))
port_group.add_argument(
'--no-port',
dest='no_port',
action='store_true',
help=_('Detach all port from the firewall group'))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
attrs = _get_common_attrs(self.app.client_manager, parsed_args)
obj = client.create_fwaas_firewall_group(
{const.FWG: attrs})[const.FWG]
columns, display_columns = osc_utils.get_columns(obj, _attr_map)
data = utils.get_dict_properties(obj, columns, formatters=_formatters)
return (display_columns, data)
class DeleteFirewallGroup(command.Command):
_description = _("Delete firewall group(s)")
def get_parser(self, prog_name):
parser = super(DeleteFirewallGroup, self).get_parser(prog_name)
parser.add_argument(
const.FWG,
metavar='<firewall-group>',
nargs='+',
help=_('Firewall group(s) to delete (name or ID)'))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
result = 0
for fwg in parsed_args.firewall_group:
try:
fwg_id = client.find_resource(
const.FWG, fwg, cmd_resource='fwaas_' + const.FWG)['id']
client.delete_fwaas_firewall_group(fwg_id)
except Exception as e:
result += 1
LOG.error(_("Failed to delete firewall group with "
"name or ID '%(firewall_group)s': %(e)s") % {
const.FWG: fwg, 'e': e})
if result > 0:
total = len(parsed_args.firewall_groups)
msg = (_("%(result)s of %(total)s firewall group(s) "
"failed to delete.") % {'result': result, 'total': total})
raise exceptions.CommandError(msg)
class ListFirewallGroup(command.Lister):
_description = _("List firewall groups")
def get_parser(self, prog_name):
parser = super(ListFirewallGroup, self).get_parser(prog_name)
parser.add_argument(
'--long',
action='store_true',
help=_("List additional fields in output")
)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
obj = client.list_fwaas_firewall_groups()[const.FWGS]
headers, columns = osc_utils.get_column_definitions(
_attr_map, long_listing=parsed_args.long)
return (headers, (utils.get_dict_properties(
s, columns, formatters=_formatters) for s in obj))
class SetFirewallGroup(command.Command):
_description = _("Set firewall group properties")
def get_parser(self, prog_name):
parser = super(SetFirewallGroup, self).get_parser(prog_name)
_get_common_parser(parser)
parser.add_argument(
const.FWG,
metavar='<firewall-group>',
help=_('Firewall group to update (name or ID)'))
parser.add_argument(
'--port',
metavar='<port>',
action='append',
help=_('Port(s) (name or ID) to apply firewall group. This '
'option can be repeated'))
parser.add_argument(
'--no-port',
dest='no_port',
action='store_true',
help=_('Detach all port from the firewall group'))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
fwg_id = client.find_resource(const.FWG, parsed_args.firewall_group,
cmd_resource='fwaas_' + const.FWG)['id']
attrs = _get_common_attrs(self.app.client_manager, parsed_args,
is_create=False)
try:
client.update_fwaas_firewall_group(fwg_id, {const.FWG: attrs})
except Exception as e:
msg = (_("Failed to set firewall group '%(group)s': %(e)s")
% {'group': parsed_args.firewall_group, 'e': e})
raise exceptions.CommandError(msg)
class ShowFirewallGroup(command.ShowOne):
_description = _("Display firewall group details")
def get_parser(self, prog_name):
parser = super(ShowFirewallGroup, self).get_parser(prog_name)
parser.add_argument(
const.FWG,
metavar='<firewall-group>',
help=_('Firewall group to show (name or ID)'))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
fwg_id = client.find_resource(const.FWG, parsed_args.firewall_group,
cmd_resource='fwaas_' + const.FWG)['id']
obj = client.show_fwaas_firewall_group(fwg_id)[const.FWG]
columns, display_columns = osc_utils.get_columns(obj, _attr_map)
data = utils.get_dict_properties(obj, columns, formatters=_formatters)
return (display_columns, data)
class UnsetFirewallGroup(command.Command):
_description = _("Unset firewall group properties")
def get_parser(self, prog_name):
parser = super(UnsetFirewallGroup, self).get_parser(prog_name)
parser.add_argument(
const.FWG,
metavar='<firewall-group>',
help=_('Firewall group to unset (name or ID)'))
port_group = parser.add_mutually_exclusive_group()
port_group.add_argument(
'--port',
metavar='<port>',
action='append',
help=_('Port(s) (name or ID) to apply firewall group. This '
'option can be repeated'))
port_group.add_argument(
'--all-port',
action='store_true',
help=_('Remove all ports for this firewall group'))
parser.add_argument(
'--ingress-firewall-policy',
action='store_true',
help=_('Ingress firewall policy (name or ID) to delete'))
parser.add_argument(
'--egress-firewall-policy',
action='store_true',
dest='egress_firewall_policy',
help=_('Egress firewall policy (name or ID) to delete'))
parser.add_argument(
'--public',
action='store_true',
help=_('Restrict use of the firewall group to the '
'current project'))
parser.add_argument(
'--enable',
action='store_true',
help=_('Disable firewall group'))
return parser
def _get_attrs(self, client_manager, parsed_args):
attrs = {}
client = client_manager.neutronclient
if parsed_args.ingress_firewall_policy:
attrs['ingress_firewall_policy_id'] = None
if parsed_args.egress_firewall_policy:
attrs['egress_firewall_policy_id'] = None
if parsed_args.public:
attrs['public'] = False
if parsed_args.enable:
attrs['admin_state_up'] = False
if parsed_args.port:
old = client.find_resource(
const.FWG, parsed_args.firewall_group)['ports']
new = [client.find_resource(
'port', r)['id'] for r in parsed_args.port]
attrs['ports'] = sorted(list(set(old) - set(new)))
if parsed_args.all_port:
attrs['ports'] = []
return attrs
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
fwg_id = client.find_resource(const.FWG, parsed_args.firewall_group,
cmd_resource='fwaas_' + const.FWG)['id']
attrs = self._get_attrs(self.app.client_manager, parsed_args)
try:
client.update_fwaas_firewall_group(fwg_id, {const.FWG: attrs})
except Exception as e:
msg = (_("Failed to unset firewall group '%(group)s': %(e)s")
% {'group': parsed_args.firewall_group, 'e': e})
raise exceptions.CommandError(msg)

@ -0,0 +1,423 @@
# Copyright 2016 FUJITSU LIMITED
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from __future__ import print_function
from osc_lib.command import command
from osc_lib import exceptions
from osc_lib import utils
from oslo_log import log as logging
from neutronclient._i18n import _
from neutronclient.osc import utils as osc_utils
from neutronclient.osc.v2.fwaas import constants as const
LOG = logging.getLogger(__name__)
_formatters = {}
_attr_map = (
('id', 'ID', osc_utils.LIST_BOTH),
('name', 'Name', osc_utils.LIST_BOTH),
('firewall_rules', 'Firewall Rules', osc_utils.LIST_BOTH),
('description', 'Description', osc_utils.LIST_LONG_ONLY),
('audited', 'Audited', osc_utils.LIST_LONG_ONLY),
('public', 'Public', osc_utils.LIST_LONG_ONLY),
('tenant_id', 'Project', osc_utils.LIST_LONG_ONLY),
)
def _get_common_attrs(client_manager, parsed_args, is_create=True):
attrs = {}
client = client_manager.neutronclient
if is_create:
if 'project' in parsed_args and parsed_args.project is not None:
attrs['tenant_id'] = osc_utils.find_project(
client_manager.identity,
parsed_args.project,
parsed_args.project_domain,
).id
if parsed_args.firewall_rule and parsed_args.no_firewall_rule:
_firewall_rules = []
for f in set(parsed_args.firewall_rule):
_firewall_rules.append(client.find_resource(
const.FWR, f, cmd_resource='fwaas_' + const.FWR)['id'])
attrs[const.FWRS] = sorted(_firewall_rules)
elif parsed_args.firewall_rule:
rules = []
for f in set(parsed_args.firewall_rule):
rules.append(client.find_resource(
const.FWR, f, cmd_resource='fwaas_' + const.FWR)['id'])
if not is_create:
rules += client.find_resource(
const.FWP, parsed_args.firewall_policy)[const.FWRS]
attrs[const.FWRS] = sorted(set(rules))
elif parsed_args.no_firewall_rule:
attrs[const.FWRS] = []
if parsed_args.audited:
attrs['audited'] = True
if parsed_args.no_audited:
attrs['audited'] = False
if parsed_args.name:
attrs['name'] = str(parsed_args.name)
if parsed_args.description:
attrs['description'] = str(parsed_args.description)
if parsed_args.public:
attrs['public'] = True
if parsed_args.private:
attrs['public'] = False
return attrs
def _get_common_parser(parser):
parser.add_argument(
'--description',
help=_('Description of the firewall policy'))
audited_group = parser.add_mutually_exclusive_group()
audited_group.add_argument(
'--audited',
action='store_true',
help=_('Enable auditing for the policy'))
audited_group.add_argument(
'--no-audited',
action='store_true',
help=_('Disable auditing for the policy'))
public_group = parser.add_mutually_exclusive_group()
public_group.add_argument(
'--public',
action='store_true',
help=_('Make the firewall policy public, which allows it to be '
'used in all projects (as opposed to the default, '
'which is to restrict its use to the current project)'))
public_group.add_argument(
'--private',
action='store_true',
help=_('Restrict use of the firewall policy to the '
'current project'))
return parser
class CreateFirewallPolicy(command.ShowOne):
_description = _("Create a new firewall policy")
def get_parser(self, prog_name):
parser = super(CreateFirewallPolicy, self).get_parser(prog_name)
_get_common_parser(parser)
osc_utils.add_project_owner_option_to_parser(parser)
parser.add_argument(
'name',
metavar='<name>',
help=_('Name for the firewall policy'))
fwr_group = parser.add_mutually_exclusive_group()
fwr_group.add_argument(
'--firewall-rule',
action='append',
metavar='<firewall-rule>',
help=_('Firewall rule(s) to apply (name or ID)'))
fwr_group.add_argument(
'--no-firewall-rule',
action='store_true',
help=_('Unset all firewall rules from firewall policy'))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
attrs = _get_common_attrs(self.app.client_manager, parsed_args)
obj = client.create_fwaas_firewall_policy(
{const.FWP: attrs})[const.FWP]
columns, display_columns = osc_utils.get_columns(obj, _attr_map)
data = utils.get_dict_properties(obj, columns, formatters=_formatters)
return (display_columns, data)
class DeleteFirewallPolicy(command.Command):
_description = _("Delete firewall policy(s)")
def get_parser(self, prog_name):
parser = super(DeleteFirewallPolicy, self).get_parser(prog_name)
parser.add_argument(
const.FWP,
metavar='<firewall-policy>',
nargs='+',
help=_('Firewall policy(s) to delete (name or ID)'))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
result = 0
for fwp in parsed_args.firewall_policy:
try:
fwp_id = client.find_resource(
const.FWP, fwp, cmd_resource='fwaas_' + const.FWP)['id']
client.delete_fwaas_firewall_policy(fwp_id)
except Exception as e:
result += 1
LOG.error(_("Failed to delete Firewall policy with "
"name or ID '%(firewall_policy)s': %(e)s") % {
const.FWP: fwp, 'e': e})
if result > 0:
total = len(parsed_args.firewall_policy)
msg = (_("%(result)s of %(total)s Firewall policy(s) "
"failed to delete.") % {'result': result, 'total': total})
raise exceptions.CommandError(msg)
class FirewallPolicyInsertRule(command.Command):
_description = _("Insert a rule into a given firewall policy")
def get_parser(self, prog_name):
parser = super(FirewallPolicyInsertRule, self).get_parser(prog_name)
parser.add_argument(
const.FWP,
metavar='<firewall-policy>',
help=_('Firewall policy to insert rule (name or ID)'))
parser.add_argument(
'--insert-before',
metavar='<firewall-rule>',
help=_('Insert the new rule before this existing rule '
'(name or ID)'))
parser.add_argument(
'--insert-after',
metavar='<firewall-rule>',
help=_('Insert the new rule after this existing rule '
'(name or ID)'))
parser.add_argument(
const.FWR,
metavar='<firewall-rule>',
help=_('Firewall rule to be inserted (name or ID)'))
return parser
def args2body(self, parsed_args):
client = self.app.client_manager.neutronclient
_rule_id = _get_required_firewall_rule(client, parsed_args)
_insert_before = ''
if 'insert_before' in parsed_args:
if parsed_args.insert_before:
_insert_before = client.find_resource(
const.FWR, parsed_args.insert_before,
cmd_resource='fwaas_' + const.FWR)['id']
_insert_after = ''
if 'insert_after' in parsed_args:
if parsed_args.insert_after:
_insert_after = client.find_resource(
const.FWR, parsed_args.insert_after,
cmd_resource='fwaas_' + const.FWR)['id']
return {'firewall_rule_id': _rule_id,
'insert_before': _insert_before,
'insert_after': _insert_after}
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
body = self.args2body(parsed_args)
policy_id = client.find_resource(
const.FWP, parsed_args.firewall_policy,
cmd_resource='fwaas_' + const.FWP)['id']
client.insert_rule_fwaas_firewall_policy(policy_id, body)
rule_id = body['firewall_rule_id']
policy = parsed_args.firewall_policy
print((_('Inserted firewall rule %(rule)s in firewall policy '
'%(policy)s') % {'rule': rule_id, 'policy': policy}),
file=self.app.stdout)
class FirewallPolicyRemoveRule(command.Command):
_description = _("Remove a rule from a given firewall policy")
def get_parser(self, prog_name):
parser = super(FirewallPolicyRemoveRule, self).get_parser(prog_name)
parser.add_argument(
const.FWP,
metavar='<firewall-policy>',
help=_('Firewall policy to remove rule (name or ID)'))
parser.add_argument(
const.FWR,
metavar='<firewall-rule>',
help=_('Firewall rule to remove from policy (name or ID)'))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
fwr_id = _get_required_firewall_rule(client, parsed_args)
body = {'firewall_rule_id': fwr_id}
policy_id = client.find_resource(
const.FWP, parsed_args.firewall_policy,
cmd_resource='fwaas_' + const.FWP)['id']
client.remove_rule_fwaas_firewall_policy(policy_id, body)
rule_id = body['firewall_rule_id']
policy = parsed_args.firewall_policy
print((_('Removed firewall rule %(rule)s from firewall policy '
'%(policy)s') % {'rule': rule_id, 'policy': policy}),
file=self.app.stdout)
class ListFirewallPolicy(command.Lister):
_description = _("List firewall policies")
def get_parser(self, prog_name):
parser = super(ListFirewallPolicy, self).get_parser(prog_name)
parser.add_argument(
'--long',
action='store_true',
default=False,
help=_("List additional fields in output")
)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
obj = client.list_fwaas_firewall_policies()[const.FWPS]
headers, columns = osc_utils.get_column_definitions(
_attr_map, long_listing=parsed_args.long)
return (headers, (utils.get_dict_properties(
s, columns, formatters=_formatters) for s in obj))
class SetFirewallPolicy(command.Command):
_description = _("Set firewall policy properties")
def get_parser(self, prog_name):
parser = super(SetFirewallPolicy, self).get_parser(prog_name)
_get_common_parser(parser)
parser.add_argument(
const.FWP,
metavar='<firewall-policy>',
help=_('Firewall policy to update (name or ID)'))
parser.add_argument(
'--name',
metavar='<name>',
help=_('Name for the firewall policy'))
parser.add_argument(
'--firewall-rule',
action='append',
metavar='<firewall-rule>',
help=_('Firewall rule(s) to apply (name or ID)'))
parser.add_argument(
'--no-firewall-rule',
action='store_true',
help=_('Remove all firewall rules from firewall policy'))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
fwp_id = client.find_resource(
const.FWP, parsed_args.firewall_policy,
cmd_resource='fwaas_' + const.FWP)['id']
attrs = _get_common_attrs(self.app.client_manager,
parsed_args, is_create=False)
try:
client.update_fwaas_firewall_policy(fwp_id, {const.FWP: attrs})
except Exception as e:
msg = (_("Failed to set firewall policy '%(policy)s': %(e)s")
% {'policy': parsed_args.firewall_policy, 'e': e})
raise exceptions.CommandError(msg)
class ShowFirewallPolicy(command.ShowOne):
_description = _("Display firewall policy details")
def get_parser(self, prog_name):
parser = super(ShowFirewallPolicy, self).get_parser(prog_name)
parser.add_argument(
const.FWP,
metavar='<firewall-policy>',
help=_('Firewall policy to show (name or ID)'))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
fwp_id = client.find_resource(const.FWP,
parsed_args.firewall_policy,
cmd_resource='fwaas_' + const.FWP)['id']
obj = client.show_fwaas_firewall_policy(fwp_id)[const.FWP]
columns, display_columns = osc_utils.get_columns(obj, _attr_map)
data = utils.get_dict_properties(obj, columns, formatters=_formatters)
return (display_columns, data)
def _get_required_firewall_rule(client, parsed_args):
if not parsed_args.firewall_rule:
msg = (_("Firewall rule (name or ID) is required."))
raise exceptions.CommandError(msg)
return client.find_resource(const.FWR, parsed_args.firewall_rule,
cmd_resource='fwaas_' + const.FWR)['id']
class UnsetFirewallPolicy(command.Command):
_description = _("Unset firewall policy properties")
def get_parser(self, prog_name):
parser = super(UnsetFirewallPolicy, self).get_parser(prog_name)
parser.add_argument(
const.FWP,
metavar='<firewall-policy>',
help=_('Firewall policy to unset (name or ID)'))
firewall_rule_group = parser.add_mutually_exclusive_group()
firewall_rule_group.add_argument(
'--firewall-rule',
action='append',
metavar='<firewall-rule>',
help=_('Remove firewall rule(s) from the firewall policy '
'(name or ID)'))
firewall_rule_group.add_argument(
'--all-firewall-rule',
action='store_true',
help=_('Remove all firewall rules from the firewall policy'))
parser.add_argument(
'--audited',
action='store_true',
help=_('Disable auditing for the policy'))
parser.add_argument(
'--public',
action='store_true',
help=_('Restrict use of the firewall policy to the '
'current project'))
return parser
def _get_attrs(self, client_manager, parsed_args):
attrs = {}
client = client_manager.neutronclient
if parsed_args.firewall_rule:
old = client.find_resource(
const.FWP, parsed_args.firewall_policy)[const.FWRS]
new = []
for f in set(parsed_args.firewall_rule):
new.append(client.find_resource(
const.FWR, f, cmd_resource='fwaas_' + const.FWR)['id'])
attrs[const.FWRS] = sorted(list(set(old) - set(new)))
if parsed_args.all_firewall_rule:
attrs[const.FWRS] = []
if parsed_args.audited:
attrs['audited'] = False
if parsed_args.public:
attrs['public'] = False
return attrs
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
fwp_id = client.find_resource(
const.FWP, parsed_args.firewall_policy,
cmd_resource='fwaas_' + const.FWP)['id']
attrs = self._get_attrs(self.app.client_manager, parsed_args)
try:
client.update_fwaas_firewall_policy(fwp_id, {const.FWP: attrs})
except Exception as e:
msg = (_("Failed to unset firewall policy '%(policy)s': %(e)s")
% {'policy': parsed_args.firewall_policy, 'e': e})
raise exceptions.CommandError(msg)

@ -0,0 +1,399 @@
# Copyright 2016 FUJITSU LIMITED
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import copy
from osc_lib.command import command
from osc_lib import exceptions
from osc_lib import utils
from oslo_log import log as logging
from neutronclient._i18n import _
from neutronclient.common import utils as nc_utils
from neutronclient.osc import utils as osc_utils
from neutronclient.osc.v2.fwaas import constants as const
LOG = logging.getLogger(__name__)
_attr_map = (
('id', 'ID', osc_utils.LIST_BOTH),
('name', 'Name', osc_utils.LIST_BOTH),
('enabled', 'Enabled', osc_utils.LIST_BOTH),
('summary', 'Summary', osc_utils.LIST_SHORT_ONLY),
('description', 'Description', osc_utils.LIST_LONG_ONLY),
('ip_version', 'IP Version', osc_utils.LIST_LONG_ONLY),
('action', 'Action', osc_utils.LIST_LONG_ONLY),
('protocol', 'Protocol', osc_utils.LIST_LONG_ONLY),
('source_ip_address', 'Source IP Address', osc_utils.LIST_LONG_ONLY),
('source_port', 'Source Port', osc_utils.LIST_LONG_ONLY),
('destination_ip_address', 'Destination IP Address',
osc_utils.LIST_LONG_ONLY),
('destination_port', 'Destination Port', osc_utils.LIST_LONG_ONLY),
('public', 'Public', osc_utils.LIST_LONG_ONLY),
('tenant_id', 'Project', osc_utils.LIST_LONG_ONLY),
)
def _get_common_parser(parser):
parser.add_argument(
'--name',
metavar='<name>',
help=_('Name of the firewall rule'))
parser.add_argument(
'--description',
metavar='<description>',
help=_('Description of the firewall rule'))
parser.add_argument(
'--protocol',
choices=['tcp', 'udp', 'icmp', 'any'],
type=nc_utils.convert_to_lowercase,
help=_('Protocol for the firewall rule'))
parser.add_argument(
'--action',
choices=['allow', 'deny', 'reject'],
type=nc_utils.convert_to_lowercase,
help=_('Action for the firewall rule'))
parser.add_argument(
'--ip-version',
metavar='<ip-version>',
choices=['4', '6'],
help=_('Set IP version 4 or 6 (default is 4)'))
src_ip_group = parser.add_mutually_exclusive_group()
src_ip_group.add_argument(
'--source-ip-address',
metavar='<source-ip-address>',
help=_('Source IP address or subnet'))
src_ip_group.add_argument(
'--no-source-ip-address',
action='store_true',
help=_('Detach source IP address'))
dst_ip_group = parser.add_mutually_exclusive_group()
dst_ip_group.add_argument(
'--destination-ip-address',
metavar='<destination-ip-address>',
help=_('Destination IP address or subnet'))
dst_ip_group.add_argument(
'--no-destination-ip-address',
action='store_true',
help=_('Detach destination IP address'))
src_port_group = parser.add_mutually_exclusive_group()
src_port_group.add_argument(
'--source-port',
metavar='<source-port>',
help=_('Source port number or range'
'(integer in [1, 65535] or range like 123:456)'))
src_port_group.add_argument(
'--no-source-port',
action='store_true',
help=_('Detach source port number or range'))
dst_port_group = parser.add_mutually_exclusive_group()
dst_port_group.add_argument(
'--destination-port',
metavar='<destination-port>',
help=_('Destination port number or range'
'(integer in [1, 65535] or range like 123:456)'))
dst_port_group.add_argument(
'--no-destination-port',
action='store_true',
help=_('Detach destination port number or range'))
public_group = parser.add_mutually_exclusive_group()
public_group.add_argument(
'--public',
action='store_true',
help=_('Make the firewall rule public, which allows it to be '
'used in all projects (as opposed to the default, '
'which is to restrict its use to the current project)'))
public_group.add_argument(
'--private',
action='store_true',
help=_('Restrict use of the firewall rule to the current project'))
enable_group = parser.add_mutually_exclusive_group()
enable_group.add_argument(
'--enable-rule',
action='store_true',
help=_('Enable this rule (default is enabled)'))
enable_group.add_argument(
'--disable-rule',
action='store_true',
help=_('Disable this rule'))
return parser
def _get_common_attrs(client_manager, parsed_args, is_create=True):
attrs = {}
if is_create:
if 'project' in parsed_args and parsed_args.project is not None:
attrs['tenant_id'] = osc_utils.find_project(
client_manager.identity,
parsed_args.project,
parsed_args.project_domain,
).id
if parsed_args.name:
attrs['name'] = str(parsed_args.name)
if parsed_args.description:
attrs['description'] = str(parsed_args.description)
if parsed_args.protocol:
attrs['protocol'] = parsed_args.protocol
if parsed_args.action:
attrs['action'] = parsed_args.action
if parsed_args.ip_version:
attrs['ip_version'] = str(parsed_args.ip_version)
if parsed_args.source_port:
attrs['source_port'] = parsed_args.source_port
if parsed_args.no_source_port:
attrs['source_port'] = None
if parsed_args.source_ip_address:
attrs['source_ip_address'] = parsed_args.source_ip_address
if parsed_args.no_source_ip_address:
attrs['source_ip_address'] = None
if parsed_args.destination_port:
attrs['destination_port'] = str(parsed_args.destination_port)
if parsed_args.no_destination_port:
attrs['destination_port'] = None
if parsed_args.destination_ip_address:
attrs['destination_ip_address'] = str(
parsed_args.destination_ip_address)
if parsed_args.no_destination_ip_address:
attrs['destination_ip_address'] = None
if parsed_args.enable_rule:
attrs['enabled'] = True
if parsed_args.disable_rule:
attrs['enabled'] = False
if parsed_args.public:
attrs['public'] = True
if parsed_args.private:
attrs['public'] = False
return attrs
def format_protocol(protocol):
return protocol if protocol else 'any'
_formatters = {'protocol': format_protocol}
class CreateFirewallRule(command.ShowOne):
_description = _("Create a new firewall rule")
def get_parser(self, prog_name):
parser = super(CreateFirewallRule, self).get_parser(prog_name)
_get_common_parser(parser)
osc_utils.add_project_owner_option_to_parser(parser)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
attrs = _get_common_attrs(self.app.client_manager, parsed_args)
obj = client.create_fwaas_firewall_rule(
{const.FWR: attrs})[const.FWR]
columns, display_columns = osc_utils.get_columns(obj, _attr_map)
data = utils.get_dict_properties(obj, columns, formatters=_formatters)
return display_columns, data
class DeleteFirewallRule(command.Command):
_description = _("Delete firewall rule(s)")
def get_parser(self, prog_name):
parser = super(DeleteFirewallRule, self).get_parser(prog_name)
parser.add_argument(
const.FWR,
metavar='<firewall-rule>',
nargs='+',
help=_('Firewall rule(s) to delete (name or ID)'))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
result = 0
for fwr in parsed_args.firewall_rule:
try:
fwr_id = client.find_resource(
const.FWR, fwr, cmd_resource='fwaas_' + const.FWR)['id']
client.delete_fwaas_firewall_rule(fwr_id)
except Exception as e:
result += 1
LOG.error(_("Failed to delete Firewall rule with "
"name or ID '%(firewall_rule)s': %(e)s") % {
const.FWR: fwr, 'e': e})
if result > 0:
total = len(parsed_args.firewall_rule)
msg = (_("%(result)s of %(total)s Firewall rule(s) failed "
"to delete.") % {'result': result, 'total': total})
raise exceptions.CommandError(msg)
class ListFirewallRule(command.Lister):
_description = _("List firewall rules that belong to a given tenant")
def get_parser(self, prog_name):
parser = super(ListFirewallRule, self).get_parser(prog_name)
parser.add_argument(
'--long',
action='store_true',
default=False,
help=_("List additional fields in output")
)
return parser
def extend_list(self, data, parsed_args):
ext_data = copy.deepcopy(data)
for d in ext_data:
protocol = d['protocol'].upper() if d['protocol'] else 'ANY'
src_ip = 'none specified'
dst_ip = 'none specified'
src_port = '(none specified)'
dst_port = '(none specified)'
if 'source_ip_address' in d and d['source_ip_address']:
src_ip = str(d['source_ip_address']).lower()
if 'source_port' in d and d['source_port']:
src_port = '(' + str(d['source_port']).lower() + ')'
if 'destination_ip_address' in d and d['destination_ip_address']:
dst_ip = str(d['destination_ip_address']).lower()
if 'destination_port' in d and d['destination_port']:
dst_port = '(' + str(d['destination_port']).lower() + ')'
action = d['action'] if d.get('action') else 'no-action'
src = 'source(port): ' + src_ip + src_port
dst = 'dest(port): ' + dst_ip + dst_port
d['summary'] = ',\n '.join([protocol, src, dst, action])
return ext_data
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
obj = client.list_fwaas_firewall_rules()[const.FWRS]
obj_extend = self.extend_list(obj, parsed_args)
headers, columns = osc_utils.get_column_definitions(
_attr_map, long_listing=parsed_args.long)
return (headers, (utils.get_dict_properties(
s, columns, formatters=_formatters) for s in obj_extend))
class SetFirewallRule(command.Command):
_description = _("Set firewall rule properties")
def get_parser(self, prog_name):
parser = super(SetFirewallRule, self).get_parser(prog_name)
_get_common_parser(parser)
parser.add_argument(
const.FWR,
metavar='<firewall-rule>',
help=_('Firewall rule to set (name or ID)'))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
attrs = _get_common_attrs(self.app.client_manager,
parsed_args, is_create=False)
fwr_id = client.find_resource(
const.FWR, parsed_args.firewall_rule,
cmd_resource='fwaas_' + const.FWR)['id']
try:
client.update_fwaas_firewall_rule(fwr_id, {const.FWR: attrs})
except Exception as e:
msg = (_("Failed to set firewall rule '%(rule)s': %(e)s")
% {'rule': parsed_args.firewall_rule, 'e': e})
raise exceptions.CommandError(msg)
class ShowFirewallRule(command.ShowOne):
_description = _("Display firewall rule details")
def get_parser(self, prog_name):
parser = super(ShowFirewallRule, self).get_parser(prog_name)
parser.add_argument(
const.FWR,
metavar='<firewall-rule>',
help=_('Firewall rule to display (name or ID)'))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
fwr_id = client.find_resource(
const.FWR, parsed_args.firewall_rule,
cmd_resource='fwaas_' + const.FWR)['id']
obj = client.show_fwaas_firewall_rule(fwr_id)[const.FWR]
columns, display_columns = osc_utils.get_columns(obj, _attr_map)
data = utils.get_dict_properties(obj, columns, formatters=_formatters)
return (display_columns, data)
class UnsetFirewallRule(command.Command):
_description = _("Unset firewall rule properties")
def get_parser(self, prog_name):
parser = super(UnsetFirewallRule, self).get_parser(prog_name)
parser.add_argument(
const.FWR,
metavar='<firewall-rule>',
help=_('Firewall rule to unset (name or ID)'))
parser.add_argument(
'--source-ip-address',
action='store_true',
help=_('Source IP address or subnet'))
parser.add_argument(
'--destination-ip-address',
action='store_true',
help=_('Destination IP address or subnet'))
parser.add_argument(
'--source-port',
action='store_true',
help=_('Source port number or range'
'(integer in [1, 65535] or range like 123:456)'))
parser.add_argument(
'--destination-port',
action='store_true',
help=_('Destination port number or range'
'(integer in [1, 65535] or range like 123:456)'))
parser.add_argument(
'--public',
action='store_true',
help=_('Restrict use of the firewall rule to the current project'))
parser.add_argument(
'--enable-rule',
action='store_true',
help=_('Disable this rule'))
return parser
def _get_attrs(self, client_manager, parsed_args):
attrs = {}
if parsed_args.source_ip_address:
attrs['source_ip_address'] = None
if parsed_args.source_port:
attrs['source_port'] = None
if parsed_args.destination_ip_address:
attrs['destination_ip_address'] = None
if parsed_args.destination_port:
attrs['destination_port'] = None
if parsed_args.public:
attrs['public'] = False
if parsed_args.enable_rule:
attrs['enabled'] = False
return attrs
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
attrs = self._get_attrs(self.app.client_manager, parsed_args)
fwr_id = client.find_resource(
const.FWR, parsed_args.firewall_rule,
cmd_resource='fwaas_' + const.FWR)['id']
try:
client.update_fwaas_firewall_rule(fwr_id, {const.FWR: attrs})
except Exception as e:
msg = (_("Failed to unset firewall rule '%(rule)s': %(e)s")
% {'rule': parsed_args.firewall_rule, 'e': e})
raise exceptions.CommandError(msg)

@ -0,0 +1,270 @@
# Copyright 2016 FUJITSU LIMITED
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from osc_lib.tests import utils
from neutronclient.tests.unit.osc.v2 import fakes as test_fakes
class TestListFWaaS(test_fakes.TestNeutronClientOSCV2):
def test_list_with_no_option(self):
arglist = []
verifylist = []
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
headers, data = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with()
self.assertEqual(list(self.list_headers), headers)
self.assertEqual([self.list_data], list(data))
def test_list_with_long_option(self):
arglist = ['--long']
verifylist = [('long', True)]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
headers, data = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with()
self.assertEqual(list(self.headers), headers)
self.assertEqual([self.data], list(data))
class TestShowFWaaS(test_fakes.TestNeutronClientOSCV2):
def test_show_filtered_by_id_or_name(self):
target = self.resource['id']
arglist = [target]
verifylist = [(self.res, target)]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
headers, data = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(target)
self.assertEqual(self.ordered_headers, headers)
self.assertEqual(self.ordered_data, data)
class TestCreateFWaaS(test_fakes.TestNeutronClientOSCV2):
pass
class TestSetFWaaS(test_fakes.TestNeutronClientOSCV2):
def test_set_name(self):
target = self.resource['id']
update = 'change'
arglist = [target, '--name', update]
verifylist = [
(self.res, target),
('name', update),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'name': update}})
self.assertIsNone(result)
def test_set_description(self):
target = self.resource['id']
update = 'change-desc'
arglist = [target, '--description', update]
verifylist = [
(self.res, target),
('description', update),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'description': update}})
self.assertIsNone(result)
def test_set_public(self):
target = self.resource['id']
arglist = [target, '--public']
verifylist = [
(self.res, target),
('public', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'public': True}})
self.assertIsNone(result)
def test_set_duplicate_public(self):
target = self.resource['id']
arglist = [target, '--public', '--public']
verifylist = [
(self.res, target),
('public', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'public': True}})
self.assertIsNone(result)
def test_set_private(self):
target = self.resource['id']
arglist = [target, '--private']
verifylist = [
(self.res, target),
('public', False),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'public': False}})
self.assertIsNone(result)
def test_set_duplicate_private(self):
target = self.resource['id']
arglist = [target, '--private', '--private']
verifylist = [
(self.res, target),
('public', False),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'public': False}})
self.assertIsNone(result)
def test_set_private_and_public(self):
target = self.resource['id']
arglist = [target, '--private', '--public']
verifylist = [
(self.res, target),
('private', True),
('public', True),
]
self.assertRaises(
utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_set_public_and_priavte(self):
target = self.resource['id']
arglist = [target, '--public', '--private']
verifylist = [
(self.res, target),
('public', True),
('private', True),
]
self.assertRaises(
utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_set_project(self):
target = self.resource['id']
project_id = 'b14ce3b699594d13819a859480286489'
arglist = [target, '--project', project_id]
verifylist = [
(self.res, target),
('tenant_id', project_id),
]
self.assertRaises(
utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_set_project_domain(self):
target = self.resource['id']
project_domain = 'mydomain.com'
arglist = [target, '--project-domain', project_domain]
verifylist = [
(self.res, target),
('project_domain', project_domain),
]
self.assertRaises(
utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
class TestDeleteFWaaS(test_fakes.TestNeutronClientOSCV2):
def test_delete_with_one_resource(self):
target = self.resource['id']
arglist = [target]
verifylist = [(self.res, [target])]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(target)
self.assertIsNone(result)
def test_delete_with_multiple_resources(self):
target1 = 'target1'
target2 = 'target2'
arglist = [target1, target2]
verifylist = [(self.res, [target1, target2])]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.assertIsNone(result)
self.assertEqual(2, self.mocked.call_count)
for idx, reference in enumerate([target1, target2]):
actual = ''.join(self.mocked.call_args_list[idx][0])
self.assertEqual(reference, actual)
class TestUnsetFWaaS(test_fakes.TestNeutronClientOSCV2):
def test_unset_public(self):
target = self.resource['id']
arglist = [
target,
'--public',
]
verifylist = [
(self.res, target),
('public', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'public': False}})
self.assertIsNone(result)
def test_set_public_and_priavte(self):
target = self.resource['id']
arglist = [target, '--public', '--private']
verifylist = [
(self.res, target),
('public', True),
('private', True),
]
# check_parser: error: unrecognized arguments: --private
self.assertRaises(
utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_set_duplicate_public(self):
target = self.resource['id']
arglist = [target, '--public', '--public']
verifylist = [
(self.res, target),
('public', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'public': False}})
self.assertIsNone(result)

@ -0,0 +1,117 @@
# Copyright 2016 FUJITSU LIMITED
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import collections
import copy
import mock
import uuid
class FakeFWaaS(object):
def create(self, attrs={}):
"""Create a fake fwaas resources
:param Dictionary attrs:
A dictionary with all attributes
:return:
A OrderedDict faking the fwaas resource
"""
self.ordered.update(attrs)
return copy.deepcopy(self.ordered)
def bulk_create(self, attrs=None, count=2):
"""Create multiple fake fwaas resources
:param Dictionary attrs:
A dictionary with all attributes
:param int count:
The number of fwaas resources to fake
:return:
A list of dictionaries faking the fwaas resources
"""
return [self.create(attrs=attrs) for i in range(0, count)]
def get(self, attrs=None, count=2):
"""Create multiple fake fwaas resources
:param Dictionary attrs:
A dictionary with all attributes
:param int count:
The number of fwaas resources to fake
:return:
A list of dictionaries faking the fwaas resource
"""
if attrs is None:
self.attrs = self.bulk_create(count=count)
return mock.Mock(side_effect=attrs)
class FirewallGroup(FakeFWaaS):
"""Fake one or more firewall group"""
def __init__(self):
super(FirewallGroup, self).__init__()
self.ordered = collections.OrderedDict((
('id', 'firewall-group-id-' + uuid.uuid4().hex),
('name', 'my-group-' + uuid.uuid4().hex),
('ingress_firewall_policy_id', None),
('egress_firewall_policy_id', None),
('description', 'my-desc-' + uuid.uuid4().hex),
('status', 'INACTIVE'),
('ports', []),
('admin_state_up', True),
('public', False),
('tenant_id', 'tenant-id-' + uuid.uuid4().hex),
))
class FirewallPolicy(FakeFWaaS):
"""Fake one or more firewall policy"""
def __init__(self):
super(FirewallPolicy, self).__init__()
self.ordered = collections.OrderedDict((
('id', 'firewall-policy-' + uuid.uuid4().hex),
('name', 'my-policy-' + uuid.uuid4().hex),
('firewall_rules', []),
('description', 'my-desc-' + uuid.uuid4().hex),
('audited', True),
('public', False),
('tenant_id', 'tenant-id-' + uuid.uuid4().hex),
))
class FirewallRule(FakeFWaaS):
"""Fake one or more firewall rule"""
def __init__(self):
super(FirewallRule, self).__init__()
self.ordered = collections.OrderedDict((
('id', 'firewall-rule-id-' + uuid.uuid4().hex),
('name', 'my-rule-' + uuid.uuid4().hex),
('enabled', False),
('description', 'my-desc-' + uuid.uuid4().hex),
('ip_version', 4),
('action', 'deny'),
('protocol', 'any'),
('source_ip_address', '192.168.1.0/24'),
('source_port', '1:11111'),
('destination_ip_address', '192.168.2.2'),
('destination_port', '2:22222'),
('public', False),
('tenant_id', 'tenant-id-' + uuid.uuid4().hex),
))

@ -0,0 +1,613 @@
# Copyright 2016 FUJITSU LIMITED
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import copy
import mock
import re
from osc_lib import exceptions
from osc_lib.tests import utils
from neutronclient.osc import utils as osc_utils
from neutronclient.osc.v2.fwaas import firewallgroup
from neutronclient.tests.unit.osc.v2 import fakes as test_fakes
from neutronclient.tests.unit.osc.v2.fwaas import common
from neutronclient.tests.unit.osc.v2.fwaas import fakes
_fwg = fakes.FirewallGroup().create()
CONVERT_MAP = {
'ingress_firewall_policy': 'ingress_firewall_policy_id',
'egress_firewall_policy': 'egress_firewall_policy_id',
'no_ingress_firewall_policy': 'ingress_firewall_policy_id',
'no_egress_firewall_policy': 'egress_firewall_policy_id',
'public': 'public',
'private': 'public',
'project': 'tenant_id',
'enable': 'admin_state_up',
'disable': 'admin_state_up',
'port': 'ports',
}
def _generate_response(ordered_dict=None, data=None):
source = ordered_dict if ordered_dict else _fwg
up = {'admin_state_up': 'UP' if source['admin_state_up'] else 'DOWN'}
if data:
up.append(data)
source.update(up)
return tuple(source[key] for key in source)
def _generate_req_and_res(verifylist):
request = dict(verifylist)
response = copy.deepcopy(_fwg)
for key, val in verifylist:
del request[key]
if re.match('^no_', key) and val is True:
new_value = None
elif key == 'enable' and val:
new_value = True
elif key == 'disable' and val:
new_value = False
else:
new_value = val
converted = CONVERT_MAP.get(key, key)
request[converted] = new_value
response[converted] = new_value
return request, response
class TestFirewallGroup(test_fakes.TestNeutronClientOSCV2):
def check_results(self, headers, data, exp_req, is_list=False):
if is_list:
req_body = {self.res_plural: [exp_req]}
else:
req_body = {self.res: exp_req}
self.mocked.assert_called_once_with(req_body)
self.assertEqual(self.ordered_headers, headers)
self.assertEqual(self.ordered_data, data)
def setUp(self):
super(TestFirewallGroup, self).setUp()
def side_effect_for_find_resource(*args, **kwargs):
port_id = args[1]
ports = _fwg['ports']
if self.res in args[0]:
ports = _fwg['ports']
return {'id': port_id, 'ports': ports}
def side_effect_for_list_ports(*args):
port_name = 'id_for_port'
return {'ports': [{'id': port_name}]}
self.neutronclient.find_resource.side_effect = mock.Mock(
side_effect=side_effect_for_find_resource)
self.neutronclient.list_ports = mock.Mock(
side_effect=side_effect_for_list_ports)
osc_utils.find_project = mock.Mock()
osc_utils.find_project.id = _fwg['tenant_id']
self.res = 'firewall_group'
self.res_plural = 'firewall_groups'
self.resource = _fwg
self.list_headers = (
'ID',
'Name',
'Ingress Policy ID',
'Egress Policy ID',
)
self.list_data = (
_fwg['id'],
_fwg['name'],
_fwg['ingress_firewall_policy_id'],
_fwg['egress_firewall_policy_id'],
)
self.headers = tuple(self.list_headers + (
'Description',
'Status',
'Ports',
'State',
'Public',
'Project',
))
self.data = _generate_response()
self.ordered_headers = copy.deepcopy(tuple(sorted(self.headers)))
self.ordered_data = (
_fwg['description'],
_fwg['egress_firewall_policy_id'],
_fwg['id'],
_fwg['ingress_firewall_policy_id'],
_fwg['name'],
_fwg['ports'],
_fwg['tenant_id'],
_fwg['public'],
'UP' if _fwg['admin_state_up'] else 'DOWN',
_fwg['status'],
)
self.ordered_columns = (
'description',
'egress_firewall_policy_id',
'id',
'ingress_firewall_policy_id',
'name',
'ports',
'tenant_id',
'public',
'admin_state_up',
'status',
)
class TestCreateFirewallGroup(TestFirewallGroup, common.TestCreateFWaaS):
def setUp(self):
# Mock objects
super(TestCreateFirewallGroup, self).setUp()
self.neutronclient.create_fwaas_firewall_group = mock.Mock(
return_value={self.res: _fwg})
self.mocked = self.neutronclient.create_fwaas_firewall_group
self.cmd = firewallgroup.CreateFirewallGroup(self.app, self.namespace)
def _update_expect_response(self, request, response):
"""Set expected request and response
:param request
A dictionary of request body(dict of verifylist)
:param response
A OrderedDict of request body
"""
# Update response body
self.neutronclient.create_fwaas_firewall_group.return_value = \
{self.res: dict(response)}
osc_utils.find_project.return_value.id = response['tenant_id']
# Update response(finally returns 'data')
self.data = _generate_response(ordered_dict=response)
self.ordered_data = tuple(
response[column] for column in self.ordered_columns
)
def test_create_with_no_option(self):
# firewall_group-create with mandatory (none) params.
arglist = []
verifylist = []
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
headers, data = self.cmd.take_action(parsed_args)
self.assertEqual(self.ordered_headers, headers)
self.assertEqual(self.ordered_data, data)
def test_create_with_port(self):
# firewall_group-create with 'port'
port_id = 'id_for_port'
arglist = ['--port', port_id]
verifylist = [('port', [port_id])]
request, response = _generate_req_and_res(verifylist)
self._update_expect_response(request, response)
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
headers, data = self.cmd.take_action(parsed_args)
self.check_results(headers, data, request)
def test_create_with_all_params(self):
name = 'my-name'
description = 'my-desc'
ingress_policy = 'my-ingress-policy'
egress_policy = 'my-egress-policy'
port = 'port'
tenant_id = 'my-tenant'
arglist = [
'--name', name,
'--description', description,
'--ingress-firewall-policy', ingress_policy,
'--egress-firewall-policy', egress_policy,
'--port', port,
'--project', tenant_id,
'--public',
'--disable',
]
verifylist = [
('name', name),
('description', description),
('ingress_firewall_policy', ingress_policy),
('egress_firewall_policy', egress_policy),
('port', [port]),
('public', True),
('project', tenant_id),
('disable', True),
]
request, response = _generate_req_and_res(verifylist)
self._update_expect_response(request, response)
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
headers, data = self.cmd.take_action(parsed_args)
self.check_results(headers, data, request)
def test_create_with_public_and_private(self):
arglist = [
'--public',
'--private',
]
verifylist = [
('public', True),
('private', True),
]
self.assertRaises(
utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_create_with_ports_and_no(self):
port = 'my-port'
arglist = [
'--port', port,
'--no-port',
]
verifylist = [
('port', [port]),
('no_port', True),
]
self.assertRaises(
utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_create_with_ingress_policy_and_no(self):
policy = 'my-policy'
arglist = [
'--ingress-firewall-policy', policy,
'--no-ingress-firewall-policy',
]
verifylist = [
('ingress_firewall_policy', policy),
('no_ingress_firewall_policy', True),
]
self.assertRaises(
utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_create_with_egress_policy_and_no(self):
policy = 'my-policy'
arglist = [
'--egress-firewall-policy', policy,
'--no-egress-firewall-policy',
]
verifylist = [
('egress_firewall_policy', policy),
('no_egress_firewall_policy', True),
]
self.assertRaises(
utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
class TestListFirewallGroup(TestFirewallGroup, common.TestListFWaaS):
def setUp(self):
super(TestListFirewallGroup, self).setUp()
# Mock objects
self.neutronclient.list_fwaas_firewall_groups = mock.Mock(
return_value={self.res_plural: [_fwg]})
self.mocked = self.neutronclient.list_fwaas_firewall_groups
self.cmd = firewallgroup.ListFirewallGroup(self.app, self.namespace)
class TestShowFirewallGroup(TestFirewallGroup, common.TestShowFWaaS):
def setUp(self):
super(TestShowFirewallGroup, self).setUp()
# Mock objects
self.neutronclient.show_fwaas_firewall_group = mock.Mock(
return_value={self.res: _fwg})
self.mocked = self.neutronclient.show_fwaas_firewall_group
self.cmd = firewallgroup.ShowFirewallGroup(self.app, self.namespace)
class TestSetFirewallGroup(TestFirewallGroup, common.TestSetFWaaS):
def setUp(self):
super(TestSetFirewallGroup, self).setUp()
# Mock objects
_fwg['ports'] = ['old_port']
self.neutronclient.update_fwaas_firewall_group = mock.Mock(
return_value={self.res: _fwg})
self.mocked = self.neutronclient.update_fwaas_firewall_group
self.cmd = firewallgroup.SetFirewallGroup(self.app, self.namespace)
def _update_expect_response(self, request, response):
"""Set expected request and response
:param request
A dictionary of request body(dict of verifylist)
:param response
A OrderedDict of request body
"""
osc_utils.find_project.return_value.id = response['tenant_id']
# Update response(finally returns 'data')
self.data = _generate_response(ordered_dict=response)
self.ordered_data = tuple(
response[column] for column in self.ordered_columns
)
def test_set_ingress_policy(self):
# firewall_group-update myid --policy newpolicy.
target = self.resource['id']
policy = 'ingress_policy'
arglist = [target, '--ingress-firewall-policy', policy]
verifylist = [
(self.res, target),
('ingress_firewall_policy', policy),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'ingress_firewall_policy_id': policy}})
self.assertIsNone(result)
def test_set_port(self):
target = self.resource['id']
port1 = 'additional_port1'
port2 = 'additional_port2'
arglist = [
target,
'--port', port1,
'--port', port2,
]
verifylist = [
(self.res, target),
('port', [port1, port2]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
expect = {'ports': sorted(_fwg['ports'] + [port1, port2])}
self.mocked.assert_called_once_with(target, {self.res: expect})
self.assertIsNone(result)
def test_set_no_port(self):
# firewall_group-update myid --policy newpolicy.
target = self.resource['id']
arglist = [target, '--no-port']
verifylist = [
(self.res, target),
('no_port', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'ports': []}})
self.assertIsNone(result)
def test_set_admin_state(self):
target = self.resource['id']
arglist = [target, '--enable']
verifylist = [
(self.res, target),
('enable', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'admin_state_up': True}})
self.assertIsNone(result)
def test_set_egress_policy(self):
target = self.resource['id']
policy = 'egress_policy'
arglist = [target, '--egress-firewall-policy', policy]
verifylist = [
(self.res, target),
('egress_firewall_policy', policy),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'egress_firewall_policy_id': policy}})
self.assertIsNone(result)
def test_set_no_ingress_policies(self):
target = self.resource['id']
arglist = [target, '--no-ingress-firewall-policy']
verifylist = [
(self.res, target),
('no_ingress_firewall_policy', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'ingress_firewall_policy_id': None}})
self.assertIsNone(result)
def test_set_no_egress_policies(self):
target = self.resource['id']
arglist = [target, '--no-egress-firewall-policy']
verifylist = [
(self.res, target),
('no_egress_firewall_policy', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'egress_firewall_policy_id': None}})
self.assertIsNone(result)
def test_set_port_and_no_port(self):
target = self.resource['id']
port = 'my-port'
arglist = [
target,
'--port', port,
'--no-port',
]
verifylist = [
(self.res, target),
('port', [port]),
('no_port', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'ports': [port]}})
self.assertIsNone(result)
def test_set_ingress_policy_and_no_ingress_policy(self):
target = self.resource['id']
arglist = [
target,
'--ingress-firewall-policy', 'my-ingress',
'--no-ingress-firewall-policy',
]
verifylist = [
(self.res, target),
('ingress_firewall_policy', 'my-ingress'),
('no_ingress_firewall_policy', True),
]
self.assertRaises(
utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_set_egress_policy_and_no_egress_policy(self):
target = self.resource['id']
arglist = [
target,
'--egress-firewall-policy', 'my-egress',
'--no-egress-firewall-policy',
]
verifylist = [
(self.res, target),
('egress_firewall_policy', 'my-egress'),
('no_egress_firewall_policy', True),
]
self.assertRaises(
utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_set_and_raises(self):
self.neutronclient.update_fwaas_firewall_group = mock.Mock(
side_effect=Exception)
target = self.resource['id']
arglist = [target, '--name', 'my-name']
verifylist = [(self.res, target), ('name', 'my-name')]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.assertRaises(
exceptions.CommandError, self.cmd.take_action, parsed_args)
class TestDeleteFirewallGroup(TestFirewallGroup, common.TestDeleteFWaaS):
def setUp(self):
super(TestDeleteFirewallGroup, self).setUp()
# Mock objects
self.neutronclient.delete_fwaas_firewall_group = mock.Mock()
self.mocked = self.neutronclient.delete_fwaas_firewall_group
self.cmd = firewallgroup.DeleteFirewallGroup(self.app, self.namespace)
class TestUnsetFirewallGroup(TestFirewallGroup, common.TestUnsetFWaaS):
def setUp(self):
super(TestUnsetFirewallGroup, self).setUp()
_fwg['ports'] = ['old_port']
# Mock objects
self.neutronclient.update_fwaas_firewall_group = mock.Mock()
self.mocked = self.neutronclient.update_fwaas_firewall_group
self.cmd = firewallgroup.UnsetFirewallGroup(self.app, self.namespace)
def test_unset_ingress_policy(self):
target = self.resource['id']
arglist = [
target,
'--ingress-firewall-policy',
]
verifylist = [
(self.res, target),
('ingress_firewall_policy', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'ingress_firewall_policy_id': None}})
self.assertIsNone(result)
def test_unset_egress_policy(self):
target = self.resource['id']
arglist = [
target,
'--egress-firewall-policy',
]
verifylist = [
(self.res, target),
('egress_firewall_policy', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'egress_firewall_policy_id': None}})
self.assertIsNone(result)
def test_unset_enable(self):
target = self.resource['id']
arglist = [
target,
'--enable',
]
verifylist = [
(self.res, target),
('enable', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'admin_state_up': False}})
self.assertIsNone(result)
def test_unset_port(self):
target = self.resource['id']
port = 'old_port'
arglist = [
target,
'--port', port,
]
verifylist = [
(self.res, target),
('port', [port]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(target, {self.res: {'ports': []}})
self.assertIsNone(result)
def test_unset_all_port(self):
target = self.resource['id']
arglist = [
target,
'--all-port',
]
verifylist = [
(self.res, target),
('all_port', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(target, {self.res: {'ports': []}})
self.assertIsNone(result)

@ -0,0 +1,599 @@
# Copyright 2016 FUJITSU LIMITED
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import copy
import mock
import re
from osc_lib import exceptions
from osc_lib.tests import utils
from neutronclient.osc import utils as osc_utils
from neutronclient.osc.v2.fwaas import firewallpolicy
from neutronclient.tests.unit.osc.v2 import fakes as test_fakes
from neutronclient.tests.unit.osc.v2.fwaas import common
from neutronclient.tests.unit.osc.v2.fwaas import fakes
_fwp = fakes.FirewallPolicy().create()
CONVERT_MAP = {
'public': 'public',
'private': 'public',
'project': 'tenant_id',
'port': 'ports',
'name': 'name',
'id': 'id',
'firewall_rule': 'firewall_rules',
'description': 'description'
}
def _generate_data(ordered_dict=None, data=None):
source = ordered_dict if ordered_dict else _fwp
if data:
source.update(data)
return tuple(source[key] for key in source)
def _generate_req_and_res(verifylist):
request = dict(verifylist)
response = copy.deepcopy(_fwp)
for key, val in verifylist:
converted = CONVERT_MAP.get(key, key)
del request[key]
if re.match('^no_', key) and val is True:
new_value = None
elif key == 'enable' and val:
new_value = True
elif key == 'disable' and val:
new_value = False
else:
new_value = val
request[converted] = new_value
response[converted] = new_value
return request, response
class TestFirewallPolicy(test_fakes.TestNeutronClientOSCV2):
def check_results(self, headers, data, exp_req, is_list=False):
if is_list:
req_body = {self.res_plural: [exp_req]}
else:
req_body = {self.res: exp_req}
self.mocked.assert_called_once_with(req_body)
self.assertEqual(self.ordered_headers, headers)
self.assertEqual(self.ordered_data, data)
def setUp(self):
super(TestFirewallPolicy, self).setUp()
def _find_resource(*args, **kwargs):
rule_id = args[1]
rules = []
if self.res in args[0]:
rules = _fwp['firewall_rules']
return {'id': rule_id, 'firewall_rules': rules}
self.neutronclient.find_resource = mock.Mock(
side_effect=_find_resource)
# fw_common.get_id.side_effect = lambda x, y, z: z
osc_utils.find_project = mock.Mock()
osc_utils.find_project.id = _fwp['tenant_id']
self.res = 'firewall_policy'
self.res_plural = 'firewall_policies'
self.resource = _fwp
self.list_headers = (
'ID',
'Name',
'Firewall Rules',
)
self.list_data = (
_fwp['id'],
_fwp['name'],
_fwp['firewall_rules'],
)
self.headers = tuple(self.list_headers + (
'Description',
'Audited',
'Public',
'Project')
)
self.data = _generate_data()
self.ordered_headers = (
'Audited',
'Description',
'Firewall Rules',
'ID',
'Name',
'Project',
'Public',
)
self.ordered_data = (
_fwp['audited'],
_fwp['description'],
_fwp['firewall_rules'],
_fwp['id'],
_fwp['name'],
_fwp['tenant_id'],
_fwp['public'],
)
self.ordered_columns = (
'audited',
'description',
'firewall_rules',
'id',
'name',
'tenant_id',
'public',
)
class TestCreateFirewallPolicy(TestFirewallPolicy, common.TestCreateFWaaS):
def setUp(self):
super(TestCreateFirewallPolicy, self).setUp()
self.neutronclient.create_fwaas_firewall_policy = mock.Mock(
return_value={self.res: _fwp})
self.mocked = self.neutronclient.create_fwaas_firewall_policy
self.cmd = firewallpolicy.CreateFirewallPolicy(self.app,
self.namespace)
def _update_expect_response(self, request, response):
"""Set expected request and response
:param request
A dictionary of request body(dict of verifylist)
:param response
A OrderedDict of request body
"""
# Update response body
self.neutronclient.create_fwaas_firewall_policy.return_value = \
{self.res: dict(response)}
osc_utils.find_project.return_value.id = response['tenant_id']
# Update response(finally returns 'data')
self.data = _generate_data(data=response)
self.ordered_data = tuple(
response[column] for column in self.ordered_columns
)
def test_create_with_no_options(self):
arglist = []
verifylist = []
self.assertRaises(
utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_create_with_mandatory_param(self):
name = 'my-fwg'
arglist = [
name,
]
verifylist = [
('name', name),
]
request, response = _generate_req_and_res(verifylist)
self._update_expect_response(request, response)
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
headers, data = self.cmd.take_action(parsed_args)
self.check_results(headers, data, request)
def test_create_with_all_params(self):
name = 'my-fwp'
desc = 'my-desc'
rule1 = 'rule1'
rule2 = 'rule2'
project = 'my-tenant'
arglist = [
name,
'--description', desc,
'--firewall-rule', rule1,
'--firewall-rule', rule2,
'--project', project,
'--public',
'--audited',
]
verifylist = [
('name', name),
('description', desc),
('firewall_rule', [rule1, rule2]),
('project', project),
('public', True),
('audited', True),
]
request, response = _generate_req_and_res(verifylist)
self._update_expect_response(request, response)
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
headers, data = self.cmd.take_action(parsed_args)
self.check_results(headers, data, request)
def test_create_with_firewall_rule_and_no(self):
name = 'my-fwp'
rule1 = 'rule1'
rule2 = 'rule2'
arglist = [
name,
'--firewall-rule', rule1,
'--firewall-rule', rule2,
'--no-firewall-rule',
]
verifylist = [
('name', name),
('firewall_rule', [rule1, rule2]),
('no_firewall_rule', True),
]
self.assertRaises(
utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_create_with_public_and_private(self):
name = 'my-fwp'
arglist = [
name,
'--public',
'--private',
]
verifylist = [
('name', name),
('public', True),
('private', True),
]
self.assertRaises(
utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_create_with_audited_and_no(self):
name = 'my-fwp'
arglist = [
name,
'--audited',
'--no-audited',
]
verifylist = [
('name', name),
('audited', True),
('no_audited', True),
]
self.assertRaises(
utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
class TestListFirewallPolicy(TestFirewallPolicy, common.TestListFWaaS):
def setUp(self):
super(TestListFirewallPolicy, self).setUp()
self.neutronclient.list_fwaas_firewall_policies = mock.Mock(
return_value={'firewall_policies': [_fwp]})
self.mocked = self.neutronclient.list_fwaas_firewall_policies
self.cmd = firewallpolicy.ListFirewallPolicy(self.app, self.namespace)
class TestShowFirewallPolicy(TestFirewallPolicy, common.TestShowFWaaS):
def setUp(self):
super(TestShowFirewallPolicy, self).setUp()
self.neutronclient.show_fwaas_firewall_policy = mock.Mock(
return_value={self.res: _fwp})
self.mocked = self.neutronclient.show_fwaas_firewall_policy
self.cmd = firewallpolicy.ShowFirewallPolicy(self.app, self.namespace)
class TestSetFirewallPolicy(TestFirewallPolicy, common.TestSetFWaaS):
def setUp(self):
super(TestSetFirewallPolicy, self).setUp()
self.neutronclient.update_fwaas_firewall_policy = mock.Mock(
return_value={self.res: _fwp})
self.mocked = self.neutronclient.update_fwaas_firewall_policy
self.cmd = firewallpolicy.SetFirewallPolicy(self.app, self.namespace)
def test_set_rules(self):
target = self.resource['id']
rule1 = 'new_rule1'
rule2 = 'new_rule2'
arglist = [
target,
'--firewall-rule', rule1,
'--firewall-rule', rule2,
]
verifylist = [
(self.res, target),
('firewall_rule', [rule1, rule2]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
expect = sorted(set(_fwp['firewall_rules'] + [rule1, rule2]))
body = {self.res: {'firewall_rules': expect}}
self.mocked.assert_called_once_with(target, body)
self.assertIsNone(result)
def test_set_no_rules(self):
target = self.resource['id']
arglist = [target, '--no-firewall-rule']
verifylist = [
(self.res, target),
('no_firewall_rule', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
body = {self.res: {'firewall_rules': []}}
self.mocked.assert_called_once_with(target, body)
self.assertIsNone(result)
def test_set_rules_and_no_rules(self):
target = self.resource['id']
rule1 = 'rule1'
arglist = [
target,
'--firewall-rule', rule1,
'--no-firewall-rule',
]
verifylist = [
(self.res, target),
('firewall_rule', [rule1]),
('no_firewall_rule', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
body = {self.res: {'firewall_rules': [rule1]}}
self.mocked.assert_called_once_with(target, body)
self.assertIsNone(result)
def test_set_audited(self):
target = self.resource['id']
arglist = [target, '--audited']
verifylist = [
(self.res, target),
('audited', True),
]
body = {'audited': True}
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(target, {self.res: body})
self.assertIsNone(result)
def test_set_no_audited(self):
target = self.resource['id']
arglist = [target, '--no-audited']
verifylist = [
(self.res, target),
('no_audited', True),
]
body = {'audited': False}
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(target, {self.res: body})
self.assertIsNone(result)
def test_set_audited_and_no_audited(self):
target = self.resource['id']
arglist = [
target,
'--audited',
'--no-audited',
]
verifylist = [
(self.res, target),
('audited', True),
('no_audited', True),
]
self.assertRaises(
utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_set_and_raises(self):
self.neutronclient.update_fwaas_firewall_policy = mock.Mock(
side_effect=Exception)
target = self.resource['id']
arglist = [target, '--name', 'my-name']
verifylist = [(self.res, target), ('name', 'my-name')]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.assertRaises(
exceptions.CommandError, self.cmd.take_action, parsed_args)
class TestDeleteFirewallPolicy(TestFirewallPolicy, common.TestDeleteFWaaS):
def setUp(self):
super(TestDeleteFirewallPolicy, self).setUp()
self.neutronclient.delete_fwaas_firewall_policy = mock.Mock(
return_value={self.res: _fwp})
self.mocked = self.neutronclient.delete_fwaas_firewall_policy
self.cmd = firewallpolicy.DeleteFirewallPolicy(
self.app, self.namespace)
class TestFirewallPolicyInsertRule(TestFirewallPolicy):
def setUp(self):
super(TestFirewallPolicyInsertRule, self).setUp()
self.neutronclient.insert_rule_fwaas_firewall_policy = mock.Mock(
return_value={self.res: _fwp})
self.mocked = self.neutronclient.insert_rule_fwaas_firewall_policy
self.cmd = firewallpolicy.FirewallPolicyInsertRule(self.app,
self.namespace)
def test_insert_firewall_rule(self):
target = self.resource['id']
rule = 'new-rule'
before = 'before'
after = 'after'
arglist = [
target,
rule,
'--insert-before', before,
'--insert-after', after,
]
verifylist = [
(self.res, target),
('firewall_rule', rule),
('insert_before', before),
('insert_after', after),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {
'firewall_rule_id': rule,
'insert_before': before,
'insert_after': after
})
self.assertIsNone(result)
def test_insert_with_no_firewall_rule(self):
target = self.resource['id']
arglist = [
target,
]
verifylist = [
(self.res, target),
]
self.assertRaises(
utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
class TestFirewallPolicyRemoveRule(TestFirewallPolicy):
def setUp(self):
super(TestFirewallPolicyRemoveRule, self).setUp()
self.neutronclient.remove_rule_fwaas_firewall_policy = mock.Mock(
return_value={self.res: _fwp})
self.mocked = self.neutronclient.remove_rule_fwaas_firewall_policy
self.cmd = firewallpolicy.FirewallPolicyRemoveRule(self.app,
self.namespace)
def test_remove_firewall_rule(self):
target = self.resource['id']
rule = 'remove-rule'
arglist = [
target,
rule,
]
verifylist = [
(self.res, target),
('firewall_rule', rule),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {'firewall_rule_id': rule})
self.assertIsNone(result)
def test_remove_with_no_firewall_rule(self):
target = self.resource['id']
arglist = [
target,
]
verifylist = [
(self.res, target),
]
self.assertRaises(
utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
class TestUnsetFirewallPolicy(TestFirewallPolicy, common.TestUnsetFWaaS):
def setUp(self):
super(TestUnsetFirewallPolicy, self).setUp()
self.neutronclient.update_fwaas_firewall_policy = mock.Mock(
return_value={self.res: _fwp})
self.mocked = self.neutronclient.update_fwaas_firewall_policy
self.cmd = firewallpolicy.UnsetFirewallPolicy(self.app, self.namespace)
def test_unset_audited(self):
target = self.resource['id']
arglist = [
target,
'--audited',
]
verifylist = [
(self.res, target),
('audited', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
body = {self.res: {'audited': False}}
self.mocked.assert_called_once_with(target, body)
self.assertIsNone(result)
def test_unset_firewall_rule_not_matched(self):
_fwp['firewall_rules'] = ['old_rule']
target = self.resource['id']
rule = 'new_rule'
arglist = [
target,
'--firewall-rule', rule,
]
verifylist = [
(self.res, target),
('firewall_rule', [rule]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
body = {self.res: {'firewall_rules': _fwp['firewall_rules']}}
self.mocked.assert_called_once_with(target, body)
self.assertIsNone(result)
def test_unset_firewall_rule_matched(self):
_fwp['firewall_rules'] = ['rule1', 'rule2']
target = self.resource['id']
rule = 'rule1'
arglist = [
target,
'--firewall-rule', rule,
]
verifylist = [
(self.res, target),
('firewall_rule', [rule]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
body = {self.res: {'firewall_rules': ['rule2']}}
self.mocked.assert_called_once_with(target, body)
self.assertIsNone(result)
def test_unset_all_firewall_rule(self):
target = self.resource['id']
arglist = [
target,
'--all-firewall-rule',
]
verifylist = [
(self.res, target),
('all_firewall_rule', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
body = {self.res: {'firewall_rules': []}}
self.mocked.assert_called_once_with(target, body)
self.assertIsNone(result)

@ -0,0 +1,736 @@
# Copyright 2016 FUJITSU LIMITED
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import copy
import mock
import re
import testtools
from osc_lib import exceptions
from osc_lib.tests import utils
from neutronclient.osc import utils as osc_utils
from neutronclient.osc.v2.fwaas import firewallrule
from neutronclient.tests.unit.osc.v2 import fakes as test_fakes
from neutronclient.tests.unit.osc.v2.fwaas import common
from neutronclient.tests.unit.osc.v2.fwaas import fakes
_fwr = fakes.FirewallRule().create()
CONVERT_MAP = {
'project': 'tenant_id',
'enable_rule': 'enabled',
'disable_rule': 'enabled',
}
def _generate_data(ordered_dict=None, data=None):
source = ordered_dict if ordered_dict else _fwr
if data:
source.update(data)
return tuple(source[key] for key in source)
def _generate_req_and_res(verifylist):
request = dict(verifylist)
response = copy.deepcopy(_fwr)
for key, val in verifylist:
converted = CONVERT_MAP.get(key, key)
del request[key]
if re.match('^no_', key) and val is True:
new_value = None
elif (key == 'enable' or key == 'enable_rule') and val:
new_value = True
elif (key == 'disable' or key == 'disable_rule') and val:
new_value = False
else:
new_value = val
request[converted] = new_value
response[converted] = new_value
return request, response
class TestFirewallRule(test_fakes.TestNeutronClientOSCV2):
def check_results(self, headers, data, exp_req, is_list=False):
if is_list:
req_body = {self.res_plural: [exp_req]}
else:
req_body = {self.res: exp_req}
self.mocked.assert_called_once_with(req_body)
self.assertEqual(self.ordered_headers, headers)
self.assertEqual(self.ordered_data, data)
def setUp(self):
super(TestFirewallRule, self).setUp()
self.neutronclient.find_resource = mock.Mock()
self.neutronclient.find_resource.side_effect = \
lambda x, y, **k: {'id': y}
# fw_common.get_id.side_effect = lambda x, y, z: z
osc_utils.find_project = mock.Mock()
osc_utils.find_project.id = _fwr['tenant_id']
self.res = 'firewall_rule'
self.res_plural = 'firewall_rules'
self.resource = _fwr
self.headers = (
'ID',
'Name',
'Enabled',
'Description',
'IP Version',
'Action',
'Protocol',
'Source IP Address',
'Source Port',
'Destination IP Address',
'Destination Port',
'Public',
'Project',
)
self.data = _generate_data()
self.ordered_headers = (
'Action',
'Description',
'Destination IP Address',
'Destination Port',
'Enabled',
'ID',
'IP Version',
'Name',
'Project',
'Protocol',
'Public',
'Source IP Address',
'Source Port',
)
self.ordered_data = (
_fwr['action'],
_fwr['description'],
_fwr['destination_ip_address'],
_fwr['destination_port'],
_fwr['enabled'],
_fwr['id'],
_fwr['ip_version'],
_fwr['name'],
_fwr['tenant_id'],
_fwr['protocol'],
_fwr['public'],
_fwr['source_ip_address'],
_fwr['source_port'],
)
self.ordered_columns = (
'action',
'description',
'destination_ip_address',
'destination_port',
'enabled',
'id',
'ip_version',
'name',
'tenant_id',
'protocol',
'public',
'source_ip_address',
'source_port',
)
class TestCreateFirewallRule(TestFirewallRule, common.TestCreateFWaaS):
def setUp(self):
super(TestCreateFirewallRule, self).setUp()
self.neutronclient.create_fwaas_firewall_rule = mock.Mock(
return_value={self.res: _fwr})
self.mocked = self.neutronclient.create_fwaas_firewall_rule
self.cmd = firewallrule.CreateFirewallRule(self.app, self.namespace)
def _update_expect_response(self, request, response):
"""Set expected request and response
:param request
A dictionary of request body(dict of verifylist)
:param response
A OrderedDict of request body
"""
# Update response body
self.neutronclient.create_fwaas_firewall_rule.return_value = \
{self.res: dict(response)}
osc_utils.find_project.return_value.id = response['tenant_id']
# Update response(finally returns 'data')
self.data = _generate_data(ordered_dict=response)
self.ordered_data = tuple(
response[column] for column in self.ordered_columns
)
def _set_all_params(self, args={}):
name = args.get('name') or 'my-name'
description = args.get('description') or 'my-desc'
source_ip = args.get('source_ip_address') or '192.168.1.0/24'
destination_ip = args.get('destination_ip_address') or '192.168.2.0/24'
source_port = args.get('source_port') or '0:65535'
protocol = args.get('protocol') or 'udp'
action = args.get('action') or 'deny'
ip_version = args.get('ip_version') or '4'
destination_port = args.get('destination_port') or '0:65535'
tenant_id = args.get('tenant_id') or 'my-tenant'
arglist = [
'--description', description,
'--name', name,
'--protocol', protocol,
'--ip-version', ip_version,
'--source-ip-address', source_ip,
'--destination-ip-address', destination_ip,
'--source-port', source_port,
'--destination-port', destination_port,
'--action', action,
'--project', tenant_id,
'--disable-rule',
'--public',
]
verifylist = [
('name', name),
('description', description),
('public', True),
('protocol', protocol),
('ip_version', ip_version),
('source_ip_address', source_ip),
('destination_ip_address', destination_ip),
('source_port', source_port),
('destination_port', destination_port),
('action', action),
('disable_rule', True),
('project', tenant_id),
]
return arglist, verifylist
def _test_create_with_all_params(self, args={}):
arglist, verifylist = self._set_all_params(args)
request, response = _generate_req_and_res(verifylist)
self._update_expect_response(request, response)
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
headers, data = self.cmd.take_action(parsed_args)
self.check_results(headers, data, request)
def test_create_with_no_options(self):
arglist = []
verifylist = []
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
headers, data = self.cmd.take_action(parsed_args)
self.check_results(headers, data, {})
def test_create_with_all_params(self):
self._test_create_with_all_params()
def test_create_with_all_params_protocol_any(self):
self._test_create_with_all_params({'protocol': 'any'})
def test_create_with_all_params_ip_version_6(self):
self._test_create_with_all_params({'ip_version': '6'})
def test_create_with_all_params_invalid_ip_version(self):
arglist, verifylist = self._set_all_params({'ip_version': '128'})
self.assertRaises(
utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_create_with_all_params_action_upper_capitalized(self):
for action in ('Allow', 'DENY', 'Reject'):
arglist, verifylist = self._set_all_params({'action': action})
self.assertRaises(
testtools.matchers._impl.MismatchError,
self.check_parser, self.cmd, arglist, verifylist)
def test_create_with_all_params_protocol_upper_capitalized(self):
for protocol in ('TCP', 'Tcp', 'ANY', 'AnY'):
arglist, verifylist = self._set_all_params({'protocol': protocol})
self.assertRaises(
testtools.matchers._impl.MismatchError,
self.check_parser, self.cmd, arglist, verifylist)
class TestListFirewallRule(TestFirewallRule):
def _setup_summary(self, expect=None):
protocol = _fwr['protocol'].upper()
src = 'source(port): 192.168.1.0/24(1:11111)'
dst = 'dest(port): 192.168.2.2(2:22222)'
action = 'deny'
if expect:
if expect.get('protocol'):
protocol = expect['protocol']
if expect.get('source'):
src = expect['source']
if expect.get('dest'):
dst = expect['dest']
if expect.get('action'):
action = expect['action']
summary = ',\n '.join([protocol, src, dst, action])
self.short_data = (
_fwr['id'],
_fwr['name'],
_fwr['enabled'],
summary
)
def setUp(self):
super(TestListFirewallRule, self).setUp()
self.cmd = firewallrule.ListFirewallRule(self.app, self.namespace)
self.short_header = (
'ID',
'Name',
'Enabled',
'Summary',
)
self._setup_summary()
self.neutronclient.list_fwaas_firewall_rules = mock.Mock(
return_value={self.res_plural: [_fwr]})
self.mocked = self.neutronclient.list_fwaas_firewall_rules
def test_list_with_long_option(self):
arglist = ['--long']
verifylist = [('long', True)]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
headers, data = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with()
self.assertEqual(list(self.headers), headers)
self.assertEqual([self.data], list(data))
def test_list_with_no_option(self):
arglist = []
verifylist = []
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
headers, data = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with()
self.assertEqual(list(self.short_header), headers)
self.assertEqual([self.short_data], list(data))
class TestShowFirewallRule(TestFirewallRule, common.TestShowFWaaS):
def setUp(self):
super(TestShowFirewallRule, self).setUp()
self.neutronclient.show_fwaas_firewall_rule = mock.Mock(
return_value={self.res: _fwr})
self.mocked = self.neutronclient.show_fwaas_firewall_rule
self.cmd = firewallrule.ShowFirewallRule(self.app, self.namespace)
class TestSetFirewallRule(TestFirewallRule, common.TestSetFWaaS):
def setUp(self):
super(TestSetFirewallRule, self).setUp()
self.neutronclient.update_fwaas_firewall_rule = mock.Mock(
return_value={self.res: _fwr})
self.mocked = self.neutronclient.update_fwaas_firewall_rule
self.cmd = firewallrule.SetFirewallRule(self.app, self.namespace)
def test_set_protocol_with_any(self):
target = self.resource['id']
protocol = 'any'
arglist = [target, '--protocol', protocol]
verifylist = [
(self.res, target),
('protocol', protocol),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'protocol': protocol}})
self.assertIsNone(result)
def test_set_protocol_with_udp(self):
target = self.resource['id']
protocol = 'udp'
arglist = [target, '--protocol', protocol]
verifylist = [
(self.res, target),
('protocol', protocol),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'protocol': protocol}})
self.assertIsNone(result)
def test_set_source_ip_address(self):
target = self.resource['id']
src_ip = '192.192.192.192'
arglist = [target, '--source-ip-address', src_ip]
verifylist = [
(self.res, target),
('source_ip_address', src_ip),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'source_ip_address': src_ip}})
self.assertIsNone(result)
def test_set_source_port(self):
target = self.resource['id']
src_port = '32678'
arglist = [target, '--source-port', src_port]
verifylist = [
(self.res, target),
('source_port', src_port),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'source_port': src_port}})
self.assertIsNone(result)
def test_set_destination_ip_address(self):
target = self.resource['id']
dst_ip = '0.1.0.1'
arglist = [target, '--destination-ip-address', dst_ip]
verifylist = [
(self.res, target),
('destination_ip_address', dst_ip),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'destination_ip_address': dst_ip}})
self.assertIsNone(result)
def test_set_destination_port(self):
target = self.resource['id']
dst_port = '65432'
arglist = [target, '--destination-port', dst_port]
verifylist = [
(self.res, target),
('destination_port', dst_port),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'destination_port': dst_port}})
self.assertIsNone(result)
def test_set_enable_rule(self):
target = self.resource['id']
arglist = [target, '--enable-rule']
verifylist = [
(self.res, target),
('enable_rule', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'enabled': True}})
self.assertIsNone(result)
def test_set_disable_rule(self):
target = self.resource['id']
arglist = [target, '--disable-rule']
verifylist = [
(self.res, target),
('disable_rule', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'enabled': False}})
self.assertIsNone(result)
def test_set_action(self):
target = self.resource['id']
action = 'reject'
arglist = [target, '--action', action]
verifylist = [
(self.res, target),
('action', action),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'action': action}})
self.assertIsNone(result)
def test_set_enable_rule_and_disable_rule(self):
target = self.resource['id']
arglist = [target, '--enable-rule', '--disable-rule']
verifylist = [
(self.res, target),
('enable_rule', True),
('disable_rule', True),
]
self.assertRaises(
utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_set_no_source_ip_address(self):
target = self.resource['id']
arglist = [
target,
'--no-source-ip-address',
]
verifylist = [
(self.res, target),
('no_source_ip_address', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'source_ip_address': None}})
self.assertIsNone(result)
def test_set_no_source_port(self):
target = self.resource['id']
arglist = [
target,
'--no-source-port',
]
verifylist = [
(self.res, target),
('no_source_port', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'source_port': None}})
self.assertIsNone(result)
def test_set_no_destination_ip_address(self):
target = self.resource['id']
arglist = [
target,
'--no-destination-ip-address',
]
verifylist = [
(self.res, target),
('no_destination_ip_address', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'destination_ip_address': None}})
self.assertIsNone(result)
def test_set_no_destination_port(self):
target = self.resource['id']
arglist = [
target,
'--no-destination-port',
]
verifylist = [
(self.res, target),
('no_destination_port', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'destination_port': None}})
self.assertIsNone(result)
def test_set_source_ip_address_and_no(self):
target = self.resource['id']
arglist = [
target,
'--source-ip-address', '192.168.1.0/24',
'--no-source-ip-address',
]
verifylist = [
(self.res, target),
('source_ip_address', '192.168.1.0/24'),
('no_source_ip_address', True),
]
self.assertRaises(
utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_set_destination_ip_address_and_no(self):
target = self.resource['id']
arglist = [
target,
'--destination-ip-address', '192.168.2.0/24',
'--no-destination-ip-address',
]
verifylist = [
(self.res, target),
('destination_ip_address', '192.168.2.0/24'),
('no_destination_ip_address', True),
]
self.assertRaises(
utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_set_source_port_and_no(self):
target = self.resource['id']
arglist = [
target,
'--source-port', '1:12345',
'--no-source-port',
]
verifylist = [
(self.res, target),
('source_port', '1:12345'),
('no_source_port', True),
]
self.assertRaises(
utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_set_destination_port_and_no(self):
target = self.resource['id']
arglist = [
target,
'--destination-port', '1:54321',
'--no-destination-port',
]
verifylist = [
(self.res, target),
('destination_port', '1:54321'),
('no_destination_port', True),
]
self.assertRaises(
utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_set_and_raises(self):
self.neutronclient.update_fwaas_firewall_rule = mock.Mock(
side_effect=Exception)
target = self.resource['id']
arglist = [target, '--name', 'my-name']
verifylist = [(self.res, target), ('name', 'my-name')]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.assertRaises(
exceptions.CommandError, self.cmd.take_action, parsed_args)
class TestUnsetFirewallRule(TestFirewallRule, common.TestUnsetFWaaS):
def setUp(self):
super(TestUnsetFirewallRule, self).setUp()
self.neutronclient.update_fwaas_firewall_rule = mock.Mock(
return_value={self.res: _fwr})
self.mocked = self.neutronclient.update_fwaas_firewall_rule
self.cmd = firewallrule.UnsetFirewallRule(self.app, self.namespace)
def test_unset_source_port(self):
target = self.resource['id']
arglist = [
target,
'--source-port',
]
verifylist = [
(self.res, target),
('source_port', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'source_port': None}})
self.assertIsNone(result)
def test_unset_destination_port(self):
target = self.resource['id']
arglist = [
target,
'--destination-port',
]
verifylist = [
(self.res, target),
('destination_port', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'destination_port': None}})
self.assertIsNone(result)
def test_unset_source_ip_address(self):
target = self.resource['id']
arglist = [
target,
'--source-ip-address',
]
verifylist = [
(self.res, target),
('source_ip_address', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'source_ip_address': None}})
self.assertIsNone(result)
def test_unset_destination_ip_address(self):
target = self.resource['id']
arglist = [
target,
'--destination-ip-address',
]
verifylist = [
(self.res, target),
('destination_ip_address', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'destination_ip_address': None}})
self.assertIsNone(result)
def test_unset_enable_rule(self):
target = self.resource['id']
arglist = [
target,
'--enable-rule',
]
verifylist = [
(self.res, target),
('enable_rule', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {self.res: {'enabled': False}})
self.assertIsNone(result)
class TestDeleteFirewallRule(TestFirewallRule, common.TestDeleteFWaaS):
def setUp(self):
super(TestDeleteFirewallRule, self).setUp()
self.neutronclient.delete_fwaas_firewall_rule = mock.Mock(
return_value={self.res: _fwr})
self.mocked = self.neutronclient.delete_fwaas_firewall_rule
self.cmd = firewallrule.DeleteFirewallRule(self.app, self.namespace)

@ -588,6 +588,16 @@ class Client(ClientBase):
firewall_policy_remove_path = "/fw/firewall_policies/%s/remove_rule"
firewalls_path = "/fw/firewalls"
firewall_path = "/fw/firewalls/%s"
fwaas_firewall_groups_path = "/fwaas/firewall_groups"
fwaas_firewall_group_path = "/fwaas/firewall_groups/%s"
fwaas_firewall_rules_path = "/fwaas/firewall_rules"
fwaas_firewall_rule_path = "/fwaas/firewall_rules/%s"
fwaas_firewall_policies_path = "/fwaas/firewall_policies"
fwaas_firewall_policy_path = "/fwaas/firewall_policies/%s"
fwaas_firewall_policy_insert_path = \
"/fwaas/firewall_policies/%s/insert_rule"
fwaas_firewall_policy_remove_path = \
"/fwaas/firewall_policies/%s/remove_rule"
rbac_policies_path = "/rbac-policies"
rbac_policy_path = "/rbac-policies/%s"
qos_policies_path = "/qos/policies"
@ -649,6 +659,9 @@ class Client(ClientBase):
'firewall_rules': 'firewall_rule',
'firewall_policies': 'firewall_policy',
'firewalls': 'firewall',
'fwaas_firewall_rules': 'fwaas_firewall_rule',
'fwaas_firewall_policies': 'fwaas_firewall_policy',
'fwaas_firewall_groups': 'fwaas_firewall_group',
'metering_labels': 'metering_label',
'metering_label_rules': 'metering_label_rule',
'loadbalancers': 'loadbalancer',
@ -1572,6 +1585,87 @@ class Client(ClientBase):
"""Deletes the specified firewall."""
return self.delete(self.firewall_path % (firewall))
def list_fwaas_firewall_groups(self, retrieve_all=True, **_params):
"""Fetches a list of all firewall groups for a project"""
return self.list('firewall_groups', self.fwaas_firewall_groups_path,
retrieve_all, **_params)
def show_fwaas_firewall_group(self, fwg, **_params):
"""Fetches information of a certain firewall group"""
return self.get(self.fwaas_firewall_group_path % (fwg), params=_params)
def create_fwaas_firewall_group(self, body=None):
"""Creates a new firewall group"""
return self.post(self.fwaas_firewall_groups_path, body=body)
def update_fwaas_firewall_group(self, fwg, body=None):
"""Updates a firewall group"""
return self.put(self.fwaas_firewall_group_path % (fwg), body=body)
def delete_fwaas_firewall_group(self, fwg):
"""Deletes the specified firewall group"""
return self.delete(self.fwaas_firewall_group_path % (fwg))
def list_fwaas_firewall_rules(self, retrieve_all=True, **_params):
"""Fetches a list of all firewall rules for a project"""
# Pass filters in "params" argument to do_request
return self.list('firewall_rules', self.fwaas_firewall_rules_path,
retrieve_all, **_params)
def show_fwaas_firewall_rule(self, firewall_rule, **_params):
"""Fetches information of a certain firewall rule"""
return self.get(self.fwaas_firewall_rule_path % (firewall_rule),
params=_params)
def create_fwaas_firewall_rule(self, body=None):
"""Creates a new firewall rule"""
return self.post(self.fwaas_firewall_rules_path, body=body)
def update_fwaas_firewall_rule(self, firewall_rule, body=None):
"""Updates a firewall rule"""
return self.put(self.fwaas_firewall_rule_path % (firewall_rule),
body=body)
def delete_fwaas_firewall_rule(self, firewall_rule):
"""Deletes the specified firewall rule"""
return self.delete(self.fwaas_firewall_rule_path % (firewall_rule))
def list_fwaas_firewall_policies(self, retrieve_all=True, **_params):
"""Fetches a list of all firewall policies for a project"""
# Pass filters in "params" argument to do_request
return self.list('firewall_policies',
self.fwaas_firewall_policies_path,
retrieve_all, **_params)
def show_fwaas_firewall_policy(self, firewall_policy, **_params):
"""Fetches information of a certain firewall policy"""
return self.get(self.fwaas_firewall_policy_path % (firewall_policy),
params=_params)
def create_fwaas_firewall_policy(self, body=None):
"""Creates a new firewall policy"""
return self.post(self.fwaas_firewall_policies_path, body=body)
def update_fwaas_firewall_policy(self, firewall_policy, body=None):
"""Updates a firewall policy"""
return self.put(self.fwaas_firewall_policy_path % (firewall_policy),
body=body)
def delete_fwaas_firewall_policy(self, firewall_policy):
"""Deletes the specified firewall policy"""
return self.delete(self.fwaas_firewall_policy_path % (firewall_policy))
def insert_rule_fwaas_firewall_policy(self, firewall_policy, body=None):
"""Inserts specified rule into firewall policy"""
return self.put((self.fwaas_firewall_policy_insert_path %
(firewall_policy)), body=body)
def remove_rule_fwaas_firewall_policy(self, firewall_policy, body=None):
"""Removes specified rule from firewall policy"""
return self.put((self.fwaas_firewall_policy_remove_path %
(firewall_policy)), body=body)
def remove_router_from_l3_agent(self, l3_agent, router_id):
"""Remove a router from l3 agent."""
return self.delete((self.agent_path + self.L3_ROUTERS + "/%s") % (

@ -0,0 +1,4 @@
---
features:
- CLI support for the "Firewall as a Service v2" feature, which is enhanced
FWaaS functionality, as OSC plugin commands.

@ -44,6 +44,27 @@ openstack.neutronclient.v2 =
network_trunk_show = neutronclient.osc.v2.trunk.network_trunk:ShowNetworkTrunk
network_trunk_unset = neutronclient.osc.v2.trunk.network_trunk:UnsetNetworkTrunk
firewall_group_create = neutronclient.osc.v2.fwaas.firewallgroup:CreateFirewallGroup
firewall_group_delete = neutronclient.osc.v2.fwaas.firewallgroup:DeleteFirewallGroup
firewall_group_list = neutronclient.osc.v2.fwaas.firewallgroup:ListFirewallGroup
firewall_group_set = neutronclient.osc.v2.fwaas.firewallgroup:SetFirewallGroup
firewall_group_show = neutronclient.osc.v2.fwaas.firewallgroup:ShowFirewallGroup
firewall_group_unset = neutronclient.osc.v2.fwaas.firewallgroup:UnsetFirewallGroup
firewall_group_policy_create = neutronclient.osc.v2.fwaas.firewallpolicy:CreateFirewallPolicy
firewall_group_policy_delete = neutronclient.osc.v2.fwaas.firewallpolicy:DeleteFirewallPolicy
firewall_group_policy_add_rule = neutronclient.osc.v2.fwaas.firewallpolicy:FirewallPolicyInsertRule
firewall_group_policy_list = neutronclient.osc.v2.fwaas.firewallpolicy:ListFirewallPolicy
firewall_group_policy_remove_rule = neutronclient.osc.v2.fwaas.firewallpolicy:FirewallPolicyRemoveRule
firewall_group_policy_set = neutronclient.osc.v2.fwaas.firewallpolicy:SetFirewallPolicy
firewall_group_policy_show = neutronclient.osc.v2.fwaas.firewallpolicy:ShowFirewallPolicy
firewall_group_policy_unset = neutronclient.osc.v2.fwaas.firewallpolicy:UnsetFirewallPolicy
firewall_group_rule_create = neutronclient.osc.v2.fwaas.firewallrule:CreateFirewallRule
firewall_group_rule_delete = neutronclient.osc.v2.fwaas.firewallrule:DeleteFirewallRule
firewall_group_rule_list = neutronclient.osc.v2.fwaas.firewallrule:ListFirewallRule
firewall_group_rule_set = neutronclient.osc.v2.fwaas.firewallrule:SetFirewallRule
firewall_group_rule_show = neutronclient.osc.v2.fwaas.firewallrule:ShowFirewallRule
firewall_group_rule_unset = neutronclient.osc.v2.fwaas.firewallrule:UnsetFirewallRule
[build_sphinx]
all_files = 1
build-dir = doc/build