83 lines
2.9 KiB
Python
83 lines
2.9 KiB
Python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
|
|
|
# Copyright 2010 United States Government as represented by the
|
|
# Administrator of the National Aeronautics and Space Administration.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import webob
|
|
import webob.dec
|
|
import webob.exc
|
|
|
|
from nova.api import ec2
|
|
from nova import flags
|
|
from nova import test
|
|
|
|
|
|
# Shorthand for nova.flags.FLAGS; the tests below read lockout_attempts and
# lockout_minutes from it.
FLAGS = flags.FLAGS
|
|
|
|
|
|
@webob.dec.wsgify
def conditional_forbid(req):
    """Tiny WSGI app used as the wrapped application in the tests.

    Answers 'OK' unless the request carries the parameter die=1, in which
    case it raises webob.exc.HTTPForbidden (a 403 response).
    """
    params = req.params
    # Guard clause: anything other than an explicit die=1 succeeds.
    if 'die' not in params or params['die'] != '1':
        return 'OK'
    raise webob.exc.HTTPForbidden()
|
|
|
|
|
|
class LockoutTestCase(test.TrialTestCase):
    """Test case for the Lockout middleware.

    The middleware under test wraps ``conditional_forbid`` and is given a
    controllable clock (``_constant_time``) so the tests can move time
    forward without sleeping.
    """

    def setUp(self):  # pylint: disable-msg=C0103
        """Build an ``ec2.Lockout`` around the helper app with a fake clock."""
        # The fake clock starts at 0 and only moves when _timeout() is called.
        self.local_time = 0
        self.lockout = ec2.Lockout(conditional_forbid,
                                   time_fn=self._constant_time)
        super(LockoutTestCase, self).setUp()

    def _constant_time(self):
        """Helper method to force timeouts.

        Returns the current value of ``self.local_time``; passed to the
        middleware as its ``time_fn``.
        """
        return self.local_time

    def _trigger_lockout(self, access_key):
        """Send x failed requests where x = lockout_attempts.

        Each request carries die=1 so the wrapped app answers 403, and each
        response is asserted to be a 403.
        """
        # range() exists on both Python 2 and 3 (xrange is Python 2 only),
        # and the loop index itself is never needed.
        for _ in range(FLAGS.lockout_attempts):
            req = webob.Request.blank('/?AWSAccessKeyId=%s&die=1' % access_key)
            self.assertEqual(req.get_response(self.lockout).status_int, 403)

    def _is_locked_out(self, access_key):
        """Sends a test request to see if key is locked out.

        The request has no die parameter, so the wrapped app itself would
        answer 'OK'; returns True when the response status is 403 anyway.
        """
        req = webob.Request.blank('/?AWSAccessKeyId=%s' % access_key)
        return req.get_response(self.lockout).status_int == 403

    def _timeout(self):
        """Increment time to 1 second past the lockout."""
        self.local_time = 1 + self.local_time + FLAGS.lockout_minutes * 60

    def test_lockout(self):
        """A key is locked out after lockout_attempts failed requests."""
        self._trigger_lockout('test')
        self.assertTrue(self._is_locked_out('test'))

    def test_timeout(self):
        """A locked-out key is accepted again once the fake clock advances."""
        self._trigger_lockout('test')
        self.assertTrue(self._is_locked_out('test'))
        self._timeout()
        self.assertFalse(self._is_locked_out('test'))

    def test_multiple_keys(self):
        """Locking out one access key does not lock out a different key."""
        self._trigger_lockout('test1')
        self.assertTrue(self._is_locked_out('test1'))
        self.assertFalse(self._is_locked_out('test2'))
        self._timeout()
        self.assertFalse(self._is_locked_out('test1'))
        self.assertFalse(self._is_locked_out('test2'))
|