VPN as a Service (VPNaaS) Client API

Implements:blueprint VPNaaS-Python-API

This provides the basic Neutron Client
CLI for configuring:
* VPNService
* IKEPolicy
* IPsecPolicy
* IPsecSiteConnection

Change-Id: I72ab670d23109a4bb1174acf48e4b77958e855e1
This commit is contained in:
Swaminathan Vasudevan
2013-05-20 06:55:58 -07:00
committed by Nachi Ueno
parent e34a06adf0
commit 653c4b1949
16 changed files with 1888 additions and 6 deletions

View File

@@ -168,7 +168,7 @@ class XMLDictSerializer(DictSerializer):
constants.TYPE_DICT)
return result
attrs = metadata.get('attributes', {}).get(nodename, {})
for k, v in data.items():
for k, v in sorted(data.items()):
if k in attrs:
result.set(k, str(v))
else:

View File

@@ -0,0 +1,17 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# (c) Copyright 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Swaminathan Vasudevan, Hewlett-Packard.

View File

@@ -0,0 +1,138 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# (c) Copyright 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Swaminathan Vasudevan, Hewlett-Packard.
#
import logging
from neutronclient.common import utils
from neutronclient.neutron import v2_0 as neutronv20
from neutronclient.neutron.v2_0.vpn import utils as vpn_utils
class ListIKEPolicy(neutronv20.ListCommand):
"""List IKEPolicies that belong to a tenant."""
resource = 'ikepolicy'
log = logging.getLogger(__name__ + '.ListIKEPolicy')
list_columns = ['id', 'name', 'auth_algorithm',
'encryption_algorithm', 'ike_version', 'pfs']
_formatters = {}
pagination_support = True
sorting_support = True
class ShowIKEPolicy(neutronv20.ShowCommand):
"""Show information of a given IKEPolicy."""
resource = 'ikepolicy'
log = logging.getLogger(__name__ + '.ShowIKEPolicy')
class CreateIKEPolicy(neutronv20.CreateCommand):
"""Create an IKEPolicy."""
resource = 'ikepolicy'
log = logging.getLogger(__name__ + '.CreateIKEPolicy')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help='Description of the IKE policy')
parser.add_argument(
'--auth-algorithm',
default='sha1', choices=['sha1'],
help='Authentication algorithm in lowercase. '
'default:sha1')
parser.add_argument(
'--encryption-algorithm',
default='aes-128', choices=['3des',
'aes-128',
'aes-192',
'aes-256'],
help='Encryption Algorithm in lowercase, default:aes-128')
parser.add_argument(
'--phase1-negotiation-mode',
default='main', choices=['main'],
help='IKE Phase1 negotiation mode in lowercase, default:main')
parser.add_argument(
'--ike-version',
default='v1', choices=['v1', 'v2'],
help='IKE version in lowercase, default:v1')
parser.add_argument(
'--pfs',
default='group5', choices=['group2', 'group5', 'group14'],
help='Perfect Forward Secrecy in lowercase, default:group5')
parser.add_argument(
'--lifetime',
metavar="units=UNITS,value=VALUE",
type=utils.str2dict,
help=vpn_utils.lifetime_help("IKE"))
parser.add_argument(
'name', metavar='NAME',
help='Name of the IKE Policy')
def args2body(self, parsed_args):
body = {'ikepolicy': {
'auth_algorithm': parsed_args.auth_algorithm,
'encryption_algorithm': parsed_args.encryption_algorithm,
'phase1_negotiation_mode': parsed_args.phase1_negotiation_mode,
'ike_version': parsed_args.ike_version,
'pfs': parsed_args.pfs,
}, }
if parsed_args.name:
body['ikepolicy'].update({'name': parsed_args.name})
if parsed_args.description:
body['ikepolicy'].update({'description': parsed_args.description})
if parsed_args.tenant_id:
body['ikepolicy'].update({'tenant_id': parsed_args.tenant_id})
if parsed_args.lifetime:
vpn_utils.validate_lifetime_dict(parsed_args.lifetime)
body['ikepolicy'].update({'lifetime': parsed_args.lifetime})
return body
class UpdateIKEPolicy(neutronv20.UpdateCommand):
"""Update a given IKE Policy."""
resource = 'ikepolicy'
log = logging.getLogger(__name__ + '.UpdateIKEPolicy')
def add_known_arguments(self, parser):
parser.add_argument(
'--lifetime',
metavar="units=UNITS,value=VALUE",
type=utils.str2dict,
help=vpn_utils.lifetime_help("IKE"))
def args2body(self, parsed_args):
body = {'ikepolicy': {
}, }
if parsed_args.lifetime:
vpn_utils.validate_lifetime_dict(parsed_args.lifetime)
body['ikepolicy'].update({'lifetime': parsed_args.lifetime})
return body
class DeleteIKEPolicy(neutronv20.DeleteCommand):
"""Delete a given IKE Policy."""
resource = 'ikepolicy'
log = logging.getLogger(__name__ + '.DeleteIKEPolicy')

View File

@@ -0,0 +1,193 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# (c) Copyright 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Swaminathan Vasudevan, Hewlett-Packard.
#
import logging
from neutronclient.common import utils
from neutronclient.neutron import v2_0 as neutronv20
from neutronclient.neutron.v2_0.vpn import utils as vpn_utils
from neutronclient.openstack.common.gettextutils import _
def _format_peer_cidrs(ipsec_site_connection):
try:
return '\n'.join([utils.dumps(cidrs) for cidrs in
ipsec_site_connection['peer_cidrs']])
except Exception:
return ''
class ListIPsecSiteConnection(neutronv20.ListCommand):
"""List IPsecSiteConnections that belong to a given tenant."""
resource = 'ipsec_site_connection'
log = logging.getLogger(__name__ + '.ListIPsecSiteConnection')
_formatters = {'peer_cidrs': _format_peer_cidrs}
list_columns = [
'id', 'name', 'peer_address', 'peer_cidrs', 'route_mode',
'auth_mode', 'status']
pagination_support = True
sorting_support = True
class ShowIPsecSiteConnection(neutronv20.ShowCommand):
"""Show information of a given IPsecSiteConnection."""
resource = 'ipsec_site_connection'
log = logging.getLogger(__name__ + '.ShowIPsecSiteConnection')
class CreateIPsecSiteConnection(neutronv20.CreateCommand):
"""Create an IPsecSiteConnection."""
resource = 'ipsec_site_connection'
log = logging.getLogger(__name__ + '.CreateIPsecSiteConnection')
def add_known_arguments(self, parser):
parser.add_argument(
'--admin-state-down',
default=True, action='store_false',
help='Set admin state up to false')
parser.add_argument(
'--name',
help='Set friendly name for the connection')
parser.add_argument(
'--description',
help='Set a description for the connection')
parser.add_argument(
'--mtu',
default='1500',
help='MTU size for the connection, default:1500')
parser.add_argument(
'--initiator',
default='bi-directional', choices=['bi-directional',
'response-only'],
help='Initiator state in lowercase, default:bi-directional')
parser.add_argument(
'--dpd',
metavar="action=ACTION,interval=INTERVAL,timeout=TIMEOUT",
type=utils.str2dict,
help=vpn_utils.dpd_help("IPsec Connection"))
parser.add_argument(
'--vpnservice-id', metavar='VPNSERVICE',
required=True,
help='VPNService instance id associated with this connection')
parser.add_argument(
'--ikepolicy-id', metavar='IKEPOLICY',
required=True,
help='IKEPolicy id associated with this connection')
parser.add_argument(
'--ipsecpolicy-id', metavar='IPSECPOLICY',
required=True,
help='IPsecPolicy id associated with this connection')
parser.add_argument(
'--peer-address',
required=True,
help='Remote branch router public '
'IPv4 address or IPv6 address or FQDN.')
parser.add_argument(
'--peer-id',
required=True,
help='Remote branch router identity')
parser.add_argument(
'--peer-cidr',
action='append', dest='peer_cidrs',
required=True,
help='Remote subnet(s) in CIDR format')
parser.add_argument(
'--psk',
required=True,
help='Pre-Shared Key string')
def args2body(self, parsed_args):
_vpnservice_id = neutronv20.find_resourceid_by_name_or_id(
self.get_client(), 'vpnservice',
parsed_args.vpnservice_id)
_ikepolicy_id = neutronv20.find_resourceid_by_name_or_id(
self.get_client(), 'ikepolicy',
parsed_args.ikepolicy_id)
_ipsecpolicy_id = neutronv20.find_resourceid_by_name_or_id(
self.get_client(), 'ipsecpolicy',
parsed_args.ipsecpolicy_id)
if int(parsed_args.mtu) < 68:
message = _("Invalid MTU value: MTU must be "
"greater than or equal to 68")
raise ValueError(message)
body = {'ipsec_site_connection': {
'vpnservice_id': _vpnservice_id,
'ikepolicy_id': _ikepolicy_id,
'ipsecpolicy_id': _ipsecpolicy_id,
'peer_address': parsed_args.peer_address,
'peer_id': parsed_args.peer_id,
'mtu': parsed_args.mtu,
'initiator': parsed_args.initiator,
'psk': parsed_args.psk,
'admin_state_up': parsed_args.admin_state_down,
}, }
if parsed_args.name:
body['ipsec_site_connection'].update(
{'name': parsed_args.name}
)
if parsed_args.description:
body['ipsec_site_connection'].update(
{'description': parsed_args.description}
)
if parsed_args.tenant_id:
body['ipsec_site_connection'].update(
{'tenant_id': parsed_args.tenant_id}
)
if parsed_args.dpd:
vpn_utils.validate_dpd_dict(parsed_args.dpd)
body['ipsec_site_connection'].update({'dpd': parsed_args.dpd})
if parsed_args.peer_cidrs:
body['ipsec_site_connection'][
'peer_cidrs'] = parsed_args.peer_cidrs
return body
class UpdateIPsecSiteConnection(neutronv20.UpdateCommand):
"""Update a given IPsecSiteConnection."""
resource = 'ipsec_site_connection'
log = logging.getLogger(__name__ + '.UpdateIPsecSiteConnection')
def add_known_arguments(self, parser):
parser.add_argument(
'--dpd',
metavar="action=ACTION,interval=INTERVAL,timeout=TIMEOUT",
type=utils.str2dict,
help=vpn_utils.dpd_help("IPsec Connection"))
def args2body(self, parsed_args):
body = {'ipsec_site_connection': {
}, }
if parsed_args.dpd:
vpn_utils.validate_dpd_dict(parsed_args.dpd)
body['ipsec_site_connection'].update({'dpd': parsed_args.dpd})
return body
class DeleteIPsecSiteConnection(neutronv20.DeleteCommand):
"""Delete a given IPsecSiteConnection."""
resource = 'ipsec_site_connection'
log = logging.getLogger(__name__ + '.DeleteIPsecSiteConnection')

