Improve benchmarking tool

This commit adds several enhancements to benchmarking tool: server_url
and path to messages now can be configured in config file.  Default
output of program has been changed: now it prints values in json so they
can be parsed more easily. Previous human readable representation is
accessible via --verbose flag.

The `total_requests` metric now shows all performed requests (either
failed or successful) and new metric - `successful_requests` - was
introduced to store count of successful requests.

Change-Id: Id6fe4b2046394a348ba07eb5b2b003c6024b78b0
Partially-implements: blueprint gen-bench-reports
This commit is contained in:
Nataliia Uvarova 2014-07-31 12:29:42 +02:00
parent 5254d264d8
commit 3eef684d05
7 changed files with 151 additions and 74 deletions

View File

@ -82,6 +82,59 @@ And then run tests::
You can read more about running functional tests in separate `TESTS_README`_.
Running the benchmarking tool
----------------------
First install and run zaqar-server (see above).
Then install additional requirements::
pip install -r bench-requirements.txt
Copy the configuration file to ``~/.zaqar``::
cp etc/zaqar-benchmark.conf.sample ~/.zaqar/zaqar-benchmark.conf
In the configuration file specify where zaqar-server can be found::
server_url = http://localhost:8888
The benchmarking tool needs a set of messages to work with. Specify the path to the file with messages
in the configuration file. Alternatively, put it in the directory with the configuration file and name it
``zaqar-benchmark-messages.json``. As a starting point, you can use the sample file from the ``etc`` directory:
cp etc/zaqar-benchmark-messages.json ~/.zaqar/
If the file is not found or no file is specified, a single hard-coded message is used for all requests.
Run the benchmarking tool using the following command::
zaqar-bench-pc --processes 2 --workers 2 --time 10
By default, the results are in JSON. For more human-readable output add the ``--verbose`` flag.
Verbose output looks similar to the following::
Starting Producer...
Starting Consumer...
Params
processes: 2.0
workers: 2.0
Consumer
duration_sec: 4.2
ms_per_req: 38.9
total_reqs: 104.0
successful_reqs: 104.0
reqs_per_sec: 24.8
Producer
duration_sec: 4.1
ms_per_req: 6.9
total_reqs: 575.0
successful_reqs: 575.0
reqs_per_sec: 138.6
.. _`OpenStack` : http://openstack.org/
.. _`MongoDB` : http://docs.mongodb.org/manual/installation/
.. _`pyenv` : https://github.com/yyuu/pyenv/

View File

@ -0,0 +1,5 @@
[DEFAULT]
# verbose = False
# server_url = http://localhost:8888
# messages_path = some/path/to/messages.json
# queue_prefix = ogre-test-queue-

View File

@ -14,16 +14,31 @@
from __future__ import print_function
import json
import multiprocessing as mp
from zaqar.bench.config import conf
from zaqar.bench import consumer
from zaqar.bench import producer
def main():
procs = [mp.Process(target=worker.run)
downstream_queue = mp.Queue()
procs = [mp.Process(target=worker.run, args=(downstream_queue,))
for worker in [producer, consumer]]
for each_proc in procs:
each_proc.start()
for each_proc in procs:
each_proc.join()
stats = {'params': {'processes': conf.processes, 'workers': conf.workers}}
for each_proc in procs:
stats.update(downstream_queue.get_nowait())
if conf.verbose:
for name, stat in stats.items():
print(name.capitalize())
print("\n".join("{}: {:.1f}".format(*it) for it in stat.items()))
print('') # Blank line
else:
print(json.dumps(stats))

View File

@ -28,6 +28,9 @@ _CLI_OPTIONS = (
default=psutil.NUM_CPUS * 2,
help='Number of Workers'),
cfg.IntOpt('time', short='t', default=3, help="time in seconds"),
cfg.StrOpt('server_url', short='s', default='http://localhost:8888'),
cfg.StrOpt('queue_prefix', short='q', default='ogre-test-queue-'),
cfg.StrOpt('messages_path', short='m')
)
conf.register_cli_opts(_CLI_OPTIONS)
conf(project='zaqar', prog='zaqar-queues')
conf(project='zaqar', prog='zaqar-benchmark')

