Support packet_filter extension in NEC plugin

blueprint nec-packet-filter-cli

Also adds common validators.

Change-Id: Ia431c2268e9a654e10dc9cf740288fb746825d13
This commit is contained in:
Akihiro MOTOKI 2013-09-02 05:15:01 +09:00 committed by Akihiro Motoki
parent 1246cd977a
commit 320e014eff
8 changed files with 750 additions and 1 deletions

@ -0,0 +1,69 @@
# Copyright 2014 NEC Corporation
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from neutronclient.common import exceptions
from neutronclient.openstack.common.gettextutils import _
def validate_int_range(parsed_args, attr_name, min_value=None, max_value=None):
val = getattr(parsed_args, attr_name, None)
if val is None:
return
try:
if not isinstance(val, int):
int_val = int(val, 0)
else:
int_val = val
if ((min_value is None or min_value <= int_val) and
(max_value is None or int_val <= max_value)):
return
except (ValueError, TypeError):
pass
if min_value is not None and max_value is not None:
msg = (_('%(attr_name)s "%(val)s" should be an integer '
'[%(min)i:%(max)i].') %
{'attr_name': attr_name.replace('_', '-'),
'val': val, 'min': min_value, 'max': max_value})
elif min_value is not None:
msg = (_('%(attr_name)s "%(val)s" should be an integer '
'greater than or equal to %(min)i.') %
{'attr_name': attr_name.replace('_', '-'),
'val': val, 'min': min_value})
elif max_value is not None:
msg = (_('%(attr_name)s "%(val)s" should be an integer '
'smaller than or equal to %(max)i.') %
{'attr_name': attr_name.replace('_', '-'),
'val': val, 'max': max_value})
else:
msg = (_('%(attr_name)s "%(val)s" should be an integer.') %
{'attr_name': attr_name.replace('_', '-'),
'val': val})
raise exceptions.CommandError(msg)
def validate_ip_subnet(parsed_args, attr_name):
val = getattr(parsed_args, attr_name)
if not val:
return
try:
netaddr.IPNetwork(val)
except (netaddr.AddrFormatError, ValueError):
raise exceptions.CommandError(
(_('%(attr_name)s "%(val)s" is not a valid CIDR.') %
{'attr_name': attr_name.replace('_', '-'), 'val': val}))

