Merge "Add the ability to benchmark across multiple queues"

This commit is contained in:
Jenkins 2014-08-26 19:22:26 +00:00 committed by Gerrit Code Review
commit 9889ff9b70
3 changed files with 40 additions and 27 deletions

View File

@ -43,7 +43,8 @@ _CLI_OPTIONS = (
cfg.IntOpt('time', short='t', default=3, cfg.IntOpt('time', short='t', default=3,
help="Duration of the performance test, in seconds"), help="Duration of the performance test, in seconds"),
cfg.StrOpt('server_url', short='s', default='http://localhost:8888'), cfg.StrOpt('server_url', short='s', default='http://localhost:8888'),
cfg.StrOpt('queue_prefix', short='q', default='ogre-test-queue-'), cfg.StrOpt('queue_prefix', short='q', default='ogre-test-queue'),
cfg.IntOpt('num_queues', short='qno', default=4),
cfg.StrOpt('messages_path', short='m') cfg.StrOpt('messages_path', short='m')
) )
conf.register_cli_opts(_CLI_OPTIONS) conf.register_cli_opts(_CLI_OPTIONS)

View File

@ -16,6 +16,7 @@ from __future__ import division
from __future__ import print_function from __future__ import print_function
import multiprocessing as mp import multiprocessing as mp
import random
import sys import sys
import time import time
@ -29,7 +30,7 @@ from zaqarclient.transport.errors import TransportError
from zaqar.bench.config import conf from zaqar.bench.config import conf
def claim_delete(stats, test_duration, ttl, grace, limit): def claim_delete(queues, stats, test_duration, ttl, grace, limit):
"""Consumer Worker """Consumer Worker
The Consumer Worker continuously claims and deletes messages The Consumer Worker continuously claims and deletes messages
@ -37,8 +38,6 @@ def claim_delete(stats, test_duration, ttl, grace, limit):
delete is recorded for calculating throughput and latency. delete is recorded for calculating throughput and latency.
""" """
cli = client.Client(conf.server_url)
queue = cli.queue(conf.queue_prefix + '1')
end = time.time() + test_duration end = time.time() + test_duration
claim_total_elapsed = 0 claim_total_elapsed = 0
delete_total_elapsed = 0 delete_total_elapsed = 0
@ -47,6 +46,9 @@ def claim_delete(stats, test_duration, ttl, grace, limit):
delete_total_requests = 0 delete_total_requests = 0
while time.time() < end: while time.time() < end:
# NOTE(kgriffs): Distribute requests across all queues evenly.
queue = random.choice(queues)
try: try:
marktime.start('claim_message') marktime.start('claim_message')
@ -88,10 +90,17 @@ def claim_delete(stats, test_duration, ttl, grace, limit):
}) })
def load_generator(stats, num_workers, test_duration, url, ttl, grace, limit): def load_generator(stats, num_workers, num_queues,
test_duration, url, ttl, grace, limit):
cli = client.Client(conf.server_url)
queues = [cli.queue(conf.queue_prefix + '-' + str(i))
for i in range(num_queues)]
gevent.joinall([ gevent.joinall([
gevent.spawn(claim_delete, stats, test_duration, ttl, gevent.spawn(claim_delete,
grace, limit) queues, stats, test_duration, ttl, grace, limit)
for _ in range(num_workers) for _ in range(num_workers)
]) ])
@ -118,6 +127,7 @@ def crunch(stats):
def run(upstream_queue): def run(upstream_queue):
num_procs = conf.consumer_processes num_procs = conf.consumer_processes
num_workers = conf.consumer_workers num_workers = conf.consumer_workers
num_queues = conf.num_queues
# Stats that will be reported # Stats that will be reported
duration = 0 duration = 0
@ -134,8 +144,8 @@ def run(upstream_queue):
test_duration = conf.time test_duration = conf.time
stats = mp.Queue() stats = mp.Queue()
# TODO(TheSriram) : Make ttl and grace configurable # TODO(TheSriram) : Make ttl and grace configurable
args = (stats, num_workers, test_duration, conf.server_url, args = (stats, num_workers, num_queues, test_duration,
300, 200, conf.messages_per_claim) conf.server_url, 300, 200, conf.messages_per_claim)
procs = [mp.Process(target=load_generator, args=args) procs = [mp.Process(target=load_generator, args=args)
for _ in range(num_procs)] for _ in range(num_procs)]

