Add separate proc and worker options for producer and consumer

This patch splits the previous "worker" and "processes" config
options so that producers and consumers can be controlled
independently. In this way, a wider variety of loads can be
modeled.

Change-Id: Ia5c2cf941901c12108486de6cc9b9fa69477dc1a
This commit is contained in:
kgriffs 2014-08-19 13:18:57 -05:00
parent 8d889fc4e7
commit 04fa909282
5 changed files with 134 additions and 75 deletions

View File

@ -74,42 +74,54 @@ Running tests
First install additional requirements::
pip install tox
$ pip install tox
And then run tests::
tox -e py27
$ tox -e py27
You can read more about running functional tests in the separate `TESTS_README`_.
Running the benchmarking tool
----------------------
-----------------------------
First install and run zaqar-server (see above).
Then install additional requirements::
pip install -r bench-requirements.txt
$ pip install -r bench-requirements.txt
Copy the configuration file to ``~/.zaqar``::
cp etc/zaqar-benchmark.conf.sample ~/.zaqar/zaqar-benchmark.conf
$ cp etc/zaqar-benchmark.conf.sample ~/.zaqar/zaqar-benchmark.conf
In the configuration file specify where zaqar-server can be found::
server_url = http://localhost:8888
The benchmarking tool needs a set of messages to work with. Specify the path to the file with messages
in the configuration file. Alternatively, put it in the directory with the configuration file and name it
``zaqar-benchmark-messages.json``. As a starting point, you can use the sample file from the ``etc`` directory:
The benchmarking tool needs a set of messages to work with. Specify the path
to the file with messages in the configuration file. Alternatively, put it in
the directory with the configuration file and name it
``zaqar-benchmark-messages.json``. As a starting point, you can use the
sample file from the ``etc`` directory::
cp etc/zaqar-benchmark-messages.json ~/.zaqar/
$ cp etc/zaqar-benchmark-messages.json ~/.zaqar/
If the file is not found or no file is specified, a single hard-coded message is used for all requests.
If the file is not found or no file is specified, a single hard-coded message
is used for all requests.
Run the benchmarking tool using the following command::
zaqar-bench-pc --processes 2 --workers 2 --time 10
$ zaqar-bench-pc
By default, the command will run a performance test for 3 seconds, using one
consumer and one producer for each CPU on the system, with 2 greenlet workers
per CPU per process. You can override these defaults in the config file or on
the command line using a variety of options. For example, the following
command runs a performance test for 10 seconds using 4 producer processes with
20 workers each, plus 1 consumer process with 4 workers::
$ zaqar-bench-pc -pp 4 -pw 20 -cp 1 -cw 4 -t 10
By default, the results are in JSON. For more human-readable output add the ``--verbose`` flag.
Verbose output looks similar to the following::
@ -117,23 +129,23 @@ Verbose output looks similar to the following::
Starting Producer...
Starting Consumer...
Params
processes: 2.0
workers: 2.0
Consumer
duration_sec: 4.2
ms_per_req: 38.9
total_reqs: 104.0
successful_reqs: 104.0
reqs_per_sec: 24.8
========
duration_sec: 10.1
ms_per_req: 77.1
total_reqs: 160.0
successful_reqs: 160.0
reqs_per_sec: 15.8
Producer
duration_sec: 4.1
ms_per_req: 6.9
total_reqs: 575.0
successful_reqs: 575.0
reqs_per_sec: 138.6
========
duration_sec: 10.2
ms_per_req: 4.6
total_reqs: 8866.0
successful_reqs: 8866.0
reqs_per_sec: 870.5
.. _`OpenStack` : http://openstack.org/
.. _`MongoDB` : http://docs.mongodb.org/manual/installation/

View File

@ -26,19 +26,35 @@ def main():
downstream_queue = mp.Queue()
procs = [mp.Process(target=worker.run, args=(downstream_queue,))
for worker in [producer, consumer]]
for each_proc in procs:
each_proc.start()
for each_proc in procs:
each_proc.join()
stats = {'params': {'processes': conf.processes, 'workers': conf.workers}}
stats = {}
for each_proc in procs:
stats.update(downstream_queue.get_nowait())
if conf.verbose:
print()
for name, stat in stats.items():
print(name.capitalize())
print('=' * len(name))
print("\n".join("{}: {:.1f}".format(*it) for it in stat.items()))
print('') # Blank line
else:
stats['params'] = {
'producer': {
'processes': conf.producer_processes,
'workers': conf.producer_workers
},
'consumer': {
'processes': conf.consumer_processes,
'workers': conf.consumer_workers
}
}
print(json.dumps(stats))

View File

