Add support to stop testing via Rest API

1. Add support to stop testing via Rest API;
2. Automatically enable floating IP if under dual-cloud mode;
3. Enhance to support running KloudBuster without keypair;
4. Disable the percentile_of_packet_not_timeout check for progression runs;

Change-Id: I4904d7322719b4784f5cd40250529dfa7578bd0a
This commit is contained in:
Yichen Wang 2015-10-13 01:27:07 -07:00
parent 6d358ae31c
commit dbd455bc7d
7 changed files with 99 additions and 23 deletions

View File

@ -31,6 +31,8 @@ import redis
# are added to the agent VM # are added to the agent VM
__version__ = '4' __version__ = '4'
# TODO(Logging on Agent)
def get_image_name(): def get_image_name():
'''Return the versioned VM image name that corresponds to this '''Return the versioned VM image name that corresponds to this
agent code. This string must match the way DIB names the kloudbuster image. agent code. This string must match the way DIB names the kloudbuster image.
@ -61,7 +63,6 @@ class KB_Instance(object):
if if_name: if if_name:
debug_msg += " and %s" % if_name debug_msg += " and %s" % if_name
cmd += " dev %s" % if_name cmd += " dev %s" % if_name
# TODO(Logging on Agent)
print debug_msg print debug_msg
return cmd return cmd
@ -89,7 +90,6 @@ class KB_Instance(object):
else: else:
debug_msg = "with next hop %s" % if_name debug_msg = "with next hop %s" % if_name
cmd += " dev %s" % if_name cmd += " dev %s" % if_name
# TODO(Logging on Agent)
print debug_msg print debug_msg
return cmd return cmd
@ -121,6 +121,7 @@ class KB_VM_Agent(object):
self.orches_chan_name = "kloudbuster_orches" self.orches_chan_name = "kloudbuster_orches"
self.report_chan_name = "kloudbuster_report" self.report_chan_name = "kloudbuster_report"
self.last_cmd = None self.last_cmd = None
self.last_process = None
def setup_channels(self): def setup_channels(self):
# Check for connections to redis server # Check for connections to redis server
@ -151,6 +152,7 @@ class KB_VM_Agent(object):
cmds = ['bash', '-c'] cmds = ['bash', '-c']
cmds.append(cmd) cmds.append(cmd)
p = subprocess.Popen(cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE) p = subprocess.Popen(cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
self.last_process = p
(stdout, stderr) = p.communicate() (stdout, stderr) = p.communicate()
return (p.returncode, stdout, stderr) return (p.returncode, stdout, stderr)
@ -162,6 +164,7 @@ class KB_VM_Agent(object):
cmds.append(cmd) cmds.append(cmd)
p_output = '' p_output = ''
p = subprocess.Popen(cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE) p = subprocess.Popen(cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
self.last_process = p
lines_iterator = iter(p.stdout.readline, b"") lines_iterator = iter(p.stdout.readline, b"")
for line in lines_iterator: for line in lines_iterator:
@ -181,6 +184,23 @@ class KB_VM_Agent(object):
stderr = p.communicate()[1] stderr = p.communicate()[1]
return (p.returncode, p_output, stderr) return (p.returncode, p_output, stderr)
def work(self):
for item in self.pubsub.listen():
if item['type'] != 'message':
continue
# Convert the string representation of dict to real dict obj
message = eval(item['data'])
if message['cmd'] == 'ABORT':
try:
self.last_process.kill()
except Exception:
pass
else:
work_thread = threading.Thread(target=agent.process_cmd, args=[message])
work_thread.daemon = True
work_thread.start()
def process_cmd(self, message): def process_cmd(self, message):
if message['cmd'] == 'ACK': if message['cmd'] == 'ACK':
# When 'ACK' is received, means the master node # When 'ACK' is received, means the master node
@ -205,22 +225,10 @@ class KB_VM_Agent(object):
"stderr": str(exc) "stderr": str(exc)
} }
self.report('DONE', message['client-type'], cmd_res_dict) self.report('DONE', message['client-type'], cmd_res_dict)
elif message['cmd'] == 'ABORT':
# TODO(Add support to abort a session)
pass
else: else:
# Unexpected # Unexpected
# TODO(Logging on Agent)
print 'ERROR: Unexpected command received!' print 'ERROR: Unexpected command received!'
def work(self):
for item in self.pubsub.listen():
if item['type'] != 'message':
continue
# Convert the string representation of dict to real dict obj
message = eval(item['data'])
self.process_cmd(message)
def exec_setup_static_route(self): def exec_setup_static_route(self):
self.last_cmd = KB_Instance.get_static_route(self.user_data['target_subnet_ip']) self.last_cmd = KB_Instance.get_static_route(self.user_data['target_subnet_ip'])
result = self.exec_command(self.last_cmd) result = self.exec_command(self.last_cmd)
@ -273,7 +281,6 @@ if __name__ == "__main__":
with open('user-data', 'r') as f: with open('user-data', 'r') as f:
user_data = eval(f.read()) user_data = eval(f.read())
except Exception as e: except Exception as e:
# TODO(Logging on Agent)
print e.message print e.message
sys.exit(1) sys.exit(1)