View File

@@ -0,0 +1,138 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# (c) Copyright 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Swaminathan Vasudevan, Hewlett-Packard.
import logging
from neutronclient.common import utils
from neutronclient.neutron import v2_0 as neutronv20
from neutronclient.neutron.v2_0.vpn import utils as vpn_utils
class ListIPsecPolicy(neutronv20.ListCommand):
"""List ipsecpolicies that belongs to a given tenant connection."""
resource = 'ipsecpolicy'
log = logging.getLogger(__name__ + '.ListIPsecPolicy')
list_columns = ['id', 'name', 'auth_algorithm',
'encryption_algorithm', 'pfs']
_formatters = {}
pagination_support = True
sorting_support = True
class ShowIPsecPolicy(neutronv20.ShowCommand):
"""Show information of a given ipsecpolicy."""
resource = 'ipsecpolicy'
log = logging.getLogger(__name__ + '.ShowIPsecPolicy')
class CreateIPsecPolicy(neutronv20.CreateCommand):
"""Create an ipsecpolicy."""
resource = 'ipsecpolicy'
log = logging.getLogger(__name__ + '.CreateIPsecPolicy')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help='Description of the IPsecPolicy')
parser.add_argument(
'--transform-protocol',
default='esp', choices=['esp', 'ah', 'ah-esp'],
help='Transform Protocol in lowercase, default:esp')
parser.add_argument(
'--auth-algorithm',
default='sha1', choices=['sha1'],
help='Authentication algorithm in lowercase, default:sha1')
parser.add_argument(
'--encryption-algorithm',
default='aes-128', choices=['3des',
'aes-128',
'aes-192',
'aes-256'],
help='Encryption Algorithm in lowercase, default:aes-128')
parser.add_argument(
'--encapsulation-mode',
default='tunnel', choices=['tunnel', 'transport'],
help='Encapsulation Mode in lowercase, default:tunnel')
parser.add_argument(
'--pfs',
default='group5', choices=['group2', 'group5', 'group14'],
help='Perfect Forward Secrecy in lowercase, default:group5')
parser.add_argument(
'--lifetime',
metavar="units=UNITS,value=VALUE",
type=utils.str2dict,
help=vpn_utils.lifetime_help("IPsec"))
parser.add_argument(
'name', metavar='NAME',
help='Name of the IPsecPolicy')
def args2body(self, parsed_args):
body = {'ipsecpolicy': {
'auth_algorithm': parsed_args.auth_algorithm,
'encryption_algorithm': parsed_args.encryption_algorithm,
'encapsulation_mode': parsed_args.encapsulation_mode,
'transform_protocol': parsed_args.transform_protocol,
'pfs': parsed_args.pfs,
}, }
if parsed_args.name:
body['ipsecpolicy'].update({'name': parsed_args.name})
if parsed_args.description:
body['ipsecpolicy'].update(
{'description': parsed_args.description}
)
if parsed_args.tenant_id:
body['ipsecpolicy'].update({'tenant_id': parsed_args.tenant_id})
if parsed_args.lifetime:
vpn_utils.validate_lifetime_dict(parsed_args.lifetime)
body['ipsecpolicy'].update({'lifetime': parsed_args.lifetime})
return body
class UpdateIPsecPolicy(neutronv20.UpdateCommand):
"""Update a given ipsec policy."""
resource = 'ipsecpolicy'
log = logging.getLogger(__name__ + '.UpdateIPsecPolicy')
def add_known_arguments(self, parser):
parser.add_argument(
'--lifetime',
metavar="units=UNITS,value=VALUE",
type=utils.str2dict,
help=vpn_utils.lifetime_help("IPsec"))
def args2body(self, parsed_args):
body = {'ipsecpolicy': {
}, }
if parsed_args.lifetime:
vpn_utils.validate_lifetime_dict(parsed_args.lifetime)
body['ipsecpolicy'].update({'lifetime': parsed_args.lifetime})
return body
class DeleteIPsecPolicy(neutronv20.DeleteCommand):
"""Delete a given ipsecpolicy."""
resource = 'ipsecpolicy'
log = logging.getLogger(__name__ + '.DeleteIPsecPolicy')

View File

@@ -0,0 +1,109 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# (c) Copyright 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Swaminathan Vasudevan, Hewlett-Packard.
#
"""VPN Utilities and helper functions."""
from neutronclient.openstack.common.gettextutils import _
dpd_supported_actions = ['hold', 'clear', 'restart',
'restart-by-peer', 'disabled']
dpd_supported_keys = ['action', 'interval', 'timeout']
lifetime_keys = ['units', 'value']
lifetime_units = ['seconds']
def validate_dpd_dict(dpd_dict):
for key, value in dpd_dict.items():
if key not in dpd_supported_keys:
message = _(
"DPD Dictionary KeyError: "
"Reason-Invalid DPD key : "
"'%(key)s' not in %(supported_key)s ") % {
'key': key, 'supported_key': dpd_supported_keys}
raise KeyError(message)
if key == 'action' and value not in dpd_supported_actions:
message = _(
"DPD Dictionary ValueError: "
"Reason-Invalid DPD action : "
"'%(key_value)s' not in %(supported_action)s ") % {
'key_value': value,
'supported_action': dpd_supported_actions}
raise ValueError(message)
if key in ('interval', 'timeout'):
if int(value) <= 0:
message = _(
"DPD Dictionary ValueError: "
"Reason-Invalid positive integer value: "
"'%(key)s' = %(value)i ") % {
'key': key, 'value': int(value)}
raise ValueError(message)
else:
dpd_dict[key] = int(value)
return
def validate_lifetime_dict(lifetime_dict):
for key, value in lifetime_dict.items():
if key not in lifetime_keys:
message = _(
"Lifetime Dictionary KeyError: "
"Reason-Invalid unit key : "
"'%(key)s' not in %(supported_key)s ") % {
'key': key, 'supported_key': lifetime_keys}
raise KeyError(message)
if key == 'units' and value not in lifetime_units:
message = _(
"Lifetime Dictionary ValueError: "
"Reason-Invalid units : "
"'%(key_value)s' not in %(supported_units)s ") % {
'key_value': key, 'supported_units': lifetime_units}
raise ValueError(message)
if key == 'value':
if int(value) < 60:
message = _(
"Lifetime Dictionary ValueError: "
"Reason-Invalid value should be at least 60:"
"'%(key_value)s' = %(value)i ") % {
'key_value': key, 'value': int(value)}
raise ValueError(str(message))
else:
lifetime_dict['value'] = int(value)
return
def lifetime_help(policy):
lifetime = ("%s Lifetime Attributes."
"'units'-seconds,default:seconds. "
"'value'-non negative integer, default:3600." % policy)
return lifetime
def dpd_help(policy):
dpd = (" %s Dead Peer Detection Attributes. "
" 'action'-hold,clear,disabled,restart,restart-by-peer."
" 'interval' and 'timeout' are non negative integers. "
" 'interval' should be less than 'timeout' value. "
" 'action', default:hold 'interval', default:30, "
" 'timeout', default:120." % policy)
return dpd

