Optimization on redis communication

1. Reduce to using only 2 channels for bi-directional communication;
2. Use metadata services to pass orchestration information;
3. Decouple VM creation from other resources;
4. Fix traceback during failure cleanup;

Change-Id: Ie8d00bedc7d7c148dc7f7a6eec9b5eb8747b514d
This commit is contained in:
Yichen Wang 2015-04-23 15:47:41 -07:00
parent 8a9dda8f7d
commit 351abc2c2e
11 changed files with 359 additions and 322 deletions

View File

@ -27,9 +27,8 @@ class BaseCompute(object):
"""
def __init__(self, vm_name, nova_client, user_name):
self.novaclient = nova_client
self.user_name = user_name
def __init__(self, vm_name, network):
self.novaclient = network.router.user.nova_client
self.vm_name = vm_name
self.instance = None
self.fip = None

View File

@ -78,21 +78,20 @@ class BaseNetwork(object):
def __init__(self, neutron_client, nova_client, user_name, shared_interface_ip=None):
def __init__(self, router):
"""
Store the neutron client
User name for this network
and network object
"""
self.neutron_client = neutron_client
self.nova_client = nova_client
self.user_name = user_name
self.neutron_client = router.user.neutron_client
self.nova_client = router.user.nova_client
self.router = router
self.network = None
self.instance_list = []
self.secgroup_list = []
self.keypair_list = []
# Store the shared interface ip of router for tested and testing cloud
self.shared_interface_ip = shared_interface_ip
def create_compute_resources(self, network_prefix, config_scale):
"""
@ -115,47 +114,28 @@ class BaseNetwork(object):
keypair_name = network_prefix + "-K" + str(keypair_count)
keypair_instance.add_public_key(keypair_name, config_scale['public_key_file'])
# Create the required number of VMs
# Create the VMs on specified network, first keypair, first secgroup
LOG.info("Scheduled to create virtual machines...")
if config_scale['use_floatingip']:
external_network = find_external_network(self.neutron_client)
LOG.info("Creating Virtual machines for user %s" % self.user_name)
if 'redis_server' in config_scale:
# Here we are creating a testing VM (client), put the redis server
# information in the user_data.
redis_server = config_scale['redis_server']
redis_server_port = config_scale['redis_server_port']
user_data = redis_server + ":" + str(redis_server_port)
else:
user_data = None
# Schedule to create the required number of VMs
for instance_count in range(config_scale['vms_per_network']):
vm_name = network_prefix + "-I" + str(instance_count)
perf_instance = PerfInstance(vm_name, self.nova_client, self.user_name, config_scale)
perf_instance = PerfInstance(vm_name, self, config_scale)
self.instance_list.append(perf_instance)
nic_used = [{'net-id': self.network['id']}]
LOG.info("Creating Instance: " + vm_name)
perf_instance.create_server(config_scale['image_name'],
config_scale['flavor_type'],
self.keypair_list[0].keypair_name,
nic_used,
self.secgroup_list[0].secgroup,
user_data=user_data)
# Store the subnet info and fixed ip address in instance
perf_instance.subnet_ip = self.network['subnet_ip']
perf_instance.fixed_ip = perf_instance.instance.networks.values()[0][0]
if self.shared_interface_ip:
perf_instance.shared_interface_ip = self.shared_interface_ip
if config_scale['use_floatingip']:
# Create the floating ip for the instance
# store it and the ip address in instance object
# store it and the ip address in perf_instance object
perf_instance.fip = create_floating_ip(self.neutron_client, external_network)
perf_instance.fip_ip = perf_instance.fip['floatingip']['floating_ip_address']
# Associate the floating ip with this instance
perf_instance.instance.add_floating_ip(perf_instance.fip_ip)
perf_instance.ssh_ip = perf_instance.fip_ip
else:
# Store the fixed ip as ssh ip since there is no floating ip
perf_instance.ssh_ip = perf_instance.fixed_ip
# Create the VMs on specified network, first keypair, first secgroup
perf_instance.boot_info['image_name'] = config_scale['image_name']
perf_instance.boot_info['flavor_type'] = config_scale['flavor_type']
perf_instance.boot_info['keyname'] = self.keypair_list[0].keypair_name
perf_instance.boot_info['nic'] = [{'net-id': self.network['id']}]
perf_instance.boot_info['sec_group'] = self.secgroup_list[0].secgroup
def delete_compute_resources(self):
"""
@ -234,15 +214,15 @@ class Router(object):
of network interfaces to router
"""
def __init__(self, neutron_client, nova_client, user_name, shared_network=None):
self.neutron_client = neutron_client
self.nova_client = nova_client
def __init__(self, user):
self.neutron_client = user.neutron_client
self.nova_client = user.nova_client
self.router = None
self.user_name = user_name
self.user = user
# Stores the list of networks
self.network_list = []
# Store the shared network
self.shared_network = shared_network
self.shared_network = None
self.shared_port_id = None
# Store the interface ip of shared network attached to router
self.shared_interface_ip = None
@ -253,16 +233,11 @@ class Router(object):
Also triggers the creation of compute resources inside each
network
"""
# If a shared network exists create a port on this
# network and attach to router interface
if self.shared_network:
self.attach_router_interface(self.shared_network, use_port=True)
for network_count in range(config_scale['networks_per_router']):
network_instance = BaseNetwork(self.neutron_client, self.nova_client, self.user_name,
self.shared_interface_ip)
network_instance = BaseNetwork(self)
self.network_list.append(network_instance)
# Create the network and subnet
network_name = self.user_name + "-N" + str(network_count)
network_name = self.user.user_name + "-N" + str(network_count)
network_instance.create_network_and_subnet(network_name)
# Attach the created network to router interface
self.attach_router_interface(network_instance)
@ -289,7 +264,8 @@ class Router(object):
for network in self.network_list:
# Now delete the compute resources and the network resources
network.delete_compute_resources()
self.remove_router_interface(network)
if network.network:
self.remove_router_interface(network)
network.delete_network()
# Also delete the shared port and remove it from router interface
if self.shared_network:
@ -364,8 +340,6 @@ class Router(object):
}
self.neutron_client.add_interface_router(self.router['router']['id'], body)
def remove_router_interface(self, network_instance, use_port=False):
"""
Remove the network interface from router

