Remove Deprecated AuthMiddleware

* Remove nova.api.openstack.auth:AuthMiddleware
* Remove associated tests
* Remove 'deprecated' api pipelines
* Partially implements bp remove-deprecated-auth

Change-Id: Ifcfac573a569f982c2626bbd0bc740d3c908c9fa
This commit is contained in:
Brian Waldon 2012-05-07 14:58:15 -07:00
parent f729925f5a
commit dac602738e
11 changed files with 8 additions and 488 deletions

View File

@ -93,23 +93,18 @@ use = call:nova.api.openstack.urlmap:urlmap_factory
[composite:openstack_compute_api_v2]
use = call:nova.api.auth:pipeline_factory
noauth = faultwrap sizelimit noauth ratelimit osapi_compute_app_v2
deprecated = faultwrap sizelimit auth ratelimit osapi_compute_app_v2
keystone = faultwrap sizelimit authtoken keystonecontext ratelimit osapi_compute_app_v2
keystone_nolimit = faultwrap sizelimit authtoken keystonecontext osapi_compute_app_v2
[composite:openstack_volume_api_v1]
use = call:nova.api.auth:pipeline_factory
noauth = faultwrap sizelimit noauth ratelimit osapi_volume_app_v1
deprecated = faultwrap sizelimit auth ratelimit osapi_volume_app_v1
keystone = faultwrap sizelimit authtoken keystonecontext ratelimit osapi_volume_app_v1
keystone_nolimit = faultwrap sizelimit authtoken keystonecontext osapi_volume_app_v1
[filter:faultwrap]
paste.filter_factory = nova.api.openstack:FaultWrapper.factory
[filter:auth]
paste.filter_factory = nova.api.openstack.auth:AuthMiddleware.factory
[filter:noauth]
paste.filter_factory = nova.api.openstack.auth:NoAuthMiddleware.factory

View File