View File

@@ -0,0 +1,98 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# (c) Copyright 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Swaminathan Vasudevan, Hewlett-Packard.
#
import logging
from neutronclient.neutron import v2_0 as neutronv20
class ListVPNService(neutronv20.ListCommand):
"""List VPNService configurations that belong to a given tenant."""
resource = 'vpnservice'
log = logging.getLogger(__name__ + '.ListVPNService')
list_columns = [
'id', 'name', 'router_id', 'status'
]
_formatters = {}
pagination_support = True
sorting_support = True
class ShowVPNService(neutronv20.ShowCommand):
"""Show information of a given VPNService."""
resource = 'vpnservice'
log = logging.getLogger(__name__ + '.ShowVPNService')
class CreateVPNService(neutronv20.CreateCommand):
"""Create a VPNService."""
resource = 'vpnservice'
log = logging.getLogger(__name__ + '.CreateVPNService')
def add_known_arguments(self, parser):
parser.add_argument(
'--admin-state-down',
dest='admin_state', action='store_false',
help='set admin state up to false')
parser.add_argument(
'--name',
help='Set a name for the vpnservice')
parser.add_argument(
'--description',
help='Set a description for the vpnservice')
parser.add_argument(
'router', metavar='ROUTER',
help='Router unique identifier for the vpnservice')
parser.add_argument(
'subnet', metavar='SUBNET',
help='Subnet unique identifier for the vpnservice deployment')
def args2body(self, parsed_args):
_subnet_id = neutronv20.find_resourceid_by_name_or_id(
self.get_client(), 'subnet',
parsed_args.subnet)
_router_id = neutronv20.find_resourceid_by_name_or_id(
self.get_client(), 'router',
parsed_args.router)
body = {self.resource: {'subnet_id': _subnet_id,
'router_id': _router_id,
'admin_state_up': parsed_args.admin_state}, }
neutronv20.update_dict(parsed_args, body[self.resource],
['name', 'description',
'tenant_id'])
return body
class UpdateVPNService(neutronv20.UpdateCommand):
"""Update a given VPNService."""
resource = 'vpnservice'
log = logging.getLogger(__name__ + '.UpdateVPNService')
class DeleteVPNService(neutronv20.DeleteCommand):
"""Delete a given VPNService."""
resource = 'vpnservice'
log = logging.getLogger(__name__ + '.DeleteVPNService')

View File

@@ -53,6 +53,10 @@ from neutronclient.neutron.v2_0 import router
from neutronclient.neutron.v2_0 import securitygroup
from neutronclient.neutron.v2_0 import servicetype
from neutronclient.neutron.v2_0 import subnet
from neutronclient.neutron.v2_0.vpn import ikepolicy
from neutronclient.neutron.v2_0.vpn import ipsec_site_connection
from neutronclient.neutron.v2_0.vpn import ipsecpolicy
from neutronclient.neutron.v2_0.vpn import vpnservice
from neutronclient.openstack.common import strutils
from neutronclient.version import __version__
@@ -75,7 +79,7 @@ def run_command(cmd, cmd_parser, sub_argv):
def env(*_vars, **kwargs):
"""Search for the first defined of possibly many env vars
"""Search for the first defined of possibly many env vars.
Returns the first environment variable defined in vars, or
returns the default defined in kwargs.
@@ -216,6 +220,36 @@ COMMAND_V2 = {
'cisco-policy-profile-list': policyprofile.ListPolicyProfile,
'cisco-policy-profile-show': policyprofile.ShowPolicyProfile,
'cisco-policy-profile-update': policyprofile.UpdatePolicyProfile,
'ipsec-site-connection-list': (
ipsec_site_connection.ListIPsecSiteConnection
),
'ipsec-site-connection-show': (
ipsec_site_connection.ShowIPsecSiteConnection
),
'ipsec-site-connection-create': (
ipsec_site_connection.CreateIPsecSiteConnection
),
'ipsec-site-connection-update': (
ipsec_site_connection.UpdateIPsecSiteConnection
),
'ipsec-site-connection-delete': (
ipsec_site_connection.DeleteIPsecSiteConnection
),
'vpn-service-list': vpnservice.ListVPNService,
'vpn-service-show': vpnservice.ShowVPNService,
'vpn-service-create': vpnservice.CreateVPNService,
'vpn-service-update': vpnservice.UpdateVPNService,
'vpn-service-delete': vpnservice.DeleteVPNService,
'vpn-ipsecpolicy-list': ipsecpolicy.ListIPsecPolicy,
'vpn-ipsecpolicy-show': ipsecpolicy.ShowIPsecPolicy,
'vpn-ipsecpolicy-create': ipsecpolicy.CreateIPsecPolicy,
'vpn-ipsecpolicy-update': ipsecpolicy.UpdateIPsecPolicy,
'vpn-ipsecpolicy-delete': ipsecpolicy.DeleteIPsecPolicy,
'vpn-ikepolicy-list': ikepolicy.ListIKEPolicy,
'vpn-ikepolicy-show': ikepolicy.ShowIKEPolicy,
'vpn-ikepolicy-create': ikepolicy.CreateIKEPolicy,
'vpn-ikepolicy-update': ikepolicy.UpdateIKEPolicy,
'vpn-ikepolicy-delete': ikepolicy.DeleteIKEPolicy,
}
COMMANDS = {'2.0': COMMAND_V2}
@@ -594,8 +628,7 @@ class NeutronShell(app.App):
self.log.debug('got an error: %s', unicode(err))
def configure_logging(self):
"""Create logging handlers for any log output.
"""
"""Create logging handlers for any log output."""
root_logger = logging.getLogger('')
# Set up logging to a file

View File

@@ -188,7 +188,8 @@ class CLITestV20Base(testtools.TestCase):
non_admin_status_resources = ['subnet', 'floatingip', 'security_group',
'security_group_rule', 'qos_queue',
'network_gateway', 'credential',
'network_profile', 'policy_profile']
'network_profile', 'policy_profile',
'ikepolicy', 'ipsecpolicy']
if (resource in non_admin_status_resources):
body = {resource: {}, }
else:
@@ -215,9 +216,13 @@ class CLITestV20Base(testtools.TestCase):
resource_plural = neutronV2_0._get_resource_plural(resource,
self.client)
path = getattr(self.client, resource_plural + "_path")
if self.format == 'json':
mox_body = MyComparator(body, self.client)
else:
mox_body = self.client.serialize(body)
self.client.httpclient.request(
end_url(path, format=self.format), 'POST',
body=MyComparator(body, self.client),
body=mox_body,
headers=mox.ContainsKeyValue(
'X-Auth-Token', TOKEN)).AndReturn((MyResp(200), resstr))
args.extend(['--request-format', self.format])

View File

