Merge "Support enhancements to Cisco CSR VPN REST APIs"

This commit is contained in:
Jenkins 2014-04-17 19:25:32 +00:00 committed by Gerrit Code Review
commit 9a1af60ae0
5 changed files with 170 additions and 109 deletions

View File

@ -567,9 +567,8 @@ class CiscoCsrIPSecConnection(object):
# encryption_algorithm -> encryption
'3des': u'3des',
'aes-128': u'aes',
# TODO(pcm) update these 2 once CSR updated
'aes-192': u'aes',
'aes-256': u'aes',
'aes-192': u'aes192',
'aes-256': u'aes256',
# pfs -> dhGroup
'group2': 2,
'group5': 5,
@ -584,9 +583,8 @@ class CiscoCsrIPSecConnection(object):
# encryption_algorithm -> esp-encryption
'3des': u'esp-3des',
'aes-128': u'esp-aes',
# TODO(pcm) update these 2 once CSR updated
'aes-192': u'esp-aes',
'aes-256': u'esp-aes',
'aes-192': u'esp-192-aes',
'aes-256': u'esp-256-aes',
# pfs -> pfs
'group2': u'group2',
'group5': u'group5',
@ -664,8 +662,7 @@ class CiscoCsrIPSecConnection(object):
u'esp-authentication': auth_algorithm},
u'lifetime-sec': lifetime,
u'pfs': group,
# TODO(pcm): Remove when CSR fixes 'Disable'
u'anti-replay-window-size': u'64'}
u'anti-replay-window-size': u'disable'}
if transform_protocol:
settings[u'protection-suite'][u'ah'] = transform_protocol
return settings

View File

