Support enhancements to Cisco CSR VPN REST APIs

Incorporate latest enhancements and fixes in Cisco CSR router REST API
to the VPNaaS device driver and unit test cases. Primarily, is support in
the REST API for different IKE and IPSec encryption algorithms, name length
error handling, disable of anti replay window, and IKE keep alive.

Also includes minor typos and comment fixes mentioned in previous reviews.

Note: notest_cisco_csr_rest.py, which tests the CSR REST client, is not
part of the UT suite yet, pending resolution of httmock package inclusion
as test requirement.

Change-Id: I931f487fbd4ead93a1648e89c9c383b3d55fc07c
Closes-Bug: 1303820
This commit is contained in:
Paul Michali 2014-04-04 19:14:36 +00:00
parent f89d3fec9b
commit cb948a4705
5 changed files with 170 additions and 109 deletions

View File

@ -567,9 +567,8 @@ class CiscoCsrIPSecConnection(object):
# encryption_algorithm -> encryption # encryption_algorithm -> encryption
'3des': u'3des', '3des': u'3des',
'aes-128': u'aes', 'aes-128': u'aes',
# TODO(pcm) update these 2 once CSR updated 'aes-192': u'aes192',
'aes-192': u'aes', 'aes-256': u'aes256',
'aes-256': u'aes',
# pfs -> dhGroup # pfs -> dhGroup
'group2': 2, 'group2': 2,
'group5': 5, 'group5': 5,
@ -584,9 +583,8 @@ class CiscoCsrIPSecConnection(object):
# encryption_algorithm -> esp-encryption # encryption_algorithm -> esp-encryption
'3des': u'esp-3des', '3des': u'esp-3des',
'aes-128': u'esp-aes', 'aes-128': u'esp-aes',
# TODO(pcm) update these 2 once CSR updated 'aes-192': u'esp-192-aes',
'aes-192': u'esp-aes', 'aes-256': u'esp-256-aes',
'aes-256': u'esp-aes',
# pfs -> pfs # pfs -> pfs
'group2': u'group2', 'group2': u'group2',
'group5': u'group5', 'group5': u'group5',
@ -664,8 +662,7 @@ class CiscoCsrIPSecConnection(object):
u'esp-authentication': auth_algorithm}, u'esp-authentication': auth_algorithm},
u'lifetime-sec': lifetime, u'lifetime-sec': lifetime,
u'pfs': group, u'pfs': group,
# TODO(pcm): Remove when CSR fixes 'Disable' u'anti-replay-window-size': u'disable'}
u'anti-replay-window-size': u'64'}
if transform_protocol: if transform_protocol:
settings[u'protection-suite'][u'ah'] = transform_protocol settings[u'protection-suite'][u'ah'] = transform_protocol
return settings return settings

View File

