diff --git a/neutronclient/common/serializer.py b/neutronclient/common/serializer.py index b8f1c19..9b1b727 100644 --- a/neutronclient/common/serializer.py +++ b/neutronclient/common/serializer.py @@ -168,7 +168,7 @@ class XMLDictSerializer(DictSerializer): constants.TYPE_DICT) return result attrs = metadata.get('attributes', {}).get(nodename, {}) - for k, v in data.items(): + for k, v in sorted(data.items()): if k in attrs: result.set(k, str(v)) else: diff --git a/neutronclient/neutron/v2_0/vpn/__init__.py b/neutronclient/neutron/v2_0/vpn/__init__.py new file mode 100644 index 0000000..7f1c0e8 --- /dev/null +++ b/neutronclient/neutron/v2_0/vpn/__init__.py @@ -0,0 +1,17 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# (c) Copyright 2013 Hewlett-Packard Development Company, L.P. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Swaminathan Vasudevan, Hewlett-Packard. diff --git a/neutronclient/neutron/v2_0/vpn/ikepolicy.py b/neutronclient/neutron/v2_0/vpn/ikepolicy.py new file mode 100644 index 0000000..8ff1daf --- /dev/null +++ b/neutronclient/neutron/v2_0/vpn/ikepolicy.py @@ -0,0 +1,138 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# (c) Copyright 2013 Hewlett-Packard Development Company, L.P. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Swaminathan Vasudevan, Hewlett-Packard. +# + +import logging + +from neutronclient.common import utils +from neutronclient.neutron import v2_0 as neutronv20 +from neutronclient.neutron.v2_0.vpn import utils as vpn_utils + + +class ListIKEPolicy(neutronv20.ListCommand): + """List IKEPolicies that belong to a tenant.""" + + resource = 'ikepolicy' + log = logging.getLogger(__name__ + '.ListIKEPolicy') + list_columns = ['id', 'name', 'auth_algorithm', + 'encryption_algorithm', 'ike_version', 'pfs'] + _formatters = {} + pagination_support = True + sorting_support = True + + +class ShowIKEPolicy(neutronv20.ShowCommand): + """Show information of a given IKEPolicy.""" + + resource = 'ikepolicy' + log = logging.getLogger(__name__ + '.ShowIKEPolicy') + + +class CreateIKEPolicy(neutronv20.CreateCommand): + """Create an IKEPolicy.""" + + resource = 'ikepolicy' + log = logging.getLogger(__name__ + '.CreateIKEPolicy') + + def add_known_arguments(self, parser): + parser.add_argument( + '--description', + help='Description of the IKE policy') + parser.add_argument( + '--auth-algorithm', + default='sha1', choices=['sha1'], + help='Authentication algorithm in lowercase. ' + 'default:sha1') + parser.add_argument( + '--encryption-algorithm', + default='aes-128', choices=['3des', + 'aes-128', + 'aes-192', + 'aes-256'], + help='Encryption Algorithm in lowercase, default:aes-128') + parser.add_argument( + '--phase1-negotiation-mode', + default='main', choices=['main'], + help='IKE Phase1 negotiation mode in lowercase, default:main') + parser.add_argument( + '--ike-version', + default='v1', choices=['v1', 'v2'], + help='IKE version in lowercase, default:v1') + parser.add_argument( + '--pfs', + default='group5', choices=['group2', 'group5', 'group14'], + help='Perfect Forward Secrecy in lowercase, default:group5') + parser.add_argument( + '--lifetime', + metavar="units=UNITS,value=VALUE", + type=utils.str2dict, + help=vpn_utils.lifetime_help("IKE")) + parser.add_argument( + 'name', metavar='NAME', + help='Name of the IKE Policy') + + def args2body(self, parsed_args): + + body = {'ikepolicy': { + 'auth_algorithm': parsed_args.auth_algorithm, + 'encryption_algorithm': parsed_args.encryption_algorithm, + 'phase1_negotiation_mode': parsed_args.phase1_negotiation_mode, + 'ike_version': parsed_args.ike_version, + 'pfs': parsed_args.pfs, + }, } + if parsed_args.name: + body['ikepolicy'].update({'name': parsed_args.name}) + if parsed_args.description: + body['ikepolicy'].update({'description': parsed_args.description}) + if parsed_args.tenant_id: + body['ikepolicy'].update({'tenant_id': parsed_args.tenant_id}) + if parsed_args.lifetime: + vpn_utils.validate_lifetime_dict(parsed_args.lifetime) + body['ikepolicy'].update({'lifetime': parsed_args.lifetime}) + return body + + +class UpdateIKEPolicy(neutronv20.UpdateCommand): + """Update a given IKE Policy.""" + + resource = 'ikepolicy' + log = logging.getLogger(__name__ + '.UpdateIKEPolicy') + + def add_known_arguments(self, parser): + parser.add_argument( + '--lifetime', + metavar="units=UNITS,value=VALUE", + type=utils.str2dict, + help=vpn_utils.lifetime_help("IKE")) + + def args2body(self, parsed_args): + + body = {'ikepolicy': { + }, } + if parsed_args.lifetime: + vpn_utils.validate_lifetime_dict(parsed_args.lifetime) + body['ikepolicy'].update({'lifetime': parsed_args.lifetime}) + return body + + +class DeleteIKEPolicy(neutronv20.DeleteCommand): + """Delete a given IKE Policy.""" + + resource = 'ikepolicy' + log = logging.getLogger(__name__ + '.DeleteIKEPolicy') diff --git a/neutronclient/neutron/v2_0/vpn/ipsec_site_connection.py b/neutronclient/neutron/v2_0/vpn/ipsec_site_connection.py new file mode 100644 index 0000000..547553e --- /dev/null +++ b/neutronclient/neutron/v2_0/vpn/ipsec_site_connection.py @@ -0,0 +1,193 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# (c) Copyright 2013 Hewlett-Packard Development Company, L.P. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Swaminathan Vasudevan, Hewlett-Packard. +# + +import logging + +from neutronclient.common import utils +from neutronclient.neutron import v2_0 as neutronv20 +from neutronclient.neutron.v2_0.vpn import utils as vpn_utils +from neutronclient.openstack.common.gettextutils import _ + + +def _format_peer_cidrs(ipsec_site_connection): + try: + return '\n'.join([utils.dumps(cidrs) for cidrs in + ipsec_site_connection['peer_cidrs']]) + except Exception: + return '' + + +class ListIPsecSiteConnection(neutronv20.ListCommand): + """List IPsecSiteConnections that belong to a given tenant.""" + + resource = 'ipsec_site_connection' + log = logging.getLogger(__name__ + '.ListIPsecSiteConnection') + _formatters = {'peer_cidrs': _format_peer_cidrs} + list_columns = [ + 'id', 'name', 'peer_address', 'peer_cidrs', 'route_mode', + 'auth_mode', 'status'] + pagination_support = True + sorting_support = True + + +class ShowIPsecSiteConnection(neutronv20.ShowCommand): + """Show information of a given IPsecSiteConnection.""" + + resource = 'ipsec_site_connection' + log = logging.getLogger(__name__ + '.ShowIPsecSiteConnection') + + +class CreateIPsecSiteConnection(neutronv20.CreateCommand): + """Create an IPsecSiteConnection.""" + resource = 'ipsec_site_connection' + log = logging.getLogger(__name__ + '.CreateIPsecSiteConnection') + + def add_known_arguments(self, parser): + parser.add_argument( + '--admin-state-down', + default=True, action='store_false', + help='Set admin state up to false') + parser.add_argument( + '--name', + help='Set friendly name for the connection') + parser.add_argument( + '--description', + help='Set a description for the connection') + parser.add_argument( + '--mtu', + default='1500', + help='MTU size for the connection, default:1500') + parser.add_argument( + '--initiator', + default='bi-directional', choices=['bi-directional', + 'response-only'], + help='Initiator state in lowercase, default:bi-directional') + parser.add_argument( + '--dpd', + metavar="action=ACTION,interval=INTERVAL,timeout=TIMEOUT", + type=utils.str2dict, + help=vpn_utils.dpd_help("IPsec Connection")) + parser.add_argument( + '--vpnservice-id', metavar='VPNSERVICE', + required=True, + help='VPNService instance id associated with this connection') + parser.add_argument( + '--ikepolicy-id', metavar='IKEPOLICY', + required=True, + help='IKEPolicy id associated with this connection') + parser.add_argument( + '--ipsecpolicy-id', metavar='IPSECPOLICY', + required=True, + help='IPsecPolicy id associated with this connection') + parser.add_argument( + '--peer-address', + required=True, + help='Remote branch router public ' + 'IPv4 address or IPv6 address or FQDN.') + parser.add_argument( + '--peer-id', + required=True, + help='Remote branch router identity') + parser.add_argument( + '--peer-cidr', + action='append', dest='peer_cidrs', + required=True, + help='Remote subnet(s) in CIDR format') + parser.add_argument( + '--psk', + required=True, + help='Pre-Shared Key string') + + def args2body(self, parsed_args): + _vpnservice_id = neutronv20.find_resourceid_by_name_or_id( + self.get_client(), 'vpnservice', + parsed_args.vpnservice_id) + _ikepolicy_id = neutronv20.find_resourceid_by_name_or_id( + self.get_client(), 'ikepolicy', + parsed_args.ikepolicy_id) + _ipsecpolicy_id = neutronv20.find_resourceid_by_name_or_id( + self.get_client(), 'ipsecpolicy', + parsed_args.ipsecpolicy_id) + if int(parsed_args.mtu) < 68: + message = _("Invalid MTU value: MTU must be " + "greater than or equal to 68") + raise ValueError(message) + body = {'ipsec_site_connection': { + 'vpnservice_id': _vpnservice_id, + 'ikepolicy_id': _ikepolicy_id, + 'ipsecpolicy_id': _ipsecpolicy_id, + 'peer_address': parsed_args.peer_address, + 'peer_id': parsed_args.peer_id, + 'mtu': parsed_args.mtu, + 'initiator': parsed_args.initiator, + 'psk': parsed_args.psk, + 'admin_state_up': parsed_args.admin_state_down, + }, } + if parsed_args.name: + body['ipsec_site_connection'].update( + {'name': parsed_args.name} + ) + if parsed_args.description: + body['ipsec_site_connection'].update( + {'description': parsed_args.description} + ) + if parsed_args.tenant_id: + body['ipsec_site_connection'].update( + {'tenant_id': parsed_args.tenant_id} + ) + if parsed_args.dpd: + vpn_utils.validate_dpd_dict(parsed_args.dpd) + body['ipsec_site_connection'].update({'dpd': parsed_args.dpd}) + if parsed_args.peer_cidrs: + body['ipsec_site_connection'][ + 'peer_cidrs'] = parsed_args.peer_cidrs + + return body + + +class UpdateIPsecSiteConnection(neutronv20.UpdateCommand): + """Update a given IPsecSiteConnection.""" + + resource = 'ipsec_site_connection' + log = logging.getLogger(__name__ + '.UpdateIPsecSiteConnection') + + def add_known_arguments(self, parser): + + parser.add_argument( + '--dpd', + metavar="action=ACTION,interval=INTERVAL,timeout=TIMEOUT", + type=utils.str2dict, + help=vpn_utils.dpd_help("IPsec Connection")) + + def args2body(self, parsed_args): + body = {'ipsec_site_connection': { + }, } + + if parsed_args.dpd: + vpn_utils.validate_dpd_dict(parsed_args.dpd) + body['ipsec_site_connection'].update({'dpd': parsed_args.dpd}) + return body + + +class DeleteIPsecSiteConnection(neutronv20.DeleteCommand): + """Delete a given IPsecSiteConnection.""" + + resource = 'ipsec_site_connection' + log = logging.getLogger(__name__ + '.DeleteIPsecSiteConnection') diff --git a/neutronclient/neutron/v2_0/vpn/ipsecpolicy.py b/neutronclient/neutron/v2_0/vpn/ipsecpolicy.py new file mode 100644 index 0000000..2aeb18e --- /dev/null +++ b/neutronclient/neutron/v2_0/vpn/ipsecpolicy.py @@ -0,0 +1,138 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# (c) Copyright 2013 Hewlett-Packard Development Company, L.P. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Swaminathan Vasudevan, Hewlett-Packard. + +import logging + +from neutronclient.common import utils +from neutronclient.neutron import v2_0 as neutronv20 +from neutronclient.neutron.v2_0.vpn import utils as vpn_utils + + +class ListIPsecPolicy(neutronv20.ListCommand): + """List ipsecpolicies that belongs to a given tenant connection.""" + + resource = 'ipsecpolicy' + log = logging.getLogger(__name__ + '.ListIPsecPolicy') + list_columns = ['id', 'name', 'auth_algorithm', + 'encryption_algorithm', 'pfs'] + _formatters = {} + pagination_support = True + sorting_support = True + + +class ShowIPsecPolicy(neutronv20.ShowCommand): + """Show information of a given ipsecpolicy.""" + + resource = 'ipsecpolicy' + log = logging.getLogger(__name__ + '.ShowIPsecPolicy') + + +class CreateIPsecPolicy(neutronv20.CreateCommand): + """Create an ipsecpolicy.""" + + resource = 'ipsecpolicy' + log = logging.getLogger(__name__ + '.CreateIPsecPolicy') + + def add_known_arguments(self, parser): + parser.add_argument( + '--description', + help='Description of the IPsecPolicy') + parser.add_argument( + '--transform-protocol', + default='esp', choices=['esp', 'ah', 'ah-esp'], + help='Transform Protocol in lowercase, default:esp') + parser.add_argument( + '--auth-algorithm', + default='sha1', choices=['sha1'], + help='Authentication algorithm in lowercase, default:sha1') + parser.add_argument( + '--encryption-algorithm', + default='aes-128', choices=['3des', + 'aes-128', + 'aes-192', + 'aes-256'], + help='Encryption Algorithm in lowercase, default:aes-128') + parser.add_argument( + '--encapsulation-mode', + default='tunnel', choices=['tunnel', 'transport'], + help='Encapsulation Mode in lowercase, default:tunnel') + parser.add_argument( + '--pfs', + default='group5', choices=['group2', 'group5', 'group14'], + help='Perfect Forward Secrecy in lowercase, default:group5') + parser.add_argument( + '--lifetime', + metavar="units=UNITS,value=VALUE", + type=utils.str2dict, + help=vpn_utils.lifetime_help("IPsec")) + parser.add_argument( + 'name', metavar='NAME', + help='Name of the IPsecPolicy') + + def args2body(self, parsed_args): + + body = {'ipsecpolicy': { + 'auth_algorithm': parsed_args.auth_algorithm, + 'encryption_algorithm': parsed_args.encryption_algorithm, + 'encapsulation_mode': parsed_args.encapsulation_mode, + 'transform_protocol': parsed_args.transform_protocol, + 'pfs': parsed_args.pfs, + }, } + if parsed_args.name: + body['ipsecpolicy'].update({'name': parsed_args.name}) + if parsed_args.description: + body['ipsecpolicy'].update( + {'description': parsed_args.description} + ) + if parsed_args.tenant_id: + body['ipsecpolicy'].update({'tenant_id': parsed_args.tenant_id}) + if parsed_args.lifetime: + vpn_utils.validate_lifetime_dict(parsed_args.lifetime) + body['ipsecpolicy'].update({'lifetime': parsed_args.lifetime}) + return body + + +class UpdateIPsecPolicy(neutronv20.UpdateCommand): + """Update a given ipsec policy.""" + + resource = 'ipsecpolicy' + log = logging.getLogger(__name__ + '.UpdateIPsecPolicy') + + def add_known_arguments(self, parser): + parser.add_argument( + '--lifetime', + metavar="units=UNITS,value=VALUE", + type=utils.str2dict, + help=vpn_utils.lifetime_help("IPsec")) + + def args2body(self, parsed_args): + + body = {'ipsecpolicy': { + }, } + if parsed_args.lifetime: + vpn_utils.validate_lifetime_dict(parsed_args.lifetime) + body['ipsecpolicy'].update({'lifetime': parsed_args.lifetime}) + return body + + +class DeleteIPsecPolicy(neutronv20.DeleteCommand): + """Delete a given ipsecpolicy.""" + + resource = 'ipsecpolicy' + log = logging.getLogger(__name__ + '.DeleteIPsecPolicy') diff --git a/neutronclient/neutron/v2_0/vpn/utils.py b/neutronclient/neutron/v2_0/vpn/utils.py new file mode 100644 index 0000000..cd2f5f0 --- /dev/null +++ b/neutronclient/neutron/v2_0/vpn/utils.py @@ -0,0 +1,109 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# (c) Copyright 2013 Hewlett-Packard Development Company, L.P. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Swaminathan Vasudevan, Hewlett-Packard. +# + + +"""VPN Utilities and helper functions.""" + + +from neutronclient.openstack.common.gettextutils import _ + +dpd_supported_actions = ['hold', 'clear', 'restart', + 'restart-by-peer', 'disabled'] +dpd_supported_keys = ['action', 'interval', 'timeout'] + +lifetime_keys = ['units', 'value'] +lifetime_units = ['seconds'] + + +def validate_dpd_dict(dpd_dict): + for key, value in dpd_dict.items(): + if key not in dpd_supported_keys: + message = _( + "DPD Dictionary KeyError: " + "Reason-Invalid DPD key : " + "'%(key)s' not in %(supported_key)s ") % { + 'key': key, 'supported_key': dpd_supported_keys} + raise KeyError(message) + if key == 'action' and value not in dpd_supported_actions: + message = _( + "DPD Dictionary ValueError: " + "Reason-Invalid DPD action : " + "'%(key_value)s' not in %(supported_action)s ") % { + 'key_value': value, + 'supported_action': dpd_supported_actions} + raise ValueError(message) + if key in ('interval', 'timeout'): + if int(value) <= 0: + message = _( + "DPD Dictionary ValueError: " + "Reason-Invalid positive integer value: " + "'%(key)s' = %(value)i ") % { + 'key': key, 'value': int(value)} + raise ValueError(message) + else: + dpd_dict[key] = int(value) + return + + +def validate_lifetime_dict(lifetime_dict): + + for key, value in lifetime_dict.items(): + if key not in lifetime_keys: + message = _( + "Lifetime Dictionary KeyError: " + "Reason-Invalid unit key : " + "'%(key)s' not in %(supported_key)s ") % { + 'key': key, 'supported_key': lifetime_keys} + raise KeyError(message) + if key == 'units' and value not in lifetime_units: + message = _( + "Lifetime Dictionary ValueError: " + "Reason-Invalid units : " + "'%(key_value)s' not in %(supported_units)s ") % { + 'key_value': key, 'supported_units': lifetime_units} + raise ValueError(message) + if key == 'value': + if int(value) < 60: + message = _( + "Lifetime Dictionary ValueError: " + "Reason-Invalid value should be at least 60:" + "'%(key_value)s' = %(value)i ") % { + 'key_value': key, 'value': int(value)} + raise ValueError(str(message)) + else: + lifetime_dict['value'] = int(value) + return + + +def lifetime_help(policy): + lifetime = ("%s Lifetime Attributes." + "'units'-seconds,default:seconds. " + "'value'-non negative integer, default:3600." % policy) + return lifetime + + +def dpd_help(policy): + dpd = (" %s Dead Peer Detection Attributes. " + " 'action'-hold,clear,disabled,restart,restart-by-peer." + " 'interval' and 'timeout' are non negative integers. " + " 'interval' should be less than 'timeout' value. " + " 'action', default:hold 'interval', default:30, " + " 'timeout', default:120." % policy) + return dpd diff --git a/neutronclient/neutron/v2_0/vpn/vpnservice.py b/neutronclient/neutron/v2_0/vpn/vpnservice.py new file mode 100644 index 0000000..0f9e92b --- /dev/null +++ b/neutronclient/neutron/v2_0/vpn/vpnservice.py @@ -0,0 +1,98 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# (c) Copyright 2013 Hewlett-Packard Development Company, L.P. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Swaminathan Vasudevan, Hewlett-Packard. +# + +import logging + +from neutronclient.neutron import v2_0 as neutronv20 + + +class ListVPNService(neutronv20.ListCommand): + """List VPNService configurations that belong to a given tenant.""" + + resource = 'vpnservice' + log = logging.getLogger(__name__ + '.ListVPNService') + list_columns = [ + 'id', 'name', 'router_id', 'status' + ] + _formatters = {} + pagination_support = True + sorting_support = True + + +class ShowVPNService(neutronv20.ShowCommand): + """Show information of a given VPNService.""" + + resource = 'vpnservice' + log = logging.getLogger(__name__ + '.ShowVPNService') + + +class CreateVPNService(neutronv20.CreateCommand): + """Create a VPNService.""" + resource = 'vpnservice' + log = logging.getLogger(__name__ + '.CreateVPNService') + + def add_known_arguments(self, parser): + parser.add_argument( + '--admin-state-down', + dest='admin_state', action='store_false', + help='set admin state up to false') + parser.add_argument( + '--name', + help='Set a name for the vpnservice') + parser.add_argument( + '--description', + help='Set a description for the vpnservice') + parser.add_argument( + 'router', metavar='ROUTER', + help='Router unique identifier for the vpnservice') + parser.add_argument( + 'subnet', metavar='SUBNET', + help='Subnet unique identifier for the vpnservice deployment') + + def args2body(self, parsed_args): + _subnet_id = neutronv20.find_resourceid_by_name_or_id( + self.get_client(), 'subnet', + parsed_args.subnet) + _router_id = neutronv20.find_resourceid_by_name_or_id( + self.get_client(), 'router', + parsed_args.router) + + body = {self.resource: {'subnet_id': _subnet_id, + 'router_id': _router_id, + 'admin_state_up': parsed_args.admin_state}, } + neutronv20.update_dict(parsed_args, body[self.resource], + ['name', 'description', + 'tenant_id']) + + return body + + +class UpdateVPNService(neutronv20.UpdateCommand): + """Update a given VPNService.""" + + resource = 'vpnservice' + log = logging.getLogger(__name__ + '.UpdateVPNService') + + +class DeleteVPNService(neutronv20.DeleteCommand): + """Delete a given VPNService.""" + + resource = 'vpnservice' + log = logging.getLogger(__name__ + '.DeleteVPNService') diff --git a/neutronclient/shell.py b/neutronclient/shell.py index 70038d4..6d45f78 100644 --- a/neutronclient/shell.py +++ b/neutronclient/shell.py @@ -53,6 +53,10 @@ from neutronclient.neutron.v2_0 import router from neutronclient.neutron.v2_0 import securitygroup from neutronclient.neutron.v2_0 import servicetype from neutronclient.neutron.v2_0 import subnet +from neutronclient.neutron.v2_0.vpn import ikepolicy +from neutronclient.neutron.v2_0.vpn import ipsec_site_connection +from neutronclient.neutron.v2_0.vpn import ipsecpolicy +from neutronclient.neutron.v2_0.vpn import vpnservice from neutronclient.openstack.common import strutils from neutronclient.version import __version__ @@ -75,7 +79,7 @@ def run_command(cmd, cmd_parser, sub_argv): def env(*_vars, **kwargs): - """Search for the first defined of possibly many env vars + """Search for the first defined of possibly many env vars. Returns the first environment variable defined in vars, or returns the default defined in kwargs. @@ -216,6 +220,36 @@ COMMAND_V2 = { 'cisco-policy-profile-list': policyprofile.ListPolicyProfile, 'cisco-policy-profile-show': policyprofile.ShowPolicyProfile, 'cisco-policy-profile-update': policyprofile.UpdatePolicyProfile, + 'ipsec-site-connection-list': ( + ipsec_site_connection.ListIPsecSiteConnection + ), + 'ipsec-site-connection-show': ( + ipsec_site_connection.ShowIPsecSiteConnection + ), + 'ipsec-site-connection-create': ( + ipsec_site_connection.CreateIPsecSiteConnection + ), + 'ipsec-site-connection-update': ( + ipsec_site_connection.UpdateIPsecSiteConnection + ), + 'ipsec-site-connection-delete': ( + ipsec_site_connection.DeleteIPsecSiteConnection + ), + 'vpn-service-list': vpnservice.ListVPNService, + 'vpn-service-show': vpnservice.ShowVPNService, + 'vpn-service-create': vpnservice.CreateVPNService, + 'vpn-service-update': vpnservice.UpdateVPNService, + 'vpn-service-delete': vpnservice.DeleteVPNService, + 'vpn-ipsecpolicy-list': ipsecpolicy.ListIPsecPolicy, + 'vpn-ipsecpolicy-show': ipsecpolicy.ShowIPsecPolicy, + 'vpn-ipsecpolicy-create': ipsecpolicy.CreateIPsecPolicy, + 'vpn-ipsecpolicy-update': ipsecpolicy.UpdateIPsecPolicy, + 'vpn-ipsecpolicy-delete': ipsecpolicy.DeleteIPsecPolicy, + 'vpn-ikepolicy-list': ikepolicy.ListIKEPolicy, + 'vpn-ikepolicy-show': ikepolicy.ShowIKEPolicy, + 'vpn-ikepolicy-create': ikepolicy.CreateIKEPolicy, + 'vpn-ikepolicy-update': ikepolicy.UpdateIKEPolicy, + 'vpn-ikepolicy-delete': ikepolicy.DeleteIKEPolicy, } COMMANDS = {'2.0': COMMAND_V2} @@ -594,8 +628,7 @@ class NeutronShell(app.App): self.log.debug('got an error: %s', unicode(err)) def configure_logging(self): - """Create logging handlers for any log output. - """ + """Create logging handlers for any log output.""" root_logger = logging.getLogger('') # Set up logging to a file diff --git a/neutronclient/tests/unit/test_cli20.py b/neutronclient/tests/unit/test_cli20.py index a880191..886b114 100644 --- a/neutronclient/tests/unit/test_cli20.py +++ b/neutronclient/tests/unit/test_cli20.py @@ -188,7 +188,8 @@ class CLITestV20Base(testtools.TestCase): non_admin_status_resources = ['subnet', 'floatingip', 'security_group', 'security_group_rule', 'qos_queue', 'network_gateway', 'credential', - 'network_profile', 'policy_profile'] + 'network_profile', 'policy_profile', + 'ikepolicy', 'ipsecpolicy'] if (resource in non_admin_status_resources): body = {resource: {}, } else: @@ -215,9 +216,13 @@ class CLITestV20Base(testtools.TestCase): resource_plural = neutronV2_0._get_resource_plural(resource, self.client) path = getattr(self.client, resource_plural + "_path") + if self.format == 'json': + mox_body = MyComparator(body, self.client) + else: + mox_body = self.client.serialize(body) self.client.httpclient.request( end_url(path, format=self.format), 'POST', - body=MyComparator(body, self.client), + body=mox_body, headers=mox.ContainsKeyValue( 'X-Auth-Token', TOKEN)).AndReturn((MyResp(200), resstr)) args.extend(['--request-format', self.format]) diff --git a/neutronclient/v2_0/client.py b/neutronclient/v2_0/client.py index ce4afb5..0a2f7d9 100644 --- a/neutronclient/v2_0/client.py +++ b/neutronclient/v2_0/client.py @@ -165,6 +165,14 @@ class Client(object): security_group_path = "/security-groups/%s" security_group_rules_path = "/security-group-rules" security_group_rule_path = "/security-group-rules/%s" + vpnservices_path = "/vpn/vpnservices" + vpnservice_path = "/vpn/vpnservices/%s" + ipsecpolicies_path = "/vpn/ipsecpolicies" + ipsecpolicy_path = "/vpn/ipsecpolicies/%s" + ikepolicies_path = "/vpn/ikepolicies" + ikepolicy_path = "/vpn/ikepolicies/%s" + ipsec_site_connections_path = "/vpn/ipsec-site-connections" + ipsec_site_connection_path = "/vpn/ipsec-site-connections/%s" vips_path = "/lb/vips" vip_path = "/lb/vips/%s" pools_path = "/lb/pools" @@ -215,6 +223,10 @@ class Client(object): 'service_definitions': 'service_definition', 'security_groups': 'security_group', 'security_group_rules': 'security_group_rule', + 'ipsecpolicies': 'ipsecpolicy', + 'ikepolicies': 'ikepolicy', + 'ipsec_site_connections': 'ipsec_site_connection', + 'vpnservices': 'vpnservice', 'vips': 'vip', 'pools': 'pool', 'members': 'member', @@ -488,6 +500,118 @@ class Client(object): return self.get(self.security_group_rule_path % (security_group_rule), params=_params) + @APIParamsCall + def list_vpnservices(self, retrieve_all=True, **_params): + """Fetches a list of all configured VPNServices for a tenant.""" + return self.list('vpnservices', self.vpnservices_path, retrieve_all, + **_params) + + @APIParamsCall + def show_vpnservice(self, vpnservice, **_params): + """Fetches information of a specific VPNService.""" + return self.get(self.vpnservice_path % (vpnservice), params=_params) + + @APIParamsCall + def create_vpnservice(self, body=None): + """Creates a new VPNService.""" + return self.post(self.vpnservices_path, body=body) + + @APIParamsCall + def update_vpnservice(self, vpnservice, body=None): + """Updates a VPNService.""" + return self.put(self.vpnservice_path % (vpnservice), body=body) + + @APIParamsCall + def delete_vpnservice(self, vpnservice): + """Deletes the specified VPNService.""" + return self.delete(self.vpnservice_path % (vpnservice)) + + @APIParamsCall + def list_ipsec_site_connections(self, retrieve_all=True, **_params): + """Fetches all configured IPsecSiteConnections for a tenant.""" + return self.list('ipsec_site_connections', + self.ipsec_site_connections_path, + retrieve_all, + **_params) + + @APIParamsCall + def show_ipsec_site_connection(self, ipsecsite_conn, **_params): + """Fetches information of a specific IPsecSiteConnection.""" + return self.get( + self.ipsec_site_connection_path % (ipsecsite_conn), params=_params + ) + + @APIParamsCall + def create_ipsec_site_connection(self, body=None): + """Creates a new IPsecSiteConnection.""" + return self.post(self.ipsec_site_connections_path, body=body) + + @APIParamsCall + def update_ipsec_site_connection(self, ipsecsite_conn, body=None): + """Updates an IPsecSiteConnection.""" + return self.put( + self.ipsec_site_connection_path % (ipsecsite_conn), body=body + ) + + @APIParamsCall + def delete_ipsec_site_connection(self, ipsecsite_conn): + """Deletes the specified IPsecSiteConnection.""" + return self.delete(self.ipsec_site_connection_path % (ipsecsite_conn)) + + @APIParamsCall + def list_ikepolicies(self, retrieve_all=True, **_params): + """Fetches a list of all configured IKEPolicies for a tenant.""" + return self.list('ikepolicies', self.ikepolicies_path, retrieve_all, + **_params) + + @APIParamsCall + def show_ikepolicy(self, ikepolicy, **_params): + """Fetches information of a specific IKEPolicy.""" + return self.get(self.ikepolicy_path % (ikepolicy), params=_params) + + @APIParamsCall + def create_ikepolicy(self, body=None): + """Creates a new IKEPolicy.""" + return self.post(self.ikepolicies_path, body=body) + + @APIParamsCall + def update_ikepolicy(self, ikepolicy, body=None): + """Updates an IKEPolicy.""" + return self.put(self.ikepolicy_path % (ikepolicy), body=body) + + @APIParamsCall + def delete_ikepolicy(self, ikepolicy): + """Deletes the specified IKEPolicy.""" + return self.delete(self.ikepolicy_path % (ikepolicy)) + + @APIParamsCall + def list_ipsecpolicies(self, retrieve_all=True, **_params): + """Fetches a list of all configured IPsecPolicies for a tenant.""" + return self.list('ipsecpolicies', + self.ipsecpolicies_path, + retrieve_all, + **_params) + + @APIParamsCall + def show_ipsecpolicy(self, ipsecpolicy, **_params): + """Fetches information of a specific IPsecPolicy.""" + return self.get(self.ipsecpolicy_path % (ipsecpolicy), params=_params) + + @APIParamsCall + def create_ipsecpolicy(self, body=None): + """Creates a new IPsecPolicy.""" + return self.post(self.ipsecpolicies_path, body=body) + + @APIParamsCall + def update_ipsecpolicy(self, ipsecpolicy, body=None): + """Updates an IPsecPolicy.""" + return self.put(self.ipsecpolicy_path % (ipsecpolicy), body=body) + + @APIParamsCall + def delete_ipsecpolicy(self, ipsecpolicy): + """Deletes the specified IPsecPolicy.""" + return self.delete(self.ipsecpolicy_path % (ipsecpolicy)) + @APIParamsCall def list_vips(self, retrieve_all=True, **_params): """Fetches a list of all load balancer vips for a tenant.""" diff --git a/tests/unit/vpn/__init__.py b/tests/unit/vpn/__init__.py new file mode 100644 index 0000000..9850e2c --- /dev/null +++ b/tests/unit/vpn/__init__.py @@ -0,0 +1,18 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# (c) Copyright 2013 Hewlett-Packard Development Company, L.P. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Swaminathan Vasudevan, Hewlett-Packard. diff --git a/tests/unit/vpn/test_cli20_ikepolicy.py b/tests/unit/vpn/test_cli20_ikepolicy.py new file mode 100644 index 0000000..f07d2fb --- /dev/null +++ b/tests/unit/vpn/test_cli20_ikepolicy.py @@ -0,0 +1,214 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# (c) Copyright 2013 Hewlett-Packard Development Company, L.P. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Swaminathan Vasudevan, Hewlett Packard. + +import sys + +from neutronclient.neutron.v2_0.vpn import ikepolicy +from tests.unit import test_cli20 + + +class CLITestV20VpnIkePolicyJSON(test_cli20.CLITestV20Base): + + def test_create_ikepolicy_all_params(self): + """vpn-ikepolicy-create all params.""" + resource = 'ikepolicy' + cmd = ikepolicy.CreateIKEPolicy(test_cli20.MyApp(sys.stdout), None) + name = 'ikepolicy1' + description = 'my-ike-policy' + auth_algorithm = 'sha1' + encryption_algorithm = 'aes-256' + ike_version = 'v1' + phase1_negotiation_mode = 'main' + pfs = 'group5' + tenant_id = 'my-tenant' + my_id = 'my-id' + lifetime = 'units=seconds,value=20000' + + args = [name, + '--description', description, + '--tenant-id', tenant_id, + '--auth-algorithm', auth_algorithm, + '--encryption-algorithm', encryption_algorithm, + '--ike-version', ike_version, + '--phase1-negotiation-mode', phase1_negotiation_mode, + '--lifetime', lifetime, + '--pfs', pfs] + + position_names = ['name', 'description', + 'auth_algorithm', 'encryption_algorithm', + 'phase1_negotiation_mode', + 'ike_version', 'pfs', + 'tenant_id'] + + position_values = [name, description, + auth_algorithm, encryption_algorithm, + phase1_negotiation_mode, ike_version, pfs, + tenant_id] + extra_body = { + 'lifetime': { + 'units': 'seconds', + 'value': 20000, + }, + } + + self._test_create_resource(resource, cmd, name, my_id, args, + position_names, position_values, + extra_body=extra_body) + + def test_create_ikepolicy_with_limited_params(self): + """vpn-ikepolicy-create with limited params.""" + resource = 'ikepolicy' + cmd = ikepolicy.CreateIKEPolicy(test_cli20.MyApp(sys.stdout), None) + name = 'ikepolicy1' + auth_algorithm = 'sha1' + encryption_algorithm = 'aes-128' + ike_version = 'v1' + phase1_negotiation_mode = 'main' + pfs = 'group5' + tenant_id = 'my-tenant' + my_id = 'my-id' + + args = [name, + '--tenant-id', tenant_id] + + position_names = ['name', + 'auth_algorithm', 'encryption_algorithm', + 'phase1_negotiation_mode', + 'ike_version', 'pfs', + 'tenant_id'] + + position_values = [name, + auth_algorithm, encryption_algorithm, + phase1_negotiation_mode, + ike_version, pfs, + tenant_id] + + self._test_create_resource(resource, cmd, name, my_id, args, + position_names, position_values) + + def _test_lifetime_values(self, lifetime): + resource = 'ikepolicy' + cmd = ikepolicy.CreateIKEPolicy(test_cli20.MyApp(sys.stdout), None) + name = 'ikepolicy1' + description = 'my-ike-policy' + auth_algorithm = 'sha1' + encryption_algorithm = 'aes-256' + ike_version = 'v1' + phase1_negotiation_mode = 'main' + pfs = 'group5' + tenant_id = 'my-tenant' + my_id = 'my-id' + + args = [name, + '--description', description, + '--tenant-id', tenant_id, + '--auth-algorithm', auth_algorithm, + '--encryption-algorithm', encryption_algorithm, + '--ike-version', ike_version, + '--phase1-negotiation-mode', phase1_negotiation_mode, + '--lifetime', lifetime, + '--pfs', pfs] + + position_names = ['name', 'description', + 'auth_algorithm', 'encryption_algorithm', + 'phase1_negotiation_mode', + 'ike_version', 'pfs', + 'tenant_id'] + + position_values = [name, description, + auth_algorithm, encryption_algorithm, + phase1_negotiation_mode, ike_version, pfs, + tenant_id] + try: + self._test_create_resource(resource, cmd, name, my_id, args, + position_names, position_values) + except Exception: + return + self.fail("IKEPolicy Lifetime Error") + + def test_create_ikepolicy_with_invalid_lifetime_keys(self): + lifetime = 'uts=seconds,val=20000' + self._test_lifetime_values(lifetime) + + def test_create_ikepolicy_with_invalid_lifetime_value(self): + lifetime = 'units=seconds,value=-1' + self._test_lifetime_values(lifetime) + + def test_list_ikepolicy(self): + """vpn-ikepolicy-list.""" + resources = "ikepolicies" + cmd = ikepolicy.ListIKEPolicy(test_cli20.MyApp(sys.stdout), None) + self._test_list_resources(resources, cmd, True) + + def test_list_ikepolicy_pagination(self): + """vpn-ikepolicy-list.""" + resources = "ikepolicies" + cmd = ikepolicy.ListIKEPolicy(test_cli20.MyApp(sys.stdout), None) + self._test_list_resources_with_pagination(resources, cmd) + + def test_list_ikepolicy_sort(self): + """vpn-ikepolicy-list --sort-key name --sort-key id --sort-key asc + --sort-key desc + """ + resources = "ikepolicies" + cmd = ikepolicy.ListIKEPolicy(test_cli20.MyApp(sys.stdout), None) + self._test_list_resources(resources, cmd, + sort_key=["name", "id"], + sort_dir=["asc", "desc"]) + + def test_list_ikepolicy_limit(self): + """vpn-ikepolicy-list -P.""" + resources = "ikepolicies" + cmd = ikepolicy.ListIKEPolicy(test_cli20.MyApp(sys.stdout), None) + self._test_list_resources(resources, cmd, page_size=1000) + + def test_show_ikepolicy_id(self): + """vpn-ikepolicy-show ikepolicy_id.""" + resource = 'ikepolicy' + cmd = ikepolicy.ShowIKEPolicy(test_cli20.MyApp(sys.stdout), None) + args = ['--fields', 'id', self.test_id] + self._test_show_resource(resource, cmd, self.test_id, args, ['id']) + + def test_show_ikepolicy_id_name(self): + """vpn-ikepolicy-show.""" + resource = 'ikepolicy' + cmd = ikepolicy.ShowIKEPolicy(test_cli20.MyApp(sys.stdout), None) + args = ['--fields', 'id', '--fields', 'name', self.test_id] + self._test_show_resource(resource, cmd, self.test_id, + args, ['id', 'name']) + + def test_update_ikepolicy(self): + """vpn-ikepolicy-update myid --name newname --tags a b.""" + resource = 'ikepolicy' + cmd = ikepolicy.UpdateIKEPolicy(test_cli20.MyApp(sys.stdout), None) + self._test_update_resource(resource, cmd, 'myid', + ['myid', '--name', 'newname'], + {'name': 'newname', }) + + def test_delete_ikepolicy(self): + """vpn-ikepolicy-delete my-id.""" + resource = 'ikepolicy' + cmd = ikepolicy.DeleteIKEPolicy(test_cli20.MyApp(sys.stdout), None) + my_id = 'my-id' + args = [my_id] + self._test_delete_resource(resource, cmd, my_id, args) + + +class CLITestV20VpnIkePolicyXML(CLITestV20VpnIkePolicyJSON): + format = 'xml' diff --git a/tests/unit/vpn/test_cli20_ipsec_site_connection.py b/tests/unit/vpn/test_cli20_ipsec_site_connection.py new file mode 100644 index 0000000..a01acde --- /dev/null +++ b/tests/unit/vpn/test_cli20_ipsec_site_connection.py @@ -0,0 +1,312 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# (c) Copyright 2013 Hewlett-Packard Development Company, L.P. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Swaminathan Vasudevan, Hewlett Packard. + +import sys + +from neutronclient.neutron.v2_0.vpn import ipsec_site_connection +from tests.unit import test_cli20 + + +class CLITestV20IPsecSiteConnectionJSON(test_cli20.CLITestV20Base): + + def test_create_ipsec_site_connection_all_params(self): + """ipsecsite-connection-create all params.""" + resource = 'ipsec_site_connection' + cmd = ipsec_site_connection.CreateIPsecSiteConnection( + test_cli20.MyApp(sys.stdout), None + ) + tenant_id = 'mytenant_id' + name = 'connection1' + my_id = 'my_id' + peer_address = '192.168.2.10' + peer_id = '192.168.2.10' + psk = 'abcd' + mtu = '1500' + initiator = 'bi-directional' + vpnservice_id = 'vpnservice_id' + ikepolicy_id = 'ikepolicy_id' + ipsecpolicy_id = 'ipsecpolicy_id' + peer_cidrs = ['192.168.3.0/24', '192.168.2.0/24'] + admin_state = True + description = 'my-vpn-connection' + dpd = 'action=restart,interval=30,timeout=120' + + args = ['--tenant-id', tenant_id, + '--peer-address', peer_address, '--peer-id', peer_id, + '--psk', psk, '--initiator', initiator, + '--vpnservice-id', vpnservice_id, + '--ikepolicy-id', ikepolicy_id, '--name', name, + '--ipsecpolicy-id', ipsecpolicy_id, '--mtu', mtu, + '--description', description, + '--peer-cidr', '192.168.3.0/24', + '--peer-cidr', '192.168.2.0/24', + '--dpd', dpd] + + position_names = ['name', 'tenant_id', 'admin_state_up', + 'peer_address', 'peer_id', 'peer_cidrs', + 'psk', 'mtu', 'initiator', 'description', + 'vpnservice_id', 'ikepolicy_id', + 'ipsecpolicy_id'] + + position_values = [name, tenant_id, admin_state, peer_address, + peer_id, peer_cidrs, psk, mtu, + initiator, description, + vpnservice_id, ikepolicy_id, ipsecpolicy_id] + extra_body = { + 'dpd': { + 'action': 'restart', + 'interval': 30, + 'timeout': 120, + }, + } + + self._test_create_resource(resource, cmd, name, my_id, args, + position_names, position_values, + extra_body=extra_body) + + def test_create_ipsec_site_connection_with_limited_params(self): + """ipsecsite-connection-create with limited params.""" + resource = 'ipsec_site_connection' + cmd = ipsec_site_connection.CreateIPsecSiteConnection( + test_cli20.MyApp(sys.stdout), None + ) + tenant_id = 'mytenant_id' + my_id = 'my_id' + peer_address = '192.168.2.10' + peer_id = '192.168.2.10' + psk = 'abcd' + mtu = '1500' + initiator = 'bi-directional' + vpnservice_id = 'vpnservice_id' + ikepolicy_id = 'ikepolicy_id' + ipsecpolicy_id = 'ipsecpolicy_id' + peer_cidrs = ['192.168.3.0/24', '192.168.2.0/24'] + admin_state = True + + args = ['--tenant-id', tenant_id, + '--peer-address', peer_address, + '--peer-id', peer_id, + '--psk', psk, + '--vpnservice-id', vpnservice_id, + '--ikepolicy-id', ikepolicy_id, + '--ipsecpolicy-id', ipsecpolicy_id, + '--peer-cidr', '192.168.3.0/24', + '--peer-cidr', '192.168.2.0/24'] + + position_names = ['tenant_id', 'admin_state_up', + 'peer_address', 'peer_id', 'peer_cidrs', + 'psk', 'mtu', 'initiator', + 'vpnservice_id', 'ikepolicy_id', + 'ipsecpolicy_id'] + + position_values = [tenant_id, admin_state, peer_address, + peer_id, peer_cidrs, psk, mtu, + initiator, + vpnservice_id, ikepolicy_id, ipsecpolicy_id] + + self._test_create_resource(resource, cmd, None, my_id, args, + position_names, position_values) + + def _test_dpd_values(self, dpd): + """ipsecsite-connection-create with invalid dpd values.""" + resource = 'ipsec_site_connection' + cmd = ipsec_site_connection.CreateIPsecSiteConnection( + test_cli20.MyApp(sys.stdout), None + ) + tenant_id = 'mytenant_id' + name = 'connection1' + my_id = 'my_id' + peer_address = '192.168.2.10' + peer_id = '192.168.2.10' + psk = 'abcd' + mtu = '1500' + initiator = 'bi-directional' + vpnservice_id = 'vpnservice_id' + ikepolicy_id = 'ikepolicy_id' + ipsecpolicy_id = 'ipsecpolicy_id' + peer_cidrs = ['192.168.3.0/24', '192.168.2.0/24'] + admin_state = True + description = 'my-vpn-connection' + + args = ['--tenant-id', tenant_id, + '--peer-address', peer_address, '--peer-id', peer_id, + '--psk', psk, '--initiator', initiator, + '--vpnservice-id', vpnservice_id, + '--ikepolicy-id', ikepolicy_id, '--name', name, + '--ipsecpolicy-id', ipsecpolicy_id, '--mtu', mtu, + '--description', description, + '--peer-cidr', '192.168.3.0/24', + '--peer-cidr', '192.168.2.0/24', + '--dpd', dpd] + + position_names = ['name', 'tenant_id', 'admin_state_up', + 'peer_address', 'peer_id', 'peer_cidrs', + 'psk', 'mtu', 'initiator', 'description', + 'vpnservice_id', 'ikepolicy_id', + 'ipsecpolicy_id'] + + position_values = [name, tenant_id, admin_state, peer_address, + peer_id, peer_cidrs, psk, mtu, + initiator, description, + vpnservice_id, ikepolicy_id, ipsecpolicy_id] + self.assertRaises( + Exception, + self._test_create_resource, + resource, cmd, name, my_id, args, + position_names, position_values) + + def test_invalid_mtu(self): + """ipsecsite-connection-create with invalid dpd values.""" + resource = 'ipsec_site_connection' + cmd = ipsec_site_connection.CreateIPsecSiteConnection( + test_cli20.MyApp(sys.stdout), None + ) + tenant_id = 'mytenant_id' + name = 'connection1' + my_id = 'my_id' + peer_address = '192.168.2.10' + peer_id = '192.168.2.10' + psk = 'abcd' + mtu = '67' + initiator = 'bi-directional' + vpnservice_id = 'vpnservice_id' + ikepolicy_id = 'ikepolicy_id' + ipsecpolicy_id = 'ipsecpolicy_id' + peer_cidrs = ['192.168.3.0/24', '192.168.2.0/24'] + admin_state = True + description = 'my-vpn-connection' + + args = ['--tenant-id', tenant_id, + '--peer-address', peer_address, '--peer-id', peer_id, + '--psk', psk, '--initiator', initiator, + '--vpnservice-id', vpnservice_id, + '--ikepolicy-id', ikepolicy_id, '--name', name, + '--ipsecpolicy-id', ipsecpolicy_id, '--mtu', mtu, + '--description', description, + '--peer-cidr', '192.168.3.0/24', + '--peer-cidr', '192.168.2.0/24'] + + position_names = ['name', 'tenant_id', 'admin_state_up', + 'peer_address', 'peer_id', 'peer_cidrs', + 'psk', 'mtu', 'initiator', 'description', + 'vpnservice_id', 'ikepolicy_id', + 'ipsecpolicy_id'] + + position_values = [name, tenant_id, admin_state, peer_address, + peer_id, peer_cidrs, psk, mtu, + initiator, description, + vpnservice_id, ikepolicy_id, ipsecpolicy_id] + self.assertRaises( + Exception, + self._test_create_resource, + resource, cmd, name, my_id, args, + position_names, position_values) + + def test_create_ipsec_site_connection_with_invalid_dpd_keys(self): + dpd = 'act=restart,interval=30,time=120' + self._test_dpd_values(dpd) + + def test_create_ipsec_site_connection_with_invalid_dpd_values(self): + dpd = 'action=hold,interval=30,timeout=-1' + self._test_dpd_values(dpd) + + def test_create_ipsec_site_connection_dpd_timeout_less_than_interval(self): + dpd = 'action=restart,interval=30,timeout=10' + self._test_dpd_values(dpd) + + def test_list_ipsec_site_connection(self): + """ipsecsite-connection-list.""" + resources = "ipsec_site_connections" + cmd = ipsec_site_connection.ListIPsecSiteConnection( + test_cli20.MyApp(sys.stdout), None + ) + self._test_list_resources(resources, cmd, True) + + def test_list_ipsec_site_connection_pagination(self): + """ipsecsite-connection-list.""" + resources = "ipsec_site_connections" + cmd = ipsec_site_connection.ListIPsecSiteConnection( + test_cli20.MyApp(sys.stdout), None + ) + self._test_list_resources_with_pagination(resources, cmd) + + def test_list_ipsec_site_connection_sort(self): + """ipsecsite-connection-list. + --sort-key name --sort-key id --sort-key asc --sort-key desc + """ + resources = "ipsec_site_connections" + cmd = ipsec_site_connection.ListIPsecSiteConnection( + test_cli20.MyApp(sys.stdout), None + ) + self._test_list_resources(resources, cmd, + sort_key=["name", "id"], + sort_dir=["asc", "desc"]) + + def test_list_ipsec_site_connection_limit(self): + """ipsecsite-connection-list -P.""" + resources = "ipsec_site_connections" + cmd = ipsec_site_connection.ListIPsecSiteConnection( + test_cli20.MyApp(sys.stdout), None + ) + self._test_list_resources(resources, cmd, page_size=1000) + + def test_delete_ipsec_site_connection(self): + """ipsecsite-connection-delete my-id.""" + resource = 'ipsec_site_connection' + cmd = ipsec_site_connection.DeleteIPsecSiteConnection( + test_cli20.MyApp(sys.stdout), None + ) + my_id = 'my-id' + args = [my_id] + self._test_delete_resource(resource, cmd, my_id, args) + + def test_update_ipsec_site_connection(self): + """ipsecsite-connection-update myid --name myname --tags a b.""" + resource = 'ipsec_site_connection' + cmd = ipsec_site_connection.UpdateIPsecSiteConnection( + test_cli20.MyApp(sys.stdout), None + ) + self._test_update_resource(resource, cmd, 'myid', + ['myid', '--name', 'Branch-new', + '--tags', 'a', 'b'], + {'name': 'Branch-new', + 'tags': ['a', 'b'], }) + + def test_show_ipsec_site_connection_id(self): + """ipsecsite-connection-show test_id.""" + resource = 'ipsec_site_connection' + cmd = ipsec_site_connection.ShowIPsecSiteConnection( + test_cli20.MyApp(sys.stdout), None + ) + args = ['--fields', 'id', self.test_id] + self._test_show_resource(resource, cmd, self.test_id, args, ['id']) + + def test_show_ipsec_site_connection_id_name(self): + """ipsecsite-connection-show.""" + resource = 'ipsec_site_connection' + cmd = ipsec_site_connection.ShowIPsecSiteConnection( + test_cli20.MyApp(sys.stdout), None + ) + args = ['--fields', 'id', '--fields', 'name', self.test_id] + self._test_show_resource(resource, cmd, self.test_id, + args, ['id', 'name']) + + +class CLITestV20IPsecSiteConnectionXML(CLITestV20IPsecSiteConnectionJSON): + format = 'xml' diff --git a/tests/unit/vpn/test_cli20_ipsecpolicy.py b/tests/unit/vpn/test_cli20_ipsecpolicy.py new file mode 100644 index 0000000..1abe156 --- /dev/null +++ b/tests/unit/vpn/test_cli20_ipsecpolicy.py @@ -0,0 +1,211 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# (c) Copyright 2013 Hewlett-Packard Development Company, L.P. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Swaminathan Vasudevan, Hewlett Packard. + +import sys + +from neutronclient.neutron.v2_0.vpn import ipsecpolicy +from tests.unit import test_cli20 + + +class CLITestV20VpnIpsecPolicyJSON(test_cli20.CLITestV20Base): + + def test_create_ipsecpolicy_all_params(self): + """vpn-ipsecpolicy-create all params with dashes.""" + resource = 'ipsecpolicy' + cmd = ipsecpolicy.CreateIPsecPolicy(test_cli20.MyApp(sys.stdout), None) + name = 'ipsecpolicy1' + description = 'first-ipsecpolicy1' + auth_algorithm = 'sha1' + encryption_algorithm = 'aes-256' + encapsulation_mode = 'tunnel' + pfs = 'group5' + transform_protocol = 'ah' + tenant_id = 'my-tenant' + my_id = 'my-id' + lifetime = 'units=seconds,value=20000' + + args = [name, + '--description', description, + '--tenant-id', tenant_id, + '--auth-algorithm', auth_algorithm, + '--encryption-algorithm', encryption_algorithm, + '--transform-protocol', transform_protocol, + '--encapsulation-mode', encapsulation_mode, + '--lifetime', lifetime, + '--pfs', pfs] + + position_names = ['name', 'auth_algorithm', 'encryption_algorithm', + 'encapsulation_mode', 'description', + 'transform_protocol', 'pfs', + 'tenant_id'] + + position_values = [name, auth_algorithm, encryption_algorithm, + encapsulation_mode, description, + transform_protocol, pfs, + tenant_id] + extra_body = { + 'lifetime': { + 'units': 'seconds', + 'value': 20000, + }, + } + + self._test_create_resource(resource, cmd, name, my_id, args, + position_names, position_values, + extra_body=extra_body) + + def test_create_ipsecpolicy_with_limited_params(self): + """vpn-ipsecpolicy-create with limited params.""" + resource = 'ipsecpolicy' + cmd = ipsecpolicy.CreateIPsecPolicy(test_cli20.MyApp(sys.stdout), None) + name = 'ipsecpolicy1' + auth_algorithm = 'sha1' + encryption_algorithm = 'aes-128' + encapsulation_mode = 'tunnel' + pfs = 'group5' + transform_protocol = 'esp' + tenant_id = 'my-tenant' + my_id = 'my-id' + + args = [name, + '--tenant-id', tenant_id] + + position_names = ['name', 'auth_algorithm', 'encryption_algorithm', + 'encapsulation_mode', + 'transform_protocol', 'pfs', + 'tenant_id'] + + position_values = [name, auth_algorithm, encryption_algorithm, + encapsulation_mode, + transform_protocol, pfs, + tenant_id] + + self._test_create_resource(resource, cmd, name, my_id, args, + position_names, position_values) + + def _test_lifetime_values(self, lifetime): + resource = 'ipsecpolicy' + cmd = ipsecpolicy.CreateIPsecPolicy(test_cli20.MyApp(sys.stdout), None) + name = 'ipsecpolicy1' + description = 'my-ipsec-policy' + auth_algorithm = 'sha1' + encryption_algorithm = 'aes-256' + ike_version = 'v1' + phase1_negotiation_mode = 'main' + pfs = 'group5' + tenant_id = 'my-tenant' + my_id = 'my-id' + + args = [name, + '--description', description, + '--tenant-id', tenant_id, + '--auth-algorithm', auth_algorithm, + '--encryption-algorithm', encryption_algorithm, + '--ike-version', ike_version, + '--phase1-negotiation-mode', phase1_negotiation_mode, + '--lifetime', lifetime, + '--pfs', pfs] + + position_names = ['name', 'description', + 'auth_algorithm', 'encryption_algorithm', + 'phase1_negotiation_mode', + 'ike_version', 'pfs', + 'tenant_id'] + + position_values = [name, description, + auth_algorithm, encryption_algorithm, + phase1_negotiation_mode, ike_version, pfs, + tenant_id] + try: + self._test_create_resource(resource, cmd, name, my_id, args, + position_names, position_values) + except Exception: + return + self.fail("IPsecPolicy Lifetime Error") + + def test_create_ipsecpolicy_with_invalid_lifetime_keys(self): + lifetime = 'uts=seconds,val=20000' + self._test_lifetime_values(lifetime) + + def test_create_ipsecpolicy_with_invalide_lifetime_values(self): + lifetime = 'units=minutes,value=0' + self._test_lifetime_values(lifetime) + + def test_list_ipsecpolicy(self): + """vpn-ipsecpolicy-list.""" + resources = "ipsecpolicies" + cmd = ipsecpolicy.ListIPsecPolicy(test_cli20.MyApp(sys.stdout), None) + self._test_list_resources(resources, cmd, True) + + def test_list_ipsecpolicy_pagination(self): + """vpn-ipsecpolicy-list.""" + resources = "ipsecpolicies" + cmd = ipsecpolicy.ListIPsecPolicy(test_cli20.MyApp(sys.stdout), None) + self._test_list_resources_with_pagination(resources, cmd) + + def test_list_ipsecpolicy_sort(self): + """vpn-ipsecpolicy-list --sort-key name --sort-key id --sort-key asc + --sort-key desc + """ + resources = "ipsecpolicies" + cmd = ipsecpolicy.ListIPsecPolicy(test_cli20.MyApp(sys.stdout), None) + self._test_list_resources(resources, cmd, + sort_key=["name", "id"], + sort_dir=["asc", "desc"]) + + def test_list_ipsecpolicy_limit(self): + """vpn-ipsecpolicy-list -P.""" + resources = "ipsecpolicies" + cmd = ipsecpolicy.ListIPsecPolicy(test_cli20.MyApp(sys.stdout), None) + self._test_list_resources(resources, cmd, page_size=1000) + + def test_show_ipsecpolicy_id(self): + """vpn-ipsecpolicy-show ipsecpolicy_id.""" + resource = 'ipsecpolicy' + cmd = ipsecpolicy.ShowIPsecPolicy(test_cli20.MyApp(sys.stdout), None) + args = ['--fields', 'id', self.test_id] + self._test_show_resource(resource, cmd, self.test_id, args, ['id']) + + def test_show_ipsecpolicy_id_name(self): + """vpn-ipsecpolicy-show.""" + resource = 'ipsecpolicy' + cmd = ipsecpolicy.ShowIPsecPolicy(test_cli20.MyApp(sys.stdout), None) + args = ['--fields', 'id', '--fields', 'name', self.test_id] + self._test_show_resource(resource, cmd, self.test_id, + args, ['id', 'name']) + + def test_update_ipsecpolicy(self): + """vpn-ipsecpolicy-update myid --name newname --tags a b.""" + resource = 'ipsecpolicy' + cmd = ipsecpolicy.UpdateIPsecPolicy(test_cli20.MyApp(sys.stdout), None) + self._test_update_resource(resource, cmd, 'myid', + ['myid', '--name', 'newname'], + {'name': 'newname', }) + + def test_delete_ipsecpolicy(self): + """vpn-ipsecpolicy-delete my-id.""" + resource = 'ipsecpolicy' + cmd = ipsecpolicy.DeleteIPsecPolicy(test_cli20.MyApp(sys.stdout), None) + my_id = 'my-id' + args = [my_id] + self._test_delete_resource(resource, cmd, my_id, args) + + +class CLITestV20VpnIpsecPolicyXML(CLITestV20VpnIpsecPolicyJSON): + format = 'xml' diff --git a/tests/unit/vpn/test_cli20_vpnservice.py b/tests/unit/vpn/test_cli20_vpnservice.py new file mode 100644 index 0000000..d63c432 --- /dev/null +++ b/tests/unit/vpn/test_cli20_vpnservice.py @@ -0,0 +1,140 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# (c) Copyright 2013 Hewlett-Packard Development Company, L.P. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Swaminathan Vasudevan, Hewlett Packard. + +import sys + +from neutronclient.neutron.v2_0.vpn import vpnservice +from tests.unit import test_cli20 + + +class CLITestV20VpnServiceJSON(test_cli20.CLITestV20Base): + + def test_create_vpnservice_all_params(self): + """vpn-service-create all params.""" + resource = 'vpnservice' + cmd = vpnservice.CreateVPNService(test_cli20.MyApp(sys.stdout), None) + subnet = 'mysubnet-id' + router = 'myrouter-id' + tenant_id = 'mytenant-id' + my_id = 'my-id' + name = 'myvpnservice' + description = 'my-vpn-service' + admin_state = True + + args = ['--name', name, + '--description', description, + router, + subnet, + '--tenant-id', tenant_id] + + position_names = ['admin_state_up', 'name', 'description', + 'subnet_id', 'router_id', + 'tenant_id'] + + position_values = [admin_state, name, description, + subnet, router, tenant_id] + + self._test_create_resource(resource, cmd, name, my_id, args, + position_names, position_values) + + def test_create_vpnservice_with_limited_params(self): + """vpn-service-create with limited params.""" + resource = 'vpnservice' + cmd = vpnservice.CreateVPNService(test_cli20.MyApp(sys.stdout), None) + subnet = 'mysubnet-id' + router = 'myrouter-id' + tenant_id = 'mytenant-id' + my_id = 'my-id' + admin_state = True + + args = [router, + subnet, + '--tenant-id', tenant_id] + + position_names = ['admin_state_up', + 'subnet_id', 'router_id', + 'tenant_id'] + + position_values = [admin_state, subnet, router, tenant_id] + + self._test_create_resource(resource, cmd, None, my_id, args, + position_names, position_values) + + def test_list_vpnservice(self): + """vpn-service-list.""" + resources = "vpnservices" + cmd = vpnservice.ListVPNService(test_cli20.MyApp(sys.stdout), None) + self._test_list_resources(resources, cmd, True) + + def test_list_vpnservice_pagination(self): + """vpn-service-list.""" + resources = "vpnservices" + cmd = vpnservice.ListVPNService(test_cli20.MyApp(sys.stdout), None) + self._test_list_resources_with_pagination(resources, cmd) + + def test_list_vpnservice_sort(self): + """vpn-service-list --sort-key name --sort-key id --sort-key asc + --sort-key desc + """ + resources = "vpnservices" + cmd = vpnservice.ListVPNService(test_cli20.MyApp(sys.stdout), None) + self._test_list_resources(resources, cmd, + sort_key=["name", "id"], + sort_dir=["asc", "desc"]) + + def test_list_vpnservice_limit(self): + """vpn-service-list -P.""" + resources = "vpnservices" + cmd = vpnservice.ListVPNService(test_cli20.MyApp(sys.stdout), None) + self._test_list_resources(resources, cmd, page_size=1000) + + def test_show_vpnservice_id(self): + """vpn-service-show test_id.""" + resource = 'vpnservice' + cmd = vpnservice.ShowVPNService(test_cli20.MyApp(sys.stdout), None) + args = ['--fields', 'id', self.test_id] + self._test_show_resource(resource, cmd, self.test_id, args, ['id']) + + def test_show_vpnservice_id_name(self): + """vpn-service-show.""" + resource = 'vpnservice' + cmd = vpnservice.ShowVPNService(test_cli20.MyApp(sys.stdout), None) + args = ['--fields', 'id', '--fields', 'name', self.test_id] + self._test_show_resource(resource, cmd, self.test_id, + args, ['id', 'name']) + + def test_update_vpnservice(self): + """vpn-service-update myid --name newname --tags a b.""" + resource = 'vpnservice' + cmd = vpnservice.UpdateVPNService(test_cli20.MyApp(sys.stdout), None) + self._test_update_resource(resource, cmd, 'myid', + ['myid', '--name', 'newname'], + {'name': 'newname', }) + + def test_delete_vpnservice(self): + """vpn-service-delete my-id.""" + resource = 'vpnservice' + cmd = vpnservice.DeleteVPNService(test_cli20.MyApp(sys.stdout), None) + my_id = 'my-id' + args = [my_id] + self._test_delete_resource(resource, cmd, my_id, args) + + +class CLITestV20VpnServiceXML(CLITestV20VpnServiceJSON): + format = 'xml' diff --git a/tests/unit/vpn/test_utils.py b/tests/unit/vpn/test_utils.py new file mode 100644 index 0000000..fe59fb4 --- /dev/null +++ b/tests/unit/vpn/test_utils.py @@ -0,0 +1,132 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# (c) Copyright 2013 Hewlett-Packard Development Company, L.P. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Swaminathan Vasudevan, Hewlett Packard. + +import testtools + +from neutronclient.common import utils +from neutronclient.neutron.v2_0.vpn import utils as vpn_utils + + +class TestVPNUtils(testtools.TestCase): + + def test_validate_lifetime_dictionary_seconds(self): + input_str = utils.str2dict("units=seconds,value=3600") + self.assertIsNone(vpn_utils.validate_lifetime_dict(input_str)) + + def test_validate_dpd_dictionary_action_hold(self): + input_str = utils.str2dict("action=hold,interval=30,timeout=120") + self.assertIsNone(vpn_utils.validate_dpd_dict(input_str)) + + def test_validate_dpd_dictionary_action_restart(self): + input_str = utils.str2dict("action=restart,interval=30,timeout=120") + self.assertIsNone(vpn_utils.validate_dpd_dict(input_str)) + + def test_validate_dpd_dictionary_action_restart_by_peer(self): + input_str = utils.str2dict( + "action=restart-by-peer,interval=30,timeout=120" + ) + self.assertIsNone(vpn_utils.validate_dpd_dict(input_str)) + + def test_validate_dpd_dictionary_action_clear(self): + input_str = utils.str2dict('action=clear,interval=30,timeout=120') + self.assertIsNone(vpn_utils.validate_dpd_dict(input_str)) + + def test_validate_dpd_dictionary_action_disabled(self): + input_str = utils.str2dict('action=disabled,interval=30,timeout=120') + self.assertIsNone(vpn_utils.validate_dpd_dict(input_str)) + + def test_validate_lifetime_dictionary_invalid_unit_key(self): + input_str = utils.str2dict('ut=seconds,value=3600') + self._test_validate_lifetime_negative_test_case(input_str) + + def test_validate_lifetime_dictionary_invalid_unit_key_value(self): + input_str = utils.str2dict('units=seconds,val=3600') + self._test_validate_lifetime_negative_test_case(input_str) + + def test_validate_lifetime_dictionary_unsupported_units(self): + input_str = utils.str2dict('units=minutes,value=3600') + self._test_validate_lifetime_negative_test_case(input_str) + + def test_validate_lifetime_dictionary_invalid_empty_unit(self): + input_str = utils.str2dict('units=,value=3600') + self._test_validate_lifetime_negative_test_case(input_str) + + def test_validate_lifetime_dictionary_under_minimum_integer_value(self): + input_str = utils.str2dict('units=seconds,value=59') + self._test_validate_lifetime_negative_test_case(input_str) + + def test_validate_lifetime_dictionary_negative_integer_value(self): + input_str = utils.str2dict('units=seconds,value=-1') + self._test_validate_lifetime_negative_test_case(input_str) + + def test_validate_lifetime_dictionary_empty_value(self): + input_str = utils.str2dict('units=seconds,value=') + self._test_validate_lifetime_negative_test_case(input_str) + + def test_validate_dpd_dictionary_invalid_key_action(self): + input_str = utils.str2dict('act=hold,interval=30,timeout=120') + self._test_validate_dpd_negative_test_case(input_str) + + def test_validate_dpd_dictionary_invalid_key_interval(self): + input_str = utils.str2dict('action=hold,int=30,timeout=120') + self._test_validate_dpd_negative_test_case(input_str) + + def test_validate_dpd_dictionary_invalid_key_timeout(self): + input_str = utils.str2dict('action=hold,interval=30,tiut=120') + self._test_validate_dpd_negative_test_case(input_str) + + def test_validate_dpd_dictionary_unsupported_action(self): + input_str = utils.str2dict('action=bye-bye,interval=30,timeout=120') + self._test_validate_dpd_negative_test_case(input_str) + + def test_validate_dpd_dictionary_empty_action(self): + input_str = utils.str2dict('action=,interval=30,timeout=120') + self._test_validate_dpd_negative_test_case(input_str) + + def test_validate_dpd_dictionary_empty_interval(self): + input_str = utils.str2dict('action=hold,interval=,timeout=120') + self._test_validate_dpd_negative_test_case(input_str) + + def test_validate_dpd_dictionary_negative_interval_value(self): + input_str = utils.str2dict('action=hold,interval=-1,timeout=120') + self._test_validate_lifetime_negative_test_case(input_str) + + def test_validate_dpd_dictionary_zero_timeout(self): + input_str = utils.str2dict('action=hold,interval=30,timeout=0') + self._test_validate_dpd_negative_test_case(input_str) + + def test_validate_dpd_dictionary_empty_timeout(self): + input_str = utils.str2dict('action=hold,interval=30,timeout=') + self._test_validate_dpd_negative_test_case(input_str) + + def test_validate_dpd_dictionary_negative_timeout_value(self): + input_str = utils.str2dict('action=hold,interval=30,timeout=-1') + self._test_validate_lifetime_negative_test_case(input_str) + + def _test_validate_lifetime_negative_test_case(self, input_str): + """Generic handler for negative lifetime tests.""" + self.assertRaises(Exception, + vpn_utils.validate_lifetime_dict, + (input_str)) + + def _test_validate_dpd_negative_test_case(self, input_str): + """Generic handler for negative lifetime tests.""" + self.assertRaises(Exception, + vpn_utils.validate_lifetime_dict, + (input_str))