@@ -19,6 +19,7 @@ import sys
|
||||
from threading import Thread
|
||||
import time
|
||||
from optparse import OptionParser
|
||||
import uuid
|
||||
|
||||
from greplin import scales
|
||||
|
||||
@@ -59,49 +60,65 @@ except ImportError as exc:
|
||||
KEYSPACE = "testkeyspace" + str(int(time.time()))
|
||||
TABLE = "testtable"
|
||||
|
||||
COLUMN_VALUES = {
|
||||
'int': 42,
|
||||
'text': "'42'",
|
||||
'float': 42.0,
|
||||
'uuid': uuid.uuid4(),
|
||||
'timestamp': "'2016-02-03 04:05+0000'"
|
||||
}
|
||||
|
||||
def setup(hosts):
|
||||
|
||||
def setup(options):
|
||||
log.info("Using 'cassandra' package from %s", cassandra.__path__)
|
||||
|
||||
cluster = Cluster(hosts, protocol_version=1)
|
||||
cluster.set_core_connections_per_host(HostDistance.LOCAL, 1)
|
||||
cluster = Cluster(options.hosts, protocol_version=options.protocol_version)
|
||||
try:
|
||||
session = cluster.connect()
|
||||
|
||||
log.debug("Creating keyspace...")
|
||||
session.execute("""
|
||||
CREATE KEYSPACE %s
|
||||
WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '2' }
|
||||
""" % KEYSPACE)
|
||||
try:
|
||||
session.execute("""
|
||||
CREATE KEYSPACE %s
|
||||
WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '2' }
|
||||
""" % options.keyspace)
|
||||
|
||||
log.debug("Setting keyspace...")
|
||||
session.set_keyspace(KEYSPACE)
|
||||
log.debug("Setting keyspace...")
|
||||
except cassandra.AlreadyExists:
|
||||
log.debug("Keyspace already exists")
|
||||
|
||||
session.set_keyspace(options.keyspace)
|
||||
|
||||
log.debug("Creating table...")
|
||||
session.execute("""
|
||||
CREATE TABLE %s (
|
||||
create_table_query = """
|
||||
CREATE TABLE {} (
|
||||
thekey text,
|
||||
col1 text,
|
||||
col2 text,
|
||||
PRIMARY KEY (thekey, col1)
|
||||
)
|
||||
""" % TABLE)
|
||||
"""
|
||||
for i in range(options.num_columns):
|
||||
create_table_query += "col{} {},\n".format(i, options.column_type)
|
||||
create_table_query += "PRIMARY KEY (thekey))"
|
||||
|
||||
try:
|
||||
session.execute(create_table_query.format(TABLE))
|
||||
except cassandra.AlreadyExists:
|
||||
log.debug("Table already exists.")
|
||||
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def teardown(hosts):
|
||||
cluster = Cluster(hosts, protocol_version=1)
|
||||
cluster.set_core_connections_per_host(HostDistance.LOCAL, 1)
|
||||
def teardown(options):
|
||||
cluster = Cluster(options.hosts, protocol_version=options.protocol_version)
|
||||
session = cluster.connect()
|
||||
session.execute("DROP KEYSPACE " + KEYSPACE)
|
||||
if not options.keep_data:
|
||||
session.execute("DROP KEYSPACE " + options.keyspace)
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def benchmark(thread_class):
|
||||
options, args = parse_options()
|
||||
for conn_class in options.supported_reactors:
|
||||
setup(options.hosts)
|
||||
setup(options)
|
||||
log.info("==== %s ====" % (conn_class.__name__,))
|
||||
|
||||
kwargs = {'metrics_enabled': options.enable_metrics,
|
||||
@@ -109,20 +126,30 @@ def benchmark(thread_class):
|
||||
if options.protocol_version:
|
||||
kwargs['protocol_version'] = options.protocol_version
|
||||
cluster = Cluster(options.hosts, **kwargs)
|
||||
session = cluster.connect(KEYSPACE)
|
||||
session = cluster.connect(options.keyspace)
|
||||
|
||||
log.debug("Sleeping for two seconds...")
|
||||
time.sleep(2.0)
|
||||
|
||||
query = session.prepare("""
|
||||
INSERT INTO {table} (thekey, col1, col2) VALUES (?, ?, ?)
|
||||
""".format(table=TABLE))
|
||||
values = ('key', 'a', 'b')
|
||||
|
||||
# Generate the query
|
||||
if options.read:
|
||||
query = "SELECT * FROM {} WHERE thekey = '{{key}}'".format(TABLE)
|
||||
else:
|
||||
query = "INSERT INTO {} (thekey".format(TABLE)
|
||||
for i in range(options.num_columns):
|
||||
query += ", col{}".format(i)
|
||||
|
||||
query += ") VALUES ('{key}'"
|
||||
for i in range(options.num_columns):
|
||||
query += ", {}".format(COLUMN_VALUES[options.column_type])
|
||||
query += ")"
|
||||
|
||||
values = None # we don't use that anymore. Keeping it in case we go back to prepared statements.
|
||||
per_thread = options.num_ops // options.threads
|
||||
threads = []
|
||||
|
||||
log.debug("Beginning inserts...")
|
||||
log.debug("Beginning {}...".format('reads' if options.read else 'inserts'))
|
||||
start = time.time()
|
||||
try:
|
||||
for i in range(options.threads):
|
||||
@@ -142,7 +169,7 @@ def benchmark(thread_class):
|
||||
end = time.time()
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
teardown(options.hosts)
|
||||
teardown(options)
|
||||
|
||||
total = end - start
|
||||
log.info("Total time: %0.2fs" % total)
|
||||
@@ -190,8 +217,19 @@ def parse_options():
|
||||
help='logging level: debug, info, warning, or error')
|
||||
parser.add_option('-p', '--profile', action='store_true', dest='profile',
|
||||
help='Profile the run')
|
||||
parser.add_option('--protocol-version', type='int', dest='protocol_version',
|
||||
parser.add_option('--protocol-version', type='int', dest='protocol_version', default=4,
|
||||
help='Native protocol version to use')
|
||||
parser.add_option('-c', '--num-columns', type='int', dest='num_columns', default=2,
|
||||
help='Specify the number of columns for the schema')
|
||||
parser.add_option('-k', '--keyspace', type='str', dest='keyspace', default=KEYSPACE,
|
||||
help='Specify the keyspace name for the schema')
|
||||
parser.add_option('--keep-data', action='store_true', dest='keep_data', default=False,
|
||||
help='Keep the data after the benchmark')
|
||||
parser.add_option('--column-type', type='str', dest='column_type', default='text',
|
||||
help='Specify the column type for the schema (supported: int, text, float, uuid, timestamp)')
|
||||
parser.add_option('--read', action='store_true', dest='read', default=False,
|
||||
help='Read mode')
|
||||
|
||||
|
||||
options, args = parser.parse_args()
|
||||
|
||||
@@ -235,6 +273,9 @@ class BenchmarkThread(Thread):
|
||||
if self.profiler:
|
||||
self.profiler.enable()
|
||||
|
||||
def run_query(self, key, **kwargs):
|
||||
return self.session.execute_async(self.query.format(key=key), **kwargs)
|
||||
|
||||
def finish_profile(self):
|
||||
if self.profiler:
|
||||
self.profiler.disable()
|
||||
|
||||
@@ -41,8 +41,10 @@ class Runner(BenchmarkThread):
|
||||
if next(self.num_finished) >= self.num_queries:
|
||||
self.event.set()
|
||||
|
||||
if next(self.num_started) <= self.num_queries:
|
||||
future = self.session.execute_async(self.query, self.values, timeout=None)
|
||||
i = next(self.num_started)
|
||||
if i <= self.num_queries:
|
||||
key = "{}-{}".format(self.thread_num, i)
|
||||
future = self.run_query(key, timeout=None)
|
||||
future.add_callbacks(self.insert_next, self.insert_next)
|
||||
|
||||
def run(self):
|
||||
|
||||
@@ -35,7 +35,8 @@ class Runner(BenchmarkThread):
|
||||
except queue.Empty:
|
||||
break
|
||||
|
||||
future = self.session.execute_async(self.query, self.values)
|
||||
key = "{}-{}".format(self.thread_num, i)
|
||||
future = self.run_query(key)
|
||||
futures.put_nowait(future)
|
||||
|
||||
while True:
|
||||
|
||||
@@ -31,7 +31,8 @@ class Runner(BenchmarkThread):
|
||||
old_future = futures.get_nowait()
|
||||
old_future.result()
|
||||
|
||||
future = self.session.execute_async(self.query, self.values)
|
||||
key = "{}-{}".format(self.thread_num, i)
|
||||
future = self.run_query(key)
|
||||
futures.put_nowait(future)
|
||||
|
||||
while True:
|
||||
|
||||
@@ -25,8 +25,9 @@ class Runner(BenchmarkThread):
|
||||
|
||||
self.start_profile()
|
||||
|
||||
for _ in range(self.num_queries):
|
||||
future = self.session.execute_async(self.query, self.values)
|
||||
for i in range(self.num_queries):
|
||||
key = "{}-{}".format(self.thread_num, i)
|
||||
future = self.run_query(key)
|
||||
futures.append(future)
|
||||
|
||||
for future in futures:
|
||||
|
||||
Reference in New Issue
Block a user