Fix trust notifier

The keystone methods to get a client with a trust is broken, and simply
use service credentials while ignoring the trust. We can't load the
options directly from the configuration, we need to build a auth plugin
manually.

Change-Id: Ic5edb144eac3a4b30b9835bf251f25f65b8c29b5
(cherry picked from commit ff3d24ecbd)
This commit is contained in:
Thomas Herve
2016-06-10 14:32:19 +02:00
parent 0a0a02aa00
commit 0393448617
4 changed files with 34 additions and 26 deletions

View File

@@ -430,8 +430,7 @@ class Alarm(base.Base):
auth_plugin = pecan.request.environ.get('keystone.token_auth')
url = netutils.urlsplit(action)
if self._is_trust_url(url) and url.password:
keystone_client.delete_trust_id(pecan.request.cfg,
url.username, auth_plugin)
keystone_client.delete_trust_id(url.username, auth_plugin)
Alarm.add_attributes(**{"%s_rule" % ext.name: ext.plugin

View File

@@ -18,7 +18,9 @@ import os
from keystoneauth1 import exceptions as ka_exception
from keystoneauth1 import identity as ka_identity
from keystoneauth1.identity.generic import password
from keystoneauth1 import loading as ka_loading
from keystoneclient import session
from keystoneclient.v3 import client as ks_client_v3
from oslo_config import cfg
from oslo_log import log
@@ -28,36 +30,42 @@ LOG = log.getLogger(__name__)
CFG_GROUP = "service_credentials"
def get_session(conf, requests_session=None):
"""Get a aodh service credentials auth session."""
def get_session(conf):
"""Get an aodh service credentials auth session."""
auth_plugin = ka_loading.load_auth_from_conf_options(conf, CFG_GROUP)
session = ka_loading.load_session_from_conf_options(
conf, CFG_GROUP, auth=auth_plugin, session=requests_session
return ka_loading.load_session_from_conf_options(
conf, CFG_GROUP, auth=auth_plugin
)
return session
def get_client(conf, trust_id=None, requests_session=None):
"""Return a client for keystone v3 endpoint, optionally using a trust."""
session = get_session(conf, requests_session=requests_session)
return ks_client_v3.Client(session=session, trust_id=trust_id)
def get_client(conf):
"""Return a client for keystone v3 endpoint."""
sess = get_session(conf)
return ks_client_v3.Client(session=sess)
def get_service_catalog(client):
return client.session.auth.get_access(client.session).service_catalog
def get_trusted_client(conf, trust_id):
# Ideally we would use load_session_from_conf_options, but we can't do that
# *and* specify a trust, so let's create the object manually.
auth_plugin = password.Password(
username=conf[CFG_GROUP].username,
password=conf[CFG_GROUP].password,
auth_url=conf[CFG_GROUP].auth_url,
user_domain_id=conf[CFG_GROUP].user_domain_id,
trust_id=trust_id)
sess = session.Session(auth=auth_plugin)
return ks_client_v3.Client(session=sess)
def get_auth_token(client):
return client.session.auth.get_access(client.session).auth_token
def get_client_on_behalf_user(conf, auth_plugin, trust_id=None,
requests_session=None):
"""Return a client for keystone v3 endpoint, optionally using a trust."""
session = ka_loading.load_session_from_conf_options(
conf, CFG_GROUP, auth=auth_plugin, session=requests_session
)
return ks_client_v3.Client(session=session, trust_id=trust_id)
def get_client_on_behalf_user(auth_plugin):
"""Return a client for keystone v3 endpoint."""
sess = session.Session(auth=auth_plugin)
return ks_client_v3.Client(session=sess)
def create_trust_id(conf, trustor_user_id, trustor_project_id, roles,
@@ -66,7 +74,7 @@ def create_trust_id(conf, trustor_user_id, trustor_project_id, roles,
admin_client = get_client(conf)
trustee_user_id = admin_client.session.get_user_id()
client = get_client_on_behalf_user(conf, auth_plugin=auth_plugin)
client = get_client_on_behalf_user(auth_plugin)
trust = client.trusts.create(trustor_user=trustor_user_id,
trustee_user=trustee_user_id,
project=trustor_project_id,
@@ -75,9 +83,9 @@ def create_trust_id(conf, trustor_user_id, trustor_project_id, roles,
return trust.id
def delete_trust_id(conf, trust_id, auth_plugin):
def delete_trust_id(trust_id, auth_plugin):
"""Delete a trust previously setup for the aodh user."""
client = get_client_on_behalf_user(conf, auth_plugin=auth_plugin)
client = get_client_on_behalf_user(auth_plugin)
try:
client.trusts.delete(trust_id)
except ka_exception.NotFound:

View File

@@ -34,7 +34,7 @@ class TrustRestAlarmNotifier(rest.RestAlarmNotifier):
reason, reason_data):
trust_id = action.username
client = keystone_client.get_client(self.conf, trust_id)
client = keystone_client.get_trusted_client(self.conf, trust_id)
# Remove the fake user
netloc = action.netloc.split("@")[1]

View File

@@ -251,8 +251,9 @@ class TestAlarmNotifier(tests_base.BaseTestCase):
headers = {'X-Auth-Token': 'token_1234'}
headers.update(self.HTTP_HEADERS)
self.useFixture(mockpatch.Patch('keystoneclient.v3.client.Client',
lambda **kwargs: client))
self.useFixture(
mockpatch.Patch('aodh.keystone_client.get_trusted_client',
lambda *args: client))
with mock.patch.object(requests.Session, 'post') as poster:
self.service.notify_alarm({},