@ -627,7 +627,7 @@ class IPsecDriver(device_drivers.DeviceDriver):
def update_downed_connections(self, process_id, new_status):
"""Update info to be reported, if connections just went down.
If there is no longer any information for a connection (because it
If there is no longer any information for a connection, because it
has been removed (e.g. due to an admin down of VPN service or IPSec
connection), but there was previous status information for the
connection, mark the connection as down for reporting purposes.

View File

@ -28,11 +28,6 @@ from neutron.openstack.common import log as logging
# now, uncomment and include httmock source to UT
from neutron.tests.unit.services.vpn.device_drivers import httmock
# TODO(pcm) Remove, once verified these have been fixed
FIXED_CSCum35484 = False
FIXED_CSCul82396 = False
FIXED_CSCum10324 = False
LOG = logging.getLogger(__name__)
@ -143,7 +138,7 @@ def expired_request(url, request):
def normal_get(url, request):
if request.method != 'GET':
return
LOG.debug("DEBUG: GET mock for %s", url)
LOG.debug("GET mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
if 'global/host-name' in url.path:
@ -174,7 +169,7 @@ def normal_get(url, request):
u'priority-id': u'2',
u'version': u'v1',
u'local-auth-method': u'pre-share',
u'encryption': u'aes',
u'encryption': u'aes256',
u'hash': u'sha',
u'dhGroup': 5,
u'lifetime': 3600}
@ -194,11 +189,11 @@ def normal_get(url, request):
u'mode': u'tunnel',
u'policy-id': u'%s' % ipsec_policy_id,
u'protection-suite': {
u'esp-encryption': u'esp-aes',
u'esp-encryption': u'esp-256-aes',
u'esp-authentication': u'esp-sha-hmac',
u'ah': u'ah-sha-hmac',
},
u'anti-replay-window-size': u'128',
u'anti-replay-window-size': u'Disable',
u'lifetime-sec': 120,
u'pfs': u'group5',
u'lifetime-kb': 4608000,
@ -246,7 +241,7 @@ def normal_get(url, request):
@filter_request(['get'], 'vpn-svc/ike/keyrings')
@httmock.urlmatch(netloc=r'localhost')
def get_fqdn(url, request):
LOG.debug("DEBUG: GET FQDN mock for %s", url)
LOG.debug("GET FQDN mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
content = {u'kind': u'object#ike-keyring',
@ -262,7 +257,7 @@ def get_fqdn(url, request):
@filter_request(['get'], 'vpn-svc/ipsec/policies/')
@httmock.urlmatch(netloc=r'localhost')
def get_no_ah(url, request):
LOG.debug("DEBUG: GET No AH mock for %s", url)
LOG.debug("GET No AH mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
ipsec_policy_id = url.path.split('/')[-1]
@ -285,7 +280,7 @@ def get_no_ah(url, request):
def get_defaults(url, request):
if request.method != 'GET':
return
LOG.debug("DEBUG: GET mock for %s", url)
LOG.debug("GET mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
if 'vpn-svc/ike/policies/2' in url.path:
@ -393,7 +388,7 @@ def get_local_ip(url, request):
def post(url, request):
if request.method != 'POST':
return
LOG.debug("DEBUG: POST mock for %s", url)
LOG.debug("POST mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
if 'interfaces/GigabitEthernet' in url.path:
@ -432,7 +427,7 @@ def post(url, request):
@filter_request(['post'], 'global/local-users')
@httmock.urlmatch(netloc=r'localhost')
def post_change_attempt(url, request):
LOG.debug("DEBUG: POST change value mock for %s", url)
LOG.debug("POST change value mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
return {'status_code': requests.codes.NOT_FOUND,
@ -443,7 +438,7 @@ def post_change_attempt(url, request):
@httmock.urlmatch(netloc=r'localhost')
def post_duplicate(url, request):
LOG.debug("DEBUG: POST duplicate mock for %s", url)
LOG.debug("POST duplicate mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
return {'status_code': requests.codes.BAD_REQUEST,
@ -456,7 +451,7 @@ def post_duplicate(url, request):
@filter_request(['post'], 'vpn-svc/site-to-site')
@httmock.urlmatch(netloc=r'localhost')
def post_missing_ipsec_policy(url, request):
LOG.debug("DEBUG: POST missing ipsec policy mock for %s", url)
LOG.debug("POST missing ipsec policy mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
return {'status_code': requests.codes.BAD_REQUEST}
@ -465,7 +460,7 @@ def post_missing_ipsec_policy(url, request):
@filter_request(['post'], 'vpn-svc/site-to-site')
@httmock.urlmatch(netloc=r'localhost')
def post_missing_ike_policy(url, request):
LOG.debug("DEBUG: POST missing ike policy mock for %s", url)
LOG.debug("POST missing ike policy mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
return {'status_code': requests.codes.BAD_REQUEST}
@ -474,7 +469,7 @@ def post_missing_ike_policy(url, request):
@filter_request(['post'], 'vpn-svc/site-to-site')
@httmock.urlmatch(netloc=r'localhost')
def post_bad_ip(url, request):
LOG.debug("DEBUG: POST bad IP mock for %s", url)
LOG.debug("POST bad IP mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
return {'status_code': requests.codes.BAD_REQUEST}
@ -483,7 +478,7 @@ def post_bad_ip(url, request):
@filter_request(['post'], 'vpn-svc/site-to-site')
@httmock.urlmatch(netloc=r'localhost')
def post_bad_mtu(url, request):
LOG.debug("DEBUG: POST bad mtu mock for %s", url)
LOG.debug("POST bad mtu mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
return {'status_code': requests.codes.BAD_REQUEST}
@ -492,7 +487,16 @@ def post_bad_mtu(url, request):
@filter_request(['post'], 'vpn-svc/ipsec/policies')
@httmock.urlmatch(netloc=r'localhost')
def post_bad_lifetime(url, request):
LOG.debug("DEBUG: POST bad lifetime mock for %s", url)
LOG.debug("POST bad lifetime mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
return {'status_code': requests.codes.BAD_REQUEST}
@filter_request(['post'], 'vpn-svc/ipsec/policies')
@httmock.urlmatch(netloc=r'localhost')
def post_bad_name(url, request):
LOG.debug("POST bad IPSec policy name for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
return {'status_code': requests.codes.BAD_REQUEST}
@ -502,7 +506,7 @@ def post_bad_lifetime(url, request):
def put(url, request):
if request.method != 'PUT':
return
LOG.debug("DEBUG: PUT mock for %s", url)
LOG.debug("PUT mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
# Any resource
@ -513,7 +517,7 @@ def put(url, request):
def delete(url, request):
if request.method != 'DELETE':
return
LOG.debug("DEBUG: DELETE mock for %s", url)
LOG.debug("DELETE mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
# Any resource
@ -524,7 +528,7 @@ def delete(url, request):
def delete_unknown(url, request):
if request.method != 'DELETE':
return
LOG.debug("DEBUG: DELETE unknown mock for %s", url)
LOG.debug("DELETE unknown mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
# Any resource
@ -538,7 +542,7 @@ def delete_unknown(url, request):
def delete_not_allowed(url, request):
if request.method != 'DELETE':
return
LOG.debug("DEBUG: DELETE not allowed mock for %s", url)
LOG.debug("DELETE not allowed mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
# Any resource

View File

@ -19,7 +19,8 @@
import random
import httmock
# TODO(pcm) Uncomment when httmock is added to test requirements.
# import httmock
import requests
from neutron.openstack.common import log as logging
@ -28,9 +29,9 @@ from neutron.services.vpn.device_drivers import (
from neutron.tests import base
from neutron.tests.unit.services.vpn.device_drivers import (
cisco_csr_mock as csr_request)
# TODO(pcm) Remove once httmock is available. In the meantime, use temp
# copy of hhtmock source to run UT
# from neutron.tests.unit.services.vpn.device_drivers import httmock
# TODO(pcm) Remove once httmock is available. In the meantime, use
# temporary local copy of httmock source to run UT
from neutron.tests.unit.services.vpn.device_drivers import httmock
LOG = logging.getLogger(__name__)
@ -39,10 +40,7 @@ if True:
logging.CONF.set_override('debug', True)
logging.setup('neutron')
if csr_request.FIXED_CSCum35484:
dummy_uuid = '1eb4ee6b-0870-45a0-b554-7b69096'
else:
dummy_uuid = '1eb4ee6b-0870-45a0-b554-7b'
dummy_policy_id = 'dummy-ipsec-policy-id-name'
# Note: Helper functions to test reuse of IDs.
@ -62,10 +60,10 @@ class TestCsrLoginRestApi(base.BaseTestCase):
"""Test logging into CSR to obtain token-id."""
def setUp(self):
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrLoginRestApi, self).setUp()
self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10',
'stack', 'cisco')
self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
timeout)
def test_get_token(self):
"""Obtain the token and its expiration time."""
@ -102,10 +100,10 @@ class TestCsrGetRestApi(base.BaseTestCase):
"""Test CSR GET REST API."""
def setUp(self):
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrGetRestApi, self).setUp()
self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10',
'stack', 'cisco')
self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
timeout)
def test_valid_rest_gets(self):
"""Simple GET requests.
@ -129,10 +127,10 @@ class TestCsrPostRestApi(base.BaseTestCase):
"""Test CSR POST REST API."""
def setUp(self):
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrPostRestApi, self).setUp()
self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10',
'stack', 'cisco')
self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
timeout)
def test_post_requests(self):
"""Simple POST requests (repeatable).
@ -275,11 +273,11 @@ class TestCsrPutRestApi(base.BaseTestCase):
if self.csr.status != requests.codes.NO_CONTENT:
self.fail("Unable to restore I/F Ge1 description after test")
def setUp(self):
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
"""Prepare for PUT API tests."""
super(TestCsrPutRestApi, self).setUp()
self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10',
'stack', 'cisco')
self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
timeout)
self._save_resources()
self.addCleanup(self._restore_resources, 'stack', 'cisco')
@ -357,10 +355,10 @@ class TestCsrDeleteRestApi(base.BaseTestCase):
"""Test CSR DELETE REST API."""
def setUp(self):
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrDeleteRestApi, self).setUp()
self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10',
'stack', 'cisco')
self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
timeout)
def _make_dummy_user(self):
"""Create a user that will be later deleted."""
@ -409,10 +407,10 @@ class TestCsrRestApiFailures(base.BaseTestCase):
the result, without any error handling.
"""
def setUp(self):
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=0.1):
super(TestCsrRestApiFailures, self).setUp()
self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10',
'stack', 'cisco', timeout=0.1)
self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
timeout)
def test_request_for_non_existent_resource(self):
"""Negative test of non-existent resource on REST request."""
@ -455,10 +453,10 @@ class TestCsrRestIkePolicyCreate(base.BaseTestCase):
"""Test IKE policy create REST requests."""
def setUp(self):
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrRestIkePolicyCreate, self).setUp()
self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10',
'stack', 'cisco')
self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
timeout)
def test_create_delete_ike_policy(self):
"""Create and then delete IKE policy."""
@ -466,7 +464,7 @@ class TestCsrRestIkePolicyCreate(base.BaseTestCase):
csr_request.normal_get):
policy_id = '2'
policy_info = {u'priority-id': u'%s' % policy_id,
u'encryption': u'aes',
u'encryption': u'aes256',
u'hash': u'sha',
u'dhGroup': 5,
u'lifetime': 3600}
@ -538,10 +536,10 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase):
"""Test IPSec policy create REST requests."""
def setUp(self):
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrRestIPSecPolicyCreate, self).setUp()
self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10',
'stack', 'cisco')
self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
timeout)
def test_create_delete_ipsec_policy(self):
"""Create and then delete IPSec policy."""
@ -551,13 +549,13 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase):
policy_info = {
u'policy-id': u'%s' % policy_id,
u'protection-suite': {
u'esp-encryption': u'esp-aes',
u'esp-encryption': u'esp-256-aes',
u'esp-authentication': u'esp-sha-hmac',
u'ah': u'ah-sha-hmac',
},
u'lifetime-sec': 120,
u'pfs': u'group5',
u'anti-replay-window-size': u'128'
u'anti-replay-window-size': u'disable'
}
location = self.csr.create_ipsec_policy(policy_info)
self.assertEqual(requests.codes.CREATED, self.csr.status)
@ -570,6 +568,8 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase):
u'lifetime-kb': 4608000,
u'idle-time': None}
expected_policy.update(policy_info)
# CSR will respond with capitalized value
expected_policy[u'anti-replay-window-size'] = u'Disable'
self.assertEqual(expected_policy, content)
# Now delete and verify the IPSec policy is gone
with httmock.HTTMock(csr_request.token, csr_request.delete,
@ -609,19 +609,19 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase):
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.normal_get):
policy_info = {
u'policy-id': u'%s' % dummy_uuid,
u'policy-id': u'%s' % dummy_policy_id,
u'protection-suite': {
u'esp-encryption': u'esp-aes',
u'esp-encryption': u'esp-256-aes',
u'esp-authentication': u'esp-sha-hmac',
u'ah': u'ah-sha-hmac',
},
u'lifetime-sec': 120,
u'pfs': u'group5',
u'anti-replay-window-size': u'128'
u'anti-replay-window-size': u'disable'
}
location = self.csr.create_ipsec_policy(policy_info)
self.assertEqual(requests.codes.CREATED, self.csr.status)
self.assertIn('vpn-svc/ipsec/policies/%s' % dummy_uuid,
self.assertIn('vpn-svc/ipsec/policies/%s' % dummy_policy_id,
location)
# Check the hard-coded items that get set as well...
content = self.csr.get_request(location, full_url=True)
@ -631,6 +631,8 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase):
u'lifetime-kb': 4608000,
u'idle-time': None}
expected_policy.update(policy_info)
# CSR will respond with capitalized value
expected_policy[u'anti-replay-window-size'] = u'Disable'
self.assertEqual(expected_policy, content)
def test_create_ipsec_policy_without_ah(self):
@ -680,15 +682,26 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase):
self.csr.create_ipsec_policy(policy_info)
self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status)
def test_create_ipsec_policy_with_invalid_name(self):
"""Failure test of creating IPSec policy with name too long."""
with httmock.HTTMock(csr_request.token, csr_request.post_bad_name,
csr_request.get_defaults):
policy_id = 'policy-name-is-too-long-32-chars'
policy_info = {
u'policy-id': u'%s' % policy_id,
}
self.csr.create_ipsec_policy(policy_info)
self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status)
class TestCsrRestPreSharedKeyCreate(base.BaseTestCase):
"""Test Pre-shared key (PSK) create REST requests."""
def setUp(self):
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrRestPreSharedKeyCreate, self).setUp()
self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10',
'stack', 'cisco')
self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
timeout)
def test_create_delete_pre_shared_key(self):
"""Create and then delete a keyring entry for pre-shared key."""
@ -777,10 +790,10 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
with a real CSR (as we can't mock out these pre-conditions.
"""
def setUp(self):
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrRestIPSecConnectionCreate, self).setUp()
self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10',
'stack', 'cisco')
self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
timeout)
def _make_psk_for_test(self):
psk_id = generate_pre_shared_key_id()
@ -831,7 +844,7 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
},
u'lifetime-sec': 120,
u'pfs': u'group5',
u'anti-replay-window-size': u'64'
u'anti-replay-window-size': u'disable'
}
self.csr.create_ipsec_policy(policy_info)
if self.csr.status != requests.codes.CREATED:
@ -1136,16 +1149,35 @@ class TestCsrRestIkeKeepaliveCreate(base.BaseTestCase):
"""Test IKE keepalive REST requests.
This is a global configuration that will apply to all VPN tunnels and
is used to specify Dead Peer Detection information. Currently, the API
supports DELETE API, but a bug has been created to remove the API and
add an indicator of when the capability is disabled.
Note: On the Cisco CSR, the IKE keepalive for v1 is a global configuration
that applies to all VPN tunnels to specify Dead Peer Detection information.
As a result, this REST API is not used in the OpenStack device driver, and
the keepalive will default to zero (disabled).
"""
def setUp(self):
def _save_dpd_info(self):
with httmock.HTTMock(csr_request.token, csr_request.normal_get):
details = self.csr.get_request('vpn-svc/ike/keepalive')
if self.csr.status == requests.codes.OK:
self.dpd = details
self.addCleanup(self._restore_dpd_info)
elif self.csr.status != requests.codes.NOT_FOUND:
self.fail("Unable to save original DPD info")
def _restore_dpd_info(self):
with httmock.HTTMock(csr_request.token, csr_request.put):
payload = {'interval': self.dpd['interval'],
'retry': self.dpd['retry']}
self.csr.put_request('vpn-svc/ike/keepalive', payload=payload)
if self.csr.status != requests.codes.NO_CONTENT:
self.fail("Unable to restore DPD info after test")
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrRestIkeKeepaliveCreate, self).setUp()
self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10',
'stack', 'cisco')
self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
timeout)
self._save_dpd_info()
self.csr.token = None
def test_configure_ike_keepalive(self):
"""Set IKE keep-alive (aka Dead Peer Detection) for the CSR."""
@ -1164,18 +1196,9 @@ class TestCsrRestIkeKeepaliveCreate(base.BaseTestCase):
"""Disable IKE keep-alive (aka Dead Peer Detection) for the CSR."""
with httmock.HTTMock(csr_request.token, csr_request.delete,
csr_request.put, csr_request.get_not_configured):
if csr_request.FIXED_CSCum10324:
# TODO(pcm) Is this how to disable?
keepalive_info = {'interval': 0, 'retry': 4}
self.csr.configure_ike_keepalive(keepalive_info)
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
else:
self.csr.delete_request('vnc-svc/ike/keepalive')
self.assertIn(self.csr.status,
(requests.codes.NO_CONTENT,
requests.codes.NOT_FOUND))
self.csr.get_request('vpn-svc/ike/keepalive')
self.assertEqual(requests.codes.NOT_FOUND, self.csr.status)
keepalive_info = {'interval': 0, 'retry': 4}
self.csr.configure_ike_keepalive(keepalive_info)
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
class TestCsrRestStaticRoute(base.BaseTestCase):
@ -1186,10 +1209,10 @@ class TestCsrRestStaticRoute(base.BaseTestCase):
a route for each of the peer CIDRs specified for the VPN connection.
"""
def setUp(self):
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrRestStaticRoute, self).setUp()
self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10',
'stack', 'cisco')
self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
timeout)
def test_create_delete_static_route(self):
"""Create and then delete a static route for the tunnel."""

