import httplib2 import json import os import re import six import sys import time from urlparse import urlparse import xml.dom.minidom from proboscis import before_class from proboscis import test from proboscis import TestProgram from proboscis.asserts import * from proboscis.asserts import Check from troveclient.compat import Dbaas from troveclient.compat import TroveHTTPClient from client import ConfigFile from client import SnippetWriter from client import JsonClient from client import XmlClient print_req = True class ExampleClient(object): def __init__(self, config_file): if not os.path.exists(config_file): raise RuntimeError("Could not find Example CONF at %s." % config_file) file_contents = open(config_file, "r").read() try: config = json.loads(file_contents) except Exception as exception: msg = 'Error loading config file "%s".' % config_file raise RuntimeError(msg, exception) self.directory = config.get("directory", None) if not self.directory.endswith('/'): self.directory += '/' print("directory = %s" % self.directory) self.api_url = config.get("api_url", None) print("api_url = %s" % self.api_url) #auth auth_url = config.get("auth_url", None) print("auth_url = %s" % auth_url) username = config.get("username", None) print("username = %s" % username) password = config.get("password", None) print("password = %s" % password) self.tenant = config.get("tenant", None) self.replace_host = config.get("replace_host", None) print("tenant = %s" % self.tenant) self.replace_dns_hostname = config.get("replace_dns_hostname", None) if auth_url: auth_id, tenant_id = self.get_auth_token_id_tenant_id(auth_url, username, password) else: auth_id = self.tenant tenant_id = self.tenant print("id = %s" % auth_id) self.headers = { 'X-Auth-Token': str(auth_id) } print("tenantID = %s" % tenant_id) self.tenantID = tenant_id self.dbaas_url = "%s/v1.0/%s" % (self.api_url, self.tenantID) def write_request_file(self, name, content_type, url, method, req_headers, request_body): def write_request(): return self.output_request(url, req_headers, request_body, content_type, method) if print_req: print("\t%s req url:%s" % (content_type, url)) print("\t%s req method:%s" % (content_type, method)) print("\t%s req headers:%s" % (content_type, req_headers)) print("\t%s req body:%s" % (content_type, request_body)) self.write_file(name, content_type, url, method, write_request) def write_response_file(self, name, content_type, url, method, resp, resp_content): def write_response(): return self.output_response(resp, resp_content, content_type) self.write_file(name, content_type, url, method, write_response) if print_req: print("\t%s resp:%s" % (content_type, resp)) print("\t%s resp content:%s" % (content_type, resp_content)) def write_file(self, name, content_type, url, method, func): filename = "%sdb-%s-request.%s" % (self.directory, name, content_type) with open(filename, "w") as file: output = func() output = output.replace(self.tenantID, '1234') if self.replace_host: output = output.replace(self.api_url, self.replace_host) pre_host_port = urlparse(self.api_url).netloc post_host = urlparse(self.replace_host).netloc output = output.replace(pre_host_port, post_host) file.write(output) def version_http_call(self, name, method, json, xml, output=True, print_resp=False): json['url'] = "%s/%s" % (self.api_url, json['url']) xml['url'] = "%s/%s" % (self.api_url, xml['url']) return self.make_request(name, method, json, xml, output, print_resp) def http_call(self, name, method, url, json, xml, output=True, print_resp=False): json['url'] = "%s/%s" % (self.dbaas_url, json['url']) xml['url'] = "%s/%s" % (self.dbaas_url, xml['url']) return self.make_request(name, method, json, xml, output, print_resp) # print_req and print_resp for debugging purposes def make_request(self, name, method, json, xml, output=True, print_resp=False): name = name.replace('_', '-') print("http call for %s" % name) http = httplib2.Http(disable_ssl_certificate_validation=True) req_headers = {'User-Agent': "python-example-client", 'Content-Type': "application/json", 'Accept': "application/json" } req_headers.update(self.headers) content_type = 'json' request_body = json.get('body', None) url = json.get('url') if output: self.write_request_file(name, 'json', url, method, req_headers, request_body) resp, resp_content = http.request(url, method, body=request_body, headers=req_headers) json_resp = resp, resp_content if output: filename = "%sdb-%s-response.%s" % (self.directory, name, content_type) self.write_response_file(name, 'json', url, method, resp, resp_content) content_type = 'xml' req_headers['Accept'] = 'application/xml' req_headers['Content-Type'] = 'application/xml' request_body = xml.get('body', None) url = xml.get('url') if output: filename = "%sdb-%s-request.%s" % (self.directory, name, content_type) output = self.write_request_file(name, 'xml', url, method, req_headers, request_body) resp, resp_content = http.request(url, method, body=request_body, headers=req_headers) xml_resp = resp, resp_content if output: filename = "%sdb-%s-response.%s" % (self.directory, name, content_type) self.write_response_file(name, 'xml', url, method, resp, resp_content) return json_resp, xml_resp def _indent_xml(self, my_string): my_string = my_string.encode("utf-8") # convert to plain string without indents and spaces my_re = re.compile(r'>\s+([^\s])', re.DOTALL) my_string = myre.sub(r'>\g<1>', my_string) my_string = xml.dom.minidom.parseString(my_string).toprettyxml() # remove line breaks my_re = re.compile(r'>\n\s+([^<>\s].*?)\n\s+\g<1>