Added duration to notify server/client

Change-Id: I4feeeec0c69305d92dce5baf60502a39ebe6b247
This commit is contained in:
Yulia Portnova
2016-02-17 15:03:41 +02:00
committed by Dmitry Mescheryakov
parent 629632bfff
commit d70dfc2f7e

View File

@@ -33,6 +33,7 @@ from oslo_config import cfg
import oslo_messaging as messaging import oslo_messaging as messaging
from oslo_messaging import notify # noqa from oslo_messaging import notify # noqa
from oslo_messaging import rpc # noqa from oslo_messaging import rpc # noqa
from oslo_utils import timeutils
LOG = logging.getLogger() LOG = logging.getLogger()
RANDOM_VARIABLE = None RANDOM_VARIABLE = None
@@ -121,13 +122,12 @@ class NotifyEndpoint(Monitor):
return messaging.NotificationResult.HANDLED return messaging.NotificationResult.HANDLED
def notify_server(transport, topic, show_stats): def notify_server(transport, topic, show_stats, duration):
endpoints = [NotifyEndpoint(show_stats)] endpoints = [NotifyEndpoint(show_stats)]
target = messaging.Target(topic=topic) target = messaging.Target(topic=topic)
server = notify.get_notification_listener(transport, [target], server = notify.get_notification_listener(transport, [target],
endpoints, executor='eventlet') endpoints, executor='eventlet')
server.start() run_server(server, duration=duration)
server.wait()
class BatchNotifyEndpoint(Monitor): class BatchNotifyEndpoint(Monitor):
@@ -152,15 +152,14 @@ class BatchNotifyEndpoint(Monitor):
return messaging.NotificationResult.HANDLED return messaging.NotificationResult.HANDLED
def batch_notify_server(transport, topic, show_stats): def batch_notify_server(transport, topic, show_stats, duration):
endpoints = [BatchNotifyEndpoint(show_stats)] endpoints = [BatchNotifyEndpoint(show_stats)]
target = messaging.Target(topic=topic) target = messaging.Target(topic=topic)
server = notify.get_batch_notification_listener( server = notify.get_batch_notification_listener(
transport, [target], transport, [target],
endpoints, executor='eventlet', endpoints, executor='eventlet',
batch_size=1000, batch_time=5) batch_size=1000, batch_time=5)
server.start() run_server(server, duration=duration)
server.wait()
class RpcEndpoint(Monitor): class RpcEndpoint(Monitor):
@@ -226,23 +225,32 @@ def init_msg(messages_count):
LOG.info("Messages has been prepared") LOG.info("Messages has been prepared")
def run_server(server, duration=None):
server.start()
if duration:
with timeutils.StopWatch(duration) as stop_watch:
while not stop_watch.expired():
time.sleep(1)
server.stop()
server.wait()
def rpc_server(transport, target, wait_before_answer, executor, show_stats, def rpc_server(transport, target, wait_before_answer, executor, show_stats,
duration): duration):
endpoints = [RpcEndpoint(wait_before_answer, show_stats)] endpoints = [RpcEndpoint(wait_before_answer, show_stats)]
server = rpc.get_rpc_server(transport, target, endpoints, server = rpc.get_rpc_server(transport, target, endpoints,
executor=executor) executor=executor)
LOG.debug("starting RPC server for target %s", target) LOG.debug("starting RPC server for target %s", target)
server.start() run_server(server, duration=duration)
if duration: LOG.info("Received total messages: %d",
start_t = time.time() server.dispatcher.endpoints[0].messages_received)
while time.time() - start_t < duration:
time.sleep(1)
server.stop() def spawn_notify_clients(threads, *args, **kwargs):
server.wait() p = eventlet.GreenPool(size=threads)
LOG.info("Received total messages: %d", for i in range(0, threads):
server.dispatcher.endpoints[0].messages_received) p.spawn_n(notifier, i, *args, **kwargs)
return p.waitall()
server.wait()
def spawn_rpc_clients(threads, transport, targets, def spawn_rpc_clients(threads, transport, targets,
@@ -256,13 +264,6 @@ def spawn_rpc_clients(threads, transport, targets,
p.waitall() p.waitall()
def threads_spawner(threads, method, *args, **kwargs):
p = eventlet.GreenPool(size=threads)
for i in range(0, threads):
p.spawn_n(method, i, *args, **kwargs)
p.waitall()
def send_msg(c_id, transport, target, wait_after_msg, timeout, is_cast, def send_msg(c_id, transport, target, wait_after_msg, timeout, is_cast,
messages_count, duration): messages_count, duration):
rpc_method = _rpc_cast if is_cast else _rpc_call rpc_method = _rpc_cast if is_cast else _rpc_call
@@ -270,9 +271,9 @@ def send_msg(c_id, transport, target, wait_after_msg, timeout, is_cast,
RPC_CLIENTS.append(client) RPC_CLIENTS.append(client)
if duration: if duration:
start_time = time.time() with timeutils.StopWatch(duration) as stop_watch:
while time.time() - start_time < duration: while not stop_watch.expired():
client.send_msg() client.send_msg()
else: else:
LOG.debug("Sending %d messages using client %d", messages_count, c_id) LOG.debug("Sending %d messages using client %d", messages_count, c_id)
for _ in six.moves.range(0, messages_count): for _ in six.moves.range(0, messages_count):
@@ -298,22 +299,30 @@ def _rpc_cast(client, msg):
LOG.debug("SENT: %s", msg) LOG.debug("SENT: %s", msg)
def notifier(_id, topic, transport, messages, wait_after_msg, timeout): def notifier(_id, topic, transport, messages, wait_after_msg, timeout,
duration):
n1 = notify.Notifier(transport, n1 = notify.Notifier(transport,
driver='messaging', driver='messaging',
topic=topic).prepare( topic=topic).prepare(
publisher_id='publisher-%d' % _id) publisher_id='publisher-%d' % _id)
msg = 0 payload = dict(msg=0, vm='test', otherdata='ahah')
for i in range(0, messages): ctxt = {}
msg = 1 + msg
ctxt = {} def send_notif():
payload = dict(msg=msg, vm='test', otherdata='ahah') payload['msg'] += 1
LOG.debug("send msg") LOG.debug("sending notification %s", payload)
LOG.debug(payload)
n1.info(ctxt, 'compute.start1', payload) n1.info(ctxt, 'compute.start1', payload)
if wait_after_msg > 0: if wait_after_msg > 0:
time.sleep(wait_after_msg) time.sleep(wait_after_msg)
if duration:
with timeutils.StopWatch(duration) as stop_watch:
while not stop_watch.expired():
send_notif()
else:
for i in range(0, messages):
send_notif()
def _setup_logging(is_debug): def _setup_logging(is_debug):
log_level = logging.DEBUG if is_debug else logging.INFO log_level = logging.DEBUG if is_debug else logging.INFO
@@ -360,6 +369,7 @@ def main():
server = subparsers.add_parser('batch-notify-server') server = subparsers.add_parser('batch-notify-server')
server.add_argument('--show-stats', dest='show_stats', server.add_argument('--show-stats', dest='show_stats',
type=bool, default=True) type=bool, default=True)
client = subparsers.add_parser('notify-client') client = subparsers.add_parser('notify-client')
client.add_argument('-p', dest='threads', type=int, default=1, client.add_argument('-p', dest='threads', type=int, default=1,
help='number of client threads') help='number of client threads')
@@ -420,12 +430,14 @@ def main():
rpc_server(transport, target, args.wait_before_answer, args.executor, rpc_server(transport, target, args.wait_before_answer, args.executor,
args.show_stats, args.duration) args.show_stats, args.duration)
elif args.mode == 'notify-server': elif args.mode == 'notify-server':
notify_server(transport, args.topic, args.show_stats) notify_server(transport, args.topic, args.show_stats, args.duration)
elif args.mode == 'batch-notify-server': elif args.mode == 'batch-notify-server':
batch_notify_server(transport, args.topic, args.show_stats) batch_notify_server(transport, args.topic, args.show_stats,
args.duration)
elif args.mode == 'notify-client': elif args.mode == 'notify-client':
threads_spawner(args.threads, notifier, args.topic, transport, spawn_notify_clients(args.threads, args.topic, transport,
args.messages, args.wait_after_msg, args.timeout) args.messages, args.wait_after_msg, args.timeout,
args.duration)
elif args.mode == 'rpc-client': elif args.mode == 'rpc-client':
init_msg(args.messages) init_msg(args.messages)
targets = [target.partition('.')[::2] for target in args.targets] targets = [target.partition('.')[::2] for target in args.targets]