Elastic search connector - Rally

+ Added function for Rally Results
+ Modified the Service filed
+ Rally.py now produces a json file and returns json
+ Grafana to return the URL list
+ Quick single workload test worked.. +1
+ Switched to a List for the config
+ Rebase
+ Remove parent/child work
+ Error check
+ Default 3 jsons in the config file
+ Rebase
+ Pep
+ Remove Grafana-API Key per Akrzos
+ Small bug
+ Removed getter/setter in Grafana.py

Change-Id: I9f1d5f27597c4c4897fc4ee821aad309fa11a04b
This commit is contained in:
Joe Talerico 2016-03-30 17:50:18 -04:00 committed by Joe
parent f174b67e2b
commit 13c67b4e11
6 changed files with 196 additions and 39 deletions

@ -2,6 +2,17 @@
browbeat: browbeat:
results : results/ results : results/
rerun: 3 rerun: 3
elasticsearch:
enabled: false
host: 1.1.1.1
port: 9200
metadata_files:
- name: hardware-metadata
file: hardware-metadata.json
- name: os-metadata
file: os-metadata.json
- name: software-metadata
file: software-metadata.json
ansible: ansible:
hosts: ansible/hosts hosts: ansible/hosts
adjust: adjust:
@ -200,7 +211,7 @@ rally:
#shaker scenarios require atleast 2 compute nodes #shaker scenarios require atleast 2 compute nodes
shaker: shaker:
enabled: true enabled: true
server: (Address of machine running browbeat) server: 1.1.1.1
port: 5555 port: 5555
flavor: m1.small flavor: m1.small
join_timeout: 600 join_timeout: 600

@ -2,6 +2,17 @@
browbeat: browbeat:
results : results/ results : results/
rerun: 1 rerun: 1
elasticsearch:
enabled: false
host: 1.1.1.1
port: 9200
metadata_files:
- name: hardware-metadata
file: hardware-metadata.json
- name: os-metadata
file: os-metadata.json
- name: software-metadata
file: software-metadata.json
ansible: ansible:
hosts: ansible/hosts hosts: ansible/hosts
adjust: adjust:
@ -42,7 +53,7 @@ perfkit:
data_disk_size: 4 data_disk_size: 4
shaker: shaker:
enabled: true enabled: true
server: (Address of machine running browbeat-Undercloud) server: 1.1.1.1
port: 5555 port: 5555
flavor: m1.small flavor: m1.small
join_timeout: 600 join_timeout: 600

63
lib/Elastic.py Normal file

@ -0,0 +1,63 @@
from elasticsearch import Elasticsearch
import logging
import json
import pprint
import numpy
class Elastic:
"""
"""
def __init__(self,config,tool="browbeat") :
self.config = config
self.logger = logging.getLogger('browbeat.Elastic')
self.es = Elasticsearch([
{'host': self.config['elasticsearch']['host'],
'port': self.config['elasticsearch']['port']}],
send_get_body_as='POST'
)
self.index = tool
"""
"""
def load_json(self,result):
json_data = None
self.logger.info("Loading JSON")
json_data = json.loads(result)
return json_data
"""
"""
def load_json_file(self,result):
json_data = None
self.logger.info("Loading JSON file : {}".format(result))
try :
with open(result) as jdata :
json_data = json.load(jdata)
except (IOError, OSError) as e:
self.logger.error("Error loading JSON file : {}".format(result))
return False
return json_data
"""
"""
def combine_metadata(self,result):
if len(self.config['elasticsearch']['metadata_files']) > 0 :
meta = self.config['elasticsearch']['metadata_files']
for _meta in meta:
try :
with open(_meta['file']) as jdata :
result[_meta['name']] = json.load(jdata)
except (IOError, OSError) as e:
self.logger.error("Error loading Metadata file : {}".format(_meta['file']))
return False
return result
"""
"""
def index_result(self,result,_id=None) :
return self.es.index(index=self.index,
id=_id,
body=result,
doc_type='result',
refresh=True
)

