Merge "Simulator: handle SIGINT and SIGTERM signals"
This commit is contained in:
		@@ -22,6 +22,7 @@ import json
 | 
			
		||||
import logging
 | 
			
		||||
import os
 | 
			
		||||
import random
 | 
			
		||||
import signal
 | 
			
		||||
import six
 | 
			
		||||
import string
 | 
			
		||||
import sys
 | 
			
		||||
@@ -40,6 +41,8 @@ RANDOM_GENERATOR = None
 | 
			
		||||
CURRENT_PID = None
 | 
			
		||||
CLIENTS = []
 | 
			
		||||
MESSAGES = []
 | 
			
		||||
IS_RUNNING = True
 | 
			
		||||
SERVERS = []
 | 
			
		||||
 | 
			
		||||
USAGE = """ Usage: ./simulator.py [-h] [--url URL] [-d DEBUG]\
 | 
			
		||||
 {notify-server,notify-client,rpc-server,rpc-client} ...
 | 
			
		||||
@@ -116,6 +119,8 @@ class MessageStatsCollector(object):
 | 
			
		||||
        threading.Timer(1.0, self.monitor).start()  # schedule in a second
 | 
			
		||||
 | 
			
		||||
    def monitor(self):
 | 
			
		||||
        global IS_RUNNING
 | 
			
		||||
        if IS_RUNNING:
 | 
			
		||||
            threading.Timer(1.0, self.monitor).start()
 | 
			
		||||
        now = time.time()
 | 
			
		||||
 | 
			
		||||
@@ -382,16 +387,17 @@ def generate_messages(messages_count):
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def run_server(server, duration=None):
 | 
			
		||||
    try:
 | 
			
		||||
    global IS_RUNNING
 | 
			
		||||
    SERVERS.append(server)
 | 
			
		||||
    server.start()
 | 
			
		||||
    if duration:
 | 
			
		||||
        with timeutils.StopWatch(duration) as stop_watch:
 | 
			
		||||
                while not stop_watch.expired():
 | 
			
		||||
            while not stop_watch.expired() and IS_RUNNING:
 | 
			
		||||
                time.sleep(1)
 | 
			
		||||
        server.stop()
 | 
			
		||||
        IS_RUNNING = False
 | 
			
		||||
    server.wait()
 | 
			
		||||
    except KeyboardInterrupt:  # caught SIGINT
 | 
			
		||||
        LOG.info('Caught SIGINT, terminating')
 | 
			
		||||
    LOG.info('The server is terminating')
 | 
			
		||||
    time.sleep(1)  # wait for stats collector to process the last second
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@@ -430,20 +436,24 @@ def spawn_notify_clients(threads, topic, transport, message_count,
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def send_messages(client_id, client_builder, messages_count, duration):
 | 
			
		||||
    global IS_RUNNING
 | 
			
		||||
    client = client_builder()
 | 
			
		||||
    CLIENTS.append(client)
 | 
			
		||||
 | 
			
		||||
    if duration:
 | 
			
		||||
        with timeutils.StopWatch(duration) as stop_watch:
 | 
			
		||||
            while not stop_watch.expired():
 | 
			
		||||
            while not stop_watch.expired() and IS_RUNNING:
 | 
			
		||||
                client.send_msg()
 | 
			
		||||
                eventlet.sleep()
 | 
			
		||||
        IS_RUNNING = False
 | 
			
		||||
    else:
 | 
			
		||||
        LOG.debug("Sending %d messages using client %d",
 | 
			
		||||
                  messages_count, client_id)
 | 
			
		||||
        for _ in six.moves.range(0, messages_count):
 | 
			
		||||
            client.send_msg()
 | 
			
		||||
            eventlet.sleep()
 | 
			
		||||
            if not IS_RUNNING:
 | 
			
		||||
                break
 | 
			
		||||
        LOG.debug("Client %d has sent %d messages", client_id, messages_count)
 | 
			
		||||
 | 
			
		||||
    time.sleep(1)  # wait for replies to be collected
 | 
			
		||||
@@ -522,6 +532,14 @@ def write_json_file(filename, output):
 | 
			
		||||
        LOG.info('Stats are written into %s', filename)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def signal_handler(signum, frame):
 | 
			
		||||
    global IS_RUNNING
 | 
			
		||||
    IS_RUNNING = False
 | 
			
		||||
    LOG.info('Signal %s is caught. Interrupting the execution', signum)
 | 
			
		||||
    for server in SERVERS:
 | 
			
		||||
        server.stop()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _setup_logging(is_debug):
 | 
			
		||||
    log_level = logging.DEBUG if is_debug else logging.INFO
 | 
			
		||||
    logging.basicConfig(
 | 
			
		||||
@@ -626,6 +644,9 @@ def main():
 | 
			
		||||
    cfg.CONF.prog = os.path.basename(__file__)
 | 
			
		||||
    cfg.CONF.project = 'oslo.messaging'
 | 
			
		||||
 | 
			
		||||
    signal.signal(signal.SIGTERM, signal_handler)
 | 
			
		||||
    signal.signal(signal.SIGINT, signal_handler)
 | 
			
		||||
 | 
			
		||||
    if args.mode == 'rpc-server':
 | 
			
		||||
        target = messaging.Target(topic=args.topic, server=args.server)
 | 
			
		||||
        if args.url.startswith('zmq'):
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user