Adding rerun to Shaker and Enhancing ES Integration

Although other tools make use of the rerun parameter, Shaker has never
honored it, due to an oversight. This commit adds that functionality to
Shaker: every enabled scenario is now executed rerun times under a
single UUID. Rerunning each scenario ensures the collected samples are
more representative.
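In outline, the rerun support amounts to the pattern below (a minimal
sketch, not Browbeat's exact code: run_one_scenario is a hypothetical
stand-in for run_scenario, while the browbeat:rerun config key, the
shared UUID/timestamp, and the per-run results-dir suffix are taken
from the diff that follows):

    import datetime
    import uuid

    def run_with_reruns(config, scenario, run_one_scenario):
        shaker_uuid = uuid.uuid4()          # one UUID shared by all reruns
        es_ts = datetime.datetime.utcnow()  # one base timestamp for the set
        es_list = []
        for run in range(config['browbeat']['rerun']):
            # each rerun writes to its own results dir, e.g. "<name>-0", "<name>-1"
            result_dir = "{}-{}".format(scenario['name'], run)
            run_one_scenario(scenario, result_dir, shaker_uuid,
                             es_ts, es_list, run)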

Also refactoring the ES integration to enable visualizing averages
across reruns: each indexed record now carries its run number, so we
can consume averages from multiple runs rather than relying on one-off
values.
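For example, with 'run' and 'shaker_uuid' on every document, a standard
ES aggregation like the following can average a metric per run and
across all runs (a hypothetical query: the index pattern and exact
field paths are assumptions; only the run, shaker_uuid, and result
fields come from this change):

    # Hypothetical aggregation; index pattern and field paths are assumptions.
    avg_across_runs = {
        "size": 0,
        "query": {"term": {"shaker_uuid": "<uuid logged by the scenario>"}},
        "aggs": {
            "per_run": {
                "terms": {"field": "run"},
                "aggs": {"avg_metric": {"avg": {"field": "result.metric"}}}
            },
            "overall_average": {"avg": {"field": "result.metric"}}
        }
    }
    # e.g. es.search(index="browbeat-*", body=avg_across_runs) with
    # elasticsearch-py; the index pattern here is a guess.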

Change-Id: I5872f39d5d61c1c0fe4e56bd0300227c04e04d84
Sai Sindhur Malleni 2016-07-01 13:41:22 -04:00
parent f01b3dbcbb
commit 687550e970

@@ -82,9 +82,7 @@ class Shaker(WorkloadBase.WorkloadBase):
     # Method to process JSON outputted by Shaker, model data in a format that can be consumed
     # by ElasticSearch and ship the data to ES
-    def send_to_elastic(self, outputfile, browbeat_scenario, shaker_uuid):
-        es_ts = datetime.datetime.utcnow()
-        es_list = []
+    def send_to_elastic(self, outputfile, browbeat_scenario, shaker_uuid, es_ts, es_list, run):
         fname = outputfile
         # Load output json
         try:
@@ -99,7 +97,8 @@ class Shaker(WorkloadBase.WorkloadBase):
                 'timestamp': str(es_ts).replace(" ", "T"),
                 'browbeat_scenario': browbeat_scenario,
                 'shaker_uuid': str(shaker_uuid),
-                'record': record
+                'record': record,
+                'run': run
             }
             result = self.elastic.combine_metadata(shaker_stats)
@@ -108,22 +107,14 @@ class Shaker(WorkloadBase.WorkloadBase):
         # Dictionary to capture common test data
         shaker_test_meta = {}
         for scenario in data['scenarios'].iterkeys():
-            test_time = data['scenarios'][scenario][
-                'execution']['tests'][0]['time']
-            # Setting up the timestamp list based on the time the test ran for
-            for interval in range(0, test_time + 9):
-                es_list.append(
-                    datetime.datetime.utcnow() +
-                    datetime.timedelta(
-                        0,
-                        interval))
             # Populating common test data
             if 'shaker_test_info' not in shaker_test_meta:
                 shaker_test_meta['shaker_test_info'] = data[
                     'scenarios'][scenario]
                 if "progression" not in shaker_test_meta[
                         'shaker_test_info']['execution']:
-                    shaker_test_meta['shaker_test_info']['execution']['progression'] = "all"
+                    shaker_test_meta['shaker_test_info'][
+                        'execution']['progression'] = "all"
             var = data['scenarios'][scenario][
                 'deployment'].pop('accommodation')
             if 'deployment' not in shaker_test_meta:
@@ -165,19 +156,21 @@ class Shaker(WorkloadBase.WorkloadBase):
                     result['metric'] = outputs[key]
                     result['result_type'] = key
                     # Populate shaker_stats dictionary with individual result value from the
-                    # list of samples for each test type(tcp download/ping_icm) for each
+                    # list of samples for each test type(tcp download/ping_icmp) for each
                     # record after flattening out data
                     shaker_stats = {
                         'record': data['records'][record],
+                        'run': run,
                         'shaker_test_info': shaker_test_meta['shaker_test_info'],
                         'timestamp': elastic_timestamp,
                         'accommodation': shaker_test_meta['deployment']['accommodation'],
                         'template': shaker_test_meta['deployment']['template'],
                         'result': result,
                         'browbeat_scenario': browbeat_scenario,
-                        'grafana_url': [self.grafana.grafana_urls()],
+                        'grafana_url': [
+                            self.grafana.grafana_urls()],
                         'shaker_uuid': str(shaker_uuid)}
-                    # Ship Data to Es when record status is ok
+                    # Ship Data to ES when record status is ok
                     result = self.elastic.combine_metadata(shaker_stats)
                     self.elastic.index_result(result)
                 else:
@@ -195,7 +188,7 @@ class Shaker(WorkloadBase.WorkloadBase):
             result = self.elastic.combine_metadata(shaker_stats)
             self.elastic.index_result(result)

-    def set_scenario(self, scenario, fname):
+    def set_scenario(self, scenario, fname, default_time):
         stream = open(fname, 'r')
         data = yaml.load(stream)
         stream.close()
@@ -203,7 +196,6 @@ class Shaker(WorkloadBase.WorkloadBase):
         default_density = 1
         default_compute = 1
         default_progression = "linear"
-        default_time = 60
         if "placement" in scenario:
             data['deployment']['accommodation'][1] = scenario['placement']
         else:
@@ -341,14 +333,14 @@ class Shaker(WorkloadBase.WorkloadBase):
             workload,
             "pass")

-    def run_scenario(self, scenario, result_dir, test_name, filename):
+    def run_scenario(self, scenario, result_dir, test_name, filename,
+                     shaker_uuid, es_ts, es_list, run):
         server_endpoint = self.config['shaker']['server']
         port_no = self.config['shaker']['port']
         flavor = self.config['shaker']['flavor']
         venv = self.config['shaker']['venv']
         shaker_region = self.config['shaker']['shaker_region']
         timeout = self.config['shaker']['join_timeout']
-        shaker_uuid = uuid.uuid4()
         self.logger.info(
             "The uuid for this shaker scenario is {}".format(shaker_uuid))
         cmd_1 = (
@@ -388,7 +380,8 @@ class Shaker(WorkloadBase.WorkloadBase):
         self.grafana.run_playbook(from_ts, to_ts, result_dir, test_name)
         # Send Data to elastic
         if self.config['elasticsearch']['enabled']:
-            self.send_to_elastic(outputfile, scenario['name'], shaker_uuid)
+            self.send_to_elastic(outputfile, scenario['name'], shaker_uuid,
+                                 es_ts, es_list, run)

     def run_shaker(self):
         self.logger.info("Starting Shaker workloads")
@@ -396,29 +389,46 @@ class Shaker(WorkloadBase.WorkloadBase):
         self.logger.debug("Time Stamp (Prefix): {}".format(time_stamp))
         scenarios = self.config.get('shaker')['scenarios']
         venv = self.config['shaker']['venv']
+        default_time = 60
         self.shaker_checks()
         if (scenarios is not None and len(scenarios) > 0):
             for scenario in scenarios:
                 if scenario['enabled']:
                     self.update_scenarios()
                     self.update_total_scenarios()
-                    self.logger.info("Scenario: {}".format(scenario['name']))
-                    fname = os.path.join(venv, scenario['file'])
-                    self.set_scenario(scenario, fname)
-                    self.logger.debug("Set Scenario File: {}".format(
-                        fname))
-                    result_dir = self.tools.create_results_dir(
-                        self.config['browbeat'][
-                            'results'], time_stamp, "shaker",
-                        scenario['name'])
-                    workload = self.__class__.__name__
-                    self.workload_logger(result_dir, workload)
-                    time_stamp1 = datetime.datetime.utcnow().strftime(
-                        "%Y%m%d-%H%M%S")
-                    test_name = "{}-browbeat-{}-{}".format(
-                        time_stamp1, "shaker", scenario['name'])
-                    self.run_scenario(scenario, result_dir, test_name, fname)
-                    self.get_stats()
+                    shaker_uuid = uuid.uuid4()
+                    es_ts = datetime.datetime.utcnow()
+                    es_list = []
+                    if "time" in scenario:
+                        test_time = scenario['time']
+                    else:
+                        test_time = default_time
+                    for interval in range(0, test_time + 9):
+                        es_list.append(
+                            datetime.datetime.utcnow() +
+                            datetime.timedelta(0, interval))
+                    for run in range(self.config['browbeat']['rerun']):
+                        self.logger.info("Scenario: {}".format(scenario['name']))
+                        self.logger.info("Run: {}".format(run))
+                        fname = os.path.join(venv, scenario['file'])
+                        self.set_scenario(scenario, fname, default_time)
+                        self.logger.debug("Set Scenario File: {}".format(
+                            fname))
+                        result_dir = self.tools.create_results_dir(
+                            self.config['browbeat'][
+                                'results'], time_stamp, "shaker",
+                            scenario['name'] + "-" + str(run))
+                        workload = self.__class__.__name__
+                        self.workload_logger(result_dir, workload)
+                        time_stamp1 = datetime.datetime.now().strftime(
+                            "%Y%m%d-%H%M%S")
+                        test_name = "{}-browbeat-{}-{}-{}".format(
+                            time_stamp1, "shaker", scenario['name'], run)
+                        self.run_scenario(
+                            scenario, result_dir, test_name, fname, shaker_uuid,
+                            es_ts, es_list, run)
+                        self.get_stats()
                 else:
                     self.logger.info(
                         "Skipping {} as scenario enabled: false".format(