Merge "FWaaS Client and Cli"

This commit is contained in:
Jenkins
2013-08-07 04:54:23 +00:00
committed by Gerrit Code Review
12 changed files with 1133 additions and 10 deletions

View File

@@ -34,11 +34,20 @@ UUID_PATTERN = '-'.join([HEX_ELEM + '{8}', HEX_ELEM + '{4}',
HEX_ELEM + '{12}'])
def _get_resource_plural(resource, client):
plurals = client.EXTED_PLURALS
for k in plurals:
if plurals[k] == resource:
return k
return resource + 's'
def find_resourceid_by_name_or_id(client, resource, name_or_id):
obj_lister = getattr(client, "list_%ss" % resource)
resource_plural = _get_resource_plural(resource, client)
obj_lister = getattr(client, "list_%s" % resource_plural)
# perform search by id only if we are passing a valid UUID
match = re.match(UUID_PATTERN, name_or_id)
collection = resource + "s"
collection = resource_plural
if match:
data = obj_lister(id=name_or_id, fields='id')
if data and data[collection]:
@@ -47,9 +56,10 @@ def find_resourceid_by_name_or_id(client, resource, name_or_id):
def _find_resourceid_by_name(client, resource, name):
obj_lister = getattr(client, "list_%ss" % resource)
resource_plural = _get_resource_plural(resource, client)
obj_lister = getattr(client, "list_%s" % resource_plural)
data = obj_lister(name=name, fields='id')
collection = resource + "s"
collection = resource_plural
info = data[collection]
if len(info) > 1:
raise exceptions.NeutronClientNoUniqueMatch(resource=resource,
@@ -511,8 +521,8 @@ class ListCommand(NeutronCommand, lister.Lister):
return search_opts
def call_server(self, neutron_client, search_opts, parsed_args):
obj_lister = getattr(neutron_client,
"list_%ss" % self.resource)
resource_plural = _get_resource_plural(self.resource, neutron_client)
obj_lister = getattr(neutron_client, "list_%s" % resource_plural)
data = obj_lister(**search_opts)
return data
@@ -542,7 +552,7 @@ class ListCommand(NeutronCommand, lister.Lister):
if dirs:
search_opts.update({'sort_dir': dirs})
data = self.call_server(neutron_client, search_opts, parsed_args)
collection = self.resource + "s"
collection = _get_resource_plural(self.resource, neutron_client)
return data.get(collection, [])
def extend_list(self, data, parsed_args):

View File

@@ -0,0 +1,16 @@
# Copyright 2013 OpenStack LLC.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4

View File

@@ -0,0 +1,94 @@
# Copyright 2013 Big Switch Networks
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: KC Wang, Big Switch Networks
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import logging
from neutronclient.neutron import v2_0 as neutronv20
class ListFirewall(neutronv20.ListCommand):
"""List firewalls that belong to a given tenant."""
resource = 'firewall'
log = logging.getLogger(__name__ + '.ListFirewall')
list_columns = ['id', 'name', 'firewall_policy_id']
_formatters = {}
pagination_support = True
sorting_support = True
class ShowFirewall(neutronv20.ShowCommand):
"""Show information of a given firewall."""
resource = 'firewall'
log = logging.getLogger(__name__ + '.ShowFirewall')
class CreateFirewall(neutronv20.CreateCommand):
"""Create a firewall."""
resource = 'firewall'
log = logging.getLogger(__name__ + '.CreateFirewall')
def add_known_arguments(self, parser):
parser.add_argument(
'firewall_policy_id', metavar='POLICY',
help='firewall policy id')
parser.add_argument(
'--name',
help='name for the firewall')
parser.add_argument(
'--description',
help='description for the firewall rule')
parser.add_argument(
'--shared',
action='store_true',
help='set shared to True (default False)')
parser.add_argument(
'--admin-state-down',
dest='admin_state',
action='store_false',
help='set admin state up to false')
def args2body(self, parsed_args):
_policy_id = neutronv20.find_resourceid_by_name_or_id(
self.get_client(), 'firewall_policy',
parsed_args.firewall_policy_id)
body = {
self.resource: {
'firewall_policy_id': _policy_id,
'admin_state_up': parsed_args.admin_state, }, }
neutronv20.update_dict(parsed_args, body[self.resource],
['name', 'description', 'shared',
'tenant_id'])
return body
class UpdateFirewall(neutronv20.UpdateCommand):
"""Update a given firewall."""
resource = 'firewall'
log = logging.getLogger(__name__ + '.UpdateFirewall')
class DeleteFirewall(neutronv20.DeleteCommand):
"""Delete a given firewall."""
resource = 'firewall'
log = logging.getLogger(__name__ + '.DeleteFirewall')

View File

@@ -0,0 +1,217 @@
# Copyright 2013 Big Switch Networks
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: KC Wang, Big Switch Networks
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import json
import logging
import string
from neutronclient.neutron import v2_0 as neutronv20
def _format_firewall_rules(firewall_policy):
try:
output = '[' + ',\n '.join([rule for rule in
firewall_policy['firewall_rules']]) + ']'
return output
except Exception:
return ''
class ListFirewallPolicy(neutronv20.ListCommand):
"""List firewall policies that belong to a given tenant."""
resource = 'firewall_policy'
log = logging.getLogger(__name__ + '.ListFirewallPolicy')
list_columns = ['id', 'name', 'firewall_rules']
_formatters = {'firewall_rules': _format_firewall_rules,
}
pagination_support = True
sorting_support = True
class ShowFirewallPolicy(neutronv20.ShowCommand):
"""Show information of a given firewall policy."""
resource = 'firewall_policy'
log = logging.getLogger(__name__ + '.ShowFirewallPolicy')
class CreateFirewallPolicy(neutronv20.CreateCommand):
"""Create a firewall policy."""
resource = 'firewall_policy'
log = logging.getLogger(__name__ + '.CreateFirewallPolicy')
def add_known_arguments(self, parser):
parser.add_argument(
'name',
metavar='NAME',
help='name for the firewall policy')
parser.add_argument(
'--description',
help='description for the firewall policy')
parser.add_argument(
'--shared',
dest='shared',
action='store_true',
help='to create a shared policy')
parser.add_argument(
'--firewall-rules', type=string.split,
help='ordered list of whitespace-delimited firewall rule '
'names or IDs; e.g., --firewall-rules \"rule1 rule2\"')
parser.add_argument(
'--audited',
action='store_true',
help='to set audited to True')
def args2body(self, parsed_args):
if parsed_args.firewall_rules:
_firewall_rules = []
for f in parsed_args.firewall_rules:
_firewall_rules.append(
neutronv20.find_resourceid_by_name_or_id(
self.get_client(), 'firewall_rule', f))
body = {self.resource: {
'firewall_rules': _firewall_rules,
},
}
else:
body = {self.resource: {}}
neutronv20.update_dict(parsed_args, body[self.resource],
['name', 'description', 'shared',
'audited', 'tenant_id'])
return body
class UpdateFirewallPolicy(neutronv20.UpdateCommand):
"""Update a given firewall policy."""
resource = 'firewall_policy'
log = logging.getLogger(__name__ + '.UpdateFirewallPolicy')
class DeleteFirewallPolicy(neutronv20.DeleteCommand):
"""Delete a given firewall policy."""
resource = 'firewall_policy'
log = logging.getLogger(__name__ + '.DeleteFirewallPolicy')
class FirewallPolicyInsertRule(neutronv20.UpdateCommand):
"""Insert a rule into a given firewall policy."""
resource = 'firewall_policy'
log = logging.getLogger(__name__ + '.FirewallPolicyInsertRule')
def call_api(self, neutron_client, firewall_policy_id, body):
return neutron_client.firewall_policy_insert_rule(firewall_policy_id,
body)
def args2body(self, parsed_args):
_rule = ''
if parsed_args.firewall_rule_id:
_rule = neutronv20.find_resourceid_by_name_or_id(
self.get_client(), 'firewall_rule',
parsed_args.firewall_rule_id)
_insert_before = ''
if 'insert_before' in parsed_args:
if parsed_args.insert_before:
_insert_before = neutronv20.find_resourceid_by_name_or_id(
self.get_client(), 'firewall_rule',
parsed_args.insert_before)
_insert_after = ''
if 'insert_after' in parsed_args:
if parsed_args.insert_after:
_insert_after = neutronv20.find_resourceid_by_name_or_id(
self.get_client(), 'firewall_rule',
parsed_args.insert_after)
body = {'firewall_rule_id': _rule,
'insert_before': _insert_before,
'insert_after': _insert_after}
neutronv20.update_dict(parsed_args, body, [])
return body
def get_parser(self, prog_name):
parser = super(FirewallPolicyInsertRule, self).get_parser(prog_name)
parser.add_argument(
'--insert-before',
metavar='FIREWALL_RULE',
help='insert before this rule')
parser.add_argument(
'--insert-after',
metavar='FIREWALL_RULE',
help='insert after this rule')
parser.add_argument(
'firewall_rule_id',
metavar='FIREWALL_RULE',
help='new rule to insert')
self.add_known_arguments(parser)
return parser
def run(self, parsed_args):
neutron_client = self.get_client()
neutron_client.format = parsed_args.request_format
body = self.args2body(parsed_args)
_id = neutronv20.find_resourceid_by_name_or_id(neutron_client,
self.resource,
parsed_args.id)
policy_data = self.call_api(neutron_client, _id, body)
print json.dumps(policy_data)
return
class FirewallPolicyRemoveRule(neutronv20.UpdateCommand):
"""Remove a rule from a given firewall policy."""
resource = 'firewall_policy'
log = logging.getLogger(__name__ + '.FirewallPolicyRemoveRule')
def call_api(self, neutron_client, firewall_policy_id, body):
return neutron_client.firewall_policy_remove_rule(firewall_policy_id,
body)
def args2body(self, parsed_args):
_rule = ''
if parsed_args.firewall_rule_id:
_rule = neutronv20.find_resourceid_by_name_or_id(
self.get_client(), 'firewall_rule',
parsed_args.firewall_rule_id)
body = {'firewall_rule_id': _rule}
neutronv20.update_dict(parsed_args, body, [])
return body
def get_parser(self, prog_name):
parser = super(FirewallPolicyRemoveRule, self).get_parser(prog_name)
parser.add_argument(
'firewall_rule_id',
metavar='FIREWALL_RULE',
help='firewall rule to remove from policy')
self.add_known_arguments(parser)
return parser
def run(self, parsed_args):
neutron_client = self.get_client()
neutron_client.format = parsed_args.request_format
body = self.args2body(parsed_args)
_id = neutronv20.find_resourceid_by_name_or_id(neutron_client,
self.resource,
parsed_args.id)
policy_data = self.call_api(neutron_client, _id, body)
print json.dumps(policy_data)
return

View File

@@ -0,0 +1,137 @@
# Copyright 2013 Big Switch Networks
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: KC Wang, Big Switch Networks
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import logging
from neutronclient.neutron import v2_0 as neutronv20
class ListFirewallRule(neutronv20.ListCommand):
"""List firewall rules that belong to a given tenant."""
resource = 'firewall_rule'
log = logging.getLogger(__name__ + '.ListFirewallRule')
list_columns = ['id', 'name', 'firewall_policy_id', 'summary', 'enabled']
pagination_support = True
sorting_support = True
def extend_list(self, data, parsed_args):
for d in data:
val = []
if 'protocol' in d:
protocol = d['protocol'].upper()
else:
protocol = 'no-protocol'
val.append(protocol)
if 'source_ip_address' in d and 'source_port' in d:
src = 'source: ' + str(d['source_ip_address']).lower()
src = src + '(' + str(d['source_port']).lower() + ')'
else:
src = 'source: none specified'
val.append(src)
if 'destination_ip_address' in d and 'destination_port' in d:
dst = 'dest: ' + str(d['destination_ip_address']).lower()
dst = dst + '(' + str(d['destination_port']).lower() + ')'
else:
dst = 'dest: none specified'
val.append(dst)
if 'action' in d:
action = d['action']
else:
action = 'no-action'
val.append(action)
d['summary'] = ',\n '.join(val)
class ShowFirewallRule(neutronv20.ShowCommand):
"""Show information of a given firewall rule."""
resource = 'firewall_rule'
log = logging.getLogger(__name__ + '.ShowFirewallRule')
class CreateFirewallRule(neutronv20.CreateCommand):
"""Create a firewall rule."""
resource = 'firewall_rule'
log = logging.getLogger(__name__ + '.CreateFirewallRule')
def add_known_arguments(self, parser):
parser.add_argument(
'--name',
help='name for the firewall rule')
parser.add_argument(
'--description',
help='description for the firewall rule')
parser.add_argument(
'--shared',
dest='shared',
action='store_true',
help='set shared to True (default False)')
parser.add_argument(
'--source-ip-address',
help='source ip address or subnet')
parser.add_argument(
'--destination-ip-address',
help='destination ip address or subnet')
parser.add_argument(
'--source-port',
help='source port (integer in [1, 65535] or range in a:b)')
parser.add_argument(
'--destination-port',
help='destination port (integer in [1, 65535] or range in a:b)')
parser.add_argument(
'--disabled',
dest='enabled',
action='store_false',
help='to disable this rule')
parser.add_argument(
'--protocol', choices=['tcp', 'udp', 'icmp'],
required=True,
help='protocol for the firewall rule')
parser.add_argument(
'--action',
required=True,
choices=['allow', 'deny'],
help='action for the firewall rule')
def args2body(self, parsed_args):
body = {
self.resource: {},
}
neutronv20.update_dict(parsed_args, body[self.resource],
['name', 'description', 'shared', 'protocol',
'source_ip_address', 'destination_ip_address',
'source_port', 'destination_port',
'action', 'enabled', 'tenant_id'])
return body
class UpdateFirewallRule(neutronv20.UpdateCommand):
"""Update a given firewall rule."""
resource = 'firewall_rule'
log = logging.getLogger(__name__ + '.UpdateFirewallRule')
class DeleteFirewallRule(neutronv20.DeleteCommand):
"""Delete a given firewall rule."""
resource = 'firewall_rule'
log = logging.getLogger(__name__ + '.DeleteFirewallRule')

View File

@@ -34,6 +34,9 @@ from neutronclient.neutron.v2_0 import agent
from neutronclient.neutron.v2_0 import agentscheduler
from neutronclient.neutron.v2_0 import extension
from neutronclient.neutron.v2_0 import floatingip
from neutronclient.neutron.v2_0.fw import firewall
from neutronclient.neutron.v2_0.fw import firewallpolicy
from neutronclient.neutron.v2_0.fw import firewallrule
from neutronclient.neutron.v2_0.lb import healthmonitor as lb_healthmonitor
from neutronclient.neutron.v2_0.lb import member as lb_member
from neutronclient.neutron.v2_0.lb import pool as lb_pool
@@ -181,6 +184,23 @@ COMMAND_V2 = {
'lb-pool-list-on-agent': agentscheduler.ListPoolsOnLbaasAgent,
'lb-agent-hosting-pool': agentscheduler.GetLbaasAgentHostingPool,
'service-provider-list': servicetype.ListServiceProvider,
'firewall-rule-list': firewallrule.ListFirewallRule,
'firewall-rule-show': firewallrule.ShowFirewallRule,
'firewall-rule-create': firewallrule.CreateFirewallRule,
'firewall-rule-update': firewallrule.UpdateFirewallRule,
'firewall-rule-delete': firewallrule.DeleteFirewallRule,
'firewall-policy-list': firewallpolicy.ListFirewallPolicy,
'firewall-policy-show': firewallpolicy.ShowFirewallPolicy,
'firewall-policy-create': firewallpolicy.CreateFirewallPolicy,
'firewall-policy-update': firewallpolicy.UpdateFirewallPolicy,
'firewall-policy-delete': firewallpolicy.DeleteFirewallPolicy,
'firewall-policy-insert-rule': firewallpolicy.FirewallPolicyInsertRule,
'firewall-policy-remove-rule': firewallpolicy.FirewallPolicyRemoveRule,
'firewall-list': firewall.ListFirewall,
'firewall-show': firewall.ShowFirewall,
'firewall-create': firewall.CreateFirewall,
'firewall-update': firewall.UpdateFirewall,
'firewall-delete': firewall.DeleteFirewall,
}
COMMANDS = {'2.0': COMMAND_V2}

View File

@@ -191,6 +191,15 @@ class Client(object):
L3_AGENTS = '/l3-agents'
LOADBALANCER_POOLS = '/loadbalancer-pools'
LOADBALANCER_AGENT = '/loadbalancer-agent'
firewall_rules_path = "/fw/firewall_rules"
firewall_rule_path = "/fw/firewall_rules/%s"
firewall_policies_path = "/fw/firewall_policies"
firewall_policy_path = "/fw/firewall_policies/%s"
firewall_policy_insert_path = "/fw/firewall_policies/%s/insert_rule"
firewall_policy_remove_path = "/fw/firewall_policies/%s/remove_rule"
firewalls_path = "/fw/firewalls"
firewall_path = "/fw/firewalls/%s"
# API has no way to report plurals, so we have to hard code them
EXTED_PLURALS = {'routers': 'router',
'floatingips': 'floatingip',
@@ -203,7 +212,10 @@ class Client(object):
'members': 'member',
'health_monitors': 'health_monitor',
'quotas': 'quota',
'service_providers': 'service_provider'
'service_providers': 'service_provider',
'firewall_rules': 'firewall_rule',
'firewall_policies': 'firewall_policy',
'firewalls': 'firewall',
}
# 8192 Is the default max URI len for eventlet.wsgi.server
MAX_URI_LEN = 8192
@@ -716,6 +728,105 @@ class Client(object):
return self.post((self.agent_path + self.L3_ROUTERS) % l3_agent,
body=body)
@APIParamsCall
def list_firewall_rules(self, retrieve_all=True, **_params):
"""Fetches a list of all firewall rules for a tenant."""
# Pass filters in "params" argument to do_request
return self.list('firewall_rules', self.firewall_rules_path,
retrieve_all, **_params)
@APIParamsCall
def show_firewall_rule(self, firewall_rule, **_params):
"""Fetches information of a certain firewall rule."""
return self.get(self.firewall_rule_path % (firewall_rule),
params=_params)
@APIParamsCall
def create_firewall_rule(self, body=None):
"""Creates a new firewall rule."""
return self.post(self.firewall_rules_path, body=body)
@APIParamsCall
def update_firewall_rule(self, firewall_rule, body=None):
"""Updates a firewall rule."""
return self.put(self.firewall_rule_path % (firewall_rule), body=body)
@APIParamsCall
def delete_firewall_rule(self, firewall_rule):
"""Deletes the specified firewall rule."""
return self.delete(self.firewall_rule_path % (firewall_rule))
@APIParamsCall
def list_firewall_policies(self, retrieve_all=True, **_params):
"""Fetches a list of all firewall policies for a tenant."""
# Pass filters in "params" argument to do_request
return self.list('firewall_policies', self.firewall_policies_path,
retrieve_all, **_params)
@APIParamsCall
def show_firewall_policy(self, firewall_policy, **_params):
"""Fetches information of a certain firewall policy."""
return self.get(self.firewall_policy_path % (firewall_policy),
params=_params)
@APIParamsCall
def create_firewall_policy(self, body=None):
"""Creates a new firewall policy."""
return self.post(self.firewall_policies_path, body=body)
@APIParamsCall
def update_firewall_policy(self, firewall_policy, body=None):
"""Updates a firewall policy."""
return self.put(self.firewall_policy_path % (firewall_policy),
body=body)
@APIParamsCall
def delete_firewall_policy(self, firewall_policy):
"""Deletes the specified firewall policy."""
return self.delete(self.firewall_policy_path % (firewall_policy))
@APIParamsCall
def firewall_policy_insert_rule(self, firewall_policy, body=None):
"""Inserts specified rule into firewall policy."""
return self.put(self.firewall_policy_insert_path % (firewall_policy),
body=body)
@APIParamsCall
def firewall_policy_remove_rule(self, firewall_policy, body=None):
"""Removes specified rule from firewall policy."""
return self.put(self.firewall_policy_remove_path % (firewall_policy),
body=body)
@APIParamsCall
def list_firewalls(self, retrieve_all=True, **_params):
"""Fetches a list of all firewals for a tenant."""
# Pass filters in "params" argument to do_request
return self.list('firewalls', self.firewalls_path, retrieve_all,
**_params)
@APIParamsCall
def show_firewall(self, firewall, **_params):
"""Fetches information of a certain firewall."""
return self.get(self.firewall_path % (firewall), params=_params)
@APIParamsCall
def create_firewall(self, body=None):
"""Creates a new firewall."""
return self.post(self.firewalls_path, body=body)
@APIParamsCall
def update_firewall(self, firewall, body=None):
"""Updates a firewall."""
return self.put(self.firewall_path % (firewall), body=body)
@APIParamsCall
def delete_firewall(self, firewall):
"""Deletes the specified firewall."""
return self.delete(self.firewall_path % (firewall))
@APIParamsCall
def remove_router_from_l3_agent(self, l3_agent, router_id):
"""Remove a router from l3 agent."""

18
tests/unit/fw/__init__.py Normal file
View File

@@ -0,0 +1,18 @@
# Copyright 2013 Big Switch Networks Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: KC Wang, Big Switch Networks Inc.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4

View File

@@ -0,0 +1,126 @@
# Copyright 2013 Big Switch Networks Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: KC Wang, Big Switch Networks Inc.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import sys
from neutronclient.neutron.v2_0.fw import firewall
from tests.unit import test_cli20
class CLITestV20FirewallJSON(test_cli20.CLITestV20Base):
def test_create_firewall_with_mandatory_params(self):
"""firewall-create with mandatory (none) params."""
resource = 'firewall'
cmd = firewall.CreateFirewall(test_cli20.MyApp(sys.stdout), None)
name = ''
tenant_id = 'my-tenant'
my_id = 'my-id'
policy_id = 'my-policy-id'
args = ['--tenant-id', tenant_id, policy_id, ]
position_names = ['firewall_policy_id', ]
position_values = [policy_id, ]
self._test_create_resource(resource, cmd, name, my_id, args,
position_names, position_values,
admin_state_up=True, tenant_id=tenant_id)
def test_create_firewall_with_all_params(self):
"""firewall-create with all params set."""
resource = 'firewall'
cmd = firewall.CreateFirewall(test_cli20.MyApp(sys.stdout), None)
name = 'my-name'
description = 'my-desc'
policy_id = 'my-policy-id'
tenant_id = 'my-tenant'
my_id = 'my-id'
args = ['--description', description,
'--shared',
'--admin-state-down',
'--tenant-id', tenant_id,
policy_id]
position_names = ['firewall_policy_id', ]
position_values = [policy_id, ]
self._test_create_resource(resource, cmd, name, my_id, args,
position_names, position_values,
description=description,
shared=True, admin_state_up=False,
tenant_id=tenant_id)
def test_list_firewalls(self):
"""firewall-list."""
resources = "firewalls"
cmd = firewall.ListFirewall(test_cli20.MyApp(sys.stdout), None)
self._test_list_resources(resources, cmd, True)
def test_list_firewalls_pagination(self):
"""firewall-list with pagination."""
resources = "firewalls"
cmd = firewall.ListFirewall(test_cli20.MyApp(sys.stdout), None)
self._test_list_resources_with_pagination(resources, cmd)
def test_list_firewalls_sort(self):
"""sorted list: firewall-list --sort-key name --sort-key id
--sort-key asc --sort-key desc
"""
resources = "firewalls"
cmd = firewall.ListFirewall(test_cli20.MyApp(sys.stdout), None)
self._test_list_resources(resources, cmd,
sort_key=["name", "id"],
sort_dir=["asc", "desc"])
def test_list_firewalls_limit(self):
"""size (1000) limited list: firewall-list -P."""
resources = "firewalls"
cmd = firewall.ListFirewall(test_cli20.MyApp(sys.stdout), None)
self._test_list_resources(resources, cmd, page_size=1000)
def test_show_firewall_id(self):
"""firewall-show test_id."""
resource = 'firewall'
cmd = firewall.ShowFirewall(test_cli20.MyApp(sys.stdout), None)
args = ['--fields', 'id', self.test_id]
self._test_show_resource(resource, cmd, self.test_id, args, ['id'])
def test_show_firewall_id_name(self):
"""firewall-show."""
resource = 'firewall'
cmd = firewall.ShowFirewall(test_cli20.MyApp(sys.stdout), None)
args = ['--fields', 'id', '--fields', 'name', self.test_id]
self._test_show_resource(resource, cmd, self.test_id,
args, ['id', 'name'])
def test_update_firewall(self):
"""firewall-update myid --name newname --tags a b."""
resource = 'firewall'
cmd = firewall.UpdateFirewall(test_cli20.MyApp(sys.stdout), None)
self._test_update_resource(resource, cmd, 'myid',
['myid', '--name', 'newname'],
{'name': 'newname', })
def test_delete_firewall(self):
"""firewall-delete my-id."""
resource = 'firewall'
cmd = firewall.DeleteFirewall(test_cli20.MyApp(sys.stdout), None)
my_id = 'my-id'
args = [my_id]
self._test_delete_resource(resource, cmd, my_id, args)
class CLITestV20FirewallXML(CLITestV20FirewallJSON):
format = 'xml'

View File

@@ -0,0 +1,214 @@
# Copyright 2013 Big Switch Networks Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: KC Wang, Big Switch Networks Inc.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import sys
import mox
from neutronclient.neutron.v2_0.fw import firewallpolicy
from neutronclient import shell
from tests.unit import test_cli20
class CLITestV20FirewallPolicyJSON(test_cli20.CLITestV20Base):
def setUp(self):
super(CLITestV20FirewallPolicyJSON, self).setUp()
def test_create_firewall_policy_with_mandatory_params(self):
"""firewall-policy-create with mandatory (none) params only."""
resource = 'firewall_policy'
cmd = firewallpolicy.CreateFirewallPolicy(test_cli20.MyApp(sys.stdout),
None)
tenant_id = 'my-tenant'
name = 'my-name'
my_id = 'myid'
args = ['--tenant-id', tenant_id,
'--admin-state_up',
name, ]
position_names = ['name', ]
position_values = [name, ]
self._test_create_resource(resource, cmd, name, my_id, args,
position_names, position_values,
admin_state_up=True, tenant_id=tenant_id)
def test_create_firewall_policy_with_all_params(self):
"""firewall-policy-create with all params set."""
resource = 'firewall_policy'
cmd = firewallpolicy.CreateFirewallPolicy(test_cli20.MyApp(sys.stdout),
None)
name = 'my-name'
description = 'my-desc'
firewall_rules_arg = 'rule_id1 rule_id2'
firewall_rules_res = ['rule_id1', 'rule_id2']
tenant_id = 'my-tenant'
my_id = 'myid'
args = ['--description', description,
'--shared',
'--firewall-rules', firewall_rules_arg,
'--audited',
'--tenant-id', tenant_id,
'--admin-state_up',
name]
position_names = ['name', ]
position_values = [name, ]
self._test_create_resource(resource, cmd, name, my_id, args,
position_names, position_values,
description=description, shared=True,
firewall_rules=firewall_rules_res,
audited=True, admin_state_up=True,
tenant_id=tenant_id)
def test_list_firewall_policies(self):
"""firewall-policy-list."""
resources = "firewall_policies"
cmd = firewallpolicy.ListFirewallPolicy(test_cli20.MyApp(sys.stdout),
None)
self._test_list_resources(resources, cmd, True)
def test_list_firewall_policies_pagination(self):
"""firewall-policy-list."""
resources = "firewall_policies"
cmd = firewallpolicy.ListFirewallPolicy(test_cli20.MyApp(sys.stdout),
None)
self._test_list_resources_with_pagination(resources, cmd)
def test_list_firewall_policies_sort(self):
"""sorted list: firewall-policy-list --sort-key name --sort-key id
--sort-key asc --sort-key desc
"""
resources = "firewall_policies"
cmd = firewallpolicy.ListFirewallPolicy(test_cli20.MyApp(sys.stdout),
None)
self._test_list_resources(resources, cmd,
sort_key=["name", "id"],
sort_dir=["asc", "desc"])
def test_list_firewall_policies_limit(self):
"""size (1000) limited list: firewall-policy-list -P."""
resources = "firewall_policies"
cmd = firewallpolicy.ListFirewallPolicy(test_cli20.MyApp(sys.stdout),
None)
self._test_list_resources(resources, cmd, page_size=1000)
def test_show_firewall_policy_id(self):
"""firewall-policy-show test_id."""
resource = 'firewall_policy'
cmd = firewallpolicy.ShowFirewallPolicy(test_cli20.MyApp(sys.stdout),
None)
args = ['--fields', 'id', self.test_id]
self._test_show_resource(resource, cmd, self.test_id, args, ['id'])
def test_show_firewall_policy_id_name(self):
"""firewall-policy-show."""
resource = 'firewall_policy'
cmd = firewallpolicy.ShowFirewallPolicy(test_cli20.MyApp(sys.stdout),
None)
args = ['--fields', 'id', '--fields', 'name', self.test_id]
self._test_show_resource(resource, cmd, self.test_id,
args, ['id', 'name'])
def test_update_firewall_policy(self):
"""firewall-policy-update myid --name newname."""
resource = 'firewall_policy'
cmd = firewallpolicy.UpdateFirewallPolicy(test_cli20.MyApp(sys.stdout),
None)
self._test_update_resource(resource, cmd, 'myid',
['myid', '--name', 'newname'],
{'name': 'newname', })
def test_delete_firewall_policy(self):
"""firewall-policy-delete my-id."""
resource = 'firewall_policy'
cmd = firewallpolicy.DeleteFirewallPolicy(test_cli20.MyApp(sys.stdout),
None)
my_id = 'myid1'
args = [my_id]
self._test_delete_resource(resource, cmd, my_id, args)
def test_insert_firewall_rule(self):
"""firewall-policy-insert-rule myid newruleid
--insert-before ruleAid
--insert-after ruleBid
"""
resource = 'firewall_policy'
cmd = firewallpolicy.FirewallPolicyInsertRule(
test_cli20.MyApp(sys.stdout),
None)
myid = 'myid'
args = ['myid', 'newrule',
'--insert-before', 'rule2',
'--insert-after', 'rule1']
extrafields = {'firewall_rule_id': 'newrule',
'insert_before': 'rule2',
'insert_after': 'rule1'}
self.mox.StubOutWithMock(cmd, "get_client")
self.mox.StubOutWithMock(self.client.httpclient, "request")
cmd.get_client().MultipleTimes().AndReturn(self.client)
body = extrafields
path = getattr(self.client, resource + "_insert_path")
self.client.httpclient.request(
test_cli20.MyUrlComparator(
test_cli20.end_url(path % myid, format=self.format),
self.client),
'PUT', body=test_cli20.MyComparator(body, self.client),
headers=mox.ContainsKeyValue(
'X-Auth-Token',
test_cli20.TOKEN)).AndReturn((test_cli20.MyResp(204), None))
args.extend(['--request-format', self.format])
self.mox.ReplayAll()
cmd_parser = cmd.get_parser(resource + "_insert_rule")
shell.run_command(cmd, cmd_parser, args)
self.mox.VerifyAll()
self.mox.UnsetStubs()
def test_remove_firewall_rule(self):
"""firewall-policy-remove-rule myid ruleid
"""
resource = 'firewall_policy'
cmd = firewallpolicy.FirewallPolicyRemoveRule(
test_cli20.MyApp(sys.stdout),
None)
myid = 'myid'
args = ['myid', 'removerule']
extrafields = {'firewall_rule_id': 'removerule', }
self.mox.StubOutWithMock(cmd, "get_client")
self.mox.StubOutWithMock(self.client.httpclient, "request")
cmd.get_client().MultipleTimes().AndReturn(self.client)
body = extrafields
path = getattr(self.client, resource + "_remove_path")
self.client.httpclient.request(
test_cli20.MyUrlComparator(
test_cli20.end_url(path % myid, format=self.format),
self.client),
'PUT', body=test_cli20.MyComparator(body, self.client),
headers=mox.ContainsKeyValue(
'X-Auth-Token',
test_cli20.TOKEN)).AndReturn((test_cli20.MyResp(204), None))
args.extend(['--request-format', self.format])
self.mox.ReplayAll()
cmd_parser = cmd.get_parser(resource + "_remove_rule")
shell.run_command(cmd, cmd_parser, args)
self.mox.VerifyAll()
self.mox.UnsetStubs()
class CLITestV20FirewallPolicyXML(CLITestV20FirewallPolicyJSON):
format = 'xml'

View File

@@ -0,0 +1,158 @@
# Copyright 2013 Big Switch Networks Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: KC Wang, Big Switch Networks Inc.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import sys
from neutronclient.neutron.v2_0.fw import firewallrule
from tests.unit import test_cli20
class CLITestV20FirewallRuleJSON(test_cli20.CLITestV20Base):
def test_create_firewall_rule_with_mandatory_params(self):
"""firewall-rule-create with mandatory (none) params only."""
resource = 'firewall_rule'
cmd = firewallrule.CreateFirewallRule(test_cli20.MyApp(sys.stdout),
None)
tenant_id = 'my-tenant'
name = ''
my_id = 'myid'
protocol = 'tcp'
action = 'allow'
args = ['--tenant-id', tenant_id,
'--admin-state-up',
'--protocol', protocol,
'--action', action,
'--enabled']
position_names = []
position_values = []
self._test_create_resource(resource, cmd, name, my_id, args,
position_names, position_values,
protocol=protocol, action=action,
enabled=True, tenant_id=tenant_id)
def test_create_firewall_rule_with_all_params(self):
"""firewall-rule-create with all params set."""
resource = 'firewall_rule'
cmd = firewallrule.CreateFirewallRule(test_cli20.MyApp(sys.stdout),
None)
name = 'my-name'
description = 'my-desc'
protocol = 'tcp'
source_ip = '192.168.1.0/24'
destination_ip = '192.168.2.0/24'
source_port = '0:65535'
destination_port = '0:65535'
action = 'allow'
tenant_id = 'my-tenant'
my_id = 'myid'
args = ['--description', description,
'--shared',
'--protocol', protocol,
'--source-ip-address', source_ip,
'--destination-ip-address', destination_ip,
'--source-port', source_port,
'--destination-port', destination_port,
'--action', action,
'--enabled',
'--admin-state-up',
'--tenant-id', tenant_id]
position_names = []
position_values = []
self._test_create_resource(resource, cmd, name, my_id, args,
position_names, position_values,
description=description, shared=True,
protocol=protocol,
source_ip_address=source_ip,
destination_ip_address=destination_ip,
source_port=source_port,
destination_port=destination_port,
action=action, enabled=True,
tenant_id=tenant_id)
def test_list_firewall_rules(self):
"""firewall-rule-list."""
resources = "firewall_rules"
cmd = firewallrule.ListFirewallRule(test_cli20.MyApp(sys.stdout),
None)
self._test_list_resources(resources, cmd, True)
def test_list_firewall_rules_pagination(self):
"""firewall-rule-list."""
resources = "firewall_rules"
cmd = firewallrule.ListFirewallRule(test_cli20.MyApp(sys.stdout),
None)
self._test_list_resources_with_pagination(resources, cmd)
def test_list_firewall_rules_sort(self):
"""firewall-rule-list --sort-key name --sort-key id --sort-key asc
--sort-key desc
"""
resources = "firewall_rules"
cmd = firewallrule.ListFirewallRule(test_cli20.MyApp(sys.stdout),
None)
self._test_list_resources(resources, cmd,
sort_key=["name", "id"],
sort_dir=["asc", "desc"])
def test_list_firewall_rules_limit(self):
"""firewall-rule-list -P."""
resources = "firewall_rules"
cmd = firewallrule.ListFirewallRule(test_cli20.MyApp(sys.stdout),
None)
self._test_list_resources(resources, cmd, page_size=1000)
def test_show_firewall_rule_id(self):
"""firewall-rule-show test_id."""
resource = 'firewall_rule'
cmd = firewallrule.ShowFirewallRule(test_cli20.MyApp(sys.stdout),
None)
args = ['--fields', 'id', self.test_id]
self._test_show_resource(resource, cmd, self.test_id, args, ['id'])
def test_show_firewall_rule_id_name(self):
"""firewall-rule-show."""
resource = 'firewall_rule'
cmd = firewallrule.ShowFirewallRule(test_cli20.MyApp(sys.stdout),
None)
args = ['--fields', 'id', '--fields', 'name', self.test_id]
self._test_show_resource(resource, cmd, self.test_id,
args, ['id', 'name'])
def test_update_firewall_rule(self):
"""firewall-rule-update myid --name newname."""
resource = 'firewall_rule'
cmd = firewallrule.UpdateFirewallRule(test_cli20.MyApp(sys.stdout),
None)
self._test_update_resource(resource, cmd, 'myid',
['myid', '--name', 'newname'],
{'name': 'newname', })
def test_delete_firewall_rule(self):
"""firewall-rule-delete my-id."""
resource = 'firewall_rule'
cmd = firewallrule.DeleteFirewallRule(test_cli20.MyApp(sys.stdout),
None)
my_id = 'myid1'
args = [my_id]
self._test_delete_resource(resource, cmd, my_id, args)
class CLITestV20FirewallRuleXML(CLITestV20FirewallRuleJSON):
format = 'xml'

View File

@@ -22,10 +22,10 @@ import mox
import testtools
from neutronclient.common import constants
from neutronclient.neutron import v2_0 as neutronV2_0
from neutronclient import shell
from neutronclient.v2_0 import client
API_VERSION = "2.0"
FORMAT = 'json'
TOKEN = 'testtoken'
@@ -211,7 +211,9 @@ class CLITestV20Base(testtools.TestCase):
self.client.format = self.format
resstr = self.client.serialize(ress)
# url method body
path = getattr(self.client, resource + "s_path")
resource_plural = neutronV2_0._get_resource_plural(resource,
self.client)
path = getattr(self.client, resource_plural + "_path")
self.client.httpclient.request(
end_url(path, format=self.format), 'POST',
body=MyComparator(body, self.client),