Add tempest tests for keystonev3 driver

This commit adds tempest tests for keystonev3 driver

Also removes/fixes broken columns in the `users` table.

Closes-Bug:#1689471
Change-Id: I8ca8d2121b167f884ac2282a30a6214ba29cb56e
This commit is contained in:
Anusha Ramineni 2017-05-09 11:40:23 +05:30
parent 37f96d81cf
commit cb9399177f
4 changed files with 235 additions and 23 deletions

View File

@ -42,19 +42,15 @@ class KeystoneV3Driver(datasource_driver.PollingDataSourceDriver,
'field-translators':
({'fieldname': 'id', 'desc': 'The ID for the user.',
'translator': value_trans},
{'fieldname': 'description', 'desc': 'user description',
'translator': value_trans},
{'fieldname': 'name', 'desc': 'username, unique within domain',
'translator': value_trans},
{'fieldname': 'enabled', 'desc': 'user is enabled or not',
'translator': value_trans},
{'fieldname': 'project_id',
{'fieldname': 'default_project_id',
'desc': 'ID of the default project for the user',
'translator': value_trans},
{'fieldname': 'domain_id',
'desc': 'The ID of the domain for the user.',
'translator': value_trans},
{'fieldname': 'email', 'desc': 'email address for the user',
'translator': value_trans})}
roles_translator = {

View File

@ -40,20 +40,18 @@ class TestKeystoneDriver(base.TestCase):
self.domains = mock.MagicMock()
self.users_data = [
ResponseObj({'id': '00f2c34a156c40058004ee8eb3320e04',
'description': 'test user 1',
'name': 'alice',
'enabled': True,
'project_id': '019b18a15f2a44c1880d57704b2c4009',
'domain_id': 'default',
'email': 'alice@foo.com'}),
ResponseObj({'id': 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb',
'description': 'test user 2',
'name': 'bob',
'enabled': False,
'project_id': 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa',
'domain_id': 'default',
'email': 'bob@bar.edu'})]
ResponseObj({
'id': '00f2c34a156c40058004ee8eb3320e04',
'name': 'alice',
'enabled': True,
'default_project_id': '019b18a15f2a44c1880d57704b2c4009',
'domain_id': 'default'}),
ResponseObj({
'id': 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb',
'name': 'bob',
'enabled': False,
'default_project_id': 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa',
'domain_id': 'default'})]
self.roles_data = [
ResponseObj({'id': 'cccccccccccccccccccccccccccccccc',
@ -103,12 +101,12 @@ class TestKeystoneDriver(base.TestCase):
self.assertEqual(2, len(user_list))
# Check an individual user entry
self.assertIn(('00f2c34a156c40058004ee8eb3320e04', 'test user 1',
self.assertIn(('00f2c34a156c40058004ee8eb3320e04',
'alice', 'True', '019b18a15f2a44c1880d57704b2c4009',
'default', 'alice@foo.com'), user_list)
self.assertIn(('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb', 'test user 2',
'default'), user_list)
self.assertIn(('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb',
'bob', 'False', 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa',
'default', 'bob@bar.edu'), user_list)
'default'), user_list)
def test_list_roles(self):
"""Test conversion of complex role objects to tables."""

View File

@ -0,0 +1,205 @@
# Copyright 2014 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest import clients
from tempest import config
from tempest.lib.common.utils import test_utils
from tempest.lib import exceptions
from tempest import test
from congress_tempest_tests.tests.scenario import manager_congress
CONF = config.CONF
class TestKeystoneV3Driver(manager_congress.ScenarioPolicyBase):
@classmethod
def skip_checks(cls):
super(TestKeystoneV3Driver, cls).skip_checks()
if not (CONF.network.project_networks_reachable or
CONF.network.public_network_id):
msg = ('Either project_networks_reachable must be "true", or '
'public_network_id must be defined.')
cls.enabled = False
raise cls.skipException(msg)
def setUp(cls):
super(TestKeystoneV3Driver, cls).setUp()
cls.os = clients.Manager(cls.admin_manager.auth_provider.credentials)
cls.keystone = cls.os.identity_v3_client
cls.projects_client = cls.os.projects_client
cls.domains_client = cls.os.domains_client
cls.roles_client = cls.os.roles_v3_client
cls.users_client = cls.os.users_v3_client
cls.datasource_id = manager_congress.get_datasource_id(
cls.admin_manager.congress_client, 'keystonev3')
@test.attr(type='smoke')
def test_keystone_users_table(self):
user_schema = (
self.admin_manager.congress_client.show_datasource_table_schema(
self.datasource_id, 'users')['columns'])
user_id_col = next(i for i, c in enumerate(user_schema)
if c['name'] == 'id')
def _check_data_table_keystone_users():
# Fetch data from keystone each time, because this test may start
# before keystone has all the users.
users = self.users_client.list_users()['users']
user_map = {}
for user in users:
user_map[user['id']] = user
results = (
self.admin_manager.congress_client.list_datasource_rows(
self.datasource_id, 'users'))
for row in results['results']:
try:
user_row = user_map[row['data'][user_id_col]]
except KeyError:
return False
for index in range(len(user_schema)):
if ((user_schema[index]['name'] == 'default_project_id' and
'default_project_id' not in user_row)):
# Keystone does not return the tenantId or email column
# if not present.
pass
elif (str(row['data'][index]) !=
str(user_row[user_schema[index]['name']])):
return False
return True
if not test_utils.call_until_true(
func=_check_data_table_keystone_users,
duration=100, sleep_for=4):
raise exceptions.TimeoutException("Data did not converge in time "
"or failure in server")
@test.attr(type='smoke')
def test_keystone_roles_table(self):
role_schema = (
self.admin_manager.congress_client.show_datasource_table_schema(
self.datasource_id, 'roles')['columns'])
role_id_col = next(i for i, c in enumerate(role_schema)
if c['name'] == 'id')
def _check_data_table_keystone_roles():
# Fetch data from keystone each time, because this test may start
# before keystone has all the users.
roles = self.roles_client.list_roles()['roles']
roles_map = {}
for role in roles:
roles_map[role['id']] = role
results = (
self.admin_manager.congress_client.list_datasource_rows(
self.datasource_id, 'roles'))
for row in results['results']:
try:
role_row = roles_map[row['data'][role_id_col]]
except KeyError:
return False
for index in range(len(role_schema)):
if (str(row['data'][index]) !=
str(role_row[role_schema[index]['name']])):
return False
return True
if not test_utils.call_until_true(
func=_check_data_table_keystone_roles,
duration=100, sleep_for=4):
raise exceptions.TimeoutException("Data did not converge in time "
"or failure in server")
@test.attr(type='smoke')
def test_keystone_domains_table(self):
domains_schema = (
self.admin_manager.congress_client.show_datasource_table_schema(
self.datasource_id, 'domains')['columns'])
domain_id_col = next(i for i, c in enumerate(domains_schema)
if c['name'] == 'id')
def _check_data_table_keystone_domains():
# Fetch data from keystone each time, because this test may start
# before keystone has all the users.
domains = self.domains_client.list_domains()['domains']
domains_map = {}
for domain in domains:
domains_map[domain['id']] = domain
results = (
self.admin_manager.congress_client.list_datasource_rows(
self.datasource_id, 'domains'))
for row in results['results']:
try:
domain_row = domains_map[row['data'][domain_id_col]]
except KeyError:
return False
for index in range(len(domains_schema)):
if (str(row['data'][index]) !=
str(domain_row[domains_schema[index]['name']])):
return False
return True
if not test_utils.call_until_true(
func=_check_data_table_keystone_domains,
duration=100, sleep_for=4):
raise exceptions.TimeoutException("Data did not converge in time "
"or failure in server")
@test.attr(type='smoke')
def test_keystone_projects_table(self):
projects_schema = (
self.admin_manager.congress_client.show_datasource_table_schema(
self.datasource_id, 'projects')['columns'])
project_id_col = next(i for i, c in enumerate(projects_schema)
if c['name'] == 'id')
def _check_data_table_keystone_projects():
# Fetch data from keystone each time, because this test may start
# before keystone has all the users.
projects = self.projects_client.list_projects()['projects']
projects_map = {}
for project in projects:
projects_map[project['id']] = project
results = (
self.admin_manager.congress_client.list_datasource_rows(
self.datasource_id, 'projects'))
for row in results['results']:
try:
project_row = projects_map[row['data'][project_id_col]]
except KeyError:
return False
for index in range(len(projects_schema)):
if (str(row['data'][index]) !=
str(project_row[projects_schema[index]['name']])):
return False
return True
if not test_utils.call_until_true(
func=_check_data_table_keystone_projects,
duration=100, sleep_for=5):
raise exceptions.TimeoutException("Data did not converge in time "
"or failure in server")
@test.attr(type='smoke')
def test_update_no_error(self):
if not test_utils.call_until_true(
func=lambda: self.check_datasource_no_error('keystonev3'),
duration=30, sleep_for=5):
raise exceptions.TimeoutException('Datasource could not poll '
'without error.')

View File

@ -0,0 +1,13 @@
---
prelude: >
upgrade:
- In keystonev3_driver (experimental) `users` table,
the columns `description` and `email` have been
removed because they are not present in keystone
V3 API response. These columns should be removed
from existing policy rules referring to the `users`
table. The `project_id` column has been replaced
by `default_project_id` because the previous
column name was incorrect. Named column reference
should be similarly replaced in existing policy
rules referring to the `users` table.