Use authentication with keystone ec2token api

As a result of fixing OSSA-2025-002, ec2tokens API in Keystone
now by default requires authentication.

The config section `[ec2authtoken]` is now expected to have
auth information for Heat to be able to use this API.

In multicloud configuration keystone auth credentials are
required for each cloud. These can be configured using the new clouds
option and the ``[ec2authtoken.{cloud}]`` sections.

NOTE:
 Disables test_software_config.ParallelDeploymentsTest in
 grenade. Should follow up to re-enable once fix has been
 backported.

Related-Bug: #2119646
Change-Id: Ib41f76c1ba56005b6c4233424cca5768657a7686
Co-Authored-By: Adrian Jarvis <adrian.jarvis@catalystcloud.nz>
Co-Authored-By: Takashi Kajinami <kajinamit@oss.nttdata.com>
Signed-off-by: Pavlo Shchelokovskyy <shchelokovskyy@gmail.com>
This commit is contained in:
Pavlo Shchelokovskyy
2025-11-04 16:41:47 +00:00
committed by Takashi Kajinami
parent 4134b05868
commit 0242c28d5f
17 changed files with 755 additions and 233 deletions

View File

@@ -25,6 +25,7 @@ repos:
hooks:
- id: bandit
args: ['-c', 'pyproject.toml']
additional_dependencies: ["bandit[toml]"]
- repo: https://github.com/PyCQA/doc8
rev: v1.1.2
hooks:

View File