@ -12,12 +12,11 @@ class Grafana:
self.grafana_ip = self.config['grafana']['grafana_ip'] self.grafana_ip = self.config['grafana']['grafana_ip']
self.grafana_port = self.config['grafana']['grafana_port'] self.grafana_port = self.config['grafana']['grafana_port']
self.playbook = self.config['ansible']['grafana_snapshot'] self.playbook = self.config['ansible']['grafana_snapshot']
self.grafana_url = []
def get_extra_vars(self, from_ts, to_ts, result_dir, test_name): def extra_vars(self, from_ts, to_ts, result_dir, test_name):
extra_vars = 'grafana_ip={} '.format( extra_vars = 'grafana_ip={} '.format(self.config['grafana']['grafana_ip'])
self.config['grafana']['grafana_ip']) extra_vars += 'grafana_port={} '.format(self.config['grafana']['grafana_port'])
extra_vars += 'grafana_port={} '.format(
self.config['grafana']['grafana_port'])
extra_vars += 'from={} '.format(from_ts) extra_vars += 'from={} '.format(from_ts)
extra_vars += 'to={} '.format(to_ts) extra_vars += 'to={} '.format(to_ts)
extra_vars += 'results_dir={}/{} '.format(result_dir, test_name) extra_vars += 'results_dir={}/{} '.format(result_dir, test_name)
@ -26,19 +25,26 @@ class Grafana:
extra_vars += 'snapshot_compute=true ' extra_vars += 'snapshot_compute=true '
return extra_vars return extra_vars
def print_dashboard_url(self, from_ts, to_ts, test_name): def grafana_urls(self):
return self.grafana_url
def create_grafana_urls(self, time):
if 'grafana' in self.config and self.config['grafana']['enabled']: if 'grafana' in self.config and self.config['grafana']['enabled']:
from_ts = time['from_ts']
to_ts = time['to_ts']
url = 'http://{}:{}/dashboard/db/'.format( url = 'http://{}:{}/dashboard/db/'.format(
self.grafana_ip, self.grafana_port) self.grafana_ip, self.grafana_port)
for dashboard in self.config['grafana']['dashboards']: for dashboard in self.config['grafana']['dashboards']:
full_url = '{}{}?from={}&to={}&var-Cloud={}'.format( self.grafana_url.append('{}{}?from={}&to={}&var-Cloud={}'.format(
url, dashboard, from_ts, to_ts, self.cloud_name) url, dashboard, from_ts, to_ts, self.cloud_name))
self.logger.info(
'{} - Grafana URL: {}'.format(test_name, full_url)) def print_dashboard_url(self,test_name):
for full_url in self.grafana_url:
self.logger.info('{} - Grafana URL: {}'.format(test_name, full_url))
def log_snapshot_playbook_cmd(self, from_ts, to_ts, result_dir, test_name): def log_snapshot_playbook_cmd(self, from_ts, to_ts, result_dir, test_name):
if 'grafana' in self.config and self.config['grafana']['enabled']: if 'grafana' in self.config and self.config['grafana']['enabled']:
extra_vars = self.get_extra_vars( extra_vars = self.extra_vars(
from_ts, to_ts, result_dir, test_name) from_ts, to_ts, result_dir, test_name)
snapshot_cmd = 'ansible-playbook -i {} {} -e "{}"'.format( snapshot_cmd = 'ansible-playbook -i {} {} -e "{}"'.format(
self.hosts_file, self.playbook, extra_vars) self.hosts_file, self.playbook, extra_vars)
@ -47,12 +53,10 @@ class Grafana:
def run_playbook(self, from_ts, to_ts, result_dir, test_name): def run_playbook(self, from_ts, to_ts, result_dir, test_name):
if 'grafana' in self.config and self.config['grafana']['enabled']: if 'grafana' in self.config and self.config['grafana']['enabled']:
if self.config['grafana']['snapshot']['enabled']: if self.config['grafana']['snapshot']['enabled']:
extra_vars = self.get_extra_vars( extra_vars = self.extra_vars(
from_ts, to_ts, result_dir, test_name) from_ts, to_ts, result_dir, test_name)
subprocess_cmd = ['ansible-playbook', '-i', self.hosts_file, self.playbook, '-e', subprocess_cmd = ['ansible-playbook', '-i', self.hosts_file, self.playbook, '-e',
'{}'.format(extra_vars)] '{}'.format(extra_vars)]
snapshot_log = open('{}/snapshot.log'.format(result_dir), 'a+') snapshot_log = open('{}/snapshot.log'.format(result_dir), 'a+')
self.logger.info( self.logger.info('Running ansible to create snapshots for: {}'.format(test_name))
'Running ansible to create snapshots for: {}'.format(test_name)) subprocess.Popen(subprocess_cmd, stdout=snapshot_log, stderr=subprocess.STDOUT)
subprocess.Popen(
subprocess_cmd, stdout=snapshot_log, stderr=subprocess.STDOUT)

