diff --git a/zaqar/bench/config.py b/zaqar/bench/config.py index 25bd8dbb5..03dcc108b 100644 --- a/zaqar/bench/config.py +++ b/zaqar/bench/config.py @@ -43,7 +43,8 @@ _CLI_OPTIONS = ( cfg.IntOpt('time', short='t', default=3, help="Duration of the performance test, in seconds"), cfg.StrOpt('server_url', short='s', default='http://localhost:8888'), - cfg.StrOpt('queue_prefix', short='q', default='ogre-test-queue-'), + cfg.StrOpt('queue_prefix', short='q', default='ogre-test-queue'), + cfg.IntOpt('num_queues', short='qno', default=4), cfg.StrOpt('messages_path', short='m') ) conf.register_cli_opts(_CLI_OPTIONS) diff --git a/zaqar/bench/consumer.py b/zaqar/bench/consumer.py index 9d5c08b0e..44da977a2 100644 --- a/zaqar/bench/consumer.py +++ b/zaqar/bench/consumer.py @@ -16,6 +16,7 @@ from __future__ import division from __future__ import print_function import multiprocessing as mp +import random import sys import time @@ -29,7 +30,7 @@ from zaqarclient.transport.errors import TransportError from zaqar.bench.config import conf -def claim_delete(stats, test_duration, ttl, grace, limit): +def claim_delete(queues, stats, test_duration, ttl, grace, limit): """Consumer Worker The Consumer Worker continuously claims and deletes messages @@ -37,8 +38,6 @@ def claim_delete(stats, test_duration, ttl, grace, limit): delete is recorded for calculating throughput and latency. """ - cli = client.Client(conf.server_url) - queue = cli.queue(conf.queue_prefix + '1') end = time.time() + test_duration claim_total_elapsed = 0 delete_total_elapsed = 0 @@ -47,6 +46,9 @@ def claim_delete(stats, test_duration, ttl, grace, limit): delete_total_requests = 0 while time.time() < end: + # NOTE(kgriffs): Distribute requests across all queues evenly. + queue = random.choice(queues) + try: marktime.start('claim_message') @@ -88,10 +90,17 @@ def claim_delete(stats, test_duration, ttl, grace, limit): }) -def load_generator(stats, num_workers, test_duration, url, ttl, grace, limit): +def load_generator(stats, num_workers, num_queues, + test_duration, url, ttl, grace, limit): + + cli = client.Client(conf.server_url) + queues = [cli.queue(conf.queue_prefix + '-' + str(i)) + for i in range(num_queues)] + gevent.joinall([ - gevent.spawn(claim_delete, stats, test_duration, ttl, - grace, limit) + gevent.spawn(claim_delete, + queues, stats, test_duration, ttl, grace, limit) + for _ in range(num_workers) ]) @@ -118,6 +127,7 @@ def crunch(stats): def run(upstream_queue): num_procs = conf.consumer_processes num_workers = conf.consumer_workers + num_queues = conf.num_queues # Stats that will be reported duration = 0 @@ -134,8 +144,8 @@ def run(upstream_queue): test_duration = conf.time stats = mp.Queue() # TODO(TheSriram) : Make ttl and grace configurable - args = (stats, num_workers, test_duration, conf.server_url, - 300, 200, conf.messages_per_claim) + args = (stats, num_workers, num_queues, test_duration, + conf.server_url, 300, 200, conf.messages_per_claim) procs = [mp.Process(target=load_generator, args=args) for _ in range(num_procs)] diff --git a/zaqar/bench/producer.py b/zaqar/bench/producer.py index ec90a4f59..38ec7e015 100644 --- a/zaqar/bench/producer.py +++ b/zaqar/bench/producer.py @@ -61,7 +61,7 @@ def load_messages(): "evt": "Single"}}}] -def producer(stats, test_duration): +def producer(queues, message_pool, stats, test_duration): """Producer Worker The Producer Worker continuously post messages for @@ -69,31 +69,26 @@ def producer(stats, test_duration): is recorded for calculating throughput and latency. """ - cli = client.Client(conf.server_url) - queue = cli.queue(conf.queue_prefix + '1') - message_pool = load_messages() - total_requests = 0 successful_requests = 0 total_elapsed = 0 end = time.time() + test_duration while time.time() < end: - marktime.start('post message') + queue = random.choice(queues) - # TODO(TheSriram): Track/report errors try: + marktime.start('post_message') + queue.post(choose_message(message_pool)) + total_elapsed += marktime.stop('post_message').seconds + successful_requests += 1 + except TransportError as ex: sys.stderr.write("Could not post a message : {0}\n".format(ex)) - else: - successful_requests += 1 - total_elapsed += marktime.stop('post message').seconds - - finally: - total_requests += 1 + total_requests += 1 stats.put({ 'successful_requests': successful_requests, @@ -105,12 +100,18 @@ def producer(stats, test_duration): # TODO(TheSriram): make distributed across multiple machines # TODO(TheSriram): post across several queues (which workers to which queues? # weight them, so can have some busy queues, some not.) -def load_generator(stats, num_workers, test_duration): - # TODO(TheSriram): Have some way to get all of the workers to line up and - # start at the same time (is this really useful?) +def load_generator(stats, num_workers, num_queues, test_duration): + + cli = client.Client(conf.server_url) + queues = [cli.queue(conf.queue_prefix + '-' + str(i)) + for i in range(num_queues)] + + message_pool = load_messages() gevent.joinall([ - gevent.spawn(producer, stats, test_duration) + gevent.spawn(producer, + queues, message_pool, stats, test_duration) + for _ in range(num_workers) ]) @@ -132,6 +133,7 @@ def crunch(stats): def run(upstream_queue): num_procs = conf.producer_processes num_workers = conf.producer_workers + num_queues = conf.num_queues duration = 0 total_requests = 0 @@ -142,7 +144,7 @@ def run(upstream_queue): if num_procs and num_workers: test_duration = conf.time stats = mp.Queue() - args = (stats, num_workers, test_duration) + args = (stats, num_workers, num_queues, test_duration) # TODO(TheSriram): Multiple test runs, vary num workers and # drain/delete queues in between each run. Plot these on a