@ -0,0 +1,243 @@
# Copyright 2014 NEC Corporation
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from neutronclient.common import exceptions
from neutronclient.common import validators
from neutronclient.neutron import v2_0 as neutronV20
from neutronclient.openstack.common.gettextutils import _
class ListPacketFilter(neutronV20.ListCommand):
"""List packet filters that belong to a given tenant."""
resource = 'packet_filter'
log = logging.getLogger(__name__ + '.ListPacketFilter')
list_columns = ['id', 'name', 'action', 'priority', 'summary']
pagination_support = True
sorting_support = True
def extend_list(self, data, parsed_args):
for d in data:
val = []
proto_eth_type = []
if d.get('protocol'):
proto_eth_type.append('protocol: %s' % d['protocol'].upper())
if d.get('eth_type'):
proto_eth_type.append('eth_type: %s' % d['eth_type'])
if proto_eth_type:
val.append(', '.join(proto_eth_type))
val.append('network: ' + d['network_id'])
if d.get('in_port'):
val.append('in_port: ' + d['in_port'])
source = [str(d.get(field)) for field
in ['src_mac', 'src_cidr', 'src_port'] if d.get(field)]
if source:
val.append('source: ' + ' '.join(source))
dest = [str(d.get(field)) for field
in ['dst_mac', 'dst_cidr', 'dst_port'] if d.get(field)]
if dest:
val.append('destination: ' + ' '.join(dest))
d['summary'] = '\n'.join(val)
class ShowPacketFilter(neutronV20.ShowCommand):
"""Show information of a given packet filter."""
resource = 'packet_filter'
log = logging.getLogger(__name__ + '.ShowPacketFilter')
class PacketFilterOptionMixin(object):
def add_known_arguments(self, parser):
mode = self._get_mode()
if not mode:
return
mode_create = mode == 'create'
if mode_create:
parser.add_argument(
'--admin-state-down',
dest='admin_state', action='store_false',
help=_('Set Admin State Up to false'))
else:
parser.add_argument(
'--admin-state', choices=['True', 'False'],
help=_('Set a value of Admin State Up'))
parser.add_argument(
'--name',
help=_('Name of this packet filter'))
if mode_create:
parser.add_argument(
'--in-port', metavar='PORT',
help=_('Name or ID of the input port'))
parser.add_argument(
'--src-mac',
help=_('Source MAC address'))
parser.add_argument(
'--dst-mac',
help=_('Destination MAC address'))
parser.add_argument(
'--eth-type',
help=_('Ether Type. Integer [0:65535] (hex or decimal).'
' E.g., 0x0800 (IPv4), 0x0806 (ARP), 0x86DD (IPv6)'))
parser.add_argument(
'--protocol',
help=_('IP Protocol.'
' Protocol name or integer.'
' Recognized names are icmp, tcp, udp, arp'
' (case insensitive).'
' Integer should be [0:255] (decimal or hex).'))
parser.add_argument(
'--src-cidr',
help=_('Source IP address CIDR'))
parser.add_argument(
'--dst-cidr',
help=_('Destination IP address CIDR'))
parser.add_argument(
'--src-port',
help=_('Source port address'))
parser.add_argument(
'--dst-port',
help=_('Destination port address'))
default_priority = '30000' if mode_create else None
parser.add_argument(
'--priority', metavar='PRIORITY',
default=default_priority,
help=(_('Priority of the filter. Integer of [0:65535].%s')
% (' Default: 30000.' if mode_create else '')))
default_action = 'allow' if mode_create else None
parser.add_argument(
'--action',
choices=['allow', 'drop'],
default=default_action,
help=(_('Action of the filter.%s')
% (' Default: allow' if mode_create else '')))
if mode_create:
parser.add_argument(
'network', metavar='NETWORK',
help=_('network to which this packet filter is applied'))
def _get_mode(self):
klass = self.__class__.__name__.lower()
if klass.startswith('create'):
mode = 'create'
elif klass.startswith('update'):
mode = 'update'
else:
mode = None
return mode
def validate_fields(self, parsed_args):
self._validate_protocol(parsed_args.protocol)
validators.validate_int_range(parsed_args, 'priority', 0, 0xffff)
validators.validate_int_range(parsed_args, 'src_port', 0, 0xffff)
validators.validate_int_range(parsed_args, 'dst_port', 0, 0xffff)
validators.validate_ip_subnet(parsed_args, 'src_cidr')
validators.validate_ip_subnet(parsed_args, 'dst_cidr')
def _validate_protocol(self, protocol):
if not protocol or protocol == 'action=clear':
return
try:
protocol = int(protocol, 0)
if 0 <= protocol <= 255:
return
except ValueError:
# Use string as a protocol name
# Exact check will be done in the server side.
return
msg = (_('protocol %s should be either of name '
'(tcp, udp, icmp, arp; '
'case insensitive) or integer [0:255] (decimal or hex).') %
protocol)
raise exceptions.CommandError(msg)
class CreatePacketFilter(PacketFilterOptionMixin,
neutronV20.CreateCommand):
"""Create a packet filter for a given tenant."""
resource = 'packet_filter'
log = logging.getLogger(__name__ + '.CreatePacketFilter')
def args2body(self, parsed_args):
self.validate_fields(parsed_args)
_network_id = neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'network', parsed_args.network)
body = {'network_id': _network_id,
'admin_state_up': parsed_args.admin_state}
if parsed_args.in_port:
_port_id = neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'port', parsed_args.in_port)
body['in_port'] = _port_id
neutronV20.update_dict(
parsed_args, body,
['action', 'priority', 'name',
'eth_type', 'protocol', 'src_mac', 'dst_mac',
'src_cidr', 'dst_cidr', 'src_port', 'dst_port'])
return {self.resource: body}
class UpdatePacketFilter(PacketFilterOptionMixin,
neutronV20.UpdateCommand):
"""Update packet filter's information."""
resource = 'packet_filter'
log = logging.getLogger(__name__ + '.UpdatePacketFilter')
def args2body(self, parsed_args):
self.validate_fields(parsed_args)
body = {}
if parsed_args.admin_state:
body['admin_state_up'] = (parsed_args.admin_state == 'True')
# fields which allows None
for attr in ['eth_type', 'protocol', 'src_mac', 'dst_mac',
'src_cidr', 'dst_cidr', 'src_port', 'dst_port']:
if not hasattr(parsed_args, attr):
continue
val = getattr(parsed_args, attr)
if val is None:
continue
if val == '' or val == 'action=clear':
body[attr] = None
else:
body[attr] = val
for attr in ['action', 'priority', 'name']:
if (hasattr(parsed_args, attr) and
getattr(parsed_args, attr) is not None):
body[attr] = getattr(parsed_args, attr)
return {self.resource: body}
class DeletePacketFilter(neutronV20.DeleteCommand):
"""Delete a given packet filter."""
resource = 'packet_filter'
log = logging.getLogger(__name__ + '.DeletePacketFilter')