@ -627,7 +627,7 @@ class IPsecDriver(device_drivers.DeviceDriver):
def update_downed_connections(self, process_id, new_status): def update_downed_connections(self, process_id, new_status):
"""Update info to be reported, if connections just went down. """Update info to be reported, if connections just went down.
If there is no longer any information for a connection (because it If there is no longer any information for a connection, because it
has been removed (e.g. due to an admin down of VPN service or IPSec has been removed (e.g. due to an admin down of VPN service or IPSec
connection), but there was previous status information for the connection), but there was previous status information for the
connection, mark the connection as down for reporting purposes. connection, mark the connection as down for reporting purposes.

View File

@ -28,11 +28,6 @@ from neutron.openstack.common import log as logging
# now, uncomment and include httmock source to UT # now, uncomment and include httmock source to UT
from neutron.tests.unit.services.vpn.device_drivers import httmock from neutron.tests.unit.services.vpn.device_drivers import httmock
# TODO(pcm) Remove, once verified these have been fixed
FIXED_CSCum35484 = False
FIXED_CSCul82396 = False
FIXED_CSCum10324 = False
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -143,7 +138,7 @@ def expired_request(url, request):
def normal_get(url, request): def normal_get(url, request):
if request.method != 'GET': if request.method != 'GET':
return return
LOG.debug("DEBUG: GET mock for %s", url) LOG.debug("GET mock for %s", url)
if not request.headers.get('X-auth-token', None): if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED} return {'status_code': requests.codes.UNAUTHORIZED}
if 'global/host-name' in url.path: if 'global/host-name' in url.path:
@ -174,7 +169,7 @@ def normal_get(url, request):
u'priority-id': u'2', u'priority-id': u'2',
u'version': u'v1', u'version': u'v1',
u'local-auth-method': u'pre-share', u'local-auth-method': u'pre-share',
u'encryption': u'aes', u'encryption': u'aes256',
u'hash': u'sha', u'hash': u'sha',
u'dhGroup': 5, u'dhGroup': 5,
u'lifetime': 3600} u'lifetime': 3600}
@ -194,11 +189,11 @@ def normal_get(url, request):
u'mode': u'tunnel', u'mode': u'tunnel',
u'policy-id': u'%s' % ipsec_policy_id, u'policy-id': u'%s' % ipsec_policy_id,
u'protection-suite': { u'protection-suite': {
u'esp-encryption': u'esp-aes', u'esp-encryption': u'esp-256-aes',
u'esp-authentication': u'esp-sha-hmac', u'esp-authentication': u'esp-sha-hmac',
u'ah': u'ah-sha-hmac', u'ah': u'ah-sha-hmac',
}, },
u'anti-replay-window-size': u'128', u'anti-replay-window-size': u'Disable',
u'lifetime-sec': 120, u'lifetime-sec': 120,
u'pfs': u'group5', u'pfs': u'group5',
u'lifetime-kb': 4608000, u'lifetime-kb': 4608000,
@ -246,7 +241,7 @@ def normal_get(url, request):
@filter_request(['get'], 'vpn-svc/ike/keyrings') @filter_request(['get'], 'vpn-svc/ike/keyrings')
@httmock.urlmatch(netloc=r'localhost') @httmock.urlmatch(netloc=r'localhost')
def get_fqdn(url, request): def get_fqdn(url, request):
LOG.debug("DEBUG: GET FQDN mock for %s", url) LOG.debug("GET FQDN mock for %s", url)
if not request.headers.get('X-auth-token', None): if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED} return {'status_code': requests.codes.UNAUTHORIZED}
content = {u'kind': u'object#ike-keyring', content = {u'kind': u'object#ike-keyring',
@ -262,7 +257,7 @@ def get_fqdn(url, request):
@filter_request(['get'], 'vpn-svc/ipsec/policies/') @filter_request(['get'], 'vpn-svc/ipsec/policies/')
@httmock.urlmatch(netloc=r'localhost') @httmock.urlmatch(netloc=r'localhost')
def get_no_ah(url, request): def get_no_ah(url, request):
LOG.debug("DEBUG: GET No AH mock for %s", url) LOG.debug("GET No AH mock for %s", url)
if not request.headers.get('X-auth-token', None): if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED} return {'status_code': requests.codes.UNAUTHORIZED}
ipsec_policy_id = url.path.split('/')[-1] ipsec_policy_id = url.path.split('/')[-1]
@ -285,7 +280,7 @@ def get_no_ah(url, request):
def get_defaults(url, request): def get_defaults(url, request):
if request.method != 'GET': if request.method != 'GET':
return return
LOG.debug("DEBUG: GET mock for %s", url) LOG.debug("GET mock for %s", url)
if not request.headers.get('X-auth-token', None): if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED} return {'status_code': requests.codes.UNAUTHORIZED}
if 'vpn-svc/ike/policies/2' in url.path: if 'vpn-svc/ike/policies/2' in url.path:
@ -393,7 +388,7 @@ def get_local_ip(url, request):
def post(url, request): def post(url, request):
if request.method != 'POST': if request.method != 'POST':
return return
LOG.debug("DEBUG: POST mock for %s", url) LOG.debug("POST mock for %s", url)
if not request.headers.get('X-auth-token', None): if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED} return {'status_code': requests.codes.UNAUTHORIZED}
if 'interfaces/GigabitEthernet' in url.path: if 'interfaces/GigabitEthernet' in url.path:
@ -432,7 +427,7 @@ def post(url, request):
@filter_request(['post'], 'global/local-users') @filter_request(['post'], 'global/local-users')
@httmock.urlmatch(netloc=r'localhost') @httmock.urlmatch(netloc=r'localhost')
def post_change_attempt(url, request): def post_change_attempt(url, request):
LOG.debug("DEBUG: POST change value mock for %s", url) LOG.debug("POST change value mock for %s", url)
if not request.headers.get('X-auth-token', None): if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED} return {'status_code': requests.codes.UNAUTHORIZED}
return {'status_code': requests.codes.NOT_FOUND, return {'status_code': requests.codes.NOT_FOUND,
@ -443,7 +438,7 @@ def post_change_attempt(url, request):
@httmock.urlmatch(netloc=r'localhost') @httmock.urlmatch(netloc=r'localhost')
def post_duplicate(url, request): def post_duplicate(url, request):
LOG.debug("DEBUG: POST duplicate mock for %s", url) LOG.debug("POST duplicate mock for %s", url)
if not request.headers.get('X-auth-token', None): if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED} return {'status_code': requests.codes.UNAUTHORIZED}
return {'status_code': requests.codes.BAD_REQUEST, return {'status_code': requests.codes.BAD_REQUEST,
@ -456,7 +451,7 @@ def post_duplicate(url, request):
@filter_request(['post'], 'vpn-svc/site-to-site') @filter_request(['post'], 'vpn-svc/site-to-site')
@httmock.urlmatch(netloc=r'localhost') @httmock.urlmatch(netloc=r'localhost')
def post_missing_ipsec_policy(url, request): def post_missing_ipsec_policy(url, request):
LOG.debug("DEBUG: POST missing ipsec policy mock for %s", url) LOG.debug("POST missing ipsec policy mock for %s", url)
if not request.headers.get('X-auth-token', None): if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED} return {'status_code': requests.codes.UNAUTHORIZED}
return {'status_code': requests.codes.BAD_REQUEST} return {'status_code': requests.codes.BAD_REQUEST}
@ -465,7 +460,7 @@ def post_missing_ipsec_policy(url, request):
@filter_request(['post'], 'vpn-svc/site-to-site') @filter_request(['post'], 'vpn-svc/site-to-site')
@httmock.urlmatch(netloc=r'localhost') @httmock.urlmatch(netloc=r'localhost')
def post_missing_ike_policy(url, request): def post_missing_ike_policy(url, request):
LOG.debug("DEBUG: POST missing ike policy mock for %s", url) LOG.debug("POST missing ike policy mock for %s", url)
if not request.headers.get('X-auth-token', None): if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED} return {'status_code': requests.codes.UNAUTHORIZED}
return {'status_code': requests.codes.BAD_REQUEST} return {'status_code': requests.codes.BAD_REQUEST}
@ -474,7 +469,7 @@ def post_missing_ike_policy(url, request):
@filter_request(['post'], 'vpn-svc/site-to-site') @filter_request(['post'], 'vpn-svc/site-to-site')
@httmock.urlmatch(netloc=r'localhost') @httmock.urlmatch(netloc=r'localhost')
def post_bad_ip(url, request): def post_bad_ip(url, request):
LOG.debug("DEBUG: POST bad IP mock for %s", url) LOG.debug("POST bad IP mock for %s", url)
if not request.headers.get('X-auth-token', None): if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED} return {'status_code': requests.codes.UNAUTHORIZED}
return {'status_code': requests.codes.BAD_REQUEST} return {'status_code': requests.codes.BAD_REQUEST}
@ -483,7 +478,7 @@ def post_bad_ip(url, request):
@filter_request(['post'], 'vpn-svc/site-to-site') @filter_request(['post'], 'vpn-svc/site-to-site')
@httmock.urlmatch(netloc=r'localhost') @httmock.urlmatch(netloc=r'localhost')
def post_bad_mtu(url, request): def post_bad_mtu(url, request):
LOG.debug("DEBUG: POST bad mtu mock for %s", url) LOG.debug("POST bad mtu mock for %s", url)
if not request.headers.get('X-auth-token', None): if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED} return {'status_code': requests.codes.UNAUTHORIZED}
return {'status_code': requests.codes.BAD_REQUEST} return {'status_code': requests.codes.BAD_REQUEST}
@ -492,7 +487,16 @@ def post_bad_mtu(url, request):
@filter_request(['post'], 'vpn-svc/ipsec/policies') @filter_request(['post'], 'vpn-svc/ipsec/policies')
@httmock.urlmatch(netloc=r'localhost') @httmock.urlmatch(netloc=r'localhost')
def post_bad_lifetime(url, request): def post_bad_lifetime(url, request):
LOG.debug("DEBUG: POST bad lifetime mock for %s", url) LOG.debug("POST bad lifetime mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
return {'status_code': requests.codes.BAD_REQUEST}
@filter_request(['post'], 'vpn-svc/ipsec/policies')
@httmock.urlmatch(netloc=r'localhost')
def post_bad_name(url, request):
LOG.debug("POST bad IPSec policy name for %s", url)
if not request.headers.get('X-auth-token', None): if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED} return {'status_code': requests.codes.UNAUTHORIZED}
return {'status_code': requests.codes.BAD_REQUEST} return {'status_code': requests.codes.BAD_REQUEST}
@ -502,7 +506,7 @@ def post_bad_lifetime(url, request):
def put(url, request): def put(url, request):
if request.method != 'PUT': if request.method != 'PUT':
return return
LOG.debug("DEBUG: PUT mock for %s", url) LOG.debug("PUT mock for %s", url)
if not request.headers.get('X-auth-token', None): if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED} return {'status_code': requests.codes.UNAUTHORIZED}
# Any resource # Any resource
@ -513,7 +517,7 @@ def put(url, request):
def delete(url, request): def delete(url, request):
if request.method != 'DELETE': if request.method != 'DELETE':
return return
LOG.debug("DEBUG: DELETE mock for %s", url) LOG.debug("DELETE mock for %s", url)
if not request.headers.get('X-auth-token', None): if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED} return {'status_code': requests.codes.UNAUTHORIZED}
# Any resource # Any resource
@ -524,7 +528,7 @@ def delete(url, request):
def delete_unknown(url, request): def delete_unknown(url, request):
if request.method != 'DELETE': if request.method != 'DELETE':
return return
LOG.debug("DEBUG: DELETE unknown mock for %s", url) LOG.debug("DELETE unknown mock for %s", url)
if not request.headers.get('X-auth-token', None): if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED} return {'status_code': requests.codes.UNAUTHORIZED}
# Any resource # Any resource
@ -538,7 +542,7 @@ def delete_unknown(url, request):
def delete_not_allowed(url, request): def delete_not_allowed(url, request):
if request.method != 'DELETE': if request.method != 'DELETE':
return return
LOG.debug("DEBUG: DELETE not allowed mock for %s", url) LOG.debug("DELETE not allowed mock for %s", url)
if not request.headers.get('X-auth-token', None): if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED} return {'status_code': requests.codes.UNAUTHORIZED}
# Any resource # Any resource

