# Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import eventlet eventlet.monkey_patch() import argparse import bisect import collections import functools import itertools import json import logging import os import random import six import string import sys import threading import time import yaml from oslo_config import cfg import oslo_messaging as messaging from oslo_messaging import notify # noqa from oslo_messaging import rpc # noqa from oslo_utils import timeutils LOG = logging.getLogger() RANDOM_GENERATOR = None CURRENT_PID = None CLIENTS = [] MESSAGES = [] USAGE = """ Usage: ./simulator.py [-h] [--url URL] [-d DEBUG]\ {notify-server,notify-client,rpc-server,rpc-client} ... Usage example: python tools/simulator.py\ --url rabbit://stackrabbit:secretrabbit@localhost/ rpc-server python tools/simulator.py\ --url rabbit://stackrabbit:secretrabbit@localhost/ rpc-client\ --exit-wait 15000 -p 64 -m 64""" MESSAGES_LIMIT = 1000 DISTRIBUTION_BUCKET_SIZE = 500 def init_random_generator(): data = [] with open('./messages_length.yaml') as m_file: content = yaml.safe_load(m_file) data += [int(n) for n in content[ 'test_data']['string_lengths'].split(', ')] ranges = collections.defaultdict(int) for msg_length in data: range_start = ((msg_length // DISTRIBUTION_BUCKET_SIZE) * DISTRIBUTION_BUCKET_SIZE + 1) ranges[range_start] += 1 ranges_start = sorted(ranges.keys()) total_count = len(data) accumulated_distribution = [] running_total = 0 for range_start in ranges_start: norm = float(ranges[range_start]) / total_count running_total += norm accumulated_distribution.append(running_total) def weighted_random_choice(): r = random.random() * running_total start = ranges_start[bisect.bisect_right(accumulated_distribution, r)] return random.randrange(start, start + DISTRIBUTION_BUCKET_SIZE) return weighted_random_choice class LoggingNoParsingFilter(logging.Filter): def filter(self, record): msg = record.getMessage() for i in ['received {', 'MSG_ID is ']: if i in msg: return False return True Message = collections.namedtuple( 'Message', ['seq', 'cargo', 'client_ts', 'server_ts', 'return_ts']) def make_message(seq, cargo, client_ts=0, server_ts=0, return_ts=0): return Message(seq, cargo, client_ts, server_ts, return_ts) def update_message(message, **kwargs): return Message(*message)._replace(**kwargs) class MessageStatsCollector(object): def __init__(self, label): self.label = label self.buffer = [] # buffer to store messages during report interval self.series = [] # stats for every report interval threading.Timer(1.0, self.monitor).start() # schedule in a second def monitor(self): threading.Timer(1.0, self.monitor).start() now = time.time() count = len(self.buffer) size = 0 min_latency = sys.maxsize max_latency = 0 sum_latencies = 0 for i in range(count): p = self.buffer[i] size += len(p.cargo) latency = None if p.return_ts: latency = p.return_ts - p.client_ts # round-trip elif p.server_ts: latency = p.server_ts - p.client_ts # client -> server if latency: sum_latencies += latency min_latency = min(min_latency, latency) max_latency = max(max_latency, latency) del self.buffer[:count] # trim processed items seq = len(self.series) stats = dict(seq=seq, timestamp=now, count=count, size=size) msg = ('%-14s: seq: %-4d count: %-6d bytes: %-10d' % (self.label, seq, count, size)) if sum_latencies: latency = sum_latencies / count stats.update(dict(latency=latency, min_latency=min_latency, max_latency=max_latency)) msg += (' latency: %-9.3f min: %-9.3f max: %-9.3f' % (latency, min_latency, max_latency)) self.series.append(stats) LOG.info(msg) def push(self, parsed_message): self.buffer.append(parsed_message) def get_series(self): return self.series @staticmethod def calc_stats(label, *collectors): count = 0 size = 0 min_latency = sys.maxsize max_latency = 0 sum_latencies = 0 start = sys.maxsize end = 0 for point in itertools.chain(*(c.get_series() for c in collectors)): count += point['count'] size += point['size'] start = min(start, point['timestamp']) end = max(end, point['timestamp']) if 'latency' in point: sum_latencies += point['latency'] * point['count'] min_latency = min(min_latency, point['min_latency']) max_latency = max(max_latency, point['max_latency']) # start is the timestamp of the earliest block, which inclides samples # for the prior second start -= 1 duration = end - start if count else 0 stats = dict(count=count, size=size, duration=duration, count_p_s=0, size_p_s=0) if duration: stats.update(dict(start=start, end=end, count_p_s=count / duration, size_p_s=size / duration)) msg = ('%s: duration: %.2f count: %d (%.1f msg/sec) ' 'bytes: %d (%.0f bps)' % (label, duration, count, stats['count_p_s'], size, stats['size_p_s'])) if sum_latencies: latency = sum_latencies / count stats.update(dict(latency=latency, min_latency=min_latency, max_latency=max_latency)) msg += (' latency: %.3f min: %.3f max: %.3f' % (latency, min_latency, max_latency)) LOG.info(msg) return stats class NotifyEndpoint(object): def __init__(self, wait_before_answer, requeue): self.wait_before_answer = wait_before_answer self.requeue = requeue self.received_messages = MessageStatsCollector('server') self.cache = set() def info(self, ctxt, publisher_id, event_type, payload, metadata): LOG.debug("%s %s %s %s", ctxt, publisher_id, event_type, payload) server_ts = time.time() message = update_message(payload, server_ts=server_ts) self.received_messages.push(message) if self.requeue and message.seq not in self.cache: self.cache.add(message.seq) if self.wait_before_answer > 0: time.sleep(self.wait_before_answer) return messaging.NotificationResult.REQUEUE return messaging.NotificationResult.HANDLED def notify_server(transport, topic, wait_before_answer, duration, requeue): endpoints = [NotifyEndpoint(wait_before_answer, requeue)] target = messaging.Target(topic=topic) server = notify.get_notification_listener(transport, [target], endpoints, executor='eventlet') run_server(server, duration=duration) return endpoints[0] class BatchNotifyEndpoint(object): def __init__(self, wait_before_answer, requeue): self.wait_before_answer = wait_before_answer self.requeue = requeue self.received_messages = MessageStatsCollector('server') self.cache = set() def info(self, batch): LOG.debug('msg rcv') LOG.debug("%s", batch) server_ts = time.time() for item in batch: message = update_message(item['payload'], server_ts=server_ts) self.received_messages.push(message) return messaging.NotificationResult.HANDLED def batch_notify_server(transport, topic, wait_before_answer, duration, requeue): endpoints = [BatchNotifyEndpoint(wait_before_answer, requeue)] target = messaging.Target(topic=topic) server = notify.get_batch_notification_listener( transport, [target], endpoints, executor='eventlet', batch_size=1000, batch_timeout=5) run_server(server, duration=duration) return endpoints[0] class RpcEndpoint(object): def __init__(self, wait_before_answer): self.wait_before_answer = wait_before_answer self.received_messages = MessageStatsCollector('server') def info(self, ctxt, message): server_ts = time.time() LOG.debug("######## RCV: %s", message) reply = update_message(message, server_ts=server_ts) self.received_messages.push(reply) if self.wait_before_answer > 0: time.sleep(self.wait_before_answer) return reply class Client(object): def __init__(self, client_id, client, method, has_result, wait_after_msg): self.client_id = client_id self.client = client self.method = method self.wait_after_msg = wait_after_msg self.seq = 0 self.messages_count = len(MESSAGES) # Start sending the messages from a random position to avoid # memory re-usage and generate more realistic load on the library # and a message transport self.position = random.randint(0, self.messages_count - 1) self.sent_messages = MessageStatsCollector('client-%s' % client_id) self.errors = MessageStatsCollector('error-%s' % client_id) if has_result: self.round_trip_messages = MessageStatsCollector( 'round-trip-%s' % client_id) def send_msg(self): msg = make_message(self.seq, MESSAGES[self.position], time.time()) self.sent_messages.push(msg) res = None try: res = self.method(self.client, msg) except Exception: self.errors.push(msg) else: LOG.debug("SENT: %s", msg) if res: return_ts = time.time() res = update_message(res, return_ts=return_ts) self.round_trip_messages.push(res) self.seq += 1 self.position = (self.position + 1) % self.messages_count if self.wait_after_msg > 0: time.sleep(self.wait_after_msg) class RPCClient(Client): def __init__(self, client_id, transport, target, timeout, is_cast, wait_after_msg): client = rpc.RPCClient(transport, target).prepare(timeout=timeout) method = _rpc_cast if is_cast else _rpc_call super(RPCClient, self).__init__(client_id, client, method, not is_cast, wait_after_msg) class NotifyClient(Client): def __init__(self, client_id, transport, topic, wait_after_msg): client = notify.Notifier(transport, driver='messaging', topic=topic) client = client.prepare(publisher_id='publisher-%d' % client_id) method = _notify super(NotifyClient, self).__init__(client_id, client, method, False, wait_after_msg) def generate_messages(messages_count): # Limit the messages amount. Clients will reiterate the array again # if an amount of messages to be sent is bigger than MESSAGES_LIMIT if messages_count > MESSAGES_LIMIT: messages_count = MESSAGES_LIMIT LOG.info("Generating %d random messages", messages_count) for i in range(messages_count): length = RANDOM_GENERATOR() msg = ''.join(random.choice( string.ascii_lowercase) for x in range(length)) MESSAGES.append(msg) LOG.info("Messages has been prepared") def run_server(server, duration=None): try: server.start() if duration: with timeutils.StopWatch(duration) as stop_watch: while not stop_watch.expired(): time.sleep(1) server.stop() server.wait() except KeyboardInterrupt: # caught SIGINT LOG.info('Caught SIGINT, terminating') time.sleep(1) # wait for stats collector to process the last second def rpc_server(transport, target, wait_before_answer, executor, duration): endpoints = [RpcEndpoint(wait_before_answer)] server = rpc.get_rpc_server(transport, target, endpoints, executor=executor) LOG.debug("starting RPC server for target %s", target) run_server(server, duration=duration) return server.dispatcher.endpoints[0] def spawn_rpc_clients(threads, transport, targets, wait_after_msg, timeout, is_cast, messages_count, duration): p = eventlet.GreenPool(size=threads) targets = itertools.cycle(targets) for i in range(0, threads): target = next(targets) LOG.debug("starting RPC client for target %s", target) client_builder = functools.partial(RPCClient, i, transport, target, timeout, is_cast, wait_after_msg) p.spawn_n(send_messages, i, client_builder, messages_count, duration) p.waitall() def spawn_notify_clients(threads, topic, transport, message_count, wait_after_msg, timeout, duration): p = eventlet.GreenPool(size=threads) for i in range(0, threads): client_builder = functools.partial(NotifyClient, i, transport, topic, wait_after_msg) p.spawn_n(send_messages, i, client_builder, message_count, duration) p.waitall() def send_messages(client_id, client_builder, messages_count, duration): client = client_builder() CLIENTS.append(client) if duration: with timeutils.StopWatch(duration) as stop_watch: while not stop_watch.expired(): client.send_msg() eventlet.sleep() else: LOG.debug("Sending %d messages using client %d", messages_count, client_id) for _ in six.moves.range(0, messages_count): client.send_msg() eventlet.sleep() LOG.debug("Client %d has sent %d messages", client_id, messages_count) time.sleep(1) # wait for replies to be collected def _rpc_call(client, msg): try: res = client.call({}, 'info', message=msg) except Exception as e: LOG.exception('Error %s on CALL for message %s', str(e), msg) raise else: LOG.debug("SENT: %s, RCV: %s", msg, res) return res def _rpc_cast(client, msg): try: client.cast({}, 'info', message=msg) except Exception as e: LOG.exception('Error %s on CAST for message %s', str(e), msg) raise else: LOG.debug("SENT: %s", msg) def _notify(notification_client, msg): notification_client.info({}, 'compute.start', msg) def show_server_stats(endpoint, json_filename): LOG.info('=' * 35 + ' summary ' + '=' * 35) output = dict(series={}, summary={}) output['series']['server'] = endpoint.received_messages.get_series() stats = MessageStatsCollector.calc_stats( 'server', endpoint.received_messages) output['summary'] = stats if json_filename: write_json_file(json_filename, output) def show_client_stats(clients, json_filename, has_reply=False): LOG.info('=' * 35 + ' summary ' + '=' * 35) output = dict(series={}, summary={}) for cl in clients: cl_id = cl.client_id output['series']['client_%s' % cl_id] = cl.sent_messages.get_series() output['series']['error_%s' % cl_id] = cl.errors.get_series() if has_reply: output['series']['round_trip_%s' % cl_id] = ( cl.round_trip_messages.get_series()) sent_stats = MessageStatsCollector.calc_stats( 'client', *(cl.sent_messages for cl in clients)) output['summary']['client'] = sent_stats error_stats = MessageStatsCollector.calc_stats( 'error', *(cl.errors for cl in clients)) output['summary']['error'] = error_stats if has_reply: round_trip_stats = MessageStatsCollector.calc_stats( 'round-trip', *(cl.round_trip_messages for cl in clients)) output['summary']['round_trip'] = round_trip_stats if json_filename: write_json_file(json_filename, output) def write_json_file(filename, output): with open(filename, 'w') as f: f.write(json.dumps(output)) LOG.info('Stats are written into %s', filename) def _setup_logging(is_debug): log_level = logging.DEBUG if is_debug else logging.INFO logging.basicConfig( stream=sys.stdout, level=log_level, format="%(asctime)-15s %(levelname)s %(name)s %(message)s") logging.getLogger().handlers[0].addFilter(LoggingNoParsingFilter()) for i in ['kombu', 'amqp', 'stevedore', 'qpid.messaging' 'oslo.messaging._drivers.amqp', ]: logging.getLogger(i).setLevel(logging.WARN) def main(): parser = argparse.ArgumentParser( description='Tools to play with oslo.messaging\'s RPC', usage=USAGE, ) parser.add_argument('--url', dest='url', default='rabbit://guest:password@localhost/', help="oslo.messaging transport url") parser.add_argument('-d', '--debug', dest='debug', type=bool, default=False, help="Turn on DEBUG logging level instead of WARN") parser.add_argument('-tp', '--topic', dest='topic', default="profiler_topic", help="Topics to publish/receive messages to/from.") parser.add_argument('-s', '--server', dest='server', default="profiler_server", help="Servers to publish/receive messages to/from.") parser.add_argument('-tg', '--targets', dest='targets', nargs="+", default=["profiler_topic.profiler_server"], help="Targets to publish/receive messages to/from.") parser.add_argument('-l', dest='duration', type=int, help='send messages for certain time') parser.add_argument('-j', '--json', dest='json_filename', help='File name to store results in JSON format') parser.add_argument('--config-file', dest='config_file', type=str, help="Oslo messaging config file") subparsers = parser.add_subparsers(dest='mode', help='notify/rpc server/client mode') server = subparsers.add_parser('notify-server') server.add_argument('-w', dest='wait_before_answer', type=int, default=-1) server.add_argument('--requeue', dest='requeue', action='store_true') server = subparsers.add_parser('batch-notify-server') server.add_argument('-w', dest='wait_before_answer', type=int, default=-1) server.add_argument('--requeue', dest='requeue', action='store_true') client = subparsers.add_parser('notify-client') client.add_argument('-p', dest='threads', type=int, default=1, help='number of client threads') client.add_argument('-m', dest='messages', type=int, default=1, help='number of call per threads') client.add_argument('-w', dest='wait_after_msg', type=int, default=-1, help='sleep time between two messages') client.add_argument('--timeout', dest='timeout', type=int, default=3, help='client timeout') server = subparsers.add_parser('rpc-server') server.add_argument('-w', dest='wait_before_answer', type=int, default=-1) server.add_argument('-e', '--executor', dest='executor', type=str, default='eventlet', help='name of a message executor') client = subparsers.add_parser('rpc-client') client.add_argument('-p', dest='threads', type=int, default=1, help='number of client threads') client.add_argument('-m', dest='messages', type=int, default=1, help='number of call per threads') client.add_argument('-w', dest='wait_after_msg', type=int, default=-1, help='sleep time between two messages') client.add_argument('--timeout', dest='timeout', type=int, default=3, help='client timeout') client.add_argument('--exit-wait', dest='exit_wait', type=int, default=0, help='Keep connections open N seconds after calls ' 'have been done') client.add_argument('--is-cast', dest='is_cast', type=bool, default=False, help='Use `call` or `cast` RPC methods') client.add_argument('--is-fanout', dest='is_fanout', type=bool, default=False, help='fanout=True for CAST messages') args = parser.parse_args() _setup_logging(is_debug=args.debug) if args.config_file: cfg.CONF(["--config-file", args.config_file]) if args.mode in ['rpc-server', 'rpc-client']: transport = messaging.get_transport(cfg.CONF, url=args.url) else: transport = messaging.get_notification_transport(cfg.CONF, url=args.url) if args.mode in ['rpc-client', 'notify-client']: # always generate maximum number of messages for duration-limited tests generate_messages(MESSAGES_LIMIT if args.duration else args.messages) # oslo.config defaults cfg.CONF.heartbeat_interval = 5 cfg.CONF.prog = os.path.basename(__file__) cfg.CONF.project = 'oslo.messaging' if args.mode == 'rpc-server': target = messaging.Target(topic=args.topic, server=args.server) if args.url.startswith('zmq'): cfg.CONF.rpc_zmq_matchmaker = "redis" endpoint = rpc_server(transport, target, args.wait_before_answer, args.executor, args.duration) show_server_stats(endpoint, args.json_filename) elif args.mode == 'notify-server': endpoint = notify_server(transport, args.topic, args.wait_before_answer, args.duration, args.requeue) show_server_stats(endpoint, args.json_filename) elif args.mode == 'batch-notify-server': endpoint = batch_notify_server(transport, args.topic, args.wait_before_answer, args.duration, args.requeue) show_server_stats(endpoint, args.json_filename) elif args.mode == 'notify-client': spawn_notify_clients(args.threads, args.topic, transport, args.messages, args.wait_after_msg, args.timeout, args.duration) show_client_stats(CLIENTS, args.json_filename) elif args.mode == 'rpc-client': targets = [target.partition('.')[::2] for target in args.targets] targets = [messaging.Target( topic=topic, server=server_name, fanout=args.is_fanout) for topic, server_name in targets] spawn_rpc_clients(args.threads, transport, targets, args.wait_after_msg, args.timeout, args.is_cast, args.messages, args.duration) show_client_stats(CLIENTS, args.json_filename, not args.is_cast) if args.exit_wait: LOG.info("Finished. waiting for %d seconds", args.exit_wait) time.sleep(args.exit_wait) if __name__ == '__main__': RANDOM_GENERATOR = init_random_generator() CURRENT_PID = os.getpid() main()