Basic benchmark setup
This commit is contained in:
93
benchmarks/base.py
Normal file
93
benchmarks/base.py
Normal file
@@ -0,0 +1,93 @@
|
||||
import logging
|
||||
import os.path
|
||||
import sys
|
||||
import time
|
||||
dirname = os.path.dirname(os.path.abspath(__file__))
|
||||
sys.path.append(dirname)
|
||||
sys.path.append(os.path.join(dirname, '..'))
|
||||
|
||||
from cassandra.cluster import Cluster
|
||||
from cassandra.io.asyncorereactor import AsyncoreConnection
|
||||
from cassandra.query import SimpleStatement
|
||||
|
||||
log = logging.getLogger()
|
||||
log.setLevel('INFO')
|
||||
handler = logging.StreamHandler()
|
||||
handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
|
||||
log.addHandler(handler)
|
||||
|
||||
supported_reactors = [AsyncoreConnection]
|
||||
try:
|
||||
from cassandra.io.pyevreactor import PyevConnection
|
||||
supported_reactors.append(PyevConnection)
|
||||
except ImportError, exc:
|
||||
log.warning("Not benchmarking pyev reactor: %s" % (exc,))
|
||||
|
||||
KEYSPACE = "testkeyspace"
|
||||
TABLE = "testtable"
|
||||
NUM_QUERIES = 10000
|
||||
|
||||
def setup():
|
||||
|
||||
cluster = Cluster(['127.0.0.1'])
|
||||
session = cluster.connect()
|
||||
|
||||
rows = session.execute("SELECT keyspace_name FROM system.schema_keyspaces")
|
||||
if KEYSPACE in [row[0] for row in rows]:
|
||||
log.debug("dropping existing keyspace...")
|
||||
session.execute("DROP KEYSPACE " + KEYSPACE)
|
||||
|
||||
log.debug("Creating keyspace...")
|
||||
session.execute("""
|
||||
CREATE KEYSPACE %s
|
||||
WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '2' }
|
||||
""" % KEYSPACE)
|
||||
|
||||
log.debug("Setting keyspace...")
|
||||
session.set_keyspace(KEYSPACE)
|
||||
|
||||
log.debug("Creating table...")
|
||||
session.execute("""
|
||||
CREATE TABLE %s (
|
||||
thekey text,
|
||||
col1 text,
|
||||
col2 text,
|
||||
PRIMARY KEY (thekey, col1)
|
||||
)
|
||||
""" % TABLE)
|
||||
|
||||
def teardown():
|
||||
cluster = Cluster(['127.0.0.1'])
|
||||
session = cluster.connect()
|
||||
session.execute("DROP KEYSPACE " + KEYSPACE)
|
||||
|
||||
|
||||
def benchmark(run_fn):
|
||||
for conn_class in supported_reactors:
|
||||
setup()
|
||||
log.info("Testing %s" % (conn_class.__name__,))
|
||||
|
||||
cluster = Cluster(['127.0.0.1'])
|
||||
cluster.connection_class = conn_class
|
||||
session = cluster.connect(KEYSPACE)
|
||||
|
||||
log.debug("Sleeping for two seconds...")
|
||||
time.sleep(2.0)
|
||||
|
||||
query = SimpleStatement("""
|
||||
INSERT INTO {table} (thekey, col1, col2)
|
||||
VALUES (%(key)s, %(a)s, %(b)s)
|
||||
""".format(table=TABLE))
|
||||
values = {'key': 'key', 'a': 'a', 'b': 'b'}
|
||||
|
||||
log.debug("Beginning inserts...")
|
||||
start = time.time()
|
||||
try:
|
||||
run_fn(session, query, values, NUM_QUERIES)
|
||||
end = time.time()
|
||||
finally:
|
||||
teardown()
|
||||
|
||||
total = end - start
|
||||
log.info("Total time: %0.4f" % total)
|
||||
log.info("Average throughput: %0.4f/sec" % (NUM_QUERIES / total))
|
||||
40
benchmarks/single_thread_callback_full_pipeline.py
Normal file
40
benchmarks/single_thread_callback_full_pipeline.py
Normal file
@@ -0,0 +1,40 @@
|
||||
from base import benchmark
|
||||
|
||||
import logging
|
||||
from collections import deque
|
||||
from itertools import count
|
||||
from threading import Event
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
futures = deque()
|
||||
initial = object()
|
||||
|
||||
def execute(session, query, values, num_queries):
|
||||
|
||||
num_started = count()
|
||||
num_finished = count()
|
||||
event = Event()
|
||||
|
||||
def handle_error(exc):
|
||||
log.error("Error on insert: %r", exc)
|
||||
|
||||
def insert_next(previous_result):
|
||||
current_num = num_started.next()
|
||||
|
||||
if previous_result is not initial:
|
||||
num = next(num_finished)
|
||||
if num >= num_queries:
|
||||
event.set()
|
||||
|
||||
if current_num <= num_queries:
|
||||
future = session.execute_async(query, values)
|
||||
future.add_callbacks(insert_next, handle_error)
|
||||
|
||||
for i in range(120):
|
||||
insert_next(initial)
|
||||
|
||||
event.wait()
|
||||
|
||||
if __name__ == "__main__":
|
||||
benchmark(execute)
|
||||
8
benchmarks/single_thread_sync.py
Normal file
8
benchmarks/single_thread_sync.py
Normal file
@@ -0,0 +1,8 @@
|
||||
from base import benchmark
|
||||
|
||||
def execute(session, query, values, num_queries):
|
||||
for i in xrange(num_queries):
|
||||
session.execute(query, values)
|
||||
|
||||
if __name__ == "__main__":
|
||||
benchmark(execute)
|
||||
Reference in New Issue
Block a user