Implement JWS token provider

This commit introduces a class that implements the JWS token provider
functionality.

bp json-web-tokens

Change-Id: Ie16110894348a83e3a80cba4649e6cccdc3c84b1
This commit is contained in:
Lance Bragstad 2018-10-31 15:23:07 +00:00
parent 374b03b015
commit 96adccd0ec
7 changed files with 465 additions and 4 deletions

View File

@ -38,10 +38,15 @@ provider = cfg.StrOpt(
help=utils.fmt("""
Entry point for the token provider in the `keystone.token.provider` namespace.
The token provider controls the token construction, validation, and revocation
operations. Keystone includes `fernet` token provider.
`fernet` tokens do not need to be persisted at all, but require that you run
`keystone-manage fernet_setup` (also see the `keystone-manage fernet_rotate`
command).
operations. Supported upstream providers are `fernet` and `jws`. Neither
`fernet` or `jws` tokens require persistence and both require additional setup.
If using `fernet`, you're required to run `keystone-manage fernet_setup`, which
creates symmetric keys used to encrypt tokens. If using `jws`, you're required
to generate an ECDSA keypair using a SHA-256 hash algorithm for signing and
validating token, which can be done with `keystone-manage create_jws_keypair`.
Note that `fernet` tokens are encrypted and `jws` tokens are only signed.
Please be sure to consider this if your deployment has security requirements
regarding payload contents used to generate token IDs.
"""))
caching = cfg.BoolOpt(

View File

@ -2803,6 +2803,64 @@ class TestFernetTokenAPIs(test_v3.RestfulTestCase, TokenAPITests,
)
class TestJWSTokenAPIs(test_v3.RestfulTestCase, TokenAPITests, TokenDataTests):
def config_overrides(self):
super(TestJWSTokenAPIs, self).config_overrides()
self.config_fixture.config(group='token', provider='jws',
cache_on_issue=True)
self.useFixture(ksfixtures.JWSKeyRepository(self.config_fixture))
def setUp(self):
super(TestJWSTokenAPIs, self).setUp()
self.doSetUp()
def _make_auth_request(self, auth_data):
token = super(TestJWSTokenAPIs, self)._make_auth_request(auth_data)
self.assertLess(len(token), 350)
return token
def test_validate_tampered_unscoped_token_fails(self):
unscoped_token = self._get_unscoped_token()
tampered_token = (unscoped_token[:50] + uuid.uuid4().hex +
unscoped_token[50 + 32:])
self._validate_token(tampered_token,
expected_status=http_client.NOT_FOUND)
def test_validate_tampered_project_scoped_token_fails(self):
project_scoped_token = self._get_project_scoped_token()
tampered_token = (project_scoped_token[:50] + uuid.uuid4().hex +
project_scoped_token[50 + 32:])
self._validate_token(tampered_token,
expected_status=http_client.NOT_FOUND)
def test_validate_tampered_trust_scoped_token_fails(self):
trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
# Get a trust scoped token
tampered_token = (trust_scoped_token[:50] + uuid.uuid4().hex +
trust_scoped_token[50 + 32:])
self._validate_token(tampered_token,
expected_status=http_client.NOT_FOUND)
def test_trust_scoped_token_is_invalid_after_disabling_trustor(self):
# NOTE(amakarov): have to override this test for non-persistent tokens
# as TokenNotFound exception makes no sense for those.
trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
# Validate a trust scoped token
r = self._validate_token(trust_scoped_token)
self.assertValidProjectScopedTokenResponse(r)
# Disable the trustor
trustor_update_ref = dict(enabled=False)
PROVIDERS.identity_api.update_user(self.user['id'], trustor_update_ref)
# Ensure validating a token for a disabled user fails
self._validate_token(
trust_scoped_token,
expected_status=http_client.FORBIDDEN
)
class TestTokenRevokeSelfAndAdmin(test_v3.RestfulTestCase):
"""Test token revoke using v3 Identity API by token owner and admin."""

View File

@ -2985,6 +2985,68 @@ class FernetFederatedTokenTests(test_v3.RestfulTestCase, FederatedSetupMixin):
self._check_project_scoped_token_attributes(token_resp, project['id'])
class JWSFederatedTokenTests(test_v3.RestfulTestCase, FederatedSetupMixin):
AUTH_METHOD = 'token'
def load_fixtures(self, fixtures):
super(JWSFederatedTokenTests, self).load_fixtures(fixtures)
self.load_federation_sample_data()
def config_overrides(self):
super(JWSFederatedTokenTests, self).config_overrides()
self.config_fixture.config(group='token', provider='jws')
self.useFixture(ksfixtures.JWSKeyRepository(self.config_fixture))
def auth_plugin_config_override(self):
methods = ['saml2', 'token', 'password']
super(JWSFederatedTokenTests,
self).auth_plugin_config_override(methods)
def test_federated_unscoped_token(self):
token_model = self._issue_unscoped_token()
self.assertValidMappedUser(
render_token.render_token_response_from_model(token_model)['token']
)
def test_federated_unscoped_token_with_multiple_groups(self):
assertion = 'ANOTHER_CUSTOMER_ASSERTION'
token_model = self._issue_unscoped_token(assertion=assertion)
self.assertValidMappedUser(
render_token.render_token_response_from_model(token_model)['token']
)
def test_validate_federated_unscoped_token(self):
token_model = self._issue_unscoped_token()
unscoped_token = token_model.id
# assert that the token we received is valid
self.get('/auth/tokens/', headers={'X-Subject-Token': unscoped_token})
def test_jws_full_workflow(self):
"""Test 'standard' workflow for granting JWS tokens.
* Issue unscoped token
* List available projects based on groups
* Scope token to one of available projects
"""
token_model = self._issue_unscoped_token()
self.assertValidMappedUser(
render_token.render_token_response_from_model(token_model)['token']
)
unscoped_token = token_model.id
resp = self.get('/auth/projects', token=unscoped_token)
projects = resp.result['projects']
random_project = random.randint(0, len(projects) - 1)
project = projects[random_project]
v3_scope_request = self._scope_request(unscoped_token,
'project', project['id'])
resp = self.v3_create_token(v3_scope_request)
token_resp = resp.result['token']
self._check_project_scoped_token_attributes(token_resp, project['id'])
class FederatedTokenTestsMethodToken(FederatedTokenTests):
"""Test federation operation with unified scoping auth method.

View File

@ -0,0 +1,126 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import uuid
from keystone.common import jwt_utils
from keystone.common import provider_api
from keystone.common import utils
import keystone.conf
from keystone import exception
from keystone.models import token_model
from keystone.tests import unit
from keystone.tests.unit import ksfixtures
from keystone.token import provider
from keystone.token.providers import jws
CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs
class TestJWSProvider(unit.TestCase):
def setUp(self):
super(TestJWSProvider, self).setUp()
self.config_fixture.config(group='token', provider='jws')
self.useFixture(ksfixtures.JWSKeyRepository(self.config_fixture))
self.provider = jws.Provider()
def test_invalid_token_raises_token_not_found(self):
token_id = uuid.uuid4().hex
self.assertRaises(
exception.TokenNotFound,
self.provider.validate_token,
token_id
)
def test_non_existent_private_key_raises_system_exception(self):
private_key = os.path.join(
CONF.jwt_tokens.jws_private_key_repository, 'private.pem'
)
os.remove(private_key)
self.assertRaises(SystemExit, jws.Provider)
def test_non_existent_public_key_repo_raises_system_exception(self):
for f in os.listdir(CONF.jwt_tokens.jws_public_key_repository):
path = os.path.join(CONF.jwt_tokens.jws_public_key_repository, f)
os.remove(path)
os.rmdir(CONF.jwt_tokens.jws_public_key_repository)
self.assertRaises(SystemExit, jws.Provider)
def test_empty_public_key_repo_raises_system_exception(self):
for f in os.listdir(CONF.jwt_tokens.jws_public_key_repository):
path = os.path.join(CONF.jwt_tokens.jws_public_key_repository, f)
os.remove(path)
self.assertRaises(SystemExit, jws.Provider)
def test_unable_to_verify_token_with_missing_public_key(self):
# create token, signing with private key
token = token_model.TokenModel()
token.methods = ['password']
token.user_id = uuid.uuid4().hex
token.audit_id = provider.random_urlsafe_str()
token.expires_at = utils.isotime(
provider.default_expire_time(), subsecond=True
)
token_id, issued_at = self.provider.generate_id_and_issued_at(token)
# remove the public key for the token we just created
current_pub_key = os.path.join(
CONF.jwt_tokens.jws_public_key_repository, 'public.pem'
)
os.remove(current_pub_key)
# create additional public keys
for _ in range(2):
private_key_path = os.path.join(
CONF.jwt_tokens.jws_private_key_repository,
uuid.uuid4().hex
)
pub_key_path = os.path.join(
CONF.jwt_tokens.jws_public_key_repository,
uuid.uuid4().hex
)
jwt_utils.create_jws_keypair(private_key_path, pub_key_path)
# validate token and ensure it returns a 404
self.assertRaises(
exception.TokenNotFound,
self.provider.validate_token,
token_id
)
def test_verify_token_with_multiple_public_keys_present(self):
token = token_model.TokenModel()
token.methods = ['password']
token.user_id = uuid.uuid4().hex
token.audit_id = provider.random_urlsafe_str()
token.expires_at = utils.isotime(
provider.default_expire_time(), subsecond=True
)
token_id, issued_at = self.provider.generate_id_and_issued_at(token)
for _ in range(2):
private_key_path = os.path.join(
CONF.jwt_tokens.jws_private_key_repository,
uuid.uuid4().hex
)
pub_key_path = os.path.join(
CONF.jwt_tokens.jws_public_key_repository,
uuid.uuid4().hex
)
jwt_utils.create_jws_keypair(private_key_path, pub_key_path)
# make sure we iterate through all public keys on disk and we can still
# validate the token
self.provider.validate_token(token_id)

View File

@ -0,0 +1,13 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystone.token.providers.jws.core import * # noqa

View File

@ -0,0 +1,196 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import os
import jwt
from oslo_utils import timeutils
from keystone.common import utils
import keystone.conf
from keystone import exception
from keystone.i18n import _
from keystone.token.providers import base
CONF = keystone.conf.CONF
class Provider(base.Provider):
def __init__(self, *args, **kwargs):
super(Provider, self).__init__(*args, **kwargs)
# NOTE(lbragstad): We add these checks here because if the jws
# provider is going to be used and either the `key_repository` is empty
# or doesn't exist we should fail, hard. It doesn't make sense to start
# keystone and just 500 because we can't do anything with an empty or
# non-existant key repository.
private_key = os.path.join(
CONF.jwt_tokens.jws_private_key_repository, 'private.pem'
)
public_key_repo = CONF.jwt_tokens.jws_public_key_repository
if not os.path.exists(private_key):
subs = {'private_key': private_key}
raise SystemExit(_(
'%(private_key)s does not exist. You can generate a key pair '
'using `keystone-manage create_jws_keypair`.') % subs)
if not os.path.exists(public_key_repo):
subs = {'public_key_repo': public_key_repo}
raise SystemExit(_(
'%(public_key_repo)s does not exist. Please make sure the '
'directory exists and is readable by the process running '
'keystone.') % subs)
if len(os.listdir(public_key_repo)) == 0:
subs = {'public_key_repo': public_key_repo}
msg = _(
'%(public_key_repo)s must contain at least one public '
'key but it is empty. You can generate a key pair using '
'`keystone-manage create_jws_keypair`.'
)
raise SystemExit(msg % subs)
self.token_formatter = JWSFormatter()
def generate_id_and_issued_at(self, token):
return self.token_formatter.create_token(
token.user_id, token.expires_at, token.audit_ids, token.methods,
system=token.system, domain_id=token.domain_id,
project_id=token.project_id, trust_id=token.trust_id,
federated_group_ids=token.federated_groups,
identity_provider_id=token.identity_provider_id,
protocol_id=token.protocol_id,
access_token_id=token.access_token_id,
app_cred_id=token.application_credential_id
)
def validate_token(self, token_id):
return self.token_formatter.validate_token(token_id)
class JWSFormatter(object):
# NOTE(lbragstad): If in the future we expand support for different
# algorithms, make this configurable and validate it against a blessed list
# of supported algorithms.
algorithm = 'ES256'
@property
def private_key(self):
private_key_path = os.path.join(
CONF.jwt_tokens.jws_private_key_repository, 'private.pem'
)
with open(private_key_path, 'r') as f:
key = f.read()
return key
@property
def public_keys(self):
keys = []
key_repo = CONF.jwt_tokens.jws_public_key_repository
for keyfile in os.listdir(key_repo):
with open(os.path.join(key_repo, keyfile), 'r') as f:
keys.append(f.read())
return keys
def create_token(self, user_id, expires_at, audit_ids, methods,
system=None, domain_id=None, project_id=None,
trust_id=None, federated_group_ids=None,
identity_provider_id=None, protocol_id=None,
access_token_id=None, app_cred_id=None):
issued_at = utils.isotime(subsecond=True)
issued_at_int = self._convert_time_string_to_int(issued_at)
expires_at_int = self._convert_time_string_to_int(expires_at)
payload = {
# public claims
'sub': user_id,
'iat': issued_at_int,
'exp': expires_at_int,
# private claims
'openstack_methods': methods,
'openstack_audit_ids': audit_ids,
'openstack_system': system,
'openstack_domain_id': domain_id,
'openstack_project_id': project_id,
'openstack_trust_id': trust_id,
'openstack_group_ids': federated_group_ids,
'openstack_idp_id': identity_provider_id,
'openstack_protocol_id': protocol_id,
'openstack_access_token_id': access_token_id,
'openstack_app_cred_id': app_cred_id
}
# NOTE(lbragstad): Calling .items() on a dictionary in python 2 returns
# a list but returns an iterable in python 3. Casting to a list makes
# it safe to modify the dictionary while iterating over it, regardless
# of the python version.
for k, v in list(payload.items()):
if v is None:
payload.pop(k)
token_id = jwt.encode(
payload,
self.private_key,
algorithm=JWSFormatter.algorithm
)
return token_id, issued_at
def validate_token(self, token_id):
payload = self._decode_token_from_id(token_id)
user_id = payload['sub']
expires_at_int = payload['exp']
issued_at_int = payload['iat']
methods = payload['openstack_methods']
audit_ids = payload['openstack_audit_ids']
system = payload.get('openstack_system', None)
domain_id = payload.get('openstack_domain_id', None)
project_id = payload.get('openstack_project_id', None)
trust_id = payload.get('openstack_trust_id', None)
federated_group_ids = payload.get('openstack_group_ids', None)
identity_provider_id = payload.get('openstack_idp_id', None)
protocol_id = payload.get('openstack_protocol_id', None)
access_token_id = payload.get('openstack_access_token_id', None)
app_cred_id = payload.get('openstack_app_cred_id', None)
issued_at = self._convert_time_int_to_string(issued_at_int)
expires_at = self._convert_time_int_to_string(expires_at_int)
return (
user_id, methods, audit_ids, system, domain_id, project_id,
trust_id, federated_group_ids, identity_provider_id, protocol_id,
access_token_id, app_cred_id, issued_at, expires_at
)
def _decode_token_from_id(self, token_id):
for public_key in self.public_keys:
try:
return jwt.decode(
token_id, public_key, algorithms=JWSFormatter.algorithm
)
except (jwt.InvalidSignatureError, jwt.DecodeError,
jwt.ExpiredSignatureError):
pass # nosec: We want to exhaustively try all public keys
raise exception.TokenNotFound(token_id=token_id)
def _convert_time_string_to_int(self, time_str):
time_object = timeutils.parse_isotime(time_str)
normalized = timeutils.normalize_time(time_object)
epoch = datetime.datetime.utcfromtimestamp(0)
return int((normalized - epoch).total_seconds())
def _convert_time_int_to_string(self, time_int):
time_object = datetime.datetime.utcfromtimestamp(time_int)
return utils.isotime(at=time_object, subsecond=True)

View File

@ -144,6 +144,7 @@ keystone.role =
keystone.token.provider =
fernet = keystone.token.providers.fernet:Provider
jws = keystone.token.providers.jws:Provider
keystone.receipt.provider =
fernet = keystone.receipt.providers.fernet:Provider