@@ -165,6 +165,14 @@ class Client(object):
security_group_path = "/security-groups/%s"
security_group_rules_path = "/security-group-rules"
security_group_rule_path = "/security-group-rules/%s"
vpnservices_path = "/vpn/vpnservices"
vpnservice_path = "/vpn/vpnservices/%s"
ipsecpolicies_path = "/vpn/ipsecpolicies"
ipsecpolicy_path = "/vpn/ipsecpolicies/%s"
ikepolicies_path = "/vpn/ikepolicies"
ikepolicy_path = "/vpn/ikepolicies/%s"
ipsec_site_connections_path = "/vpn/ipsec-site-connections"
ipsec_site_connection_path = "/vpn/ipsec-site-connections/%s"
vips_path = "/lb/vips"
vip_path = "/lb/vips/%s"
pools_path = "/lb/pools"
@@ -215,6 +223,10 @@ class Client(object):
'service_definitions': 'service_definition',
'security_groups': 'security_group',
'security_group_rules': 'security_group_rule',
'ipsecpolicies': 'ipsecpolicy',
'ikepolicies': 'ikepolicy',
'ipsec_site_connections': 'ipsec_site_connection',
'vpnservices': 'vpnservice',
'vips': 'vip',
'pools': 'pool',
'members': 'member',
@@ -488,6 +500,118 @@ class Client(object):
return self.get(self.security_group_rule_path % (security_group_rule),
params=_params)
@APIParamsCall
def list_vpnservices(self, retrieve_all=True, **_params):
"""Fetches a list of all configured VPNServices for a tenant."""
return self.list('vpnservices', self.vpnservices_path, retrieve_all,
**_params)
@APIParamsCall
def show_vpnservice(self, vpnservice, **_params):
"""Fetches information of a specific VPNService."""
return self.get(self.vpnservice_path % (vpnservice), params=_params)
@APIParamsCall
def create_vpnservice(self, body=None):
"""Creates a new VPNService."""
return self.post(self.vpnservices_path, body=body)
@APIParamsCall
def update_vpnservice(self, vpnservice, body=None):
"""Updates a VPNService."""
return self.put(self.vpnservice_path % (vpnservice), body=body)
@APIParamsCall
def delete_vpnservice(self, vpnservice):
"""Deletes the specified VPNService."""
return self.delete(self.vpnservice_path % (vpnservice))
@APIParamsCall
def list_ipsec_site_connections(self, retrieve_all=True, **_params):
"""Fetches all configured IPsecSiteConnections for a tenant."""
return self.list('ipsec_site_connections',
self.ipsec_site_connections_path,
retrieve_all,
**_params)
@APIParamsCall
def show_ipsec_site_connection(self, ipsecsite_conn, **_params):
"""Fetches information of a specific IPsecSiteConnection."""
return self.get(
self.ipsec_site_connection_path % (ipsecsite_conn), params=_params
)
@APIParamsCall
def create_ipsec_site_connection(self, body=None):
"""Creates a new IPsecSiteConnection."""
return self.post(self.ipsec_site_connections_path, body=body)
@APIParamsCall
def update_ipsec_site_connection(self, ipsecsite_conn, body=None):
"""Updates an IPsecSiteConnection."""
return self.put(
self.ipsec_site_connection_path % (ipsecsite_conn), body=body
)
@APIParamsCall
def delete_ipsec_site_connection(self, ipsecsite_conn):
"""Deletes the specified IPsecSiteConnection."""
return self.delete(self.ipsec_site_connection_path % (ipsecsite_conn))
@APIParamsCall
def list_ikepolicies(self, retrieve_all=True, **_params):
"""Fetches a list of all configured IKEPolicies for a tenant."""
return self.list('ikepolicies', self.ikepolicies_path, retrieve_all,
**_params)
@APIParamsCall
def show_ikepolicy(self, ikepolicy, **_params):
"""Fetches information of a specific IKEPolicy."""
return self.get(self.ikepolicy_path % (ikepolicy), params=_params)
@APIParamsCall
def create_ikepolicy(self, body=None):
"""Creates a new IKEPolicy."""
return self.post(self.ikepolicies_path, body=body)
@APIParamsCall
def update_ikepolicy(self, ikepolicy, body=None):
"""Updates an IKEPolicy."""
return self.put(self.ikepolicy_path % (ikepolicy), body=body)
@APIParamsCall
def delete_ikepolicy(self, ikepolicy):
"""Deletes the specified IKEPolicy."""
return self.delete(self.ikepolicy_path % (ikepolicy))
@APIParamsCall
def list_ipsecpolicies(self, retrieve_all=True, **_params):
"""Fetches a list of all configured IPsecPolicies for a tenant."""
return self.list('ipsecpolicies',
self.ipsecpolicies_path,
retrieve_all,
**_params)
@APIParamsCall
def show_ipsecpolicy(self, ipsecpolicy, **_params):
"""Fetches information of a specific IPsecPolicy."""
return self.get(self.ipsecpolicy_path % (ipsecpolicy), params=_params)
@APIParamsCall
def create_ipsecpolicy(self, body=None):
"""Creates a new IPsecPolicy."""
return self.post(self.ipsecpolicies_path, body=body)
@APIParamsCall
def update_ipsecpolicy(self, ipsecpolicy, body=None):
"""Updates an IPsecPolicy."""
return self.put(self.ipsecpolicy_path % (ipsecpolicy), body=body)
@APIParamsCall
def delete_ipsecpolicy(self, ipsecpolicy):
"""Deletes the specified IPsecPolicy."""
return self.delete(self.ipsecpolicy_path % (ipsecpolicy))
@APIParamsCall
def list_vips(self, retrieve_all=True, **_params):
"""Fetches a list of all load balancer vips for a tenant."""

View File

@@ -0,0 +1,18 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# (c) Copyright 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Swaminathan Vasudevan, Hewlett-Packard.

View File

