diff --git a/tools/simulator.py b/tools/simulator.py index 0a2af19b7..1ce2419a7 100755 --- a/tools/simulator.py +++ b/tools/simulator.py @@ -22,6 +22,7 @@ import json import logging import os import random +import signal import six import string import sys @@ -40,6 +41,8 @@ RANDOM_GENERATOR = None CURRENT_PID = None CLIENTS = [] MESSAGES = [] +IS_RUNNING = True +SERVERS = [] USAGE = """ Usage: ./simulator.py [-h] [--url URL] [-d DEBUG]\ {notify-server,notify-client,rpc-server,rpc-client} ... @@ -115,7 +118,9 @@ class MessageStatsCollector(object): threading.Timer(1.0, self.monitor).start() # schedule in a second def monitor(self): - threading.Timer(1.0, self.monitor).start() + global IS_RUNNING + if IS_RUNNING: + threading.Timer(1.0, self.monitor).start() now = time.time() count = len(self.buffer) @@ -381,17 +386,18 @@ def generate_messages(messages_count): def run_server(server, duration=None): - try: - server.start() - if duration: - with timeutils.StopWatch(duration) as stop_watch: - while not stop_watch.expired(): - time.sleep(1) - server.stop() - server.wait() - except KeyboardInterrupt: # caught SIGINT - LOG.info('Caught SIGINT, terminating') - time.sleep(1) # wait for stats collector to process the last second + global IS_RUNNING + SERVERS.append(server) + server.start() + if duration: + with timeutils.StopWatch(duration) as stop_watch: + while not stop_watch.expired() and IS_RUNNING: + time.sleep(1) + server.stop() + IS_RUNNING = False + server.wait() + LOG.info('The server is terminating') + time.sleep(1) # wait for stats collector to process the last second def rpc_server(transport, target, wait_before_answer, executor, duration): @@ -429,20 +435,24 @@ def spawn_notify_clients(threads, topic, transport, message_count, def send_messages(client_id, client_builder, messages_count, duration): + global IS_RUNNING client = client_builder() CLIENTS.append(client) if duration: with timeutils.StopWatch(duration) as stop_watch: - while not stop_watch.expired(): + while not stop_watch.expired() and IS_RUNNING: client.send_msg() eventlet.sleep() + IS_RUNNING = False else: LOG.debug("Sending %d messages using client %d", messages_count, client_id) for _ in six.moves.range(0, messages_count): client.send_msg() eventlet.sleep() + if not IS_RUNNING: + break LOG.debug("Client %d has sent %d messages", client_id, messages_count) time.sleep(1) # wait for replies to be collected @@ -521,6 +531,14 @@ def write_json_file(filename, output): LOG.info('Stats are written into %s', filename) +def signal_handler(signum, frame): + global IS_RUNNING + IS_RUNNING = False + LOG.info('Signal %s is caught. Interrupting the execution', signum) + for server in SERVERS: + server.stop() + + def _setup_logging(is_debug): log_level = logging.DEBUG if is_debug else logging.INFO logging.basicConfig( @@ -625,6 +643,9 @@ def main(): cfg.CONF.prog = os.path.basename(__file__) cfg.CONF.project = 'oslo.messaging' + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) + if args.mode == 'rpc-server': target = messaging.Target(topic=args.topic, server=args.server) if args.url.startswith('zmq'):