@ -44,6 +44,7 @@ from neutronclient.neutron.v2_0.lb import member as lb_member
from neutronclient.neutron.v2_0.lb import pool as lb_pool
from neutronclient.neutron.v2_0.lb import vip as lb_vip
from neutronclient.neutron.v2_0 import metering
from neutronclient.neutron.v2_0.nec import packetfilter
from neutronclient.neutron.v2_0 import netpartition
from neutronclient.neutron.v2_0 import network
from neutronclient.neutron.v2_0 import networkprofile
@ -271,6 +272,11 @@ COMMAND_V2 = {
'nuage-netpartition-show': netpartition.ShowNetPartition,
'nuage-netpartition-create': netpartition.CreateNetPartition,
'nuage-netpartition-delete': netpartition.DeleteNetPartition,
'nec-packet-filter-list': packetfilter.ListPacketFilter,
'nec-packet-filter-show': packetfilter.ShowPacketFilter,
'nec-packet-filter-create': packetfilter.CreatePacketFilter,
'nec-packet-filter-update': packetfilter.UpdatePacketFilter,
'nec-packet-filter-delete': packetfilter.DeletePacketFilter,
}
COMMANDS = {'2.0': COMMAND_V2}

@ -0,0 +1,298 @@
# Copyright 2014 NEC Corporation.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
import mox
from neutronclient.common import exceptions
from neutronclient.neutron.v2_0.nec import packetfilter as pf
from neutronclient import shell
from neutronclient.tests.unit import test_cli20
class CLITestV20PacketFilterJSON(test_cli20.CLITestV20Base):
def test_create_packetfilter_with_mandatory_params(self):
"""Create packetfilter: packetfilter1."""
resource = 'packet_filter'
cmd = pf.CreatePacketFilter(test_cli20.MyApp(sys.stdout), None)
name = 'packetfilter1'
myid = 'myid'
args = ['--priority', '30000', '--action', 'allow', 'net1']
position_names = ['network_id', 'action', 'priority']
position_values = ['net1', 'allow', '30000']
self._test_create_resource(resource, cmd, name, myid, args,
position_names, position_values)
def test_create_packetfilter_with_all_params(self):
"""Create packetfilter: packetfilter1."""
resource = 'packet_filter'
cmd = pf.CreatePacketFilter(test_cli20.MyApp(sys.stdout), None)
name = 'packetfilter1'
myid = 'myid'
args = ['--name', name,
'--admin-state-down',
'--in-port', 'port1',
'--src-mac', '00:11:22:33:44:55',
'--dst-mac', 'aa:bb:cc:dd:ee:ff',
'--eth-type', '0x0800',
'--protocol', 'tcp',
'--src-cidr', '10.1.1.0/24',
'--dst-cidr', '10.2.2.0/24',
'--src-port', '40001',
'--dst-port', '4000',
'--priority', '30000',
'--action', 'drop', 'net1']
params = {'network_id': 'net1',
'action': 'drop',
'priority': '30000',
'name': name,
'admin_state_up': False,
'in_port': 'port1',
'src_mac': '00:11:22:33:44:55',
'dst_mac': 'aa:bb:cc:dd:ee:ff',
'eth_type': '0x0800',
'protocol': 'tcp',
'src_cidr': '10.1.1.0/24',
'dst_cidr': '10.2.2.0/24',
'src_port': '40001',
'dst_port': '4000',
}
position_names = sorted(params)
position_values = [params[k] for k in sorted(params)]
self._test_create_resource(resource, cmd, name, myid, args,
position_names, position_values)
def test_list_packetfilters_detail(self):
"""list packetfilters: -D."""
resources = "packet_filters"
cmd = pf.ListPacketFilter(test_cli20.MyApp(sys.stdout), None)
response_contents = [{'id': 'myid1', 'network_id': 'net1'},
{'id': 'myid2', 'network_id': 'net2'}]
self._test_list_resources(resources, cmd, True,
response_contents=response_contents)
def _stubout_extend_list(self):
self.mox.StubOutWithMock(pf.ListPacketFilter, "extend_list")
pf.ListPacketFilter.extend_list(mox.IsA(list), mox.IgnoreArg())
def test_list_packetfilters_pagination(self):
resources = "packet_filters"
cmd = pf.ListPacketFilter(test_cli20.MyApp(sys.stdout), None)
self._stubout_extend_list()
self._test_list_resources_with_pagination(resources, cmd)
def test_list_packetfilters_sort(self):
"""list packetfilters: --sort-key name --sort-key id --sort-key asc
--sort-key desc
"""
resources = "packet_filters"
cmd = pf.ListPacketFilter(test_cli20.MyApp(sys.stdout), None)
self._stubout_extend_list()
self._test_list_resources(resources, cmd,
sort_key=["name", "id"],
sort_dir=["asc", "desc"])
def test_list_packetfilters_limit(self):
"""list packetfilters: -P."""
resources = "packet_filters"
cmd = pf.ListPacketFilter(test_cli20.MyApp(sys.stdout), None)
self._stubout_extend_list()
self._test_list_resources(resources, cmd, page_size=1000)
def test_update_packetfilter(self):
"""Update packetfilter: myid --name myname --tags a b."""
resource = 'packet_filter'
cmd = pf.UpdatePacketFilter(test_cli20.MyApp(sys.stdout), None)
self._test_update_resource(resource, cmd, 'myid',
['myid', '--name', 'myname'],
{'name': 'myname'}
)
def test_update_packetfilter_with_all_params(self):
resource = 'packet_filter'
cmd = pf.UpdatePacketFilter(test_cli20.MyApp(sys.stdout), None)
name = 'packetfilter1'
args = ['--name', name,
'--admin-state', 'True',
'--src-mac', '00:11:22:33:44:55',
'--dst-mac', 'aa:bb:cc:dd:ee:ff',
'--eth-type', '0x0800',
'--protocol', 'tcp',
'--src-cidr', '10.1.1.0/24',
'--dst-cidr', '10.2.2.0/24',
'--src-port', '40001',
'--dst-port', '4000',
'--priority', '30000',
'--action', 'drop',
'myid'
]
params = {'action': 'drop',
'priority': '30000',
'name': name,
'admin_state_up': True,
'src_mac': '00:11:22:33:44:55',
'dst_mac': 'aa:bb:cc:dd:ee:ff',
'eth_type': '0x0800',
'protocol': 'tcp',
'src_cidr': '10.1.1.0/24',
'dst_cidr': '10.2.2.0/24',
'src_port': '40001',
'dst_port': '4000',
}
# position_names = sorted(params)
# position_values = [params[k] for k in sorted(params)]
self._test_update_resource(resource, cmd, 'myid',
args, params)
def test_update_packetfilter_admin_state_false(self):
resource = 'packet_filter'
cmd = pf.UpdatePacketFilter(test_cli20.MyApp(sys.stdout), None)
args = ['--admin-state', 'False', 'myid']
params = {'admin_state_up': False}
self._test_update_resource(resource, cmd, 'myid',
args, params)
def test_update_packetfilter_exception(self):
"""Update packetfilter: myid."""
resource = 'packet_filter'
cmd = pf.UpdatePacketFilter(test_cli20.MyApp(sys.stdout), None)
exc = self.assertRaises(exceptions.CommandError,
self._test_update_resource,
resource, cmd, 'myid', ['myid'], {})
self.assertEqual('Must specify new values to update packet_filter',
unicode(exc))
def test_delete_packetfilter(self):
"""Delete packetfilter: myid."""
resource = 'packet_filter'
cmd = pf.DeletePacketFilter(test_cli20.MyApp(sys.stdout), None)
myid = 'myid'
args = [myid]
self._test_delete_resource(resource, cmd, myid, args)
def test_show_packetfilter(self):
"""Show packetfilter: myid."""
resource = 'packet_filter'
cmd = pf.ShowPacketFilter(test_cli20.MyApp(sys.stdout), None)
args = ['--fields', 'id', '--fields', 'name', self.test_id]
self._test_show_resource(resource, cmd, self.test_id, args,
['id', 'name'])
class CLITestV20PacketFilterXML(CLITestV20PacketFilterJSON):
format = 'xml'
class CLITestV20PacketFilterValidateParam(test_cli20.CLITestV20Base):
def _test_create_packetfilter_pass_validation(self, cmdline=None,
params=None, base_args=None):
resource = 'packet_filter'
cmd = pf.CreatePacketFilter(test_cli20.MyApp(sys.stdout), None)
name = 'packetfilter1'
myid = 'myid'
if base_args is None:
args = '--priority 30000 --action allow net1'.split()
else:
args = base_args.split()
if cmdline:
args += cmdline.split()
_params = {'network_id': 'net1',
'action': 'allow',
'priority': '30000'}
if params:
_params.update(params)
position_names = sorted(_params)
position_values = [_params[k] for k in sorted(_params)]
self._test_create_resource(resource, cmd, name, myid, args,
position_names, position_values)
def _test_create_packetfilter_negative_validation(self, cmdline):
resource = 'packet_filter'
cmd = pf.CreatePacketFilter(test_cli20.MyApp(sys.stdout), None)
self.mox.StubOutWithMock(cmd, "get_client")
self.mox.StubOutWithMock(self.client.httpclient, "request")
cmd.get_client().MultipleTimes().AndReturn(self.client)
cmd_parser = cmd.get_parser('create_' + resource)
args = cmdline.split()
self.assertRaises(exceptions.CommandError,
shell.run_command,
cmd, cmd_parser, args)
def test_create_pf_hex_priority(self):
self._test_create_packetfilter_pass_validation(
base_args='--priority 0xffff --action allow net1',
params={'priority': '0xffff'})
def test_create_pf_hex_src_port(self):
self._test_create_packetfilter_pass_validation(
cmdline='--src-port 0xffff', params={'src_port': '0xffff'})
def test_create_pf_hex_dst_port(self):
self._test_create_packetfilter_pass_validation(
cmdline='--dst-port 0xffff', params={'dst_port': '0xffff'})
def test_create_pf_ip_proto_zero(self):
self._test_create_packetfilter_pass_validation(
cmdline='--protocol 0', params={'protocol': '0'})
def test_create_pf_ip_proto_max_hex(self):
self._test_create_packetfilter_pass_validation(
cmdline='--protocol 0xff', params={'protocol': '0xff'})
def test_create_pf_ip_proto_with_names(self):
for proto in ['tcp', 'xxxx']:
self._test_create_packetfilter_pass_validation(
cmdline='--protocol ' + proto, params={'protocol': proto})
def test_create_pf_negative_priority(self):
self._test_create_packetfilter_negative_validation(
'--priority -1 --action allow net1')
def test_create_pf_too_big_priority(self):
self._test_create_packetfilter_negative_validation(
'--priority 65536 --action allow net1')
def test_create_pf_negative_src_port(self):
self._test_create_packetfilter_negative_validation(
'--src-port -1 --priority 20000 --action allow net1')
def test_create_pf_too_big_src_port(self):
self._test_create_packetfilter_negative_validation(
'--src-port 65536 --priority 20000 --action allow net1')
def test_create_pf_negative_dst_port(self):
self._test_create_packetfilter_negative_validation(
'--dst-port -1 --priority 20000 --action allow net1')
def test_create_pf_too_big_dst_port(self):
self._test_create_packetfilter_negative_validation(
'--dst-port 65536 --priority 20000 --action allow net1')
def test_create_pf_negative_protocol(self):
self._test_create_packetfilter_negative_validation(
'--protocol -1 --priority 20000 --action allow net1')
def test_create_pf_too_big_hex_protocol(self):
self._test_create_packetfilter_negative_validation(
'--protocol 0x100 --priority 20000 --action allow net1')
def test_create_pf_invalid_src_cidr(self):
self._test_create_packetfilter_negative_validation(
'--src-cidr invalid --priority 20000 --action allow net1')
def test_create_pf_invalid_dst_cidr(self):
self._test_create_packetfilter_negative_validation(
'--dst-cidr invalid --priority 20000 --action allow net1')