@@ -0,0 +1,214 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# (c) Copyright 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Swaminathan Vasudevan, Hewlett Packard.
import sys
from neutronclient.neutron.v2_0.vpn import ikepolicy
from tests.unit import test_cli20
class CLITestV20VpnIkePolicyJSON(test_cli20.CLITestV20Base):
def test_create_ikepolicy_all_params(self):
"""vpn-ikepolicy-create all params."""
resource = 'ikepolicy'
cmd = ikepolicy.CreateIKEPolicy(test_cli20.MyApp(sys.stdout), None)
name = 'ikepolicy1'
description = 'my-ike-policy'
auth_algorithm = 'sha1'
encryption_algorithm = 'aes-256'
ike_version = 'v1'
phase1_negotiation_mode = 'main'
pfs = 'group5'
tenant_id = 'my-tenant'
my_id = 'my-id'
lifetime = 'units=seconds,value=20000'
args = [name,
'--description', description,
'--tenant-id', tenant_id,
'--auth-algorithm', auth_algorithm,
'--encryption-algorithm', encryption_algorithm,
'--ike-version', ike_version,
'--phase1-negotiation-mode', phase1_negotiation_mode,
'--lifetime', lifetime,
'--pfs', pfs]
position_names = ['name', 'description',
'auth_algorithm', 'encryption_algorithm',
'phase1_negotiation_mode',
'ike_version', 'pfs',
'tenant_id']
position_values = [name, description,
auth_algorithm, encryption_algorithm,
phase1_negotiation_mode, ike_version, pfs,
tenant_id]
extra_body = {
'lifetime': {
'units': 'seconds',
'value': 20000,
},
}
self._test_create_resource(resource, cmd, name, my_id, args,
position_names, position_values,
extra_body=extra_body)
def test_create_ikepolicy_with_limited_params(self):
"""vpn-ikepolicy-create with limited params."""
resource = 'ikepolicy'
cmd = ikepolicy.CreateIKEPolicy(test_cli20.MyApp(sys.stdout), None)
name = 'ikepolicy1'
auth_algorithm = 'sha1'
encryption_algorithm = 'aes-128'
ike_version = 'v1'
phase1_negotiation_mode = 'main'
pfs = 'group5'
tenant_id = 'my-tenant'
my_id = 'my-id'
args = [name,
'--tenant-id', tenant_id]
position_names = ['name',
'auth_algorithm', 'encryption_algorithm',
'phase1_negotiation_mode',
'ike_version', 'pfs',
'tenant_id']
position_values = [name,
auth_algorithm, encryption_algorithm,
phase1_negotiation_mode,
ike_version, pfs,
tenant_id]
self._test_create_resource(resource, cmd, name, my_id, args,
position_names, position_values)
def _test_lifetime_values(self, lifetime):
resource = 'ikepolicy'
cmd = ikepolicy.CreateIKEPolicy(test_cli20.MyApp(sys.stdout), None)
name = 'ikepolicy1'
description = 'my-ike-policy'
auth_algorithm = 'sha1'
encryption_algorithm = 'aes-256'
ike_version = 'v1'
phase1_negotiation_mode = 'main'
pfs = 'group5'
tenant_id = 'my-tenant'
my_id = 'my-id'
args = [name,
'--description', description,
'--tenant-id', tenant_id,
'--auth-algorithm', auth_algorithm,
'--encryption-algorithm', encryption_algorithm,
'--ike-version', ike_version,
'--phase1-negotiation-mode', phase1_negotiation_mode,
'--lifetime', lifetime,
'--pfs', pfs]
position_names = ['name', 'description',
'auth_algorithm', 'encryption_algorithm',
'phase1_negotiation_mode',
'ike_version', 'pfs',
'tenant_id']
position_values = [name, description,
auth_algorithm, encryption_algorithm,
phase1_negotiation_mode, ike_version, pfs,
tenant_id]
try:
self._test_create_resource(resource, cmd, name, my_id, args,
position_names, position_values)
except Exception:
return
self.fail("IKEPolicy Lifetime Error")
def test_create_ikepolicy_with_invalid_lifetime_keys(self):
lifetime = 'uts=seconds,val=20000'
self._test_lifetime_values(lifetime)
def test_create_ikepolicy_with_invalid_lifetime_value(self):
lifetime = 'units=seconds,value=-1'
self._test_lifetime_values(lifetime)
def test_list_ikepolicy(self):
"""vpn-ikepolicy-list."""
resources = "ikepolicies"
cmd = ikepolicy.ListIKEPolicy(test_cli20.MyApp(sys.stdout), None)
self._test_list_resources(resources, cmd, True)
def test_list_ikepolicy_pagination(self):
"""vpn-ikepolicy-list."""
resources = "ikepolicies"
cmd = ikepolicy.ListIKEPolicy(test_cli20.MyApp(sys.stdout), None)
self._test_list_resources_with_pagination(resources, cmd)
def test_list_ikepolicy_sort(self):
"""vpn-ikepolicy-list --sort-key name --sort-key id --sort-key asc
--sort-key desc
"""
resources = "ikepolicies"
cmd = ikepolicy.ListIKEPolicy(test_cli20.MyApp(sys.stdout), None)
self._test_list_resources(resources, cmd,
sort_key=["name", "id"],
sort_dir=["asc", "desc"])
def test_list_ikepolicy_limit(self):
"""vpn-ikepolicy-list -P."""
resources = "ikepolicies"
cmd = ikepolicy.ListIKEPolicy(test_cli20.MyApp(sys.stdout), None)
self._test_list_resources(resources, cmd, page_size=1000)
def test_show_ikepolicy_id(self):
"""vpn-ikepolicy-show ikepolicy_id."""
resource = 'ikepolicy'
cmd = ikepolicy.ShowIKEPolicy(test_cli20.MyApp(sys.stdout), None)
args = ['--fields', 'id', self.test_id]
self._test_show_resource(resource, cmd, self.test_id, args, ['id'])
def test_show_ikepolicy_id_name(self):
"""vpn-ikepolicy-show."""
resource = 'ikepolicy'
cmd = ikepolicy.ShowIKEPolicy(test_cli20.MyApp(sys.stdout), None)
args = ['--fields', 'id', '--fields', 'name', self.test_id]
self._test_show_resource(resource, cmd, self.test_id,
args, ['id', 'name'])
def test_update_ikepolicy(self):
"""vpn-ikepolicy-update myid --name newname --tags a b."""
resource = 'ikepolicy'
cmd = ikepolicy.UpdateIKEPolicy(test_cli20.MyApp(sys.stdout), None)
self._test_update_resource(resource, cmd, 'myid',
['myid', '--name', 'newname'],
{'name': 'newname', })
def test_delete_ikepolicy(self):
"""vpn-ikepolicy-delete my-id."""
resource = 'ikepolicy'
cmd = ikepolicy.DeleteIKEPolicy(test_cli20.MyApp(sys.stdout), None)
my_id = 'my-id'
args = [my_id]
self._test_delete_resource(resource, cmd, my_id, args)
class CLITestV20VpnIkePolicyXML(CLITestV20VpnIkePolicyJSON):
format = 'xml'

View File