View File

@ -84,6 +84,17 @@ class KBController(object):
LOG.warn(traceback.format_exc()) LOG.warn(traceback.format_exc())
kb_session.kb_status = 'ERROR' kb_session.kb_status = 'ERROR'
def kb_stop_test_thread_handler(self, session_id):
kb_session = KBSessionManager.get(session_id)
kb_session.kb_status = 'STOPPING'
kloudbuster = kb_session.kloudbuster
try:
kloudbuster.stop_test()
kb_session.kb_status = 'STAGED'
except Exception:
LOG.warn(traceback.format_exc())
kb_session.kb_status = 'ERROR'
def kb_cleanup_thread_handler(self, session_id): def kb_cleanup_thread_handler(self, session_id):
kb_session = KBSessionManager.get(session_id) kb_session = KBSessionManager.get(session_id)
kb_session.kb_status = 'CLEANING' kb_session.kb_status = 'CLEANING'
@ -176,7 +187,7 @@ class KBController(object):
session_id = args[0] session_id = args[0]
if KBSessionManager.get(session_id).kb_status != 'STAGED': if KBSessionManager.get(session_id).kb_status != 'STAGED':
response.status = 403 response.status = 403
response.text = u"Unable to start the tests when status is not STAGED." response.text = u"Unable to start the tests when status is not at STAGED."
return response.text return response.text
self.kb_thread = threading.Thread(target=self.kb_run_test_thread_handler, args=[session_id]) self.kb_thread = threading.Thread(target=self.kb_run_test_thread_handler, args=[session_id])
@ -185,6 +196,28 @@ class KBController(object):
return "OK!" return "OK!"
@expose(generic=True)
def stop_test(self, *args):
response.status = 400
response.text = u"Please POST to this resource."
return response.text
@stop_test.when(method='POST')
@check_session_id
def stop_test_POST(self, *args):
session_id = args[0]
if KBSessionManager.get(session_id).kb_status != 'RUNNING':
response.status = 403
response.text = u"Unable to stop the tests when status is not at RUNNING."
return response.text
self.kb_thread = threading.Thread(target=self.kb_stop_test_thread_handler,
args=[session_id])
self.kb_thread.daemon = True
self.kb_thread.start()
return "OK!"
@expose(generic=True) @expose(generic=True)
def cleanup(self, *args): def cleanup(self, *args):
response.status = 400 response.status = 400

View File

@ -274,6 +274,29 @@ paths:
404: 404:
description: The session_id is not found or invalid description: The session_id is not found or invalid
/kloudbuster/stop_test/{session_id}:
post:
description: |
Stop the KloudBuster tests for a given session
parameters:
- name: session_id
type: string
format: md5sum
in: path
description: |
The session to be stopped
required: true
tags:
- kloudbuster
responses:
200:
description: Scheduled to stop the tests for the given session
403:
description: |
Unable to stop the tests when status is not at RUNNING
404:
description: The session_id is not found or invalid
/kloudbuster/cleanup/{session_id}: /kloudbuster/cleanup/{session_id}:
post: post:
description: | description: |

View File

@ -121,7 +121,8 @@ client:
# (1) The timeout value is defined in the client:http_tool_config section; # (1) The timeout value is defined in the client:http_tool_config section;
# (2) The percentile of packets must be in the below list: # (2) The percentile of packets must be in the below list:
# [50, 75, 90, 99, 99.9, 99.99, 99.999] # [50, 75, 90, 99, 99.9, 99.99, 99.999]
stop_limit: [50, 99.99] # (3) Sets percentile to 0 to disable timeout checks;
stop_limit: [50, 0]
# Assign floating IP for every client side test VM # Assign floating IP for every client side test VM
# Default: no floating IP (only assign internal fixed IP) # Default: no floating IP (only assign internal fixed IP)

