From 6505cbcec187ef7bb179622193edc0f0616e9a7e Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Fri, 28 Jun 2013 13:41:34 -0500 Subject: [PATCH] Basic benchmark setup --- benchmarks/base.py | 93 +++++++++++++++++++ .../single_thread_callback_full_pipeline.py | 40 ++++++++ benchmarks/single_thread_sync.py | 8 ++ 3 files changed, 141 insertions(+) create mode 100644 benchmarks/base.py create mode 100644 benchmarks/single_thread_callback_full_pipeline.py create mode 100644 benchmarks/single_thread_sync.py diff --git a/benchmarks/base.py b/benchmarks/base.py new file mode 100644 index 00000000..716afee6 --- /dev/null +++ b/benchmarks/base.py @@ -0,0 +1,93 @@ +import logging +import os.path +import sys +import time +dirname = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(dirname) +sys.path.append(os.path.join(dirname, '..')) + +from cassandra.cluster import Cluster +from cassandra.io.asyncorereactor import AsyncoreConnection +from cassandra.query import SimpleStatement + +log = logging.getLogger() +log.setLevel('INFO') +handler = logging.StreamHandler() +handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")) +log.addHandler(handler) + +supported_reactors = [AsyncoreConnection] +try: + from cassandra.io.pyevreactor import PyevConnection + supported_reactors.append(PyevConnection) +except ImportError, exc: + log.warning("Not benchmarking pyev reactor: %s" % (exc,)) + +KEYSPACE = "testkeyspace" +TABLE = "testtable" +NUM_QUERIES = 10000 + +def setup(): + + cluster = Cluster(['127.0.0.1']) + session = cluster.connect() + + rows = session.execute("SELECT keyspace_name FROM system.schema_keyspaces") + if KEYSPACE in [row[0] for row in rows]: + log.debug("dropping existing keyspace...") + session.execute("DROP KEYSPACE " + KEYSPACE) + + log.debug("Creating keyspace...") + session.execute(""" + CREATE KEYSPACE %s + WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '2' } + """ % KEYSPACE) + + log.debug("Setting keyspace...") + session.set_keyspace(KEYSPACE) + + log.debug("Creating table...") + session.execute(""" + CREATE TABLE %s ( + thekey text, + col1 text, + col2 text, + PRIMARY KEY (thekey, col1) + ) + """ % TABLE) + +def teardown(): + cluster = Cluster(['127.0.0.1']) + session = cluster.connect() + session.execute("DROP KEYSPACE " + KEYSPACE) + + +def benchmark(run_fn): + for conn_class in supported_reactors: + setup() + log.info("Testing %s" % (conn_class.__name__,)) + + cluster = Cluster(['127.0.0.1']) + cluster.connection_class = conn_class + session = cluster.connect(KEYSPACE) + + log.debug("Sleeping for two seconds...") + time.sleep(2.0) + + query = SimpleStatement(""" + INSERT INTO {table} (thekey, col1, col2) + VALUES (%(key)s, %(a)s, %(b)s) + """.format(table=TABLE)) + values = {'key': 'key', 'a': 'a', 'b': 'b'} + + log.debug("Beginning inserts...") + start = time.time() + try: + run_fn(session, query, values, NUM_QUERIES) + end = time.time() + finally: + teardown() + + total = end - start + log.info("Total time: %0.4f" % total) + log.info("Average throughput: %0.4f/sec" % (NUM_QUERIES / total)) diff --git a/benchmarks/single_thread_callback_full_pipeline.py b/benchmarks/single_thread_callback_full_pipeline.py new file mode 100644 index 00000000..fa7cbf56 --- /dev/null +++ b/benchmarks/single_thread_callback_full_pipeline.py @@ -0,0 +1,40 @@ +from base import benchmark + +import logging +from collections import deque +from itertools import count +from threading import Event + +log = logging.getLogger(__name__) + +futures = deque() +initial = object() + +def execute(session, query, values, num_queries): + + num_started = count() + num_finished = count() + event = Event() + + def handle_error(exc): + log.error("Error on insert: %r", exc) + + def insert_next(previous_result): + current_num = num_started.next() + + if previous_result is not initial: + num = next(num_finished) + if num >= num_queries: + event.set() + + if current_num <= num_queries: + future = session.execute_async(query, values) + future.add_callbacks(insert_next, handle_error) + + for i in range(120): + insert_next(initial) + + event.wait() + +if __name__ == "__main__": + benchmark(execute) diff --git a/benchmarks/single_thread_sync.py b/benchmarks/single_thread_sync.py new file mode 100644 index 00000000..3fd0cf48 --- /dev/null +++ b/benchmarks/single_thread_sync.py @@ -0,0 +1,8 @@ +from base import benchmark + +def execute(session, query, values, num_queries): + for i in xrange(num_queries): + session.execute(query, values) + +if __name__ == "__main__": + benchmark(execute)