Merge "External OAuth2.0 Authorization Server Support"

This commit is contained in:
Zuul 2023-09-15 11:18:27 +00:00 committed by Gerrit Code Review
commit 2d780609c2
22 changed files with 1794 additions and 32 deletions

View File

@ -14,6 +14,7 @@ namespace = oslo.service.service
namespace = tacker.alarm_receiver
namespace = tacker.auth
namespace = tacker.common.config
namespace = tacker.common.ext_oauth2_auth
namespace = tacker.conductor.conductor_server
namespace = tacker.conf
namespace = tacker.keymgr

View File

@ -0,0 +1,6 @@
---
features:
- Support Tacker service to obtain an OAuth 2.0 access token from an
external authorization server, and then use the access token to access
related OpenStack services that uses the external_oauth2_token filter
provided by the keystone middleware for permission authentication.

View File

@ -53,6 +53,7 @@ setuptools!=24.0.0,!=34.0.0,!=34.0.1,!=34.0.2,!=34.0.3,!=34.1.0,!=34.1.1,!=34.2.
tooz>=1.58.0 # Apache-2.0
PyYAML>=5.4.1 # MIT
PyMySQL>=0.10.1 # MIT
PyJWT>=2.4.0 # MIT
# Glance Store
glance-store>=2.4.0 # Apache-2.0

View File

@ -81,6 +81,7 @@ oslo.config.opts =
tacker.alarm_receiver = tacker.alarm_receiver:config_opts
tacker.auth = tacker.auth:config_opts
tacker.common.config = tacker.common.config:config_opts
tacker.common.ext_oauth2_auth = tacker.common.ext_oauth2_auth:config_opts
tacker.conductor.conductor_server = tacker.conductor.conductor_server:config_opts
tacker.conf = tacker.conf.opts:list_opts
tacker.keymgr = tacker.keymgr:config_opts

View File

@ -246,30 +246,20 @@ class CryptKeyBase(metaclass=abc.ABCMeta):
class CryptKeyBarbican(CryptKeyBase):
def load_key(self, id):
k_context = t_context.generate_tacker_service_context()
# After external authorization server support for barbican is
# implemented, the endpoint retrieval method for keymgr_api must be
# changed by enabling the following commented out part:
# if CONF.ext_oauth2_auth.use_ext_oauth2_auth:
# keymgr_api = KEYMGR_API(CONF.ext_oauth2_auth.token_endpoint)
# else:
# keymgr_api = KEYMGR_API(CONF.keystone_authtoken.auth_url)
keymgr_api = KEYMGR_API(CONF.keystone_authtoken.auth_url)
if CONF.ext_oauth2_auth.use_ext_oauth2_auth:
keymgr_api = KEYMGR_API(CONF.ext_oauth2_auth.token_endpoint)
else:
keymgr_api = KEYMGR_API(CONF.keystone_authtoken.auth_url)
secret_obj = keymgr_api.get(k_context, id)
master_key = secret_obj.payload
return master_key
def save_key(self, key):
k_context = t_context.generate_tacker_service_context()
# After external authorization server support for barbican is
# implemented, the endpoint retrieval method for keymgr_api must be
# changed by enabling the following commented out part:
# if CONF.ext_oauth2_auth.use_ext_oauth2_auth:
# keymgr_api = KEYMGR_API(CONF.ext_oauth2_auth.token_endpoint)
# else:
# keymgr_api = KEYMGR_API(CONF.keystone_authtoken.auth_url)
keymgr_api = KEYMGR_API(CONF.keystone_authtoken.auth_url)
if CONF.ext_oauth2_auth.use_ext_oauth2_auth:
keymgr_api = KEYMGR_API(CONF.ext_oauth2_auth.token_endpoint)
else:
keymgr_api = KEYMGR_API(CONF.keystone_authtoken.auth_url)
secret_uuid = keymgr_api.store(k_context, key)
return secret_uuid

View File

