# Copyright 2011 OpenStack Foundation # Copyright 2013 Rackspace Hosting # Copyright 2013 Hewlett-Packard Development Company, L.P. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. """ OpenStack Client interface. Handles the REST calls and responses. """ from __future__ import print_function import logging from keystoneauth1 import adapter from oslo_utils import importutils import requests import six.moves.urllib.parse as urlparse from troveclient.apiclient import client from troveclient import exceptions from troveclient import service_catalog try: import eventlet as sleep_lib except ImportError: import time as sleep_lib try: import json except ImportError: import simplejson as json # Python 2.5 compat fix if not hasattr(urlparse, 'parse_qsl'): import cgi urlparse.parse_qsl = cgi.parse_qsl osprofiler_web = importutils.try_import("osprofiler.web") class TroveClientMixin(object): def get_database_api_version_from_endpoint(self): magic_tuple = urlparse.urlsplit(self.management_url) scheme, netloc, path, query, frag = magic_tuple v = path.split("/")[1] valid_versions = ['v1.0'] if v not in valid_versions: msg = "Invalid client version '%s'. must be one of: %s" % ( (v, ', '.join(valid_versions))) raise exceptions.UnsupportedVersion(msg) return v[1:] class HTTPClient(TroveClientMixin): USER_AGENT = 'python-troveclient' def __init__(self, user, password, projectid, auth_url, insecure=False, timeout=None, tenant_id=None, proxy_tenant_id=None, proxy_token=None, region_name=None, endpoint_type='publicURL', service_type=None, service_name=None, database_service_name=None, retries=None, http_log_debug=False, cacert=None, bypass_url=None, auth_system='keystone', auth_plugin=None): if auth_system and auth_system != 'keystone' and not auth_plugin: raise exceptions.AuthSystemNotFound(auth_system) if not auth_url and auth_system and auth_system != 'keystone': auth_url = auth_plugin.get_auth_url() if not auth_url: raise exceptions.EndpointNotFound() self.user = user self.password = password self.projectid = projectid self.tenant_id = tenant_id self.auth_url = auth_url.rstrip('/') if auth_url else auth_url self.version = 'v1' self.region_name = region_name self.endpoint_type = endpoint_type self.service_type = service_type self.service_name = service_name self.database_service_name = database_service_name self.retries = int(retries or 0) self.http_log_debug = http_log_debug self.management_url = None self.auth_token = None self.proxy_token = proxy_token self.proxy_tenant_id = proxy_tenant_id self.timeout = timeout self.bypass_url = bypass_url self.auth_system = auth_system self.auth_plugin = auth_plugin if insecure: self.verify_cert = False else: if cacert: self.verify_cert = cacert else: self.verify_cert = True self.auth_system = auth_system self.auth_plugin = auth_plugin self.LOG = logging.getLogger(__name__) if self.http_log_debug and not self.LOG.handlers: ch = logging.StreamHandler() self.LOG.setLevel(logging.DEBUG) self.LOG.addHandler(ch) if hasattr(requests, 'logging'): requests.logging.getLogger(requests.__name__).addHandler(ch) def http_log_req(self, args, kwargs): if not self.http_log_debug: return string_parts = ['curl -i'] for element in args: if element in ('GET', 'POST', 'DELETE', 'PUT'): string_parts.append(' -X %s' % element) else: string_parts.append(' %s' % element) for element in kwargs['headers']: header = ' -H "%s: %s"' % (element, kwargs['headers'][element]) string_parts.append(header) if 'data' in kwargs: string_parts.append(" -d '%s'" % (kwargs['data'])) self.LOG.debug("\nREQ: %s\n", "".join(string_parts)) def http_log_resp(self, resp): if not self.http_log_debug: return self.LOG.debug( "RESP: [%s] %s\nRESP BODY: %s\n", resp.status_code, resp.headers, resp.text) def request(self, url, method, **kwargs): kwargs.setdefault('headers', kwargs.get('headers', {})) kwargs['headers']['User-Agent'] = self.USER_AGENT kwargs['headers']['Accept'] = 'application/json' if osprofiler_web: kwargs['headers'].update(osprofiler_web.get_trace_id_headers()) if 'body' in kwargs: kwargs['headers']['Content-Type'] = 'application/json' kwargs['data'] = json.dumps(kwargs['body']) del kwargs['body'] if self.timeout: kwargs.setdefault('timeout', self.timeout) self.http_log_req((url, method,), kwargs) resp = requests.request( method, url, verify=self.verify_cert, **kwargs) self.http_log_resp(resp) if resp.text: try: body = json.loads(resp.text) except ValueError: pass body = None else: body = None if resp.status_code >= 400: raise exceptions.from_response(resp, body, url) return resp, body def _cs_request(self, url, method, **kwargs): auth_attempts = 0 attempts = 0 backoff = 1 while True: attempts += 1 if not self.management_url or not self.auth_token: self.authenticate() kwargs.setdefault('headers', {})['X-Auth-Token'] = self.auth_token if self.projectid: kwargs['headers']['X-Auth-Project-Id'] = self.projectid try: resp, body = self.request(self.management_url + url, method, **kwargs) return resp, body except exceptions.BadRequest: if attempts > self.retries: raise except exceptions.Unauthorized: if auth_attempts > 0: raise self.LOG.debug("Unauthorized, reauthenticating.") self.management_url = self.auth_token = None # First reauth. Discount this attempt. attempts -= 1 auth_attempts += 1 continue except exceptions.ClientException as e: if attempts > self.retries: raise if 500 <= e.code <= 599: pass else: raise except requests.exceptions.ConnectionError as e: # Catch a connection refused from requests.request self.LOG.debug("Connection refused: %s", e) msg = 'Unable to establish connection: %s' % e raise exceptions.ConnectionRefused(msg) self.LOG.debug( "Failed attempt(%s of %s), retrying in %s seconds", attempts, self.retries, backoff) sleep_lib.sleep(backoff) backoff *= 2 def get(self, url, **kwargs): return self._cs_request(url, 'GET', **kwargs) def patch(self, url, **kwargs): return self._cs_request(url, 'PATCH', **kwargs) def post(self, url, **kwargs): return self._cs_request(url, 'POST', **kwargs) def put(self, url, **kwargs): return self._cs_request(url, 'PUT', **kwargs) def delete(self, url, **kwargs): return self._cs_request(url, 'DELETE', **kwargs) def _extract_service_catalog(self, url, resp, body, extract_token=True): """See what the auth service told us and process the response. We may get redirected to another site, fail or actually get back a service catalog with a token and our endpoints. """ if resp.status_code == 200: # content must always present try: self.auth_url = url self.service_catalog = \ service_catalog.ServiceCatalog(body) if extract_token: self.auth_token = self.service_catalog.get_token() management_url = self.service_catalog.url_for( attr='region', filter_value=self.region_name, endpoint_type=self.endpoint_type, service_type=self.service_type, service_name=self.service_name, database_service_name=self.database_service_name) self.management_url = management_url.rstrip('/') return None except exceptions.AmbiguousEndpoints: print("Found more than one valid endpoint. Use a more " "restrictive filter") raise except KeyError: raise exceptions.AuthorizationFailure() except exceptions.EndpointNotFound: print("Could not find any suitable endpoint. Correct region?") raise elif resp.status_code == 305: return resp['location'] else: raise exceptions.from_response(resp, body, url) def _fetch_endpoints_from_auth(self, url): """We have a token, but don't know the final endpoint for the region. We have to go back to the auth service and ask again. This request requires an admin-level token to work. The proxy token supplied could be from a low-level enduser. We can't get this from the keystone service endpoint, we have to use the admin endpoint. This will overwrite our admin token with the user token. """ # GET ...:5001/v2.0/tokens/#####/endpoints url = '/'.join([url, 'tokens', '%s?belongsTo=%s' % (self.proxy_token, self.proxy_tenant_id)]) self.LOG.debug("Using Endpoint URL: %s", url) resp, body = self.request(url, "GET", headers={'X-Auth-Token': self.auth_token}) return self._extract_service_catalog(url, resp, body, extract_token=False) def authenticate(self): magic_tuple = urlparse.urlsplit(self.auth_url) scheme, netloc, path, query, frag = magic_tuple port = magic_tuple.port if port is None: port = 80 path_parts = path.split('/') for part in path_parts: if len(part) > 0 and part[0] == 'v': self.version = part break # TODO(sandy): Assume admin endpoint is 35357 for now. # Ideally this is going to have to be provided by the service catalog. new_netloc = netloc.replace(':%d' % port, ':%d' % (35357,)) admin_url = urlparse.urlunsplit((scheme, new_netloc, path, query, frag)) auth_url = self.auth_url if self.version == "v2.0": while auth_url: if not self.auth_system or self.auth_system == 'keystone': auth_url = self._v2_auth(auth_url) else: auth_url = self._plugin_auth(auth_url) # Are we acting on behalf of another user via an # existing token? If so, our actual endpoints may # be different than that of the admin token. if self.proxy_token: self._fetch_endpoints_from_auth(admin_url) # Since keystone no longer returns the user token # with the endpoints any more, we need to replace # our service account token with the user token. self.auth_token = self.proxy_token else: try: while auth_url: auth_url = self._v1_auth(auth_url) # In some configurations trove makes redirection to # v2.0 keystone endpoint. Also, new location does not contain # real endpoint, only hostname and port. except exceptions.AuthorizationFailure: if auth_url.find('v2.0') < 0: auth_url = auth_url + '/v2.0' self._v2_auth(auth_url) # Allows for setting an endpoint not defined in the catalog if self.bypass_url is not None and self.bypass_url != '': self.management_url = self.bypass_url def _plugin_auth(self, auth_url): return self.auth_plugin.authenticate(self, auth_url) def _v1_auth(self, url): if self.proxy_token: raise exceptions.NoTokenLookupException() headers = {'X-Auth-User': self.user, 'X-Auth-Key': self.password} if self.projectid: headers['X-Auth-Project-Id'] = self.projectid resp, body = self.request(url, 'GET', headers=headers) if resp.status_code in (200, 204): # in some cases we get No Content try: mgmt_header = 'x-server-management-url' self.management_url = resp.headers[mgmt_header].rstrip('/') self.auth_token = resp.headers['x-auth-token'] self.auth_url = url except (KeyError, TypeError): raise exceptions.AuthorizationFailure() elif resp.status_code == 305: return resp.headers['location'] else: raise exceptions.from_response(resp, body, url) def _v2_auth(self, url): """Authenticate against a v2.0 auth service.""" body = {"auth": { "passwordCredentials": {"username": self.user, "password": self.password}}} if self.projectid: body['auth']['tenantName'] = self.projectid elif self.tenant_id: body['auth']['tenantId'] = self.tenant_id self._authenticate(url, body) def _authenticate(self, url, body): """Authenticate and extract the service catalog.""" token_url = url + "/tokens" # Make sure we follow redirects when trying to reach Keystone resp, body = self.request( token_url, "POST", body=body, allow_redirects=True) return self._extract_service_catalog(url, resp, body) class SessionClient(adapter.LegacyJsonAdapter, TroveClientMixin): def __init__(self, session, auth, **kwargs): self.database_service_name = kwargs.pop('database_service_name', None) super(SessionClient, self).__init__(session=session, auth=auth, **kwargs) # FIXME(jamielennox): this is going to cause an authentication request # on client init. This is different to how the other clients work. endpoint = self.get_endpoint() if not endpoint: raise exceptions.EndpointNotFound() self.management_url = endpoint.rstrip('/') def request(self, url, method, **kwargs): raise_exc = kwargs.pop('raise_exc', True) resp, body = super(SessionClient, self).request(url, method, raise_exc=False, **kwargs) if raise_exc and resp.status_code >= 400: raise exceptions.from_response(resp, body, url) return resp, body def _construct_http_client(username=None, password=None, project_id=None, auth_url=None, insecure=False, timeout=None, proxy_tenant_id=None, proxy_token=None, region_name=None, endpoint_type='publicURL', service_type='database', service_name=None, database_service_name=None, retries=None, http_log_debug=False, auth_system='keystone', auth_plugin=None, cacert=None, bypass_url=None, tenant_id=None, session=None, **kwargs): if session: try: kwargs.setdefault('interface', endpoint_type) except KeyError: pass return SessionClient(session=session, service_type=service_type, service_name=service_name, region_name=region_name, database_service_name=database_service_name, connect_retries=retries, **kwargs) else: return HTTPClient(username, password, projectid=project_id, auth_url=auth_url, insecure=insecure, timeout=timeout, tenant_id=tenant_id, proxy_token=proxy_token, proxy_tenant_id=proxy_tenant_id, region_name=region_name, endpoint_type=endpoint_type, service_type=service_type, service_name=service_name, database_service_name=database_service_name, retries=retries, http_log_debug=http_log_debug, cacert=cacert, bypass_url=bypass_url, auth_system=auth_system, auth_plugin=auth_plugin, ) def get_version_map(): return { '1.0': 'troveclient.v1.client.Client', } def Client(version, *args, **kwargs): version_map = get_version_map() client_class = client.BaseClient.get_class('database', version, version_map) return client_class(*args, **kwargs)