From 34c30bc53b2aae50d85856eb5ef0f700cc602e47 Mon Sep 17 00:00:00 2001 From: kgriffs Date: Thu, 21 Aug 2014 18:14:10 -0500 Subject: [PATCH] Add the ability to benchmark across multiple queues Previously, all operations were performed on a single queue. This patch introduces the ability to distribute operations across multiple queues, which is important to measure since we know that the mongod driver has some per-queue performance limits, but that it has several features which allow for scaling load by spreading it across multiple queues. Note that the default of 4 queues was chosen somewhat arbitrarily, and may be adjusted later as we gather more data. As part of this patch, queue classes and sample messages were hoisted so they can be shared across all greenlets. Sharing a requests session across all workers should improve the performance of the benchmarking tool itself. Change-Id: Iac6625b0214d40156da1c2bebe738fa453f184d1 --- zaqar/bench/config.py | 3 ++- zaqar/bench/consumer.py | 26 ++++++++++++++++++-------- zaqar/bench/producer.py | 38 ++++++++++++++++++++------------------ 3 files changed, 40 insertions(+), 27 deletions(-) diff --git a/zaqar/bench/config.py b/zaqar/bench/config.py index 25bd8dbb5..03dcc108b 100644 --- a/zaqar/bench/config.py +++ b/zaqar/bench/config.py @@ -43,7 +43,8 @@ _CLI_OPTIONS = ( cfg.IntOpt('time', short='t', default=3, help="Duration of the performance test, in seconds"), cfg.StrOpt('server_url', short='s', default='http://localhost:8888'), - cfg.StrOpt('queue_prefix', short='q', default='ogre-test-queue-'), + cfg.StrOpt('queue_prefix', short='q', default='ogre-test-queue'), + cfg.IntOpt('num_queues', short='qno', default=4), cfg.StrOpt('messages_path', short='m') ) conf.register_cli_opts(_CLI_OPTIONS) diff --git a/zaqar/bench/consumer.py b/zaqar/bench/consumer.py index 9d5c08b0e..44da977a2 100644 --- a/zaqar/bench/consumer.py +++ b/zaqar/bench/consumer.py @@ -16,6 +16,7 @@ from __future__ import division from __future__ import 
print_function import multiprocessing as mp +import random import sys import time @@ -29,7 +30,7 @@ from zaqarclient.transport.errors import TransportError from zaqar.bench.config import conf -def claim_delete(stats, test_duration, ttl, grace, limit): +def claim_delete(queues, stats, test_duration, ttl, grace, limit): """Consumer Worker The Consumer Worker continuously claims and deletes messages @@ -37,8 +38,6 @@ def claim_delete(stats, test_duration, ttl, grace, limit): delete is recorded for calculating throughput and latency. """ - cli = client.Client(conf.server_url) - queue = cli.queue(conf.queue_prefix + '1') end = time.time() + test_duration claim_total_elapsed = 0 delete_total_elapsed = 0 @@ -47,6 +46,9 @@ def claim_delete(stats, test_duration, ttl, grace, limit): delete_total_requests = 0 while time.time() < end: + # NOTE(kgriffs): Distribute requests across all queues evenly. + queue = random.choice(queues) + try: marktime.start('claim_message') @@ -88,10 +90,17 @@ def claim_delete(stats, test_duration, ttl, grace, limit): }) -def load_generator(stats, num_workers, test_duration, url, ttl, grace, limit): +def load_generator(stats, num_workers, num_queues, + test_duration, url, ttl, grace, limit): + + cli = client.Client(conf.server_url) + queues = [cli.queue(conf.queue_prefix + '-' + str(i)) + for i in range(num_queues)] + gevent.joinall([ - gevent.spawn(claim_delete, stats, test_duration, ttl, - grace, limit) + gevent.spawn(claim_delete, + queues, stats, test_duration, ttl, grace, limit) + for _ in range(num_workers) ]) @@ -118,6 +127,7 @@ def crunch(stats): def run(upstream_queue): num_procs = conf.consumer_processes num_workers = conf.consumer_workers + num_queues = conf.num_queues # Stats that will be reported duration = 0 @@ -134,8 +144,8 @@ def run(upstream_queue): test_duration = conf.time stats = mp.Queue() # TODO(TheSriram) : Make ttl and grace configurable - args = (stats, num_workers, test_duration, conf.server_url, - 300, 200, 
conf.messages_per_claim) + args = (stats, num_workers, num_queues, test_duration, + conf.server_url, 300, 200, conf.messages_per_claim) procs = [mp.Process(target=load_generator, args=args) for _ in range(num_procs)] diff --git a/zaqar/bench/producer.py b/zaqar/bench/producer.py index ec90a4f59..38ec7e015 100644 --- a/zaqar/bench/producer.py +++ b/zaqar/bench/producer.py @@ -61,7 +61,7 @@ def load_messages(): "evt": "Single"}}}] -def producer(stats, test_duration): +def producer(queues, message_pool, stats, test_duration): """Producer Worker The Producer Worker continuously post messages for @@ -69,31 +69,26 @@ def producer(stats, test_duration): is recorded for calculating throughput and latency. """ - cli = client.Client(conf.server_url) - queue = cli.queue(conf.queue_prefix + '1') - message_pool = load_messages() - total_requests = 0 successful_requests = 0 total_elapsed = 0 end = time.time() + test_duration while time.time() < end: - marktime.start('post message') + queue = random.choice(queues) - # TODO(TheSriram): Track/report errors try: + marktime.start('post_message') + queue.post(choose_message(message_pool)) + total_elapsed += marktime.stop('post_message').seconds + successful_requests += 1 + except TransportError as ex: sys.stderr.write("Could not post a message : {0}\n".format(ex)) - else: - successful_requests += 1 - total_elapsed += marktime.stop('post message').seconds - - finally: - total_requests += 1 + total_requests += 1 stats.put({ 'successful_requests': successful_requests, @@ -105,12 +100,18 @@ def producer(stats, test_duration): # TODO(TheSriram): make distributed across multiple machines # TODO(TheSriram): post across several queues (which workers to which queues? # weight them, so can have some busy queues, some not.) -def load_generator(stats, num_workers, test_duration): - # TODO(TheSriram): Have some way to get all of the workers to line up and - # start at the same time (is this really useful?) 
+def load_generator(stats, num_workers, num_queues, test_duration): + + cli = client.Client(conf.server_url) + queues = [cli.queue(conf.queue_prefix + '-' + str(i)) + for i in range(num_queues)] + + message_pool = load_messages() gevent.joinall([ - gevent.spawn(producer, stats, test_duration) + gevent.spawn(producer, + queues, message_pool, stats, test_duration) + for _ in range(num_workers) ]) @@ -132,6 +133,7 @@ def crunch(stats): def run(upstream_queue): num_procs = conf.producer_processes num_workers = conf.producer_workers + num_queues = conf.num_queues duration = 0 total_requests = 0 @@ -142,7 +144,7 @@ def run(upstream_queue): if num_procs and num_workers: test_duration = conf.time stats = mp.Queue() - args = (stats, num_workers, test_duration) + args = (stats, num_workers, num_queues, test_duration) # TODO(TheSriram): Multiple test runs, vary num workers and # drain/delete queues in between each run. Plot these on a