86 lines
		
	
	
		
			3.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			86 lines
		
	
	
		
			3.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# vim: tabstop=4 shiftwidth=4 softtabstop=4
 | 
						|
 | 
						|
# Copyright 2010 United States Government as represented by the
 | 
						|
# Administrator of the National Aeronautics and Space Administration.
 | 
						|
# All Rights Reserved.
 | 
						|
#
 | 
						|
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
 | 
						|
#    not use this file except in compliance with the License. You may obtain
 | 
						|
#    a copy of the License at
 | 
						|
#
 | 
						|
#         http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
#    Unless required by applicable law or agreed to in writing, software
 | 
						|
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 | 
						|
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 | 
						|
#    License for the specific language governing permissions and limitations
 | 
						|
#    under the License.
 | 
						|
 | 
						|
import webob
 | 
						|
import webob.dec
 | 
						|
import webob.exc
 | 
						|
 | 
						|
from nova.api import ec2
 | 
						|
from nova import flags
 | 
						|
from nova import test
 | 
						|
from nova import utils
 | 
						|
 | 
						|
 | 
						|
FLAGS = flags.FLAGS
 | 
						|
 | 
						|
 | 
						|
@webob.dec.wsgify
 | 
						|
def conditional_forbid(req):
 | 
						|
    """Helper wsgi app returns 403 if param 'die' is 1."""
 | 
						|
    if 'die' in req.params and req.params['die'] == '1':
 | 
						|
        raise webob.exc.HTTPForbidden()
 | 
						|
    return 'OK'
 | 
						|
 | 
						|
 | 
						|
class LockoutTestCase(test.TestCase):
 | 
						|
    """Test case for the Lockout middleware."""
 | 
						|
    def setUp(self):  # pylint: disable=C0103
 | 
						|
        super(LockoutTestCase, self).setUp()
 | 
						|
        utils.set_time_override()
 | 
						|
        self.lockout = ec2.Lockout(conditional_forbid)
 | 
						|
 | 
						|
    def tearDown(self):  # pylint: disable=C0103
 | 
						|
        utils.clear_time_override()
 | 
						|
        super(LockoutTestCase, self).tearDown()
 | 
						|
 | 
						|
    def _send_bad_attempts(self, access_key, num_attempts=1):
 | 
						|
        """Fail x."""
 | 
						|
        for i in xrange(num_attempts):
 | 
						|
            req = webob.Request.blank('/?AWSAccessKeyId=%s&die=1' % access_key)
 | 
						|
            self.assertEqual(req.get_response(self.lockout).status_int, 403)
 | 
						|
 | 
						|
    def _is_locked_out(self, access_key):
 | 
						|
        """Sends a test request to see if key is locked out."""
 | 
						|
        req = webob.Request.blank('/?AWSAccessKeyId=%s' % access_key)
 | 
						|
        return (req.get_response(self.lockout).status_int == 403)
 | 
						|
 | 
						|
    def test_lockout(self):
 | 
						|
        self._send_bad_attempts('test', FLAGS.lockout_attempts)
 | 
						|
        self.assertTrue(self._is_locked_out('test'))
 | 
						|
 | 
						|
    def test_timeout(self):
 | 
						|
        self._send_bad_attempts('test', FLAGS.lockout_attempts)
 | 
						|
        self.assertTrue(self._is_locked_out('test'))
 | 
						|
        utils.advance_time_seconds(FLAGS.lockout_minutes * 60)
 | 
						|
        self.assertFalse(self._is_locked_out('test'))
 | 
						|
 | 
						|
    def test_multiple_keys(self):
 | 
						|
        self._send_bad_attempts('test1', FLAGS.lockout_attempts)
 | 
						|
        self.assertTrue(self._is_locked_out('test1'))
 | 
						|
        self.assertFalse(self._is_locked_out('test2'))
 | 
						|
        utils.advance_time_seconds(FLAGS.lockout_minutes * 60)
 | 
						|
        self.assertFalse(self._is_locked_out('test1'))
 | 
						|
        self.assertFalse(self._is_locked_out('test2'))
 | 
						|
 | 
						|
    def test_window_timeout(self):
 | 
						|
        self._send_bad_attempts('test', FLAGS.lockout_attempts - 1)
 | 
						|
        self.assertFalse(self._is_locked_out('test'))
 | 
						|
        utils.advance_time_seconds(FLAGS.lockout_window * 60)
 | 
						|
        self._send_bad_attempts('test', FLAGS.lockout_attempts - 1)
 | 
						|
        self.assertFalse(self._is_locked_out('test'))
 |