Add scripts for marathon health check testing
Change-Id: Ia492436f27b31051f6bc7936da9d599ba72c5ff0
This commit is contained in:
		
							
								
								
									
										9
									
								
								scripts/marathon-health-check-testing/Dockerfile
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										9
									
								
								scripts/marathon-health-check-testing/Dockerfile
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,9 @@ | ||||
| FROM python:2 | ||||
|  | ||||
| RUN /bin/sh -c 'mkdir -p /usr/src/app' | ||||
|  | ||||
| ADD server.py /usr/src/app/server.py | ||||
|  | ||||
| WORKDIR /usr/src/app | ||||
|  | ||||
| CMD ["python", "./server.py"] | ||||
							
								
								
									
										254
									
								
								scripts/marathon-health-check-testing/HealthCheckBencher.py
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										254
									
								
								scripts/marathon-health-check-testing/HealthCheckBencher.py
									
									
									
									
									
										Executable file
									
								
							| @@ -0,0 +1,254 @@ | ||||
| #!/usr/bin/python | ||||
|  | ||||
|  | ||||
| from argparse import ArgumentParser | ||||
| from datetime import datetime | ||||
| from hashlib import md5 | ||||
| from marathon import MarathonClient | ||||
| from marathon.models.container import MarathonContainer | ||||
| from marathon.models.container import MarathonContainerPortMapping | ||||
| from marathon.models.container import MarathonDockerContainer | ||||
| from marathon.models import MarathonApp | ||||
| from marathon.models import MarathonHealthCheck | ||||
| from Queue import Empty | ||||
| from Queue import Queue | ||||
| from random import random | ||||
| from threading import Thread | ||||
| from time import sleep | ||||
| from urllib2 import urlopen | ||||
|  | ||||
| MEM = 50 | ||||
| CPUS = 1 | ||||
| DISK = 50 | ||||
|  | ||||
|  | ||||
| class HealthCheckBencher(object): | ||||
|     def __init__(self, marathon_url, image, tasks): | ||||
|         self.concurrency = 20 | ||||
|         self.docker_image = image | ||||
|         self.app_base_name = 'health-check-test-' | ||||
|         self.total_tasks_cout = int(tasks) | ||||
|         self.instances_per_app = 50 | ||||
|         if tasks < self.instances_per_app: | ||||
|             self.instances_per_app = self.total_tasks_cout | ||||
|             self.app_count = 1 | ||||
|         else: | ||||
|             self.app_count = self.total_tasks_cout/self.instances_per_app | ||||
|         self.heath_check_interval = 30 | ||||
|         self.test_duration = 20 | ||||
|         self.marathon_cluster = MarathonClient(marathon_url, timeout=240) | ||||
|         self.work_queue = Queue() | ||||
|         self.result_queue = Queue() | ||||
|         self.app_list_queue = Queue() | ||||
|         self.action_list = [self.start_collect, | ||||
|                             'sleep={}'.format(self.test_duration), | ||||
|                             self.get_stats] | ||||
|  | ||||
|     def remove_apps(self): | ||||
|         apps = self.marathon_cluster.list_apps() | ||||
|         for app in apps: | ||||
|             if app.id.startswith("/"+self.app_base_name): | ||||
|                 self.marathon_cluster.delete_app(app.id) | ||||
|         active = 0 | ||||
|         while True: | ||||
|             apps = self.marathon_cluster.list_apps() | ||||
|             for app in apps: | ||||
|                 if app.id.startswith(self.app_base_name): | ||||
|                     active += 1 | ||||
|             if active == 0: | ||||
|                 break | ||||
|  | ||||
|     def create_app(self, id): | ||||
|         port_mapping = MarathonContainerPortMapping(container_port=80, | ||||
|                                                     protocol="tcp") | ||||
|         app_docker = MarathonDockerContainer( | ||||
|             image=self.docker_image, | ||||
|             network="BRIDGE", | ||||
|             force_pull_image=True, | ||||
|             port_mappings=[port_mapping]) | ||||
|         app_container = MarathonContainer(docker=app_docker) | ||||
|         http_health_check = MarathonHealthCheck( | ||||
|             protocol="HTTP", | ||||
|             path="/status", | ||||
|             grace_period_seconds=300, | ||||
|             interval_seconds=self.heath_check_interval, | ||||
|             timeout_seconds=20, | ||||
|             max_consecutive_failures=0 | ||||
|         ) | ||||
|  | ||||
|         app_suffix = str(md5(str(random())).hexdigest()) | ||||
|         app_name = self.app_base_name + app_suffix | ||||
|         new_app = MarathonApp(cpus=CPUS, mem=MEM, disk=DISK, | ||||
|                               container=app_container, | ||||
|                               health_checks=[http_health_check], | ||||
|                               instances=self.instances_per_app, | ||||
|                               max_launch_delay_seconds=5) | ||||
|         print("Creating {}".format(app_name)) | ||||
|         self.marathon_cluster.create_app(app_id=app_name, app=new_app) | ||||
|         self.app_list_queue.put(app_name) | ||||
|         return None | ||||
|  | ||||
|     def wait_instances(self, app_name): | ||||
|         health_ok = 0 | ||||
|         while health_ok < self.instances_per_app: | ||||
|             health_ok = 0 | ||||
|             tasks = self.marathon_cluster.list_tasks(app_name) | ||||
|             for task in tasks: | ||||
|                 if task.health_check_results: | ||||
|                     health_ok += 1 | ||||
|  | ||||
|     def start_collect(self, task): | ||||
|         url = 'http://'+task['host']+':'+str(task['port'])+'/start_collect' | ||||
|         res = urlopen(url) | ||||
|         if res.getcode() == 200: | ||||
|             print(task['id']+': collecter was started') | ||||
|         else: | ||||
|             print(task['id']+': failed to start collecter') | ||||
|  | ||||
|     def stop_collect(self, task): | ||||
|         url = 'http://'+task['host']+':'+str(task['port'])+'/stop_collect' | ||||
|         res = urlopen(url) | ||||
|         if res.getcode() == 200: | ||||
|             print(task['id']+': collecter was stopped') | ||||
|         else: | ||||
|             print(task['id']+': failed to stop collecter') | ||||
|  | ||||
|     def clear_stats(self, task): | ||||
|         url = 'http://'+task['host']+':'+str(task['port'])+'/clear_stats' | ||||
|         res = urlopen(url) | ||||
|         if res.getcode() == 200: | ||||
|             print(task['id']+': stats was dropped') | ||||
|         else: | ||||
|             print(task['id']+': stats was dropped') | ||||
|  | ||||
|     def get_stats(self, task): | ||||
|         url = 'http://'+task['host']+':'+str(task['port'])+'/get_timestamps' | ||||
|         try: | ||||
|             res = urlopen(url) | ||||
|         except Exception: | ||||
|             print("URL req failed") | ||||
|             self.result_queue.put({'id': task['id'], | ||||
|                                    'status': 'Failed', | ||||
|                                    'data': []}) | ||||
|             return | ||||
|         if res.getcode() == 200: | ||||
|             data = res.read() | ||||
|             timestamps = data.split(',') | ||||
|             self.result_queue.put({'id': task['id'], | ||||
|                                    'status': 'ok', | ||||
|                                    'data': timestamps}) | ||||
|         elif res.getcode() == 202: | ||||
|             print("Collecting is not enabled") | ||||
|             self.result_queue.put({'id': task['id'], | ||||
|                                    'status': 'Collecting is not enabled', | ||||
|                                    'data': []}) | ||||
|         else: | ||||
|             print("Unknown response code") | ||||
|             self.result_queue.put({'id': task['id'], | ||||
|                                    'status': 'Unknown response code', | ||||
|                                    'data': []}) | ||||
|  | ||||
|     def repeat(self, action): | ||||
|         while self.work_queue.empty() is False: | ||||
|             try: | ||||
|                 iteration = self.work_queue.get_nowait() | ||||
|             except Empty: | ||||
|                 continue | ||||
|             action(iteration) | ||||
|             self.work_queue.task_done() | ||||
|  | ||||
|     def fill_queue(self, iterations): | ||||
|         for iteration in iterations: | ||||
|             self.work_queue.put(iteration) | ||||
|  | ||||
|     def get_tasks(self): | ||||
|         res = [] | ||||
|         tasks = self.marathon_cluster.list_tasks() | ||||
|         for task in tasks: | ||||
|             if not task.id.startswith('health-check-test-'): | ||||
|                 continue | ||||
|             res.append({'id': str(task.id), | ||||
|                         'host': str(task.host), | ||||
|                         'port': str(task.ports[0])}) | ||||
|         return res | ||||
|  | ||||
|     def create_apps(self): | ||||
|         self.fill_queue(range(self.app_count)) | ||||
|         for thread_num in range(self.concurrency): | ||||
|             if self.work_queue.empty() is True: | ||||
|                 break | ||||
|             worker = Thread(target=self.repeat, args=(self.create_app,)) | ||||
|             worker.start() | ||||
|         self.work_queue.join() | ||||
|  | ||||
|         while self.app_list_queue.empty() is False: | ||||
|             try: | ||||
|                 app_name = self.app_list_queue.get_nowait() | ||||
|             except Empty: | ||||
|                 continue | ||||
|             self.work_queue.put(app_name) | ||||
|  | ||||
|         for thread_num in range(self.concurrency): | ||||
|             if self.work_queue.empty() is True: | ||||
|                 break | ||||
|             worker = Thread(target=self.repeat, args=(self.wait_instances,)) | ||||
|             worker.start() | ||||
|         self.work_queue.join() | ||||
|  | ||||
|     def start_test(self): | ||||
|         task_list = self.get_tasks() | ||||
|         for action in self.action_list: | ||||
|             if isinstance(action, basestring): | ||||
|                 if action.startswith('sleep='): | ||||
|                     amount = int(action.split('=')[1]) | ||||
|                     sleep(60*amount) | ||||
|                 continue | ||||
|             self.fill_queue(task_list) | ||||
|             for thread_num in range(self.concurrency): | ||||
|                 if self.work_queue.empty() is True: | ||||
|                     break | ||||
|                 worker = Thread(target=self.repeat, args=(action,)) | ||||
|                 worker.start() | ||||
|             self.work_queue.join() | ||||
|  | ||||
|     def generate_report(self): | ||||
|         today = datetime.today() | ||||
|         file_prefix = "{:%Y-%m-%d_%H_%M_%S-}".format(today) | ||||
|         file_name = (file_prefix + | ||||
|                      'health_check_result-' + | ||||
|                      str(self.total_tasks_cout) + | ||||
|                      'tasks.csv') | ||||
|  | ||||
|         f = open(file_name, "w") | ||||
|         f.write("Task ID,Health check timestamp") | ||||
|  | ||||
|         while self.result_queue.empty() is False: | ||||
|             try: | ||||
|                 result = self.result_queue.get_nowait() | ||||
|             except Empty: | ||||
|                 continue | ||||
|             for timestamp in result['data']: | ||||
|                 f.write("\n%s,%s" % (result['id'], timestamp)) | ||||
|  | ||||
|         f.close() | ||||
|  | ||||
| if __name__ == '__main__': | ||||
|     parser = ArgumentParser() | ||||
|     parser.add_argument("-m", "--marathon", | ||||
|                         help="Marathon URL, on example " | ||||
|                              "http://172.20.8.34:8080/virt-env-2/marathon", | ||||
|                         required=True) | ||||
|     parser.add_argument("-t", "--tasks", | ||||
|                         help="Total tasks count", | ||||
|                         required=True) | ||||
|     parser.add_argument("-i", "--image", | ||||
|                         help="Docker image path", | ||||
|                         required=True) | ||||
|     args = parser.parse_args() | ||||
|  | ||||
|     bencher = HealthCheckBencher(args.marathon, args.image, int(args.tasks)) | ||||
|  | ||||
|     bencher.create_apps() | ||||
|     bencher.start_test() | ||||
|     bencher.remove_apps() | ||||
|     bencher.generate_report() | ||||
							
								
								
									
										52
									
								
								scripts/marathon-health-check-testing/ParseRawResults.py
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										52
									
								
								scripts/marathon-health-check-testing/ParseRawResults.py
									
									
									
									
									
										Executable file
									
								
							| @@ -0,0 +1,52 @@ | ||||