View File

@ -61,7 +61,7 @@ def load_messages():
"evt": "Single"}}}] "evt": "Single"}}}]
def producer(stats, test_duration): def producer(queues, message_pool, stats, test_duration):
"""Producer Worker """Producer Worker
The Producer Worker continuously post messages for The Producer Worker continuously post messages for
@ -69,31 +69,26 @@ def producer(stats, test_duration):
is recorded for calculating throughput and latency. is recorded for calculating throughput and latency.
""" """
cli = client.Client(conf.server_url)
queue = cli.queue(conf.queue_prefix + '1')
message_pool = load_messages()
total_requests = 0 total_requests = 0
successful_requests = 0 successful_requests = 0
total_elapsed = 0 total_elapsed = 0
end = time.time() + test_duration end = time.time() + test_duration
while time.time() < end: while time.time() < end:
marktime.start('post message') queue = random.choice(queues)
# TODO(TheSriram): Track/report errors
try: try:
marktime.start('post_message')
queue.post(choose_message(message_pool)) queue.post(choose_message(message_pool))
total_elapsed += marktime.stop('post_message').seconds
successful_requests += 1
except TransportError as ex: except TransportError as ex:
sys.stderr.write("Could not post a message : {0}\n".format(ex)) sys.stderr.write("Could not post a message : {0}\n".format(ex))
else: total_requests += 1
successful_requests += 1
total_elapsed += marktime.stop('post message').seconds
finally:
total_requests += 1
stats.put({ stats.put({
'successful_requests': successful_requests, 'successful_requests': successful_requests,
@ -105,12 +100,18 @@ def producer(stats, test_duration):
# TODO(TheSriram): make distributed across multiple machines # TODO(TheSriram): make distributed across multiple machines
# TODO(TheSriram): post across several queues (which workers to which queues? # TODO(TheSriram): post across several queues (which workers to which queues?
# weight them, so can have some busy queues, some not.) # weight them, so can have some busy queues, some not.)
def load_generator(stats, num_workers, test_duration): def load_generator(stats, num_workers, num_queues, test_duration):
# TODO(TheSriram): Have some way to get all of the workers to line up and
# start at the same time (is this really useful?) cli = client.Client(conf.server_url)
queues = [cli.queue(conf.queue_prefix + '-' + str(i))
for i in range(num_queues)]
message_pool = load_messages()
gevent.joinall([ gevent.joinall([
gevent.spawn(producer, stats, test_duration) gevent.spawn(producer,
queues, message_pool, stats, test_duration)
for _ in range(num_workers) for _ in range(num_workers)
]) ])
@ -132,6 +133,7 @@ def crunch(stats):
def run(upstream_queue): def run(upstream_queue):
num_procs = conf.producer_processes num_procs = conf.producer_processes
num_workers = conf.producer_workers num_workers = conf.producer_workers
num_queues = conf.num_queues
duration = 0 duration = 0
total_requests = 0 total_requests = 0
@ -142,7 +144,7 @@ def run(upstream_queue):
if num_procs and num_workers: if num_procs and num_workers:
test_duration = conf.time test_duration = conf.time
stats = mp.Queue() stats = mp.Queue()
args = (stats, num_workers, test_duration) args = (stats, num_workers, num_queues, test_duration)
# TODO(TheSriram): Multiple test runs, vary num workers and # TODO(TheSriram): Multiple test runs, vary num workers and
# drain/delete queues in between each run. Plot these on a # drain/delete queues in between each run. Plot these on a