View File

@ -92,6 +92,9 @@ class KBConfig(object):
if os.path.isfile(pub_key): if os.path.isfile(pub_key):
self.config_scale['public_key_file'] = pub_key self.config_scale['public_key_file'] = pub_key
LOG.info('Using %s as public key for all VMs' % (pub_key)) LOG.info('Using %s as public key for all VMs' % (pub_key))
else:
LOG.warn('No public key is found or specified to instantiate VMs. '
'You will not be able to access the VMs spawned by KloudBuster.')
if self.alt_cfg: if self.alt_cfg:
self.config_scale = self.config_scale + AttrDict(self.alt_cfg) self.config_scale = self.config_scale + AttrDict(self.alt_cfg)

View File

@ -319,6 +319,7 @@ class KBRunner(object):
while True: while True:
cur_vm_count = len(self.client_dict) cur_vm_count = len(self.client_dict)
target_vm_count = start + (cur_stage - 1) * step target_vm_count = start + (cur_stage - 1) * step
timeout_at_percentile = 0
if target_vm_count > len(self.full_client_dict): if target_vm_count > len(self.full_client_dict):
break break
if self.tool_result and 'latency_stats' in self.tool_result: if self.tool_result and 'latency_stats' in self.tool_result:
@ -326,8 +327,7 @@ class KBRunner(object):
pert_dict = dict(self.tool_result['latency_stats']) pert_dict = dict(self.tool_result['latency_stats'])
if limit[1] in pert_dict.keys(): if limit[1] in pert_dict.keys():
timeout_at_percentile = pert_dict[limit[1]] // 1000000 timeout_at_percentile = pert_dict[limit[1]] // 1000000
else: elif limit[1] != 0:
timeout_at_percentile = 0
LOG.warn('Percentile %s%% is not a standard statistic point.' % limit[1]) LOG.warn('Percentile %s%% is not a standard statistic point.' % limit[1])
if err > limit[0] or timeout_at_percentile > timeout: if err > limit[0] or timeout_at_percentile > timeout:
LOG.warn('KloudBuster is stopping the iteration because the result ' LOG.warn('KloudBuster is stopping the iteration because the result '
@ -347,3 +347,6 @@ class KBRunner(object):
else: else:
self.single_run(http_test_only=http_test_only) self.single_run(http_test_only=http_test_only)
yield self.tool_result yield self.tool_result
def stop(self):
self.send_cmd('ABORT', 'http', None)

View File

@ -226,10 +226,12 @@ class KloudBuster(object):
else: else:
self.tenants_list = {'server': None, 'client': None} self.tenants_list = {'server': None, 'client': None}
# TODO(check on same auth_url instead) # TODO(check on same auth_url instead)
if server_cred == client_cred: self.single_cloud = True if server_cred == client_cred else False
self.single_cloud = True # Automatically enable the floating IP for server cloud under dual-cloud mode
else: if not self.single_cloud and not self.server_cfg['use_floatingip']:
self.single_cloud = False self.server_cfg['use_floatingip'] = True
LOG.info('Automatically setting "use_floatingip" to True for server cloud...')
self.kloud = Kloud(server_cfg, server_cred, self.tenants_list['server']) self.kloud = Kloud(server_cfg, server_cred, self.tenants_list['server'])
self.testing_kloud = Kloud(client_cfg, client_cred, self.testing_kloud = Kloud(client_cfg, client_cred,
self.tenants_list['client'], self.tenants_list['client'],
@ -438,6 +440,10 @@ class KloudBuster(object):
self.final_result.append(self.kb_runner.tool_result) self.final_result.append(self.kb_runner.tool_result)
LOG.info('SUMMARY: %s' % self.final_result) LOG.info('SUMMARY: %s' % self.final_result)
def stop_test(self):
self.kb_runner.stop()
LOG.info('Testing is stopped by request.')
def cleanup(self): def cleanup(self):
# Cleanup: start with tested side first # Cleanup: start with tested side first
# then testing side last (order is important because of the shared network) # then testing side last (order is important because of the shared network)