Implementing batch queries (not tested yet)

This commit is contained in:
Blake Eggleston
2013-03-07 22:43:42 -08:00
parent 54d4b804c4
commit dc6031ae92
3 changed files with 222 additions and 101 deletions

View File

@@ -4,7 +4,7 @@ import re
from cqlengine import columns
from cqlengine.exceptions import ModelException
from cqlengine.functions import BaseQueryFunction
from cqlengine.query import QuerySet, QueryException
from cqlengine.query import QuerySet, QueryException, DMLQuery
class ModelDefinitionException(ModelException): pass
@@ -112,10 +112,13 @@ class BaseModel(object):
def get(cls, **kwargs):
return cls.objects.get(**kwargs)
def save(self):
def save(self, batch_obj=None):
is_new = self.pk is None
self.validate()
self.objects.save(self)
if batch_obj:
DMLQuery(self.__class__, self).batch(batch_obj).save()
else:
DMLQuery(self.__class__, self).save()
#reset the value managers
for v in self._values.values():
@@ -127,8 +130,13 @@ class BaseModel(object):
def delete(self):
""" Deletes this instance """
self.objects.delete_instance(self)
DMLQuery(self.__class__, self).delete()
def batch(self, batch_obj):
"""
Returns a batched DML query
"""
return DMLQuery(self.__class__, self).batch(batch_obj)
class ModelMetaClass(type):

View File

