Merge "Add duration option to simulator.py"
This commit is contained in:
commit
62fc6ab094
@ -165,8 +165,10 @@ class RpcEndpoint(Monitor):
|
|||||||
def __init__(self, wait_before_answer, show_stats):
|
def __init__(self, wait_before_answer, show_stats):
|
||||||
self.count = None
|
self.count = None
|
||||||
self.wait_before_answer = wait_before_answer
|
self.wait_before_answer = wait_before_answer
|
||||||
|
self.messages_received = 0
|
||||||
|
|
||||||
def info(self, ctxt, message):
|
def info(self, ctxt, message):
|
||||||
|
self.messages_received += 1
|
||||||
i = int(message.split(' ')[-1])
|
i = int(message.split(' ')[-1])
|
||||||
if self.count is None:
|
if self.count is None:
|
||||||
self.count = i
|
self.count = i
|
||||||
@ -184,7 +186,7 @@ class RpcEndpoint(Monitor):
|
|||||||
class RPCClient(object):
|
class RPCClient(object):
|
||||||
def __init__(self, transport, target, timeout, method, wait_after_msg):
|
def __init__(self, transport, target, timeout, method, wait_after_msg):
|
||||||
self.client = rpc.RPCClient(transport, target)
|
self.client = rpc.RPCClient(transport, target)
|
||||||
self.client.prepare(timeout=timeout)
|
self.client = self.client.prepare(timeout=timeout)
|
||||||
self.method = method
|
self.method = method
|
||||||
self.bytes = 0
|
self.bytes = 0
|
||||||
self.msg_sent = 0
|
self.msg_sent = 0
|
||||||
@ -222,11 +224,21 @@ def init_msg(messages_count):
|
|||||||
LOG.info("Messages has been prepared")
|
LOG.info("Messages has been prepared")
|
||||||
|
|
||||||
|
|
||||||
def rpc_server(transport, target, wait_before_answer, executor, show_stats):
|
def rpc_server(transport, target, wait_before_answer, executor, show_stats,
|
||||||
|
duration):
|
||||||
endpoints = [RpcEndpoint(wait_before_answer, show_stats)]
|
endpoints = [RpcEndpoint(wait_before_answer, show_stats)]
|
||||||
server = rpc.get_rpc_server(transport, target, endpoints,
|
server = rpc.get_rpc_server(transport, target, endpoints,
|
||||||
executor=executor)
|
executor=executor)
|
||||||
server.start()
|
server.start()
|
||||||
|
if duration:
|
||||||
|
start_t = time.time()
|
||||||
|
while time.time() - start_t < duration:
|
||||||
|
time.sleep(1)
|
||||||
|
server.stop()
|
||||||
|
server.wait()
|
||||||
|
LOG.info("Received total messages: %d",
|
||||||
|
server.dispatcher.endpoints[0].messages_received)
|
||||||
|
return
|
||||||
server.wait()
|
server.wait()
|
||||||
|
|
||||||
|
|
||||||
@ -238,14 +250,20 @@ def threads_spawner(threads, method, *args, **kwargs):
|
|||||||
|
|
||||||
|
|
||||||
def send_msg(c_id, transport, target, wait_after_msg, timeout, is_cast,
|
def send_msg(c_id, transport, target, wait_after_msg, timeout, is_cast,
|
||||||
messages_count):
|
messages_count, duration):
|
||||||
LOG.debug("Sending %d messages using client %d", messages_count, c_id)
|
|
||||||
rpc_method = _rpc_cast if is_cast else _rpc_call
|
rpc_method = _rpc_cast if is_cast else _rpc_call
|
||||||
client = RPCClient(transport, target, timeout, rpc_method, wait_after_msg)
|
client = RPCClient(transport, target, timeout, rpc_method, wait_after_msg)
|
||||||
RPC_CLIENTS.append(client)
|
RPC_CLIENTS.append(client)
|
||||||
for _ in xrange(0, messages_count):
|
|
||||||
client.send_msg()
|
if duration:
|
||||||
LOG.debug("Client %d has sent all messages", c_id)
|
start_time = time.time()
|
||||||
|
while time.time() - start_time < duration:
|
||||||
|
client.send_msg()
|
||||||
|
else:
|
||||||
|
LOG.debug("Sending %d messages using client %d", messages_count, c_id)
|
||||||
|
for _ in xrange(0, messages_count):
|
||||||
|
client.send_msg()
|
||||||
|
LOG.debug("Client %d has sent %d messages", c_id, messages_count)
|
||||||
|
|
||||||
|
|
||||||
def _rpc_call(client, msg):
|
def _rpc_call(client, msg):
|
||||||
@ -304,6 +322,9 @@ def main():
|
|||||||
parser.add_argument('-tp', '--topic', dest='topic',
|
parser.add_argument('-tp', '--topic', dest='topic',
|
||||||
default="profiler_topic",
|
default="profiler_topic",
|
||||||
help="Topic to publish/receive messages to/from.")
|
help="Topic to publish/receive messages to/from.")
|
||||||
|
parser.add_argument('-l', dest='duration', type=int,
|
||||||
|
help='send messages for certain time')
|
||||||
|
|
||||||
subparsers = parser.add_subparsers(dest='mode',
|
subparsers = parser.add_subparsers(dest='mode',
|
||||||
help='notify/rpc server/client mode')
|
help='notify/rpc server/client mode')
|
||||||
|
|
||||||
@ -369,7 +390,7 @@ def main():
|
|||||||
cfg.CONF.rpc_zmq_matchmaker = "redis"
|
cfg.CONF.rpc_zmq_matchmaker = "redis"
|
||||||
transport._driver.matchmaker._redis.flushdb()
|
transport._driver.matchmaker._redis.flushdb()
|
||||||
rpc_server(transport, target, args.wait_before_answer, args.executor,
|
rpc_server(transport, target, args.wait_before_answer, args.executor,
|
||||||
args.show_stats)
|
args.show_stats, args.duration)
|
||||||
elif args.mode == 'notify-server':
|
elif args.mode == 'notify-server':
|
||||||
notify_server(transport, args.show_stats)
|
notify_server(transport, args.show_stats)
|
||||||
elif args.mode == 'batch-notify-server':
|
elif args.mode == 'batch-notify-server':
|
||||||
@ -383,7 +404,7 @@ def main():
|
|||||||
start = datetime.datetime.now()
|
start = datetime.datetime.now()
|
||||||
threads_spawner(args.threads, send_msg, transport, target,
|
threads_spawner(args.threads, send_msg, transport, target,
|
||||||
args.wait_after_msg, args.timeout, args.is_cast,
|
args.wait_after_msg, args.timeout, args.is_cast,
|
||||||
args.messages)
|
args.messages, args.duration)
|
||||||
time_elapsed = (datetime.datetime.now() - start).total_seconds()
|
time_elapsed = (datetime.datetime.now() - start).total_seconds()
|
||||||
|
|
||||||
msg_count = 0
|
msg_count = 0
|
||||||
|
Loading…
Reference in New Issue
Block a user