Adding keystone user password rules test for StarlingX pytest framework
Include: -added test case for keystone user password rules under security -editing set_user definition in keystone_helper -editing create_user definition in keystone_helper -adding set_current_user_password definition used by the testcase -editing add definition for updating user dictionary in auth file -editing get definition to handle attribute error in auth file Story: 2007406 Task: 39011 Change-Id: I505519c40aa4fc9f58c11bab7ab39175e1c7098d Author: Yang Liu <yang.liu@windriver.com> Co-Authored-By: Hrishit Mazumder <hrishit.mazumder@windriver.com> Signed-off-by: Hrishit Mazumder <hrishit.mazumder@windriver.com>
This commit is contained in:
parent
189fdcc6d8
commit
bf901d03ea
@ -83,21 +83,23 @@ class Tenant:
|
||||
cls.__REGION = region
|
||||
|
||||
@classmethod
|
||||
def add(cls, tenantname, dictname=None, username=None, password=None,
|
||||
region=None, auth_url=None, domain='Default'):
|
||||
tenant_dict = dict(tenant=tenantname)
|
||||
tenant_dict['user'] = username if username else tenantname
|
||||
tenant_dict['password'] = password if password else cls.__PASSWORD
|
||||
tenant_dict['domain'] = domain
|
||||
def add(cls, username, tenantname=None, dictname=None, password=None,
|
||||
region=None, auth_url=None, domain='Default', **kwargs):
|
||||
user_dict = dict(user=username)
|
||||
user_dict['tenant'] = tenantname
|
||||
user_dict['password'] = password if password else cls.__PASSWORD
|
||||
user_dict['domain'] = domain
|
||||
if region:
|
||||
tenant_dict['region'] = region
|
||||
user_dict['region'] = region
|
||||
if auth_url:
|
||||
tenant_dict['auth_url'] = auth_url
|
||||
user_dict['auth_url'] = auth_url
|
||||
if kwargs:
|
||||
user_dict.update(kwargs)
|
||||
|
||||
dictname = dictname.upper() if dictname else tenantname.upper().\
|
||||
dictname = dictname.upper() if dictname else username.upper(). \
|
||||
replace('-', '_')
|
||||
cls.__tenants[dictname] = tenant_dict
|
||||
return tenant_dict
|
||||
cls.__tenants[dictname] = user_dict
|
||||
return user_dict
|
||||
|
||||
__primary = 'TENANT1'
|
||||
|
||||
@ -115,14 +117,20 @@ class Tenant:
|
||||
will update as well.
|
||||
|
||||
"""
|
||||
|
||||
tenant_dictname = tenant_dictname.upper().replace('-', '_')
|
||||
tenant_dict = cls.__tenants.get(tenant_dictname)
|
||||
if not tenant_dict:
|
||||
return tenant_dict
|
||||
|
||||
if dc_region:
|
||||
region_dict = cls.__DC_MAP.get(dc_region, None)
|
||||
if not region_dict:
|
||||
raise ValueError(
|
||||
'Distributed cloud region {} is not added to '
|
||||
'DC_MAP yet. DC_MAP: {}'.format(dc_region, cls.__DC_MAP))
|
||||
|
||||
tenant_dict = dict(tenant_dict)
|
||||
tenant_dict.update({'region': region_dict['region']})
|
||||
else:
|
||||
tenant_dict.pop('region', None)
|
||||
|
@ -77,8 +77,11 @@ def add_or_remove_role(add_=True, role='admin', project=None, user=None,
|
||||
tenant_dict = {}
|
||||
|
||||
if project is None:
|
||||
tenant_dict = Tenant.get_primary()
|
||||
project = tenant_dict['tenant']
|
||||
if auth_info and auth_info.get('platform'):
|
||||
project = auth_info['tenant']
|
||||
else:
|
||||
tenant_dict = Tenant.get_primary()
|
||||
project = tenant_dict['tenant']
|
||||
|
||||
if user is None:
|
||||
user = tenant_dict.get('user', project)
|
||||
@ -223,6 +226,53 @@ def get_role_assignments(field='Role', names=True, role=None, user=None,
|
||||
return table_parser.get_multi_values(role_assignment_tab, field)
|
||||
|
||||
|
||||
def set_current_user_password(original_password, new_password, fail_ok=False,
|
||||
auth_info=None, con_ssh=None):
|
||||
"""
|
||||
Set password for current user
|
||||
Args:
|
||||
original_password:
|
||||
new_password:
|
||||
fail_ok:
|
||||
auth_info:
|
||||
con_ssh:
|
||||
|
||||
Returns (tuple):
|
||||
|
||||
"""
|
||||
args = "--password '{}' --original-password '{}'".format(new_password, original_password)
|
||||
code, output = cli.openstack('user password set', args, ssh_client=con_ssh,
|
||||
auth_info=auth_info, fail_ok=fail_ok)
|
||||
if code > 0:
|
||||
return 1, output
|
||||
|
||||
if not auth_info:
|
||||
auth_info = Tenant.get_primary()
|
||||
|
||||
user = auth_info['user']
|
||||
tenant_dictname = user
|
||||
if auth_info.get('platform'):
|
||||
tenant_dictname += '_platform'
|
||||
Tenant.update(tenant_dictname, password=new_password)
|
||||
|
||||
if user == 'admin':
|
||||
from consts.proj_vars import ProjVar
|
||||
if ProjVar.get_var('REGION') != 'RegionOne':
|
||||
LOG.info(
|
||||
"Run openstack_update_admin_password on secondary region "
|
||||
"after admin password change")
|
||||
if not con_ssh:
|
||||
con_ssh = ControllerClient.get_active_controller()
|
||||
with con_ssh.login_as_root(timeout=30) as con_ssh:
|
||||
con_ssh.exec_cmd(
|
||||
"echo 'y' | openstack_update_admin_password '{}'".format(new_password))
|
||||
|
||||
msg = 'User {} password successfully updated from {} to {}'.format(user, original_password,
|
||||
new_password)
|
||||
LOG.info(msg)
|
||||
return 0, output
|
||||
|
||||
|
||||
def set_user(user, name=None, project=None, password=None, project_doamin=None,
|
||||
email=None, description=None,
|
||||
enable=None, fail_ok=False, auth_info=Tenant.get('admin'),
|
||||
@ -250,14 +300,16 @@ def set_user(user, name=None, project=None, password=None, project_doamin=None,
|
||||
|
||||
arg += user
|
||||
|
||||
code, output = cli.openstack('user set', arg, ssh_client=con_ssh,
|
||||
code, output = cli.openstack('user set', arg, ssh_client=con_ssh, timeout=120,
|
||||
fail_ok=fail_ok, auth_info=auth_info)
|
||||
|
||||
if code > 0:
|
||||
return 1, output
|
||||
|
||||
if name or project or password:
|
||||
tenant_dictname = user.upper()
|
||||
tenant_dictname = user
|
||||
if auth_info and auth_info.get('platform'):
|
||||
tenant_dictname += '_platform'
|
||||
Tenant.update(tenant_dictname, username=name, password=password,
|
||||
tenant=project)
|
||||
|
||||
@ -365,7 +417,7 @@ def is_https_enabled(con_ssh=None, source_openrc=True,
|
||||
auth_info=Tenant.get('admin_platform')):
|
||||
if not con_ssh:
|
||||
con_name = auth_info.get('region') if (
|
||||
auth_info and ProjVar.get_var('IS_DC')) else None
|
||||
auth_info and ProjVar.get_var('IS_DC')) else None
|
||||
con_ssh = ControllerClient.get_active_controller(name=con_name)
|
||||
|
||||
table_ = table_parser.table(
|
||||
@ -391,6 +443,8 @@ def delete_users(user, fail_ok=False, auth_info=Tenant.get('admin'),
|
||||
|
||||
Returns: tuple, (code, msg)
|
||||
"""
|
||||
LOG.info('Deleting {} keystone user: {}'.format('platform' if auth_info and auth_info.get(
|
||||
'platform') else 'containerized', user))
|
||||
return cli.openstack('user delete', user, ssh_client=con_ssh,
|
||||
fail_ok=fail_ok, auth_info=auth_info)
|
||||
|
||||
@ -483,7 +537,7 @@ def create_project(name=None, field='ID', domain=None, parent=None,
|
||||
|
||||
def create_user(name=None, field='name', domain=None, project=None,
|
||||
project_domain=None, rtn_exist=None,
|
||||
password=HostLinuxUser.get_password(), email=None,
|
||||
password=None, email=None,
|
||||
description=None, enable=None,
|
||||
auth_info=Tenant.get('admin'), fail_ok=False, con_ssh=None):
|
||||
"""
|
||||
@ -508,6 +562,8 @@ def create_user(name=None, field='name', domain=None, project=None,
|
||||
(1, <std_err>)
|
||||
|
||||
"""
|
||||
if not password:
|
||||
password = HostLinuxUser.get_password()
|
||||
|
||||
if not name:
|
||||
name = 'user'
|
||||
@ -533,8 +589,25 @@ def create_user(name=None, field='name', domain=None, project=None,
|
||||
if code > 0:
|
||||
return 1, output
|
||||
|
||||
user = table_parser.get_value_two_col_table(table_parser.table(output),
|
||||
field=field)
|
||||
LOG.info("Openstack user {} successfully created/showed".format(user))
|
||||
table_ = table_parser.table(output)
|
||||
username = table_parser.get_value_two_col_table(table_, field='name')
|
||||
user = username if field == 'name' else table_parser.get_value_two_col_table(table_,
|
||||
field=field)
|
||||
|
||||
is_platform = auth_info and auth_info.get('platform')
|
||||
keystone = 'platform' if is_platform else 'containerized'
|
||||
dictname = user + '_platform' if is_platform else user
|
||||
existing_auth = Tenant.get(dictname)
|
||||
if existing_auth:
|
||||
if existing_auth['user'] != username:
|
||||
raise ValueError('Tenant.{} already exists for a different user {}'.format(
|
||||
dictname, existing_auth['user']))
|
||||
Tenant.update(dictname, username=username, password=password, tenant=project,
|
||||
platform=is_platform)
|
||||
else:
|
||||
Tenant.add(username=username, tenantname=project, dictname=dictname, password=password,
|
||||
platform=is_platform)
|
||||
LOG.info('Tenant.{} for {} keystone user {} is added'.format(dictname, keystone, user))
|
||||
|
||||
LOG.info("{} keystone user {} successfully created/showed".format(keystone, user))
|
||||
return 0, user
|
||||
|
@ -0,0 +1,625 @@
|
||||
#
|
||||
# Copyright (c) 2019 Wind River Systems, Inc.
|
||||
#
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
#
|
||||
|
||||
import random
|
||||
import re
|
||||
import time
|
||||
import copy
|
||||
from string import ascii_lowercase, ascii_uppercase, digits, ascii_letters
|
||||
|
||||
from pytest import mark, skip, fixture
|
||||
|
||||
from consts.auth import Tenant
|
||||
from keywords import keystone_helper, container_helper, kube_helper
|
||||
from utils import cli
|
||||
from utils.tis_log import LOG
|
||||
from utils.clients.ssh import ControllerClient
|
||||
|
||||
|
||||
SPECIAL_CHARACTERS = r'!@#$%^&*()<>{}+=_\\\[\]\-?|~`,.;:'
|
||||
MIN_PASSWORD_LEN = 7
|
||||
MAX_PASSWORD_LEN = 128
|
||||
|
||||
# keystone.conf security_compliance configs
|
||||
LOCKOUT_DURATION = 300
|
||||
FAILURE_ATTEMPTS = 5
|
||||
UNIQUE_LAST_COUNT = 2
|
||||
|
||||
# Test user
|
||||
TEST_USER_NAME = 'stxtestuser'
|
||||
TEST_PASSWORD = 'Password*Rule1Test'
|
||||
USED_PASSWORDS = {}
|
||||
WAIT_BETWEEN_CHANGE = 6
|
||||
|
||||
SIMPLE_WORD_DICTIONARY = '''
|
||||
and is being proof-read and supplemented by volunteers from around the
|
||||
world. This is an unfunded project, and future enhancement of this
|
||||
dictionary will depend on the efforts of volunteers willing to help build
|
||||
this free resource into a comprehensive body of general information. New
|
||||
definitions for missing words or words senses and longer explanatory notes,
|
||||
as well as images to accompany the articles are needed. More modern
|
||||
illustrative quotations giving recent examples of usage of the words in
|
||||
their various senses will be very helpful, since most quotations in the
|
||||
original 1913 dictionary are now well over 100 years old
|
||||
'''
|
||||
|
||||
|
||||
def save_used_password(keystone, password):
|
||||
if keystone not in USED_PASSWORDS:
|
||||
USED_PASSWORDS[keystone] = [password]
|
||||
else:
|
||||
used_passwords = USED_PASSWORDS[keystone]
|
||||
used_passwords.append(password)
|
||||
if len(used_passwords) > UNIQUE_LAST_COUNT:
|
||||
used_passwords.pop(0)
|
||||
|
||||
LOG.info('{} keystone user {} password saved. \nUsed passwords: {}'.format(
|
||||
keystone, TEST_USER_NAME, USED_PASSWORDS[keystone]))
|
||||
|
||||
|
||||
def is_last_used(password, keystone, depth=UNIQUE_LAST_COUNT):
|
||||
used_passwords = USED_PASSWORDS.get(keystone, [])
|
||||
if used_passwords:
|
||||
if len(used_passwords) >= UNIQUE_LAST_COUNT:
|
||||
return password in used_passwords[-1 * depth:]
|
||||
else:
|
||||
return password in used_passwords
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def get_valid_password(keystone):
|
||||
total_length = random.randint(MIN_PASSWORD_LEN, MAX_PASSWORD_LEN)
|
||||
password = None
|
||||
frequently_used_words = re.split(r'\W', SIMPLE_WORD_DICTIONARY.strip())
|
||||
|
||||
attempt = 0
|
||||
while attempt < 60:
|
||||
attempt += 1
|
||||
left_count = total_length
|
||||
lower_case_len = random.randint(1, left_count - 3)
|
||||
left_count -= lower_case_len
|
||||
|
||||
upper_case_len = random.randint(1, left_count - 2)
|
||||
left_count -= upper_case_len
|
||||
|
||||
digit_len = random.randint(1, left_count - 1)
|
||||
left_count -= digit_len
|
||||
|
||||
special_char_len = random.randint(1, left_count)
|
||||
|
||||
lower_case = random.sample(ascii_lowercase, min(lower_case_len, len(ascii_lowercase)))
|
||||
upper_case = random.sample(ascii_uppercase, min(upper_case_len, len(ascii_uppercase)))
|
||||
password_digits = random.sample(digits, min(digit_len, len(digits)))
|
||||
special_char = random.sample(SPECIAL_CHARACTERS, min(special_char_len,
|
||||
len(SPECIAL_CHARACTERS)))
|
||||
|
||||
actual_len = len(lower_case) + len(upper_case) + len(password_digits) + len(special_char)
|
||||
|
||||
password = random.sample(lower_case + upper_case + password_digits + special_char,
|
||||
min(actual_len, total_length))
|
||||
alphabet = ascii_lowercase + ascii_uppercase + digits + SPECIAL_CHARACTERS
|
||||
|
||||
password = ''.join(password)
|
||||
if actual_len != len(password):
|
||||
LOG.warn('actual_len:{}, password len:{}, password:{}\n'.format(
|
||||
actual_len, len(password), password))
|
||||
|
||||
if len(password) < total_length:
|
||||
password += \
|
||||
''.join(random.choice(alphabet) for _ in range(total_length - len(password) + 1))
|
||||
|
||||
list_of_chars = list(password)
|
||||
|
||||
if (list_of_chars[0] == '{') or (list_of_chars[0] == '}') or (list_of_chars[0] == '-'):
|
||||
list_of_chars[0] = 'a'
|
||||
|
||||
if (list_of_chars[-1] == '{') or (list_of_chars[-1] == '}'):
|
||||
list_of_chars[-1] = 'a'
|
||||
|
||||
for index, char in enumerate(list_of_chars):
|
||||
next_char = list_of_chars[index + 1] if index != len(list_of_chars) - 1 else ''
|
||||
|
||||
if char == '{':
|
||||
if next_char == '{' or next_char == '}':
|
||||
list_of_chars[index + 1] = 'a'
|
||||
list_of_chars[index - 1] = '{'
|
||||
else:
|
||||
list_of_chars[index - 1] = '{'
|
||||
if char == '}':
|
||||
if next_char != '{':
|
||||
list_of_chars[index - 1] = '}'
|
||||
|
||||
password = ''.join(list_of_chars)
|
||||
|
||||
if not is_last_used(password, keystone=keystone) and password not in \
|
||||
frequently_used_words:
|
||||
break
|
||||
|
||||
if attempt < 60:
|
||||
LOG.debug('Found valid password:\n{}\n'.format(password))
|
||||
else:
|
||||
LOG.debug('Cannot found valid password, attempted:{}\n'.format(attempt))
|
||||
|
||||
return password
|
||||
|
||||
|
||||
def multiple_attempts_generator():
|
||||
LOG.info('Attempt with wrong passwords multiple times')
|
||||
invalid_password = ''.join(random.sample(ascii_letters, MIN_PASSWORD_LEN - 1))
|
||||
|
||||
while True:
|
||||
count, keystone, is_admin, user_name = yield
|
||||
current_password = USED_PASSWORDS[keystone][-1]
|
||||
for n in range(int(count)):
|
||||
verify_user(user_name, invalid_password, is_admin=is_admin, expect_fail=True,
|
||||
keystone=keystone)
|
||||
LOG.info('Command rejected with INVALID password as expected, count: {}'.format(n + 1))
|
||||
time.sleep(10)
|
||||
|
||||
time.sleep(20)
|
||||
|
||||
LOG.tc_step('Verify {} keystone user {} is locked out after {} failed '
|
||||
'attempts'.format(keystone, user_name, count))
|
||||
verify_user(user_name, current_password, is_admin=is_admin, expect_fail=True,
|
||||
keystone=keystone)
|
||||
|
||||
LOG.tc_step('Wait for {} seconds and verify account is unlocked'.format(
|
||||
LOCKOUT_DURATION + WAIT_BETWEEN_CHANGE))
|
||||
|
||||
time.sleep(LOCKOUT_DURATION + WAIT_BETWEEN_CHANGE)
|
||||
verify_user(user_name, current_password, is_admin=is_admin, expect_fail=False,
|
||||
keystone=keystone)
|
||||
LOG.info('OK, {} keystone user is unlocked after {} seconds'.format(keystone,
|
||||
LOCKOUT_DURATION))
|
||||
|
||||
yield
|
||||
|
||||
|
||||
def special_char_generator():
|
||||
while True:
|
||||
(args, keystone, _), expecting_pass = yield
|
||||
|
||||
password = list(get_valid_password(keystone=keystone))
|
||||
|
||||
if not expecting_pass:
|
||||
|
||||
special_to_letter = \
|
||||
dict(zip(SPECIAL_CHARACTERS, ascii_letters[:len(SPECIAL_CHARACTERS) + 1]))
|
||||
password = \
|
||||
''.join(special_to_letter[c] if c in SPECIAL_CHARACTERS else c for c in password)
|
||||
else:
|
||||
while True:
|
||||
password = get_valid_password(keystone=keystone)
|
||||
if not is_last_used(password, keystone=keystone):
|
||||
break
|
||||
|
||||
yield password
|
||||
|
||||
|
||||
def case_numerical_generator():
|
||||
while True:
|
||||
(args, keystone, _), expecting_pass = yield
|
||||
|
||||
password = list(get_valid_password(keystone=keystone))
|
||||
|
||||
if not expecting_pass:
|
||||
if args == 'lower':
|
||||
password = ''.join(c.upper() if c.isalpha() else c for c in password
|
||||
if not c.isalpha() or c.islower())
|
||||
elif args == 'upper':
|
||||
password = ''.join(c.lower() if c.isalpha() else c for c in password
|
||||
if not c.isalpha() or c.isupper())
|
||||
elif args == 'digit':
|
||||
digit_to_letter = dict(zip('0123456789', 'abcdefghij'))
|
||||
password = ''.join(digit_to_letter[c] if c.isdigit() else c for c in password)
|
||||
else:
|
||||
skip('Unknown args: case_numerical_generator: user_name={}, args={}, '
|
||||
'expecting_pass={}\n'.format(keystone, args, expecting_pass))
|
||||
return
|
||||
|
||||
else:
|
||||
while True:
|
||||
password = get_valid_password(keystone=keystone)
|
||||
if not is_last_used(password, keystone=keystone):
|
||||
break
|
||||
|
||||
yield password
|
||||
|
||||
|
||||
def change_history_generator():
|
||||
while True:
|
||||
(args, keystone, _), expecting_pass = yield
|
||||
|
||||
used_passwords = USED_PASSWORDS[keystone]
|
||||
if not expecting_pass:
|
||||
if args == 'not_last_2':
|
||||
password = used_passwords[0]
|
||||
|
||||
elif args == '3_diff':
|
||||
previous = used_passwords[-1]
|
||||
total_to_change = random.randrange(0, 2)
|
||||
rand_indice = random.sample(range(len(previous)), total_to_change)
|
||||
new_chars = []
|
||||
for i in range(len(previous)):
|
||||
if i in rand_indice:
|
||||
while True:
|
||||
new_char = random.choice(ascii_letters)
|
||||
if new_char != previous[i]:
|
||||
new_chars.append(new_char)
|
||||
break
|
||||
else:
|
||||
new_chars.append(previous[i])
|
||||
password = ''.join(new_chars)
|
||||
|
||||
elif args == 'reversed':
|
||||
password = ''.join(used_passwords[-1::-1])
|
||||
|
||||
else:
|
||||
password = ''
|
||||
skip('Unknown arg:{} for change_history_generator'.format(args))
|
||||
|
||||
else:
|
||||
while True:
|
||||
password = get_valid_password(keystone=keystone)
|
||||
if password not in used_passwords:
|
||||
break
|
||||
|
||||
yield password
|
||||
|
||||
|
||||
def length_generator():
|
||||
while True:
|
||||
(args, keystone, _), expecting_pass = yield
|
||||
|
||||
password = ''
|
||||
for _ in range(30):
|
||||
password = get_valid_password(keystone=keystone)
|
||||
|
||||
if not expecting_pass:
|
||||
password = password[:random.randint(1, MIN_PASSWORD_LEN - 1)]
|
||||
break
|
||||
|
||||
if not is_last_used(password, keystone=keystone):
|
||||
break
|
||||
|
||||
yield password
|
||||
|
||||
|
||||
def verify_user(user_name, password, is_admin=True, expect_fail=False, keystone=None):
|
||||
scenario = ' and expect failure' if expect_fail else ''
|
||||
LOG.info('Run {} OpenStack command with {} role {}'.format(
|
||||
keystone, 'admin' if is_admin else 'member', scenario))
|
||||
|
||||
dict_name = '{}_platform'.format(user_name) if keystone == 'platform' else user_name
|
||||
auth_info = Tenant.get(dict_name)
|
||||
auth_info = copy.deepcopy(auth_info)
|
||||
auth_info['password'] = password
|
||||
if is_admin:
|
||||
command = 'endpoint list'
|
||||
code, output = cli.openstack(command, fail_ok=expect_fail, auth_info=auth_info)
|
||||
else:
|
||||
command = 'user show {}'.format(user_name)
|
||||
code, output = cli.openstack(command, fail_ok=expect_fail, auth_info=auth_info)
|
||||
|
||||
message = 'command:{}\nauth_info:{}\noutput:{}'.format(command, auth_info, output)
|
||||
|
||||
if expect_fail:
|
||||
assert 1 == code, "OpenStack command ran successfully while rejection is " \
|
||||
"expected: {}".format(message)
|
||||
|
||||
|
||||
def change_user_password(user_name, password, keystone, by_admin=True, expect_fail=None):
|
||||
scenario = 'Change platform keystone user password with rule {} unsatisfied'.format(
|
||||
expect_fail) if expect_fail else 'Change platform keyword user password to a valid password'
|
||||
|
||||
if by_admin and expect_fail == 'not_last_used':
|
||||
scenario += ', but still allowed when operated by admin user'
|
||||
expect_fail = None
|
||||
|
||||
LOG.info(scenario)
|
||||
|
||||
dict_name = '{}_platform'.format(user_name) if keystone == 'platform' else user_name
|
||||
user_auth = Tenant.get(dict_name)
|
||||
original_password = user_auth['password']
|
||||
|
||||
if by_admin:
|
||||
admin_auth = Tenant.get('admin_platform') if keystone == 'platform' else Tenant.get('admin')
|
||||
code, output = keystone_helper.set_user(user=user_name, password=password, project='admin',
|
||||
auth_info=admin_auth, fail_ok=expect_fail)
|
||||
else:
|
||||
code, output = keystone_helper.set_current_user_password(
|
||||
fail_ok=expect_fail, original_password=original_password, new_password=password,
|
||||
auth_info=user_auth)
|
||||
|
||||
if code == 0:
|
||||
save_used_password(keystone, password=password)
|
||||
|
||||
if expect_fail:
|
||||
assert 1 == code, "{} keystone user password change accepted unexpectedly with " \
|
||||
"password rule violated: {}".format(keystone, password)
|
||||
|
||||
LOG.info('{} keystone password change {} as expected'.format(
|
||||
keystone, 'rejected' if expect_fail else 'accepted'))
|
||||
|
||||
return code, output
|
||||
|
||||
|
||||
PASSWORD_RULE_INFO = [
|
||||
('minimum_7_chars', (length_generator, '')),
|
||||
('at_least_1_lower_case', (case_numerical_generator, 'lower')),
|
||||
('at_least_1_upper_case', (case_numerical_generator, 'upper')),
|
||||
('at_least_1_digit', (case_numerical_generator, 'digit')),
|
||||
('at_least_1_special_case', (special_char_generator, '')),
|
||||
('not_last_used', (change_history_generator, 'not_last_2')),
|
||||
]
|
||||
|
||||
KEYSTONES = ['platform', 'stx-openstack']
|
||||
|
||||
|
||||
@fixture(scope='module', params=KEYSTONES)
|
||||
def create_test_user(request):
|
||||
keystone = request.param
|
||||
if keystone == 'stx-openstack' and not container_helper.is_stx_openstack_deployed():
|
||||
skip('stx-openstack is not applied')
|
||||
|
||||
LOG.fixture_step("Creating {} keystone user {} for password rules testing".format(
|
||||
keystone, TEST_USER_NAME))
|
||||
auth_info = Tenant.get('admin_platform') if keystone == 'platform' else Tenant.get('admin')
|
||||
existing_users = keystone_helper.get_users(field='Name', auth_info=auth_info)
|
||||
print(existing_users, "exiting userssss")
|
||||
if TEST_USER_NAME in existing_users:
|
||||
keystone_helper.delete_users(TEST_USER_NAME, auth_info=auth_info)
|
||||
|
||||
keystone_helper.create_user(name=TEST_USER_NAME, password=TEST_PASSWORD,
|
||||
auth_info=auth_info, project='admin')
|
||||
existing_users = keystone_helper.get_users(field='Name', auth_info=auth_info)
|
||||
print(existing_users, "exiting userssss")
|
||||
save_used_password(keystone, TEST_PASSWORD)
|
||||
keystone_helper.add_or_remove_role(add_=True, role='member', user=TEST_USER_NAME,
|
||||
auth_info=auth_info, project='admin')
|
||||
|
||||
def delete():
|
||||
LOG.fixture_step("Delete keystone test {}".format(TEST_USER_NAME))
|
||||
keystone_helper.delete_users(TEST_USER_NAME, auth_info=auth_info)
|
||||
|
||||
request.addfinalizer(delete)
|
||||
|
||||
return keystone
|
||||
|
||||
|
||||
class TestKeystonePassword:
|
||||
@mark.parametrize(('role', 'scenario'), [
|
||||
('admin_role', 'change_by_admin_user'),
|
||||
('admin_role', 'change_by_current_user'),
|
||||
('member_role', 'change_by_current_user'),
|
||||
('member_role', 'change_by_admin_user'),
|
||||
])
|
||||
def test_keystone_password_rules(self, create_test_user, role, scenario):
|
||||
"""
|
||||
Test keystone password rules when attempt to change the password
|
||||
Args:
|
||||
create_test_user:
|
||||
role:
|
||||
scenario (str): operator for the password change
|
||||
|
||||
Setups:
|
||||
- Create a platform/stx-openstack keystone user (class)
|
||||
|
||||
Test Steps:
|
||||
- Assign member/admin role to test user
|
||||
- Ensure test user can run openstack command
|
||||
- Attempt to change the test user password using current user or the default keystone
|
||||
admin user
|
||||
- Ensure the valid password is accepted while the invalid ones are rejected
|
||||
|
||||
Teardown:
|
||||
- Remove test user (class)
|
||||
|
||||
"""
|
||||
keystone = create_test_user
|
||||
user_name = TEST_USER_NAME
|
||||
is_admin = True if role == 'admin_role' else False
|
||||
assign_role(keystone=keystone, user_name=user_name, role=role, is_admin=is_admin)
|
||||
|
||||
random.seed()
|
||||
by_admin = True if 'admin_user' in scenario else False
|
||||
for item in PASSWORD_RULE_INFO:
|
||||
rule, generator_args = item
|
||||
|
||||
LOG.tc_step('Verify {} keystone password rule {} when {}'.format(
|
||||
keystone, rule, scenario))
|
||||
password_gen, args = generator_args
|
||||
|
||||
password_producer = password_gen()
|
||||
password_producer.send(None)
|
||||
send_args = (args, keystone, is_admin)
|
||||
valid_pwd = password_producer.send((send_args, True))
|
||||
change_user_password(user_name, valid_pwd, by_admin=is_admin, keystone=keystone)
|
||||
verify_user(user_name, valid_pwd, is_admin=is_admin, keystone=keystone)
|
||||
|
||||
next(password_producer)
|
||||
invalid_pwd = password_producer.send((send_args, False))
|
||||
wait = WAIT_BETWEEN_CHANGE + 1
|
||||
LOG.info('Wait for {} seconds to test {} violation'.format(wait, rule))
|
||||
time.sleep(wait)
|
||||
change_user_password(user_name, invalid_pwd, expect_fail=rule,
|
||||
by_admin=by_admin, keystone=keystone)
|
||||
|
||||
LOG.info('Password rule {} verified passed'.format(rule))
|
||||
|
||||
@fixture(scope='class')
|
||||
def configure_keystone_lockout(self, create_test_user):
|
||||
keystone = create_test_user
|
||||
set_keystone_lockout(keystone, lockout_duration=LOCKOUT_DURATION,
|
||||
failure_attempts=FAILURE_ATTEMPTS)
|
||||
return keystone
|
||||
|
||||
@mark.parametrize('role', [
|
||||
'admin_role',
|
||||
'member_role'
|
||||
])
|
||||
def test_keystone_account_lockout(self, configure_keystone_lockout, role):
|
||||
"""
|
||||
Test keystone password rules when attempt to change the password
|
||||
Args:
|
||||
configure_keystone_lockout:
|
||||
role:
|
||||
|
||||
Setups:
|
||||
- Create a platform/stx-openstack keystone user (class)
|
||||
- Check lockout config exists in keystone.conf (class)
|
||||
- Set lockout configs to 5 failed attempts and 300 lockout duration for testing purpose
|
||||
|
||||
Test Steps:
|
||||
- Assign member/admin role to test user
|
||||
- Attempt to run openstack command using incorrect passwords for 5 times
|
||||
- Check test account is locked by running openstack command using correct password
|
||||
- Wait for lockout duration
|
||||
- Check user is unlocked
|
||||
|
||||
Teardown:
|
||||
- Remove test user (class)
|
||||
|
||||
"""
|
||||
keystone = configure_keystone_lockout
|
||||
user_name = TEST_USER_NAME
|
||||
is_admin = True if role == 'admin_role' else False
|
||||
assign_role(keystone=keystone, user_name=user_name, role=role, is_admin=is_admin)
|
||||
|
||||
random.seed()
|
||||
LOG.tc_step('Set {} keystone lockout_duration to 300 and lockout_failure_attempts to 5 for '
|
||||
'testing purpose'.format(keystone))
|
||||
set_keystone_lockout(keystone=keystone, lockout_duration=LOCKOUT_DURATION,
|
||||
failure_attempts=5)
|
||||
|
||||
LOG.tc_step('Attempt to run {} keystone command using incorrect password multiple times '
|
||||
'and ensure account is locked out'.format(keystone))
|
||||
args = (5, keystone, is_admin, user_name)
|
||||
password_producer = multiple_attempts_generator()
|
||||
password_producer.send(None)
|
||||
password_producer.send(args)
|
||||
|
||||
|
||||
def assign_role(keystone, user_name, role, is_admin):
|
||||
is_platform = True if keystone == 'platform' else False
|
||||
|
||||
LOG.tc_step('Assign test user {} with {}'.format(user_name, role))
|
||||
admin_auth = Tenant.get('admin_platform') if is_platform else Tenant.get('admin')
|
||||
keystone_helper.add_or_remove_role(add_=is_admin, role='admin', user=user_name,
|
||||
auth_info=admin_auth, project='admin')
|
||||
|
||||
user_dict_name = '{}_platform'.format(user_name) if is_platform else user_name
|
||||
password = Tenant.get(user_dict_name)['password']
|
||||
LOG.tc_step('Run {} OpenStack command using {}/{} and ensure it works'.format(
|
||||
keystone, user_name, password))
|
||||
verify_user(user_name, password, is_admin=is_admin, keystone=keystone)
|
||||
|
||||
|
||||
def __set_non_platform_lockout(current_values, expt_values):
|
||||
app_name = 'stx-openstack'
|
||||
service = 'keystone'
|
||||
namespace = 'openstack'
|
||||
section = 'conf.keystone.security_compliance'
|
||||
fields = ['lockout_duration', 'lockout_failure_attempts']
|
||||
kv_pairs = {}
|
||||
for i in range(2):
|
||||
if current_values[i] != expt_values[i]:
|
||||
kv_pairs['{}.{}'.format(section, fields[i])] = expt_values[i]
|
||||
|
||||
if not kv_pairs:
|
||||
LOG.info('stx-openstack keystone lockout values already set to: {}'.format(expt_values))
|
||||
return
|
||||
|
||||
container_helper.update_helm_override(
|
||||
chart=service, namespace=namespace, reset_vals=False,
|
||||
kv_pairs=kv_pairs)
|
||||
|
||||
override_info = container_helper.get_helm_override_values(
|
||||
chart=service, namespace=namespace, fields='user_overrides')
|
||||
LOG.debug('override_info:{}'.format(override_info))
|
||||
|
||||
container_helper.apply_app(
|
||||
app_name=app_name, check_first=False, applied_timeout=1800)
|
||||
|
||||
post_values = get_lockout_values(keystone='stx-openstack')
|
||||
assert expt_values == post_values, "lockout values did not set to expected after helm " \
|
||||
"override update"
|
||||
LOG.info('stx-openstack keystone lockout values set successfully')
|
||||
|
||||
|
||||
def __set_platform_lockout(current_values, expt_values):
|
||||
conf_file = '/etc/keystone/keystone.conf'
|
||||
fields = ['lockout_duration', 'lockout_failure_attempts']
|
||||
con_ssh = ControllerClient.get_active_controller()
|
||||
for i in range(2):
|
||||
if current_values[i] == expt_values[i]:
|
||||
continue
|
||||
|
||||
field = fields[i]
|
||||
con_ssh.exec_sudo_cmd("sed -i 's/^{}.*=.*/{} = {}/g' "
|
||||
"{}".format(field, field, expt_values[i], conf_file), fail_ok=False)
|
||||
|
||||
post_values = get_lockout_values('platform')
|
||||
assert expt_values == post_values, "platform keystone lockout values unexpected after sed"
|
||||
|
||||
LOG.info("Restart platform keystone service after changing keystone config")
|
||||
con_ssh.exec_sudo_cmd('sm-restart-safe service keystone', fail_ok=False)
|
||||
time.sleep(30)
|
||||
|
||||
|
||||
def set_keystone_lockout(keystone, lockout_duration=300, failure_attempts=5):
|
||||
current_values = get_lockout_values(keystone=keystone)
|
||||
expt_values = [lockout_duration, failure_attempts]
|
||||
if current_values == expt_values:
|
||||
return
|
||||
|
||||
if keystone == 'platform':
|
||||
__set_platform_lockout(current_values, expt_values)
|
||||
else:
|
||||
__set_non_platform_lockout(current_values, expt_values)
|
||||
|
||||
|
||||
def get_lockout_values(keystone):
|
||||
conf_file = '/etc/keystone/keystone.conf'
|
||||
fields = ['lockout_duration', 'lockout_failure_attempts']
|
||||
section = 'security_compliance'
|
||||
config_fields = {section: fields}
|
||||
|
||||
LOG.info('Getting {} keystone account lockout values'.format(keystone))
|
||||
|
||||
if keystone == 'platform':
|
||||
con_ssh = ControllerClient.get_active_controller()
|
||||
code, out = con_ssh.exec_sudo_cmd('grep -E "^{}|^{}" {}'.format(
|
||||
fields[0], fields[1], conf_file))
|
||||
assert code == 0, "platform keystone lockout is not configured"
|
||||
for field in fields:
|
||||
assert field in out, "platform keystone {} is not configured".format(field)
|
||||
|
||||
values_dict = {}
|
||||
for line in out.splitlines():
|
||||
key, val = line.split(sep='=')
|
||||
values_dict[key.strip()] = int(val.strip())
|
||||
values = [values_dict[field] for field in fields]
|
||||
|
||||
else:
|
||||
configs = kube_helper.get_openstack_configs(
|
||||
conf_file=conf_file, configs=config_fields,
|
||||
label_app='keystone', label_component='api')
|
||||
|
||||
values = [(item.get(section, fields[0], fallback=None),
|
||||
item.get(section, fields[1], fallback=None))
|
||||
for item in list(configs.values())]
|
||||
|
||||
assert len(set(values)) == 1, 'keystone conf differs in different keystone api pods'
|
||||
values = values[0]
|
||||
for value in values:
|
||||
assert value is not None, "{} keystone account lockout is not " \
|
||||
"configured".format(keystone)
|
||||
values = [int(val.strip()) for val in values]
|
||||
|
||||
LOG.info("Lockout configs in {} keystone.conf: {}".format(keystone, values))
|
||||
return values
|
Loading…
Reference in New Issue
Block a user