@ -0,0 +1,101 @@
# Copyright 2014 NEC Corporation
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from neutronclient.common import exceptions
from neutronclient.common import validators
class FakeParsedArgs():
pass
class ValidatorTest(testtools.TestCase):
def _test_validate_int(self, attr_val, attr_name='attr1',
min_value=1, max_value=10):
obj = FakeParsedArgs()
setattr(obj, attr_name, attr_val)
ret = validators.validate_int_range(obj, attr_name,
min_value, max_value)
# Come here only if there is no exception.
self.assertIsNone(ret)
def _test_validate_int_error(self, attr_val, expected_msg,
attr_name='attr1', expected_exc=None,
min_value=1, max_value=10):
if expected_exc is None:
expected_exc = exceptions.CommandError
e = self.assertRaises(expected_exc,
self._test_validate_int,
attr_val, attr_name, min_value, max_value)
self.assertEqual(expected_msg, str(e))
def test_validate_int_min_max(self):
self._test_validate_int(1)
self._test_validate_int(10)
self._test_validate_int('1')
self._test_validate_int('10')
self._test_validate_int('0x0a')
self._test_validate_int_error(
0, 'attr1 "0" should be an integer [1:10].')
self._test_validate_int_error(
11, 'attr1 "11" should be an integer [1:10].')
self._test_validate_int_error(
'0x10', 'attr1 "0x10" should be an integer [1:10].')
def test_validate_int_min_only(self):
self._test_validate_int(1, max_value=None)
self._test_validate_int(10, max_value=None)
self._test_validate_int(11, max_value=None)
self._test_validate_int_error(
0, 'attr1 "0" should be an integer greater than or equal to 1.',
max_value=None)
def test_validate_int_max_only(self):
self._test_validate_int(0, min_value=None)
self._test_validate_int(1, min_value=None)
self._test_validate_int(10, min_value=None)
self._test_validate_int_error(
11, 'attr1 "11" should be an integer smaller than or equal to 10.',
min_value=None)
def test_validate_int_no_limit(self):
self._test_validate_int(0, min_value=None, max_value=None)
self._test_validate_int(1, min_value=None, max_value=None)
self._test_validate_int(10, min_value=None, max_value=None)
self._test_validate_int(11, min_value=None, max_value=None)
self._test_validate_int_error(
'abc', 'attr1 "abc" should be an integer.',
min_value=None, max_value=None)
def _test_validate_subnet(self, attr_val, attr_name='attr1'):
obj = FakeParsedArgs()
setattr(obj, attr_name, attr_val)
ret = validators.validate_ip_subnet(obj, attr_name)
# Come here only if there is no exception.
self.assertIsNone(ret)
def test_validate_ip_subnet(self):
self._test_validate_subnet('192.168.2.0/24')
self._test_validate_subnet('192.168.2.3/20')
self._test_validate_subnet('192.168.2.1')
e = self.assertRaises(exceptions.CommandError,
self._test_validate_subnet,
'192.168.2.256')
self.assertEqual('attr1 "192.168.2.256" is not a valid CIDR.', str(e))