@@ -0,0 +1,312 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# (c) Copyright 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Swaminathan Vasudevan, Hewlett Packard.
import sys
from neutronclient.neutron.v2_0.vpn import ipsec_site_connection
from tests.unit import test_cli20
class CLITestV20IPsecSiteConnectionJSON(test_cli20.CLITestV20Base):
def test_create_ipsec_site_connection_all_params(self):
"""ipsecsite-connection-create all params."""
resource = 'ipsec_site_connection'
cmd = ipsec_site_connection.CreateIPsecSiteConnection(
test_cli20.MyApp(sys.stdout), None
)
tenant_id = 'mytenant_id'
name = 'connection1'
my_id = 'my_id'
peer_address = '192.168.2.10'
peer_id = '192.168.2.10'
psk = 'abcd'
mtu = '1500'
initiator = 'bi-directional'
vpnservice_id = 'vpnservice_id'
ikepolicy_id = 'ikepolicy_id'
ipsecpolicy_id = 'ipsecpolicy_id'
peer_cidrs = ['192.168.3.0/24', '192.168.2.0/24']
admin_state = True
description = 'my-vpn-connection'
dpd = 'action=restart,interval=30,timeout=120'
args = ['--tenant-id', tenant_id,
'--peer-address', peer_address, '--peer-id', peer_id,
'--psk', psk, '--initiator', initiator,
'--vpnservice-id', vpnservice_id,
'--ikepolicy-id', ikepolicy_id, '--name', name,
'--ipsecpolicy-id', ipsecpolicy_id, '--mtu', mtu,
'--description', description,
'--peer-cidr', '192.168.3.0/24',
'--peer-cidr', '192.168.2.0/24',
'--dpd', dpd]
position_names = ['name', 'tenant_id', 'admin_state_up',
'peer_address', 'peer_id', 'peer_cidrs',
'psk', 'mtu', 'initiator', 'description',
'vpnservice_id', 'ikepolicy_id',
'ipsecpolicy_id']
position_values = [name, tenant_id, admin_state, peer_address,
peer_id, peer_cidrs, psk, mtu,
initiator, description,
vpnservice_id, ikepolicy_id, ipsecpolicy_id]
extra_body = {
'dpd': {
'action': 'restart',
'interval': 30,
'timeout': 120,
},
}
self._test_create_resource(resource, cmd, name, my_id, args,
position_names, position_values,
extra_body=extra_body)
def test_create_ipsec_site_connection_with_limited_params(self):
"""ipsecsite-connection-create with limited params."""
resource = 'ipsec_site_connection'
cmd = ipsec_site_connection.CreateIPsecSiteConnection(
test_cli20.MyApp(sys.stdout), None
)
tenant_id = 'mytenant_id'
my_id = 'my_id'
peer_address = '192.168.2.10'
peer_id = '192.168.2.10'
psk = 'abcd'
mtu = '1500'
initiator = 'bi-directional'
vpnservice_id = 'vpnservice_id'
ikepolicy_id = 'ikepolicy_id'
ipsecpolicy_id = 'ipsecpolicy_id'
peer_cidrs = ['192.168.3.0/24', '192.168.2.0/24']
admin_state = True
args = ['--tenant-id', tenant_id,
'--peer-address', peer_address,
'--peer-id', peer_id,
'--psk', psk,
'--vpnservice-id', vpnservice_id,
'--ikepolicy-id', ikepolicy_id,
'--ipsecpolicy-id', ipsecpolicy_id,
'--peer-cidr', '192.168.3.0/24',
'--peer-cidr', '192.168.2.0/24']
position_names = ['tenant_id', 'admin_state_up',
'peer_address', 'peer_id', 'peer_cidrs',
'psk', 'mtu', 'initiator',
'vpnservice_id', 'ikepolicy_id',
'ipsecpolicy_id']
position_values = [tenant_id, admin_state, peer_address,
peer_id, peer_cidrs, psk, mtu,
initiator,
vpnservice_id, ikepolicy_id, ipsecpolicy_id]
self._test_create_resource(resource, cmd, None, my_id, args,
position_names, position_values)
def _test_dpd_values(self, dpd):
"""ipsecsite-connection-create with invalid dpd values."""
resource = 'ipsec_site_connection'
cmd = ipsec_site_connection.CreateIPsecSiteConnection(
test_cli20.MyApp(sys.stdout), None
)
tenant_id = 'mytenant_id'
name = 'connection1'
my_id = 'my_id'
peer_address = '192.168.2.10'
peer_id = '192.168.2.10'
psk = 'abcd'
mtu = '1500'
initiator = 'bi-directional'
vpnservice_id = 'vpnservice_id'
ikepolicy_id = 'ikepolicy_id'
ipsecpolicy_id = 'ipsecpolicy_id'
peer_cidrs = ['192.168.3.0/24', '192.168.2.0/24']
admin_state = True
description = 'my-vpn-connection'
args = ['--tenant-id', tenant_id,
'--peer-address', peer_address, '--peer-id', peer_id,
'--psk', psk, '--initiator', initiator,
'--vpnservice-id', vpnservice_id,
'--ikepolicy-id', ikepolicy_id, '--name', name,
'--ipsecpolicy-id', ipsecpolicy_id, '--mtu', mtu,
'--description', description,
'--peer-cidr', '192.168.3.0/24',
'--peer-cidr', '192.168.2.0/24',
'--dpd', dpd]
position_names = ['name', 'tenant_id', 'admin_state_up',
'peer_address', 'peer_id', 'peer_cidrs',
'psk', 'mtu', 'initiator', 'description',
'vpnservice_id', 'ikepolicy_id',
'ipsecpolicy_id']
position_values = [name, tenant_id, admin_state, peer_address,
peer_id, peer_cidrs, psk, mtu,
initiator, description,
vpnservice_id, ikepolicy_id, ipsecpolicy_id]
self.assertRaises(
Exception,
self._test_create_resource,
resource, cmd, name, my_id, args,
position_names, position_values)
def test_invalid_mtu(self):
"""ipsecsite-connection-create with invalid dpd values."""
resource = 'ipsec_site_connection'
cmd = ipsec_site_connection.CreateIPsecSiteConnection(
test_cli20.MyApp(sys.stdout), None
)
tenant_id = 'mytenant_id'
name = 'connection1'
my_id = 'my_id'
peer_address = '192.168.2.10'
peer_id = '192.168.2.10'
psk = 'abcd'
mtu = '67'
initiator = 'bi-directional'
vpnservice_id = 'vpnservice_id'
ikepolicy_id = 'ikepolicy_id'
ipsecpolicy_id = 'ipsecpolicy_id'
peer_cidrs = ['192.168.3.0/24', '192.168.2.0/24']
admin_state = True
description = 'my-vpn-connection'
args = ['--tenant-id', tenant_id,
'--peer-address', peer_address, '--peer-id', peer_id,
'--psk', psk, '--initiator', initiator,
'--vpnservice-id', vpnservice_id,
'--ikepolicy-id', ikepolicy_id, '--name', name,
'--ipsecpolicy-id', ipsecpolicy_id, '--mtu', mtu,
'--description', description,
'--peer-cidr', '192.168.3.0/24',
'--peer-cidr', '192.168.2.0/24']
position_names = ['name', 'tenant_id', 'admin_state_up',
'peer_address', 'peer_id', 'peer_cidrs',
'psk', 'mtu', 'initiator', 'description',
'vpnservice_id', 'ikepolicy_id',
'ipsecpolicy_id']
position_values = [name, tenant_id, admin_state, peer_address,
peer_id, peer_cidrs, psk, mtu,
initiator, description,
vpnservice_id, ikepolicy_id, ipsecpolicy_id]
self.assertRaises(
Exception,
self._test_create_resource,
resource, cmd, name, my_id, args,
position_names, position_values)
def test_create_ipsec_site_connection_with_invalid_dpd_keys(self):
dpd = 'act=restart,interval=30,time=120'
self._test_dpd_values(dpd)
def test_create_ipsec_site_connection_with_invalid_dpd_values(self):
dpd = 'action=hold,interval=30,timeout=-1'
self._test_dpd_values(dpd)
def test_create_ipsec_site_connection_dpd_timeout_less_than_interval(self):
dpd = 'action=restart,interval=30,timeout=10'
self._test_dpd_values(dpd)
def test_list_ipsec_site_connection(self):
"""ipsecsite-connection-list."""
resources = "ipsec_site_connections"
cmd = ipsec_site_connection.ListIPsecSiteConnection(
test_cli20.MyApp(sys.stdout), None
)
self._test_list_resources(resources, cmd, True)
def test_list_ipsec_site_connection_pagination(self):
"""ipsecsite-connection-list."""
resources = "ipsec_site_connections"
cmd = ipsec_site_connection.ListIPsecSiteConnection(
test_cli20.MyApp(sys.stdout), None
)
self._test_list_resources_with_pagination(resources, cmd)
def test_list_ipsec_site_connection_sort(self):
"""ipsecsite-connection-list.
--sort-key name --sort-key id --sort-key asc --sort-key desc
"""
resources = "ipsec_site_connections"
cmd = ipsec_site_connection.ListIPsecSiteConnection(
test_cli20.MyApp(sys.stdout), None
)
self._test_list_resources(resources, cmd,
sort_key=["name", "id"],
sort_dir=["asc", "desc"])
def test_list_ipsec_site_connection_limit(self):
"""ipsecsite-connection-list -P."""
resources = "ipsec_site_connections"
cmd = ipsec_site_connection.ListIPsecSiteConnection(
test_cli20.MyApp(sys.stdout), None
)
self._test_list_resources(resources, cmd, page_size=1000)
def test_delete_ipsec_site_connection(self):
"""ipsecsite-connection-delete my-id."""
resource = 'ipsec_site_connection'
cmd = ipsec_site_connection.DeleteIPsecSiteConnection(
test_cli20.MyApp(sys.stdout), None
)
my_id = 'my-id'
args = [my_id]
self._test_delete_resource(resource, cmd, my_id, args)
def test_update_ipsec_site_connection(self):
"""ipsecsite-connection-update myid --name myname --tags a b."""
resource = 'ipsec_site_connection'
cmd = ipsec_site_connection.UpdateIPsecSiteConnection(
test_cli20.MyApp(sys.stdout), None
)
self._test_update_resource(resource, cmd, 'myid',
['myid', '--name', 'Branch-new',
'--tags', 'a', 'b'],
{'name': 'Branch-new',
'tags': ['a', 'b'], })
def test_show_ipsec_site_connection_id(self):
"""ipsecsite-connection-show test_id."""
resource = 'ipsec_site_connection'
cmd = ipsec_site_connection.ShowIPsecSiteConnection(
test_cli20.MyApp(sys.stdout), None
)
args = ['--fields', 'id', self.test_id]
self._test_show_resource(resource, cmd, self.test_id, args, ['id'])
def test_show_ipsec_site_connection_id_name(self):
"""ipsecsite-connection-show."""
resource = 'ipsec_site_connection'
cmd = ipsec_site_connection.ShowIPsecSiteConnection(
test_cli20.MyApp(sys.stdout), None
)
args = ['--fields', 'id', '--fields', 'name', self.test_id]
self._test_show_resource(resource, cmd, self.test_id,
args, ['id', 'name'])
class CLITestV20IPsecSiteConnectionXML(CLITestV20IPsecSiteConnectionJSON):
format = 'xml'

View File