@ -3,6 +3,9 @@ from Tools import Tools
from collections import OrderedDict from collections import OrderedDict
from Grafana import Grafana from Grafana import Grafana
from WorkloadBase import WorkloadBase from WorkloadBase import WorkloadBase
from Elastic import Elastic
import pprint
import numpy
import datetime import datetime
import glob import glob
import logging import logging
@ -19,6 +22,7 @@ class Rally(WorkloadBase):
self.tools = Tools(self.config) self.tools = Tools(self.config)
self.connmon = Connmon(self.config) self.connmon = Connmon(self.config)
self.grafana = Grafana(self.config) self.grafana = Grafana(self.config)
self.elastic = Elastic(self.config)
self.error_count = 0 self.error_count = 0
self.pass_count = 0 self.pass_count = 0
self.test_count = 0 self.test_count = 0
@ -54,11 +58,11 @@ class Rally(WorkloadBase):
if 'sleep_after' in self.config['rally']: if 'sleep_after' in self.config['rally']:
time.sleep(self.config['rally']['sleep_after']) time.sleep(self.config['rally']['sleep_after'])
to_ts = int(time.time() * 1000) to_ts = int(time.time() * 1000)
return (from_time, to_time) self.grafana.create_grafana_urls({'from_ts':from_ts, 'to_ts':to_ts})
self.grafana.print_dashboard_url(from_ts, to_ts, test_name) self.grafana.print_dashboard_url(test_name)
self.grafana.log_snapshot_playbook_cmd( self.grafana.log_snapshot_playbook_cmd(from_ts, to_ts, result_dir, test_name)
from_ts, to_ts, result_dir, test_name)
self.grafana.run_playbook(from_ts, to_ts, result_dir, test_name) self.grafana.run_playbook(from_ts, to_ts, result_dir, test_name)
return (from_time, to_time)
def update_tests(self): def update_tests(self):
self.test_count += 1 self.test_count += 1
@ -92,17 +96,63 @@ class Rally(WorkloadBase):
all_task_ids, test_name) all_task_ids, test_name)
return self.tools.run_cmd(cmd) return self.tools.run_cmd(cmd)
def gen_scenario_json(self, task_id, test_name): def gen_scenario_json(self, task_id):
cmd = "source {}; ".format(self.config['rally']['venv'])
cmd += "rally task results {}".format(task_id)
return self.tools.run_cmd(cmd)
def gen_scenario_json_file(self, task_id, test_name):
cmd = "source {}; ".format(self.config['rally']['venv']) cmd = "source {}; ".format(self.config['rally']['venv'])
cmd += "rally task results {} > {}.json".format(task_id, test_name) cmd += "rally task results {} > {}.json".format(task_id, test_name)
return self.tools.run_cmd(cmd) return self.tools.run_cmd(cmd)
def rally_metadata(self, result, meta) :
result['rally_metadata'] = meta
return result
def json_result(self,task_id):
rally_data = {}
rally_errors = []
rally_sla = []
self.logger.info("Loadding Task_ID {} JSON".format(task_id))
rally_json = self.elastic.load_json(self.gen_scenario_json(task_id))
if len(rally_json) < 1 :
self.logger.error("Issue with Rally Results")
return False
for metrics in rally_json[0]['result']:
for workload in metrics :
if type(metrics[workload]) is dict:
for value in metrics[workload] :
if not type(metrics[workload][value]) is list:
if value not in rally_json:
rally_data[value] = []
rally_data[value].append(metrics[workload][value])
if len(metrics['error']) > 0 :
rally_errors.append({'action_name': value,
'error': metrics['error']})
rally_doc = []
for workload in rally_data:
if not type(rally_data[workload]) is dict :
rally_stats = {'action': workload,
'90th':numpy.percentile(rally_data[workload], 90),
'95th':numpy.percentile(rally_data[workload], 95),
'Max':numpy.max(rally_data[workload]),
'Min':numpy.min(rally_data[workload]),
'Average':numpy.average(rally_data[workload]),
'Median':numpy.median(rally_data[workload])}
rally_doc.append(rally_stats)
return {'rally_stats' : rally_doc,
'rally_errors' : rally_errors,
'rally_setup' : rally_json[0]['key']}
def start_workloads(self): def start_workloads(self):
"""Iterates through all rally scenarios in browbeat yaml config file""" """Iterates through all rally scenarios in browbeat yaml config file"""
results = OrderedDict() results = OrderedDict()
self.logger.info("Starting Rally workloads") self.logger.info("Starting Rally workloads")
time_stamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") es_ts = datetime.datetime.now()
self.logger.debug("Time Stamp (Prefix): {}".format(time_stamp)) dir_ts = es_ts.strftime("%Y%m%d-%H%M%S")
self.logger.debug("Time Stamp (Prefix): {}".format(dir_ts))
benchmarks = self.config.get('rally')['benchmarks'] benchmarks = self.config.get('rally')['benchmarks']
if len(benchmarks) > 0: if len(benchmarks) > 0:
for benchmark in benchmarks: for benchmark in benchmarks:
@ -134,7 +184,7 @@ class Rally(WorkloadBase):
result_dir = self.tools.create_results_dir( result_dir = self.tools.create_results_dir(
self.config['browbeat'][ self.config['browbeat'][
'results'], time_stamp, benchmark['name'], 'results'], dir_ts, benchmark['name'],
scenario_name) scenario_name)
self.logger.debug("Created result directory: {}".format(result_dir)) self.logger.debug("Created result directory: {}".format(result_dir))
workload = self.__class__.__name__ workload = self.__class__.__name__
@ -157,7 +207,7 @@ class Rally(WorkloadBase):
self.update_tests() self.update_tests()
self.update_total_tests() self.update_total_tests()
test_name = "{}-browbeat-{}-{}-iteration-{}".format( test_name = "{}-browbeat-{}-{}-iteration-{}".format(
time_stamp, scenario_name, concurrency, run) dir_ts, scenario_name, concurrency, run)
if not result_dir: if not result_dir:
self.logger.error( self.logger.error(
@ -195,17 +245,35 @@ class Rally(WorkloadBase):
self.logger.info( self.logger.info(
"Generating Rally HTML for task_id : {}". "Generating Rally HTML for task_id : {}".
format(task_id)) format(task_id))
self.gen_scenario_html( self.gen_scenario_html([task_id], test_name)
[task_id], test_name) self.gen_scenario_json_file(task_id, test_name)
self.gen_scenario_json(
task_id, test_name)
results[run].append(task_id) results[run].append(task_id)
self.update_pass_tests() if self.config['elasticsearch']['enabled'] :
self.update_total_pass_tests() # Start indexing
self.get_time_dict( result_json = self.json_result(task_id)
to_time, from_time, benchmark['name'], new_test_name, _meta = {'taskid' : task_id,
workload, "pass") 'timestamp': es_ts,
'workload' : {
'name' : benchmark['name'],
'scenario' : scenario_name,
'times' : scenario['times'],
'concurrency' : scenario['concurrency']},
'grafana': ",".join(self.grafana.grafana_urls())
}
if result_json :
result = self.elastic.combine_metadata(
self.rally_metadata(result_json,_meta))
if result is False :
self.logger.error
("Error with ElasticSerach connector")
else :
if len(result) < 1 :
self.logger.error(
"Issue with ElasticSearch Data, \
for task_id {}".format(task_id))
else :
self.elastic.index_result(result,
_id=task_id)
else: else:
self.logger.error("Cannot find task_id") self.logger.error("Cannot find task_id")
self.update_fail_tests() self.update_fail_tests()
@ -231,6 +299,6 @@ class Rally(WorkloadBase):
self.gen_scenario_html(results[run], combined_html_name) self.gen_scenario_html(results[run], combined_html_name)
if os.path.isfile('{}.html'.format(combined_html_name)): if os.path.isfile('{}.html'.format(combined_html_name)):
shutil.move('{}.html'.format(combined_html_name), shutil.move('{}.html'.format(combined_html_name),
'{}/{}'.format(self.config['browbeat']['results'], time_stamp)) '{}/{}'.format(self.config['browbeat']['results'], dir_ts))
else: else:
self.logger.error("Config file contains no rally benchmarks.") self.logger.error("Config file contains no rally benchmarks.")

@ -2,4 +2,4 @@ ansible
matplotlib matplotlib
python-dateutil==2.4.2 python-dateutil==2.4.2
pykwalify pykwalify
elasticsearch