Merge "Improve str2dict key validation to avoid wrong keys"
This commit is contained in:
@@ -18,6 +18,7 @@
|
|||||||
"""Utilities and helper functions."""
|
"""Utilities and helper functions."""
|
||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
|
import functools
|
||||||
import logging
|
import logging
|
||||||
import netaddr
|
import netaddr
|
||||||
import os
|
import os
|
||||||
@@ -108,10 +109,21 @@ def str2bool(strbool):
|
|||||||
return strbool.lower() == 'true'
|
return strbool.lower() == 'true'
|
||||||
|
|
||||||
|
|
||||||
def str2dict(strdict):
|
def str2dict(strdict, required_keys=None, optional_keys=None):
|
||||||
"""Convert key1=value1,key2=value2,... string into dictionary.
|
"""Convert key1=value1,key2=value2,... string into dictionary.
|
||||||
|
|
||||||
:param strdict: key1=value1,key2=value2
|
:param strdict: string in the form of key1=value1,key2=value2
|
||||||
|
:param required_keys: list of required keys. All keys in this list must be
|
||||||
|
specified. Otherwise ArgumentTypeError will be raised.
|
||||||
|
If this parameter is unspecified, no required key check
|
||||||
|
will be done.
|
||||||
|
:param optional_keys: list of optional keys.
|
||||||
|
This parameter is used for valid key check.
|
||||||
|
When at least one of required_keys and optional_keys,
|
||||||
|
a key must be a member of either of required_keys or
|
||||||
|
optional_keys. Otherwise, ArgumentTypeError will be
|
||||||
|
raised. When both required_keys and optional_keys are
|
||||||
|
unspecified, no valid key check will be done.
|
||||||
"""
|
"""
|
||||||
result = {}
|
result = {}
|
||||||
if strdict:
|
if strdict:
|
||||||
@@ -121,9 +133,29 @@ def str2dict(strdict):
|
|||||||
msg = _("invalid key-value '%s', expected format: key=value")
|
msg = _("invalid key-value '%s', expected format: key=value")
|
||||||
raise argparse.ArgumentTypeError(msg % kv)
|
raise argparse.ArgumentTypeError(msg % kv)
|
||||||
result[key] = value
|
result[key] = value
|
||||||
|
valid_keys = set(required_keys or []) | set(optional_keys or [])
|
||||||
|
if valid_keys:
|
||||||
|
invalid_keys = [k for k in result if k not in valid_keys]
|
||||||
|
if invalid_keys:
|
||||||
|
msg = _("Invalid key(s) '%(invalid_keys)s' specified. "
|
||||||
|
"Valid key(s): '%(valid_keys)s'.")
|
||||||
|
raise argparse.ArgumentTypeError(
|
||||||
|
msg % {'invalid_keys': ', '.join(sorted(invalid_keys)),
|
||||||
|
'valid_keys': ', '.join(sorted(valid_keys))})
|
||||||
|
if required_keys:
|
||||||
|
not_found_keys = [k for k in required_keys if k not in result]
|
||||||
|
if not_found_keys:
|
||||||
|
msg = _("Required key(s) '%s' not specified.")
|
||||||
|
raise argparse.ArgumentTypeError(msg % ', '.join(not_found_keys))
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def str2dict_type(optional_keys=None, required_keys=None):
|
||||||
|
return functools.partial(str2dict,
|
||||||
|
optional_keys=optional_keys,
|
||||||
|
required_keys=required_keys)
|
||||||
|
|
||||||
|
|
||||||
def http_log_req(_logger, args, kwargs):
|
def http_log_req(_logger, args, kwargs):
|
||||||
if not _logger.isEnabledFor(logging.DEBUG):
|
if not _logger.isEnabledFor(logging.DEBUG):
|
||||||
return
|
return
|
||||||
|
@@ -64,6 +64,8 @@ class CreatePool(neutronV20.CreateCommand):
|
|||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
'--session-persistence',
|
'--session-persistence',
|
||||||
metavar='type=TYPE[,cookie_name=COOKIE_NAME]',
|
metavar='type=TYPE[,cookie_name=COOKIE_NAME]',
|
||||||
|
type=utils.str2dict_type(required_keys=['type'],
|
||||||
|
optional_keys=['cookie_name']),
|
||||||
help=_('The type of session persistence to use and associated '
|
help=_('The type of session persistence to use and associated '
|
||||||
'cookie name'))
|
'cookie name'))
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
@@ -86,9 +88,6 @@ class CreatePool(neutronV20.CreateCommand):
|
|||||||
help=_('Protocol for balancing.'))
|
help=_('Protocol for balancing.'))
|
||||||
|
|
||||||
def args2body(self, parsed_args):
|
def args2body(self, parsed_args):
|
||||||
if parsed_args.session_persistence:
|
|
||||||
parsed_args.session_persistence = utils.str2dict(
|
|
||||||
parsed_args.session_persistence)
|
|
||||||
_listener_id = neutronV20.find_resourceid_by_name_or_id(
|
_listener_id = neutronV20.find_resourceid_by_name_or_id(
|
||||||
self.get_client(), 'listener', parsed_args.listener)
|
self.get_client(), 'listener', parsed_args.listener)
|
||||||
body = {'admin_state_up': parsed_args.admin_state,
|
body = {'admin_state_up': parsed_args.admin_state,
|
||||||
|
@@ -47,6 +47,7 @@ def _add_updatable_args(parser):
|
|||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
'--fixed-ip', metavar='subnet_id=SUBNET,ip_address=IP_ADDR',
|
'--fixed-ip', metavar='subnet_id=SUBNET,ip_address=IP_ADDR',
|
||||||
action='append',
|
action='append',
|
||||||
|
type=utils.str2dict_type(optional_keys=['subnet_id', 'ip_address']),
|
||||||
help=_('Desired IP and/or subnet for this port: '
|
help=_('Desired IP and/or subnet for this port: '
|
||||||
'subnet_id=<name_or_id>,ip_address=<ip>. '
|
'subnet_id=<name_or_id>,ip_address=<ip>. '
|
||||||
'You can repeat this option.'))
|
'You can repeat this option.'))
|
||||||
@@ -74,13 +75,12 @@ def _updatable_args2body(parsed_args, body, client):
|
|||||||
ips = []
|
ips = []
|
||||||
if parsed_args.fixed_ip:
|
if parsed_args.fixed_ip:
|
||||||
for ip_spec in parsed_args.fixed_ip:
|
for ip_spec in parsed_args.fixed_ip:
|
||||||
ip_dict = utils.str2dict(ip_spec)
|
if 'subnet_id' in ip_spec:
|
||||||
if 'subnet_id' in ip_dict:
|
subnet_name_id = ip_spec['subnet_id']
|
||||||
subnet_name_id = ip_dict['subnet_id']
|
|
||||||
_subnet_id = neutronV20.find_resourceid_by_name_or_id(
|
_subnet_id = neutronV20.find_resourceid_by_name_or_id(
|
||||||
client, 'subnet', subnet_name_id)
|
client, 'subnet', subnet_name_id)
|
||||||
ip_dict['subnet_id'] = _subnet_id
|
ip_spec['subnet_id'] = _subnet_id
|
||||||
ips.append(ip_dict)
|
ips.append(ip_spec)
|
||||||
if ips:
|
if ips:
|
||||||
body['fixed_ips'] = ips
|
body['fixed_ips'] = ips
|
||||||
|
|
||||||
@@ -159,6 +159,9 @@ class UpdateExtraDhcpOptMixin(object):
|
|||||||
default=[],
|
default=[],
|
||||||
action='append',
|
action='append',
|
||||||
dest='extra_dhcp_opts',
|
dest='extra_dhcp_opts',
|
||||||
|
type=utils.str2dict_type(
|
||||||
|
required_keys=['opt_name'],
|
||||||
|
optional_keys=['opt_value', 'ip_version']),
|
||||||
help=_('Extra dhcp options to be assigned to this port: '
|
help=_('Extra dhcp options to be assigned to this port: '
|
||||||
'opt_name=<dhcp_option_name>,opt_value=<value>,'
|
'opt_name=<dhcp_option_name>,opt_value=<value>,'
|
||||||
'ip_version={4,6}. You can repeat this option.'))
|
'ip_version={4,6}. You can repeat this option.'))
|
||||||
@@ -175,7 +178,7 @@ class UpdateExtraDhcpOptMixin(object):
|
|||||||
"ip_version={4,6}. "
|
"ip_version={4,6}. "
|
||||||
"You can repeat this option.")
|
"You can repeat this option.")
|
||||||
for opt in parsed_args.extra_dhcp_opts:
|
for opt in parsed_args.extra_dhcp_opts:
|
||||||
opt_ele.update(utils.str2dict(opt))
|
opt_ele.update(opt)
|
||||||
if ('opt_name' in opt_ele and
|
if ('opt_name' in opt_ele and
|
||||||
('opt_value' in opt_ele or 'ip_version' in opt_ele)):
|
('opt_value' in opt_ele or 'ip_version' in opt_ele)):
|
||||||
if opt_ele.get('opt_value') == 'null':
|
if opt_ele.get('opt_value') == 'null':
|
||||||
@@ -200,7 +203,9 @@ class UpdatePortAllowedAddressPair(object):
|
|||||||
default=[],
|
default=[],
|
||||||
action='append',
|
action='append',
|
||||||
dest='allowed_address_pairs',
|
dest='allowed_address_pairs',
|
||||||
type=utils.str2dict,
|
type=utils.str2dict_type(
|
||||||
|
required_keys=['ip_address'],
|
||||||
|
optional_keys=['mac_address']),
|
||||||
help=_('Allowed address pair associated with the port.'
|
help=_('Allowed address pair associated with the port.'
|
||||||
'You can repeat this option.'))
|
'You can repeat this option.'))
|
||||||
group_aap.add_argument(
|
group_aap.add_argument(
|
||||||
|
@@ -114,7 +114,8 @@ class UpdateRouter(neutronV20.UpdateCommand):
|
|||||||
routes_group = parser.add_mutually_exclusive_group()
|
routes_group = parser.add_mutually_exclusive_group()
|
||||||
routes_group.add_argument(
|
routes_group.add_argument(
|
||||||
'--route', metavar='destination=CIDR,nexthop=IP_ADDR',
|
'--route', metavar='destination=CIDR,nexthop=IP_ADDR',
|
||||||
action='append', dest='routes', type=utils.str2dict,
|
action='append', dest='routes',
|
||||||
|
type=utils.str2dict_type(required_keys=['destination', 'nexthop']),
|
||||||
help=_('Route to associate with the router.'
|
help=_('Route to associate with the router.'
|
||||||
' You can repeat this option.'))
|
' You can repeat this option.'))
|
||||||
routes_group.add_argument(
|
routes_group.add_argument(
|
||||||
@@ -226,6 +227,8 @@ class SetGatewayRouter(neutronV20.NeutronCommand):
|
|||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
'--fixed-ip', metavar='subnet_id=SUBNET,ip_address=IP_ADDR',
|
'--fixed-ip', metavar='subnet_id=SUBNET,ip_address=IP_ADDR',
|
||||||
action='append',
|
action='append',
|
||||||
|
type=utils.str2dict_type(optional_keys=['subnet_id',
|
||||||
|
'ip_address']),
|
||||||
help=_('Desired IP and/or subnet on external network: '
|
help=_('Desired IP and/or subnet on external network: '
|
||||||
'subnet_id=<name_or_id>,ip_address=<ip>. '
|
'subnet_id=<name_or_id>,ip_address=<ip>. '
|
||||||
'You can specify both of subnet_id and ip_address or '
|
'You can specify both of subnet_id and ip_address or '
|
||||||
@@ -246,13 +249,12 @@ class SetGatewayRouter(neutronV20.NeutronCommand):
|
|||||||
if parsed_args.fixed_ip:
|
if parsed_args.fixed_ip:
|
||||||
ips = []
|
ips = []
|
||||||
for ip_spec in parsed_args.fixed_ip:
|
for ip_spec in parsed_args.fixed_ip:
|
||||||
ip_dict = utils.str2dict(ip_spec)
|
subnet_name_id = ip_spec.get('subnet_id')
|
||||||
subnet_name_id = ip_dict.get('subnet_id')
|
|
||||||
if subnet_name_id:
|
if subnet_name_id:
|
||||||
subnet_id = neutronV20.find_resourceid_by_name_or_id(
|
subnet_id = neutronV20.find_resourceid_by_name_or_id(
|
||||||
neutron_client, 'subnet', subnet_name_id)
|
neutron_client, 'subnet', subnet_name_id)
|
||||||
ip_dict['subnet_id'] = subnet_id
|
ip_spec['subnet_id'] = subnet_id
|
||||||
ips.append(ip_dict)
|
ips.append(ip_spec)
|
||||||
router_dict['external_fixed_ips'] = ips
|
router_dict['external_fixed_ips'] = ips
|
||||||
neutron_client.add_gateway_router(_router_id, router_dict)
|
neutron_client.add_gateway_router(_router_id, router_dict)
|
||||||
print(_('Set gateway for router %s') % parsed_args.router,
|
print(_('Set gateway for router %s') % parsed_args.router,
|
||||||
|
@@ -62,16 +62,19 @@ def add_updatable_arguments(parser):
|
|||||||
help=_('No distribution of gateway.'))
|
help=_('No distribution of gateway.'))
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
'--allocation-pool', metavar='start=IP_ADDR,end=IP_ADDR',
|
'--allocation-pool', metavar='start=IP_ADDR,end=IP_ADDR',
|
||||||
action='append', dest='allocation_pools', type=utils.str2dict,
|
action='append', dest='allocation_pools',
|
||||||
|
type=utils.str2dict_type(required_keys=['start', 'end']),
|
||||||
help=_('Allocation pool IP addresses for this subnet '
|
help=_('Allocation pool IP addresses for this subnet '
|
||||||
'(This option can be repeated).'))
|
'(This option can be repeated).'))
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
'--allocation_pool',
|
'--allocation_pool',
|
||||||
action='append', dest='allocation_pools', type=utils.str2dict,
|
action='append', dest='allocation_pools',
|
||||||
|
type=utils.str2dict_type(required_keys=['start', 'end']),
|
||||||
help=argparse.SUPPRESS)
|
help=argparse.SUPPRESS)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
'--host-route', metavar='destination=CIDR,nexthop=IP_ADDR',
|
'--host-route', metavar='destination=CIDR,nexthop=IP_ADDR',
|
||||||
action='append', dest='host_routes', type=utils.str2dict,
|
action='append', dest='host_routes',
|
||||||
|
type=utils.str2dict_type(required_keys=['destination', 'nexthop']),
|
||||||
help=_('Additional route (This option can be repeated).'))
|
help=_('Additional route (This option can be repeated).'))
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
'--dns-nameserver', metavar='DNS_NAMESERVER',
|
'--dns-nameserver', metavar='DNS_NAMESERVER',
|
||||||
|
@@ -71,7 +71,7 @@ class CreateIKEPolicy(neutronv20.CreateCommand):
|
|||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
'--lifetime',
|
'--lifetime',
|
||||||
metavar="units=UNITS,value=VALUE",
|
metavar="units=UNITS,value=VALUE",
|
||||||
type=utils.str2dict,
|
type=utils.str2dict_type(optional_keys=['units', 'value']),
|
||||||
help=vpn_utils.lifetime_help("IKE"))
|
help=vpn_utils.lifetime_help("IKE"))
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
'name', metavar='NAME',
|
'name', metavar='NAME',
|
||||||
@@ -100,7 +100,7 @@ class UpdateIKEPolicy(neutronv20.UpdateCommand):
|
|||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
'--lifetime',
|
'--lifetime',
|
||||||
metavar="units=UNITS,value=VALUE",
|
metavar="units=UNITS,value=VALUE",
|
||||||
type=utils.str2dict,
|
type=utils.str2dict_type(optional_keys=['units', 'value']),
|
||||||
help=vpn_utils.lifetime_help("IKE"))
|
help=vpn_utils.lifetime_help("IKE"))
|
||||||
|
|
||||||
def args2body(self, parsed_args):
|
def args2body(self, parsed_args):
|
||||||
|
@@ -55,7 +55,8 @@ class IPsecSiteConnectionMixin(object):
|
|||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
'--dpd',
|
'--dpd',
|
||||||
metavar="action=ACTION,interval=INTERVAL,timeout=TIMEOUT",
|
metavar="action=ACTION,interval=INTERVAL,timeout=TIMEOUT",
|
||||||
type=utils.str2dict,
|
type=utils.str2dict_type(
|
||||||
|
optional_keys=['action', 'interval', 'timeout']),
|
||||||
help=vpn_utils.dpd_help("IPsec connection."))
|
help=vpn_utils.dpd_help("IPsec connection."))
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
'--local-ep-group',
|
'--local-ep-group',
|
||||||
|
@@ -70,7 +70,7 @@ class CreateIPsecPolicy(neutronv20.CreateCommand):
|
|||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
'--lifetime',
|
'--lifetime',
|
||||||
metavar="units=UNITS,value=VALUE",
|
metavar="units=UNITS,value=VALUE",
|
||||||
type=utils.str2dict,
|
type=utils.str2dict_type(optional_keys=['units', 'value']),
|
||||||
help=vpn_utils.lifetime_help("IPsec"))
|
help=vpn_utils.lifetime_help("IPsec"))
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
'name', metavar='NAME',
|
'name', metavar='NAME',
|
||||||
@@ -99,7 +99,7 @@ class UpdateIPsecPolicy(neutronv20.UpdateCommand):
|
|||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
'--lifetime',
|
'--lifetime',
|
||||||
metavar="units=UNITS,value=VALUE",
|
metavar="units=UNITS,value=VALUE",
|
||||||
type=utils.str2dict,
|
type=utils.str2dict_type(optional_keys=['units', 'value']),
|
||||||
help=vpn_utils.lifetime_help("IPsec"))
|
help=vpn_utils.lifetime_help("IPsec"))
|
||||||
|
|
||||||
def args2body(self, parsed_args):
|
def args2body(self, parsed_args):
|
||||||
|
@@ -603,13 +603,14 @@ class CLITestV20PortJSON(test_cli20.CLITestV20Base):
|
|||||||
resource = 'port'
|
resource = 'port'
|
||||||
cmd = port.UpdatePort(test_cli20.MyApp(sys.stdout), None)
|
cmd = port.UpdatePort(test_cli20.MyApp(sys.stdout), None)
|
||||||
myid = 'myid'
|
myid = 'myid'
|
||||||
net_id = 'net_id'
|
subnet_id = 'subnet_id'
|
||||||
ip_addr = '123.123.123.123'
|
ip_addr = '123.123.123.123'
|
||||||
args = [myid,
|
args = [myid,
|
||||||
'--fixed-ip', "network_id=%(net_id)s,ip_address=%(ip_addr)s" %
|
'--fixed-ip',
|
||||||
{'net_id': net_id,
|
"subnet_id=%(subnet_id)s,ip_address=%(ip_addr)s" %
|
||||||
|
{'subnet_id': subnet_id,
|
||||||
'ip_addr': ip_addr}]
|
'ip_addr': ip_addr}]
|
||||||
updated_fields = {"fixed_ips": [{'network_id': net_id,
|
updated_fields = {"fixed_ips": [{'subnet_id': subnet_id,
|
||||||
'ip_address': ip_addr}]}
|
'ip_address': ip_addr}]}
|
||||||
self._test_update_resource(resource, cmd, myid, args, updated_fields)
|
self._test_update_resource(resource, cmd, myid, args, updated_fields)
|
||||||
|
|
||||||
|
@@ -49,6 +49,36 @@ class TestUtils(testtools.TestCase):
|
|||||||
self.assertRaises(argparse.ArgumentTypeError,
|
self.assertRaises(argparse.ArgumentTypeError,
|
||||||
utils.str2dict, input_str)
|
utils.str2dict, input_str)
|
||||||
|
|
||||||
|
def test_str2dict_optional_keys(self):
|
||||||
|
self.assertDictEqual({'key1': 'value1'},
|
||||||
|
utils.str2dict('key1=value1',
|
||||||
|
optional_keys=['key1', 'key2']))
|
||||||
|
self.assertDictEqual({'key1': 'value1', 'key2': 'value2'},
|
||||||
|
utils.str2dict('key1=value1,key2=value2',
|
||||||
|
optional_keys=['key1', 'key2']))
|
||||||
|
e = self.assertRaises(argparse.ArgumentTypeError,
|
||||||
|
utils.str2dict,
|
||||||
|
'key1=value1,key2=value2,key3=value3',
|
||||||
|
optional_keys=['key1', 'key2'])
|
||||||
|
self.assertEqual("Invalid key(s) 'key3' specified. "
|
||||||
|
"Valid key(s): 'key1, key2'.",
|
||||||
|
str(e))
|
||||||
|
|
||||||
|
def test_str2dict_required_keys(self):
|
||||||
|
self.assertDictEqual(
|
||||||
|
{'key1': 'value1', 'key2': 'value2', 'key3': 'value3'},
|
||||||
|
utils.str2dict('key1=value1,key2=value2,key3=value3',
|
||||||
|
required_keys=['key1', 'key2'],
|
||||||
|
optional_keys=['key3']))
|
||||||
|
self.assertDictEqual(
|
||||||
|
{'key1': 'value1', 'key2': 'value2'},
|
||||||
|
utils.str2dict('key1=value1,key2=value2',
|
||||||
|
required_keys=['key1', 'key2']))
|
||||||
|
e = self.assertRaises(argparse.ArgumentTypeError,
|
||||||
|
utils.str2dict, 'key1=value1',
|
||||||
|
required_keys=['key1', 'key2'])
|
||||||
|
self.assertEqual("Required key(s) 'key2' not specified.", str(e))
|
||||||
|
|
||||||
def test_get_dict_item_properties(self):
|
def test_get_dict_item_properties(self):
|
||||||
item = {'name': 'test_name', 'id': 'test_id'}
|
item = {'name': 'test_name', 'id': 'test_id'}
|
||||||
fields = ('name', 'id')
|
fields = ('name', 'id')
|
||||||
|
@@ -16,6 +16,7 @@
|
|||||||
|
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
|
from neutronclient.common import exceptions
|
||||||
from neutronclient.neutron.v2_0.vpn import ikepolicy
|
from neutronclient.neutron.v2_0.vpn import ikepolicy
|
||||||
from neutronclient.tests.unit import test_cli20
|
from neutronclient.tests.unit import test_cli20
|
||||||
|
|
||||||
@@ -101,7 +102,7 @@ class CLITestV20VpnIkePolicyJSON(test_cli20.CLITestV20Base):
|
|||||||
self._test_create_resource(resource, cmd, name, my_id, args,
|
self._test_create_resource(resource, cmd, name, my_id, args,
|
||||||
position_names, position_values)
|
position_names, position_values)
|
||||||
|
|
||||||
def _test_lifetime_values(self, lifetime):
|
def _test_lifetime_values(self, lifetime, expected_exc=None):
|
||||||
resource = 'ikepolicy'
|
resource = 'ikepolicy'
|
||||||
cmd = ikepolicy.CreateIKEPolicy(test_cli20.MyApp(sys.stdout), None)
|
cmd = ikepolicy.CreateIKEPolicy(test_cli20.MyApp(sys.stdout), None)
|
||||||
name = 'ikepolicy1'
|
name = 'ikepolicy1'
|
||||||
@@ -134,16 +135,17 @@ class CLITestV20VpnIkePolicyJSON(test_cli20.CLITestV20Base):
|
|||||||
auth_algorithm, encryption_algorithm,
|
auth_algorithm, encryption_algorithm,
|
||||||
phase1_negotiation_mode, ike_version, pfs,
|
phase1_negotiation_mode, ike_version, pfs,
|
||||||
tenant_id]
|
tenant_id]
|
||||||
try:
|
if not expected_exc:
|
||||||
self._test_create_resource(resource, cmd, name, my_id, args,
|
expected_exc = exceptions.CommandError
|
||||||
position_names, position_values)
|
self.assertRaises(
|
||||||
except Exception:
|
expected_exc,
|
||||||
return
|
self._test_create_resource,
|
||||||
self.fail("IKEPolicy Lifetime Error")
|
resource, cmd, name, my_id, args,
|
||||||
|
position_names, position_values)
|
||||||
|
|
||||||
def test_create_ikepolicy_with_invalid_lifetime_keys(self):
|
def test_create_ikepolicy_with_invalid_lifetime_keys(self):
|
||||||
lifetime = 'uts=seconds,val=20000'
|
lifetime = 'uts=seconds,val=20000'
|
||||||
self._test_lifetime_values(lifetime)
|
self._test_lifetime_values(lifetime, expected_exc=SystemExit)
|
||||||
|
|
||||||
def test_create_ikepolicy_with_invalid_lifetime_value(self):
|
def test_create_ikepolicy_with_invalid_lifetime_value(self):
|
||||||
lifetime = 'units=seconds,value=-1'
|
lifetime = 'units=seconds,value=-1'
|
||||||
|
@@ -180,7 +180,7 @@ class CLITestV20IPsecSiteConnectionJSON(test_cli20.CLITestV20Base):
|
|||||||
self._test_create_resource(resource, cmd, None, my_id, args,
|
self._test_create_resource(resource, cmd, None, my_id, args,
|
||||||
position_names, position_values)
|
position_names, position_values)
|
||||||
|
|
||||||
def _test_create_failure(self, additional_args=None):
|
def _test_create_failure(self, additional_args=None, expected_exc=None):
|
||||||
# Helper to test failure of IPSec site-to-site creation failure.
|
# Helper to test failure of IPSec site-to-site creation failure.
|
||||||
resource = 'ipsec_site_connection'
|
resource = 'ipsec_site_connection'
|
||||||
cmd = ipsec_site_connection.CreateIPsecSiteConnection(
|
cmd = ipsec_site_connection.CreateIPsecSiteConnection(
|
||||||
@@ -215,7 +215,9 @@ class CLITestV20IPsecSiteConnectionJSON(test_cli20.CLITestV20Base):
|
|||||||
position_values = [tenant_id, admin_state, peer_address, peer_id, psk,
|
position_values = [tenant_id, admin_state, peer_address, peer_id, psk,
|
||||||
mtu, initiator, None, None, vpnservice_id,
|
mtu, initiator, None, None, vpnservice_id,
|
||||||
ikepolicy_id, ipsecpolicy_id]
|
ikepolicy_id, ipsecpolicy_id]
|
||||||
self.assertRaises(exceptions.CommandError,
|
if not expected_exc:
|
||||||
|
expected_exc = exceptions.CommandError
|
||||||
|
self.assertRaises(expected_exc,
|
||||||
self._test_create_resource,
|
self._test_create_resource,
|
||||||
resource, cmd, None, my_id, args,
|
resource, cmd, None, my_id, args,
|
||||||
position_names, position_values)
|
position_names, position_values)
|
||||||
@@ -227,7 +229,7 @@ class CLITestV20IPsecSiteConnectionJSON(test_cli20.CLITestV20Base):
|
|||||||
|
|
||||||
def test_fail_create_with_invalid_dpd_keys(self):
|
def test_fail_create_with_invalid_dpd_keys(self):
|
||||||
bad_dpd_key = ['--dpd', 'act=restart,interval=30,time=120']
|
bad_dpd_key = ['--dpd', 'act=restart,interval=30,time=120']
|
||||||
self._test_create_failure(bad_dpd_key)
|
self._test_create_failure(bad_dpd_key, SystemExit)
|
||||||
|
|
||||||
def test_fail_create_with_invalid_dpd_values(self):
|
def test_fail_create_with_invalid_dpd_values(self):
|
||||||
bad_dpd_values = ['--dpd', 'action=hold,interval=30,timeout=-1']
|
bad_dpd_values = ['--dpd', 'action=hold,interval=30,timeout=-1']
|
||||||
|
@@ -16,6 +16,7 @@
|
|||||||
|
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
|
from neutronclient.common import exceptions
|
||||||
from neutronclient.neutron.v2_0.vpn import ipsecpolicy
|
from neutronclient.neutron.v2_0.vpn import ipsecpolicy
|
||||||
from neutronclient.tests.unit import test_cli20
|
from neutronclient.tests.unit import test_cli20
|
||||||
|
|
||||||
@@ -98,7 +99,7 @@ class CLITestV20VpnIpsecPolicyJSON(test_cli20.CLITestV20Base):
|
|||||||
self._test_create_resource(resource, cmd, name, my_id, args,
|
self._test_create_resource(resource, cmd, name, my_id, args,
|
||||||
position_names, position_values)
|
position_names, position_values)
|
||||||
|
|
||||||
def _test_lifetime_values(self, lifetime):
|
def _test_lifetime_values(self, lifetime, expected_exc=None):
|
||||||
resource = 'ipsecpolicy'
|
resource = 'ipsecpolicy'
|
||||||
cmd = ipsecpolicy.CreateIPsecPolicy(test_cli20.MyApp(sys.stdout), None)
|
cmd = ipsecpolicy.CreateIPsecPolicy(test_cli20.MyApp(sys.stdout), None)
|
||||||
name = 'ipsecpolicy1'
|
name = 'ipsecpolicy1'
|
||||||
@@ -131,19 +132,24 @@ class CLITestV20VpnIpsecPolicyJSON(test_cli20.CLITestV20Base):
|
|||||||
auth_algorithm, encryption_algorithm,
|
auth_algorithm, encryption_algorithm,
|
||||||
phase1_negotiation_mode, ike_version, pfs,
|
phase1_negotiation_mode, ike_version, pfs,
|
||||||
tenant_id]
|
tenant_id]
|
||||||
try:
|
if not expected_exc:
|
||||||
self._test_create_resource(resource, cmd, name, my_id, args,
|
expected_exc = exceptions.CommandError
|
||||||
position_names, position_values)
|
self.assertRaises(
|
||||||
except Exception:
|
expected_exc,
|
||||||
return
|
self._test_create_resource,
|
||||||
self.fail("IPsecPolicy Lifetime Error")
|
resource, cmd, name, my_id, args,
|
||||||
|
position_names, position_values)
|
||||||
|
|
||||||
def test_create_ipsecpolicy_with_invalid_lifetime_keys(self):
|
def test_create_ipsecpolicy_with_invalid_lifetime_keys(self):
|
||||||
lifetime = 'uts=seconds,val=20000'
|
lifetime = 'uts=seconds,val=20000'
|
||||||
|
self._test_lifetime_values(lifetime, SystemExit)
|
||||||
|
|
||||||
|
def test_create_ipsecpolicy_with_invalid_lifetime_units(self):
|
||||||
|
lifetime = 'units=minutes,value=600'
|
||||||
self._test_lifetime_values(lifetime)
|
self._test_lifetime_values(lifetime)
|
||||||
|
|
||||||
def test_create_ipsecpolicy_with_invalide_lifetime_values(self):
|
def test_create_ipsecpolicy_with_invalid_lifetime_value(self):
|
||||||
lifetime = 'units=minutes,value=0'
|
lifetime = 'units=seconds,value=0'
|
||||||
self._test_lifetime_values(lifetime)
|
self._test_lifetime_values(lifetime)
|
||||||
|
|
||||||
def test_list_ipsecpolicy(self):
|
def test_list_ipsecpolicy(self):
|
||||||
|
Reference in New Issue
Block a user