From 87b57578bfde91120358b3eec1f9c15c3f6f9d4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9dric=20Ollivier?= Date: Tue, 8 Oct 2019 21:26:52 +0200 Subject: [PATCH] Update code to python3 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 2to3 was selected to update to python3 [1]. Functest is offering a new testcase based on kloudbuster[2]. [1] https://pythonclock.org/ [2] https://gerrit.opnfv.org/gerrit/c/functest/+/68560 Change-Id: Ia1b07295a3685dae0d2dedfce53d78f3ff2c7aac Signed-off-by: Cédric Ollivier --- kloudbuster/base_compute.py | 2 +- kloudbuster/base_network.py | 10 +- kloudbuster/base_storage.py | 2 +- kloudbuster/credentials.py | 2 +- kloudbuster/fio_tool.py | 4 +- kloudbuster/force_cleanup.py | 132 +++--- kloudbuster/kb_config.py | 6 +- kloudbuster/kb_res_logger.py | 2 +- kloudbuster/kb_runner_base.py | 14 +- kloudbuster/kb_runner_http.py | 26 +- kloudbuster/kb_runner_multicast.py | 22 +- kloudbuster/kb_runner_storage.py | 24 +- kloudbuster/kb_scheduler.py | 2 +- kloudbuster/kb_vm_agent.py | 652 ++++++++++++++++++++++++++++- kloudbuster/kloudbuster.py | 42 +- kloudbuster/nuttcp_tool.py | 2 +- kloudbuster/perf_instance.py | 8 +- kloudbuster/perf_tool.py | 6 +- kloudbuster/prometheus.py | 2 +- kloudbuster/start_server.py | 12 +- kloudbuster/tenant.py | 12 +- kloudbuster/users.py | 6 +- kloudbuster/wrk_tool.py | 6 +- 23 files changed, 821 insertions(+), 175 deletions(-) mode change 120000 => 100644 kloudbuster/kb_vm_agent.py diff --git a/kloudbuster/base_compute.py b/kloudbuster/base_compute.py index 38d6297..8beac8f 100644 --- a/kloudbuster/base_compute.py +++ b/kloudbuster/base_compute.py @@ -15,7 +15,7 @@ import os import time -import log as logging +from . import log as logging from novaclient.exceptions import BadRequest LOG = logging.getLogger(__name__) diff --git a/kloudbuster/base_network.py b/kloudbuster/base_network.py index 73b3c38..676b157 100644 --- a/kloudbuster/base_network.py +++ b/kloudbuster/base_network.py @@ -14,11 +14,11 @@ import time -from perf_instance import PerfInstance +from .perf_instance import PerfInstance -import base_compute -import base_storage -import log as logging +from . import base_compute +from . import base_storage +from . import log as logging import netaddr from neutronclient.common.exceptions import NetworkInUseClient @@ -177,7 +177,7 @@ class BaseNetwork(object): vol_size = 0 # Schedule to create the required number of VMs - for instance_count in xrange(vm_total): + for instance_count in range(vm_total): vm_name = network_prefix + "-I" + str(instance_count) perf_instance = PerfInstance(vm_name, self, config_scale) self.instance_list.append(perf_instance) diff --git a/kloudbuster/base_storage.py b/kloudbuster/base_storage.py index 58eaa77..061a898 100644 --- a/kloudbuster/base_storage.py +++ b/kloudbuster/base_storage.py @@ -14,7 +14,7 @@ import time -import log as logging +from . import log as logging LOG = logging.getLogger(__name__) diff --git a/kloudbuster/credentials.py b/kloudbuster/credentials.py index aca0349..0bdbec2 100644 --- a/kloudbuster/credentials.py +++ b/kloudbuster/credentials.py @@ -21,7 +21,7 @@ from keystoneauth1 import session import os import re -import log as logging +from . import log as logging LOG = logging.getLogger(__name__) diff --git a/kloudbuster/fio_tool.py b/kloudbuster/fio_tool.py index d0bffc5..3f2a7e9 100644 --- a/kloudbuster/fio_tool.py +++ b/kloudbuster/fio_tool.py @@ -15,7 +15,7 @@ import json -from perf_tool import PerfTool +from .perf_tool import PerfTool from hdrh.histogram import HdrHistogram @@ -99,7 +99,7 @@ class FioTool(PerfTool): histogram.decode_and_add(item['results'][clat]) latency_dict = histogram.get_percentile_to_value_dict(perc_list) - for key, value in latency_dict.iteritems(): + for key, value in latency_dict.items(): all_res[clat].append([key, value]) all_res[clat].sort() diff --git a/kloudbuster/force_cleanup.py b/kloudbuster/force_cleanup.py index e473c17..d77aa4f 100755 --- a/kloudbuster/force_cleanup.py +++ b/kloudbuster/force_cleanup.py @@ -63,14 +63,14 @@ from novaclient.exceptions import NotFound from tabulate import tabulate # kloudbuster base code -import credentials +from . import credentials resource_name_re = None def prompt_to_run(): - print "Warning: You didn't specify a resource list file as the input. "\ - "The script will delete all resources shown above." - answer = raw_input("Are you sure? (y/n) ") + print("Warning: You didn't specify a resource list file as the input. "\ + "The script will delete all resources shown above.") + answer = input("Are you sure? (y/n) ") if answer.lower() != 'y': sys.exit(0) @@ -83,7 +83,7 @@ def fetch_resources(fetcher, options=None): except Exception as e: res_list = [] traceback.print_exc() - print "Warning exception while listing resources:" + str(e) + print("Warning exception while listing resources:" + str(e)) resources = {} for res in res_list: # some objects provide direct access some @@ -98,16 +98,14 @@ def fetch_resources(fetcher, options=None): resources[resid] = resname return resources -class AbstractCleaner(object): - __metaclass__ = ABCMeta - +class AbstractCleaner(object, metaclass=ABCMeta): def __init__(self, res_category, res_desc, resources, dryrun): self.dryrun = dryrun self.category = res_category self.resources = {} if not resources: - print 'Discovering %s resources...' % (res_category) - for rtype, fetch_args in res_desc.iteritems(): + print('Discovering %s resources...' % (res_category)) + for rtype, fetch_args in res_desc.items(): if resources: if rtype in resources: self.resources[rtype] = resources[rtype] @@ -116,20 +114,20 @@ class AbstractCleaner(object): def report_deletion(self, rtype, name): if self.dryrun: - print ' + ' + rtype + ' ' + name + ' should be deleted (but is not deleted: dry run)' + print(' + ' + rtype + ' ' + name + ' should be deleted (but is not deleted: dry run)') else: - print ' + ' + rtype + ' ' + name + ' is successfully deleted' + print(' + ' + rtype + ' ' + name + ' is successfully deleted') def report_not_found(self, rtype, name): - print ' ? ' + rtype + ' ' + name + ' not found (already deleted?)' + print(' ? ' + rtype + ' ' + name + ' not found (already deleted?)') def report_error(self, rtype, name, reason): - print ' - ' + rtype + ' ' + name + ' ERROR:' + reason + print(' - ' + rtype + ' ' + name + ' ERROR:' + reason) def get_resource_list(self): result = [] - for rtype, rdict in self.resources.iteritems(): - for resid, resname in rdict.iteritems(): + for rtype, rdict in self.resources.items(): + for resid, resname in rdict.items(): result.append([rtype, resname, resid]) return result @@ -149,11 +147,11 @@ class StorageCleaner(AbstractCleaner): super(StorageCleaner, self).__init__('Storage', res_desc, resources, dryrun) def clean(self): - print '*** STORAGE cleanup' + print('*** STORAGE cleanup') try: kb_volumes = [] kb_detaching_volumes = [] - for id, name in self.resources['volumes'].iteritems(): + for id, name in self.resources['volumes'].items(): try: vol = self.cinder.volumes.get(id) if vol.attachments: @@ -162,15 +160,15 @@ class StorageCleaner(AbstractCleaner): if not self.dryrun: ins_id = vol.attachments[0]['server_id'] self.nova.volumes.delete_server_volume(ins_id, id) - print ' . VOLUME ' + vol.name + ' detaching...' + print(' . VOLUME ' + vol.name + ' detaching...') else: - print ' . VOLUME ' + vol.name + ' to be detached...' + print(' . VOLUME ' + vol.name + ' to be detached...') kb_detaching_volumes.append(vol) except NotFound: - print 'WARNING: Volume %s attached to an instance that no longer '\ - 'exists (will require manual cleanup of the database)' % (id) + print('WARNING: Volume %s attached to an instance that no longer '\ + 'exists (will require manual cleanup of the database)' % (id)) except Exception as e: - print str(e) + print(str(e)) else: # no attachments kb_volumes.append(vol) @@ -180,8 +178,8 @@ class StorageCleaner(AbstractCleaner): # check that the volumes are no longer attached if kb_detaching_volumes: if not self.dryrun: - print ' . Waiting for %d volumes to be fully detached...' % \ - (len(kb_detaching_volumes)) + print(' . Waiting for %d volumes to be fully detached...' % \ + (len(kb_detaching_volumes))) retry_count = 5 + len(kb_detaching_volumes) while True: retry_count -= 1 @@ -190,19 +188,19 @@ class StorageCleaner(AbstractCleaner): latest_vol = self.cinder.volumes.get(kb_detaching_volumes[0].id) if self.dryrun or not latest_vol.attachments: if not self.dryrun: - print ' + VOLUME ' + vol.name + ' detach complete' + print(' + VOLUME ' + vol.name + ' detach complete') kb_detaching_volumes.remove(vol) kb_volumes.append(vol) if kb_detaching_volumes and not self.dryrun: if retry_count: - print ' . VOLUME %d left to be detached, retries left=%d...' % \ - (len(kb_detaching_volumes), retry_count) + print(' . VOLUME %d left to be detached, retries left=%d...' % \ + (len(kb_detaching_volumes), retry_count)) time.sleep(2) else: - print ' - VOLUME detach timeout, %d volumes left:' % \ - (len(kb_detaching_volumes)) + print(' - VOLUME detach timeout, %d volumes left:' % \ + (len(kb_detaching_volumes))) for vol in kb_detaching_volumes: - print ' ', vol.name, vol.status, vol.id, vol.attachments + print(' ', vol.name, vol.status, vol.id, vol.attachments) break else: break @@ -213,7 +211,7 @@ class StorageCleaner(AbstractCleaner): try: vol.force_delete() except cinderclient.exceptions.BadRequest as exc: - print str(exc) + print(str(exc)) self.report_deletion('VOLUME', vol.name) except KeyError: pass @@ -232,15 +230,15 @@ class ComputeCleaner(AbstractCleaner): super(ComputeCleaner, self).__init__('Compute', res_desc, resources, dryrun) def clean(self): - print '*** COMPUTE cleanup' + print('*** COMPUTE cleanup') try: # Get a list of floating IPs fip_lst = self.neutron_client.list_floatingips()['floatingips'] deleting_instances = self.resources['instances'] - for id, name in self.resources['instances'].iteritems(): + for id, name in self.resources['instances'].items(): try: - if self.nova_client.servers.get(id).addresses.values(): - ins_addr = self.nova_client.servers.get(id).addresses.values()[0] + if list(self.nova_client.servers.get(id).addresses.values()): + ins_addr = list(self.nova_client.servers.get(id).addresses.values())[0] fips = [x['addr'] for x in ins_addr if x['OS-EXT-IPS:type'] == 'floating'] else: fips = [] @@ -260,12 +258,12 @@ class ComputeCleaner(AbstractCleaner): self.report_not_found('INSTANCE', name) if not self.dryrun and len(deleting_instances): - print ' . Waiting for %d instances to be fully deleted...' % \ - (len(deleting_instances)) + print(' . Waiting for %d instances to be fully deleted...' % \ + (len(deleting_instances))) retry_count = 5 + len(deleting_instances) while True: retry_count -= 1 - for ins_id in deleting_instances.keys(): + for ins_id in list(deleting_instances.keys()): try: self.nova_client.servers.get(ins_id) except NotFound: @@ -276,25 +274,25 @@ class ComputeCleaner(AbstractCleaner): break if retry_count: - print ' . INSTANCE %d left to be deleted, retries left=%d...' % \ - (len(deleting_instances), retry_count) + print(' . INSTANCE %d left to be deleted, retries left=%d...' % \ + (len(deleting_instances), retry_count)) time.sleep(2) else: - print ' - INSTANCE deletion timeout, %d instances left:' % \ - (len(deleting_instances)) - for ins_id in deleting_instances.keys(): + print(' - INSTANCE deletion timeout, %d instances left:' % \ + (len(deleting_instances))) + for ins_id in list(deleting_instances.keys()): try: ins = self.nova_client.servers.get(ins_id) - print ' ', ins.name, ins.status, ins.id + print(' ', ins.name, ins.status, ins.id) except NotFound: - print(' ', deleting_instances[ins_id], - '(just deleted)', ins_id) + print((' ', deleting_instances[ins_id], + '(just deleted)', ins_id)) break except KeyError: pass try: - for id, name in self.resources['flavors'].iteritems(): + for id, name in self.resources['flavors'].items(): try: flavor = self.nova_client.flavors.find(name=name) if not self.dryrun: @@ -306,7 +304,7 @@ class ComputeCleaner(AbstractCleaner): pass try: - for id, name in self.resources['keypairs'].iteritems(): + for id, name in self.resources['keypairs'].items(): try: if self.dryrun: self.nova_client.keypairs.get(name) @@ -357,10 +355,10 @@ class NetworkCleaner(AbstractCleaner): pass def clean(self): - print '*** NETWORK cleanup' + print('*** NETWORK cleanup') try: - for id, name in self.resources['sec_groups'].iteritems(): + for id, name in self.resources['sec_groups'].items(): try: if self.dryrun: self.neutron.show_security_group(id) @@ -373,7 +371,7 @@ class NetworkCleaner(AbstractCleaner): pass try: - for id, name in self.resources['floating_ips'].iteritems(): + for id, name in self.resources['floating_ips'].items(): try: if self.dryrun: self.neutron.show_floatingip(id) @@ -386,7 +384,7 @@ class NetworkCleaner(AbstractCleaner): pass try: - for id, name in self.resources['routers'].iteritems(): + for id, name in self.resources['routers'].items(): try: if self.dryrun: self.neutron.show_router(id) @@ -412,7 +410,7 @@ class NetworkCleaner(AbstractCleaner): except KeyError: pass try: - for id, name in self.resources['networks'].iteritems(): + for id, name in self.resources['networks'].items(): try: if self.dryrun: self.neutron.show_network(id) @@ -439,9 +437,9 @@ class KeystoneCleaner(AbstractCleaner): super(KeystoneCleaner, self).__init__('Keystone', res_desc, resources, dryrun) def clean(self): - print '*** KEYSTONE cleanup' + print('*** KEYSTONE cleanup') try: - for id, name in self.resources['users'].iteritems(): + for id, name in self.resources['users'].items(): try: if self.dryrun: self.keystone.users.get(id) @@ -454,7 +452,7 @@ class KeystoneCleaner(AbstractCleaner): pass try: - for id, name in self.resources['tenants'].iteritems(): + for id, name in self.resources['tenants'].items(): try: if self.dryrun: self.tenant_api.get(id) @@ -479,13 +477,13 @@ class KbCleaners(object): for cleaner in self.cleaners: table.extend(cleaner.get_resource_list()) count = len(table) - 1 - print + print() if count: - print 'SELECTED RESOURCES:' - print tabulate(table, headers="firstrow", tablefmt="psql") + print('SELECTED RESOURCES:') + print(tabulate(table, headers="firstrow", tablefmt="psql")) else: - print 'There are no resources to delete.' - print + print('There are no resources to delete.') + print() return count def clean(self): @@ -511,7 +509,7 @@ def get_resources_from_cleanup_log(logfile): if not resid: # normally only the keypairs have no ID if restype != "keypairs": - print 'Error: resource type %s has no ID - ignored!!!' % (restype) + print('Error: resource type %s has no ID - ignored!!!' % (restype)) else: resid = '0' if restype not in resources: @@ -556,8 +554,8 @@ def main(): try: resource_name_re = re.compile(opts.filter) except Exception as exc: - print 'Provided filter is not a valid python regular expression: ' + opts.filter - print str(exc) + print('Provided filter is not a valid python regular expression: ' + opts.filter) + print(str(exc)) sys.exit(1) else: resource_name_re = re.compile('KB') @@ -566,9 +564,9 @@ def main(): cleaners = KbCleaners(cred, resources, opts.dryrun) if opts.dryrun: - print + print() print('!!! DRY RUN - RESOURCES WILL BE CHECKED BUT WILL NOT BE DELETED !!!') - print + print() # Display resources to be deleted count = cleaners.show_resources() diff --git a/kloudbuster/kb_config.py b/kloudbuster/kb_config.py index 736eed2..8061310 100644 --- a/kloudbuster/kb_config.py +++ b/kloudbuster/kb_config.py @@ -16,13 +16,13 @@ import os import sys import yaml -from __init__ import __version__ +from .__init__ import __version__ from attrdict import AttrDict -import log as logging +from . import log as logging from oslo_config import cfg from pkg_resources import resource_string -import credentials +from . import credentials CONF = cfg.CONF LOG = logging.getLogger(__name__) diff --git a/kloudbuster/kb_res_logger.py b/kloudbuster/kb_res_logger.py index 4221758..581b823 100644 --- a/kloudbuster/kb_res_logger.py +++ b/kloudbuster/kb_res_logger.py @@ -12,7 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. -import log as logging +from . import log as logging from time import gmtime from time import strftime diff --git a/kloudbuster/kb_runner_base.py b/kloudbuster/kb_runner_base.py index 3bb484d..ae2225a 100644 --- a/kloudbuster/kb_runner_base.py +++ b/kloudbuster/kb_runner_base.py @@ -12,11 +12,11 @@ # License for the specific language governing permissions and limitations # under the License. -from __future__ import division + import abc from collections import deque import json -import log as logging +from . import log as logging import redis import sys import threading @@ -42,7 +42,7 @@ class KBRunner(object): """ def __init__(self, client_list, config, single_cloud=True): - self.full_client_dict = dict(zip([x.vm_name for x in client_list], client_list)) + self.full_client_dict = dict(list(zip([x.vm_name for x in client_list], client_list))) self.client_dict = self.full_client_dict self.config = config self.single_cloud = single_cloud @@ -77,7 +77,7 @@ class KBRunner(object): success = False retry_count = max(timeout // self.config.polling_interval, 1) # Check for connections to redis server - for retry in xrange(retry_count): + for retry in range(retry_count): try: self.redis_obj.get("test") success = True @@ -125,7 +125,7 @@ class KBRunner(object): retry = cnt_succ = cnt_failed = 0 clist = self.client_dict.copy() samples = [] - perf_tool = self.client_dict.values()[0].perf_tool + perf_tool = list(self.client_dict.values())[0].perf_tool while (retry < retry_count and len(clist)): time.sleep(polling_interval) @@ -207,13 +207,13 @@ class KBRunner(object): def gen_host_stats(self): self.host_stats = {} - for vm in self.result.keys(): + for vm in list(self.result.keys()): phy_host = self.client_dict[vm].host if phy_host not in self.host_stats: self.host_stats[phy_host] = [] self.host_stats[phy_host].append(self.result[vm]) - perf_tool = self.client_dict.values()[0].perf_tool + perf_tool = list(self.client_dict.values())[0].perf_tool for phy_host in self.host_stats: self.host_stats[phy_host] = perf_tool.consolidate_results(self.host_stats[phy_host]) diff --git a/kloudbuster/kb_runner_http.py b/kloudbuster/kb_runner_http.py index 0c2130e..520c540 100644 --- a/kloudbuster/kb_runner_http.py +++ b/kloudbuster/kb_runner_http.py @@ -12,11 +12,11 @@ # License for the specific language governing permissions and limitations # under the License. -from __future__ import division -from kb_runner_base import KBException -from kb_runner_base import KBRunner -import log as logging + +from .kb_runner_base import KBException +from .kb_runner_base import KBRunner +from . import log as logging LOG = logging.getLogger(__name__) @@ -70,7 +70,7 @@ class KBRunner_HTTP(KBRunner): "summary shown below may not be accurate!") # Parse the results from HTTP benchmarking tool - for key, instance in self.client_dict.items(): + for key, instance in list(self.client_dict.items()): self.result[key] = instance.perf_client_parser(**self.result[key]) def single_run(self, active_range=None, http_test_only=False): @@ -84,8 +84,8 @@ class KBRunner_HTTP(KBRunner): self.check_http_service(active_range) if self.config.prompt_before_run: - print "Press enter to start running benchmarking tools..." - raw_input() + print("Press enter to start running benchmarking tools...") + input() LOG.info("Running HTTP Benchmarking...") self.report = {'seq': 0, 'report': None} @@ -93,9 +93,9 @@ class KBRunner_HTTP(KBRunner): self.run_http_test(active_range) # Call the method in corresponding tools to consolidate results - perf_tool = self.client_dict.values()[0].perf_tool - LOG.kbdebug(self.result.values()) - self.tool_result = perf_tool.consolidate_results(self.result.values()) + perf_tool = list(self.client_dict.values())[0].perf_tool + LOG.kbdebug(list(self.result.values())) + self.tool_result = perf_tool.consolidate_results(list(self.result.values())) self.tool_result['http_rate_limit'] =\ len(self.client_dict) * self.config.http_tool_configs.rate_limit self.tool_result['total_connections'] =\ @@ -120,7 +120,7 @@ class KBRunner_HTTP(KBRunner): multiple = self.config.progression.vm_multiple limit = self.config.progression.http_stop_limit timeout = self.config.http_tool_configs.timeout - vm_list = self.full_client_dict.keys() + vm_list = list(self.full_client_dict.keys()) vm_list.sort(cmp=lambda x, y: cmp(int(x[x.rfind('I') + 1:]), int(y[y.rfind('I') + 1:]))) self.client_dict = {} cur_stage = 1 @@ -137,7 +137,7 @@ class KBRunner_HTTP(KBRunner): if self.tool_result and 'latency_stats' in self.tool_result: err = self.tool_result['http_sock_err'] + self.tool_result['http_sock_timeout'] pert_dict = dict(self.tool_result['latency_stats']) - if limit[1] in pert_dict.keys(): + if limit[1] in list(pert_dict.keys()): timeout_at_percentile = pert_dict[limit[1]] // 1000000 elif limit[1] != 0: LOG.warning('Percentile %s%% is not a standard statistic point.' % limit[1]) @@ -146,7 +146,7 @@ class KBRunner_HTTP(KBRunner): 'reaches the stop limit.') break - for idx in xrange(cur_vm_count, target_vm_count): + for idx in range(cur_vm_count, target_vm_count): self.client_dict[vm_list[idx]] = self.full_client_dict[vm_list[idx]] description = "-- %s --" % self.header_formatter(cur_stage, len(self.client_dict)) LOG.info(description) diff --git a/kloudbuster/kb_runner_multicast.py b/kloudbuster/kb_runner_multicast.py index 591c6ed..fa8644a 100644 --- a/kloudbuster/kb_runner_multicast.py +++ b/kloudbuster/kb_runner_multicast.py @@ -12,11 +12,11 @@ # License for the specific language governing permissions and limitations # under the License. -from __future__ import division -from kb_runner_base import KBException -from kb_runner_base import KBRunner -import log as logging + +from .kb_runner_base import KBException +from .kb_runner_base import KBRunner +from . import log as logging LOG = logging.getLogger(__name__) class KBMulticastServerUpException(KBException): @@ -61,12 +61,12 @@ class KBRunner_Multicast(KBRunner): @staticmethod def json_to_csv(jsn): csv = "Test,receivers,addresses,ports,bitrate,pkt_size," - firstKey = [x for x in jsn.keys()][0] - keys = jsn[firstKey].keys() + firstKey = [x for x in list(jsn.keys())][0] + keys = list(jsn[firstKey].keys()) csv += ",".join(keys) + "\r\n" - for obj_k in jsn.keys(): + for obj_k in list(jsn.keys()): obj = jsn[obj_k] - obj_vals = map(str, obj.values()) + obj_vals = list(map(str, list(obj.values()))) csv += '"' + obj_k + '"' + "," + obj_k + "," + ",".join(obj_vals) + "\r\n" return csv @@ -81,8 +81,8 @@ class KBRunner_Multicast(KBRunner): self.check_multicast_service(active_range) if self.config.prompt_before_run: - print "Press enter to start running benchmarking tools..." - raw_input() + print("Press enter to start running benchmarking tools...") + input() LOG.info("Running Multicast Benchmarking...") self.report = {'seq': 0, 'report': None} @@ -101,7 +101,7 @@ class KBRunner_Multicast(KBRunner): def run(self, test_only=False, run_label=None): self.tool_result = {} - vm_list = self.full_client_dict.keys() + vm_list = list(self.full_client_dict.keys()) vm_list.sort(cmp=lambda x, y: cmp(int(x[x.rfind('I') + 1:]), int(y[y.rfind('I') + 1:]))) self.client_dict = {} cur_stage = 1 diff --git a/kloudbuster/kb_runner_storage.py b/kloudbuster/kb_runner_storage.py index 3a7363e..6d25f4e 100644 --- a/kloudbuster/kb_runner_storage.py +++ b/kloudbuster/kb_runner_storage.py @@ -12,11 +12,11 @@ # License for the specific language governing permissions and limitations # under the License. -from __future__ import division -from kb_runner_base import KBException -from kb_runner_base import KBRunner -import log as logging + +from .kb_runner_base import KBException +from .kb_runner_base import KBRunner +from . import log as logging LOG = logging.getLogger(__name__) @@ -100,7 +100,7 @@ class KBRunner_Storage(KBRunner): "summary shown will be partial!" % (cnt_pending, timeout)) else: # Parse the results from storage benchmarking tool - for key, instance in self.client_dict.items(): + for key, instance in list(self.client_dict.items()): self.result[key] = instance.perf_client_parser(**self.result[key]) return cnt_pending @@ -114,11 +114,11 @@ class KBRunner_Storage(KBRunner): self.init_volume(active_range) if self.config.prompt_before_run: - print "Press enter to start running benchmarking tools..." - raw_input() + print("Press enter to start running benchmarking tools...") + input() test_count = len(self.config.storage_tool_configs) - perf_tool = self.client_dict.values()[0].perf_tool + perf_tool = list(self.client_dict.values())[0].perf_tool self.tool_result = [] vm_count = active_range[1] - active_range[0] + 1\ if active_range else len(self.full_client_dict) @@ -130,9 +130,9 @@ class KBRunner_Storage(KBRunner): timeout_vms = self.run_storage_test(active_range, dict(cur_config)) # Call the method in corresponding tools to consolidate results - LOG.kbdebug(self.result.values()) + LOG.kbdebug(list(self.result.values())) - tc_result = perf_tool.consolidate_results(self.result.values()) + tc_result = perf_tool.consolidate_results(list(self.result.values())) tc_result['description'] = cur_config['description'] tc_result['mode'] = cur_config['mode'] tc_result['block_size'] = cur_config['block_size'] @@ -168,7 +168,7 @@ class KBRunner_Storage(KBRunner): start = self.config.progression.vm_start multiple = self.config.progression.vm_multiple limit = self.config.progression.storage_stop_limit - vm_list = self.full_client_dict.keys() + vm_list = list(self.full_client_dict.keys()) vm_list.sort(cmp=lambda x, y: cmp(int(x[x.rfind('I') + 1:]), int(y[y.rfind('I') + 1:]))) self.client_dict = {} cur_stage = 1 @@ -183,7 +183,7 @@ class KBRunner_Storage(KBRunner): if target_vm_count > len(self.full_client_dict): break - for idx in xrange(cur_vm_count, target_vm_count): + for idx in range(cur_vm_count, target_vm_count): self.client_dict[vm_list[idx]] = self.full_client_dict[vm_list[idx]] description = "-- %s --" % self.header_formatter(cur_stage, len(self.client_dict)) diff --git a/kloudbuster/kb_scheduler.py b/kloudbuster/kb_scheduler.py index 2eabe34..b4bae30 100644 --- a/kloudbuster/kb_scheduler.py +++ b/kloudbuster/kb_scheduler.py @@ -12,7 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. -import log as logging +from . import log as logging LOG = logging.getLogger(__name__) diff --git a/kloudbuster/kb_vm_agent.py b/kloudbuster/kb_vm_agent.py deleted file mode 120000 index 78c7285..0000000 --- a/kloudbuster/kb_vm_agent.py +++ /dev/null @@ -1 +0,0 @@ -../kb_dib/elements/kloudbuster/static/kb_test/kb_vm_agent.py \ No newline at end of file diff --git a/kloudbuster/kb_vm_agent.py b/kloudbuster/kb_vm_agent.py new file mode 100644 index 0000000..3313d17 --- /dev/null +++ b/kloudbuster/kb_vm_agent.py @@ -0,0 +1,651 @@ +# Copyright 2016 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from hdrh.histogram import HdrHistogram +import json +import multiprocessing +import redis +import socket +import struct +import subprocess +from subprocess import Popen +import sys +import syslog +import threading +import time +import traceback + +# Define the version of the KloudBuster agent and VM image +# +# When VM is up running, the agent will send the READY message to the +# KloudBuster main program, along with its version. +# +# This version is no longer checked starting from release 7 +# and can be left constant moving forward. +__version__ = '7' + +# TODO(Logging on Agent) + +def exec_command(cmd, cwd=None): + p = subprocess.Popen(cmd, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + (stdout, stderr) = p.communicate() + if p.returncode: + syslog.syslog("Command failed: " + ' '.join(cmd)) + if stderr: + syslog.syslog(stderr) + return p.returncode + +def refresh_clock(clocks, force_sync=False): + step = " " + if force_sync: + step = " -b " + command = "sudo ntpdate" + step + clocks + exec_command(command.split(" ")) + +class KB_Instance(object): + + # Check whether the HTTP Service is up running + @staticmethod + def check_http_service(target_url): + cmd = 'while true; do\n' + cmd += 'curl --head %s --connect-timeout 2 --silent\n' % (target_url) + cmd += 'if [ $? -eq 0 ]; then break; fi\n' + cmd += 'done' + return cmd + + # Add static route + @staticmethod + def add_static_route(network, next_hop_ip, if_name=None): + debug_msg = "Adding static route %s with next hop %s" % (network, next_hop_ip) + cmd = "sudo ip route add %s via %s" % (network, next_hop_ip) + if if_name: + debug_msg += " and %s" % if_name + cmd += " dev %s" % if_name + print(debug_msg) + return cmd + + @staticmethod + def add_multicast_route(): + cmd = "sudo route add -net 224.0.0.0/8 dev eth0" + return exec_command(cmd.split(" ")) + + # Get static route + @staticmethod + def get_static_route(network, next_hop_ip=None, if_name=None): + cmd = "ip route show %s" % network + if next_hop_ip: + cmd += " via %s" % next_hop_ip + if if_name: + cmd += " dev %s" % if_name + return cmd + + # Delete static route + @staticmethod + def delete_static_route(network, next_hop_ip=None, if_name=None): + debug_msg = "Deleting static route %s" % network + cmd = "sudo ip route del %s" % network + if next_hop_ip: + debug_msg = " with next hop %s" % next_hop_ip + cmd += " via %s" % next_hop_ip + if if_name: + if next_hop_ip: + debug_msg = " and %s" % if_name + else: + debug_msg = "with next hop %s" % if_name + cmd += " dev %s" % if_name + print(debug_msg) + return cmd + + # Run the HTTP benchmarking tool + @staticmethod + def run_wrk2(dest_path, target_url, threads, connections, + rate_limit, duration, timeout, connection_type, + report_interval): + if not rate_limit: + rate_limit = 65535 + + cmd = '%s -t%d -c%d -R%d -d%ds -p%ds --timeout %ds -D2 -e %s' % \ + (dest_path, threads, connections, rate_limit, duration, + report_interval, timeout, target_url) + return cmd + + # Run the multicast benchmarking tool + # -p$1:$2 => $1 is the local port, $2 is the multicast port + # SOURCE PORT should be the same for any one address that you want to + @staticmethod + def run_nuttcp(dest_path, target_url, target_port, multicast_addr, multicast_port, bitrate=100, + pktsize=1316, report_interval=4, transmit_receive='t'): + + if bitrate < 1: + bitrate = str(bitrate * 1000) + "k" + else: + bitrate = str(bitrate) + "m" + + cmd = '%s -%s -fxmitstats -j -o -fparse -m30 -Ri%s/%s -l%i -T%i -i1 -g%s -p%d -P%d %s' % \ + (dest_path, transmit_receive, bitrate, bitrate, pktsize, report_interval, + multicast_addr, multicast_port, target_port, target_url) + return cmd + + # Init volume + @staticmethod + def init_volume(dest_path, size, mkfs): + cmd = 'if [ ! -e /kb_mnt ]; then\n' + cmd += 'mkfs.xfs /dev/vdb && ' if mkfs else '' + cmd += 'mkdir -p /kb_mnt && ' + cmd += 'mount /dev/vdb /kb_mnt && ' + cmd += '%s --name=create_file --filename=/kb_mnt/kb_storage_test.bin '\ + '--size=%s --create_only=1\n' % (dest_path, size) + cmd += 'fi' + return cmd + + # Run fio + @staticmethod + def run_fio(dest_path, name, description, mode, block_size, iodepth, runtime, + rate_iops=None, rate=None, rwmixread=None, status_interval=None, extra_opts=None): + fixed_opt = '--thread --ioengine=libaio --output-format=json+ --direct=1 ' + fixed_opt += '--filename=/kb_mnt/kb_storage_test.bin ' + required_opt = '--name=%s --rw=%s --bs=%s --iodepth=%s --runtime=%s ' %\ + (name, mode, block_size, iodepth, runtime) + optional_opt = '' + optional_opt += '--rate_iops=%s ' % rate_iops if rate_iops else '' + optional_opt += '--rate=%s ' % rate if rate else '' + optional_opt += '--rwmixread=%s ' % rwmixread if rwmixread else '' + optional_opt += '--status-interval=%s ' % status_interval if status_interval else '' + optional_opt += extra_opts if extra_opts else '' + cmd = '%s %s %s %s' % (dest_path, fixed_opt, required_opt, optional_opt) + return cmd + +class KBA_Client(object): + + def __init__(self, user_data): + host = user_data['redis_server'] + port = user_data['redis_server_port'] + self.user_data = user_data + self.redis_obj = redis.Redis(host=host, port=port) + self.pubsub = self.redis_obj.pubsub(ignore_subscribe_messages=True) + self.hello_thread = None + self.stop_hello = threading.Event() + self.vm_name = user_data['vm_name'] + self.orches_chan_name = "kloudbuster_orches" + self.report_chan_name = "kloudbuster_report" + self.last_cmd = None + self.last_process = None + + def setup_channels(self): + # Check for connections to redis server + while (True): + try: + self.redis_obj.get("test") + except (redis.exceptions.ConnectionError): + time.sleep(1) + continue + break + + # Subscribe to orchestration channel + self.pubsub.subscribe(self.orches_chan_name) + + def report(self, cmd, client_type, data): + message = {'cmd': cmd, 'sender-id': self.vm_name, + 'client-type': client_type, 'data': data} + self.redis_obj.publish(self.report_chan_name, str(message)) + + def send_hello(self): + # Sending "hello" message to master node every 2 seconds + while not self.stop_hello.is_set(): + self.report('READY', None, __version__) + time.sleep(2) + + def post_processing(self, p_out): + return p_out + + def exec_command(self, cmd): + # Execute the command, and returns the outputs + cmds = ['bash', '-c'] + cmds.append(cmd) + p = subprocess.Popen(cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + self.last_process = p + (stdout, stderr) = p.communicate() + + return (p.returncode, stdout, stderr) + + def exec_command_report(self, cmd): + # Execute the command, reporting periodically, and returns the outputs + cmd_res_dict = None + cmds = ['bash', '-c'] + cmds.append(cmd) + p_output = '' + p = subprocess.Popen(cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + self.last_process = p + lines_iterator = iter(p.stdout.readline, b"") + for line in lines_iterator: + # One exception, if this is the very last report, we will send it + # through "DONE" command, not "REPORT". So what's happening here + # is to determine whether this is the last report. + if cmd_res_dict: + self.report('REPORT', 'http', cmd_res_dict) + cmd_res_dict = None + p_output = line + else: + p_output += line + if line.rstrip() == "}": + p_output = self.post_processing(p_output) + cmd_res_dict = dict(list(zip(("status", "stdout", "stderr"), (0, p_output, '')))) + + stderr = p.communicate()[1] + return (p.returncode, p_output, stderr) + + def work(self): + for item in self.pubsub.listen(): + if item['type'] != 'message': + continue + # Convert the string representation of dict to real dict obj + message = eval(item['data']) + if message['cmd'] == 'ABORT': + try: + self.last_process.kill() + except Exception: + pass + else: + work_thread = threading.Thread(target=agent.process_cmd, args=[message]) + work_thread.daemon = True + work_thread.start() + + def process_cmd(self, message): + if message['cmd'] == 'ACK': + # When 'ACK' is received, means the master node + # acknowledged the current VM. So stopped sending more + # "hello" packet to the master node. + # Unfortunately, there is no thread.stop() in Python 2.x + self.stop_hello.set() + elif message['cmd'] == 'EXEC': + self.last_cmd = "" + arange = message['data']['active_range'] + my_id = int(self.vm_name[self.vm_name.rindex('I') + 1:]) + if (not arange) or (my_id >= arange[0] and my_id <= arange[1]): + try: + par = message['data'].get('parameter', '') + str_par = 'par' if par else '' + cmd_res_tuple = eval('self.exec_%s(%s)' % (message['data']['cmd'], str_par)) + cmd_res_dict = dict(list(zip(("status", "stdout", "stderr"), cmd_res_tuple))) + except Exception as exc: + cmd_res_dict = { + "status": 1, + "stdout": self.last_cmd, + "stderr": str(exc) + } + if self.__class__.__name__ == "KBA_Multicast_Client": + self.report('DONE_MC', message['client-type'], cmd_res_dict) + else: + self.report('DONE', message['client-type'], cmd_res_dict) + else: + # Unexpected + print('ERROR: Unexpected command received!') + +class KBA_HTTP_Client(KBA_Client): + + def exec_setup_static_route(self): + self.last_cmd = KB_Instance.get_static_route(self.user_data['target_subnet_ip']) + result = self.exec_command(self.last_cmd) + if (self.user_data['target_subnet_ip'] not in result[1]): + self.last_cmd = KB_Instance.add_static_route( + self.user_data['target_subnet_ip'], + self.user_data['target_shared_interface_ip']) + return self.exec_command(self.last_cmd) + else: + return (0, '', '') + + def exec_check_http_service(self): + self.last_cmd = KB_Instance.check_http_service(self.user_data['target_url']) + return self.exec_command(self.last_cmd) + + def exec_run_http_test(self, http_tool_configs): + self.last_cmd = KB_Instance.run_wrk2( + dest_path='/usr/local/bin/wrk2', + target_url=self.user_data['target_url'], + **http_tool_configs) + return self.exec_command_report(self.last_cmd) + + +class KBA_Multicast_Client(KBA_Client): + def exec_setup_static_route(self): + self.last_cmd = KB_Instance.get_static_route(self.user_data['target_subnet_ip']) + result = self.exec_command(self.last_cmd) + + if (self.user_data['target_subnet_ip'] not in result[1]): + self.last_cmd = KB_Instance.add_static_route( + self.user_data['target_subnet_ip'], + self.user_data['target_shared_interface_ip']) + return self.exec_command(self.last_cmd) + else: + return (0, '', '') + + def exec_check_nuttcp_service(self): + return 0 + + def post_processing(self, p_out): + # Converts form: 'msmaxjitter=59.6045 ms...' into: {'jitter' : 59.6045, 'la'...} + kmap = {'pkt': 'packets_recv', 'data_loss': 'data_loss', 'drop': 'packets_dropped', + 'megabytes': 'megabytes', 'rate_Mbps': 'mbps', 'msmaxjitter': 'jitter', + 'msavgOWD': 'latency'} # Format/Include Keys + try: + return {kmap[k]: abs(float(v)) + for (k, v) in [c.split("=") + for c in p_out.split(" ")] + if k in kmap} + except Exception: + return {'error': '0'} + + + + def exec_multicast_commands_report_helper(self, cmds, timeout=20, trans_recv=0): + """Start len(cmds) threads to test""" + queue = multiprocessing.Queue() + cmd_index = 0 + j = 0 + output = {} + + # Function for Process # + def spawn(cmd, queue): + p = Popen(cmds[cmd][1].split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE) + out = p.communicate()[0] + out = out.replace("\n\n", "\n").split("\n") + out = out[len(out) / 2] + queue.put([cmds[cmd][0], out]) + # End Function # + + for cmd in cmds: + multiprocessing.Process(target=spawn, args=(cmd_index, queue)).start() + cmd_index += 1 + p_err = "" + try: + while(j < len(cmds)): + out = queue.get(True, timeout) + key = out[0] + j += 1 + p_out = self.post_processing(out[1].rstrip()) + if key in output: + for k in output[key]: + output[key][k] += abs(p_out[k]) + else: + output[key] = p_out + + except Exception: + exc_type, exc_value, exc_traceback = sys.exc_info() + p_err = repr(traceback.format_exception(exc_type, exc_value, + exc_traceback)) + + return (output, str(p_err)) + + def exec_mulitcast_commands_report(self, cmds, timeout=20, trans_recv=0): + """For each batch, pass it off to the helper to start threads. + exec_multicast_commands_report_helper is blocking. + """ + j_output = {} + n_err = 0 + r_err = "" + for cmd_list in cmds: + try: + refresh_clock(self.user_data['ntp_clocks']) + round_output, err = self.exec_multicast_commands_report_helper(cmd_list, + timeout, trans_recv) + output_key = list(round_output.keys())[0] + for key in round_output[output_key]: + round_output[output_key][key] /= float(len(cmd_list)) + j_output.update(round_output) + except Exception: + exc_type, exc_value, exc_traceback = sys.exc_info() + err = repr(traceback.format_exception(exc_type, exc_value, exc_traceback)) + finally: + if err != "": + n_err += 1 + r_err = err + return (0, json.dumps(j_output), r_err) + + def exec_run_multicast_test(self, nutt_tool_configs): + """Tests varying nAddresses/ports/bandwidths for a constant packet size. + Creates Batches of tests to run. One batch of commands is of size nAddresses * nPorts. + Sends off len(bitrates) list of Batches to exec_mulitcast_commands_report + """ + + commands_list = [] + startAddr = nutt_tool_configs['multicast_address_start'].split(".") + offset = int(startAddr[-1]) + startAddr = startAddr[:-1] + startPort = 12000 + max_nAddr = nutt_tool_configs['address_pattern'][-1] + max_nPort = nutt_tool_configs['port_pattern'][-1] + duration = nutt_tool_configs['duration'] + server_address = nutt_tool_configs['server_address'] + target_url = self.user_data['target_url'] if server_address == '0.0.0.0' else server_address + transmit_receive = 't' if server_address == '0.0.0.0' else 'r' + target_port = 5000 if server_address == '0.0.0.0' else nutt_tool_configs['server_port'] + + if transmit_receive == 'r': # Manually join multicast group... + for addr_i in range(0, max_nAddr): + for port_i in range(0, max_nPort): + m_port = startPort + ((addr_i) * max_nPort) + (port_i) + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + m_addr = ".".join(startAddr + [str(offset + addr_i)]) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) + mreq = struct.pack("4sl", socket.inet_aton(m_addr), socket.INADDR_ANY) + s.bind((m_addr, m_port)) + s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) + # Run this garbage test to ensure the listener is in the multicast group. + comm = KB_Instance.run_nuttcp(dest_path='/usr/local/bin/nuttcp', + target_url=target_url, + target_port=target_port, + multicast_port=12000, + multicast_addr='231.0.0.1', + bitrate=100, + pktsize=1316, + report_interval=60, + transmit_receive='r') + + exec_command(comm.split(" ")) + + nReceivers = nutt_tool_configs['receivers'] + pkt_s = nutt_tool_configs['pktsize'] + for index in range(0, len(nutt_tool_configs['address_pattern'])): + nAddresses = nutt_tool_configs['address_pattern'][index] + nPorts = nutt_tool_configs['port_pattern'][index] + for bitrate in nutt_tool_configs['bitrates']: + commands = [] + for addr_i in range(0, nAddresses): + for port_i in range(0, nPorts): + m_addr = ".".join(startAddr + [str(addr_i + offset)]) + m_port = startPort + (addr_i * max_nPort) + (port_i) + + comm = KB_Instance.run_nuttcp(dest_path='/usr/local/bin/nuttcp', + target_url=target_url, + target_port=target_port, + multicast_port=m_port, + multicast_addr=m_addr, + bitrate=bitrate, + pktsize=pkt_s, + report_interval=duration, + transmit_receive=transmit_receive) + key = "%d,%d,%d,%f,%d" % (nReceivers, nAddresses, nPorts, bitrate, pkt_s) + commands.append([key, comm]) + commands_list.append(commands) + + self.last_cmd = commands_list[-1][-1] + t_r = 0 if transmit_receive == 't' else duration + return self.exec_mulitcast_commands_report(commands_list, + trans_recv=t_r) + +class KBA_Storage_Client(KBA_Client): + + def encode_bins(self, p_output): + p_output = json.loads(p_output) + p_output['jobs'][0].pop('trim') + test_list = ['read', 'write'] + + for test in test_list: + histogram = HdrHistogram(1, 5 * 3600 * 1000, 3) + clat = p_output['jobs'][0][test]['clat']['bins'] + total_buckets = clat['FIO_IO_U_PLAT_NR'] + grp_msb_bits = clat['FIO_IO_U_PLAT_BITS'] + buckets_per_grp = clat['FIO_IO_U_PLAT_VAL'] + + for bucket in range(total_buckets): + if clat[str(bucket)]: + grp = bucket / buckets_per_grp + subbucket = bucket % buckets_per_grp + if grp == 0: + val = subbucket - 1 + else: + base = 2 ** (grp_msb_bits + grp - 1) + val = int(base + (base / buckets_per_grp) * (subbucket - 0.5)) + histogram.record_value(val, clat[str(bucket)]) + + p_output['jobs'][0][test]['clat']['hist'] = histogram.encode() + p_output['jobs'][0][test]['clat'].pop('bins') + p_output['jobs'][0][test]['clat'].pop('percentile') + + return json.dumps(p_output) + + def exec_init_volume(self, vol_init_configs): + self.last_cmd = KB_Instance.init_volume( + dest_path='/usr/local/bin/fio', + **vol_init_configs) + return self.exec_command(self.last_cmd) + + def exec_run_storage_test(self, fio_configs): + self.last_cmd = KB_Instance.run_fio( + dest_path='/usr/local/bin/fio', + name='kb_storage_test', + **fio_configs) + return self.exec_command_report(self.last_cmd) + + def post_processing(self, p_out): + return self.encode_bins(p_out) + + +class KBA_Server(object): + + def __init__(self, user_data): + self.user_data = user_data + + def config_nginx_server(self): + # Generate the HTML file with specified size + html_size = self.user_data['http_server_configs']['html_size'] + cmd_str = 'dd if=/dev/zero of=/data/www/index.html bs=%s count=1' % html_size + cmd = cmd_str.split() + return False if exec_command(cmd) else True + + def start_nginx_server(self): + cmd = ['sudo', 'service', 'nginx', 'start'] + return exec_command(cmd) + + def start_nuttcp_server(self): + cmd = ['/usr/local/bin/nuttcp', '-S' '-P5000'] + return exec_command(cmd) + + def start_multicast_listener(self, mc_addrs, multicast_ports, start_address="231.0.0.128"): + '''Starts Listeners at second /25 (.128). + These listeners are created when nReceivers > 1. + ''' + startPort = 12000 + startAddr = start_address.split(".")[:-1] + start_offset = int(start_address.split(".")[-1]) + + # Thread Function # + def spawn_mcl(addr_i, port): + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + m_addr = ".".join(startAddr + [str(addr_i)]) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + ttl = struct.pack('B', 150) + s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl) + + mreq = struct.pack("4sl", socket.inet_aton(m_addr), socket.INADDR_ANY) + s.bind((m_addr, port)) + s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) + while True: + d, e = s.recvfrom(10240) + + # End Function # + + for addr_i in range(0, mc_addrs): + for port_i in range(0, multicast_ports): + m_port = startPort + ((addr_i) * multicast_ports) + (port_i) + multiprocessing.Process(target=spawn_mcl, + args=(start_offset + addr_i, m_port,)).start() + + while True: + continue + +class KBA_Proxy(object): + def start_redis_server(self): + cmd = ['sudo', 'service', 'redis-server', 'start'] + return exec_command(cmd) + + +if __name__ == "__main__": + try: + with open('user_data', 'r') as f: + user_data = dict(eval(f.read())) + except Exception as e: + # KloudBuster starts without user-data + cwd = 'kloudbuster/kb_server' + cmd = ['python', 'setup.py', 'develop'] + rc = exec_command(cmd, cwd=cwd) + if not rc: + syslog.syslog("Starting kloudbuster HTTP server") + cmd = ['/usr/local/bin/pecan', 'serve', 'config.py'] + sys.exit(exec_command(cmd, cwd=cwd)) + + if user_data.get('role') == 'KB-PROXY': + agent = KBA_Proxy() + syslog.syslog("Starting kloudbuster proxy server") + sys.exit(agent.start_redis_server()) + if user_data.get('role').endswith('Server'): + agent = KBA_Server(user_data) + if user_data['role'].startswith('Multicast'): + KB_Instance.add_multicast_route() + if user_data['n_id'] == 0: + refresh_clock(user_data.get('ntp_clocks'), force_sync=True) + agent.start_nuttcp_server() + while True: + refresh_clock(user_data.get('ntp_clocks')) + time.sleep(10) + sys.exit(0) + else: + agent.start_multicast_listener(user_data.get('multicast_addresses'), + user_data.get('multicast_ports'), + user_data.get('multicast_listener_address_start')) + if agent.config_nginx_server(): + syslog.syslog("Starting kloudbuster nginx server") + sys.exit(agent.start_nginx_server()) + else: + sys.exit(1) + elif user_data.get('role').endswith('Client'): + if user_data['role'].startswith('HTTP'): + syslog.syslog("Starting kloudbuster HTTP client") + agent = KBA_HTTP_Client(user_data) + elif user_data['role'].startswith('Multicast'): + KB_Instance.add_multicast_route() + refresh_clock(user_data.get('ntp_clocks'), force_sync=True) + agent = KBA_Multicast_Client(user_data) + else: + syslog.syslog("Starting kloudbuster storage client") + agent = KBA_Storage_Client(user_data) + agent.setup_channels() + agent.hello_thread = threading.Thread(target=agent.send_hello) + agent.hello_thread.daemon = True + agent.hello_thread.start() + agent.work() + else: + sys.exit(1) diff --git a/kloudbuster/kloudbuster.py b/kloudbuster/kloudbuster.py index 035cbd5..3b09d84 100755 --- a/kloudbuster/kloudbuster.py +++ b/kloudbuster/kloudbuster.py @@ -13,7 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. -from __init__ import __version__ +from .__init__ import __version__ from concurrent.futures import ThreadPoolExecutor import datetime @@ -26,30 +26,30 @@ import time import traceback import webbrowser -import base_compute -import base_network +from . import base_compute +from . import base_network from cinderclient import client as cinderclient from glanceclient import exc as glance_exception from glanceclient.v2 import client as glanceclient -from kb_config import KBConfig -from kb_res_logger import KBResLogger -from kb_runner_base import KBException -from kb_runner_http import KBRunner_HTTP -from kb_runner_multicast import KBRunner_Multicast -from kb_runner_storage import KBRunner_Storage -from kb_scheduler import KBScheduler +from .kb_config import KBConfig +from .kb_res_logger import KBResLogger +from .kb_runner_base import KBException +from .kb_runner_http import KBRunner_HTTP +from .kb_runner_multicast import KBRunner_Multicast +from .kb_runner_storage import KBRunner_Storage +from .kb_scheduler import KBScheduler import keystoneauth1 from keystoneclient import client as keystoneclient -import log as logging +from . import log as logging from neutronclient.neutron import client as neutronclient from novaclient import client as novaclient from oslo_config import cfg from pkg_resources import resource_filename from pkg_resources import resource_string from tabulate import tabulate -import tenant +from . import tenant CONF = cfg.CONF LOG = logging.getLogger(__name__) @@ -111,7 +111,7 @@ class Kloud(object): def select_flavor(self): # Select an existing flavor that Flavor check flavor_manager = base_compute.Flavor(self.nova_client) - fcand = {'vcpus': sys.maxint, 'ram': sys.maxint, 'disk': sys.maxint} + fcand = {'vcpus': sys.maxsize, 'ram': sys.maxsize, 'disk': sys.maxsize} # find the smallest flavor that is at least 1vcpu, 1024MB ram and 10MB disk for flavor in flavor_manager.list(): flavor = vars(flavor) @@ -150,7 +150,7 @@ class Kloud(object): reusing_users=user_list) self.tenant_list.append(tenant_instance) else: - for tenant_count in xrange(self.scale_cfg['number_tenants']): + for tenant_count in range(self.scale_cfg['number_tenants']): tenant_name = self.prefix + "-T" + str(tenant_count) tenant_instance = tenant.Tenant(tenant_name, self, tenant_quota) self.res_logger.log('tenants', tenant_instance.tenant_name, @@ -192,7 +192,7 @@ class Kloud(object): def delete_resources(self): if not self.reusing_tenants: - for fn, flavor in self.flavors.iteritems(): + for fn, flavor in self.flavors.items(): LOG.info('Deleting flavor %s', fn) try: flavor.delete() @@ -249,7 +249,7 @@ class Kloud(object): if instance.vol: instance.attach_vol() - instance.fixed_ip = instance.instance.networks.values()[0][0] + instance.fixed_ip = list(instance.instance.networks.values())[0][0] u_fip = instance.config['use_floatingip'] if self.scale_cfg['provider_network']: instance.fip = None @@ -367,7 +367,7 @@ class KloudBuster(object): glance_client = glanceclient.Client('2', session=sess) try: # Search for the image - img = glance_client.images.list(filters={'name': image_name}).next() + img = next(glance_client.images.list(filters={'name': image_name})) # image found return img except StopIteration: @@ -653,10 +653,10 @@ class KloudBuster(object): self.client_vm_create_thread.join() if self.testing_kloud and self.testing_kloud.exc_info: - raise self.testing_kloud.exc_info[1], None, self.testing_kloud.exc_info[2] + raise self.testing_kloud.exc_info[1].with_traceback(self.testing_kloud.exc_info[2]) if self.kloud and self.kloud.exc_info: - raise self.kloud.exc_info[1], None, self.kloud.exc_info[2] + raise self.kloud.exc_info[1].with_traceback(self.kloud.exc_info[2]) # Function that print all the provisioning info self.print_provision_info() @@ -673,7 +673,7 @@ class KloudBuster(object): while 1: if self.interactive: print() - runlabel = raw_input('>> KB ready, enter label for next run or "q" to quit: ') + runlabel = input('>> KB ready, enter label for next run or "q" to quit: ') if runlabel.lower() == "q": break @@ -1001,7 +1001,7 @@ def main(): sys.exit(0) if CONF.show_config: - print resource_string(__name__, "cfg.scale.yaml") + print(resource_string(__name__, "cfg.scale.yaml").decode("utf8")) sys.exit(0) if CONF.multicast and CONF.storage: diff --git a/kloudbuster/nuttcp_tool.py b/kloudbuster/nuttcp_tool.py index 8bf3053..9faad47 100644 --- a/kloudbuster/nuttcp_tool.py +++ b/kloudbuster/nuttcp_tool.py @@ -15,7 +15,7 @@ import json -from perf_tool import PerfTool +from .perf_tool import PerfTool class NuttcpTool(PerfTool): diff --git a/kloudbuster/perf_instance.py b/kloudbuster/perf_instance.py index a10ec0a..ab62b6a 100644 --- a/kloudbuster/perf_instance.py +++ b/kloudbuster/perf_instance.py @@ -13,10 +13,10 @@ # under the License. # -from base_compute import BaseCompute -from fio_tool import FioTool -from nuttcp_tool import NuttcpTool -from wrk_tool import WrkTool +from .base_compute import BaseCompute +from .fio_tool import FioTool +from .nuttcp_tool import NuttcpTool +from .wrk_tool import WrkTool # An openstack instance (can be a VM or a LXC) diff --git a/kloudbuster/perf_tool.py b/kloudbuster/perf_tool.py index 2ae4d48..01c9601 100644 --- a/kloudbuster/perf_tool.py +++ b/kloudbuster/perf_tool.py @@ -15,15 +15,13 @@ import abc -import log as logging +from . import log as logging LOG = logging.getLogger(__name__) # A base class for all tools that can be associated to an instance -class PerfTool(object): - __metaclass__ = abc.ABCMeta - +class PerfTool(object, metaclass=abc.ABCMeta): def __init__(self, instance, tool_name): self.instance = instance self.name = tool_name diff --git a/kloudbuster/prometheus.py b/kloudbuster/prometheus.py index 7d5e7bc..eb4d67d 100644 --- a/kloudbuster/prometheus.py +++ b/kloudbuster/prometheus.py @@ -38,5 +38,5 @@ class Prometheus(object): self.step_size)).json() except requests.exceptions.RequestException as e: - print e + print(e) return None diff --git a/kloudbuster/start_server.py b/kloudbuster/start_server.py index 84190f8..dfeccba 100755 --- a/kloudbuster/start_server.py +++ b/kloudbuster/start_server.py @@ -22,7 +22,7 @@ def exec_command(cmd, cwd=None, show_console=False): p = subprocess.Popen(cmd, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) if show_console: for line in iter(p.stdout.readline, b""): - print line, + print(line, end=' ') p.communicate() return p.returncode @@ -40,10 +40,10 @@ def launch_kb(cwd): except OSError: continue if os.uname()[0] == "Darwin": - print - print "To run the KloudBuster web server you need to install the coreutils package:" - print " brew install coreutils" - print + print() + print("To run the KloudBuster web server you need to install the coreutils package:") + print(" brew install coreutils") + print() raise OSError('Cannot find stdbuf or gstdbuf command') def main(): @@ -52,7 +52,7 @@ def main(): try: return launch_kb(cwd) except KeyboardInterrupt: - print 'Terminating server...' + print('Terminating server...') return 1 if __name__ == '__main__': diff --git a/kloudbuster/tenant.py b/kloudbuster/tenant.py index 814d070..c99ca9f 100644 --- a/kloudbuster/tenant.py +++ b/kloudbuster/tenant.py @@ -12,13 +12,13 @@ # License for the specific language governing permissions and limitations # under the License. -import base_compute -import base_network -import base_storage +from . import base_compute +from . import base_network +from . import base_storage from keystoneclient import exceptions as keystone_exception -import log as logging -import users +from . import log as logging +from . import users LOG = logging.getLogger(__name__) @@ -109,7 +109,7 @@ class Tenant(object): meet_quota = True quota = quota_manager.get() - for key, value in self.tenant_quota[quota_type].iteritems(): + for key, value in self.tenant_quota[quota_type].items(): if quota[key] < value: meet_quota = False break diff --git a/kloudbuster/users.py b/kloudbuster/users.py index f7b4dc1..a10f9bd 100644 --- a/kloudbuster/users.py +++ b/kloudbuster/users.py @@ -12,11 +12,11 @@ # License for the specific language governing permissions and limitations # under the License. -import base_compute -import base_network +from . import base_compute +from . import base_network from cinderclient import client as cinderclient from keystoneclient import exceptions as keystone_exception -import log as logging +from . import log as logging from neutronclient.neutron import client as neutronclient from novaclient import client as novaclient diff --git a/kloudbuster/wrk_tool.py b/kloudbuster/wrk_tool.py index 989517e..3af8006 100644 --- a/kloudbuster/wrk_tool.py +++ b/kloudbuster/wrk_tool.py @@ -15,10 +15,10 @@ import json -from perf_tool import PerfTool +from .perf_tool import PerfTool from hdrh.histogram import HdrHistogram -import log as logging +from . import log as logging LOG = logging.getLogger(__name__) @@ -119,7 +119,7 @@ class WrkTool(PerfTool): err_flag = True perc_list = [50, 75, 90, 99, 99.9, 99.99, 99.999] latency_dict = histogram.get_percentile_to_value_dict(perc_list) - for key, value in latency_dict.iteritems(): + for key, value in latency_dict.items(): all_res['latency_stats'].append([key, value]) all_res['latency_stats'].sort()