Simulator: handle SIGINT and SIGTERM signals
Now it is possible to stop simulator client and server by sending SIGINT or SIGTERM signals. Note that both stop gracefully and it takes some time to do this. Change-Id: Ie3fd1ea8b146070d61a247fd8ccc124df8d34848
This commit is contained in:
		@@ -22,6 +22,7 @@ import json
 | 
				
			|||||||
import logging
 | 
					import logging
 | 
				
			||||||
import os
 | 
					import os
 | 
				
			||||||
import random
 | 
					import random
 | 
				
			||||||
 | 
					import signal
 | 
				
			||||||
import six
 | 
					import six
 | 
				
			||||||
import string
 | 
					import string
 | 
				
			||||||
import sys
 | 
					import sys
 | 
				
			||||||
@@ -40,6 +41,8 @@ RANDOM_GENERATOR = None
 | 
				
			|||||||
CURRENT_PID = None
 | 
					CURRENT_PID = None
 | 
				
			||||||
CLIENTS = []
 | 
					CLIENTS = []
 | 
				
			||||||
MESSAGES = []
 | 
					MESSAGES = []
 | 
				
			||||||
 | 
					IS_RUNNING = True
 | 
				
			||||||
 | 
					SERVERS = []
 | 
				
			||||||
 | 
					
 | 
				
			||||||
USAGE = """ Usage: ./simulator.py [-h] [--url URL] [-d DEBUG]\
 | 
					USAGE = """ Usage: ./simulator.py [-h] [--url URL] [-d DEBUG]\
 | 
				
			||||||
 {notify-server,notify-client,rpc-server,rpc-client} ...
 | 
					 {notify-server,notify-client,rpc-server,rpc-client} ...
 | 
				
			||||||
@@ -115,7 +118,9 @@ class MessageStatsCollector(object):
 | 
				
			|||||||
        threading.Timer(1.0, self.monitor).start()  # schedule in a second
 | 
					        threading.Timer(1.0, self.monitor).start()  # schedule in a second
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def monitor(self):
 | 
					    def monitor(self):
 | 
				
			||||||
        threading.Timer(1.0, self.monitor).start()
 | 
					        global IS_RUNNING
 | 
				
			||||||
 | 
					        if IS_RUNNING:
 | 
				
			||||||
 | 
					            threading.Timer(1.0, self.monitor).start()
 | 
				
			||||||
        now = time.time()
 | 
					        now = time.time()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        count = len(self.buffer)
 | 
					        count = len(self.buffer)
 | 
				
			||||||
@@ -381,17 +386,18 @@ def generate_messages(messages_count):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def run_server(server, duration=None):
 | 
					def run_server(server, duration=None):
 | 
				
			||||||
    try:
 | 
					    global IS_RUNNING
 | 
				
			||||||
        server.start()
 | 
					    SERVERS.append(server)
 | 
				
			||||||
        if duration:
 | 
					    server.start()
 | 
				
			||||||
            with timeutils.StopWatch(duration) as stop_watch:
 | 
					    if duration:
 | 
				
			||||||
                while not stop_watch.expired():
 | 
					        with timeutils.StopWatch(duration) as stop_watch:
 | 
				
			||||||
                    time.sleep(1)
 | 
					            while not stop_watch.expired() and IS_RUNNING:
 | 
				
			||||||
            server.stop()
 | 
					                time.sleep(1)
 | 
				
			||||||
        server.wait()
 | 
					        server.stop()
 | 
				
			||||||
    except KeyboardInterrupt:  # caught SIGINT
 | 
					        IS_RUNNING = False
 | 
				
			||||||
        LOG.info('Caught SIGINT, terminating')
 | 
					    server.wait()
 | 
				
			||||||
        time.sleep(1)  # wait for stats collector to process the last second
 | 
					    LOG.info('The server is terminating')
 | 
				
			||||||
 | 
					    time.sleep(1)  # wait for stats collector to process the last second
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def rpc_server(transport, target, wait_before_answer, executor, duration):
 | 
					def rpc_server(transport, target, wait_before_answer, executor, duration):
 | 
				
			||||||
