From ed17ae60bd5fbcfffb94e91f270191f0b4d92970 Mon Sep 17 00:00:00 2001 From: Akihiro Motoki Date: Sun, 6 Dec 2015 16:55:24 +0900 Subject: [PATCH] Improve str2dict key validation to avoid wrong keys This commit adds valid_keys and required_keys to str2dict and define a new function which can be used as argparse type validator. By this function, we can declare what fields are valid and what fields are required for dictionary option in option definition. Change-Id: Ib7b233e082c15d0bd6e16a754b8acad52e413986 Closes-Bug: #1102897 --- neutronclient/common/utils.py | 36 +++++++++++++++++-- neutronclient/neutron/v2_0/lb/v2/pool.py | 5 ++- neutronclient/neutron/v2_0/port.py | 19 ++++++---- neutronclient/neutron/v2_0/router.py | 12 ++++--- neutronclient/neutron/v2_0/subnet.py | 9 +++-- neutronclient/neutron/v2_0/vpn/ikepolicy.py | 4 +-- .../neutron/v2_0/vpn/ipsec_site_connection.py | 3 +- neutronclient/neutron/v2_0/vpn/ipsecpolicy.py | 4 +-- neutronclient/tests/unit/test_cli20_port.py | 9 ++--- neutronclient/tests/unit/test_utils.py | 30 ++++++++++++++++ .../tests/unit/vpn/test_cli20_ikepolicy.py | 18 +++++----- .../vpn/test_cli20_ipsec_site_connection.py | 8 +++-- .../tests/unit/vpn/test_cli20_ipsecpolicy.py | 24 ++++++++----- 13 files changed, 132 insertions(+), 49 deletions(-) diff --git a/neutronclient/common/utils.py b/neutronclient/common/utils.py index 4f19dac40..47610e60a 100644 --- a/neutronclient/common/utils.py +++ b/neutronclient/common/utils.py @@ -18,6 +18,7 @@ """Utilities and helper functions.""" import argparse +import functools import logging import netaddr import os @@ -108,10 +109,21 @@ def str2bool(strbool): return strbool.lower() == 'true' -def str2dict(strdict): +def str2dict(strdict, required_keys=None, optional_keys=None): """Convert key1=value1,key2=value2,... string into dictionary. - :param strdict: key1=value1,key2=value2 + :param strdict: string in the form of key1=value1,key2=value2 + :param required_keys: list of required keys. All keys in this list must be + specified. Otherwise ArgumentTypeError will be raised. + If this parameter is unspecified, no required key check + will be done. + :param optional_keys: list of optional keys. + This parameter is used for valid key check. + When at least one of required_keys and optional_keys, + a key must be a member of either of required_keys or + optional_keys. Otherwise, ArgumentTypeError will be + raised. When both required_keys and optional_keys are + unspecified, no valid key check will be done. """ result = {} if strdict: @@ -121,9 +133,29 @@ def str2dict(strdict): msg = _("invalid key-value '%s', expected format: key=value") raise argparse.ArgumentTypeError(msg % kv) result[key] = value + valid_keys = set(required_keys or []) | set(optional_keys or []) + if valid_keys: + invalid_keys = [k for k in result if k not in valid_keys] + if invalid_keys: + msg = _("Invalid key(s) '%(invalid_keys)s' specified. " + "Valid key(s): '%(valid_keys)s'.") + raise argparse.ArgumentTypeError( + msg % {'invalid_keys': ', '.join(sorted(invalid_keys)), + 'valid_keys': ', '.join(sorted(valid_keys))}) + if required_keys: + not_found_keys = [k for k in required_keys if k not in result] + if not_found_keys: + msg = _("Required key(s) '%s' not specified.") + raise argparse.ArgumentTypeError(msg % ', '.join(not_found_keys)) return result +def str2dict_type(optional_keys=None, required_keys=None): + return functools.partial(str2dict, + optional_keys=optional_keys, + required_keys=required_keys) + + def http_log_req(_logger, args, kwargs): if not _logger.isEnabledFor(logging.DEBUG): return diff --git a/neutronclient/neutron/v2_0/lb/v2/pool.py b/neutronclient/neutron/v2_0/lb/v2/pool.py index 3c1ea99b8..2745091e4 100644 --- a/neutronclient/neutron/v2_0/lb/v2/pool.py +++ b/neutronclient/neutron/v2_0/lb/v2/pool.py @@ -64,6 +64,8 @@ class CreatePool(neutronV20.CreateCommand): parser.add_argument( '--session-persistence', metavar='type=TYPE[,cookie_name=COOKIE_NAME]', + type=utils.str2dict_type(required_keys=['type'], + optional_keys=['cookie_name']), help=_('The type of session persistence to use and associated ' 'cookie name')) parser.add_argument( @@ -86,9 +88,6 @@ class CreatePool(neutronV20.CreateCommand): help=_('Protocol for balancing.')) def args2body(self, parsed_args): - if parsed_args.session_persistence: - parsed_args.session_persistence = utils.str2dict( - parsed_args.session_persistence) _listener_id = neutronV20.find_resourceid_by_name_or_id( self.get_client(), 'listener', parsed_args.listener) body = {'admin_state_up': parsed_args.admin_state, diff --git a/neutronclient/neutron/v2_0/port.py b/neutronclient/neutron/v2_0/port.py index 0e27cd227..9d71e1a4e 100644 --- a/neutronclient/neutron/v2_0/port.py +++ b/neutronclient/neutron/v2_0/port.py @@ -46,6 +46,7 @@ def _add_updatable_args(parser): parser.add_argument( '--fixed-ip', metavar='subnet_id=SUBNET,ip_address=IP_ADDR', action='append', + type=utils.str2dict_type(optional_keys=['subnet_id', 'ip_address']), help=_('Desired IP and/or subnet for this port: ' 'subnet_id=,ip_address=. ' 'You can repeat this option.')) @@ -73,13 +74,12 @@ def _updatable_args2body(parsed_args, body, client): ips = [] if parsed_args.fixed_ip: for ip_spec in parsed_args.fixed_ip: - ip_dict = utils.str2dict(ip_spec) - if 'subnet_id' in ip_dict: - subnet_name_id = ip_dict['subnet_id'] + if 'subnet_id' in ip_spec: + subnet_name_id = ip_spec['subnet_id'] _subnet_id = neutronV20.find_resourceid_by_name_or_id( client, 'subnet', subnet_name_id) - ip_dict['subnet_id'] = _subnet_id - ips.append(ip_dict) + ip_spec['subnet_id'] = _subnet_id + ips.append(ip_spec) if ips: body['fixed_ips'] = ips @@ -158,6 +158,9 @@ class UpdateExtraDhcpOptMixin(object): default=[], action='append', dest='extra_dhcp_opts', + type=utils.str2dict_type( + required_keys=['opt_name'], + optional_keys=['opt_value', 'ip_version']), help=_('Extra dhcp options to be assigned to this port: ' 'opt_name=,opt_value=,' 'ip_version={4,6}. You can repeat this option.')) @@ -174,7 +177,7 @@ class UpdateExtraDhcpOptMixin(object): "ip_version={4,6}. " "You can repeat this option.") for opt in parsed_args.extra_dhcp_opts: - opt_ele.update(utils.str2dict(opt)) + opt_ele.update(opt) if ('opt_name' in opt_ele and ('opt_value' in opt_ele or 'ip_version' in opt_ele)): if opt_ele.get('opt_value') == 'null': @@ -199,7 +202,9 @@ class UpdatePortAllowedAddressPair(object): default=[], action='append', dest='allowed_address_pairs', - type=utils.str2dict, + type=utils.str2dict_type( + required_keys=['ip_address'], + optional_keys=['mac_address']), help=_('Allowed address pair associated with the port.' 'You can repeat this option.')) group_aap.add_argument( diff --git a/neutronclient/neutron/v2_0/router.py b/neutronclient/neutron/v2_0/router.py index 88da856b3..c372f98ae 100644 --- a/neutronclient/neutron/v2_0/router.py +++ b/neutronclient/neutron/v2_0/router.py @@ -114,7 +114,8 @@ class UpdateRouter(neutronV20.UpdateCommand): routes_group = parser.add_mutually_exclusive_group() routes_group.add_argument( '--route', metavar='destination=CIDR,nexthop=IP_ADDR', - action='append', dest='routes', type=utils.str2dict, + action='append', dest='routes', + type=utils.str2dict_type(required_keys=['destination', 'nexthop']), help=_('Route to associate with the router.' ' You can repeat this option.')) routes_group.add_argument( @@ -226,6 +227,8 @@ class SetGatewayRouter(neutronV20.NeutronCommand): parser.add_argument( '--fixed-ip', metavar='subnet_id=SUBNET,ip_address=IP_ADDR', action='append', + type=utils.str2dict_type(optional_keys=['subnet_id', + 'ip_address']), help=_('Desired IP and/or subnet on external network: ' 'subnet_id=,ip_address=. ' 'You can specify both of subnet_id and ip_address or ' @@ -246,13 +249,12 @@ class SetGatewayRouter(neutronV20.NeutronCommand): if parsed_args.fixed_ip: ips = [] for ip_spec in parsed_args.fixed_ip: - ip_dict = utils.str2dict(ip_spec) - subnet_name_id = ip_dict.get('subnet_id') + subnet_name_id = ip_spec.get('subnet_id') if subnet_name_id: subnet_id = neutronV20.find_resourceid_by_name_or_id( neutron_client, 'subnet', subnet_name_id) - ip_dict['subnet_id'] = subnet_id - ips.append(ip_dict) + ip_spec['subnet_id'] = subnet_id + ips.append(ip_spec) router_dict['external_fixed_ips'] = ips neutron_client.add_gateway_router(_router_id, router_dict) print(_('Set gateway for router %s') % parsed_args.router, diff --git a/neutronclient/neutron/v2_0/subnet.py b/neutronclient/neutron/v2_0/subnet.py index d8b594663..3abe313f2 100644 --- a/neutronclient/neutron/v2_0/subnet.py +++ b/neutronclient/neutron/v2_0/subnet.py @@ -62,16 +62,19 @@ def add_updatable_arguments(parser): help=_('No distribution of gateway.')) parser.add_argument( '--allocation-pool', metavar='start=IP_ADDR,end=IP_ADDR', - action='append', dest='allocation_pools', type=utils.str2dict, + action='append', dest='allocation_pools', + type=utils.str2dict_type(required_keys=['start', 'end']), help=_('Allocation pool IP addresses for this subnet ' '(This option can be repeated).')) parser.add_argument( '--allocation_pool', - action='append', dest='allocation_pools', type=utils.str2dict, + action='append', dest='allocation_pools', + type=utils.str2dict_type(required_keys=['start', 'end']), help=argparse.SUPPRESS) parser.add_argument( '--host-route', metavar='destination=CIDR,nexthop=IP_ADDR', - action='append', dest='host_routes', type=utils.str2dict, + action='append', dest='host_routes', + type=utils.str2dict_type(required_keys=['destination', 'nexthop']), help=_('Additional route (This option can be repeated).')) parser.add_argument( '--dns-nameserver', metavar='DNS_NAMESERVER', diff --git a/neutronclient/neutron/v2_0/vpn/ikepolicy.py b/neutronclient/neutron/v2_0/vpn/ikepolicy.py index 509083163..fe41655ee 100644 --- a/neutronclient/neutron/v2_0/vpn/ikepolicy.py +++ b/neutronclient/neutron/v2_0/vpn/ikepolicy.py @@ -71,7 +71,7 @@ class CreateIKEPolicy(neutronv20.CreateCommand): parser.add_argument( '--lifetime', metavar="units=UNITS,value=VALUE", - type=utils.str2dict, + type=utils.str2dict_type(optional_keys=['units', 'value']), help=vpn_utils.lifetime_help("IKE")) parser.add_argument( 'name', metavar='NAME', @@ -100,7 +100,7 @@ class UpdateIKEPolicy(neutronv20.UpdateCommand): parser.add_argument( '--lifetime', metavar="units=UNITS,value=VALUE", - type=utils.str2dict, + type=utils.str2dict_type(optional_keys=['units', 'value']), help=vpn_utils.lifetime_help("IKE")) def args2body(self, parsed_args): diff --git a/neutronclient/neutron/v2_0/vpn/ipsec_site_connection.py b/neutronclient/neutron/v2_0/vpn/ipsec_site_connection.py index bace4d2e0..af596c3f2 100644 --- a/neutronclient/neutron/v2_0/vpn/ipsec_site_connection.py +++ b/neutronclient/neutron/v2_0/vpn/ipsec_site_connection.py @@ -55,7 +55,8 @@ class IPsecSiteConnectionMixin(object): parser.add_argument( '--dpd', metavar="action=ACTION,interval=INTERVAL,timeout=TIMEOUT", - type=utils.str2dict, + type=utils.str2dict_type( + optional_keys=['action', 'interval', 'timeout']), help=vpn_utils.dpd_help("IPsec connection.")) parser.add_argument( '--local-ep-group', diff --git a/neutronclient/neutron/v2_0/vpn/ipsecpolicy.py b/neutronclient/neutron/v2_0/vpn/ipsecpolicy.py index 76f09142e..8f6296485 100644 --- a/neutronclient/neutron/v2_0/vpn/ipsecpolicy.py +++ b/neutronclient/neutron/v2_0/vpn/ipsecpolicy.py @@ -70,7 +70,7 @@ class CreateIPsecPolicy(neutronv20.CreateCommand): parser.add_argument( '--lifetime', metavar="units=UNITS,value=VALUE", - type=utils.str2dict, + type=utils.str2dict_type(optional_keys=['units', 'value']), help=vpn_utils.lifetime_help("IPsec")) parser.add_argument( 'name', metavar='NAME', @@ -99,7 +99,7 @@ class UpdateIPsecPolicy(neutronv20.UpdateCommand): parser.add_argument( '--lifetime', metavar="units=UNITS,value=VALUE", - type=utils.str2dict, + type=utils.str2dict_type(optional_keys=['units', 'value']), help=vpn_utils.lifetime_help("IPsec")) def args2body(self, parsed_args): diff --git a/neutronclient/tests/unit/test_cli20_port.py b/neutronclient/tests/unit/test_cli20_port.py index 4b6d617f7..6ed31342a 100644 --- a/neutronclient/tests/unit/test_cli20_port.py +++ b/neutronclient/tests/unit/test_cli20_port.py @@ -589,13 +589,14 @@ class CLITestV20PortJSON(test_cli20.CLITestV20Base): resource = 'port' cmd = port.UpdatePort(test_cli20.MyApp(sys.stdout), None) myid = 'myid' - net_id = 'net_id' + subnet_id = 'subnet_id' ip_addr = '123.123.123.123' args = [myid, - '--fixed-ip', "network_id=%(net_id)s,ip_address=%(ip_addr)s" % - {'net_id': net_id, + '--fixed-ip', + "subnet_id=%(subnet_id)s,ip_address=%(ip_addr)s" % + {'subnet_id': subnet_id, 'ip_addr': ip_addr}] - updated_fields = {"fixed_ips": [{'network_id': net_id, + updated_fields = {"fixed_ips": [{'subnet_id': subnet_id, 'ip_address': ip_addr}]} self._test_update_resource(resource, cmd, myid, args, updated_fields) diff --git a/neutronclient/tests/unit/test_utils.py b/neutronclient/tests/unit/test_utils.py index 27ebc0f17..44152418d 100644 --- a/neutronclient/tests/unit/test_utils.py +++ b/neutronclient/tests/unit/test_utils.py @@ -49,6 +49,36 @@ class TestUtils(testtools.TestCase): self.assertRaises(argparse.ArgumentTypeError, utils.str2dict, input_str) + def test_str2dict_optional_keys(self): + self.assertDictEqual({'key1': 'value1'}, + utils.str2dict('key1=value1', + optional_keys=['key1', 'key2'])) + self.assertDictEqual({'key1': 'value1', 'key2': 'value2'}, + utils.str2dict('key1=value1,key2=value2', + optional_keys=['key1', 'key2'])) + e = self.assertRaises(argparse.ArgumentTypeError, + utils.str2dict, + 'key1=value1,key2=value2,key3=value3', + optional_keys=['key1', 'key2']) + self.assertEqual("Invalid key(s) 'key3' specified. " + "Valid key(s): 'key1, key2'.", + str(e)) + + def test_str2dict_required_keys(self): + self.assertDictEqual( + {'key1': 'value1', 'key2': 'value2', 'key3': 'value3'}, + utils.str2dict('key1=value1,key2=value2,key3=value3', + required_keys=['key1', 'key2'], + optional_keys=['key3'])) + self.assertDictEqual( + {'key1': 'value1', 'key2': 'value2'}, + utils.str2dict('key1=value1,key2=value2', + required_keys=['key1', 'key2'])) + e = self.assertRaises(argparse.ArgumentTypeError, + utils.str2dict, 'key1=value1', + required_keys=['key1', 'key2']) + self.assertEqual("Required key(s) 'key2' not specified.", str(e)) + def test_get_dict_item_properties(self): item = {'name': 'test_name', 'id': 'test_id'} fields = ('name', 'id') diff --git a/neutronclient/tests/unit/vpn/test_cli20_ikepolicy.py b/neutronclient/tests/unit/vpn/test_cli20_ikepolicy.py index d7d1bbfe9..c6ac8fb78 100644 --- a/neutronclient/tests/unit/vpn/test_cli20_ikepolicy.py +++ b/neutronclient/tests/unit/vpn/test_cli20_ikepolicy.py @@ -16,6 +16,7 @@ import sys +from neutronclient.common import exceptions from neutronclient.neutron.v2_0.vpn import ikepolicy from neutronclient.tests.unit import test_cli20 @@ -101,7 +102,7 @@ class CLITestV20VpnIkePolicyJSON(test_cli20.CLITestV20Base): self._test_create_resource(resource, cmd, name, my_id, args, position_names, position_values) - def _test_lifetime_values(self, lifetime): + def _test_lifetime_values(self, lifetime, expected_exc=None): resource = 'ikepolicy' cmd = ikepolicy.CreateIKEPolicy(test_cli20.MyApp(sys.stdout), None) name = 'ikepolicy1' @@ -134,16 +135,17 @@ class CLITestV20VpnIkePolicyJSON(test_cli20.CLITestV20Base): auth_algorithm, encryption_algorithm, phase1_negotiation_mode, ike_version, pfs, tenant_id] - try: - self._test_create_resource(resource, cmd, name, my_id, args, - position_names, position_values) - except Exception: - return - self.fail("IKEPolicy Lifetime Error") + if not expected_exc: + expected_exc = exceptions.CommandError + self.assertRaises( + expected_exc, + self._test_create_resource, + resource, cmd, name, my_id, args, + position_names, position_values) def test_create_ikepolicy_with_invalid_lifetime_keys(self): lifetime = 'uts=seconds,val=20000' - self._test_lifetime_values(lifetime) + self._test_lifetime_values(lifetime, expected_exc=SystemExit) def test_create_ikepolicy_with_invalid_lifetime_value(self): lifetime = 'units=seconds,value=-1' diff --git a/neutronclient/tests/unit/vpn/test_cli20_ipsec_site_connection.py b/neutronclient/tests/unit/vpn/test_cli20_ipsec_site_connection.py index 85a28b60d..8ead88f2a 100644 --- a/neutronclient/tests/unit/vpn/test_cli20_ipsec_site_connection.py +++ b/neutronclient/tests/unit/vpn/test_cli20_ipsec_site_connection.py @@ -180,7 +180,7 @@ class CLITestV20IPsecSiteConnectionJSON(test_cli20.CLITestV20Base): self._test_create_resource(resource, cmd, None, my_id, args, position_names, position_values) - def _test_create_failure(self, additional_args=None): + def _test_create_failure(self, additional_args=None, expected_exc=None): # Helper to test failure of IPSec site-to-site creation failure. resource = 'ipsec_site_connection' cmd = ipsec_site_connection.CreateIPsecSiteConnection( @@ -215,7 +215,9 @@ class CLITestV20IPsecSiteConnectionJSON(test_cli20.CLITestV20Base): position_values = [tenant_id, admin_state, peer_address, peer_id, psk, mtu, initiator, None, None, vpnservice_id, ikepolicy_id, ipsecpolicy_id] - self.assertRaises(exceptions.CommandError, + if not expected_exc: + expected_exc = exceptions.CommandError + self.assertRaises(expected_exc, self._test_create_resource, resource, cmd, None, my_id, args, position_names, position_values) @@ -227,7 +229,7 @@ class CLITestV20IPsecSiteConnectionJSON(test_cli20.CLITestV20Base): def test_fail_create_with_invalid_dpd_keys(self): bad_dpd_key = ['--dpd', 'act=restart,interval=30,time=120'] - self._test_create_failure(bad_dpd_key) + self._test_create_failure(bad_dpd_key, SystemExit) def test_fail_create_with_invalid_dpd_values(self): bad_dpd_values = ['--dpd', 'action=hold,interval=30,timeout=-1'] diff --git a/neutronclient/tests/unit/vpn/test_cli20_ipsecpolicy.py b/neutronclient/tests/unit/vpn/test_cli20_ipsecpolicy.py index 5df90ba4e..c6b23e19c 100644 --- a/neutronclient/tests/unit/vpn/test_cli20_ipsecpolicy.py +++ b/neutronclient/tests/unit/vpn/test_cli20_ipsecpolicy.py @@ -16,6 +16,7 @@ import sys +from neutronclient.common import exceptions from neutronclient.neutron.v2_0.vpn import ipsecpolicy from neutronclient.tests.unit import test_cli20 @@ -98,7 +99,7 @@ class CLITestV20VpnIpsecPolicyJSON(test_cli20.CLITestV20Base): self._test_create_resource(resource, cmd, name, my_id, args, position_names, position_values) - def _test_lifetime_values(self, lifetime): + def _test_lifetime_values(self, lifetime, expected_exc=None): resource = 'ipsecpolicy' cmd = ipsecpolicy.CreateIPsecPolicy(test_cli20.MyApp(sys.stdout), None) name = 'ipsecpolicy1' @@ -131,19 +132,24 @@ class CLITestV20VpnIpsecPolicyJSON(test_cli20.CLITestV20Base): auth_algorithm, encryption_algorithm, phase1_negotiation_mode, ike_version, pfs, tenant_id] - try: - self._test_create_resource(resource, cmd, name, my_id, args, - position_names, position_values) - except Exception: - return - self.fail("IPsecPolicy Lifetime Error") + if not expected_exc: + expected_exc = exceptions.CommandError + self.assertRaises( + expected_exc, + self._test_create_resource, + resource, cmd, name, my_id, args, + position_names, position_values) def test_create_ipsecpolicy_with_invalid_lifetime_keys(self): lifetime = 'uts=seconds,val=20000' + self._test_lifetime_values(lifetime, SystemExit) + + def test_create_ipsecpolicy_with_invalid_lifetime_units(self): + lifetime = 'units=minutes,value=600' self._test_lifetime_values(lifetime) - def test_create_ipsecpolicy_with_invalide_lifetime_values(self): - lifetime = 'units=minutes,value=0' + def test_create_ipsecpolicy_with_invalid_lifetime_value(self): + lifetime = 'units=seconds,value=0' self._test_lifetime_values(lifetime) def test_list_ipsecpolicy(self):