diff --git a/tools/simulator.py b/tools/simulator.py index c3f94158f..6ab9e6dce 100755 --- a/tools/simulator.py +++ b/tools/simulator.py @@ -35,6 +35,8 @@ from oslo_messaging import rpc # noqa LOG = logging.getLogger() RANDOM_VARIABLE = None CURRENT_PID = None +RPC_CLIENTS = [] +MESSAGES = [] USAGE = """ Usage: ./simulator.py [-h] [--url URL] [-d DEBUG]\ {notify-server,notify-client,rpc-server,rpc-client} ... @@ -88,8 +90,8 @@ class Monitor(object): def _monitor(self): threading.Timer(1.0, self._monitor).start() - print ("%d msg was received per second" - % (self._count - self._prev_count)) + LOG.debug("%d msg was received per second", + (self._count - self._prev_count)) self._prev_count = self._count def info(self, *args, **kwargs): @@ -104,16 +106,16 @@ class NotifyEndpoint(Monitor): def info(self, ctxt, publisher_id, event_type, payload, metadata): super(NotifyEndpoint, self).info(ctxt, publisher_id, event_type, payload, metadata) - LOG.info('msg rcv') - LOG.info("%s %s %s %s" % (ctxt, publisher_id, event_type, payload)) + LOG.debug('msg rcv') + LOG.debug("%s %s %s %s", ctxt, publisher_id, event_type, payload) if not self.show_stats and payload not in self.cache: - LOG.info('requeue msg') + LOG.debug('requeue msg') self.cache.append(payload) for i in range(15): eventlet.sleep(1) return messaging.NotificationResult.REQUEUE else: - LOG.info('ack msg') + LOG.debug('ack msg') return messaging.NotificationResult.HANDLED @@ -135,16 +137,16 @@ class BatchNotifyEndpoint(Monitor): super(BatchNotifyEndpoint, self).info(messages) self._count += len(messages) - 1 - LOG.info('msg rcv') - LOG.info("%s" % messages) + LOG.debug('msg rcv') + LOG.debug("%s", messages) if not self.show_stats and messages not in self.cache: - LOG.info('requeue msg') + LOG.debug('requeue msg') self.cache.append(messages) for i in range(15): eventlet.sleep(1) return messaging.NotificationResult.REQUEUE else: - LOG.info('ack msg') + LOG.debug('ack msg') return messaging.NotificationResult.HANDLED @@ -173,12 +175,53 @@ class RpcEndpoint(Monitor): else: self.count += 1 - LOG.info("######## RCV: %s/%s" % (self.count, message)) + LOG.debug("######## RCV: %s/%s", self.count, message) if self.wait_before_answer > 0: time.sleep(self.wait_before_answer) return "OK: %s" % message +class RPCClient(object): + def __init__(self, transport, target, timeout, method, wait_after_msg): + self.client = rpc.RPCClient(transport, target) + self.client.prepare(timeout=timeout) + self.method = method + self.bytes = 0 + self.msg_sent = 0 + self.messages_count = len(MESSAGES) + # Start sending the messages from a random position to avoid + # memory re-usage and generate more realistic load on the library + # and a message transport + self.position = random.randint(0, self.messages_count - 1) + self.wait_after_msg = wait_after_msg + + def send_msg(self): + msg = MESSAGES[self.position] + self.method(self.client, msg) + self.bytes += len(msg) + self.msg_sent += 1 + self.position = (self.position + 1) % self.messages_count + if self.wait_after_msg > 0: + time.sleep(self.wait_after_msg) + + +def init_msg(messages_count): + # Limit the messages amount. Clients will reiterate the array again + # if an amount of messages to be sent is bigger than 1000 + if messages_count > 1000: + messages_count = 1000 + LOG.info("Preparing %d messages", messages_count) + ranges = RANDOM_VARIABLE.rvs(size=messages_count) + i = 0 + for range_start in ranges: + length = random.randint(range_start, range_start + 497) + msg = ''.join(random.choice(string.lowercase) for x in range(length)) \ + + ' ' + str(i) + MESSAGES.append(msg) + i += 1 + LOG.info("Messages has been prepared") + + def rpc_server(transport, target, wait_before_answer, executor, show_stats): endpoints = [RpcEndpoint(wait_before_answer, show_stats)] server = rpc.get_rpc_server(transport, target, endpoints, @@ -194,46 +237,33 @@ def threads_spawner(threads, method, *args, **kwargs): p.waitall() -def send_msg(_id, transport, target, messages, wait_after_msg, timeout, - is_cast): - client = rpc.RPCClient(transport, target) - client = client.prepare(timeout=timeout) +def send_msg(c_id, transport, target, wait_after_msg, timeout, is_cast, + messages_count): + LOG.debug("Sending %d messages using client %d", messages_count, c_id) rpc_method = _rpc_cast if is_cast else _rpc_call - - ranges = RANDOM_VARIABLE.rvs(size=messages) - i = 0 - for range_start in ranges: - length = random.randint(range_start, range_start + 497) - msg = ''.join(random.choice(string.lowercase) for x in range(length)) \ - + ' ' + str(i) - i += 1 - # temporary file to log approximate bytes size of messages - with open('./oslo_%s_%s.log' % (target.topic, CURRENT_PID), 'a+') as f: - # 37 additional bytes for Python String object size canculation. - # In fact we may ignore these bytes, and estimate the data flow - # via number of symbols - f.write(str(length + 37) + '\n') - rpc_method(client, msg) - if wait_after_msg > 0: - time.sleep(wait_after_msg) + client = RPCClient(transport, target, timeout, rpc_method, wait_after_msg) + RPC_CLIENTS.append(client) + for _ in xrange(0, messages_count): + client.send_msg() + LOG.debug("Client %d has sent all messages", c_id) def _rpc_call(client, msg): try: res = client.call({}, 'info', message=msg) except Exception as e: - LOG.exception('Error %s on CALL for message %s' % (str(e), msg)) + LOG.exception('Error %s on CALL for message %s', str(e), msg) else: - LOG.info("SENT: %s, RCV: %s" % (msg, res)) + LOG.debug("SENT: %s, RCV: %s", msg, res) def _rpc_cast(client, msg): try: client.cast({}, 'info', message=msg) except Exception as e: - LOG.exception('Error %s on CAST for message %s' % (str(e), msg)) + LOG.exception('Error %s on CAST for message %s', str(e), msg) else: - LOG.info("SENT: %s" % msg) + LOG.debug("SENT: %s", msg) def notifier(_id, transport, messages, wait_after_msg, timeout): @@ -244,15 +274,15 @@ def notifier(_id, transport, messages, wait_after_msg, timeout): msg = 1 + msg ctxt = {} payload = dict(msg=msg, vm='test', otherdata='ahah') - LOG.info("send msg") - LOG.info(payload) + LOG.debug("send msg") + LOG.debug(payload) n1.info(ctxt, 'compute.start1', payload) if wait_after_msg > 0: time.sleep(wait_after_msg) def _setup_logging(is_debug): - log_level = logging.DEBUG if is_debug else logging.WARN + log_level = logging.DEBUG if is_debug else logging.INFO logging.basicConfig(stream=sys.stdout, level=log_level) logging.getLogger().handlers[0].addFilter(LoggingNoParsingFilter()) for i in ['kombu', 'amqp', 'stevedore', 'qpid.messaging' @@ -348,29 +378,28 @@ def main(): threads_spawner(args.threads, notifier, transport, args.messages, args.wait_after_msg, args.timeout) elif args.mode == 'rpc-client': + init_msg(args.messages) + start = datetime.datetime.now() threads_spawner(args.threads, send_msg, transport, target, - args.messages, args.wait_after_msg, args.timeout, - args.is_cast) - time_ellapsed = (datetime.datetime.now() - start).total_seconds() - msg_count = args.messages * args.threads - log_msg = '%d messages was sent for %s seconds. ' \ - 'Bandwidth is %s msg/sec' % (msg_count, time_ellapsed, - (msg_count / time_ellapsed)) - print (log_msg) - with open('./oslo_res_%s.txt' % args.topic, 'a+') as f: - f.write(log_msg + '\n') + args.wait_after_msg, args.timeout, args.is_cast, + args.messages) + time_elapsed = (datetime.datetime.now() - start).total_seconds() - with open('./oslo_%s_%s.log' % (args.topic, CURRENT_PID), 'a+') as f: - data = f.read() - data = [int(i) for i in data.split()] - data_sum = sum(data) - log_msg = '%s bytes were sent for %s seconds. Bandwidth is %s b/s' % ( - data_sum, time_ellapsed, (data_sum / time_ellapsed)) - print(log_msg) + msg_count = 0 + total_bytes = 0 + for client in RPC_CLIENTS: + msg_count += client.msg_sent + total_bytes += client.bytes + + LOG.info('%d messages were sent for %d seconds. ' + 'Bandwidth was %d msg/sec', msg_count, time_elapsed, + (msg_count / time_elapsed)) + log_msg = '%s bytes were sent for %d seconds. Bandwidth is %d b/s' % ( + total_bytes, time_elapsed, (total_bytes / time_elapsed)) + LOG.info(log_msg) with open('./oslo_res_%s.txt' % args.topic, 'a+') as f: f.write(log_msg + '\n') - os.remove('./oslo_%s_%s.log' % (args.topic, CURRENT_PID)) LOG.info("calls finished, wait %d seconds" % args.exit_wait) time.sleep(args.exit_wait)