From 787fecb99909f47714ad1fbc77384a8f606b2baa Mon Sep 17 00:00:00 2001 From: Paul Michali Date: Mon, 7 Jul 2014 18:57:43 -0400 Subject: [PATCH] VPNaaS Cisco REST client enhance CSR create For the create of the REST client object that represents a Cisco CSR, all of the info needed were passed in as separate parameters. This change just uses a dict instead, so that additional parameters can be added w/o changing the API. Updated the currently unused UT module, just so that it can be used locally and stays up-to-date until it can be converted to use the new requests-mock package. Change-Id: I5d4f439cc7ffe125cea9ed3407b70645587a739a Closes-Bug: 1336478 --- .../device_drivers/cisco_csr_rest_client.py | 10 +- .../vpn/device_drivers/cisco_ipsec.py | 6 +- .../vpn/device_drivers/cisco_csr_mock.py | 10 +- .../device_drivers/notest_cisco_csr_rest.py | 400 +++++++++--------- 4 files changed, 218 insertions(+), 208 deletions(-) diff --git a/neutron/services/vpn/device_drivers/cisco_csr_rest_client.py b/neutron/services/vpn/device_drivers/cisco_csr_rest_client.py index 61693e9e1..d8e79789a 100644 --- a/neutron/services/vpn/device_drivers/cisco_csr_rest_client.py +++ b/neutron/services/vpn/device_drivers/cisco_csr_rest_client.py @@ -44,13 +44,13 @@ class CsrRestClient(object): """REST CsrRestClient for accessing the Cisco Cloud Services Router.""" - def __init__(self, host, tunnel_ip, username, password, timeout=None): - self.host = host - self.tunnel_ip = tunnel_ip - self.auth = (username, password) + def __init__(self, settings): + self.host = settings['rest_mgmt'] + self.tunnel_ip = settings['tunnel_ip'] + self.auth = (settings['username'], settings['password']) self.token = None self.status = requests.codes.OK - self.timeout = timeout + self.timeout = settings.get('timeout') self.max_tries = 5 self.session = requests.Session() diff --git a/neutron/services/vpn/device_drivers/cisco_ipsec.py b/neutron/services/vpn/device_drivers/cisco_ipsec.py index 33a806d4b..b9ff2cb9e 100644 --- a/neutron/services/vpn/device_drivers/cisco_ipsec.py +++ b/neutron/services/vpn/device_drivers/cisco_ipsec.py @@ -216,11 +216,7 @@ class CiscoCsrIPsecDriver(device_drivers.DeviceDriver): else: raise SystemExit(_('No Cisco CSR configurations found in: %s') % cfg.CONF.config_file) - self.csrs = dict([(k, csr_client.CsrRestClient(v['rest_mgmt'], - v['tunnel_ip'], - v['username'], - v['password'], - v['timeout'])) + self.csrs = dict([(k, csr_client.CsrRestClient(v)) for k, v in csrs_found.items()]) def vpnservice_updated(self, context, **kwargs): diff --git a/neutron/tests/unit/services/vpn/device_drivers/cisco_csr_mock.py b/neutron/tests/unit/services/vpn/device_drivers/cisco_csr_mock.py index fe2223a74..a63ebe5a1 100644 --- a/neutron/tests/unit/services/vpn/device_drivers/cisco_csr_mock.py +++ b/neutron/tests/unit/services/vpn/device_drivers/cisco_csr_mock.py @@ -19,19 +19,19 @@ import re import functools -# import httmock +# TODO(pcm): Remove when switch to requests-mock package. Comment out, if use +# local copy of httmock.py source. Needed for PEP8. +import httmock import requests from requests import exceptions as r_exc from neutron.openstack.common import log as logging # TODO(pcm) Remove once httmock package is added to test-requirements. For -# now, uncomment and include httmock source to UT -from neutron.tests.unit.services.vpn import device_drivers +# now, uncomment and include httmock source to unit test. +# from neutron.tests.unit.services.vpn.device_drivers import httmock LOG = logging.getLogger(__name__) -httmock = device_drivers.httmock - def repeat(n): """Decorator to limit the number of times a handler is called. diff --git a/neutron/tests/unit/services/vpn/device_drivers/notest_cisco_csr_rest.py b/neutron/tests/unit/services/vpn/device_drivers/notest_cisco_csr_rest.py index 45bf97238..e28af3094 100644 --- a/neutron/tests/unit/services/vpn/device_drivers/notest_cisco_csr_rest.py +++ b/neutron/tests/unit/services/vpn/device_drivers/notest_cisco_csr_rest.py @@ -19,17 +19,20 @@ import random -# TODO(pcm) Uncomment when httmock is added to test requirements. -# import httmock +# TODO(pcm): Remove when update to requests-mock package. Comment out, if use +# local copy of httmock.py source. Needed for PEP8. +import httmock import requests from neutron.openstack.common import log as logging from neutron.services.vpn.device_drivers import ( cisco_csr_rest_client as csr_client) from neutron.tests import base -from neutron.tests.unit.services.vpn import device_drivers -# TODO(pcm) Remove once httmock is available. In the meantime, use -# temporary local copy of httmock source to run UT +from neutron.tests.unit.services.vpn.device_drivers import ( + cisco_csr_mock as csr_request) +# TODO(pcm) Uncomment to run w/local copy of httmock.py source. Remove when +# update to requests-mock package. +# from neutron.tests.unit.services.vpn.device_drivers import httmock LOG = logging.getLogger(__name__) @@ -40,8 +43,6 @@ if True: dummy_policy_id = 'dummy-ipsec-policy-id-name' -httmock = device_drivers.httmock - # Note: Helper functions to test reuse of IDs. def generate_pre_shared_key_id(): @@ -62,12 +63,13 @@ class TestCsrLoginRestApi(base.BaseTestCase): def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): super(TestCsrLoginRestApi, self).setUp() - self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco', - timeout) + info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip, + 'username': 'stack', 'password': 'cisco', 'timeout': timeout} + self.csr = csr_client.CsrRestClient(info) def test_get_token(self): """Obtain the token and its expiration time.""" - with httmock.HTTMock(device_drivers.csr_request.token): + with httmock.HTTMock(csr_request.token): self.assertTrue(self.csr.authenticate()) self.assertEqual(requests.codes.OK, self.csr.status) self.assertIsNotNone(self.csr.token) @@ -75,7 +77,7 @@ class TestCsrLoginRestApi(base.BaseTestCase): def test_unauthorized_token_request(self): """Negative test of invalid user/password.""" self.csr.auth = ('stack', 'bogus') - with httmock.HTTMock(device_drivers.csr_request.token_unauthorized): + with httmock.HTTMock(csr_request.token_unauthorized): self.assertIsNone(self.csr.authenticate()) self.assertEqual(requests.codes.UNAUTHORIZED, self.csr.status) @@ -83,14 +85,14 @@ class TestCsrLoginRestApi(base.BaseTestCase): """Negative test of request to non-existent host.""" self.csr.host = 'wrong-host' self.csr.token = 'Set by some previously successful access' - with httmock.HTTMock(device_drivers.csr_request.token_wrong_host): + with httmock.HTTMock(csr_request.token_wrong_host): self.assertIsNone(self.csr.authenticate()) self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) self.assertIsNone(self.csr.token) def test_timeout_on_token_access(self): """Negative test of a timeout on a request.""" - with httmock.HTTMock(device_drivers.csr_request.token_timeout): + with httmock.HTTMock(csr_request.token_timeout): self.assertIsNone(self.csr.authenticate()) self.assertEqual(requests.codes.REQUEST_TIMEOUT, self.csr.status) self.assertIsNone(self.csr.token) @@ -102,8 +104,9 @@ class TestCsrGetRestApi(base.BaseTestCase): def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): super(TestCsrGetRestApi, self).setUp() - self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco', - timeout) + info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip, + 'username': 'stack', 'password': 'cisco', 'timeout': timeout} + self.csr = csr_client.CsrRestClient(info) def test_valid_rest_gets(self): """Simple GET requests. @@ -112,8 +115,8 @@ class TestCsrGetRestApi(base.BaseTestCase): that there are two interfaces on the CSR. """ - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.normal_get): + with httmock.HTTMock(csr_request.token, + csr_request.normal_get): content = self.csr.get_request('global/host-name') self.assertEqual(requests.codes.OK, self.csr.status) self.assertIn('host-name', content) @@ -130,8 +133,9 @@ class TestCsrPostRestApi(base.BaseTestCase): def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): super(TestCsrPostRestApi, self).setUp() - self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco', - timeout) + info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip, + 'username': 'stack', 'password': 'cisco', 'timeout': timeout} + self.csr = csr_client.CsrRestClient(info) def test_post_requests(self): """Simple POST requests (repeatable). @@ -140,8 +144,8 @@ class TestCsrPostRestApi(base.BaseTestCase): that there are two interfaces (Ge1 and Ge2) on the CSR. """ - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post): + with httmock.HTTMock(csr_request.token, + csr_request.post): content = self.csr.post_request( 'interfaces/GigabitEthernet1/statistics', payload={'action': 'clear'}) @@ -155,8 +159,8 @@ class TestCsrPostRestApi(base.BaseTestCase): def test_post_with_location(self): """Create a user and verify that location returned.""" - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post): + with httmock.HTTMock(csr_request.token, + csr_request.post): location = self.csr.post_request( 'global/local-users', payload={'username': 'test-user', @@ -167,8 +171,8 @@ class TestCsrPostRestApi(base.BaseTestCase): def test_post_missing_required_attribute(self): """Negative test of POST with missing mandatory info.""" - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post): + with httmock.HTTMock(csr_request.token, + csr_request.post): self.csr.post_request('global/local-users', payload={'password': 'pass12345', 'privilege': 15}) @@ -176,8 +180,8 @@ class TestCsrPostRestApi(base.BaseTestCase): def test_post_invalid_attribute(self): """Negative test of POST with invalid info.""" - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post): + with httmock.HTTMock(csr_request.token, + csr_request.post): self.csr.post_request('global/local-users', payload={'username': 'test-user', 'password': 'pass12345', @@ -190,8 +194,8 @@ class TestCsrPostRestApi(base.BaseTestCase): Uses the lower level _do_request() API to just perform the POST and obtain the response, without any error processing. """ - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post): + with httmock.HTTMock(csr_request.token, + csr_request.post): location = self.csr._do_request( 'POST', 'global/local-users', @@ -201,8 +205,8 @@ class TestCsrPostRestApi(base.BaseTestCase): more_headers=csr_client.HEADER_CONTENT_TYPE_JSON) self.assertEqual(requests.codes.CREATED, self.csr.status) self.assertIn('global/local-users/test-user', location) - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post_change_attempt): + with httmock.HTTMock(csr_request.token, + csr_request.post_change_attempt): self.csr._do_request( 'POST', 'global/local-users', @@ -216,8 +220,8 @@ class TestCsrPostRestApi(base.BaseTestCase): def test_post_changing_value(self): """Negative test of a POST trying to change a value.""" - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post): + with httmock.HTTMock(csr_request.token, + csr_request.post): location = self.csr.post_request( 'global/local-users', payload={'username': 'test-user', @@ -225,8 +229,8 @@ class TestCsrPostRestApi(base.BaseTestCase): 'privilege': 15}) self.assertEqual(requests.codes.CREATED, self.csr.status) self.assertIn('global/local-users/test-user', location) - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post_change_attempt): + with httmock.HTTMock(csr_request.token, + csr_request.post_change_attempt): content = self.csr.post_request('global/local-users', payload={'username': 'test-user', 'password': 'changed', @@ -242,8 +246,8 @@ class TestCsrPutRestApi(base.BaseTestCase): """Test CSR PUT REST API.""" def _save_resources(self): - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.normal_get): + with httmock.HTTMock(csr_request.token, + csr_request.normal_get): details = self.csr.get_request('global/host-name') if self.csr.status != requests.codes.OK: self.fail("Unable to save original host name") @@ -266,8 +270,8 @@ class TestCsrPutRestApi(base.BaseTestCase): self.csr.auth = (user, password) self.csr.token = None - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.put): + with httmock.HTTMock(csr_request.token, + csr_request.put): payload = {'host-name': self.original_host} self.csr.put_request('global/host-name', payload=payload) if self.csr.status != requests.codes.NO_CONTENT: @@ -285,8 +289,10 @@ class TestCsrPutRestApi(base.BaseTestCase): def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): """Prepare for PUT API tests.""" super(TestCsrPutRestApi, self).setUp() - self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco', - timeout) + info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip, + 'username': 'stack', 'password': 'cisco', 'timeout': timeout} + self.csr = csr_client.CsrRestClient(info) + self._save_resources() self.addCleanup(self._restore_resources, 'stack', 'cisco') @@ -297,9 +303,9 @@ class TestCsrPutRestApi(base.BaseTestCase): that there are two interfaces on the CSR (Ge1 and Ge2). """ - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.put, - device_drivers.csr_request.normal_get): + with httmock.HTTMock(csr_request.token, + csr_request.put, + csr_request.normal_get): payload = {'host-name': 'TestHost'} content = self.csr.put_request('global/host-name', payload=payload) @@ -318,9 +324,9 @@ class TestCsrPutRestApi(base.BaseTestCase): This was a problem with an earlier version of the CSR image and is here to prevent regression. """ - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.put, - device_drivers.csr_request.normal_get): + with httmock.HTTMock(csr_request.token, + csr_request.put, + csr_request.normal_get): payload = {'description': u'Changed description', 'if-name': self.original_if['if-name'], 'ip-address': self.original_if['ip-address'], @@ -345,9 +351,9 @@ class TestCsrPutRestApi(base.BaseTestCase): test setup to change the description to a non-empty string to avoid failures in other tests. """ - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.put, - device_drivers.csr_request.normal_get): + with httmock.HTTMock(csr_request.token, + csr_request.put, + csr_request.normal_get): payload = {'description': '', 'if-name': self.original_if['if-name'], 'ip-address': self.original_if['ip-address'], @@ -369,8 +375,9 @@ class TestCsrDeleteRestApi(base.BaseTestCase): def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): super(TestCsrDeleteRestApi, self).setUp() - self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco', - timeout) + info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip, + 'username': 'stack', 'password': 'cisco', 'timeout': timeout} + self.csr = csr_client.CsrRestClient(info) def _make_dummy_user(self): """Create a user that will be later deleted.""" @@ -382,9 +389,9 @@ class TestCsrDeleteRestApi(base.BaseTestCase): def test_delete_requests(self): """Simple DELETE requests (creating entry first).""" - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post, - device_drivers.csr_request.delete): + with httmock.HTTMock(csr_request.token, + csr_request.post, + csr_request.delete): self._make_dummy_user() self.csr.token = None # Force login self.csr.delete_request('global/local-users/dummy') @@ -396,8 +403,8 @@ class TestCsrDeleteRestApi(base.BaseTestCase): def test_delete_non_existent_entry(self): """Negative test of trying to delete a non-existent user.""" - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.delete_unknown): + with httmock.HTTMock(csr_request.token, + csr_request.delete_unknown): content = self.csr.delete_request('global/local-users/unknown') self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) expected = {u'error-code': -1, @@ -406,8 +413,8 @@ class TestCsrDeleteRestApi(base.BaseTestCase): def test_delete_not_allowed(self): """Negative test of trying to delete the host-name.""" - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.delete_not_allowed): + with httmock.HTTMock(csr_request.token, + csr_request.delete_not_allowed): self.csr.delete_request('global/host-name') self.assertEqual(requests.codes.METHOD_NOT_ALLOWED, self.csr.status) @@ -423,21 +430,22 @@ class TestCsrRestApiFailures(base.BaseTestCase): def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=0.1): super(TestCsrRestApiFailures, self).setUp() - self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco', - timeout) + info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip, + 'username': 'stack', 'password': 'cisco', 'timeout': timeout} + self.csr = csr_client.CsrRestClient(info) def test_request_for_non_existent_resource(self): """Negative test of non-existent resource on REST request.""" - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.no_such_resource): + with httmock.HTTMock(csr_request.token, + csr_request.no_such_resource): self.csr.post_request('no/such/request') self.assertEqual(requests.codes.NOT_FOUND, self.csr.status) # The result is HTTP 404 message, so no error content to check def test_timeout_during_request(self): """Negative test of timeout during REST request.""" - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.timeout): + with httmock.HTTMock(csr_request.token, + csr_request.timeout): self.csr._do_request('GET', 'global/host-name') self.assertEqual(requests.codes.REQUEST_TIMEOUT, self.csr.status) @@ -449,9 +457,9 @@ class TestCsrRestApiFailures(base.BaseTestCase): token by changing it. """ - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.expired_request, - device_drivers.csr_request.normal_get): + with httmock.HTTMock(csr_request.token, + csr_request.expired_request, + csr_request.normal_get): self.csr.token = '123' # These are 44 characters, so won't match content = self.csr._do_request('GET', 'global/host-name') self.assertEqual(requests.codes.OK, self.csr.status) @@ -461,7 +469,7 @@ class TestCsrRestApiFailures(base.BaseTestCase): def test_failed_to_obtain_token_for_request(self): """Negative test of unauthorized user for REST request.""" self.csr.auth = ('stack', 'bogus') - with httmock.HTTMock(device_drivers.csr_request.token_unauthorized): + with httmock.HTTMock(csr_request.token_unauthorized): self.csr._do_request('GET', 'global/host-name') self.assertEqual(requests.codes.UNAUTHORIZED, self.csr.status) @@ -472,14 +480,15 @@ class TestCsrRestIkePolicyCreate(base.BaseTestCase): def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): super(TestCsrRestIkePolicyCreate, self).setUp() - self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco', - timeout) + info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip, + 'username': 'stack', 'password': 'cisco', 'timeout': timeout} + self.csr = csr_client.CsrRestClient(info) def test_create_delete_ike_policy(self): """Create and then delete IKE policy.""" - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post, - device_drivers.csr_request.normal_get): + with httmock.HTTMock(csr_request.token, + csr_request.post, + csr_request.normal_get): policy_id = '2' policy_info = {u'priority-id': u'%s' % policy_id, u'encryption': u'aes256', @@ -498,9 +507,9 @@ class TestCsrRestIkePolicyCreate(base.BaseTestCase): expected_policy.update(policy_info) self.assertEqual(expected_policy, content) # Now delete and verify the IKE policy is gone - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.delete, - device_drivers.csr_request.no_such_resource): + with httmock.HTTMock(csr_request.token, + csr_request.delete, + csr_request.no_such_resource): self.csr.delete_ike_policy(policy_id) self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) content = self.csr.get_request(location, full_url=True) @@ -508,9 +517,9 @@ class TestCsrRestIkePolicyCreate(base.BaseTestCase): def test_create_ike_policy_with_defaults(self): """Create IKE policy using defaults for all optional values.""" - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post, - device_drivers.csr_request.get_defaults): + with httmock.HTTMock(csr_request.token, + csr_request.post, + csr_request.get_defaults): policy_id = '2' policy_info = {u'priority-id': u'%s' % policy_id} location = self.csr.create_ike_policy(policy_info) @@ -532,9 +541,9 @@ class TestCsrRestIkePolicyCreate(base.BaseTestCase): def test_create_duplicate_ike_policy(self): """Negative test of trying to create a duplicate IKE policy.""" - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post, - device_drivers.csr_request.normal_get): + with httmock.HTTMock(csr_request.token, + csr_request.post, + csr_request.normal_get): policy_id = '2' policy_info = {u'priority-id': u'%s' % policy_id, u'encryption': u'aes', @@ -544,8 +553,8 @@ class TestCsrRestIkePolicyCreate(base.BaseTestCase): location = self.csr.create_ike_policy(policy_info) self.assertEqual(requests.codes.CREATED, self.csr.status) self.assertIn('vpn-svc/ike/policies/%s' % policy_id, location) - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post_duplicate): + with httmock.HTTMock(csr_request.token, + csr_request.post_duplicate): location = self.csr.create_ike_policy(policy_info) self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) expected = {u'error-code': -1, @@ -560,14 +569,15 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase): def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): super(TestCsrRestIPSecPolicyCreate, self).setUp() - self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco', - timeout) + info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip, + 'username': 'stack', 'password': 'cisco', 'timeout': timeout} + self.csr = csr_client.CsrRestClient(info) def test_create_delete_ipsec_policy(self): """Create and then delete IPSec policy.""" - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post, - device_drivers.csr_request.normal_get): + with httmock.HTTMock(csr_request.token, + csr_request.post, + csr_request.normal_get): policy_id = '123' policy_info = { u'policy-id': u'%s' % policy_id, @@ -595,9 +605,9 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase): expected_policy[u'anti-replay-window-size'] = u'Disable' self.assertEqual(expected_policy, content) # Now delete and verify the IPSec policy is gone - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.delete, - device_drivers.csr_request.no_such_resource): + with httmock.HTTMock(csr_request.token, + csr_request.delete, + csr_request.no_such_resource): self.csr.delete_ipsec_policy(policy_id) self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) content = self.csr.get_request(location, full_url=True) @@ -605,9 +615,9 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase): def test_create_ipsec_policy_with_defaults(self): """Create IPSec policy with default for all optional values.""" - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post, - device_drivers.csr_request.get_defaults): + with httmock.HTTMock(csr_request.token, + csr_request.post, + csr_request.get_defaults): policy_id = '123' policy_info = { u'policy-id': u'%s' % policy_id, @@ -631,9 +641,9 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase): def test_create_ipsec_policy_with_uuid(self): """Create IPSec policy using UUID for id.""" - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post, - device_drivers.csr_request.normal_get): + with httmock.HTTMock(csr_request.token, + csr_request.post, + csr_request.normal_get): policy_info = { u'policy-id': u'%s' % dummy_policy_id, u'protection-suite': { @@ -663,9 +673,9 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase): def test_create_ipsec_policy_without_ah(self): """Create IPSec policy.""" - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post, - device_drivers.csr_request.get_no_ah): + with httmock.HTTMock(csr_request.token, + csr_request.post, + csr_request.get_no_ah): policy_id = '10' policy_info = { u'policy-id': u'%s' % policy_id, @@ -692,8 +702,8 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase): def test_invalid_ipsec_policy_lifetime(self): """Failure test of IPSec policy with unsupported lifetime.""" - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post_bad_lifetime): + with httmock.HTTMock(csr_request.token, + csr_request.post_bad_lifetime): policy_id = '123' policy_info = { u'policy-id': u'%s' % policy_id, @@ -711,9 +721,9 @@ class TestCsrRestIPSecPolicyCreate(base.BaseTestCase): def test_create_ipsec_policy_with_invalid_name(self): """Failure test of creating IPSec policy with name too long.""" - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post_bad_name, - device_drivers.csr_request.get_defaults): + with httmock.HTTMock(csr_request.token, + csr_request.post_bad_name, + csr_request.get_defaults): policy_id = 'policy-name-is-too-long-32-chars' policy_info = { u'policy-id': u'%s' % policy_id, @@ -728,14 +738,15 @@ class TestCsrRestPreSharedKeyCreate(base.BaseTestCase): def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): super(TestCsrRestPreSharedKeyCreate, self).setUp() - self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco', - timeout) + info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip, + 'username': 'stack', 'password': 'cisco', 'timeout': timeout} + self.csr = csr_client.CsrRestClient(info) def test_create_delete_pre_shared_key(self): """Create and then delete a keyring entry for pre-shared key.""" - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post, - device_drivers.csr_request.normal_get): + with httmock.HTTMock(csr_request.token, + csr_request.post, + csr_request.normal_get): psk_id = '5' psk_info = {u'keyring-name': u'%s' % psk_id, u'pre-shared-key-list': [ @@ -756,9 +767,9 @@ class TestCsrRestPreSharedKeyCreate(base.BaseTestCase): u'10.10.10.20 255.255.255.0') self.assertEqual(expected_policy, content) # Now delete and verify pre-shared key is gone - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.delete, - device_drivers.csr_request.no_such_resource): + with httmock.HTTMock(csr_request.token, + csr_request.delete, + csr_request.no_such_resource): self.csr.delete_pre_shared_key(psk_id) self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) content = self.csr.get_request(location, full_url=True) @@ -766,9 +777,9 @@ class TestCsrRestPreSharedKeyCreate(base.BaseTestCase): def test_create_pre_shared_key_with_fqdn_peer(self): """Create pre-shared key using FQDN for peer address.""" - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post, - device_drivers.csr_request.get_fqdn): + with httmock.HTTMock(csr_request.token, + csr_request.post, + csr_request.get_fqdn): psk_id = '5' psk_info = {u'keyring-name': u'%s' % psk_id, u'pre-shared-key-list': [ @@ -788,9 +799,9 @@ class TestCsrRestPreSharedKeyCreate(base.BaseTestCase): def test_create_pre_shared_key_with_duplicate_peer_address(self): """Negative test of creating a second pre-shared key with same peer.""" - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post, - device_drivers.csr_request.normal_get): + with httmock.HTTMock(csr_request.token, + csr_request.post, + csr_request.normal_get): psk_id = '5' psk_info = {u'keyring-name': u'%s' % psk_id, u'pre-shared-key-list': [ @@ -801,8 +812,8 @@ class TestCsrRestPreSharedKeyCreate(base.BaseTestCase): location = self.csr.create_pre_shared_key(psk_info) self.assertEqual(requests.codes.CREATED, self.csr.status) self.assertIn('vpn-svc/ike/keyrings/%s' % psk_id, location) - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post_duplicate): + with httmock.HTTMock(csr_request.token, + csr_request.post_duplicate): psk_id = u'6' another_psk_info = {u'keyring-name': psk_id, u'pre-shared-key-list': [ @@ -825,15 +836,16 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase): def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): super(TestCsrRestIPSecConnectionCreate, self).setUp() - self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco', - timeout) + info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip, + 'username': 'stack', 'password': 'cisco', 'timeout': timeout} + self.csr = csr_client.CsrRestClient(info) def _make_psk_for_test(self): psk_id = generate_pre_shared_key_id() self._remove_resource_for_test(self.csr.delete_pre_shared_key, psk_id) - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post): + with httmock.HTTMock(csr_request.token, + csr_request.post): psk_info = {u'keyring-name': u'%d' % psk_id, u'pre-shared-key-list': [ {u'key': u'super-secret', @@ -851,8 +863,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase): policy_id = generate_ike_policy_id() self._remove_resource_for_test(self.csr.delete_ike_policy, policy_id) - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post): + with httmock.HTTMock(csr_request.token, + csr_request.post): policy_info = {u'priority-id': u'%d' % policy_id, u'encryption': u'aes', u'hash': u'sha', @@ -869,8 +881,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase): policy_id = generate_ipsec_policy_id() self._remove_resource_for_test(self.csr.delete_ipsec_policy, policy_id) - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post): + with httmock.HTTMock(csr_request.token, + csr_request.post): policy_info = { u'policy-id': u'%d' % policy_id, u'protection-suite': { @@ -890,8 +902,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase): return policy_id def _remove_resource_for_test(self, delete_resource, resource_id): - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.delete): + with httmock.HTTMock(csr_request.token, + csr_request.delete): delete_resource(resource_id) def _prepare_for_site_conn_create(self, skip_psk=False, skip_ike=False, @@ -913,9 +925,9 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase): def test_create_delete_ipsec_connection(self): """Create and then delete an IPSec connection.""" tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create() - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post, - device_drivers.csr_request.normal_get): + with httmock.HTTMock(csr_request.token, + csr_request.post, + csr_request.normal_get): connection_info = { u'vpn-interface-name': u'Tunnel%d' % tunnel_id, u'ipsec-policy-id': u'%d' % ipsec_policy_id, @@ -941,9 +953,9 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase): expected_connection.update(connection_info) self.assertEqual(expected_connection, content) # Now delete and verify that site-to-site connection is gone - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.delete, - device_drivers.csr_request.no_such_resource): + with httmock.HTTMock(csr_request.token, + csr_request.delete, + csr_request.no_such_resource): # Only delete connection. Cleanup will take care of prerequisites self.csr.delete_ipsec_connection('Tunnel%d' % tunnel_id) self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) @@ -953,9 +965,9 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase): def test_create_ipsec_connection_with_no_tunnel_subnet(self): """Create an IPSec connection without an IP address on tunnel.""" tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create() - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post, - device_drivers.csr_request.get_unnumbered): + with httmock.HTTMock(csr_request.token, + csr_request.post, + csr_request.get_unnumbered): connection_info = { u'vpn-interface-name': u'Tunnel%d' % tunnel_id, u'ipsec-policy-id': u'%d' % ipsec_policy_id, @@ -989,9 +1001,9 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase): tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create( skip_psk=True) - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post, - device_drivers.csr_request.normal_get): + with httmock.HTTMock(csr_request.token, + csr_request.post, + csr_request.normal_get): connection_info = { u'vpn-interface-name': u'Tunnel%d' % tunnel_id, u'ipsec-policy-id': u'%d' % ipsec_policy_id, @@ -1026,9 +1038,9 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase): tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create( skip_ike=True) - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post, - device_drivers.csr_request.normal_get): + with httmock.HTTMock(csr_request.token, + csr_request.post, + csr_request.normal_get): connection_info = { u'vpn-interface-name': u'Tunnel%d' % tunnel_id, u'ipsec-policy-id': u'%d' % ipsec_policy_id, @@ -1058,8 +1070,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase): """Create IPSec connection in admin down state.""" tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create() tunnel = u'Tunnel%d' % tunnel_id - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post): + with httmock.HTTMock(csr_request.token, + csr_request.post): connection_info = { u'vpn-interface-name': tunnel, u'ipsec-policy-id': u'%d' % ipsec_policy_id, @@ -1081,16 +1093,16 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase): u'vpn-interface-name': tunnel, u'line-protocol-state': u'down', u'enabled': False} - with httmock.HTTMock(device_drivers.csr_request.put, - device_drivers.csr_request.get_admin_down): + with httmock.HTTMock(csr_request.put, + csr_request.get_admin_down): self.csr.set_ipsec_connection_state(tunnel, admin_up=False) self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) content = self.csr.get_request(state_uri, full_url=True) self.assertEqual(requests.codes.OK, self.csr.status) self.assertEqual(expected_state, content) - with httmock.HTTMock(device_drivers.csr_request.put, - device_drivers.csr_request.get_admin_up): + with httmock.HTTMock(csr_request.put, + csr_request.get_admin_up): self.csr.set_ipsec_connection_state(tunnel, admin_up=True) self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) content = self.csr.get_request(state_uri, full_url=True) @@ -1103,8 +1115,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase): tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create( skip_ipsec=True) with httmock.HTTMock( - device_drivers.csr_request.token, - device_drivers.csr_request.post_missing_ipsec_policy): + csr_request.token, + csr_request.post_missing_ipsec_policy): connection_info = { u'vpn-interface-name': u'Tunnel%d' % tunnel_id, u'ipsec-policy-id': u'%d' % ipsec_policy_id, @@ -1119,8 +1131,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase): self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status) def _determine_conflicting_ip(self): - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.get_local_ip): + with httmock.HTTMock(csr_request.token, + csr_request.get_local_ip): details = self.csr.get_request('interfaces/GigabitEthernet3') if self.csr.status != requests.codes.OK: self.fail("Unable to obtain interface GigabitEthernet3's IP") @@ -1138,8 +1150,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase): conflicting_ip = self._determine_conflicting_ip() tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create() - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post_bad_ip): + with httmock.HTTMock(csr_request.token, + csr_request.post_bad_ip): connection_info = { u'vpn-interface-name': u'Tunnel%d' % tunnel_id, u'ipsec-policy-id': u'%d' % ipsec_policy_id, @@ -1156,9 +1168,9 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase): def test_create_ipsec_connection_with_max_mtu(self): """Create an IPSec connection with max MTU value.""" tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create() - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post, - device_drivers.csr_request.get_mtu): + with httmock.HTTMock(csr_request.token, + csr_request.post, + csr_request.get_mtu): connection_info = { u'vpn-interface-name': u'Tunnel%d' % tunnel_id, u'ipsec-policy-id': u'%d' % ipsec_policy_id, @@ -1186,8 +1198,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase): def test_create_ipsec_connection_with_bad_mtu(self): """Negative test of connection create with unsupported MTU value.""" tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create() - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post_bad_mtu): + with httmock.HTTMock(csr_request.token, + csr_request.post_bad_mtu): connection_info = { u'vpn-interface-name': u'Tunnel%d' % tunnel_id, u'ipsec-policy-id': u'%d' % ipsec_policy_id, @@ -1204,8 +1216,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase): def test_status_when_no_tunnels_exist(self): """Get status, when there are no tunnels.""" - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.get_none): + with httmock.HTTMock(csr_request.token, + csr_request.get_none): tunnels = self.csr.read_tunnel_statuses() self.assertEqual(requests.codes.OK, self.csr.status) self.assertEqual([], tunnels) @@ -1215,9 +1227,9 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase): # Create the IPsec site-to-site connection first tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create() tunnel_id = 123 # Must hard code to work with mock - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post, - device_drivers.csr_request.normal_get): + with httmock.HTTMock(csr_request.token, + csr_request.post, + csr_request.normal_get): connection_info = { u'vpn-interface-name': u'Tunnel123', u'ipsec-policy-id': u'%d' % ipsec_policy_id, @@ -1232,8 +1244,8 @@ class TestCsrRestIPSecConnectionCreate(base.BaseTestCase): self.assertEqual(requests.codes.CREATED, self.csr.status) self.assertIn('vpn-svc/site-to-site/Tunnel%d' % tunnel_id, location) - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.normal_get): + with httmock.HTTMock(csr_request.token, + csr_request.normal_get): tunnels = self.csr.read_tunnel_statuses() self.assertEqual(requests.codes.OK, self.csr.status) self.assertEqual([(u'Tunnel123', u'DOWN-NEGOTIATING'), ], tunnels) @@ -1250,8 +1262,8 @@ class TestCsrRestIkeKeepaliveCreate(base.BaseTestCase): """ def _save_dpd_info(self): - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.normal_get): + with httmock.HTTMock(csr_request.token, + csr_request.normal_get): details = self.csr.get_request('vpn-svc/ike/keepalive') if self.csr.status == requests.codes.OK: self.dpd = details @@ -1260,8 +1272,8 @@ class TestCsrRestIkeKeepaliveCreate(base.BaseTestCase): self.fail("Unable to save original DPD info") def _restore_dpd_info(self): - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.put): + with httmock.HTTMock(csr_request.token, + csr_request.put): payload = {'interval': self.dpd['interval'], 'retry': self.dpd['retry']} self.csr.put_request('vpn-svc/ike/keepalive', payload=payload) @@ -1270,16 +1282,17 @@ class TestCsrRestIkeKeepaliveCreate(base.BaseTestCase): def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): super(TestCsrRestIkeKeepaliveCreate, self).setUp() - self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco', - timeout) + info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip, + 'username': 'stack', 'password': 'cisco', 'timeout': timeout} + self.csr = csr_client.CsrRestClient(info) self._save_dpd_info() self.csr.token = None def test_configure_ike_keepalive(self): """Set IKE keep-alive (aka Dead Peer Detection) for the CSR.""" - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.put, - device_drivers.csr_request.normal_get): + with httmock.HTTMock(csr_request.token, + csr_request.put, + csr_request.normal_get): keepalive_info = {'interval': 60, 'retry': 4} self.csr.configure_ike_keepalive(keepalive_info) self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) @@ -1291,10 +1304,10 @@ class TestCsrRestIkeKeepaliveCreate(base.BaseTestCase): def test_disable_ike_keepalive(self): """Disable IKE keep-alive (aka Dead Peer Detection) for the CSR.""" - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.delete, - device_drivers.csr_request.put, - device_drivers.csr_request.get_not_configured): + with httmock.HTTMock(csr_request.token, + csr_request.delete, + csr_request.put, + csr_request.get_not_configured): keepalive_info = {'interval': 0, 'retry': 4} self.csr.configure_ike_keepalive(keepalive_info) self.assertEqual(requests.codes.NO_CONTENT, self.csr.status) @@ -1310,17 +1323,18 @@ class TestCsrRestStaticRoute(base.BaseTestCase): def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None): super(TestCsrRestStaticRoute, self).setUp() - self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco', - timeout) + info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip, + 'username': 'stack', 'password': 'cisco', 'timeout': timeout} + self.csr = csr_client.CsrRestClient(info) def test_create_delete_static_route(self): """Create and then delete a static route for the tunnel.""" cidr = u'10.1.0.0/24' interface = u'GigabitEthernet1' expected_id = '10.1.0.0_24_GigabitEthernet1' - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.post, - device_drivers.csr_request.normal_get): + with httmock.HTTMock(csr_request.token, + csr_request.post, + csr_request.normal_get): route_info = {u'destination-network': cidr, u'outgoing-interface': interface} location = self.csr.create_static_route(route_info) @@ -1336,9 +1350,9 @@ class TestCsrRestStaticRoute(base.BaseTestCase): expected_route.update(route_info) self.assertEqual(expected_route, content) # Now delete and verify that static route is gone - with httmock.HTTMock(device_drivers.csr_request.token, - device_drivers.csr_request.delete, - device_drivers.csr_request.no_such_resource): + with httmock.HTTMock(csr_request.token, + csr_request.delete, + csr_request.no_such_resource): route_id = csr_client.make_route_id(cidr, interface) self.csr.delete_static_route(route_id) self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)