diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 311693f9..b4854f68 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,11 +1,40 @@ +2.0.3 +===== +In Progress + +Features +-------- +* Use io.BytesIO for reduced CPU consumption (github #143) + +Bug Fixes +--------- +* Fix references to xrange that do not go through "six" in + libevreactor and geventreactor (github #138) +* Make BoundStatements inherit fetch_size from their parent + PreparedStatement (PYTHON-80) +* Clear reactor state in child process after forking + to prevent errors with multiprocessing when the parent + process has connected a Cluster before forking (github #141) + 2.0.2 ===== +June 10, 2014 Bug Fixes --------- * Add six to requirements.txt * Avoid KeyError during schema refresh when a keyspace is dropped and TokenAwarePolicy is not in use +* Avoid registering multiple atexit cleanup functions when the + asyncore event loop is restarted multiple times +* Delay initialization of reactors in order to avoid problems + with shared state when using multiprocessing (PYTHON-60) +* Add python-six to debian dependencies, move python-blist to + recommends +* Fix memory leak when libev connections are created and + destroyed (github #93) +* Ensure token map is rebuilt when hosts are removed from + the cluster 2.0.1 ===== @@ -103,6 +132,9 @@ now: * cassandra.decoder.dict_factory has moved to cassandra.query.dict_factory * cassandra.decoder.ordered_dict_factory has moved to cassandra.query.ordered_dict_factory +Exceptions that were in cassandra.decoder have been moved to cassandra.protocol. If +you handle any of these exceptions, you must adjust the code accordingly. + 1.1.2 ===== May 8, 2014 diff --git a/README-dev.rst b/README-dev.rst index 9a81395d..8c6a13bb 100644 --- a/README-dev.rst +++ b/README-dev.rst @@ -1,6 +1,8 @@ Releasing ========= * Run the tests and ensure they all pass +* If dependencies have changed, make sure ``debian/control`` + is up to date * Update CHANGELOG.rst * Update the version in ``cassandra/__init__.py`` * Commit the changelog and version changes diff --git a/cassandra/__init__.py b/cassandra/__init__.py index 800e74e1..1462df37 100644 --- a/cassandra/__init__.py +++ b/cassandra/__init__.py @@ -23,7 +23,7 @@ class NullHandler(logging.Handler): logging.getLogger('cassandra').addHandler(NullHandler()) -__version_info__ = (2, 0, 1, 'post') +__version_info__ = (2, 0, 2, 'post') __version__ = '.'.join(map(str, __version_info__)) @@ -172,6 +172,11 @@ class Timeout(Exception): class ReadTimeout(Timeout): """ A subclass of :exc:`Timeout` for read operations. + + This indicates that the replicas failed to respond to the coordinator + node before the configured timeout. This timeout is configured in + ``cassandra.yaml`` with the ``read_request_timeout_in_ms`` + and ``range_request_timeout_in_ms`` options. """ data_retrieved = None @@ -189,6 +194,11 @@ class ReadTimeout(Timeout): class WriteTimeout(Timeout): """ A subclass of :exc:`Timeout` for write operations. + + This indicates that the replicas failed to respond to the coordinator + node before the configured timeout. This timeout is configured in + ``cassandra.yaml`` with the ``write_request_timeout_in_ms`` + option. """ write_type = None diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 7451dd3d..3ef4245d 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -530,6 +530,7 @@ class Cluster(object): raise Exception("Cluster is already shut down") if not self._is_setup: + self.connection_class.initialize_reactor() atexit.register(partial(_shutdown_cluster, self)) for address in self.contact_points: host = self.add_host(address, signal=False) @@ -1708,17 +1709,18 @@ class ControlConnection(object): else: self._cluster.metadata.rebuild_schema(ks_result, cf_result, col_result) - def refresh_node_list_and_token_map(self): + def refresh_node_list_and_token_map(self, force_token_rebuild=False): try: if self._connection: - self._refresh_node_list_and_token_map(self._connection) + self._refresh_node_list_and_token_map(self._connection, force_token_rebuild=force_token_rebuild) except ReferenceError: pass # our weak reference to the Cluster is no good except Exception: log.debug("[control connection] Error refreshing node list and token map", exc_info=True) self._signal_error() - def _refresh_node_list_and_token_map(self, connection, preloaded_results=None): + def _refresh_node_list_and_token_map(self, connection, preloaded_results=None, + force_token_rebuild=False): if preloaded_results: log.debug("[control connection] Refreshing node list and token map using preloaded results") peers_result = preloaded_results[0] @@ -1753,7 +1755,7 @@ class ControlConnection(object): if partitioner and tokens: token_map[host] = tokens - should_rebuild_token_map = False + should_rebuild_token_map = force_token_rebuild found_hosts = set() for row in peers_result: addr = row.get("rpc_address") @@ -1779,12 +1781,11 @@ class ControlConnection(object): token_map[host] = tokens for old_host in self._cluster.metadata.all_hosts(): - if old_host.address != connection.host and \ - old_host.address not in found_hosts and \ - old_host.address not in self._cluster.contact_points: - log.debug("[control connection] Found host that has been removed: %r", old_host) + if old_host.address != connection.host and old_host.address not in found_hosts: should_rebuild_token_map = True - self._cluster.remove_host(old_host) + if old_host.address not in self._cluster.contact_points: + log.debug("[control connection] Found host that has been removed: %r", old_host) + self._cluster.remove_host(old_host) log.debug("[control connection] Finished fetching ring info") if partitioner and should_rebuild_token_map: @@ -1963,10 +1964,10 @@ class ControlConnection(object): self.reconnect() def on_add(self, host): - self.refresh_node_list_and_token_map() + self.refresh_node_list_and_token_map(force_token_rebuild=True) def on_remove(self, host): - self.refresh_node_list_and_token_map() + self.refresh_node_list_and_token_map(force_token_rebuild=True) def _stop_scheduler(scheduler, thread): diff --git a/cassandra/connection.py b/cassandra/connection.py index ec2c7c41..d2bfdfb4 100644 --- a/cassandra/connection.py +++ b/cassandra/connection.py @@ -195,6 +195,22 @@ class Connection(object): self.lock = RLock() + @classmethod + def initialize_reactor(self): + """ + Called once by Cluster.connect(). This should be used by implementations + to set up any resources that will be shared across connections. + """ + pass + + @classmethod + def handle_fork(self): + """ + Called after a forking. This should cleanup any remaining reactor state + from the parent process. + """ + pass + def close(self): raise NotImplementedError() diff --git a/cassandra/io/asyncorereactor.py b/cassandra/io/asyncorereactor.py index de1429c4..7f7e5226 100644 --- a/cassandra/io/asyncorereactor.py +++ b/cassandra/io/asyncorereactor.py @@ -12,16 +12,18 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import absolute_import # to enable import io from stdlib import atexit from collections import deque from functools import partial +import io import logging import os import socket import sys from threading import Event, Lock, Thread +import weakref -from six import BytesIO from six.moves import range from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, EINVAL, EISCONN, errorcode @@ -46,15 +48,27 @@ from cassandra.marshal import int32_unpack log = logging.getLogger(__name__) +def _cleanup(loop_weakref): + try: + loop = loop_weakref() + except ReferenceError: + return + + loop._cleanup() + + class AsyncoreLoop(object): def __init__(self): + self._pid = os.getpid() self._loop_lock = Lock() self._started = False self._shutdown = False self._conns_lock = Lock() self._conns = WeakSet() + self._thread = None + atexit.register(partial(_cleanup, weakref.ref(self))) def maybe_start(self): should_start = False @@ -69,10 +83,9 @@ class AsyncoreLoop(object): self._loop_lock.release() if should_start: - thread = Thread(target=self._run_loop, name="cassandra_driver_event_loop") - thread.daemon = True - thread.start() - atexit.register(partial(self._cleanup, thread)) + self._thread = Thread(target=self._run_loop, name="cassandra_driver_event_loop") + self._thread.daemon = True + self._thread.start() def _run_loop(self): log.debug("Starting asyncore event loop") @@ -95,11 +108,14 @@ class AsyncoreLoop(object): log.debug("Asyncore event loop ended") - def _cleanup(self, thread): + def _cleanup(self): self._shutdown = True + if not self._thread: + return + log.debug("Waiting for event loop thread to join...") - thread.join(timeout=1.0) - if thread.is_alive(): + self._thread.join(timeout=1.0) + if self._thread.is_alive(): log.warning( "Event loop thread could not be joined, so shutdown may not be clean. " "Please call Cluster.shutdown() to avoid this.") @@ -121,12 +137,29 @@ class AsyncoreConnection(Connection, asyncore.dispatcher): module in the Python standard library for its event loop. """ - _loop = AsyncoreLoop() + _loop = None _total_reqd_bytes = 0 _writable = False _readable = False + @classmethod + def initialize_reactor(cls): + if not cls._loop: + cls._loop = AsyncoreLoop() + else: + current_pid = os.getpid() + if cls._loop._pid != current_pid: + log.debug("Detected fork, clearing and reinitializing reactor state") + cls.handle_fork() + cls._loop = AsyncoreLoop() + + @classmethod + def handle_fork(cls): + if cls._loop: + cls._loop._cleanup() + cls._loop = None + @classmethod def factory(cls, *args, **kwargs): timeout = kwargs.pop('timeout', 5.0) @@ -145,7 +178,7 @@ class AsyncoreConnection(Connection, asyncore.dispatcher): asyncore.dispatcher.__init__(self) self.connected_event = Event() - self._iobuf = BytesIO() + self._iobuf = io.BytesIO() self._callbacks = {} self.deque = deque() @@ -298,7 +331,7 @@ class AsyncoreConnection(Connection, asyncore.dispatcher): # leave leftover in current buffer leftover = self._iobuf.read() - self._iobuf = BytesIO() + self._iobuf = io.BytesIO() self._iobuf.write(leftover) self._total_reqd_bytes = 0 diff --git a/cassandra/io/geventreactor.py b/cassandra/io/geventreactor.py index cefe4b8e..93ebab82 100644 --- a/cassandra/io/geventreactor.py +++ b/cassandra/io/geventreactor.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import absolute_import # to enable import io from stdlib import gevent from gevent import select, socket from gevent.event import Event @@ -19,13 +20,11 @@ from gevent.queue import Queue from collections import defaultdict from functools import partial +import io import logging import os -try: - from cStringIO import StringIO -except ImportError: - from StringIO import StringIO # ignore flake8 warning: # NOQA +from six.moves import xrange from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, EINVAL @@ -72,7 +71,7 @@ class GeventConnection(Connection): Connection.__init__(self, *args, **kwargs) self.connected_event = Event() - self._iobuf = StringIO() + self._iobuf = io.BytesIO() self._write_queue = Queue() self._callbacks = {} @@ -178,7 +177,7 @@ class GeventConnection(Connection): # leave leftover in current buffer leftover = self._iobuf.read() - self._iobuf = StringIO() + self._iobuf = io.BytesIO() self._iobuf.write(leftover) self._total_reqd_bytes = 0 diff --git a/cassandra/io/libevreactor.py b/cassandra/io/libevreactor.py index bd000e78..eb95f5bc 100644 --- a/cassandra/io/libevreactor.py +++ b/cassandra/io/libevreactor.py @@ -12,15 +12,18 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import absolute_import # to enable import io from stdlib import atexit from collections import deque from functools import partial +import io import logging import os import socket from threading import Event, Lock, Thread +import weakref -from six import BytesIO +from six.moves import xrange from cassandra import OperationTimedOut from cassandra.connection import Connection, ConnectionShutdown, NONBLOCKING @@ -46,81 +49,197 @@ except ImportError: log = logging.getLogger(__name__) -class LibevConnection(Connection): - """ - An implementation of :class:`.Connection` that uses libev for its event loop. - """ - _loop = libev.Loop() - _loop_notifier = libev.Async(_loop) - _loop_notifier.start() +def _cleanup(loop_weakref): + try: + loop = loop_weakref() + except ReferenceError: + return - # prevent _loop_notifier from keeping the loop from returning - _loop.unref() + loop._cleanup() - _loop_started = None - _loop_lock = Lock() - _loop_shutdown = False - @classmethod - def _run_loop(cls): +class LibevLoop(object): + + def __init__(self): + self._pid = os.getpid() + self._loop = libev.Loop() + self._notifier = libev.Async(self._loop) + self._notifier.start() + + # prevent _notifier from keeping the loop from returning + self._loop.unref() + + self._started = False + self._shutdown = False + self._lock = Lock() + + self._thread = None + + # set of all connections; only replaced with a new copy + # while holding _conn_set_lock, never modified in place + self._live_conns = set() + # newly created connections that need their write/read watcher started + self._new_conns = set() + # recently closed connections that need their write/read watcher stopped + self._closed_conns = set() + self._conn_set_lock = Lock() + + self._preparer = libev.Prepare(self._loop, self._loop_will_run) + # prevent _preparer from keeping the loop from returning + self._loop.unref() + self._preparer.start() + + atexit.register(partial(_cleanup, weakref.ref(self))) + + def notify(self): + self._notifier.send() + + def maybe_start(self): + should_start = False + with self._lock: + if not self._started: + log.debug("Starting libev event loop") + self._started = True + should_start = True + + if should_start: + self._thread = Thread(target=self._run_loop, name="event_loop") + self._thread.daemon = True + self._thread.start() + + self._notifier.send() + + def _run_loop(self): while True: - end_condition = cls._loop.start() + end_condition = self._loop.start() # there are still active watchers, no deadlock - with cls._loop_lock: - if not cls._loop_shutdown and (end_condition or cls._live_conns): + with self._lock: + if not self._shutdown and (end_condition or self._live_conns): log.debug("Restarting event loop") continue else: # all Connections have been closed, no active watchers log.debug("All Connections currently closed, event loop ended") - cls._loop_started = False + self._started = False break - @classmethod - def _maybe_start_loop(cls): - should_start = False - with cls._loop_lock: - if not cls._loop_started: - log.debug("Starting libev event loop") - cls._loop_started = True - should_start = True + def _cleanup(self): + self._shutdown = True + if not self._thread: + return - if should_start: - t = Thread(target=cls._run_loop, name="event_loop") - t.daemon = True - t.start() - atexit.register(partial(cls._cleanup, t)) + for conn in self._live_conns | self._new_conns | self._closed_conns: + conn.close() + if conn._write_watcher: + conn._write_watcher.stop() + del conn._write_watcher + if conn._read_watcher: + conn._read_watcher.stop() + del conn._read_watcher - return should_start - - @classmethod - def _cleanup(cls, thread): - cls._loop_shutdown = True log.debug("Waiting for event loop thread to join...") - thread.join(timeout=1.0) - if thread.is_alive(): + self._thread.join(timeout=1.0) + if self._thread.is_alive(): log.warning( "Event loop thread could not be joined, so shutdown may not be clean. " "Please call Cluster.shutdown() to avoid this.") log.debug("Event loop thread was joined") + self._loop = None - # class-level set of all connections; only replaced with a new copy - # while holding _conn_set_lock, never modified in place - _live_conns = set() - # newly created connections that need their write/read watcher started - _new_conns = set() - # recently closed connections that need their write/read watcher stopped - _closed_conns = set() - _conn_set_lock = Lock() + def connection_created(self, conn): + with self._conn_set_lock: + new_live_conns = self._live_conns.copy() + new_live_conns.add(conn) + self._live_conns = new_live_conns + new_new_conns = self._new_conns.copy() + new_new_conns.add(conn) + self._new_conns = new_new_conns + + def connection_destroyed(self, conn): + with self._conn_set_lock: + new_live_conns = self._live_conns.copy() + new_live_conns.discard(conn) + self._live_conns = new_live_conns + + new_closed_conns = self._closed_conns.copy() + new_closed_conns.add(conn) + self._closed_conns = new_closed_conns + + self._notifier.send() + + def _loop_will_run(self, prepare): + changed = False + for conn in self._live_conns: + if not conn.deque and conn._write_watcher_is_active: + if conn._write_watcher: + conn._write_watcher.stop() + conn._write_watcher_is_active = False + changed = True + elif conn.deque and not conn._write_watcher_is_active: + conn._write_watcher.start() + conn._write_watcher_is_active = True + changed = True + + if self._new_conns: + with self._conn_set_lock: + to_start = self._new_conns + self._new_conns = set() + + for conn in to_start: + conn._read_watcher.start() + + changed = True + + if self._closed_conns: + with self._conn_set_lock: + to_stop = self._closed_conns + self._closed_conns = set() + + for conn in to_stop: + if conn._write_watcher: + conn._write_watcher.stop() + # clear reference cycles from IO callback + del conn._write_watcher + if conn._read_watcher: + conn._read_watcher.stop() + # clear reference cycles from IO callback + del conn._read_watcher + + changed = True + + if changed: + self._notifier.send() + + +class LibevConnection(Connection): + """ + An implementation of :class:`.Connection` that uses libev for its event loop. + """ + _libevloop = None _write_watcher_is_active = False - _total_reqd_bytes = 0 _read_watcher = None _write_watcher = None _socket = None + @classmethod + def initialize_reactor(cls): + if not cls._libevloop: + cls._libevloop = LibevLoop() + else: + if cls._libevloop._pid != os.getpid(): + log.debug("Detected fork, clearing and reinitializing reactor state") + cls.handle_fork() + cls._libevloop = LibevLoop() + + @classmethod + def handle_fork(cls): + if cls._libevloop: + cls._libevloop._cleanup() + cls._libevloop = None + @classmethod def factory(cls, *args, **kwargs): timeout = kwargs.pop('timeout', 5.0) @@ -134,73 +253,11 @@ class LibevConnection(Connection): else: return conn - @classmethod - def _connection_created(cls, conn): - with cls._conn_set_lock: - new_live_conns = cls._live_conns.copy() - new_live_conns.add(conn) - cls._live_conns = new_live_conns - - new_new_conns = cls._new_conns.copy() - new_new_conns.add(conn) - cls._new_conns = new_new_conns - - @classmethod - def _connection_destroyed(cls, conn): - with cls._conn_set_lock: - new_live_conns = cls._live_conns.copy() - new_live_conns.discard(conn) - cls._live_conns = new_live_conns - - new_closed_conns = cls._closed_conns.copy() - new_closed_conns.add(conn) - cls._closed_conns = new_closed_conns - - @classmethod - def loop_will_run(cls, prepare): - changed = False - for conn in cls._live_conns: - if not conn.deque and conn._write_watcher_is_active: - if conn._write_watcher: - conn._write_watcher.stop() - conn._write_watcher_is_active = False - changed = True - elif conn.deque and not conn._write_watcher_is_active: - conn._write_watcher.start() - conn._write_watcher_is_active = True - changed = True - - if cls._new_conns: - with cls._conn_set_lock: - to_start = cls._new_conns - cls._new_conns = set() - - for conn in to_start: - conn._read_watcher.start() - - changed = True - - if cls._closed_conns: - with cls._conn_set_lock: - to_stop = cls._closed_conns - cls._closed_conns = set() - - for conn in to_stop: - if conn._write_watcher: - conn._write_watcher.stop() - if conn._read_watcher: - conn._read_watcher.stop() - - changed = True - - if changed: - cls._loop_notifier.send() - def __init__(self, *args, **kwargs): Connection.__init__(self, *args, **kwargs) self.connected_event = Event() - self._iobuf = BytesIO() + self._iobuf = io.BytesIO() self._callbacks = {} self.deque = deque() @@ -219,17 +276,16 @@ class LibevConnection(Connection): for args in self.sockopts: self._socket.setsockopt(*args) - with self._loop_lock: - self._read_watcher = libev.IO(self._socket.fileno(), libev.EV_READ, self._loop, self.handle_read) - self._write_watcher = libev.IO(self._socket.fileno(), libev.EV_WRITE, self._loop, self.handle_write) + with self._libevloop._lock: + self._read_watcher = libev.IO(self._socket.fileno(), libev.EV_READ, self._libevloop._loop, self.handle_read) + self._write_watcher = libev.IO(self._socket.fileno(), libev.EV_WRITE, self._libevloop._loop, self.handle_write) self._send_options_message() - self.__class__._connection_created(self) + self._libevloop.connection_created(self) # start the global event loop if needed - self._maybe_start_loop() - self._loop_notifier.send() + self._libevloop.maybe_start() def close(self): with self.lock: @@ -238,9 +294,9 @@ class LibevConnection(Connection): self.is_closed = True log.debug("Closing connection (%s) to %s", id(self), self.host) - self.__class__._connection_destroyed(self) - self._loop_notifier.send() + self._libevloop.connection_destroyed(self) self._socket.close() + log.debug("Closed socket to %s", self.host) # don't leave in-progress operations hanging if not self.is_defunct: @@ -326,7 +382,7 @@ class LibevConnection(Connection): # leave leftover in current buffer leftover = self._iobuf.read() - self._iobuf = BytesIO() + self._iobuf = io.BytesIO() self._iobuf.write(leftover) self._total_reqd_bytes = 0 @@ -349,7 +405,7 @@ class LibevConnection(Connection): with self._deque_lock: self.deque.extend(chunks) - self._loop_notifier.send() + self._libevloop.notify() def register_watcher(self, event_type, callback, register_timeout=None): self._push_watchers[event_type].add(callback) @@ -361,9 +417,3 @@ class LibevConnection(Connection): self._push_watchers[event_type].add(callback) self.wait_for_response( RegisterMessage(event_list=type_callback_dict.keys()), timeout=register_timeout) - - -_preparer = libev.Prepare(LibevConnection._loop, LibevConnection.loop_will_run) -# prevent _preparer from keeping the loop from returning -LibevConnection._loop.unref() -_preparer.start() diff --git a/cassandra/io/libevwrapper.c b/cassandra/io/libevwrapper.c index ced72208..cbac83b2 100644 --- a/cassandra/io/libevwrapper.c +++ b/cassandra/io/libevwrapper.c @@ -8,6 +8,7 @@ typedef struct libevwrapper_Loop { static void Loop_dealloc(libevwrapper_Loop *self) { + ev_loop_destroy(self->loop); Py_TYPE(self)->tp_free((PyObject *)self); }; @@ -17,9 +18,9 @@ Loop_new(PyTypeObject *type, PyObject *args, PyObject *kwds) { self = (libevwrapper_Loop *)type->tp_alloc(type, 0); if (self != NULL) { - self->loop = ev_default_loop(EVBACKEND_SELECT); + self->loop = ev_loop_new(EVBACKEND_SELECT); if (!self->loop) { - PyErr_SetString(PyExc_Exception, "Error getting default ev loop"); + PyErr_SetString(PyExc_Exception, "Error getting new ev loop"); Py_DECREF(self); return NULL; } @@ -133,7 +134,8 @@ IO_init(libevwrapper_IO *self, PyObject *args, PyObject *kwds) { PyObject *socket; PyObject *callback; PyObject *loop; - int io_flags = 0; + int io_flags = 0, fd = -1; + struct ev_io *io = NULL; if (!PyArg_ParseTuple(args, "OiOO", &socket, &io_flags, &loop, &callback)) { return -1; @@ -154,14 +156,14 @@ IO_init(libevwrapper_IO *self, PyObject *args, PyObject *kwds) { self->callback = callback; } - int fd = PyObject_AsFileDescriptor(socket); + fd = PyObject_AsFileDescriptor(socket); if (fd == -1) { PyErr_SetString(PyExc_TypeError, "unable to get file descriptor from socket"); Py_XDECREF(callback); Py_XDECREF(loop); return -1; } - struct ev_io *io = &(self->io); + io = &(self->io); ev_io_init(io, io_callback, fd, io_flags); self->io.data = self; return 0; @@ -246,6 +248,7 @@ typedef struct libevwrapper_Async { static void Async_dealloc(libevwrapper_Async *self) { + Py_XDECREF(self->loop); Py_TYPE(self)->tp_free((PyObject *)self); }; @@ -254,8 +257,9 @@ static void async_callback(EV_P_ ev_async *watcher, int revents) {}; static int Async_init(libevwrapper_Async *self, PyObject *args, PyObject *kwds) { PyObject *loop; - static char *kwlist[] = {"loop", NULL}; + struct ev_async *async = NULL; + if (!PyArg_ParseTupleAndKeywords(args, kwds, "O", kwlist, &loop)) { PyErr_SetString(PyExc_TypeError, "unable to get file descriptor from socket"); return -1; @@ -267,7 +271,7 @@ Async_init(libevwrapper_Async *self, PyObject *args, PyObject *kwds) { } else { return -1; } - struct ev_async *async = &(self->async); + async = &(self->async); ev_async_init(async, async_callback); return 0; }; @@ -345,11 +349,11 @@ Prepare_dealloc(libevwrapper_Prepare *self) { static void prepare_callback(struct ev_loop *loop, ev_prepare *watcher, int revents) { libevwrapper_Prepare *self = watcher->data; - + PyObject *result = NULL; PyGILState_STATE gstate; - gstate = PyGILState_Ensure(); - PyObject *result = PyObject_CallFunction(self->callback, "O", self); + gstate = PyGILState_Ensure(); + result = PyObject_CallFunction(self->callback, "O", self); if (!result) { PyErr_WriteUnraisable(self->callback); } @@ -362,6 +366,7 @@ static int Prepare_init(libevwrapper_Prepare *self, PyObject *args, PyObject *kwds) { PyObject *callback; PyObject *loop; + struct ev_prepare *prepare = NULL; if (!PyArg_ParseTuple(args, "OO", &loop, &callback)) { return -1; @@ -383,7 +388,7 @@ Prepare_init(libevwrapper_Prepare *self, PyObject *args, PyObject *kwds) { Py_INCREF(callback); self->callback = callback; } - struct ev_prepare *prepare = &(self->prepare); + prepare = &(self->prepare); ev_prepare_init(prepare, prepare_callback); self->prepare.data = self; return 0; @@ -478,6 +483,8 @@ void initlibevwrapper(void) #endif { + PyObject *module = NULL; + if (PyType_Ready(&libevwrapper_LoopType) < 0) INITERROR; @@ -494,9 +501,9 @@ initlibevwrapper(void) INITERROR; # if PY_MAJOR_VERSION >= 3 - PyObject *module = PyModule_Create(&moduledef); + module = PyModule_Create(&moduledef); # else - PyObject *module = Py_InitModule3("libevwrapper", module_methods, module_doc); + module = Py_InitModule3("libevwrapper", module_methods, module_doc); # endif if (module == NULL) diff --git a/cassandra/protocol.py b/cassandra/protocol.py index c37478be..eb37d765 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -12,12 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import absolute_import # to enable import io from stdlib import logging import socket from uuid import UUID import six from six.moves import range +import io from cassandra import (Unavailable, WriteTimeout, ReadTimeout, AlreadyExists, InvalidRequest, Unauthorized, @@ -69,7 +71,7 @@ class _MessageType(object): tracing = False def to_binary(self, stream_id, protocol_version, compression=None): - body = six.BytesIO() + body = io.BytesIO() self.send_body(body, protocol_version) body = body.getvalue() @@ -80,7 +82,7 @@ class _MessageType(object): if self.tracing: flags |= TRACING_FLAG - msg = six.BytesIO() + msg = io.BytesIO() write_header(msg, protocol_version, flags, stream_id, self.opcode, len(body)) msg.write(body) @@ -105,7 +107,7 @@ def decode_response(protocol_version, stream_id, flags, opcode, body, decompress body = decompressor(body) flags ^= COMPRESSED_FLAG - body = six.BytesIO(body) + body = io.BytesIO(body) if flags & TRACING_FLAG: trace_id = UUID(bytes=body.read(16)) flags ^= TRACING_FLAG @@ -228,7 +230,7 @@ class TruncateError(RequestExecutionException): class WriteTimeoutErrorMessage(RequestExecutionException): - summary = 'Timeout during write request' + summary = "Coordinator node timed out waiting for replica nodes' responses" error_code = 0x1100 @staticmethod @@ -245,7 +247,7 @@ class WriteTimeoutErrorMessage(RequestExecutionException): class ReadTimeoutErrorMessage(RequestExecutionException): - summary = 'Timeout during read request' + summary = "Coordinator node timed out waiting for replica nodes' responses" error_code = 0x1200 @staticmethod diff --git a/cassandra/query.py b/cassandra/query.py index fcd24d26..108f4da4 100644 --- a/cassandra/query.py +++ b/cassandra/query.py @@ -109,12 +109,12 @@ def dict_factory(colnames, rows): Example:: - >>> from cassandra.query import named_tuple_factory + >>> from cassandra.query import dict_factory >>> session = cluster.connect('mykeyspace') >>> session.row_factory = dict_factory >>> rows = session.execute("SELECT name, age FROM users LIMIT 1") >>> print rows[0] - {'age': 42, 'name': 'Bob'} + {u'age': 42, u'name': u'Bob'} .. versionchanged:: 2.0.0 moved from ``cassandra.decoder`` to ``cassandra.query`` @@ -397,6 +397,7 @@ class BoundStatement(Statement): """ self.consistency_level = prepared_statement.consistency_level self.serial_consistency_level = prepared_statement.serial_consistency_level + self.fetch_size = prepared_statement.fetch_size self.prepared_statement = prepared_statement self.values = [] diff --git a/debian/changelog b/debian/changelog index c98ea8c9..0360d906 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,15 @@ +python-cassandra-driver (2.0.2-1) unstable; urgency=low + + * Release 2.0.2 + + -- Tyler Hobbs Tue, 10 Jun 2014 16:22:23 -0500 + +python-cassandra-driver (2.0.2~prerelease-1) unstable; urgency=low + + * Update dependencies for 2.0.x + + -- Tyler Hobbs Tue, 03 Jun 2014 15:16:53 -0500 + python-cassandra-driver (1.1.0~prerelease-1) unstable; urgency=low * Initial packaging diff --git a/debian/control b/debian/control index c13f4e86..6a698a8f 100644 --- a/debian/control +++ b/debian/control @@ -6,16 +6,16 @@ Build-Depends: python-all-dev (>= 2.6.6-3), python-all-dbg, debhelper (>= 9), python-sphinx (>= 1.0.7+dfsg) | python3-sphinx, libev-dev, python-concurrent.futures | python-futures, python-setuptools, python-nose, python-mock, python-yaml, python-gevent, - python-blist, python-tz + python-blist, python-tz, python-six (>= 1.6) X-Python-Version: >= 2.7 Standards-Version: 3.9.4 Package: python-cassandra-driver Architecture: any -Depends: ${misc:Depends}, ${python:Depends}, ${shlibs:Depends}, python-blist, - python-concurrent.futures | python-futures +Depends: ${misc:Depends}, ${python:Depends}, ${shlibs:Depends}, + python-concurrent.futures | python-futures, python-six (>= 1.6) Provides: ${python:Provides} -Recommends: python-scales +Recommends: python-scales, python-blist Suggests: python-cassandra-driver-doc Description: Python driver for Apache Cassandra This driver works exclusively with the Cassandra Query Language v3 (CQL3) diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py index 31240093..320e5a46 100644 --- a/tests/integration/__init__.py +++ b/tests/integration/__init__.py @@ -109,7 +109,7 @@ def setup_package(): cluster.populate(3) log.debug("Starting ccm test cluster") - cluster.start(wait_for_binary_proto=True) + cluster.start(wait_for_binary_proto=True, wait_other_notice=True) except Exception: log.exception("Failed to start ccm cluster:") raise @@ -134,7 +134,7 @@ def use_multidc(dc_list): cluster.populate(dc_list) log.debug("Starting ccm test cluster") - cluster.start(wait_for_binary_proto=True) + cluster.start(wait_for_binary_proto=True, wait_other_notice=True) except Exception: log.exception("Failed to start ccm cluster:") raise diff --git a/tests/integration/standard/test_connection.py b/tests/integration/standard/test_connection.py index fca1e6fa..485f0ac7 100644 --- a/tests/integration/standard/test_connection.py +++ b/tests/integration/standard/test_connection.py @@ -39,6 +39,9 @@ class ConnectionTest(object): klass = None + def setUp(self): + self.klass.initialize_reactor() + def get_connection(self): """ Helper method to solve automated testing issues within Jenkins. @@ -225,6 +228,7 @@ class AsyncoreConnectionTest(ConnectionTest, unittest.TestCase): def setUp(self): if 'gevent.monkey' in sys.modules: raise unittest.SkipTest("Can't test libev with gevent monkey patching") + ConnectionTest.setUp(self) class LibevConnectionTest(ConnectionTest, unittest.TestCase): @@ -237,3 +241,4 @@ class LibevConnectionTest(ConnectionTest, unittest.TestCase): if LibevConnection is None: raise unittest.SkipTest( 'libev does not appear to be installed properly') + ConnectionTest.setUp(self) diff --git a/tests/integration/standard/test_metrics.py b/tests/integration/standard/test_metrics.py index aa39843a..9793aa22 100644 --- a/tests/integration/standard/test_metrics.py +++ b/tests/integration/standard/test_metrics.py @@ -51,7 +51,7 @@ class MetricsTests(unittest.TestCase): # Ensure the nodes are actually down self.assertRaises(NoHostAvailable, session.execute, "USE test3rf") finally: - get_cluster().start(wait_for_binary_proto=True) + get_cluster().start(wait_for_binary_proto=True, wait_other_notice=True) self.assertGreater(cluster.metrics.stats.connection_errors, 0) diff --git a/tests/unit/io/test_asyncorereactor.py b/tests/unit/io/test_asyncorereactor.py index 8b358e6d..ecb59bd0 100644 --- a/tests/unit/io/test_asyncorereactor.py +++ b/tests/unit/io/test_asyncorereactor.py @@ -42,6 +42,7 @@ class AsyncoreConnectionTest(unittest.TestCase): @classmethod def setUpClass(cls): + AsyncoreConnection.initialize_reactor() cls.socket_patcher = patch('socket.socket', spec=socket.socket) cls.mock_socket = cls.socket_patcher.start() cls.mock_socket().connect_ex.return_value = 0 diff --git a/tests/unit/io/test_libevreactor.py b/tests/unit/io/test_libevreactor.py index a7b060b1..bef4ecbf 100644 --- a/tests/unit/io/test_libevreactor.py +++ b/tests/unit/io/test_libevreactor.py @@ -44,12 +44,13 @@ except ImportError: @patch('cassandra.io.libevwrapper.IO') @patch('cassandra.io.libevwrapper.Prepare') @patch('cassandra.io.libevwrapper.Async') -@patch('cassandra.io.libevreactor.LibevConnection._maybe_start_loop') +@patch('cassandra.io.libevreactor.LibevLoop.maybe_start') class LibevConnectionTest(unittest.TestCase): def setUp(self): if LibevConnection is None: raise unittest.SkipTest('libev does not appear to be installed correctly') + LibevConnection.initialize_reactor() def make_connection(self): c = LibevConnection('1.2.3.4', cql_version='3.0.1') diff --git a/tests/unit/test_parameter_binding.py b/tests/unit/test_parameter_binding.py index 3f79c6b5..908556fc 100644 --- a/tests/unit/test_parameter_binding.py +++ b/tests/unit/test_parameter_binding.py @@ -108,3 +108,22 @@ class BoundStatementTestCase(unittest.TestCase): self.assertIn('list', str(e)) else: self.fail('Passed invalid type but exception was not thrown') + + def test_inherit_fetch_size(self): + keyspace = 'keyspace1' + column_family = 'cf1' + + column_metadata = [ + (keyspace, column_family, 'foo1', Int32Type), + (keyspace, column_family, 'foo2', Int32Type) + ] + + prepared_statement = PreparedStatement(column_metadata=column_metadata, + query_id=None, + routing_key_indexes=[], + query=None, + keyspace=keyspace, + protocol_version=2, + fetch_size=1234) + bound_statement = BoundStatement(prepared_statement=prepared_statement) + self.assertEqual(1234, bound_statement.fetch_size)