diff --git a/neutronclient/common/utils.py b/neutronclient/common/utils.py index 4f19dac40..47610e60a 100644 --- a/neutronclient/common/utils.py +++ b/neutronclient/common/utils.py @@ -18,6 +18,7 @@ """Utilities and helper functions.""" import argparse +import functools import logging import netaddr import os @@ -108,10 +109,21 @@ def str2bool(strbool): return strbool.lower() == 'true' -def str2dict(strdict): +def str2dict(strdict, required_keys=None, optional_keys=None): """Convert key1=value1,key2=value2,... string into dictionary. - :param strdict: key1=value1,key2=value2 + :param strdict: string in the form of key1=value1,key2=value2 + :param required_keys: list of required keys. All keys in this list must be + specified. Otherwise ArgumentTypeError will be raised. + If this parameter is unspecified, no required key check + will be done. + :param optional_keys: list of optional keys. + This parameter is used for valid key check. + When at least one of required_keys and optional_keys, + a key must be a member of either of required_keys or + optional_keys. Otherwise, ArgumentTypeError will be + raised. When both required_keys and optional_keys are + unspecified, no valid key check will be done. """ result = {} if strdict: @@ -121,9 +133,29 @@ def str2dict(strdict): msg = _("invalid key-value '%s', expected format: key=value") raise argparse.ArgumentTypeError(msg % kv) result[key] = value + valid_keys = set(required_keys or []) | set(optional_keys or []) + if valid_keys: + invalid_keys = [k for k in result if k not in valid_keys] + if invalid_keys: + msg = _("Invalid key(s) '%(invalid_keys)s' specified. " + "Valid key(s): '%(valid_keys)s'.") + raise argparse.ArgumentTypeError( + msg % {'invalid_keys': ', '.join(sorted(invalid_keys)), + 'valid_keys': ', '.join(sorted(valid_keys))}) + if required_keys: + not_found_keys = [k for k in required_keys if k not in result] + if not_found_keys: + msg = _("Required key(s) '%s' not specified.") + raise argparse.ArgumentTypeError(msg % ', '.join(not_found_keys)) return result +def str2dict_type(optional_keys=None, required_keys=None): + return functools.partial(str2dict, + optional_keys=optional_keys, + required_keys=required_keys) + + def http_log_req(_logger, args, kwargs): if not _logger.isEnabledFor(logging.DEBUG): return diff --git a/neutronclient/neutron/v2_0/lb/v2/pool.py b/neutronclient/neutron/v2_0/lb/v2/pool.py index 3c1ea99b8..2745091e4 100644 --- a/neutronclient/neutron/v2_0/lb/v2/pool.py +++ b/neutronclient/neutron/v2_0/lb/v2/pool.py @@ -64,6 +64,8 @@ class CreatePool(neutronV20.CreateCommand): parser.add_argument( '--session-persistence', metavar='type=TYPE[,cookie_name=COOKIE_NAME]', + type=utils.str2dict_type(required_keys=['type'], + optional_keys=['cookie_name']), help=_('The type of session persistence to use and associated ' 'cookie name')) parser.add_argument( @@ -86,9 +88,6 @@ class CreatePool(neutronV20.CreateCommand): help=_('Protocol for balancing.')) def args2body(self, parsed_args): - if parsed_args.session_persistence: - parsed_args.session_persistence = utils.str2dict( - parsed_args.session_persistence) _listener_id = neutronV20.find_resourceid_by_name_or_id( self.get_client(), 'listener', parsed_args.listener) body = {'admin_state_up': parsed_args.admin_state, diff --git a/neutronclient/neutron/v2_0/port.py b/neutronclient/neutron/v2_0/port.py index 29426ee58..f4917292d 100644 --- a/neutronclient/neutron/v2_0/port.py +++ b/neutronclient/neutron/v2_0/port.py @@ -47,6 +47,7 @@ def _add_updatable_args(parser): parser.add_argument( '--fixed-ip', metavar='subnet_id=SUBNET,ip_address=IP_ADDR', action='append', + type=utils.str2dict_type(optional_keys=['subnet_id', 'ip_address']), help=_('Desired IP and/or subnet for this port: ' 'subnet_id=,ip_address=. ' 'You can repeat this option.')) @@ -74,13 +75,12 @@ def _updatable_args2body(parsed_args, body, client): ips = [] if parsed_args.fixed_ip: for ip_spec in parsed_args.fixed_ip: - ip_dict = utils.str2dict(ip_spec) - if 'subnet_id' in ip_dict: - subnet_name_id = ip_dict['subnet_id'] + if 'subnet_id' in ip_spec: + subnet_name_id = ip_spec['subnet_id'] _subnet_id = neutronV20.find_resourceid_by_name_or_id( client, 'subnet', subnet_name_id) - ip_dict['subnet_id'] = _subnet_id - ips.append(ip_dict) + ip_spec['subnet_id'] = _subnet_id + ips.append(ip_spec) if ips: body['fixed_ips'] = ips @@ -159,6 +159,9 @@ class UpdateExtraDhcpOptMixin(object): default=[], action='append', dest='extra_dhcp_opts', + type=utils.str2dict_type( + required_keys=['opt_name'], + optional_keys=['opt_value', 'ip_version']), help=_('Extra dhcp options to be assigned to this port: ' 'opt_name=,opt_value=,' 'ip_version={4,6}. You can repeat this option.')) @@ -175,7 +178,7 @@ class UpdateExtraDhcpOptMixin(object): "ip_version={4,6}. " "You can repeat this option.") for opt in parsed_args.extra_dhcp_opts: - opt_ele.update(utils.str2dict(opt)) + opt_ele.update(opt) if ('opt_name' in opt_ele and ('opt_value' in opt_ele or 'ip_version' in opt_ele)): if opt_ele.get('opt_value') == 'null': @@ -200,7 +203,9 @@ class UpdatePortAllowedAddressPair(object): default=[], action='append', dest='allowed_address_pairs', - type=utils.str2dict, + type=utils.str2dict_type( + required_keys=['ip_address'], + optional_keys=['mac_address']), help=_('Allowed address pair associated with the port.' 'You can repeat this option.')) group_aap.add_argument( diff --git a/neutronclient/neutron/v2_0/router.py b/neutronclient/neutron/v2_0/router.py index 88da856b3..c372f98ae 100644 --- a/neutronclient/neutron/v2_0/router.py +++ b/neutronclient/neutron/v2_0/router.py @@ -114,7 +114,8 @@ class UpdateRouter(neutronV20.UpdateCommand): routes_group = parser.add_mutually_exclusive_group() routes_group.add_argument( '--route', metavar='destination=CIDR,nexthop=IP_ADDR', - action='append', dest='routes', type=utils.str2dict, + action='append', dest='routes', + type=utils.str2dict_type(required_keys=['destination', 'nexthop']), help=_('Route to associate with the router.' ' You can repeat this option.')) routes_group.add_argument( @@ -226,6 +227,8 @@ class SetGatewayRouter(neutronV20.NeutronCommand): parser.add_argument( '--fixed-ip', metavar='subnet_id=SUBNET,ip_address=IP_ADDR', action='append', + type=utils.str2dict_type(optional_keys=['subnet_id', + 'ip_address']), help=_('Desired IP and/or subnet on external network: ' 'subnet_id=,ip_address=. ' 'You can specify both of subnet_id and ip_address or ' @@ -246,13 +249,12 @@ class SetGatewayRouter(neutronV20.NeutronCommand): if parsed_args.fixed_ip: ips = [] for ip_spec in parsed_args.fixed_ip: - ip_dict = utils.str2dict(ip_spec) - subnet_name_id = ip_dict.get('subnet_id') + subnet_name_id = ip_spec.get('subnet_id') if subnet_name_id: subnet_id = neutronV20.find_resourceid_by_name_or_id( neutron_client, 'subnet', subnet_name_id) - ip_dict['subnet_id'] = subnet_id - ips.append(ip_dict) + ip_spec['subnet_id'] = subnet_id + ips.append(ip_spec) router_dict['external_fixed_ips'] = ips neutron_client.add_gateway_router(_router_id, router_dict) print(_('Set gateway for router %s') % parsed_args.router, diff --git a/neutronclient/neutron/v2_0/subnet.py b/neutronclient/neutron/v2_0/subnet.py index d8b594663..3abe313f2 100644 --- a/neutronclient/neutron/v2_0/subnet.py +++ b/neutronclient/neutron/v2_0/subnet.py @@ -62,16 +62,19 @@ def add_updatable_arguments(parser): help=_('No distribution of gateway.')) parser.add_argument( '--allocation-pool', metavar='start=IP_ADDR,end=IP_ADDR', - action='append', dest='allocation_pools', type=utils.str2dict, + action='append', dest='allocation_pools', + type=utils.str2dict_type(required_keys=['start', 'end']), help=_('Allocation pool IP addresses for this subnet ' '(This option can be repeated).')) parser.add_argument( '--allocation_pool', - action='append', dest='allocation_pools', type=utils.str2dict, + action='append', dest='allocation_pools', + type=utils.str2dict_type(required_keys=['start', 'end']), help=argparse.SUPPRESS) parser.add_argument( '--host-route', metavar='destination=CIDR,nexthop=IP_ADDR', - action='append', dest='host_routes', type=utils.str2dict, + action='append', dest='host_routes', + type=utils.str2dict_type(required_keys=['destination', 'nexthop']), help=_('Additional route (This option can be repeated).')) parser.add_argument( '--dns-nameserver', metavar='DNS_NAMESERVER', diff --git a/neutronclient/neutron/v2_0/vpn/ikepolicy.py b/neutronclient/neutron/v2_0/vpn/ikepolicy.py index 509083163..fe41655ee 100644 --- a/neutronclient/neutron/v2_0/vpn/ikepolicy.py +++ b/neutronclient/neutron/v2_0/vpn/ikepolicy.py @@ -71,7 +71,7 @@ class CreateIKEPolicy(neutronv20.CreateCommand): parser.add_argument( '--lifetime', metavar="units=UNITS,value=VALUE", - type=utils.str2dict, + type=utils.str2dict_type(optional_keys=['units', 'value']), help=vpn_utils.lifetime_help("IKE")) parser.add_argument( 'name', metavar='NAME', @@ -100,7 +100,7 @@ class UpdateIKEPolicy(neutronv20.UpdateCommand): parser.add_argument( '--lifetime', metavar="units=UNITS,value=VALUE", - type=utils.str2dict, + type=utils.str2dict_type(optional_keys=['units', 'value']), help=vpn_utils.lifetime_help("IKE")) def args2body(self, parsed_args): diff --git a/neutronclient/neutron/v2_0/vpn/ipsec_site_connection.py b/neutronclient/neutron/v2_0/vpn/ipsec_site_connection.py index bace4d2e0..af596c3f2 100644 --- a/neutronclient/neutron/v2_0/vpn/ipsec_site_connection.py +++ b/neutronclient/neutron/v2_0/vpn/ipsec_site_connection.py @@ -55,7 +55,8 @@ class IPsecSiteConnectionMixin(object): parser.add_argument( '--dpd', metavar="action=ACTION,interval=INTERVAL,timeout=TIMEOUT", - type=utils.str2dict, + type=utils.str2dict_type( + optional_keys=['action', 'interval', 'timeout']), help=vpn_utils.dpd_help("IPsec connection.")) parser.add_argument( '--local-ep-group', diff --git a/neutronclient/neutron/v2_0/vpn/ipsecpolicy.py b/neutronclient/neutron/v2_0/vpn/ipsecpolicy.py index 76f09142e..8f6296485 100644 --- a/neutronclient/neutron/v2_0/vpn/ipsecpolicy.py +++ b/neutronclient/neutron/v2_0/vpn/ipsecpolicy.py @@ -70,7 +70,7 @@ class CreateIPsecPolicy(neutronv20.CreateCommand): parser.add_argument( '--lifetime', metavar="units=UNITS,value=VALUE", - type=utils.str2dict, + type=utils.str2dict_type(optional_keys=['units', 'value']), help=vpn_utils.lifetime_help("IPsec")) parser.add_argument( 'name', metavar='NAME', @@ -99,7 +99,7 @@ class UpdateIPsecPolicy(neutronv20.UpdateCommand): parser.add_argument( '--lifetime', metavar="units=UNITS,value=VALUE", - type=utils.str2dict, + type=utils.str2dict_type(optional_keys=['units', 'value']), help=vpn_utils.lifetime_help("IPsec")) def args2body(self, parsed_args): diff --git a/neutronclient/tests/unit/test_cli20_port.py b/neutronclient/tests/unit/test_cli20_port.py index 8f502ebae..dd2ce6b63 100644 --- a/neutronclient/tests/unit/test_cli20_port.py +++ b/neutronclient/tests/unit/test_cli20_port.py @@ -603,13 +603,14 @@ class CLITestV20PortJSON(test_cli20.CLITestV20Base): resource = 'port' cmd = port.UpdatePort(test_cli20.MyApp(sys.stdout), None) myid = 'myid' - net_id = 'net_id' + subnet_id = 'subnet_id' ip_addr = '123.123.123.123' args = [myid, - '--fixed-ip', "network_id=%(net_id)s,ip_address=%(ip_addr)s" % - {'net_id': net_id, + '--fixed-ip', + "subnet_id=%(subnet_id)s,ip_address=%(ip_addr)s" % + {'subnet_id': subnet_id, 'ip_addr': ip_addr}] - updated_fields = {"fixed_ips": [{'network_id': net_id, + updated_fields = {"fixed_ips": [{'subnet_id': subnet_id, 'ip_address': ip_addr}]} self._test_update_resource(resource, cmd, myid, args, updated_fields) diff --git a/neutronclient/tests/unit/test_utils.py b/neutronclient/tests/unit/test_utils.py index 27ebc0f17..44152418d 100644 --- a/neutronclient/tests/unit/test_utils.py +++ b/neutronclient/tests/unit/test_utils.py @@ -49,6 +49,36 @@ class TestUtils(testtools.TestCase): self.assertRaises(argparse.ArgumentTypeError, utils.str2dict, input_str) + def test_str2dict_optional_keys(self): + self.assertDictEqual({'key1': 'value1'}, + utils.str2dict('key1=value1', + optional_keys=['key1', 'key2'])) + self.assertDictEqual({'key1': 'value1', 'key2': 'value2'}, + utils.str2dict('key1=value1,key2=value2', + optional_keys=['key1', 'key2'])) + e = self.assertRaises(argparse.ArgumentTypeError, + utils.str2dict, + 'key1=value1,key2=value2,key3=value3', + optional_keys=['key1', 'key2']) + self.assertEqual("Invalid key(s) 'key3' specified. " + "Valid key(s): 'key1, key2'.", + str(e)) + + def test_str2dict_required_keys(self): + self.assertDictEqual( + {'key1': 'value1', 'key2': 'value2', 'key3': 'value3'}, + utils.str2dict('key1=value1,key2=value2,key3=value3', + required_keys=['key1', 'key2'], + optional_keys=['key3'])) + self.assertDictEqual( + {'key1': 'value1', 'key2': 'value2'}, + utils.str2dict('key1=value1,key2=value2', + required_keys=['key1', 'key2'])) + e = self.assertRaises(argparse.ArgumentTypeError, + utils.str2dict, 'key1=value1', + required_keys=['key1', 'key2']) + self.assertEqual("Required key(s) 'key2' not specified.", str(e)) + def test_get_dict_item_properties(self): item = {'name': 'test_name', 'id': 'test_id'} fields = ('name', 'id') diff --git a/neutronclient/tests/unit/vpn/test_cli20_ikepolicy.py b/neutronclient/tests/unit/vpn/test_cli20_ikepolicy.py index d7d1bbfe9..c6ac8fb78 100644 --- a/neutronclient/tests/unit/vpn/test_cli20_ikepolicy.py +++ b/neutronclient/tests/unit/vpn/test_cli20_ikepolicy.py @@ -16,6 +16,7 @@ import sys +from neutronclient.common import exceptions from neutronclient.neutron.v2_0.vpn import ikepolicy from neutronclient.tests.unit import test_cli20 @@ -101,7 +102,7 @@ class CLITestV20VpnIkePolicyJSON(test_cli20.CLITestV20Base): self._test_create_resource(resource, cmd, name, my_id, args, position_names, position_values) - def _test_lifetime_values(self, lifetime): + def _test_lifetime_values(self, lifetime, expected_exc=None): resource = 'ikepolicy' cmd = ikepolicy.CreateIKEPolicy(test_cli20.MyApp(sys.stdout), None) name = 'ikepolicy1' @@ -134,16 +135,17 @@ class CLITestV20VpnIkePolicyJSON(test_cli20.CLITestV20Base): auth_algorithm, encryption_algorithm, phase1_negotiation_mode, ike_version, pfs, tenant_id] - try: - self._test_create_resource(resource, cmd, name, my_id, args, - position_names, position_values) - except Exception: - return - self.fail("IKEPolicy Lifetime Error") + if not expected_exc: + expected_exc = exceptions.CommandError + self.assertRaises( + expected_exc, + self._test_create_resource, + resource, cmd, name, my_id, args, + position_names, position_values) def test_create_ikepolicy_with_invalid_lifetime_keys(self): lifetime = 'uts=seconds,val=20000' - self._test_lifetime_values(lifetime) + self._test_lifetime_values(lifetime, expected_exc=SystemExit) def test_create_ikepolicy_with_invalid_lifetime_value(self): lifetime = 'units=seconds,value=-1' diff --git a/neutronclient/tests/unit/vpn/test_cli20_ipsec_site_connection.py b/neutronclient/tests/unit/vpn/test_cli20_ipsec_site_connection.py index 85a28b60d..8ead88f2a 100644 --- a/neutronclient/tests/unit/vpn/test_cli20_ipsec_site_connection.py +++ b/neutronclient/tests/unit/vpn/test_cli20_ipsec_site_connection.py @@ -180,7 +180,7 @@ class CLITestV20IPsecSiteConnectionJSON(test_cli20.CLITestV20Base): self._test_create_resource(resource, cmd, None, my_id, args, position_names, position_values) - def _test_create_failure(self, additional_args=None): + def _test_create_failure(self, additional_args=None, expected_exc=None): # Helper to test failure of IPSec site-to-site creation failure. resource = 'ipsec_site_connection' cmd = ipsec_site_connection.CreateIPsecSiteConnection( @@ -215,7 +215,9 @@ class CLITestV20IPsecSiteConnectionJSON(test_cli20.CLITestV20Base): position_values = [tenant_id, admin_state, peer_address, peer_id, psk, mtu, initiator, None, None, vpnservice_id, ikepolicy_id, ipsecpolicy_id] - self.assertRaises(exceptions.CommandError, + if not expected_exc: + expected_exc = exceptions.CommandError + self.assertRaises(expected_exc, self._test_create_resource, resource, cmd, None, my_id, args, position_names, position_values) @@ -227,7 +229,7 @@ class CLITestV20IPsecSiteConnectionJSON(test_cli20.CLITestV20Base): def test_fail_create_with_invalid_dpd_keys(self): bad_dpd_key = ['--dpd', 'act=restart,interval=30,time=120'] - self._test_create_failure(bad_dpd_key) + self._test_create_failure(bad_dpd_key, SystemExit) def test_fail_create_with_invalid_dpd_values(self): bad_dpd_values = ['--dpd', 'action=hold,interval=30,timeout=-1'] diff --git a/neutronclient/tests/unit/vpn/test_cli20_ipsecpolicy.py b/neutronclient/tests/unit/vpn/test_cli20_ipsecpolicy.py index 5df90ba4e..c6b23e19c 100644 --- a/neutronclient/tests/unit/vpn/test_cli20_ipsecpolicy.py +++ b/neutronclient/tests/unit/vpn/test_cli20_ipsecpolicy.py @@ -16,6 +16,7 @@ import sys +from neutronclient.common import exceptions from neutronclient.neutron.v2_0.vpn import ipsecpolicy from neutronclient.tests.unit import test_cli20 @@ -98,7 +99,7 @@ class CLITestV20VpnIpsecPolicyJSON(test_cli20.CLITestV20Base): self._test_create_resource(resource, cmd, name, my_id, args, position_names, position_values) - def _test_lifetime_values(self, lifetime): + def _test_lifetime_values(self, lifetime, expected_exc=None): resource = 'ipsecpolicy' cmd = ipsecpolicy.CreateIPsecPolicy(test_cli20.MyApp(sys.stdout), None) name = 'ipsecpolicy1' @@ -131,19 +132,24 @@ class CLITestV20VpnIpsecPolicyJSON(test_cli20.CLITestV20Base): auth_algorithm, encryption_algorithm, phase1_negotiation_mode, ike_version, pfs, tenant_id] - try: - self._test_create_resource(resource, cmd, name, my_id, args, - position_names, position_values) - except Exception: - return - self.fail("IPsecPolicy Lifetime Error") + if not expected_exc: + expected_exc = exceptions.CommandError + self.assertRaises( + expected_exc, + self._test_create_resource, + resource, cmd, name, my_id, args, + position_names, position_values) def test_create_ipsecpolicy_with_invalid_lifetime_keys(self): lifetime = 'uts=seconds,val=20000' + self._test_lifetime_values(lifetime, SystemExit) + + def test_create_ipsecpolicy_with_invalid_lifetime_units(self): + lifetime = 'units=minutes,value=600' self._test_lifetime_values(lifetime) - def test_create_ipsecpolicy_with_invalide_lifetime_values(self): - lifetime = 'units=minutes,value=0' + def test_create_ipsecpolicy_with_invalid_lifetime_value(self): + lifetime = 'units=seconds,value=0' self._test_lifetime_values(lifetime) def test_list_ipsecpolicy(self):