From 1683bea1c67a12a265bf21b8644622141602aa6f Mon Sep 17 00:00:00 2001 From: Paul Michali Date: Tue, 16 Sep 2014 11:22:17 -0400 Subject: [PATCH] Rework and enable VPNaaS UT for Cisco CSR REST The Cisco CSR REST client library unit tests were developed in Icehouse, using the httmock library. However, the community did not want to add this library to global requirements, as there was a similar httpretty library available (albeit with some short- comings). As a result, the test module was renamed with a "no" prefix, to prevent inclusion in automated tests. Since then, a new library, requests-mock, has been added to global requirements, to replace httpretty, and is being used on several other projects. This commit reworks the unit test to use requests-mock, instead of httmock. The functionality is the same, but the mechanism (a fixture with URI registration vs context manager) is different. This commit provides coverage for the REST client code, by using a mock for the Cisco CSR VM. The unit test module can be subclassed, and used with a real CSR VM, for 3rd party CI testing, in the future. Change-Id: I55c8a253eb32985bc2016ae748b1ded58d021e1a Closes-Bug: 1358470 --- .../vpn/device_drivers/cisco_csr_mock.py | 577 ------ .../device_drivers/notest_cisco_csr_rest.py | 1358 -------------- .../vpn/device_drivers/test_cisco_csr_rest.py | 1626 +++++++++++++++++ 3 files changed, 1626 insertions(+), 1935 deletions(-) delete mode 100644 neutron/tests/unit/services/vpn/device_drivers/cisco_csr_mock.py delete mode 100644 neutron/tests/unit/services/vpn/device_drivers/notest_cisco_csr_rest.py create mode 100644 neutron/tests/unit/services/vpn/device_drivers/test_cisco_csr_rest.py diff --git a/neutron/tests/unit/services/vpn/device_drivers/cisco_csr_mock.py b/neutron/tests/unit/services/vpn/device_drivers/cisco_csr_mock.py deleted file mode 100644 index 93a126bfd..000000000 --- a/neutron/tests/unit/services/vpn/device_drivers/cisco_csr_mock.py +++ /dev/null @@ -1,577 +0,0 @@ -# Copyright 2014 Cisco Systems, Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Mock REST requests to Cisco Cloud Services Router.""" - -import re - -import functools -# TODO(pcm): Remove when switch to requests-mock package. Comment out, if use -# local copy of httmock.py source. Needed for PEP8. -import httmock -import requests -from requests import exceptions as r_exc - -from neutron.openstack.common import log as logging -# TODO(pcm) Remove once httmock package is added to test-requirements. For -# now, uncomment and include httmock source to unit test. -# from neutron.tests.unit.services.vpn.device_drivers import httmock - -LOG = logging.getLogger(__name__) - - -def repeat(n): - """Decorator to limit the number of times a handler is called. - - Will allow the wrapped function (handler) to be called 'n' times. - After that, this will return None for any additional calls, - allowing other handlers, if any, to be invoked. - """ - - class static: - retries = n - - def decorator(func): - @functools.wraps(func) - def wrapped(*args, **kwargs): - if static.retries == 0: - return None - static.retries -= 1 - return func(*args, **kwargs) - return wrapped - return decorator - - -def filter_request(methods, resource): - """Decorator to invoke handler once for a specific resource. - - This will call the handler only for a specific resource using - a specific method(s). Any other resource request or method will - return None, allowing other handlers, if any, to be invoked. - """ - - class static: - target_methods = [m.upper() for m in methods] - target_resource = resource - - def decorator(func): - @functools.wraps(func) - def wrapped(*args, **kwargs): - if (args[1].method in static.target_methods and - static.target_resource in args[0].path): - return func(*args, **kwargs) - else: - return None # Not for this resource - return wrapped - return decorator - - -@httmock.urlmatch(netloc=r'localhost') -def token(url, request): - if 'auth/token-services' in url.path: - return {'status_code': requests.codes.OK, - 'content': {'token-id': 'dummy-token'}} - - -@httmock.urlmatch(netloc=r'localhost') -def token_unauthorized(url, request): - if 'auth/token-services' in url.path: - return {'status_code': requests.codes.UNAUTHORIZED} - - -@httmock.urlmatch(netloc=r'wrong-host') -def token_wrong_host(url, request): - raise r_exc.ConnectionError() - - -@httmock.all_requests -def token_timeout(url, request): - raise r_exc.Timeout() - - -@filter_request(['get'], 'global/host-name') -@httmock.all_requests -def timeout(url, request): - """Simulated timeout of a normal request.""" - - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - raise r_exc.Timeout() - - -@httmock.urlmatch(netloc=r'localhost') -def no_such_resource(url, request): - """Indicate not found error, when invalid resource requested.""" - return {'status_code': requests.codes.NOT_FOUND} - - -@filter_request(['get'], 'global/host-name') -@repeat(1) -@httmock.urlmatch(netloc=r'localhost') -def expired_request(url, request): - """Simulate access denied failure on first request for this resource. - - Intent here is to simulate that the token has expired, by failing - the first request to the resource. Because of the repeat=1, this - will only be called once, and subsequent calls will not be handled - by this function, but instead will access the normal handler and - will pass. Currently configured for a GET request, but will work - with POST and PUT as well. For DELETE, would need to filter_request on a - different resource (e.g. 'global/local-users') - """ - - return {'status_code': requests.codes.UNAUTHORIZED} - - -@httmock.urlmatch(netloc=r'localhost') -def normal_get(url, request): - if request.method != 'GET': - return - LOG.debug("GET mock for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - if 'global/host-name' in url.path: - content = {u'kind': u'object#host-name', - u'host-name': u'Router'} - return httmock.response(requests.codes.OK, content=content) - if 'global/local-users' in url.path: - content = {u'kind': u'collection#local-user', - u'users': ['peter', 'paul', 'mary']} - return httmock.response(requests.codes.OK, content=content) - if 'interfaces/GigabitEthernet' in url.path: - actual_interface = url.path.split('/')[-1] - ip = actual_interface[-1] - content = {u'kind': u'object#interface', - u'description': u'Changed description', - u'if-name': actual_interface, - u'proxy-arp': True, - u'subnet-mask': u'255.255.255.0', - u'icmp-unreachable': True, - u'nat-direction': u'', - u'icmp-redirects': True, - u'ip-address': u'192.168.200.%s' % ip, - u'verify-unicast-source': False, - u'type': u'ethernet'} - return httmock.response(requests.codes.OK, content=content) - if 'vpn-svc/ike/policies/2' in url.path: - content = {u'kind': u'object#ike-policy', - u'priority-id': u'2', - u'version': u'v1', - u'local-auth-method': u'pre-share', - u'encryption': u'aes256', - u'hash': u'sha', - u'dhGroup': 5, - u'lifetime': 3600} - return httmock.response(requests.codes.OK, content=content) - if 'vpn-svc/ike/keyrings' in url.path: - content = {u'kind': u'object#ike-keyring', - u'keyring-name': u'5', - u'pre-shared-key-list': [ - {u'key': u'super-secret', - u'encrypted': False, - u'peer-address': u'10.10.10.20 255.255.255.0'} - ]} - return httmock.response(requests.codes.OK, content=content) - if 'vpn-svc/ipsec/policies/' in url.path: - ipsec_policy_id = url.path.split('/')[-1] - content = {u'kind': u'object#ipsec-policy', - u'mode': u'tunnel', - u'policy-id': u'%s' % ipsec_policy_id, - u'protection-suite': { - u'esp-encryption': u'esp-256-aes', - u'esp-authentication': u'esp-sha-hmac', - u'ah': u'ah-sha-hmac', - }, - u'anti-replay-window-size': u'Disable', - u'lifetime-sec': 120, - u'pfs': u'group5', - u'lifetime-kb': 4608000, - u'idle-time': None} - return httmock.response(requests.codes.OK, content=content) - if 'vpn-svc/site-to-site/Tunnel' in url.path: - tunnel = url.path.split('/')[-1] - # Use same number, to allow mock to generate IPSec policy ID - ipsec_policy_id = tunnel[6:] - content = {u'kind': u'object#vpn-site-to-site', - u'vpn-interface-name': u'%s' % tunnel, - u'ip-version': u'ipv4', - u'vpn-type': u'site-to-site', - u'ipsec-policy-id': u'%s' % ipsec_policy_id, - u'ike-profile-id': None, - u'mtu': 1500, - u'local-device': { - u'ip-address': '10.3.0.1/24', - u'tunnel-ip-address': '10.10.10.10' - }, - u'remote-device': { - u'tunnel-ip-address': '10.10.10.20' - }} - return httmock.response(requests.codes.OK, content=content) - if 'vpn-svc/ike/keepalive' in url.path: - content = {u'interval': 60, - u'retry': 4, - u'periodic': True} - return httmock.response(requests.codes.OK, content=content) - if 'routing-svc/static-routes' in url.path: - content = {u'destination-network': u'10.1.0.0/24', - u'kind': u'object#static-route', - u'next-hop-router': None, - u'outgoing-interface': u'GigabitEthernet1', - u'admin-distance': 1} - return httmock.response(requests.codes.OK, content=content) - if 'vpn-svc/site-to-site/active/sessions' in url.path: - # Only including needed fields for mock - content = {u'kind': u'collection#vpn-active-sessions', - u'items': [{u'status': u'DOWN-NEGOTIATING', - u'vpn-interface-name': u'Tunnel123'}, ]} - return httmock.response(requests.codes.OK, content=content) - - -@filter_request(['get'], 'vpn-svc/ike/keyrings') -@httmock.urlmatch(netloc=r'localhost') -def get_fqdn(url, request): - LOG.debug("GET FQDN mock for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - content = {u'kind': u'object#ike-keyring', - u'keyring-name': u'5', - u'pre-shared-key-list': [ - {u'key': u'super-secret', - u'encrypted': False, - u'peer-address': u'cisco.com'} - ]} - return httmock.response(requests.codes.OK, content=content) - - -@filter_request(['get'], 'vpn-svc/ipsec/policies/') -@httmock.urlmatch(netloc=r'localhost') -def get_no_ah(url, request): - LOG.debug("GET No AH mock for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - ipsec_policy_id = url.path.split('/')[-1] - content = {u'kind': u'object#ipsec-policy', - u'mode': u'tunnel', - u'anti-replay-window-size': u'128', - u'policy-id': u'%s' % ipsec_policy_id, - u'protection-suite': { - u'esp-encryption': u'esp-aes', - u'esp-authentication': u'esp-sha-hmac', - }, - u'lifetime-sec': 120, - u'pfs': u'group5', - u'lifetime-kb': 4608000, - u'idle-time': None} - return httmock.response(requests.codes.OK, content=content) - - -@httmock.urlmatch(netloc=r'localhost') -def get_defaults(url, request): - if request.method != 'GET': - return - LOG.debug("GET mock for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - if 'vpn-svc/ike/policies/2' in url.path: - content = {u'kind': u'object#ike-policy', - u'priority-id': u'2', - u'version': u'v1', - u'local-auth-method': u'pre-share', - u'encryption': u'des', - u'hash': u'sha', - u'dhGroup': 1, - u'lifetime': 86400} - return httmock.response(requests.codes.OK, content=content) - if 'vpn-svc/ipsec/policies/' in url.path: - ipsec_policy_id = url.path.split('/')[-1] - content = {u'kind': u'object#ipsec-policy', - u'mode': u'tunnel', - u'policy-id': u'%s' % ipsec_policy_id, - u'protection-suite': {}, - u'lifetime-sec': 3600, - u'pfs': u'Disable', - u'anti-replay-window-size': u'None', - u'lifetime-kb': 4608000, - u'idle-time': None} - return httmock.response(requests.codes.OK, content=content) - - -@filter_request(['get'], 'vpn-svc/site-to-site') -@httmock.urlmatch(netloc=r'localhost') -def get_unnumbered(url, request): - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - tunnel = url.path.split('/')[-1] - ipsec_policy_id = tunnel[6:] - content = {u'kind': u'object#vpn-site-to-site', - u'vpn-interface-name': u'%s' % tunnel, - u'ip-version': u'ipv4', - u'vpn-type': u'site-to-site', - u'ipsec-policy-id': u'%s' % ipsec_policy_id, - u'ike-profile-id': None, - u'mtu': 1500, - u'local-device': { - u'ip-address': u'GigabitEthernet3', - u'tunnel-ip-address': u'10.10.10.10' - }, - u'remote-device': { - u'tunnel-ip-address': u'10.10.10.20' - }} - return httmock.response(requests.codes.OK, content=content) - - -@filter_request(['get'], 'vpn-svc/site-to-site/Tunnel') -@httmock.urlmatch(netloc=r'localhost') -def get_admin_down(url, request): - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - # URI has .../Tunnel#/state, so get number from 2nd to last element - tunnel = url.path.split('/')[-2] - content = {u'kind': u'object#vpn-site-to-site-state', - u'vpn-interface-name': u'%s' % tunnel, - u'line-protocol-state': u'down', - u'enabled': False} - return httmock.response(requests.codes.OK, content=content) - - -@filter_request(['get'], 'vpn-svc/site-to-site/Tunnel') -@httmock.urlmatch(netloc=r'localhost') -def get_admin_up(url, request): - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - # URI has .../Tunnel#/state, so get number from 2nd to last element - tunnel = url.path.split('/')[-2] - content = {u'kind': u'object#vpn-site-to-site-state', - u'vpn-interface-name': u'%s' % tunnel, - u'line-protocol-state': u'down', - u'enabled': True} - return httmock.response(requests.codes.OK, content=content) - - -@filter_request(['get'], 'vpn-svc/site-to-site') -@httmock.urlmatch(netloc=r'localhost') -def get_mtu(url, request): - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - tunnel = url.path.split('/')[-1] - ipsec_policy_id = tunnel[6:] - content = {u'kind': u'object#vpn-site-to-site', - u'vpn-interface-name': u'%s' % tunnel, - u'ip-version': u'ipv4', - u'vpn-type': u'site-to-site', - u'ipsec-policy-id': u'%s' % ipsec_policy_id, - u'ike-profile-id': None, - u'mtu': 9192, - u'local-device': { - u'ip-address': u'10.3.0.1/24', - u'tunnel-ip-address': u'10.10.10.10' - }, - u'remote-device': { - u'tunnel-ip-address': u'10.10.10.20' - }} - return httmock.response(requests.codes.OK, content=content) - - -@filter_request(['get'], 'vpn-svc/ike/keepalive') -@httmock.urlmatch(netloc=r'localhost') -def get_not_configured(url, request): - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - return {'status_code': requests.codes.NOT_FOUND} - - -@filter_request(['get'], 'vpn-svc/site-to-site/active/sessions') -@httmock.urlmatch(netloc=r'localhost') -def get_none(url, request): - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - content = {u'kind': u'collection#vpn-active-sessions', - u'items': []} - return httmock.response(requests.codes.OK, content=content) - - -@filter_request(['get'], 'interfaces/GigabitEthernet3') -@httmock.urlmatch(netloc=r'localhost') -def get_local_ip(url, request): - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - content = {u'kind': u'object#interface', - u'subnet-mask': u'255.255.255.0', - u'ip-address': u'10.5.0.2'} - return httmock.response(requests.codes.OK, content=content) - - -@httmock.urlmatch(netloc=r'localhost') -def post(url, request): - if request.method != 'POST': - return - LOG.debug("POST mock for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - if 'interfaces/GigabitEthernet' in url.path: - return {'status_code': requests.codes.NO_CONTENT} - if 'global/local-users' in url.path: - if 'username' not in request.body: - return {'status_code': requests.codes.BAD_REQUEST} - if '"privilege": 20' in request.body: - return {'status_code': requests.codes.BAD_REQUEST} - headers = {'location': '%s/test-user' % url.geturl()} - return httmock.response(requests.codes.CREATED, headers=headers) - if 'vpn-svc/ike/policies' in url.path: - headers = {'location': "%s/2" % url.geturl()} - return httmock.response(requests.codes.CREATED, headers=headers) - if 'vpn-svc/ipsec/policies' in url.path: - m = re.search(r'"policy-id": "(\S+)"', request.body) - if m: - headers = {'location': "%s/%s" % (url.geturl(), m.group(1))} - return httmock.response(requests.codes.CREATED, headers=headers) - return {'status_code': requests.codes.BAD_REQUEST} - if 'vpn-svc/ike/keyrings' in url.path: - headers = {'location': "%s/5" % url.geturl()} - return httmock.response(requests.codes.CREATED, headers=headers) - if 'vpn-svc/site-to-site' in url.path: - m = re.search(r'"vpn-interface-name": "(\S+)"', request.body) - if m: - headers = {'location': "%s/%s" % (url.geturl(), m.group(1))} - return httmock.response(requests.codes.CREATED, headers=headers) - return {'status_code': requests.codes.BAD_REQUEST} - if 'routing-svc/static-routes' in url.path: - headers = {'location': - "%s/10.1.0.0_24_GigabitEthernet1" % url.geturl()} - return httmock.response(requests.codes.CREATED, headers=headers) - - -@filter_request(['post'], 'global/local-users') -@httmock.urlmatch(netloc=r'localhost') -def post_change_attempt(url, request): - LOG.debug("POST change value mock for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - return {'status_code': requests.codes.NOT_FOUND, - 'content': { - u'error-code': -1, - u'error-message': u'user test-user already exists'}} - - -@httmock.urlmatch(netloc=r'localhost') -def post_duplicate(url, request): - LOG.debug("POST duplicate mock for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - return {'status_code': requests.codes.BAD_REQUEST, - 'content': { - u'error-code': -1, - u'error-message': u'policy 2 exist, not allow to ' - u'update policy using POST method'}} - - -@filter_request(['post'], 'vpn-svc/site-to-site') -@httmock.urlmatch(netloc=r'localhost') -def post_missing_ipsec_policy(url, request): - LOG.debug("POST missing ipsec policy mock for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - return {'status_code': requests.codes.BAD_REQUEST} - - -@filter_request(['post'], 'vpn-svc/site-to-site') -@httmock.urlmatch(netloc=r'localhost') -def post_missing_ike_policy(url, request): - LOG.debug("POST missing ike policy mock for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - return {'status_code': requests.codes.BAD_REQUEST} - - -@filter_request(['post'], 'vpn-svc/site-to-site') -@httmock.urlmatch(netloc=r'localhost') -def post_bad_ip(url, request): - LOG.debug("POST bad IP mock for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - return {'status_code': requests.codes.BAD_REQUEST} - - -@filter_request(['post'], 'vpn-svc/site-to-site') -@httmock.urlmatch(netloc=r'localhost') -def post_bad_mtu(url, request): - LOG.debug("POST bad mtu mock for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - return {'status_code': requests.codes.BAD_REQUEST} - - -@filter_request(['post'], 'vpn-svc/ipsec/policies') -@httmock.urlmatch(netloc=r'localhost') -def post_bad_lifetime(url, request): - LOG.debug("POST bad lifetime mock for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - return {'status_code': requests.codes.BAD_REQUEST} - - -@filter_request(['post'], 'vpn-svc/ipsec/policies') -@httmock.urlmatch(netloc=r'localhost') -def post_bad_name(url, request): - LOG.debug("POST bad IPSec policy name for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - return {'status_code': requests.codes.BAD_REQUEST} - - -@httmock.urlmatch(netloc=r'localhost') -def put(url, request): - if request.method != 'PUT': - return - LOG.debug("PUT mock for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - # Any resource - return {'status_code': requests.codes.NO_CONTENT} - - -@httmock.urlmatch(netloc=r'localhost') -def delete(url, request): - if request.method != 'DELETE': - return - LOG.debug("DELETE mock for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - # Any resource - return {'status_code': requests.codes.NO_CONTENT} - - -@httmock.urlmatch(netloc=r'localhost') -def delete_unknown(url, request): - if request.method != 'DELETE': - return - LOG.debug("DELETE unknown mock for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - # Any resource - return {'status_code': requests.codes.NOT_FOUND, - 'content': { - u'error-code': -1, - u'error-message': 'user unknown not found'}} - - -@httmock.urlmatch(netloc=r'localhost') -def delete_not_allowed(url, request): - if request.method != 'DELETE': - return - LOG.debug("DELETE not allowed mock for %s", url) - if not request.headers.get('X-auth-token', None): - return {'status_code': requests.codes.UNAUTHORIZED} - # Any resource - return {'status_code': requests.codes.METHOD_NOT_ALLOWED} diff --git a/neutron/tests/unit/services/vpn/device_drivers/notest_cisco_csr_rest.py b/neutron/tests/unit/services/vpn/device_drivers/notest_cisco_csr_rest.py deleted file mode 100644 index a89abe944..000000000 --- a/neutron/tests/unit/services/vpn/device_drivers/notest_cisco_csr_rest.py +++ /dev/null @@ -1,1358 +0,0 @@ -# Copyright 2014 Cisco Systems, Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -#TODO(pcm): Rename this file to remove the "no" prefix, once httmock is -# approved and added to requirements.txt - -import random - -# TODO(pcm): Remove when update to requests-mock package. Comment out, if use -# local copy of httmock.py source. Needed for PEP8. -import httmock -import requests - -from neutron.openstack.common import log as logging -from neutron.services.vpn.device_drivers import ( - cisco_csr_rest_client as csr_client) -from neutron.tests import base -from neutron.tests.unit.services.vpn.device_drivers import ( - cisco_csr_mock as csr_request) -# TODO(pcm) Uncomment to run w/local copy of httmock.py source. Remove when -# update to requests-mock package. -# from neutron.tests.unit.services.vpn.device_drivers import httmock - - -LOG = logging.getLogger(__name__) -# Enables debug logging to console -if True: - logging.CONF.set_override('debug', True) - logging.setup('neutron') - -dummy_policy_id = 'dummy-ipsec-policy-id-name' - - -# Note: Helper functions to test reuse of IDs. -def generate_pre_shared_key_id(): - return random.randint(100, 200) - - -def generate_ike_policy_id(): - return random.randint(200, 300) - - -def generate_ipsec_policy_id(): - return random.randint(300, 400) - - -class TestCsrLoginRestApi(base.BaseTestCase): - - """Test logging into CSR to obtain token-id.""" - - def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): - super(TestCsrLoginRestApi, self).setUp() - info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip, - 'username': 'stack', 'password': 'cisco', 'timeout': timeout} - self.csr = csr_client.CsrRestClient(info) - - def test_get_token(self): - """Obtain the token and its expiration time.""" - with httmock.HTTMock(csr_request.token): - self.assertTrue(self.csr.authenticate()) - self.assertEqual(requests.codes.OK, self.csr.status) - self.assertIsNotNone(self.csr.token) - - def test_unauthorized_token_request(self): - """Negative test of invalid user/password.""" - self.csr.auth = ('stack', 'bogus') - with httmock.HTTMock(csr_request.token_unauthorized): - self.assertIsNone(self.csr.authenticate()) - self.assertEqual(requests.codes.UNAUTHORIZED, self.csr.status) - - def test_non_existent_host(self): - """Negative test of request to non-existent host.""" - self.csr.host = 'wrong-host' - self.csr.token = 'Set by some previously successful access' - with httmock.HTTMock(csr_request.token_wrong_host): - self.assertIsNone(self.csr.authenticate()) - self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) - self.assertIsNone(self.csr.token) - - def test_timeout_on_token_access(self): - """Negative test of a timeout on a request.""" - with httmock.HTTMock(csr_request.token_timeout): - self.assertIsNone(self.csr.authenticate()) - self.assertEqual(requests.codes.REQUEST_TIMEOUT, self.csr.status) - self.assertIsNone(self.csr.token) - - -class TestCsrGetRestApi(base.BaseTestCase): - - """Test CSR GET REST API.""" - - def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): - super(TestCsrGetRestApi, self).setUp() - info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip, - 'username': 'stack', 'password': 'cisco', 'timeout': timeout} - self.csr = csr_client.CsrRestClient(info) - - def test_valid_rest_gets(self): - """Simple GET requests. - - First request will do a post to get token (login). Assumes - that there are two interfaces on the CSR. - """ - - with httmock.HTTMock(csr_request.token, - csr_request.normal_get): - content = self.csr.get_request('global/host-name') - self.assertEqual(requests.codes.OK, self.csr.status) - self.assertIn('host-name', content) - self.assertNotEqual(None, content['host-name']) - - content = self.csr.get_request('global/local-users') - self.assertEqual(requests.codes.OK, self.csr.status) - self.assertIn('users', content) - - -class TestCsrPostRestApi(base.BaseTestCase): - - """Test CSR POST REST API.""" - - def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): - super(TestCsrPostRestApi, self).setUp() - info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip, - 'username': 'stack', 'password': 'cisco', 'timeout': timeout} - self.csr = csr_client.CsrRestClient(info) - - def test_post_requests(self): - """Simple POST requests (repeatable). - - First request will do a post to get token (login). Assumes - that there are two interfaces (Ge1 and Ge2) on the CSR. - """ - - with httmock.HTTMock(csr_request.token, - csr_request.post): - content = self.csr.post_request( - 'interfaces/GigabitEthernet1/statistics', - payload={'action': 'clear'}) - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - self.assertIsNone(content) - content = self.csr.post_request( - 'interfaces/GigabitEthernet2/statistics', - payload={'action': 'clear'}) - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - self.assertIsNone(content) - - def test_post_with_location(self): - """Create a user and verify that location returned.""" - with httmock.HTTMock(csr_request.token, - csr_request.post): - location = self.csr.post_request( - 'global/local-users', - payload={'username': 'test-user', - 'password': 'pass12345', - 'privilege': 15}) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('global/local-users/test-user', location) - - def test_post_missing_required_attribute(self): - """Negative test of POST with missing mandatory info.""" - with httmock.HTTMock(csr_request.token, - csr_request.post): - self.csr.post_request('global/local-users', - payload={'password': 'pass12345', - 'privilege': 15}) - self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) - - def test_post_invalid_attribute(self): - """Negative test of POST with invalid info.""" - with httmock.HTTMock(csr_request.token, - csr_request.post): - self.csr.post_request('global/local-users', - payload={'username': 'test-user', - 'password': 'pass12345', - 'privilege': 20}) - self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) - - def test_post_already_exists(self): - """Negative test of a duplicate POST. - - Uses the lower level _do_request() API to just perform the POST and - obtain the response, without any error processing. - """ - with httmock.HTTMock(csr_request.token, - csr_request.post): - location = self.csr._do_request( - 'POST', - 'global/local-users', - payload={'username': 'test-user', - 'password': 'pass12345', - 'privilege': 15}, - more_headers=csr_client.HEADER_CONTENT_TYPE_JSON) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('global/local-users/test-user', location) - with httmock.HTTMock(csr_request.token, - csr_request.post_change_attempt): - self.csr._do_request( - 'POST', - 'global/local-users', - payload={'username': 'test-user', - 'password': 'pass12345', - 'privilege': 15}, - more_headers=csr_client.HEADER_CONTENT_TYPE_JSON) - # Note: For local-user, a 404 error is returned. For - # site-to-site connection a 400 is returned. - self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) - - def test_post_changing_value(self): - """Negative test of a POST trying to change a value.""" - with httmock.HTTMock(csr_request.token, - csr_request.post): - location = self.csr.post_request( - 'global/local-users', - payload={'username': 'test-user', - 'password': 'pass12345', - 'privilege': 15}) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('global/local-users/test-user', location) - with httmock.HTTMock(csr_request.token, - csr_request.post_change_attempt): - content = self.csr.post_request('global/local-users', - payload={'username': 'test-user', - 'password': 'changed', - 'privilege': 15}) - self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) - expected = {u'error-code': -1, - u'error-message': u'user test-user already exists'} - self.assertDictContainsSubset(expected, content) - - -class TestCsrPutRestApi(base.BaseTestCase): - - """Test CSR PUT REST API.""" - - def _save_resources(self): - with httmock.HTTMock(csr_request.token, - csr_request.normal_get): - details = self.csr.get_request('global/host-name') - if self.csr.status != requests.codes.OK: - self.fail("Unable to save original host name") - self.original_host = details['host-name'] - details = self.csr.get_request('interfaces/GigabitEthernet1') - if self.csr.status != requests.codes.OK: - self.fail("Unable to save interface Ge1 description") - self.original_if = details - if details.get('description', ''): - self.original_if['description'] = '' - self.csr.token = None - - def _restore_resources(self, user, password): - """Restore the host name and itnerface description. - - Must restore the user and password, so that authentication - token can be obtained (as some tests corrupt auth info). - Will also clear token, so that it gets a fresh token. - """ - - self.csr.auth = (user, password) - self.csr.token = None - with httmock.HTTMock(csr_request.token, - csr_request.put): - payload = {'host-name': self.original_host} - self.csr.put_request('global/host-name', payload=payload) - if self.csr.status != requests.codes.NO_CONTENT: - self.fail("Unable to restore host name after test") - payload = {'description': self.original_if['description'], - 'if-name': self.original_if['if-name'], - 'ip-address': self.original_if['ip-address'], - 'subnet-mask': self.original_if['subnet-mask'], - 'type': self.original_if['type']} - self.csr.put_request('interfaces/GigabitEthernet1', - payload=payload) - if self.csr.status != requests.codes.NO_CONTENT: - self.fail("Unable to restore I/F Ge1 description after test") - - def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): - """Prepare for PUT API tests.""" - super(TestCsrPutRestApi, self).setUp() - info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip, - 'username': 'stack', 'password': 'cisco', 'timeout': timeout} - self.csr = csr_client.CsrRestClient(info) - - self._save_resources() - self.addCleanup(self._restore_resources, 'stack', 'cisco') - - def test_put_requests(self): - """Simple PUT requests (repeatable). - - First request will do a post to get token (login). Assumes - that there are two interfaces on the CSR (Ge1 and Ge2). - """ - - with httmock.HTTMock(csr_request.token, - csr_request.put, - csr_request.normal_get): - payload = {'host-name': 'TestHost'} - content = self.csr.put_request('global/host-name', - payload=payload) - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - self.assertIsNone(content) - - payload = {'host-name': 'TestHost2'} - content = self.csr.put_request('global/host-name', - payload=payload) - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - self.assertIsNone(content) - - def test_change_interface_description(self): - """Test that interface description can be changed. - - This was a problem with an earlier version of the CSR image and is - here to prevent regression. - """ - with httmock.HTTMock(csr_request.token, - csr_request.put, - csr_request.normal_get): - payload = {'description': u'Changed description', - 'if-name': self.original_if['if-name'], - 'ip-address': self.original_if['ip-address'], - 'subnet-mask': self.original_if['subnet-mask'], - 'type': self.original_if['type']} - content = self.csr.put_request( - 'interfaces/GigabitEthernet1', payload=payload) - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - self.assertIsNone(content) - content = self.csr.get_request('interfaces/GigabitEthernet1') - self.assertEqual(requests.codes.OK, self.csr.status) - self.assertIn('description', content) - self.assertEqual(u'Changed description', - content['description']) - - def ignore_test_change_to_empty_interface_description(self): - """Test that interface description can be changed to empty string. - - This is a problem in the current version of the CSR image, which - rejects the change with a 400 error. This test is here to prevent - a regression (once it is fixed) Note that there is code in the - test setup to change the description to a non-empty string to - avoid failures in other tests. - """ - with httmock.HTTMock(csr_request.token, - csr_request.put, - csr_request.normal_get): - payload = {'description': '', - 'if-name': self.original_if['if-name'], - 'ip-address': self.original_if['ip-address'], - 'subnet-mask': self.original_if['subnet-mask'], - 'type': self.original_if['type']} - content = self.csr.put_request( - 'interfaces/GigabitEthernet1', payload=payload) - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - self.assertIsNone(content) - content = self.csr.get_request('interfaces/GigabitEthernet1') - self.assertEqual(requests.codes.OK, self.csr.status) - self.assertIn('description', content) - self.assertEqual('', content['description']) - - -class TestCsrDeleteRestApi(base.BaseTestCase): - - """Test CSR DELETE REST API.""" - - def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): - super(TestCsrDeleteRestApi, self).setUp() - info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip, - 'username': 'stack', 'password': 'cisco', 'timeout': timeout} - self.csr = csr_client.CsrRestClient(info) - - def _make_dummy_user(self): - """Create a user that will be later deleted.""" - self.csr.post_request('global/local-users', - payload={'username': 'dummy', - 'password': 'dummy', - 'privilege': 15}) - self.assertEqual(requests.codes.CREATED, self.csr.status) - - def test_delete_requests(self): - """Simple DELETE requests (creating entry first).""" - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.delete): - self._make_dummy_user() - self.csr.token = None # Force login - self.csr.delete_request('global/local-users/dummy') - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - # Delete again, but without logging in this time - self._make_dummy_user() - self.csr.delete_request('global/local-users/dummy') - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - - def test_delete_non_existent_entry(self): - """Negative test of trying to delete a non-existent user.""" - with httmock.HTTMock(csr_request.token, - csr_request.delete_unknown): - content = self.csr.delete_request('global/local-users/unknown') - self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) - expected = {u'error-code': -1, - u'error-message': u'user unknown not found'} - self.assertDictContainsSubset(expected, content) - - def test_delete_not_allowed(self): - """Negative test of trying to delete the host-name.""" - with httmock.HTTMock(csr_request.token, - csr_request.delete_not_allowed): - self.csr.delete_request('global/host-name') - self.assertEqual(requests.codes.METHOD_NOT_ALLOWED, - self.csr.status) - - -class TestCsrRestApiFailures(base.BaseTestCase): - - """Test failure cases common for all REST APIs. - - Uses the lower level _do_request() to just perform the operation and get - the result, without any error handling. - """ - - def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=0.1): - super(TestCsrRestApiFailures, self).setUp() - info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip, - 'username': 'stack', 'password': 'cisco', 'timeout': timeout} - self.csr = csr_client.CsrRestClient(info) - - def test_request_for_non_existent_resource(self): - """Negative test of non-existent resource on REST request.""" - with httmock.HTTMock(csr_request.token, - csr_request.no_such_resource): - self.csr.post_request('no/such/request') - self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) - # The result is HTTP 404 message, so no error content to check - - def test_timeout_during_request(self): - """Negative test of timeout during REST request.""" - with httmock.HTTMock(csr_request.token, - csr_request.timeout): - self.csr._do_request('GET', 'global/host-name') - self.assertEqual(requests.codes.REQUEST_TIMEOUT, self.csr.status) - - def test_token_expired_on_request(self): - """Token expired before trying a REST request. - - The mock is configured to return a 401 error on the first - attempt to reference the host name. Simulate expiration of - token by changing it. - """ - - with httmock.HTTMock(csr_request.token, - csr_request.expired_request, - csr_request.normal_get): - self.csr.token = '123' # These are 44 characters, so won't match - content = self.csr._do_request('GET', 'global/host-name') - self.assertEqual(requests.codes.OK, self.csr.status) - self.assertIn('host-name', content) - self.assertNotEqual(None, content['host-name']) - - def test_failed_to_obtain_token_for_request(self): - """Negative test of unauthorized user for REST request.""" - self.csr.auth = ('stack', 'bogus') - with httmock.HTTMock(csr_request.token_unauthorized): - self.csr._do_request('GET', 'global/host-name') - self.assertEqual(requests.codes.UNAUTHORIZED, self.csr.status) - - -class TestCsrRestIkePolicyCreate(base.BaseTestCase): - - """Test IKE policy create REST requests.""" - - def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): - super(TestCsrRestIkePolicyCreate, self).setUp() - info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip, - 'username': 'stack', 'password': 'cisco', 'timeout': timeout} - self.csr = csr_client.CsrRestClient(info) - - def test_create_delete_ike_policy(self): - """Create and then delete IKE policy.""" - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.normal_get): - policy_id = '2' - policy_info = {u'priority-id': u'%s' % policy_id, - u'encryption': u'aes256', - u'hash': u'sha', - u'dhGroup': 5, - u'lifetime': 3600} - location = self.csr.create_ike_policy(policy_info) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/ike/policies/%s' % policy_id, location) - # Check the hard-coded items that get set as well... - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.OK, self.csr.status) - expected_policy = {u'kind': u'object#ike-policy', - u'version': u'v1', - u'local-auth-method': u'pre-share'} - expected_policy.update(policy_info) - self.assertEqual(expected_policy, content) - # Now delete and verify the IKE policy is gone - with httmock.HTTMock(csr_request.token, - csr_request.delete, - csr_request.no_such_resource): - self.csr.delete_ike_policy(policy_id) - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) - - def test_create_ike_policy_with_defaults(self): - """Create IKE policy using defaults for all optional values.""" - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.get_defaults): - policy_id = '2' - policy_info = {u'priority-id': u'%s' % policy_id} - location = self.csr.create_ike_policy(policy_info) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/ike/policies/%s' % policy_id, location) - # Check the hard-coded items that get set as well... - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.OK, self.csr.status) - expected_policy = {u'kind': u'object#ike-policy', - u'version': u'v1', - u'encryption': u'des', - u'hash': u'sha', - u'dhGroup': 1, - u'lifetime': 86400, - # Lower level sets this, but it is the default - u'local-auth-method': u'pre-share'} - expected_policy.update(policy_info) - self.assertEqual(expected_policy, content) - - def test_create_duplicate_ike_policy(self): - """Negative test of trying to create a duplicate IKE policy.""" - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.normal_get): - policy_id = '2' - policy_info = {u'priority-id': u'%s' % policy_id, - u'encryption': u'aes', - u'hash': u'sha', - u'dhGroup': 5, - u'lifetime': 3600} - location = self.csr.create_ike_policy(policy_info) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/ike/policies/%s' % policy_id, location) - with httmock.HTTMock(csr_request.token, - csr_request.post_duplicate): - location = self.csr.create_ike_policy(policy_info) - self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) - expected = {u'error-code': -1, - u'error-message': u'policy 2 exist, not allow to ' - u'update policy using POST method'} - self.assertDictContainsSubset(expected, location) - - -class TestCsrRestIPSecPolicyCreate(base.BaseTestCase): - - """Test IPSec policy create REST requests.""" - - def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): - super(TestCsrRestIPSecPolicyCreate, self).setUp() - info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip, - 'username': 'stack', 'password': 'cisco', 'timeout': timeout} - self.csr = csr_client.CsrRestClient(info) - - def test_create_delete_ipsec_policy(self): - """Create and then delete IPSec policy.""" - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.normal_get): - policy_id = '123' - policy_info = { - u'policy-id': u'%s' % policy_id, - u'protection-suite': { - u'esp-encryption': u'esp-256-aes', - u'esp-authentication': u'esp-sha-hmac', - u'ah': u'ah-sha-hmac', - }, - u'lifetime-sec': 120, - u'pfs': u'group5', - u'anti-replay-window-size': u'disable' - } - location = self.csr.create_ipsec_policy(policy_info) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/ipsec/policies/%s' % policy_id, location) - # Check the hard-coded items that get set as well... - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.OK, self.csr.status) - expected_policy = {u'kind': u'object#ipsec-policy', - u'mode': u'tunnel', - u'lifetime-kb': 4608000, - u'idle-time': None} - expected_policy.update(policy_info) - # CSR will respond with capitalized value - expected_policy[u'anti-replay-window-size'] = u'Disable' - self.assertEqual(expected_policy, content) - # Now delete and verify the IPSec policy is gone - with httmock.HTTMock(csr_request.token, - csr_request.delete, - csr_request.no_such_resource): - self.csr.delete_ipsec_policy(policy_id) - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) - - def test_create_ipsec_policy_with_defaults(self): - """Create IPSec policy with default for all optional values.""" - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.get_defaults): - policy_id = '123' - policy_info = { - u'policy-id': u'%s' % policy_id, - } - location = self.csr.create_ipsec_policy(policy_info) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/ipsec/policies/%s' % policy_id, location) - # Check the hard-coded items that get set as well... - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.OK, self.csr.status) - expected_policy = {u'kind': u'object#ipsec-policy', - u'mode': u'tunnel', - u'protection-suite': {}, - u'lifetime-sec': 3600, - u'pfs': u'Disable', - u'anti-replay-window-size': u'None', - u'lifetime-kb': 4608000, - u'idle-time': None} - expected_policy.update(policy_info) - self.assertEqual(expected_policy, content) - - def test_create_ipsec_policy_with_uuid(self): - """Create IPSec policy using UUID for id.""" - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.normal_get): - policy_info = { - u'policy-id': u'%s' % dummy_policy_id, - u'protection-suite': { - u'esp-encryption': u'esp-256-aes', - u'esp-authentication': u'esp-sha-hmac', - u'ah': u'ah-sha-hmac', - }, - u'lifetime-sec': 120, - u'pfs': u'group5', - u'anti-replay-window-size': u'disable' - } - location = self.csr.create_ipsec_policy(policy_info) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/ipsec/policies/%s' % dummy_policy_id, - location) - # Check the hard-coded items that get set as well... - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.OK, self.csr.status) - expected_policy = {u'kind': u'object#ipsec-policy', - u'mode': u'tunnel', - u'lifetime-kb': 4608000, - u'idle-time': None} - expected_policy.update(policy_info) - # CSR will respond with capitalized value - expected_policy[u'anti-replay-window-size'] = u'Disable' - self.assertEqual(expected_policy, content) - - def test_create_ipsec_policy_without_ah(self): - """Create IPSec policy.""" - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.get_no_ah): - policy_id = '10' - policy_info = { - u'policy-id': u'%s' % policy_id, - u'protection-suite': { - u'esp-encryption': u'esp-aes', - u'esp-authentication': u'esp-sha-hmac', - }, - u'lifetime-sec': 120, - u'pfs': u'group5', - u'anti-replay-window-size': u'128' - } - location = self.csr.create_ipsec_policy(policy_info) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/ipsec/policies/%s' % policy_id, location) - # Check the hard-coded items that get set as well... - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.OK, self.csr.status) - expected_policy = {u'kind': u'object#ipsec-policy', - u'mode': u'tunnel', - u'lifetime-kb': 4608000, - u'idle-time': None} - expected_policy.update(policy_info) - self.assertEqual(expected_policy, content) - - def test_invalid_ipsec_policy_lifetime(self): - """Failure test of IPSec policy with unsupported lifetime.""" - with httmock.HTTMock(csr_request.token, - csr_request.post_bad_lifetime): - policy_id = '123' - policy_info = { - u'policy-id': u'%s' % policy_id, - u'protection-suite': { - u'esp-encryption': u'esp-aes', - u'esp-authentication': u'esp-sha-hmac', - u'ah': u'ah-sha-hmac', - }, - u'lifetime-sec': 119, - u'pfs': u'group5', - u'anti-replay-window-size': u'128' - } - self.csr.create_ipsec_policy(policy_info) - self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) - - def test_create_ipsec_policy_with_invalid_name(self): - """Failure test of creating IPSec policy with name too long.""" - with httmock.HTTMock(csr_request.token, - csr_request.post_bad_name, - csr_request.get_defaults): - policy_id = 'policy-name-is-too-long-32-chars' - policy_info = { - u'policy-id': u'%s' % policy_id, - } - self.csr.create_ipsec_policy(policy_info) - self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) - - -class TestCsrRestPreSharedKeyCreate(base.BaseTestCase): - - """Test Pre-shared key (PSK) create REST requests.""" - - def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): - super(TestCsrRestPreSharedKeyCreate, self).setUp() - info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip, - 'username': 'stack', 'password': 'cisco', 'timeout': timeout} - self.csr = csr_client.CsrRestClient(info) - - def test_create_delete_pre_shared_key(self): - """Create and then delete a keyring entry for pre-shared key.""" - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.normal_get): - psk_id = '5' - psk_info = {u'keyring-name': u'%s' % psk_id, - u'pre-shared-key-list': [ - {u'key': u'super-secret', - u'encrypted': False, - u'peer-address': u'10.10.10.20/24'} - ]} - location = self.csr.create_pre_shared_key(psk_info) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/ike/keyrings/%s' % psk_id, location) - # Check the hard-coded items that get set as well... - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.OK, self.csr.status) - expected_policy = {u'kind': u'object#ike-keyring'} - expected_policy.update(psk_info) - # Note: the peer CIDR is returned as an IP and mask - expected_policy[u'pre-shared-key-list'][0][u'peer-address'] = ( - u'10.10.10.20 255.255.255.0') - self.assertEqual(expected_policy, content) - # Now delete and verify pre-shared key is gone - with httmock.HTTMock(csr_request.token, - csr_request.delete, - csr_request.no_such_resource): - self.csr.delete_pre_shared_key(psk_id) - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) - - def test_create_pre_shared_key_with_fqdn_peer(self): - """Create pre-shared key using FQDN for peer address.""" - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.get_fqdn): - psk_id = '5' - psk_info = {u'keyring-name': u'%s' % psk_id, - u'pre-shared-key-list': [ - {u'key': u'super-secret', - u'encrypted': False, - u'peer-address': u'cisco.com'} - ]} - location = self.csr.create_pre_shared_key(psk_info) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/ike/keyrings/%s' % psk_id, location) - # Check the hard-coded items that get set as well... - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.OK, self.csr.status) - expected_policy = {u'kind': u'object#ike-keyring'} - expected_policy.update(psk_info) - self.assertEqual(expected_policy, content) - - def test_create_pre_shared_key_with_duplicate_peer_address(self): - """Negative test of creating a second pre-shared key with same peer.""" - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.normal_get): - psk_id = '5' - psk_info = {u'keyring-name': u'%s' % psk_id, - u'pre-shared-key-list': [ - {u'key': u'super-secret', - u'encrypted': False, - u'peer-address': u'10.10.10.20/24'} - ]} - location = self.csr.create_pre_shared_key(psk_info) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/ike/keyrings/%s' % psk_id, location) - with httmock.HTTMock(csr_request.token, - csr_request.post_duplicate): - psk_id = u'6' - another_psk_info = {u'keyring-name': psk_id, - u'pre-shared-key-list': [ - {u'key': u'abc123def', - u'encrypted': False, - u'peer-address': u'10.10.10.20/24'} - ]} - self.csr.create_ike_policy(another_psk_info) - self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) - - -class TestCsrRestIPSecConnectionCreate(base.BaseTestCase): - - """Test IPSec site-to-site connection REST requests. - - This requires us to have first created an IKE policy, IPSec policy, - and pre-shared key, so it's more of an itegration test, when used - with a real CSR (as we can't mock out these pre-conditions. - """ - - def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): - super(TestCsrRestIPSecConnectionCreate, self).setUp() - info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip, - 'username': 'stack', 'password': 'cisco', 'timeout': timeout} - self.csr = csr_client.CsrRestClient(info) - - def _make_psk_for_test(self): - psk_id = generate_pre_shared_key_id() - self._remove_resource_for_test(self.csr.delete_pre_shared_key, - psk_id) - with httmock.HTTMock(csr_request.token, - csr_request.post): - psk_info = {u'keyring-name': u'%d' % psk_id, - u'pre-shared-key-list': [ - {u'key': u'super-secret', - u'encrypted': False, - u'peer-address': u'10.10.10.20/24'} - ]} - self.csr.create_pre_shared_key(psk_info) - if self.csr.status != requests.codes.CREATED: - self.fail("Unable to create PSK for test case") - self.addCleanup(self._remove_resource_for_test, - self.csr.delete_pre_shared_key, psk_id) - return psk_id - - def _make_ike_policy_for_test(self): - policy_id = generate_ike_policy_id() - self._remove_resource_for_test(self.csr.delete_ike_policy, - policy_id) - with httmock.HTTMock(csr_request.token, - csr_request.post): - policy_info = {u'priority-id': u'%d' % policy_id, - u'encryption': u'aes', - u'hash': u'sha', - u'dhGroup': 5, - u'lifetime': 3600} - self.csr.create_ike_policy(policy_info) - if self.csr.status != requests.codes.CREATED: - self.fail("Unable to create IKE policy for test case") - self.addCleanup(self._remove_resource_for_test, - self.csr.delete_ike_policy, policy_id) - return policy_id - - def _make_ipsec_policy_for_test(self): - policy_id = generate_ipsec_policy_id() - self._remove_resource_for_test(self.csr.delete_ipsec_policy, - policy_id) - with httmock.HTTMock(csr_request.token, - csr_request.post): - policy_info = { - u'policy-id': u'%d' % policy_id, - u'protection-suite': { - u'esp-encryption': u'esp-aes', - u'esp-authentication': u'esp-sha-hmac', - u'ah': u'ah-sha-hmac', - }, - u'lifetime-sec': 120, - u'pfs': u'group5', - u'anti-replay-window-size': u'disable' - } - self.csr.create_ipsec_policy(policy_info) - if self.csr.status != requests.codes.CREATED: - self.fail("Unable to create IPSec policy for test case") - self.addCleanup(self._remove_resource_for_test, - self.csr.delete_ipsec_policy, policy_id) - return policy_id - - def _remove_resource_for_test(self, delete_resource, resource_id): - with httmock.HTTMock(csr_request.token, - csr_request.delete): - delete_resource(resource_id) - - def _prepare_for_site_conn_create(self, skip_psk=False, skip_ike=False, - skip_ipsec=False): - """Create the policies and PSK so can then create site conn.""" - if not skip_psk: - self._make_psk_for_test() - if not skip_ike: - self._make_ike_policy_for_test() - if not skip_ipsec: - ipsec_policy_id = self._make_ipsec_policy_for_test() - else: - ipsec_policy_id = generate_ipsec_policy_id() - # Note: Use same ID number for tunnel and IPSec policy, so that when - # GET tunnel info, the mocks can infer the IPSec policy ID from the - # tunnel number. - return (ipsec_policy_id, ipsec_policy_id) - - def test_create_delete_ipsec_connection(self): - """Create and then delete an IPSec connection.""" - tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create() - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.normal_get): - connection_info = { - u'vpn-interface-name': u'Tunnel%d' % tunnel_id, - u'ipsec-policy-id': u'%d' % ipsec_policy_id, - u'mtu': 1500, - u'local-device': {u'ip-address': u'10.3.0.1/24', - u'tunnel-ip-address': u'10.10.10.10'}, - u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} - } - location = self.csr.create_ipsec_connection(connection_info) - self.addCleanup(self._remove_resource_for_test, - self.csr.delete_ipsec_connection, - 'Tunnel%d' % tunnel_id) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/site-to-site/Tunnel%d' % tunnel_id, - location) - # Check the hard-coded items that get set as well... - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.OK, self.csr.status) - expected_connection = {u'kind': u'object#vpn-site-to-site', - u'ike-profile-id': None, - u'mtu': 1500, - u'ip-version': u'ipv4'} - expected_connection.update(connection_info) - self.assertEqual(expected_connection, content) - # Now delete and verify that site-to-site connection is gone - with httmock.HTTMock(csr_request.token, - csr_request.delete, - csr_request.no_such_resource): - # Only delete connection. Cleanup will take care of prerequisites - self.csr.delete_ipsec_connection('Tunnel%d' % tunnel_id) - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) - - def test_create_ipsec_connection_with_no_tunnel_subnet(self): - """Create an IPSec connection without an IP address on tunnel.""" - tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create() - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.get_unnumbered): - connection_info = { - u'vpn-interface-name': u'Tunnel%d' % tunnel_id, - u'ipsec-policy-id': u'%d' % ipsec_policy_id, - u'local-device': {u'ip-address': u'GigabitEthernet3', - u'tunnel-ip-address': u'10.10.10.10'}, - u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} - } - location = self.csr.create_ipsec_connection(connection_info) - self.addCleanup(self._remove_resource_for_test, - self.csr.delete_ipsec_connection, - 'Tunnel%d' % tunnel_id) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/site-to-site/Tunnel%d' % tunnel_id, - location) - # Check the hard-coded items that get set as well... - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.OK, self.csr.status) - expected_connection = {u'kind': u'object#vpn-site-to-site', - u'ike-profile-id': None, - u'mtu': 1500, - u'ip-version': u'ipv4'} - expected_connection.update(connection_info) - self.assertEqual(expected_connection, content) - - def test_create_ipsec_connection_no_pre_shared_key(self): - """Test of connection create without associated pre-shared key. - - The CSR will create the connection, but will not be able to pass - traffic without the pre-shared key. - """ - - tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create( - skip_psk=True) - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.normal_get): - connection_info = { - u'vpn-interface-name': u'Tunnel%d' % tunnel_id, - u'ipsec-policy-id': u'%d' % ipsec_policy_id, - u'mtu': 1500, - u'local-device': {u'ip-address': u'10.3.0.1/24', - u'tunnel-ip-address': u'10.10.10.10'}, - u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} - } - location = self.csr.create_ipsec_connection(connection_info) - self.addCleanup(self._remove_resource_for_test, - self.csr.delete_ipsec_connection, - 'Tunnel%d' % tunnel_id) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/site-to-site/Tunnel%d' % tunnel_id, - location) - # Check the hard-coded items that get set as well... - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.OK, self.csr.status) - expected_connection = {u'kind': u'object#vpn-site-to-site', - u'ike-profile-id': None, - u'mtu': 1500, - u'ip-version': u'ipv4'} - expected_connection.update(connection_info) - self.assertEqual(expected_connection, content) - - def test_create_ipsec_connection_with_default_ike_policy(self): - """Test of connection create without IKE policy (uses default). - - Without an IKE policy, the CSR will use a built-in default IKE - policy setting for the connection. - """ - - tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create( - skip_ike=True) - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.normal_get): - connection_info = { - u'vpn-interface-name': u'Tunnel%d' % tunnel_id, - u'ipsec-policy-id': u'%d' % ipsec_policy_id, - u'mtu': 1500, - u'local-device': {u'ip-address': u'10.3.0.1/24', - u'tunnel-ip-address': u'10.10.10.10'}, - u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} - } - location = self.csr.create_ipsec_connection(connection_info) - self.addCleanup(self._remove_resource_for_test, - self.csr.delete_ipsec_connection, - 'Tunnel%d' % tunnel_id) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/site-to-site/Tunnel%d' % tunnel_id, - location) - # Check the hard-coded items that get set as well... - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.OK, self.csr.status) - expected_connection = {u'kind': u'object#vpn-site-to-site', - u'ike-profile-id': None, - u'mtu': 1500, - u'ip-version': u'ipv4'} - expected_connection.update(connection_info) - self.assertEqual(expected_connection, content) - - def test_set_ipsec_connection_admin_state_changes(self): - """Create IPSec connection in admin down state.""" - tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create() - tunnel = u'Tunnel%d' % tunnel_id - with httmock.HTTMock(csr_request.token, - csr_request.post): - connection_info = { - u'vpn-interface-name': tunnel, - u'ipsec-policy-id': u'%d' % ipsec_policy_id, - u'mtu': 1500, - u'local-device': {u'ip-address': u'10.3.0.1/24', - u'tunnel-ip-address': u'10.10.10.10'}, - u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} - } - location = self.csr.create_ipsec_connection(connection_info) - self.addCleanup(self._remove_resource_for_test, - self.csr.delete_ipsec_connection, - tunnel) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/site-to-site/%s' % tunnel, location) - state_uri = location + "/state" - # Note: When created, the tunnel will be in admin 'up' state - # Note: Line protocol state will be down, unless have an active conn. - expected_state = {u'kind': u'object#vpn-site-to-site-state', - u'vpn-interface-name': tunnel, - u'line-protocol-state': u'down', - u'enabled': False} - with httmock.HTTMock(csr_request.put, - csr_request.get_admin_down): - self.csr.set_ipsec_connection_state(tunnel, admin_up=False) - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - content = self.csr.get_request(state_uri, full_url=True) - self.assertEqual(requests.codes.OK, self.csr.status) - self.assertEqual(expected_state, content) - - with httmock.HTTMock(csr_request.put, - csr_request.get_admin_up): - self.csr.set_ipsec_connection_state(tunnel, admin_up=True) - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - content = self.csr.get_request(state_uri, full_url=True) - self.assertEqual(requests.codes.OK, self.csr.status) - expected_state[u'enabled'] = True - self.assertEqual(expected_state, content) - - def test_create_ipsec_connection_missing_ipsec_policy(self): - """Negative test of connection create without IPSec policy.""" - tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create( - skip_ipsec=True) - with httmock.HTTMock( - csr_request.token, - csr_request.post_missing_ipsec_policy): - connection_info = { - u'vpn-interface-name': u'Tunnel%d' % tunnel_id, - u'ipsec-policy-id': u'%d' % ipsec_policy_id, - u'local-device': {u'ip-address': u'10.3.0.1/24', - u'tunnel-ip-address': u'10.10.10.10'}, - u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} - } - self.csr.create_ipsec_connection(connection_info) - self.addCleanup(self._remove_resource_for_test, - self.csr.delete_ipsec_connection, - 'Tunnel%d' % tunnel_id) - self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) - - def _determine_conflicting_ip(self): - with httmock.HTTMock(csr_request.token, - csr_request.get_local_ip): - details = self.csr.get_request('interfaces/GigabitEthernet3') - if self.csr.status != requests.codes.OK: - self.fail("Unable to obtain interface GigabitEthernet3's IP") - if_ip = details.get('ip-address') - if not if_ip: - self.fail("No IP address for GigabitEthernet3 interface") - return '.'.join(if_ip.split('.')[:3]) + '.10' - - def test_create_ipsec_connection_conficting_tunnel_ip(self): - """Negative test of connection create with conflicting tunnel IP. - - Find out the IP of a local interface (GigabitEthernet3) and create an - IP that is on the same subnet. Note: this interface needs to be up. - """ - - conflicting_ip = self._determine_conflicting_ip() - tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create() - with httmock.HTTMock(csr_request.token, - csr_request.post_bad_ip): - connection_info = { - u'vpn-interface-name': u'Tunnel%d' % tunnel_id, - u'ipsec-policy-id': u'%d' % ipsec_policy_id, - u'local-device': {u'ip-address': u'%s/24' % conflicting_ip, - u'tunnel-ip-address': u'10.10.10.10'}, - u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} - } - self.csr.create_ipsec_connection(connection_info) - self.addCleanup(self._remove_resource_for_test, - self.csr.delete_ipsec_connection, - 'Tunnel%d' % tunnel_id) - self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) - - def test_create_ipsec_connection_with_max_mtu(self): - """Create an IPSec connection with max MTU value.""" - tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create() - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.get_mtu): - connection_info = { - u'vpn-interface-name': u'Tunnel%d' % tunnel_id, - u'ipsec-policy-id': u'%d' % ipsec_policy_id, - u'mtu': 9192, - u'local-device': {u'ip-address': u'10.3.0.1/24', - u'tunnel-ip-address': u'10.10.10.10'}, - u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} - } - location = self.csr.create_ipsec_connection(connection_info) - self.addCleanup(self._remove_resource_for_test, - self.csr.delete_ipsec_connection, - 'Tunnel%d' % tunnel_id) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/site-to-site/Tunnel%d' % tunnel_id, - location) - # Check the hard-coded items that get set as well... - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.OK, self.csr.status) - expected_connection = {u'kind': u'object#vpn-site-to-site', - u'ike-profile-id': None, - u'ip-version': u'ipv4'} - expected_connection.update(connection_info) - self.assertEqual(expected_connection, content) - - def test_create_ipsec_connection_with_bad_mtu(self): - """Negative test of connection create with unsupported MTU value.""" - tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create() - with httmock.HTTMock(csr_request.token, - csr_request.post_bad_mtu): - connection_info = { - u'vpn-interface-name': u'Tunnel%d' % tunnel_id, - u'ipsec-policy-id': u'%d' % ipsec_policy_id, - u'mtu': 9193, - u'local-device': {u'ip-address': u'10.3.0.1/24', - u'tunnel-ip-address': u'10.10.10.10'}, - u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} - } - self.csr.create_ipsec_connection(connection_info) - self.addCleanup(self._remove_resource_for_test, - self.csr.delete_ipsec_connection, - 'Tunnel%d' % tunnel_id) - self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) - - def test_status_when_no_tunnels_exist(self): - """Get status, when there are no tunnels.""" - with httmock.HTTMock(csr_request.token, - csr_request.get_none): - tunnels = self.csr.read_tunnel_statuses() - self.assertEqual(requests.codes.OK, self.csr.status) - self.assertEqual([], tunnels) - - def test_status_for_one_tunnel(self): - """Get status of one tunnel.""" - # Create the IPsec site-to-site connection first - tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create() - tunnel_id = 123 # Must hard code to work with mock - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.normal_get): - connection_info = { - u'vpn-interface-name': u'Tunnel123', - u'ipsec-policy-id': u'%d' % ipsec_policy_id, - u'local-device': {u'ip-address': u'10.3.0.1/24', - u'tunnel-ip-address': u'10.10.10.10'}, - u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} - } - location = self.csr.create_ipsec_connection(connection_info) - self.addCleanup(self._remove_resource_for_test, - self.csr.delete_ipsec_connection, - u'Tunnel123') - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('vpn-svc/site-to-site/Tunnel%d' % tunnel_id, - location) - with httmock.HTTMock(csr_request.token, - csr_request.normal_get): - tunnels = self.csr.read_tunnel_statuses() - self.assertEqual(requests.codes.OK, self.csr.status) - self.assertEqual([(u'Tunnel123', u'DOWN-NEGOTIATING'), ], tunnels) - - -class TestCsrRestIkeKeepaliveCreate(base.BaseTestCase): - - """Test IKE keepalive REST requests. - - Note: On the Cisco CSR, the IKE keepalive for v1 is a global configuration - that applies to all VPN tunnels to specify Dead Peer Detection information. - As a result, this REST API is not used in the OpenStack device driver, and - the keepalive will default to zero (disabled). - """ - - def _save_dpd_info(self): - with httmock.HTTMock(csr_request.token, - csr_request.normal_get): - details = self.csr.get_request('vpn-svc/ike/keepalive') - if self.csr.status == requests.codes.OK: - self.dpd = details - self.addCleanup(self._restore_dpd_info) - elif self.csr.status != requests.codes.NOT_FOUND: - self.fail("Unable to save original DPD info") - - def _restore_dpd_info(self): - with httmock.HTTMock(csr_request.token, - csr_request.put): - payload = {'interval': self.dpd['interval'], - 'retry': self.dpd['retry']} - self.csr.put_request('vpn-svc/ike/keepalive', payload=payload) - if self.csr.status != requests.codes.NO_CONTENT: - self.fail("Unable to restore DPD info after test") - - def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): - super(TestCsrRestIkeKeepaliveCreate, self).setUp() - info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip, - 'username': 'stack', 'password': 'cisco', 'timeout': timeout} - self.csr = csr_client.CsrRestClient(info) - self._save_dpd_info() - self.csr.token = None - - def test_configure_ike_keepalive(self): - """Set IKE keep-alive (aka Dead Peer Detection) for the CSR.""" - with httmock.HTTMock(csr_request.token, - csr_request.put, - csr_request.normal_get): - keepalive_info = {'interval': 60, 'retry': 4} - self.csr.configure_ike_keepalive(keepalive_info) - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - content = self.csr.get_request('vpn-svc/ike/keepalive') - self.assertEqual(requests.codes.OK, self.csr.status) - expected = {'periodic': False} - expected.update(keepalive_info) - self.assertDictContainsSubset(expected, content) - - def test_disable_ike_keepalive(self): - """Disable IKE keep-alive (aka Dead Peer Detection) for the CSR.""" - with httmock.HTTMock(csr_request.token, - csr_request.delete, - csr_request.put, - csr_request.get_not_configured): - keepalive_info = {'interval': 0, 'retry': 4} - self.csr.configure_ike_keepalive(keepalive_info) - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - - -class TestCsrRestStaticRoute(base.BaseTestCase): - - """Test static route REST requests. - - A static route is added for the peer's private network. Would create - a route for each of the peer CIDRs specified for the VPN connection. - """ - - def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): - super(TestCsrRestStaticRoute, self).setUp() - info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip, - 'username': 'stack', 'password': 'cisco', 'timeout': timeout} - self.csr = csr_client.CsrRestClient(info) - - def test_create_delete_static_route(self): - """Create and then delete a static route for the tunnel.""" - cidr = u'10.1.0.0/24' - interface = u'GigabitEthernet1' - expected_id = '10.1.0.0_24_GigabitEthernet1' - with httmock.HTTMock(csr_request.token, - csr_request.post, - csr_request.normal_get): - route_info = {u'destination-network': cidr, - u'outgoing-interface': interface} - location = self.csr.create_static_route(route_info) - self.assertEqual(requests.codes.CREATED, self.csr.status) - self.assertIn('routing-svc/static-routes/%s' % expected_id, - location) - # Check the hard-coded items that get set as well... - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.OK, self.csr.status) - expected_route = {u'kind': u'object#static-route', - u'next-hop-router': None, - u'admin-distance': 1} - expected_route.update(route_info) - self.assertEqual(expected_route, content) - # Now delete and verify that static route is gone - with httmock.HTTMock(csr_request.token, - csr_request.delete, - csr_request.no_such_resource): - route_id = csr_client.make_route_id(cidr, interface) - self.csr.delete_static_route(route_id) - self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) - content = self.csr.get_request(location, full_url=True) - self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) diff --git a/neutron/tests/unit/services/vpn/device_drivers/test_cisco_csr_rest.py b/neutron/tests/unit/services/vpn/device_drivers/test_cisco_csr_rest.py new file mode 100644 index 000000000..b7092af45 --- /dev/null +++ b/neutron/tests/unit/services/vpn/device_drivers/test_cisco_csr_rest.py @@ -0,0 +1,1626 @@ +# Copyright 2014 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import random +import re + +import requests +from requests import exceptions as r_exc +from requests_mock.contrib import fixture as mock_fixture + +from neutron.services.vpn.device_drivers import ( + cisco_csr_rest_client as csr_client) +from neutron.tests import base + + +dummy_policy_id = 'dummy-ipsec-policy-id-name' +BASE_URL = 'https://%s:55443/api/v1/' +LOCAL_URL = 'https://localhost:55443/api/v1/' + +URI_HOSTNAME = 'global/host-name' +URI_USERS = 'global/local-users' +URI_AUTH = 'auth/token-services' +URI_INTERFACE_GE1 = 'interfaces/GigabitEthernet1' +URI_PSK = 'vpn-svc/ike/keyrings' +URI_PSK_ID = URI_PSK + '/%s' +URI_IKE_POLICY = 'vpn-svc/ike/policies' +URI_IKE_POLICY_ID = URI_IKE_POLICY + '/%s' +URI_IPSEC_POLICY = 'vpn-svc/ipsec/policies' +URI_IPSEC_POLICY_ID = URI_IPSEC_POLICY + '/%s' +URI_IPSEC_CONN = 'vpn-svc/site-to-site' +URI_IPSEC_CONN_ID = URI_IPSEC_CONN + '/%s' +URI_KEEPALIVE = 'vpn-svc/ike/keepalive' +URI_ROUTES = 'routing-svc/static-routes' +URI_ROUTES_ID = URI_ROUTES + '/%s' +URI_SESSIONS = 'vpn-svc/site-to-site/active/sessions' + + +# Note: Helper functions to test reuse of IDs. +def generate_pre_shared_key_id(): + return random.randint(100, 200) + + +def generate_ike_policy_id(): + return random.randint(200, 300) + + +def generate_ipsec_policy_id(): + return random.randint(300, 400) + + +class CiscoCsrBaseTestCase(base.BaseTestCase): + + """Helper methods to register mock intercepts - used by child classes.""" + + def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): + super(CiscoCsrBaseTestCase, self).setUp() + self.base_url = BASE_URL % host + self.requests = self.useFixture(mock_fixture.Fixture()) + info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip, + 'username': 'stack', 'password': 'cisco', 'timeout': timeout} + self.csr = csr_client.CsrRestClient(info) + + def _register_local_get(self, uri, json=None, + result_code=requests.codes.OK): + self.requests.register_uri( + 'GET', + LOCAL_URL + uri, + status_code=result_code, + json=json) + + def _register_local_post(self, uri, resource_id, + result_code=requests.codes.CREATED): + self.requests.register_uri( + 'POST', + LOCAL_URL + uri, + status_code=result_code, + headers={'location': LOCAL_URL + uri + '/' + str(resource_id)}) + + def _register_local_delete(self, uri, resource_id, json=None, + result_code=requests.codes.NO_CONTENT): + self.requests.register_uri( + 'DELETE', + LOCAL_URL + uri + '/' + str(resource_id), + status_code=result_code, + json=json) + + def _register_local_delete_by_id(self, resource_id, + result_code=requests.codes.NO_CONTENT): + local_resource_re = re.compile(LOCAL_URL + '.+%s$' % resource_id) + self.requests.register_uri( + 'DELETE', + local_resource_re, + status_code=result_code) + + def _register_local_put(self, uri, resource_id, + result_code=requests.codes.NO_CONTENT): + self.requests.register_uri('PUT', + LOCAL_URL + uri + '/' + resource_id, + status_code=result_code) + + def _register_local_get_not_found(self, uri, resource_id, + result_code=requests.codes.NOT_FOUND): + self.requests.register_uri( + 'GET', + LOCAL_URL + uri + '/' + str(resource_id), + status_code=result_code) + + def _helper_register_auth_request(self): + self.requests.register_uri('POST', + LOCAL_URL + URI_AUTH, + status_code=requests.codes.OK, + json={'token-id': 'dummy-token'}) + + def _helper_register_psk_post(self, psk_id): + self._register_local_post(URI_PSK, psk_id) + + def _helper_register_ike_policy_post(self, policy_id): + self._register_local_post(URI_IKE_POLICY, policy_id) + + def _helper_register_ipsec_policy_post(self, policy_id): + self._register_local_post(URI_IPSEC_POLICY, policy_id) + + def _helper_register_tunnel_post(self, tunnel): + self._register_local_post(URI_IPSEC_CONN, tunnel) + + +class TestCsrLoginRestApi(CiscoCsrBaseTestCase): + + """Test logging into CSR to obtain token-id.""" + + def test_get_token(self): + """Obtain the token and its expiration time.""" + self._helper_register_auth_request() + self.assertTrue(self.csr.authenticate()) + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertIsNotNone(self.csr.token) + + def test_unauthorized_token_request(self): + """Negative test of invalid user/password.""" + self.requests.register_uri('POST', + LOCAL_URL + URI_AUTH, + status_code=requests.codes.UNAUTHORIZED) + self.csr.auth = ('stack', 'bogus') + self.assertIsNone(self.csr.authenticate()) + self.assertEqual(requests.codes.UNAUTHORIZED, self.csr.status) + + def _simulate_wrong_host(self, request): + if 'wrong-host' in request.url: + raise r_exc.ConnectionError() + + def test_non_existent_host(self): + """Negative test of request to non-existent host.""" + self.requests.add_matcher(self._simulate_wrong_host) + self.csr.host = 'wrong-host' + self.csr.token = 'Set by some previously successful access' + self.assertIsNone(self.csr.authenticate()) + self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) + self.assertIsNone(self.csr.token) + + def _simulate_token_timeout(self, request): + raise r_exc.Timeout() + + def test_timeout_on_token_access(self): + """Negative test of a timeout on a request.""" + self.requests.add_matcher(self._simulate_token_timeout) + self.assertIsNone(self.csr.authenticate()) + self.assertEqual(requests.codes.REQUEST_TIMEOUT, self.csr.status) + self.assertIsNone(self.csr.token) + + +class TestCsrGetRestApi(CiscoCsrBaseTestCase): + + """Test CSR GET REST API.""" + + def test_valid_rest_gets(self): + """Simple GET requests. + + First request will do a post to get token (login). Assumes + that there are two interfaces on the CSR. + """ + + self._helper_register_auth_request() + self._register_local_get(URI_HOSTNAME, + json={u'kind': u'object#host-name', + u'host-name': u'Router'}) + self._register_local_get(URI_USERS, + json={u'kind': u'collection#local-user', + u'users': ['peter', 'paul', 'mary']}) + + actual = self.csr.get_request(URI_HOSTNAME) + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertIn('host-name', actual) + self.assertIsNotNone(actual['host-name']) + + actual = self.csr.get_request(URI_USERS) + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertIn('users', actual) + + +class TestCsrPostRestApi(CiscoCsrBaseTestCase): + + """Test CSR POST REST API.""" + + def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): + """Setup for each test in this suite. + + Each test case will have a normal authentication mock response + registered here, although they may replace it, as needed. + """ + super(TestCsrPostRestApi, self).setUp(host, tunnel_ip, timeout) + self._helper_register_auth_request() + + def test_post_requests(self): + """Simple POST requests (repeatable). + + First request will do a post to get token (login). Assumes + that there are two interfaces (Ge1 and Ge2) on the CSR. + """ + + interface_re = re.compile('https://localhost:55443/.*/interfaces/' + 'GigabitEthernet\d/statistics') + self.requests.register_uri('POST', + interface_re, + status_code=requests.codes.NO_CONTENT) + + actual = self.csr.post_request( + 'interfaces/GigabitEthernet1/statistics', + payload={'action': 'clear'}) + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + self.assertIsNone(actual) + actual = self.csr.post_request( + 'interfaces/GigabitEthernet2/statistics', + payload={'action': 'clear'}) + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + self.assertIsNone(actual) + + def test_post_with_location(self): + """Create a user and verify that location returned.""" + self.requests.register_uri( + 'POST', + LOCAL_URL + URI_USERS, + status_code=requests.codes.CREATED, + headers={'location': LOCAL_URL + URI_USERS + '/test-user'}) + location = self.csr.post_request( + URI_USERS, + payload={'username': 'test-user', + 'password': 'pass12345', + 'privilege': 15}) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_USERS + '/test-user', location) + + def test_post_missing_required_attribute(self): + """Negative test of POST with missing mandatory info.""" + self.requests.register_uri('POST', + LOCAL_URL + URI_USERS, + status_code=requests.codes.BAD_REQUEST) + self.csr.post_request(URI_USERS, + payload={'password': 'pass12345', + 'privilege': 15}) + self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) + + def test_post_invalid_attribute(self): + """Negative test of POST with invalid info.""" + self.requests.register_uri('POST', + LOCAL_URL + URI_USERS, + status_code=requests.codes.BAD_REQUEST) + self.csr.post_request(URI_USERS, + payload={'username': 'test-user', + 'password': 'pass12345', + 'privilege': 20}) + self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) + + def test_post_already_exists(self): + """Negative test of a duplicate POST. + + Uses the lower level _do_request() API to just perform the POST and + obtain the response, without any error processing. + """ + + self.requests.register_uri( + 'POST', + LOCAL_URL + URI_USERS, + status_code=requests.codes.CREATED, + headers={'location': LOCAL_URL + URI_USERS + '/test-user'}) + + location = self.csr._do_request( + 'POST', + URI_USERS, + payload={'username': 'test-user', + 'password': 'pass12345', + 'privilege': 15}, + more_headers=csr_client.HEADER_CONTENT_TYPE_JSON) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_USERS + '/test-user', location) + self.csr.post_request(URI_USERS, + payload={'username': 'test-user', + 'password': 'pass12345', + 'privilege': 20}) + + self.requests.register_uri( + 'POST', + LOCAL_URL + URI_USERS, + status_code=requests.codes.NOT_FOUND, + json={u'error-code': -1, + u'error-message': u'user test-user already exists'}) + + self.csr._do_request( + 'POST', + URI_USERS, + payload={'username': 'test-user', + 'password': 'pass12345', + 'privilege': 15}, + more_headers=csr_client.HEADER_CONTENT_TYPE_JSON) + # Note: For local-user, a 404 error is returned. For + # site-to-site connection a 400 is returned. + self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) + + def test_post_changing_value(self): + """Negative test of a POST trying to change a value.""" + self.requests.register_uri( + 'POST', + LOCAL_URL + URI_USERS, + status_code=requests.codes.CREATED, + headers={'location': LOCAL_URL + URI_USERS + '/test-user'}) + + location = self.csr.post_request( + URI_USERS, + payload={'username': 'test-user', + 'password': 'pass12345', + 'privilege': 15}) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_USERS + '/test-user', location) + + self.requests.register_uri( + 'POST', + LOCAL_URL + URI_USERS, + status_code=requests.codes.NOT_FOUND, + json={u'error-code': -1, + u'error-message': u'user test-user already exists'}) + + actual = self.csr.post_request(URI_USERS, + payload={'username': 'test-user', + 'password': 'changed', + 'privilege': 15}) + self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) + expected = {u'error-code': -1, + u'error-message': u'user test-user already exists'} + self.assertDictSupersetOf(expected, actual) + + +class TestCsrPutRestApi(CiscoCsrBaseTestCase): + + """Test CSR PUT REST API.""" + + def _save_resources(self): + self._register_local_get(URI_HOSTNAME, + json={u'kind': u'object#host-name', + u'host-name': u'Router'}) + interface_info = {u'kind': u'object#interface', + u'description': u'Changed description', + u'if-name': 'interfaces/GigabitEthernet1', + u'proxy-arp': True, + u'subnet-mask': u'255.255.255.0', + u'icmp-unreachable': True, + u'nat-direction': u'', + u'icmp-redirects': True, + u'ip-address': u'192.168.200.1', + u'verify-unicast-source': False, + u'type': u'ethernet'} + self._register_local_get(URI_INTERFACE_GE1, + json=interface_info) + details = self.csr.get_request(URI_HOSTNAME) + if self.csr.status != requests.codes.OK: + self.fail("Unable to save original host name") + self.original_host = details['host-name'] + details = self.csr.get_request(URI_INTERFACE_GE1) + if self.csr.status != requests.codes.OK: + self.fail("Unable to save interface Ge1 description") + self.original_if = details + self.csr.token = None + + def _restore_resources(self, user, password): + """Restore the host name and interface description. + + Must restore the user and password, so that authentication + token can be obtained (as some tests corrupt auth info). + Will also clear token, so that it gets a fresh token. + """ + + self._register_local_put('global', 'host-name') + self._register_local_put('interfaces', 'GigabitEthernet1') + + self.csr.auth = (user, password) + self.csr.token = None + payload = {'host-name': self.original_host} + self.csr.put_request(URI_HOSTNAME, payload=payload) + if self.csr.status != requests.codes.NO_CONTENT: + self.fail("Unable to restore host name after test") + payload = {'description': self.original_if['description'], + 'if-name': self.original_if['if-name'], + 'ip-address': self.original_if['ip-address'], + 'subnet-mask': self.original_if['subnet-mask'], + 'type': self.original_if['type']} + self.csr.put_request(URI_INTERFACE_GE1, + payload=payload) + if self.csr.status != requests.codes.NO_CONTENT: + self.fail("Unable to restore I/F Ge1 description after test") + + def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): + """Setup for each test in this suite. + + Each test case will have a normal authentication mock response + registered here, although they may replace it, as needed. In + addition, resources are saved, before each test is run, and + restored, after each test completes. + """ + super(TestCsrPutRestApi, self).setUp(host, tunnel_ip, timeout) + self._helper_register_auth_request() + self._save_resources() + self.addCleanup(self._restore_resources, 'stack', 'cisco') + + def test_put_requests(self): + """Simple PUT requests (repeatable). + + First request will do a post to get token (login). Assumes + that there are two interfaces on the CSR (Ge1 and Ge2). + """ + + self._register_local_put('interfaces', 'GigabitEthernet1') + self._register_local_put('global', 'host-name') + + actual = self.csr.put_request(URI_HOSTNAME, + payload={'host-name': 'TestHost'}) + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + self.assertIsNone(actual) + + actual = self.csr.put_request(URI_HOSTNAME, + payload={'host-name': 'TestHost2'}) + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + self.assertIsNone(actual) + + def test_change_interface_description(self): + """Test that interface description can be changed. + + This was a problem with an earlier version of the CSR image and is + here to prevent regression. + """ + self._register_local_put('interfaces', 'GigabitEthernet1') + payload = {'description': u'Changed description', + 'if-name': self.original_if['if-name'], + 'ip-address': self.original_if['ip-address'], + 'subnet-mask': self.original_if['subnet-mask'], + 'type': self.original_if['type']} + actual = self.csr.put_request(URI_INTERFACE_GE1, payload=payload) + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + self.assertIsNone(actual) + actual = self.csr.get_request(URI_INTERFACE_GE1) + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertIn('description', actual) + self.assertEqual(u'Changed description', + actual['description']) + + def ignore_test_change_to_empty_interface_description(self): + """Test that interface description can be changed to empty string. + + This is here to prevent regression, where the CSR was rejecting + an attempt to set the description to an empty string. + """ + self._register_local_put('interfaces', 'GigabitEthernet1') + payload = {'description': '', + 'if-name': self.original_if['if-name'], + 'ip-address': self.original_if['ip-address'], + 'subnet-mask': self.original_if['subnet-mask'], + 'type': self.original_if['type']} + actual = self.csr.put_request(URI_INTERFACE_GE1, payload=payload) + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + self.assertIsNone(actual) + actual = self.csr.get_request(URI_INTERFACE_GE1) + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertIn('description', actual) + self.assertEqual('', actual['description']) + + +class TestCsrDeleteRestApi(CiscoCsrBaseTestCase): + + """Test CSR DELETE REST API.""" + + def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): + """Setup for each test in this suite. + + Each test case will have a normal authentication mock response + registered here, although they may replace it, as needed. + """ + super(TestCsrDeleteRestApi, self).setUp(host, tunnel_ip, timeout) + self._helper_register_auth_request() + + def _make_dummy_user(self): + """Create a user that will be later deleted.""" + self.requests.register_uri( + 'POST', + LOCAL_URL + URI_USERS, + status_code=requests.codes.CREATED, + headers={'location': LOCAL_URL + URI_USERS + '/dummy'}) + self.csr.post_request(URI_USERS, + payload={'username': 'dummy', + 'password': 'dummy', + 'privilege': 15}) + self.assertEqual(requests.codes.CREATED, self.csr.status) + + def test_delete_requests(self): + """Simple DELETE requests (creating entry first).""" + self._register_local_delete(URI_USERS, 'dummy') + self._make_dummy_user() + self.csr.token = None # Force login + self.csr.delete_request(URI_USERS + '/dummy') + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + # Delete again, but without logging in this time + self._make_dummy_user() + self.csr.delete_request(URI_USERS + '/dummy') + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + + def test_delete_non_existent_entry(self): + """Negative test of trying to delete a non-existent user.""" + expected = {u'error-code': -1, + u'error-message': u'user unknown not found'} + self._register_local_delete(URI_USERS, 'unknown', + result_code=requests.codes.NOT_FOUND, + json=expected) + actual = self.csr.delete_request(URI_USERS + '/unknown') + self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) + self.assertDictSupersetOf(expected, actual) + + def test_delete_not_allowed(self): + """Negative test of trying to delete the host-name.""" + self._register_local_delete( + 'global', 'host-name', + result_code=requests.codes.METHOD_NOT_ALLOWED) + self.csr.delete_request(URI_HOSTNAME) + self.assertEqual(requests.codes.METHOD_NOT_ALLOWED, + self.csr.status) + + +class TestCsrRestApiFailures(CiscoCsrBaseTestCase): + + """Test failure cases common for all REST APIs. + + Uses the lower level _do_request() to just perform the operation and get + the result, without any error handling. + """ + + def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=0.1): + """Setup for each test in this suite. + + Each test case will have a normal authentication mock response + registered here, although they may replace it, as needed. + """ + super(TestCsrRestApiFailures, self).setUp(host, tunnel_ip, timeout) + self._helper_register_auth_request() + + def _simulate_timeout(self, request): + if URI_HOSTNAME in request.path_uri: + raise r_exc.Timeout() + + def test_request_for_non_existent_resource(self): + """Negative test of non-existent resource on REST request.""" + self.requests.register_uri('POST', + LOCAL_URL + 'no/such/request', + status_code=requests.codes.NOT_FOUND) + self.csr.post_request('no/such/request') + self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) + # The result is HTTP 404 message, so no error content to check + + def _simulate_get_timeout(self, request): + """Will raise exception for any host request to this resource.""" + if URI_HOSTNAME in request.path_url: + raise r_exc.Timeout() + + def test_timeout_during_request(self): + """Negative test of timeout during REST request.""" + self.requests.add_matcher(self._simulate_get_timeout) + self.csr._do_request('GET', URI_HOSTNAME) + self.assertEqual(requests.codes.REQUEST_TIMEOUT, self.csr.status) + + def _simulate_auth_failure(self, request): + """First time auth POST is done, re-report unauthorized.""" + if URI_AUTH in request.path_url and not self.called_once: + self.called_once = True + resp = requests.Response() + resp.status_code = requests.codes.UNAUTHORIZED + return resp + + def test_token_expired_on_request(self): + """Token expired before trying a REST request. + + First, the token is set to a bogus value, to force it to + try to authenticate on the GET request. Second, a mock that + runs once, will simulate an auth failure. Third, the normal + auth mock will simulate success. + """ + + self._register_local_get(URI_HOSTNAME, + json={u'kind': u'object#host-name', + u'host-name': u'Router'}) + self.called_once = False + self.requests.add_matcher(self._simulate_auth_failure) + self.csr.token = '123' # These are 44 characters, so won't match + actual = self.csr._do_request('GET', URI_HOSTNAME) + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertIn('host-name', actual) + self.assertIsNotNone(actual['host-name']) + + def test_failed_to_obtain_token_for_request(self): + """Negative test of unauthorized user for REST request.""" + self.csr.auth = ('stack', 'bogus') + self._register_local_get(URI_HOSTNAME, + result_code=requests.codes.UNAUTHORIZED) + self.csr._do_request('GET', URI_HOSTNAME) + self.assertEqual(requests.codes.UNAUTHORIZED, self.csr.status) + + +class TestCsrRestIkePolicyCreate(CiscoCsrBaseTestCase): + + """Test IKE policy create REST requests.""" + + def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): + """Setup for each test in this suite. + + Each test case will have a normal authentication and post mock + response registered, although the test may replace them, if needed. + """ + super(TestCsrRestIkePolicyCreate, self).setUp(host, tunnel_ip, timeout) + self._helper_register_auth_request() + self._helper_register_ike_policy_post(2) + + def _helper_register_ike_policy_get(self): + content = {u'kind': u'object#ike-policy', + u'priority-id': u'2', + u'version': u'v1', + u'local-auth-method': u'pre-share', + u'encryption': u'aes256', + u'hash': u'sha', + u'dhGroup': 5, + u'lifetime': 3600} + self._register_local_get(URI_IKE_POLICY_ID % '2', json=content) + + def test_create_delete_ike_policy(self): + """Create and then delete IKE policy.""" + self._helper_register_ike_policy_get() + policy_info = {u'priority-id': u'2', + u'encryption': u'aes256', + u'hash': u'sha', + u'dhGroup': 5, + u'lifetime': 3600} + location = self.csr.create_ike_policy(policy_info) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_IKE_POLICY_ID % '2', location) + # Check the hard-coded items that get set as well... + actual = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.OK, self.csr.status) + expected_policy = {u'kind': u'object#ike-policy', + u'version': u'v1', + u'local-auth-method': u'pre-share'} + expected_policy.update(policy_info) + self.assertEqual(expected_policy, actual) + + # Now delete and verify the IKE policy is gone + self._register_local_delete(URI_IKE_POLICY, 2) + self._register_local_get_not_found(URI_IKE_POLICY, 2) + + self.csr.delete_ike_policy(2) + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + actual = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) + + def test_create_ike_policy_with_defaults(self): + """Create IKE policy using defaults for all optional values.""" + policy = {u'kind': u'object#ike-policy', + u'priority-id': u'2', + u'version': u'v1', + u'local-auth-method': u'pre-share', + u'encryption': u'des', + u'hash': u'sha', + u'dhGroup': 1, + u'lifetime': 86400} + self._register_local_get(URI_IKE_POLICY_ID % '2', json=policy) + policy_info = {u'priority-id': u'2'} + location = self.csr.create_ike_policy(policy_info) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_IKE_POLICY_ID % '2', location) + + # Check the hard-coded items that get set as well... + actual = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.OK, self.csr.status) + expected_policy = {u'kind': u'object#ike-policy', + u'version': u'v1', + u'encryption': u'des', + u'hash': u'sha', + u'dhGroup': 1, + u'lifetime': 86400, + # Lower level sets this, but it is the default + u'local-auth-method': u'pre-share'} + expected_policy.update(policy_info) + self.assertEqual(expected_policy, actual) + + def test_create_duplicate_ike_policy(self): + """Negative test of trying to create a duplicate IKE policy.""" + self._helper_register_ike_policy_get() + policy_info = {u'priority-id': u'2', + u'encryption': u'aes', + u'hash': u'sha', + u'dhGroup': 5, + u'lifetime': 3600} + location = self.csr.create_ike_policy(policy_info) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_IKE_POLICY_ID % '2', location) + self.requests.register_uri( + 'POST', + LOCAL_URL + URI_IKE_POLICY, + status_code=requests.codes.BAD_REQUEST, + json={u'error-code': -1, + u'error-message': u'policy 2 exist, not allow to ' + u'update policy using POST method'}) + location = self.csr.create_ike_policy(policy_info) + self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) + expected = {u'error-code': -1, + u'error-message': u'policy 2 exist, not allow to ' + u'update policy using POST method'} + self.assertDictSupersetOf(expected, location) + + +class TestCsrRestIPSecPolicyCreate(CiscoCsrBaseTestCase): + + """Test IPSec policy create REST requests.""" + + def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): + """Set up for each test in this suite. + + Each test case will have a normal authentication and post mock + response registered, although the test may replace them, if needed. + """ + super(TestCsrRestIPSecPolicyCreate, self).setUp(host, + tunnel_ip, + timeout) + self._helper_register_auth_request() + self._helper_register_ipsec_policy_post(123) + + def _helper_register_ipsec_policy_get(self, override=None): + content = {u'kind': u'object#ipsec-policy', + u'mode': u'tunnel', + u'policy-id': u'123', + u'protection-suite': { + u'esp-encryption': u'esp-256-aes', + u'esp-authentication': u'esp-sha-hmac', + u'ah': u'ah-sha-hmac', + }, + u'anti-replay-window-size': u'Disable', + u'lifetime-sec': 120, + u'pfs': u'group5', + u'lifetime-kb': 4608000, + u'idle-time': None} + if override: + content.update(override) + self._register_local_get(URI_IPSEC_POLICY + '/123', json=content) + + def test_create_delete_ipsec_policy(self): + """Create and then delete IPSec policy.""" + policy_info = { + u'policy-id': u'123', + u'protection-suite': { + u'esp-encryption': u'esp-256-aes', + u'esp-authentication': u'esp-sha-hmac', + u'ah': u'ah-sha-hmac', + }, + u'lifetime-sec': 120, + u'pfs': u'group5', + u'anti-replay-window-size': u'disable' + } + location = self.csr.create_ipsec_policy(policy_info) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_IPSEC_POLICY + '/123', location) + + # Check the hard-coded items that get set as well... + self._helper_register_ipsec_policy_get() + actual = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.OK, self.csr.status) + expected_policy = {u'kind': u'object#ipsec-policy', + u'mode': u'tunnel', + u'lifetime-kb': 4608000, + u'idle-time': None} + expected_policy.update(policy_info) + # CSR will respond with capitalized value + expected_policy[u'anti-replay-window-size'] = u'Disable' + self.assertEqual(expected_policy, actual) + + # Now delete and verify the IPSec policy is gone + self._register_local_delete(URI_IPSEC_POLICY, 123) + self._register_local_get_not_found(URI_IPSEC_POLICY, 123) + + self.csr.delete_ipsec_policy('123') + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + actual = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) + + def test_create_ipsec_policy_with_defaults(self): + """Create IPSec policy with default for all optional values.""" + policy_info = {u'policy-id': u'123'} + location = self.csr.create_ipsec_policy(policy_info) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_IPSEC_POLICY + '/123', location) + + # Check the hard-coded items that get set as well... + expected_policy = {u'kind': u'object#ipsec-policy', + u'mode': u'tunnel', + u'policy-id': u'123', + u'protection-suite': {}, + u'lifetime-sec': 3600, + u'pfs': u'Disable', + u'anti-replay-window-size': u'None', + u'lifetime-kb': 4608000, + u'idle-time': None} + self._register_local_get(URI_IPSEC_POLICY + '/123', + json=expected_policy) + + actual = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertEqual(expected_policy, actual) + + def test_create_ipsec_policy_with_uuid(self): + """Create IPSec policy using UUID for id.""" + # Override normal POST response w/one that has a different policy ID + self._helper_register_ipsec_policy_post(dummy_policy_id) + policy_info = { + u'policy-id': u'%s' % dummy_policy_id, + u'protection-suite': { + u'esp-encryption': u'esp-256-aes', + u'esp-authentication': u'esp-sha-hmac', + u'ah': u'ah-sha-hmac', + }, + u'lifetime-sec': 120, + u'pfs': u'group5', + u'anti-replay-window-size': u'disable' + } + location = self.csr.create_ipsec_policy(policy_info) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_IPSEC_POLICY_ID % dummy_policy_id, location) + + # Check the hard-coded items that get set as well... + expected_policy = {u'kind': u'object#ipsec-policy', + u'mode': u'tunnel', + u'lifetime-kb': 4608000, + u'idle-time': None} + expected_policy.update(policy_info) + # CSR will respond with capitalized value + expected_policy[u'anti-replay-window-size'] = u'Disable' + self._register_local_get(URI_IPSEC_POLICY_ID % dummy_policy_id, + json=expected_policy) + actual = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertEqual(expected_policy, actual) + + def test_create_ipsec_policy_without_ah(self): + """Create IPSec policy.""" + policy_info = { + u'policy-id': u'123', + u'protection-suite': { + u'esp-encryption': u'esp-aes', + u'esp-authentication': u'esp-sha-hmac', + }, + u'lifetime-sec': 120, + u'pfs': u'group5', + u'anti-replay-window-size': u'128' + } + location = self.csr.create_ipsec_policy(policy_info) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_IPSEC_POLICY_ID % '123', location) + + # Check the hard-coded items that get set as well... + self._helper_register_ipsec_policy_get( + override={u'anti-replay-window-size': u'128', + u'protection-suite': { + u'esp-encryption': u'esp-aes', + u'esp-authentication': u'esp-sha-hmac'}}) + + actual = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.OK, self.csr.status) + expected_policy = {u'kind': u'object#ipsec-policy', + u'mode': u'tunnel', + u'lifetime-kb': 4608000, + u'idle-time': None} + expected_policy.update(policy_info) + self.assertEqual(expected_policy, actual) + + def test_invalid_ipsec_policy_lifetime(self): + """Failure test of IPSec policy with unsupported lifetime.""" + # Override normal POST response with one that indicates bad request + self.requests.register_uri('POST', + LOCAL_URL + URI_IPSEC_POLICY, + status_code=requests.codes.BAD_REQUEST) + policy_info = { + u'policy-id': u'123', + u'protection-suite': { + u'esp-encryption': u'esp-aes', + u'esp-authentication': u'esp-sha-hmac', + u'ah': u'ah-sha-hmac', + }, + u'lifetime-sec': 119, + u'pfs': u'group5', + u'anti-replay-window-size': u'128' + } + self.csr.create_ipsec_policy(policy_info) + self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) + + def test_create_ipsec_policy_with_invalid_name(self): + """Failure test of creating IPSec policy with name too long.""" + # Override normal POST response with one that indicates bad request + self.requests.register_uri('POST', + LOCAL_URL + URI_IPSEC_POLICY, + status_code=requests.codes.BAD_REQUEST) + policy_info = {u'policy-id': u'policy-name-is-too-long-32-chars'} + self.csr.create_ipsec_policy(policy_info) + self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) + + +class TestCsrRestPreSharedKeyCreate(CiscoCsrBaseTestCase): + + """Test Pre-shared key (PSK) create REST requests.""" + + def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): + """Set up for each test in this suite. + + Each test case will have a normal authentication and post mock + response registered, although the test may replace them, if needed. + """ + super(TestCsrRestPreSharedKeyCreate, self).setUp(host, + tunnel_ip, + timeout) + self._helper_register_auth_request() + self._helper_register_psk_post(5) + + def _helper_register_psk_get(self, override=None): + content = {u'kind': u'object#ike-keyring', + u'keyring-name': u'5', + u'pre-shared-key-list': [ + {u'key': u'super-secret', + u'encrypted': False, + u'peer-address': u'10.10.10.20 255.255.255.0'} + ]} + if override: + content.update(override) + self._register_local_get(URI_PSK_ID % '5', json=content) + + def test_create_delete_pre_shared_key(self): + """Create and then delete a keyring entry for pre-shared key.""" + psk_info = {u'keyring-name': u'5', + u'pre-shared-key-list': [ + {u'key': u'super-secret', + u'encrypted': False, + u'peer-address': u'10.10.10.20/24'} + ]} + location = self.csr.create_pre_shared_key(psk_info) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_PSK_ID % '5', location) + + # Check the hard-coded items that get set as well... + self._helper_register_psk_get() + content = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.OK, self.csr.status) + expected_policy = {u'kind': u'object#ike-keyring'} + expected_policy.update(psk_info) + # Note: the peer CIDR is returned as an IP and mask + expected_policy[u'pre-shared-key-list'][0][u'peer-address'] = ( + u'10.10.10.20 255.255.255.0') + self.assertEqual(expected_policy, content) + + # Now delete and verify pre-shared key is gone + self._register_local_delete(URI_PSK, 5) + self._register_local_get_not_found(URI_PSK, 5) + + self.csr.delete_pre_shared_key('5') + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + content = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) + + def test_create_pre_shared_key_with_fqdn_peer(self): + """Create pre-shared key using FQDN for peer address.""" + psk_info = {u'keyring-name': u'5', + u'pre-shared-key-list': [ + {u'key': u'super-secret', + u'encrypted': False, + u'peer-address': u'cisco.com'} + ]} + location = self.csr.create_pre_shared_key(psk_info) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_PSK_ID % '5', location) + + # Check the hard-coded items that get set as well... + self._helper_register_psk_get( + override={u'pre-shared-key-list': [ + {u'key': u'super-secret', + u'encrypted': False, + u'peer-address': u'cisco.com'} + ]} + ) + content = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.OK, self.csr.status) + expected_policy = {u'kind': u'object#ike-keyring'} + expected_policy.update(psk_info) + self.assertEqual(expected_policy, content) + + +class TestCsrRestIPSecConnectionCreate(CiscoCsrBaseTestCase): + + """Test IPSec site-to-site connection REST requests. + + This requires us to have first created an IKE policy, IPSec policy, + and pre-shared key, so it's more of an itegration test, when used + with a real CSR (as we can't mock out these pre-conditions). + """ + + def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): + """Setup for each test in this suite. + + Each test case will have a normal authentication mock response + registered here, although they may replace it, as needed. + """ + super(TestCsrRestIPSecConnectionCreate, self).setUp(host, + tunnel_ip, + timeout) + self._helper_register_auth_request() + self.route_id = '10.1.0.0_24_GigabitEthernet1' + + def _make_psk_for_test(self): + psk_id = generate_pre_shared_key_id() + self._remove_resource_for_test(self.csr.delete_pre_shared_key, + psk_id) + self._helper_register_psk_post(psk_id) + psk_info = {u'keyring-name': u'%d' % psk_id, + u'pre-shared-key-list': [ + {u'key': u'super-secret', + u'encrypted': False, + u'peer-address': u'10.10.10.20/24'} + ]} + self.csr.create_pre_shared_key(psk_info) + if self.csr.status != requests.codes.CREATED: + self.fail("Unable to create PSK for test case") + self.addCleanup(self._remove_resource_for_test, + self.csr.delete_pre_shared_key, psk_id) + return psk_id + + def _make_ike_policy_for_test(self): + policy_id = generate_ike_policy_id() + self._remove_resource_for_test(self.csr.delete_ike_policy, + policy_id) + self._helper_register_ike_policy_post(policy_id) + policy_info = {u'priority-id': u'%d' % policy_id, + u'encryption': u'aes', + u'hash': u'sha', + u'dhGroup': 5, + u'lifetime': 3600} + self.csr.create_ike_policy(policy_info) + if self.csr.status != requests.codes.CREATED: + self.fail("Unable to create IKE policy for test case") + self.addCleanup(self._remove_resource_for_test, + self.csr.delete_ike_policy, policy_id) + return policy_id + + def _make_ipsec_policy_for_test(self): + policy_id = generate_ipsec_policy_id() + self._remove_resource_for_test(self.csr.delete_ipsec_policy, + policy_id) + self._helper_register_ipsec_policy_post(policy_id) + policy_info = { + u'policy-id': u'%d' % policy_id, + u'protection-suite': { + u'esp-encryption': u'esp-aes', + u'esp-authentication': u'esp-sha-hmac', + u'ah': u'ah-sha-hmac', + }, + u'lifetime-sec': 120, + u'pfs': u'group5', + u'anti-replay-window-size': u'disable' + } + self.csr.create_ipsec_policy(policy_info) + if self.csr.status != requests.codes.CREATED: + self.fail("Unable to create IPSec policy for test case") + self.addCleanup(self._remove_resource_for_test, + self.csr.delete_ipsec_policy, policy_id) + return policy_id + + def _remove_resource_for_test(self, delete_resource, resource_id): + self._register_local_delete_by_id(resource_id) + delete_resource(resource_id) + + def _prepare_for_site_conn_create(self, skip_psk=False, skip_ike=False, + skip_ipsec=False): + """Create the policies and PSK so can then create site conn.""" + if not skip_psk: + ike_policy_id = self._make_psk_for_test() + else: + ike_policy_id = generate_ike_policy_id() + if not skip_ike: + self._make_ike_policy_for_test() + if not skip_ipsec: + ipsec_policy_id = self._make_ipsec_policy_for_test() + else: + ipsec_policy_id = generate_ipsec_policy_id() + # Note: Use same ID number for tunnel and IPSec policy, so that when + # GET tunnel info, the mocks can infer the IPSec policy ID from the + # tunnel number. + return (ike_policy_id, ipsec_policy_id, ipsec_policy_id) + + def _helper_register_ipsec_conn_get(self, tunnel, override=None): + # Use same number, to allow mock to generate IPSec policy ID + ipsec_policy_id = tunnel[6:] + content = {u'kind': u'object#vpn-site-to-site', + u'vpn-interface-name': u'%s' % tunnel, + u'ip-version': u'ipv4', + u'vpn-type': u'site-to-site', + u'ipsec-policy-id': u'%s' % ipsec_policy_id, + u'ike-profile-id': None, + u'mtu': 1500, + u'local-device': { + u'ip-address': '10.3.0.1/24', + u'tunnel-ip-address': '10.10.10.10' + }, + u'remote-device': { + u'tunnel-ip-address': '10.10.10.20' + }} + if override: + content.update(override) + self._register_local_get(URI_IPSEC_CONN_ID % tunnel, json=content) + + def test_create_delete_ipsec_connection(self): + """Create and then delete an IPSec connection.""" + ike_policy_id, ipsec_policy_id, tunnel_id = ( + self._prepare_for_site_conn_create()) + tunnel_name = u'Tunnel%s' % tunnel_id + self._helper_register_tunnel_post(tunnel_name) + self._register_local_post(URI_ROUTES, self.route_id) + connection_info = { + u'vpn-interface-name': tunnel_name, + u'ipsec-policy-id': u'%d' % ipsec_policy_id, + u'mtu': 1500, + u'local-device': {u'ip-address': u'10.3.0.1/24', + u'tunnel-ip-address': u'10.10.10.10'}, + u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} + } + expected_connection = {u'kind': u'object#vpn-site-to-site', + u'ike-profile-id': None, + u'vpn-type': u'site-to-site', + u'mtu': 1500, + u'ip-version': u'ipv4'} + expected_connection.update(connection_info) + location = self.csr.create_ipsec_connection(connection_info) + self.addCleanup(self._remove_resource_for_test, + self.csr.delete_ipsec_connection, + tunnel_name) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_IPSEC_CONN_ID % tunnel_name, location) + + # Check the hard-coded items that get set as well... + self._helper_register_ipsec_conn_get(tunnel_name) + content = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertEqual(expected_connection, content) + + # Now delete and verify that site-to-site connection is gone + self._register_local_delete_by_id(tunnel_name) + self._register_local_delete_by_id(ipsec_policy_id) + self._register_local_delete_by_id(ike_policy_id) + self._register_local_get_not_found(URI_IPSEC_CONN, + tunnel_name) + # Only delete connection. Cleanup will take care of prerequisites + self.csr.delete_ipsec_connection(tunnel_name) + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + content = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) + + def test_create_ipsec_connection_with_no_tunnel_subnet(self): + """Create an IPSec connection without an IP address on tunnel.""" + _, ipsec_policy_id, tunnel_id = ( + self._prepare_for_site_conn_create()) + tunnel_name = u'Tunnel%s' % tunnel_id + self._helper_register_tunnel_post(tunnel_name) + self._register_local_post(URI_ROUTES, self.route_id) + connection_info = { + u'vpn-interface-name': tunnel_name, + u'ipsec-policy-id': u'%d' % ipsec_policy_id, + u'local-device': {u'ip-address': u'GigabitEthernet3', + u'tunnel-ip-address': u'10.10.10.10'}, + u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} + } + expected_connection = {u'kind': u'object#vpn-site-to-site', + u'ike-profile-id': None, + u'vpn-type': u'site-to-site', + u'mtu': 1500, + u'ip-version': u'ipv4'} + expected_connection.update(connection_info) + location = self.csr.create_ipsec_connection(connection_info) + self.addCleanup(self._remove_resource_for_test, + self.csr.delete_ipsec_connection, + tunnel_name) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn('vpn-svc/site-to-site/' + tunnel_name, location) + + # Check the hard-coded items that get set as well... + self._helper_register_ipsec_conn_get(tunnel_name, override={ + u'local-device': { + u'ip-address': u'GigabitEthernet3', + u'tunnel-ip-address': u'10.10.10.10' + }}) + content = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertEqual(expected_connection, content) + + def test_create_ipsec_connection_no_pre_shared_key(self): + """Test of connection create without associated pre-shared key. + + The CSR will create the connection, but will not be able to pass + traffic without the pre-shared key. + """ + + _, ipsec_policy_id, tunnel_id = ( + self._prepare_for_site_conn_create(skip_psk=True)) + tunnel_name = u'Tunnel%s' % tunnel_id + self._helper_register_tunnel_post(tunnel_name) + self._register_local_post(URI_ROUTES, self.route_id) + connection_info = { + u'vpn-interface-name': tunnel_name, + u'ipsec-policy-id': u'%d' % ipsec_policy_id, + u'mtu': 1500, + u'local-device': {u'ip-address': u'10.3.0.1/24', + u'tunnel-ip-address': u'10.10.10.10'}, + u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} + } + expected_connection = {u'kind': u'object#vpn-site-to-site', + u'ike-profile-id': None, + u'vpn-type': u'site-to-site', + u'ip-version': u'ipv4'} + expected_connection.update(connection_info) + location = self.csr.create_ipsec_connection(connection_info) + self.addCleanup(self._remove_resource_for_test, + self.csr.delete_ipsec_connection, + tunnel_name) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_IPSEC_CONN_ID % tunnel_name, location) + + # Check the hard-coded items that get set as well... + self._helper_register_ipsec_conn_get(tunnel_name) + content = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertEqual(expected_connection, content) + + def test_create_ipsec_connection_with_default_ike_policy(self): + """Test of connection create without IKE policy (uses default). + + Without an IKE policy, the CSR will use a built-in default IKE + policy setting for the connection. + """ + + _, ipsec_policy_id, tunnel_id = ( + self._prepare_for_site_conn_create(skip_ike=True)) + tunnel_name = u'Tunnel%s' % tunnel_id + self._helper_register_tunnel_post(tunnel_name) + self._register_local_post(URI_ROUTES, self.route_id) + connection_info = { + u'vpn-interface-name': tunnel_name, + u'ipsec-policy-id': u'%d' % ipsec_policy_id, + u'mtu': 1500, + u'local-device': {u'ip-address': u'10.3.0.1/24', + u'tunnel-ip-address': u'10.10.10.10'}, + u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} + } + expected_connection = {u'kind': u'object#vpn-site-to-site', + u'ike-profile-id': None, + u'vpn-type': u'site-to-site', + u'ip-version': u'ipv4'} + expected_connection.update(connection_info) + location = self.csr.create_ipsec_connection(connection_info) + self.addCleanup(self._remove_resource_for_test, + self.csr.delete_ipsec_connection, + tunnel_name) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_IPSEC_CONN_ID % tunnel_name, location) + + # Check the hard-coded items that get set as well... + self._helper_register_ipsec_conn_get(tunnel_name) + content = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertEqual(expected_connection, content) + + def test_set_ipsec_connection_admin_state_changes(self): + """Create IPSec connection in admin down state.""" + _, ipsec_policy_id, tunnel_id = ( + self._prepare_for_site_conn_create()) + tunnel_name = u'Tunnel%s' % tunnel_id + self._helper_register_tunnel_post(tunnel_name) + self._register_local_post(URI_ROUTES, self.route_id) + connection_info = { + u'vpn-interface-name': tunnel_name, + u'ipsec-policy-id': u'%d' % ipsec_policy_id, + u'mtu': 1500, + u'local-device': {u'ip-address': u'10.3.0.1/24', + u'tunnel-ip-address': u'10.10.10.10'}, + u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} + } + location = self.csr.create_ipsec_connection(connection_info) + self.addCleanup(self._remove_resource_for_test, + self.csr.delete_ipsec_connection, + tunnel_name) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_IPSEC_CONN_ID % tunnel_name, location) + + state_url = location + "/state" + state_uri = URI_IPSEC_CONN_ID % tunnel_name + '/state' + # Note: When created, the tunnel will be in admin 'up' state + # Note: Line protocol state will be down, unless have an active conn. + expected_state = {u'kind': u'object#vpn-site-to-site-state', + u'vpn-interface-name': tunnel_name, + u'line-protocol-state': u'down', + u'enabled': False} + self._register_local_put(URI_IPSEC_CONN_ID % tunnel_name, 'state') + self.csr.set_ipsec_connection_state(tunnel_name, admin_up=False) + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + + self._register_local_get(state_uri, json=expected_state) + content = self.csr.get_request(state_url, full_url=True) + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertEqual(expected_state, content) + + self.csr.set_ipsec_connection_state(tunnel_name, admin_up=True) + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + expected_state = {u'kind': u'object#vpn-site-to-site-state', + u'vpn-interface-name': tunnel_name, + u'line-protocol-state': u'down', + u'enabled': True} + self._register_local_get(state_uri, json=expected_state) + content = self.csr.get_request(state_url, full_url=True) + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertEqual(expected_state, content) + + def test_create_ipsec_connection_missing_ipsec_policy(self): + """Negative test of connection create without IPSec policy.""" + _, ipsec_policy_id, tunnel_id = ( + self._prepare_for_site_conn_create(skip_ipsec=True)) + tunnel_name = u'Tunnel%s' % tunnel_id + self._register_local_post(URI_IPSEC_CONN, tunnel_name, + result_code=requests.codes.BAD_REQUEST) + connection_info = { + u'vpn-interface-name': tunnel_name, + u'ipsec-policy-id': u'%d' % ipsec_policy_id, + u'local-device': {u'ip-address': u'10.3.0.1/24', + u'tunnel-ip-address': u'10.10.10.10'}, + u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} + } + self.csr.create_ipsec_connection(connection_info) + self.addCleanup(self._remove_resource_for_test, + self.csr.delete_ipsec_connection, + 'Tunnel%d' % tunnel_id) + self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) + + def _determine_conflicting_ip(self): + content = {u'kind': u'object#interface', + u'subnet-mask': u'255.255.255.0', + u'ip-address': u'10.5.0.2'} + self._register_local_get('interfaces/GigabitEthernet3', json=content) + details = self.csr.get_request('interfaces/GigabitEthernet3') + if self.csr.status != requests.codes.OK: + self.fail("Unable to obtain interface GigabitEthernet3's IP") + if_ip = details.get('ip-address') + if not if_ip: + self.fail("No IP address for GigabitEthernet3 interface") + return '.'.join(if_ip.split('.')[:3]) + '.10' + + def test_create_ipsec_connection_conficting_tunnel_ip(self): + """Negative test of connection create with conflicting tunnel IP. + + Find out the IP of a local interface (GigabitEthernet3) and create an + IP that is on the same subnet. Note: this interface needs to be up. + """ + + conflicting_ip = self._determine_conflicting_ip() + _, ipsec_policy_id, tunnel_id = ( + self._prepare_for_site_conn_create()) + tunnel_name = u'Tunnel%s' % tunnel_id + self._register_local_post(URI_IPSEC_CONN, tunnel_name, + result_code=requests.codes.BAD_REQUEST) + connection_info = { + u'vpn-interface-name': tunnel_name, + u'ipsec-policy-id': u'%d' % ipsec_policy_id, + u'local-device': {u'ip-address': u'%s/24' % conflicting_ip, + u'tunnel-ip-address': u'10.10.10.10'}, + u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} + } + self.csr.create_ipsec_connection(connection_info) + self.addCleanup(self._remove_resource_for_test, + self.csr.delete_ipsec_connection, + tunnel_name) + self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) + + def test_create_ipsec_connection_with_max_mtu(self): + """Create an IPSec connection with max MTU value.""" + _, ipsec_policy_id, tunnel_id = ( + self._prepare_for_site_conn_create()) + tunnel_name = u'Tunnel%s' % tunnel_id + self._helper_register_tunnel_post(tunnel_name) + self._register_local_post(URI_ROUTES, self.route_id) + connection_info = { + u'vpn-interface-name': tunnel_name, + u'ipsec-policy-id': u'%d' % ipsec_policy_id, + u'mtu': 9192, + u'local-device': {u'ip-address': u'10.3.0.1/24', + u'tunnel-ip-address': u'10.10.10.10'}, + u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} + } + expected_connection = {u'kind': u'object#vpn-site-to-site', + u'ike-profile-id': None, + u'vpn-type': u'site-to-site', + u'ip-version': u'ipv4'} + expected_connection.update(connection_info) + location = self.csr.create_ipsec_connection(connection_info) + self.addCleanup(self._remove_resource_for_test, + self.csr.delete_ipsec_connection, + tunnel_name) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_IPSEC_CONN_ID % tunnel_name, location) + + # Check the hard-coded items that get set as well... + self._helper_register_ipsec_conn_get(tunnel_name, override={ + u'mtu': 9192}) + content = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertEqual(expected_connection, content) + + def test_create_ipsec_connection_with_bad_mtu(self): + """Negative test of connection create with unsupported MTU value.""" + _, ipsec_policy_id, tunnel_id = ( + self._prepare_for_site_conn_create()) + tunnel_name = u'Tunnel%s' % tunnel_id + self._register_local_post(URI_IPSEC_CONN, tunnel_name, + result_code=requests.codes.BAD_REQUEST) + connection_info = { + u'vpn-interface-name': tunnel_name, + u'ipsec-policy-id': u'%d' % ipsec_policy_id, + u'mtu': 9193, + u'local-device': {u'ip-address': u'10.3.0.1/24', + u'tunnel-ip-address': u'10.10.10.10'}, + u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} + } + self.csr.create_ipsec_connection(connection_info) + self.addCleanup(self._remove_resource_for_test, + self.csr.delete_ipsec_connection, + tunnel_name) + self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) + + def test_status_when_no_tunnels_exist(self): + """Get status, when there are no tunnels.""" + content = {u'kind': u'collection#vpn-active-sessions', + u'items': []} + self._register_local_get(URI_SESSIONS, json=content) + tunnels = self.csr.read_tunnel_statuses() + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertEqual([], tunnels) + + def test_status_for_one_tunnel(self): + """Get status of one tunnel.""" + # Create the IPsec site-to-site connection first + _, ipsec_policy_id, tunnel_id = ( + self._prepare_for_site_conn_create()) + tunnel_name = u'Tunnel%s' % tunnel_id + self._helper_register_tunnel_post(tunnel_name) + self._register_local_post(URI_ROUTES, self.route_id) + connection_info = { + u'vpn-interface-name': tunnel_name, + u'ipsec-policy-id': u'%d' % ipsec_policy_id, + u'local-device': {u'ip-address': u'10.3.0.1/24', + u'tunnel-ip-address': u'10.10.10.10'}, + u'remote-device': {u'tunnel-ip-address': u'10.10.10.20'} + } + location = self.csr.create_ipsec_connection(connection_info) + self.addCleanup(self._remove_resource_for_test, + self.csr.delete_ipsec_connection, + tunnel_name) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_IPSEC_CONN_ID % tunnel_name, location) + + # Now, check the status + content = {u'kind': u'collection#vpn-active-sessions', + u'items': [{u'status': u'DOWN-NEGOTIATING', + u'vpn-interface-name': tunnel_name}, ]} + self._register_local_get(URI_SESSIONS, json=content) + self._helper_register_ipsec_conn_get(tunnel_name) + tunnels = self.csr.read_tunnel_statuses() + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertEqual([(tunnel_name, u'DOWN-NEGOTIATING'), ], tunnels) + + +class TestCsrRestIkeKeepaliveCreate(CiscoCsrBaseTestCase): + + """Test IKE keepalive REST requests. + + Note: On the Cisco CSR, the IKE keepalive for v1 is a global configuration + that applies to all VPN tunnels to specify Dead Peer Detection information. + As a result, this REST API is not used in the OpenStack device driver, and + the keepalive will default to zero (disabled). + """ + + def _save_dpd_info(self): + details = self.csr.get_request(URI_KEEPALIVE) + if self.csr.status == requests.codes.OK: + self.dpd = details + self.addCleanup(self._restore_dpd_info) + elif self.csr.status != requests.codes.NOT_FOUND: + self.fail("Unable to save original DPD info") + + def _restore_dpd_info(self): + payload = {'interval': self.dpd['interval'], + 'retry': self.dpd['retry']} + self.csr.put_request(URI_KEEPALIVE, payload=payload) + if self.csr.status != requests.codes.NO_CONTENT: + self.fail("Unable to restore DPD info after test") + + def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): + """Set up for each test in this suite. + + Each test case will have a normal authentication, get, and put mock + responses registered, although the test may replace them, if needed. + Dead Peer Detection settions will be saved for each test, and + restored afterwards. + """ + super(TestCsrRestIkeKeepaliveCreate, self).setUp(host, + tunnel_ip, + timeout) + self._helper_register_auth_request() + self._helper_register_keepalive_get() + self._register_local_put('vpn-svc/ike', 'keepalive') + self._save_dpd_info() + self.csr.token = None + + def _helper_register_keepalive_get(self, override=None): + content = {u'interval': 60, + u'retry': 4, + u'periodic': True} + if override: + content.update(override) + self._register_local_get(URI_KEEPALIVE, json=content) + + def test_configure_ike_keepalive(self): + """Set IKE keep-alive (aka Dead Peer Detection) for the CSR.""" + keepalive_info = {'interval': 60, 'retry': 4} + self.csr.configure_ike_keepalive(keepalive_info) + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + content = self.csr.get_request(URI_KEEPALIVE) + self.assertEqual(requests.codes.OK, self.csr.status) + expected = {'periodic': False} + expected.update(keepalive_info) + self.assertDictSupersetOf(expected, content) + + def test_disable_ike_keepalive(self): + """Disable IKE keep-alive (aka Dead Peer Detection) for the CSR.""" + keepalive_info = {'interval': 0, 'retry': 4} + self.csr.configure_ike_keepalive(keepalive_info) + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + + +class TestCsrRestStaticRoute(CiscoCsrBaseTestCase): + + """Test static route REST requests. + + A static route is added for the peer's private network. Would create + a route for each of the peer CIDRs specified for the VPN connection. + """ + + def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): + """Set up for each test in this suite. + + Each test case will have a normal authentication mock response + registered, although the test may replace it, if needed. + """ + super(TestCsrRestStaticRoute, self).setUp(host, tunnel_ip, timeout) + self._helper_register_auth_request() + + def test_create_delete_static_route(self): + """Create and then delete a static route for the tunnel.""" + expected_id = '10.1.0.0_24_GigabitEthernet1' + self._register_local_post(URI_ROUTES, resource_id=expected_id) + cidr = u'10.1.0.0/24' + interface = u'GigabitEthernet1' + route_info = {u'destination-network': cidr, + u'outgoing-interface': interface} + location = self.csr.create_static_route(route_info) + self.assertEqual(requests.codes.CREATED, self.csr.status) + self.assertIn(URI_ROUTES_ID % expected_id, location) + + # Check the hard-coded items that get set as well... + expected_route = {u'destination-network': u'10.1.0.0/24', + u'kind': u'object#static-route', + u'next-hop-router': None, + u'outgoing-interface': u'GigabitEthernet1', + u'admin-distance': 1} + self._register_local_get(URI_ROUTES_ID % expected_id, + json=expected_route) + content = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.OK, self.csr.status) + self.assertEqual(expected_route, content) + + # Now delete and verify that static route is gone + self._register_local_delete(URI_ROUTES, expected_id) + self._register_local_get_not_found(URI_ROUTES, expected_id) + route_id = csr_client.make_route_id(cidr, interface) + self.csr.delete_static_route(route_id) + self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) + content = self.csr.get_request(location, full_url=True) + self.assertEqual(requests.codes.NOT_FOUND, self.csr.status)