@ -18,15 +18,25 @@ import psutil
conf = cfg.CONF
_CLI_OPTIONS = (
cfg.IntOpt(
'processes',
short='p',
'producer_processes',
short='pp',
default=psutil.NUM_CPUS,
help='Number of Processes'),
help='Number of Producer Processes'),
cfg.IntOpt(
'workers',
short='w',
'producer_workers',
short='pw',
default=psutil.NUM_CPUS * 2,
help='Number of Workers'),
help='Number of Producer Workers'),
cfg.IntOpt(
'consumer_processes',
short='cp',
default=psutil.NUM_CPUS,
help='Number of Consumer Processes'),
cfg.IntOpt(
'consumer_workers',
short='cw',
default=psutil.NUM_CPUS * 2,
help='Number of Consumer Workers'),
cfg.IntOpt('time', short='t', default=3, help="time in seconds"),
cfg.StrOpt('server_url', short='s', default='http://localhost:8888'),
cfg.StrOpt('queue_prefix', short='q', default='ogre-test-queue-'),

View File

@ -107,33 +107,44 @@ def crunch(stats):
def run(upstream_queue):
num_procs = conf.processes
num_workers = conf.workers
test_duration = conf.time
stats = mp.Queue()
# TODO(TheSriram) : Make ttl,grace and limit configurable
args = (stats, num_workers, test_duration, conf.server_url, 300, 200, 1)
num_procs = conf.consumer_processes
num_workers = conf.consumer_workers
procs = [mp.Process(target=load_generator, args=args)
for _ in range(num_procs)]
if num_procs and num_workers:
test_duration = conf.time
stats = mp.Queue()
# TODO(TheSriram) : Make ttl,grace and limit configurable
args = (stats, num_workers, test_duration, conf.server_url,
300, 200, 1)
if conf.verbose:
print("\nStarting Consumer...")
procs = [mp.Process(target=load_generator, args=args)
for _ in range(num_procs)]
start = time.time()
if conf.verbose:
print("\nStarting Consumer...")
for each_proc in procs:
each_proc.start()
for each_proc in procs:
each_proc.join()
start = time.time()
(total_requests, total_latency, claim_total_requests,
delete_total_requests) = crunch(stats)
for each_proc in procs:
each_proc.start()
successful_requests = claim_total_requests + delete_total_requests
duration = time.time() - start
throughput = successful_requests / duration
latency = 1000 * total_latency / successful_requests
for each_proc in procs:
each_proc.join()
(total_requests, total_latency, claim_total_requests,
delete_total_requests) = crunch(stats)
successful_requests = claim_total_requests + delete_total_requests
duration = time.time() - start
throughput = successful_requests / duration
latency = 1000 * total_latency / successful_requests
else:
duration = 0
total_requests = 0
successful_requests = 0
throughput = 0
latency = 0
upstream_queue.put({'consumer': {
'duration_sec': duration,

View File

@ -130,36 +130,46 @@ def crunch(stats):
def run(upstream_queue):
num_procs = conf.processes
num_workers = conf.workers
test_duration = conf.time
stats = mp.Queue()
args = (stats, num_workers, test_duration)
num_procs = conf.producer_processes
num_workers = conf.producer_workers
# TODO(TheSriram): Multiple test runs, vary num workers and drain/delete
# queues in between each run. Plot these on a graph, with
# concurrency as the X axis.
if num_procs and num_workers:
test_duration = conf.time
stats = mp.Queue()
args = (stats, num_workers, test_duration)
procs = [
mp.Process(target=load_generator, args=args)
for _ in range(num_procs)
]
# TODO(TheSriram): Multiple test runs, vary num workers and
# drain/delete queues in between each run. Plot these on a
# graph, with concurrency as the X axis.
if conf.verbose:
print('\nStarting Producer...')
start = time.time()
procs = [
mp.Process(target=load_generator, args=args)
for _ in range(num_procs)
]
for each_proc in procs:
each_proc.start()
if conf.verbose:
print('\nStarting Producer...')
for each_proc in procs:
each_proc.join()
start = time.time()
successful_requests, total_requests, total_latency = crunch(stats)
for each_proc in procs:
each_proc.start()
duration = time.time() - start
throughput = successful_requests / duration
latency = 1000 * total_latency / successful_requests
for each_proc in procs:
each_proc.join()
successful_requests, total_requests, total_latency = crunch(stats)
duration = time.time() - start
throughput = successful_requests / duration
latency = 1000 * total_latency / successful_requests
else:
duration = 0
total_requests = 0
successful_requests = 0
throughput = 0
latency = 0
upstream_queue.put({'producer': {
'duration_sec': duration,