View File

@ -19,7 +19,8 @@
import random import random
import httmock # TODO(pcm) Uncomment when httmock is added to test requirements.
# import httmock
import requests import requests
from neutron.openstack.common import log as logging from neutron.openstack.common import log as logging
@ -28,9 +29,9 @@ from neutron.services.vpn.device_drivers import (
from neutron.tests import base from neutron.tests import base
from neutron.tests.unit.services.vpn.device_drivers import ( from neutron.tests.unit.services.vpn.device_drivers import (
cisco_csr_mock as csr_request) cisco_csr_mock as csr_request)
# TODO(pcm) Remove once httmock is available. In the meantime, use temp # TODO(pcm) Remove once httmock is available. In the meantime, use
# copy of hhtmock source to run UT # temporary local copy of httmock source to run UT
# from neutron.tests.unit.services.vpn.device_drivers import httmock from neutron.tests.unit.services.vpn.device_drivers import httmock
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -39,10 +40,7 @@ if True:
logging.CONF.set_override('debug', True) logging.CONF.set_override('debug', True)
logging.setup('neutron') logging.setup('neutron')
if csr_request.FIXED_CSCum35484: dummy_policy_id = 'dummy-ipsec-policy-id-name'
dummy_uuid = '1eb4ee6b-0870-45a0-b554-7b69096'
else:
dummy_uuid = '1eb4ee6b-0870-45a0-b554-7b'
# Note: Helper functions to test reuse of IDs. # Note: Helper functions to test reuse of IDs.
@ -62,10 +60,10 @@ class TestCsrLoginRestApi(base.BaseTestCase):
"""Test logging into CSR to obtain token-id.""" """Test logging into CSR to obtain token-id."""
def setUp(self): def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrLoginRestApi, self).setUp() super(TestCsrLoginRestApi, self).setUp()
self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10', self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
'stack', 'cisco') timeout)
def test_get_token(self): def test_get_token(self):
"""Obtain the token and its expiration time.""" """Obtain the token and its expiration time."""
@ -102,10 +100,10 @@ class TestCsrGetRestApi(base.BaseTestCase):
"""Test CSR GET REST API.""" """Test CSR GET REST API."""
def setUp(self): def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrGetRestApi, self).setUp() super(TestCsrGetRestApi, self).setUp()
self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10', self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
'stack', 'cisco') timeout)
def test_valid_rest_gets(self): def test_valid_rest_gets(self):
"""Simple GET requests. """Simple GET requests.
@ -129,10 +127,10 @@ class TestCsrPostRestApi(base.BaseTestCase):
"""Test CSR POST REST API.""" """Test CSR POST REST API."""
def setUp(self): def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrPostRestApi, self).setUp() super(TestCsrPostRestApi, self).setUp()
self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10', self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
'stack', 'cisco') timeout)
def test_post_requests(self): def test_post_requests(self):
"""Simple POST requests (repeatable). """Simple POST requests (repeatable).
@ -275,11 +273,11 @@ class TestCsrPutRestApi(base.BaseTestCase):
if self.csr.status != requests.codes.NO_CONTENT: if self.csr.status != requests.codes.NO_CONTENT:
self.fail("Unable to restore I/F Ge1 description after test") self.fail("Unable to restore I/F Ge1 description after test")
def setUp(self): def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
"""Prepare for PUT API tests.""" """Prepare for PUT API tests."""
super(TestCsrPutRestApi, self).setUp() super(TestCsrPutRestApi, self).setUp()
self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10', self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
'stack', 'cisco') timeout)
self._save_resources() self._save_resources()
self.addCleanup(self._restore_resources, 'stack', 'cisco') self.addCleanup(self._restore_resources, 'stack', 'cisco')
@ -357,10 +355,10 @@ class TestCsrDeleteRestApi(base.BaseTestCase):
"""Test CSR DELETE REST API.""" """Test CSR DELETE REST API."""
def setUp(self): def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrDeleteRestApi, self).setUp() super(TestCsrDeleteRestApi, self).setUp()
self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10', self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
'stack', 'cisco') timeout)
def _make_dummy_user(self): def _make_dummy_user(self):
"""Create a user that will be later deleted.""" """Create a user that will be later deleted."""
@ -409,10 +407,10 @@ class TestCsrRestApiFailures(base.BaseTestCase):
the result, without any error handling. the result, without any error handling.
""" """
def setUp(self): def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=0.1):
super(TestCsrRestApiFailures, self).setUp() super(TestCsrRestApiFailures, self).setUp()
self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10', self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
'stack', 'cisco', timeout=0.1) timeout)
def test_request_for_non_existent_resource(self): def test_request_for_non_existent_resource(self):
"""Negative test of non-existent resource on REST request.""" """Negative test of non-existent resource on REST request."""
@ -455,10 +453,10 @@ class TestCsrRestIkePolicyCreate(base.BaseTestCase):
"""Test IKE policy create REST requests.""" """Test IKE policy create REST requests."""
def setUp(self): def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrRestIkePolicyCreate, self).setUp() super(TestCsrRestIkePolicyCreate, self).setUp()
self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10', self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
'stack', 'cisco') timeout)
def test_create_delete_ike_policy(self): def test_create_delete_ike_policy(self):
"""Create and then delete IKE policy.""" """Create and then delete IKE policy."""
@ -466,7 +464,7 @@ class TestCsrRestIkePolicyCreate(base.BaseTestCase):
csr_request.normal_get): csr_request.normal_get):
policy_id = '2' policy_id = '2'
policy_info = {u'priority-id': u'%s' % policy_id, policy_info = {u'priority-id': u'%s' % policy_id,
u'encryption': u'aes', u'encryption': u'aes256',
u'hash': u'sha', u'hash': u'sha',
u'dhGroup': 5, u'dhGroup': 5,
u'lifetime': 3600} u'lifetime': 3600}
@ -538,10 +536,10 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase):
"""Test IPSec policy create REST requests.""" """Test IPSec policy create REST requests."""
def setUp(self): def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrRestIPSecPolicyCreate, self).setUp() super(TestCsrRestIPSecPolicyCreate, self).setUp()
self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10', self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
'stack', 'cisco') timeout)
def test_create_delete_ipsec_policy(self): def test_create_delete_ipsec_policy(self):
"""Create and then delete IPSec policy.""" """Create and then delete IPSec policy."""
@ -551,13 +549,13 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase):
policy_info = { policy_info = {
u'policy-id': u'%s' % policy_id, u'policy-id': u'%s' % policy_id,
u'protection-suite': { u'protection-suite': {
u'esp-encryption': u'esp-aes', u'esp-encryption': u'esp-256-aes',
u'esp-authentication': u'esp-sha-hmac', u'esp-authentication': u'esp-sha-hmac',
u'ah': u'ah-sha-hmac', u'ah': u'ah-sha-hmac',
}, },
u'lifetime-sec': 120, u'lifetime-sec': 120,
u'pfs': u'group5', u'pfs': u'group5',
u'anti-replay-window-size': u'128' u'anti-replay-window-size': u'disable'
} }
location = self.csr.create_ipsec_policy(policy_info) location = self.csr.create_ipsec_policy(policy_info)
self.assertEqual(requests.codes.CREATED, self.csr.status) self.assertEqual(requests.codes.CREATED, self.csr.status)
@ -570,6 +568,8 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase):
u'lifetime-kb': 4608000, u'lifetime-kb': 4608000,
u'idle-time': None} u'idle-time': None}
expected_policy.update(policy_info) expected_policy.update(policy_info)
# CSR will respond with capitalized value
expected_policy[u'anti-replay-window-size'] = u'Disable'
self.assertEqual(expected_policy, content) self.assertEqual(expected_policy, content)
# Now delete and verify the IPSec policy is gone # Now delete and verify the IPSec policy is gone
with httmock.HTTMock(csr_request.token, csr_request.delete, with httmock.HTTMock(csr_request.token, csr_request.delete,
@ -609,19 +609,19 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase):
with httmock.HTTMock(csr_request.token, csr_request.post, with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.normal_get): csr_request.normal_get):
policy_info = { policy_info = {
u'policy-id': u'%s' % dummy_uuid, u'policy-id': u'%s' % dummy_policy_id,
u'protection-suite': { u'protection-suite': {
u'esp-encryption': u'esp-aes', u'esp-encryption': u'esp-256-aes',
u'esp-authentication': u'esp-sha-hmac', u'esp-authentication': u'esp-sha-hmac',
u'ah': u'ah-sha-hmac', u'ah': u'ah-sha-hmac',
}, },
u'lifetime-sec': 120, u'lifetime-sec': 120,
u'pfs': u'group5', u'pfs': u'group5',
u'anti-replay-window-size': u'128' u'anti-replay-window-size': u'disable'
} }
location = self.csr.create_ipsec_policy(policy_info) location = self.csr.create_ipsec_policy(policy_info)
self.assertEqual(requests.codes.CREATED, self.csr.status) self.assertEqual(requests.codes.CREATED, self.csr.status)
self.assertIn('vpn-svc/ipsec/policies/%s' % dummy_uuid, self.assertIn('vpn-svc/ipsec/policies/%s' % dummy_policy_id,
location) location)
# Check the hard-coded items that get set as well... # Check the hard-coded items that get set as well...
content = self.csr.get_request(location, full_url=True) content = self.csr.get_request(location, full_url=True)
@ -631,6 +631,8 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase):
u'lifetime-kb': 4608000, u'lifetime-kb': 4608000,
u'idle-time': None} u'idle-time': None}
expected_policy.update(policy_info) expected_policy.update(policy_info)
# CSR will respond with capitalized value
expected_policy[u'anti-replay-window-size'] = u'Disable'
self.assertEqual(expected_policy, content) self.assertEqual(expected_policy, content)
def test_create_ipsec_policy_without_ah(self): def test_create_ipsec_policy_without_ah(self):
@ -680,15 +682,26 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase):
self.csr.create_ipsec_policy(policy_info) self.csr.create_ipsec_policy(policy_info)
self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status)
def test_create_ipsec_policy_with_invalid_name(self):
"""Failure test of creating IPSec policy with name too long."""
with httmock.HTTMock(csr_request.token, csr_request.post_bad_name,
csr_request.get_defaults):
policy_id = 'policy-name-is-too-long-32-chars'
policy_info = {
u'policy-id': u'%s' % policy_id,
}
self.csr.create_ipsec_policy(policy_info)
self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status)
class TestCsrRestPreSharedKeyCreate(base.BaseTestCase): class TestCsrRestPreSharedKeyCreate(base.BaseTestCase):
"""Test Pre-shared key (PSK) create REST requests.""" """Test Pre-shared key (PSK) create REST requests."""
def setUp(self): def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrRestPreSharedKeyCreate, self).setUp() super(TestCsrRestPreSharedKeyCreate, self).setUp()
self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10', self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
'stack', 'cisco') timeout)
def test_create_delete_pre_shared_key(self): def test_create_delete_pre_shared_key(self):
"""Create and then delete a keyring entry for pre-shared key.""" """Create and then delete a keyring entry for pre-shared key."""
@ -777,10 +790,10 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
with a real CSR (as we can't mock out these pre-conditions. with a real CSR (as we can't mock out these pre-conditions.
""" """
def setUp(self): def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrRestIPSecConnectionCreate, self).setUp() super(TestCsrRestIPSecConnectionCreate, self).setUp()
self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10', self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
'stack', 'cisco') timeout)
def _make_psk_for_test(self): def _make_psk_for_test(self):
psk_id = generate_pre_shared_key_id() psk_id = generate_pre_shared_key_id()
@ -831,7 +844,7 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
}, },
u'lifetime-sec': 120, u'lifetime-sec': 120,
u'pfs': u'group5', u'pfs': u'group5',
u'anti-replay-window-size': u'64' u'anti-replay-window-size': u'disable'
} }
self.csr.create_ipsec_policy(policy_info) self.csr.create_ipsec_policy(policy_info)
if self.csr.status != requests.codes.CREATED: if self.csr.status != requests.codes.CREATED:
@ -1136,16 +1149,35 @@ class TestCsrRestIkeKeepaliveCreate(base.BaseTestCase):
"""Test IKE keepalive REST requests. """Test IKE keepalive REST requests.
This is a global configuration that will apply to all VPN tunnels and Note: On the Cisco CSR, the IKE keepalive for v1 is a global configuration
is used to specify Dead Peer Detection information. Currently, the API that applies to all VPN tunnels to specify Dead Peer Detection information.
supports DELETE API, but a bug has been created to remove the API and As a result, this REST API is not used in the OpenStack device driver, and
add an indicator of when the capability is disabled. the keepalive will default to zero (disabled).
""" """
def setUp(self): def _save_dpd_info(self):
with httmock.HTTMock(csr_request.token, csr_request.normal_get):
details = self.csr.get_request('vpn-svc/ike/keepalive')
if self.csr.status == requests.codes.OK:
self.dpd = details
self.addCleanup(self._restore_dpd_info)
elif self.csr.status != requests.codes.NOT_FOUND:
self.fail("Unable to save original DPD info")
def _restore_dpd_info(self):
with httmock.HTTMock(csr_request.token, csr_request.put):
payload = {'interval': self.dpd['interval'],
'retry': self.dpd['retry']}
self.csr.put_request('vpn-svc/ike/keepalive', payload=payload)
if self.csr.status != requests.codes.NO_CONTENT:
self.fail("Unable to restore DPD info after test")
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrRestIkeKeepaliveCreate, self).setUp() super(TestCsrRestIkeKeepaliveCreate, self).setUp()
self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10', self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
'stack', 'cisco') timeout)
self._save_dpd_info()
self.csr.token = None
def test_configure_ike_keepalive(self): def test_configure_ike_keepalive(self):
"""Set IKE keep-alive (aka Dead Peer Detection) for the CSR.""" """Set IKE keep-alive (aka Dead Peer Detection) for the CSR."""
@ -1164,18 +1196,9 @@ class TestCsrRestIkeKeepaliveCreate(base.BaseTestCase):
"""Disable IKE keep-alive (aka Dead Peer Detection) for the CSR.""" """Disable IKE keep-alive (aka Dead Peer Detection) for the CSR."""
with httmock.HTTMock(csr_request.token, csr_request.delete, with httmock.HTTMock(csr_request.token, csr_request.delete,
csr_request.put, csr_request.get_not_configured): csr_request.put, csr_request.get_not_configured):
if csr_request.FIXED_CSCum10324: keepalive_info = {'interval': 0, 'retry': 4}
# TODO(pcm) Is this how to disable? self.csr.configure_ike_keepalive(keepalive_info)
keepalive_info = {'interval': 0, 'retry': 4} self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
self.csr.configure_ike_keepalive(keepalive_info)
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
else:
self.csr.delete_request('vnc-svc/ike/keepalive')
self.assertIn(self.csr.status,
(requests.codes.NO_CONTENT,
requests.codes.NOT_FOUND))
self.csr.get_request('vpn-svc/ike/keepalive')
self.assertEqual(requests.codes.NOT_FOUND, self.csr.status)
class TestCsrRestStaticRoute(base.BaseTestCase): class TestCsrRestStaticRoute(base.BaseTestCase):
@ -1186,10 +1209,10 @@ class TestCsrRestStaticRoute(base.BaseTestCase):
a route for each of the peer CIDRs specified for the VPN connection. a route for each of the peer CIDRs specified for the VPN connection.
""" """
def setUp(self): def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrRestStaticRoute, self).setUp() super(TestCsrRestStaticRoute, self).setUp()
self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10', self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
'stack', 'cisco') timeout)
def test_create_delete_static_route(self): def test_create_delete_static_route(self):
"""Create and then delete a static route for the tunnel.""" """Create and then delete a static route for the tunnel."""