View File

@ -13,8 +13,10 @@
# limitations under the License.
from __future__ import division
from __future__ import print_function
import multiprocessing as mp
import sys
import time
from gevent import monkey as curious_george
@ -24,10 +26,7 @@ import marktime
from zaqarclient.queues.v1 import client
from zaqarclient.transport.errors import TransportError
from zaqar.bench.cli_config import conf
URL = 'http://localhost:8888'
QUEUE_PREFIX = 'ogre-test-queue-'
from zaqar.bench.config import conf
def claim_delete(stats, test_duration, ttl, grace, limit):
@ -38,8 +37,8 @@ def claim_delete(stats, test_duration, ttl, grace, limit):
delete is recorded for calculating throughput and latency.
"""
cli = client.Client(URL)
queue = cli.queue(QUEUE_PREFIX + '1')
cli = client.Client(conf.server_url)
queue = cli.queue(conf.queue_prefix + '1')
end = time.time() + test_duration
total_elapsed = 0
total_requests = 0
@ -52,11 +51,10 @@ def claim_delete(stats, test_duration, ttl, grace, limit):
claim = queue.claim(ttl=ttl, grace=grace, limit=limit)
except TransportError as ex:
print ("Could not claim messages : {0}".format(ex))
sys.stderr.write("Could not claim messages : {0}\n".format(ex))
else:
total_elapsed += marktime.stop('claim_message').seconds
total_requests += 1
claim_total_requests += 1
try:
@ -68,15 +66,20 @@ def claim_delete(stats, test_duration, ttl, grace, limit):
total_elapsed += marktime.stop('delete_message').seconds
delete_total_requests += 1
except TransportError as ex:
sys.stderr.write("Could not delete messages: {0}\n".format(ex))
finally:
total_requests += 1
finally:
total_requests += 1
stats.put({'total_requests': total_requests,
'claim_total_requests': claim_total_requests,
'delete_total_requests': delete_total_requests,
'total_elapsed': total_elapsed})
except TransportError as ex:
print ("Could not claim and delete : {0}".format(ex))
def load_generator(stats, num_workers, test_duration, url, ttl, grace, limit):
gevent.joinall([
@ -103,17 +106,18 @@ def crunch(stats):
delete_total_requests)
def run():
def run(upstream_queue):
num_procs = conf.processes
num_workers = conf.workers
test_duration = conf.time
stats = mp.Queue()
# TODO(TheSriram) : Make ttl,grace and limit configurable
args = (stats, num_workers, test_duration, URL, 300, 200, 1)
args = (stats, num_workers, test_duration, conf.server_url, 300, 200, 1)
procs = [mp.Process(target=load_generator, args=args)
for _ in range(num_procs)]
if conf.verbose:
print("\nStarting Consumer...")
start = time.time()
@ -126,17 +130,14 @@ def run():
(total_requests, total_latency, claim_total_requests,
delete_total_requests) = crunch(stats)
successful_requests = claim_total_requests + delete_total_requests
duration = time.time() - start
throughput = total_requests / duration
latency = 1000 * total_latency / total_requests
throughput = successful_requests / duration
latency = 1000 * total_latency / successful_requests
print('Duration: {0:.1f} sec'.format(duration))
print('Total Requests: {0}'.format(total_requests))
print('Throughput: {0:.0f} req/sec'.format(throughput))
print('Latency: {0:.1f} ms/req'.format(latency))
print('') # Blank line
def main():
run()
upstream_queue.put({'consumer': {
'duration_sec': duration,
'total_reqs': total_requests,
'successful_reqs': successful_requests,
'reqs_per_sec': throughput,
'ms_per_req': latency}})

View File

@ -13,10 +13,10 @@
# limitations under the License.
from __future__ import division
from __future__ import print_function
import json
import multiprocessing as mp
import os
import random
import sys
import time
@ -28,26 +28,10 @@ import marktime
from zaqarclient.queues.v1 import client
from zaqarclient.transport.errors import TransportError
from zaqar.bench.cli_config import conf
from zaqar.bench.config import conf
# TODO(TheSriram): Make configurable
URL = 'http://localhost:8888'
QUEUE_PREFIX = 'ogre-test-queue-'
# TODO(TheSriram) : Migrate from env variable to config
if os.environ.get('MESSAGES_PATH'):
with open(os.environ.get('MESSAGES_PATH')) as f:
message_pool = json.loads(f.read())
else:
print("Error : $MESSAGES_PATH needs to be set")
sys.exit(1)
message_pool.sort(key=lambda msg: msg['weight'])
def choose_message():
def choose_message(message_pool):
"""Choose a message from our pool of possibilities."""
# Assume message_pool is sorted by weight, ascending
@ -62,6 +46,21 @@ def choose_message():
assert False
def load_messages():
default_file_name = 'zaqar-benchmark-messages.json'
messages_path = conf.messages_path or conf.find_file(default_file_name)
if messages_path:
with open(messages_path) as f:
message_pool = json.load(f)
message_pool.sort(key=lambda msg: msg['weight'])
return message_pool
else:
return [{"weight": 1.0,
"doc": {"ttl": 60,
"body": {"id": "7FA23C90-62F7-40D2-9360-FBD5D7D61CD1",
"evt": "Single"}}}]
def producer(stats, test_duration):
"""Producer Worker
@ -70,10 +69,12 @@ def producer(stats, test_duration):
is recorded for calculating throughput and latency.
"""
cli = client.Client(URL)
queue = cli.queue(QUEUE_PREFIX + '1')
cli = client.Client(conf.server_url)
queue = cli.queue(conf.queue_prefix + '1')
message_pool = load_messages()
total_requests = 0
successful_requests = 0
total_elapsed = 0
end = time.time() + test_duration
@ -82,16 +83,20 @@ def producer(stats, test_duration):
# TODO(TheSriram): Track/report errors
try:
queue.post(choose_message())
queue.post(choose_message(message_pool))
except TransportError as ex:
print("Could not post a message : {0}".format(ex))
sys.stderr.write("Could not post a message : {0}\n".format(ex))
else:
successful_requests += 1
total_elapsed += marktime.stop('post message').seconds
finally:
total_requests += 1
stats.put({
'successful_requests': successful_requests,
'total_requests': total_requests,
'total_elapsed': total_elapsed
})
@ -113,17 +118,18 @@ def load_generator(stats, num_workers, test_duration):
def crunch(stats):
total_requests = 0
total_latency = 0.0
successful_requests = 0
while not stats.empty():
entry = stats.get_nowait()
total_requests += entry['total_requests']
total_latency += entry['total_elapsed']
successful_requests += entry['successful_requests']
return total_requests, total_latency
return successful_requests, total_requests, total_latency
def run():
def run(upstream_queue):
num_procs = conf.processes
num_workers = conf.workers
test_duration = conf.time
@ -139,6 +145,7 @@ def run():
for _ in range(num_procs)
]
if conf.verbose:
print('\nStarting Producer...')
start = time.time()
@ -148,22 +155,15 @@ def run():
for each_proc in procs:
each_proc.join()
total_requests, total_latency = crunch(stats)
successful_requests, total_requests, total_latency = crunch(stats)
# TODO(TheSriram): Add one more stat: "attempted req/sec" so can
# graph that on the x axis vs. achieved throughput and
# latency.
duration = time.time() - start
throughput = total_requests / duration
latency = 1000 * total_latency / total_requests
throughput = successful_requests / duration
latency = 1000 * total_latency / successful_requests
print('Duration: {0:.1f} sec'.format(duration))
print('Total Requests: {0}'.format(total_requests))
print('Throughput: {0:.0f} req/sec'.format(throughput))
print('Latency: {0:.1f} ms/req'.format(latency))
print('') # Blank line
def main():
run()
upstream_queue.put({'producer': {
'duration_sec': duration,
'total_reqs': total_requests,
'successful_reqs': successful_requests,
'reqs_per_sec': throughput,
'ms_per_req': latency}})