Improve str2dict key validation to avoid wrong keys

This commit adds valid_keys and required_keys to str2dict
and define a new function which can be used as argparse type
validator.

By this function, we can declare what fields are valid and what
fields are required for dictionary option in option definition.

Change-Id: Ib7b233e082c15d0bd6e16a754b8acad52e413986
Closes-Bug: #1102897
This commit is contained in:
Akihiro Motoki 2015-12-06 16:55:24 +09:00 committed by Akihiro Motoki
parent f18aea286b
commit ed17ae60bd
13 changed files with 132 additions and 49 deletions

View File

@ -18,6 +18,7 @@
"""Utilities and helper functions."""
import argparse
import functools
import logging
import netaddr
import os
@ -108,10 +109,21 @@ def str2bool(strbool):
return strbool.lower() == 'true'
def str2dict(strdict):
def str2dict(strdict, required_keys=None, optional_keys=None):
"""Convert key1=value1,key2=value2,... string into dictionary.
:param strdict: key1=value1,key2=value2
:param strdict: string in the form of key1=value1,key2=value2
:param required_keys: list of required keys. All keys in this list must be
specified. Otherwise ArgumentTypeError will be raised.
If this parameter is unspecified, no required key check
will be done.
:param optional_keys: list of optional keys.
This parameter is used for valid key check.
When at least one of required_keys and optional_keys,
a key must be a member of either of required_keys or
optional_keys. Otherwise, ArgumentTypeError will be
raised. When both required_keys and optional_keys are
unspecified, no valid key check will be done.
"""
result = {}
if strdict:
@ -121,9 +133,29 @@ def str2dict(strdict):
msg = _("invalid key-value '%s', expected format: key=value")
raise argparse.ArgumentTypeError(msg % kv)
result[key] = value
valid_keys = set(required_keys or []) | set(optional_keys or [])
if valid_keys:
invalid_keys = [k for k in result if k not in valid_keys]
if invalid_keys:
msg = _("Invalid key(s) '%(invalid_keys)s' specified. "
"Valid key(s): '%(valid_keys)s'.")
raise argparse.ArgumentTypeError(
msg % {'invalid_keys': ', '.join(sorted(invalid_keys)),
'valid_keys': ', '.join(sorted(valid_keys))})
if required_keys:
not_found_keys = [k for k in required_keys if k not in result]
if not_found_keys:
msg = _("Required key(s) '%s' not specified.")
raise argparse.ArgumentTypeError(msg % ', '.join(not_found_keys))
return result
def str2dict_type(optional_keys=None, required_keys=None):
return functools.partial(str2dict,
optional_keys=optional_keys,
required_keys=required_keys)
def http_log_req(_logger, args, kwargs):
if not _logger.isEnabledFor(logging.DEBUG):
return

View File

