Add duration option to simulator.py

Change-Id: I992fdc1e22ee0debed34b4beb62cbd563351d12f
This commit is contained in:
Yulia Portnova 2016-01-11 18:46:40 +02:00
parent 7265a82a4e
commit 2c8f39312f
1 changed files with 30 additions and 9 deletions

View File

@ -165,8 +165,10 @@ class RpcEndpoint(Monitor):
def __init__(self, wait_before_answer, show_stats):
self.count = None
self.wait_before_answer = wait_before_answer
self.messages_received = 0
def info(self, ctxt, message):
self.messages_received += 1
i = int(message.split(' ')[-1])
if self.count is None:
self.count = i
@ -184,7 +186,7 @@ class RpcEndpoint(Monitor):
class RPCClient(object):
def __init__(self, transport, target, timeout, method, wait_after_msg):
self.client = rpc.RPCClient(transport, target)
self.client.prepare(timeout=timeout)
self.client = self.client.prepare(timeout=timeout)
self.method = method
self.bytes = 0
self.msg_sent = 0
@ -222,11 +224,21 @@ def init_msg(messages_count):
LOG.info("Messages has been prepared")
def rpc_server(transport, target, wait_before_answer, executor, show_stats):
def rpc_server(transport, target, wait_before_answer, executor, show_stats,
duration):
endpoints = [RpcEndpoint(wait_before_answer, show_stats)]
server = rpc.get_rpc_server(transport, target, endpoints,
executor=executor)
server.start()
if duration:
start_t = time.time()
while time.time() - start_t < duration:
time.sleep(1)
server.stop()
server.wait()
LOG.info("Received total messages: %d",
server.dispatcher.endpoints[0].messages_received)
return
server.wait()
@ -238,14 +250,20 @@ def threads_spawner(threads, method, *args, **kwargs):
def send_msg(c_id, transport, target, wait_after_msg, timeout, is_cast,
messages_count):
LOG.debug("Sending %d messages using client %d", messages_count, c_id)
messages_count, duration):
rpc_method = _rpc_cast if is_cast else _rpc_call
client = RPCClient(transport, target, timeout, rpc_method, wait_after_msg)
RPC_CLIENTS.append(client)
for _ in xrange(0, messages_count):
client.send_msg()
LOG.debug("Client %d has sent all messages", c_id)
if duration:
start_time = time.time()
while time.time() - start_time < duration:
client.send_msg()
else:
LOG.debug("Sending %d messages using client %d", messages_count, c_id)
for _ in xrange(0, messages_count):
client.send_msg()
LOG.debug("Client %d has sent %d messages", c_id, messages_count)
def _rpc_call(client, msg):
@ -304,6 +322,9 @@ def main():
parser.add_argument('-tp', '--topic', dest='topic',
default="profiler_topic",
help="Topic to publish/receive messages to/from.")
parser.add_argument('-l', dest='duration', type=int,
help='send messages for certain time')
subparsers = parser.add_subparsers(dest='mode',
help='notify/rpc server/client mode')
@ -369,7 +390,7 @@ def main():
cfg.CONF.rpc_zmq_matchmaker = "redis"
transport._driver.matchmaker._redis.flushdb()
rpc_server(transport, target, args.wait_before_answer, args.executor,
args.show_stats)
args.show_stats, args.duration)
elif args.mode == 'notify-server':
notify_server(transport, args.show_stats)
elif args.mode == 'batch-notify-server':
@ -383,7 +404,7 @@ def main():
start = datetime.datetime.now()
threads_spawner(args.threads, send_msg, transport, target,
args.wait_after_msg, args.timeout, args.is_cast,
args.messages)
args.messages, args.duration)
time_elapsed = (datetime.datetime.now() - start).total_seconds()
msg_count = 0