External OAuth2.0 Authorization Server Support

Support Tacker service to obtain an OAuth 2.0 access token from an
external authorization server, and then use the access token to access
related OpenStack services that uses the external_oauth2_token filter
provided by the keystone middleware for permission authentication.

Implements: blueprint enhance-oauth2-interoperability
Change-Id: Ic6ee3c74f5a0e6b7c566033b32ae7308bc198a49
This commit is contained in:
sunyonggen 2023-01-20 09:43:50 +09:00 committed by Yusuke Niimi
parent c0edafba32
commit 4a2cce57c2
22 changed files with 1794 additions and 32 deletions

View File

@ -14,6 +14,7 @@ namespace = oslo.service.service
namespace = tacker.alarm_receiver namespace = tacker.alarm_receiver
namespace = tacker.auth namespace = tacker.auth
namespace = tacker.common.config namespace = tacker.common.config
namespace = tacker.common.ext_oauth2_auth
namespace = tacker.conductor.conductor_server namespace = tacker.conductor.conductor_server
namespace = tacker.conf namespace = tacker.conf
namespace = tacker.keymgr namespace = tacker.keymgr

View File

@ -0,0 +1,6 @@
---
features:
- Support Tacker service to obtain an OAuth 2.0 access token from an
external authorization server, and then use the access token to access
related OpenStack services that uses the external_oauth2_token filter
provided by the keystone middleware for permission authentication.

View File

@ -53,6 +53,7 @@ setuptools!=24.0.0,!=34.0.0,!=34.0.1,!=34.0.2,!=34.0.3,!=34.1.0,!=34.1.1,!=34.2.
tooz>=1.58.0 # Apache-2.0 tooz>=1.58.0 # Apache-2.0
PyYAML>=5.4.1 # MIT PyYAML>=5.4.1 # MIT
PyMySQL>=0.10.1 # MIT PyMySQL>=0.10.1 # MIT
PyJWT>=2.4.0 # MIT
# Glance Store # Glance Store
glance-store>=2.4.0 # Apache-2.0 glance-store>=2.4.0 # Apache-2.0

View File

@ -81,6 +81,7 @@ oslo.config.opts =
tacker.alarm_receiver = tacker.alarm_receiver:config_opts tacker.alarm_receiver = tacker.alarm_receiver:config_opts
tacker.auth = tacker.auth:config_opts tacker.auth = tacker.auth:config_opts
tacker.common.config = tacker.common.config:config_opts tacker.common.config = tacker.common.config:config_opts
tacker.common.ext_oauth2_auth = tacker.common.ext_oauth2_auth:config_opts
tacker.conductor.conductor_server = tacker.conductor.conductor_server:config_opts tacker.conductor.conductor_server = tacker.conductor.conductor_server:config_opts
tacker.conf = tacker.conf.opts:list_opts tacker.conf = tacker.conf.opts:list_opts
tacker.keymgr = tacker.keymgr:config_opts tacker.keymgr = tacker.keymgr:config_opts

View File

@ -246,14 +246,9 @@ class CryptKeyBase(metaclass=abc.ABCMeta):
class CryptKeyBarbican(CryptKeyBase): class CryptKeyBarbican(CryptKeyBase):
def load_key(self, id): def load_key(self, id):
k_context = t_context.generate_tacker_service_context() k_context = t_context.generate_tacker_service_context()
# After external authorization server support for barbican is if CONF.ext_oauth2_auth.use_ext_oauth2_auth:
# implemented, the endpoint retrieval method for keymgr_api must be keymgr_api = KEYMGR_API(CONF.ext_oauth2_auth.token_endpoint)
# changed by enabling the following commented out part: else:
# if CONF.ext_oauth2_auth.use_ext_oauth2_auth:
# keymgr_api = KEYMGR_API(CONF.ext_oauth2_auth.token_endpoint)
# else:
# keymgr_api = KEYMGR_API(CONF.keystone_authtoken.auth_url)
keymgr_api = KEYMGR_API(CONF.keystone_authtoken.auth_url) keymgr_api = KEYMGR_API(CONF.keystone_authtoken.auth_url)
secret_obj = keymgr_api.get(k_context, id) secret_obj = keymgr_api.get(k_context, id)
master_key = secret_obj.payload master_key = secret_obj.payload
@ -261,14 +256,9 @@ class CryptKeyBarbican(CryptKeyBase):
def save_key(self, key): def save_key(self, key):
k_context = t_context.generate_tacker_service_context() k_context = t_context.generate_tacker_service_context()
# After external authorization server support for barbican is if CONF.ext_oauth2_auth.use_ext_oauth2_auth:
# implemented, the endpoint retrieval method for keymgr_api must be keymgr_api = KEYMGR_API(CONF.ext_oauth2_auth.token_endpoint)
# changed by enabling the following commented out part: else:
# if CONF.ext_oauth2_auth.use_ext_oauth2_auth:
# keymgr_api = KEYMGR_API(CONF.ext_oauth2_auth.token_endpoint)
# else:
# keymgr_api = KEYMGR_API(CONF.keystone_authtoken.auth_url)
keymgr_api = KEYMGR_API(CONF.keystone_authtoken.auth_url) keymgr_api = KEYMGR_API(CONF.keystone_authtoken.auth_url)
secret_uuid = keymgr_api.store(k_context, key) secret_uuid = keymgr_api.store(k_context, key)
return secret_uuid return secret_uuid

View File

