Fix H302 violations in unit tests

H302 violation is reported by flake8 when importing separated objects from
modules instead of importing the whole module.
e.g.   from package.module import function
       function()
is changed to
       from package import module
       module.function()

Change-Id: Ic6975f39c755ded54149a9c01fcdcfaf78c596fc
Partial-Bug: #1291032
This commit is contained in:
Jakub Libosvar 2014-04-18 15:30:32 +02:00
parent 1e0f4389f3
commit b91495fe72
3 changed files with 175 additions and 114 deletions

View File

@ -23,8 +23,7 @@ import os
from oslo.config import cfg
import webob.exc
from neutron.api.extensions import ExtensionMiddleware
from neutron.api.extensions import PluginAwareExtensionManager
from neutron.api import extensions as api_extensions
from neutron.common import config
from neutron import context
from neutron.db import agentschedulers_db
@ -441,13 +440,13 @@ class VPNPluginDbTestCase(VPNTestMixin,
self._subnet_id = uuidutils.generate_uuid()
self.core_plugin = TestVpnCorePlugin
self.plugin = vpn_plugin.VPNPlugin()
ext_mgr = PluginAwareExtensionManager(
ext_mgr = api_extensions.PluginAwareExtensionManager(
extensions_path,
{constants.CORE: self.core_plugin,
constants.VPN: self.plugin}
)
app = config.load_paste_app('extensions_test_app')
self.ext_api = ExtensionMiddleware(app, ext_mgr=ext_mgr)
self.ext_api = api_extensions.ExtensionMiddleware(app, ext_mgr=ext_mgr)
class TestVpnaas(VPNPluginDbTestCase):

View File

@ -18,7 +18,7 @@
import re
from functools import wraps
import functools
# import httmock
import requests
from requests import exceptions as r_exc
@ -26,10 +26,12 @@ from requests import exceptions as r_exc
from neutron.openstack.common import log as logging
# TODO(pcm) Remove once httmock package is added to test-requirements. For
# now, uncomment and include httmock source to UT
from neutron.tests.unit.services.vpn.device_drivers import httmock
from neutron.tests.unit.services.vpn import device_drivers
LOG = logging.getLogger(__name__)
httmock = device_drivers.httmock
def repeat(n):
"""Decorator to limit the number of times a handler is called.
@ -43,7 +45,7 @@ def repeat(n):
retries = n
def decorator(func):
@wraps(func)
@functools.wraps(func)
def wrapped(*args, **kwargs):
if static.retries == 0:
return None
@ -66,7 +68,7 @@ def filter_request(methods, resource):
target_resource = resource
def decorator(func):
@wraps(func)
@functools.wraps(func)
def wrapped(*args, **kwargs):
if (args[1].method in static.target_methods and
static.target_resource in args[0].path):

View File

@ -27,11 +27,9 @@ from neutron.openstack.common import log as logging
from neutron.services.vpn.device_drivers import (
cisco_csr_rest_client as csr_client)
from neutron.tests import base
from neutron.tests.unit.services.vpn.device_drivers import (
cisco_csr_mock as csr_request)
from neutron.tests.unit.services.vpn import device_drivers
# TODO(pcm) Remove once httmock is available. In the meantime, use
# temporary local copy of httmock source to run UT
from neutron.tests.unit.services.vpn.device_drivers import httmock
LOG = logging.getLogger(__name__)
@ -42,6 +40,8 @@ if True:
dummy_policy_id = 'dummy-ipsec-policy-id-name'
httmock = device_drivers.httmock
# Note: Helper functions to test reuse of IDs.
def generate_pre_shared_key_id():
@ -67,7 +67,7 @@ class TestCsrLoginRestApi(base.BaseTestCase):
def test_get_token(self):
"""Obtain the token and its expiration time."""
with httmock.HTTMock(csr_request.token):
with httmock.HTTMock(device_drivers.csr_request.token):
self.assertTrue(self.csr.authenticate())
self.assertEqual(requests.codes.OK, self.csr.status)
self.assertIsNotNone(self.csr.token)
@ -75,7 +75,7 @@ class TestCsrLoginRestApi(base.BaseTestCase):
def test_unauthorized_token_request(self):
"""Negative test of invalid user/password."""
self.csr.auth = ('stack', 'bogus')
with httmock.HTTMock(csr_request.token_unauthorized):
with httmock.HTTMock(device_drivers.csr_request.token_unauthorized):
self.assertIsNone(self.csr.authenticate())
self.assertEqual(requests.codes.UNAUTHORIZED, self.csr.status)
@ -83,14 +83,14 @@ class TestCsrLoginRestApi(base.BaseTestCase):
"""Negative test of request to non-existent host."""
self.csr.host = 'wrong-host'
self.csr.token = 'Set by some previously successful access'
with httmock.HTTMock(csr_request.token_wrong_host):
with httmock.HTTMock(device_drivers.csr_request.token_wrong_host):
self.assertIsNone(self.csr.authenticate())
self.assertEqual(requests.codes.NOT_FOUND, self.csr.status)
self.assertIsNone(self.csr.token)
def test_timeout_on_token_access(self):
"""Negative test of a timeout on a request."""
with httmock.HTTMock(csr_request.token_timeout):
with httmock.HTTMock(device_drivers.csr_request.token_timeout):
self.assertIsNone(self.csr.authenticate())
self.assertEqual(requests.codes.REQUEST_TIMEOUT, self.csr.status)
self.assertIsNone(self.csr.token)
@ -112,7 +112,8 @@ class TestCsrGetRestApi(base.BaseTestCase):
that there are two interfaces on the CSR.
"""
with httmock.HTTMock(csr_request.token, csr_request.normal_get):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.normal_get):
content = self.csr.get_request('global/host-name')
self.assertEqual(requests.codes.OK, self.csr.status)
self.assertIn('host-name', content)
@ -139,7 +140,8 @@ class TestCsrPostRestApi(base.BaseTestCase):
that there are two interfaces (Ge1 and Ge2) on the CSR.
"""
with httmock.HTTMock(csr_request.token, csr_request.post):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post):
content = self.csr.post_request(
'interfaces/GigabitEthernet1/statistics',
payload={'action': 'clear'})
@ -153,7 +155,8 @@ class TestCsrPostRestApi(base.BaseTestCase):
def test_post_with_location(self):
"""Create a user and verify that location returned."""
with httmock.HTTMock(csr_request.token, csr_request.post):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post):
location = self.csr.post_request(
'global/local-users',
payload={'username': 'test-user',
@ -164,7 +167,8 @@ class TestCsrPostRestApi(base.BaseTestCase):
def test_post_missing_required_attribute(self):
"""Negative test of POST with missing mandatory info."""
with httmock.HTTMock(csr_request.token, csr_request.post):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post):
self.csr.post_request('global/local-users',
payload={'password': 'pass12345',
'privilege': 15})
@ -172,7 +176,8 @@ class TestCsrPostRestApi(base.BaseTestCase):
def test_post_invalid_attribute(self):
"""Negative test of POST with invalid info."""
with httmock.HTTMock(csr_request.token, csr_request.post):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post):
self.csr.post_request('global/local-users',
payload={'username': 'test-user',
'password': 'pass12345',
@ -185,7 +190,8 @@ class TestCsrPostRestApi(base.BaseTestCase):
Uses the lower level _do_request() API to just perform the POST and
obtain the response, without any error processing.
"""
with httmock.HTTMock(csr_request.token, csr_request.post):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post):
location = self.csr._do_request(
'POST',
'global/local-users',
@ -195,8 +201,8 @@ class TestCsrPostRestApi(base.BaseTestCase):
more_headers=csr_client.HEADER_CONTENT_TYPE_JSON)
self.assertEqual(requests.codes.CREATED, self.csr.status)
self.assertIn('global/local-users/test-user', location)
with httmock.HTTMock(csr_request.token,
csr_request.post_change_attempt):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post_change_attempt):
self.csr._do_request(
'POST',
'global/local-users',
@ -210,7 +216,8 @@ class TestCsrPostRestApi(base.BaseTestCase):
def test_post_changing_value(self):
"""Negative test of a POST trying to change a value."""
with httmock.HTTMock(csr_request.token, csr_request.post):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post):
location = self.csr.post_request(
'global/local-users',
payload={'username': 'test-user',
@ -218,8 +225,8 @@ class TestCsrPostRestApi(base.BaseTestCase):
'privilege': 15})
self.assertEqual(requests.codes.CREATED, self.csr.status)
self.assertIn('global/local-users/test-user', location)
with httmock.HTTMock(csr_request.token,
csr_request.post_change_attempt):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post_change_attempt):
content = self.csr.post_request('global/local-users',
payload={'username': 'test-user',
'password': 'changed',
@ -235,7 +242,8 @@ class TestCsrPutRestApi(base.BaseTestCase):
"""Test CSR PUT REST API."""
def _save_resources(self):
with httmock.HTTMock(csr_request.token, csr_request.normal_get):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.normal_get):
details = self.csr.get_request('global/host-name')
if self.csr.status != requests.codes.OK:
self.fail("Unable to save original host name")
@ -258,7 +266,8 @@ class TestCsrPutRestApi(base.BaseTestCase):
self.csr.auth = (user, password)
self.csr.token = None
with httmock.HTTMock(csr_request.token, csr_request.put):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.put):
payload = {'host-name': self.original_host}
self.csr.put_request('global/host-name', payload=payload)
if self.csr.status != requests.codes.NO_CONTENT:
@ -288,8 +297,9 @@ class TestCsrPutRestApi(base.BaseTestCase):
that there are two interfaces on the CSR (Ge1 and Ge2).
"""
with httmock.HTTMock(csr_request.token, csr_request.put,
csr_request.normal_get):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.put,
device_drivers.csr_request.normal_get):
payload = {'host-name': 'TestHost'}
content = self.csr.put_request('global/host-name',
payload=payload)
@ -308,8 +318,9 @@ class TestCsrPutRestApi(base.BaseTestCase):
This was a problem with an earlier version of the CSR image and is
here to prevent regression.
"""
with httmock.HTTMock(csr_request.token, csr_request.put,
csr_request.normal_get):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.put,
device_drivers.csr_request.normal_get):
payload = {'description': u'Changed description',
'if-name': self.original_if['if-name'],
'ip-address': self.original_if['ip-address'],
@ -334,8 +345,9 @@ class TestCsrPutRestApi(base.BaseTestCase):
test setup to change the description to a non-empty string to
avoid failures in other tests.
"""
with httmock.HTTMock(csr_request.token, csr_request.put,
csr_request.normal_get):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.put,
device_drivers.csr_request.normal_get):
payload = {'description': '',
'if-name': self.original_if['if-name'],
'ip-address': self.original_if['ip-address'],
@ -370,8 +382,9 @@ class TestCsrDeleteRestApi(base.BaseTestCase):
def test_delete_requests(self):
"""Simple DELETE requests (creating entry first)."""
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.delete):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.delete):
self._make_dummy_user()
self.csr.token = None # Force login
self.csr.delete_request('global/local-users/dummy')
@ -383,7 +396,8 @@ class TestCsrDeleteRestApi(base.BaseTestCase):
def test_delete_non_existent_entry(self):
"""Negative test of trying to delete a non-existent user."""
with httmock.HTTMock(csr_request.token, csr_request.delete_unknown):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.delete_unknown):
content = self.csr.delete_request('global/local-users/unknown')
self.assertEqual(requests.codes.NOT_FOUND, self.csr.status)
expected = {u'error-code': -1,
@ -392,8 +406,8 @@ class TestCsrDeleteRestApi(base.BaseTestCase):
def test_delete_not_allowed(self):
"""Negative test of trying to delete the host-name."""
with httmock.HTTMock(csr_request.token,
csr_request.delete_not_allowed):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.delete_not_allowed):
self.csr.delete_request('global/host-name')
self.assertEqual(requests.codes.METHOD_NOT_ALLOWED,
self.csr.status)
@ -414,14 +428,16 @@ class TestCsrRestApiFailures(base.BaseTestCase):
def test_request_for_non_existent_resource(self):
"""Negative test of non-existent resource on REST request."""
with httmock.HTTMock(csr_request.token, csr_request.no_such_resource):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.no_such_resource):
self.csr.post_request('no/such/request')
self.assertEqual(requests.codes.NOT_FOUND, self.csr.status)
# The result is HTTP 404 message, so no error content to check
def test_timeout_during_request(self):
"""Negative test of timeout during REST request."""
with httmock.HTTMock(csr_request.token, csr_request.timeout):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.timeout):
self.csr._do_request('GET', 'global/host-name')
self.assertEqual(requests.codes.REQUEST_TIMEOUT, self.csr.status)
@ -433,8 +449,9 @@ class TestCsrRestApiFailures(base.BaseTestCase):
token by changing it.
"""
with httmock.HTTMock(csr_request.token, csr_request.expired_request,
csr_request.normal_get):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.expired_request,
device_drivers.csr_request.normal_get):
self.csr.token = '123' # These are 44 characters, so won't match
content = self.csr._do_request('GET', 'global/host-name')
self.assertEqual(requests.codes.OK, self.csr.status)
@ -444,7 +461,7 @@ class TestCsrRestApiFailures(base.BaseTestCase):
def test_failed_to_obtain_token_for_request(self):
"""Negative test of unauthorized user for REST request."""
self.csr.auth = ('stack', 'bogus')
with httmock.HTTMock(csr_request.token_unauthorized):
with httmock.HTTMock(device_drivers.csr_request.token_unauthorized):
self.csr._do_request('GET', 'global/host-name')
self.assertEqual(requests.codes.UNAUTHORIZED, self.csr.status)
@ -460,8 +477,9 @@ class TestCsrRestIkePolicyCreate(base.BaseTestCase):
def test_create_delete_ike_policy(self):
"""Create and then delete IKE policy."""
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.normal_get):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.normal_get):
policy_id = '2'
policy_info = {u'priority-id': u'%s' % policy_id,
u'encryption': u'aes256',
@ -480,8 +498,9 @@ class TestCsrRestIkePolicyCreate(base.BaseTestCase):
expected_policy.update(policy_info)
self.assertEqual(expected_policy, content)
# Now delete and verify the IKE policy is gone
with httmock.HTTMock(csr_request.token, csr_request.delete,
csr_request.no_such_resource):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.delete,
device_drivers.csr_request.no_such_resource):
self.csr.delete_ike_policy(policy_id)
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
content = self.csr.get_request(location, full_url=True)
@ -489,8 +508,9 @@ class TestCsrRestIkePolicyCreate(base.BaseTestCase):
def test_create_ike_policy_with_defaults(self):
"""Create IKE policy using defaults for all optional values."""
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.get_defaults):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.get_defaults):
policy_id = '2'
policy_info = {u'priority-id': u'%s' % policy_id}
location = self.csr.create_ike_policy(policy_info)
@ -512,8 +532,9 @@ class TestCsrRestIkePolicyCreate(base.BaseTestCase):
def test_create_duplicate_ike_policy(self):
"""Negative test of trying to create a duplicate IKE policy."""
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.normal_get):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.normal_get):
policy_id = '2'
policy_info = {u'priority-id': u'%s' % policy_id,
u'encryption': u'aes',
@ -523,7 +544,8 @@ class TestCsrRestIkePolicyCreate(base.BaseTestCase):
location = self.csr.create_ike_policy(policy_info)
self.assertEqual(requests.codes.CREATED, self.csr.status)
self.assertIn('vpn-svc/ike/policies/%s' % policy_id, location)
with httmock.HTTMock(csr_request.token, csr_request.post_duplicate):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post_duplicate):
location = self.csr.create_ike_policy(policy_info)
self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status)
expected = {u'error-code': -1,
@ -543,8 +565,9 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase):
def test_create_delete_ipsec_policy(self):
"""Create and then delete IPSec policy."""
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.normal_get):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.normal_get):
policy_id = '123'
policy_info = {
u'policy-id': u'%s' % policy_id,
@ -572,8 +595,9 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase):
expected_policy[u'anti-replay-window-size'] = u'Disable'
self.assertEqual(expected_policy, content)
# Now delete and verify the IPSec policy is gone
with httmock.HTTMock(csr_request.token, csr_request.delete,
csr_request.no_such_resource):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.delete,
device_drivers.csr_request.no_such_resource):
self.csr.delete_ipsec_policy(policy_id)
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
content = self.csr.get_request(location, full_url=True)
@ -581,8 +605,9 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase):
def test_create_ipsec_policy_with_defaults(self):
"""Create IPSec policy with default for all optional values."""
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.get_defaults):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.get_defaults):
policy_id = '123'
policy_info = {
u'policy-id': u'%s' % policy_id,
@ -606,8 +631,9 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase):
def test_create_ipsec_policy_with_uuid(self):
"""Create IPSec policy using UUID for id."""
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.normal_get):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.normal_get):
policy_info = {
u'policy-id': u'%s' % dummy_policy_id,
u'protection-suite': {
@ -637,8 +663,9 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase):
def test_create_ipsec_policy_without_ah(self):
"""Create IPSec policy."""
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.get_no_ah):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.get_no_ah):
policy_id = '10'
policy_info = {
u'policy-id': u'%s' % policy_id,
@ -665,8 +692,8 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase):
def test_invalid_ipsec_policy_lifetime(self):
"""Failure test of IPSec policy with unsupported lifetime."""
with httmock.HTTMock(csr_request.token,
csr_request.post_bad_lifetime):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post_bad_lifetime):
policy_id = '123'
policy_info = {
u'policy-id': u'%s' % policy_id,
@ -684,8 +711,9 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase):
def test_create_ipsec_policy_with_invalid_name(self):
"""Failure test of creating IPSec policy with name too long."""
with httmock.HTTMock(csr_request.token, csr_request.post_bad_name,
csr_request.get_defaults):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post_bad_name,
device_drivers.csr_request.get_defaults):
policy_id = 'policy-name-is-too-long-32-chars'
policy_info = {
u'policy-id': u'%s' % policy_id,
@ -705,8 +733,9 @@ class TestCsrRestPreSharedKeyCreate(base.BaseTestCase):
def test_create_delete_pre_shared_key(self):
"""Create and then delete a keyring entry for pre-shared key."""
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.normal_get):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.normal_get):
psk_id = '5'
psk_info = {u'keyring-name': u'%s' % psk_id,
u'pre-shared-key-list': [
@ -727,8 +756,9 @@ class TestCsrRestPreSharedKeyCreate(base.BaseTestCase):
u'10.10.10.20 255.255.255.0')
self.assertEqual(expected_policy, content)
# Now delete and verify pre-shared key is gone
with httmock.HTTMock(csr_request.token, csr_request.delete,
csr_request.no_such_resource):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.delete,
device_drivers.csr_request.no_such_resource):
self.csr.delete_pre_shared_key(psk_id)
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
content = self.csr.get_request(location, full_url=True)
@ -736,8 +766,9 @@ class TestCsrRestPreSharedKeyCreate(base.BaseTestCase):
def test_create_pre_shared_key_with_fqdn_peer(self):
"""Create pre-shared key using FQDN for peer address."""
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.get_fqdn):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.get_fqdn):
psk_id = '5'
psk_info = {u'keyring-name': u'%s' % psk_id,
u'pre-shared-key-list': [
@ -757,8 +788,9 @@ class TestCsrRestPreSharedKeyCreate(base.BaseTestCase):
def test_create_pre_shared_key_with_duplicate_peer_address(self):
"""Negative test of creating a second pre-shared key with same peer."""
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.normal_get):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.normal_get):
psk_id = '5'
psk_info = {u'keyring-name': u'%s' % psk_id,
u'pre-shared-key-list': [
@ -769,7 +801,8 @@ class TestCsrRestPreSharedKeyCreate(base.BaseTestCase):
location = self.csr.create_pre_shared_key(psk_info)
self.assertEqual(requests.codes.CREATED, self.csr.status)
self.assertIn('vpn-svc/ike/keyrings/%s' % psk_id, location)
with httmock.HTTMock(csr_request.token, csr_request.post_duplicate):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post_duplicate):
psk_id = u'6'
another_psk_info = {u'keyring-name': psk_id,
u'pre-shared-key-list': [
@ -799,7 +832,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
psk_id = generate_pre_shared_key_id()
self._remove_resource_for_test(self.csr.delete_pre_shared_key,
psk_id)
with httmock.HTTMock(csr_request.token, csr_request.post):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post):
psk_info = {u'keyring-name': u'%d' % psk_id,
u'pre-shared-key-list': [
{u'key': u'super-secret',
@ -817,7 +851,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
policy_id = generate_ike_policy_id()
self._remove_resource_for_test(self.csr.delete_ike_policy,
policy_id)
with httmock.HTTMock(csr_request.token, csr_request.post):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post):
policy_info = {u'priority-id': u'%d' % policy_id,
u'encryption': u'aes',
u'hash': u'sha',
@ -834,7 +869,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
policy_id = generate_ipsec_policy_id()
self._remove_resource_for_test(self.csr.delete_ipsec_policy,
policy_id)
with httmock.HTTMock(csr_request.token, csr_request.post):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post):
policy_info = {
u'policy-id': u'%d' % policy_id,
u'protection-suite': {
@ -854,7 +890,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
return policy_id
def _remove_resource_for_test(self, delete_resource, resource_id):
with httmock.HTTMock(csr_request.token, csr_request.delete):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.delete):
delete_resource(resource_id)
def _prepare_for_site_conn_create(self, skip_psk=False, skip_ike=False,
@ -876,8 +913,9 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
def test_create_delete_ipsec_connection(self):
"""Create and then delete an IPSec connection."""
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.normal_get):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.normal_get):
connection_info = {
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
@ -903,8 +941,9 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
expected_connection.update(connection_info)
self.assertEqual(expected_connection, content)
# Now delete and verify that site-to-site connection is gone
with httmock.HTTMock(csr_request.token, csr_request.delete,
csr_request.no_such_resource):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.delete,
device_drivers.csr_request.no_such_resource):
# Only delete connection. Cleanup will take care of prerequisites
self.csr.delete_ipsec_connection('Tunnel%d' % tunnel_id)
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
@ -914,8 +953,9 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
def test_create_ipsec_connection_with_no_tunnel_subnet(self):
"""Create an IPSec connection without an IP address on tunnel."""
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.get_unnumbered):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.get_unnumbered):
connection_info = {
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
@ -949,8 +989,9 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create(
skip_psk=True)
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.normal_get):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.normal_get):
connection_info = {
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
@ -985,8 +1026,9 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create(
skip_ike=True)
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.normal_get):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.normal_get):
connection_info = {
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
@ -1016,7 +1058,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
"""Create IPSec connection in admin down state."""
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
tunnel = u'Tunnel%d' % tunnel_id
with httmock.HTTMock(csr_request.token, csr_request.post):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post):
connection_info = {
u'vpn-interface-name': tunnel,
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
@ -1038,14 +1081,16 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
u'vpn-interface-name': tunnel,
u'line-protocol-state': u'down',
u'enabled': False}
with httmock.HTTMock(csr_request.put, csr_request.get_admin_down):
with httmock.HTTMock(device_drivers.csr_request.put,
device_drivers.csr_request.get_admin_down):
self.csr.set_ipsec_connection_state(tunnel, admin_up=False)
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
content = self.csr.get_request(state_uri, full_url=True)
self.assertEqual(requests.codes.OK, self.csr.status)
self.assertEqual(expected_state, content)
with httmock.HTTMock(csr_request.put, csr_request.get_admin_up):
with httmock.HTTMock(device_drivers.csr_request.put,
device_drivers.csr_request.get_admin_up):
self.csr.set_ipsec_connection_state(tunnel, admin_up=True)
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
content = self.csr.get_request(state_uri, full_url=True)
@ -1057,8 +1102,9 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
"""Negative test of connection create without IPSec policy."""
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create(
skip_ipsec=True)
with httmock.HTTMock(csr_request.token,
csr_request.post_missing_ipsec_policy):
with httmock.HTTMock(
device_drivers.csr_request.token,
device_drivers.csr_request.post_missing_ipsec_policy):
connection_info = {
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
@ -1073,7 +1119,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status)
def _determine_conflicting_ip(self):
with httmock.HTTMock(csr_request.token, csr_request.get_local_ip):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.get_local_ip):
details = self.csr.get_request('interfaces/GigabitEthernet3')
if self.csr.status != requests.codes.OK:
self.fail("Unable to obtain interface GigabitEthernet3's IP")
@ -1091,7 +1138,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
conflicting_ip = self._determine_conflicting_ip()
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
with httmock.HTTMock(csr_request.token, csr_request.post_bad_ip):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post_bad_ip):
connection_info = {
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
@ -1108,8 +1156,9 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
def test_create_ipsec_connection_with_max_mtu(self):
"""Create an IPSec connection with max MTU value."""
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.get_mtu):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.get_mtu):
connection_info = {
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
@ -1137,7 +1186,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
def test_create_ipsec_connection_with_bad_mtu(self):
"""Negative test of connection create with unsupported MTU value."""
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
with httmock.HTTMock(csr_request.token, csr_request.post_bad_mtu):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post_bad_mtu):
connection_info = {
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
@ -1154,7 +1204,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
def test_status_when_no_tunnels_exist(self):
"""Get status, when there are no tunnels."""
with httmock.HTTMock(csr_request.token, csr_request.get_none):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.get_none):
tunnels = self.csr.read_tunnel_statuses()
self.assertEqual(requests.codes.OK, self.csr.status)
self.assertEqual([], tunnels)
@ -1164,8 +1215,9 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
# Create the IPsec site-to-site connection first
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
tunnel_id = 123 # Must hard code to work with mock
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.normal_get):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.normal_get):
connection_info = {
u'vpn-interface-name': u'Tunnel123',
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
@ -1180,7 +1232,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
self.assertEqual(requests.codes.CREATED, self.csr.status)
self.assertIn('vpn-svc/site-to-site/Tunnel%d' % tunnel_id,
location)
with httmock.HTTMock(csr_request.token, csr_request.normal_get):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.normal_get):
tunnels = self.csr.read_tunnel_statuses()
self.assertEqual(requests.codes.OK, self.csr.status)
self.assertEqual([(u'Tunnel123', u'DOWN-NEGOTIATING'), ], tunnels)
@ -1197,7 +1250,8 @@ class TestCsrRestIkeKeepaliveCreate(base.BaseTestCase):
"""
def _save_dpd_info(self):
with httmock.HTTMock(csr_request.token, csr_request.normal_get):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.normal_get):
details = self.csr.get_request('vpn-svc/ike/keepalive')
if self.csr.status == requests.codes.OK:
self.dpd = details
@ -1206,7 +1260,8 @@ class TestCsrRestIkeKeepaliveCreate(base.BaseTestCase):
self.fail("Unable to save original DPD info")
def _restore_dpd_info(self):
with httmock.HTTMock(csr_request.token, csr_request.put):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.put):
payload = {'interval': self.dpd['interval'],
'retry': self.dpd['retry']}
self.csr.put_request('vpn-svc/ike/keepalive', payload=payload)
@ -1222,8 +1277,9 @@ class TestCsrRestIkeKeepaliveCreate(base.BaseTestCase):
def test_configure_ike_keepalive(self):
"""Set IKE keep-alive (aka Dead Peer Detection) for the CSR."""
with httmock.HTTMock(csr_request.token, csr_request.put,
csr_request.normal_get):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.put,
device_drivers.csr_request.normal_get):
keepalive_info = {'interval': 60, 'retry': 4}
self.csr.configure_ike_keepalive(keepalive_info)
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
@ -1235,8 +1291,10 @@ class TestCsrRestIkeKeepaliveCreate(base.BaseTestCase):
def test_disable_ike_keepalive(self):
"""Disable IKE keep-alive (aka Dead Peer Detection) for the CSR."""
with httmock.HTTMock(csr_request.token, csr_request.delete,
csr_request.put, csr_request.get_not_configured):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.delete,
device_drivers.csr_request.put,
device_drivers.csr_request.get_not_configured):
keepalive_info = {'interval': 0, 'retry': 4}
self.csr.configure_ike_keepalive(keepalive_info)
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
@ -1260,8 +1318,9 @@ class TestCsrRestStaticRoute(base.BaseTestCase):
cidr = u'10.1.0.0/24'
interface = u'GigabitEthernet1'
expected_id = '10.1.0.0_24_GigabitEthernet1'
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.normal_get):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.post,
device_drivers.csr_request.normal_get):
route_info = {u'destination-network': cidr,
u'outgoing-interface': interface}
location = self.csr.create_static_route(route_info)
@ -1277,8 +1336,9 @@ class TestCsrRestStaticRoute(base.BaseTestCase):
expected_route.update(route_info)
self.assertEqual(expected_route, content)
# Now delete and verify that static route is gone
with httmock.HTTMock(csr_request.token, csr_request.delete,
csr_request.no_such_resource):
with httmock.HTTMock(device_drivers.csr_request.token,
device_drivers.csr_request.delete,
device_drivers.csr_request.no_such_resource):
route_id = csr_client.make_route_id(cidr, interface)
self.csr.delete_static_route(route_id)
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)