View File

@ -1,7 +1,7 @@
# KloudBuster Default configuration file
server:
# Number of tenants to be created on the cloud
number_tenants: 1
number_tenants: 2
# Number of Users to be created inside the tenant
users_per_tenant: 1
@ -15,7 +15,7 @@ server:
routers_per_user: 1
# Number of VM instances to be created within the context of each User
vms_per_network: 2
vms_per_network: 1
# Number of security groups per network
secgroups_per_network: 1
@ -63,7 +63,7 @@ client:
keypairs_per_network: 1
# Assign floating IP for every VM
use_floatingip: False
use_floatingip: False
# Specify whether the testing cloud is running in same cloud
run_on_same_cloud: True
@ -79,12 +79,26 @@ client:
redis_retry_count: 50
polling_interval: 5
# Duration of testing tools (seconds)
exec_time: 30
# Tooling
tp_tool: 'nuttcp'
http_tool: 'wrk'
tp_tool:
name: 'nuttcp'
dest_path: '/var/tmp/nuttcp-7.3.2'
http_tool:
name: 'wrk'
dest_path: '/var/tmp/wrk-4.0.1'
# HTTP Tool Specific Configs
http_tool_configs:
# Threads to run tests
threads: 2
# Connections to be kept concurrently
connections: 5000
# Timeout for HTTP requests
timeout: 5
# Connection Type: "Keep-alive", "New"
connection_type: 'Keep-alive'
# Duration of testing tools (seconds)
duration: 30
# Prompt before running benchmarking tools
prompt_before_run: False
@ -93,5 +107,5 @@ client:
keystone_admin_role: "admin"
cleanup_resources: True
public_key_file: '../ssh/id_rsa.pub'
image_name: 'Scale Image v4'
flavor_type: 'm1.small'
image_name: 'Scale Image v4a'
flavor_type: 'kb_flavor'

View File