@@ -0,0 +1,211 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# (c) Copyright 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Swaminathan Vasudevan, Hewlett Packard.
import sys
from neutronclient.neutron.v2_0.vpn import ipsecpolicy
from tests.unit import test_cli20
class CLITestV20VpnIpsecPolicyJSON(test_cli20.CLITestV20Base):
def test_create_ipsecpolicy_all_params(self):
"""vpn-ipsecpolicy-create all params with dashes."""
resource = 'ipsecpolicy'
cmd = ipsecpolicy.CreateIPsecPolicy(test_cli20.MyApp(sys.stdout), None)
name = 'ipsecpolicy1'
description = 'first-ipsecpolicy1'
auth_algorithm = 'sha1'
encryption_algorithm = 'aes-256'
encapsulation_mode = 'tunnel'
pfs = 'group5'
transform_protocol = 'ah'
tenant_id = 'my-tenant'
my_id = 'my-id'
lifetime = 'units=seconds,value=20000'
args = [name,
'--description', description,
'--tenant-id', tenant_id,
'--auth-algorithm', auth_algorithm,
'--encryption-algorithm', encryption_algorithm,
'--transform-protocol', transform_protocol,
'--encapsulation-mode', encapsulation_mode,
'--lifetime', lifetime,
'--pfs', pfs]
position_names = ['name', 'auth_algorithm', 'encryption_algorithm',
'encapsulation_mode', 'description',
'transform_protocol', 'pfs',
'tenant_id']
position_values = [name, auth_algorithm, encryption_algorithm,
encapsulation_mode, description,
transform_protocol, pfs,
tenant_id]
extra_body = {
'lifetime': {
'units': 'seconds',
'value': 20000,
},
}
self._test_create_resource(resource, cmd, name, my_id, args,
position_names, position_values,
extra_body=extra_body)
def test_create_ipsecpolicy_with_limited_params(self):
"""vpn-ipsecpolicy-create with limited params."""
resource = 'ipsecpolicy'
cmd = ipsecpolicy.CreateIPsecPolicy(test_cli20.MyApp(sys.stdout), None)
name = 'ipsecpolicy1'
auth_algorithm = 'sha1'
encryption_algorithm = 'aes-128'
encapsulation_mode = 'tunnel'
pfs = 'group5'
transform_protocol = 'esp'
tenant_id = 'my-tenant'
my_id = 'my-id'
args = [name,
'--tenant-id', tenant_id]
position_names = ['name', 'auth_algorithm', 'encryption_algorithm',
'encapsulation_mode',
'transform_protocol', 'pfs',
'tenant_id']
position_values = [name, auth_algorithm, encryption_algorithm,
encapsulation_mode,
transform_protocol, pfs,
tenant_id]
self._test_create_resource(resource, cmd, name, my_id, args,
position_names, position_values)
def _test_lifetime_values(self, lifetime):
resource = 'ipsecpolicy'
cmd = ipsecpolicy.CreateIPsecPolicy(test_cli20.MyApp(sys.stdout), None)
name = 'ipsecpolicy1'
description = 'my-ipsec-policy'
auth_algorithm = 'sha1'
encryption_algorithm = 'aes-256'
ike_version = 'v1'
phase1_negotiation_mode = 'main'
pfs = 'group5'
tenant_id = 'my-tenant'
my_id = 'my-id'
args = [name,
'--description', description,
'--tenant-id', tenant_id,
'--auth-algorithm', auth_algorithm,
'--encryption-algorithm', encryption_algorithm,
'--ike-version', ike_version,
'--phase1-negotiation-mode', phase1_negotiation_mode,
'--lifetime', lifetime,
'--pfs', pfs]
position_names = ['name', 'description',
'auth_algorithm', 'encryption_algorithm',
'phase1_negotiation_mode',
'ike_version', 'pfs',
'tenant_id']
position_values = [name, description,
auth_algorithm, encryption_algorithm,
phase1_negotiation_mode, ike_version, pfs,
tenant_id]
try:
self._test_create_resource(resource, cmd, name, my_id, args,
position_names, position_values)
except Exception:
return
self.fail("IPsecPolicy Lifetime Error")
def test_create_ipsecpolicy_with_invalid_lifetime_keys(self):
lifetime = 'uts=seconds,val=20000'
self._test_lifetime_values(lifetime)
def test_create_ipsecpolicy_with_invalide_lifetime_values(self):
lifetime = 'units=minutes,value=0'
self._test_lifetime_values(lifetime)
def test_list_ipsecpolicy(self):
"""vpn-ipsecpolicy-list."""
resources = "ipsecpolicies"
cmd = ipsecpolicy.ListIPsecPolicy(test_cli20.MyApp(sys.stdout), None)
self._test_list_resources(resources, cmd, True)
def test_list_ipsecpolicy_pagination(self):
"""vpn-ipsecpolicy-list."""
resources = "ipsecpolicies"
cmd = ipsecpolicy.ListIPsecPolicy(test_cli20.MyApp(sys.stdout), None)
self._test_list_resources_with_pagination(resources, cmd)
def test_list_ipsecpolicy_sort(self):
"""vpn-ipsecpolicy-list --sort-key name --sort-key id --sort-key asc
--sort-key desc
"""
resources = "ipsecpolicies"
cmd = ipsecpolicy.ListIPsecPolicy(test_cli20.MyApp(sys.stdout), None)
self._test_list_resources(resources, cmd,
sort_key=["name", "id"],
sort_dir=["asc", "desc"])
def test_list_ipsecpolicy_limit(self):
"""vpn-ipsecpolicy-list -P."""
resources = "ipsecpolicies"
cmd = ipsecpolicy.ListIPsecPolicy(test_cli20.MyApp(sys.stdout), None)
self._test_list_resources(resources, cmd, page_size=1000)
def test_show_ipsecpolicy_id(self):
"""vpn-ipsecpolicy-show ipsecpolicy_id."""
resource = 'ipsecpolicy'
cmd = ipsecpolicy.ShowIPsecPolicy(test_cli20.MyApp(sys.stdout), None)
args = ['--fields', 'id', self.test_id]
self._test_show_resource(resource, cmd, self.test_id, args, ['id'])
def test_show_ipsecpolicy_id_name(self):
"""vpn-ipsecpolicy-show."""
resource = 'ipsecpolicy'
cmd = ipsecpolicy.ShowIPsecPolicy(test_cli20.MyApp(sys.stdout), None)
args = ['--fields', 'id', '--fields', 'name', self.test_id]
self._test_show_resource(resource, cmd, self.test_id,
args, ['id', 'name'])
def test_update_ipsecpolicy(self):
"""vpn-ipsecpolicy-update myid --name newname --tags a b."""
resource = 'ipsecpolicy'
cmd = ipsecpolicy.UpdateIPsecPolicy(test_cli20.MyApp(sys.stdout), None)
self._test_update_resource(resource, cmd, 'myid',
['myid', '--name', 'newname'],
{'name': 'newname', })
def test_delete_ipsecpolicy(self):
"""vpn-ipsecpolicy-delete my-id."""
resource = 'ipsecpolicy'
cmd = ipsecpolicy.DeleteIPsecPolicy(test_cli20.MyApp(sys.stdout), None)
my_id = 'my-id'
args = [my_id]
self._test_delete_resource(resource, cmd, my_id, args)
class CLITestV20VpnIpsecPolicyXML(CLITestV20VpnIpsecPolicyJSON):
format = 'xml'

View File