@ -15,21 +15,15 @@
# License for the specific language governing permissions and limitations
# under the License.
import hashlib
import os
import time
import webob.dec
import webob.exc
from nova.api.openstack import wsgi
from nova.auth import manager
from nova import context
from nova import exception
from nova import flags
from nova import log as logging
from nova.openstack.common import importutils
from nova import utils
from nova import wsgi as base_wsgi
LOG = logging.getLogger(__name__)
@ -69,195 +63,3 @@ class NoAuthMiddleware(base_wsgi.Middleware):
req.environ['nova.context'] = ctx
return self.application
class AuthMiddleware(base_wsgi.Middleware):
"""Authorize the openstack API request or return an HTTP Forbidden."""
def __init__(self, application, db_driver=None):
if not db_driver:
db_driver = FLAGS.db_driver
self.db = importutils.import_module(db_driver)
self.auth = manager.AuthManager()
super(AuthMiddleware, self).__init__(application)
@webob.dec.wsgify(RequestClass=wsgi.Request)
def __call__(self, req):
if not self.has_authentication(req):
return self.authenticate(req)
user_id = self.get_user_by_authentication(req)
if not user_id:
token = req.headers["X-Auth-Token"]
msg = _("%(user_id)s could not be found with token '%(token)s'")
LOG.warn(msg % locals())
return wsgi.Fault(webob.exc.HTTPUnauthorized())
# Get all valid projects for the user
projects = self.auth.get_projects(user_id)
if not projects:
return wsgi.Fault(webob.exc.HTTPUnauthorized())
project_id = ""
path_parts = req.path.split('/')
# TODO(wwolf): this v1.1 check will be temporary as
# keystone should be taking this over at some point
if len(path_parts) > 1 and path_parts[1] in ('v1.1', 'v2'):
project_id = path_parts[2]
# Check that the project for project_id exists, and that user
# is authorized to use it
try:
self.auth.get_project(project_id)
except exception.ProjectNotFound:
return wsgi.Fault(webob.exc.HTTPUnauthorized())
if project_id not in [p.id for p in projects]:
return wsgi.Fault(webob.exc.HTTPUnauthorized())
else:
# As a fallback, set project_id from the headers, which is the v1.0
# behavior. As a last resort, be forgiving to the user and set
# project_id based on a valid project of theirs.
try:
project_id = req.headers["X-Auth-Project-Id"]
except KeyError:
project_id = projects[0].id
is_admin = self.auth.is_admin(user_id)
remote_address = getattr(req, 'remote_address', '127.0.0.1')
if FLAGS.use_forwarded_for:
remote_address = req.headers.get('X-Forwarded-For', remote_address)
ctx = context.RequestContext(user_id,
project_id,
is_admin=is_admin,
remote_address=remote_address)
req.environ['nova.context'] = ctx
if not is_admin and not self.auth.is_project_member(user_id,
project_id):
msg = _("%(user_id)s must be an admin or a "
"member of %(project_id)s")
LOG.warn(msg % locals())
return wsgi.Fault(webob.exc.HTTPUnauthorized())
return self.application
def has_authentication(self, req):
return 'X-Auth-Token' in req.headers
def get_user_by_authentication(self, req):
return self.authorize_token(req.headers["X-Auth-Token"])
def authenticate(self, req):
# Unless the request is explicitly made against /<version>/ don't
# honor it
path_info = req.path_info
if len(path_info) > 1:
msg = _("Authentication requests must be made against a version "
"root (e.g. /v2).")
LOG.warn(msg)
return wsgi.Fault(webob.exc.HTTPUnauthorized(explanation=msg))
def _get_auth_header(key):
"""Ensures that the KeyError returned is meaningful."""
try:
return req.headers[key]
except KeyError:
raise KeyError(key)
try:
username = _get_auth_header('X-Auth-User')
key = _get_auth_header('X-Auth-Key')
except KeyError as ex:
msg = _("Could not find %s in request.") % ex
LOG.warn(msg)
return wsgi.Fault(webob.exc.HTTPUnauthorized(explanation=msg))
token, user = self._authorize_user(username, key, req)
if user and token:
res = webob.Response()
res.headers['X-Auth-Token'] = token['token_hash']
_x_server_url = 'X-Server-Management-Url'
_server_url = 'server_management_url'
res.headers[_x_server_url] = token[_server_url]
if token['storage_url']:
_x_storage_url = 'X-Storage-Url'
_storage_url = 'storage_url'
res.headers[_x_storage_url] = token[_storage_url]
if token['cdn_management_url']:
_x_cdn_url = 'X-CDN-Management-Url'
_cdn_url = 'cdn_management_url'
res.headers[_x_cdn_url] = token[_cdn_url]
res.content_type = 'text/plain'
res.status = '204'
LOG.debug(_("Successfully authenticated '%s'") % username)
return res
else:
return wsgi.Fault(webob.exc.HTTPUnauthorized())
def authorize_token(self, token_hash):
""" retrieves user information from the datastore given a token
If the token has expired, returns None
If the token is not found, returns None
Otherwise returns dict(id=(the authorized user's id))
This method will also remove the token if the timestamp is older than
2 days ago.
"""
ctxt = context.get_admin_context()
try:
token = self.db.auth_token_get(ctxt, token_hash)
except exception.NotFound:
return None
if token:
delta = utils.utcnow() - token['created_at']
if delta.days >= 2:
self.db.auth_token_destroy(ctxt, token['token_hash'])
else:
return token['user_id']
return None
def _authorize_user(self, username, key, req):
"""Generates a new token and assigns it to a user.
username - string
key - string API key
req - wsgi.Request object
"""
ctxt = context.get_admin_context()
project_id = req.headers.get('X-Auth-Project-Id')
if project_id is None:
# If the project_id is not provided in the headers, be forgiving to
# the user and set project_id based on a valid project of theirs.
user = self.auth.get_user_from_access_key(key)
projects = self.auth.get_projects(user.id)
if not projects:
raise webob.exc.HTTPUnauthorized()
project_id = projects[0].id
try:
user = self.auth.get_user_from_access_key(key)
except exception.NotFound:
LOG.warn(_("User not found with provided API key."))
user = None
if user and utils.strcmp_const_time(user.name, username):
token_hash = hashlib.sha1('%s%s%f' % (username, key,
time.time())).hexdigest()
token_dict = {}
token_dict['token_hash'] = token_hash
token_dict['cdn_management_url'] = ''
os_url = req.url.strip('/')
os_url += '/' + project_id
token_dict['server_management_url'] = os_url
token_dict['storage_url'] = ''
token_dict['user_id'] = user.id
token = self.db.auth_token_create(ctxt, token_dict)
return token, user
elif user and user.name != username:
msg = _("Provided API key is valid, but not for user "
"'%(username)s'") % locals()
LOG.warn(msg)
return None, None

