compass-core/compass/tests/api/test_auth.py
grace.yu 4c69d120cd Add initial code for user authentication
Change-Id: Ie0c439244f1ae3af707b73ef64b1a411c2aede20
2014-04-14 18:13:04 -07:00

127 lines
4.1 KiB
Python

# Copyright 2014 Huawei Technologies Co. Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import simplejson as json
import time
import unittest2
# The flag is set before the settings wrapper is imported and reloaded below.
# NOTE(review): presumably this tells setting_wrapper to skip the on-disk
# settings file so the tests run with defaults -- confirm in compass.utils.
os.environ['COMPASS_IGNORE_SETTING'] = 'true'
from compass.utils import setting_wrapper as setting
# Re-execute the module now that the flag is set (Python 2 builtin reload);
# presumably needed when the module was already imported elsewhere.
reload(setting)
from compass.api import app
from compass.api import auth
from compass.api import login_manager
from compass.db import database
from compass.db.model import User
from compass.utils import flags
from compass.utils import logsetting
# Attach the login manager to the application object whose test client the
# test cases below use.
login_manager.init_app(app)
class AuthTestCase(unittest2.TestCase):
    """Tests for the /login, /logout and /token endpoints and token parsing.

    Every test gets a database created in setUp and dropped in tearDown,
    plus a fresh test client for the application.
    """

    # NOTE(review): 'sqlite://' presumably selects an in-memory SQLite
    # database -- confirm against compass.db.database.init.
    DATABASE_URL = 'sqlite://'
    # NOTE(review): presumably matches a user that database.create_db()
    # seeds -- confirm; the tests below rely on this login succeeding.
    USER_CREDENTIALS = {"email": "admin@abc.com", "password": "admin"}

    def setUp(self):
        """Initialise logging and the database, then build a test client."""
        super(AuthTestCase, self).setUp()
        logsetting.init()
        database.init(self.DATABASE_URL)
        database.create_db()
        self.test_client = app.test_client()

    def tearDown(self):
        """Drop the database created for the test."""
        database.drop_db()
        super(AuthTestCase, self).tearDown()

    def _post_form(self, url, form):
        """POST form fields to url, following redirects; return the body."""
        response = self.test_client.post(
            url, data=form, follow_redirects=True)
        return response.get_data()

    def _post_json(self, url, payload):
        """POST payload serialised as JSON to url; return the response."""
        return self.test_client.post(url, data=json.dumps(payload))

    def test_login_logout(self):
        """Valid credentials log in, and a following logout is confirmed."""
        # a. successfully login
        login_page = self._post_form('/login', self.USER_CREDENTIALS)
        self.assertIn("Logged in successfully!", login_page)

        logout_response = self.test_client.get(
            '/logout', follow_redirects=True)
        self.assertIn("You have logged out!", logout_response.get_data())

    def test_login_failed(self):
        """Bad credentials and a deactivated user are both rejected."""
        login_url = '/login'

        # a. Failed to login with incorrect user info
        rejected_forms = (
            {"email": "xxx", "password": "admin"},
            {"email": "admin@abc.com", "password": "xxx"},
        )
        for form in rejected_forms:
            page = self._post_form(login_url, form)
            self.assertIn("Wrong username or password!", page)

        # b. Inactive user
        User.query.filter_by(email="admin@abc.com").update({"active": False})
        inactive_form = {"email": "admin@abc.com", "password": "admin"}
        page = self._post_form(login_url, inactive_form)
        self.assertIn("This username is disabled!", page)

    def test_get_token(self):
        """An unknown email yields 401; valid credentials yield a token."""
        token_url = '/token'

        # a. Failed to get token by posting incorrect user email
        rejected = self._post_json(
            token_url, {"email": "xxx", "password": "admin"})
        self.assertEqual(401, rejected.status_code)

        # b. Success to get token
        accepted = self._post_json(token_url, self.USER_CREDENTIALS)
        body = json.loads(accepted.get_data())
        self.assertIsNotNone(body['token'])

    def test_header_loader(self):
        """A token maps to user id 1 until it expires; junk maps to None."""
        # Get Token
        token_response = self._post_json('/token', self.USER_CREDENTIALS)
        token = json.loads(token_response.get_data())['token']

        # Second argument is the allowed token lifetime in seconds.
        self.assertEqual(1, auth.get_user_info_from_token(token, 50))

        # Test on token expiration.
        # Sleep 5 seconds but only allow token lifetime of 2 seconds
        time.sleep(5)
        self.assertIsNone(auth.get_user_info_from_token(token, 2))

        # Get None user from the incorrect token
        self.assertIsNone(auth.get_user_info_from_token("xxx", 50))
if __name__ == '__main__':
    # Direct-run entry point: initialise flags and logging, then hand over
    # to the unittest2 runner.
    # NOTE(review): flags.init() presumably has to precede logsetting.init()
    # -- keep this order unless compass.utils confirms otherwise.
    flags.init()
    logsetting.init()
    unittest2.main()