Add observer role to benchmark tool

This patch introduces a new observer role, which lists messages but
does not claim them. As part of this work, the config options were
updated and their defaults adjusted to provide a better "kick the
tires" experience.

The default number of processes and workers is now hard-coded rather
than derived from the number of available CPUs, since the number of
workers you will want to run depends more on the size of your Zaqar
deployment and your network bandwidth than on the number of CPUs on
the load generator.

Finally, the "-pc" suffix was removed from the command name. This
change was included in this patch because it did not seem significant
enough to split out.

Change-Id: I8a8190fb2cebc3489c78da4f6e1e7c51d8b97017
kgriffs, 2014-08-22 16:02:24 -05:00
commit 07ddeee517 (parent ab1d471d94)
8 changed files with 319 additions and 70 deletions


@@ -112,40 +112,45 @@ is used for all requests.
 Run the benchmarking tool using the following command::
 
-    $ zaqar-bench-pc
+    $ zaqar-bench
 
-By default, the command will run a performance test for 3 seconds, using one
-consumer and one producer for each CPU on the system, with 2 greenlet workers
-per CPU per process. You can override these defaults in the config file or on
-the command line using a variety of options. For example, the following
-command runs a performance test for 10 seconds using 4 producer processes with
-20 workers each, plus 1 consumer process with 4 workers::
+By default, the command will run a performance test for 5 seconds, using one
+producer process with 10 greenlet workers, and one observer process with
+5 workers. The consumer role is disabled by default.
 
-    $ zaqar-bench-pc -pp 4 -pw 20 -cp 1 -cw 4 -t 10
+You can override these defaults in the config file or on the command line
+using a variety of options. For example, the following command runs a
+performance test for 30 seconds using 4 producer processes with
+20 workers each, plus 4 consumer processes with 20 workers each. Note that
+the observer role is also disabled in this example by setting its number of
+workers to zero::
+
+    $ zaqar-bench -pp 4 -pw 20 -cp 4 -cw 20 -ow 0 -t 30
 
-By default, the results are in JSON. For more human-readable output add the ``--verbose`` flag.
-Verbose output looks similar to the following::
+By default, the results are in JSON. For more human-readable output add
+the ``--verbose`` flag. Verbose output looks similar to the following::
 
-    Starting Producer...
+    $ zaqar-bench --verbose
 
-    Starting Consumer...
+    Starting producer (pp=1, pw=10)...
 
-    Consumer
-    ========
-    duration_sec: 10.2
-    ms_per_claim: 37.6
-    ms_per_delete: 11.8
-    reqs_per_sec: 82.0
-    successful_reqs: 833.0
-    total_reqs: 833.0
+    Starting observer (op=1, ow=5)...
 
     Producer
     ========
-    duration_sec: 10.2
-    ms_per_req: 3.8
-    reqs_per_sec: 1033.6
-    successful_reqs: 10523.0
-    total_reqs: 10523.0
+    duration_sec: 5.1
+    ms_per_req: 2.9
+    reqs_per_sec: 344.5
+    successful_reqs: 1742.0
+    total_reqs: 1742.0
+
+    Observer
+    ========
+    duration_sec: 5.0
+    ms_per_req: 2.9
+    reqs_per_sec: 339.3
+    successful_reqs: 1706.0
+    total_reqs: 1706.0
 
 .. _`OpenStack` : http://openstack.org/
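
Since the default report is JSON, the results can be post-processed by other
tooling. A rough sketch of consuming that report (the field names follow the
stats dict assembled in conductor.py below; the run options and parsing are
illustrative, not part of the tool):

    import json
    import subprocess

    # Run a short benchmark and capture its JSON report from stdout.
    raw = subprocess.check_output(['zaqar-bench', '-t', '5'])
    report = json.loads(raw.decode('utf-8'))

    # Each enabled role reports duration_sec, reqs_per_sec, and so on;
    # disabled roles report a duration of zero.
    for role in ('producer', 'observer', 'consumer'):
        stats = report.get(role, {})
        if stats.get('duration_sec'):
            print('{0}: {1:.1f} reqs/sec'.format(role, stats['reqs_per_sec']))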


@@ -1,5 +1,4 @@
 argparse>=1.2.1
 gevent>=1.0.1
 marktime>=0.2.0
-psutil>=2.1.1
 python-zaqarclient>=0.0.2


@@ -32,7 +32,7 @@ source-dir = doc/source
 [entry_points]
 console_scripts =
-    zaqar-bench-pc = zaqar.bench.conductor:main
+    zaqar-bench = zaqar.bench.conductor:main
     zaqar-server = zaqar.cmd.server:run
     marconi-server = zaqar.cmd.server:run
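
For reference, the renamed console script still resolves to the same entry
point; the wrapper that setuptools generates for it behaves roughly like the
following sketch (not the literal generated file):

    #!/usr/bin/env python
    # Approximation of the generated 'zaqar-bench' wrapper script.
    import sys

    from zaqar.bench.conductor import main

    if __name__ == '__main__':
        sys.exit(main())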


@@ -17,17 +17,52 @@ from __future__ import print_function
 import json
 import multiprocessing as mp
 
-from zaqar.bench.config import conf
+from zaqarclient.queues.v1 import client
+
+from zaqar.bench import config
 from zaqar.bench import consumer
+from zaqar.bench import observer
 from zaqar.bench import producer
 
+CONF = config.conf
+
+
+def _print_verbose_stats(name, stats):
+    print(name.capitalize())
+    print('=' * len(name))
+
+    values = sorted(stats.items(), key=lambda v: v[0])
+    formatted_vals = ['{}: {:.1f}'.format(*v) for v in values]
+
+    print('\n'.join(formatted_vals))
+    print()  # Blank line
+
+
+def _reset_queues():
+    cli = client.Client(CONF.server_url)
+
+    for i in range(CONF.num_queues):
+        # TODO(kgriffs): DRY up name generation so it is done
+        # in a helper, vs. being copy-pasted everywhere.
+        queue = cli.queue(CONF.queue_prefix + '-' + str(i))
+        queue.delete()
+
 
 def main():
-    conf(project='zaqar', prog='zaqar-benchmark')
+    CONF(project='zaqar', prog='zaqar-benchmark')
+
+    # NOTE(kgriffs): Reset queues since last time. We don't
+    # clean them up after the performance test, in case
+    # the user wants to examine the state of the system.
+    if not CONF.skip_queue_reset:
+        if CONF.verbose:
+            print('Resetting queues...')
+
+        _reset_queues()
 
     downstream_queue = mp.Queue()
     procs = [mp.Process(target=worker.run, args=(downstream_queue,))
-             for worker in [producer, consumer]]
+             for worker in [producer, consumer, observer]]
 
     for each_proc in procs:
         each_proc.start()
@@ -39,29 +74,32 @@ def main():
     for each_proc in procs:
         stats.update(downstream_queue.get_nowait())
 
-    if conf.verbose:
+    if CONF.verbose:
         print()
 
-        for name, stats_group in stats.items():
-            print(name.capitalize())
-            print('=' * len(name))
+        for name in ('producer', 'observer', 'consumer'):
+            stats_group = stats[name]
 
-            values = sorted(stats_group.items(), key=lambda v: v[0])
-            formatted_vals = ["{}: {:.1f}".format(*v) for v in values]
+            # Skip disabled workers
+            if not stats_group['duration_sec']:
+                continue
 
-            print("\n".join(formatted_vals))
-            print('')  # Blank line
+            _print_verbose_stats(name, stats_group)
     else:
         stats['params'] = {
             'producer': {
-                'processes': conf.producer_processes,
-                'workers': conf.producer_workers
+                'processes': CONF.producer_processes,
+                'workers': CONF.producer_workers
             },
             'consumer': {
-                'processes': conf.consumer_processes,
-                'workers': conf.consumer_workers
-            }
+                'processes': CONF.consumer_processes,
+                'workers': CONF.consumer_workers
+            },
+            'observer': {
+                'processes': CONF.observer_processes,
+                'workers': CONF.observer_workers
+            },
         }
 
         print(json.dumps(stats))
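
Each role module hands its results to the conductor through the same
contract: a run(upstream_queue) function that puts one dict keyed by the
role's name, carrying the fields the conductor prints or serializes. A
minimal sketch of a hypothetical extra role following that contract (the
'noop' name and zeroed numbers are invented for illustration):

    import multiprocessing as mp
    import time


    def run(upstream_queue):
        """No-op role that reports zeroed stats in the expected shape."""
        start = time.time()

        # A real role would issue requests against Zaqar here.

        upstream_queue.put({
            'noop': {
                'duration_sec': time.time() - start,
                'total_reqs': 0,
                'successful_reqs': 0,
                'reqs_per_sec': 0,
                'ms_per_req': 0,
            }
        })


    if __name__ == '__main__':
        queue = mp.Queue()
        proc = mp.Process(target=run, args=(queue,))
        proc.start()
        proc.join()
        print(queue.get())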


@@ -13,38 +13,61 @@
 # limitations under the License.
 
 from oslo.config import cfg
-import psutil
 
 conf = cfg.CONF
 
 _CLI_OPTIONS = (
     cfg.IntOpt(
         'producer_processes',
         short='pp',
-        default=psutil.NUM_CPUS,
+        default=1,
         help='Number of Producer Processes'),
     cfg.IntOpt(
         'producer_workers',
         short='pw',
-        default=psutil.NUM_CPUS * 2,
+        default=10,
         help='Number of Producer Workers'),
     cfg.IntOpt(
         'consumer_processes',
         short='cp',
-        default=psutil.NUM_CPUS,
+        default=1,
         help='Number of Consumer Processes'),
     cfg.IntOpt(
         'consumer_workers',
         short='cw',
-        default=psutil.NUM_CPUS * 2,
+        default=0,
         help='Number of Consumer Workers'),
+    cfg.IntOpt(
+        'observer_processes',
+        short='op',
+        default=1,
+        help='Number of Observer Processes'),
+    cfg.IntOpt(
+        'observer_workers',
+        short='ow',
+        default=5,
+        help='Number of Observer Workers'),
     cfg.IntOpt('messages_per_claim', short='cno', default=5,
                help=('Number of messages the consumer will attempt to '
                      'claim at a time')),
-    cfg.IntOpt('time', short='t', default=3,
+    cfg.IntOpt('messages_per_list', short='lno', default=5,
+               help=('Number of messages the observer will attempt to '
+                     'list at a time')),
+    cfg.IntOpt('time', short='t', default=5,
                help="Duration of the performance test, in seconds"),
     cfg.StrOpt('server_url', short='s', default='http://localhost:8888'),
     cfg.StrOpt('queue_prefix', short='q', default='ogre-test-queue'),
     cfg.IntOpt('num_queues', short='qno', default=4),
-    cfg.StrOpt('messages_path', short='m')
+    cfg.StrOpt('messages_path', short='m'),
+    cfg.BoolOpt('skip_queue_reset', default=False,
+                help=('Do not reset queues before running '
+                      'the performance test')),
 )
 
 conf.register_cli_opts(_CLI_OPTIONS)
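
Because these are registered as oslo.config CLI opts, they can also be driven
programmatically rather than only from the command line. A small sketch
exercising the new defaults (the argument values are illustrative):

    from zaqar.bench import config

    CONF = config.conf

    # Parse an explicit argument list instead of sys.argv, as the
    # conductor does implicitly when the console script runs.
    CONF(args=['-pw', '20', '-ow', '0', '-t', '10'],
         project='zaqar', prog='zaqar-benchmark')

    print(CONF.producer_workers)   # 20 (overridden)
    print(CONF.observer_workers)   # 0 (observer disabled)
    print(CONF.consumer_workers)   # 0 (default)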


@@ -27,7 +27,9 @@ import marktime
 from zaqarclient.queues.v1 import client
 from zaqarclient.transport.errors import TransportError
 
-from zaqar.bench.config import conf
+from zaqar.bench import config
+
+CONF = config.conf
 
 
 def claim_delete(queues, stats, test_duration, ttl, grace, limit):
@@ -93,8 +95,8 @@ def claim_delete(queues, stats, test_duration, ttl, grace, limit):
 def load_generator(stats, num_workers, num_queues,
                    test_duration, url, ttl, grace, limit):
 
-    cli = client.Client(conf.server_url)
-    queues = [cli.queue(conf.queue_prefix + '-' + str(i))
+    cli = client.Client(CONF.server_url)
+    queues = [cli.queue(CONF.queue_prefix + '-' + str(i))
               for i in range(num_queues)]
 
     gevent.joinall([
@@ -125,9 +127,9 @@ def crunch(stats):
 
 def run(upstream_queue):
-    num_procs = conf.consumer_processes
-    num_workers = conf.consumer_workers
-    num_queues = conf.num_queues
+    num_procs = CONF.consumer_processes
+    num_workers = CONF.consumer_workers
+    num_queues = CONF.num_queues
 
     # Stats that will be reported
     duration = 0
@@ -141,17 +143,18 @@ def run(upstream_queue):
 
     # Performance test
     if num_procs and num_workers:
-        test_duration = conf.time
+        test_duration = CONF.time
         stats = mp.Queue()
 
         # TODO(TheSriram) : Make ttl and grace configurable
         args = (stats, num_workers, num_queues, test_duration,
-                conf.server_url, 300, 200, conf.messages_per_claim)
+                CONF.server_url, 300, 200, CONF.messages_per_claim)
 
         procs = [mp.Process(target=load_generator, args=args)
                  for _ in range(num_procs)]
 
-        if conf.verbose:
-            print("\nStarting Consumer...")
+        if CONF.verbose:
+            print('\nStarting consumers (cp={0}, cw={1})...'.format(
+                num_procs, num_workers))
 
         start = time.time()

zaqar/bench/observer.py (new file)

@@ -0,0 +1,178 @@
# Copyright (c) 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import division
from __future__ import print_function

import multiprocessing as mp
import random
import sys
import time

from gevent import monkey as curious_george
curious_george.patch_all(thread=False, select=False)
import gevent
import marktime
from six.moves import urllib
from zaqarclient.queues.v1 import client
from zaqarclient.transport.errors import TransportError

from zaqar.bench import config

CONF = config.conf


#
# TODO(kgriffs): Factor out the common code from producer, consumer
# and worker (DRY all the things!)
#


def _extract_marker(links):
    for link in links:
        if link['rel'] == 'next':
            href = link['href']
            break

    query = urllib.parse.urlparse(href).query
    params = urllib.parse.parse_qs(query)

    return params['marker'][0]


def observer(queues, stats, test_duration, limit):
    """Observer Worker

    The observer lists messages without claiming them.
    """

    end = time.time() + test_duration
    total_elapsed = 0
    total_succeeded = 0
    total_failed = 0

    queues = [{'q': q, 'm': None} for q in queues]

    while time.time() < end:
        # NOTE(kgriffs): Distribute requests across all queues evenly.
        queue = random.choice(queues)

        try:
            marktime.start('list_messages')
            cursor = queue['q'].messages(limit=limit, marker=queue['m'])
            total_elapsed += marktime.stop('list_messages').seconds
            total_succeeded += 1

            messages = list(cursor)

            if messages:
                # TODO(kgriffs): Figure out a less hacky way to do this
                # while preserving the ability to measure elapsed time
                # per request.
                queue['m'] = _extract_marker(cursor._links)

        except TransportError as ex:
            sys.stderr.write("Could not list messages : {0}\n".format(ex))
            total_failed += 1

    total_requests = total_succeeded + total_failed

    stats.put({
        'total_requests': total_requests,
        'total_succeeded': total_succeeded,
        'total_elapsed': total_elapsed,
    })


def load_generator(stats, num_workers, num_queues,
                   test_duration, limit):

    cli = client.Client(CONF.server_url)
    queues = [cli.queue(CONF.queue_prefix + '-' + str(i))
              for i in range(num_queues)]

    gevent.joinall([
        gevent.spawn(observer,
                     queues, stats, test_duration, limit)
        for _ in range(num_workers)
    ])


def crunch(stats):
    total_requests = 0
    total_succeeded = 0
    total_elapsed = 0.0

    while not stats.empty():
        entry = stats.get_nowait()
        total_requests += entry['total_requests']
        total_succeeded += entry['total_succeeded']
        total_elapsed += entry['total_elapsed']

    return (total_requests, total_succeeded, total_elapsed)


def run(upstream_queue):
    num_procs = CONF.observer_processes
    num_workers = CONF.observer_workers
    num_queues = CONF.num_queues

    # Stats that will be reported
    duration = 0
    total_requests = 0
    total_succeeded = 0
    throughput = 0
    latency = 0

    # Performance test
    if num_procs and num_workers:
        test_duration = CONF.time
        stats = mp.Queue()
        args = (stats, num_workers, num_queues, test_duration,
                CONF.messages_per_list)

        procs = [mp.Process(target=load_generator, args=args)
                 for _ in range(num_procs)]

        if CONF.verbose:
            print('\nStarting observer (op={0}, ow={1})...'.format(
                num_procs, num_workers))

        start = time.time()

        for each_proc in procs:
            each_proc.start()

        for each_proc in procs:
            each_proc.join()

        (total_requests, total_succeeded, total_elapsed) = crunch(stats)

        duration = time.time() - start

        throughput = total_succeeded / duration

        if total_succeeded:
            latency = (1000 * total_elapsed / total_succeeded)

    upstream_queue.put({
        'observer': {
            'duration_sec': duration,
            'total_reqs': total_requests,
            'successful_reqs': total_succeeded,
            'reqs_per_sec': throughput,
            'ms_per_req': latency,
        }
    })
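
The paging logic keys off the 'next' link returned by a message listing. A
quick standalone check of _extract_marker (the href value below is made up
for illustration):

    from zaqar.bench import observer

    links = [{
        'rel': 'next',
        'href': '/v1/queues/ogre-test-queue-0/messages?marker=1355'
    }]

    # Pulls the marker query parameter out of the 'next' link.
    print(observer._extract_marker(links))  # -> 1355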


@@ -28,7 +28,9 @@ import marktime
 from zaqarclient.queues.v1 import client
 from zaqarclient.transport.errors import TransportError
 
-from zaqar.bench.config import conf
+from zaqar.bench import config
+
+CONF = config.conf
 
 
 def choose_message(message_pool):
@@ -48,7 +50,7 @@ def choose_message(message_pool):
 
 def load_messages():
     default_file_name = 'zaqar-benchmark-messages.json'
-    messages_path = conf.messages_path or conf.find_file(default_file_name)
+    messages_path = CONF.messages_path or CONF.find_file(default_file_name)
     if messages_path:
         with open(messages_path) as f:
             message_pool = json.load(f)
@@ -102,8 +104,8 @@ def producer(queues, message_pool, stats, test_duration):
 # weight them, so can have some busy queues, some not.)
 def load_generator(stats, num_workers, num_queues, test_duration):
-    cli = client.Client(conf.server_url)
-    queues = [cli.queue(conf.queue_prefix + '-' + str(i))
+    cli = client.Client(CONF.server_url)
+    queues = [cli.queue(CONF.queue_prefix + '-' + str(i))
               for i in range(num_queues)]
 
     message_pool = load_messages()
@@ -131,9 +133,9 @@ def crunch(stats):
 
 def run(upstream_queue):
-    num_procs = conf.producer_processes
-    num_workers = conf.producer_workers
-    num_queues = conf.num_queues
+    num_procs = CONF.producer_processes
+    num_workers = CONF.producer_workers
+    num_queues = CONF.num_queues
 
     duration = 0
     total_requests = 0
@@ -142,7 +144,7 @@ def run(upstream_queue):
     latency = 0
 
     if num_procs and num_workers:
-        test_duration = conf.time
+        test_duration = CONF.time
         stats = mp.Queue()
         args = (stats, num_workers, num_queues, test_duration)
@@ -155,8 +157,9 @@ def run(upstream_queue):
             for _ in range(num_procs)
         ]
 
-        if conf.verbose:
-            print('\nStarting Producer...')
+        if CONF.verbose:
+            print('\nStarting producer (pp={0}, pw={1})...'.format(
+                num_procs, num_workers))
 
         start = time.time()