| #!/usr/bin/python | ||||
| from argparse import ArgumentParser | ||||
| import numpy as np | ||||
|  | ||||
|  | ||||
| def parse(file, percentile, interval): | ||||
|     data = {} | ||||
|     with open(file) as fp: | ||||
|         for line in fp: | ||||
|             record = line.rstrip().split(',') | ||||
|             try: | ||||
|                 timestamp = float(record[1]) | ||||
|                 if record[0] not in data: | ||||
|                     data[record[0]] = [] | ||||
|                 data[record[0]].append(timestamp) | ||||
|             except ValueError: | ||||
|                 continue | ||||
|  | ||||
|     deviations = [] | ||||
|     for task in data: | ||||
|         data[task].sort() | ||||
|         last_timestamp = 0 | ||||
|         for timestamp in data[task]: | ||||
|             if last_timestamp == 0: | ||||
|                 last_timestamp = timestamp | ||||
|                 continue | ||||
|             cur_interval = timestamp - last_timestamp | ||||
|             last_timestamp = timestamp | ||||
|             deviations.append(np.fabs(interval - cur_interval)) | ||||
|  | ||||
|     print("Total tasks: {}. Total health checks: {}".format(len(data.keys()), | ||||
|                                                             len(deviations))) | ||||
|     print("min: {}. max: {}, average: {}," | ||||
|           " percentile: {}".format(np.min(deviations), | ||||
|                                    np.max(deviations), | ||||
|                                    np.average(deviations), | ||||
|                                    np.percentile(deviations, percentile))) | ||||
|  | ||||
| if __name__ == '__main__': | ||||
|     parser = ArgumentParser() | ||||
|     parser.add_argument("-f", "--file", | ||||
|                         help="File to be parsed", | ||||
|                         required=True) | ||||
|     parser.add_argument("-i", "--interval", | ||||
|                         help="Configured health check interval(sec)", | ||||
|                         required=True) | ||||
|     parser.add_argument("-p", "--persentile", | ||||
|                         help="Percentile value [0-100]. Default 95", | ||||
|                         required=False, default=95.0) | ||||
|     args = parser.parse_args() | ||||
|  | ||||
|     parse(args.file, float(args.persentile), int(args.interval)) | ||||
							
								
								
									
										108
									
								
								scripts/marathon-health-check-testing/server.py
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										108
									
								
								scripts/marathon-health-check-testing/server.py
									
									
									
									
									
										Executable file
									
								
							| @@ -0,0 +1,108 @@ | ||||
