Add password expiration queries for PCI-DSS
New query added for determining when passwords will expire. The following are the new queries: /v3/groups/{group_id}/users?password_expires_at={operator}:{timestamp} - Lists users belonging to a group whose password will expire based on the operator given. /v3/users?password_expires_at={operator}:{timestamp} - Lists users whose password will expire based on the operator given. {timestamp} is a datetime in the format "YYYY-MM-DDTHH:mm:ssZ". {operator} is one of lt, lte, gt gte, eq, and neq to filter in. If no operator is given, it is treated as eq. Examples: - GET /v3/users?password_expires_at=lt:2016-11-06T15:32:17Z - GET /v3/groups/079c578fd99b428ab61fcd4c9bd88ecd/users?password_expires_at=gt:2016-12-08T22:02:00Z Partially-Implements: bp pci-dss-query-password-expired-users Change-Id: If0b9cc3c8af92b2ea5d41a0e8afeb78e12b7689c
This commit is contained in:
parent
35deec22f8
commit
28c70f48dd
@ -133,6 +133,12 @@ class ValidationTimeStampError(Error):
|
||||
title = 'Bad Request'
|
||||
|
||||
|
||||
class InvalidOperatorError(ValidationError):
|
||||
message_format = _("The given operator %(_op)s is not valid."
|
||||
" It must be one of the following:"
|
||||
" 'eq', 'neq', 'lt', 'lte', 'gt', or 'gte'.")
|
||||
|
||||
|
||||
class ValidationExpirationError(Error):
|
||||
message_format = _("The 'expires_at' must not be before now."
|
||||
" The server could not comply with the request"
|
||||
|
@ -126,10 +126,29 @@ class Identity(base.IdentityDriverBase):
|
||||
session.add(user_ref)
|
||||
return base.filter_user(user_ref.to_dict())
|
||||
|
||||
def _create_password_expires_query(self, session, query, hints):
|
||||
for filter_ in hints.filters:
|
||||
if 'password_expires_at' == filter_['name']:
|
||||
# Filter on users who's password expires based on the operator
|
||||
# specified in `filter_['comparator']`
|
||||
query = query.filter(sqlalchemy.and_(
|
||||
model.LocalUser.id == model.Password.local_user_id,
|
||||
filter_['comparator'](model.Password.expires_at,
|
||||
filter_['value'])))
|
||||
# Removes the `password_expired_at` filters so there are no errors
|
||||
# if the call is filtered further. This is because the
|
||||
# `password_expires_at` value is not stored in the `User` table but
|
||||
# derived from the `Password` table's value `expires_at`.
|
||||
hints.filters = [x for x in hints.filters if x['name'] !=
|
||||
'password_expires_at']
|
||||
return query, hints
|
||||
|
||||
@driver_hints.truncated
|
||||
def list_users(self, hints):
|
||||
with sql.session_for_read() as session:
|
||||
query = session.query(model.User).outerjoin(model.LocalUser)
|
||||
query, hints = self._create_password_expires_query(session, query,
|
||||
hints)
|
||||
user_refs = sql.filter_limit_query(model.User, query, hints)
|
||||
return [base.filter_user(x.to_dict()) for x in user_refs]
|
||||
|
||||
@ -263,6 +282,8 @@ class Identity(base.IdentityDriverBase):
|
||||
query = query.join(model.UserGroupMembership)
|
||||
query = query.filter(
|
||||
model.UserGroupMembership.group_id == group_id)
|
||||
query, hints = self._create_password_expires_query(session, query,
|
||||
hints)
|
||||
query = sql.filter_limit_query(model.User, query, hints)
|
||||
return [base.filter_user(u.to_dict()) for u in query]
|
||||
|
||||
|
@ -222,7 +222,8 @@ class UserV3(controller.V3Controller):
|
||||
return UserV3.wrap_member(request.context_dict, ref)
|
||||
|
||||
@controller.filterprotected('domain_id', 'enabled', 'idp_id', 'name',
|
||||
'protocol_id', 'unique_id')
|
||||
'protocol_id', 'unique_id',
|
||||
'password_expires_at')
|
||||
def list_users(self, request, filters):
|
||||
hints = UserV3.build_driver_hints(request, filters)
|
||||
domain = self._get_domain_id_for_list_request(request)
|
||||
@ -230,6 +231,7 @@ class UserV3(controller.V3Controller):
|
||||
return UserV3.wrap_collection(request.context_dict, refs, hints=hints)
|
||||
|
||||
@controller.filterprotected('domain_id', 'enabled', 'name',
|
||||
'password_expires_at',
|
||||
callback=_check_group_protection)
|
||||
def list_users_in_group(self, request, filters, group_id):
|
||||
hints = UserV3.build_driver_hints(request, filters)
|
||||
|
@ -15,6 +15,7 @@
|
||||
"""Main entry point into the Identity service."""
|
||||
|
||||
import functools
|
||||
import operator
|
||||
import os
|
||||
import threading
|
||||
import uuid
|
||||
@ -36,6 +37,7 @@ from keystone import exception
|
||||
from keystone.i18n import _, _LW
|
||||
from keystone.identity.mapping_backends import mapping
|
||||
from keystone import notifications
|
||||
from oslo_utils import timeutils
|
||||
|
||||
|
||||
CONF = keystone.conf.CONF
|
||||
@ -958,6 +960,45 @@ class Manager(manager.Manager):
|
||||
return self._set_domain_id_and_mapping(
|
||||
ref, domain_id, driver, mapping.EntityType.USER)
|
||||
|
||||
def _translate_expired_password_hints(self, hints):
|
||||
"""Clean Up Expired Password Hints.
|
||||
|
||||
Any `password_expires_at` filters on the `list_users` or
|
||||
`list_users_in_group` queries are modified so the call will
|
||||
return valid data.
|
||||
|
||||
The filters `comparator` is changed to the operator specified in
|
||||
the call, otherwise it is assumed to be `equals`. The filters
|
||||
`value` becomes the timestamp specified. Both the operator and
|
||||
timestamp are validated, and will raise a InvalidOperatorError
|
||||
or ValidationTimeStampError exception respectively if invalid.
|
||||
|
||||
"""
|
||||
operators = {'lt': operator.lt, 'gt': operator.gt,
|
||||
'eq': operator.eq, 'lte': operator.le,
|
||||
'gte': operator.ge, 'neq': operator.ne}
|
||||
for filter_ in hints.filters:
|
||||
if 'password_expires_at' == filter_['name']:
|
||||
# password_expires_at must be in the format
|
||||
# 'lt:2016-11-06T15:32:17Z'. So we can assume the position
|
||||
# of the ':' otherwise assign the operator to equals.
|
||||
if ':' in filter_['value'][2:4]:
|
||||
op, timestamp = filter_['value'].split(':', 1)
|
||||
else:
|
||||
op = 'eq'
|
||||
timestamp = filter_['value']
|
||||
|
||||
try:
|
||||
filter_['value'] = timeutils.parse_isotime(timestamp)
|
||||
except ValueError:
|
||||
raise exception.ValidationTimeStampError
|
||||
|
||||
try:
|
||||
filter_['comparator'] = operators[op]
|
||||
except KeyError:
|
||||
raise exception.InvalidOperatorError(op)
|
||||
return hints
|
||||
|
||||
def _handle_federated_attributes_in_hints(self, driver, hints):
|
||||
federated_attributes = ['idp_id', 'protocol_id', 'unique_id']
|
||||
for filter_ in hints.filters:
|
||||
@ -979,6 +1020,7 @@ class Manager(manager.Manager):
|
||||
# We are effectively satisfying any domain_id filter by the above
|
||||
# driver selection, so remove any such filter.
|
||||
self._mark_domain_id_filter_satisfied(hints)
|
||||
hints = self._translate_expired_password_hints(hints)
|
||||
ref_list = self._handle_federated_attributes_in_hints(driver, hints)
|
||||
return self._set_domain_id_and_mapping(
|
||||
ref_list, domain_scope, driver, mapping.EntityType.USER)
|
||||
@ -1264,6 +1306,7 @@ class Manager(manager.Manager):
|
||||
# We are effectively satisfying any domain_id filter by the above
|
||||
# driver selection, so remove any such filter
|
||||
self._mark_domain_id_filter_satisfied(hints)
|
||||
hints = self._translate_expired_password_hints(hints)
|
||||
ref_list = driver.list_users_in_group(entity_id, hints)
|
||||
return self._set_domain_id_and_mapping(
|
||||
ref_list, domain_id, driver, mapping.EntityType.USER)
|
||||
|
@ -16,7 +16,9 @@
|
||||
import datetime
|
||||
|
||||
import freezegun
|
||||
from oslo_config import fixture as config_fixture
|
||||
from oslo_serialization import jsonutils
|
||||
from six.moves import http_client
|
||||
from six.moves import range
|
||||
|
||||
import keystone.conf
|
||||
@ -327,6 +329,397 @@ class IdentityTestFilteredCase(filtering.FilterTests,
|
||||
self.assertGreater(len(r.result.get('groups')), 0)
|
||||
|
||||
|
||||
class IdentityPasswordExpiryFilteredTestCase(filtering.FilterTests,
|
||||
test_v3.RestfulTestCase):
|
||||
"""Test password expiring filter on the v3 Identity API."""
|
||||
|
||||
def setUp(self):
|
||||
"""Setup for Identity Filter Test Cases."""
|
||||
self.config_fixture = self.useFixture(config_fixture.Config(CONF))
|
||||
super(IdentityPasswordExpiryFilteredTestCase, self).setUp()
|
||||
|
||||
def load_sample_data(self):
|
||||
"""Create sample data for password expiry tests.
|
||||
|
||||
The test environment will consist of a single domain, containing
|
||||
a single project. It will create three users and one group.
|
||||
Each user is going to be given a role assignment on the project
|
||||
and the domain. Two of the three users are going to be placed into
|
||||
the group, which won't have any role assignments to either the
|
||||
project or the domain.
|
||||
|
||||
"""
|
||||
self._populate_default_domain()
|
||||
self.domain = unit.new_domain_ref()
|
||||
self.resource_api.create_domain(self.domain['id'], self.domain)
|
||||
self.domain_id = self.domain['id']
|
||||
self.project = unit.new_project_ref(domain_id=self.domain_id)
|
||||
self.project_id = self.project['id']
|
||||
self.project = self.resource_api.create_project(self.project_id,
|
||||
self.project)
|
||||
self.group = unit.new_group_ref(domain_id=self.domain_id)
|
||||
self.group = self.identity_api.create_group(self.group)
|
||||
self.group_id = self.group['id']
|
||||
# Creates three users each with password expiration offset
|
||||
# by one day, starting with the current time frozen.
|
||||
self.starttime = datetime.datetime.utcnow()
|
||||
with freezegun.freeze_time(self.starttime):
|
||||
self.config_fixture.config(group='security_compliance',
|
||||
password_expires_days=1)
|
||||
self.user = unit.create_user(self.identity_api,
|
||||
domain_id=self.domain_id)
|
||||
self.config_fixture.config(group='security_compliance',
|
||||
password_expires_days=2)
|
||||
self.user2 = unit.create_user(self.identity_api,
|
||||
domain_id=self.domain_id)
|
||||
self.config_fixture.config(group='security_compliance',
|
||||
password_expires_days=3)
|
||||
self.user3 = unit.create_user(self.identity_api,
|
||||
domain_id=self.domain_id)
|
||||
self.role = unit.new_role_ref(name='admin')
|
||||
self.role_api.create_role(self.role['id'], self.role)
|
||||
self.role_id = self.role['id']
|
||||
# Grant admin role to the users created.
|
||||
self.assignment_api.create_grant(self.role_id,
|
||||
user_id=self.user['id'],
|
||||
domain_id=self.domain_id)
|
||||
self.assignment_api.create_grant(self.role_id,
|
||||
user_id=self.user2['id'],
|
||||
domain_id=self.domain_id)
|
||||
self.assignment_api.create_grant(self.role_id,
|
||||
user_id=self.user3['id'],
|
||||
domain_id=self.domain_id)
|
||||
self.assignment_api.create_grant(self.role_id,
|
||||
user_id=self.user['id'],
|
||||
project_id=self.project_id)
|
||||
self.assignment_api.create_grant(self.role_id,
|
||||
user_id=self.user2['id'],
|
||||
project_id=self.project_id)
|
||||
self.assignment_api.create_grant(self.role_id,
|
||||
user_id=self.user3['id'],
|
||||
project_id=self.project_id)
|
||||
# Add the last two users to the group.
|
||||
self.identity_api.add_user_to_group(self.user2['id'],
|
||||
self.group_id)
|
||||
self.identity_api.add_user_to_group(self.user3['id'],
|
||||
self.group_id)
|
||||
|
||||
def _list_users_by_password_expires_at(self, time, operator=None):
|
||||
"""Call `list_users` with `password_expires_at` filter.
|
||||
|
||||
GET /users?password_expires_at={operator}:{timestamp}
|
||||
|
||||
"""
|
||||
url = '/users?password_expires_at='
|
||||
if operator:
|
||||
url += operator + ':'
|
||||
url += str(time)
|
||||
return url
|
||||
|
||||
def _list_users_by_multiple_password_expires_at(
|
||||
self, first_time, first_operator, second_time, second_operator):
|
||||
"""Call `list_users` with two `password_expires_at` filters.
|
||||
|
||||
GET /users?password_expires_at={operator}:{timestamp}&
|
||||
{operator}:{timestamp}
|
||||
|
||||
"""
|
||||
url = ('/users?password_expires_at=%s:%s&password_expires_at=%s:%s' %
|
||||
(first_operator, first_time, second_operator, second_time))
|
||||
return url
|
||||
|
||||
def _format_timestamp(self, timestamp):
|
||||
return timestamp.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
|
||||
def test_list_users_by_password_expires_at(self):
|
||||
"""Ensure users can be filtered on no operator, eq and neq.
|
||||
|
||||
GET /users?password_expires_at={timestamp}
|
||||
GET /users?password_expires_at=eq:{timestamp}
|
||||
|
||||
"""
|
||||
expire_at_url = self._list_users_by_password_expires_at(
|
||||
self._format_timestamp(
|
||||
self.starttime + datetime.timedelta(days=2)))
|
||||
resp_users = self.get(expire_at_url).result.get('users')
|
||||
self.assertEqual(self.user2['id'], resp_users[0]['id'])
|
||||
|
||||
# Same call as above, only explicitly stating equals
|
||||
expire_at_url = self._list_users_by_password_expires_at(
|
||||
self._format_timestamp(
|
||||
self.starttime + datetime.timedelta(days=2)), 'eq')
|
||||
resp_users = self.get(expire_at_url).result.get('users')
|
||||
self.assertEqual(self.user2['id'], resp_users[0]['id'])
|
||||
|
||||
expire_at_url = self._list_users_by_password_expires_at(
|
||||
self._format_timestamp(
|
||||
self.starttime + datetime.timedelta(days=2)), 'neq')
|
||||
resp_users = self.get(expire_at_url).result.get('users')
|
||||
self.assertEqual(self.user['id'], resp_users[0]['id'])
|
||||
self.assertEqual(self.user3['id'], resp_users[1]['id'])
|
||||
|
||||
def test_list_users_by_password_expires_before(self):
|
||||
"""Ensure users can be filtered on lt and lte.
|
||||
|
||||
GET /users?password_expires_at=lt:{timestamp}
|
||||
GET /users?password_expires_at=lte:{timestamp}
|
||||
|
||||
"""
|
||||
expire_before_url = self._list_users_by_password_expires_at(
|
||||
self._format_timestamp(self.starttime + datetime.timedelta(
|
||||
days=2, seconds=1)), 'lt')
|
||||
resp_users = self.get(expire_before_url).result.get('users')
|
||||
self.assertEqual(self.user['id'], resp_users[0]['id'])
|
||||
self.assertEqual(self.user2['id'], resp_users[1]['id'])
|
||||
|
||||
expire_before_url = self._list_users_by_password_expires_at(
|
||||
self._format_timestamp(self.starttime + datetime.timedelta(
|
||||
days=2)), 'lte')
|
||||
resp_users = self.get(expire_before_url).result.get('users')
|
||||
self.assertEqual(self.user['id'], resp_users[0]['id'])
|
||||
self.assertEqual(self.user2['id'], resp_users[1]['id'])
|
||||
|
||||
def test_list_users_by_password_expires_after(self):
|
||||
"""Ensure users can be filtered on gt and gte.
|
||||
|
||||
GET /users?password_expires_at=gt:{timestamp}
|
||||
GET /users?password_expires_at=gte:{timestamp}
|
||||
|
||||
"""
|
||||
expire_after_url = self._list_users_by_password_expires_at(
|
||||
self._format_timestamp(self.starttime + datetime.timedelta(
|
||||
days=2, seconds=1)), 'gt')
|
||||
resp_users = self.get(expire_after_url).result.get('users')
|
||||
self.assertEqual(self.user3['id'], resp_users[0]['id'])
|
||||
|
||||
expire_after_url = self._list_users_by_password_expires_at(
|
||||
self._format_timestamp(self.starttime + datetime.timedelta(
|
||||
days=2)), 'gte')
|
||||
resp_users = self.get(expire_after_url).result.get('users')
|
||||
self.assertEqual(self.user2['id'], resp_users[0]['id'])
|
||||
self.assertEqual(self.user3['id'], resp_users[1]['id'])
|
||||
|
||||
def test_list_users_by_password_expires_interval(self):
|
||||
"""Ensure users can be filtered on time intervals.
|
||||
|
||||
GET /users?password_expires_at=lt:{timestamp}>:{timestamp}
|
||||
GET /users?password_expires_at=lte:{timestamp}>e:{timestamp}
|
||||
|
||||
Time intervals are defined by using lt or lte and gt or gte,
|
||||
where the lt/lte time is greater than the gt/gte time.
|
||||
|
||||
"""
|
||||
expire_interval_url = (
|
||||
self._list_users_by_multiple_password_expires_at(
|
||||
self._format_timestamp(self.starttime + datetime.timedelta(
|
||||
days=3)), 'lt', self._format_timestamp(
|
||||
self.starttime + datetime.timedelta(days=1)), 'gt'))
|
||||
resp_users = self.get(expire_interval_url).result.get('users')
|
||||
self.assertEqual(self.user2['id'], resp_users[0]['id'])
|
||||
|
||||
expire_interval_url = (
|
||||
self._list_users_by_multiple_password_expires_at(
|
||||
self._format_timestamp(self.starttime + datetime.timedelta(
|
||||
days=2)), 'gte', self._format_timestamp(
|
||||
self.starttime + datetime.timedelta(
|
||||
days=2, seconds=1)), 'lte'))
|
||||
resp_users = self.get(expire_interval_url).result.get('users')
|
||||
self.assertEqual(self.user2['id'], resp_users[0]['id'])
|
||||
|
||||
def test_list_users_by_password_expires_with_bad_operator_fails(self):
|
||||
"""Ensure an invalid operator returns a Bad Request.
|
||||
|
||||
GET /users?password_expires_at={invalid_operator}:{timestamp}
|
||||
GET /users?password_expires_at={operator}:{timestamp}&
|
||||
{invalid_operator}:{timestamp}
|
||||
|
||||
"""
|
||||
bad_op_url = self._list_users_by_password_expires_at(
|
||||
self._format_timestamp(self.starttime), 'x')
|
||||
self.get(bad_op_url, expected_status=http_client.BAD_REQUEST)
|
||||
|
||||
bad_op_url = self._list_users_by_multiple_password_expires_at(
|
||||
self._format_timestamp(self.starttime), 'lt',
|
||||
self._format_timestamp(self.starttime), 'x')
|
||||
self.get(bad_op_url, expected_status=http_client.BAD_REQUEST)
|
||||
|
||||
def test_list_users_by_password_expires_with_bad_timestamp_fails(self):
|
||||
"""Ensure a invalid timestamp returns a Bad Request.
|
||||
|
||||
GET /users?password_expires_at={invalid_timestamp}
|
||||
GET /users?password_expires_at={operator}:{timestamp}&
|
||||
{operator}:{invalid_timestamp}
|
||||
|
||||
"""
|
||||
bad_ts_url = self._list_users_by_password_expires_at(
|
||||
self.starttime.strftime('%S:%M:%ST%Y-%m-%d'))
|
||||
self.get(bad_ts_url, expected_status=http_client.BAD_REQUEST)
|
||||
|
||||
bad_ts_url = self._list_users_by_multiple_password_expires_at(
|
||||
self._format_timestamp(self.starttime), 'lt',
|
||||
self.starttime.strftime('%S:%M:%ST%Y-%m-%d'), 'gt')
|
||||
self.get(bad_ts_url, expected_status=http_client.BAD_REQUEST)
|
||||
|
||||
def _list_users_in_group_by_password_expires_at(
|
||||
self, time, operator=None, expected_status=http_client.OK):
|
||||
"""Call `list_users_in_group` with `password_expires_at` filter.
|
||||
|
||||
GET /groups/{group_id}/users?password_expires_at=
|
||||
{operator}:{timestamp}&{operator}:{timestamp}
|
||||
|
||||
"""
|
||||
url = '/groups/' + self.group_id + '/users?password_expires_at='
|
||||
if operator:
|
||||
url += operator + ':'
|
||||
url += str(time)
|
||||
return url
|
||||
|
||||
def _list_users_in_group_by_multiple_password_expires_at(
|
||||
self, first_time, first_operator, second_time, second_operator,
|
||||
expected_status=http_client.OK):
|
||||
"""Call `list_users_in_group` with two `password_expires_at` filters.
|
||||
|
||||
GET /groups/{group_id}/users?password_expires_at=
|
||||
{operator}:{timestamp}&{operator}:{timestamp}
|
||||
|
||||
"""
|
||||
url = ('/groups/' + self.group_id + '/users'
|
||||
'?password_expires_at=%s:%s&password_expires_at=%s:%s' %
|
||||
(first_operator, first_time, second_operator, second_time))
|
||||
return url
|
||||
|
||||
def test_list_users_in_group_by_password_expires_at(self):
|
||||
"""Ensure users in a group can be filtered on no operator, eq, and neq.
|
||||
|
||||
GET /groups/{groupid}/users?password_expires_at={timestamp}
|
||||
GET /groups/{groupid}/users?password_expires_at=eq:{timestamp}
|
||||
|
||||
"""
|
||||
expire_at_url = self._list_users_in_group_by_password_expires_at(
|
||||
self._format_timestamp(
|
||||
self.starttime + datetime.timedelta(days=2)))
|
||||
resp_users = self.get(expire_at_url).result.get('users')
|
||||
self.assertEqual(self.user2['id'], resp_users[0]['id'])
|
||||
|
||||
# Same call as above, only explicitly stating equals
|
||||
expire_at_url = self._list_users_in_group_by_password_expires_at(
|
||||
self._format_timestamp(self.starttime + datetime.timedelta(
|
||||
days=2)), 'eq')
|
||||
resp_users = self.get(expire_at_url).result.get('users')
|
||||
self.assertEqual(self.user2['id'], resp_users[0]['id'])
|
||||
|
||||
expire_at_url = self._list_users_in_group_by_password_expires_at(
|
||||
self._format_timestamp(self.starttime + datetime.timedelta(
|
||||
days=2)), 'neq')
|
||||
resp_users = self.get(expire_at_url).result.get('users')
|
||||
self.assertEqual(self.user3['id'], resp_users[0]['id'])
|
||||
|
||||
def test_list_users_in_group_by_password_expires_before(self):
|
||||
"""Ensure users in a group can be filtered on with lt and lte.
|
||||
|
||||
GET /groups/{groupid}/users?password_expires_at=lt:{timestamp}
|
||||
GET /groups/{groupid}/users?password_expires_at=lte:{timestamp}
|
||||
|
||||
"""
|
||||
expire_before_url = self._list_users_in_group_by_password_expires_at(
|
||||
self._format_timestamp(self.starttime + datetime.timedelta(
|
||||
days=2, seconds=1)), 'lt')
|
||||
resp_users = self.get(expire_before_url).result.get('users')
|
||||
self.assertEqual(self.user2['id'], resp_users[0]['id'])
|
||||
|
||||
expire_before_url = self._list_users_in_group_by_password_expires_at(
|
||||
self._format_timestamp(self.starttime + datetime.timedelta(
|
||||
days=2)), 'lte')
|
||||
resp_users = self.get(expire_before_url).result.get('users')
|
||||
self.assertEqual(self.user2['id'], resp_users[0]['id'])
|
||||
|
||||
def test_list_users_in_group_by_password_expires_after(self):
|
||||
"""Ensure users in a group can be filtered on with gt and gte.
|
||||
|
||||
GET /groups/{groupid}/users?password_expires_at=gt:{timestamp}
|
||||
GET /groups/{groupid}/users?password_expires_at=gte:{timestamp}
|
||||
|
||||
"""
|
||||
expire_after_url = self._list_users_in_group_by_password_expires_at(
|
||||
self._format_timestamp(self.starttime + datetime.timedelta(
|
||||
days=2, seconds=1)), 'gt')
|
||||
resp_users = self.get(expire_after_url).result.get('users')
|
||||
self.assertEqual(self.user3['id'], resp_users[0]['id'])
|
||||
|
||||
expire_after_url = self._list_users_in_group_by_password_expires_at(
|
||||
self._format_timestamp(self.starttime + datetime.timedelta(
|
||||
days=2)), 'gte')
|
||||
resp_users = self.get(expire_after_url).result.get('users')
|
||||
self.assertEqual(self.user2['id'], resp_users[0]['id'])
|
||||
self.assertEqual(self.user3['id'], resp_users[1]['id'])
|
||||
|
||||
def test_list_users_in_group_by_password_expires_interval(self):
|
||||
"""Ensure users in a group can be filtered on time intervals.
|
||||
|
||||
GET /groups/{groupid}/users?password_expires_at=
|
||||
lt:{timestamp}>:{timestamp}
|
||||
GET /groups/{groupid}/users?password_expires_at=
|
||||
lte:{timestamp}>e:{timestamp}
|
||||
|
||||
Time intervals are defined by using lt or lte and gt or gte,
|
||||
where the lt/lte time is greater than the gt/gte time.
|
||||
|
||||
"""
|
||||
expire_interval_url = (
|
||||
self._list_users_in_group_by_multiple_password_expires_at(
|
||||
self._format_timestamp(self.starttime), 'gt',
|
||||
self._format_timestamp(self.starttime + datetime.timedelta(
|
||||
days=3, seconds=1)), 'lt'))
|
||||
resp_users = self.get(expire_interval_url).result.get('users')
|
||||
self.assertEqual(self.user2['id'], resp_users[0]['id'])
|
||||
self.assertEqual(self.user3['id'], resp_users[1]['id'])
|
||||
|
||||
expire_interval_url = (
|
||||
self._list_users_in_group_by_multiple_password_expires_at(
|
||||
self._format_timestamp(self.starttime + datetime.timedelta(
|
||||
days=2)), 'gte',
|
||||
self._format_timestamp(self.starttime + datetime.timedelta(
|
||||
days=3)), 'lte'))
|
||||
resp_users = self.get(expire_interval_url).result.get('users')
|
||||
self.assertEqual(self.user2['id'], resp_users[0]['id'])
|
||||
self.assertEqual(self.user3['id'], resp_users[1]['id'])
|
||||
|
||||
def test_list_users_in_group_by_password_expires_bad_operator_fails(self):
|
||||
"""Ensure an invalid operator returns a Bad Request.
|
||||
|
||||
GET /groups/{groupid}/users?password_expires_at=
|
||||
{invalid_operator}:{timestamp}
|
||||
GET /groups/{group_id}/users?password_expires_at=
|
||||
{operator}:{timestamp}&{invalid_operator}:{timestamp}
|
||||
|
||||
"""
|
||||
bad_op_url = self._list_users_in_group_by_password_expires_at(
|
||||
self._format_timestamp(self.starttime), 'bad')
|
||||
self.get(bad_op_url, expected_status=http_client.BAD_REQUEST)
|
||||
|
||||
bad_op_url = self._list_users_in_group_by_multiple_password_expires_at(
|
||||
self._format_timestamp(self.starttime), 'lt',
|
||||
self._format_timestamp(self.starttime), 'x')
|
||||
self.get(bad_op_url, expected_status=http_client.BAD_REQUEST)
|
||||
|
||||
def test_list_users_in_group_by_password_expires_bad_timestamp_fails(self):
|
||||
"""Ensure and invalid timestamp returns a Bad Request.
|
||||
|
||||
GET /groups/{groupid}/users?password_expires_at={invalid_timestamp}
|
||||
GET /groups/{groupid}/users?password_expires_at={operator}:{timestamp}&
|
||||
{operator}:{invalid_timestamp}
|
||||
|
||||
"""
|
||||
bad_ts_url = self._list_users_in_group_by_password_expires_at(
|
||||
self.starttime.strftime('%S:%M:%ST%Y-%m-%d'))
|
||||
self.get(bad_ts_url, expected_status=http_client.BAD_REQUEST)
|
||||
|
||||
bad_ts_url = self._list_users_in_group_by_multiple_password_expires_at(
|
||||
self._format_timestamp(self.starttime), 'lt',
|
||||
self.starttime.strftime('%S:%M:%ST%Y-%m-%d'), 'gt')
|
||||
self.get(bad_ts_url, expected_status=http_client.BAD_REQUEST)
|
||||
|
||||
|
||||
class IdentityTestListLimitCase(IdentityTestFilteredCase):
|
||||
"""Test list limiting enforcement on the v3 Identity API."""
|
||||
|
||||
|
@ -0,0 +1,15 @@
|
||||
---
|
||||
features:
|
||||
- >
|
||||
[`blueprint pci-dss-query-password-expired-users <https://blueprints.launchpad.net/keystone/+spec/pci-dss-query-password-expired-users>`_]
|
||||
Added a ``password_expires_at`` query to ``/v3/users`` and
|
||||
``/v3/groups/{group_id}/users``. The ``password_expires_at`` query is
|
||||
comprised of two parts, an ``operator`` (valid choices listed below)
|
||||
and a ``timestamp`` (of form ``YYYY-MM-DDTHH:mm:ssZ``). The APIs will
|
||||
filter the list of users based on the ``operator`` and ``timestamp`` given.
|
||||
* lt - password expires before the timestamp
|
||||
* lte - password expires at or before timestamp
|
||||
* gt - password expires after the timestamp
|
||||
* gte - password expires at or after the timestamp
|
||||
* eq - password expires at the timestamp
|
||||
* neq - password expires not at the timestamp
|
Loading…
x
Reference in New Issue
Block a user