Merge branch 'master' into 2.1-support

Conflicts:
	cassandra/protocol.py
Author: Tyler Hobbs
Date: 2014-06-17 11:57:02 -05:00
19 changed files with 375 additions and 184 deletions


@@ -1,11 +1,40 @@
2.0.3
=====
In Progress
Features
--------
* Use io.BytesIO for reduced CPU consumption (github #143)
Bug Fixes
---------
* Fix references to xrange that do not go through "six" in
libevreactor and geventreactor (github #138)
* Make BoundStatements inherit fetch_size from their parent
PreparedStatement (PYTHON-80)
* Clear reactor state in child process after forking
to prevent errors with multiprocessing when the parent
process has connected a Cluster before forking (github #141)
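A minimal sketch of the fork-then-query pattern this fix is aimed at,
assuming only the public Cluster API; the contact point, worker count,
and query are illustrative::

    import multiprocessing

    from cassandra.cluster import Cluster

    def worker():
        # each child builds its own Cluster and Session after the fork
        cluster = Cluster(['127.0.0.1'])
        session = cluster.connect()
        session.execute("SELECT release_version FROM system.local")
        cluster.shutdown()

    if __name__ == '__main__':
        parent = Cluster(['127.0.0.1'])
        parent.connect()  # parent connects before forking
        workers = [multiprocessing.Process(target=worker) for _ in range(2)]
        for p in workers:
            p.start()
        for p in workers:
            p.join()
        parent.shutdown()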
2.0.2
=====
June 10, 2014
Bug Fixes
---------
* Add six to requirements.txt
* Avoid KeyError during schema refresh when a keyspace is dropped
and TokenAwarePolicy is not in use
* Avoid registering multiple atexit cleanup functions when the
asyncore event loop is restarted multiple times
* Delay initialization of reactors in order to avoid problems
with shared state when using multiprocessing (PYTHON-60)
* Add python-six to debian dependencies, move python-blist to
recommends
* Fix memory leak when libev connections are created and
destroyed (github #93)
* Ensure token map is rebuilt when hosts are removed from
the cluster
2.0.1
=====
@@ -103,6 +132,9 @@ now:
* cassandra.decoder.dict_factory has moved to cassandra.query.dict_factory
* cassandra.decoder.ordered_dict_factory has moved to cassandra.query.ordered_dict_factory
Exceptions that were in cassandra.decoder have been moved to cassandra.protocol. If
you handle any of these exceptions, you must adjust the code accordingly.
1.1.2
=====
May 8, 2014


@@ -1,6 +1,8 @@
Releasing
=========
* Run the tests and ensure they all pass
* If dependencies have changed, make sure ``debian/control``
is up to date
* Update CHANGELOG.rst
* Update the version in ``cassandra/__init__.py``
* Commit the changelog and version changes


@@ -23,7 +23,7 @@ class NullHandler(logging.Handler):
logging.getLogger('cassandra').addHandler(NullHandler())
__version_info__ = (2, 0, 1, 'post')
__version_info__ = (2, 0, 2, 'post')
__version__ = '.'.join(map(str, __version_info__))
@@ -172,6 +172,11 @@ class Timeout(Exception):
class ReadTimeout(Timeout):
"""
A subclass of :exc:`Timeout` for read operations.
This indicates that the replicas failed to respond to the coordinator
node before the configured timeout. This timeout is configured in
``cassandra.yaml`` with the ``read_request_timeout_in_ms``
and ``range_request_timeout_in_ms`` options.
"""
data_retrieved = None
@@ -189,6 +194,11 @@ class ReadTimeout(Timeout):
class WriteTimeout(Timeout):
"""
A subclass of :exc:`Timeout` for write operations.
This indicates that the replicas failed to respond to the coordinator
node before the configured timeout. This timeout is configured in
``cassandra.yaml`` with the ``write_request_timeout_in_ms``
option.
"""
write_type = None
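# A hedged sketch of handling these exceptions around Session.execute();
# the `session` object, the `users` table, and the retry policy shown
# here are illustrative assumptions, not additions to the driver API.
import logging

from cassandra import ReadTimeout, WriteTimeout

log = logging.getLogger(__name__)

def fetch_user(session, user_id, attempts=3):
    for attempt in range(attempts):
        try:
            return session.execute("SELECT * FROM users WHERE id=%s", (user_id,))
        except ReadTimeout:
            # replicas failed to respond within read_request_timeout_in_ms
            if attempt == attempts - 1:
                raise

def save_user(session, user_id, name):
    try:
        session.execute("INSERT INTO users (id, name) VALUES (%s, %s)", (user_id, name))
    except WriteTimeout as exc:
        # replicas failed to respond within write_request_timeout_in_ms;
        # exc.write_type describes the kind of write that timed out
        log.warning("Write timed out (write_type=%s)", exc.write_type)
        raise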


@@ -530,6 +530,7 @@ class Cluster(object):
raise Exception("Cluster is already shut down")
if not self._is_setup:
self.connection_class.initialize_reactor()
atexit.register(partial(_shutdown_cluster, self))
for address in self.contact_points:
host = self.add_host(address, signal=False)
@@ -1708,17 +1709,18 @@ class ControlConnection(object):
else:
self._cluster.metadata.rebuild_schema(ks_result, cf_result, col_result)
def refresh_node_list_and_token_map(self):
def refresh_node_list_and_token_map(self, force_token_rebuild=False):
try:
if self._connection:
self._refresh_node_list_and_token_map(self._connection)
self._refresh_node_list_and_token_map(self._connection, force_token_rebuild=force_token_rebuild)
except ReferenceError:
pass # our weak reference to the Cluster is no good
except Exception:
log.debug("[control connection] Error refreshing node list and token map", exc_info=True)
self._signal_error()
def _refresh_node_list_and_token_map(self, connection, preloaded_results=None):
def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
force_token_rebuild=False):
if preloaded_results:
log.debug("[control connection] Refreshing node list and token map using preloaded results")
peers_result = preloaded_results[0]
@@ -1753,7 +1755,7 @@ class ControlConnection(object):
if partitioner and tokens:
token_map[host] = tokens
should_rebuild_token_map = False
should_rebuild_token_map = force_token_rebuild
found_hosts = set()
for row in peers_result:
addr = row.get("rpc_address")
@@ -1779,12 +1781,11 @@ class ControlConnection(object):
token_map[host] = tokens
for old_host in self._cluster.metadata.all_hosts():
if old_host.address != connection.host and \
old_host.address not in found_hosts and \
old_host.address not in self._cluster.contact_points:
log.debug("[control connection] Found host that has been removed: %r", old_host)
if old_host.address != connection.host and old_host.address not in found_hosts:
should_rebuild_token_map = True
self._cluster.remove_host(old_host)
if old_host.address not in self._cluster.contact_points:
log.debug("[control connection] Found host that has been removed: %r", old_host)
self._cluster.remove_host(old_host)
log.debug("[control connection] Finished fetching ring info")
if partitioner and should_rebuild_token_map:
@@ -1963,10 +1964,10 @@ class ControlConnection(object):
self.reconnect()
def on_add(self, host):
self.refresh_node_list_and_token_map()
self.refresh_node_list_and_token_map(force_token_rebuild=True)
def on_remove(self, host):
self.refresh_node_list_and_token_map()
self.refresh_node_list_and_token_map(force_token_rebuild=True)
def _stop_scheduler(scheduler, thread):


@@ -195,6 +195,22 @@ class Connection(object):
self.lock = RLock()
@classmethod
def initialize_reactor(cls):
"""
Called once by Cluster.connect(). This should be used by implementations
to set up any resources that will be shared across connections.
"""
pass
@classmethod
def handle_fork(cls):
"""
Called after a fork. This should clean up any remaining reactor state
from the parent process.
"""
pass
def close(self):
raise NotImplementedError()
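# A hypothetical sketch of a reactor implementation using these hooks,
# mirroring the pid-check pattern the asyncore and libev reactors adopt
# later in this commit; it assumes the Connection base class above, and
# _ExampleLoop / ExampleConnection are illustrative names only.
import os

class _ExampleLoop(object):
    def __init__(self):
        self._pid = os.getpid()  # remember which process created the loop

    def _cleanup(self):
        pass  # stop any threads/watchers inherited from the parent process

class ExampleConnection(Connection):

    _loop = None

    @classmethod
    def initialize_reactor(cls):
        if not cls._loop:
            cls._loop = _ExampleLoop()
        elif cls._loop._pid != os.getpid():
            # running in a forked child: discard the parent's reactor state
            cls.handle_fork()
            cls._loop = _ExampleLoop()

    @classmethod
    def handle_fork(cls):
        if cls._loop:
            cls._loop._cleanup()
            cls._loop = None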


@@ -12,16 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import # to enable import io from stdlib
import atexit
from collections import deque
from functools import partial
import io
import logging
import os
import socket
import sys
from threading import Event, Lock, Thread
import weakref
from six import BytesIO
from six.moves import range
from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, EINVAL, EISCONN, errorcode
@@ -46,15 +48,27 @@ from cassandra.marshal import int32_unpack
log = logging.getLogger(__name__)
def _cleanup(loop_weakref):
try:
loop = loop_weakref()
except ReferenceError:
return
loop._cleanup()
class AsyncoreLoop(object):
def __init__(self):
self._pid = os.getpid()
self._loop_lock = Lock()
self._started = False
self._shutdown = False
self._conns_lock = Lock()
self._conns = WeakSet()
self._thread = None
atexit.register(partial(_cleanup, weakref.ref(self)))
def maybe_start(self):
should_start = False
@@ -69,10 +83,9 @@ class AsyncoreLoop(object):
self._loop_lock.release()
if should_start:
thread = Thread(target=self._run_loop, name="cassandra_driver_event_loop")
thread.daemon = True
thread.start()
atexit.register(partial(self._cleanup, thread))
self._thread = Thread(target=self._run_loop, name="cassandra_driver_event_loop")
self._thread.daemon = True
self._thread.start()
def _run_loop(self):
log.debug("Starting asyncore event loop")
@@ -95,11 +108,14 @@ class AsyncoreLoop(object):
log.debug("Asyncore event loop ended")
def _cleanup(self, thread):
def _cleanup(self):
self._shutdown = True
if not self._thread:
return
log.debug("Waiting for event loop thread to join...")
thread.join(timeout=1.0)
if thread.is_alive():
self._thread.join(timeout=1.0)
if self._thread.is_alive():
log.warning(
"Event loop thread could not be joined, so shutdown may not be clean. "
"Please call Cluster.shutdown() to avoid this.")
@@ -121,12 +137,29 @@ class AsyncoreConnection(Connection, asyncore.dispatcher):
module in the Python standard library for its event loop.
"""
_loop = AsyncoreLoop()
_loop = None
_total_reqd_bytes = 0
_writable = False
_readable = False
@classmethod
def initialize_reactor(cls):
if not cls._loop:
cls._loop = AsyncoreLoop()
else:
current_pid = os.getpid()
if cls._loop._pid != current_pid:
log.debug("Detected fork, clearing and reinitializing reactor state")
cls.handle_fork()
cls._loop = AsyncoreLoop()
@classmethod
def handle_fork(cls):
if cls._loop:
cls._loop._cleanup()
cls._loop = None
@classmethod
def factory(cls, *args, **kwargs):
timeout = kwargs.pop('timeout', 5.0)
@@ -145,7 +178,7 @@ class AsyncoreConnection(Connection, asyncore.dispatcher):
asyncore.dispatcher.__init__(self)
self.connected_event = Event()
self._iobuf = BytesIO()
self._iobuf = io.BytesIO()
self._callbacks = {}
self.deque = deque()
@@ -298,7 +331,7 @@ class AsyncoreConnection(Connection, asyncore.dispatcher):
# leave leftover in current buffer
leftover = self._iobuf.read()
self._iobuf = BytesIO()
self._iobuf = io.BytesIO()
self._iobuf.write(leftover)
self._total_reqd_bytes = 0


@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import # to enable import io from stdlib
import gevent
from gevent import select, socket
from gevent.event import Event
@@ -19,13 +20,11 @@ from gevent.queue import Queue
from collections import defaultdict
from functools import partial
import io
import logging
import os
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO # ignore flake8 warning: # NOQA
from six.moves import xrange
from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, EINVAL
@@ -72,7 +71,7 @@ class GeventConnection(Connection):
Connection.__init__(self, *args, **kwargs)
self.connected_event = Event()
self._iobuf = StringIO()
self._iobuf = io.BytesIO()
self._write_queue = Queue()
self._callbacks = {}
@@ -178,7 +177,7 @@ class GeventConnection(Connection):
# leave leftover in current buffer
leftover = self._iobuf.read()
self._iobuf = StringIO()
self._iobuf = io.BytesIO()
self._iobuf.write(leftover)
self._total_reqd_bytes = 0


@@ -12,15 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import # to enable import io from stdlib
import atexit
from collections import deque
from functools import partial
import io
import logging
import os
import socket
from threading import Event, Lock, Thread
import weakref
from six import BytesIO
from six.moves import xrange
from cassandra import OperationTimedOut
from cassandra.connection import Connection, ConnectionShutdown, NONBLOCKING
@@ -46,81 +49,197 @@ except ImportError:
log = logging.getLogger(__name__)
class LibevConnection(Connection):
"""
An implementation of :class:`.Connection` that uses libev for its event loop.
"""
_loop = libev.Loop()
_loop_notifier = libev.Async(_loop)
_loop_notifier.start()
def _cleanup(loop_weakref):
try:
loop = loop_weakref()
except ReferenceError:
return
# prevent _loop_notifier from keeping the loop from returning
_loop.unref()
loop._cleanup()
_loop_started = None
_loop_lock = Lock()
_loop_shutdown = False
@classmethod
def _run_loop(cls):
class LibevLoop(object):
def __init__(self):
self._pid = os.getpid()
self._loop = libev.Loop()
self._notifier = libev.Async(self._loop)
self._notifier.start()
# prevent _notifier from keeping the loop from returning
self._loop.unref()
self._started = False
self._shutdown = False
self._lock = Lock()
self._thread = None
# set of all connections; only replaced with a new copy
# while holding _conn_set_lock, never modified in place
self._live_conns = set()
# newly created connections that need their write/read watcher started
self._new_conns = set()
# recently closed connections that need their write/read watcher stopped
self._closed_conns = set()
self._conn_set_lock = Lock()
self._preparer = libev.Prepare(self._loop, self._loop_will_run)
# prevent _preparer from keeping the loop from returning
self._loop.unref()
self._preparer.start()
atexit.register(partial(_cleanup, weakref.ref(self)))
def notify(self):
self._notifier.send()
def maybe_start(self):
should_start = False
with self._lock:
if not self._started:
log.debug("Starting libev event loop")
self._started = True
should_start = True
if should_start:
self._thread = Thread(target=self._run_loop, name="event_loop")
self._thread.daemon = True
self._thread.start()
self._notifier.send()
def _run_loop(self):
while True:
end_condition = cls._loop.start()
end_condition = self._loop.start()
# there are still active watchers, no deadlock
with cls._loop_lock:
if not cls._loop_shutdown and (end_condition or cls._live_conns):
with self._lock:
if not self._shutdown and (end_condition or self._live_conns):
log.debug("Restarting event loop")
continue
else:
# all Connections have been closed, no active watchers
log.debug("All Connections currently closed, event loop ended")
cls._loop_started = False
self._started = False
break
@classmethod
def _maybe_start_loop(cls):
should_start = False
with cls._loop_lock:
if not cls._loop_started:
log.debug("Starting libev event loop")
cls._loop_started = True
should_start = True
def _cleanup(self):
self._shutdown = True
if not self._thread:
return
if should_start:
t = Thread(target=cls._run_loop, name="event_loop")
t.daemon = True
t.start()
atexit.register(partial(cls._cleanup, t))
for conn in self._live_conns | self._new_conns | self._closed_conns:
conn.close()
if conn._write_watcher:
conn._write_watcher.stop()
del conn._write_watcher
if conn._read_watcher:
conn._read_watcher.stop()
del conn._read_watcher
return should_start
@classmethod
def _cleanup(cls, thread):
cls._loop_shutdown = True
log.debug("Waiting for event loop thread to join...")
thread.join(timeout=1.0)
if thread.is_alive():
self._thread.join(timeout=1.0)
if self._thread.is_alive():
log.warning(
"Event loop thread could not be joined, so shutdown may not be clean. "
"Please call Cluster.shutdown() to avoid this.")
log.debug("Event loop thread was joined")
self._loop = None
# class-level set of all connections; only replaced with a new copy
# while holding _conn_set_lock, never modified in place
_live_conns = set()
# newly created connections that need their write/read watcher started
_new_conns = set()
# recently closed connections that need their write/read watcher stopped
_closed_conns = set()
_conn_set_lock = Lock()
def connection_created(self, conn):
with self._conn_set_lock:
new_live_conns = self._live_conns.copy()
new_live_conns.add(conn)
self._live_conns = new_live_conns
new_new_conns = self._new_conns.copy()
new_new_conns.add(conn)
self._new_conns = new_new_conns
def connection_destroyed(self, conn):
with self._conn_set_lock:
new_live_conns = self._live_conns.copy()
new_live_conns.discard(conn)
self._live_conns = new_live_conns
new_closed_conns = self._closed_conns.copy()
new_closed_conns.add(conn)
self._closed_conns = new_closed_conns
self._notifier.send()
def _loop_will_run(self, prepare):
changed = False
for conn in self._live_conns:
if not conn.deque and conn._write_watcher_is_active:
if conn._write_watcher:
conn._write_watcher.stop()
conn._write_watcher_is_active = False
changed = True
elif conn.deque and not conn._write_watcher_is_active:
conn._write_watcher.start()
conn._write_watcher_is_active = True
changed = True
if self._new_conns:
with self._conn_set_lock:
to_start = self._new_conns
self._new_conns = set()
for conn in to_start:
conn._read_watcher.start()
changed = True
if self._closed_conns:
with self._conn_set_lock:
to_stop = self._closed_conns
self._closed_conns = set()
for conn in to_stop:
if conn._write_watcher:
conn._write_watcher.stop()
# clear reference cycles from IO callback
del conn._write_watcher
if conn._read_watcher:
conn._read_watcher.stop()
# clear reference cycles from IO callback
del conn._read_watcher
changed = True
if changed:
self._notifier.send()
class LibevConnection(Connection):
"""
An implementation of :class:`.Connection` that uses libev for its event loop.
"""
_libevloop = None
_write_watcher_is_active = False
_total_reqd_bytes = 0
_read_watcher = None
_write_watcher = None
_socket = None
@classmethod
def initialize_reactor(cls):
if not cls._libevloop:
cls._libevloop = LibevLoop()
else:
if cls._libevloop._pid != os.getpid():
log.debug("Detected fork, clearing and reinitializing reactor state")
cls.handle_fork()
cls._libevloop = LibevLoop()
@classmethod
def handle_fork(cls):
if cls._libevloop:
cls._libevloop._cleanup()
cls._libevloop = None
@classmethod
def factory(cls, *args, **kwargs):
timeout = kwargs.pop('timeout', 5.0)
@@ -134,73 +253,11 @@ class LibevConnection(Connection):
else:
return conn
@classmethod
def _connection_created(cls, conn):
with cls._conn_set_lock:
new_live_conns = cls._live_conns.copy()
new_live_conns.add(conn)
cls._live_conns = new_live_conns
new_new_conns = cls._new_conns.copy()
new_new_conns.add(conn)
cls._new_conns = new_new_conns
@classmethod
def _connection_destroyed(cls, conn):
with cls._conn_set_lock:
new_live_conns = cls._live_conns.copy()
new_live_conns.discard(conn)
cls._live_conns = new_live_conns
new_closed_conns = cls._closed_conns.copy()
new_closed_conns.add(conn)
cls._closed_conns = new_closed_conns
@classmethod
def loop_will_run(cls, prepare):
changed = False
for conn in cls._live_conns:
if not conn.deque and conn._write_watcher_is_active:
if conn._write_watcher:
conn._write_watcher.stop()
conn._write_watcher_is_active = False
changed = True
elif conn.deque and not conn._write_watcher_is_active:
conn._write_watcher.start()
conn._write_watcher_is_active = True
changed = True
if cls._new_conns:
with cls._conn_set_lock:
to_start = cls._new_conns
cls._new_conns = set()
for conn in to_start:
conn._read_watcher.start()
changed = True
if cls._closed_conns:
with cls._conn_set_lock:
to_stop = cls._closed_conns
cls._closed_conns = set()
for conn in to_stop:
if conn._write_watcher:
conn._write_watcher.stop()
if conn._read_watcher:
conn._read_watcher.stop()
changed = True
if changed:
cls._loop_notifier.send()
def __init__(self, *args, **kwargs):
Connection.__init__(self, *args, **kwargs)
self.connected_event = Event()
self._iobuf = BytesIO()
self._iobuf = io.BytesIO()
self._callbacks = {}
self.deque = deque()
@@ -219,17 +276,16 @@ class LibevConnection(Connection):
for args in self.sockopts:
self._socket.setsockopt(*args)
with self._loop_lock:
self._read_watcher = libev.IO(self._socket.fileno(), libev.EV_READ, self._loop, self.handle_read)
self._write_watcher = libev.IO(self._socket.fileno(), libev.EV_WRITE, self._loop, self.handle_write)
with self._libevloop._lock:
self._read_watcher = libev.IO(self._socket.fileno(), libev.EV_READ, self._libevloop._loop, self.handle_read)
self._write_watcher = libev.IO(self._socket.fileno(), libev.EV_WRITE, self._libevloop._loop, self.handle_write)
self._send_options_message()
self.__class__._connection_created(self)
self._libevloop.connection_created(self)
# start the global event loop if needed
self._maybe_start_loop()
self._loop_notifier.send()
self._libevloop.maybe_start()
def close(self):
with self.lock:
@@ -238,9 +294,9 @@ class LibevConnection(Connection):
self.is_closed = True
log.debug("Closing connection (%s) to %s", id(self), self.host)
self.__class__._connection_destroyed(self)
self._loop_notifier.send()
self._libevloop.connection_destroyed(self)
self._socket.close()
log.debug("Closed socket to %s", self.host)
# don't leave in-progress operations hanging
if not self.is_defunct:
@@ -326,7 +382,7 @@ class LibevConnection(Connection):
# leave leftover in current buffer
leftover = self._iobuf.read()
self._iobuf = BytesIO()
self._iobuf = io.BytesIO()
self._iobuf.write(leftover)
self._total_reqd_bytes = 0
@@ -349,7 +405,7 @@ class LibevConnection(Connection):
with self._deque_lock:
self.deque.extend(chunks)
self._loop_notifier.send()
self._libevloop.notify()
def register_watcher(self, event_type, callback, register_timeout=None):
self._push_watchers[event_type].add(callback)
@@ -361,9 +417,3 @@ class LibevConnection(Connection):
self._push_watchers[event_type].add(callback)
self.wait_for_response(
RegisterMessage(event_list=type_callback_dict.keys()), timeout=register_timeout)
_preparer = libev.Prepare(LibevConnection._loop, LibevConnection.loop_will_run)
# prevent _preparer from keeping the loop from returning
LibevConnection._loop.unref()
_preparer.start()


@@ -8,6 +8,7 @@ typedef struct libevwrapper_Loop {
static void
Loop_dealloc(libevwrapper_Loop *self) {
ev_loop_destroy(self->loop);
Py_TYPE(self)->tp_free((PyObject *)self);
};
@@ -17,9 +18,9 @@ Loop_new(PyTypeObject *type, PyObject *args, PyObject *kwds) {
self = (libevwrapper_Loop *)type->tp_alloc(type, 0);
if (self != NULL) {
self->loop = ev_default_loop(EVBACKEND_SELECT);
self->loop = ev_loop_new(EVBACKEND_SELECT);
if (!self->loop) {
PyErr_SetString(PyExc_Exception, "Error getting default ev loop");
PyErr_SetString(PyExc_Exception, "Error getting new ev loop");
Py_DECREF(self);
return NULL;
}
@@ -133,7 +134,8 @@ IO_init(libevwrapper_IO *self, PyObject *args, PyObject *kwds) {
PyObject *socket;
PyObject *callback;
PyObject *loop;
int io_flags = 0;
int io_flags = 0, fd = -1;
struct ev_io *io = NULL;
if (!PyArg_ParseTuple(args, "OiOO", &socket, &io_flags, &loop, &callback)) {
return -1;
@@ -154,14 +156,14 @@ IO_init(libevwrapper_IO *self, PyObject *args, PyObject *kwds) {
self->callback = callback;
}
int fd = PyObject_AsFileDescriptor(socket);
fd = PyObject_AsFileDescriptor(socket);
if (fd == -1) {
PyErr_SetString(PyExc_TypeError, "unable to get file descriptor from socket");
Py_XDECREF(callback);
Py_XDECREF(loop);
return -1;
}
struct ev_io *io = &(self->io);
io = &(self->io);
ev_io_init(io, io_callback, fd, io_flags);
self->io.data = self;
return 0;
@@ -246,6 +248,7 @@ typedef struct libevwrapper_Async {
static void
Async_dealloc(libevwrapper_Async *self) {
Py_XDECREF(self->loop);
Py_TYPE(self)->tp_free((PyObject *)self);
};
@@ -254,8 +257,9 @@ static void async_callback(EV_P_ ev_async *watcher, int revents) {};
static int
Async_init(libevwrapper_Async *self, PyObject *args, PyObject *kwds) {
PyObject *loop;
static char *kwlist[] = {"loop", NULL};
struct ev_async *async = NULL;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "O", kwlist, &loop)) {
PyErr_SetString(PyExc_TypeError, "unable to get file descriptor from socket");
return -1;
@@ -267,7 +271,7 @@ Async_init(libevwrapper_Async *self, PyObject *args, PyObject *kwds) {
} else {
return -1;
}
struct ev_async *async = &(self->async);
async = &(self->async);
ev_async_init(async, async_callback);
return 0;
};
@@ -345,11 +349,11 @@ Prepare_dealloc(libevwrapper_Prepare *self) {
static void prepare_callback(struct ev_loop *loop, ev_prepare *watcher, int revents) {
libevwrapper_Prepare *self = watcher->data;
PyObject *result = NULL;
PyGILState_STATE gstate;
gstate = PyGILState_Ensure();
PyObject *result = PyObject_CallFunction(self->callback, "O", self);
gstate = PyGILState_Ensure();
result = PyObject_CallFunction(self->callback, "O", self);
if (!result) {
PyErr_WriteUnraisable(self->callback);
}
@@ -362,6 +366,7 @@ static int
Prepare_init(libevwrapper_Prepare *self, PyObject *args, PyObject *kwds) {
PyObject *callback;
PyObject *loop;
struct ev_prepare *prepare = NULL;
if (!PyArg_ParseTuple(args, "OO", &loop, &callback)) {
return -1;
@@ -383,7 +388,7 @@ Prepare_init(libevwrapper_Prepare *self, PyObject *args, PyObject *kwds) {
Py_INCREF(callback);
self->callback = callback;
}
struct ev_prepare *prepare = &(self->prepare);
prepare = &(self->prepare);
ev_prepare_init(prepare, prepare_callback);
self->prepare.data = self;
return 0;
@@ -478,6 +483,8 @@ void
initlibevwrapper(void)
#endif
{
PyObject *module = NULL;
if (PyType_Ready(&libevwrapper_LoopType) < 0)
INITERROR;
@@ -494,9 +501,9 @@ initlibevwrapper(void)
INITERROR;
# if PY_MAJOR_VERSION >= 3
PyObject *module = PyModule_Create(&moduledef);
module = PyModule_Create(&moduledef);
# else
PyObject *module = Py_InitModule3("libevwrapper", module_methods, module_doc);
module = Py_InitModule3("libevwrapper", module_methods, module_doc);
# endif
if (module == NULL)


@@ -12,12 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import # to enable import io from stdlib
import logging
import socket
from uuid import UUID
import six
from six.moves import range
import io
from cassandra import (Unavailable, WriteTimeout, ReadTimeout,
AlreadyExists, InvalidRequest, Unauthorized,
@@ -69,7 +71,7 @@ class _MessageType(object):
tracing = False
def to_binary(self, stream_id, protocol_version, compression=None):
body = six.BytesIO()
body = io.BytesIO()
self.send_body(body, protocol_version)
body = body.getvalue()
@@ -80,7 +82,7 @@ class _MessageType(object):
if self.tracing:
flags |= TRACING_FLAG
msg = six.BytesIO()
msg = io.BytesIO()
write_header(msg, protocol_version, flags, stream_id, self.opcode, len(body))
msg.write(body)
@@ -105,7 +107,7 @@ def decode_response(protocol_version, stream_id, flags, opcode, body, decompress
body = decompressor(body)
flags ^= COMPRESSED_FLAG
body = six.BytesIO(body)
body = io.BytesIO(body)
if flags & TRACING_FLAG:
trace_id = UUID(bytes=body.read(16))
flags ^= TRACING_FLAG
@@ -228,7 +230,7 @@ class TruncateError(RequestExecutionException):
class WriteTimeoutErrorMessage(RequestExecutionException):
summary = 'Timeout during write request'
summary = "Coordinator node timed out waiting for replica nodes' responses"
error_code = 0x1100
@staticmethod
@@ -245,7 +247,7 @@ class WriteTimeoutErrorMessage(RequestExecutionException):
class ReadTimeoutErrorMessage(RequestExecutionException):
summary = 'Timeout during read request'
summary = "Coordinator node timed out waiting for replica nodes' responses"
error_code = 0x1200
@staticmethod


@@ -109,12 +109,12 @@ def dict_factory(colnames, rows):
Example::
>>> from cassandra.query import named_tuple_factory
>>> from cassandra.query import dict_factory
>>> session = cluster.connect('mykeyspace')
>>> session.row_factory = dict_factory
>>> rows = session.execute("SELECT name, age FROM users LIMIT 1")
>>> print rows[0]
{'age': 42, 'name': 'Bob'}
{u'age': 42, u'name': u'Bob'}
.. versionchanged:: 2.0.0
moved from ``cassandra.decoder`` to ``cassandra.query``
@@ -397,6 +397,7 @@ class BoundStatement(Statement):
"""
self.consistency_level = prepared_statement.consistency_level
self.serial_consistency_level = prepared_statement.serial_consistency_level
self.fetch_size = prepared_statement.fetch_size
self.prepared_statement = prepared_statement
self.values = []

debian/changelog

@@ -1,3 +1,15 @@
python-cassandra-driver (2.0.2-1) unstable; urgency=low
* Release 2.0.2
-- Tyler Hobbs <tyler@datastax.com> Tue, 10 Jun 2014 16:22:23 -0500
python-cassandra-driver (2.0.2~prerelease-1) unstable; urgency=low
* Update dependencies for 2.0.x
-- Tyler Hobbs <tyler@datastax.com> Tue, 03 Jun 2014 15:16:53 -0500
python-cassandra-driver (1.1.0~prerelease-1) unstable; urgency=low
* Initial packaging

debian/control

@@ -6,16 +6,16 @@ Build-Depends: python-all-dev (>= 2.6.6-3), python-all-dbg, debhelper (>= 9),
python-sphinx (>= 1.0.7+dfsg) | python3-sphinx, libev-dev,
python-concurrent.futures | python-futures, python-setuptools,
python-nose, python-mock, python-yaml, python-gevent,
python-blist, python-tz
python-blist, python-tz, python-six (>= 1.6)
X-Python-Version: >= 2.7
Standards-Version: 3.9.4
Package: python-cassandra-driver
Architecture: any
Depends: ${misc:Depends}, ${python:Depends}, ${shlibs:Depends}, python-blist,
python-concurrent.futures | python-futures
Depends: ${misc:Depends}, ${python:Depends}, ${shlibs:Depends},
python-concurrent.futures | python-futures, python-six (>= 1.6)
Provides: ${python:Provides}
Recommends: python-scales
Recommends: python-scales, python-blist
Suggests: python-cassandra-driver-doc
Description: Python driver for Apache Cassandra
This driver works exclusively with the Cassandra Query Language v3 (CQL3)


@@ -109,7 +109,7 @@ def setup_package():
cluster.populate(3)
log.debug("Starting ccm test cluster")
cluster.start(wait_for_binary_proto=True)
cluster.start(wait_for_binary_proto=True, wait_other_notice=True)
except Exception:
log.exception("Failed to start ccm cluster:")
raise
@@ -134,7 +134,7 @@ def use_multidc(dc_list):
cluster.populate(dc_list)
log.debug("Starting ccm test cluster")
cluster.start(wait_for_binary_proto=True)
cluster.start(wait_for_binary_proto=True, wait_other_notice=True)
except Exception:
log.exception("Failed to start ccm cluster:")
raise


@@ -39,6 +39,9 @@ class ConnectionTest(object):
klass = None
def setUp(self):
self.klass.initialize_reactor()
def get_connection(self):
"""
Helper method to solve automated testing issues within Jenkins.
@@ -225,6 +228,7 @@ class AsyncoreConnectionTest(ConnectionTest, unittest.TestCase):
def setUp(self):
if 'gevent.monkey' in sys.modules:
raise unittest.SkipTest("Can't test libev with gevent monkey patching")
ConnectionTest.setUp(self)
class LibevConnectionTest(ConnectionTest, unittest.TestCase):
@@ -237,3 +241,4 @@ class LibevConnectionTest(ConnectionTest, unittest.TestCase):
if LibevConnection is None:
raise unittest.SkipTest(
'libev does not appear to be installed properly')
ConnectionTest.setUp(self)


@@ -51,7 +51,7 @@ class MetricsTests(unittest.TestCase):
# Ensure the nodes are actually down
self.assertRaises(NoHostAvailable, session.execute, "USE test3rf")
finally:
get_cluster().start(wait_for_binary_proto=True)
get_cluster().start(wait_for_binary_proto=True, wait_other_notice=True)
self.assertGreater(cluster.metrics.stats.connection_errors, 0)


@@ -42,6 +42,7 @@ class AsyncoreConnectionTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
AsyncoreConnection.initialize_reactor()
cls.socket_patcher = patch('socket.socket', spec=socket.socket)
cls.mock_socket = cls.socket_patcher.start()
cls.mock_socket().connect_ex.return_value = 0


@@ -44,12 +44,13 @@ except ImportError:
@patch('cassandra.io.libevwrapper.IO')
@patch('cassandra.io.libevwrapper.Prepare')
@patch('cassandra.io.libevwrapper.Async')
@patch('cassandra.io.libevreactor.LibevConnection._maybe_start_loop')
@patch('cassandra.io.libevreactor.LibevLoop.maybe_start')
class LibevConnectionTest(unittest.TestCase):
def setUp(self):
if LibevConnection is None:
raise unittest.SkipTest('libev does not appear to be installed correctly')
LibevConnection.initialize_reactor()
def make_connection(self):
c = LibevConnection('1.2.3.4', cql_version='3.0.1')


@@ -108,3 +108,22 @@ class BoundStatementTestCase(unittest.TestCase):
self.assertIn('list', str(e))
else:
self.fail('Passed invalid type but exception was not thrown')
def test_inherit_fetch_size(self):
keyspace = 'keyspace1'
column_family = 'cf1'
column_metadata = [
(keyspace, column_family, 'foo1', Int32Type),
(keyspace, column_family, 'foo2', Int32Type)
]
prepared_statement = PreparedStatement(column_metadata=column_metadata,
query_id=None,
routing_key_indexes=[],
query=None,
keyspace=keyspace,
protocol_version=2,
fetch_size=1234)
bound_statement = BoundStatement(prepared_statement=prepared_statement)
self.assertEqual(1234, bound_statement.fetch_size)
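# A hedged, user-level sketch of the same behavior through the public API;
# the `session` object and the `users` table are illustrative assumptions.
def paged_users(session):
    prepared = session.prepare("SELECT * FROM users WHERE group_id = ?")
    prepared.fetch_size = 1234            # set once on the PreparedStatement
    bound = prepared.bind(('admins',))    # BoundStatement inherits fetch_size
    assert bound.fetch_size == 1234
    return session.execute(bound)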