@ -0,0 +1,370 @@
# Copyright (C) 2023 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Auth for External Server OAuth2.0 authentication
"""
import time
import uuid
import jwt.utils
from oslo_config import cfg
from oslo_log import log as logging
import requests.auth
from keystoneauth1 import exceptions as ksa_exceptions
from keystoneauth1.loading import session as session_loading
from tacker._i18n import _
from tacker.common.exceptions import TackerException
LOG = logging.getLogger(__name__)
_EXT_AUTH_CONFIG_GROUP_NAME = 'ext_oauth2_auth'
_EXTERNAL_AUTH2_OPTS = [
cfg.BoolOpt('use_ext_oauth2_auth', default=False,
help='Set True to use external Oauth2.0 auth server.'),
cfg.StrOpt('token_endpoint',
help='The endpoint for access token API.'),
cfg.StrOpt('scope',
help='The scope that the access token can access.'),
]
_EXTERNAL_AUTH2_KEYSTONE_MIDDLEWARE_OPTS = [
cfg.StrOpt('certfile',
help='Required if identity server requires client '
'certificate.'),
cfg.StrOpt('keyfile',
help='Required if identity server requires client '
'private key.'),
cfg.StrOpt('cafile',
help='A PEM encoded Certificate Authority to use when '
'verifying HTTPs connections. Defaults to system CAs.'),
cfg.BoolOpt('insecure', default=False, help='Verify HTTPS connections.'),
cfg.IntOpt('http_connect_timeout',
help='Request timeout value for communicating with Identity '
'API server.'),
cfg.StrOpt('audience',
help='The Audience should be the URL of the Authorization '
"Server's Token Endpoint. The Authorization Server will "
'verify that it is an intended audience for the token.'),
cfg.StrOpt('auth_method',
default='client_secret_basic',
choices=('client_secret_basic', 'client_secret_post',
'tls_client_auth', 'private_key_jwt',
'client_secret_jwt'),
help='The auth_method must use the authentication method '
'specified by the Authorization Server.'),
cfg.StrOpt('client_id',
help='The OAuth 2.0 Client Identifier valid at the '
'Authorization Server.'),
cfg.StrOpt('client_secret',
help='The OAuth 2.0 client secret. When the auth_method is '
'client_secret_basic, client_secret_post, or '
'client_secret_jwt, the value is used, and otherwise the '
'value is ignored.'),
cfg.StrOpt('jwt_key_file',
help='The jwt_key_file must use the certificate key file which '
'has been registered with the Authorization Server. '
'When the auth_method is private_key_jwt, the value is '
'used, and otherwise the value is ignored.'),
cfg.StrOpt('jwt_algorithm',
help='The jwt_algorithm must use the algorithm specified by '
'the Authorization Server. When the auth_method is '
'client_secret_jwt, this value is often set to HS256,'
'when the auth_method is private_key_jwt, the value is '
'often set to RS256, and otherwise the value is ignored.'),
cfg.IntOpt('jwt_bearer_time_out', default=3600,
help='This value is used to calculate the expiration time. If '
'after the expiration time, the access token cannot be '
'accepted. When the auth_method is client_secret_jwt or '
'private_key_jwt, the value is used, and otherwise the '
'value is ignored.'),
]
def config_opts():
return [(_EXT_AUTH_CONFIG_GROUP_NAME,
_EXTERNAL_AUTH2_OPTS + _EXTERNAL_AUTH2_KEYSTONE_MIDDLEWARE_OPTS)]
cfg.CONF.register_opts(_EXTERNAL_AUTH2_OPTS,
group=_EXT_AUTH_CONFIG_GROUP_NAME)
class ExtOAuth2Auth(object):
"""Construct an Auth to fetch an access token for HTTP access."""
def __init__(self):
self._conf = cfg.CONF.ext_oauth2_auth
# Check whether the configuration parameter has been registered
if 'auth_method' not in self._conf:
LOG.debug('The relevant config parameters are not registered '
'and need to be registered before they can be used.')
cfg.CONF.register_opts(_EXTERNAL_AUTH2_KEYSTONE_MIDDLEWARE_OPTS,
group=_EXT_AUTH_CONFIG_GROUP_NAME)
self.token_endpoint = self._get_config_option(
'token_endpoint', is_required=True)
self.auth_method = self._get_config_option(
'auth_method', is_required=True)
self.client_id = self._get_config_option(
'client_id', is_required=True)
self.scope = self._get_config_option(
'scope', is_required=True)
self.access_token = None
def _get_config_option(self, key, is_required):
"""Read the value from config file by the config key."""
try:
value = getattr(self._conf, key)
except cfg.NoSuchOptError:
value = None
if not value:
if is_required:
LOG.error('The value is required for option %s '
'in group [%s]' % (key,
_EXT_AUTH_CONFIG_GROUP_NAME))
raise TackerException(
_('Configuration error. The parameter '
'is not set for "%s" in group [%s].') % (
key, _EXT_AUTH_CONFIG_GROUP_NAME))
else:
return None
else:
return value
def create_session(self, **kwargs):
"""Create session for HTTP access."""
kwargs.setdefault('cert', self._get_config_option(
'certfile', is_required=False))
kwargs.setdefault('key', self._get_config_option(
'keyfile', is_required=False))
kwargs.setdefault('cacert', self._get_config_option(
'cafile', is_required=False))
kwargs.setdefault('insecure', self._get_config_option(
'insecure', is_required=False))
kwargs.setdefault('timeout', self._get_config_option(
'http_connect_timeout', is_required=False))
kwargs.setdefault('user_agent', 'tacker service')
sess = session_loading.Session().load_from_options(**kwargs)
sess.auth = self
return sess
def get_connection_params(self, session, **kwargs):
"""Get connection params for HTTP access."""
return {}
def invalidate(self):
"""Invalidate the current authentication data."""
self.access_token = None
return True
def _get_token_by_client_secret_basic(self, session):
"""Access the access token API.
Access the access token API to get an access token by
the auth method 'client_secret_basic'.
"""
para = {
'scope': self.scope,
'grant_type': 'client_credentials'
}
auth = requests.auth.HTTPBasicAuth(
self.client_id, self._get_config_option(
'client_secret', is_required=True))
http_response = session.request(
self.token_endpoint,
'POST',
authenticated=False,
data=para,
requests_auth=auth)
return http_response
def _get_token_by_client_secret_post(self, session):
"""Access the access token API.
Access the access token API to get an access token by
the auth method 'client_secret_post'.
"""
para = {
'client_id': self.client_id,
'client_secret': self._get_config_option(
'client_secret', is_required=True),
'scope': self.scope,
'grant_type': 'client_credentials'
}
http_response = session.request(
self.token_endpoint,
'POST',
authenticated=False,
data=para)
return http_response
def _get_token_by_tls_client_auth(self, session):
"""Access the access token API.
Access the access token API to get an access token by
the auth method 'tls_client_auth'.
"""
para = {
'client_id': self.client_id,
'scope': self.scope,
'grant_type': 'client_credentials'
}
http_response = session.request(
self.token_endpoint,
'POST',
authenticated=False,
data=para)
return http_response
def _get_token_by_private_key_jwt(self, session):
"""Access the access token API.
Access the access token API to get an access token by
the auth method 'private_key_jwt'.
"""
jwt_key_file = self._get_config_option(
'jwt_key_file', is_required=True)
with open(jwt_key_file, 'r') as jwt_file:
jwt_key = jwt_file.read()
ita = round(time.time())
exp = ita + self._get_config_option(
'jwt_bearer_time_out', is_required=True)
alg = self._get_config_option('jwt_algorithm', is_required=True)
client_assertion = jwt.encode(
payload={
'jti': str(uuid.uuid4()),
'iat': str(ita),
'exp': str(exp),
'iss': self.client_id,
'sub': self.client_id,
'aud': self._get_config_option('audience', is_required=True)},
headers={
'typ': 'JWT',
'alg': alg},
key=jwt_key,
algorithm=alg)
para = {
'client_id': self.client_id,
'client_assertion_type':
'urn:ietf:params:oauth:client-assertion-type:jwt-bearer',
'client_assertion': client_assertion,
'scope': self.scope,
'grant_type': 'client_credentials'
}
http_response = session.request(
self.token_endpoint,
'POST',
authenticated=False,
data=para)
return http_response
def _get_token_by_client_secret_jwt(self, session):
"""Access the access token API.
Access the access token API to get an access token by
the auth method 'client_secret_jwt'.
"""
ita = round(time.time())
exp = ita + self._get_config_option(
'jwt_bearer_time_out', is_required=True)
alg = self._get_config_option('jwt_algorithm', is_required=True)
client_secret = self._get_config_option(
'client_secret', is_required=True)
client_assertion = jwt.encode(
payload={
'jti': str(uuid.uuid4()),
'iat': str(ita),
'exp': str(exp),
'iss': self.client_id,
'sub': self.client_id,
'aud': self._get_config_option('audience', is_required=True)},
headers={
'typ': 'JWT',
'alg': alg},
key=client_secret,
algorithm=alg)
para = {
'client_id': self.client_id,
'client_secret': client_secret,
'client_assertion_type':
'urn:ietf:params:oauth:client-assertion-type:jwt-bearer',
'client_assertion': client_assertion,
'scope': self.scope,
'grant_type': 'client_credentials'
}
http_response = session.request(
self.token_endpoint,
'POST',
authenticated=False,
data=para)
return http_response
def get_headers(self, session, **kwargs):
"""Get an access token and add to request header for HTTP access."""
if not self.access_token:
try:
if self.auth_method == 'tls_client_auth':
http_response = self._get_token_by_tls_client_auth(session)
elif self.auth_method == 'client_secret_post':
http_response = self._get_token_by_client_secret_post(
session)
elif self.auth_method == 'client_secret_basic':
http_response = self._get_token_by_client_secret_basic(
session)
elif self.auth_method == 'private_key_jwt':
http_response = self._get_token_by_private_key_jwt(
session)
elif self.auth_method == 'client_secret_jwt':
http_response = self._get_token_by_client_secret_jwt(
session)
else:
LOG.error('The value is incorrect for option '
'auth_method in group [%s]' %
_EXT_AUTH_CONFIG_GROUP_NAME)
raise TackerException(
_('The configuration parameter for '
'key "auth_method" in group [%s] is incorrect.') %
_EXT_AUTH_CONFIG_GROUP_NAME)
LOG.debug(http_response.text)
if http_response.status_code != 200:
LOG.error('The OAuth2.0 access token API returns an '
'incorrect response. '
'response_status: %s, response_text: %s' %
(http_response.status_code,
http_response.text))
raise TackerException(_('Failed to get an access token.'))
access_token = http_response.json().get('access_token')
if not access_token:
LOG.error('Failed to get an access token: %s',
http_response.text)
raise TackerException(_('Failed to get an access token.'))
self.access_token = access_token
except (ksa_exceptions.ConnectFailure,
ksa_exceptions.DiscoveryFailure,
ksa_exceptions.RequestTimeout) as error:
LOG.error('Unable to get an access token: %s', error)
raise TackerException(
_('The OAuth2.0 access token API service is '
'temporarily unavailable.'))
except TackerException:
raise
except Exception as error:
LOG.error('Unable to get an access token: %s', error)
raise TackerException(
_('An exception occurred during the processing '
'of getting an access token'))
header = {'Authorization': f'Bearer {self.access_token}'}
return header

View File

@ -24,6 +24,7 @@ from oslo_context import context as oslo_context
from oslo_db.sqlalchemy import enginefacade from oslo_db.sqlalchemy import enginefacade
from tacker.common import exceptions from tacker.common import exceptions
from tacker.common.ext_oauth2_auth import ExtOAuth2Auth
from tacker.db import api as db_api from tacker.db import api as db_api
from tacker import policy from tacker import policy
@ -217,6 +218,8 @@ def is_user_context(context):
def generate_tacker_service_context(): def generate_tacker_service_context():
if CONF.ext_oauth2_auth.use_ext_oauth2_auth:
return ExtOAuth2Auth()
return keystone_password.KeystonePassword( return keystone_password.KeystonePassword(
password=CONF.keystone_authtoken.password, password=CONF.keystone_authtoken.password,
auth_url=CONF.keystone_authtoken.auth_url, auth_url=CONF.keystone_authtoken.auth_url,

View File

@ -20,6 +20,11 @@ key_manager_opts = [
default='tacker.keymgr.barbican_key_manager' default='tacker.keymgr.barbican_key_manager'
'.BarbicanKeyManager', '.BarbicanKeyManager',
help='The full class name of the key manager API class'), help='The full class name of the key manager API class'),
cfg.StrOpt('barbican_endpoint',
help='The endpoint for barbican API.'),
cfg.StrOpt('barbican_version',
default='v1',
help='The version for barbican API.'),
] ]

View File

@ -20,9 +20,11 @@ from barbicanclient import client as barbican_client
from barbicanclient import exceptions as barbican_exception from barbicanclient import exceptions as barbican_exception
from keystoneauth1 import identity from keystoneauth1 import identity
from keystoneauth1 import session from keystoneauth1 import session
from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
from tacker._i18n import _ from tacker._i18n import _
from tacker.common.exceptions import TackerException
from tacker.keymgr import exception from tacker.keymgr import exception
from tacker.keymgr import key_manager from tacker.keymgr import key_manager
@ -57,6 +59,30 @@ class BarbicanKeyManager(key_manager.KeyManager):
if self._barbican_client and self._current_context == context: if self._barbican_client and self._current_context == context:
return self._barbican_client return self._barbican_client
if cfg.CONF.ext_oauth2_auth.use_ext_oauth2_auth:
try:
barbican_endpoint = cfg.CONF.key_manager.barbican_endpoint
barbican_version = cfg.CONF.key_manager.barbican_version
if not barbican_endpoint:
msg = _('The value is required for option %s in group '
'[key_manager]') % 'barbican_endpoint'
raise TackerException(msg)
sess = context.create_session()
self._barbican_endpoint = barbican_endpoint
if self._barbican_endpoint[-1] == '/':
self._barbican_endpoint = self._barbican_endpoint[:-1]
self._barbican_client = barbican_client.Client(
session=sess,
endpoint=self._barbican_endpoint)
self._current_context = context
self._base_url = '%s/%s/' % (
self._barbican_endpoint,
barbican_version)
return self._barbican_client
except Exception as e:
LOG.error('Error creating Barbican client: %s', e)
raise exception.KeyManagerError(reason=e)
try: try:
auth = self._get_keystone_auth(context) auth = self._get_keystone_auth(context)
sess = session.Session(auth=auth) sess = session.Session(auth=auth)

View File

@ -177,9 +177,13 @@ class Kubernetes_Driver(abstract_vim_driver.VimAbstractDriver):
try: try:
k_context = \ k_context = \
t_context.generate_tacker_service_context() t_context.generate_tacker_service_context()
keystone_conf = CONF.keystone_authtoken
secret_uuid = auth['secret_uuid'] secret_uuid = auth['secret_uuid']
keymgr_api = KEYMGR_API(keystone_conf.auth_url) if CONF.ext_oauth2_auth.use_ext_oauth2_auth:
keymgr_api = KEYMGR_API(
CONF.ext_oauth2_auth.token_endpoint)
else:
keymgr_api = KEYMGR_API(
CONF.keystone_authtoken.auth_url)
keymgr_api.delete(k_context, secret_uuid) keymgr_api.delete(k_context, secret_uuid)
LOG.debug('VIM key deleted successfully for vim %s', LOG.debug('VIM key deleted successfully for vim %s',
vim_id) vim_id)
@ -217,8 +221,11 @@ class Kubernetes_Driver(abstract_vim_driver.VimAbstractDriver):
if CONF.k8s_vim.use_barbican: if CONF.k8s_vim.use_barbican:
try: try:
k_context = t_context.generate_tacker_service_context() k_context = t_context.generate_tacker_service_context()
keystone_conf = CONF.keystone_authtoken if CONF.ext_oauth2_auth.use_ext_oauth2_auth:
keymgr_api = KEYMGR_API(keystone_conf.auth_url) keymgr_api = KEYMGR_API(
CONF.ext_oauth2_auth.token_endpoint)
else:
keymgr_api = KEYMGR_API(CONF.keystone_authtoken.auth_url)
secret_uuid = keymgr_api.store(k_context, fernet_key) secret_uuid = keymgr_api.store(k_context, fernet_key)
auth['key_type'] = 'barbican_key' auth['key_type'] = 'barbican_key'

View File

@ -194,9 +194,12 @@ class OpenStack_Driver(abstract_vim_driver.VimAbstractDriver,
if auth.get('key_type') == 'barbican_key': if auth.get('key_type') == 'barbican_key':
try: try:
k_context = t_context.generate_tacker_service_context() k_context = t_context.generate_tacker_service_context()
keystone_conf = CONF.keystone_authtoken
secret_uuid = auth['secret_uuid'] secret_uuid = auth['secret_uuid']
keymgr_api = KEYMGR_API(keystone_conf.auth_url) if CONF.ext_oauth2_auth.use_ext_oauth2_auth:
keymgr_api = KEYMGR_API(
CONF.ext_oauth2_auth.token_endpoint)
else:
keymgr_api = KEYMGR_API(CONF.keystone_authtoken.auth_url)
keymgr_api.delete(k_context, secret_uuid) keymgr_api.delete(k_context, secret_uuid)
LOG.debug('VIM key deleted successfully for vim %s', LOG.debug('VIM key deleted successfully for vim %s',
vim_id) vim_id)
@ -227,8 +230,11 @@ class OpenStack_Driver(abstract_vim_driver.VimAbstractDriver,
if CONF.vim_keys.use_barbican: if CONF.vim_keys.use_barbican:
try: try:
k_context = t_context.generate_tacker_service_context() k_context = t_context.generate_tacker_service_context()
keystone_conf = CONF.keystone_authtoken if CONF.ext_oauth2_auth.use_ext_oauth2_auth:
keymgr_api = KEYMGR_API(keystone_conf.auth_url) keymgr_api = KEYMGR_API(
CONF.ext_oauth2_auth.token_endpoint)
else:
keymgr_api = KEYMGR_API(CONF.keystone_authtoken.auth_url)
secret_uuid = keymgr_api.store(k_context, fernet_key) secret_uuid = keymgr_api.store(k_context, fernet_key)
auth['key_type'] = 'barbican_key' auth['key_type'] = 'barbican_key'

View File

@ -569,9 +569,11 @@ class NfvoPlugin(nfvo_db_plugin.NfvoPluginDb, vnffg_db.VnffgPluginDbMixin,
cred = auth['password'].encode('utf-8') cred = auth['password'].encode('utf-8')
if auth.get('key_type') == 'barbican_key': if auth.get('key_type') == 'barbican_key':
k_context = t_context.generate_tacker_service_context() k_context = t_context.generate_tacker_service_context()
keystone_conf = CONF.keystone_authtoken
secret_uuid = auth['secret_uuid'] secret_uuid = auth['secret_uuid']
keymgr_api = KEYMGR_API(keystone_conf.auth_url) if CONF.ext_oauth2_auth.use_ext_oauth2_auth:
keymgr_api = KEYMGR_API(CONF.ext_oauth2_auth.token_endpoint)
else:
keymgr_api = KEYMGR_API(CONF.keystone_authtoken.auth_url)
secret_obj = keymgr_api.get(k_context, secret_uuid) secret_obj = keymgr_api.get(k_context, secret_uuid)
vim_key = secret_obj.payload vim_key = secret_obj.payload
else: else:

View File

@ -0,0 +1,52 @@
-----BEGIN PRIVATE KEY-----
MIIJQwIBADANBgkqhkiG9w0BAQEFAASCCS0wggkpAgEAAoICAQDegNuQgmQL7n10
+Z3itXtpiNHlvZwCYOS66+3PakAw1OoRB6SiHeNYnuVRHlraTDKnnfgHhX/1AVs7
P36QU5PVYznGip2PXZlCh8MeQhpXgKKt25LPnpQOnUssHyq+OqTHZB6eS2C7xMHf
wzPrYRwxhbVgUUVe85cdiXaL5ZRqXNotM00wH1hck4s+1fsnKv7UeGbwM1WwMn6/
0E1eKwYzlKm4Vmkcivy8WBI7Ijp/MPOUyRXN/mPh8L2VOq0D1E3pufYoYmpBkiQi
Ii8nz5CXrhDpM0tGKD+RZ+howE2i+frI2gNDfU5xMx+k+qjD0jftDrQ+OZUujUtq
6JfdrvtPBT01XZw8GV5Rm9vEwMRduWUDGdRB3chOTeTUdsIG765+Ot7GE7nYrAs0
s/ryAm1FnNJocTzje7k07IzdBpWzrTrx087Kfcsn6evEABOxim0i+AHUR94QR9/V
EP3/+SkJ7zl9P1KzOZZCWtUTnfQxrLhEnCwwjtl35vWlzst+TR7HDwIzQRQVLFH9
zMTz8tw6coPifkbVzdwCLGoKge4llDPcVx/TmIGFD3saT0E68yxXe6k3cdIg6lZf
dB0yutVBzECrx+LiIpxwQWRKHNiR58KsHHmgXDb8ORBCjpmctD+JsdBhf8hDRMXP
9sV/fbMUwgrRceyj9AV2x59tE9+UHwIDAQABAoICABb6V7JkxNA2oN4jqRpwg34y
kvqWyjW0q+ph0v1Ii7h/RGzdzTKww3mzbxshd2Bz3gdRWPvt3Xj/2twTgo6FEw9G
YAEQ75SOpfUo8A1/5hiDQEmUE2U9iyy3Mbwsu81JYRr2S/Ms9aBugVcKYaI9NRwo
IsL/oZpcrY5vU76+xsT1MdLZKW9+zTFCS28Byh4RYp+uj3Le2kqH7G8Co/rFlq5c
++n9gn1gHRmWPsu8jS31cDI9UfMkAkyi//EZTiTHGAS7H6CsCS0cWn7r6NLDrLr9
TuHGWk+0eFwbzvSCZ4IdLrjvSsb9ecxW6z2uZR9T5lKk4hhK+g0EqnUv7/8Eww8E
wA2J1zhuQ0UzoAowjj5338whBQROKSO4u3ppxhNUSP7fUgYdEKUQEg7rlfEzI+pG
dV1LtG0GZBzdZXpE/PTpASjefCkC6olmZpUvajHJGqP0a/ygA9SEBm+B/Q4ii7+0
luk6Lj6z+vSWatU7LrLnQeprN82NWxtkH+u2gjMOq1N8r4FOFvbZYBp1NMvtH4iP
R6jLdJWYx/KOr4lCkbgTszlVhPop8dktOPQSPL4u6RxdmsGBf028oWKXLrj1D1Ua
dBWR1L1CCnI8X6jxL6eT52qF+NY2JxanX6NnzxE/KqedWXmKDxn0M3ETfizz9UG4
8UmsMgJ8UUALRbWHjlEBAoIBAQDvQmYWhYtUQjcaeLkhjd9qPXjLGIL6NYnrYEUK
Yenn0mh7SKZTTL8yz/QVOecD/QiBetmQJ5FsmUmqorGWYfbWs6C+p2yHQ0U9X7rE
3ynFm0MDluuNZMWYRu0Yb6gvCYjitlt/efGKDalP1Al1hX2w9fUGdj32K6fulEX6
dcl4r2bq4i+rOwe9YDD9yvkvh1+aCwA56JCTBoEBsbmOdKTC7431rT8BTLbBaXwy
hf35P9wzU079QwwqDKdUlMQjUz9gWZkYFHkPfce2MCm+T0aHNnjQtLXRGOcIj15P
B64+GB9b86XNZlqpuY2rceF+LDwaw4rgQkXDr+TdAsjrtcdHAoIBAQDuElNxRc9t
hKwZGBVIWaHI3Y3oaTymi277DwhDzvmJgwBAddfEaC1rCt/giXxtmhhnAXpDD4sk
3m8iWw1jODRxOv2UDkUvSRV5tfY+QTG0nVVmMpX3lPWpIYxEVg34WYSq0xnXKrpW
zxUOqD1fW2i2lXZtFAb6ZNt/hHts7KUPzk9/ZbAomVHO6JO4Ac3n0LTDSCmQHhRO
5gV0ea4Sh6AVOiFD20rMAnTFNnxnI+wLMt0SNAzouhRMulDqOcAmoH2DKG8PCcEt
dQpUDwITxXuomsjhIHIli760MwSlwWZbrh5h7NAj1VmnQBtMkLnBtnE7cFSVdcPt
BAFnq72txGhpAoIBAQDIWYKhM1zTxsrbyOHF3kaKcUVYVINBQFnolunZYtp6vG+v
ZMuaj3a/9vE+YQk5Bsb7ncLXerrFBKtyTuCEvC315d8iJ5KyxbsSRLpiJzmUdoos
VFGVSiBIfoQF5WIhWUueBPQjkBqZ7wyrgzQUjB8PczamHZePL0lleBYNQFrgS4jU
AWnHahv2EbmUnEYD7ck5diLPWxbNdzHKGGf4iWZ6shze8B8FWJbk6Q8OQ7PD5xze
gdFwNJfYElaAdj60Ef7NENopFuO0/C+jOTuLWFkH2q5anihuGvtD6MIhTZ4z8wE3
f5SEpkQfQfkG6srXW/VMuBfv6K8AyabNB4r2Dnb7AoIBADHy2lrroKeDrG/fY6e4
Vn9ELJ/UZIs0ueYmsz82z5gQSh88Gjb0/IJ2153OerKsH+6MmtAzFKh5mquEmvx0
MFyJWeaUT+Op272bdbx+BSW11NMKTfiR4jDH/xvfSjMO5QzKGaPRLSNFc0+N8MJu
9TtJhH1CNGyYeIz6iMLDq6XzTS6XcSwzbryQg12Z00+NtD88hqvcA7rB++cCGIl+
txF9Drmj6r9+zG0MD3G8UavP0h4dmY/CarvmY0+hKjVweqTn+NUY4NTet3oHZBIt
3tHzF65UFl7WQP6hrZnxR754e5tkCg9aleLHSnL38mE4G+2ylax99stlib3shHFO
wfECggEBAJrW8BmZXbD8ss3c7kHPzleAf1q/6bPnxRXB0luCPz7tkMfdkOQ2cG1t
rcnsKcyR2woEbtdRK938KxZgTgzKYVhR8spKFSh01/d9OZAP6f+iCoR2zzOlSFo4
pejnQY0LHEwGZmnzghLoqJSUgROAR49CvLO1mI48CaEUuLmqzPYWNXMHDDU2N5XO
uF0/ph68fnI+f+0ZUgdpVPFRnfSrAqzEhzEMh1vnZ4ZxEVpgUcn/hRfNZ3hN0LEr
fjm2bWxg2j0rxjS0mUDQpaMj0253jVYRiC3M3cCh0NSZtwaXVJYCVxetpjBTPfJr
jIgmPTKGR0FedjAeCBByH9vkw8iRg7w=
-----END PRIVATE KEY-----

View File

@ -0,0 +1,457 @@
# Copyright (C) 2023 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
import copy
import os
from unittest import mock
import uuid
from oslo_config import cfg
from requests_mock.contrib import fixture as rm_fixture
from keystoneauth1 import exceptions as ksa_exceptions
from tacker.common.exceptions import TackerException
from tacker import context
from tacker.tests.unit import base
JWT_KEY_FILE = 'jwt_private.key'
def _get_sample_key(name):
filename = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"./sample_keys/", name)
with open(filename, "r") as f:
content = f.read()
return content
def get_mock_conf_effect(audience=None, token_endpoint=None,
auth_method=None, client_id=None, client_secret=None,
scope=None, jwt_key_file=None, jwt_algorithm=None,
jwt_bearer_time_out=None, certfile=None, keyfile=None,
cafile=None, http_connect_timeout=None, insecure=None):
def mock_conf_key_effect(name):
if name == 'keystone_authtoken':
return MockConfig(conf=None)
elif name == 'ext_oauth2_auth':
config = {'use_ext_oauth2_auth': True}
if audience:
config['audience'] = audience
if token_endpoint:
config['token_endpoint'] = token_endpoint
if auth_method:
config['auth_method'] = auth_method
if client_id:
config['client_id'] = client_id
if client_secret:
config['client_secret'] = client_secret
if scope:
config['scope'] = scope
if jwt_key_file:
config['jwt_key_file'] = jwt_key_file
if jwt_algorithm:
config['jwt_algorithm'] = jwt_algorithm
if jwt_bearer_time_out:
config['jwt_bearer_time_out'] = jwt_bearer_time_out
if certfile:
config['certfile'] = certfile
if keyfile:
config['keyfile'] = keyfile
if cafile:
config['cafile'] = cafile
if cafile:
config['http_connect_timeout'] = http_connect_timeout
if cafile:
config['insecure'] = insecure
return MockConfig(
conf=config)
else:
return cfg.CONF._get(name)
return mock_conf_key_effect
class MockConfig(object):
def __init__(self, conf=None):
self.conf = conf
def __getattr__(self, name):
if not self.conf or name not in self.conf:
raise cfg.NoSuchOptError(f'not found {name}')
return self.conf.get(name)
def __contains__(self, key):
return key in self.conf
class MockSession(object):
def __init__(self, ):
self.auth = None
class TestExtOAuth2Auth(base.TestCase):
def setUp(self):
super(TestExtOAuth2Auth, self).setUp()
self.requests_mock = self.useFixture(rm_fixture.Fixture())
self.token_endpoint = 'http://demo/token_endpoint'
self.auth_method = 'client_secret_post'
self.client_id = 'test_client_id'
self.client_secret = 'test_client_secret'
self.scope = 'tacker_api'
self.access_token = f'access_token_{str(uuid.uuid4())}'
self.audience = 'http://demo/audience'
self.jwt_bearer_time_out = 2800
self.addCleanup(mock.patch.stopall)
def _get_access_token_response(self, request, context,
auth_method=None,
client_id=None,
client_secret=None,
scope=None,
access_token=None,
status_code=200,
raise_error=None,
resp=None
):
if raise_error:
raise raise_error
if auth_method == 'tls_client_auth':
body = (f'client_id={client_id}&scope={scope}'
f'&grant_type=client_credentials')
self.assertEqual(request.text, body)
elif auth_method == 'client_secret_post':
body = (f'client_id={client_id}&client_secret={client_secret}'
f'&scope={scope}&grant_type=client_credentials')
self.assertEqual(request.text, body)
elif auth_method == 'client_secret_basic':
body = f'scope={scope}&grant_type=client_credentials'
self.assertEqual(request.text, body)
auth_basic = request._request.headers.get('Authorization')
self.assertIsNotNone(auth_basic)
auth = 'Basic ' + base64.standard_b64encode(
f'{client_id}:{client_secret}'.encode('ascii')).decode('ascii')
self.assertEqual(auth_basic, auth)
elif auth_method == 'private_key_jwt':
self.assertIn(f'client_id={client_id}', request.text)
self.assertIn(('client_assertion_type=urn%3Aietf%3Aparams%3A'
'oauth%3Aclient-assertion-type%3Ajwt-bearer'),
request.text)
self.assertIn('client_assertion=', request.text)
self.assertIn(f'scope={scope}', request.text)
self.assertIn('grant_type=client_credentials', request.text)
elif auth_method == 'client_secret_jwt':
self.assertIn(f'client_id={client_id}', request.text)
self.assertIn(('client_assertion_type=urn%3Aietf%3Aparams%3A'
'oauth%3Aclient-assertion-type%3Ajwt-bearer'),
request.text)
self.assertIn('client_assertion=', request.text)
self.assertIn(f'scope={scope}', request.text)
self.assertIn('grant_type=client_credentials', request.text)
if not access_token:
access_token = f'access_token{str(uuid.uuid4())}'
if not resp:
if status_code == 200:
response = {
'access_token': access_token,
'expires_in': 1800,
'refresh_expires_in': 0,
'token_type': 'Bearer',
'not-before-policy': 0,
'scope': scope
}
else:
response = {'error': 'error_title',
'error_description': 'error message'}
else:
response = copy.deepcopy(resp)
context.status_code = status_code
return response
def _get_default_mock_conf_effect(self):
return get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method=self.auth_method,
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope)
def _check_authorization_header(self):
auth_context = context.generate_tacker_service_context()
session = auth_context.create_session()
headers = auth_context.get_headers(session)
bearer = f'Bearer {self.access_token}'
self.assertIn('Authorization', headers)
self.assertEqual(bearer, headers.get('Authorization'))
return auth_context
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_init_without_token_endpoint(self, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint='',
auth_method=self.auth_method,
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope
)
self.assertRaises(TackerException,
context.generate_tacker_service_context)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_init_without_scope(self, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method=self.auth_method,
client_id=self.client_id,
client_secret=self.client_secret)
self.assertRaises(TackerException,
context.generate_tacker_service_context)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_init_without_keystone_middleware_opts(self, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope)
self.assertRaises(TackerException,
context.generate_tacker_service_context)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
@mock.patch('keystoneauth1.loading.session.Session.load_from_options')
def test_create_session(self, mock_load_from_options, mock_get_conf_key):
certfile = f'/demo/certfile{str(uuid.uuid4())}'
keyfile = f'/demo/keyfile{str(uuid.uuid4())}'
cafile = f'/demo/cafile{str(uuid.uuid4())}'
conf_insecure = True
http_connect_timeout = 1000
def load_side_effect(**kwargs):
self.assertEqual(conf_insecure, kwargs.get('insecure'))
self.assertEqual(cafile, kwargs.get('cacert'))
self.assertEqual(certfile, kwargs.get('cert'))
self.assertEqual(keyfile, kwargs.get('key'))
self.assertEqual(http_connect_timeout, kwargs.get('timeout'))
return MockSession()
mock_load_from_options.side_effect = load_side_effect
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method='tls_client_auth',
client_id=self.client_id,
scope=self.scope,
certfile=certfile,
keyfile=keyfile,
cafile=cafile,
insecure=conf_insecure,
http_connect_timeout=http_connect_timeout)
auth_context = context.generate_tacker_service_context()
auth_context.create_session()
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_connection_params(self, mock_get_conf_key):
mock_get_conf_key.side_effect = self._get_default_mock_conf_effect()
auth_context = context.generate_tacker_service_context()
session = auth_context.create_session()
params = auth_context.get_connection_params(session)
self.assertDictEqual(params, {})
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_tls_client_auth(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method='tls_client_auth',
client_id=self.client_id,
scope=self.scope,
access_token=self.access_token)
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method='tls_client_auth',
client_id=self.client_id,
scope=self.scope)
auth_context = self._check_authorization_header()
result = auth_context.invalidate()
self.assertEqual(True, result)
self.assertIsNone(auth_context.access_token)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_client_secret_post(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method='client_secret_post',
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope,
access_token=self.access_token
)
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method='client_secret_post',
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope)
self._check_authorization_header()
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_client_secret_basic(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method='client_secret_basic',
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope,
access_token=self.access_token)
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method='client_secret_basic',
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope)
self._check_authorization_header()
@mock.patch('builtins.open', mock.mock_open(read_data=_get_sample_key(
JWT_KEY_FILE)))
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_private_key_jwt(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method='private_key_jwt',
client_id=self.client_id,
scope=self.scope,
access_token=self.access_token)
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method='private_key_jwt',
client_id=self.client_id,
audience=self.audience,
jwt_key_file=f'/demo/jwt_key_file{str(uuid.uuid4())}',
jwt_algorithm='RS256',
jwt_bearer_time_out=self.jwt_bearer_time_out,
scope=self.scope)
self._check_authorization_header()
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_client_secret_jwt(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method='client_secret_jwt',
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope,
access_token=self.access_token)
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method='client_secret_jwt',
client_id=self.client_id,
audience=self.audience,
client_secret=self.client_secret,
jwt_algorithm='HS256',
jwt_bearer_time_out=self.jwt_bearer_time_out,
scope=self.scope)
self._check_authorization_header()
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_invalid_auth_method(self, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method='client_secret_other',
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope
)
auth_context = context.generate_tacker_service_context()
session = auth_context.create_session()
self.assertRaises(TackerException, auth_context.get_headers, session)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_connect_fail(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method=self.auth_method,
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope,
access_token=self.access_token,
raise_error=ksa_exceptions.RequestTimeout('connect time out.'))
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = self._get_default_mock_conf_effect()
auth_context = context.generate_tacker_service_context()
session = auth_context.create_session()
self.assertRaises(TackerException, auth_context.get_headers, session)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_is_not_200(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method=self.auth_method,
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope,
access_token=self.access_token,
status_code=201)
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = self._get_default_mock_conf_effect()
auth_context = context.generate_tacker_service_context()
session = auth_context.create_session()
self.assertRaises(TackerException, auth_context.get_headers, session)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_not_include_access_token(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method=self.auth_method,
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope,
access_token=self.access_token,
status_code=200,
resp={'error': 'invalid_client',
'error_description': 'The client is not found.'})
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = self._get_default_mock_conf_effect()
auth_context = context.generate_tacker_service_context()
session = auth_context.create_session()
self.assertRaises(TackerException, auth_context.get_headers, session)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_unknown_error(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method=self.auth_method,
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope,
access_token=self.access_token,
raise_error=Exception('unknown error occurred.'))
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = self._get_default_mock_conf_effect()
auth_context = context.generate_tacker_service_context()
session = auth_context.create_session()
self.assertRaises(TackerException, auth_context.get_headers, session)

View File

@ -0,0 +1,255 @@
# Copyright (C) 2023 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import uuid
from oslo_config import cfg
from requests_mock.contrib import fixture as rm_fixture
from tacker import context as t_context
from tacker.keymgr.barbican_key_manager import BarbicanKeyManager
from tacker.keymgr import exception
from tacker.tests.unit import base
def get_mock_conf_key_effect(barbican_endpoint=None):
def mock_conf_key_effect(name):
if name == 'ext_oauth2_auth':
return MockConfig(
conf={
'use_ext_oauth2_auth': True,
'token_endpoint': 'http://demo/token_endpoint',
'auth_method': 'client_secret_post',
'client_id': 'client_id',
'client_secret': 'client_secret',
'scope': 'client_secret'
})
elif name == 'key_manager':
conf = {
'api_class': 'tacker.keymgr.barbican_key_manager'
'.BarbicanKeyManager',
'barbican_version': 'v1',
'barbican_endpoint': barbican_endpoint
}
return MockConfig(conf=conf)
elif name == 'k8s_vim':
return MockConfig(
conf={
'use_barbican': True
})
else:
return cfg.CONF._get(name)
return mock_conf_key_effect
class MockConfig(object):
def __init__(self, conf=None):
self.conf = conf
def __getattr__(self, name):
if not self.conf and name not in self.conf:
raise cfg.NoSuchOptError(f'not found {name}')
return self.conf.get(name)
def __contains__(self, key):
return key in self.conf
class TestBarbicanKeyManager(base.TestCase):
def setUp(self):
super(TestBarbicanKeyManager, self).setUp()
self.requests_mock = self.useFixture(rm_fixture.Fixture())
self.token_endpoint = 'http://demo/token_endpoint'
self.auth_method = 'client_secret_post'
self.client_id = 'test_client_id'
self.client_secret = 'test_client_secret'
self.scope = 'tacker_api'
self.access_token = f'access_token_{str(uuid.uuid4())}'
self.audience = 'http://demo/audience'
self.jwt_bearer_time_out = 2800
self.addCleanup(mock.patch.stopall)
def _mock_external_token_api(self):
def mock_token_resp(request, context):
response = {
'access_token': self.access_token,
'expires_in': 1800,
'refresh_expires_in': 0,
'token_type': 'Bearer',
'not-before-policy': 0,
'scope': 'tacker_api'
}
context.status_code = 200
return response
self.requests_mock.post('http://demo/token_endpoint',
json=mock_token_resp)
def _mock_barbican_get_version_resp(self):
def mock_barbican_get_resp(request, context):
auth_value = f'Bearer {self.access_token}'
req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth)
context.status_code = 200
response = {
"versions": {
"values": [
{
"id": "v1",
"status": "stable",
"links": [
{
"rel": "self",
"href": "http://demo/barbican/v1/"
},
{
"rel": "describedby",
"type": "text/html",
"href": "https://docs.openstack.org/"}
],
"media-types": [
{
"base": "application/json",
"type": "application/"
"vnd.openstack.key-manager-v1+json"
}
]
}
]
}
}
return response
return mock_barbican_get_resp
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
@mock.patch('barbicanclient.base.validate_ref_and_return_uuid')
def test_delete_ext_oauth2_auth(self, mock_validate, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_key_effect(
barbican_endpoint='http://demo/barbican/')
self._mock_external_token_api()
mock_validate.return_value = True
def mock_barbican_delete_resp(request, context):
auth_value = f'Bearer {self.access_token}'
req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth)
context.status_code = 204
return ''
def mock_barbican_get_for_check_resp(request, context):
auth_value = f'Bearer {self.access_token}'
req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth)
context.status_code = 200
return {}
self.requests_mock.get(
'http://demo/barbican',
json=self._mock_barbican_get_version_resp())
self.requests_mock.delete(
'http://demo/barbican/v1/secrets/True',
json=mock_barbican_delete_resp)
self.requests_mock.get(
'http://demo/barbican/v1/secrets/True',
json=mock_barbican_get_for_check_resp)
auth = t_context.generate_tacker_service_context()
keymgr = BarbicanKeyManager(auth.token_endpoint)
keymgr.delete(auth, 'test')
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
@mock.patch('tacker.keymgr.barbican_key_manager.'
'BarbicanKeyManager._retrieve_secret_uuid')
def test_store_ext_oauth2_auth(self, mock_secret_uuid,
mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_key_effect(
barbican_endpoint='http://demo/barbican')
secret_id = 'store_secret_uuid'
mock_secret_uuid.return_value = secret_id
self._mock_external_token_api()
def mock_barbican_post_resp(request, context):
auth_value = f'Bearer {self.access_token}'
req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth)
response = {
'name': 'AES key',
'expiration': '2023-01-13T19:14:44.180394',
'algorithm': 'aes',
'bit_length': 256,
'mode': 'cbc',
'payload': 'YmVlcg==',
'payload_content_type': 'application/octet-stream',
'payload_content_encoding': 'base64'
}
context.status_code = 201
return response
self.requests_mock.get(
'http://demo/barbican',
json=self._mock_barbican_get_version_resp())
self.requests_mock.post('http://demo/barbican/v1/secrets/',
json=mock_barbican_post_resp)
auth = t_context.generate_tacker_service_context()
keymgr = BarbicanKeyManager(auth.token_endpoint)
result = keymgr.store(auth, 'test')
self.assertEqual(result, secret_id)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
@mock.patch('barbicanclient.base.validate_ref_and_return_uuid')
def test_get_ext_oauth2_auth(self, mock_validate, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_key_effect(
barbican_endpoint='http://demo/barbican/')
self._mock_external_token_api()
mock_validate.return_value = True
def mock_barbican_get_resp(request, context):
auth_value = f'Bearer {self.access_token}'
req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth)
context.status_code = 200
response = {
'id': 'test001'
}
return response
self.requests_mock.get(
'http://demo/barbican',
json=self._mock_barbican_get_version_resp())
self.requests_mock.get(
'http://demo/barbican/v1/secrets/True',
json=mock_barbican_get_resp)
auth = t_context.generate_tacker_service_context()
keymgr = BarbicanKeyManager(auth.token_endpoint)
result = keymgr.get(auth, 'test001')
self.assertEqual(result.secret_ref,
'http://demo/barbican/v1/secrets/test001')
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_ext_oauth2_auth_no_endpoint(self, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_key_effect(
barbican_endpoint='')
self._mock_external_token_api()
auth = t_context.generate_tacker_service_context()
keymgr = BarbicanKeyManager(auth.token_endpoint)
self.assertRaises(exception.KeyManagerError, keymgr.get, auth, 'test')

View File

@ -52,6 +52,36 @@ cfg.CONF.register_opts(OPTS, 'keystone_authtoken')
CONF = cfg.CONF CONF = cfg.CONF
def get_mock_conf_key_effect():
def mock_conf_key_effect(name):
if name == 'ext_oauth2_auth':
return MockConfig(
conf={
'use_ext_oauth2_auth': True,
'token_endpoint': 'http://demo/token_endpoint',
'auth_method': 'client_secret_post',
'client_id': 'client_id',
'client_secret': 'client_secret',
'scope': 'client_secret'
})
elif name == 'key_manager':
conf = {
'api_class': 'tacker.keymgr.barbican_key_manager'
'.BarbicanKeyManager',
'barbican_version': 'v1',
'barbican_endpoint': 'http://test/barbican'
}
return MockConfig(conf=conf)
elif name == 'k8s_vim':
return MockConfig(
conf={
'use_barbican': True
})
else:
return cfg.CONF._get(name)
return mock_conf_key_effect
class FakeKubernetesAPI(mock.Mock): class FakeKubernetesAPI(mock.Mock):
pass pass
@ -67,6 +97,21 @@ class mock_dict(dict):
__delattr__ = dict.__delitem__ __delattr__ = dict.__delitem__
class MockConfig(object):
def __init__(self, conf=None):
self.conf = conf
def __getattr__(self, name):
if not self.conf:
raise cfg.NoSuchOptError('not found %s' % name)
if name not in self.conf:
raise cfg.NoSuchOptError('not found %s' % name)
return self.conf.get(name)
def __contains__(self, key):
return key in self.conf
class TestKubernetes_Driver(base.TestCase): class TestKubernetes_Driver(base.TestCase):
def setUp(self): def setUp(self):
super(TestKubernetes_Driver, self).setUp() super(TestKubernetes_Driver, self).setUp()
@ -213,3 +258,32 @@ class TestKubernetes_Driver(base.TestCase):
del self.vim_obj['auth_cred']['ssl_ca_cert'] del self.vim_obj['auth_cred']['ssl_ca_cert']
self.assertRaises(nfvo.VimUnauthorizedException, self.assertRaises(nfvo.VimUnauthorizedException,
self.kubernetes_driver.register_vim, self.vim_obj) self.kubernetes_driver.register_vim, self.vim_obj)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_deregister_vim_barbican_ext_oauth2_auth(self, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_key_effect()
self.keymgr.delete.return_value = None
vim_obj = self.get_vim_obj_barbican()
self.kubernetes_driver.deregister_vim(vim_obj)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_encode_vim_auth_barbican_ext_oauth2_auth(self, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_key_effect()
self.config_fixture.config(group='k8s_vim',
use_barbican=True)
fernet_attrs = {'encrypt.return_value': 'encrypted_password'}
mock_fernet_obj = mock.Mock(**fernet_attrs)
mock_fernet_key = 'test_fernet_key'
self.keymgr.store.return_value = 'fake-secret-uuid'
self.kubernetes_api.create_fernet_key.return_value = (mock_fernet_key,
mock_fernet_obj)
vim_obj = self.get_vim_obj()
self.kubernetes_driver.encode_vim_auth(
vim_obj['id'], vim_obj['auth_cred'])
mock_fernet_obj.encrypt.assert_called_once_with(mock.ANY)
self.assertEqual(vim_obj['auth_cred']['key_type'],
'barbican_key')
self.assertEqual(vim_obj['auth_cred']['secret_uuid'],
'fake-secret-uuid')

View File

@ -16,9 +16,10 @@
import os import os
from unittest import mock from unittest import mock
from keystoneauth1 import exceptions
from oslo_config import cfg from oslo_config import cfg
from keystoneauth1 import exceptions
from tacker import context as t_context from tacker import context as t_context
from tacker.extensions import nfvo from tacker.extensions import nfvo
from tacker.nfvo.drivers.vim import openstack_driver from tacker.nfvo.drivers.vim import openstack_driver
@ -54,6 +55,38 @@ cfg.CONF.register_opts(OPTS, 'keystone_authtoken')
CONF = cfg.CONF CONF = cfg.CONF
def get_mock_conf_key_effect():
def mock_conf_key_effect(name):
if name == 'keystone_authtoken':
return MockConfig(conf=None)
elif name == 'ext_oauth2_auth':
return MockConfig(
conf={
'use_ext_oauth2_auth': True,
'token_endpoint': 'http://demo/token_endpoint',
'auth_method': 'client_secret_post',
'client_id': 'client_id',
'client_secret': 'client_secret',
'scope': 'client_secret'
})
elif name == 'key_manager':
conf = {
'api_class': 'tacker.keymgr.barbican_key_manager'
'.BarbicanKeyManager',
'barbican_version': 'v1',
'barbican_endpoint': 'http://test/barbican'
}
return MockConfig(conf=conf)
elif name == 'vim_keys':
return MockConfig(
conf={
'use_barbican': True
})
else:
return cfg.CONF._get(name)
return mock_conf_key_effect
class FakeKeystone(mock.Mock): class FakeKeystone(mock.Mock):
pass pass
@ -74,6 +107,21 @@ class mock_dict(dict):
__delattr__ = dict.__delitem__ __delattr__ = dict.__delitem__
class MockConfig(object):
def __init__(self, conf=None):
self.conf = conf
def __getattr__(self, name):
if not self.conf:
raise cfg.NoSuchOptError('not found %s' % name)
if name not in self.conf:
raise cfg.NoSuchOptError('not found %s' % name)
return self.conf.get(name)
def __contains__(self, key):
return key in self.conf
class TestOpenstack_Driver(base.TestCase): class TestOpenstack_Driver(base.TestCase):
def setUp(self): def setUp(self):
super(TestOpenstack_Driver, self).setUp() super(TestOpenstack_Driver, self).setUp()
@ -255,3 +303,31 @@ class TestOpenstack_Driver(base.TestCase):
self.assertRaises(nfvo.VimGetResourceNotFoundException, self.assertRaises(nfvo.VimGetResourceNotFoundException,
self.openstack_driver.get_vim_resource_id, self.openstack_driver.get_vim_resource_id,
self.vim_obj, resource_type, resource_name) self.vim_obj, resource_type, resource_name)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_deregister_vim_barbican_ext_oauth2_auth(self, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_key_effect()
self.keymgr.delete.return_value = None
vim_obj = self.get_vim_obj_barbican()
self.openstack_driver.deregister_vim(vim_obj)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_encode_vim_auth_barbican_ext_oauth2_auth(self, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_key_effect()
fernet_attrs = {'encrypt.return_value': 'encrypted_password'}
mock_fernet_obj = mock.Mock(**fernet_attrs)
mock_fernet_key = 'test_fernet_key'
self.keymgr.store.return_value = 'fake-secret-uuid'
self.keystone.create_fernet_key.return_value = (mock_fernet_key,
mock_fernet_obj)
vim_obj = self.get_vim_obj()
self.openstack_driver.encode_vim_auth(
vim_obj['id'], vim_obj['auth_cred'])
mock_fernet_obj.encrypt.assert_called_once_with(mock.ANY)
self.assertEqual(vim_obj['auth_cred']['key_type'],
'barbican_key')
self.assertEqual(vim_obj['auth_cred']['secret_uuid'],
'fake-secret-uuid')

View File

@ -19,7 +19,9 @@ import os
from unittest import mock from unittest import mock
from unittest.mock import patch from unittest.mock import patch
from oslo_config import cfg
from oslo_utils import uuidutils from oslo_utils import uuidutils
from requests_mock.contrib import fixture as rm_fixture
from tacker.common import exceptions from tacker.common import exceptions
from tacker import context from tacker import context
@ -28,6 +30,7 @@ from tacker.db.nfvo import nfvo_db
from tacker.db.nfvo import ns_db from tacker.db.nfvo import ns_db
from tacker.db.nfvo import vnffg_db from tacker.db.nfvo import vnffg_db
from tacker.extensions import nfvo from tacker.extensions import nfvo
from tacker.keymgr import API as KEYMGR_API
from tacker.manager import TackerManager from tacker.manager import TackerManager
from tacker.nfvo import nfvo_plugin from tacker.nfvo import nfvo_plugin
from tacker.plugins.common import constants from tacker.plugins.common import constants
@ -40,6 +43,53 @@ SECRET_PASSWORD = '***'
DUMMY_NS_2 = 'ba6bf017-f6f7-45f1-a280-57b073bf78ef' DUMMY_NS_2 = 'ba6bf017-f6f7-45f1-a280-57b073bf78ef'
def get_mock_conf_key_effect():
def mock_conf_key_effect(name):
if name == 'keystone_authtoken':
return MockConfig(conf=None)
elif name == 'ext_oauth2_auth':
return MockConfig(
conf={
'use_ext_oauth2_auth': True,
'token_endpoint': 'http://demo/token_endpoint',
'auth_method': 'client_secret_post',
'client_id': 'client_id',
'client_secret': 'client_secret',
'scope': 'client_secret'
})
elif name == 'key_manager':
conf = {
'api_class': 'tacker.keymgr.barbican_key_manager'
'.BarbicanKeyManager',
'barbican_endpoint': 'http://demo/barbican',
'barbican_version': 'v1'
}
return MockConfig(conf=conf)
elif name == 'k8s_vim':
return MockConfig(
conf={
'use_barbican': True
})
else:
return cfg.CONF._get(name)
return mock_conf_key_effect
class MockConfig(object):
def __init__(self, conf=None):
self.conf = conf
def __getattr__(self, name):
if not self.conf:
raise cfg.NoSuchOptError('not found %s' % name)
if name not in self.conf:
raise cfg.NoSuchOptError('not found %s' % name)
return self.conf.get(name)
def __contains__(self, key):
return key in self.conf
def dummy_get_vim(*args, **kwargs): def dummy_get_vim(*args, **kwargs):
vim_obj = dict() vim_obj = dict()
vim_obj['auth_cred'] = utils.get_vim_auth_obj() vim_obj['auth_cred'] = utils.get_vim_auth_obj()
@ -203,6 +253,9 @@ class FakeVNFMPlugin(mock.Mock):
class TestNfvoPlugin(db_base.SqlTestCase): class TestNfvoPlugin(db_base.SqlTestCase):
def setUp(self): def setUp(self):
super(TestNfvoPlugin, self).setUp() super(TestNfvoPlugin, self).setUp()
self.requests_mock = self.useFixture(rm_fixture.Fixture())
KEYMGR_API('')
self.access_token = 'access_token_uuid'
self.addCleanup(mock.patch.stopall) self.addCleanup(mock.patch.stopall)
self.context = context.get_admin_context() self.context = context.get_admin_context()
self.nfvo_plugin = nfvo_plugin.NfvoPlugin() self.nfvo_plugin = nfvo_plugin.NfvoPlugin()
@ -216,6 +269,22 @@ class TestNfvoPlugin(db_base.SqlTestCase):
self._mock( self._mock(
'tacker.common.driver_manager.DriverManager', fake_driver_manager) 'tacker.common.driver_manager.DriverManager', fake_driver_manager)
def _mock_external_token_api(self):
def mock_token_resp(request, context):
response = {
'access_token': self.access_token,
'expires_in': 1800,
'refresh_expires_in': 0,
'token_type': 'Bearer',
'not-before-policy': 0,
'scope': 'tacker_api'
}
context.status_code = 200
return response
self.requests_mock.post('http://demo/token_endpoint',
json=mock_token_resp)
def _insert_dummy_vim(self): def _insert_dummy_vim(self):
session = self.context.session session = self.context.session
vim_db = nfvo_db.Vim( vim_db = nfvo_db.Vim(
@ -1607,3 +1676,97 @@ class TestNfvoPlugin(db_base.SqlTestCase):
non_admin_context, non_admin_context,
'ba6bf017-f6f7-45f1-a280-57b073bf78ea', 'ba6bf017-f6f7-45f1-a280-57b073bf78ea',
ns=nsattr) ns=nsattr)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
@mock.patch('barbicanclient.base.validate_ref_and_return_uuid')
@mock.patch('cryptography.fernet.Fernet.decrypt')
def test_build_vim_auth_barbican_external(
self, mock_decrypt, mock_validate, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_key_effect()
self._mock_external_token_api()
barbican_uuid = 'test_uuid'
mock_validate.return_value = barbican_uuid
vim_dict = {'id': 'aaaa', 'name': 'VIM0', 'type': 'test_vim',
'auth_cred': {'username': 'test',
'user_domain_name': 'test',
'cert_verify': 'True',
'project_id': 'test',
'project_name': 'test',
'project_domain_name': 'test',
'auth_url': 'http://test/identity/v3',
'key_type': 'barbican_key',
'secret_uuid': '***',
'password': '***'},
'auth_url': 'http://127.0.0.1/identity/v3',
'placement_attr': {'regions': ['TestRegionOne']},
'tenant_id': 'test'}
def mock_barbican_resp(request, context):
auth_value = 'Bearer %s' % self.access_token
req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth)
response = {
'name': 'AES key',
'expiration': '2023-01-13T19:14:44.180394',
'algorithm': 'aes',
'bit_length': 256,
'mode': 'cbc',
'payload': 'YmVlcg==',
'payload_content_type': 'application/octet-stream',
'payload_content_encoding': 'base64'
}
context.status_code = 200
return response
self.requests_mock.get('http://demo/barbican/v1/secrets/%s' %
barbican_uuid,
json=mock_barbican_resp)
def mock_barbican_payload_resp(request, context):
auth_value = 'Bearer %s' % self.access_token
req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth)
response = '5cJeztZKzISf1JAt73oBeTPPCrymn96A3wqG96F4RxU='
context.status_code = 200
return response
def mock_get_barbican_resp(request, context):
auth_value = 'Bearer %s' % self.access_token
req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth)
context.status_code = 200
response = {
"versions": {
"values": [
{
"id": "v1",
"status": "stable",
"links": [
{
"rel": "self",
"href": "http://demo/barbican/v1/"
},
{
"rel": "describedby",
"type": "text/html",
"href": "https://docs.openstack.org/"}
],
"media-types": [
{
"base": "application/json",
"type": "application/"
"vnd.openstack.key-manager-v1+json"
}
]
}
]
}
}
return response
self.requests_mock.get('http://demo/barbican/v1/secrets/%s/payload' %
barbican_uuid,
json=mock_barbican_payload_resp)
self.requests_mock.get('http://demo/barbican',
json=mock_get_barbican_resp)
mock_decrypt.return_value = 'test'.encode('utf-8')
self.nfvo_plugin._build_vim_auth(vim_dict)

View File

@ -15,13 +15,45 @@
from unittest import mock from unittest import mock
from castellan.common.credentials import keystone_password
from oslo_config import cfg
from oslo_context import context as oslo_context from oslo_context import context as oslo_context
from testtools import matchers from testtools import matchers
from tacker.common.ext_oauth2_auth import ExtOAuth2Auth
from tacker import context from tacker import context
from tacker.tests import base from tacker.tests import base
def get_mock_conf_key_effect(cfg_keystone_authtoken=None,
cfg_ext_oauth2_auth=None):
def mock_conf_key_effect(name):
if name == 'keystone_authtoken':
return MockConfig(name, conf=cfg_keystone_authtoken)
elif name == 'ext_oauth2_auth':
return MockConfig(name, conf=cfg_ext_oauth2_auth)
else:
return cfg.CONF._get(name)
return mock_conf_key_effect
class MockConfig(cfg.OptGroup):
def __init__(self, name, conf=None):
self.name = name
self.conf = conf
def __getattr__(self, name):
if not self.conf:
raise cfg.NoSuchOptError('not found %s' % name)
if name not in self.conf:
raise cfg.NoSuchOptError('not found %s' % name)
return self.conf.get(name)
def __contains__(self, key):
return key in self.conf
class TestTackerContext(base.BaseTestCase): class TestTackerContext(base.BaseTestCase):
def setUp(self): def setUp(self):
@ -131,3 +163,66 @@ class TestTackerContext(base.BaseTestCase):
self.assertEqual(req_id_before, self.assertEqual(req_id_before,
oslo_context.get_current().request_id) oslo_context.get_current().request_id)
self.assertNotEqual(req_id_before, ctx_admin.request_id) self.assertNotEqual(req_id_before, ctx_admin.request_id)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_generate_tacker_service_context_keystone(self, mock_get_conf_key):
password = 'test_password'
auth_url = 'http://keystone/test/auth_url'
username = 'test_user_name'
user_domain_name = 'test_user_domain_name'
project_name = 'test_project_name'
project_domain_name = 'test_project_domain_name'
token_endpoint = 'http://demo/token'
auth_method = 'client_secret_basic'
client_id = 'test_client_id'
client_secret = 'client_secret'
scope = 'tacker'
mock_get_conf_key.side_effect = get_mock_conf_key_effect(
cfg_keystone_authtoken={
'password': password,
'auth_url': auth_url,
'username': username,
'user_domain_name': user_domain_name,
'project_name': project_name,
'project_domain_name': project_domain_name},
cfg_ext_oauth2_auth={
'token_endpoint': token_endpoint,
'auth_method': auth_method,
'client_id': client_id,
'client_secret': client_secret,
'scope': scope,
'use_ext_oauth2_auth': False}
)
auth_context = context.generate_tacker_service_context()
self.assertIsInstance(auth_context, keystone_password.KeystonePassword)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_generate_tacker_service_context_external(self, mock_get_conf_key):
password = 'test_password'
auth_url = 'http://keystone/test/auth_url'
username = 'test_user_name'
user_domain_name = 'test_user_domain_name'
project_name = 'test_project_name'
project_domain_name = 'test_project_domain_name'
token_endpoint = 'http://demo/token'
auth_method = 'client_secret_basic'
client_id = 'test_client_id'
client_secret = 'client_secret'
scope = 'tacker'
mock_get_conf_key.side_effect = get_mock_conf_key_effect(
cfg_keystone_authtoken={
'password': password,
'auth_url': auth_url,
'username': username,
'user_domain_name': user_domain_name,
'project_name': project_name,
'project_domain_name': project_domain_name},
cfg_ext_oauth2_auth={
'token_endpoint': token_endpoint,
'auth_method': auth_method,
'client_id': client_id,
'client_secret': client_secret,
'scope': scope,
'use_ext_oauth2_auth': True})
auth_context = context.generate_tacker_service_context()
self.assertIsInstance(auth_context, ExtOAuth2Auth)

View File

@ -13,16 +13,70 @@
from sqlalchemy.orm import exc as orm_exc from sqlalchemy.orm import exc as orm_exc
from unittest import mock from unittest import mock
from oslo_config import cfg
from requests_mock.contrib import fixture as rm_fixture
from tacker.extensions import nfvo from tacker.extensions import nfvo
from tacker.keymgr import API as KEYMGR_API
from tacker import manager from tacker import manager
from tacker.tests.unit import base from tacker.tests.unit import base
from tacker.vnfm import vim_client from tacker.vnfm import vim_client
def get_mock_conf_key_effect():
def mock_conf_key_effect(name):
if name == 'keystone_authtoken':
return MockConfig(conf=None)
elif name == 'ext_oauth2_auth':
return MockConfig(
conf={
'use_ext_oauth2_auth': True,
'token_endpoint': 'http://demo/token_endpoint',
'auth_method': 'client_secret_post',
'client_id': 'client_id',
'client_secret': 'client_secret',
'scope': 'client_secret'
})
elif name == 'key_manager':
conf = {
'api_class': 'tacker.keymgr.barbican_key_manager'
'.BarbicanKeyManager',
'barbican_endpoint': 'http://demo/barbican',
'barbican_version': 'v1'
}
return MockConfig(conf=conf)
elif name == 'k8s_vim':
return MockConfig(
conf={
'use_barbican': True
})
else:
return cfg.CONF._get(name)
return mock_conf_key_effect
class MockConfig(object):
def __init__(self, conf=None):
self.conf = conf
def __getattr__(self, name):
if not self.conf:
raise cfg.NoSuchOptError('not found %s' % name)
if name not in self.conf:
raise cfg.NoSuchOptError('not found %s' % name)
return self.conf.get(name)
def __contains__(self, key):
return key in self.conf
class TestVIMClient(base.TestCase): class TestVIMClient(base.TestCase):
def setUp(self): def setUp(self):
super(TestVIMClient, self).setUp() super(TestVIMClient, self).setUp()
self.requests_mock = self.useFixture(rm_fixture.Fixture())
KEYMGR_API('')
self.access_token = 'access_token_uuid'
self.vim_info = {'id': 'aaaa', 'name': 'VIM0', 'type': 'test_vim', self.vim_info = {'id': 'aaaa', 'name': 'VIM0', 'type': 'test_vim',
'auth_cred': {'password': '****'}, 'auth_cred': {'password': '****'},
'auth_url': 'http://127.0.0.1/identity/v3', 'auth_url': 'http://127.0.0.1/identity/v3',
@ -32,6 +86,22 @@ class TestVIMClient(base.TestCase):
self.service_plugins = mock.Mock() self.service_plugins = mock.Mock()
self.nfvo_plugin = mock.Mock() self.nfvo_plugin = mock.Mock()
def _mock_external_token_api(self):
def mock_token_resp(request, context):
response = {
'access_token': self.access_token,
'expires_in': 1800,
'refresh_expires_in': 0,
'token_type': 'Bearer',
'not-before-policy': 0,
'scope': 'tacker_api'
}
context.status_code = 200
return response
self.requests_mock.post('http://demo/token_endpoint',
json=mock_token_resp)
def test_get_vim_without_defined_default_vim(self): def test_get_vim_without_defined_default_vim(self):
self.nfvo_plugin.get_default_vim.side_effect = \ self.nfvo_plugin.get_default_vim.side_effect = \
orm_exc.NoResultFound() orm_exc.NoResultFound()
@ -138,3 +208,103 @@ class TestVIMClient(base.TestCase):
vim_regions = ['TestRegionOne', 'TestRegionTwo'] vim_regions = ['TestRegionOne', 'TestRegionTwo']
region_name = 'TestRegionOne' region_name = 'TestRegionOne'
self.assertTrue(self.vimclient.region_valid(vim_regions, region_name)) self.assertTrue(self.vimclient.region_valid(vim_regions, region_name))
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
@mock.patch('barbicanclient.base.validate_ref_and_return_uuid')
@mock.patch('cryptography.fernet.Fernet.decrypt')
def test_get_vim_extenal(self, mock_decrypt, mock_validate,
mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_key_effect()
self._mock_external_token_api()
barbican_uuid = 'test_uuid'
mock_validate.return_value = barbican_uuid
vim_info = {'id': 'aaaa', 'name': 'VIM0', 'type': 'test_vim',
'auth_cred': {'username': 'test',
'user_domain_name': 'test',
'cert_verify': 'True',
'project_id': 'test',
'project_name': 'test',
'project_domain_name': 'test',
'auth_url': 'http://test/identity/v3',
'key_type': 'barbican_key',
'secret_uuid': '***',
'password': '***'},
'auth_url': 'http://127.0.0.1/identity/v3',
'placement_attr': {'regions': ['TestRegionOne']},
'tenant_id': 'test'}
self.nfvo_plugin.get_vim.return_value = vim_info
self.service_plugins.get.return_value = self.nfvo_plugin
def mock_barbican_resp(request, context):
auth_value = 'Bearer %s' % self.access_token
req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth)
response = {
'name': 'AES key',
'expiration': '2023-01-13T19:14:44.180394',
'algorithm': 'aes',
'bit_length': 256,
'mode': 'cbc',
'payload': 'YmVlcg==',
'payload_content_type': 'application/octet-stream',
'payload_content_encoding': 'base64'
}
context.status_code = 200
return response
self.requests_mock.get('http://demo/barbican/v1/secrets/%s' %
barbican_uuid,
json=mock_barbican_resp)
def mock_barbican_payload_resp(request, context):
auth_value = 'Bearer %s' % self.access_token
req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth)
response = '5cJeztZKzISf1JAt73oBeTPPCrymn96A3wqG96F4RxU='
context.status_code = 200
return response
def mock_get_barbican_resp(request, context):
auth_value = 'Bearer %s' % self.access_token
req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth)
context.status_code = 200
response = {
"versions": {
"values": [
{
"id": "v1",
"status": "stable",
"links": [
{
"rel": "self",
"href": "http://demo/barbican/v1/"
},
{
"rel": "describedby",
"type": "text/html",
"href": "https://docs.openstack.org/"}
],
"media-types": [
{
"base": "application/json",
"type": "application/"
"vnd.openstack.key-manager-v1+json"
}
]
}
]
}
}
return response
self.requests_mock.get('http://demo/barbican/v1/secrets/%s/payload' %
barbican_uuid,
json=mock_barbican_payload_resp)
self.requests_mock.get('http://demo/barbican',
json=mock_get_barbican_resp)
mock_decrypt.return_value = 'test'.encode('utf-8')
with mock.patch.object(manager.TackerManager, 'get_service_plugins',
return_value=self.service_plugins):
self.vimclient.get_vim(None,
vim_id=self.vim_info['id'],
region_name='TestRegionOne')

View File

@ -19,7 +19,6 @@ from cryptography import fernet
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
from tacker.common import utils from tacker.common import utils
from tacker import context as t_context from tacker import context as t_context
from tacker.extensions import nfvo from tacker.extensions import nfvo
@ -120,9 +119,11 @@ class VimClient(object):
""" """
cred = secret_value.encode('utf-8') cred = secret_value.encode('utf-8')
if auth.get('key_type') == 'barbican_key': if auth.get('key_type') == 'barbican_key':
keystone_conf = CONF.keystone_authtoken
secret_uuid = auth['secret_uuid'] secret_uuid = auth['secret_uuid']
keymgr_api = KEYMGR_API(keystone_conf.auth_url) if CONF.ext_oauth2_auth.use_ext_oauth2_auth:
keymgr_api = KEYMGR_API(CONF.ext_oauth2_auth.token_endpoint)
else:
keymgr_api = KEYMGR_API(CONF.keystone_authtoken.auth_url)
k_context = t_context.generate_tacker_service_context() k_context = t_context.generate_tacker_service_context()
secret_obj = keymgr_api.get(k_context, secret_uuid) secret_obj = keymgr_api.get(k_context, secret_uuid)
vim_key = secret_obj.payload vim_key = secret_obj.payload

View File

@ -22,3 +22,4 @@ python-blazarclient>=1.0.1 # Apache-2.0
requests-mock>=1.2.0 # Apache-2.0 requests-mock>=1.2.0 # Apache-2.0
PyMySQL>=0.10.1 # MIT PyMySQL>=0.10.1 # MIT
freezegun>=1.2.2 # Apache-2.0 freezegun>=1.2.2 # Apache-2.0
PyJWT>=2.4.0 # MIT