@ -37,10 +37,37 @@ class KBScheduler(object):
"""
def __init__(self, client_list, config):
self.client_list = client_list
self.client_dict = dict(zip([x.vm_name.lower() for x in client_list], client_list))
self.config = config
self.result = {}
self.redis_connection_pool = None
# Redis
self.connection_pool = None
self.redis_obj = None
self.pubsub = None
self.orches_chan_name = "kloudbuster_orches"
self.report_chan_name = "kloudbuster_report"
def setup_redis(self):
self.redis_obj = redis.StrictRedis(connection_pool=self.connection_pool)
# Check for connections to redis server
for retry in xrange(1, self.config.redis_retry_count + 1):
try:
self.redis_obj.get("test")
except (redis.exceptions.ConnectionError):
LOG.warn("Connecting to redis server... Retry #%d", retry)
time.sleep(1)
continue
break
# Subscribe to message channel
self.pubsub = self.redis_obj.pubsub(ignore_subscribe_messages=True)
self.pubsub.subscribe(self.report_chan_name)
def send_cmd(self, cmd, client_type, data):
message = {'cmd': cmd, 'sender-id': 'kb-master',
'client-type': client_type, 'data': data}
LOG.kbdebug(message)
self.redis_obj.publish(self.orches_chan_name, message)
def polling_vms(self, timeout, polling_interval=None):
'''
@ -51,24 +78,40 @@ class KBScheduler(object):
polling_interval = self.config.polling_interval
retry_count = max(timeout / polling_interval, 1)
retry = cnt_succ = cnt_failed = 0
clist = self.client_list
clist = self.client_dict.copy()
while (retry < retry_count and len(clist)):
time.sleep(polling_interval)
for instance in clist:
msg = instance.redis_get_message()
while True:
msg = self.pubsub.get_message()
if not msg:
# No new message, command in executing
continue
elif msg[0]:
# Command returned with non-zero status, command failed
cnt_failed = cnt_failed + 1
else:
# Command returned with zero, command succeed
cnt_succ = cnt_succ + 1
# Current instance finished execution
self.result[instance.vm_name] = msg
clist = [x for x in clist if x != instance]
# No new message, commands are in executing
break
LOG.kbdebug(msg)
payload = eval(msg['data'])
vm_name = payload['sender-id']
instance = self.client_dict[vm_name]
if payload['cmd'] == 'READY':
# If a READY packet is received, the corresponding VM is up
# running. We mark the flag for that VM, and skip all READY
# messages received afterwards.
if instance.up_flag:
continue
else:
self.send_cmd('ACK', None, None)
clist[vm_name].up_flag = True
clist.pop(vm_name)
cnt_succ = cnt_succ + 1
elif payload['cmd'] == 'DONE':
self.result[vm_name] = payload['data']
clist.pop(vm_name)
if self.result[vm_name]['status']:
# Command returned with non-zero status, command failed
cnt_failed = cnt_failed + 1
else:
# Command returned with zero, command succeed
cnt_succ = cnt_succ + 1
LOG.info("%d Succeed, %d Failed, %d Pending... Retry #%d" %
(cnt_succ, cnt_failed, len(clist), retry))
@ -78,45 +121,46 @@ class KBScheduler(object):
def wait_for_vm_up(self, timeout=120):
cnt_succ = self.polling_vms(timeout)[0]
if cnt_succ != len(self.client_list):
if cnt_succ != len(self.client_dict):
raise KBVMUpException()
def setup_static_route(self, timeout=10):
for instance in self.client_list:
svr = instance.target_server
instance.add_static_route(svr.subnet_ip, svr.shared_interface_ip)
func = {'cmd': 'setup_static_route'}
self.send_cmd('EXEC', 'http', func)
cnt_succ = self.polling_vms(timeout)[0]
if cnt_succ != len(self.client_list):
if cnt_succ != len(self.client_dict):
raise KBSetStaticRouteException()
def check_http_server_up(self, timeout=60):
for instance in self.client_list:
instance.check_http_service()
def check_http_service(self, timeout=60):
func = {'cmd': 'check_http_service'}
self.send_cmd('EXEC', 'http', func)
cnt_succ = self.polling_vms(timeout)[0]
if cnt_succ != len(self.client_list):
if cnt_succ != len(self.client_dict):
raise KBHTTPServerUpException()
def run_http_test(self):
for instance in self.client_list:
instance.run_http_client(threads=2, connections=5000, timeout=5,
connection_type="Keep-alive")
func = {'cmd': 'run_http_test'}
self.send_cmd('EXEC', 'http', func)
# Give additional 30 seconds for everybody to report results
timeout = self.config.exec_time + 30
timeout = self.config.http_tool_configs.duration + 3000
cnt_succ = self.polling_vms(timeout)[0]
if cnt_succ != len(self.client_list):
if cnt_succ != len(self.client_dict):
raise KBHTTPBenchException()
# Parse the results from HTTP Tools
for key, instance in self.client_dict.items():
self.result[key] = instance.http_client_parser(**self.result[key])
def run(self):
LOG.info("Setting up redis connection pool...")
# For now, the redis server is not in the scope of Kloud Buster, which has to be
# pre-configured before executing Kloud Buster.
self.redis_connection_pool = redis.ConnectionPool(
self.connection_pool = redis.ConnectionPool(
host=self.config.redis_server, port=self.config.redis_server_port, db=0)
try:
LOG.info("Setting up the redis connections...")
for instance in self.client_list:
instance.setup_redis(connection_pool=self.redis_connection_pool)
self.setup_redis()
LOG.info("Waiting for agents on VMs to come up...")
self.wait_for_vm_up()
@ -125,7 +169,7 @@ class KBScheduler(object):
self.setup_static_route()
LOG.info("Waiting for HTTP service to come up...")
self.check_http_server_up()
self.check_http_service()
if self.config.prompt_before_run:
print "Press enter to start running benchmarking tools..."
@ -133,9 +177,9 @@ class KBScheduler(object):
LOG.info("Starting HTTP Benchmarking...")
self.run_http_test()
for key in self.result:
for val in self.result.values():
# TODO(Consolidating the data from all VMs)
print "[%s] %s" % (key, self.result[key][1])
print val
except (KBSetStaticRouteException):
LOG.error("Could not set static route.")

View File

@ -21,19 +21,82 @@ import time
import redis
class KB_Instance(object):
# Check whether the HTTP Service is up running)
@staticmethod
def check_http_service(target_url):
cmd = 'while true; do\n'
cmd += 'curl --head %s --connect-timeout 2 --silent\n' % (target_url)
cmd += 'if [ $? -eq 0 ]; then break; fi\n'
cmd += 'done'
return cmd
# Add static route
@staticmethod
def add_static_route(network, next_hop_ip, if_name=None):
debug_msg = "Adding static route %s with next hop %s" % (network, next_hop_ip)
cmd = "sudo ip route add %s via %s" % (network, next_hop_ip)
if if_name:
debug_msg += " and %s" % if_name
cmd += " dev %s" % if_name
# TODO(Logging on Agent)
print debug_msg
return cmd
# Get static route
@staticmethod
def get_static_route(network, next_hop_ip=None, if_name=None):
cmd = "ip route show %s" % network
if next_hop_ip:
cmd += " via %s" % next_hop_ip
if if_name:
cmd += " dev %s" % if_name
return cmd
# Delete static route
@staticmethod
def delete_static_route(network, next_hop_ip=None, if_name=None):
debug_msg = "Deleting static route %s" % network
cmd = "sudo ip route del %s" % network
if next_hop_ip:
debug_msg = " with next hop %s" % next_hop_ip
cmd += " via %s" % next_hop_ip
if if_name:
if next_hop_ip:
debug_msg = " and %s" % if_name
else:
debug_msg = "with next hop %s" % if_name
cmd += " dev %s" % if_name
# TODO(Logging on Agent)
print debug_msg
return cmd
# Run the HTTP benchmarking tool
@staticmethod
def run_http_test(dest_path, target_url, threads, connections,
duration, timeout, connection_type):
cmd = '%s -t%d -c%d -d%ds --timeout %ds --latency %s' % \
(dest_path, threads, connections, duration, timeout, target_url)
return cmd
class KB_VM_Agent(object):
def __init__(self, host, port=6379):
def __init__(self, user_data):
host = user_data['redis_server']
port = user_data['redis_server_port']
self.user_data = user_data
self.redis_obj = redis.StrictRedis(host=host, port=port)
self.pubsub = self.redis_obj.pubsub(ignore_subscribe_messages=True)
self.hello_thread = None
self.stop_hello = threading.Event()
# Assumption:
# Here we assume the vm_name is the same as the host name, which is
# true if the VM is spawned by Kloud Buster.
# Here we assume the vm_name is the same as the host name (lower case),
# which is true if the VM is spawned by Kloud Buster.
self.vm_name = socket.gethostname().lower()
self.orches_chan_name = self.vm_name.lower() + "_orches"
self.report_chan_name = self.vm_name.lower() + "_report"
self.orches_chan_name = "kloudbuster_orches"
self.report_chan_name = "kloudbuster_report"
def setup_channels(self):
# Check for connections to redis server
@ -48,6 +111,17 @@ class KB_VM_Agent(object):
# Subscribe to orchestration channel
self.pubsub.subscribe(self.orches_chan_name)
def report(self, cmd, client_type, data):
message = {'cmd': cmd, 'sender-id': self.vm_name,
'client-type': client_type, 'data': data}
self.redis_obj.publish(self.report_chan_name, message)
def send_hello(self):
# Sending "hello" message to master node every 2 seconds
while not self.stop_hello.is_set():
self.report('READY', None, None)
time.sleep(2)
def exec_command(self, cmd):
# Execute the command, and returns the outputs
cmds = ['bash', '-c']
@ -57,44 +131,56 @@ class KB_VM_Agent(object):
return (p.returncode, stdout, stderr)
def report(self, result):
self.redis_obj.publish(self.report_chan_name, result)
def process_cmd(self, cmd_data):
cmd_res_tuple = self.exec_command(cmd_data['cmd'])
cmd_res_dict = dict(zip(("status", "stdout", "stderr"), cmd_res_tuple))
cmd_res_dict['parser_cb'] = cmd_data['parser_cb']
self.report(cmd_res_dict)
def send_hello(self):
# Sending "hello" message to master node every 2 seconds
while not self.stop_hello.is_set():
self.report("hello")
time.sleep(2)
def process_cmd(self, msg):
if msg['cmd'] == 'ACK':
# When 'ACK' is received, means the master node
# acknowledged the current VM. So stopped sending more
# "hello" packet to the master node.
# Unfortunately, there is no thread.stop() in Python 2.x
self.stop_hello.set()
elif msg['cmd'] == 'EXEC':
cmd_res_tuple = eval('self.exec_' + msg['data']['cmd'] + '()')
cmd_res_dict = dict(zip(("status", "stdout", "stderr"), cmd_res_tuple))
self.report('DONE', msg['client-type'], cmd_res_dict)
elif msg['cmd'] == 'ABORT':
# TODO(Add support to abort a session)
pass
def work(self):
for item in self.pubsub.listen():
if item['type'] != 'message':
continue
if item['data'] == 'iamhere':
# When a "iamhere" packet is received, means the master node
# acknowledged the current VM. So stopped sending more
# "hello" packet to the master node.
# Unfortunately, there is no thread.stop() in Python 2.x
self.stop_hello.set()
continue
# Convert the string representation of dict to real dict obj
cmd_data = eval(item['data'])
self.process_cmd(cmd_data)
msg = eval(item['data'])
self.process_cmd(msg)
def exec_setup_static_route(self):
cmd = KB_Instance.add_static_route(self.user_data['target_subnet_ip'],
self.user_data['target_shared_interface_ip'])
return self.exec_command(cmd)
def exec_check_http_service(self):
cmd = KB_Instance.check_http_service(self.user_data['target_url'])
return self.exec_command(cmd)
def exec_run_http_test(self):
cmd = KB_Instance.run_http_test(dest_path=self.user_data['http_tool']['dest_path'],
target_url=self.user_data['target_url'],
**self.user_data['http_tool_configs'])
return self.exec_command(cmd)
if __name__ == "__main__":
if (len(sys.argv) <= 1):
print("ERROR: Expecting the redis server address.")
try:
f = open('/var/tmp/user-data', 'r')
user_data = eval(f.read())
except Exception as e:
# TODO(Logging on Agent)
print e.message
sys.exit(1)
redis_server, redis_server_port = sys.argv[1].split(':', 1)
agent = KB_VM_Agent(redis_server, redis_server_port)
agent = KB_VM_Agent(user_data)
agent.setup_channels()
agent.hello_thread = threading.Thread(target=agent.send_hello)
agent.hello_thread.daemon = True

View File

@ -69,10 +69,8 @@ class Kloud(object):
# if this cloud is sharing a network then all tenants must hook up to
# it and on deletion that shared network must NOT be deleted
# as it will be deleted by the owner
self.shared_network = None
def create_resources(self, shared_net=None):
self.shared_network = shared_net
def create_resources(self):
for tenant_count in xrange(self.scale_cfg['number_tenants']):
tenant_name = self.prefix + "-T" + str(tenant_count)
new_tenant = tenant.Tenant(tenant_name, self)
@ -94,6 +92,34 @@ class Kloud(object):
all_instances.extend(tnt.get_all_instances())
return all_instances
def attach_to_shared_net(self, shared_net):
# If a shared network exists create a port on this
# network and attach to router interface
for tnt in self.tenant_list:
for usr in tnt.user_list:
for rtr in usr.router_list:
rtr.shared_network = shared_net
rtr.attach_router_interface(shared_net, use_port=True)
for net in rtr.network_list:
for ins in net.instance_list:
ins.shared_interface_ip = rtr.shared_interface_ip
def create_vms(self):
# TODO(Make the creation concurrently)
for instance in self.get_all_instances():
LOG.info("Creating Instance: " + instance.vm_name)
instance.create_server(**instance.boot_info)
instance.fixed_ip = instance.instance.networks.values()[0][0]
if instance.config['use_floatingip']:
# Associate the floating ip with this instance
instance.instance.add_floating_ip(instance.fip_ip)
instance.ssh_ip = instance.fip_ip
else:
# Store the fixed ip as ssh ip since there is no floating ip
instance.ssh_ip = instance.fixed_ip
class KloudBuster(object):
"""
Creates resources on the cloud for loading up the cloud
@ -139,6 +165,26 @@ class KloudBuster(object):
LOG.info('Provision Details (Testing Kloud)\n' +
tabulate(table, headers="firstrow", tablefmt="psql"))
def gen_user_data(self):
LOG.info("Preparing metadata for testing VMs...")
# We supposed to have a mapping framework/algorithm to mapping clients to servers.
# e.g. 1:1 mapping, 1:n mapping, n:1 mapping, etc.
# Here we are using N*1:1
client_list = self.testing_kloud.get_all_instances()
svr_list = self.kloud.get_all_instances()
for idx, ins in enumerate(client_list):
ins.target_url = "http://%s/index.html" %\
(svr_list[idx].fip_ip or svr_list[idx].fixed_ip)
ins.user_data['redis_server'] = ins.config['redis_server']
ins.user_data['redis_server_port'] = ins.config['redis_server_port']
ins.user_data['target_subnet_ip'] = svr_list[idx].subnet_ip
ins.user_data['target_shared_interface_ip'] = svr_list[idx].shared_interface_ip
ins.user_data['target_url'] = ins.target_url
ins.user_data['http_tool'] = ins.config['http_tool']
ins.user_data['http_tool_configs'] = ins.config['http_tool_configs']
ins.boot_info['user_data'] = str(ins.user_data)
def run(self):
"""
The runner for KloudBuster Tests
@ -147,27 +193,21 @@ class KloudBuster(object):
"""
try:
# Create the testing cloud resources
self.kloud.create_resources()
self.kloud.create_vms()
self.testing_kloud.create_resources()
# Find the shared network if the cloud used to testing is same
if self.single_cloud:
shared_network = self.testing_kloud.get_first_network()
else:
shared_network = None
self.kloud.create_resources(shared_network)
# Find the shared network if the cloud used to testing is same
# Attach the router in tested kloud to the shared network
shared_net = self.testing_kloud.get_first_network()
self.kloud.attach_to_shared_net(shared_net)
self.gen_user_data()
self.testing_kloud.create_vms()
# Function that print all the provisioning info
self.print_provision_info()
# We supposed to have a mapping framework/algorithm to mapping clients to servers.
# e.g. 1:1 mapping, 1:n mapping, n:1 mapping, etc.
# Here we are using N*1:1
client_list = self.testing_kloud.get_all_instances()
for idx, svr in enumerate(self.kloud.get_all_instances()):
client_list[idx].target_server = svr
client_list[idx].target_url = "http://%s/index.html" %\
(svr.fip_ip or svr.fixed_ip)
kbscheduler = kb_scheduler.KBScheduler(client_list, config_scale.client)
kbscheduler.run()
except KeyboardInterrupt:
@ -178,9 +218,16 @@ class KloudBuster(object):
# Cleanup: start with tested side first
# then testing side last (order is important because of the shared network)
if config_scale.server['cleanup_resources']:
self.kloud.delete_resources()
try:
self.kloud.delete_resources()
except Exception:
traceback.print_exc()
if config_scale.client['cleanup_resources']:
self.testing_kloud.delete_resources()
try:
self.testing_kloud.delete_resources()
except Exception:
traceback.print_exc()
if __name__ == '__main__':
# The default configuration file for KloudBuster