@ -201,6 +201,9 @@ class Client(object):
metering_label_path = "/metering/metering-labels/%s"
metering_label_rules_path = "/metering/metering-label-rules"
metering_label_rule_path = "/metering/metering-label-rules/%s"
packet_filters_path = "/packet_filters"
packet_filter_path = "/packet_filters/%s"
DHCP_NETS = '/dhcp-networks'
DHCP_AGENTS = '/dhcp-agents'
L3_ROUTERS = '/l3-routers'
@ -240,7 +243,8 @@ class Client(object):
'firewalls': 'firewall',
'metering_labels': 'metering_label',
'metering_label_rules': 'metering_label_rule',
'net_partitions': 'net_partition'
'net_partitions': 'net_partition',
'packet_filters': 'packet_filter',
}
# 8192 Is the default max URI len for eventlet.wsgi.server
MAX_URI_LEN = 8192
@ -1155,6 +1159,33 @@ class Client(object):
"""Delete the network partition."""
return self.delete(self.net_partition_path % netpartition)
@APIParamsCall
def create_packet_filter(self, body=None):
"""Create a new packet filter."""
return self.post(self.packet_filters_path, body=body)
@APIParamsCall
def update_packet_filter(self, packet_filter_id, body=None):
"""Update a packet filter."""
return self.put(self.packet_filter_path % packet_filter_id, body=body)
@APIParamsCall
def list_packet_filters(self, retrieve_all=True, **_params):
"""Fetch a list of all packet filters for a tenant."""
return self.list('packet_filters', self.packet_filters_path,
retrieve_all, **_params)
@APIParamsCall
def show_packet_filter(self, packet_filter_id, **_params):
"""Fetch information of a certain packet filter."""
return self.get(self.packet_filter_path % packet_filter_id,
params=_params)
@APIParamsCall
def delete_packet_filter(self, packet_filter_id):
"""Delete the specified packet filter."""
return self.delete(self.packet_filter_path % packet_filter_id)
def __init__(self, **kwargs):
"""Initialize a new client for the Neutron v2.0 API."""
super(Client, self).__init__()

@ -3,6 +3,7 @@ argparse
cliff>=1.4.3
httplib2>=0.7.5
iso8601>=0.1.9
netaddr>=0.7.6
simplejson>=2.0.9
six>=1.5.2
Babel>=1.3