View File

@ -259,6 +259,20 @@ class TestCiscoCsrIPsecConnectionCreateTransforms(base.BaseTestCase):
self.conn_info) self.conn_info)
self.assertEqual(expected, policy_info) self.assertEqual(expected, policy_info)
def test_create_ike_policy_info_different_encryption(self):
"""Ensure that IKE policy info is mapped/created correctly."""
self.conn_info['ike_policy']['encryption_algorithm'] = 'aes-192'
expected = {u'priority-id': 222,
u'encryption': u'aes192',
u'hash': u'sha',
u'dhGroup': 5,
u'version': u'v1',
u'lifetime': 3600}
policy_id = self.conn_info['cisco']['ike_policy_id']
policy_info = self.ipsec_conn.create_ike_policy_info(policy_id,
self.conn_info)
self.assertEqual(expected, policy_info)
def test_create_ike_policy_info_non_defaults(self): def test_create_ike_policy_info_non_defaults(self):
"""Ensure that IKE policy info with different values.""" """Ensure that IKE policy info with different values."""
self.conn_info['ike_policy'] = { self.conn_info['ike_policy'] = {
@ -270,7 +284,7 @@ class TestCiscoCsrIPsecConnectionCreateTransforms(base.BaseTestCase):
'lifetime_value': 60 'lifetime_value': 60
} }
expected = {u'priority-id': 222, expected = {u'priority-id': 222,
u'encryption': u'aes', # TODO(pcm): fix u'encryption': u'aes256',
u'hash': u'sha', u'hash': u'sha',
u'dhGroup': 14, u'dhGroup': 14,
u'version': u'v1', u'version': u'v1',
@ -281,7 +295,11 @@ class TestCiscoCsrIPsecConnectionCreateTransforms(base.BaseTestCase):
self.assertEqual(expected, policy_info) self.assertEqual(expected, policy_info)
def test_ipsec_policy_info(self): def test_ipsec_policy_info(self):
"""Ensure that IPSec policy info is mapped/created correctly.""" """Ensure that IPSec policy info is mapped/created correctly.
Note: That although the default for anti-replay-window-size on the
CSR is 64, we force it to disabled, for OpenStack use.
"""
expected = {u'policy-id': 333, expected = {u'policy-id': 333,
u'protection-suite': { u'protection-suite': {
u'esp-encryption': u'esp-aes', u'esp-encryption': u'esp-aes',
@ -290,7 +308,25 @@ class TestCiscoCsrIPsecConnectionCreateTransforms(base.BaseTestCase):
}, },
u'lifetime-sec': 3600, u'lifetime-sec': 3600,
u'pfs': u'group5', u'pfs': u'group5',
u'anti-replay-window-size': u'64'} u'anti-replay-window-size': u'disable'}
ipsec_policy_id = self.conn_info['cisco']['ipsec_policy_id']
policy_info = self.ipsec_conn.create_ipsec_policy_info(ipsec_policy_id,
self.conn_info)
self.assertEqual(expected, policy_info)
def test_ipsec_policy_info_different_encryption(self):
"""Create IPSec policy with different settings."""
self.conn_info['ipsec_policy']['transform_protocol'] = 'ah-esp'
self.conn_info['ipsec_policy']['encryption_algorithm'] = 'aes-192'
expected = {u'policy-id': 333,
u'protection-suite': {
u'esp-encryption': u'esp-192-aes',
u'esp-authentication': u'esp-sha-hmac',
u'ah': u'ah-sha-hmac'
},
u'lifetime-sec': 3600,
u'pfs': u'group5',
u'anti-replay-window-size': u'disable'}
ipsec_policy_id = self.conn_info['cisco']['ipsec_policy_id'] ipsec_policy_id = self.conn_info['cisco']['ipsec_policy_id']
policy_info = self.ipsec_conn.create_ipsec_policy_info(ipsec_policy_id, policy_info = self.ipsec_conn.create_ipsec_policy_info(ipsec_policy_id,
self.conn_info) self.conn_info)
@ -303,7 +339,8 @@ class TestCiscoCsrIPsecConnectionCreateTransforms(base.BaseTestCase):
'auth_algorithm': 'sha1', 'auth_algorithm': 'sha1',
'pfs': 'group14', 'pfs': 'group14',
'lifetime_units': 'seconds', 'lifetime_units': 'seconds',
'lifetime_value': 120} 'lifetime_value': 120,
'anti-replay-window-size': 'disable'}
expected = {u'policy-id': 333, expected = {u'policy-id': 333,
u'protection-suite': { u'protection-suite': {
u'esp-encryption': u'esp-3des', u'esp-encryption': u'esp-3des',
@ -311,7 +348,7 @@ class TestCiscoCsrIPsecConnectionCreateTransforms(base.BaseTestCase):
}, },
u'lifetime-sec': 120, u'lifetime-sec': 120,
u'pfs': u'group14', u'pfs': u'group14',
u'anti-replay-window-size': u'64'} u'anti-replay-window-size': u'disable'}
ipsec_policy_id = self.conn_info['cisco']['ipsec_policy_id'] ipsec_policy_id = self.conn_info['cisco']['ipsec_policy_id']
policy_info = self.ipsec_conn.create_ipsec_policy_info(ipsec_policy_id, policy_info = self.ipsec_conn.create_ipsec_policy_info(ipsec_policy_id,
self.conn_info) self.conn_info)
@ -1229,7 +1266,7 @@ class TestCiscoCsrIPsecDeviceDriverSyncStatuses(base.BaseTestCase):
Shows the case where all the connections are down, so that the Shows the case where all the connections are down, so that the
service should report as DOWN, as well. service should report as DOWN, as well.
""" """
# Simulated one service with two ACTIVE connections # Simulate one service with two ACTIVE connections
conn1_data = {u'id': u'1', u'status': constants.ACTIVE, conn1_data = {u'id': u'1', u'status': constants.ACTIVE,
u'admin_state_up': True, u'admin_state_up': True,
u'cisco': {u'site_conn_id': u'Tunnel1'}} u'cisco': {u'site_conn_id': u'Tunnel1'}}
@ -1275,7 +1312,7 @@ class TestCiscoCsrIPsecDeviceDriverSyncStatuses(base.BaseTestCase):
is deleted and the remaining connection is DOWN, the service will is deleted and the remaining connection is DOWN, the service will
indicate as DOWN. indicate as DOWN.
""" """
# Simulated one service with one connection up, one down # Simulate one service with one connection up, one down
conn1_data = {u'id': u'1', u'status': constants.ACTIVE, conn1_data = {u'id': u'1', u'status': constants.ACTIVE,
u'admin_state_up': True, u'admin_state_up': True,
u'cisco': {u'site_conn_id': u'Tunnel1'}} u'cisco': {u'site_conn_id': u'Tunnel1'}}
@ -1331,7 +1368,7 @@ class TestCiscoCsrIPsecDeviceDriverSyncStatuses(base.BaseTestCase):
When the service is admin down, all the connections will report When the service is admin down, all the connections will report
as DOWN. as DOWN.
""" """
# Simulated one service (admin down) with two ACTIVE connections # Simulate one service (admin down) with two ACTIVE connections
conn1_data = {u'id': u'1', u'status': constants.ACTIVE, conn1_data = {u'id': u'1', u'status': constants.ACTIVE,
u'admin_state_up': True, u'admin_state_up': True,
u'cisco': {u'site_conn_id': u'Tunnel1'}} u'cisco': {u'site_conn_id': u'Tunnel1'}}