From 3e379abf8fc2a26d2907eb043b6f08769060362e Mon Sep 17 00:00:00 2001 From: Alan Boudreault Date: Thu, 3 Mar 2016 13:22:52 -0500 Subject: [PATCH] Add QuerySet.fetch_size() support --- cassandra/cqlengine/connection.py | 3 +-- cassandra/cqlengine/query.py | 28 +++++++++++++++++++++++++++- cassandra/cqlengine/statements.py | 10 +++++++--- 3 files changed, 35 insertions(+), 6 deletions(-) diff --git a/cassandra/cqlengine/connection.py b/cassandra/cqlengine/connection.py index 7d53af69..1e043822 100644 --- a/cassandra/cqlengine/connection.py +++ b/cassandra/cqlengine/connection.py @@ -162,8 +162,7 @@ def execute(query, params=None, consistency_level=None, timeout=NOT_SET): elif isinstance(query, BaseCQLStatement): params = query.get_context() - query = str(query) - query = SimpleStatement(query, consistency_level=consistency_level) + query = SimpleStatement(str(query), consistency_level=consistency_level, fetch_size=query.fetch_size) elif isinstance(query, six.string_types): query = SimpleStatement(query, consistency_level=consistency_level) diff --git a/cassandra/cqlengine/query.py b/cassandra/cqlengine/query.py index 38918ced..963c4dce 100644 --- a/cassandra/cqlengine/query.py +++ b/cassandra/cqlengine/query.py @@ -302,6 +302,7 @@ class AbstractQuerySet(object): self._if_not_exists = False self._timeout = connection.NOT_SET self._if_exists = False + self._fetch_size = None @property def column_family_name(self): @@ -369,7 +370,8 @@ class AbstractQuerySet(object): order_by=self._order, limit=self._limit, allow_filtering=self._allow_filtering, - distinct_fields=self._distinct_fields + distinct_fields=self._distinct_fields, + fetch_size=self._fetch_size ) # ----Reads------ @@ -774,6 +776,30 @@ class AbstractQuerySet(object): clone._limit = v return clone + def fetch_size(self, v): + """ + Sets the number of rows that are fetched at a time. + + *Note that driver's default fetch size is 5000. + + .. code-block:: python + + for user in User.objects().fetch_size(500): + print(user) + """ + + if not isinstance(v, six.integer_types): + raise TypeError + if v == self._fetch_size: + return self + + if v < 1: + raise QueryException("fetch size less than 1 is not allowed") + + clone = copy.deepcopy(self) + clone._fetch_size = v + return clone + def allow_filtering(self): """ Enables the (usually) unwise practive of querying on a clustering key without also defining a partition key diff --git a/cassandra/cqlengine/statements.py b/cassandra/cqlengine/statements.py index 7bf6f27f..73f9197e 100644 --- a/cassandra/cqlengine/statements.py +++ b/cassandra/cqlengine/statements.py @@ -16,6 +16,7 @@ from datetime import datetime, timedelta import time import six +from cassandra.query import FETCH_SIZE_UNSET from cassandra.cqlengine import UnicodeMixin from cassandra.cqlengine.functions import QueryValue from cassandra.cqlengine.operators import BaseWhereOperator, InOperator @@ -470,13 +471,14 @@ class MapDeleteClause(BaseDeleteClause): class BaseCQLStatement(UnicodeMixin): """ The base cql statement class """ - def __init__(self, table, consistency=None, timestamp=None, where=None): + def __init__(self, table, consistency=None, timestamp=None, where=None, fetch_size=None): super(BaseCQLStatement, self).__init__() self.table = table self.consistency = consistency self.context_id = 0 self.context_counter = self.context_id self.timestamp = timestamp + self.fetch_size = fetch_size if fetch_size else FETCH_SIZE_UNSET self.where_clauses = [] for clause in where or []: @@ -556,7 +558,8 @@ class SelectStatement(BaseCQLStatement): order_by=None, limit=None, allow_filtering=False, - distinct_fields=None): + distinct_fields=None, + fetch_size=None): """ :param where @@ -565,7 +568,8 @@ class SelectStatement(BaseCQLStatement): super(SelectStatement, self).__init__( table, consistency=consistency, - where=where + where=where, + fetch_size=fetch_size ) self.fields = [fields] if isinstance(fields, six.string_types) else (fields or [])