@@ -0,0 +1,140 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# (c) Copyright 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Swaminathan Vasudevan, Hewlett Packard.
import sys
from neutronclient.neutron.v2_0.vpn import vpnservice
from tests.unit import test_cli20
class CLITestV20VpnServiceJSON(test_cli20.CLITestV20Base):
def test_create_vpnservice_all_params(self):
"""vpn-service-create all params."""
resource = 'vpnservice'
cmd = vpnservice.CreateVPNService(test_cli20.MyApp(sys.stdout), None)
subnet = 'mysubnet-id'
router = 'myrouter-id'
tenant_id = 'mytenant-id'
my_id = 'my-id'
name = 'myvpnservice'
description = 'my-vpn-service'
admin_state = True
args = ['--name', name,
'--description', description,
router,
subnet,
'--tenant-id', tenant_id]
position_names = ['admin_state_up', 'name', 'description',
'subnet_id', 'router_id',
'tenant_id']
position_values = [admin_state, name, description,
subnet, router, tenant_id]
self._test_create_resource(resource, cmd, name, my_id, args,
position_names, position_values)
def test_create_vpnservice_with_limited_params(self):
"""vpn-service-create with limited params."""
resource = 'vpnservice'
cmd = vpnservice.CreateVPNService(test_cli20.MyApp(sys.stdout), None)
subnet = 'mysubnet-id'
router = 'myrouter-id'
tenant_id = 'mytenant-id'
my_id = 'my-id'
admin_state = True
args = [router,
subnet,
'--tenant-id', tenant_id]
position_names = ['admin_state_up',
'subnet_id', 'router_id',
'tenant_id']
position_values = [admin_state, subnet, router, tenant_id]
self._test_create_resource(resource, cmd, None, my_id, args,
position_names, position_values)
def test_list_vpnservice(self):
"""vpn-service-list."""
resources = "vpnservices"
cmd = vpnservice.ListVPNService(test_cli20.MyApp(sys.stdout), None)
self._test_list_resources(resources, cmd, True)
def test_list_vpnservice_pagination(self):
"""vpn-service-list."""
resources = "vpnservices"
cmd = vpnservice.ListVPNService(test_cli20.MyApp(sys.stdout), None)
self._test_list_resources_with_pagination(resources, cmd)
def test_list_vpnservice_sort(self):
"""vpn-service-list --sort-key name --sort-key id --sort-key asc
--sort-key desc
"""
resources = "vpnservices"
cmd = vpnservice.ListVPNService(test_cli20.MyApp(sys.stdout), None)
self._test_list_resources(resources, cmd,
sort_key=["name", "id"],
sort_dir=["asc", "desc"])
def test_list_vpnservice_limit(self):
"""vpn-service-list -P."""
resources = "vpnservices"
cmd = vpnservice.ListVPNService(test_cli20.MyApp(sys.stdout), None)
self._test_list_resources(resources, cmd, page_size=1000)
def test_show_vpnservice_id(self):
"""vpn-service-show test_id."""
resource = 'vpnservice'
cmd = vpnservice.ShowVPNService(test_cli20.MyApp(sys.stdout), None)
args = ['--fields', 'id', self.test_id]
self._test_show_resource(resource, cmd, self.test_id, args, ['id'])
def test_show_vpnservice_id_name(self):
"""vpn-service-show."""
resource = 'vpnservice'
cmd = vpnservice.ShowVPNService(test_cli20.MyApp(sys.stdout), None)
args = ['--fields', 'id', '--fields', 'name', self.test_id]
self._test_show_resource(resource, cmd, self.test_id,
args, ['id', 'name'])
def test_update_vpnservice(self):
"""vpn-service-update myid --name newname --tags a b."""
resource = 'vpnservice'
cmd = vpnservice.UpdateVPNService(test_cli20.MyApp(sys.stdout), None)
self._test_update_resource(resource, cmd, 'myid',
['myid', '--name', 'newname'],
{'name': 'newname', })
def test_delete_vpnservice(self):
"""vpn-service-delete my-id."""
resource = 'vpnservice'
cmd = vpnservice.DeleteVPNService(test_cli20.MyApp(sys.stdout), None)
my_id = 'my-id'
args = [my_id]
self._test_delete_resource(resource, cmd, my_id, args)
class CLITestV20VpnServiceXML(CLITestV20VpnServiceJSON):
format = 'xml'

View File

@@ -0,0 +1,132 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# (c) Copyright 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Swaminathan Vasudevan, Hewlett Packard.
import testtools
from neutronclient.common import utils
from neutronclient.neutron.v2_0.vpn import utils as vpn_utils
class TestVPNUtils(testtools.TestCase):
def test_validate_lifetime_dictionary_seconds(self):
input_str = utils.str2dict("units=seconds,value=3600")
self.assertIsNone(vpn_utils.validate_lifetime_dict(input_str))
def test_validate_dpd_dictionary_action_hold(self):
input_str = utils.str2dict("action=hold,interval=30,timeout=120")
self.assertIsNone(vpn_utils.validate_dpd_dict(input_str))
def test_validate_dpd_dictionary_action_restart(self):
input_str = utils.str2dict("action=restart,interval=30,timeout=120")
self.assertIsNone(vpn_utils.validate_dpd_dict(input_str))
def test_validate_dpd_dictionary_action_restart_by_peer(self):
input_str = utils.str2dict(
"action=restart-by-peer,interval=30,timeout=120"
)
self.assertIsNone(vpn_utils.validate_dpd_dict(input_str))
def test_validate_dpd_dictionary_action_clear(self):
input_str = utils.str2dict('action=clear,interval=30,timeout=120')
self.assertIsNone(vpn_utils.validate_dpd_dict(input_str))
def test_validate_dpd_dictionary_action_disabled(self):
input_str = utils.str2dict('action=disabled,interval=30,timeout=120')
self.assertIsNone(vpn_utils.validate_dpd_dict(input_str))
def test_validate_lifetime_dictionary_invalid_unit_key(self):
input_str = utils.str2dict('ut=seconds,value=3600')
self._test_validate_lifetime_negative_test_case(input_str)
def test_validate_lifetime_dictionary_invalid_unit_key_value(self):
input_str = utils.str2dict('units=seconds,val=3600')
self._test_validate_lifetime_negative_test_case(input_str)
def test_validate_lifetime_dictionary_unsupported_units(self):
input_str = utils.str2dict('units=minutes,value=3600')
self._test_validate_lifetime_negative_test_case(input_str)
def test_validate_lifetime_dictionary_invalid_empty_unit(self):
input_str = utils.str2dict('units=,value=3600')
self._test_validate_lifetime_negative_test_case(input_str)
def test_validate_lifetime_dictionary_under_minimum_integer_value(self):
input_str = utils.str2dict('units=seconds,value=59')
self._test_validate_lifetime_negative_test_case(input_str)
def test_validate_lifetime_dictionary_negative_integer_value(self):
input_str = utils.str2dict('units=seconds,value=-1')
self._test_validate_lifetime_negative_test_case(input_str)
def test_validate_lifetime_dictionary_empty_value(self):
input_str = utils.str2dict('units=seconds,value=')
self._test_validate_lifetime_negative_test_case(input_str)
def test_validate_dpd_dictionary_invalid_key_action(self):
input_str = utils.str2dict('act=hold,interval=30,timeout=120')
self._test_validate_dpd_negative_test_case(input_str)
def test_validate_dpd_dictionary_invalid_key_interval(self):
input_str = utils.str2dict('action=hold,int=30,timeout=120')
self._test_validate_dpd_negative_test_case(input_str)
def test_validate_dpd_dictionary_invalid_key_timeout(self):
input_str = utils.str2dict('action=hold,interval=30,tiut=120')
self._test_validate_dpd_negative_test_case(input_str)
def test_validate_dpd_dictionary_unsupported_action(self):
input_str = utils.str2dict('action=bye-bye,interval=30,timeout=120')
self._test_validate_dpd_negative_test_case(input_str)
def test_validate_dpd_dictionary_empty_action(self):
input_str = utils.str2dict('action=,interval=30,timeout=120')
self._test_validate_dpd_negative_test_case(input_str)
def test_validate_dpd_dictionary_empty_interval(self):
input_str = utils.str2dict('action=hold,interval=,timeout=120')
self._test_validate_dpd_negative_test_case(input_str)
def test_validate_dpd_dictionary_negative_interval_value(self):
input_str = utils.str2dict('action=hold,interval=-1,timeout=120')
self._test_validate_lifetime_negative_test_case(input_str)
def test_validate_dpd_dictionary_zero_timeout(self):
input_str = utils.str2dict('action=hold,interval=30,timeout=0')
self._test_validate_dpd_negative_test_case(input_str)
def test_validate_dpd_dictionary_empty_timeout(self):
input_str = utils.str2dict('action=hold,interval=30,timeout=')
self._test_validate_dpd_negative_test_case(input_str)
def test_validate_dpd_dictionary_negative_timeout_value(self):
input_str = utils.str2dict('action=hold,interval=30,timeout=-1')
self._test_validate_lifetime_negative_test_case(input_str)
def _test_validate_lifetime_negative_test_case(self, input_str):
"""Generic handler for negative lifetime tests."""
self.assertRaises(Exception,
vpn_utils.validate_lifetime_dict,
(input_str))
def _test_validate_dpd_negative_test_case(self, input_str):
"""Generic handler for negative lifetime tests."""
self.assertRaises(Exception,
vpn_utils.validate_lifetime_dict,
(input_str))