Improve simulator.py
- added config options to set debug level - added config options to show proceed messages per second on rpc-server - added config options to select executor for rpc-server - added config options to select call or cast mesages for rpc-client Usage section updated Change-Id: Ieadbc600f556ca5eb43b05abec69315b46023662
This commit is contained in:
parent
e72dc250b3
commit
28de384e9c
@ -10,19 +10,14 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
# Usage example:
|
|
||||||
# python tools/simulator.py \
|
|
||||||
# --url rabbit://stackrabbit:secretrabbit@localhost/ rpc-server
|
|
||||||
# python tools/simulator.py
|
|
||||||
# --url rabbit://stackrabbit:secretrabbit@localhost/ rpc-client \
|
|
||||||
# --exit-wait 15000 -p 64 -m 64
|
|
||||||
|
|
||||||
import eventlet
|
import eventlet
|
||||||
eventlet.monkey_patch()
|
eventlet.monkey_patch()
|
||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
|
import datetime
|
||||||
import logging
|
import logging
|
||||||
import sys
|
import sys
|
||||||
|
import threading
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
@ -30,8 +25,19 @@ import oslo_messaging as messaging
|
|||||||
from oslo_messaging import notify # noqa
|
from oslo_messaging import notify # noqa
|
||||||
from oslo_messaging import rpc # noqa
|
from oslo_messaging import rpc # noqa
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger()
|
LOG = logging.getLogger()
|
||||||
|
|
||||||
|
USAGE = """ Usage: ./simulator.py [-h] [--url URL] [-d DEBUG]\
|
||||||
|
{notify-server,notify-client,rpc-server,rpc-client} ...
|
||||||
|
|
||||||
|
Usage example:
|
||||||
|
python tools/simulator.py\
|
||||||
|
--url rabbit://stackrabbit:secretrabbit@localhost/ rpc-server
|
||||||
|
python tools/simulator.py\
|
||||||
|
--url rabbit://stackrabbit:secretrabbit@localhost/ rpc-client\
|
||||||
|
--exit-wait 15000 -p 64 -m 64"""
|
||||||
|
|
||||||
|
|
||||||
class LoggingNoParsingFilter(logging.Filter):
|
class LoggingNoParsingFilter(logging.Filter):
|
||||||
def filter(self, record):
|
def filter(self, record):
|
||||||
@ -75,7 +81,7 @@ class RpcEndpoint(object):
|
|||||||
self.wait_before_answer = wait_before_answer
|
self.wait_before_answer = wait_before_answer
|
||||||
|
|
||||||
def info(self, ctxt, message):
|
def info(self, ctxt, message):
|
||||||
i = int(message.replace('test ', ''))
|
i = int(message.split(' ')[-1])
|
||||||
if self.count is None:
|
if self.count is None:
|
||||||
self.count = i
|
self.count = i
|
||||||
elif i == 0:
|
elif i == 0:
|
||||||
@ -89,11 +95,29 @@ class RpcEndpoint(object):
|
|||||||
return "OK: %s" % message
|
return "OK: %s" % message
|
||||||
|
|
||||||
|
|
||||||
def rpc_server(transport, wait_before_answer):
|
class RpcEndpointMonitor(RpcEndpoint):
|
||||||
endpoints = [RpcEndpoint(wait_before_answer)]
|
def __init__(self, *args, **kwargs):
|
||||||
target = messaging.Target(topic='t1', server='moi')
|
super(RpcEndpointMonitor, self).__init__(*args, **kwargs)
|
||||||
server = rpc.get_rpc_server(transport, target,
|
|
||||||
endpoints, executor='eventlet')
|
self._count = self._prev_count = 0
|
||||||
|
self._monitor()
|
||||||
|
|
||||||
|
def _monitor(self):
|
||||||
|
threading.Timer(1.0, self._monitor).start()
|
||||||
|
print ("%d msg was received per second"
|
||||||
|
% (self._count - self._prev_count))
|
||||||
|
self._prev_count = self._count
|
||||||
|
|
||||||
|
def info(self, *args, **kwargs):
|
||||||
|
self._count += 1
|
||||||
|
super(RpcEndpointMonitor, self).info(*args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
def rpc_server(transport, target, wait_before_answer, executor, show_stats):
|
||||||
|
endpoint_cls = RpcEndpointMonitor if show_stats else RpcEndpoint
|
||||||
|
endpoints = [endpoint_cls(wait_before_answer)]
|
||||||
|
server = rpc.get_rpc_server(transport, target, endpoints,
|
||||||
|
executor=executor)
|
||||||
server.start()
|
server.start()
|
||||||
server.wait()
|
server.wait()
|
||||||
|
|
||||||
@ -105,23 +129,38 @@ def threads_spawner(threads, method, *args, **kwargs):
|
|||||||
p.waitall()
|
p.waitall()
|
||||||
|
|
||||||
|
|
||||||
def rpc_call(_id, transport, messages, wait_after_msg, timeout):
|
def send_msg(_id, transport, target, messages, wait_after_msg, timeout,
|
||||||
target = messaging.Target(topic='t1', server='moi')
|
is_cast):
|
||||||
c = rpc.RPCClient(transport, target)
|
client = rpc.RPCClient(transport, target)
|
||||||
c = c.prepare(timeout=timeout)
|
client = client.prepare(timeout=timeout)
|
||||||
|
rpc_method = _rpc_cast if is_cast else _rpc_call
|
||||||
|
|
||||||
for i in range(0, messages):
|
for i in range(0, messages):
|
||||||
payload = "test %d" % i
|
msg = "test message %d" % i
|
||||||
LOG.info("SEND: %s" % payload)
|
LOG.info("SEND: %s" % msg)
|
||||||
try:
|
rpc_method(client, msg)
|
||||||
res = c.call({}, 'info', message=payload)
|
|
||||||
except Exception:
|
|
||||||
LOG.exception('no RCV for %s' % i)
|
|
||||||
else:
|
|
||||||
LOG.info("RCV: %s" % res)
|
|
||||||
if wait_after_msg > 0:
|
if wait_after_msg > 0:
|
||||||
time.sleep(wait_after_msg)
|
time.sleep(wait_after_msg)
|
||||||
|
|
||||||
|
|
||||||
|
def _rpc_call(client, msg):
|
||||||
|
try:
|
||||||
|
res = client.call({}, 'info', message=msg)
|
||||||
|
except Exception as e:
|
||||||
|
LOG.exception('Error %s on CALL for message %s' % (str(e), msg))
|
||||||
|
else:
|
||||||
|
LOG.info("SENT: %s, RCV: %s" % (msg, res))
|
||||||
|
|
||||||
|
|
||||||
|
def _rpc_cast(client, msg):
|
||||||
|
try:
|
||||||
|
client.cast({}, 'info', message=msg)
|
||||||
|
except Exception as e:
|
||||||
|
LOG.exception('Error %s on CAST for message %s' % (str(e), msg))
|
||||||
|
else:
|
||||||
|
LOG.info("SENT: %s" % msg)
|
||||||
|
|
||||||
|
|
||||||
def notifier(_id, transport, messages, wait_after_msg, timeout):
|
def notifier(_id, transport, messages, wait_after_msg, timeout):
|
||||||
n1 = notify.Notifier(transport, topic="n-t1").prepare(
|
n1 = notify.Notifier(transport, topic="n-t1").prepare(
|
||||||
publisher_id='publisher-%d' % _id)
|
publisher_id='publisher-%d' % _id)
|
||||||
@ -137,11 +176,26 @@ def notifier(_id, transport, messages, wait_after_msg, timeout):
|
|||||||
time.sleep(wait_after_msg)
|
time.sleep(wait_after_msg)
|
||||||
|
|
||||||
|
|
||||||
|
def _setup_logging(is_debug):
|
||||||
|
log_level = logging.DEBUG if is_debug else logging.WARN
|
||||||
|
logging.basicConfig(stream=sys.stdout, level=log_level)
|
||||||
|
logging.getLogger().handlers[0].addFilter(LoggingNoParsingFilter())
|
||||||
|
for i in ['kombu', 'amqp', 'stevedore', 'qpid.messaging'
|
||||||
|
'oslo.messaging._drivers.amqp', ]:
|
||||||
|
logging.getLogger(i).setLevel(logging.WARN)
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
parser = argparse.ArgumentParser(description='RPC DEMO')
|
parser = argparse.ArgumentParser(
|
||||||
|
description='Tools to play with oslo.messaging\'s RPC',
|
||||||
|
usage=USAGE,
|
||||||
|
)
|
||||||
parser.add_argument('--url', dest='url',
|
parser.add_argument('--url', dest='url',
|
||||||
default='rabbit://guest:password@localhost/',
|
default='rabbit://guest:password@localhost/',
|
||||||
help="oslo.messaging transport url")
|
help="oslo.messaging transport url")
|
||||||
|
parser.add_argument('-d', '--debug', dest='debug', type=bool,
|
||||||
|
default=False,
|
||||||
|
help="Turn on DEBUG logging level instead of WARN")
|
||||||
subparsers = parser.add_subparsers(dest='mode',
|
subparsers = parser.add_subparsers(dest='mode',
|
||||||
help='notify/rpc server/client mode')
|
help='notify/rpc server/client mode')
|
||||||
|
|
||||||
@ -158,6 +212,11 @@ def main():
|
|||||||
|
|
||||||
server = subparsers.add_parser('rpc-server')
|
server = subparsers.add_parser('rpc-server')
|
||||||
server.add_argument('-w', dest='wait_before_answer', type=int, default=-1)
|
server.add_argument('-w', dest='wait_before_answer', type=int, default=-1)
|
||||||
|
server.add_argument('--show-stats', dest='show_stats',
|
||||||
|
type=bool, default=True)
|
||||||
|
server.add_argument('-e', '--executor', dest='executor',
|
||||||
|
type=str, default='eventlet',
|
||||||
|
help='name of a message executor')
|
||||||
|
|
||||||
client = subparsers.add_parser('rpc-client')
|
client = subparsers.add_parser('rpc-client')
|
||||||
client.add_argument('-p', dest='threads', type=int, default=1,
|
client.add_argument('-p', dest='threads', type=int, default=1,
|
||||||
@ -171,34 +230,41 @@ def main():
|
|||||||
client.add_argument('--exit-wait', dest='exit_wait', type=int, default=0,
|
client.add_argument('--exit-wait', dest='exit_wait', type=int, default=0,
|
||||||
help='Keep connections open N seconds after calls '
|
help='Keep connections open N seconds after calls '
|
||||||
'have been done')
|
'have been done')
|
||||||
|
client.add_argument('--is-cast', dest='is_cast', type=bool, default=False,
|
||||||
|
help='Use `call` or `cast` RPC methods')
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
# Setup logging
|
_setup_logging(is_debug=args.debug)
|
||||||
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
|
|
||||||
logging.getLogger().handlers[0].addFilter(LoggingNoParsingFilter())
|
|
||||||
for i in ['kombu', 'amqp', 'stevedore', 'qpid.messaging'
|
|
||||||
'oslo.messaging._drivers.amqp', ]:
|
|
||||||
logging.getLogger(i).setLevel(logging.WARN)
|
|
||||||
|
|
||||||
# oslo.config defaults
|
# oslo.config defaults
|
||||||
cfg.CONF.heartbeat_interval = 5
|
cfg.CONF.heartbeat_interval = 5
|
||||||
cfg.CONF.notification_topics = "notif"
|
cfg.CONF.notification_topics = "notif"
|
||||||
cfg.CONF.notification_driver = "messaging"
|
cfg.CONF.notification_driver = "messaging"
|
||||||
|
|
||||||
# the transport
|
|
||||||
transport = messaging.get_transport(cfg.CONF, url=args.url)
|
transport = messaging.get_transport(cfg.CONF, url=args.url)
|
||||||
|
target = messaging.Target(topic='profiler_topic', server='profiler_server')
|
||||||
|
|
||||||
if args.mode == 'rpc-server':
|
if args.mode == 'rpc-server':
|
||||||
rpc_server(transport, args.wait_before_answer)
|
if args.url.startswith('zmq'):
|
||||||
|
cfg.CONF.rpc_zmq_matchmaker = "redis"
|
||||||
|
transport._driver.matchmaker._redis.flushdb()
|
||||||
|
rpc_server(transport, target, args.wait_before_answer, args.executor,
|
||||||
|
args.show_stats)
|
||||||
elif args.mode == 'notify-server':
|
elif args.mode == 'notify-server':
|
||||||
notify_server(transport)
|
notify_server(transport)
|
||||||
elif args.mode == 'notify-client':
|
elif args.mode == 'notify-client':
|
||||||
threads_spawner(args.threads, notifier, transport, args.messages,
|
threads_spawner(args.threads, notifier, transport, args.messages,
|
||||||
args.wait_after_msg, args.timeout)
|
args.wait_after_msg, args.timeout)
|
||||||
elif args.mode == 'rpc-client':
|
elif args.mode == 'rpc-client':
|
||||||
threads_spawner(args.threads, rpc_call, transport, args.messages,
|
start = datetime.datetime.now()
|
||||||
args.wait_after_msg, args.timeout)
|
threads_spawner(args.threads, send_msg, transport, target,
|
||||||
|
args.messages, args.wait_after_msg, args.timeout,
|
||||||
|
args.is_cast)
|
||||||
|
time_ellapsed = (datetime.datetime.now() - start).total_seconds()
|
||||||
|
msg_count = args.messages * args.threads
|
||||||
|
print ('%d messages was sent for %s seconds. Bandwight is %s msg/sec'
|
||||||
|
% (msg_count, time_ellapsed, (msg_count / time_ellapsed)))
|
||||||
LOG.info("calls finished, wait %d seconds" % args.exit_wait)
|
LOG.info("calls finished, wait %d seconds" % args.exit_wait)
|
||||||
time.sleep(args.exit_wait)
|
time.sleep(args.exit_wait)
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user