The Gatekeeper, or a project gating system
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

97 lines
3.0 KiB

# Copyright 2020 OpenStack Foundation
# Copyright 2020 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
from unittest import mock
import os.path
import jwt
import time
from zuul.driver import auth
from tests.base import BaseTestCase, FIXTURE_DIR
with open(os.path.join(FIXTURE_DIR,
'auth/openid-configuration.json'), 'r') as well_known:
FAKE_WELL_KNOWN_CONFIG = json.loads(well_known.read())
algo = jwt.algorithms.RSAAlgorithm(jwt.algorithms.RSAAlgorithm.SHA256)
with open(os.path.join(FIXTURE_DIR,
'auth/oidc-key'), 'r') as k:
OIDC_PRIVATE_KEY = algo.prepare_key(k.read().encode('utf-8'))
with open(os.path.join(FIXTURE_DIR,
'auth/oidc-key.pub'), 'r') as k:
pub_key = algo.prepare_key(k.read().encode('utf-8'))
pub_jwk = algo.to_jwk(pub_key)
key = {
"kid": "OwO",
"use": "sig",
"alg": "RS256"
}
key.update(json.loads(pub_jwk))
# not present in keycloak jwks
if "key_ops" in key:
del key["key_ops"]
FAKE_CERTS = {
"keys": [
key
]
}
def mock_get(url, params=None, **kwargs):
if url == ("https://my.oidc.provider/auth/realms/realm-one/"
".well-known/openid-configuration"):
return FakeResponse(FAKE_WELL_KNOWN_CONFIG)
elif url == ("https://my.oidc.provider/auth/realms/realm-one/"
"protocol/openid-connect/certs"):
return FakeResponse(FAKE_CERTS)
else:
raise Exception("Unknown URL %s" % url)
class FakeResponse:
def __init__(self, json_dict):
self._json = json_dict
def json(self):
return self._json
class TestOpenIDConnectAuthenticator(BaseTestCase):
def test_decodeToken(self):
"""Test the decoding workflow"""
config = {
'issuer_id': FAKE_WELL_KNOWN_CONFIG['issuer'],
'client_id': 'zuul-app',
'realm': 'realm-one',
}
OIDCAuth = auth.jwt.OpenIDConnectAuthenticator(**config)
payload = {
'iss': FAKE_WELL_KNOWN_CONFIG['issuer'],
'aud': config['client_id'],
'exp': time.time() + 3600,
'sub': 'someone'
}
token = jwt.encode(
payload,
OIDC_PRIVATE_KEY,
algorithm='RS256',
headers={'kid': 'OwO'})
with mock.patch('requests.get', side_effect=mock_get):
decoded = OIDCAuth.decodeToken(token)
for claim in payload.keys():
self.assertEqual(payload[claim], decoded[claim])