From 90134e142845352092c4bc41e28851db1bdee95e Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Wed, 20 Apr 2016 15:30:52 -0500 Subject: [PATCH 01/18] only add local token to the map if it's still in the ring addresses an issue which would cause a None key in the map if the topo change triggering this event was removing the control connection host PYTHON-548 --- cassandra/cluster.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index d7bc95db..641528ff 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -2356,6 +2356,9 @@ class ControlConnection(object): cluster_name = local_row["cluster_name"] self._cluster.metadata.cluster_name = cluster_name + partitioner = local_row.get("partitioner") + tokens = local_row.get("tokens") + host = self._cluster.metadata.get_host(connection.host) if host: datacenter = local_row.get("data_center") @@ -2365,10 +2368,8 @@ class ControlConnection(object): host.broadcast_address = local_row.get("broadcast_address") host.release_version = local_row.get("release_version") - partitioner = local_row.get("partitioner") - tokens = local_row.get("tokens") - if partitioner and tokens: - token_map[host] = tokens + if partitioner and tokens: + token_map[host] = tokens # Check metadata.partitioner to see if we haven't built anything yet. If # every node in the cluster was in the contact points, we won't discover From b3149e69e3764c428206d6a97381230bec1c6e66 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Wed, 20 Apr 2016 15:33:51 -0500 Subject: [PATCH 02/18] proactively reconnect when the control conn host is removed --- cassandra/cluster.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 641528ff..b84fa590 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -2610,7 +2610,13 @@ class ControlConnection(object): self.refresh_node_list_and_token_map(force_token_rebuild=True) def on_remove(self, host): - self.refresh_node_list_and_token_map(force_token_rebuild=True) + c = self._connection + if c and c.host == host.address: + log.debug("[control connection] Control connection host (%s) is being removed. Reconnecting", host) + # refresh will be done on reconnect + self.reconnect() + else: + self.refresh_node_list_and_token_map(force_token_rebuild=True) def get_connections(self): c = getattr(self, '_connection', None) From 98074db5b786fef8b97b440c41c392299e2a1a31 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Wed, 20 Apr 2016 16:35:58 -0500 Subject: [PATCH 03/18] make token mapping robust against unexpected state --- cassandra/metadata.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/cassandra/metadata.py b/cassandra/metadata.py index e9bf0b4b..f7337d87 100644 --- a/cassandra/metadata.py +++ b/cassandra/metadata.py @@ -1397,12 +1397,18 @@ class TokenMap(object): def rebuild_keyspace(self, keyspace, build_if_absent=False): with self._rebuild_lock: - current = self.tokens_to_hosts_by_ks.get(keyspace, None) - if (build_if_absent and current is None) or (not build_if_absent and current is not None): - ks_meta = self._metadata.keyspaces.get(keyspace) - if ks_meta: - replica_map = self.replica_map_for_keyspace(self._metadata.keyspaces[keyspace]) - self.tokens_to_hosts_by_ks[keyspace] = replica_map + try: + current = self.tokens_to_hosts_by_ks.get(keyspace, None) + if (build_if_absent and current is None) or (not build_if_absent and current is not None): + ks_meta = self._metadata.keyspaces.get(keyspace) + if ks_meta: + replica_map = self.replica_map_for_keyspace(self._metadata.keyspaces[keyspace]) + self.tokens_to_hosts_by_ks[keyspace] = replica_map + except Exception: + # should not happen normally, but we don't want to blow up queries because of unexpected meta state + # bypass until new map is generated + self.tokens_to_hosts_by_ks[keyspace] = {} + log.exception("Failed creating a token map for keyspace '%s' with %s. PLEASE REPORT THIS: https://datastax-oss.atlassian.net/projects/PYTHON", keyspace, self.token_to_host_owner) def replica_map_for_keyspace(self, ks_metadata): strategy = ks_metadata.replication_strategy From da977afe92eb74ace187cf42836644512c219085 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Thu, 21 Apr 2016 15:16:03 -0500 Subject: [PATCH 04/18] weakref cluster arg in exit reg prevents cluster ref leak when cycling Clusters PYTHON-135 --- cassandra/cluster.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index d7bc95db..ebc78b1f 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -167,8 +167,11 @@ def run_in_executor(f): def _shutdown_cluster(cluster): - if cluster and not cluster.is_shutdown: - cluster.shutdown() + try: + if not cluster.is_shutdown: + cluster.shutdown() + except ReferenceError: + pass # murmur3 implementation required for TokenAware is only available for CPython @@ -887,7 +890,7 @@ class Cluster(object): log.debug("Connecting to cluster, contact points: %s; protocol version: %s", self.contact_points, self.protocol_version) self.connection_class.initialize_reactor() - atexit.register(partial(_shutdown_cluster, self)) + atexit.register(partial(_shutdown_cluster, weakref.proxy(self))) for address in self.contact_points_resolved: host, new = self.add_host(address, signal=False) if new: From d0ff842862943c345cc917652b8fa5dddfa4da61 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Thu, 21 Apr 2016 15:41:41 -0500 Subject: [PATCH 05/18] don't register cluster scheduler for separate exit function PYTHON-135 --- cassandra/cluster.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index ebc78b1f..99c6f1df 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -2633,7 +2633,7 @@ def _stop_scheduler(scheduler, thread): thread.join() -class _Scheduler(object): +class _Scheduler(Thread): _queue = None _scheduled_tasks = None @@ -2646,13 +2646,9 @@ class _Scheduler(object): self._count = count() self._executor = executor - t = Thread(target=self.run, name="Task Scheduler") - t.daemon = True - t.start() - - # although this runs on a daemonized thread, we prefer to stop - # it gracefully to avoid random errors during interpreter shutdown - atexit.register(partial(_stop_scheduler, weakref.proxy(self), t)) + Thread.__init__(self, name="Task Scheduler") + self.daemon = True + self.start() def shutdown(self): try: @@ -2662,6 +2658,7 @@ class _Scheduler(object): pass self.is_shutdown = True self._queue.put_nowait((0, 0, None)) + self.join() def schedule(self, delay, fn, *args, **kwargs): self._insert_task(delay, (fn, args, tuple(kwargs.items()))) @@ -2690,7 +2687,8 @@ class _Scheduler(object): while True: run_at, i, task = self._queue.get(block=True, timeout=None) if self.is_shutdown: - log.debug("Not executing scheduled task due to Scheduler shutdown") + if task: + log.debug("Not executing scheduled task due to Scheduler shutdown") return if run_at <= time.time(): self._scheduled_tasks.discard(task) From 6910bf7d93cfb947b5d18493217b728a25c681f3 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Thu, 21 Apr 2016 16:51:13 -0500 Subject: [PATCH 06/18] single atexit shutdown function prevents growing the atexit function list indefinitely while cycling clusters PYTHON-135 --- cassandra/cluster.py | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 99c6f1df..6c8e69e9 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -27,7 +27,6 @@ import socket import sys import time from threading import Lock, RLock, Thread, Event -import warnings import six from six.moves import range @@ -166,12 +165,23 @@ def run_in_executor(f): return new_f -def _shutdown_cluster(cluster): - try: - if not cluster.is_shutdown: - cluster.shutdown() - except ReferenceError: - pass +_clusters_for_shutdown = set() + + +def _register_cluster_shutdown(cluster): + _clusters_for_shutdown.add(cluster) + + +def _discard_cluster_shutdown(cluster): + _clusters_for_shutdown.discard(cluster) + + +def _shutdown_clusters(): + clusters = _clusters_for_shutdown.copy() # copy because shutdown modifies the global set "discard" + for cluster in clusters: + cluster.shutdown() + +atexit.register(_shutdown_clusters) # murmur3 implementation required for TokenAware is only available for CPython @@ -890,7 +900,7 @@ class Cluster(object): log.debug("Connecting to cluster, contact points: %s; protocol version: %s", self.contact_points, self.protocol_version) self.connection_class.initialize_reactor() - atexit.register(partial(_shutdown_cluster, weakref.proxy(self))) + _register_cluster_shutdown(self) for address in self.contact_points_resolved: host, new = self.add_host(address, signal=False) if new: @@ -954,6 +964,8 @@ class Cluster(object): self.executor.shutdown() + _discard_cluster_shutdown(self) + def _new_session(self): session = Session(self, self.metadata.all_hosts()) self._session_register_user_types(session) From da994dcd3ad76f3eacae367b87f345f2ccc7a1c4 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Fri, 22 Apr 2016 09:25:36 -0500 Subject: [PATCH 07/18] Explicitly state errors originating from server PYTHON-412 --- cassandra/protocol.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cassandra/protocol.py b/cassandra/protocol.py index ede6cc58..a2432a6e 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -134,12 +134,12 @@ class ErrorMessage(_MessageType, Exception): return subcls(code=code, message=msg, info=extra_info) def summary_msg(self): - msg = 'code=%04x [%s] message="%s"' \ + msg = 'Error from server: code=%04x [%s] message="%s"' \ % (self.code, self.summary, self.message) return msg def __str__(self): - return '' % self.summary_msg() + return '<%s>' % self.summary_msg() __repr__ = __str__ @staticmethod @@ -544,6 +544,7 @@ class QueryMessage(_MessageType): if self.timestamp is not None: write_long(f, self.timestamp) + CUSTOM_TYPE = object() RESULT_KIND_VOID = 0x0001 @@ -552,6 +553,7 @@ RESULT_KIND_SET_KEYSPACE = 0x0003 RESULT_KIND_PREPARED = 0x0004 RESULT_KIND_SCHEMA_CHANGE = 0x0005 + class ResultMessage(_MessageType): opcode = 0x08 name = 'RESULT' From f72b207c3c1c0e8388e8ef1addbcb4319cd72fec Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Fri, 22 Apr 2016 09:56:37 -0500 Subject: [PATCH 08/18] Improve warning message when protocol downgrade is used also don't log error when we're defuncting for unsupported protocol PYTHON-157 --- cassandra/cluster.py | 4 +++- cassandra/connection.py | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index d7bc95db..a821a74a 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -868,7 +868,9 @@ class Cluster(object): new_version = previous_version - 1 if new_version < self.protocol_version: if new_version >= MIN_SUPPORTED_VERSION: - log.warning("Downgrading core protocol version from %d to %d for %s", self.protocol_version, new_version, host_addr) + log.warning("Downgrading core protocol version from %d to %d for %s. " + "To avoid this, it is best practice to explicitly set Cluster(protocol_version) to the version supported by your cluster. " + "http://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.Cluster.protocol_version", self.protocol_version, new_version, host_addr) self.protocol_version = new_version else: raise DriverException("Cannot downgrade protocol version (%d) below minimum supported version: %d" % (new_version, MIN_SUPPORTED_VERSION)) diff --git a/cassandra/connection.py b/cassandra/connection.py index 2b13eebf..8d9ba0e5 100644 --- a/cassandra/connection.py +++ b/cassandra/connection.py @@ -601,8 +601,8 @@ class Connection(object): if isinstance(response, ProtocolException): if 'unsupported protocol version' in response.message: self.is_unsupported_proto_version = True - - log.error("Closing connection %s due to protocol error: %s", self, response.summary_msg()) + else: + log.error("Closing connection %s due to protocol error: %s", self, response.summary_msg()) self.defunct(response) if callback is not None: callback(response) From 6763fc64774c1108c2b691d421b472cf2b30ac0b Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Mon, 25 Apr 2016 13:21:16 -0500 Subject: [PATCH 09/18] make benchmarks work with C* 3 Now using proto ver negotiation instead of 1. --- benchmarks/base.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/benchmarks/base.py b/benchmarks/base.py index 812db42a..bd30bb32 100644 --- a/benchmarks/base.py +++ b/benchmarks/base.py @@ -63,8 +63,7 @@ TABLE = "testtable" def setup(hosts): log.info("Using 'cassandra' package from %s", cassandra.__path__) - cluster = Cluster(hosts, protocol_version=1) - cluster.set_core_connections_per_host(HostDistance.LOCAL, 1) + cluster = Cluster(hosts, schema_metadata_enabled=False, token_metadata_enabled=False) try: session = cluster.connect() @@ -91,8 +90,7 @@ def setup(hosts): def teardown(hosts): - cluster = Cluster(hosts, protocol_version=1) - cluster.set_core_connections_per_host(HostDistance.LOCAL, 1) + cluster = Cluster(hosts, schema_metadata_enabled=False, token_metadata_enabled=False) session = cluster.connect() session.execute("DROP KEYSPACE " + KEYSPACE) cluster.shutdown() From 21e43412c72d3ebe85493edff083606d3036ae3e Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Tue, 26 Apr 2016 11:32:50 -0500 Subject: [PATCH 10/18] asyncore: add method to wake loop and avoid idle spinning PYTHON-239 --- cassandra/io/asyncorereactor.py | 43 ++++++++++++++++++++++++++++++--- 1 file changed, 40 insertions(+), 3 deletions(-) diff --git a/cassandra/io/asyncorereactor.py b/cassandra/io/asyncorereactor.py index 2a83996d..f5cae963 100644 --- a/cassandra/io/asyncorereactor.py +++ b/cassandra/io/asyncorereactor.py @@ -18,7 +18,7 @@ import logging import os import socket import sys -from threading import Event, Lock, Thread +from threading import Lock, Thread import time import weakref @@ -52,8 +52,40 @@ def _cleanup(loop_weakref): loop._cleanup() -class AsyncoreLoop(object): +class _PipeWrapper(object): + def __init__(self, fd): + self.fd = fd + + def fileno(self): + return self.fd + + +class _AsyncorePipeDispatcher(asyncore.dispatcher): + + def __init__(self): + self.read_fd, self.write_fd = os.pipe() + asyncore.dispatcher.__init__(self) + self.set_socket(_PipeWrapper(self.read_fd)) + self._notified = False + self._wrote = 0 + self._read = 0 + + def writable(self): + return False + + def handle_read(self): + while len(os.read(self.read_fd, 4096)) == 4096: + pass + self._notified = False + + def notify_loop(self): + if not self._notified: + self._notified = True + os.write(self.write_fd, 'x') + + +class AsyncoreLoop(object): def __init__(self): self._pid = os.getpid() @@ -65,6 +97,7 @@ class AsyncoreLoop(object): self._timers = TimerManager() + self._pipe_dispatcher = _AsyncorePipeDispatcher() atexit.register(partial(_cleanup, weakref.ref(self))) def maybe_start(self): @@ -84,12 +117,15 @@ class AsyncoreLoop(object): self._thread.daemon = True self._thread.start() + def wake_loop(self): + self._pipe_dispatcher.notify_loop() + def _run_loop(self): log.debug("Starting asyncore event loop") with self._loop_lock: while not self._shutdown: try: - asyncore.loop(timeout=0.001, use_poll=True, count=100) + asyncore.loop(timeout=0.1, use_poll=True, count=1) self._timers.service_timeouts() if not asyncore.socket_map: time.sleep(0.005) @@ -254,6 +290,7 @@ class AsyncoreConnection(Connection, asyncore.dispatcher): with self.deque_lock: self.deque.extend(chunks) self._writable = True + self._loop.wake_loop() def writable(self): return self._writable From 04259fd56f9d118a35bd68084f9816d4051f4d45 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Tue, 26 Apr 2016 14:36:46 -0500 Subject: [PATCH 11/18] prototype udp dispatcher for asyncore --- cassandra/io/asyncorereactor.py | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/cassandra/io/asyncorereactor.py b/cassandra/io/asyncorereactor.py index f5cae963..d26fd9fa 100644 --- a/cassandra/io/asyncorereactor.py +++ b/cassandra/io/asyncorereactor.py @@ -30,6 +30,7 @@ except ImportError: from cassandra.util import WeakSet # noqa import asyncore +from errno import EWOULDBLOCK try: import ssl @@ -85,6 +86,33 @@ class _AsyncorePipeDispatcher(asyncore.dispatcher): os.write(self.write_fd, 'x') +class _AsyncoreUDPDispatcher(asyncore.dispatcher): + + def __init__(self): + self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self._socket.bind(('localhost', 10000)) + self._socket.setblocking(False) + asyncore.dispatcher.__init__(self) + self.set_socket(self._socket) + self._notified = False + + def writable(self): + return False + + def handle_read(self): + try: + while self._sock.recvfrom(0): + pass + except socket.error as e: + pass + self._notified = False + + def notify_loop(self): + if not self._notified: + self._notified = True + self._socket.sendto('', ('localhost', 10000)) + + class AsyncoreLoop(object): def __init__(self): @@ -97,7 +125,7 @@ class AsyncoreLoop(object): self._timers = TimerManager() - self._pipe_dispatcher = _AsyncorePipeDispatcher() + self._pipe_dispatcher = _AsyncoreUDPDispatcher() atexit.register(partial(_cleanup, weakref.ref(self))) def maybe_start(self): From 4a82dfe064786df9e17624077486541f6eb2bf90 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Tue, 26 Apr 2016 16:19:21 -0500 Subject: [PATCH 12/18] refactor asyncore loop dispatch to use pipe, fallback to busy wait PYTHON-239 --- cassandra/io/asyncorereactor.py | 98 +++++++++++++++++++++++++-------- 1 file changed, 75 insertions(+), 23 deletions(-) diff --git a/cassandra/io/asyncorereactor.py b/cassandra/io/asyncorereactor.py index d26fd9fa..42eab0fc 100644 --- a/cassandra/io/asyncorereactor.py +++ b/cassandra/io/asyncorereactor.py @@ -62,15 +62,33 @@ class _PipeWrapper(object): return self.fd -class _AsyncorePipeDispatcher(asyncore.dispatcher): +class _AsyncoreDispatcher(asyncore.dispatcher): + + def __init__(self, socket): + asyncore.dispatcher.__init__(self) + # inject after to avoid base class validation + self.set_socket(socket) + self._notified = False + + def writable(self): + return False + + def validate(self): + assert not self._notified + self.notify_loop() + assert self._notified + self.loop(0.1) + assert not self._notified + + def loop(self, timeout): + asyncore.loop(timeout=timeout, use_poll=True, count=1) + + +class _AsyncorePipeDispatcher(_AsyncoreDispatcher): def __init__(self): self.read_fd, self.write_fd = os.pipe() - asyncore.dispatcher.__init__(self) - self.set_socket(_PipeWrapper(self.read_fd)) - self._notified = False - self._wrote = 0 - self._read = 0 + _AsyncoreDispatcher.__init__(self, _PipeWrapper(self.read_fd)) def writable(self): return False @@ -86,23 +104,26 @@ class _AsyncorePipeDispatcher(asyncore.dispatcher): os.write(self.write_fd, 'x') -class _AsyncoreUDPDispatcher(asyncore.dispatcher): +class _AsyncoreUDPDispatcher(_AsyncoreDispatcher): + """ + Experimental alternate dispatcher for avoiding busy wait in the asyncore loop. It is not used by default because + it relies on local port binding. + Port scanning is not implemented, so multiple clients on one host will collide. This address would need to be set per + instance, or this could be specialized to scan until an address is found. + """ + bind_address = ('localhost', 10000) def __init__(self): self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self._socket.bind(('localhost', 10000)) - self._socket.setblocking(False) - asyncore.dispatcher.__init__(self) - self.set_socket(self._socket) - self._notified = False - - def writable(self): - return False + self._socket.bind(self.bind_address) + self._socket.setblocking(0) + _AsyncoreDispatcher.__init__(self, self._socket) def handle_read(self): try: - while self._sock.recvfrom(0): - pass + d = self._socket.recvfrom(1) + while d and d[1]: + d = self._socket.recvfrom(1) except socket.error as e: pass self._notified = False @@ -110,11 +131,36 @@ class _AsyncoreUDPDispatcher(asyncore.dispatcher): def notify_loop(self): if not self._notified: self._notified = True - self._socket.sendto('', ('localhost', 10000)) + self._socket.sendto(b'', self.bind_address) + + def loop(self, timeout): + asyncore.loop(timeout=timeout, use_poll=False, count=1) + + +class _BusyWaitDispatcher(object): + + max_write_latency = 0.001 + """ + Timeout pushed down to asyncore select/poll. Dictates the amount of time it will sleep before coming back to check + if anything is writable. + """ + + def notify_loop(self): + pass + + def loop(self, timeout): + if not asyncore.socket_map: + time.sleep(0.005) + count = timeout // self.max_write_latency + asyncore.loop(timeout=self.max_write_latency, use_poll=True, count=count) class AsyncoreLoop(object): + timer_resolution = 0.1 # used as the max interval to be in the io loop before returning to service timeouts + + _loop_dispatch_class = _AsyncorePipeDispatcher + def __init__(self): self._pid = os.getpid() self._loop_lock = Lock() @@ -125,7 +171,15 @@ class AsyncoreLoop(object): self._timers = TimerManager() - self._pipe_dispatcher = _AsyncoreUDPDispatcher() + try: + dispatcher = self._loop_dispatch_class() + dispatcher.validate() + log.debug("Validated loop dispatch with %s", self._loop_dispatch_class) + except Exception: + log.exception("Failed validating loop dispatch with %s. Using busy wait execution instead.", self._loop_dispatch_class) + dispatcher = _BusyWaitDispatcher() + self._loop_dispatcher = dispatcher + atexit.register(partial(_cleanup, weakref.ref(self))) def maybe_start(self): @@ -146,17 +200,15 @@ class AsyncoreLoop(object): self._thread.start() def wake_loop(self): - self._pipe_dispatcher.notify_loop() + self._loop_dispatcher.notify_loop() def _run_loop(self): log.debug("Starting asyncore event loop") with self._loop_lock: while not self._shutdown: try: - asyncore.loop(timeout=0.1, use_poll=True, count=1) + self._loop_dispatcher.loop(self.timer_resolution) self._timers.service_timeouts() - if not asyncore.socket_map: - time.sleep(0.005) except Exception: log.debug("Asyncore event loop stopped unexepectedly", exc_info=True) break From fe762ae8641ab00eb7ae43b1aa882fa5a9fbaec2 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Tue, 26 Apr 2016 16:26:39 -0500 Subject: [PATCH 13/18] validate for ayncore _BusyWaitDispatcher --- cassandra/io/asyncorereactor.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cassandra/io/asyncorereactor.py b/cassandra/io/asyncorereactor.py index 42eab0fc..242c0b3d 100644 --- a/cassandra/io/asyncorereactor.py +++ b/cassandra/io/asyncorereactor.py @@ -154,6 +154,9 @@ class _BusyWaitDispatcher(object): count = timeout // self.max_write_latency asyncore.loop(timeout=self.max_write_latency, use_poll=True, count=count) + def validate(self): + pass + class AsyncoreLoop(object): From cc3307c9b27516018dad3ad9e22207670cee9f55 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Tue, 26 Apr 2016 16:41:35 -0500 Subject: [PATCH 14/18] isolate our dispatcher map from asyncore package map PYTHON-239 --- cassandra/io/asyncorereactor.py | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/cassandra/io/asyncorereactor.py b/cassandra/io/asyncorereactor.py index 242c0b3d..c02513d7 100644 --- a/cassandra/io/asyncorereactor.py +++ b/cassandra/io/asyncorereactor.py @@ -30,19 +30,17 @@ except ImportError: from cassandra.util import WeakSet # noqa import asyncore -from errno import EWOULDBLOCK try: import ssl except ImportError: ssl = None # NOQA -from cassandra.connection import (Connection, ConnectionShutdown, - ConnectionException, NONBLOCKING, - Timer, TimerManager) +from cassandra.connection import Connection, ConnectionShutdown, NONBLOCKING, Timer, TimerManager log = logging.getLogger(__name__) +_dispatcher_map = {} def _cleanup(loop_weakref): try: @@ -65,7 +63,7 @@ class _PipeWrapper(object): class _AsyncoreDispatcher(asyncore.dispatcher): def __init__(self, socket): - asyncore.dispatcher.__init__(self) + asyncore.dispatcher.__init__(self, map=_dispatcher_map) # inject after to avoid base class validation self.set_socket(socket) self._notified = False @@ -81,7 +79,7 @@ class _AsyncoreDispatcher(asyncore.dispatcher): assert not self._notified def loop(self, timeout): - asyncore.loop(timeout=timeout, use_poll=True, count=1) + asyncore.loop(timeout=timeout, use_poll=True, map=_dispatcher_map, count=1) class _AsyncorePipeDispatcher(_AsyncoreDispatcher): @@ -134,7 +132,7 @@ class _AsyncoreUDPDispatcher(_AsyncoreDispatcher): self._socket.sendto(b'', self.bind_address) def loop(self, timeout): - asyncore.loop(timeout=timeout, use_poll=False, count=1) + asyncore.loop(timeout=timeout, use_poll=False, map=_dispatcher_map, count=1) class _BusyWaitDispatcher(object): @@ -149,10 +147,10 @@ class _BusyWaitDispatcher(object): pass def loop(self, timeout): - if not asyncore.socket_map: + if not _dispatcher_map: time.sleep(0.005) count = timeout // self.max_write_latency - asyncore.loop(timeout=self.max_write_latency, use_poll=True, count=count) + asyncore.loop(timeout=self.max_write_latency, use_poll=True, map=_dispatcher_map, count=count) def validate(self): pass @@ -273,13 +271,12 @@ class AsyncoreConnection(Connection, asyncore.dispatcher): def __init__(self, *args, **kwargs): Connection.__init__(self, *args, **kwargs) - asyncore.dispatcher.__init__(self) self.deque = deque() self.deque_lock = Lock() self._connect_socket() - asyncore.dispatcher.__init__(self, self._socket) + asyncore.dispatcher.__init__(self, self._socket, _dispatcher_map) self._writable = True self._readable = True From 9146a1b51c42da264f76e1b53affe470b9464346 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Tue, 26 Apr 2016 16:56:39 -0500 Subject: [PATCH 15/18] close and cleanup when asyncore dispatch validation fails --- cassandra/io/asyncorereactor.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/cassandra/io/asyncorereactor.py b/cassandra/io/asyncorereactor.py index c02513d7..fd899614 100644 --- a/cassandra/io/asyncorereactor.py +++ b/cassandra/io/asyncorereactor.py @@ -59,6 +59,9 @@ class _PipeWrapper(object): def fileno(self): return self.fd + def close(self): + os.close(self.fd) + class _AsyncoreDispatcher(asyncore.dispatcher): @@ -155,6 +158,9 @@ class _BusyWaitDispatcher(object): def validate(self): pass + def close(self): + pass + class AsyncoreLoop(object): @@ -178,6 +184,7 @@ class AsyncoreLoop(object): log.debug("Validated loop dispatch with %s", self._loop_dispatch_class) except Exception: log.exception("Failed validating loop dispatch with %s. Using busy wait execution instead.", self._loop_dispatch_class) + dispatcher.close() dispatcher = _BusyWaitDispatcher() self._loop_dispatcher = dispatcher From 489afe5b9e2543d010079db94026c438458a6bde Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Tue, 26 Apr 2016 17:02:27 -0500 Subject: [PATCH 16/18] make asyncore default busywaitdispatcher for windows PYTHON-239 --- cassandra/io/asyncorereactor.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/cassandra/io/asyncorereactor.py b/cassandra/io/asyncorereactor.py index fd899614..c6cf8a19 100644 --- a/cassandra/io/asyncorereactor.py +++ b/cassandra/io/asyncorereactor.py @@ -111,6 +111,12 @@ class _AsyncoreUDPDispatcher(_AsyncoreDispatcher): it relies on local port binding. Port scanning is not implemented, so multiple clients on one host will collide. This address would need to be set per instance, or this could be specialized to scan until an address is found. + + To use:: + + from cassandra.io.asyncorereactor import _AsyncoreUDPDispatcher, AsyncoreLoop + AsyncoreLoop._loop_dispatch_class = _AsyncoreUDPDispatcher + """ bind_address = ('localhost', 10000) @@ -166,7 +172,7 @@ class AsyncoreLoop(object): timer_resolution = 0.1 # used as the max interval to be in the io loop before returning to service timeouts - _loop_dispatch_class = _AsyncorePipeDispatcher + _loop_dispatch_class = _AsyncorePipeDispatcher if os.name != 'nt' else _BusyWaitDispatcher def __init__(self): self._pid = os.getpid() From 01a38259e4a1864db79e7c1e5e2e469a3372bf31 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Thu, 28 Apr 2016 16:20:41 -0500 Subject: [PATCH 17/18] Don't mark ignored hosts up on discovery We don't know their state unless an event comes in PYTHON-531 --- cassandra/cluster.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index dc5c7221..505c2e24 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -1161,7 +1161,7 @@ class Cluster(object): if distance == HostDistance.IGNORED: log.debug("Not adding connection pool for new host %r because the " "load balancing policy has marked it as IGNORED", host) - self._finalize_add(host) + self._finalize_add(host, set_up=False) return futures_lock = Lock() @@ -1203,9 +1203,10 @@ class Cluster(object): if not have_future: self._finalize_add(host) - def _finalize_add(self, host): - # mark the host as up and notify all listeners - host.set_up() + def _finalize_add(self, host, set_up=True): + if set_up: + host.set_up() + for listener in self.listeners: listener.on_add(host) From 1ad419bda41f934ad5b1fa0ce2d76de01f2d1ee7 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Fri, 29 Apr 2016 09:54:40 -0500 Subject: [PATCH 18/18] don't check schema agreement with ignored hosts PYTHON-531 --- cassandra/cluster.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 505c2e24..d1d6590a 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -2569,13 +2569,14 @@ class ControlConnection(object): if local_row.get("schema_version"): versions[local_row.get("schema_version")].add(local_address) + lbp = self._cluster.load_balancing_policy for row in peers_result: schema_ver = row.get('schema_version') if not schema_ver: continue addr = self._rpc_from_peer_row(row) peer = self._cluster.metadata.get_host(addr) - if peer and peer.is_up: + if peer and peer.is_up and lbp.distance(peer) != HostDistance.IGNORED: versions[schema_ver].add(addr) if len(versions) == 1: