From d70dfc2f7e051444d2f02b8539c8a2e4ae1988aa Mon Sep 17 00:00:00 2001 From: Yulia Portnova Date: Wed, 17 Feb 2016 15:03:41 +0200 Subject: [PATCH] Added duration to notify server/client Change-Id: I4feeeec0c69305d92dce5baf60502a39ebe6b247 --- tools/simulator.py | 90 ++++++++++++++++++++++++++-------------------- 1 file changed, 51 insertions(+), 39 deletions(-) diff --git a/tools/simulator.py b/tools/simulator.py index 1551955f7..d9ca8d8b0 100755 --- a/tools/simulator.py +++ b/tools/simulator.py @@ -33,6 +33,7 @@ from oslo_config import cfg import oslo_messaging as messaging from oslo_messaging import notify # noqa from oslo_messaging import rpc # noqa +from oslo_utils import timeutils LOG = logging.getLogger() RANDOM_VARIABLE = None @@ -121,13 +122,12 @@ class NotifyEndpoint(Monitor): return messaging.NotificationResult.HANDLED -def notify_server(transport, topic, show_stats): +def notify_server(transport, topic, show_stats, duration): endpoints = [NotifyEndpoint(show_stats)] target = messaging.Target(topic=topic) server = notify.get_notification_listener(transport, [target], endpoints, executor='eventlet') - server.start() - server.wait() + run_server(server, duration=duration) class BatchNotifyEndpoint(Monitor): @@ -152,15 +152,14 @@ class BatchNotifyEndpoint(Monitor): return messaging.NotificationResult.HANDLED -def batch_notify_server(transport, topic, show_stats): +def batch_notify_server(transport, topic, show_stats, duration): endpoints = [BatchNotifyEndpoint(show_stats)] target = messaging.Target(topic=topic) server = notify.get_batch_notification_listener( transport, [target], endpoints, executor='eventlet', batch_size=1000, batch_time=5) - server.start() - server.wait() + run_server(server, duration=duration) class RpcEndpoint(Monitor): @@ -226,23 +225,32 @@ def init_msg(messages_count): LOG.info("Messages has been prepared") +def run_server(server, duration=None): + server.start() + if duration: + with timeutils.StopWatch(duration) as stop_watch: + while not stop_watch.expired(): + time.sleep(1) + server.stop() + server.wait() + + def rpc_server(transport, target, wait_before_answer, executor, show_stats, duration): endpoints = [RpcEndpoint(wait_before_answer, show_stats)] server = rpc.get_rpc_server(transport, target, endpoints, executor=executor) LOG.debug("starting RPC server for target %s", target) - server.start() - if duration: - start_t = time.time() - while time.time() - start_t < duration: - time.sleep(1) - server.stop() - server.wait() - LOG.info("Received total messages: %d", - server.dispatcher.endpoints[0].messages_received) - return - server.wait() + run_server(server, duration=duration) + LOG.info("Received total messages: %d", + server.dispatcher.endpoints[0].messages_received) + + +def spawn_notify_clients(threads, *args, **kwargs): + p = eventlet.GreenPool(size=threads) + for i in range(0, threads): + p.spawn_n(notifier, i, *args, **kwargs) + p.waitall() def spawn_rpc_clients(threads, transport, targets, @@ -256,13 +264,6 @@ def spawn_rpc_clients(threads, transport, targets, p.waitall() -def threads_spawner(threads, method, *args, **kwargs): - p = eventlet.GreenPool(size=threads) - for i in range(0, threads): - p.spawn_n(method, i, *args, **kwargs) - p.waitall() - - def send_msg(c_id, transport, target, wait_after_msg, timeout, is_cast, messages_count, duration): rpc_method = _rpc_cast if is_cast else _rpc_call @@ -270,9 +271,9 @@ def send_msg(c_id, transport, target, wait_after_msg, timeout, is_cast, RPC_CLIENTS.append(client) if duration: - start_time = time.time() - while time.time() - start_time < duration: - client.send_msg() + with timeutils.StopWatch(duration) as stop_watch: + while not stop_watch.expired(): + client.send_msg() else: LOG.debug("Sending %d messages using client %d", messages_count, c_id) for _ in six.moves.range(0, messages_count): @@ -298,22 +299,30 @@ def _rpc_cast(client, msg): LOG.debug("SENT: %s", msg) -def notifier(_id, topic, transport, messages, wait_after_msg, timeout): +def notifier(_id, topic, transport, messages, wait_after_msg, timeout, + duration): n1 = notify.Notifier(transport, driver='messaging', topic=topic).prepare( publisher_id='publisher-%d' % _id) - msg = 0 - for i in range(0, messages): - msg = 1 + msg - ctxt = {} - payload = dict(msg=msg, vm='test', otherdata='ahah') - LOG.debug("send msg") - LOG.debug(payload) + payload = dict(msg=0, vm='test', otherdata='ahah') + ctxt = {} + + def send_notif(): + payload['msg'] += 1 + LOG.debug("sending notification %s", payload) n1.info(ctxt, 'compute.start1', payload) if wait_after_msg > 0: time.sleep(wait_after_msg) + if duration: + with timeutils.StopWatch(duration) as stop_watch: + while not stop_watch.expired(): + send_notif() + else: + for i in range(0, messages): + send_notif() + def _setup_logging(is_debug): log_level = logging.DEBUG if is_debug else logging.INFO @@ -360,6 +369,7 @@ def main(): server = subparsers.add_parser('batch-notify-server') server.add_argument('--show-stats', dest='show_stats', type=bool, default=True) + client = subparsers.add_parser('notify-client') client.add_argument('-p', dest='threads', type=int, default=1, help='number of client threads') @@ -420,12 +430,14 @@ def main(): rpc_server(transport, target, args.wait_before_answer, args.executor, args.show_stats, args.duration) elif args.mode == 'notify-server': - notify_server(transport, args.topic, args.show_stats) + notify_server(transport, args.topic, args.show_stats, args.duration) elif args.mode == 'batch-notify-server': - batch_notify_server(transport, args.topic, args.show_stats) + batch_notify_server(transport, args.topic, args.show_stats, + args.duration) elif args.mode == 'notify-client': - threads_spawner(args.threads, notifier, args.topic, transport, - args.messages, args.wait_after_msg, args.timeout) + spawn_notify_clients(args.threads, args.topic, transport, + args.messages, args.wait_after_msg, args.timeout, + args.duration) elif args.mode == 'rpc-client': init_msg(args.messages) targets = [target.partition('.')[::2] for target in args.targets]