Properly cleanup listener and driver on simulator exit

Change-Id: Id04d4d1ce131bf7a4681273c438cbe6e58b44e78
Closes-Bug: #1584743
Co-Authored-By: Oleksii Zamiatin <ozamiatin@mirantis.com>
This commit is contained in:
Gevorg Davoian 2016-07-24 19:12:23 +03:00
parent 0ecc25509f
commit 564e423d24

@ -43,6 +43,7 @@ CLIENTS = []
MESSAGES = [] MESSAGES = []
IS_RUNNING = True IS_RUNNING = True
SERVERS = [] SERVERS = []
TRANSPORT = None
USAGE = """ Usage: ./simulator.py [-h] [--url URL] [-d DEBUG]\ USAGE = """ Usage: ./simulator.py [-h] [--url URL] [-d DEBUG]\
{notify-server,notify-client,rpc-server,rpc-client} ... {notify-server,notify-client,rpc-server,rpc-client} ...
@ -134,7 +135,7 @@ class MessageStatsCollector(object):
max_latency = 0 max_latency = 0
sum_latencies = 0 sum_latencies = 0
for i in range(count): for i in six.moves.range(count):
p = self.buffer[i] p = self.buffer[i]
size += len(p.cargo) size += len(p.cargo)
@ -380,10 +381,10 @@ def generate_messages(messages_count):
messages_count = MESSAGES_LIMIT messages_count = MESSAGES_LIMIT
LOG.info("Generating %d random messages", messages_count) LOG.info("Generating %d random messages", messages_count)
for i in range(messages_count): for i in six.moves.range(messages_count):
length = RANDOM_GENERATOR() length = RANDOM_GENERATOR()
msg = ''.join(random.choice( msg = ''.join(random.choice(
string.ascii_lowercase) for x in range(length)) string.ascii_lowercase) for x in six.moves.range(length))
MESSAGES.append(msg) MESSAGES.append(msg)
LOG.info("Messages has been prepared") LOG.info("Messages has been prepared")
@ -398,6 +399,10 @@ def wrap_sigexit(f):
e.signo) e.signo)
for server in SERVERS: for server in SERVERS:
server.stop() server.stop()
server.wait()
finally:
if TRANSPORT:
TRANSPORT.cleanup()
return inner return inner
@ -433,7 +438,7 @@ def spawn_rpc_clients(threads, transport, targets, wait_after_msg, timeout,
is_cast, messages_count, duration): is_cast, messages_count, duration):
p = eventlet.GreenPool(size=threads) p = eventlet.GreenPool(size=threads)
targets = itertools.cycle(targets) targets = itertools.cycle(targets)
for i in range(0, threads): for i in six.moves.range(threads):
target = next(targets) target = next(targets)
LOG.debug("starting RPC client for target %s", target) LOG.debug("starting RPC client for target %s", target)
client_builder = functools.partial(RPCClient, i, transport, target, client_builder = functools.partial(RPCClient, i, transport, target,
@ -446,7 +451,7 @@ def spawn_rpc_clients(threads, transport, targets, wait_after_msg, timeout,
def spawn_notify_clients(threads, topic, transport, message_count, def spawn_notify_clients(threads, topic, transport, message_count,
wait_after_msg, timeout, duration): wait_after_msg, timeout, duration):
p = eventlet.GreenPool(size=threads) p = eventlet.GreenPool(size=threads)
for i in range(0, threads): for i in six.moves.range(threads):
client_builder = functools.partial(NotifyClient, i, transport, topic, client_builder = functools.partial(NotifyClient, i, transport, topic,
wait_after_msg) wait_after_msg)
p.spawn_n(send_messages, i, client_builder, message_count, duration) p.spawn_n(send_messages, i, client_builder, message_count, duration)
@ -472,7 +477,7 @@ def send_messages(client_id, client_builder, messages_count, duration):
else: else:
LOG.debug("Sending %d messages using client %d", LOG.debug("Sending %d messages using client %d",
messages_count, client_id) messages_count, client_id)
for _ in six.moves.range(0, messages_count): for _ in six.moves.range(messages_count):
client.send_msg() client.send_msg()
eventlet.sleep() eventlet.sleep()
if not IS_RUNNING: if not IS_RUNNING:
@ -657,10 +662,11 @@ def main():
if args.config_file: if args.config_file:
cfg.CONF(["--config-file", args.config_file]) cfg.CONF(["--config-file", args.config_file])
global TRANSPORT
if args.mode in ['rpc-server', 'rpc-client']: if args.mode in ['rpc-server', 'rpc-client']:
transport = messaging.get_transport(cfg.CONF, url=args.url) TRANSPORT = messaging.get_transport(cfg.CONF, url=args.url)
else: else:
transport = messaging.get_notification_transport(cfg.CONF, TRANSPORT = messaging.get_notification_transport(cfg.CONF,
url=args.url) url=args.url)
if args.mode in ['rpc-client', 'notify-client']: if args.mode in ['rpc-client', 'notify-client']:
@ -680,24 +686,24 @@ def main():
if args.url.startswith('zmq'): if args.url.startswith('zmq'):
cfg.CONF.rpc_zmq_matchmaker = "redis" cfg.CONF.rpc_zmq_matchmaker = "redis"
endpoint = rpc_server(transport, target, args.wait_before_answer, endpoint = rpc_server(TRANSPORT, target, args.wait_before_answer,
args.executor, args.duration) args.executor, args.duration)
show_server_stats(endpoint, args.json_filename) show_server_stats(endpoint, args.json_filename)
elif args.mode == 'notify-server': elif args.mode == 'notify-server':
endpoint = notify_server(transport, args.topic, endpoint = notify_server(TRANSPORT, args.topic,
args.wait_before_answer, args.duration, args.wait_before_answer, args.duration,
args.requeue) args.requeue)
show_server_stats(endpoint, args.json_filename) show_server_stats(endpoint, args.json_filename)
elif args.mode == 'batch-notify-server': elif args.mode == 'batch-notify-server':
endpoint = batch_notify_server(transport, args.topic, endpoint = batch_notify_server(TRANSPORT, args.topic,
args.wait_before_answer, args.wait_before_answer,
args.duration, args.requeue) args.duration, args.requeue)
show_server_stats(endpoint, args.json_filename) show_server_stats(endpoint, args.json_filename)
elif args.mode == 'notify-client': elif args.mode == 'notify-client':
spawn_notify_clients(args.threads, args.topic, transport, spawn_notify_clients(args.threads, args.topic, TRANSPORT,
args.messages, args.wait_after_msg, args.messages, args.wait_after_msg,
args.timeout, args.duration) args.timeout, args.duration)
show_client_stats(CLIENTS, args.json_filename) show_client_stats(CLIENTS, args.json_filename)
@ -707,7 +713,7 @@ def main():
targets = [messaging.Target( targets = [messaging.Target(
topic=topic, server=server_name, fanout=args.is_fanout) for topic=topic, server=server_name, fanout=args.is_fanout) for
topic, server_name in targets] topic, server_name in targets]
spawn_rpc_clients(args.threads, transport, targets, spawn_rpc_clients(args.threads, TRANSPORT, targets,
args.wait_after_msg, args.timeout, args.is_cast, args.wait_after_msg, args.timeout, args.is_cast,
args.messages, args.duration) args.messages, args.duration)