@@ -150,6 +150,7 @@ function configure_heat {
iniset $HEAT_CONF clients_heat url "$SERVICE_PROTOCOL://$HEAT_API_HOST/heat-api/v1/%(tenant_id)s"
else
configure_keystone_authtoken_middleware $HEAT_CONF heat
configure_keystone_authtoken_middleware $HEAT_CONF heat ec2authtoken
fi
# If HEAT_DEFERRED_AUTH is unset or explicitly set to trusts, configure

View File

@@ -36,6 +36,9 @@ function _heat_set_user {
OS_PROJECT_DOMAIN_ID=$DEFAULT_DOMAIN
}
# TODO: Add this test back once https://review.opendev.org/966092
# has landed and backported.
# heat_tempest_plugin.tests.functional.test_software_config.ParallelDeploymentsTest
function _write_heat_integrationtests {
local upgrade_tests=$1
cat > $upgrade_tests <<EOF
@@ -46,7 +49,6 @@ heat_integrationtests.functional.test_resource_group.ResourceGroupTest
heat_integrationtests.functional.test_resource_group.ResourceGroupUpdatePolicyTest
heat_integrationtests.functional.test_software_deployment_group
heat_integrationtests.functional.test_validation
heat_tempest_plugin.tests.functional.test_software_config.ParallelDeploymentsTest
heat_tempest_plugin.tests.functional.test_nova_server_networks
EOF
}

View File

@@ -358,7 +358,7 @@ Install and configure components
Replace ``RABBIT_PASS`` with the password you chose for the
``openstack`` account in ``RabbitMQ``.
* In the ``[keystone_authtoken]``, ``[trustee]``,
* In the ``[keystone_authtoken]``, ``[ec2authtoken]``, ``[trustee]``,
and ``[clients_keystone]`` sections,
configure Identity service access:
@@ -376,6 +376,16 @@ Install and configure components
username = heat
password = HEAT_PASS
[ec2authtoken]
...
auth_url = http://controller:5000
auth_type = password
project_domain_name = Default
user_domain_name = Default
project_name = service
username = heat
password = HEAT_PASS
[trustee]
...
auth_type = password

View File

@@ -347,7 +347,7 @@ Install and configure components
Replace ``RABBIT_PASS`` with the password you chose for the
``openstack`` account in ``RabbitMQ``.
* In the ``[keystone_authtoken]``, ``[trustee]`` and
* In the ``[keystone_authtoken]``, ``[ec2authtoken]``, ``[trustee]`` and
``[clients_keystone]`` sections,
configure Identity service access:
@@ -365,6 +365,16 @@ Install and configure components
username = heat
password = HEAT_PASS
[ec2authtoken]
...
auth_url = http://controller:5000
auth_type = password
project_domain_name = Default
user_domain_name = Default
project_name = service
username = heat
password = HEAT_PASS
[trustee]
...
auth_type = password

View File

@@ -12,14 +12,21 @@
# under the License.
import hashlib
import itertools
from oslo_config import cfg
from oslo_config import types
from oslo_log import log as logging
from oslo_serialization import jsonutils as json
import requests
from oslo_utils import strutils
import webob
from keystoneauth1 import adapter as ks_adapter
from keystoneauth1 import exceptions as ks_exceptions
from keystoneauth1 import loading as ks_loading
from keystoneauth1 import noauth as ks_noauth
from keystoneauth1 import session as ks_session
from heat.api.aws import exception
from heat.common import endpoint_utils
from heat.common.i18n import _
@@ -35,6 +42,12 @@ opts = [
cfg.BoolOpt('multi_cloud',
default=False,
help=_('Allow orchestration of multiple clouds.')),
cfg.ListOpt('clouds',
default=[],
help=_('A list of names of clouds when multicloud is enabled. '
'At least one should be defined when multi_cloud is '
'enabled. For each name there must be a section '
'[ec2authtoken.<name>] with keystone auth settings.'),),
cfg.ListOpt('allowed_auth_uris',
default=[],
item_type=types.URI(schemes=['http', 'https']),
@@ -48,16 +61,14 @@ opts = [
'private key.')),
cfg.StrOpt('ca_file',
help=_('Optional CA cert file to use in SSL connections.')),
cfg.BoolOpt('insecure',
default=False,
help=_('If set, then the server\'s certificate will not '
'be verified.')),
cfg.FloatOpt('timeout',
default=60,
min=0,
help=_('Timeout in seconds for HTTP requests.')),
]
cfg.CONF.register_opts(opts, group='ec2authtoken')
ks_loading.register_auth_conf_options(cfg.CONF, 'ec2authtoken')
ks_loading.register_session_conf_options(
cfg.CONF, 'ec2authtoken')
ks_loading.register_adapter_conf_options(cfg.CONF, 'ec2authtoken')
cfg.CONF.set_default('service_type', 'identity', group='ec2authtoken')
class EC2Token(wsgi.Middleware):
@@ -66,7 +77,92 @@ class EC2Token(wsgi.Middleware):
def __init__(self, app, conf):
self.conf = conf
self.application = app
self._ssl_options = None
self._ks_adapters = self._create_keystone_adapters()
def _register_ks_opts(self, cfg_group):
ks_loading.register_auth_conf_options(cfg.CONF, cfg_group)
ks_loading.register_session_conf_options(cfg.CONF, cfg_group)
ks_loading.register_adapter_conf_options(cfg.CONF, cfg_group)
cfg.CONF.set_default('service_type', 'identity', group=cfg_group)
def _create_ks_adapter(self, cfg_group):
auth = ks_loading.load_auth_from_conf_options(
cfg.CONF, cfg_group)
session = ks_loading.load_session_from_conf_options(
cfg.CONF, cfg_group, auth=auth)
return ks_loading.load_adapter_from_conf_options(
cfg.CONF, cfg_group, session=session)
def _create_noauth_ks_adapter(self, auth_url):
insecure = strutils.bool_from_string(self._conf_get('insecure'))
certfile = self._conf_get('cert_file')
keyfile = self._conf_get('key_file')
cafile = self._conf_get('ca_file')
verify = False
cert = None
if not insecure:
verify = cafile or True
cert = (certfile, keyfile) if keyfile else certfile
auth = ks_noauth.NoAuth()
session = ks_session.Session(
auth=auth,
verify=verify,
cert=cert,
timeout=cfg.CONF.ec2authtoken.timeout,
)
return ks_adapter.Adapter(session=session,
endpoint_override=auth_url)
def _create_keystone_adapters(self):
# Create a keystone adapters for each auth_uri to make requests
# against the v3/ec2token endpoint.
ks_adapters = {}
if self._conf_get('multi_cloud'):
allowed_auth_uris = self._conf_get('allowed_auth_uris')
clouds = self._conf_get('clouds')
if clouds:
# match each clouds value with an
# ec2authtoken.<value> section.
for cloud in clouds:
cfg_group = f'ec2authtoken.{cloud}'
self._register_ks_opts(cfg_group)
ks_adapters[cloud] = self._create_ks_adapter(cfg_group)
elif allowed_auth_uris:
LOG.warning(
'ec2tokens API calls will be unauthenticated because of '
'legacy allowed_auth_uris being used. The API call may be '
'rejected by keystone due to recent policy change.')
for auth_uri in allowed_auth_uris:
ks_adapters[auth_uri] = self._create_noauth_ks_adapter(
self._strip_ec2tokens_uri(auth_uri))
else:
LOG.error(
'Configuration multi_cloud enabled but neither '
'allowed_auth_uris or clouds set. '
'ec2tokenauth will not be able to validate EC2 '
'credentials.'
)
else:
adapter = self._create_ks_adapter('ec2authtoken')
if adapter.session.auth and adapter.get_endpoint():
ks_adapters[None] = adapter
else:
LOG.warning(
'The [ec2authtoken] section does not include details to '
'detect endpoint url. Using the legacy endpoint detection '
'and API call without authentication. This may be '
'rejected by keystone due to recent policy change.')
auth_uri = self._conf_get_auth_uri()
if auth_uri:
adapter = self._create_noauth_ks_adapter(
self._strip_ec2tokens_uri(auth_uri))
ks_adapters[None] = adapter
return ks_adapters
def _conf_get(self, name):
# try config from paste-deploy first
@@ -75,20 +171,21 @@ class EC2Token(wsgi.Middleware):
else:
return cfg.CONF.ec2authtoken[name]
def _strip_ec2tokens_uri(self, auth_uri):
# NOTE(tkajinam): Due to heat accepted auth_uri with full URI for
# ec2tokens API, we strip the uri part here. This will be removed
# when auth_uri is removed.
auth_uri = auth_uri.replace('v2.0', 'v3')
for suffix in ['/', '/ec2tokens', '/v3']:
if auth_uri.endswith(suffix):
auth_uri = auth_uri.rsplit(suffix, 1)[0]
return auth_uri
def _conf_get_auth_uri(self):
auth_uri = self._conf_get('auth_uri')
if auth_uri:
return auth_uri.replace('v2.0', 'v3')
else:
return endpoint_utils.get_auth_uri()
@staticmethod
def _conf_get_keystone_ec2_uri(auth_uri):
if auth_uri.endswith('ec2tokens'):
return auth_uri
if auth_uri.endswith('/'):
return '%sec2tokens' % auth_uri
return '%s/ec2tokens' % auth_uri
return endpoint_utils.get_auth_uri()
def _get_signature(self, req):
"""Extract the signature from the request.
@@ -130,7 +227,7 @@ class EC2Token(wsgi.Middleware):
@webob.dec.wsgify(RequestClass=wsgi.Request)
def __call__(self, req):
if not self._conf_get('multi_cloud'):
return self._authorize(req, self._conf_get_auth_uri())
return self._authorize(req, None)
else:
# attempt to authorize for each configured allowed_auth_uris
# until one is successful.
@@ -138,29 +235,28 @@ class EC2Token(wsgi.Middleware):
# 1. AWSAccessKeyId is a randomly generated sequence
# 2. No secret is transferred to validate a request
last_failure = None
for auth_uri in self._conf_get('allowed_auth_uris'):
try:
LOG.debug("Attempt authorize on %s" % auth_uri)
return self._authorize(req, auth_uri)
except exception.HeatAPIException as e:
LOG.debug("Authorize failed: %s" % e.__class__)
last_failure = e
clouds = self._conf_get('clouds')
if clouds:
for cloud in clouds:
try:
LOG.debug("Attempt authorize on %s" % cloud)
return self._authorize(req, cloud)
except exception.HeatAPIException as e:
LOG.debug("Authorize failed: %s" % e.__class__)
last_failure = e
else:
for auth_uri in self._conf_get('allowed_auth_uris'):
try:
LOG.debug("Attempt authorize on %s" % auth_uri)
return self._authorize(req, auth_uri)
except exception.HeatAPIException as e:
LOG.debug("Authorize failed: %s" % e.__class__)
last_failure = e
raise last_failure or exception.HeatAccessDeniedError()
@property
def ssl_options(self):
if not self._ssl_options:
cacert = self._conf_get('ca_file')
insecure = self._conf_get('insecure')
cert = self._conf_get('cert_file')
key = self._conf_get('key_file')
self._ssl_options = {
'verify': cacert if cacert else not insecure,
'cert': (cert, key) if cert else None
}
return self._ssl_options
def _authorize(self, req, auth_uri):
def _authorize(self, req, cloud):
# Read request signature and access id.
# If we find X-Auth-User in the headers we ignore a key error
# here so that we can use both authentication methods.
@@ -185,12 +281,13 @@ class EC2Token(wsgi.Middleware):
raise exception.HeatMissingAuthenticationTokenError()
LOG.info("AWS credentials found, checking against keystone.")
if not auth_uri:
LOG.error("Ec2Token authorization failed, no auth_uri "
"specified in config file")
adapter = self._ks_adapters.get(cloud)
if not adapter:
LOG.error("Ec2Token authorization failed due to missing "
"keystone auth configuration for %s" % cloud)
raise exception.HeatInternalFailureError(_('Service '
'misconfigured'))
# Make a copy of args for authentication and signature verification.
auth_params = dict(req.params)
# 'Signature' param Not part of authentication args
@@ -211,21 +308,23 @@ class EC2Token(wsgi.Middleware):
creds_json = json.dumps(creds)
headers = {'Content-Type': 'application/json'}
keystone_ec2_uri = self._conf_get_keystone_ec2_uri(auth_uri)
timeout = self._conf_get('timeout')
keystone_uri = adapter.get_endpoint()
keystone_ec2_uri = keystone_uri + '/v3/ec2tokens'
LOG.info('Authenticating with %s', keystone_ec2_uri)
response = requests.post(keystone_ec2_uri, data=creds_json,
headers=headers,
verify=self.ssl_options['verify'],
cert=self.ssl_options['cert'],
timeout=timeout)
result = response.json()
LOG.debug('Sending ec2tokens API request to %s using auth plugin %s',
cloud, adapter.session.auth)
try:
response = adapter.post(keystone_ec2_uri, data=creds_json,
headers=headers)
result = response.json()
token_id = response.headers['X-Subject-Token']
tenant = result['token']['project']['name']
tenant_id = result['token']['project']['id']
roles = [role['name']
for role in result['token'].get('roles', [])]
except ks_exceptions.Unauthorized:
LOG.error("Failed to obtain a Keystone token from %s", cloud)
raise exception.HeatAccessDeniedError()
except (AttributeError, KeyError):
LOG.info("AWS authentication failure.")
# Try to extract the reason for failure so we can return the
@@ -234,11 +333,20 @@ class EC2Token(wsgi.Middleware):
reason = result['error']['message']
except KeyError:
reason = None
# Keystone will return a 401 request for each of the following
# reasons so we have to check the error message
if reason == "EC2 access key not found.":
raise exception.HeatInvalidClientTokenIdError()
elif reason == "EC2 signature not supplied.":
raise exception.HeatSignatureError()
elif (reason ==
"The request you have made requires authentication."):
# We tried to make an unauthenticated requests to Keystone
LOG.error(
"Keystone endpoint %s requires authentication",
keystone_ec2_uri
)
raise exception.HeatAccessDeniedError()
else:
raise exception.HeatAccessDeniedError()
else:
@@ -251,7 +359,7 @@ class EC2Token(wsgi.Middleware):
req.headers['X-Auth-Token'] = token_id
req.headers['X-Tenant-Name'] = tenant
req.headers['X-Tenant-Id'] = tenant_id
req.headers['X-Auth-URL'] = auth_uri
req.headers['X-Auth-URL'] = keystone_uri
req.headers['X-Roles'] = ','.join(roles)
@@ -270,4 +378,10 @@ def EC2Token_filter_factory(global_conf, **local_conf):
def list_opts():
yield 'ec2authtoken', opts
yield 'ec2authtoken', itertools.chain(
opts,
ks_loading.get_auth_common_conf_options(),
ks_loading.get_auth_plugin_conf_options('v3password'),
ks_loading.get_session_conf_options(),
ks_loading.get_adapter_conf_options()
)

View File

@@ -0,0 +1,9 @@
[ec2authtoken]
auth_type = password
auth_url = http://key1.example.com:5000/v3
username = alice
password = secret
user_domain_name = Default
project_name = service
project_domain_name = Default
endpoint_override = http://key1.example.com:5000

View File

@@ -0,0 +1,6 @@
[keystone_authtoken]
auth_type = password
auth_url = http://key1.example.com:5000/v2.0
username = alice
password = secret
project_name = service

View File

@@ -0,0 +1,14 @@
[ec2authtoken]
[clients_keystone]
auth_uri = http://key1.example.com/identity
[keystone_authtoken]
auth_type = password
interface = public
auth_url = http://key1.example.com/identity
username = alice
password = secret
user_domain_name = Default
project_name = service
project_domain_name = Default

View File

@@ -0,0 +1,10 @@
[ec2authtoken]
multi_cloud = True
allowed_auth_uris = http://key1.example.com:5000/v2.0,http://key2.example.com:5000/v3
[ec2authtoken]
auth_type = password
auth_url = http://key1.example.com:5000/v2.0
username = alice
password = secret
project_name = service

View File

@@ -0,0 +1,21 @@
[ec2authtoken]
multi_cloud = True
clouds = alice,bob
[ec2authtoken.alice]
auth_type = password
auth_url = http://key1.example.com:5000/v2.0
username = alice
password = secret
project_name = service
endpoint_override = http://key1.example.com:5000
[ec2authtoken.bob]
auth_type = v3password
auth_url = http://key2.example.com:5000/v3
username = bob
user_domain_name = default
password = secret
project_name = service
project_domain_name = Default
endpoint_override = http://key2.example.com:5000

View File

@@ -0,0 +1,17 @@
[ec2authtoken.alice]
auth_type = password
auth_url = http://key1.example.com:5000/v2.0
username = alice
password = secret
endpoint_override = http://key1.example.com:5000
[ec2authtoken.bob]
auth_type = v3password
auth_url = http://key2.example.com:5000/v3
username = bob
user_domain_name = default
password = secret
user_domain_name = Default
project_name = service
project_domain_name = Default
endpoint_override = http://key2.example.com:5000

View File

@@ -0,0 +1,14 @@
[ec2authtoken.alice]
auth_type = password
auth_url = http://key1.example.com:5000/v2.0
username = alice
password = secret
project_name = service
[ec2authtoken.bob]
auth_type = password
auth_url = http://key2.example.com:5000/v2.0
username = bob
password = secret
project_name = service

View File

@@ -0,0 +1,15 @@
[ec2authtoken]
; empty we use the uri from the [clients_keystone] section instead
[clients_keystone]
auth_uri = http://key1.example.com:5000
[keystone_authtoken]
auth_type = password
interface = public
auth_url = http://key1.example.com:5000
username = alice
password = secret
user_domain_name = Default
project_name = service
project_domain_name = Default

View File

@@ -0,0 +1,12 @@
[ec2authtoken]
auth_type = password
username = alice
password = secret
user_domain_name = Default
project_name = service
project_domain_name = Default
auth_url = http://key1.example.com:5000/3.0
endpoint_override = http://key1.example.com:5000
[clients_keystone]
auth_uri = http://key1.example.com:5000/3.0

View File

@@ -13,11 +13,16 @@
import json
import pathlib
from unittest import mock
from oslo_config import cfg
from oslo_utils import importutils
import requests
import keystoneauth1.discover
from keystoneauth1 import exceptions as ks_exceptions
import keystoneauth1.loading.conf
from keystoneauth1 import noauth as ks_noauth
import keystoneauth1.session
from heat.api.aws import ec2token
from heat.api.aws import exception
@@ -31,24 +36,29 @@ class Ec2TokenTest(common.HeatTestCase):
def setUp(self):
super(Ec2TokenTest, self).setUp()
self.patchobject(requests, 'post')
def _dummy_GET_request(self, params=None, environ=None):
# Mangle the params dict into a query string
params = params or {}
environ = environ or {}
qs = "&".join(["=".join([k, str(params[k])]) for k in params])
environ.update({'REQUEST_METHOD': 'GET', 'QUERY_STRING': qs})
req = wsgi.Request(environ)
return req
self.mock_adapter = mock.MagicMock(
name='adapter',
spec=('get_endpoint', 'post', 'session'))
self.create_keystone_adapters = self.patchobject(
ec2token.EC2Token, '_create_keystone_adapters')
self.mock_adapter.get_endpoint.return_value = \
'http://key1.example.com:5000'
# Ensure that the various auth urls are available for the tests.
self.create_keystone_adapters.return_value = {
None: self.mock_adapter,
'http://192.0.2.9/v2.0': self.mock_adapter,
'http://192.0.2.9/v3': self.mock_adapter,
'http://key1.example.com:5000/v3': self.mock_adapter,
'http://key1.example.com:5000/v2.0': self.mock_adapter,
'http://key2.example.com:5000/v2.0': self.mock_adapter,
}
def test_conf_get_paste(self):
dummy_conf = {'auth_uri': 'http://192.0.2.9/v2.0'}
ec2 = ec2token.EC2Token(app=None, conf=dummy_conf)
self.assertEqual('http://192.0.2.9/v2.0', ec2._conf_get('auth_uri'))
self.assertEqual(
'http://192.0.2.9/v2.0/ec2tokens',
ec2._conf_get_keystone_ec2_uri('http://192.0.2.9/v2.0'))
'http://192.0.2.9/v3', ec2._conf_get_auth_uri())
def test_conf_get_opts(self):
cfg.CONF.set_default('auth_uri', 'http://192.0.2.9/v2.0/',
@@ -58,60 +68,27 @@ class Ec2TokenTest(common.HeatTestCase):
ec2 = ec2token.EC2Token(app=None, conf={})
self.assertEqual('http://192.0.2.9/v2.0/', ec2._conf_get('auth_uri'))
self.assertEqual(
'http://192.0.2.9/v2.0/ec2tokens',
ec2._conf_get_keystone_ec2_uri('http://192.0.2.9/v2.0/'))
'http://192.0.2.9/v3/', ec2._conf_get_auth_uri())
def test_conf_get_clients_keystone_opts(self):
cfg.CONF.set_default('auth_uri', None, group='ec2authtoken')
cfg.CONF.set_default('auth_uri', 'http://192.0.2.9',
group='clients_keystone')
with mock.patch('keystoneauth1.discover.Discover') as discover:
class MockDiscover(object):
def url_for(self, endpoint):
return 'http://192.0.2.9/v3/'
discover.return_value = MockDiscover()
with mock.patch.object(keystoneauth1.discover, 'Discover') as discover:
discover.return_value.url_for.return_value = 'http://192.0.2.9/v3/'
ec2 = ec2token.EC2Token(app=None, conf={})
self.assertEqual(
'http://192.0.2.9/v3/ec2tokens',
ec2._conf_get_keystone_ec2_uri('http://192.0.2.9/v3/'))
def test_conf_get_ssl_default_options(self):
ec2 = ec2token.EC2Token(app=None, conf={})
self.assertTrue(ec2.ssl_options['verify'],
"SSL verify should be True by default")
self.assertIsNone(ec2.ssl_options['cert'],
"SSL client cert should be None by default")
def test_conf_ssl_insecure_option(self):
ec2 = ec2token.EC2Token(app=None, conf={})
cfg.CONF.set_default('insecure', 'True', group='ec2authtoken')
cfg.CONF.set_default('ca_file', None, group='ec2authtoken')
self.assertFalse(ec2.ssl_options['verify'])
def test_conf_get_ssl_opts(self):
cfg.CONF.set_default('auth_uri', 'https://192.0.2.9/v2.0/',
group='ec2authtoken')
cfg.CONF.set_default('ca_file', '/home/user/cacert.pem',
group='ec2authtoken')
cfg.CONF.set_default('insecure', 'false', group='ec2authtoken')
cfg.CONF.set_default('cert_file', '/home/user/mycert',
group='ec2authtoken')
cfg.CONF.set_default('key_file', '/home/user/mykey',
group='ec2authtoken')
ec2 = ec2token.EC2Token(app=None, conf={})
self.assertEqual('/home/user/cacert.pem', ec2.ssl_options['verify'])
self.assertEqual(('/home/user/mycert', '/home/user/mykey'),
ec2.ssl_options['cert'])
'http://192.0.2.9/v3/', ec2._conf_get_auth_uri())
def test_get_signature_param_old(self):
params = {'Signature': 'foo'}
dummy_req = self._dummy_GET_request(params)
dummy_req = _dummy_GET_request(params)
ec2 = ec2token.EC2Token(app=None, conf={})
self.assertEqual('foo', ec2._get_signature(dummy_req))
def test_get_signature_param_new(self):
params = {'X-Amz-Signature': 'foo'}
dummy_req = self._dummy_GET_request(params)
dummy_req = _dummy_GET_request(params)
ec2 = ec2token.EC2Token(app=None, conf={})
self.assertEqual('foo', ec2._get_signature(dummy_req))
@@ -120,7 +97,7 @@ class Ec2TokenTest(common.HeatTestCase):
('Authorization: foo Credential=foo/bar, '
'SignedHeaders=content-type;host;x-amz-date, '
'Signature=xyz')}
dummy_req = self._dummy_GET_request(environ=req_env)
dummy_req = _dummy_GET_request(environ=req_env)
ec2 = ec2token.EC2Token(app=None, conf={})
self.assertEqual('xyz', ec2._get_signature(dummy_req))
@@ -129,7 +106,7 @@ class Ec2TokenTest(common.HeatTestCase):
('Authorization: foo Credential=foo/bar, '
'Signature=xyz,'
'SignedHeaders=content-type;host;x-amz-date ')}
dummy_req = self._dummy_GET_request(environ=req_env)
dummy_req = _dummy_GET_request(environ=req_env)
ec2 = ec2token.EC2Token(app=None, conf={})
self.assertEqual('xyz', ec2._get_signature(dummy_req))
@@ -138,19 +115,19 @@ class Ec2TokenTest(common.HeatTestCase):
('Authorization: foo Credential=foo/bar,'
'SignedHeaders=content-type;host;x-amz-date,'
'Signature=xyz')}
dummy_req = self._dummy_GET_request(environ=req_env)
dummy_req = _dummy_GET_request(environ=req_env)
ec2 = ec2token.EC2Token(app=None, conf={})
self.assertEqual('xyz', ec2._get_signature(dummy_req))
def test_get_access_param_old(self):
params = {'AWSAccessKeyId': 'foo'}
dummy_req = self._dummy_GET_request(params)
dummy_req = _dummy_GET_request(params)
ec2 = ec2token.EC2Token(app=None, conf={})
self.assertEqual('foo', ec2._get_access(dummy_req))
def test_get_access_param_new(self):
params = {'X-Amz-Credential': 'foo/bar'}
dummy_req = self._dummy_GET_request(params)
dummy_req = _dummy_GET_request(params)
ec2 = ec2token.EC2Token(app=None, conf={})
self.assertEqual('foo', ec2._get_access(dummy_req))
@@ -159,7 +136,7 @@ class Ec2TokenTest(common.HeatTestCase):
('Authorization: foo Credential=foo/bar, '
'SignedHeaders=content-type;host;x-amz-date, '
'Signature=xyz')}
dummy_req = self._dummy_GET_request(environ=req_env)
dummy_req = _dummy_GET_request(environ=req_env)
ec2 = ec2token.EC2Token(app=None, conf={})
self.assertEqual('foo', ec2._get_access(dummy_req))
@@ -168,7 +145,7 @@ class Ec2TokenTest(common.HeatTestCase):
('Authorization: foo Credential=foo/bar,'
'SignedHeaders=content-type;host;x-amz-date,'
'Signature=xyz')}
dummy_req = self._dummy_GET_request(environ=req_env)
dummy_req = _dummy_GET_request(environ=req_env)
ec2 = ec2token.EC2Token(app=None, conf={})
self.assertEqual('foo', ec2._get_access(dummy_req))
@@ -177,13 +154,13 @@ class Ec2TokenTest(common.HeatTestCase):
('Authorization: foo '
'SignedHeaders=content-type;host;x-amz-date,'
'Signature=xyz,Credential=foo/bar')}
dummy_req = self._dummy_GET_request(environ=req_env)
dummy_req = _dummy_GET_request(environ=req_env)
ec2 = ec2token.EC2Token(app=None, conf={})
self.assertEqual('foo', ec2._get_access(dummy_req))
def test_call_x_auth_user(self):
req_env = {'HTTP_X_AUTH_USER': 'foo'}
dummy_req = self._dummy_GET_request(environ=req_env)
dummy_req = _dummy_GET_request(environ=req_env)
ec2 = ec2token.EC2Token(app='xyz', conf={})
self.assertEqual('xyz', ec2.__call__(dummy_req))
@@ -191,7 +168,7 @@ class Ec2TokenTest(common.HeatTestCase):
req_env = {'HTTP_AUTHORIZATION':
('Authorization: foo Credential=foo/bar, '
'SignedHeaders=content-type;host;x-amz-date')}
dummy_req = self._dummy_GET_request(environ=req_env)
dummy_req = _dummy_GET_request(environ=req_env)
ec2 = ec2token.EC2Token(app='xyz', conf={})
self.assertRaises(exception.HeatIncompleteSignatureError,
ec2.__call__, dummy_req)
@@ -201,7 +178,7 @@ class Ec2TokenTest(common.HeatTestCase):
('Authorization: foo '
'SignedHeaders=content-type;host;x-amz-date,'
'Signature=xyz')}
dummy_req = self._dummy_GET_request(environ=req_env)
dummy_req = _dummy_GET_request(environ=req_env)
ec2 = ec2token.EC2Token(app='xyz', conf={})
self.assertRaises(exception.HeatMissingAuthenticationTokenError,
ec2.__call__, dummy_req)
@@ -210,7 +187,7 @@ class Ec2TokenTest(common.HeatTestCase):
# If there's no accesskey in params or header, but there is a
# Signature, we expect HeatMissingAuthenticationTokenError
params = {'Signature': 'foo'}
dummy_req = self._dummy_GET_request(params)
dummy_req = _dummy_GET_request(params)
ec2 = ec2token.EC2Token(app='xyz', conf={})
self.assertRaises(exception.HeatMissingAuthenticationTokenError,
ec2.__call__, dummy_req)
@@ -221,13 +198,14 @@ class Ec2TokenTest(common.HeatTestCase):
('Authorization: foo '
'SignedHeaders=content-type;host;x-amz-date,'
'Signature=xyz')}
dummy_req = self._dummy_GET_request(environ=req_env)
dummy_req = _dummy_GET_request(environ=req_env)
ec2 = ec2token.EC2Token(app='xyz', conf={})
self.assertEqual('xyz', ec2.__call__(dummy_req))
def _stub_http_connection(self, headers=None, params=None, response=None,
req_url='http://123:5000/v3/ec2tokens',
verify=True, cert=None, direct_mock=True):
def _stub_http_connection(
self, headers=None, params=None, response=None,
req_url='http://key1.example.com:5000/v3/ec2tokens',
verify=True, cert=None, direct_mock=True):
headers = headers or {}
params = params or {}
@@ -256,16 +234,14 @@ class Ec2TokenTest(common.HeatTestCase):
req_headers = {'Content-Type': 'application/json'}
self.verify_req_url = req_url
self.verify_data = utils.JsonRepr(req_creds)
self.verify_verify = verify
self.verify_cert = cert
self.verify_req_headers = req_headers
if direct_mock:
requests.post.return_value = DummyHTTPResponse()
self.mock_adapter.post.return_value = DummyHTTPResponse()
else:
return DummyHTTPResponse()
def test_call_ok(self):
dummy_conf = {'auth_uri': 'http://123:5000/v2.0'}
dummy_conf = {'auth_uri': 'http://key1.example.com:5000/v2.0'}
ec2 = ec2token.EC2Token(app='woot', conf=dummy_conf)
auth_str = ('Authorization: foo Credential=foo/bar, '
@@ -275,7 +251,7 @@ class Ec2TokenTest(common.HeatTestCase):
'SERVER_PORT': '8000',
'PATH_INFO': '/v1',
'HTTP_AUTHORIZATION': auth_str}
dummy_req = self._dummy_GET_request(environ=req_env)
dummy_req = _dummy_GET_request(environ=req_env)
ok_resp = json.dumps({'token': {
'project': {'name': 'tenant', 'id': 'abcd1234'}}})
@@ -285,14 +261,12 @@ class Ec2TokenTest(common.HeatTestCase):
self.assertEqual('tenant', dummy_req.headers['X-Tenant-Name'])
self.assertEqual('abcd1234', dummy_req.headers['X-Tenant-Id'])
requests.post.assert_called_once_with(
self.mock_adapter.post.assert_called_once_with(
self.verify_req_url, data=self.verify_data,
verify=self.verify_verify,
cert=self.verify_cert, headers=self.verify_req_headers,
timeout=60)
headers=self.verify_req_headers)
def test_call_ok_roles(self):
dummy_conf = {'auth_uri': 'http://123:5000/v2.0'}
dummy_conf = {'auth_uri': 'http://key1.example.com:5000/v2.0'}
ec2 = ec2token.EC2Token(app='woot', conf=dummy_conf)
auth_str = ('Authorization: foo Credential=foo/bar, '
@@ -302,7 +276,7 @@ class Ec2TokenTest(common.HeatTestCase):
'SERVER_PORT': '8000',
'PATH_INFO': '/v1',
'HTTP_AUTHORIZATION': auth_str}
dummy_req = self._dummy_GET_request(environ=req_env)
dummy_req = _dummy_GET_request(environ=req_env)
ok_resp = json.dumps({
'token': {
@@ -315,14 +289,12 @@ class Ec2TokenTest(common.HeatTestCase):
self.assertEqual('woot', ec2.__call__(dummy_req))
self.assertEqual('aa,bb,cc', dummy_req.headers['X-Roles'])
requests.post.assert_called_once_with(
self.mock_adapter.post.assert_called_once_with(
self.verify_req_url, data=self.verify_data,
verify=self.verify_verify,
cert=self.verify_cert, headers=self.verify_req_headers,
timeout=60)
headers=self.verify_req_headers)
def test_call_err_tokenid(self):
dummy_conf = {'auth_uri': 'http://123:5000/v2.0/'}
dummy_conf = {'auth_uri': 'http://key1.example.com:5000/v2.0/'}
ec2 = ec2token.EC2Token(app='woot', conf=dummy_conf)
auth_str = ('Authorization: foo Credential=foo/bar, '
@@ -332,7 +304,7 @@ class Ec2TokenTest(common.HeatTestCase):
'SERVER_PORT': '8000',
'PATH_INFO': '/v1',
'HTTP_AUTHORIZATION': auth_str}
dummy_req = self._dummy_GET_request(environ=req_env)
dummy_req = _dummy_GET_request(environ=req_env)
err_msg = "EC2 access key not found."
err_resp = json.dumps({'error': {'message': err_msg}})
@@ -341,14 +313,12 @@ class Ec2TokenTest(common.HeatTestCase):
self.assertRaises(exception.HeatInvalidClientTokenIdError,
ec2.__call__, dummy_req)
requests.post.assert_called_once_with(
self.mock_adapter.post.assert_called_once_with(
self.verify_req_url, data=self.verify_data,
verify=self.verify_verify,
cert=self.verify_cert, headers=self.verify_req_headers,
timeout=60)
headers=self.verify_req_headers)
def test_call_err_signature(self):
dummy_conf = {'auth_uri': 'http://123:5000/v2.0'}
dummy_conf = {'auth_uri': 'http://key1.example.com:5000/v2.0'}
ec2 = ec2token.EC2Token(app='woot', conf=dummy_conf)
auth_str = ('Authorization: foo Credential=foo/bar, '
@@ -358,7 +328,7 @@ class Ec2TokenTest(common.HeatTestCase):
'SERVER_PORT': '8000',
'PATH_INFO': '/v1',
'HTTP_AUTHORIZATION': auth_str}
dummy_req = self._dummy_GET_request(environ=req_env)
dummy_req = _dummy_GET_request(environ=req_env)
err_msg = "EC2 signature not supplied."
err_resp = json.dumps({'error': {'message': err_msg}})
@@ -367,14 +337,12 @@ class Ec2TokenTest(common.HeatTestCase):
self.assertRaises(exception.HeatSignatureError,
ec2.__call__, dummy_req)
requests.post.assert_called_once_with(
self.mock_adapter.post.assert_called_once_with(
self.verify_req_url, data=self.verify_data,
verify=self.verify_verify,
cert=self.verify_cert, headers=self.verify_req_headers,
timeout=60)
headers=self.verify_req_headers)
def test_call_err_denied(self):
dummy_conf = {'auth_uri': 'http://123:5000/v2.0'}
dummy_conf = {'auth_uri': 'http://key1.example.com:5000/v2.0'}
ec2 = ec2token.EC2Token(app='woot', conf=dummy_conf)
auth_str = ('Authorization: foo Credential=foo/bar, '
@@ -384,7 +352,7 @@ class Ec2TokenTest(common.HeatTestCase):
'SERVER_PORT': '8000',
'PATH_INFO': '/v1',
'HTTP_AUTHORIZATION': auth_str}
dummy_req = self._dummy_GET_request(environ=req_env)
dummy_req = _dummy_GET_request(environ=req_env)
err_resp = json.dumps({})
self._stub_http_connection(headers={'Authorization': auth_str},
@@ -392,20 +360,65 @@ class Ec2TokenTest(common.HeatTestCase):
self.assertRaises(exception.HeatAccessDeniedError,
ec2.__call__, dummy_req)
requests.post.assert_called_once_with(
self.mock_adapter.post.assert_called_once_with(
self.verify_req_url, data=self.verify_data,
verify=self.verify_verify,
cert=self.verify_cert, headers=self.verify_req_headers,
timeout=60)
headers=self.verify_req_headers)
def test_call_err_unauthorized(self):
# test when Keystone returns unauthenticated error.
dummy_conf = {'auth_uri': 'http://key1.example.com:5000/v3'}
ec2 = ec2token.EC2Token(app='woot', conf=dummy_conf)
auth_str = ('Authorization: foo Credential=foo/bar, '
'SignedHeaders=content-type;host;x-amz-date, '
'Signature=xyz')
req_env = {'SERVER_NAME': 'heat',
'SERVER_PORT': '8000',
'PATH_INFO': '/v1',
'HTTP_AUTHORIZATION': auth_str}
dummy_req = _dummy_GET_request(environ=req_env)
msg = "The request you have made requires authentication."
bad_resp = json.dumps({'error': {'message': msg}})
self._stub_http_connection(headers={'Authorization': auth_str},
response=bad_resp)
self.assertRaises(exception.HeatAccessDeniedError,
ec2.__call__, dummy_req)
self.mock_adapter.post.assert_called_once_with(
self.verify_req_url, data=self.verify_data,
headers=self.verify_req_headers)
def test_call_err_ks_plugin_unauthorized(self):
# test when the keystone session fails to auth while obtaining
# an auth token
dummy_conf = {'auth_uri': 'http://key1.example.com:5000/v3'}
ec2 = ec2token.EC2Token(app='woot', conf=dummy_conf)
auth_str = ('Authorization: foo Credential=foo/bar, '
'SignedHeaders=content-type;host;x-amz-date, '
'Signature=xyz')
req_env = {'SERVER_NAME': 'heat',
'SERVER_PORT': '8000',
'PATH_INFO': '/v1',
'HTTP_AUTHORIZATION': auth_str}
dummy_req = _dummy_GET_request(environ=req_env)
self._stub_http_connection(headers={'Authorization': auth_str},
response={})
self.mock_adapter.post.side_effect = ks_exceptions.Unauthorized()
self.assertRaises(exception.HeatAccessDeniedError,
ec2.__call__, dummy_req)
self.mock_adapter.post.assert_called_once_with(
self.verify_req_url, data=self.verify_data,
headers=self.verify_req_headers)
def test_call_ok_v2(self):
dummy_conf = {'auth_uri': 'http://123:5000/v2.0'}
dummy_conf = {'auth_uri': 'http://key1.example.com:5000/v2.0'}
ec2 = ec2token.EC2Token(app='woot', conf=dummy_conf)
params = {'AWSAccessKeyId': 'foo', 'Signature': 'xyz'}
req_env = {'SERVER_NAME': 'heat',
'SERVER_PORT': '8000',
'PATH_INFO': '/v1'}
dummy_req = self._dummy_GET_request(params, req_env)
dummy_req = _dummy_GET_request(params, req_env)
ok_resp = json.dumps({'token': {
'project': {'name': 'tenant', 'id': 'abcd1234'}}})
@@ -413,16 +426,15 @@ class Ec2TokenTest(common.HeatTestCase):
params={'AWSAccessKeyId': 'foo'})
self.assertEqual('woot', ec2.__call__(dummy_req))
requests.post.assert_called_once_with(
self.mock_adapter.post.assert_called_once_with(
self.verify_req_url, data=self.verify_data,
verify=self.verify_verify,
cert=self.verify_cert, headers=self.verify_req_headers,
timeout=60)
headers=self.verify_req_headers)
def test_call_ok_multicloud(self):
dummy_conf = {
'allowed_auth_uris': [
'http://123:5000/v2.0', 'http://456:5000/v2.0'],
'http://key1.example.com:5000/v2.0',
'http://key2.example.com:5000/v2.0'],
'multi_cloud': True
}
ec2 = ec2token.EC2Token(app='woot', conf=dummy_conf)
@@ -430,40 +442,48 @@ class Ec2TokenTest(common.HeatTestCase):
req_env = {'SERVER_NAME': 'heat',
'SERVER_PORT': '8000',
'PATH_INFO': '/v1'}
dummy_req = self._dummy_GET_request(params, req_env)
dummy_req = _dummy_GET_request(params, req_env)
ok_resp = json.dumps({'token': {
'project': {'name': 'tenant', 'id': 'abcd1234'}}})
err_msg = "EC2 access key not found."
err_resp = json.dumps({'error': {'message': err_msg}})
self.mock_adapter.get_endpoint.reset()
self.mock_adapter.get_endpoint.side_effect = [
'http://key1.example.com:5000', 'http://key2.example.com:5000']
# first request fails
m_p = self._stub_http_connection(
req_url='http://123:5000/v2.0/ec2tokens',
req_url='http://key1.example.com:5000/v3/ec2tokens',
response=err_resp,
params={'AWSAccessKeyId': 'foo'}, direct_mock=False)
# second request passes
m_p2 = self._stub_http_connection(
req_url='http://456:5000/v2.0/ec2tokens',
req_url='http://key2.example.com:5000/v3/ec2tokens',
response=ok_resp,
params={'AWSAccessKeyId': 'foo'}, direct_mock=False)
requests.post.side_effect = [m_p, m_p2]
self.mock_adapter.post.side_effect = [m_p, m_p2]
self.assertEqual('woot', ec2.__call__(dummy_req))
self.assertEqual(2, requests.post.call_count)
requests.post.assert_called_with(
self.verify_req_url, data=self.verify_data,
verify=self.verify_verify,
cert=self.verify_cert, headers=self.verify_req_headers,
timeout=60)
self.assertEqual(2, self.mock_adapter.post.call_count)
self.mock_adapter.post.assert_has_calls([
mock.call('http://key1.example.com:5000/v3/ec2tokens',
data=self.verify_data,
headers=self.verify_req_headers),
mock.call('http://key2.example.com:5000/v3/ec2tokens',
data=self.verify_data,
headers=self.verify_req_headers)
])
def test_call_err_multicloud(self):
dummy_conf = {
'allowed_auth_uris': [
'http://123:5000/v2.0', 'http://456:5000/v2.0'],
'http://key1.example.com:5000/v2.0',
'http://key2.example.com:5000/v2.0'],
'multi_cloud': True
}
ec2 = ec2token.EC2Token(app='woot', conf=dummy_conf)
@@ -471,7 +491,7 @@ class Ec2TokenTest(common.HeatTestCase):
req_env = {'SERVER_NAME': 'heat',
'SERVER_PORT': '8000',
'PATH_INFO': '/v1'}
dummy_req = self._dummy_GET_request(params, req_env)
dummy_req = _dummy_GET_request(params, req_env)
err_resp1 = json.dumps({})
@@ -480,27 +500,33 @@ class Ec2TokenTest(common.HeatTestCase):
# first request fails with HeatAccessDeniedError
m_p = self._stub_http_connection(
req_url='http://123:5000/v2.0/ec2tokens',
req_url='http://key1.example.com:5000/v2.0/ec2tokens',
response=err_resp1,
params={'AWSAccessKeyId': 'foo'}, direct_mock=False)
# second request fails with HeatInvalidClientTokenIdError
m_p2 = self._stub_http_connection(
req_url='http://456:5000/v2.0/ec2tokens',
req_url='http://key2.example.com:5000/v2.0/ec2tokens',
response=err_resp2,
params={'AWSAccessKeyId': 'foo'}, direct_mock=False)
requests.post.side_effect = [m_p, m_p2]
self.mock_adapter.post.side_effect = [m_p, m_p2]
self.mock_adapter.get_endpoint.reset()
self.mock_adapter.get_endpoint.side_effect = [
'http://key1.example.com:5000', 'http://key2.example.com:5000']
# raised error matches last failure
self.assertRaises(exception.HeatInvalidClientTokenIdError,
ec2.__call__, dummy_req)
self.assertEqual(2, requests.post.call_count)
requests.post.assert_called_with(
self.verify_req_url, data=self.verify_data,
verify=self.verify_verify,
cert=self.verify_cert, headers=self.verify_req_headers,
timeout=60)
self.assertEqual(2, self.mock_adapter.post.call_count)
self.mock_adapter.post.assert_has_calls([
mock.call('http://key1.example.com:5000/v3/ec2tokens',
data=self.verify_data,
headers=self.verify_req_headers),
mock.call('http://key2.example.com:5000/v3/ec2tokens',
data=self.verify_data,
headers=self.verify_req_headers)
])
def test_call_err_multicloud_none_allowed(self):
dummy_conf = {
@@ -512,25 +538,27 @@ class Ec2TokenTest(common.HeatTestCase):
req_env = {'SERVER_NAME': 'heat',
'SERVER_PORT': '8000',
'PATH_INFO': '/v1'}
dummy_req = self._dummy_GET_request(params, req_env)
dummy_req = _dummy_GET_request(params, req_env)
self.assertRaises(exception.HeatAccessDeniedError,
ec2.__call__, dummy_req)
def test_call_badconf_no_authuri(self):
ec2 = ec2token.EC2Token(app='woot', conf={})
# Clear _ks_adapters to simulate no authuri
ec2._ks_adapters = {}
params = {'AWSAccessKeyId': 'foo', 'Signature': 'xyz'}
req_env = {'SERVER_NAME': 'heat',
'SERVER_PORT': '8000',
'PATH_INFO': '/v1'}
dummy_req = self._dummy_GET_request(params, req_env)
dummy_req = _dummy_GET_request(params, req_env)
ex = self.assertRaises(exception.HeatInternalFailureError,
ec2.__call__, dummy_req)
self.assertEqual('Service misconfigured', str(ex))
def test_call_ok_auth_uri_ec2authtoken(self):
dummy_url = 'http://123:5000/v2.0'
dummy_url = 'http://key1.example.com:5000/v2.0'
cfg.CONF.set_default('auth_uri', dummy_url, group='ec2authtoken')
ec2 = ec2token.EC2Token(app='woot', conf={})
@@ -538,7 +566,7 @@ class Ec2TokenTest(common.HeatTestCase):
req_env = {'SERVER_NAME': 'heat',
'SERVER_PORT': '8000',
'PATH_INFO': '/v1'}
dummy_req = self._dummy_GET_request(params, req_env)
dummy_req = _dummy_GET_request(params, req_env)
ok_resp = json.dumps({'token': {
'project': {'name': 'tenant', 'id': 'abcd1234'}}})
@@ -546,15 +574,13 @@ class Ec2TokenTest(common.HeatTestCase):
params={'AWSAccessKeyId': 'foo'})
self.assertEqual('woot', ec2.__call__(dummy_req))
requests.post.assert_called_with(
self.mock_adapter.post.assert_called_once_with(
self.verify_req_url, data=self.verify_data,
verify=self.verify_verify,
cert=self.verify_cert, headers=self.verify_req_headers,
timeout=60)
headers=self.verify_req_headers)
def test_call_ok_auth_uri_ec2authtoken_long(self):
# Prove we tolerate a url which already includes the /ec2tokens path
dummy_url = 'http://123:5000/v2.0/ec2tokens'
dummy_url = 'http://key1.example.com:5000/v2.0/ec2tokens'
cfg.CONF.set_default('auth_uri', dummy_url, group='ec2authtoken')
ec2 = ec2token.EC2Token(app='woot', conf={})
@@ -562,7 +588,7 @@ class Ec2TokenTest(common.HeatTestCase):
req_env = {'SERVER_NAME': 'heat',
'SERVER_PORT': '8000',
'PATH_INFO': '/v1'}
dummy_req = self._dummy_GET_request(params, req_env)
dummy_req = _dummy_GET_request(params, req_env)
ok_resp = json.dumps({'token': {
'project': {'name': 'tenant', 'id': 'abcd1234'}}})
@@ -570,40 +596,9 @@ class Ec2TokenTest(common.HeatTestCase):
params={'AWSAccessKeyId': 'foo'})
self.assertEqual('woot', ec2.__call__(dummy_req))
requests.post.assert_called_with(
self.mock_adapter.post.assert_called_once_with(
self.verify_req_url, data=self.verify_data,
verify=self.verify_verify,
cert=self.verify_cert, headers=self.verify_req_headers,
timeout=60)
def test_call_ok_auth_uri_ks_authtoken(self):
# Import auth_token to have keystone_authtoken settings setup.
importutils.import_module('keystonemiddleware.auth_token')
dummy_url = 'http://123:5000/v2.0'
try:
cfg.CONF.set_override('www_authenticate_uri', dummy_url,
group='keystone_authtoken')
except cfg.NoSuchOptError:
cfg.CONF.set_override('auth_uri', dummy_url,
group='keystone_authtoken')
ec2 = ec2token.EC2Token(app='woot', conf={})
params = {'AWSAccessKeyId': 'foo', 'Signature': 'xyz'}
req_env = {'SERVER_NAME': 'heat',
'SERVER_PORT': '8000',
'PATH_INFO': '/v1'}
dummy_req = self._dummy_GET_request(params, req_env)
ok_resp = json.dumps({'token': {
'project': {'name': 'tenant', 'id': 'abcd1234'}}})
self._stub_http_connection(response=ok_resp,
params={'AWSAccessKeyId': 'foo'})
self.assertEqual('woot', ec2.__call__(dummy_req))
requests.post.assert_called_with(
self.verify_req_url, data=self.verify_data,
verify=self.verify_verify,
cert=self.verify_cert, headers=self.verify_req_headers,
timeout=60)
headers=self.verify_req_headers)
def test_filter_factory(self):
ec2_filter = ec2token.EC2Token_filter_factory(global_conf={})
@@ -614,3 +609,257 @@ class Ec2TokenTest(common.HeatTestCase):
ec2_filter = ec2token.EC2Token_filter_factory(global_conf={})
self.assertIsNone(ec2_filter(None).application)
class Ec2TokenConfigurationTest(common.HeatTestCase):
"""Tests the Ec2Token middleware configuration options."""
def setUp(self, **kwargs):
super().setUp(**kwargs)
self.mock_discover_cls = self.patchobject(keystoneauth1.discover,
'Discover')
self.mock_discover = self.mock_discover_cls.return_value
def tearDown(self):
super().tearDown()
# unregister any dynamic opts that were creating in the testing.
cfg.CONF.reset()
opts = keystoneauth1.loading.conf.get_plugin_conf_options('password')
for group in cfg.CONF.keys():
if (group.startswith("ec2authtoken") or
group == "keystone_authtoken"):
cfg.CONF.unregister_opts(opts, group)
def test_init_ks_session_fails(self):
ec2 = ec2token.EC2Token(app='woot', conf={})
self.assertEqual(ec2._ks_adapters, {})
def test_init_ks_session_multicloud(self):
load_config_file('multi_cloud_enabled.conf')
ec2 = ec2token.EC2Token(app='woot', conf={})
self.assertEqual(2, len(ec2._ks_adapters))
self.assertIsNotNone(ec2._ks_adapters['alice'].session.auth)
self.assertEqual(
'http://key1.example.com:5000',
ec2._ks_adapters['alice'].get_endpoint())
self.assertIsNotNone(ec2._ks_adapters['bob'].session.auth)
self.assertEqual(
'http://key2.example.com:5000',
ec2._ks_adapters['bob'].get_endpoint())
def test_init_ks_session_multicloud_missing(self):
# The clouds defines three clouds
# but only two configuration sections are present
load_config_file('multi_cloud_partial.conf')
dummy_conf = {'multi_cloud': True,
'clouds': ['alice', 'fred', 'bob']
}
ec2 = ec2token.EC2Token(app='woot', conf=dummy_conf)
self.assertEqual(3, len(ec2._ks_adapters))
self.assertIsNotNone(ec2._ks_adapters['alice'].session.auth)
self.assertEqual(
'http://key1.example.com:5000',
ec2._ks_adapters['alice'].get_endpoint())
self.assertIsNotNone(ec2._ks_adapters['bob'].session.auth)
self.assertEqual(
'http://key2.example.com:5000',
ec2._ks_adapters['bob'].get_endpoint())
self.assertIsNone(ec2._ks_adapters['fred'].session.auth)
def test_init_ks_session_allowed_auth_uris_in_conf(self):
load_config_file('multi_cloud_auth_uris.conf')
ec2 = ec2token.EC2Token(app='woot', conf={})
self.assertEqual(2, len(ec2._ks_adapters))
for url in [
'http://key1.example.com:5000/v2.0',
'http://key2.example.com:5000/v3']:
self.assertIsInstance(ec2._ks_adapters[url].session.auth,
ks_noauth.NoAuth)
self.assertEqual(url.rsplit('/', 1)[0],
ec2._ks_adapters[url].get_endpoint())
def test_init_ks_session_allowed_auth_uris_in_paste(self):
load_config_file('ec2authtoken.conf')
paste_conf = {
"multi_cloud": True,
"allowed_auth_uris": ["http://key1.example.com:5000/v2.0",
"http://key2.example.com:5000/v3"]}
ec2 = ec2token.EC2Token(app='woot', conf=paste_conf)
self.assertSequenceEqual(
["http://key1.example.com:5000/v2.0",
"http://key2.example.com:5000/v3"],
list(ec2._ks_adapters.keys()))
for url in [
'http://key1.example.com:5000/v2.0',
'http://key2.example.com:5000/v3']:
self.assertIsInstance(ec2._ks_adapters[url].session.auth,
ks_noauth.NoAuth)
self.assertEqual(url.rsplit('/', 1)[0],
ec2._ks_adapters[url].get_endpoint())
def test_init_ks_session_from_keystone_authtoken_section(self):
load_config_file('keystone_authtoken.conf')
dummy_conf = {'auth_uri': 'http://key1.example.com:5000'}
ec2 = ec2token.EC2Token(app='woot', conf=dummy_conf)
self.assertEqual(1, len(ec2._ks_adapters))
self.assertIsNotNone(ec2._ks_adapters[None].session.auth)
self.assertEqual('http://key1.example.com:5000',
ec2._ks_adapters[None].get_endpoint())
def test_init_ks_session_auth_uri(self):
dummy_conf = {'auth_uri': 'http://key1.example.com:5000'}
ec2 = ec2token.EC2Token(app='woot', conf=dummy_conf)
self.assertEqual(1, len(ec2._ks_adapters))
self.assertIsInstance(ec2._ks_adapters[None].session.auth,
ks_noauth.NoAuth)
self.assertEqual('http://key1.example.com:5000',
ec2._ks_adapters[None].get_endpoint())
def test_init_ks_session_auth_uri_trailing_slash(self):
dummy_conf = {'auth_uri': 'http://key1.example.com:5000/'}
ec2 = ec2token.EC2Token(app='woot', conf=dummy_conf)
self.assertEqual(1, len(ec2._ks_adapters))
self.assertIsInstance(ec2._ks_adapters[None].session.auth,
ks_noauth.NoAuth)
self.assertEqual('http://key1.example.com:5000',
ec2._ks_adapters[None].get_endpoint())
def test_init_ks_session_auth_uri_v2(self):
dummy_conf = {'auth_uri': 'http://key1.example.com:5000/v2.0'}
ec2 = ec2token.EC2Token(app='woot', conf=dummy_conf)
self.assertEqual(1, len(ec2._ks_adapters))
self.assertIsInstance(ec2._ks_adapters[None].session.auth,
ks_noauth.NoAuth)
self.assertEqual('http://key1.example.com:5000',
ec2._ks_adapters[None].get_endpoint())
def test_init_ks_session_auth_uri_v3(self):
dummy_conf = {'auth_uri': 'http://key1.example.com:5000/v3'}
ec2 = ec2token.EC2Token(app='woot', conf=dummy_conf)
self.assertEqual(1, len(ec2._ks_adapters))
self.assertIsInstance(ec2._ks_adapters[None].session.auth,
ks_noauth.NoAuth)
self.assertEqual('http://key1.example.com:5000',
ec2._ks_adapters[None].get_endpoint())
def test_init_ks_session_auth_uri_ec2tokens(self):
dummy_conf = {'auth_uri': 'http://key1.example.com:5000/v3/ec2tokens'}
ec2 = ec2token.EC2Token(app='woot', conf=dummy_conf)
self.assertEqual(1, len(ec2._ks_adapters))
self.assertIsInstance(ec2._ks_adapters[None].session.auth,
ks_noauth.NoAuth)
self.assertEqual('http://key1.example.com:5000',
ec2._ks_adapters[None].get_endpoint())
def test_init_ks_session_typical_standalone_config(self):
load_config_file('typical_standalone.conf')
self.mock_discover.url_for.return_value = \
'http://key1.example.com:5000/v3'
ec2 = ec2token.EC2Token(app='woot', conf={})
self.assertEqual(1, len(ec2._ks_adapters))
self.assertIsNotNone(ec2._ks_adapters[None].session.auth)
self.assertEqual('http://key1.example.com:5000',
ec2._ks_adapters[None].get_endpoint())
def test_init_ks_session_typical_config(self):
# The auth_uri is from the clients_keystone section
# the auth details are in the keystone_authtoken section
load_config_file('typical.conf')
self.mock_discover.url_for.return_value = \
'http://key1.example.com:5000/v3'
ec2 = ec2token.EC2Token(app='woot', conf={})
self.assertEqual(1, len(ec2._ks_adapters))
self.assertIsNotNone(ec2._ks_adapters[None].session.auth)
self.assertEqual('http://key1.example.com:5000',
ec2._ks_adapters[None].get_endpoint())
def test_init_ks_session_ec2authtoken_config(self):
load_config_file('ec2authtoken.conf')
ec2 = ec2token.EC2Token(app='woot', conf={})
self.assertEqual(1, len(ec2._ks_adapters))
self.assertIn(None, ec2._ks_adapters)
self.assertIsNotNone(ec2._ks_adapters[None].session.auth)
self.assertEqual('http://key1.example.com:5000',
ec2._ks_adapters[None].get_endpoint())
def test_init_ks_session_devstack_config(self):
load_config_file('long_auth_path.conf')
self.mock_discover.url_for.return_value = \
'http://key1.example.com/identity/v3'
ec2 = ec2token.EC2Token(app='woot', conf={})
self.assertEqual(1, len(ec2._ks_adapters))
self.assertIn(None, ec2._ks_adapters)
self.assertIsNotNone(ec2._ks_adapters[None].session.auth)
self.assertEqual('http://key1.example.com/identity',
ec2._ks_adapters[None].get_endpoint())
def test_conf_ssl_opttions_default(self):
cfg.CONF.set_default('auth_uri', 'https://192.0.2.9/v2.0/',
group='ec2authtoken')
ec2 = ec2token.EC2Token(app=None, conf={})
adapter = ec2._ks_adapters[None]
self.assertTrue(adapter.session.verify)
self.assertIsNone(adapter.session.cert)
def test_conf_ssl_insecure(self):
cfg.CONF.set_default('auth_uri', 'https://192.0.2.9/v2.0/',
group='ec2authtoken')
cfg.CONF.set_default('insecure', True,
group='ec2authtoken')
ec2 = ec2token.EC2Token(app=None, conf={})
adapter = ec2._ks_adapters[None]
self.assertFalse(adapter.session.verify)
self.assertIsNone(adapter.session.cert)
def test_conf_ssl_opttions(self):
cfg.CONF.set_default('auth_uri', 'https://192.0.2.9/v2.0/',
group='ec2authtoken')
cfg.CONF.set_default('ca_file', '/home/user/cacert.pem',
group='ec2authtoken')
cfg.CONF.set_default('insecure', False, group='ec2authtoken')
cfg.CONF.set_default('cert_file', '/home/user/mycert',
group='ec2authtoken')
cfg.CONF.set_default('key_file', '/home/user/mykey',
group='ec2authtoken')
ec2 = ec2token.EC2Token(app=None, conf={})
adapter = ec2._ks_adapters[None]
self.assertEqual('/home/user/cacert.pem', adapter.session.verify)
self.assertEqual(('/home/user/mycert', '/home/user/mykey'),
adapter.session.cert)
def test_conf_ssl_insecure_paste(self):
cfg.CONF.set_default('auth_uri', 'https://192.0.2.9/v2.0/',
group='ec2authtoken')
ec2 = ec2token.EC2Token(app=None, conf={
'insecure': 'True'
})
adapter = ec2._ks_adapters[None]
self.assertFalse(adapter.session.verify)
def test_conf_ssl_options_paste(self):
cfg.CONF.set_default('auth_uri', 'https://192.0.2.9/v2.0/',
group='ec2authtoken')
ec2 = ec2token.EC2Token(app=None, conf={
'ca_file': '/home/user/cacert.pem',
'cert_file': '/home/user/mycert',
'key_file': '/home/user/mykey'
})
adapter = ec2._ks_adapters[None]
self.assertEqual('/home/user/cacert.pem', adapter.session.verify)
self.assertEqual(('/home/user/mycert', '/home/user/mykey'),
adapter.session.cert)
def load_config_file(file_name):
file_path = pathlib.Path(__file__).parent / "config" / file_name
cfg.CONF([], default_config_files=[file_path])
def _dummy_GET_request(params=None, environ=None):
# Mangle the params dict into a query string
params = params or {}
environ = environ or {}
qs = "&".join(["=".join([k, str(params[k])]) for k in params])
environ.update({'REQUEST_METHOD': 'GET', 'QUERY_STRING': qs})
req = wsgi.Request(environ)
return req

View File

@@ -0,0 +1,17 @@
---
fixes:
- |
The Keystone v3 ec2token end point requires authenticated access. The Heat
ec2token filter now requires Keystone auth settings to be able to
verify EC2 credentials. For single cloud mode the ec2token filter will
look in the ``[ec2authtoken]`` section of the heat configuration for
keystone authentication settings.
In multicloud mode keystone auth settings must be supplied in configuration
file sections names ``[ec2authtoken.<name>]`` .
The ``[ec2authtoken] clouds`` option should be also configured to define
the list of names.
Note that ec2token request will be sent without authentication if
the legacy settings (``auth_uri`` and ``allowed_auth_uris``) are still
used.