diff --git a/tools/simulator.py b/tools/simulator.py index 6ab9e6dce..0bc0c3ba3 100755 --- a/tools/simulator.py +++ b/tools/simulator.py @@ -165,8 +165,10 @@ class RpcEndpoint(Monitor): def __init__(self, wait_before_answer, show_stats): self.count = None self.wait_before_answer = wait_before_answer + self.messages_received = 0 def info(self, ctxt, message): + self.messages_received += 1 i = int(message.split(' ')[-1]) if self.count is None: self.count = i @@ -184,7 +186,7 @@ class RpcEndpoint(Monitor): class RPCClient(object): def __init__(self, transport, target, timeout, method, wait_after_msg): self.client = rpc.RPCClient(transport, target) - self.client.prepare(timeout=timeout) + self.client = self.client.prepare(timeout=timeout) self.method = method self.bytes = 0 self.msg_sent = 0 @@ -222,11 +224,21 @@ def init_msg(messages_count): LOG.info("Messages has been prepared") -def rpc_server(transport, target, wait_before_answer, executor, show_stats): +def rpc_server(transport, target, wait_before_answer, executor, show_stats, + duration): endpoints = [RpcEndpoint(wait_before_answer, show_stats)] server = rpc.get_rpc_server(transport, target, endpoints, executor=executor) server.start() + if duration: + start_t = time.time() + while time.time() - start_t < duration: + time.sleep(1) + server.stop() + server.wait() + LOG.info("Received total messages: %d", + server.dispatcher.endpoints[0].messages_received) + return server.wait() @@ -238,14 +250,20 @@ def threads_spawner(threads, method, *args, **kwargs): def send_msg(c_id, transport, target, wait_after_msg, timeout, is_cast, - messages_count): - LOG.debug("Sending %d messages using client %d", messages_count, c_id) + messages_count, duration): rpc_method = _rpc_cast if is_cast else _rpc_call client = RPCClient(transport, target, timeout, rpc_method, wait_after_msg) RPC_CLIENTS.append(client) - for _ in xrange(0, messages_count): - client.send_msg() - LOG.debug("Client %d has sent all messages", c_id) + + if duration: + start_time = time.time() + while time.time() - start_time < duration: + client.send_msg() + else: + LOG.debug("Sending %d messages using client %d", messages_count, c_id) + for _ in xrange(0, messages_count): + client.send_msg() + LOG.debug("Client %d has sent %d messages", c_id, messages_count) def _rpc_call(client, msg): @@ -304,6 +322,9 @@ def main(): parser.add_argument('-tp', '--topic', dest='topic', default="profiler_topic", help="Topic to publish/receive messages to/from.") + parser.add_argument('-l', dest='duration', type=int, + help='send messages for certain time') + subparsers = parser.add_subparsers(dest='mode', help='notify/rpc server/client mode') @@ -369,7 +390,7 @@ def main(): cfg.CONF.rpc_zmq_matchmaker = "redis" transport._driver.matchmaker._redis.flushdb() rpc_server(transport, target, args.wait_before_answer, args.executor, - args.show_stats) + args.show_stats, args.duration) elif args.mode == 'notify-server': notify_server(transport, args.show_stats) elif args.mode == 'batch-notify-server': @@ -383,7 +404,7 @@ def main(): start = datetime.datetime.now() threads_spawner(args.threads, send_msg, transport, target, args.wait_after_msg, args.timeout, args.is_cast, - args.messages) + args.messages, args.duration) time_elapsed = (datetime.datetime.now() - start).total_seconds() msg_count = 0