Rework and enable VPNaaS UT for Cisco CSR REST
The Cisco CSR REST client library unit tests were developed in Icehouse, using the httmock library. However, the community did not want to add this library to global requirements, as there was a similar httpretty library available (albeit with some short- comings). As a result, the test module was renamed with a "no" prefix, to prevent inclusion in automated tests. Since then, a new library, requests-mock, has been added to global requirements, to replace httpretty, and is being used on several other projects. This commit reworks the unit test to use requests-mock, instead of httmock. The functionality is the same, but the mechanism (a fixture with URI registration vs context manager) is different. This commit provides coverage for the REST client code, by using a mock for the Cisco CSR VM. The unit test module can be subclassed, and used with a real CSR VM, for 3rd party CI testing, in the future. Change-Id: I55c8a253eb32985bc2016ae748b1ded58d021e1a Closes-Bug: 1358470
This commit is contained in:
parent
7b5d47d10a
commit
1683bea1c6
@ -1,577 +0,0 @@
|
||||
# Copyright 2014 Cisco Systems, Inc. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Mock REST requests to Cisco Cloud Services Router."""
|
||||
|
||||
import re
|
||||
|
||||
import functools
|
||||
# TODO(pcm): Remove when switch to requests-mock package. Comment out, if use
|
||||
# local copy of httmock.py source. Needed for PEP8.
|
||||
import httmock
|
||||
import requests
|
||||
from requests import exceptions as r_exc
|
||||
|
||||
from neutron.openstack.common import log as logging
|
||||
# TODO(pcm) Remove once httmock package is added to test-requirements. For
|
||||
# now, uncomment and include httmock source to unit test.
|
||||
# from neutron.tests.unit.services.vpn.device_drivers import httmock
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def repeat(n):
|
||||
"""Decorator to limit the number of times a handler is called.
|
||||
|
||||
Will allow the wrapped function (handler) to be called 'n' times.
|
||||
After that, this will return None for any additional calls,
|
||||
allowing other handlers, if any, to be invoked.
|
||||
"""
|
||||
|
||||
class static:
|
||||
retries = n
|
||||
|
||||
def decorator(func):
|
||||
@functools.wraps(func)
|
||||
def wrapped(*args, **kwargs):
|
||||
if static.retries == 0:
|
||||
return None
|
||||
static.retries -= 1
|
||||
return func(*args, **kwargs)
|
||||
return wrapped
|
||||
return decorator
|
||||
|
||||
|
||||
def filter_request(methods, resource):
|
||||
"""Decorator to invoke handler once for a specific resource.
|
||||
|
||||
This will call the handler only for a specific resource using
|
||||
a specific method(s). Any other resource request or method will
|
||||
return None, allowing other handlers, if any, to be invoked.
|
||||
"""
|
||||
|
||||
class static:
|
||||
target_methods = [m.upper() for m in methods]
|
||||
target_resource = resource
|
||||
|
||||
def decorator(func):
|
||||
@functools.wraps(func)
|
||||
def wrapped(*args, **kwargs):
|
||||
if (args[1].method in static.target_methods and
|
||||
static.target_resource in args[0].path):
|
||||
return func(*args, **kwargs)
|
||||
else:
|
||||
return None # Not for this resource
|
||||
return wrapped
|
||||
return decorator
|
||||
|
||||
|
||||
@httmock.urlmatch(netloc=r'localhost')
|
||||
def token(url, request):
|
||||
if 'auth/token-services' in url.path:
|
||||
return {'status_code': requests.codes.OK,
|
||||
'content': {'token-id': 'dummy-token'}}
|
||||
|
||||
|
||||
@httmock.urlmatch(netloc=r'localhost')
|
||||
def token_unauthorized(url, request):
|
||||
if 'auth/token-services' in url.path:
|
||||
return {'status_code': requests.codes.UNAUTHORIZED}
|
||||
|
||||
|
||||
@httmock.urlmatch(netloc=r'wrong-host')
|
||||
def token_wrong_host(url, request):
|
||||
raise r_exc.ConnectionError()
|
||||
|
||||
|
||||
@httmock.all_requests
|
||||
def token_timeout(url, request):
|
||||
raise r_exc.Timeout()
|
||||
|
||||
|
||||
@filter_request(['get'], 'global/host-name')
|
||||
@httmock.all_requests
|
||||
def timeout(url, request):
|
||||
"""Simulated timeout of a normal request."""
|
||||
|
||||
if not request.headers.get('X-auth-token', None):
|
||||
return {'status_code': requests.codes.UNAUTHORIZED}
|
||||
raise r_exc.Timeout()
|
||||
|
||||
|
||||
@httmock.urlmatch(netloc=r'localhost')
|
||||
def no_such_resource(url, request):
|
||||
"""Indicate not found error, when invalid resource requested."""
|
||||
return {'status_code': requests.codes.NOT_FOUND}
|
||||
|
||||
|
||||
@filter_request(['get'], 'global/host-name')
|
||||
@repeat(1)
|
||||
@httmock.urlmatch(netloc=r'localhost')
|
||||
def expired_request(url, request):
|
||||
"""Simulate access denied failure on first request for this resource.
|
||||
|
||||
Intent here is to simulate that the token has expired, by failing
|
||||
the first request to the resource. Because of the repeat=1, this
|
||||
will only be called once, and subsequent calls will not be handled
|
||||
by this function, but instead will access the normal handler and
|
||||
will pass. Currently configured for a GET request, but will work
|
||||
with POST and PUT as well. For DELETE, would need to filter_request on a
|
||||
different resource (e.g. 'global/local-users')
|
||||
"""
|
||||
|
||||
return {'status_code': requests.codes.UNAUTHORIZED}
|
||||
|
||||
|
||||
@httmock.urlmatch(netloc=r'localhost')
|
||||
def normal_get(url, request):
|
||||
if request.method != 'GET':
|
||||
return
|
||||
LOG.debug("GET mock for %s", url)
|
||||
if not request.headers.get('X-auth-token', None):
|
||||
return {'status_code': requests.codes.UNAUTHORIZED}
|
||||
if 'global/host-name' in url.path:
|
||||
content = {u'kind': u'object#host-name',
|
||||
u'host-name': u'Router'}
|
||||
return httmock.response(requests.codes.OK, content=content)
|
||||
if 'global/local-users' in url.path:
|
||||
content = {u'kind': u'collection#local-user',
|
||||
u'users': ['peter', 'paul', 'mary']}
|
||||
return httmock.response(requests.codes.OK, content=content)
|
||||
if 'interfaces/GigabitEthernet' in url.path:
|
||||
actual_interface = url.path.split('/')[-1]
|
||||
ip = actual_interface[-1]
|
||||
content = {u'kind': u'object#interface',
|
||||
u'description': u'Changed description',
|
||||
u'if-name': actual_interface,
|
||||
u'proxy-arp': True,
|
||||
u'subnet-mask': u'255.255.255.0',
|
||||
u'icmp-unreachable': True,
|
||||
u'nat-direction': u'',
|
||||
u'icmp-redirects': True,
|
||||
u'ip-address': u'192.168.200.%s' % ip,
|
||||
u'verify-unicast-source': False,
|
||||
u'type': u'ethernet'}
|
||||
return httmock.response(requests.codes.OK, content=content)
|
||||
if 'vpn-svc/ike/policies/2' in url.path:
|
||||
content = {u'kind': u'object#ike-policy',
|
||||
u'priority-id': u'2',
|
||||
u'version': u'v1',
|
||||
u'local-auth-method': u'pre-share',
|
||||
u'encryption': u'aes256',
|
||||
u'hash': u'sha',
|
||||
u'dhGroup': 5,
|
||||
u'lifetime': 3600}
|
||||
return httmock.response(requests.codes.OK, content=content)
|
||||
if 'vpn-svc/ike/keyrings' in url.path:
|
||||
content = {u'kind': u'object#ike-keyring',
|
||||
u'keyring-name': u'5',
|
||||
u'pre-shared-key-list': [
|
||||
{u'key': u'super-secret',
|
||||
u'encrypted': False,
|
||||
u'peer-address': u'10.10.10.20 255.255.255.0'}
|
||||
]}
|
||||
return httmock.response(requests.codes.OK, content=content)
|
||||
if 'vpn-svc/ipsec/policies/' in url.path:
|
||||
ipsec_policy_id = url.path.split('/')[-1]
|
||||
content = {u'kind': u'object#ipsec-policy',
|
||||
u'mode': u'tunnel',
|
||||
u'policy-id': u'%s' % ipsec_policy_id,
|
||||
u'protection-suite': {
|
||||
u'esp-encryption': u'esp-256-aes',
|
||||
u'esp-authentication': u'esp-sha-hmac',
|
||||
u'ah': u'ah-sha-hmac',
|
||||
},
|
||||
u'anti-replay-window-size': u'Disable',
|
||||
u'lifetime-sec': 120,
|
||||
u'pfs': u'group5',
|
||||
u'lifetime-kb': 4608000,
|
||||
u'idle-time': None}
|
||||
return httmock.response(requests.codes.OK, content=content)
|
||||
if 'vpn-svc/site-to-site/Tunnel' in url.path:
|
||||
tunnel = url.path.split('/')[-1]
|
||||
# Use same number, to allow mock to generate IPSec policy ID
|
||||
ipsec_policy_id = tunnel[6:]
|
||||
content = {u'kind': u'object#vpn-site-to-site',
|
||||
u'vpn-interface-name': u'%s' % tunnel,
|
||||
u'ip-version': u'ipv4',
|
||||
u'vpn-type': u'site-to-site',
|
||||
u'ipsec-policy-id': u'%s' % ipsec_policy_id,
|
||||
u'ike-profile-id': None,
|
||||
u'mtu': 1500,
|
||||
u'local-device': {
|
||||
u'ip-address': '10.3.0.1/24',
|
||||
u'tunnel-ip-address': '10.10.10.10'
|
||||
},
|
||||
u'remote-device': {
|
||||
u'tunnel-ip-address': '10.10.10.20'
|
||||
}}
|
||||
return httmock.response(requests.codes.OK, content=content)
|
||||
if 'vpn-svc/ike/keepalive' in url.path:
|
||||
content = {u'interval': 60,
|
||||
u'retry': 4,
|
||||
u'periodic': True}
|
||||
return httmock.response(requests.codes.OK, content=content)
|
||||
if 'routing-svc/static-routes' in url.path:
|
||||
content = {u'destination-network': u'10.1.0.0/24',
|
||||
u'kind': u'object#static-route',
|
||||
u'next-hop-router': None,
|
||||
u'outgoing-interface': u'GigabitEthernet1',
|
||||
u'admin-distance': 1}
|
||||
return httmock.response(requests.codes.OK, content=content)
|
||||
if 'vpn-svc/site-to-site/active/sessions' in url.path:
|
||||
# Only including needed fields for mock
|
||||
content = {u'kind': u'collection#vpn-active-sessions',
|
||||
u'items': [{u'status': u'DOWN-NEGOTIATING',
|
||||
u'vpn-interface-name': u'Tunnel123'}, ]}
|
||||
return httmock.response(requests.codes.OK, content=content)
|
||||
|
||||
|
||||
@filter_request(['get'], 'vpn-svc/ike/keyrings')
|
||||
@httmock.urlmatch(netloc=r'localhost')
|
||||
def get_fqdn(url, request):
|
||||
LOG.debug("GET FQDN mock for %s", url)
|
||||
if not request.headers.get('X-auth-token', None):
|
||||
return {'status_code': requests.codes.UNAUTHORIZED}
|
||||
content = {u'kind': u'object#ike-keyring',
|
||||
u'keyring-name': u'5',
|
||||
u'pre-shared-key-list': [
|
||||
{u'key': u'super-secret',
|
||||
u'encrypted': False,
|
||||
u'peer-address': u'cisco.com'}
|
||||
]}
|
||||
return httmock.response(requests.codes.OK, content=content)
|
||||
|
||||
|
||||
@filter_request(['get'], 'vpn-svc/ipsec/policies/')
|
||||
@httmock.urlmatch(netloc=r'localhost')
|
||||
def get_no_ah(url, request):
|
||||
LOG.debug("GET No AH mock for %s", url)
|
||||
if not request.headers.get('X-auth-token', None):
|
||||
return {'status_code': requests.codes.UNAUTHORIZED}
|
||||
ipsec_policy_id = url.path.split('/')[-1]
|
||||
content = {u'kind': u'object#ipsec-policy',
|
||||
u'mode': u'tunnel',
|
||||
u'anti-replay-window-size': u'128',
|
||||
u'policy-id': u'%s' % ipsec_policy_id,
|
||||
u'protection-suite': {
|
||||
u'esp-encryption': u'esp-aes',
|
||||
u'esp-authentication': u'esp-sha-hmac',
|
||||
},
|
||||
u'lifetime-sec': 120,
|
||||
u'pfs': u'group5',
|
||||
u'lifetime-kb': 4608000,
|
||||
u'idle-time': None}
|
||||
return httmock.response(requests.codes.OK, content=content)
|
||||
|
||||
|
||||
@httmock.urlmatch(netloc=r'localhost')
|
||||
def get_defaults(url, request):
|
||||
if request.method != 'GET':
|
||||
return
|
||||
LOG.debug("GET mock for %s", url)
|
||||
if not request.headers.get('X-auth-token', None):
|
||||
return {'status_code': requests.codes.UNAUTHORIZED}
|
||||
if 'vpn-svc/ike/policies/2' in url.path:
|
||||
content = {u'kind': u'object#ike-policy',
|
||||
u'priority-id': u'2',
|
||||
u'version': u'v1',
|
||||
u'local-auth-method': u'pre-share',
|
||||
u'encryption': u'des',
|
||||
u'hash': u'sha',
|
||||
u'dhGroup': 1,
|
||||
u'lifetime': 86400}
|
||||
return httmock.response(requests.codes.OK, content=content)
|
||||
if 'vpn-svc/ipsec/policies/' in url.path:
|
||||
ipsec_policy_id = url.path.split('/')[-1]
|
||||
content = {u'kind': u'object#ipsec-policy',
|
||||
u'mode': u'tunnel',
|
||||
u'policy-id': u'%s' % ipsec_policy_id,
|
||||
u'protection-suite': {},
|
||||
u'lifetime-sec': 3600,
|
||||
u'pfs': u'Disable',
|
||||
u'anti-replay-window-size': u'None',
|
||||
u'lifetime-kb': 4608000,
|
||||
u'idle-time': None}
|
||||
return httmock.response(requests.codes.OK, content=content)
|
||||
|
||||
|
||||
@filter_request(['get'], 'vpn-svc/site-to-site')
|
||||
@httmock.urlmatch(netloc=r'localhost')
|
||||
def get_unnumbered(url, request):
|
||||
if not request.headers.get('X-auth-token', None):
|
||||
return {'status_code': requests.codes.UNAUTHORIZED}
|
||||
tunnel = url.path.split('/')[-1]
|
||||
ipsec_policy_id = tunnel[6:]
|
||||
content = {u'kind': u'object#vpn-site-to-site',
|
||||
u'vpn-interface-name': u'%s' % tunnel,
|
||||
u'ip-version': u'ipv4',
|
||||
u'vpn-type': u'site-to-site',
|
||||
u'ipsec-policy-id': u'%s' % ipsec_policy_id,
|
||||
u'ike-profile-id': None,
|
||||
u'mtu': 1500,
|
||||
u'local-device': {
|
||||
u'ip-address': u'GigabitEthernet3',
|
||||
u'tunnel-ip-address': u'10.10.10.10'
|
||||
},
|
||||
u'remote-device': {
|
||||
u'tunnel-ip-address': u'10.10.10.20'
|
||||
}}
|
||||
return httmock.response(requests.codes.OK, content=content)
|
||||
|
||||
|
||||
@filter_request(['get'], 'vpn-svc/site-to-site/Tunnel')
|
||||
@httmock.urlmatch(netloc=r'localhost')
|
||||
def get_admin_down(url, request):
|
||||
if not request.headers.get('X-auth-token', None):
|
||||
return {'status_code': requests.codes.UNAUTHORIZED}
|
||||
# URI has .../Tunnel#/state, so get number from 2nd to last element
|
||||
tunnel = url.path.split('/')[-2]
|
||||
content = {u'kind': u'object#vpn-site-to-site-state',
|
||||
u'vpn-interface-name': u'%s' % tunnel,
|
||||
u'line-protocol-state': u'down',
|
||||
u'enabled': False}
|
||||
return httmock.response(requests.codes.OK, content=content)
|
||||
|
||||
|
||||
@filter_request(['get'], 'vpn-svc/site-to-site/Tunnel')
|
||||
@httmock.urlmatch(netloc=r'localhost')
|
||||
def get_admin_up(url, request):
|
||||
if not request.headers.get('X-auth-token', None):
|
||||
return {'status_code': requests.codes.UNAUTHORIZED}
|
||||
# URI has .../Tunnel#/state, so get number from 2nd to last element
|
||||
tunnel = url.path.split('/')[-2]
|
||||
content = {u'kind': u'object#vpn-site-to-site-state',
|
||||
u'vpn-interface-name': u'%s' % tunnel,
|
||||
u'line-protocol-state': u'down',
|
||||
u'enabled': True}
|
||||
return httmock.response(requests.codes.OK, content=content)
|
||||
|
||||
|
||||
@filter_request(['get'], 'vpn-svc/site-to-site')
|
||||
@httmock.urlmatch(netloc=r'localhost')
|
||||
def get_mtu(url, request):
|
||||
if not request.headers.get('X-auth-token', None):
|
||||
return {'status_code': requests.codes.UNAUTHORIZED}
|
||||
tunnel = url.path.split('/')[-1]
|
||||
ipsec_policy_id = tunnel[6:]
|
||||
content = {u'kind': u'object#vpn-site-to-site',
|
||||
u'vpn-interface-name': u'%s' % tunnel,
|
||||
u'ip-version': u'ipv4',
|
||||
u'vpn-type': u'site-to-site',
|
||||
u'ipsec-policy-id': u'%s' % ipsec_policy_id,
|
||||
u'ike-profile-id': None,
|
||||
u'mtu': 9192,
|
||||
u'local-device': {
|
||||
u'ip-address': u'10.3.0.1/24',
|
||||
u'tunnel-ip-address': u'10.10.10.10'
|
||||
},
|
||||
u'remote-device': {
|
||||
u'tunnel-ip-address': u'10.10.10.20'
|
||||
}}
|
||||
return httmock.response(requests.codes.OK, content=content)
|
||||
|
||||
|
||||
@filter_request(['get'], 'vpn-svc/ike/keepalive')
|
||||
@httmock.urlmatch(netloc=r'localhost')
|
||||
def get_not_configured(url, request):
|
||||
if not request.headers.get('X-auth-token', None):
|
||||
return {'status_code': requests.codes.UNAUTHORIZED}
|
||||
return {'status_code': requests.codes.NOT_FOUND}
|
||||
|
||||
|
||||
@filter_request(['get'], 'vpn-svc/site-to-site/active/sessions')
|
||||
@httmock.urlmatch(netloc=r'localhost')
|
||||
def get_none(url, request):
|
||||
if not request.headers.get('X-auth-token', None):
|
||||
return {'status_code': requests.codes.UNAUTHORIZED}
|
||||
content = {u'kind': u'collection#vpn-active-sessions',
|
||||
u'items': []}
|
||||
return httmock.response(requests.codes.OK, content=content)
|
||||
|
||||
|
||||
@filter_request(['get'], 'interfaces/GigabitEthernet3')
|
||||
@httmock.urlmatch(netloc=r'localhost')
|
||||
def get_local_ip(url, request):
|
||||
if not request.headers.get('X-auth-token', None):
|
||||
return {'status_code': requests.codes.UNAUTHORIZED}
|
||||
content = {u'kind': u'object#interface',
|
||||
u'subnet-mask': u'255.255.255.0',
|
||||
u'ip-address': u'10.5.0.2'}
|
||||
return httmock.response(requests.codes.OK, content=content)
|
||||
|
||||
|
||||
@httmock.urlmatch(netloc=r'localhost')
|
||||
def post(url, request):
|
||||
if request.method != 'POST':
|
||||
return
|
||||
LOG.debug("POST mock for %s", url)
|
||||
if not request.headers.get('X-auth-token', None):
|
||||
return {'status_code': requests.codes.UNAUTHORIZED}
|
||||
if 'interfaces/GigabitEthernet' in url.path:
|
||||
return {'status_code': requests.codes.NO_CONTENT}
|
||||
if 'global/local-users' in url.path:
|
||||
if 'username' not in request.body:
|
||||
return {'status_code': requests.codes.BAD_REQUEST}
|
||||
if '"privilege": 20' in request.body:
|
||||
return {'status_code': requests.codes.BAD_REQUEST}
|
||||
headers = {'location': '%s/test-user' % url.geturl()}
|
||||
return httmock.response(requests.codes.CREATED, headers=headers)
|
||||
if 'vpn-svc/ike/policies' in url.path:
|
||||
headers = {'location': "%s/2" % url.geturl()}
|
||||
return httmock.response(requests.codes.CREATED, headers=headers)
|
||||
if 'vpn-svc/ipsec/policies' in url.path:
|
||||
m = re.search(r'"policy-id": "(\S+)"', request.body)
|
||||
if m:
|
||||
headers = {'location': "%s/%s" % (url.geturl(), m.group(1))}
|
||||
return httmock.response(requests.codes.CREATED, headers=headers)
|
||||
return {'status_code': requests.codes.BAD_REQUEST}
|
||||
if 'vpn-svc/ike/keyrings' in url.path:
|
||||
headers = {'location': "%s/5" % url.geturl()}
|
||||
return httmock.response(requests.codes.CREATED, headers=headers)
|
||||
if 'vpn-svc/site-to-site' in url.path:
|
||||
m = re.search(r'"vpn-interface-name": "(\S+)"', request.body)
|
||||
if m:
|
||||
headers = {'location': "%s/%s" % (url.geturl(), m.group(1))}
|
||||
return httmock.response(requests.codes.CREATED, headers=headers)
|
||||
return {'status_code': requests.codes.BAD_REQUEST}
|
||||
if 'routing-svc/static-routes' in url.path:
|
||||
headers = {'location':
|
||||
"%s/10.1.0.0_24_GigabitEthernet1" % url.geturl()}
|
||||
return httmock.response(requests.codes.CREATED, headers=headers)
|
||||
|
||||
|
||||
@filter_request(['post'], 'global/local-users')
|
||||
@httmock.urlmatch(netloc=r'localhost')
|
||||
def post_change_attempt(url, request):
|
||||
LOG.debug("POST change value mock for %s", url)
|
||||
if not request.headers.get('X-auth-token', None):
|
||||
return {'status_code': requests.codes.UNAUTHORIZED}
|
||||
return {'status_code': requests.codes.NOT_FOUND,
|
||||
'content': {
|
||||
u'error-code': -1,
|
||||
u'error-message': u'user test-user already exists'}}
|
||||
|
||||
|
||||
@httmock.urlmatch(netloc=r'localhost')
|
||||
def post_duplicate(url, request):
|
||||
LOG.debug("POST duplicate mock for %s", url)
|
||||
if not request.headers.get('X-auth-token', None):
|
||||
return {'status_code': requests.codes.UNAUTHORIZED}
|
||||
return {'status_code': requests.codes.BAD_REQUEST,
|
||||
'content': {
|
||||
u'error-code': -1,
|
||||
u'error-message': u'policy 2 exist, not allow to '
|
||||
u'update policy using POST method'}}
|
||||
|
||||
|
||||
@filter_request(['post'], 'vpn-svc/site-to-site')
|
||||
@httmock.urlmatch(netloc=r'localhost')
|
||||
def post_missing_ipsec_policy(url, request):
|
||||
LOG.debug("POST missing ipsec policy mock for %s", url)
|
||||
if not request.headers.get('X-auth-token', None):
|
||||
return {'status_code': requests.codes.UNAUTHORIZED}
|
||||
return {'status_code': requests.codes.BAD_REQUEST}
|
||||
|
||||
|
||||
@filter_request(['post'], 'vpn-svc/site-to-site')
|
||||
@httmock.urlmatch(netloc=r'localhost')
|
||||
def post_missing_ike_policy(url, request):
|
||||
LOG.debug("POST missing ike policy mock for %s", url)
|
||||
if not request.headers.get('X-auth-token', None):
|
||||
return {'status_code': requests.codes.UNAUTHORIZED}
|
||||
return {'status_code': requests.codes.BAD_REQUEST}
|
||||
|
||||
|
||||
@filter_request(['post'], 'vpn-svc/site-to-site')
|
||||
@httmock.urlmatch(netloc=r'localhost')
|
||||
def post_bad_ip(url, request):
|
||||
LOG.debug("POST bad IP mock for %s", url)
|
||||
if not request.headers.get('X-auth-token', None):
|
||||
return {'status_code': requests.codes.UNAUTHORIZED}
|
||||
return {'status_code': requests.codes.BAD_REQUEST}
|
||||
|
||||
|
||||
@filter_request(['post'], 'vpn-svc/site-to-site')
|
||||
@httmock.urlmatch(netloc=r'localhost')
|
||||
def post_bad_mtu(url, request):
|
||||
LOG.debug("POST bad mtu mock for %s", url)
|
||||
if not request.headers.get('X-auth-token', None):
|
||||
return {'status_code': requests.codes.UNAUTHORIZED}
|
||||
return {'status_code': requests.codes.BAD_REQUEST}
|
||||
|
||||
|
||||
@filter_request(['post'], 'vpn-svc/ipsec/policies')
|
||||
@httmock.urlmatch(netloc=r'localhost')
|
||||
def post_bad_lifetime(url, request):
|
||||
LOG.debug("POST bad lifetime mock for %s", url)
|
||||
if not request.headers.get('X-auth-token', None):
|
||||
return {'status_code': requests.codes.UNAUTHORIZED}
|
||||
return {'status_code': requests.codes.BAD_REQUEST}
|
||||
|
||||
|
||||
@filter_request(['post'], 'vpn-svc/ipsec/policies')
|
||||
@httmock.urlmatch(netloc=r'localhost')
|
||||
def post_bad_name(url, request):
|
||||
LOG.debug("POST bad IPSec policy name for %s", url)
|
||||
if not request.headers.get('X-auth-token', None):
|
||||
return {'status_code': requests.codes.UNAUTHORIZED}
|
||||
return {'status_code': requests.codes.BAD_REQUEST}
|
||||
|
||||
|
||||
@httmock.urlmatch(netloc=r'localhost')
|
||||
def put(url, request):
|
||||
if request.method != 'PUT':
|
||||
return
|
||||
LOG.debug("PUT mock for %s", url)
|
||||
if not request.headers.get('X-auth-token', None):
|
||||
return {'status_code': requests.codes.UNAUTHORIZED}
|
||||
# Any resource
|
||||
return {'status_code': requests.codes.NO_CONTENT}
|
||||
|
||||
|
||||
@httmock.urlmatch(netloc=r'localhost')
|
||||
def delete(url, request):
|
||||
if request.method != 'DELETE':
|
||||
return
|
||||
LOG.debug("DELETE mock for %s", url)
|
||||
if not request.headers.get('X-auth-token', None):
|
||||
return {'status_code': requests.codes.UNAUTHORIZED}
|
||||
# Any resource
|
||||
return {'status_code': requests.codes.NO_CONTENT}
|
||||
|
||||
|
||||
@httmock.urlmatch(netloc=r'localhost')
|
||||
def delete_unknown(url, request):
|
||||
if request.method != 'DELETE':
|
||||
return
|
||||
LOG.debug("DELETE unknown mock for %s", url)
|
||||
if not request.headers.get('X-auth-token', None):
|
||||
return {'status_code': requests.codes.UNAUTHORIZED}
|
||||
# Any resource
|
||||
return {'status_code': requests.codes.NOT_FOUND,
|
||||
'content': {
|
||||
u'error-code': -1,
|
||||
u'error-message': 'user unknown not found'}}
|
||||
|
||||
|
||||
@httmock.urlmatch(netloc=r'localhost')
|
||||
def delete_not_allowed(url, request):
|
||||
if request.method != 'DELETE':
|
||||
return
|
||||
LOG.debug("DELETE not allowed mock for %s", url)
|
||||
if not request.headers.get('X-auth-token', None):
|
||||
return {'status_code': requests.codes.UNAUTHORIZED}
|
||||
# Any resource
|
||||
return {'status_code': requests.codes.METHOD_NOT_ALLOWED}
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue
Block a user