@@ -429,20 +435,24 @@ def spawn_notify_clients(threads, topic, transport, message_count,
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def send_messages(client_id, client_builder, messages_count, duration):
 | 
					def send_messages(client_id, client_builder, messages_count, duration):
 | 
				
			||||||
 | 
					    global IS_RUNNING
 | 
				
			||||||
    client = client_builder()
 | 
					    client = client_builder()
 | 
				
			||||||
    CLIENTS.append(client)
 | 
					    CLIENTS.append(client)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if duration:
 | 
					    if duration:
 | 
				
			||||||
        with timeutils.StopWatch(duration) as stop_watch:
 | 
					        with timeutils.StopWatch(duration) as stop_watch:
 | 
				
			||||||
            while not stop_watch.expired():
 | 
					            while not stop_watch.expired() and IS_RUNNING:
 | 
				
			||||||
                client.send_msg()
 | 
					                client.send_msg()
 | 
				
			||||||
                eventlet.sleep()
 | 
					                eventlet.sleep()
 | 
				
			||||||
 | 
					        IS_RUNNING = False
 | 
				
			||||||
    else:
 | 
					    else:
 | 
				
			||||||
        LOG.debug("Sending %d messages using client %d",
 | 
					        LOG.debug("Sending %d messages using client %d",
 | 
				
			||||||
                  messages_count, client_id)
 | 
					                  messages_count, client_id)
 | 
				
			||||||
        for _ in six.moves.range(0, messages_count):
 | 
					        for _ in six.moves.range(0, messages_count):
 | 
				
			||||||
            client.send_msg()
 | 
					            client.send_msg()
 | 
				
			||||||
            eventlet.sleep()
 | 
					            eventlet.sleep()
 | 
				
			||||||
 | 
					            if not IS_RUNNING:
 | 
				
			||||||
 | 
					                break
 | 
				
			||||||
        LOG.debug("Client %d has sent %d messages", client_id, messages_count)
 | 
					        LOG.debug("Client %d has sent %d messages", client_id, messages_count)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    time.sleep(1)  # wait for replies to be collected
 | 
					    time.sleep(1)  # wait for replies to be collected
 | 
				
			||||||
@@ -521,6 +531,14 @@ def write_json_file(filename, output):
 | 
				
			|||||||
        LOG.info('Stats are written into %s', filename)
 | 
					        LOG.info('Stats are written into %s', filename)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def signal_handler(signum, frame):
 | 
				
			||||||
 | 
					    global IS_RUNNING
 | 
				
			||||||
 | 
					    IS_RUNNING = False
 | 
				
			||||||
 | 
					    LOG.info('Signal %s is caught. Interrupting the execution', signum)
 | 
				
			||||||
 | 
					    for server in SERVERS:
 | 
				
			||||||
 | 
					        server.stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def _setup_logging(is_debug):
 | 
					def _setup_logging(is_debug):
 | 
				
			||||||
    log_level = logging.DEBUG if is_debug else logging.INFO
 | 
					    log_level = logging.DEBUG if is_debug else logging.INFO
 | 
				
			||||||
    logging.basicConfig(
 | 
					    logging.basicConfig(
 | 
				
			||||||
@@ -625,6 +643,9 @@ def main():
 | 
				
			|||||||
    cfg.CONF.prog = os.path.basename(__file__)
 | 
					    cfg.CONF.prog = os.path.basename(__file__)
 | 
				
			||||||
    cfg.CONF.project = 'oslo.messaging'
 | 
					    cfg.CONF.project = 'oslo.messaging'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    signal.signal(signal.SIGTERM, signal_handler)
 | 
				
			||||||
 | 
					    signal.signal(signal.SIGINT, signal_handler)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if args.mode == 'rpc-server':
 | 
					    if args.mode == 'rpc-server':
 | 
				
			||||||
        target = messaging.Target(topic=args.topic, server=args.server)
 | 
					        target = messaging.Target(topic=args.topic, server=args.server)
 | 
				
			||||||
        if args.url.startswith('zmq'):
 | 
					        if args.url.startswith('zmq'):
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user