@@ -1,7 +1,9 @@
from collections import namedtuple
import copy
from datetime import datetime
from hashlib import md5
from time import time
from uuid import uuid1
from cqlengine.connection import connection_manager
from cqlengine.exceptions import CQLEngineException
@@ -28,7 +30,7 @@ class QueryOperator(object):
#the identifier is a unique key that will be used in string
#replacement on query strings, it's created from a hash
#of this object's id and the time
self.identifier = md5(str(id(self)) + str(time())).hexdigest()
self.identifier = uuid1().hex
#perform validation on this operator
self.validate_operator()
@@ -123,6 +125,59 @@ class LessThanOrEqualOperator(QueryOperator):
symbol = "LTE"
cql_symbol = '<='
class Consistency(object):
ANY = 'ANY'
ONE = 'ONE'
QUORUM = 'QUORUM'
LOCAL_QUORUM = 'LOCAL_QUORUM'
EACH_QUORUM = 'EACH_QUORUM'
ALL = 'ALL'
class BatchQuery(object):
"""
Handles the batching of queries
"""
def __init__(self, consistency=Consistency.ONE, timestamp=None):
self.queries = []
self.consistency = consistency
if timestamp is not None and not isinstance(timestamp, datetime):
raise CQLEngineException('timestamp object must be an instance of datetime')
self.timestamp = timestamp
def add_query(self, query, params):
self.queries.append((query, params))
def execute(self):
query_list = []
parameters = {}
opener = 'BEGIN BATCH USING CONSISTENCY {}'.format(self.consistency)
if self.timestamp:
epoch = datetime(1970, 1, 1)
ts = long((self.timestamp - epoch).total_seconds() * 1000)
opener += ' TIMESTAMP {}'.format(ts)
query_list = [opener]
for query, params in self.queries:
query_list.append(query)
parameters.update(params)
query_list.append('APPLY BATCH;')
with connection_manager() as con:
con.execute('\n'.join(query_list), parameters)
self.queries = []
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
#don't execute if there was an exception
if exc_type is not None: return
self.execute()
class QuerySet(object):
def __init__(self, model):
@@ -152,6 +207,8 @@ class QuerySet(object):
self._result_cache = None
self._result_idx = None
self._batch = None
def __unicode__(self):
return self._select_query()
@@ -242,6 +299,8 @@ class QuerySet(object):
#----Reads------
def _execute_query(self):
if self._batch:
raise CQLEngineException("Only inserts, updates, and deletes are available in batch mode")
if self._result_cache is None:
self._con = connection_manager()
self._cur = self._con.execute(self._select_query(), self._where_values())
@@ -318,6 +377,18 @@ class QuerySet(object):
instance._is_persisted = True
return instance
def batch(self, batch_obj):
"""
Adds a batch query to the mix
:param batch_obj:
:return:
"""
if not isinstance(batch_obj, BatchQuery):
raise CQLEngineException('batch_obj must be a BatchQuery instance')
clone = copy.deepcopy(self)
clone._batch = batch_obj
return clone
def first(self):
try:
return iter(self).next()
@@ -379,8 +450,6 @@ class QuerySet(object):
else:
return self[0]
def order_by(self, colname):
"""
orders the result set.
@@ -416,6 +485,8 @@ class QuerySet(object):
def count(self):
""" Returns the number of rows matched by this query """
if self._batch:
raise CQLEngineException("Only inserts, updates, and deletes are available in batch mode")
#TODO: check for previous query execution and return row count if it exists
if self._result_cache is None:
qs = ['SELECT COUNT(*)']
@@ -426,7 +497,7 @@ class QuerySet(object):
qs += ['ALLOW FILTERING']
qs = ' '.join(qs)
with connection_manager() as con:
cur = con.execute(qs, self._where_values())
return cur.fetchone()[0]
@@ -435,7 +506,7 @@ class QuerySet(object):
def limit(self, v):
"""
Sets the limit on the number of results returned
Sets the limit on the number of results returned
CQL has a default limit of 10,000
"""
if not (v is None or isinstance(v, (int, long))):
@@ -488,90 +559,8 @@ class QuerySet(object):
""" Don't load these fields for the returned query """
return self._only_or_defer('defer', fields)
#----writes----
def save(self, instance):
"""
Creates / updates a row.
This is a blind insert call.
All validation and cleaning needs to happen
prior to calling this.
"""
assert type(instance) == self.model
#organize data
value_pairs = []
values = instance.as_dict()
#get defined fields and their column names
for name, col in self.model._columns.items():
val = values.get(name)
if val is None: continue
value_pairs += [(col.db_field_name, val)]
#construct query string
field_names = zip(*value_pairs)[0]
field_values = dict(value_pairs)
qs = []
if instance._can_update():
qs += ["UPDATE {}".format(self.column_family_name)]
qs += ["SET"]
set_statements = []
#get defined fields and their column names
for name, col in self.model._columns.items():
if not col.is_primary_key:
val = values.get(name)
if val is None: continue
set_statements += ['"{0}" = :{0}'.format(col.db_field_name)]
qs += [', '.join(set_statements)]
qs += ['WHERE']
where_statements = []
for name, col in self.model._primary_keys.items():
where_statements += ['"{0}" = :{0}'.format(col.db_field_name)]
qs += [' AND '.join(where_statements)]
# clear the qs if there are not set statements
if not set_statements: qs = []
else:
qs += ["INSERT INTO {}".format(self.column_family_name)]
qs += ["({})".format(', '.join(['"{}"'.format(f) for f in field_names]))]
qs += ['VALUES']
qs += ["({})".format(', '.join([':'+f for f in field_names]))]
qs = ' '.join(qs)
# skip query execution if it's empty
# caused by pointless update queries
if qs:
with connection_manager() as con:
con.execute(qs, field_values)
#delete deleted / nulled columns
deleted = [k for k,v in instance._values.items() if v.deleted]
if deleted:
del_fields = [self.model._columns[f] for f in deleted]
del_fields = [f.db_field_name for f in del_fields if not f.primary_key]
pks = self.model._primary_keys
qs = ['DELETE {}'.format(', '.join(['"{}"'.format(f) for f in del_fields]))]
qs += ['FROM {}'.format(self.column_family_name)]
qs += ['WHERE']
eq = lambda col: '{0} = :{0}'.format(col.db_field_name)
qs += [' AND '.join([eq(f) for f in pks.values()])]
qs = ' '.join(qs)
pk_dict = dict([(v.db_field_name, getattr(instance, k)) for k,v in pks.items()])
with connection_manager() as con:
con.execute(qs, pk_dict)
def create(self, **kwargs):
return self.model(**kwargs).save()
return self.model(**kwargs).save(batch_obj=self._batch)
#----delete---
def delete(self, columns=[]):
@@ -586,18 +575,142 @@ class QuerySet(object):
qs += ['WHERE {}'.format(self._where_clause())]
qs = ' '.join(qs)
with connection_manager() as con:
con.execute(qs, self._where_values())
if self._batch:
self._batch.add_query(qs, self._where_values())
else:
with connection_manager() as con:
con.execute(qs, self._where_values())
class DMLQuery(object):
"""
A query object used for queries performing inserts, updates, or deletes
this is usually instantiated by the model instance to be modified
unlike the read query object, this is mutable
"""
def __init__(self, model, instance=None):
self.model = model
self.column_family_name = self.model.column_family_name()
self.instance = instance
self.batch = None
pass
def batch(self, batch_obj):
if not isinstance(batch_obj, BatchQuery):
raise CQLEngineException('batch_obj must be a BatchQuery instance')
self.batch = batch_obj
return self
def save(self):
"""
Creates / updates a row.
This is a blind insert call.
All validation and cleaning needs to happen
prior to calling this.
"""
if self.instance is None:
raise CQLEngineException("DML Query intance attribute is None")
assert type(self.instance) == self.model
#organize data
value_pairs = []
values = self.instance.as_dict()
#get defined fields and their column names
for name, col in self.model._columns.items():
val = values.get(name)
if val is None: continue
value_pairs += [(col.db_field_name, val)]
#construct query string
field_names = zip(*value_pairs)[0]
field_ids = {n:uuid1().hex for n in field_names}
field_values = dict(value_pairs)
query_values = {field_ids[n]:field_values[n] for n in field_names}
qs = []
if self.instance._can_update():
qs += ["UPDATE {}".format(self.column_family_name)]
qs += ["SET"]
set_statements = []
#get defined fields and their column names
for name, col in self.model._columns.items():
if not col.is_primary_key:
val = values.get(name)
if val is None: continue
set_statements += ['"{}" = :{}'.format(col.db_field_name, field_ids[col.db_field_name])]
qs += [', '.join(set_statements)]
qs += ['WHERE']
where_statements = []
for name, col in self.model._primary_keys.items():
where_statements += ['"{}" = :{}'.format(col.db_field_name, field_ids[col.db_field_name])]
qs += [' AND '.join(where_statements)]
# clear the qs if there are not set statements
if not set_statements: qs = []
else:
qs += ["INSERT INTO {}".format(self.column_family_name)]
qs += ["({})".format(', '.join(['"{}"'.format(f) for f in field_names]))]
qs += ['VALUES']
qs += ["({})".format(', '.join([':'+field_ids[f] for f in field_names]))]
def delete_instance(self, instance):
""" Deletes one instance """
pk_name = self.model._pk_name
qs = ['DELETE FROM {}'.format(self.column_family_name)]
qs += ['WHERE {0}=:{0}'.format(pk_name)]
qs = ' '.join(qs)
with connection_manager() as con:
con.execute(qs, {pk_name:instance.pk})
# skip query execution if it's empty
# caused by pointless update queries
if qs:
if self.batch:
self.batch.add_query(qs, query_values)
else:
with connection_manager() as con:
con.execute(qs, query_values)
#delete deleted / nulled columns
deleted = [k for k,v in self.instance._values.items() if v.deleted]
if deleted:
del_fields = [self.model._columns[f] for f in deleted]
del_fields = [f.db_field_name for f in del_fields if not f.primary_key]
pks = self.model._primary_keys
qs = ['DELETE {}'.format(', '.join(['"{}"'.format(f) for f in del_fields]))]
qs += ['FROM {}'.format(self.column_family_name)]
qs += ['WHERE']
eq = lambda col: '"{}" = :{}'.format(col.db_field_name, field_ids[col.db_field_name])
qs += [' AND '.join([eq(f) for f in pks.values()])]
qs = ' '.join(qs)
if self.batch:
self.batch.add_query(qs, query_values)
else:
with connection_manager() as con:
con.execute(qs, query_values)
def delete(self):
""" Deletes one instance """
if self.instance is None:
raise CQLEngineException("DML Query intance attribute is None")
field_values = {}
qs = ['DELETE FROM {}'.format(self.column_family_name)]
qs += ['WHERE']
where_statements = []
for name, col in self.model._primary_keys.items():
field_id = uuid1().hex
field_values[field_id] = col.to_database(getattr(self.instance, name))
where_statements += ['"{}" = :{}'.format(col.db_field_name, field_id)]
qs += [' AND '.join(where_statements)]
qs = ' '.join(qs)
if self.batch:
self.batch.add_query(qs, field_values)
else:
with connection_manager() as con:
con.execute(qs, field_values)

View File