View File

@ -224,8 +224,6 @@ class HostController(object):
"""
host = id
context = req.environ['nova.context']
# Expected to use AuthMiddleware.
# Otherwise, non-admin user can use describe-resource
if not context.is_admin:
msg = _("Describe-resource is admin only functionality")
raise webob.exc.HTTPForbidden(explanation=msg)

View File

@ -39,7 +39,6 @@ class AccountsTest(test.TestCase):
fakes.FakeAuthDatabase.data = {}
fakes.stub_out_networking(self.stubs)
fakes.stub_out_rate_limiting(self.stubs)
fakes.stub_out_auth(self.stubs)
fakemgr = fakes.FakeAuthManager()
joeuser = auth_manager.User('id1', 'guy1', 'acc1', 'secret1', False)

View File

@ -95,7 +95,6 @@ class SnapshotApiTest(test.TestCase):
fakes.FakeAuthDatabase.data = {}
fakes.stub_out_networking(self.stubs)
fakes.stub_out_rate_limiting(self.stubs)
fakes.stub_out_auth(self.stubs)
self.stubs.Set(volume.api.API, "create_snapshot", stub_snapshot_create)
self.stubs.Set(volume.api.API, "create_snapshot_force",
stub_snapshot_create)

View File

@ -42,7 +42,6 @@ class UsersTest(test.TestCase):
fakes.FakeAuthDatabase.data = {}
fakes.stub_out_networking(self.stubs)
fakes.stub_out_rate_limiting(self.stubs)
fakes.stub_out_auth(self.stubs)
fakemgr = fakes.FakeAuthManager()
fakemgr.add_user(auth_manager.User('id1', 'guy1',

View File

@ -104,7 +104,6 @@ class VolumeApiTest(test.TestCase):
fakes.FakeAuthDatabase.data = {}
fakes.stub_out_networking(self.stubs)
fakes.stub_out_rate_limiting(self.stubs)
fakes.stub_out_auth(self.stubs)
self.stubs.Set(nova.db, 'volume_get', return_volume)
self.stubs.Set(volume.api.API, "delete", fakes.stub_volume_delete)

View File

@ -15,267 +15,14 @@
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import webob
import webob.dec
import nova.api.openstack.compute
import nova.auth.manager
from nova.api.openstack import auth
from nova import context
from nova import db
from nova import test
from nova.tests.api.openstack import fakes
class Test(test.TestCase):
def setUp(self):
super(Test, self).setUp()
self.stubs.Set(auth.AuthMiddleware,
'__init__', fakes.fake_auth_init)
self.stubs.Set(context, 'RequestContext', fakes.FakeRequestContext)
fakes.FakeAuthManager.clear_fakes()
fakes.FakeAuthDatabase.data = {}
fakes.stub_out_rate_limiting(self.stubs)
fakes.stub_out_networking(self.stubs)
def test_authorize_user(self):
f = fakes.FakeAuthManager()
user = nova.auth.manager.User('id1', 'user1', 'user1_key', None, None)
f.add_user(user)
req = webob.Request.blank('/v2/')
req.headers['X-Auth-User'] = 'user1'
req.headers['X-Auth-Key'] = 'user1_key'
req.headers['X-Auth-Project-Id'] = 'user1_project'
result = req.get_response(fakes.wsgi_app(fake_auth=False))
self.assertEqual(result.status, '204 No Content')
self.assertEqual(len(result.headers['X-Auth-Token']), 40)
def test_authorize_token(self):
f = fakes.FakeAuthManager()
user = nova.auth.manager.User('id1', 'user1', 'user1_key', None, None)
f.add_user(user)
f.create_project('user1_project', user)
req = webob.Request.blank('/v2/', {'HTTP_HOST': 'foo'})
req.headers['X-Auth-User'] = 'user1'
req.headers['X-Auth-Key'] = 'user1_key'
result = req.get_response(fakes.wsgi_app(fake_auth=False))
self.assertEqual(result.status, '204 No Content')
self.assertEqual(len(result.headers['X-Auth-Token']), 40)
self.assertEqual(result.headers['X-Server-Management-Url'],
"http://foo/v2/user1_project")
token = result.headers['X-Auth-Token']
self.stubs.Set(nova.api.openstack.compute, 'APIRouter',
fakes.FakeRouter)
req = webob.Request.blank('/v2/user1_project')
req.headers['X-Auth-Token'] = token
result = req.get_response(fakes.wsgi_app(fake_auth=False))
self.assertEqual(result.status, '200 OK')
self.assertEqual(result.headers['X-Test-Success'], 'True')
def test_token_expiry(self):
self.destroy_called = False
def destroy_token_mock(meh, context, token):
self.destroy_called = True
def bad_token(meh, context, token_hash):
return fakes.FakeToken(
token_hash=token_hash,
created_at=datetime.datetime(1990, 1, 1))
self.stubs.Set(fakes.FakeAuthDatabase, 'auth_token_destroy',
destroy_token_mock)
self.stubs.Set(fakes.FakeAuthDatabase, 'auth_token_get',
bad_token)
req = webob.Request.blank('/v2/')
req.headers['X-Auth-Token'] = 'token_hash'
result = req.get_response(fakes.wsgi_app(fake_auth=False))
self.assertEqual(result.status, '401 Unauthorized')
self.assertEqual(self.destroy_called, True)
def test_authorize_project(self):
f = fakes.FakeAuthManager()
user = nova.auth.manager.User('id1', 'user1', 'user1_key', None, None)
f.add_user(user)
f.create_project('user1_project', user)
f.create_project('user2_project', user)
req = webob.Request.blank('/v2/', {'HTTP_HOST': 'foo'})
req.headers['X-Auth-User'] = 'user1'
req.headers['X-Auth-Key'] = 'user1_key'
result = req.get_response(fakes.wsgi_app(fake_auth=False))
self.assertEqual(result.status, '204 No Content')
token = result.headers['X-Auth-Token']
self.stubs.Set(nova.api.openstack.compute, 'APIRouter',
fakes.FakeRouter)
req = webob.Request.blank('/v2/user2_project')
req.headers['X-Auth-Token'] = token
result = req.get_response(fakes.wsgi_app(fake_auth=False))
self.assertEqual(result.status, '200 OK')
self.assertEqual(result.headers['X-Test-Success'], 'True')
def test_bad_user_bad_key(self):
req = webob.Request.blank('/v2/')
req.headers['X-Auth-User'] = 'unknown_user'
req.headers['X-Auth-Key'] = 'unknown_user_key'
req.headers['X-Auth-Project-Id'] = 'user_project'
result = req.get_response(fakes.wsgi_app(fake_auth=False))
self.assertEqual(result.status, '401 Unauthorized')
def test_bad_user_good_key(self):
f = fakes.FakeAuthManager()
user = nova.auth.manager.User('id1', 'user1', 'user1_key', None, None)
f.add_user(user)
req = webob.Request.blank('/v2/')
req.headers['X-Auth-User'] = 'unknown_user'
req.headers['X-Auth-Key'] = 'user1_key'
result = req.get_response(fakes.wsgi_app(fake_auth=False))
self.assertEqual(result.status, '401 Unauthorized')
def test_no_user(self):
req = webob.Request.blank('/v2/')
result = req.get_response(fakes.wsgi_app(fake_auth=False))
self.assertEqual(result.status, '401 Unauthorized')
def test_bad_token(self):
req = webob.Request.blank('/v2/')
req.headers['X-Auth-Token'] = 'unknown_token'
result = req.get_response(fakes.wsgi_app(fake_auth=False))
self.assertEqual(result.status, '401 Unauthorized')
def test_bad_project(self):
f = fakes.FakeAuthManager()
user1 = nova.auth.manager.User('id1', 'user1', 'user1_key', None, None)
user2 = nova.auth.manager.User('id2', 'user2', 'user2_key', None, None)
f.add_user(user1)
f.add_user(user2)
f.create_project('user1_project', user1)
f.create_project('user2_project', user2)
req = webob.Request.blank('/v2/', {'HTTP_HOST': 'foo'})
req.headers['X-Auth-User'] = 'user1'
req.headers['X-Auth-Key'] = 'user1_key'
result = req.get_response(fakes.wsgi_app(fake_auth=False))
self.assertEqual(result.status, '204 No Content')
token = result.headers['X-Auth-Token']
self.stubs.Set(nova.api.openstack.compute, 'APIRouter',
fakes.FakeRouter)
req = webob.Request.blank('/v2/user2_project')
req.headers['X-Auth-Token'] = token
result = req.get_response(fakes.wsgi_app(fake_auth=False))
self.assertEqual(result.status, '401 Unauthorized')
def test_not_authorized_project(self):
f = fakes.FakeAuthManager()
user1 = nova.auth.manager.User('id1', 'user1', 'user1_key', None, None)
f.add_user(user1)
f.create_project('user1_project', user1)
user2 = nova.auth.manager.User('id2', 'user2', 'user2_key', None, None)
f.add_user(user2)
f.create_project('user2_project', user2)
req = webob.Request.blank('/v2/', {'HTTP_HOST': 'foo'})
req.headers['X-Auth-User'] = 'user1'
req.headers['X-Auth-Key'] = 'user1_key'
result = req.get_response(fakes.wsgi_app(fake_auth=False))
self.assertEqual(result.status, '204 No Content')
token = result.headers['X-Auth-Token']
self.stubs.Set(nova.api.openstack.compute, 'APIRouter',
fakes.FakeRouter)
req = webob.Request.blank('/v2/user2_project')
req.headers['X-Auth-Token'] = token
result = req.get_response(fakes.wsgi_app(fake_auth=False))
self.assertEqual(result.status, '401 Unauthorized')
def test_auth_token_no_empty_headers(self):
f = fakes.FakeAuthManager()
user = nova.auth.manager.User('id1', 'user1', 'user1_key', None, None)
f.add_user(user)
req = webob.Request.blank('/v2/')
req.headers['X-Auth-User'] = 'user1'
req.headers['X-Auth-Key'] = 'user1_key'
req.headers['X-Auth-Project-Id'] = 'user1_project'
result = req.get_response(fakes.wsgi_app(fake_auth=False))
self.assertEqual(result.status, '204 No Content')
self.assertEqual(len(result.headers['X-Auth-Token']), 40)
self.assertFalse('X-CDN-Management-Url' in result.headers)
self.assertFalse('X-Storage-Url' in result.headers)
class TestFunctional(test.TestCase):
def test_token_expiry(self):
ctx = context.get_admin_context()
tok = db.auth_token_create(ctx, dict(
token_hash='test_token_hash',
cdn_management_url='',
server_management_url='',
storage_url='',
user_id='user1',
))
db.auth_token_update(ctx, tok.token_hash, dict(
created_at=datetime.datetime(2000, 1, 1, 12, 0, 0),
))
req = webob.Request.blank('/v2/')
req.headers['X-Auth-Token'] = 'test_token_hash'
result = req.get_response(fakes.wsgi_app(fake_auth=False))
self.assertEqual(result.status, '401 Unauthorized')
def test_token_doesnotexist(self):
req = webob.Request.blank('/v2/')
req.headers['X-Auth-Token'] = 'nonexistant_token_hash'
result = req.get_response(fakes.wsgi_app(fake_auth=False))
self.assertEqual(result.status, '401 Unauthorized')
class TestLimiter(test.TestCase):
def setUp(self):
super(TestLimiter, self).setUp()
self.stubs.Set(auth.AuthMiddleware,
'__init__', fakes.fake_auth_init)
self.stubs.Set(context, 'RequestContext', fakes.FakeRequestContext)
fakes.FakeAuthManager.clear_fakes()
fakes.FakeAuthDatabase.data = {}
fakes.stub_out_networking(self.stubs)
def test_authorize_token(self):
f = fakes.FakeAuthManager()
user = nova.auth.manager.User('id1', 'user1', 'user1_key', None, None)
f.add_user(user)
f.create_project('test', user)
req = webob.Request.blank('/v2/')
req.headers['X-Auth-User'] = 'user1'
req.headers['X-Auth-Key'] = 'user1_key'
result = req.get_response(fakes.wsgi_app(fake_auth=False))
self.assertEqual(len(result.headers['X-Auth-Token']), 40)
token = result.headers['X-Auth-Token']
self.stubs.Set(nova.api.openstack.compute, 'APIRouter',
fakes.FakeRouter)
req = webob.Request.blank('/v2/test')
req.method = 'POST'
req.headers['X-Auth-Token'] = token
result = req.get_response(fakes.wsgi_app(fake_auth=False))
self.assertEqual(result.status, '200 OK')
self.assertEqual(result.headers['X-Test-Success'], 'True')
class TestNoAuthMiddleware(test.TestCase):
def setUp(self):
@ -291,8 +38,7 @@ class TestNoAuthMiddleware(test.TestCase):
req.headers['X-Auth-User'] = 'user1'
req.headers['X-Auth-Key'] = 'user1_key'
req.headers['X-Auth-Project-Id'] = 'user1_project'
result = req.get_response(fakes.wsgi_app(fake_auth=False,
use_no_auth=True))
result = req.get_response(fakes.wsgi_app(use_no_auth=True))
self.assertEqual(result.status, '204 No Content')
self.assertEqual(result.headers['X-Server-Management-Url'],
"http://localhost/v2/user1_project")
@ -303,8 +49,7 @@ class TestNoAuthMiddleware(test.TestCase):
req.headers['X-Auth-User'] = 'user1'
req.headers['X-Auth-Key'] = 'user1_key'
req.headers['X-Auth-Project-Id'] = 'user1_project'
result = req.get_response(fakes.wsgi_app(fake_auth=False,
use_no_auth=True))
result = req.get_response(fakes.wsgi_app(use_no_auth=True))
self.assertEqual(result.status, '204 No Content')
self.assertEqual(result.headers['X-Server-Management-Url'],
"http://localhost/v2/user1_project")
@ -314,8 +59,7 @@ class TestNoAuthMiddleware(test.TestCase):
req.headers['X-Auth-User'] = 'user1'
req.headers['X-Auth-Key'] = 'user1_key'
req.headers['X-Auth-Project-Id'] = 'user1_project'
result = req.get_response(fakes.wsgi_app(fake_auth=False,
use_no_auth=True))
result = req.get_response(fakes.wsgi_app(use_no_auth=True))
self.assertEqual(result.status, '204 No Content')
self.assertFalse('X-CDN-Management-Url' in result.headers)
self.assertFalse('X-Storage-Url' in result.headers)

View File

@ -56,7 +56,6 @@ class ServerActionsControllerTest(test.TestCase):
def setUp(self):
super(ServerActionsControllerTest, self).setUp()
fakes.stub_out_auth(self.stubs)
self.stubs.Set(nova.db, 'instance_get_by_uuid',
fakes.fake_instance_get(vm_state=vm_states.ACTIVE,
host='fake_host'))

View File

@ -80,7 +80,6 @@ VERSIONS = {
class VersionsTest(test.TestCase):
def setUp(self):
super(VersionsTest, self).setUp()
fakes.stub_out_auth(self.stubs)
self.stubs.Set(versions, 'VERSIONS', VERSIONS)
def test_get_version_list(self):

View File

@ -77,24 +77,21 @@ def fake_wsgi(self, req):
return self.application
def wsgi_app(inner_app_v2=None, fake_auth=True, fake_auth_context=None,
def wsgi_app(inner_app_v2=None, fake_auth_context=None,
use_no_auth=False, ext_mgr=None):
if not inner_app_v2:
inner_app_v2 = compute.APIRouter(ext_mgr)
if fake_auth:
if use_no_auth:
api_v2 = openstack_api.FaultWrapper(auth.NoAuthMiddleware(
limits.RateLimitingMiddleware(inner_app_v2)))
else:
if fake_auth_context is not None:
ctxt = fake_auth_context
else:
ctxt = context.RequestContext('fake', 'fake', auth_token=True)
api_v2 = openstack_api.FaultWrapper(api_auth.InjectContext(ctxt,
limits.RateLimitingMiddleware(inner_app_v2)))
elif use_no_auth:
api_v2 = openstack_api.FaultWrapper(auth.NoAuthMiddleware(
limits.RateLimitingMiddleware(inner_app_v2)))
else:
api_v2 = openstack_api.FaultWrapper(auth.AuthMiddleware(
limits.RateLimitingMiddleware(inner_app_v2)))
mapper = urlmap.URLMap()
mapper['/v2'] = api_v2
@ -131,16 +128,6 @@ def stub_out_image_service(stubs):
lambda: nova.image.fake.FakeImageService())
def stub_out_auth(stubs):
def fake_auth_init(self, app):
self.application = app
stubs.Set(auth.AuthMiddleware,
'__init__', fake_auth_init)
stubs.Set(auth.AuthMiddleware,
'__call__', fake_wsgi)
def stub_out_rate_limiting(stubs):
def fake_rate_init(self, app):
super(limits.RateLimitingMiddleware, self).__init__(app)