@ -64,6 +64,8 @@ class CreatePool(neutronV20.CreateCommand):
parser.add_argument(
'--session-persistence',
metavar='type=TYPE[,cookie_name=COOKIE_NAME]',
type=utils.str2dict_type(required_keys=['type'],
optional_keys=['cookie_name']),
help=_('The type of session persistence to use and associated '
'cookie name'))
parser.add_argument(
@ -86,9 +88,6 @@ class CreatePool(neutronV20.CreateCommand):
help=_('Protocol for balancing.'))
def args2body(self, parsed_args):
if parsed_args.session_persistence:
parsed_args.session_persistence = utils.str2dict(
parsed_args.session_persistence)
_listener_id = neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'listener', parsed_args.listener)
body = {'admin_state_up': parsed_args.admin_state,

View File

@ -46,6 +46,7 @@ def _add_updatable_args(parser):
parser.add_argument(
'--fixed-ip', metavar='subnet_id=SUBNET,ip_address=IP_ADDR',
action='append',
type=utils.str2dict_type(optional_keys=['subnet_id', 'ip_address']),
help=_('Desired IP and/or subnet for this port: '
'subnet_id=<name_or_id>,ip_address=<ip>. '
'You can repeat this option.'))
@ -73,13 +74,12 @@ def _updatable_args2body(parsed_args, body, client):
ips = []
if parsed_args.fixed_ip:
for ip_spec in parsed_args.fixed_ip:
ip_dict = utils.str2dict(ip_spec)
if 'subnet_id' in ip_dict:
subnet_name_id = ip_dict['subnet_id']
if 'subnet_id' in ip_spec:
subnet_name_id = ip_spec['subnet_id']
_subnet_id = neutronV20.find_resourceid_by_name_or_id(
client, 'subnet', subnet_name_id)
ip_dict['subnet_id'] = _subnet_id
ips.append(ip_dict)
ip_spec['subnet_id'] = _subnet_id
ips.append(ip_spec)
if ips:
body['fixed_ips'] = ips
@ -158,6 +158,9 @@ class UpdateExtraDhcpOptMixin(object):
default=[],
action='append',
dest='extra_dhcp_opts',
type=utils.str2dict_type(
required_keys=['opt_name'],
optional_keys=['opt_value', 'ip_version']),
help=_('Extra dhcp options to be assigned to this port: '
'opt_name=<dhcp_option_name>,opt_value=<value>,'
'ip_version={4,6}. You can repeat this option.'))
@ -174,7 +177,7 @@ class UpdateExtraDhcpOptMixin(object):
"ip_version={4,6}. "
"You can repeat this option.")
for opt in parsed_args.extra_dhcp_opts:
opt_ele.update(utils.str2dict(opt))
opt_ele.update(opt)
if ('opt_name' in opt_ele and
('opt_value' in opt_ele or 'ip_version' in opt_ele)):
if opt_ele.get('opt_value') == 'null':
@ -199,7 +202,9 @@ class UpdatePortAllowedAddressPair(object):
default=[],
action='append',
dest='allowed_address_pairs',
type=utils.str2dict,
type=utils.str2dict_type(
required_keys=['ip_address'],
optional_keys=['mac_address']),
help=_('Allowed address pair associated with the port.'
'You can repeat this option.'))
group_aap.add_argument(

View File

@ -114,7 +114,8 @@ class UpdateRouter(neutronV20.UpdateCommand):
routes_group = parser.add_mutually_exclusive_group()
routes_group.add_argument(
'--route', metavar='destination=CIDR,nexthop=IP_ADDR',
action='append', dest='routes', type=utils.str2dict,
action='append', dest='routes',
type=utils.str2dict_type(required_keys=['destination', 'nexthop']),
help=_('Route to associate with the router.'
' You can repeat this option.'))
routes_group.add_argument(
@ -226,6 +227,8 @@ class SetGatewayRouter(neutronV20.NeutronCommand):
parser.add_argument(
'--fixed-ip', metavar='subnet_id=SUBNET,ip_address=IP_ADDR',
action='append',
type=utils.str2dict_type(optional_keys=['subnet_id',
'ip_address']),
help=_('Desired IP and/or subnet on external network: '
'subnet_id=<name_or_id>,ip_address=<ip>. '
'You can specify both of subnet_id and ip_address or '
@ -246,13 +249,12 @@ class SetGatewayRouter(neutronV20.NeutronCommand):
if parsed_args.fixed_ip:
ips = []
for ip_spec in parsed_args.fixed_ip:
ip_dict = utils.str2dict(ip_spec)
subnet_name_id = ip_dict.get('subnet_id')
subnet_name_id = ip_spec.get('subnet_id')
if subnet_name_id:
subnet_id = neutronV20.find_resourceid_by_name_or_id(
neutron_client, 'subnet', subnet_name_id)
ip_dict['subnet_id'] = subnet_id
ips.append(ip_dict)
ip_spec['subnet_id'] = subnet_id
ips.append(ip_spec)
router_dict['external_fixed_ips'] = ips
neutron_client.add_gateway_router(_router_id, router_dict)
print(_('Set gateway for router %s') % parsed_args.router,

View File

@ -62,16 +62,19 @@ def add_updatable_arguments(parser):
help=_('No distribution of gateway.'))
parser.add_argument(
'--allocation-pool', metavar='start=IP_ADDR,end=IP_ADDR',
action='append', dest='allocation_pools', type=utils.str2dict,
action='append', dest='allocation_pools',
type=utils.str2dict_type(required_keys=['start', 'end']),
help=_('Allocation pool IP addresses for this subnet '
'(This option can be repeated).'))
parser.add_argument(
'--allocation_pool',
action='append', dest='allocation_pools', type=utils.str2dict,
action='append', dest='allocation_pools',
type=utils.str2dict_type(required_keys=['start', 'end']),
help=argparse.SUPPRESS)
parser.add_argument(
'--host-route', metavar='destination=CIDR,nexthop=IP_ADDR',
action='append', dest='host_routes', type=utils.str2dict,
action='append', dest='host_routes',
type=utils.str2dict_type(required_keys=['destination', 'nexthop']),
help=_('Additional route (This option can be repeated).'))
parser.add_argument(
'--dns-nameserver', metavar='DNS_NAMESERVER',

View File

@ -71,7 +71,7 @@ class CreateIKEPolicy(neutronv20.CreateCommand):
parser.add_argument(
'--lifetime',
metavar="units=UNITS,value=VALUE",
type=utils.str2dict,
type=utils.str2dict_type(optional_keys=['units', 'value']),
help=vpn_utils.lifetime_help("IKE"))
parser.add_argument(
'name', metavar='NAME',
@ -100,7 +100,7 @@ class UpdateIKEPolicy(neutronv20.UpdateCommand):
parser.add_argument(
'--lifetime',
metavar="units=UNITS,value=VALUE",
type=utils.str2dict,
type=utils.str2dict_type(optional_keys=['units', 'value']),
help=vpn_utils.lifetime_help("IKE"))
def args2body(self, parsed_args):

View File

@ -55,7 +55,8 @@ class IPsecSiteConnectionMixin(object):
parser.add_argument(
'--dpd',
metavar="action=ACTION,interval=INTERVAL,timeout=TIMEOUT",
type=utils.str2dict,
type=utils.str2dict_type(
optional_keys=['action', 'interval', 'timeout']),
help=vpn_utils.dpd_help("IPsec connection."))
parser.add_argument(
'--local-ep-group',

View File

@ -70,7 +70,7 @@ class CreateIPsecPolicy(neutronv20.CreateCommand):
parser.add_argument(
'--lifetime',
metavar="units=UNITS,value=VALUE",
type=utils.str2dict,
type=utils.str2dict_type(optional_keys=['units', 'value']),
help=vpn_utils.lifetime_help("IPsec"))
parser.add_argument(
'name', metavar='NAME',
@ -99,7 +99,7 @@ class UpdateIPsecPolicy(neutronv20.UpdateCommand):
parser.add_argument(
'--lifetime',
metavar="units=UNITS,value=VALUE",
type=utils.str2dict,
type=utils.str2dict_type(optional_keys=['units', 'value']),
help=vpn_utils.lifetime_help("IPsec"))
def args2body(self, parsed_args):

View File

@ -589,13 +589,14 @@ class CLITestV20PortJSON(test_cli20.CLITestV20Base):
resource = 'port'
cmd = port.UpdatePort(test_cli20.MyApp(sys.stdout), None)
myid = 'myid'
net_id = 'net_id'
subnet_id = 'subnet_id'
ip_addr = '123.123.123.123'
args = [myid,
'--fixed-ip', "network_id=%(net_id)s,ip_address=%(ip_addr)s" %
{'net_id': net_id,
'--fixed-ip',
"subnet_id=%(subnet_id)s,ip_address=%(ip_addr)s" %
{'subnet_id': subnet_id,
'ip_addr': ip_addr}]
updated_fields = {"fixed_ips": [{'network_id': net_id,
updated_fields = {"fixed_ips": [{'subnet_id': subnet_id,
'ip_address': ip_addr}]}
self._test_update_resource(resource, cmd, myid, args, updated_fields)

View File

@ -49,6 +49,36 @@ class TestUtils(testtools.TestCase):
self.assertRaises(argparse.ArgumentTypeError,
utils.str2dict, input_str)
def test_str2dict_optional_keys(self):
self.assertDictEqual({'key1': 'value1'},
utils.str2dict('key1=value1',
optional_keys=['key1', 'key2']))
self.assertDictEqual({'key1': 'value1', 'key2': 'value2'},
utils.str2dict('key1=value1,key2=value2',
optional_keys=['key1', 'key2']))
e = self.assertRaises(argparse.ArgumentTypeError,
utils.str2dict,
'key1=value1,key2=value2,key3=value3',
optional_keys=['key1', 'key2'])
self.assertEqual("Invalid key(s) 'key3' specified. "
"Valid key(s): 'key1, key2'.",
str(e))
def test_str2dict_required_keys(self):
self.assertDictEqual(
{'key1': 'value1', 'key2': 'value2', 'key3': 'value3'},
utils.str2dict('key1=value1,key2=value2,key3=value3',
required_keys=['key1', 'key2'],
optional_keys=['key3']))
self.assertDictEqual(
{'key1': 'value1', 'key2': 'value2'},
utils.str2dict('key1=value1,key2=value2',
required_keys=['key1', 'key2']))
e = self.assertRaises(argparse.ArgumentTypeError,
utils.str2dict, 'key1=value1',
required_keys=['key1', 'key2'])
self.assertEqual("Required key(s) 'key2' not specified.", str(e))
def test_get_dict_item_properties(self):
item = {'name': 'test_name', 'id': 'test_id'}
fields = ('name', 'id')

View File

@ -16,6 +16,7 @@
import sys
from neutronclient.common import exceptions
from neutronclient.neutron.v2_0.vpn import ikepolicy
from neutronclient.tests.unit import test_cli20
@ -101,7 +102,7 @@ class CLITestV20VpnIkePolicyJSON(test_cli20.CLITestV20Base):
self._test_create_resource(resource, cmd, name, my_id, args,
position_names, position_values)
def _test_lifetime_values(self, lifetime):
def _test_lifetime_values(self, lifetime, expected_exc=None):
resource = 'ikepolicy'
cmd = ikepolicy.CreateIKEPolicy(test_cli20.MyApp(sys.stdout), None)
name = 'ikepolicy1'
@ -134,16 +135,17 @@ class CLITestV20VpnIkePolicyJSON(test_cli20.CLITestV20Base):
auth_algorithm, encryption_algorithm,
phase1_negotiation_mode, ike_version, pfs,
tenant_id]
try:
self._test_create_resource(resource, cmd, name, my_id, args,
position_names, position_values)
except Exception:
return
self.fail("IKEPolicy Lifetime Error")
if not expected_exc:
expected_exc = exceptions.CommandError
self.assertRaises(
expected_exc,
self._test_create_resource,
resource, cmd, name, my_id, args,
position_names, position_values)
def test_create_ikepolicy_with_invalid_lifetime_keys(self):
lifetime = 'uts=seconds,val=20000'
self._test_lifetime_values(lifetime)
self._test_lifetime_values(lifetime, expected_exc=SystemExit)
def test_create_ikepolicy_with_invalid_lifetime_value(self):
lifetime = 'units=seconds,value=-1'

View File

@ -180,7 +180,7 @@ class CLITestV20IPsecSiteConnectionJSON(test_cli20.CLITestV20Base):
self._test_create_resource(resource, cmd, None, my_id, args,
position_names, position_values)
def _test_create_failure(self, additional_args=None):
def _test_create_failure(self, additional_args=None, expected_exc=None):
# Helper to test failure of IPSec site-to-site creation failure.
resource = 'ipsec_site_connection'
cmd = ipsec_site_connection.CreateIPsecSiteConnection(
@ -215,7 +215,9 @@ class CLITestV20IPsecSiteConnectionJSON(test_cli20.CLITestV20Base):
position_values = [tenant_id, admin_state, peer_address, peer_id, psk,
mtu, initiator, None, None, vpnservice_id,
ikepolicy_id, ipsecpolicy_id]
self.assertRaises(exceptions.CommandError,
if not expected_exc:
expected_exc = exceptions.CommandError
self.assertRaises(expected_exc,
self._test_create_resource,
resource, cmd, None, my_id, args,
position_names, position_values)
@ -227,7 +229,7 @@ class CLITestV20IPsecSiteConnectionJSON(test_cli20.CLITestV20Base):
def test_fail_create_with_invalid_dpd_keys(self):
bad_dpd_key = ['--dpd', 'act=restart,interval=30,time=120']
self._test_create_failure(bad_dpd_key)
self._test_create_failure(bad_dpd_key, SystemExit)
def test_fail_create_with_invalid_dpd_values(self):
bad_dpd_values = ['--dpd', 'action=hold,interval=30,timeout=-1']

View File

@ -16,6 +16,7 @@
import sys
from neutronclient.common import exceptions
from neutronclient.neutron.v2_0.vpn import ipsecpolicy
from neutronclient.tests.unit import test_cli20
@ -98,7 +99,7 @@ class CLITestV20VpnIpsecPolicyJSON(test_cli20.CLITestV20Base):
self._test_create_resource(resource, cmd, name, my_id, args,
position_names, position_values)
def _test_lifetime_values(self, lifetime):
def _test_lifetime_values(self, lifetime, expected_exc=None):
resource = 'ipsecpolicy'
cmd = ipsecpolicy.CreateIPsecPolicy(test_cli20.MyApp(sys.stdout), None)
name = 'ipsecpolicy1'
@ -131,19 +132,24 @@ class CLITestV20VpnIpsecPolicyJSON(test_cli20.CLITestV20Base):
auth_algorithm, encryption_algorithm,
phase1_negotiation_mode, ike_version, pfs,
tenant_id]
try:
self._test_create_resource(resource, cmd, name, my_id, args,
position_names, position_values)
except Exception:
return
self.fail("IPsecPolicy Lifetime Error")
if not expected_exc:
expected_exc = exceptions.CommandError
self.assertRaises(
expected_exc,
self._test_create_resource,
resource, cmd, name, my_id, args,
position_names, position_values)
def test_create_ipsecpolicy_with_invalid_lifetime_keys(self):
lifetime = 'uts=seconds,val=20000'
self._test_lifetime_values(lifetime, SystemExit)
def test_create_ipsecpolicy_with_invalid_lifetime_units(self):
lifetime = 'units=minutes,value=600'
self._test_lifetime_values(lifetime)
def test_create_ipsecpolicy_with_invalide_lifetime_values(self):
lifetime = 'units=minutes,value=0'
def test_create_ipsecpolicy_with_invalid_lifetime_value(self):
lifetime = 'units=seconds,value=0'
self._test_lifetime_values(lifetime)
def test_list_ipsecpolicy(self):