@ -0,0 +1,370 @@
# Copyright (C) 2023 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Auth for External Server OAuth2.0 authentication
"""
import time
import uuid
import jwt.utils
from oslo_config import cfg
from oslo_log import log as logging
import requests.auth
from keystoneauth1 import exceptions as ksa_exceptions
from keystoneauth1.loading import session as session_loading
from tacker._i18n import _
from tacker.common.exceptions import TackerException
LOG = logging.getLogger(__name__)
_EXT_AUTH_CONFIG_GROUP_NAME = 'ext_oauth2_auth'
_EXTERNAL_AUTH2_OPTS = [
cfg.BoolOpt('use_ext_oauth2_auth', default=False,
help='Set True to use external Oauth2.0 auth server.'),
cfg.StrOpt('token_endpoint',
help='The endpoint for access token API.'),
cfg.StrOpt('scope',
help='The scope that the access token can access.'),
]
_EXTERNAL_AUTH2_KEYSTONE_MIDDLEWARE_OPTS = [
cfg.StrOpt('certfile',
help='Required if identity server requires client '
'certificate.'),
cfg.StrOpt('keyfile',
help='Required if identity server requires client '
'private key.'),
cfg.StrOpt('cafile',
help='A PEM encoded Certificate Authority to use when '
'verifying HTTPs connections. Defaults to system CAs.'),
cfg.BoolOpt('insecure', default=False, help='Verify HTTPS connections.'),
cfg.IntOpt('http_connect_timeout',
help='Request timeout value for communicating with Identity '
'API server.'),
cfg.StrOpt('audience',
help='The Audience should be the URL of the Authorization '
"Server's Token Endpoint. The Authorization Server will "
'verify that it is an intended audience for the token.'),
cfg.StrOpt('auth_method',
default='client_secret_basic',
choices=('client_secret_basic', 'client_secret_post',
'tls_client_auth', 'private_key_jwt',
'client_secret_jwt'),
help='The auth_method must use the authentication method '
'specified by the Authorization Server.'),
cfg.StrOpt('client_id',
help='The OAuth 2.0 Client Identifier valid at the '
'Authorization Server.'),
cfg.StrOpt('client_secret',
help='The OAuth 2.0 client secret. When the auth_method is '
'client_secret_basic, client_secret_post, or '
'client_secret_jwt, the value is used, and otherwise the '
'value is ignored.'),
cfg.StrOpt('jwt_key_file',
help='The jwt_key_file must use the certificate key file which '
'has been registered with the Authorization Server. '
'When the auth_method is private_key_jwt, the value is '
'used, and otherwise the value is ignored.'),
cfg.StrOpt('jwt_algorithm',
help='The jwt_algorithm must use the algorithm specified by '
'the Authorization Server. When the auth_method is '
'client_secret_jwt, this value is often set to HS256,'
'when the auth_method is private_key_jwt, the value is '
'often set to RS256, and otherwise the value is ignored.'),
cfg.IntOpt('jwt_bearer_time_out', default=3600,
help='This value is used to calculate the expiration time. If '
'after the expiration time, the access token cannot be '
'accepted. When the auth_method is client_secret_jwt or '
'private_key_jwt, the value is used, and otherwise the '
'value is ignored.'),
]
def config_opts():
return [(_EXT_AUTH_CONFIG_GROUP_NAME,
_EXTERNAL_AUTH2_OPTS + _EXTERNAL_AUTH2_KEYSTONE_MIDDLEWARE_OPTS)]
cfg.CONF.register_opts(_EXTERNAL_AUTH2_OPTS,
group=_EXT_AUTH_CONFIG_GROUP_NAME)
class ExtOAuth2Auth(object):
"""Construct an Auth to fetch an access token for HTTP access."""
def __init__(self):
self._conf = cfg.CONF.ext_oauth2_auth
# Check whether the configuration parameter has been registered
if 'auth_method' not in self._conf:
LOG.debug('The relevant config parameters are not registered '
'and need to be registered before they can be used.')
cfg.CONF.register_opts(_EXTERNAL_AUTH2_KEYSTONE_MIDDLEWARE_OPTS,
group=_EXT_AUTH_CONFIG_GROUP_NAME)
self.token_endpoint = self._get_config_option(
'token_endpoint', is_required=True)
self.auth_method = self._get_config_option(
'auth_method', is_required=True)
self.client_id = self._get_config_option(
'client_id', is_required=True)
self.scope = self._get_config_option(
'scope', is_required=True)
self.access_token = None
def _get_config_option(self, key, is_required):
"""Read the value from config file by the config key."""
try:
value = getattr(self._conf, key)
except cfg.NoSuchOptError:
value = None
if not value:
if is_required:
LOG.error('The value is required for option %s '
'in group [%s]' % (key,
_EXT_AUTH_CONFIG_GROUP_NAME))
raise TackerException(
_('Configuration error. The parameter '
'is not set for "%s" in group [%s].') % (
key, _EXT_AUTH_CONFIG_GROUP_NAME))
else:
return None
else:
return value
def create_session(self, **kwargs):
"""Create session for HTTP access."""
kwargs.setdefault('cert', self._get_config_option(
'certfile', is_required=False))
kwargs.setdefault('key', self._get_config_option(
'keyfile', is_required=False))
kwargs.setdefault('cacert', self._get_config_option(
'cafile', is_required=False))
kwargs.setdefault('insecure', self._get_config_option(
'insecure', is_required=False))
kwargs.setdefault('timeout', self._get_config_option(
'http_connect_timeout', is_required=False))
kwargs.setdefault('user_agent', 'tacker service')
sess = session_loading.Session().load_from_options(**kwargs)
sess.auth = self
return sess
def get_connection_params(self, session, **kwargs):
"""Get connection params for HTTP access."""
return {}
def invalidate(self):
"""Invalidate the current authentication data."""
self.access_token = None
return True
def _get_token_by_client_secret_basic(self, session):
"""Access the access token API.
Access the access token API to get an access token by
the auth method 'client_secret_basic'.
"""
para = {
'scope': self.scope,
'grant_type': 'client_credentials'
}
auth = requests.auth.HTTPBasicAuth(
self.client_id, self._get_config_option(
'client_secret', is_required=True))
http_response = session.request(
self.token_endpoint,
'POST',
authenticated=False,
data=para,
requests_auth=auth)
return http_response
def _get_token_by_client_secret_post(self, session):
"""Access the access token API.
Access the access token API to get an access token by
the auth method 'client_secret_post'.
"""
para = {
'client_id': self.client_id,
'client_secret': self._get_config_option(
'client_secret', is_required=True),
'scope': self.scope,
'grant_type': 'client_credentials'
}
http_response = session.request(
self.token_endpoint,
'POST',
authenticated=False,
data=para)
return http_response
def _get_token_by_tls_client_auth(self, session):
"""Access the access token API.
Access the access token API to get an access token by
the auth method 'tls_client_auth'.
"""
para = {
'client_id': self.client_id,
'scope': self.scope,
'grant_type': 'client_credentials'
}
http_response = session.request(
self.token_endpoint,
'POST',
authenticated=False,
data=para)
return http_response
def _get_token_by_private_key_jwt(self, session):
"""Access the access token API.
Access the access token API to get an access token by
the auth method 'private_key_jwt'.
"""
jwt_key_file = self._get_config_option(
'jwt_key_file', is_required=True)
with open(jwt_key_file, 'r') as jwt_file:
jwt_key = jwt_file.read()
ita = round(time.time())
exp = ita + self._get_config_option(
'jwt_bearer_time_out', is_required=True)
alg = self._get_config_option('jwt_algorithm', is_required=True)
client_assertion = jwt.encode(
payload={
'jti': str(uuid.uuid4()),
'iat': str(ita),
'exp': str(exp),
'iss': self.client_id,
'sub': self.client_id,
'aud': self._get_config_option('audience', is_required=True)},
headers={
'typ': 'JWT',
'alg': alg},
key=jwt_key,
algorithm=alg)
para = {
'client_id': self.client_id,
'client_assertion_type':
'urn:ietf:params:oauth:client-assertion-type:jwt-bearer',
'client_assertion': client_assertion,
'scope': self.scope,
'grant_type': 'client_credentials'
}
http_response = session.request(
self.token_endpoint,
'POST',
authenticated=False,
data=para)
return http_response
def _get_token_by_client_secret_jwt(self, session):
"""Access the access token API.
Access the access token API to get an access token by
the auth method 'client_secret_jwt'.
"""
ita = round(time.time())
exp = ita + self._get_config_option(
'jwt_bearer_time_out', is_required=True)
alg = self._get_config_option('jwt_algorithm', is_required=True)
client_secret = self._get_config_option(
'client_secret', is_required=True)
client_assertion = jwt.encode(
payload={
'jti': str(uuid.uuid4()),
'iat': str(ita),
'exp': str(exp),
'iss': self.client_id,
'sub': self.client_id,
'aud': self._get_config_option('audience', is_required=True)},
headers={
'typ': 'JWT',
'alg': alg},
key=client_secret,
algorithm=alg)
para = {
'client_id': self.client_id,
'client_secret': client_secret,
'client_assertion_type':
'urn:ietf:params:oauth:client-assertion-type:jwt-bearer',
'client_assertion': client_assertion,
'scope': self.scope,
'grant_type': 'client_credentials'
}
http_response = session.request(
self.token_endpoint,
'POST',
authenticated=False,
data=para)
return http_response
def get_headers(self, session, **kwargs):
"""Get an access token and add to request header for HTTP access."""
if not self.access_token:
try:
if self.auth_method == 'tls_client_auth':
http_response = self._get_token_by_tls_client_auth(session)
elif self.auth_method == 'client_secret_post':
http_response = self._get_token_by_client_secret_post(
session)
elif self.auth_method == 'client_secret_basic':
http_response = self._get_token_by_client_secret_basic(
session)
elif self.auth_method == 'private_key_jwt':
http_response = self._get_token_by_private_key_jwt(
session)
elif self.auth_method == 'client_secret_jwt':
http_response = self._get_token_by_client_secret_jwt(
session)
else:
LOG.error('The value is incorrect for option '
'auth_method in group [%s]' %
_EXT_AUTH_CONFIG_GROUP_NAME)
raise TackerException(
_('The configuration parameter for '
'key "auth_method" in group [%s] is incorrect.') %
_EXT_AUTH_CONFIG_GROUP_NAME)
LOG.debug(http_response.text)
if http_response.status_code != 200:
LOG.error('The OAuth2.0 access token API returns an '
'incorrect response. '
'response_status: %s, response_text: %s' %
(http_response.status_code,
http_response.text))
raise TackerException(_('Failed to get an access token.'))
access_token = http_response.json().get('access_token')
if not access_token:
LOG.error('Failed to get an access token: %s',
http_response.text)
raise TackerException(_('Failed to get an access token.'))
self.access_token = access_token
except (ksa_exceptions.ConnectFailure,
ksa_exceptions.DiscoveryFailure,
ksa_exceptions.RequestTimeout) as error:
LOG.error('Unable to get an access token: %s', error)
raise TackerException(
_('The OAuth2.0 access token API service is '
'temporarily unavailable.'))
except TackerException:
raise
except Exception as error:
LOG.error('Unable to get an access token: %s', error)
raise TackerException(
_('An exception occurred during the processing '
'of getting an access token'))
header = {'Authorization': f'Bearer {self.access_token}'}
return header

View File

@ -24,6 +24,7 @@ from oslo_context import context as oslo_context
from oslo_db.sqlalchemy import enginefacade
from tacker.common import exceptions
from tacker.common.ext_oauth2_auth import ExtOAuth2Auth
from tacker.db import api as db_api
from tacker import policy
@ -217,6 +218,8 @@ def is_user_context(context):
def generate_tacker_service_context():
if CONF.ext_oauth2_auth.use_ext_oauth2_auth:
return ExtOAuth2Auth()
return keystone_password.KeystonePassword(
password=CONF.keystone_authtoken.password,
auth_url=CONF.keystone_authtoken.auth_url,

View File

@ -20,6 +20,11 @@ key_manager_opts = [
default='tacker.keymgr.barbican_key_manager'
'.BarbicanKeyManager',
help='The full class name of the key manager API class'),
cfg.StrOpt('barbican_endpoint',
help='The endpoint for barbican API.'),
cfg.StrOpt('barbican_version',
default='v1',
help='The version for barbican API.'),
]

View File

@ -20,9 +20,11 @@ from barbicanclient import client as barbican_client
from barbicanclient import exceptions as barbican_exception
from keystoneauth1 import identity
from keystoneauth1 import session
from oslo_config import cfg
from oslo_log import log as logging
from tacker._i18n import _
from tacker.common.exceptions import TackerException
from tacker.keymgr import exception
from tacker.keymgr import key_manager
@ -57,6 +59,30 @@ class BarbicanKeyManager(key_manager.KeyManager):
if self._barbican_client and self._current_context == context:
return self._barbican_client
if cfg.CONF.ext_oauth2_auth.use_ext_oauth2_auth:
try:
barbican_endpoint = cfg.CONF.key_manager.barbican_endpoint
barbican_version = cfg.CONF.key_manager.barbican_version
if not barbican_endpoint:
msg = _('The value is required for option %s in group '
'[key_manager]') % 'barbican_endpoint'
raise TackerException(msg)
sess = context.create_session()
self._barbican_endpoint = barbican_endpoint
if self._barbican_endpoint[-1] == '/':
self._barbican_endpoint = self._barbican_endpoint[:-1]
self._barbican_client = barbican_client.Client(
session=sess,
endpoint=self._barbican_endpoint)
self._current_context = context
self._base_url = '%s/%s/' % (
self._barbican_endpoint,
barbican_version)
return self._barbican_client
except Exception as e:
LOG.error('Error creating Barbican client: %s', e)
raise exception.KeyManagerError(reason=e)
try:
auth = self._get_keystone_auth(context)
sess = session.Session(auth=auth)

View File

@ -177,9 +177,13 @@ class Kubernetes_Driver(abstract_vim_driver.VimAbstractDriver):
try:
k_context = \
t_context.generate_tacker_service_context()
keystone_conf = CONF.keystone_authtoken
secret_uuid = auth['secret_uuid']
keymgr_api = KEYMGR_API(keystone_conf.auth_url)
if CONF.ext_oauth2_auth.use_ext_oauth2_auth:
keymgr_api = KEYMGR_API(
CONF.ext_oauth2_auth.token_endpoint)
else:
keymgr_api = KEYMGR_API(
CONF.keystone_authtoken.auth_url)
keymgr_api.delete(k_context, secret_uuid)
LOG.debug('VIM key deleted successfully for vim %s',
vim_id)
@ -217,8 +221,11 @@ class Kubernetes_Driver(abstract_vim_driver.VimAbstractDriver):
if CONF.k8s_vim.use_barbican:
try:
k_context = t_context.generate_tacker_service_context()
keystone_conf = CONF.keystone_authtoken
keymgr_api = KEYMGR_API(keystone_conf.auth_url)
if CONF.ext_oauth2_auth.use_ext_oauth2_auth:
keymgr_api = KEYMGR_API(
CONF.ext_oauth2_auth.token_endpoint)
else:
keymgr_api = KEYMGR_API(CONF.keystone_authtoken.auth_url)
secret_uuid = keymgr_api.store(k_context, fernet_key)
auth['key_type'] = 'barbican_key'

View File

@ -194,9 +194,12 @@ class OpenStack_Driver(abstract_vim_driver.VimAbstractDriver,
if auth.get('key_type') == 'barbican_key':
try:
k_context = t_context.generate_tacker_service_context()
keystone_conf = CONF.keystone_authtoken
secret_uuid = auth['secret_uuid']
keymgr_api = KEYMGR_API(keystone_conf.auth_url)
if CONF.ext_oauth2_auth.use_ext_oauth2_auth:
keymgr_api = KEYMGR_API(
CONF.ext_oauth2_auth.token_endpoint)
else:
keymgr_api = KEYMGR_API(CONF.keystone_authtoken.auth_url)
keymgr_api.delete(k_context, secret_uuid)
LOG.debug('VIM key deleted successfully for vim %s',
vim_id)
@ -227,8 +230,11 @@ class OpenStack_Driver(abstract_vim_driver.VimAbstractDriver,
if CONF.vim_keys.use_barbican:
try:
k_context = t_context.generate_tacker_service_context()
keystone_conf = CONF.keystone_authtoken
keymgr_api = KEYMGR_API(keystone_conf.auth_url)
if CONF.ext_oauth2_auth.use_ext_oauth2_auth:
keymgr_api = KEYMGR_API(
CONF.ext_oauth2_auth.token_endpoint)
else:
keymgr_api = KEYMGR_API(CONF.keystone_authtoken.auth_url)
secret_uuid = keymgr_api.store(k_context, fernet_key)
auth['key_type'] = 'barbican_key'

View File

@ -569,9 +569,11 @@ class NfvoPlugin(nfvo_db_plugin.NfvoPluginDb, vnffg_db.VnffgPluginDbMixin,
cred = auth['password'].encode('utf-8')
if auth.get('key_type') == 'barbican_key':
k_context = t_context.generate_tacker_service_context()
keystone_conf = CONF.keystone_authtoken
secret_uuid = auth['secret_uuid']
keymgr_api = KEYMGR_API(keystone_conf.auth_url)
if CONF.ext_oauth2_auth.use_ext_oauth2_auth:
keymgr_api = KEYMGR_API(CONF.ext_oauth2_auth.token_endpoint)
else:
keymgr_api = KEYMGR_API(CONF.keystone_authtoken.auth_url)
secret_obj = keymgr_api.get(k_context, secret_uuid)
vim_key = secret_obj.payload
else:

View File

@ -0,0 +1,52 @@
-----BEGIN PRIVATE KEY-----
MIIJQwIBADANBgkqhkiG9w0BAQEFAASCCS0wggkpAgEAAoICAQDegNuQgmQL7n10
+Z3itXtpiNHlvZwCYOS66+3PakAw1OoRB6SiHeNYnuVRHlraTDKnnfgHhX/1AVs7
P36QU5PVYznGip2PXZlCh8MeQhpXgKKt25LPnpQOnUssHyq+OqTHZB6eS2C7xMHf
wzPrYRwxhbVgUUVe85cdiXaL5ZRqXNotM00wH1hck4s+1fsnKv7UeGbwM1WwMn6/
0E1eKwYzlKm4Vmkcivy8WBI7Ijp/MPOUyRXN/mPh8L2VOq0D1E3pufYoYmpBkiQi
Ii8nz5CXrhDpM0tGKD+RZ+howE2i+frI2gNDfU5xMx+k+qjD0jftDrQ+OZUujUtq
6JfdrvtPBT01XZw8GV5Rm9vEwMRduWUDGdRB3chOTeTUdsIG765+Ot7GE7nYrAs0
s/ryAm1FnNJocTzje7k07IzdBpWzrTrx087Kfcsn6evEABOxim0i+AHUR94QR9/V
EP3/+SkJ7zl9P1KzOZZCWtUTnfQxrLhEnCwwjtl35vWlzst+TR7HDwIzQRQVLFH9
zMTz8tw6coPifkbVzdwCLGoKge4llDPcVx/TmIGFD3saT0E68yxXe6k3cdIg6lZf
dB0yutVBzECrx+LiIpxwQWRKHNiR58KsHHmgXDb8ORBCjpmctD+JsdBhf8hDRMXP
9sV/fbMUwgrRceyj9AV2x59tE9+UHwIDAQABAoICABb6V7JkxNA2oN4jqRpwg34y
kvqWyjW0q+ph0v1Ii7h/RGzdzTKww3mzbxshd2Bz3gdRWPvt3Xj/2twTgo6FEw9G
YAEQ75SOpfUo8A1/5hiDQEmUE2U9iyy3Mbwsu81JYRr2S/Ms9aBugVcKYaI9NRwo
IsL/oZpcrY5vU76+xsT1MdLZKW9+zTFCS28Byh4RYp+uj3Le2kqH7G8Co/rFlq5c
++n9gn1gHRmWPsu8jS31cDI9UfMkAkyi//EZTiTHGAS7H6CsCS0cWn7r6NLDrLr9
TuHGWk+0eFwbzvSCZ4IdLrjvSsb9ecxW6z2uZR9T5lKk4hhK+g0EqnUv7/8Eww8E
wA2J1zhuQ0UzoAowjj5338whBQROKSO4u3ppxhNUSP7fUgYdEKUQEg7rlfEzI+pG
dV1LtG0GZBzdZXpE/PTpASjefCkC6olmZpUvajHJGqP0a/ygA9SEBm+B/Q4ii7+0
luk6Lj6z+vSWatU7LrLnQeprN82NWxtkH+u2gjMOq1N8r4FOFvbZYBp1NMvtH4iP
R6jLdJWYx/KOr4lCkbgTszlVhPop8dktOPQSPL4u6RxdmsGBf028oWKXLrj1D1Ua
dBWR1L1CCnI8X6jxL6eT52qF+NY2JxanX6NnzxE/KqedWXmKDxn0M3ETfizz9UG4
8UmsMgJ8UUALRbWHjlEBAoIBAQDvQmYWhYtUQjcaeLkhjd9qPXjLGIL6NYnrYEUK
Yenn0mh7SKZTTL8yz/QVOecD/QiBetmQJ5FsmUmqorGWYfbWs6C+p2yHQ0U9X7rE
3ynFm0MDluuNZMWYRu0Yb6gvCYjitlt/efGKDalP1Al1hX2w9fUGdj32K6fulEX6
dcl4r2bq4i+rOwe9YDD9yvkvh1+aCwA56JCTBoEBsbmOdKTC7431rT8BTLbBaXwy
hf35P9wzU079QwwqDKdUlMQjUz9gWZkYFHkPfce2MCm+T0aHNnjQtLXRGOcIj15P
B64+GB9b86XNZlqpuY2rceF+LDwaw4rgQkXDr+TdAsjrtcdHAoIBAQDuElNxRc9t
hKwZGBVIWaHI3Y3oaTymi277DwhDzvmJgwBAddfEaC1rCt/giXxtmhhnAXpDD4sk
3m8iWw1jODRxOv2UDkUvSRV5tfY+QTG0nVVmMpX3lPWpIYxEVg34WYSq0xnXKrpW
zxUOqD1fW2i2lXZtFAb6ZNt/hHts7KUPzk9/ZbAomVHO6JO4Ac3n0LTDSCmQHhRO
5gV0ea4Sh6AVOiFD20rMAnTFNnxnI+wLMt0SNAzouhRMulDqOcAmoH2DKG8PCcEt
dQpUDwITxXuomsjhIHIli760MwSlwWZbrh5h7NAj1VmnQBtMkLnBtnE7cFSVdcPt
BAFnq72txGhpAoIBAQDIWYKhM1zTxsrbyOHF3kaKcUVYVINBQFnolunZYtp6vG+v
ZMuaj3a/9vE+YQk5Bsb7ncLXerrFBKtyTuCEvC315d8iJ5KyxbsSRLpiJzmUdoos
VFGVSiBIfoQF5WIhWUueBPQjkBqZ7wyrgzQUjB8PczamHZePL0lleBYNQFrgS4jU
AWnHahv2EbmUnEYD7ck5diLPWxbNdzHKGGf4iWZ6shze8B8FWJbk6Q8OQ7PD5xze
gdFwNJfYElaAdj60Ef7NENopFuO0/C+jOTuLWFkH2q5anihuGvtD6MIhTZ4z8wE3
f5SEpkQfQfkG6srXW/VMuBfv6K8AyabNB4r2Dnb7AoIBADHy2lrroKeDrG/fY6e4
Vn9ELJ/UZIs0ueYmsz82z5gQSh88Gjb0/IJ2153OerKsH+6MmtAzFKh5mquEmvx0
MFyJWeaUT+Op272bdbx+BSW11NMKTfiR4jDH/xvfSjMO5QzKGaPRLSNFc0+N8MJu
9TtJhH1CNGyYeIz6iMLDq6XzTS6XcSwzbryQg12Z00+NtD88hqvcA7rB++cCGIl+
txF9Drmj6r9+zG0MD3G8UavP0h4dmY/CarvmY0+hKjVweqTn+NUY4NTet3oHZBIt
3tHzF65UFl7WQP6hrZnxR754e5tkCg9aleLHSnL38mE4G+2ylax99stlib3shHFO
wfECggEBAJrW8BmZXbD8ss3c7kHPzleAf1q/6bPnxRXB0luCPz7tkMfdkOQ2cG1t
rcnsKcyR2woEbtdRK938KxZgTgzKYVhR8spKFSh01/d9OZAP6f+iCoR2zzOlSFo4
pejnQY0LHEwGZmnzghLoqJSUgROAR49CvLO1mI48CaEUuLmqzPYWNXMHDDU2N5XO
uF0/ph68fnI+f+0ZUgdpVPFRnfSrAqzEhzEMh1vnZ4ZxEVpgUcn/hRfNZ3hN0LEr
fjm2bWxg2j0rxjS0mUDQpaMj0253jVYRiC3M3cCh0NSZtwaXVJYCVxetpjBTPfJr
jIgmPTKGR0FedjAeCBByH9vkw8iRg7w=
-----END PRIVATE KEY-----

View File

@ -0,0 +1,457 @@
# Copyright (C) 2023 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
import copy
import os
from unittest import mock
import uuid
from oslo_config import cfg
from requests_mock.contrib import fixture as rm_fixture
from keystoneauth1 import exceptions as ksa_exceptions
from tacker.common.exceptions import TackerException
from tacker import context
from tacker.tests.unit import base
JWT_KEY_FILE = 'jwt_private.key'
def _get_sample_key(name):
filename = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"./sample_keys/", name)
with open(filename, "r") as f:
content = f.read()
return content
def get_mock_conf_effect(audience=None, token_endpoint=None,
auth_method=None, client_id=None, client_secret=None,
scope=None, jwt_key_file=None, jwt_algorithm=None,
jwt_bearer_time_out=None, certfile=None, keyfile=None,
cafile=None, http_connect_timeout=None, insecure=None):
def mock_conf_key_effect(name):
if name == 'keystone_authtoken':
return MockConfig(conf=None)
elif name == 'ext_oauth2_auth':
config = {'use_ext_oauth2_auth': True}
if audience:
config['audience'] = audience
if token_endpoint:
config['token_endpoint'] = token_endpoint
if auth_method:
config['auth_method'] = auth_method
if client_id:
config['client_id'] = client_id
if client_secret:
config['client_secret'] = client_secret
if scope:
config['scope'] = scope
if jwt_key_file:
config['jwt_key_file'] = jwt_key_file
if jwt_algorithm:
config['jwt_algorithm'] = jwt_algorithm
if jwt_bearer_time_out:
config['jwt_bearer_time_out'] = jwt_bearer_time_out
if certfile:
config['certfile'] = certfile
if keyfile:
config['keyfile'] = keyfile
if cafile:
config['cafile'] = cafile
if cafile:
config['http_connect_timeout'] = http_connect_timeout
if cafile:
config['insecure'] = insecure
return MockConfig(
conf=config)
else:
return cfg.CONF._get(name)
return mock_conf_key_effect
class MockConfig(object):
def __init__(self, conf=None):
self.conf = conf
def __getattr__(self, name):
if not self.conf or name not in self.conf:
raise cfg.NoSuchOptError(f'not found {name}')
return self.conf.get(name)
def __contains__(self, key):
return key in self.conf
class MockSession(object):
def __init__(self, ):
self.auth = None
class TestExtOAuth2Auth(base.TestCase):
def setUp(self):
super(TestExtOAuth2Auth, self).setUp()
self.requests_mock = self.useFixture(rm_fixture.Fixture())
self.token_endpoint = 'http://demo/token_endpoint'
self.auth_method = 'client_secret_post'
self.client_id = 'test_client_id'
self.client_secret = 'test_client_secret'
self.scope = 'tacker_api'
self.access_token = f'access_token_{str(uuid.uuid4())}'
self.audience = 'http://demo/audience'
self.jwt_bearer_time_out = 2800
self.addCleanup(mock.patch.stopall)
def _get_access_token_response(self, request, context,
auth_method=None,
client_id=None,
client_secret=None,
scope=None,
access_token=None,
status_code=200,
raise_error=None,
resp=None
):
if raise_error:
raise raise_error
if auth_method == 'tls_client_auth':
body = (f'client_id={client_id}&scope={scope}'
f'&grant_type=client_credentials')
self.assertEqual(request.text, body)
elif auth_method == 'client_secret_post':
body = (f'client_id={client_id}&client_secret={client_secret}'
f'&scope={scope}&grant_type=client_credentials')
self.assertEqual(request.text, body)
elif auth_method == 'client_secret_basic':
body = f'scope={scope}&grant_type=client_credentials'
self.assertEqual(request.text, body)
auth_basic = request._request.headers.get('Authorization')
self.assertIsNotNone(auth_basic)
auth = 'Basic ' + base64.standard_b64encode(
f'{client_id}:{client_secret}'.encode('ascii')).decode('ascii')
self.assertEqual(auth_basic, auth)
elif auth_method == 'private_key_jwt':
self.assertIn(f'client_id={client_id}', request.text)
self.assertIn(('client_assertion_type=urn%3Aietf%3Aparams%3A'
'oauth%3Aclient-assertion-type%3Ajwt-bearer'),
request.text)
self.assertIn('client_assertion=', request.text)
self.assertIn(f'scope={scope}', request.text)
self.assertIn('grant_type=client_credentials', request.text)
elif auth_method == 'client_secret_jwt':
self.assertIn(f'client_id={client_id}', request.text)
self.assertIn(('client_assertion_type=urn%3Aietf%3Aparams%3A'
'oauth%3Aclient-assertion-type%3Ajwt-bearer'),
request.text)
self.assertIn('client_assertion=', request.text)
self.assertIn(f'scope={scope}', request.text)
self.assertIn('grant_type=client_credentials', request.text)
if not access_token:
access_token = f'access_token{str(uuid.uuid4())}'
if not resp:
if status_code == 200:
response = {
'access_token': access_token,
'expires_in': 1800,
'refresh_expires_in': 0,
'token_type': 'Bearer',
'not-before-policy': 0,
'scope': scope
}
else:
response = {'error': 'error_title',
'error_description': 'error message'}
else:
response = copy.deepcopy(resp)
context.status_code = status_code
return response
def _get_default_mock_conf_effect(self):
return get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method=self.auth_method,
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope)
def _check_authorization_header(self):
auth_context = context.generate_tacker_service_context()
session = auth_context.create_session()
headers = auth_context.get_headers(session)
bearer = f'Bearer {self.access_token}'
self.assertIn('Authorization', headers)
self.assertEqual(bearer, headers.get('Authorization'))
return auth_context
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_init_without_token_endpoint(self, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint='',
auth_method=self.auth_method,
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope
)
self.assertRaises(TackerException,
context.generate_tacker_service_context)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_init_without_scope(self, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method=self.auth_method,
client_id=self.client_id,
client_secret=self.client_secret)
self.assertRaises(TackerException,
context.generate_tacker_service_context)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_init_without_keystone_middleware_opts(self, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope)
self.assertRaises(TackerException,
context.generate_tacker_service_context)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
@mock.patch('keystoneauth1.loading.session.Session.load_from_options')
def test_create_session(self, mock_load_from_options, mock_get_conf_key):
certfile = f'/demo/certfile{str(uuid.uuid4())}'
keyfile = f'/demo/keyfile{str(uuid.uuid4())}'
cafile = f'/demo/cafile{str(uuid.uuid4())}'
conf_insecure = True
http_connect_timeout = 1000
def load_side_effect(**kwargs):
self.assertEqual(conf_insecure, kwargs.get('insecure'))
self.assertEqual(cafile, kwargs.get('cacert'))
self.assertEqual(certfile, kwargs.get('cert'))
self.assertEqual(keyfile, kwargs.get('key'))
self.assertEqual(http_connect_timeout, kwargs.get('timeout'))
return MockSession()
mock_load_from_options.side_effect = load_side_effect
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method='tls_client_auth',
client_id=self.client_id,
scope=self.scope,
certfile=certfile,
keyfile=keyfile,
cafile=cafile,
insecure=conf_insecure,
http_connect_timeout=http_connect_timeout)
auth_context = context.generate_tacker_service_context()
auth_context.create_session()
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_connection_params(self, mock_get_conf_key):
mock_get_conf_key.side_effect = self._get_default_mock_conf_effect()
auth_context = context.generate_tacker_service_context()
session = auth_context.create_session()
params = auth_context.get_connection_params(session)
self.assertDictEqual(params, {})
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_tls_client_auth(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method='tls_client_auth',
client_id=self.client_id,
scope=self.scope,
access_token=self.access_token)
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method='tls_client_auth',
client_id=self.client_id,
scope=self.scope)
auth_context = self._check_authorization_header()
result = auth_context.invalidate()
self.assertEqual(True, result)
self.assertIsNone(auth_context.access_token)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_client_secret_post(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method='client_secret_post',
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope,
access_token=self.access_token
)
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method='client_secret_post',
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope)
self._check_authorization_header()
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_client_secret_basic(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method='client_secret_basic',
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope,
access_token=self.access_token)
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method='client_secret_basic',
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope)
self._check_authorization_header()
@mock.patch('builtins.open', mock.mock_open(read_data=_get_sample_key(
JWT_KEY_FILE)))
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_private_key_jwt(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method='private_key_jwt',
client_id=self.client_id,
scope=self.scope,
access_token=self.access_token)
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method='private_key_jwt',
client_id=self.client_id,
audience=self.audience,
jwt_key_file=f'/demo/jwt_key_file{str(uuid.uuid4())}',
jwt_algorithm='RS256',
jwt_bearer_time_out=self.jwt_bearer_time_out,
scope=self.scope)
self._check_authorization_header()
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_client_secret_jwt(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method='client_secret_jwt',
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope,
access_token=self.access_token)
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method='client_secret_jwt',
client_id=self.client_id,
audience=self.audience,
client_secret=self.client_secret,
jwt_algorithm='HS256',
jwt_bearer_time_out=self.jwt_bearer_time_out,
scope=self.scope)
self._check_authorization_header()
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_invalid_auth_method(self, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method='client_secret_other',
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope
)
auth_context = context.generate_tacker_service_context()
session = auth_context.create_session()
self.assertRaises(TackerException, auth_context.get_headers, session)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_connect_fail(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method=self.auth_method,
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope,
access_token=self.access_token,
raise_error=ksa_exceptions.RequestTimeout('connect time out.'))
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = self._get_default_mock_conf_effect()
auth_context = context.generate_tacker_service_context()
session = auth_context.create_session()
self.assertRaises(TackerException, auth_context.get_headers, session)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_is_not_200(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method=self.auth_method,
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope,
access_token=self.access_token,
status_code=201)
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = self._get_default_mock_conf_effect()
auth_context = context.generate_tacker_service_context()
session = auth_context.create_session()
self.assertRaises(TackerException, auth_context.get_headers, session)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_not_include_access_token(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method=self.auth_method,
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope,
access_token=self.access_token,
status_code=200,
resp={'error': 'invalid_client',
'error_description': 'The client is not found.'})
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = self._get_default_mock_conf_effect()
auth_context = context.generate_tacker_service_context()
session = auth_context.create_session()
self.assertRaises(TackerException, auth_context.get_headers, session)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_unknown_error(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method=self.auth_method,
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope,
access_token=self.access_token,
raise_error=Exception('unknown error occurred.'))
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = self._get_default_mock_conf_effect()
auth_context = context.generate_tacker_service_context()
session = auth_context.create_session()
self.assertRaises(TackerException, auth_context.get_headers, session)

View File

@ -0,0 +1,255 @@
# Copyright (C) 2023 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import uuid
from oslo_config import cfg
from requests_mock.contrib import fixture as rm_fixture
from tacker import context as t_context
from tacker.keymgr.barbican_key_manager import BarbicanKeyManager
from tacker.keymgr import exception
from tacker.tests.unit import base
def get_mock_conf_key_effect(barbican_endpoint=None):
def mock_conf_key_effect(name):
if name == 'ext_oauth2_auth':
return MockConfig(
conf={
'use_ext_oauth2_auth': True,
'token_endpoint': 'http://demo/token_endpoint',
'auth_method': 'client_secret_post',
'client_id': 'client_id',
'client_secret': 'client_secret',
'scope': 'client_secret'
})
elif name == 'key_manager':
conf = {
'api_class': 'tacker.keymgr.barbican_key_manager'
'.BarbicanKeyManager',
'barbican_version': 'v1',
'barbican_endpoint': barbican_endpoint
}
return MockConfig(conf=conf)
elif name == 'k8s_vim':
return MockConfig(
conf={
'use_barbican': True
})
else:
return cfg.CONF._get(name)
return mock_conf_key_effect
class MockConfig(object):
def __init__(self, conf=None):
self.conf = conf
def __getattr__(self, name):
if not self.conf and name not in self.conf:
raise cfg.NoSuchOptError(f'not found {name}')
return self.conf.get(name)
def __contains__(self, key):
return key in self.conf
class TestBarbicanKeyManager(base.TestCase):
def setUp(self):
super(TestBarbicanKeyManager, self).setUp()
self.requests_mock = self.useFixture(rm_fixture.Fixture())
self.token_endpoint = 'http://demo/token_endpoint'
self.auth_method = 'client_secret_post'
self.client_id = 'test_client_id'
self.client_secret = 'test_client_secret'
self.scope = 'tacker_api'
self.access_token = f'access_token_{str(uuid.uuid4())}'
self.audience = 'http://demo/audience'
self.jwt_bearer_time_out = 2800
self.addCleanup(mock.patch.stopall)
def _mock_external_token_api(self):
def mock_token_resp(request, context):
response = {
'access_token': self.access_token,
'expires_in': 1800,
'refresh_expires_in': 0,
'token_type': 'Bearer',
'not-before-policy': 0,
'scope': 'tacker_api'
}
context.status_code = 200
return response
self.requests_mock.post('http://demo/token_endpoint',
json=mock_token_resp)
def _mock_barbican_get_version_resp(self):
def mock_barbican_get_resp(request, context):
auth_value = f'Bearer {self.access_token}'
req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth)
context.status_code = 200
response = {
"versions": {
"values": [
{
"id": "v1",
"status": "stable",
"links": [
{
"rel": "self",
"href": "http://demo/barbican/v1/"
},
{
"rel": "describedby",
"type": "text/html",
"href": "https://docs.openstack.org/"}
],
"media-types": [
{
"base": "application/json",
"type": "application/"
"vnd.openstack.key-manager-v1+json"
}
]
}
]
}
}
return response
return mock_barbican_get_resp
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
@mock.patch('barbicanclient.base.validate_ref_and_return_uuid')
def test_delete_ext_oauth2_auth(self, mock_validate, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_key_effect(
barbican_endpoint='http://demo/barbican/')
self._mock_external_token_api()
mock_validate.return_value = True
def mock_barbican_delete_resp(request, context):
auth_value = f'Bearer {self.access_token}'
req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth)
context.status_code = 204
return ''
def mock_barbican_get_for_check_resp(request, context):
auth_value = f'Bearer {self.access_token}'
req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth)
context.status_code = 200
return {}
self.requests_mock.get(
'http://demo/barbican',
json=self._mock_barbican_get_version_resp())
self.requests_mock.delete(
'http://demo/barbican/v1/secrets/True',
json=mock_barbican_delete_resp)
self.requests_mock.get(
'http://demo/barbican/v1/secrets/True',
json=mock_barbican_get_for_check_resp)
auth = t_context.generate_tacker_service_context()
keymgr = BarbicanKeyManager(auth.token_endpoint)
keymgr.delete(auth, 'test')
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
@mock.patch('tacker.keymgr.barbican_key_manager.'
'BarbicanKeyManager._retrieve_secret_uuid')
def test_store_ext_oauth2_auth(self, mock_secret_uuid,
mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_key_effect(
barbican_endpoint='http://demo/barbican')
secret_id = 'store_secret_uuid'
mock_secret_uuid.return_value = secret_id
self._mock_external_token_api()
def mock_barbican_post_resp(request, context):
auth_value = f'Bearer {self.access_token}'
req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth)
response = {
'name': 'AES key',
'expiration': '2023-01-13T19:14:44.180394',
'algorithm': 'aes',
'bit_length': 256,
'mode': 'cbc',
'payload': 'YmVlcg==',
'payload_content_type': 'application/octet-stream',
'payload_content_encoding': 'base64'
}
context.status_code = 201
return response
self.requests_mock.get(
'http://demo/barbican',
json=self._mock_barbican_get_version_resp())
self.requests_mock.post('http://demo/barbican/v1/secrets/',
json=mock_barbican_post_resp)
auth = t_context.generate_tacker_service_context()
keymgr = BarbicanKeyManager(auth.token_endpoint)
result = keymgr.store(auth, 'test')
self.assertEqual(result, secret_id)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
@mock.patch('barbicanclient.base.validate_ref_and_return_uuid')
def test_get_ext_oauth2_auth(self, mock_validate, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_key_effect(
barbican_endpoint='http://demo/barbican/')
self._mock_external_token_api()
mock_validate.return_value = True
def mock_barbican_get_resp(request, context):
auth_value = f'Bearer {self.access_token}'
req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth)
context.status_code = 200
response = {
'id': 'test001'
}
return response
self.requests_mock.get(
'http://demo/barbican',
json=self._mock_barbican_get_version_resp())
self.requests_mock.get(
'http://demo/barbican/v1/secrets/True',
json=mock_barbican_get_resp)
auth = t_context.generate_tacker_service_context()
keymgr = BarbicanKeyManager(auth.token_endpoint)
result = keymgr.get(auth, 'test001')
self.assertEqual(result.secret_ref,
'http://demo/barbican/v1/secrets/test001')
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_ext_oauth2_auth_no_endpoint(self, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_key_effect(
barbican_endpoint='')
self._mock_external_token_api()
auth = t_context.generate_tacker_service_context()
keymgr = BarbicanKeyManager(auth.token_endpoint)
self.assertRaises(exception.KeyManagerError, keymgr.get, auth, 'test')

View File

@ -52,6 +52,36 @@ cfg.CONF.register_opts(OPTS, 'keystone_authtoken')
CONF = cfg.CONF
def get_mock_conf_key_effect():
def mock_conf_key_effect(name):
if name == 'ext_oauth2_auth':
return MockConfig(
conf={
'use_ext_oauth2_auth': True,
'token_endpoint': 'http://demo/token_endpoint',
'auth_method': 'client_secret_post',
'client_id': 'client_id',
'client_secret': 'client_secret',
'scope': 'client_secret'
})
elif name == 'key_manager':
conf = {
'api_class': 'tacker.keymgr.barbican_key_manager'
'.BarbicanKeyManager',
'barbican_version': 'v1',
'barbican_endpoint': 'http://test/barbican'
}
return MockConfig(conf=conf)
elif name == 'k8s_vim':
return MockConfig(
conf={
'use_barbican': True
})
else:
return cfg.CONF._get(name)
return mock_conf_key_effect
class FakeKubernetesAPI(mock.Mock):
pass
@ -67,6 +97,21 @@ class mock_dict(dict):
__delattr__ = dict.__delitem__
class MockConfig(object):
def __init__(self, conf=None):
self.conf = conf
def __getattr__(self, name):
if not self.conf:
raise cfg.NoSuchOptError('not found %s' % name)
if name not in self.conf:
raise cfg.NoSuchOptError('not found %s' % name)
return self.conf.get(name)
def __contains__(self, key):
return key in self.conf
class TestKubernetes_Driver(base.TestCase):
def setUp(self):
super(TestKubernetes_Driver, self).setUp()
@ -213,3 +258,32 @@ class TestKubernetes_Driver(base.TestCase):
del self.vim_obj['auth_cred']['ssl_ca_cert']
self.assertRaises(nfvo.VimUnauthorizedException,
self.kubernetes_driver.register_vim, self.vim_obj)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_deregister_vim_barbican_ext_oauth2_auth(self, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_key_effect()
self.keymgr.delete.return_value = None
vim_obj = self.get_vim_obj_barbican()
self.kubernetes_driver.deregister_vim(vim_obj)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_encode_vim_auth_barbican_ext_oauth2_auth(self, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_key_effect()
self.config_fixture.config(group='k8s_vim',
use_barbican=True)
fernet_attrs = {'encrypt.return_value': 'encrypted_password'}
mock_fernet_obj = mock.Mock(**fernet_attrs)
mock_fernet_key = 'test_fernet_key'
self.keymgr.store.return_value = 'fake-secret-uuid'
self.kubernetes_api.create_fernet_key.return_value = (mock_fernet_key,
mock_fernet_obj)
vim_obj = self.get_vim_obj()
self.kubernetes_driver.encode_vim_auth(
vim_obj['id'], vim_obj['auth_cred'])
mock_fernet_obj.encrypt.assert_called_once_with(mock.ANY)
self.assertEqual(vim_obj['auth_cred']['key_type'],
'barbican_key')
self.assertEqual(vim_obj['auth_cred']['secret_uuid'],
'fake-secret-uuid')

View File

@ -16,9 +16,10 @@
import os
from unittest import mock
from keystoneauth1 import exceptions
from oslo_config import cfg
from keystoneauth1 import exceptions
from tacker import context as t_context
from tacker.extensions import nfvo
from tacker.nfvo.drivers.vim import openstack_driver
@ -54,6 +55,38 @@ cfg.CONF.register_opts(OPTS, 'keystone_authtoken')
CONF = cfg.CONF
def get_mock_conf_key_effect():
def mock_conf_key_effect(name):
if name == 'keystone_authtoken':
return MockConfig(conf=None)
elif name == 'ext_oauth2_auth':
return MockConfig(
conf={
'use_ext_oauth2_auth': True,
'token_endpoint': 'http://demo/token_endpoint',
'auth_method': 'client_secret_post',
'client_id': 'client_id',
'client_secret': 'client_secret',
'scope': 'client_secret'
})
elif name == 'key_manager':
conf = {
'api_class': 'tacker.keymgr.barbican_key_manager'
'.BarbicanKeyManager',
'barbican_version': 'v1',
'barbican_endpoint': 'http://test/barbican'
}
return MockConfig(conf=conf)
elif name == 'vim_keys':
return MockConfig(
conf={
'use_barbican': True
})
else:
return cfg.CONF._get(name)
return mock_conf_key_effect
class FakeKeystone(mock.Mock):
pass
@ -74,6 +107,21 @@ class mock_dict(dict):
__delattr__ = dict.__delitem__
class MockConfig(object):
def __init__(self, conf=None):
self.conf = conf
def __getattr__(self, name):
if not self.conf:
raise cfg.NoSuchOptError('not found %s' % name)
if name not in self.conf:
raise cfg.NoSuchOptError('not found %s' % name)
return self.conf.get(name)
def __contains__(self, key):
return key in self.conf
class TestOpenstack_Driver(base.TestCase):
def setUp(self):
super(TestOpenstack_Driver, self).setUp()
@ -255,3 +303,31 @@ class TestOpenstack_Driver(base.TestCase):
self.assertRaises(nfvo.VimGetResourceNotFoundException,
self.openstack_driver.get_vim_resource_id,
self.vim_obj, resource_type, resource_name)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_deregister_vim_barbican_ext_oauth2_auth(self, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_key_effect()
self.keymgr.delete.return_value = None
vim_obj = self.get_vim_obj_barbican()
self.openstack_driver.deregister_vim(vim_obj)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_encode_vim_auth_barbican_ext_oauth2_auth(self, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_key_effect()
fernet_attrs = {'encrypt.return_value': 'encrypted_password'}
mock_fernet_obj = mock.Mock(**fernet_attrs)
mock_fernet_key = 'test_fernet_key'
self.keymgr.store.return_value = 'fake-secret-uuid'
self.keystone.create_fernet_key.return_value = (mock_fernet_key,
mock_fernet_obj)
vim_obj = self.get_vim_obj()
self.openstack_driver.encode_vim_auth(
vim_obj['id'], vim_obj['auth_cred'])
mock_fernet_obj.encrypt.assert_called_once_with(mock.ANY)
self.assertEqual(vim_obj['auth_cred']['key_type'],
'barbican_key')
self.assertEqual(vim_obj['auth_cred']['secret_uuid'],
'fake-secret-uuid')

View File

@ -19,7 +19,9 @@ import os
from unittest import mock
from unittest.mock import patch
from oslo_config import cfg
from oslo_utils import uuidutils
from requests_mock.contrib import fixture as rm_fixture
from tacker.common import exceptions
from tacker import context
@ -28,6 +30,7 @@ from tacker.db.nfvo import nfvo_db
from tacker.db.nfvo import ns_db
from tacker.db.nfvo import vnffg_db
from tacker.extensions import nfvo
from tacker.keymgr import API as KEYMGR_API
from tacker.manager import TackerManager
from tacker.nfvo import nfvo_plugin
from tacker.plugins.common import constants
@ -40,6 +43,53 @@ SECRET_PASSWORD = '***'
DUMMY_NS_2 = 'ba6bf017-f6f7-45f1-a280-57b073bf78ef'
def get_mock_conf_key_effect():
def mock_conf_key_effect(name):
if name == 'keystone_authtoken':
return MockConfig(conf=None)
elif name == 'ext_oauth2_auth':
return MockConfig(
conf={
'use_ext_oauth2_auth': True,
'token_endpoint': 'http://demo/token_endpoint',
'auth_method': 'client_secret_post',
'client_id': 'client_id',
'client_secret': 'client_secret',
'scope': 'client_secret'
})
elif name == 'key_manager':
conf = {
'api_class': 'tacker.keymgr.barbican_key_manager'
'.BarbicanKeyManager',
'barbican_endpoint': 'http://demo/barbican',
'barbican_version': 'v1'
}
return MockConfig(conf=conf)
elif name == 'k8s_vim':
return MockConfig(
conf={
'use_barbican': True
})
else:
return cfg.CONF._get(name)
return mock_conf_key_effect
class MockConfig(object):
def __init__(self, conf=None):
self.conf = conf
def __getattr__(self, name):
if not self.conf:
raise cfg.NoSuchOptError('not found %s' % name)
if name not in self.conf:
raise cfg.NoSuchOptError('not found %s' % name)
return self.conf.get(name)
def __contains__(self, key):
return key in self.conf
def dummy_get_vim(*args, **kwargs):
vim_obj = dict()
vim_obj['auth_cred'] = utils.get_vim_auth_obj()
@ -203,6 +253,9 @@ class FakeVNFMPlugin(mock.Mock):
class TestNfvoPlugin(db_base.SqlTestCase):
def setUp(self):
super(TestNfvoPlugin, self).setUp()
self.requests_mock = self.useFixture(rm_fixture.Fixture())
KEYMGR_API('')
self.access_token = 'access_token_uuid'
self.addCleanup(mock.patch.stopall)
self.context = context.get_admin_context()
self.nfvo_plugin = nfvo_plugin.NfvoPlugin()
@ -216,6 +269,22 @@ class TestNfvoPlugin(db_base.SqlTestCase):
self._mock(
'tacker.common.driver_manager.DriverManager', fake_driver_manager)
def _mock_external_token_api(self):
def mock_token_resp(request, context):
response = {
'access_token': self.access_token,
'expires_in': 1800,
'refresh_expires_in': 0,
'token_type': 'Bearer',
'not-before-policy': 0,
'scope': 'tacker_api'
}
context.status_code = 200
return response
self.requests_mock.post('http://demo/token_endpoint',
json=mock_token_resp)
def _insert_dummy_vim(self):
session = self.context.session
vim_db = nfvo_db.Vim(
@ -1607,3 +1676,97 @@ class TestNfvoPlugin(db_base.SqlTestCase):
non_admin_context,
'ba6bf017-f6f7-45f1-a280-57b073bf78ea',
ns=nsattr)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
@mock.patch('barbicanclient.base.validate_ref_and_return_uuid')
@mock.patch('cryptography.fernet.Fernet.decrypt')
def test_build_vim_auth_barbican_external(
self, mock_decrypt, mock_validate, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_key_effect()
self._mock_external_token_api()
barbican_uuid = 'test_uuid'
mock_validate.return_value = barbican_uuid
vim_dict = {'id': 'aaaa', 'name': 'VIM0', 'type': 'test_vim',
'auth_cred': {'username': 'test',
'user_domain_name': 'test',
'cert_verify': 'True',
'project_id': 'test',
'project_name': 'test',
'project_domain_name': 'test',
'auth_url': 'http://test/identity/v3',
'key_type': 'barbican_key',
'secret_uuid': '***',
'password': '***'},
'auth_url': 'http://127.0.0.1/identity/v3',
'placement_attr': {'regions': ['TestRegionOne']},
'tenant_id': 'test'}
def mock_barbican_resp(request, context):
auth_value = 'Bearer %s' % self.access_token
req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth)
response = {
'name': 'AES key',
'expiration': '2023-01-13T19:14:44.180394',
'algorithm': 'aes',
'bit_length': 256,
'mode': 'cbc',
'payload': 'YmVlcg==',
'payload_content_type': 'application/octet-stream',
'payload_content_encoding': 'base64'
}
context.status_code = 200
return response
self.requests_mock.get('http://demo/barbican/v1/secrets/%s' %
barbican_uuid,
json=mock_barbican_resp)
def mock_barbican_payload_resp(request, context):
auth_value = 'Bearer %s' % self.access_token
req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth)
response = '5cJeztZKzISf1JAt73oBeTPPCrymn96A3wqG96F4RxU='
context.status_code = 200
return response
def mock_get_barbican_resp(request, context):
auth_value = 'Bearer %s' % self.access_token
req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth)
context.status_code = 200
response = {
"versions": {
"values": [
{
"id": "v1",
"status": "stable",
"links": [
{
"rel": "self",
"href": "http://demo/barbican/v1/"
},
{
"rel": "describedby",
"type": "text/html",
"href": "https://docs.openstack.org/"}
],
"media-types": [
{
"base": "application/json",
"type": "application/"
"vnd.openstack.key-manager-v1+json"
}
]
}
]
}
}
return response
self.requests_mock.get('http://demo/barbican/v1/secrets/%s/payload' %
barbican_uuid,
json=mock_barbican_payload_resp)
self.requests_mock.get('http://demo/barbican',
json=mock_get_barbican_resp)
mock_decrypt.return_value = 'test'.encode('utf-8')
self.nfvo_plugin._build_vim_auth(vim_dict)

View File

@ -15,13 +15,45 @@
from unittest import mock
from castellan.common.credentials import keystone_password
from oslo_config import cfg
from oslo_context import context as oslo_context
from testtools import matchers
from tacker.common.ext_oauth2_auth import ExtOAuth2Auth
from tacker import context
from tacker.tests import base
def get_mock_conf_key_effect(cfg_keystone_authtoken=None,
cfg_ext_oauth2_auth=None):
def mock_conf_key_effect(name):
if name == 'keystone_authtoken':
return MockConfig(name, conf=cfg_keystone_authtoken)
elif name == 'ext_oauth2_auth':
return MockConfig(name, conf=cfg_ext_oauth2_auth)
else:
return cfg.CONF._get(name)
return mock_conf_key_effect
class MockConfig(cfg.OptGroup):
def __init__(self, name, conf=None):
self.name = name
self.conf = conf
def __getattr__(self, name):
if not self.conf:
raise cfg.NoSuchOptError('not found %s' % name)
if name not in self.conf:
raise cfg.NoSuchOptError('not found %s' % name)
return self.conf.get(name)
def __contains__(self, key):
return key in self.conf
class TestTackerContext(base.BaseTestCase):
def setUp(self):
@ -131,3 +163,66 @@ class TestTackerContext(base.BaseTestCase):
self.assertEqual(req_id_before,
oslo_context.get_current().request_id)
self.assertNotEqual(req_id_before, ctx_admin.request_id)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_generate_tacker_service_context_keystone(self, mock_get_conf_key):
password = 'test_password'
auth_url = 'http://keystone/test/auth_url'
username = 'test_user_name'
user_domain_name = 'test_user_domain_name'
project_name = 'test_project_name'
project_domain_name = 'test_project_domain_name'
token_endpoint = 'http://demo/token'
auth_method = 'client_secret_basic'
client_id = 'test_client_id'
client_secret = 'client_secret'
scope = 'tacker'
mock_get_conf_key.side_effect = get_mock_conf_key_effect(
cfg_keystone_authtoken={
'password': password,
'auth_url': auth_url,
'username': username,
'user_domain_name': user_domain_name,
'project_name': project_name,
'project_domain_name': project_domain_name},
cfg_ext_oauth2_auth={
'token_endpoint': token_endpoint,
'auth_method': auth_method,
'client_id': client_id,
'client_secret': client_secret,
'scope': scope,
'use_ext_oauth2_auth': False}
)
auth_context = context.generate_tacker_service_context()
self.assertIsInstance(auth_context, keystone_password.KeystonePassword)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_generate_tacker_service_context_external(self, mock_get_conf_key):
password = 'test_password'
auth_url = 'http://keystone/test/auth_url'
username = 'test_user_name'
user_domain_name = 'test_user_domain_name'
project_name = 'test_project_name'
project_domain_name = 'test_project_domain_name'
token_endpoint = 'http://demo/token'
auth_method = 'client_secret_basic'
client_id = 'test_client_id'
client_secret = 'client_secret'
scope = 'tacker'
mock_get_conf_key.side_effect = get_mock_conf_key_effect(
cfg_keystone_authtoken={
'password': password,
'auth_url': auth_url,
'username': username,
'user_domain_name': user_domain_name,
'project_name': project_name,
'project_domain_name': project_domain_name},
cfg_ext_oauth2_auth={
'token_endpoint': token_endpoint,
'auth_method': auth_method,
'client_id': client_id,
'client_secret': client_secret,
'scope': scope,
'use_ext_oauth2_auth': True})
auth_context = context.generate_tacker_service_context()
self.assertIsInstance(auth_context, ExtOAuth2Auth)

View File

@ -13,16 +13,70 @@
from sqlalchemy.orm import exc as orm_exc
from unittest import mock
from oslo_config import cfg
from requests_mock.contrib import fixture as rm_fixture
from tacker.extensions import nfvo
from tacker.keymgr import API as KEYMGR_API
from tacker import manager
from tacker.tests.unit import base
from tacker.vnfm import vim_client
def get_mock_conf_key_effect():
def mock_conf_key_effect(name):
if name == 'keystone_authtoken':
return MockConfig(conf=None)
elif name == 'ext_oauth2_auth':
return MockConfig(
conf={
'use_ext_oauth2_auth': True,
'token_endpoint': 'http://demo/token_endpoint',
'auth_method': 'client_secret_post',
'client_id': 'client_id',
'client_secret': 'client_secret',
'scope': 'client_secret'
})
elif name == 'key_manager':
conf = {
'api_class': 'tacker.keymgr.barbican_key_manager'
'.BarbicanKeyManager',
'barbican_endpoint': 'http://demo/barbican',
'barbican_version': 'v1'
}
return MockConfig(conf=conf)
elif name == 'k8s_vim':
return MockConfig(
conf={
'use_barbican': True
})
else:
return cfg.CONF._get(name)
return mock_conf_key_effect
class MockConfig(object):
def __init__(self, conf=None):
self.conf = conf
def __getattr__(self, name):
if not self.conf:
raise cfg.NoSuchOptError('not found %s' % name)
if name not in self.conf:
raise cfg.NoSuchOptError('not found %s' % name)
return self.conf.get(name)
def __contains__(self, key):
return key in self.conf
class TestVIMClient(base.TestCase):
def setUp(self):
super(TestVIMClient, self).setUp()
self.requests_mock = self.useFixture(rm_fixture.Fixture())
KEYMGR_API('')
self.access_token = 'access_token_uuid'
self.vim_info = {'id': 'aaaa', 'name': 'VIM0', 'type': 'test_vim',
'auth_cred': {'password': '****'},
'auth_url': 'http://127.0.0.1/identity/v3',
@ -32,6 +86,22 @@ class TestVIMClient(base.TestCase):
self.service_plugins = mock.Mock()
self.nfvo_plugin = mock.Mock()
def _mock_external_token_api(self):
def mock_token_resp(request, context):
response = {
'access_token': self.access_token,
'expires_in': 1800,
'refresh_expires_in': 0,
'token_type': 'Bearer',
'not-before-policy': 0,
'scope': 'tacker_api'
}
context.status_code = 200
return response
self.requests_mock.post('http://demo/token_endpoint',
json=mock_token_resp)
def test_get_vim_without_defined_default_vim(self):
self.nfvo_plugin.get_default_vim.side_effect = \
orm_exc.NoResultFound()
@ -138,3 +208,103 @@ class TestVIMClient(base.TestCase):
vim_regions = ['TestRegionOne', 'TestRegionTwo']
region_name = 'TestRegionOne'
self.assertTrue(self.vimclient.region_valid(vim_regions, region_name))
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
@mock.patch('barbicanclient.base.validate_ref_and_return_uuid')
@mock.patch('cryptography.fernet.Fernet.decrypt')
def test_get_vim_extenal(self, mock_decrypt, mock_validate,
mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_key_effect()
self._mock_external_token_api()
barbican_uuid = 'test_uuid'
mock_validate.return_value = barbican_uuid
vim_info = {'id': 'aaaa', 'name': 'VIM0', 'type': 'test_vim',
'auth_cred': {'username': 'test',
'user_domain_name': 'test',
'cert_verify': 'True',
'project_id': 'test',
'project_name': 'test',
'project_domain_name': 'test',
'auth_url': 'http://test/identity/v3',
'key_type': 'barbican_key',
'secret_uuid': '***',
'password': '***'},
'auth_url': 'http://127.0.0.1/identity/v3',
'placement_attr': {'regions': ['TestRegionOne']},
'tenant_id': 'test'}
self.nfvo_plugin.get_vim.return_value = vim_info
self.service_plugins.get.return_value = self.nfvo_plugin
def mock_barbican_resp(request, context):
auth_value = 'Bearer %s' % self.access_token
req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth)
response = {
'name': 'AES key',
'expiration': '2023-01-13T19:14:44.180394',
'algorithm': 'aes',
'bit_length': 256,
'mode': 'cbc',
'payload': 'YmVlcg==',
'payload_content_type': 'application/octet-stream',
'payload_content_encoding': 'base64'
}
context.status_code = 200
return response
self.requests_mock.get('http://demo/barbican/v1/secrets/%s' %
barbican_uuid,
json=mock_barbican_resp)
def mock_barbican_payload_resp(request, context):
auth_value = 'Bearer %s' % self.access_token
req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth)
response = '5cJeztZKzISf1JAt73oBeTPPCrymn96A3wqG96F4RxU='
context.status_code = 200
return response
def mock_get_barbican_resp(request, context):
auth_value = 'Bearer %s' % self.access_token
req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth)
context.status_code = 200
response = {
"versions": {
"values": [
{
"id": "v1",
"status": "stable",
"links": [
{
"rel": "self",
"href": "http://demo/barbican/v1/"
},
{
"rel": "describedby",
"type": "text/html",
"href": "https://docs.openstack.org/"}
],
"media-types": [
{
"base": "application/json",
"type": "application/"
"vnd.openstack.key-manager-v1+json"
}
]
}
]
}
}
return response
self.requests_mock.get('http://demo/barbican/v1/secrets/%s/payload' %
barbican_uuid,
json=mock_barbican_payload_resp)
self.requests_mock.get('http://demo/barbican',
json=mock_get_barbican_resp)
mock_decrypt.return_value = 'test'.encode('utf-8')
with mock.patch.object(manager.TackerManager, 'get_service_plugins',
return_value=self.service_plugins):
self.vimclient.get_vim(None,
vim_id=self.vim_info['id'],
region_name='TestRegionOne')

View File

@ -19,7 +19,6 @@ from cryptography import fernet
from oslo_config import cfg
from oslo_log import log as logging
from tacker.common import utils
from tacker import context as t_context
from tacker.extensions import nfvo
@ -120,9 +119,11 @@ class VimClient(object):
"""
cred = secret_value.encode('utf-8')
if auth.get('key_type') == 'barbican_key':
keystone_conf = CONF.keystone_authtoken
secret_uuid = auth['secret_uuid']
keymgr_api = KEYMGR_API(keystone_conf.auth_url)
if CONF.ext_oauth2_auth.use_ext_oauth2_auth:
keymgr_api = KEYMGR_API(CONF.ext_oauth2_auth.token_endpoint)
else:
keymgr_api = KEYMGR_API(CONF.keystone_authtoken.auth_url)
k_context = t_context.generate_tacker_service_context()
secret_obj = keymgr_api.get(k_context, secret_uuid)
vim_key = secret_obj.payload

View File

@ -22,3 +22,4 @@ python-blazarclient>=1.0.1 # Apache-2.0
requests-mock>=1.2.0 # Apache-2.0
PyMySQL>=0.10.1 # MIT
freezegun>=1.2.2 # Apache-2.0
PyJWT>=2.4.0 # MIT