VPNaaS REST Client UT Broken

When the H302 work was done, it broke two modules used for UT of
Cisco CSR VPNaaS REST client. Since the UT is currently disabled
(because the httmock library was not approved for test-requirements),
the error was not detected.

This fixes the files, so that they can still be used manually (by
locally importing httmock or its source file, and renaming the
module to remove the "_" prefix).

In the long term, the UT needs to be reworked to use httpretty,
which uses a register based mechanism, instead of a context
manager based mechanism, so this will take some effort. In addition.
there is discussion of switching to a new mock-request package,
instead of httpretty.

Change-Id: I8badc249ad021fdbdb2367b5416c72435ed58994
Closes-Bug: 1335977
Closes-Bug: 1326785
This commit is contained in:
Paul Michali 2014-06-30 18:31:42 -04:00
parent 6e877945c9
commit 7f0a8f09ab
2 changed files with 131 additions and 177 deletions

View File

@ -14,22 +14,30 @@
#
# @author: Paul Michali, Cisco Systems, Inc.
#TODO(pcm): Rename this file to remove the "no" prefix, once httmock is
# approved and added to requirements.txt
# Note: Currently, to run this under unit tests, you must make the
# following local changes:
#
# - Rename this file (or symlink it) so that the 'no' prefix is removed.
# - Include the httmock package in test_requirements or add the httmock.py
# source module to this directory and import (as commented out below).
#
# TODO(pcm): Rework this module to use the httpretty package and rename.
import random
# TODO(pcm) Uncomment when httmock is added to test requirements.
# import httmock
try:
import httmock
except (NameError, ImportError):
exit()
import requests
from neutron.openstack.common import log as logging
from neutron.services.vpn.device_drivers import (
cisco_csr_rest_client as csr_client)
from neutron.tests import base
from neutron.tests.unit.services.vpn import device_drivers
# TODO(pcm) Remove once httmock is available. In the meantime, use
# temporary local copy of httmock source to run UT
from neutron.tests.unit.services.vpn.device_drivers import (
cisco_csr_mock as csr_request)
# from neutron.tests.unit.services.vpn.device_drivers import httmock
LOG = logging.getLogger(__name__)
@ -40,8 +48,6 @@ if True:
dummy_policy_id = 'dummy-ipsec-policy-id-name'
httmock = device_drivers.httmock
# Note: Helper functions to test reuse of IDs.
def generate_pre_shared_key_id():
@ -67,7 +73,7 @@ class TestCsrLoginRestApi(base.BaseTestCase):
def test_get_token(self):
"""Obtain the token and its expiration time."""
with httmock.HTTMock(device_drivers.csr_request.token):
with httmock.HTTMock(csr_request.token):
self.assertTrue(self.csr.authenticate())
self.assertEqual(requests.codes.OK, self.csr.status)
self.assertIsNotNone(self.csr.token)
@ -75,7 +81,7 @@ class TestCsrLoginRestApi(base.BaseTestCase):
def test_unauthorized_token_request(self):
"""Negative test of invalid user/password."""
self.csr.auth = ('stack', 'bogus')
with httmock.HTTMock(device_drivers.csr_request.token_unauthorized):
with httmock.HTTMock(csr_request.token_unauthorized):
self.assertIsNone(self.csr.authenticate())
self.assertEqual(requests.codes.UNAUTHORIZED, self.csr.status)
@ -83,14 +89,14 @@ class TestCsrLoginRestApi(base.BaseTestCase):
"""Negative test of request to non-existent host."""
self.csr.host = 'wrong-host'
self.csr.token = 'Set by some previously successful access'
with httmock.HTTMock(device_drivers.csr_request.token_wrong_host):
with httmock.HTTMock(csr_request.token_wrong_host):
self.assertIsNone(self.csr.authenticate())
self.assertEqual(requests.codes.NOT_FOUND, self.csr.status)
self.assertIsNone(self.csr.token)
def test_timeout_on_token_access(self):
"""Negative test of a timeout on a request."""
with httmock.HTTMock(device_drivers.csr_request.token_timeout):
with httmock.HTTMock(csr_request.token_timeout):
self.assertIsNone(self.csr.authenticate())
self.assertEqual(requests.codes.REQUEST_TIMEOUT, self.csr.status)
self.assertIsNone(self.csr.token)
@ -112,8 +118,7 @@ class TestCsrGetRestApi(base.BaseTestCase):
that there are two interfaces on the CSR.
"""
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.normal_get):
with httmock.HTTMock(csr_request.token, csr_request.normal_get):
content = self.csr.get_request('global/host-name')
self.assertEqual(requests.codes.OK, self.csr.status)
self.assertIn('host-name', content)
@ -140,8 +145,7 @@ class TestCsrPostRestApi(base.BaseTestCase):
that there are two interfaces (Ge1 and Ge2) on the CSR.
"""
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post):
with httmock.HTTMock(csr_request.token, csr_request.post):
content = self.csr.post_request(
'interfaces/GigabitEthernet1/statistics',
payload={'action': 'clear'})
@ -155,8 +159,7 @@ class TestCsrPostRestApi(base.BaseTestCase):
def test_post_with_location(self):
"""Create a user and verify that location returned."""
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post):
with httmock.HTTMock(csr_request.token, csr_request.post):
location = self.csr.post_request(
'global/local-users',
payload={'username': 'test-user',
@ -167,8 +170,7 @@ class TestCsrPostRestApi(base.BaseTestCase):
def test_post_missing_required_attribute(self):
"""Negative test of POST with missing mandatory info."""
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post):
with httmock.HTTMock(csr_request.token, csr_request.post):
self.csr.post_request('global/local-users',
payload={'password': 'pass12345',
'privilege': 15})
@ -176,8 +178,7 @@ class TestCsrPostRestApi(base.BaseTestCase):
def test_post_invalid_attribute(self):
"""Negative test of POST with invalid info."""
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post):
with httmock.HTTMock(csr_request.token, csr_request.post):
self.csr.post_request('global/local-users',
payload={'username': 'test-user',
'password': 'pass12345',
@ -190,8 +191,7 @@ class TestCsrPostRestApi(base.BaseTestCase):
Uses the lower level _do_request() API to just perform the POST and
obtain the response, without any error processing.
"""
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post):
with httmock.HTTMock(csr_request.token, csr_request.post):
location = self.csr._do_request(
'POST',
'global/local-users',
@ -201,8 +201,8 @@ class TestCsrPostRestApi(base.BaseTestCase):
more_headers=csr_client.HEADER_CONTENT_TYPE_JSON)
self.assertEqual(requests.codes.CREATED, self.csr.status)
self.assertIn('global/local-users/test-user', location)
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post_change_attempt):
with httmock.HTTMock(csr_request.token,
csr_request.post_change_attempt):
self.csr._do_request(
'POST',
'global/local-users',
@ -216,8 +216,7 @@ class TestCsrPostRestApi(base.BaseTestCase):
def test_post_changing_value(self):
"""Negative test of a POST trying to change a value."""
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post):
with httmock.HTTMock(csr_request.token, csr_request.post):
location = self.csr.post_request(
'global/local-users',
payload={'username': 'test-user',
@ -225,8 +224,8 @@ class TestCsrPostRestApi(base.BaseTestCase):
'privilege': 15})
self.assertEqual(requests.codes.CREATED, self.csr.status)
self.assertIn('global/local-users/test-user', location)
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post_change_attempt):
with httmock.HTTMock(csr_request.token,
csr_request.post_change_attempt):
content = self.csr.post_request('global/local-users',
payload={'username': 'test-user',
'password': 'changed',
@ -242,8 +241,7 @@ class TestCsrPutRestApi(base.BaseTestCase):
"""Test CSR PUT REST API."""
def _save_resources(self):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.normal_get):
with httmock.HTTMock(csr_request.token, csr_request.normal_get):
details = self.csr.get_request('global/host-name')
if self.csr.status != requests.codes.OK:
self.fail("Unable to save original host name")
@ -266,8 +264,7 @@ class TestCsrPutRestApi(base.BaseTestCase):
self.csr.auth = (user, password)
self.csr.token = None
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.put):
with httmock.HTTMock(csr_request.token, csr_request.put):
payload = {'host-name': self.original_host}
self.csr.put_request('global/host-name', payload=payload)
if self.csr.status != requests.codes.NO_CONTENT:
@ -297,9 +294,8 @@ class TestCsrPutRestApi(base.BaseTestCase):
that there are two interfaces on the CSR (Ge1 and Ge2).
"""
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.put,
device_drivers.csr_request.normal_get):
with httmock.HTTMock(csr_request.token, csr_request.put,
csr_request.normal_get):
payload = {'host-name': 'TestHost'}
content = self.csr.put_request('global/host-name',
payload=payload)
@ -318,9 +314,8 @@ class TestCsrPutRestApi(base.BaseTestCase):
This was a problem with an earlier version of the CSR image and is
here to prevent regression.
"""
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.put,
device_drivers.csr_request.normal_get):
with httmock.HTTMock(csr_request.token, csr_request.put,
csr_request.normal_get):
payload = {'description': u'Changed description',
'if-name': self.original_if['if-name'],
'ip-address': self.original_if['ip-address'],
@ -345,9 +340,8 @@ class TestCsrPutRestApi(base.BaseTestCase):
test setup to change the description to a non-empty string to
avoid failures in other tests.
"""
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.put,
device_drivers.csr_request.normal_get):
with httmock.HTTMock(csr_request.token, csr_request.put,
csr_request.normal_get):
payload = {'description': '',
'if-name': self.original_if['if-name'],
'ip-address': self.original_if['ip-address'],
@ -382,9 +376,8 @@ class TestCsrDeleteRestApi(base.BaseTestCase):
def test_delete_requests(self):
"""Simple DELETE requests (creating entry first)."""
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.delete):
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.delete):
self._make_dummy_user()
self.csr.token = None # Force login
self.csr.delete_request('global/local-users/dummy')
@ -396,8 +389,7 @@ class TestCsrDeleteRestApi(base.BaseTestCase):
def test_delete_non_existent_entry(self):
"""Negative test of trying to delete a non-existent user."""
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.delete_unknown):
with httmock.HTTMock(csr_request.token, csr_request.delete_unknown):
content = self.csr.delete_request('global/local-users/unknown')
self.assertEqual(requests.codes.NOT_FOUND, self.csr.status)
expected = {u'error-code': -1,
@ -406,8 +398,8 @@ class TestCsrDeleteRestApi(base.BaseTestCase):
def test_delete_not_allowed(self):
"""Negative test of trying to delete the host-name."""
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.delete_not_allowed):
with httmock.HTTMock(csr_request.token,
csr_request.delete_not_allowed):
self.csr.delete_request('global/host-name')
self.assertEqual(requests.codes.METHOD_NOT_ALLOWED,
self.csr.status)
@ -428,16 +420,14 @@ class TestCsrRestApiFailures(base.BaseTestCase):
def test_request_for_non_existent_resource(self):
"""Negative test of non-existent resource on REST request."""
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.no_such_resource):
with httmock.HTTMock(csr_request.token, csr_request.no_such_resource):
self.csr.post_request('no/such/request')
self.assertEqual(requests.codes.NOT_FOUND, self.csr.status)
# The result is HTTP 404 message, so no error content to check
def test_timeout_during_request(self):
"""Negative test of timeout during REST request."""
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.timeout):
with httmock.HTTMock(csr_request.token, csr_request.timeout):
self.csr._do_request('GET', 'global/host-name')
self.assertEqual(requests.codes.REQUEST_TIMEOUT, self.csr.status)
@ -449,9 +439,8 @@ class TestCsrRestApiFailures(base.BaseTestCase):
token by changing it.
"""
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.expired_request,
device_drivers.csr_request.normal_get):
with httmock.HTTMock(csr_request.token, csr_request.expired_request,
csr_request.normal_get):
self.csr.token = '123' # These are 44 characters, so won't match
content = self.csr._do_request('GET', 'global/host-name')
self.assertEqual(requests.codes.OK, self.csr.status)
@ -461,7 +450,7 @@ class TestCsrRestApiFailures(base.BaseTestCase):
def test_failed_to_obtain_token_for_request(self):
"""Negative test of unauthorized user for REST request."""
self.csr.auth = ('stack', 'bogus')
with httmock.HTTMock(device_drivers.csr_request.token_unauthorized):
with httmock.HTTMock(csr_request.token_unauthorized):
self.csr._do_request('GET', 'global/host-name')
self.assertEqual(requests.codes.UNAUTHORIZED, self.csr.status)
@ -477,9 +466,8 @@ class TestCsrRestIkePolicyCreate(base.BaseTestCase):
def test_create_delete_ike_policy(self):
"""Create and then delete IKE policy."""
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.normal_get):
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.normal_get):
policy_id = '2'
policy_info = {u'priority-id': u'%s' % policy_id,
u'encryption': u'aes256',
@ -498,9 +486,8 @@ class TestCsrRestIkePolicyCreate(base.BaseTestCase):
expected_policy.update(policy_info)
self.assertEqual(expected_policy, content)
# Now delete and verify the IKE policy is gone
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.delete,
device_drivers.csr_request.no_such_resource):
with httmock.HTTMock(csr_request.token, csr_request.delete,
csr_request.no_such_resource):
self.csr.delete_ike_policy(policy_id)
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
content = self.csr.get_request(location, full_url=True)
@ -508,9 +495,8 @@ class TestCsrRestIkePolicyCreate(base.BaseTestCase):
def test_create_ike_policy_with_defaults(self):
"""Create IKE policy using defaults for all optional values."""
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.get_defaults):
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.get_defaults):
policy_id = '2'
policy_info = {u'priority-id': u'%s' % policy_id}
location = self.csr.create_ike_policy(policy_info)
@ -532,9 +518,8 @@ class TestCsrRestIkePolicyCreate(base.BaseTestCase):
def test_create_duplicate_ike_policy(self):
"""Negative test of trying to create a duplicate IKE policy."""
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.normal_get):
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.normal_get):
policy_id = '2'
policy_info = {u'priority-id': u'%s' % policy_id,
u'encryption': u'aes',
@ -544,8 +529,7 @@ class TestCsrRestIkePolicyCreate(base.BaseTestCase):
location = self.csr.create_ike_policy(policy_info)
self.assertEqual(requests.codes.CREATED, self.csr.status)
self.assertIn('vpn-svc/ike/policies/%s' % policy_id, location)
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post_duplicate):
with httmock.HTTMock(csr_request.token, csr_request.post_duplicate):
location = self.csr.create_ike_policy(policy_info)
self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status)
expected = {u'error-code': -1,
@ -565,9 +549,8 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase):
def test_create_delete_ipsec_policy(self):
"""Create and then delete IPSec policy."""
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.normal_get):
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.normal_get):
policy_id = '123'
policy_info = {
u'policy-id': u'%s' % policy_id,
@ -595,9 +578,8 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase):
expected_policy[u'anti-replay-window-size'] = u'Disable'
self.assertEqual(expected_policy, content)
# Now delete and verify the IPSec policy is gone
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.delete,
device_drivers.csr_request.no_such_resource):
with httmock.HTTMock(csr_request.token, csr_request.delete,
csr_request.no_such_resource):
self.csr.delete_ipsec_policy(policy_id)
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
content = self.csr.get_request(location, full_url=True)
@ -605,9 +587,8 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase):
def test_create_ipsec_policy_with_defaults(self):
"""Create IPSec policy with default for all optional values."""
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.get_defaults):
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.get_defaults):
policy_id = '123'
policy_info = {
u'policy-id': u'%s' % policy_id,
@ -631,9 +612,8 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase):
def test_create_ipsec_policy_with_uuid(self):
"""Create IPSec policy using UUID for id."""
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.normal_get):
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.normal_get):
policy_info = {
u'policy-id': u'%s' % dummy_policy_id,
u'protection-suite': {
@ -663,9 +643,8 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase):
def test_create_ipsec_policy_without_ah(self):
"""Create IPSec policy."""
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.get_no_ah):
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.get_no_ah):
policy_id = '10'
policy_info = {
u'policy-id': u'%s' % policy_id,
@ -692,8 +671,7 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase):
def test_invalid_ipsec_policy_lifetime(self):
"""Failure test of IPSec policy with unsupported lifetime."""
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post_bad_lifetime):
with httmock.HTTMock(csr_request.token, csr_request.post_bad_lifetime):
policy_id = '123'
policy_info = {
u'policy-id': u'%s' % policy_id,
@ -711,9 +689,8 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase):
def test_create_ipsec_policy_with_invalid_name(self):
"""Failure test of creating IPSec policy with name too long."""
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post_bad_name,
device_drivers.csr_request.get_defaults):
with httmock.HTTMock(csr_request.token, csr_request.post_bad_name,
csr_request.get_defaults):
policy_id = 'policy-name-is-too-long-32-chars'
policy_info = {
u'policy-id': u'%s' % policy_id,
@ -733,9 +710,8 @@ class TestCsrRestPreSharedKeyCreate(base.BaseTestCase):
def test_create_delete_pre_shared_key(self):
"""Create and then delete a keyring entry for pre-shared key."""
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.normal_get):
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.normal_get):
psk_id = '5'
psk_info = {u'keyring-name': u'%s' % psk_id,
u'pre-shared-key-list': [
@ -756,9 +732,8 @@ class TestCsrRestPreSharedKeyCreate(base.BaseTestCase):
u'10.10.10.20 255.255.255.0')
self.assertEqual(expected_policy, content)
# Now delete and verify pre-shared key is gone
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.delete,
device_drivers.csr_request.no_such_resource):
with httmock.HTTMock(csr_request.token, csr_request.delete,
csr_request.no_such_resource):
self.csr.delete_pre_shared_key(psk_id)
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
content = self.csr.get_request(location, full_url=True)
@ -766,9 +741,8 @@ class TestCsrRestPreSharedKeyCreate(base.BaseTestCase):
def test_create_pre_shared_key_with_fqdn_peer(self):
"""Create pre-shared key using FQDN for peer address."""
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.get_fqdn):
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.get_fqdn):
psk_id = '5'
psk_info = {u'keyring-name': u'%s' % psk_id,
u'pre-shared-key-list': [
@ -788,9 +762,8 @@ class TestCsrRestPreSharedKeyCreate(base.BaseTestCase):
def test_create_pre_shared_key_with_duplicate_peer_address(self):
"""Negative test of creating a second pre-shared key with same peer."""
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.normal_get):
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.normal_get):
psk_id = '5'
psk_info = {u'keyring-name': u'%s' % psk_id,
u'pre-shared-key-list': [
@ -801,8 +774,7 @@ class TestCsrRestPreSharedKeyCreate(base.BaseTestCase):
location = self.csr.create_pre_shared_key(psk_info)
self.assertEqual(requests.codes.CREATED, self.csr.status)
self.assertIn('vpn-svc/ike/keyrings/%s' % psk_id, location)
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post_duplicate):
with httmock.HTTMock(csr_request.token, csr_request.post_duplicate):
psk_id = u'6'
another_psk_info = {u'keyring-name': psk_id,
u'pre-shared-key-list': [
@ -832,8 +804,7 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
psk_id = generate_pre_shared_key_id()
self._remove_resource_for_test(self.csr.delete_pre_shared_key,
psk_id)
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post):
with httmock.HTTMock(csr_request.token, csr_request.post):
psk_info = {u'keyring-name': u'%d' % psk_id,
u'pre-shared-key-list': [
{u'key': u'super-secret',
@ -851,8 +822,7 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
policy_id = generate_ike_policy_id()
self._remove_resource_for_test(self.csr.delete_ike_policy,
policy_id)
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post):
with httmock.HTTMock(csr_request.token, csr_request.post):
policy_info = {u'priority-id': u'%d' % policy_id,
u'encryption': u'aes',
u'hash': u'sha',
@ -869,8 +839,7 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
policy_id = generate_ipsec_policy_id()
self._remove_resource_for_test(self.csr.delete_ipsec_policy,
policy_id)
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post):
with httmock.HTTMock(csr_request.token, csr_request.post):
policy_info = {
u'policy-id': u'%d' % policy_id,
u'protection-suite': {
@ -890,8 +859,7 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
return policy_id
def _remove_resource_for_test(self, delete_resource, resource_id):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.delete):
with httmock.HTTMock(csr_request.token, csr_request.delete):
delete_resource(resource_id)
def _prepare_for_site_conn_create(self, skip_psk=False, skip_ike=False,
@ -913,9 +881,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
def test_create_delete_ipsec_connection(self):
"""Create and then delete an IPSec connection."""
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.normal_get):
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.normal_get):
connection_info = {
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
@ -941,9 +908,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
expected_connection.update(connection_info)
self.assertEqual(expected_connection, content)
# Now delete and verify that site-to-site connection is gone
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.delete,
device_drivers.csr_request.no_such_resource):
with httmock.HTTMock(csr_request.token, csr_request.delete,
csr_request.no_such_resource):
# Only delete connection. Cleanup will take care of prerequisites
self.csr.delete_ipsec_connection('Tunnel%d' % tunnel_id)
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
@ -953,9 +919,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
def test_create_ipsec_connection_with_no_tunnel_subnet(self):
"""Create an IPSec connection without an IP address on tunnel."""
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.get_unnumbered):
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.get_unnumbered):
connection_info = {
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
@ -989,9 +954,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create(
skip_psk=True)
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.normal_get):
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.normal_get):
connection_info = {
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
@ -1026,9 +990,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create(
skip_ike=True)
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.normal_get):
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.normal_get):
connection_info = {
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
@ -1058,8 +1021,7 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
"""Create IPSec connection in admin down state."""
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
tunnel = u'Tunnel%d' % tunnel_id
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post):
with httmock.HTTMock(csr_request.token, csr_request.post):
connection_info = {
u'vpn-interface-name': tunnel,
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
@ -1081,16 +1043,14 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
u'vpn-interface-name': tunnel,
u'line-protocol-state': u'down',
u'enabled': False}
with httmock.HTTMock(device_drivers.csr_request.put,
device_drivers.csr_request.get_admin_down):
with httmock.HTTMock(csr_request.put, csr_request.get_admin_down):
self.csr.set_ipsec_connection_state(tunnel, admin_up=False)
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
content = self.csr.get_request(state_uri, full_url=True)
self.assertEqual(requests.codes.OK, self.csr.status)
self.assertEqual(expected_state, content)
with httmock.HTTMock(device_drivers.csr_request.put,
device_drivers.csr_request.get_admin_up):
with httmock.HTTMock(csr_request.put, csr_request.get_admin_up):
self.csr.set_ipsec_connection_state(tunnel, admin_up=True)
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
content = self.csr.get_request(state_uri, full_url=True)
@ -1103,8 +1063,7 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create(
skip_ipsec=True)
with httmock.HTTMock(
device_drivers.csr_request.token,
device_drivers.csr_request.post_missing_ipsec_policy):
csr_request.token, csr_request.post_missing_ipsec_policy):
connection_info = {
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
@ -1119,8 +1078,7 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status)
def _determine_conflicting_ip(self):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.get_local_ip):
with httmock.HTTMock(csr_request.token, csr_request.get_local_ip):
details = self.csr.get_request('interfaces/GigabitEthernet3')
if self.csr.status != requests.codes.OK:
self.fail("Unable to obtain interface GigabitEthernet3's IP")
@ -1138,8 +1096,7 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
conflicting_ip = self._determine_conflicting_ip()
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post_bad_ip):
with httmock.HTTMock(csr_request.token, csr_request.post_bad_ip):
connection_info = {
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
@ -1156,9 +1113,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
def test_create_ipsec_connection_with_max_mtu(self):
"""Create an IPSec connection with max MTU value."""
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.get_mtu):
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.get_mtu):
connection_info = {
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
@ -1186,8 +1142,7 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
def test_create_ipsec_connection_with_bad_mtu(self):
"""Negative test of connection create with unsupported MTU value."""
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post_bad_mtu):
with httmock.HTTMock(csr_request.token, csr_request.post_bad_mtu):
connection_info = {
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
@ -1204,8 +1159,7 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
def test_status_when_no_tunnels_exist(self):
"""Get status, when there are no tunnels."""
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.get_none):
with httmock.HTTMock(csr_request.token, csr_request.get_none):
tunnels = self.csr.read_tunnel_statuses()
self.assertEqual(requests.codes.OK, self.csr.status)
self.assertEqual([], tunnels)
@ -1215,9 +1169,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
# Create the IPsec site-to-site connection first
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
tunnel_id = 123 # Must hard code to work with mock
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.normal_get):
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.normal_get):
connection_info = {
u'vpn-interface-name': u'Tunnel123',
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
@ -1232,8 +1185,7 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
self.assertEqual(requests.codes.CREATED, self.csr.status)
self.assertIn('vpn-svc/site-to-site/Tunnel%d' % tunnel_id,
location)
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.normal_get):
with httmock.HTTMock(csr_request.token, csr_request.normal_get):
tunnels = self.csr.read_tunnel_statuses()
self.assertEqual(requests.codes.OK, self.csr.status)
self.assertEqual([(u'Tunnel123', u'DOWN-NEGOTIATING'), ], tunnels)
@ -1250,8 +1202,7 @@ class TestCsrRestIkeKeepaliveCreate(base.BaseTestCase):
"""
def _save_dpd_info(self):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.normal_get):
with httmock.HTTMock(csr_request.token, csr_request.normal_get):
details = self.csr.get_request('vpn-svc/ike/keepalive')
if self.csr.status == requests.codes.OK:
self.dpd = details
@ -1260,8 +1211,7 @@ class TestCsrRestIkeKeepaliveCreate(base.BaseTestCase):
self.fail("Unable to save original DPD info")
def _restore_dpd_info(self):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.put):
with httmock.HTTMock(csr_request.token, csr_request.put):
payload = {'interval': self.dpd['interval'],
'retry': self.dpd['retry']}
self.csr.put_request('vpn-svc/ike/keepalive', payload=payload)
@ -1277,9 +1227,8 @@ class TestCsrRestIkeKeepaliveCreate(base.BaseTestCase):
def test_configure_ike_keepalive(self):
"""Set IKE keep-alive (aka Dead Peer Detection) for the CSR."""
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.put,
device_drivers.csr_request.normal_get):
with httmock.HTTMock(csr_request.token, csr_request.put,
csr_request.normal_get):
keepalive_info = {'interval': 60, 'retry': 4}
self.csr.configure_ike_keepalive(keepalive_info)
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
@ -1291,10 +1240,8 @@ class TestCsrRestIkeKeepaliveCreate(base.BaseTestCase):
def test_disable_ike_keepalive(self):
"""Disable IKE keep-alive (aka Dead Peer Detection) for the CSR."""
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.delete,
device_drivers.csr_request.put,
device_drivers.csr_request.get_not_configured):
with httmock.HTTMock(csr_request.token, csr_request.delete,
csr_request.put, csr_request.get_not_configured):
keepalive_info = {'interval': 0, 'retry': 4}
self.csr.configure_ike_keepalive(keepalive_info)
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
@ -1318,9 +1265,8 @@ class TestCsrRestStaticRoute(base.BaseTestCase):
cidr = u'10.1.0.0/24'
interface = u'GigabitEthernet1'
expected_id = '10.1.0.0_24_GigabitEthernet1'
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.normal_get):
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.normal_get):
route_info = {u'destination-network': cidr,
u'outgoing-interface': interface}
location = self.csr.create_static_route(route_info)
@ -1336,9 +1282,8 @@ class TestCsrRestStaticRoute(base.BaseTestCase):
expected_route.update(route_info)
self.assertEqual(expected_route, content)
# Now delete and verify that static route is gone
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.delete,
device_drivers.csr_request.no_such_resource):
with httmock.HTTMock(csr_request.token, csr_request.delete,
csr_request.no_such_resource):
route_id = csr_client.make_route_id(cidr, interface)
self.csr.delete_static_route(route_id)
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)

View File

@ -16,22 +16,31 @@
"""Mock REST requests to Cisco Cloud Services Router."""
# Note: Currently, this makes use of the httmock package, which it was decided
# not to be included in test_requirements. To use this module in testing, you
# must locally add httmock to test_requirements and import it, or place the
# httmock.py source file in this directory, and import that file (as commented
# out below).
#
# TODO(pcm): Rework this module to use the httpretty package, which uses a
# register mechanism, instead of a context manager.
import re
import functools
# import httmock
try:
import httmock
except (NameError, ImportError):
exit()
import requests
from requests import exceptions as r_exc
from neutron.openstack.common import log as logging
# TODO(pcm) Remove once httmock package is added to test-requirements. For
# now, uncomment and include httmock source to UT
from neutron.tests.unit.services.vpn import device_drivers
# from neutron.tests.unit.services.vpn.device_drivers import httmock
LOG = logging.getLogger(__name__)
httmock = device_drivers.httmock
def repeat(n):
"""Decorator to limit the number of times a handler is called.