View File

@ -259,6 +259,20 @@ class TestCiscoCsrIPsecConnectionCreateTransforms(base.BaseTestCase):
self.conn_info)
self.assertEqual(expected, policy_info)
def test_create_ike_policy_info_different_encryption(self):
"""Ensure that IKE policy info is mapped/created correctly."""
self.conn_info['ike_policy']['encryption_algorithm'] = 'aes-192'
expected = {u'priority-id': 222,
u'encryption': u'aes192',
u'hash': u'sha',
u'dhGroup': 5,
u'version': u'v1',
u'lifetime': 3600}
policy_id = self.conn_info['cisco']['ike_policy_id']
policy_info = self.ipsec_conn.create_ike_policy_info(policy_id,
self.conn_info)
self.assertEqual(expected, policy_info)
def test_create_ike_policy_info_non_defaults(self):
"""Ensure that IKE policy info with different values."""
self.conn_info['ike_policy'] = {
@ -270,7 +284,7 @@ class TestCiscoCsrIPsecConnectionCreateTransforms(base.BaseTestCase):
'lifetime_value': 60
}
expected = {u'priority-id': 222,
u'encryption': u'aes', # TODO(pcm): fix
u'encryption': u'aes256',
u'hash': u'sha',
u'dhGroup': 14,
u'version': u'v1',
@ -281,7 +295,11 @@ class TestCiscoCsrIPsecConnectionCreateTransforms(base.BaseTestCase):
self.assertEqual(expected, policy_info)
def test_ipsec_policy_info(self):
"""Ensure that IPSec policy info is mapped/created correctly."""
"""Ensure that IPSec policy info is mapped/created correctly.
Note: That although the default for anti-replay-window-size on the
CSR is 64, we force it to disabled, for OpenStack use.
"""
expected = {u'policy-id': 333,
u'protection-suite': {
u'esp-encryption': u'esp-aes',
@ -290,7 +308,25 @@ class TestCiscoCsrIPsecConnectionCreateTransforms(base.BaseTestCase):
},
u'lifetime-sec': 3600,
u'pfs': u'group5',
u'anti-replay-window-size': u'64'}
u'anti-replay-window-size': u'disable'}
ipsec_policy_id = self.conn_info['cisco']['ipsec_policy_id']
policy_info = self.ipsec_conn.create_ipsec_policy_info(ipsec_policy_id,
self.conn_info)
self.assertEqual(expected, policy_info)
def test_ipsec_policy_info_different_encryption(self):
"""Create IPSec policy with different settings."""
self.conn_info['ipsec_policy']['transform_protocol'] = 'ah-esp'
self.conn_info['ipsec_policy']['encryption_algorithm'] = 'aes-192'
expected = {u'policy-id': 333,
u'protection-suite': {
u'esp-encryption': u'esp-192-aes',
u'esp-authentication': u'esp-sha-hmac',
u'ah': u'ah-sha-hmac'
},
u'lifetime-sec': 3600,
u'pfs': u'group5',
u'anti-replay-window-size': u'disable'}
ipsec_policy_id = self.conn_info['cisco']['ipsec_policy_id']
policy_info = self.ipsec_conn.create_ipsec_policy_info(ipsec_policy_id,
self.conn_info)
@ -303,7 +339,8 @@ class TestCiscoCsrIPsecConnectionCreateTransforms(base.BaseTestCase):
'auth_algorithm': 'sha1',
'pfs': 'group14',
'lifetime_units': 'seconds',
'lifetime_value': 120}
'lifetime_value': 120,
'anti-replay-window-size': 'disable'}
expected = {u'policy-id': 333,
u'protection-suite': {
u'esp-encryption': u'esp-3des',
@ -311,7 +348,7 @@ class TestCiscoCsrIPsecConnectionCreateTransforms(base.BaseTestCase):
},
u'lifetime-sec': 120,
u'pfs': u'group14',
u'anti-replay-window-size': u'64'}
u'anti-replay-window-size': u'disable'}
ipsec_policy_id = self.conn_info['cisco']['ipsec_policy_id']
policy_info = self.ipsec_conn.create_ipsec_policy_info(ipsec_policy_id,
self.conn_info)
@ -1229,7 +1266,7 @@ class TestCiscoCsrIPsecDeviceDriverSyncStatuses(base.BaseTestCase):
Shows the case where all the connections are down, so that the
service should report as DOWN, as well.
"""
# Simulated one service with two ACTIVE connections
# Simulate one service with two ACTIVE connections
conn1_data = {u'id': u'1', u'status': constants.ACTIVE,
u'admin_state_up': True,
u'cisco': {u'site_conn_id': u'Tunnel1'}}
@ -1275,7 +1312,7 @@ class TestCiscoCsrIPsecDeviceDriverSyncStatuses(base.BaseTestCase):
is deleted and the remaining connection is DOWN, the service will
indicate as DOWN.
"""
# Simulated one service with one connection up, one down
# Simulate one service with one connection up, one down
conn1_data = {u'id': u'1', u'status': constants.ACTIVE,
u'admin_state_up': True,
u'cisco': {u'site_conn_id': u'Tunnel1'}}
@ -1331,7 +1368,7 @@ class TestCiscoCsrIPsecDeviceDriverSyncStatuses(base.BaseTestCase):
When the service is admin down, all the connections will report
as DOWN.
"""
# Simulated one service (admin down) with two ACTIVE connections
# Simulate one service (admin down) with two ACTIVE connections
conn1_data = {u'id': u'1', u'status': constants.ACTIVE,
u'admin_state_up': True,
u'cisco': {u'site_conn_id': u'Tunnel1'}}