Merge "Fix H302 violations in unit tests"
This commit is contained in:
commit
6e30f8dadf
@ -23,8 +23,7 @@ import os
|
|||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
import webob.exc
|
import webob.exc
|
||||||
|
|
||||||
from neutron.api.extensions import ExtensionMiddleware
|
from neutron.api import extensions as api_extensions
|
||||||
from neutron.api.extensions import PluginAwareExtensionManager
|
|
||||||
from neutron.common import config
|
from neutron.common import config
|
||||||
from neutron import context
|
from neutron import context
|
||||||
from neutron.db import agentschedulers_db
|
from neutron.db import agentschedulers_db
|
||||||
@ -441,13 +440,13 @@ class VPNPluginDbTestCase(VPNTestMixin,
|
|||||||
self._subnet_id = uuidutils.generate_uuid()
|
self._subnet_id = uuidutils.generate_uuid()
|
||||||
self.core_plugin = TestVpnCorePlugin
|
self.core_plugin = TestVpnCorePlugin
|
||||||
self.plugin = vpn_plugin.VPNPlugin()
|
self.plugin = vpn_plugin.VPNPlugin()
|
||||||
ext_mgr = PluginAwareExtensionManager(
|
ext_mgr = api_extensions.PluginAwareExtensionManager(
|
||||||
extensions_path,
|
extensions_path,
|
||||||
{constants.CORE: self.core_plugin,
|
{constants.CORE: self.core_plugin,
|
||||||
constants.VPN: self.plugin}
|
constants.VPN: self.plugin}
|
||||||
)
|
)
|
||||||
app = config.load_paste_app('extensions_test_app')
|
app = config.load_paste_app('extensions_test_app')
|
||||||
self.ext_api = ExtensionMiddleware(app, ext_mgr=ext_mgr)
|
self.ext_api = api_extensions.ExtensionMiddleware(app, ext_mgr=ext_mgr)
|
||||||
|
|
||||||
|
|
||||||
class TestVpnaas(VPNPluginDbTestCase):
|
class TestVpnaas(VPNPluginDbTestCase):
|
||||||
|
@ -18,7 +18,7 @@
|
|||||||
|
|
||||||
import re
|
import re
|
||||||
|
|
||||||
from functools import wraps
|
import functools
|
||||||
# import httmock
|
# import httmock
|
||||||
import requests
|
import requests
|
||||||
from requests import exceptions as r_exc
|
from requests import exceptions as r_exc
|
||||||
@ -26,10 +26,12 @@ from requests import exceptions as r_exc
|
|||||||
from neutron.openstack.common import log as logging
|
from neutron.openstack.common import log as logging
|
||||||
# TODO(pcm) Remove once httmock package is added to test-requirements. For
|
# TODO(pcm) Remove once httmock package is added to test-requirements. For
|
||||||
# now, uncomment and include httmock source to UT
|
# now, uncomment and include httmock source to UT
|
||||||
from neutron.tests.unit.services.vpn.device_drivers import httmock
|
from neutron.tests.unit.services.vpn import device_drivers
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
httmock = device_drivers.httmock
|
||||||
|
|
||||||
|
|
||||||
def repeat(n):
|
def repeat(n):
|
||||||
"""Decorator to limit the number of times a handler is called.
|
"""Decorator to limit the number of times a handler is called.
|
||||||
@ -43,7 +45,7 @@ def repeat(n):
|
|||||||
retries = n
|
retries = n
|
||||||
|
|
||||||
def decorator(func):
|
def decorator(func):
|
||||||
@wraps(func)
|
@functools.wraps(func)
|
||||||
def wrapped(*args, **kwargs):
|
def wrapped(*args, **kwargs):
|
||||||
if static.retries == 0:
|
if static.retries == 0:
|
||||||
return None
|
return None
|
||||||
@ -66,7 +68,7 @@ def filter_request(methods, resource):
|
|||||||
target_resource = resource
|
target_resource = resource
|
||||||
|
|
||||||
def decorator(func):
|
def decorator(func):
|
||||||
@wraps(func)
|
@functools.wraps(func)
|
||||||
def wrapped(*args, **kwargs):
|
def wrapped(*args, **kwargs):
|
||||||
if (args[1].method in static.target_methods and
|
if (args[1].method in static.target_methods and
|
||||||
static.target_resource in args[0].path):
|
static.target_resource in args[0].path):
|
||||||
|
@ -27,11 +27,9 @@ from neutron.openstack.common import log as logging
|
|||||||
from neutron.services.vpn.device_drivers import (
|
from neutron.services.vpn.device_drivers import (
|
||||||
cisco_csr_rest_client as csr_client)
|
cisco_csr_rest_client as csr_client)
|
||||||
from neutron.tests import base
|
from neutron.tests import base
|
||||||
from neutron.tests.unit.services.vpn.device_drivers import (
|
from neutron.tests.unit.services.vpn import device_drivers
|
||||||
cisco_csr_mock as csr_request)
|
|
||||||
# TODO(pcm) Remove once httmock is available. In the meantime, use
|
# TODO(pcm) Remove once httmock is available. In the meantime, use
|
||||||
# temporary local copy of httmock source to run UT
|
# temporary local copy of httmock source to run UT
|
||||||
from neutron.tests.unit.services.vpn.device_drivers import httmock
|
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
@ -42,6 +40,8 @@ if True:
|
|||||||
|
|
||||||
dummy_policy_id = 'dummy-ipsec-policy-id-name'
|
dummy_policy_id = 'dummy-ipsec-policy-id-name'
|
||||||
|
|
||||||
|
httmock = device_drivers.httmock
|
||||||
|
|
||||||
|
|
||||||
# Note: Helper functions to test reuse of IDs.
|
# Note: Helper functions to test reuse of IDs.
|
||||||
def generate_pre_shared_key_id():
|
def generate_pre_shared_key_id():
|
||||||
@ -67,7 +67,7 @@ class TestCsrLoginRestApi(base.BaseTestCase):
|
|||||||
|
|
||||||
def test_get_token(self):
|
def test_get_token(self):
|
||||||
"""Obtain the token and its expiration time."""
|
"""Obtain the token and its expiration time."""
|
||||||
with httmock.HTTMock(csr_request.token):
|
with httmock.HTTMock(device_drivers.csr_request.token):
|
||||||
self.assertTrue(self.csr.authenticate())
|
self.assertTrue(self.csr.authenticate())
|
||||||
self.assertEqual(requests.codes.OK, self.csr.status)
|
self.assertEqual(requests.codes.OK, self.csr.status)
|
||||||
self.assertIsNotNone(self.csr.token)
|
self.assertIsNotNone(self.csr.token)
|
||||||
@ -75,7 +75,7 @@ class TestCsrLoginRestApi(base.BaseTestCase):
|
|||||||
def test_unauthorized_token_request(self):
|
def test_unauthorized_token_request(self):
|
||||||
"""Negative test of invalid user/password."""
|
"""Negative test of invalid user/password."""
|
||||||
self.csr.auth = ('stack', 'bogus')
|
self.csr.auth = ('stack', 'bogus')
|
||||||
with httmock.HTTMock(csr_request.token_unauthorized):
|
with httmock.HTTMock(device_drivers.csr_request.token_unauthorized):
|
||||||
self.assertIsNone(self.csr.authenticate())
|
self.assertIsNone(self.csr.authenticate())
|
||||||
self.assertEqual(requests.codes.UNAUTHORIZED, self.csr.status)
|
self.assertEqual(requests.codes.UNAUTHORIZED, self.csr.status)
|
||||||
|
|
||||||
@ -83,14 +83,14 @@ class TestCsrLoginRestApi(base.BaseTestCase):
|
|||||||
"""Negative test of request to non-existent host."""
|
"""Negative test of request to non-existent host."""
|
||||||
self.csr.host = 'wrong-host'
|
self.csr.host = 'wrong-host'
|
||||||
self.csr.token = 'Set by some previously successful access'
|
self.csr.token = 'Set by some previously successful access'
|
||||||
with httmock.HTTMock(csr_request.token_wrong_host):
|
with httmock.HTTMock(device_drivers.csr_request.token_wrong_host):
|
||||||
self.assertIsNone(self.csr.authenticate())
|
self.assertIsNone(self.csr.authenticate())
|
||||||
self.assertEqual(requests.codes.NOT_FOUND, self.csr.status)
|
self.assertEqual(requests.codes.NOT_FOUND, self.csr.status)
|
||||||
self.assertIsNone(self.csr.token)
|
self.assertIsNone(self.csr.token)
|
||||||
|
|
||||||
def test_timeout_on_token_access(self):
|
def test_timeout_on_token_access(self):
|
||||||
"""Negative test of a timeout on a request."""
|
"""Negative test of a timeout on a request."""
|
||||||
with httmock.HTTMock(csr_request.token_timeout):
|
with httmock.HTTMock(device_drivers.csr_request.token_timeout):
|
||||||
self.assertIsNone(self.csr.authenticate())
|
self.assertIsNone(self.csr.authenticate())
|
||||||
self.assertEqual(requests.codes.REQUEST_TIMEOUT, self.csr.status)
|
self.assertEqual(requests.codes.REQUEST_TIMEOUT, self.csr.status)
|
||||||
self.assertIsNone(self.csr.token)
|
self.assertIsNone(self.csr.token)
|
||||||
@ -112,7 +112,8 @@ class TestCsrGetRestApi(base.BaseTestCase):
|
|||||||
that there are two interfaces on the CSR.
|
that there are two interfaces on the CSR.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.normal_get):
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
|
device_drivers.csr_request.normal_get):
|
||||||
content = self.csr.get_request('global/host-name')
|
content = self.csr.get_request('global/host-name')
|
||||||
self.assertEqual(requests.codes.OK, self.csr.status)
|
self.assertEqual(requests.codes.OK, self.csr.status)
|
||||||
self.assertIn('host-name', content)
|
self.assertIn('host-name', content)
|
||||||
@ -139,7 +140,8 @@ class TestCsrPostRestApi(base.BaseTestCase):
|
|||||||
that there are two interfaces (Ge1 and Ge2) on the CSR.
|
that there are two interfaces (Ge1 and Ge2) on the CSR.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.post):
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
|
device_drivers.csr_request.post):
|
||||||
content = self.csr.post_request(
|
content = self.csr.post_request(
|
||||||
'interfaces/GigabitEthernet1/statistics',
|
'interfaces/GigabitEthernet1/statistics',
|
||||||
payload={'action': 'clear'})
|
payload={'action': 'clear'})
|
||||||
@ -153,7 +155,8 @@ class TestCsrPostRestApi(base.BaseTestCase):
|
|||||||
|
|
||||||
def test_post_with_location(self):
|
def test_post_with_location(self):
|
||||||
"""Create a user and verify that location returned."""
|
"""Create a user and verify that location returned."""
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.post):
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
|
device_drivers.csr_request.post):
|
||||||
location = self.csr.post_request(
|
location = self.csr.post_request(
|
||||||
'global/local-users',
|
'global/local-users',
|
||||||
payload={'username': 'test-user',
|
payload={'username': 'test-user',
|
||||||
@ -164,7 +167,8 @@ class TestCsrPostRestApi(base.BaseTestCase):
|
|||||||
|
|
||||||
def test_post_missing_required_attribute(self):
|
def test_post_missing_required_attribute(self):
|
||||||
"""Negative test of POST with missing mandatory info."""
|
"""Negative test of POST with missing mandatory info."""
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.post):
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
|
device_drivers.csr_request.post):
|
||||||
self.csr.post_request('global/local-users',
|
self.csr.post_request('global/local-users',
|
||||||
payload={'password': 'pass12345',
|
payload={'password': 'pass12345',
|
||||||
'privilege': 15})
|
'privilege': 15})
|
||||||
@ -172,7 +176,8 @@ class TestCsrPostRestApi(base.BaseTestCase):
|
|||||||
|
|
||||||
def test_post_invalid_attribute(self):
|
def test_post_invalid_attribute(self):
|
||||||
"""Negative test of POST with invalid info."""
|
"""Negative test of POST with invalid info."""
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.post):
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
|
device_drivers.csr_request.post):
|
||||||
self.csr.post_request('global/local-users',
|
self.csr.post_request('global/local-users',
|
||||||
payload={'username': 'test-user',
|
payload={'username': 'test-user',
|
||||||
'password': 'pass12345',
|
'password': 'pass12345',
|
||||||
@ -185,7 +190,8 @@ class TestCsrPostRestApi(base.BaseTestCase):
|
|||||||
Uses the lower level _do_request() API to just perform the POST and
|
Uses the lower level _do_request() API to just perform the POST and
|
||||||
obtain the response, without any error processing.
|
obtain the response, without any error processing.
|
||||||
"""
|
"""
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.post):
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
|
device_drivers.csr_request.post):
|
||||||
location = self.csr._do_request(
|
location = self.csr._do_request(
|
||||||
'POST',
|
'POST',
|
||||||
'global/local-users',
|
'global/local-users',
|
||||||
@ -195,8 +201,8 @@ class TestCsrPostRestApi(base.BaseTestCase):
|
|||||||
more_headers=csr_client.HEADER_CONTENT_TYPE_JSON)
|
more_headers=csr_client.HEADER_CONTENT_TYPE_JSON)
|
||||||
self.assertEqual(requests.codes.CREATED, self.csr.status)
|
self.assertEqual(requests.codes.CREATED, self.csr.status)
|
||||||
self.assertIn('global/local-users/test-user', location)
|
self.assertIn('global/local-users/test-user', location)
|
||||||
with httmock.HTTMock(csr_request.token,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.post_change_attempt):
|
device_drivers.csr_request.post_change_attempt):
|
||||||
self.csr._do_request(
|
self.csr._do_request(
|
||||||
'POST',
|
'POST',
|
||||||
'global/local-users',
|
'global/local-users',
|
||||||
@ -210,7 +216,8 @@ class TestCsrPostRestApi(base.BaseTestCase):
|
|||||||
|
|
||||||
def test_post_changing_value(self):
|
def test_post_changing_value(self):
|
||||||
"""Negative test of a POST trying to change a value."""
|
"""Negative test of a POST trying to change a value."""
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.post):
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
|
device_drivers.csr_request.post):
|
||||||
location = self.csr.post_request(
|
location = self.csr.post_request(
|
||||||
'global/local-users',
|
'global/local-users',
|
||||||
payload={'username': 'test-user',
|
payload={'username': 'test-user',
|
||||||
@ -218,8 +225,8 @@ class TestCsrPostRestApi(base.BaseTestCase):
|
|||||||
'privilege': 15})
|
'privilege': 15})
|
||||||
self.assertEqual(requests.codes.CREATED, self.csr.status)
|
self.assertEqual(requests.codes.CREATED, self.csr.status)
|
||||||
self.assertIn('global/local-users/test-user', location)
|
self.assertIn('global/local-users/test-user', location)
|
||||||
with httmock.HTTMock(csr_request.token,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.post_change_attempt):
|
device_drivers.csr_request.post_change_attempt):
|
||||||
content = self.csr.post_request('global/local-users',
|
content = self.csr.post_request('global/local-users',
|
||||||
payload={'username': 'test-user',
|
payload={'username': 'test-user',
|
||||||
'password': 'changed',
|
'password': 'changed',
|
||||||
@ -235,7 +242,8 @@ class TestCsrPutRestApi(base.BaseTestCase):
|
|||||||
"""Test CSR PUT REST API."""
|
"""Test CSR PUT REST API."""
|
||||||
|
|
||||||
def _save_resources(self):
|
def _save_resources(self):
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.normal_get):
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
|
device_drivers.csr_request.normal_get):
|
||||||
details = self.csr.get_request('global/host-name')
|
details = self.csr.get_request('global/host-name')
|
||||||
if self.csr.status != requests.codes.OK:
|
if self.csr.status != requests.codes.OK:
|
||||||
self.fail("Unable to save original host name")
|
self.fail("Unable to save original host name")
|
||||||
@ -258,7 +266,8 @@ class TestCsrPutRestApi(base.BaseTestCase):
|
|||||||
|
|
||||||
self.csr.auth = (user, password)
|
self.csr.auth = (user, password)
|
||||||
self.csr.token = None
|
self.csr.token = None
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.put):
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
|
device_drivers.csr_request.put):
|
||||||
payload = {'host-name': self.original_host}
|
payload = {'host-name': self.original_host}
|
||||||
self.csr.put_request('global/host-name', payload=payload)
|
self.csr.put_request('global/host-name', payload=payload)
|
||||||
if self.csr.status != requests.codes.NO_CONTENT:
|
if self.csr.status != requests.codes.NO_CONTENT:
|
||||||
@ -288,8 +297,9 @@ class TestCsrPutRestApi(base.BaseTestCase):
|
|||||||
that there are two interfaces on the CSR (Ge1 and Ge2).
|
that there are two interfaces on the CSR (Ge1 and Ge2).
|
||||||
"""
|
"""
|
||||||
|
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.put,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.normal_get):
|
device_drivers.csr_request.put,
|
||||||
|
device_drivers.csr_request.normal_get):
|
||||||
payload = {'host-name': 'TestHost'}
|
payload = {'host-name': 'TestHost'}
|
||||||
content = self.csr.put_request('global/host-name',
|
content = self.csr.put_request('global/host-name',
|
||||||
payload=payload)
|
payload=payload)
|
||||||
@ -308,8 +318,9 @@ class TestCsrPutRestApi(base.BaseTestCase):
|
|||||||
This was a problem with an earlier version of the CSR image and is
|
This was a problem with an earlier version of the CSR image and is
|
||||||
here to prevent regression.
|
here to prevent regression.
|
||||||
"""
|
"""
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.put,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.normal_get):
|
device_drivers.csr_request.put,
|
||||||
|
device_drivers.csr_request.normal_get):
|
||||||
payload = {'description': u'Changed description',
|
payload = {'description': u'Changed description',
|
||||||
'if-name': self.original_if['if-name'],
|
'if-name': self.original_if['if-name'],
|
||||||
'ip-address': self.original_if['ip-address'],
|
'ip-address': self.original_if['ip-address'],
|
||||||
@ -334,8 +345,9 @@ class TestCsrPutRestApi(base.BaseTestCase):
|
|||||||
test setup to change the description to a non-empty string to
|
test setup to change the description to a non-empty string to
|
||||||
avoid failures in other tests.
|
avoid failures in other tests.
|
||||||
"""
|
"""
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.put,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.normal_get):
|
device_drivers.csr_request.put,
|
||||||
|
device_drivers.csr_request.normal_get):
|
||||||
payload = {'description': '',
|
payload = {'description': '',
|
||||||
'if-name': self.original_if['if-name'],
|
'if-name': self.original_if['if-name'],
|
||||||
'ip-address': self.original_if['ip-address'],
|
'ip-address': self.original_if['ip-address'],
|
||||||
@ -370,8 +382,9 @@ class TestCsrDeleteRestApi(base.BaseTestCase):
|
|||||||
|
|
||||||
def test_delete_requests(self):
|
def test_delete_requests(self):
|
||||||
"""Simple DELETE requests (creating entry first)."""
|
"""Simple DELETE requests (creating entry first)."""
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.post,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.delete):
|
device_drivers.csr_request.post,
|
||||||
|
device_drivers.csr_request.delete):
|
||||||
self._make_dummy_user()
|
self._make_dummy_user()
|
||||||
self.csr.token = None # Force login
|
self.csr.token = None # Force login
|
||||||
self.csr.delete_request('global/local-users/dummy')
|
self.csr.delete_request('global/local-users/dummy')
|
||||||
@ -383,7 +396,8 @@ class TestCsrDeleteRestApi(base.BaseTestCase):
|
|||||||
|
|
||||||
def test_delete_non_existent_entry(self):
|
def test_delete_non_existent_entry(self):
|
||||||
"""Negative test of trying to delete a non-existent user."""
|
"""Negative test of trying to delete a non-existent user."""
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.delete_unknown):
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
|
device_drivers.csr_request.delete_unknown):
|
||||||
content = self.csr.delete_request('global/local-users/unknown')
|
content = self.csr.delete_request('global/local-users/unknown')
|
||||||
self.assertEqual(requests.codes.NOT_FOUND, self.csr.status)
|
self.assertEqual(requests.codes.NOT_FOUND, self.csr.status)
|
||||||
expected = {u'error-code': -1,
|
expected = {u'error-code': -1,
|
||||||
@ -392,8 +406,8 @@ class TestCsrDeleteRestApi(base.BaseTestCase):
|
|||||||
|
|
||||||
def test_delete_not_allowed(self):
|
def test_delete_not_allowed(self):
|
||||||
"""Negative test of trying to delete the host-name."""
|
"""Negative test of trying to delete the host-name."""
|
||||||
with httmock.HTTMock(csr_request.token,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.delete_not_allowed):
|
device_drivers.csr_request.delete_not_allowed):
|
||||||
self.csr.delete_request('global/host-name')
|
self.csr.delete_request('global/host-name')
|
||||||
self.assertEqual(requests.codes.METHOD_NOT_ALLOWED,
|
self.assertEqual(requests.codes.METHOD_NOT_ALLOWED,
|
||||||
self.csr.status)
|
self.csr.status)
|
||||||
@ -414,14 +428,16 @@ class TestCsrRestApiFailures(base.BaseTestCase):
|
|||||||
|
|
||||||
def test_request_for_non_existent_resource(self):
|
def test_request_for_non_existent_resource(self):
|
||||||
"""Negative test of non-existent resource on REST request."""
|
"""Negative test of non-existent resource on REST request."""
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.no_such_resource):
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
|
device_drivers.csr_request.no_such_resource):
|
||||||
self.csr.post_request('no/such/request')
|
self.csr.post_request('no/such/request')
|
||||||
self.assertEqual(requests.codes.NOT_FOUND, self.csr.status)
|
self.assertEqual(requests.codes.NOT_FOUND, self.csr.status)
|
||||||
# The result is HTTP 404 message, so no error content to check
|
# The result is HTTP 404 message, so no error content to check
|
||||||
|
|
||||||
def test_timeout_during_request(self):
|
def test_timeout_during_request(self):
|
||||||
"""Negative test of timeout during REST request."""
|
"""Negative test of timeout during REST request."""
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.timeout):
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
|
device_drivers.csr_request.timeout):
|
||||||
self.csr._do_request('GET', 'global/host-name')
|
self.csr._do_request('GET', 'global/host-name')
|
||||||
self.assertEqual(requests.codes.REQUEST_TIMEOUT, self.csr.status)
|
self.assertEqual(requests.codes.REQUEST_TIMEOUT, self.csr.status)
|
||||||
|
|
||||||
@ -433,8 +449,9 @@ class TestCsrRestApiFailures(base.BaseTestCase):
|
|||||||
token by changing it.
|
token by changing it.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.expired_request,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.normal_get):
|
device_drivers.csr_request.expired_request,
|
||||||
|
device_drivers.csr_request.normal_get):
|
||||||
self.csr.token = '123' # These are 44 characters, so won't match
|
self.csr.token = '123' # These are 44 characters, so won't match
|
||||||
content = self.csr._do_request('GET', 'global/host-name')
|
content = self.csr._do_request('GET', 'global/host-name')
|
||||||
self.assertEqual(requests.codes.OK, self.csr.status)
|
self.assertEqual(requests.codes.OK, self.csr.status)
|
||||||
@ -444,7 +461,7 @@ class TestCsrRestApiFailures(base.BaseTestCase):
|
|||||||
def test_failed_to_obtain_token_for_request(self):
|
def test_failed_to_obtain_token_for_request(self):
|
||||||
"""Negative test of unauthorized user for REST request."""
|
"""Negative test of unauthorized user for REST request."""
|
||||||
self.csr.auth = ('stack', 'bogus')
|
self.csr.auth = ('stack', 'bogus')
|
||||||
with httmock.HTTMock(csr_request.token_unauthorized):
|
with httmock.HTTMock(device_drivers.csr_request.token_unauthorized):
|
||||||
self.csr._do_request('GET', 'global/host-name')
|
self.csr._do_request('GET', 'global/host-name')
|
||||||
self.assertEqual(requests.codes.UNAUTHORIZED, self.csr.status)
|
self.assertEqual(requests.codes.UNAUTHORIZED, self.csr.status)
|
||||||
|
|
||||||
@ -460,8 +477,9 @@ class TestCsrRestIkePolicyCreate(base.BaseTestCase):
|
|||||||
|
|
||||||
def test_create_delete_ike_policy(self):
|
def test_create_delete_ike_policy(self):
|
||||||
"""Create and then delete IKE policy."""
|
"""Create and then delete IKE policy."""
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.post,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.normal_get):
|
device_drivers.csr_request.post,
|
||||||
|
device_drivers.csr_request.normal_get):
|
||||||
policy_id = '2'
|
policy_id = '2'
|
||||||
policy_info = {u'priority-id': u'%s' % policy_id,
|
policy_info = {u'priority-id': u'%s' % policy_id,
|
||||||
u'encryption': u'aes256',
|
u'encryption': u'aes256',
|
||||||
@ -480,8 +498,9 @@ class TestCsrRestIkePolicyCreate(base.BaseTestCase):
|
|||||||
expected_policy.update(policy_info)
|
expected_policy.update(policy_info)
|
||||||
self.assertEqual(expected_policy, content)
|
self.assertEqual(expected_policy, content)
|
||||||
# Now delete and verify the IKE policy is gone
|
# Now delete and verify the IKE policy is gone
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.delete,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.no_such_resource):
|
device_drivers.csr_request.delete,
|
||||||
|
device_drivers.csr_request.no_such_resource):
|
||||||
self.csr.delete_ike_policy(policy_id)
|
self.csr.delete_ike_policy(policy_id)
|
||||||
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
|
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
|
||||||
content = self.csr.get_request(location, full_url=True)
|
content = self.csr.get_request(location, full_url=True)
|
||||||
@ -489,8 +508,9 @@ class TestCsrRestIkePolicyCreate(base.BaseTestCase):
|
|||||||
|
|
||||||
def test_create_ike_policy_with_defaults(self):
|
def test_create_ike_policy_with_defaults(self):
|
||||||
"""Create IKE policy using defaults for all optional values."""
|
"""Create IKE policy using defaults for all optional values."""
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.post,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.get_defaults):
|
device_drivers.csr_request.post,
|
||||||
|
device_drivers.csr_request.get_defaults):
|
||||||
policy_id = '2'
|
policy_id = '2'
|
||||||
policy_info = {u'priority-id': u'%s' % policy_id}
|
policy_info = {u'priority-id': u'%s' % policy_id}
|
||||||
location = self.csr.create_ike_policy(policy_info)
|
location = self.csr.create_ike_policy(policy_info)
|
||||||
@ -512,8 +532,9 @@ class TestCsrRestIkePolicyCreate(base.BaseTestCase):
|
|||||||
|
|
||||||
def test_create_duplicate_ike_policy(self):
|
def test_create_duplicate_ike_policy(self):
|
||||||
"""Negative test of trying to create a duplicate IKE policy."""
|
"""Negative test of trying to create a duplicate IKE policy."""
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.post,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.normal_get):
|
device_drivers.csr_request.post,
|
||||||
|
device_drivers.csr_request.normal_get):
|
||||||
policy_id = '2'
|
policy_id = '2'
|
||||||
policy_info = {u'priority-id': u'%s' % policy_id,
|
policy_info = {u'priority-id': u'%s' % policy_id,
|
||||||
u'encryption': u'aes',
|
u'encryption': u'aes',
|
||||||
@ -523,7 +544,8 @@ class TestCsrRestIkePolicyCreate(base.BaseTestCase):
|
|||||||
location = self.csr.create_ike_policy(policy_info)
|
location = self.csr.create_ike_policy(policy_info)
|
||||||
self.assertEqual(requests.codes.CREATED, self.csr.status)
|
self.assertEqual(requests.codes.CREATED, self.csr.status)
|
||||||
self.assertIn('vpn-svc/ike/policies/%s' % policy_id, location)
|
self.assertIn('vpn-svc/ike/policies/%s' % policy_id, location)
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.post_duplicate):
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
|
device_drivers.csr_request.post_duplicate):
|
||||||
location = self.csr.create_ike_policy(policy_info)
|
location = self.csr.create_ike_policy(policy_info)
|
||||||
self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status)
|
self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status)
|
||||||
expected = {u'error-code': -1,
|
expected = {u'error-code': -1,
|
||||||
@ -543,8 +565,9 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase):
|
|||||||
|
|
||||||
def test_create_delete_ipsec_policy(self):
|
def test_create_delete_ipsec_policy(self):
|
||||||
"""Create and then delete IPSec policy."""
|
"""Create and then delete IPSec policy."""
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.post,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.normal_get):
|
device_drivers.csr_request.post,
|
||||||
|
device_drivers.csr_request.normal_get):
|
||||||
policy_id = '123'
|
policy_id = '123'
|
||||||
policy_info = {
|
policy_info = {
|
||||||
u'policy-id': u'%s' % policy_id,
|
u'policy-id': u'%s' % policy_id,
|
||||||
@ -572,8 +595,9 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase):
|
|||||||
expected_policy[u'anti-replay-window-size'] = u'Disable'
|
expected_policy[u'anti-replay-window-size'] = u'Disable'
|
||||||
self.assertEqual(expected_policy, content)
|
self.assertEqual(expected_policy, content)
|
||||||
# Now delete and verify the IPSec policy is gone
|
# Now delete and verify the IPSec policy is gone
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.delete,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.no_such_resource):
|
device_drivers.csr_request.delete,
|
||||||
|
device_drivers.csr_request.no_such_resource):
|
||||||
self.csr.delete_ipsec_policy(policy_id)
|
self.csr.delete_ipsec_policy(policy_id)
|
||||||
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
|
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
|
||||||
content = self.csr.get_request(location, full_url=True)
|
content = self.csr.get_request(location, full_url=True)
|
||||||
@ -581,8 +605,9 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase):
|
|||||||
|
|
||||||
def test_create_ipsec_policy_with_defaults(self):
|
def test_create_ipsec_policy_with_defaults(self):
|
||||||
"""Create IPSec policy with default for all optional values."""
|
"""Create IPSec policy with default for all optional values."""
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.post,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.get_defaults):
|
device_drivers.csr_request.post,
|
||||||
|
device_drivers.csr_request.get_defaults):
|
||||||
policy_id = '123'
|
policy_id = '123'
|
||||||
policy_info = {
|
policy_info = {
|
||||||
u'policy-id': u'%s' % policy_id,
|
u'policy-id': u'%s' % policy_id,
|
||||||
@ -606,8 +631,9 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase):
|
|||||||
|
|
||||||
def test_create_ipsec_policy_with_uuid(self):
|
def test_create_ipsec_policy_with_uuid(self):
|
||||||
"""Create IPSec policy using UUID for id."""
|
"""Create IPSec policy using UUID for id."""
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.post,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.normal_get):
|
device_drivers.csr_request.post,
|
||||||
|
device_drivers.csr_request.normal_get):
|
||||||
policy_info = {
|
policy_info = {
|
||||||
u'policy-id': u'%s' % dummy_policy_id,
|
u'policy-id': u'%s' % dummy_policy_id,
|
||||||
u'protection-suite': {
|
u'protection-suite': {
|
||||||
@ -637,8 +663,9 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase):
|
|||||||
|
|
||||||
def test_create_ipsec_policy_without_ah(self):
|
def test_create_ipsec_policy_without_ah(self):
|
||||||
"""Create IPSec policy."""
|
"""Create IPSec policy."""
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.post,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.get_no_ah):
|
device_drivers.csr_request.post,
|
||||||
|
device_drivers.csr_request.get_no_ah):
|
||||||
policy_id = '10'
|
policy_id = '10'
|
||||||
policy_info = {
|
policy_info = {
|
||||||
u'policy-id': u'%s' % policy_id,
|
u'policy-id': u'%s' % policy_id,
|
||||||
@ -665,8 +692,8 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase):
|
|||||||
|
|
||||||
def test_invalid_ipsec_policy_lifetime(self):
|
def test_invalid_ipsec_policy_lifetime(self):
|
||||||
"""Failure test of IPSec policy with unsupported lifetime."""
|
"""Failure test of IPSec policy with unsupported lifetime."""
|
||||||
with httmock.HTTMock(csr_request.token,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.post_bad_lifetime):
|
device_drivers.csr_request.post_bad_lifetime):
|
||||||
policy_id = '123'
|
policy_id = '123'
|
||||||
policy_info = {
|
policy_info = {
|
||||||
u'policy-id': u'%s' % policy_id,
|
u'policy-id': u'%s' % policy_id,
|
||||||
@ -684,8 +711,9 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase):
|
|||||||
|
|
||||||
def test_create_ipsec_policy_with_invalid_name(self):
|
def test_create_ipsec_policy_with_invalid_name(self):
|
||||||
"""Failure test of creating IPSec policy with name too long."""
|
"""Failure test of creating IPSec policy with name too long."""
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.post_bad_name,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.get_defaults):
|
device_drivers.csr_request.post_bad_name,
|
||||||
|
device_drivers.csr_request.get_defaults):
|
||||||
policy_id = 'policy-name-is-too-long-32-chars'
|
policy_id = 'policy-name-is-too-long-32-chars'
|
||||||
policy_info = {
|
policy_info = {
|
||||||
u'policy-id': u'%s' % policy_id,
|
u'policy-id': u'%s' % policy_id,
|
||||||
@ -705,8 +733,9 @@ class TestCsrRestPreSharedKeyCreate(base.BaseTestCase):
|
|||||||
|
|
||||||
def test_create_delete_pre_shared_key(self):
|
def test_create_delete_pre_shared_key(self):
|
||||||
"""Create and then delete a keyring entry for pre-shared key."""
|
"""Create and then delete a keyring entry for pre-shared key."""
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.post,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.normal_get):
|
device_drivers.csr_request.post,
|
||||||
|
device_drivers.csr_request.normal_get):
|
||||||
psk_id = '5'
|
psk_id = '5'
|
||||||
psk_info = {u'keyring-name': u'%s' % psk_id,
|
psk_info = {u'keyring-name': u'%s' % psk_id,
|
||||||
u'pre-shared-key-list': [
|
u'pre-shared-key-list': [
|
||||||
@ -727,8 +756,9 @@ class TestCsrRestPreSharedKeyCreate(base.BaseTestCase):
|
|||||||
u'10.10.10.20 255.255.255.0')
|
u'10.10.10.20 255.255.255.0')
|
||||||
self.assertEqual(expected_policy, content)
|
self.assertEqual(expected_policy, content)
|
||||||
# Now delete and verify pre-shared key is gone
|
# Now delete and verify pre-shared key is gone
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.delete,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.no_such_resource):
|
device_drivers.csr_request.delete,
|
||||||
|
device_drivers.csr_request.no_such_resource):
|
||||||
self.csr.delete_pre_shared_key(psk_id)
|
self.csr.delete_pre_shared_key(psk_id)
|
||||||
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
|
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
|
||||||
content = self.csr.get_request(location, full_url=True)
|
content = self.csr.get_request(location, full_url=True)
|
||||||
@ -736,8 +766,9 @@ class TestCsrRestPreSharedKeyCreate(base.BaseTestCase):
|
|||||||
|
|
||||||
def test_create_pre_shared_key_with_fqdn_peer(self):
|
def test_create_pre_shared_key_with_fqdn_peer(self):
|
||||||
"""Create pre-shared key using FQDN for peer address."""
|
"""Create pre-shared key using FQDN for peer address."""
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.post,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.get_fqdn):
|
device_drivers.csr_request.post,
|
||||||
|
device_drivers.csr_request.get_fqdn):
|
||||||
psk_id = '5'
|
psk_id = '5'
|
||||||
psk_info = {u'keyring-name': u'%s' % psk_id,
|
psk_info = {u'keyring-name': u'%s' % psk_id,
|
||||||
u'pre-shared-key-list': [
|
u'pre-shared-key-list': [
|
||||||
@ -757,8 +788,9 @@ class TestCsrRestPreSharedKeyCreate(base.BaseTestCase):
|
|||||||
|
|
||||||
def test_create_pre_shared_key_with_duplicate_peer_address(self):
|
def test_create_pre_shared_key_with_duplicate_peer_address(self):
|
||||||
"""Negative test of creating a second pre-shared key with same peer."""
|
"""Negative test of creating a second pre-shared key with same peer."""
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.post,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.normal_get):
|
device_drivers.csr_request.post,
|
||||||
|
device_drivers.csr_request.normal_get):
|
||||||
psk_id = '5'
|
psk_id = '5'
|
||||||
psk_info = {u'keyring-name': u'%s' % psk_id,
|
psk_info = {u'keyring-name': u'%s' % psk_id,
|
||||||
u'pre-shared-key-list': [
|
u'pre-shared-key-list': [
|
||||||
@ -769,7 +801,8 @@ class TestCsrRestPreSharedKeyCreate(base.BaseTestCase):
|
|||||||
location = self.csr.create_pre_shared_key(psk_info)
|
location = self.csr.create_pre_shared_key(psk_info)
|
||||||
self.assertEqual(requests.codes.CREATED, self.csr.status)
|
self.assertEqual(requests.codes.CREATED, self.csr.status)
|
||||||
self.assertIn('vpn-svc/ike/keyrings/%s' % psk_id, location)
|
self.assertIn('vpn-svc/ike/keyrings/%s' % psk_id, location)
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.post_duplicate):
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
|
device_drivers.csr_request.post_duplicate):
|
||||||
psk_id = u'6'
|
psk_id = u'6'
|
||||||
another_psk_info = {u'keyring-name': psk_id,
|
another_psk_info = {u'keyring-name': psk_id,
|
||||||
u'pre-shared-key-list': [
|
u'pre-shared-key-list': [
|
||||||
@ -799,7 +832,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
|
|||||||
psk_id = generate_pre_shared_key_id()
|
psk_id = generate_pre_shared_key_id()
|
||||||
self._remove_resource_for_test(self.csr.delete_pre_shared_key,
|
self._remove_resource_for_test(self.csr.delete_pre_shared_key,
|
||||||
psk_id)
|
psk_id)
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.post):
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
|
device_drivers.csr_request.post):
|
||||||
psk_info = {u'keyring-name': u'%d' % psk_id,
|
psk_info = {u'keyring-name': u'%d' % psk_id,
|
||||||
u'pre-shared-key-list': [
|
u'pre-shared-key-list': [
|
||||||
{u'key': u'super-secret',
|
{u'key': u'super-secret',
|
||||||
@ -817,7 +851,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
|
|||||||
policy_id = generate_ike_policy_id()
|
policy_id = generate_ike_policy_id()
|
||||||
self._remove_resource_for_test(self.csr.delete_ike_policy,
|
self._remove_resource_for_test(self.csr.delete_ike_policy,
|
||||||
policy_id)
|
policy_id)
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.post):
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
|
device_drivers.csr_request.post):
|
||||||
policy_info = {u'priority-id': u'%d' % policy_id,
|
policy_info = {u'priority-id': u'%d' % policy_id,
|
||||||
u'encryption': u'aes',
|
u'encryption': u'aes',
|
||||||
u'hash': u'sha',
|
u'hash': u'sha',
|
||||||
@ -834,7 +869,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
|
|||||||
policy_id = generate_ipsec_policy_id()
|
policy_id = generate_ipsec_policy_id()
|
||||||
self._remove_resource_for_test(self.csr.delete_ipsec_policy,
|
self._remove_resource_for_test(self.csr.delete_ipsec_policy,
|
||||||
policy_id)
|
policy_id)
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.post):
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
|
device_drivers.csr_request.post):
|
||||||
policy_info = {
|
policy_info = {
|
||||||
u'policy-id': u'%d' % policy_id,
|
u'policy-id': u'%d' % policy_id,
|
||||||
u'protection-suite': {
|
u'protection-suite': {
|
||||||
@ -854,7 +890,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
|
|||||||
return policy_id
|
return policy_id
|
||||||
|
|
||||||
def _remove_resource_for_test(self, delete_resource, resource_id):
|
def _remove_resource_for_test(self, delete_resource, resource_id):
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.delete):
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
|
device_drivers.csr_request.delete):
|
||||||
delete_resource(resource_id)
|
delete_resource(resource_id)
|
||||||
|
|
||||||
def _prepare_for_site_conn_create(self, skip_psk=False, skip_ike=False,
|
def _prepare_for_site_conn_create(self, skip_psk=False, skip_ike=False,
|
||||||
@ -876,8 +913,9 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
|
|||||||
def test_create_delete_ipsec_connection(self):
|
def test_create_delete_ipsec_connection(self):
|
||||||
"""Create and then delete an IPSec connection."""
|
"""Create and then delete an IPSec connection."""
|
||||||
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
|
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.post,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.normal_get):
|
device_drivers.csr_request.post,
|
||||||
|
device_drivers.csr_request.normal_get):
|
||||||
connection_info = {
|
connection_info = {
|
||||||
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
|
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
|
||||||
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
|
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
|
||||||
@ -903,8 +941,9 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
|
|||||||
expected_connection.update(connection_info)
|
expected_connection.update(connection_info)
|
||||||
self.assertEqual(expected_connection, content)
|
self.assertEqual(expected_connection, content)
|
||||||
# Now delete and verify that site-to-site connection is gone
|
# Now delete and verify that site-to-site connection is gone
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.delete,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.no_such_resource):
|
device_drivers.csr_request.delete,
|
||||||
|
device_drivers.csr_request.no_such_resource):
|
||||||
# Only delete connection. Cleanup will take care of prerequisites
|
# Only delete connection. Cleanup will take care of prerequisites
|
||||||
self.csr.delete_ipsec_connection('Tunnel%d' % tunnel_id)
|
self.csr.delete_ipsec_connection('Tunnel%d' % tunnel_id)
|
||||||
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
|
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
|
||||||
@ -914,8 +953,9 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
|
|||||||
def test_create_ipsec_connection_with_no_tunnel_subnet(self):
|
def test_create_ipsec_connection_with_no_tunnel_subnet(self):
|
||||||
"""Create an IPSec connection without an IP address on tunnel."""
|
"""Create an IPSec connection without an IP address on tunnel."""
|
||||||
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
|
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.post,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.get_unnumbered):
|
device_drivers.csr_request.post,
|
||||||
|
device_drivers.csr_request.get_unnumbered):
|
||||||
connection_info = {
|
connection_info = {
|
||||||
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
|
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
|
||||||
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
|
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
|
||||||
@ -949,8 +989,9 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
|
|||||||
|
|
||||||
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create(
|
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create(
|
||||||
skip_psk=True)
|
skip_psk=True)
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.post,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.normal_get):
|
device_drivers.csr_request.post,
|
||||||
|
device_drivers.csr_request.normal_get):
|
||||||
connection_info = {
|
connection_info = {
|
||||||
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
|
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
|
||||||
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
|
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
|
||||||
@ -985,8 +1026,9 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
|
|||||||
|
|
||||||
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create(
|
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create(
|
||||||
skip_ike=True)
|
skip_ike=True)
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.post,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.normal_get):
|
device_drivers.csr_request.post,
|
||||||
|
device_drivers.csr_request.normal_get):
|
||||||
connection_info = {
|
connection_info = {
|
||||||
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
|
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
|
||||||
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
|
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
|
||||||
@ -1016,7 +1058,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
|
|||||||
"""Create IPSec connection in admin down state."""
|
"""Create IPSec connection in admin down state."""
|
||||||
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
|
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
|
||||||
tunnel = u'Tunnel%d' % tunnel_id
|
tunnel = u'Tunnel%d' % tunnel_id
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.post):
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
|
device_drivers.csr_request.post):
|
||||||
connection_info = {
|
connection_info = {
|
||||||
u'vpn-interface-name': tunnel,
|
u'vpn-interface-name': tunnel,
|
||||||
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
|
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
|
||||||
@ -1038,14 +1081,16 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
|
|||||||
u'vpn-interface-name': tunnel,
|
u'vpn-interface-name': tunnel,
|
||||||
u'line-protocol-state': u'down',
|
u'line-protocol-state': u'down',
|
||||||
u'enabled': False}
|
u'enabled': False}
|
||||||
with httmock.HTTMock(csr_request.put, csr_request.get_admin_down):
|
with httmock.HTTMock(device_drivers.csr_request.put,
|
||||||
|
device_drivers.csr_request.get_admin_down):
|
||||||
self.csr.set_ipsec_connection_state(tunnel, admin_up=False)
|
self.csr.set_ipsec_connection_state(tunnel, admin_up=False)
|
||||||
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
|
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
|
||||||
content = self.csr.get_request(state_uri, full_url=True)
|
content = self.csr.get_request(state_uri, full_url=True)
|
||||||
self.assertEqual(requests.codes.OK, self.csr.status)
|
self.assertEqual(requests.codes.OK, self.csr.status)
|
||||||
self.assertEqual(expected_state, content)
|
self.assertEqual(expected_state, content)
|
||||||
|
|
||||||
with httmock.HTTMock(csr_request.put, csr_request.get_admin_up):
|
with httmock.HTTMock(device_drivers.csr_request.put,
|
||||||
|
device_drivers.csr_request.get_admin_up):
|
||||||
self.csr.set_ipsec_connection_state(tunnel, admin_up=True)
|
self.csr.set_ipsec_connection_state(tunnel, admin_up=True)
|
||||||
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
|
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
|
||||||
content = self.csr.get_request(state_uri, full_url=True)
|
content = self.csr.get_request(state_uri, full_url=True)
|
||||||
@ -1057,8 +1102,9 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
|
|||||||
"""Negative test of connection create without IPSec policy."""
|
"""Negative test of connection create without IPSec policy."""
|
||||||
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create(
|
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create(
|
||||||
skip_ipsec=True)
|
skip_ipsec=True)
|
||||||
with httmock.HTTMock(csr_request.token,
|
with httmock.HTTMock(
|
||||||
csr_request.post_missing_ipsec_policy):
|
device_drivers.csr_request.token,
|
||||||
|
device_drivers.csr_request.post_missing_ipsec_policy):
|
||||||
connection_info = {
|
connection_info = {
|
||||||
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
|
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
|
||||||
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
|
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
|
||||||
@ -1073,7 +1119,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
|
|||||||
self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status)
|
self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status)
|
||||||
|
|
||||||
def _determine_conflicting_ip(self):
|
def _determine_conflicting_ip(self):
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.get_local_ip):
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
|
device_drivers.csr_request.get_local_ip):
|
||||||
details = self.csr.get_request('interfaces/GigabitEthernet3')
|
details = self.csr.get_request('interfaces/GigabitEthernet3')
|
||||||
if self.csr.status != requests.codes.OK:
|
if self.csr.status != requests.codes.OK:
|
||||||
self.fail("Unable to obtain interface GigabitEthernet3's IP")
|
self.fail("Unable to obtain interface GigabitEthernet3's IP")
|
||||||
@ -1091,7 +1138,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
|
|||||||
|
|
||||||
conflicting_ip = self._determine_conflicting_ip()
|
conflicting_ip = self._determine_conflicting_ip()
|
||||||
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
|
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.post_bad_ip):
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
|
device_drivers.csr_request.post_bad_ip):
|
||||||
connection_info = {
|
connection_info = {
|
||||||
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
|
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
|
||||||
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
|
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
|
||||||
@ -1108,8 +1156,9 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
|
|||||||
def test_create_ipsec_connection_with_max_mtu(self):
|
def test_create_ipsec_connection_with_max_mtu(self):
|
||||||
"""Create an IPSec connection with max MTU value."""
|
"""Create an IPSec connection with max MTU value."""
|
||||||
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
|
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.post,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.get_mtu):
|
device_drivers.csr_request.post,
|
||||||
|
device_drivers.csr_request.get_mtu):
|
||||||
connection_info = {
|
connection_info = {
|
||||||
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
|
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
|
||||||
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
|
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
|
||||||
@ -1137,7 +1186,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
|
|||||||
def test_create_ipsec_connection_with_bad_mtu(self):
|
def test_create_ipsec_connection_with_bad_mtu(self):
|
||||||
"""Negative test of connection create with unsupported MTU value."""
|
"""Negative test of connection create with unsupported MTU value."""
|
||||||
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
|
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.post_bad_mtu):
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
|
device_drivers.csr_request.post_bad_mtu):
|
||||||
connection_info = {
|
connection_info = {
|
||||||
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
|
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
|
||||||
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
|
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
|
||||||
@ -1154,7 +1204,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
|
|||||||
|
|
||||||
def test_status_when_no_tunnels_exist(self):
|
def test_status_when_no_tunnels_exist(self):
|
||||||
"""Get status, when there are no tunnels."""
|
"""Get status, when there are no tunnels."""
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.get_none):
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
|
device_drivers.csr_request.get_none):
|
||||||
tunnels = self.csr.read_tunnel_statuses()
|
tunnels = self.csr.read_tunnel_statuses()
|
||||||
self.assertEqual(requests.codes.OK, self.csr.status)
|
self.assertEqual(requests.codes.OK, self.csr.status)
|
||||||
self.assertEqual([], tunnels)
|
self.assertEqual([], tunnels)
|
||||||
@ -1164,8 +1215,9 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
|
|||||||
# Create the IPsec site-to-site connection first
|
# Create the IPsec site-to-site connection first
|
||||||
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
|
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
|
||||||
tunnel_id = 123 # Must hard code to work with mock
|
tunnel_id = 123 # Must hard code to work with mock
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.post,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.normal_get):
|
device_drivers.csr_request.post,
|
||||||
|
device_drivers.csr_request.normal_get):
|
||||||
connection_info = {
|
connection_info = {
|
||||||
u'vpn-interface-name': u'Tunnel123',
|
u'vpn-interface-name': u'Tunnel123',
|
||||||
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
|
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
|
||||||
@ -1180,7 +1232,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase):
|
|||||||
self.assertEqual(requests.codes.CREATED, self.csr.status)
|
self.assertEqual(requests.codes.CREATED, self.csr.status)
|
||||||
self.assertIn('vpn-svc/site-to-site/Tunnel%d' % tunnel_id,
|
self.assertIn('vpn-svc/site-to-site/Tunnel%d' % tunnel_id,
|
||||||
location)
|
location)
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.normal_get):
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
|
device_drivers.csr_request.normal_get):
|
||||||
tunnels = self.csr.read_tunnel_statuses()
|
tunnels = self.csr.read_tunnel_statuses()
|
||||||
self.assertEqual(requests.codes.OK, self.csr.status)
|
self.assertEqual(requests.codes.OK, self.csr.status)
|
||||||
self.assertEqual([(u'Tunnel123', u'DOWN-NEGOTIATING'), ], tunnels)
|
self.assertEqual([(u'Tunnel123', u'DOWN-NEGOTIATING'), ], tunnels)
|
||||||
@ -1197,7 +1250,8 @@ class TestCsrRestIkeKeepaliveCreate(base.BaseTestCase):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
def _save_dpd_info(self):
|
def _save_dpd_info(self):
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.normal_get):
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
|
device_drivers.csr_request.normal_get):
|
||||||
details = self.csr.get_request('vpn-svc/ike/keepalive')
|
details = self.csr.get_request('vpn-svc/ike/keepalive')
|
||||||
if self.csr.status == requests.codes.OK:
|
if self.csr.status == requests.codes.OK:
|
||||||
self.dpd = details
|
self.dpd = details
|
||||||
@ -1206,7 +1260,8 @@ class TestCsrRestIkeKeepaliveCreate(base.BaseTestCase):
|
|||||||
self.fail("Unable to save original DPD info")
|
self.fail("Unable to save original DPD info")
|
||||||
|
|
||||||
def _restore_dpd_info(self):
|
def _restore_dpd_info(self):
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.put):
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
|
device_drivers.csr_request.put):
|
||||||
payload = {'interval': self.dpd['interval'],
|
payload = {'interval': self.dpd['interval'],
|
||||||
'retry': self.dpd['retry']}
|
'retry': self.dpd['retry']}
|
||||||
self.csr.put_request('vpn-svc/ike/keepalive', payload=payload)
|
self.csr.put_request('vpn-svc/ike/keepalive', payload=payload)
|
||||||
@ -1222,8 +1277,9 @@ class TestCsrRestIkeKeepaliveCreate(base.BaseTestCase):
|
|||||||
|
|
||||||
def test_configure_ike_keepalive(self):
|
def test_configure_ike_keepalive(self):
|
||||||
"""Set IKE keep-alive (aka Dead Peer Detection) for the CSR."""
|
"""Set IKE keep-alive (aka Dead Peer Detection) for the CSR."""
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.put,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.normal_get):
|
device_drivers.csr_request.put,
|
||||||
|
device_drivers.csr_request.normal_get):
|
||||||
keepalive_info = {'interval': 60, 'retry': 4}
|
keepalive_info = {'interval': 60, 'retry': 4}
|
||||||
self.csr.configure_ike_keepalive(keepalive_info)
|
self.csr.configure_ike_keepalive(keepalive_info)
|
||||||
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
|
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
|
||||||
@ -1235,8 +1291,10 @@ class TestCsrRestIkeKeepaliveCreate(base.BaseTestCase):
|
|||||||
|
|
||||||
def test_disable_ike_keepalive(self):
|
def test_disable_ike_keepalive(self):
|
||||||
"""Disable IKE keep-alive (aka Dead Peer Detection) for the CSR."""
|
"""Disable IKE keep-alive (aka Dead Peer Detection) for the CSR."""
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.delete,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.put, csr_request.get_not_configured):
|
device_drivers.csr_request.delete,
|
||||||
|
device_drivers.csr_request.put,
|
||||||
|
device_drivers.csr_request.get_not_configured):
|
||||||
keepalive_info = {'interval': 0, 'retry': 4}
|
keepalive_info = {'interval': 0, 'retry': 4}
|
||||||
self.csr.configure_ike_keepalive(keepalive_info)
|
self.csr.configure_ike_keepalive(keepalive_info)
|
||||||
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
|
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
|
||||||
@ -1260,8 +1318,9 @@ class TestCsrRestStaticRoute(base.BaseTestCase):
|
|||||||
cidr = u'10.1.0.0/24'
|
cidr = u'10.1.0.0/24'
|
||||||
interface = u'GigabitEthernet1'
|
interface = u'GigabitEthernet1'
|
||||||
expected_id = '10.1.0.0_24_GigabitEthernet1'
|
expected_id = '10.1.0.0_24_GigabitEthernet1'
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.post,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.normal_get):
|
device_drivers.csr_request.post,
|
||||||
|
device_drivers.csr_request.normal_get):
|
||||||
route_info = {u'destination-network': cidr,
|
route_info = {u'destination-network': cidr,
|
||||||
u'outgoing-interface': interface}
|
u'outgoing-interface': interface}
|
||||||
location = self.csr.create_static_route(route_info)
|
location = self.csr.create_static_route(route_info)
|
||||||
@ -1277,8 +1336,9 @@ class TestCsrRestStaticRoute(base.BaseTestCase):
|
|||||||
expected_route.update(route_info)
|
expected_route.update(route_info)
|
||||||
self.assertEqual(expected_route, content)
|
self.assertEqual(expected_route, content)
|
||||||
# Now delete and verify that static route is gone
|
# Now delete and verify that static route is gone
|
||||||
with httmock.HTTMock(csr_request.token, csr_request.delete,
|
with httmock.HTTMock(device_drivers.csr_request.token,
|
||||||
csr_request.no_such_resource):
|
device_drivers.csr_request.delete,
|
||||||
|
device_drivers.csr_request.no_such_resource):
|
||||||
route_id = csr_client.make_route_id(cidr, interface)
|
route_id = csr_client.make_route_id(cidr, interface)
|
||||||
self.csr.delete_static_route(route_id)
|
self.csr.delete_static_route(route_id)
|
||||||
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
|
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
|
||||||
|
Loading…
Reference in New Issue
Block a user