designate/functionaltests/common/client.py
Paul Glass 81cd2994a9 A request hook interface for the functional test client
This lets us write custom hooks out of tree that are called whenever a
request is made in the functional tests.

The hook gets notified:

1. Just before a request, with the request args
2. Just after a request, with the resp and resp_body
3. When a request causes an exception, with the exception object (which
contains the resp and resp_body if it came from tempest_lib)

This is useful for maintaining certain out-of-tree test modifications,
like:

    1. Rate limiting client requests
    2. Additional custom logging
    3. ???

Change-Id: Ie2ca4ee85972aa8f9b22c402ed8fad368d2ff7d9
2016-02-15 10:23:03 -06:00

243 lines
7.9 KiB
Python

"""
Copyright 2015 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import abc
import logging
from config import cfg
from noauth import NoAuthAuthProvider
from six import string_types
from six.moves.urllib.parse import quote_plus
from tempest_lib.common.rest_client import RestClient
from tempest_lib.auth import KeystoneV2Credentials
from tempest_lib.auth import KeystoneV2AuthProvider
from functionaltests.common.utils import memoized
from functionaltests.common import hooks
LOG = logging.getLogger(__name__)
class KeystoneV2AuthProviderWithOverridableUrl(KeystoneV2AuthProvider):
def base_url(self, *args, **kwargs):
# use the base url from the config if it was specified
if cfg.CONF.identity.designate_override_url:
return cfg.CONF.identity.designate_override_url
else:
return super(KeystoneV2AuthProviderWithOverridableUrl, self) \
.base_url(*args, **kwargs)
class KeystoneV2AuthProviderNoToken(KeystoneV2AuthProviderWithOverridableUrl):
def _decorate_request(self, filters, method, url, headers=None, body=None,
auth_data=None):
_res = super(KeystoneV2AuthProviderNoToken, self)._decorate_request(
filters, method, url, headers=headers, body=body,
auth_data=auth_data)
_url, _headers, _body = _res
del _headers['X-Auth-Token']
return (_url, _headers, _body)
class BaseDesignateClient(RestClient):
def __init__(self, with_token=True):
no_cert_check = cfg.CONF.testconfig.disable_ssl_certificate_validation
interface = cfg.CONF.designate.interface
if not interface.endswith('URL'):
interface += "URL"
self.hooks = []
self._populate_hooks()
super(BaseDesignateClient, self).__init__(
auth_provider=self.get_auth_provider(with_token),
service=cfg.CONF.designate.service,
region=cfg.CONF.identity.region,
disable_ssl_certificate_validation=no_cert_check,
endpoint_type=interface
)
def _populate_hooks(self):
for name in cfg.CONF.testconfig.hooks:
LOG.debug("Loading request hook '%s' from config", name)
try:
cls = hooks.get_class(name)
if not cls:
LOG.debug("'%s' not found. Call register_hook", name)
else:
self.hooks.append(cls)
except Exception as e:
LOG.exception(e)
def request(self, *args, **kwargs):
req_hooks = [hook_class() for hook_class in self.hooks]
try:
for hook in req_hooks:
hook.before(args, kwargs)
r, b = super(BaseDesignateClient, self).request(*args, **kwargs)
for hook in req_hooks:
hook.after(r, b)
return r, b
except Exception as e:
for hook in req_hooks:
hook.on_exception(e)
raise
def get_auth_provider(self, with_token=True):
if cfg.CONF.noauth.use_noauth:
return self._get_noauth_auth_provider()
return self._get_keystone_auth_provider(with_token)
@abc.abstractmethod
def _get_noauth_auth_provider(self):
pass
@abc.abstractmethod
def _get_keystone_auth_provider(self):
pass
def _create_keystone_auth_provider(self, creds, with_token=True):
if with_token:
auth_provider = KeystoneV2AuthProviderWithOverridableUrl(
creds, cfg.CONF.identity.uri)
else:
auth_provider = KeystoneV2AuthProviderNoToken(
creds, cfg.CONF.identity.uri)
auth_provider.fill_credentials()
return auth_provider
class DesignateClient(BaseDesignateClient):
"""Client with default user"""
def _get_noauth_auth_provider(self):
creds = KeystoneV2Credentials(
tenant_id=cfg.CONF.noauth.tenant_id,
)
return NoAuthAuthProvider(creds, cfg.CONF.noauth.designate_endpoint)
def _get_keystone_auth_provider(self, with_token=True):
creds = KeystoneV2Credentials(
username=cfg.CONF.identity.username,
password=cfg.CONF.identity.password,
tenant_name=cfg.CONF.identity.tenant_name,
)
return self._create_keystone_auth_provider(creds, with_token)
class DesignateAltClient(BaseDesignateClient):
"""Client with alternate user"""
def _get_noauth_auth_provider(self):
creds = KeystoneV2Credentials(
tenant_id=cfg.CONF.noauth.alt_tenant_id,
)
return NoAuthAuthProvider(creds, cfg.CONF.noauth.designate_endpoint)
def _get_keystone_auth_provider(self, with_token=True):
creds = KeystoneV2Credentials(
username=cfg.CONF.identity.alt_username,
password=cfg.CONF.identity.alt_password,
tenant_name=cfg.CONF.identity.alt_tenant_name,
)
return self._create_keystone_auth_provider(creds, with_token)
class DesignateAdminClient(BaseDesignateClient):
"""Client with admin user"""
def _get_noauth_auth_provider(self):
creds = KeystoneV2Credentials(
tenant_id=cfg.CONF.noauth.tenant_id,
)
return NoAuthAuthProvider(creds, cfg.CONF.noauth.designate_endpoint)
def _get_keystone_auth_provider(self, with_token=True):
creds = KeystoneV2Credentials(
username=cfg.CONF.auth.admin_username,
password=cfg.CONF.auth.admin_password,
tenant_name=cfg.CONF.auth.admin_tenant_name,
)
return self._create_keystone_auth_provider(creds, with_token)
class ClientMixin(object):
@classmethod
@memoized
def get_clients(cls, with_token):
return {
'default': DesignateClient(with_token),
'alt': DesignateAltClient(with_token),
'admin': DesignateAdminClient(with_token),
}
def __init__(self, client):
self.client = client
@classmethod
def deserialize(cls, resp, body, model_type):
return resp, model_type.from_json(body)
@classmethod
def as_user(cls, user, with_token=True):
"""
:param user: 'default', 'alt', or 'admin'
:param with_token: Boolean for whether to send the x-auth-token with
requests
"""
return cls(cls.get_clients(with_token)[user])
@property
def tenant_id(self):
return self.client.tenant_id
@classmethod
def add_filters(cls, url, filters):
"""
:param url: base URL for the request
:param filters: dict with var:val pairs to add as parameters to URL
"""
first = True
for f in filters:
if isinstance(filters[f], string_types):
filters[f] = quote_plus(filters[f].encode('utf-8'))
url = '{url}{sep}{var}={val}'.format(
url=url, sep=('?' if first else '&'), var=f, val=filters[f]
)
first = False
return url
def create_uri(self, path, filters=None):
url_pattern = cfg.CONF.testconfig.v2_path_pattern
params = {
'path': path,
'tenant_id': self.client.tenant_id,
'tenant_name': self.client.tenant_name,
'user': self.client.user,
'user_id': self.client.user_id,
}
uri = url_pattern.format(**params)
uri.replace('//', '/')
if filters:
uri = self.add_filters(uri, filters)
return uri