| #!/usr/bin/python | ||||
| from BaseHTTPServer import BaseHTTPRequestHandler | ||||
| import os | ||||
| import time | ||||
|  | ||||
| VERSION = 1.0 | ||||
|  | ||||
|  | ||||
| class ServerStatus(object): | ||||
|     collecting = 0 | ||||
|     status = 1 | ||||
|     last_timestamp = 0 | ||||
|     interval_arr = [] | ||||
|     timestamp_arr = [] | ||||
|  | ||||
|  | ||||
| class GetHandler(BaseHTTPRequestHandler): | ||||
|     def do_GET(self): | ||||
|         if self.path == '/start': | ||||
|             ServerStatus.status = 1 | ||||
|             self.send_response(200) | ||||
|             self.end_headers() | ||||
|         elif self.path == '/stop': | ||||
|             ServerStatus.status = 0 | ||||
|             self.send_response(200) | ||||
|             self.end_headers() | ||||
|         elif self.path == '/start_collect': | ||||
|             ServerStatus.collecting = 1 | ||||
|             self.send_response(200) | ||||
|             self.end_headers() | ||||
|         elif self.path == '/stop_collect': | ||||
|             ServerStatus.collecting = 0 | ||||
|             self.send_response(200) | ||||
|             self.end_headers() | ||||
|         elif self.path == '/get_intervals': | ||||
|             self.send_response(200) | ||||
|             self.send_header("Context-Type", "text/plain") | ||||
|             self.end_headers() | ||||
|             tmp_str = ','.join(str(x) for x in ServerStatus.interval_arr) | ||||
|             self.wfile.write(tmp_str) | ||||
|             self.wfile.close() | ||||
|         elif self.path == '/version': | ||||
|             self.send_response(200) | ||||
|             self.send_header("Context-Type", "text/plain") | ||||
|             self.end_headers() | ||||
|             self.wfile.write(VERSION) | ||||
|             self.wfile.close() | ||||
|         elif self.path == '/get_stats': | ||||
|             if ServerStatus.collecting == 0: | ||||
|                 self.send_response(202) | ||||
|             else: | ||||
|                 self.send_response(200) | ||||
|             self.send_header("Context-Type", "text/plain") | ||||
|             self.end_headers() | ||||
|             timestamp_str = ','.join(str(x) | ||||
|                                      for x in ServerStatus.timestamp_arr) | ||||
|             intervals_str = ','.join(str(x) for x in ServerStatus.interval_arr) | ||||
|             self.wfile.write(timestamp_str+'\n'+intervals_str) | ||||
|             self.wfile.close() | ||||
|         elif self.path == '/get_timestamps': | ||||
|             if ServerStatus.collecting == 0: | ||||
|                 self.send_response(202) | ||||
|             else: | ||||
|                 self.send_response(200) | ||||
|             self.send_header("Context-Type", "text/plain") | ||||
|             self.end_headers() | ||||
|             tmp_str = ','.join(str(x) for x in ServerStatus.timestamp_arr) | ||||
|             self.wfile.write(tmp_str) | ||||
|             self.wfile.close() | ||||
|         elif self.path == '/is_collecter_start': | ||||
|             self.send_response(200) | ||||
|             self.send_header("Context-Type", "text/plain") | ||||
|             self.end_headers() | ||||
|             if ServerStatus.collecting == 1: | ||||
|                 self.wfile.write("Yes") | ||||
|             else: | ||||
|                 self.wfile.write("No") | ||||
|             self.wfile.close() | ||||
|         elif self.path == '/clear_stats': | ||||
|             ServerStatus.interval_arr = [] | ||||
|             ServerStatus.timestamp_arr = [] | ||||
|             ServerStatus.last_timestamp = 0 | ||||
|             self.send_response(200) | ||||
|             self.end_headers() | ||||
|         elif self.path == '/status': | ||||
|             if ServerStatus.collecting == 1: | ||||
|                 if ServerStatus.last_timestamp == 0: | ||||
|                     ServerStatus.last_timestamp = time.time() | ||||
|                 else: | ||||
|                     current_time = time.time() | ||||
|                     interval = round(current_time - | ||||
|                                      ServerStatus.last_timestamp, 3) | ||||
|                     ServerStatus.interval_arr.append(interval) | ||||
|                     ServerStatus.timestamp_arr.append(round(current_time, 3)) | ||||
|                     ServerStatus.last_timestamp = current_time | ||||
|             if ServerStatus.status == 0: | ||||
|                 self.send_response(503) | ||||
|             else: | ||||
|                 self.send_response(200) | ||||
|             self.end_headers() | ||||
|  | ||||
|         return | ||||
|  | ||||
| if __name__ == '__main__': | ||||
|     port = int(os.getenv('SERVER_PORT', '80')) | ||||
|     from BaseHTTPServer import HTTPServer | ||||
|     server = HTTPServer(('0.0.0.0', port), GetHandler) | ||||
|     server.serve_forever() | ||||
		Reference in New Issue
	
	Block a user
	 David Burnazyan
					David Burnazyan