View File

@ -16,13 +16,11 @@
import os
import stat
import subprocess
import time
import sshutils
from base_compute import BaseCompute
import log as logging
import redis
from wrk_tool import WrkTool
LOG = logging.getLogger(__name__)
@ -30,12 +28,14 @@ LOG = logging.getLogger(__name__)
# An openstack instance (can be a VM or a LXC)
class PerfInstance(BaseCompute):
def __init__(self, vm_name, nova_client, user_name, config, is_server=False):
BaseCompute.__init__(self, vm_name, nova_client, user_name)
def __init__(self, vm_name, network, config, is_server=False):
BaseCompute.__init__(self, vm_name, network)
self.config = config
self.internal_ip = None
self.is_server = is_server
self.boot_info = {}
self.user_data = {}
self.up_flag = False
# SSH Configuration
self.ssh_ip = None
@ -43,13 +43,6 @@ class PerfInstance(BaseCompute):
self.ssh = None
self.port = None
# Redis Configuration
self.redis_obj = None
self.pubsub = None
self.up_flag = False
self.orches_chan_name = self.vm_name.lower() + "_orches"
self.report_chan_name = self.vm_name.lower() + "_report"
if 'tp_tool' not in config:
self.tp_tool = None
# elif config.tp_tool.lower() == 'nuttcp':
@ -61,9 +54,8 @@ class PerfInstance(BaseCompute):
if 'http_tool' not in config:
self.http_tool = None
elif config.http_tool.lower() == 'wrk':
self.http_tool = WrkTool(self)
self.target_server = None
elif config.http_tool.name.lower() == 'wrk':
self.http_tool = WrkTool(self, config.http_tool)
self.target_url = None
else:
self.http_tool = None
@ -87,8 +79,7 @@ class PerfInstance(BaseCompute):
tp_tool_res = []
res = {'ip_to': dest_ip}
if self.internal_ip:
res['ip_from'] = self.internal_ip
res['ip_from'] = self.ssh_ip
if label:
res['desc'] = label
if self.az:
@ -102,23 +93,11 @@ class PerfInstance(BaseCompute):
res['results'] = tp_tool_res
return res
# Target URL is supposed to be provided during the mapping stage
def run_http_client(self, threads, connections,
timeout=5, connection_type="Keep-alive"):
# HTTP Performance Measurement
cmd = self.http_tool.cmd_run_client(self.target_url,
threads,
connections,
timeout,
connection_type)
parser_cb = 'self.run_http_client_parser'
self.redis_exec_command(cmd, parser_cb)
def run_http_client_parser(self, status, stdout, stderr):
def http_client_parser(self, status, stdout, stderr):
http_tool_res = self.http_tool.cmd_parser_run_client(status, stdout, stderr)
res = {'target_url': self.target_url}
if self.internal_ip:
res['ip_from'] = self.internal_ip
res = {'vm_name': self.vm_name}
res['target_url'] = self.target_url
res['ip_from'] = self.ssh_ip
# consolidate results for all tools
res['results'] = http_tool_res
@ -128,8 +107,6 @@ class PerfInstance(BaseCompute):
# Returns True if success
def setup_ssh(self, ssh_ip, ssh_user):
# used for displaying the source IP in json results
if not self.internal_ip:
self.internal_ip = ssh_ip
self.ssh_ip = ssh_ip
self.ssh_user = ssh_user
self.ssh = sshutils.SSH(self.ssh_user, self.ssh_ip,
@ -142,108 +119,6 @@ class PerfInstance(BaseCompute):
(status, cmd_output, err) = self.ssh.execute(cmd, timeout=timeout)
return (status, cmd_output, err)
# Setup the redis connectivity
def setup_redis(self, host=None, port=None, connection_pool=None):
if connection_pool:
self.redis_obj = redis.StrictRedis(connection_pool=connection_pool)
else:
self.redis_obj = redis.StrictRedis(host=host, port=port)
# Check for connections to redis server
for retry in xrange(1, self.config.redis_retry_count + 1):
try:
self.redis_obj.get("test")
except (redis.exceptions.ConnectionError):
LOG.warn("Connecting to redis server... Retry #%d", retry)
time.sleep(1)
continue
break
# Subscribe to message channel
self.pubsub = self.redis_obj.pubsub(ignore_subscribe_messages=True)
self.pubsub.subscribe(self.report_chan_name)
return True
def redis_get_message(self):
message = self.pubsub.get_message()
while message and message['data'] == 'hello':
# If a "hello" packet is received, the corresponding VM is up
# running. We mark the flag for that VM, and skip all "hello"
# messages received afterwards.
if self.up_flag:
message = self.pubsub.get_message()
else:
self.up_flag = True
self.redis_acknowledge_hello()
return (0, "", "")
if not message:
return None
LOG.kbdebug(message)
msg_body = eval(message['data'])
status = int(msg_body['status'])
stdout = msg_body['stdout']
stderr = msg_body['stderr']
parser_cb = msg_body['parser_cb']
if parser_cb is not None:
stdout = eval("%s(status, stdout, stderr)" % parser_cb)
return (status, stdout, stderr)
def redis_acknowledge_hello(self):
self.redis_obj.publish(self.orches_chan_name, "iamhere")
def redis_exec_command(self, cmd, parser_cb=None, timeout=30):
# TODO(Add timeout support)
msg_body = {'cmd': cmd, 'parser_cb': parser_cb}
LOG.kbdebug(msg_body)
self.redis_obj.publish(self.orches_chan_name, msg_body)
# Check whether the HTTP Service is up running
def check_http_service(self):
cmd = 'while true; do\n'
cmd += 'curl --head %s --connect-timeout 2 --silent\n' % (self.target_url)
cmd += 'if [ $? -eq 0 ]; then break; fi\n'
cmd += 'done'
self.redis_exec_command(cmd, None)
# Add static route
def add_static_route(self, network, next_hop_ip, if_name=None):
debug_msg = "Adding static route %s with next hop %s" % (network, next_hop_ip)
cmd = "sudo ip route add %s via %s" % (network, next_hop_ip)
if if_name:
debug_msg += " and %s" % if_name
cmd += " dev %s" % if_name
LOG.kbdebug(debug_msg)
self.redis_exec_command(cmd, None)
# Get static route
def get_static_route(self, network, next_hop_ip=None, if_name=None):
cmd = "ip route show %s" % network
if next_hop_ip:
cmd += " via %s" % next_hop_ip
if if_name:
cmd += " dev %s" % if_name
# TODO(Need to implement a parser_cb instead of passing None)
self.redis_exec_command(cmd, None)
# Delete static route
def delete_static_route(self, network, next_hop_ip=None, if_name=None):
debug_msg = "[%s] Deleting static route %s" % (self.vm_name, network)
cmd = "sudo ip route del %s" % network
if next_hop_ip:
debug_msg = " with next hop %s" % next_hop_ip
cmd += " via %s" % next_hop_ip
if if_name:
if next_hop_ip:
debug_msg = " and %s" % if_name
else:
debug_msg = "with next hop %s" % if_name
cmd += " dev %s" % if_name
LOG.kbdebug(debug_msg)
self.redis_exec_command(cmd, None)
# scp a file from the local host to the instance
# Returns True if dest file already exists or scp succeeded
# False in case of scp error

View File

@ -20,17 +20,14 @@ import log as logging
LOG = logging.getLogger(__name__)
# where to copy the tool on the target, must end with slash
SCP_DEST_DIR = '/var/tmp/'
# A base class for all tools that can be associated to an instance
class PerfTool(object):
__metaclass__ = abc.ABCMeta
def __init__(self, name, instance):
self.name = name
def __init__(self, instance, tool_cfg):
self.name = tool_cfg.name
self.instance = instance
self.dest_path = SCP_DEST_DIR + name
self.dest_path = tool_cfg.dest_path
self.pid = None
# Terminate pid if started

View File

@ -56,7 +56,8 @@ class Tenant(object):
# Conflict: Conflict occurred attempting to store project - Duplicate Entry (HTTP 409)
if exc.http_status != 409:
raise exc
LOG.info("Tenant %s already present, reusing it" % self.tenant_name)
LOG.info("Tenant %s already present, reusing it" % self.tenant_name)
# It is a hassle to find a tenant by name as the only way seems to retrieve
# the list of all tenants which can be very large
tenant_list = self.kloud.keystone.tenants.list()

View File

@ -40,9 +40,9 @@ class User(object):
self.user_id = None
self.router_list = []
# Store the neutron and nova client
self.neutron = None
self.nova = None
admin_user = self._get_user()
self.neutron_client = None
self.nova_client = None
self.admin_user = self._get_user()
# Create the user within the given tenant associate
# admin role with user. We need admin role for user
@ -53,10 +53,10 @@ class User(object):
if role.name == user_role:
current_role = role
break
self.tenant.kloud.keystone.roles.add_user_role(admin_user,
self.tenant.kloud.keystone.roles.add_user_role(self.admin_user,
current_role,
tenant.tenant_id)
self.user_id = admin_user.id
self.user_id = self.admin_user.id
def _create_user(self):
LOG.info("Creating user: " + self.user_name)
@ -78,19 +78,19 @@ class User(object):
# Conflict: Conflict occurred attempting to store user - Duplicate Entry (HTTP 409)
if exc.http_status != 409:
raise exc
# Try to repair keystone by removing that user
LOG.warn("User creation failed due to stale user with same name: " +
self.user_name)
# Again, trying to find a user by name is pretty inefficient as one has to list all
# of them
users_list = self.tenant.kloud.keystone.users.list()
for user in users_list:
if user.name == self.user_name:
# Found it, time to delete it
LOG.info("Deleting stale user with name: " + self.user_name)
self.tenant.kloud.keystone.users.delete(user)
user = self._create_user()
return user
# Try to repair keystone by removing that user
LOG.warn("User creation failed due to stale user with same name: " +
self.user_name)
# Again, trying to find a user by name is pretty inefficient as one has to list all
# of them
users_list = self.tenant.kloud.keystone.users.list()
for user in users_list:
if user.name == self.user_name:
# Found it, time to delete it
LOG.info("Deleting stale user with name: " + self.user_name)
self.tenant.kloud.keystone.users.delete(user)
user = self._create_user()
return user
# Not found there is something wrong
raise Exception('Cannot find stale user:' + self.user_name)
@ -119,7 +119,7 @@ class User(object):
creden['tenant_name'] = self.tenant.tenant_name
# Create the neutron client to be used for all operations
self.neutron = neutronclient.Client(**creden)
self.neutron_client = neutronclient.Client(**creden)
# Create a new nova client for this User with correct credentials
creden_nova = {}
@ -128,21 +128,20 @@ class User(object):
creden_nova['auth_url'] = self.tenant.kloud.auth_url
creden_nova['project_id'] = self.tenant.tenant_name
creden_nova['version'] = 2
self.nova = Client(**creden_nova)
self.nova_client = Client(**creden_nova)
config_scale = self.tenant.kloud.scale_cfg
# Find the external network that routers need to attach to
# if redis_server is configured, we need to attach the router to the
# external network in order to reach the redis_server
if config_scale['use_floatingip'] or 'redis_server' in config_scale:
external_network = base_network.find_external_network(self.neutron)
external_network = base_network.find_external_network(self.neutron_client)
else:
external_network = None
# Create the required number of routers and append them to router list
LOG.info("Creating routers for user %s" % self.user_name)
LOG.info("Creating routers and networks for user %s" % self.user_name)
for router_count in range(config_scale['routers_per_user']):
router_instance = base_network.Router(self.neutron, self.nova, self.user_name,
self.tenant.kloud.shared_network)
router_instance = base_network.Router(self)
self.router_list.append(router_instance)
router_name = self.user_name + "-R" + str(router_count)
# Create the router and also attach it to external network

View File

@ -24,15 +24,15 @@ LOG = logging.getLogger(__name__)
class WrkTool(PerfTool):
def __init__(self, instance):
PerfTool.__init__(self, 'wrk-4.0.1', instance)
def __init__(self, instance, cfg_http_tool):
PerfTool.__init__(self, instance, cfg_http_tool)
def cmd_run_client(self, target_url, threads, connections,
timeout=5, connetion_type='Keep-alive', retry_count=10):
'''
Return the command for running the benchmarking tool
'''
duration_sec = self.instance.config.exec_time
duration_sec = self.instance.config.http_tool_configs.duration
cmd = '%s -t%d -c%d -d%ds --timeout %ds --latency %s' % \
(self.dest_path, threads, connections, duration_sec, timeout, target_url)
LOG.kbdebug("[%s] %s" % (self.instance.vm_name, cmd))
@ -41,6 +41,7 @@ class WrkTool(PerfTool):
def cmd_parser_run_client(self, status, stdout, stderr):
if status:
return [self.parse_error(stderr)]
# Sample Output:
# Running 10s test @ http://192.168.1.1/index.html
# 8 threads and 5000 connections