Merge remote-tracking branch 'origin/3.4'
Conflicts: benchmarks/base.py
This commit is contained in:
@@ -72,7 +72,7 @@ COLUMN_VALUES = {
|
||||
def setup(options):
|
||||
log.info("Using 'cassandra' package from %s", cassandra.__path__)
|
||||
|
||||
cluster = Cluster(options.hosts, protocol_version=options.protocol_version)
|
||||
cluster = Cluster(hosts, schema_metadata_enabled=False, token_metadata_enabled=False)
|
||||
try:
|
||||
session = cluster.connect()
|
||||
|
||||
@@ -107,8 +107,8 @@ def setup(options):
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def teardown(options):
|
||||
cluster = Cluster(options.hosts, protocol_version=options.protocol_version)
|
||||
def teardown(hosts):
|
||||
cluster = Cluster(hosts, schema_metadata_enabled=False, token_metadata_enabled=False)
|
||||
session = cluster.connect()
|
||||
if not options.keep_data:
|
||||
session.execute("DROP KEYSPACE " + options.keyspace)
|
||||
|
@@ -27,7 +27,6 @@ import socket
|
||||
import sys
|
||||
import time
|
||||
from threading import Lock, RLock, Thread, Event
|
||||
import warnings
|
||||
|
||||
import six
|
||||
from six.moves import range
|
||||
@@ -166,10 +165,24 @@ def run_in_executor(f):
|
||||
return new_f
|
||||
|
||||
|
||||
def _shutdown_cluster(cluster):
|
||||
if cluster and not cluster.is_shutdown:
|
||||
_clusters_for_shutdown = set()
|
||||
|
||||
|
||||
def _register_cluster_shutdown(cluster):
|
||||
_clusters_for_shutdown.add(cluster)
|
||||
|
||||
|
||||
def _discard_cluster_shutdown(cluster):
|
||||
_clusters_for_shutdown.discard(cluster)
|
||||
|
||||
|
||||
def _shutdown_clusters():
|
||||
clusters = _clusters_for_shutdown.copy() # copy because shutdown modifies the global set "discard"
|
||||
for cluster in clusters:
|
||||
cluster.shutdown()
|
||||
|
||||
atexit.register(_shutdown_clusters)
|
||||
|
||||
|
||||
# murmur3 implementation required for TokenAware is only available for CPython
|
||||
import platform
|
||||
@@ -868,7 +881,9 @@ class Cluster(object):
|
||||
new_version = previous_version - 1
|
||||
if new_version < self.protocol_version:
|
||||
if new_version >= MIN_SUPPORTED_VERSION:
|
||||
log.warning("Downgrading core protocol version from %d to %d for %s", self.protocol_version, new_version, host_addr)
|
||||
log.warning("Downgrading core protocol version from %d to %d for %s. "
|
||||
"To avoid this, it is best practice to explicitly set Cluster(protocol_version) to the version supported by your cluster. "
|
||||
"http://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.Cluster.protocol_version", self.protocol_version, new_version, host_addr)
|
||||
self.protocol_version = new_version
|
||||
else:
|
||||
raise DriverException("Cannot downgrade protocol version (%d) below minimum supported version: %d" % (new_version, MIN_SUPPORTED_VERSION))
|
||||
@@ -887,7 +902,7 @@ class Cluster(object):
|
||||
log.debug("Connecting to cluster, contact points: %s; protocol version: %s",
|
||||
self.contact_points, self.protocol_version)
|
||||
self.connection_class.initialize_reactor()
|
||||
atexit.register(partial(_shutdown_cluster, self))
|
||||
_register_cluster_shutdown(self)
|
||||
for address in self.contact_points_resolved:
|
||||
host, new = self.add_host(address, signal=False)
|
||||
if new:
|
||||
@@ -951,6 +966,8 @@ class Cluster(object):
|
||||
|
||||
self.executor.shutdown()
|
||||
|
||||
_discard_cluster_shutdown(self)
|
||||
|
||||
def _new_session(self):
|
||||
session = Session(self, self.metadata.all_hosts())
|
||||
self._session_register_user_types(session)
|
||||
@@ -1144,7 +1161,7 @@ class Cluster(object):
|
||||
if distance == HostDistance.IGNORED:
|
||||
log.debug("Not adding connection pool for new host %r because the "
|
||||
"load balancing policy has marked it as IGNORED", host)
|
||||
self._finalize_add(host)
|
||||
self._finalize_add(host, set_up=False)
|
||||
return
|
||||
|
||||
futures_lock = Lock()
|
||||
@@ -1186,9 +1203,10 @@ class Cluster(object):
|
||||
if not have_future:
|
||||
self._finalize_add(host)
|
||||
|
||||
def _finalize_add(self, host):
|
||||
# mark the host as up and notify all listeners
|
||||
host.set_up()
|
||||
def _finalize_add(self, host, set_up=True):
|
||||
if set_up:
|
||||
host.set_up()
|
||||
|
||||
for listener in self.listeners:
|
||||
listener.on_add(host)
|
||||
|
||||
@@ -2356,6 +2374,9 @@ class ControlConnection(object):
|
||||
cluster_name = local_row["cluster_name"]
|
||||
self._cluster.metadata.cluster_name = cluster_name
|
||||
|
||||
partitioner = local_row.get("partitioner")
|
||||
tokens = local_row.get("tokens")
|
||||
|
||||
host = self._cluster.metadata.get_host(connection.host)
|
||||
if host:
|
||||
datacenter = local_row.get("data_center")
|
||||
@@ -2365,10 +2386,8 @@ class ControlConnection(object):
|
||||
host.broadcast_address = local_row.get("broadcast_address")
|
||||
host.release_version = local_row.get("release_version")
|
||||
|
||||
partitioner = local_row.get("partitioner")
|
||||
tokens = local_row.get("tokens")
|
||||
if partitioner and tokens:
|
||||
token_map[host] = tokens
|
||||
if partitioner and tokens:
|
||||
token_map[host] = tokens
|
||||
|
||||
# Check metadata.partitioner to see if we haven't built anything yet. If
|
||||
# every node in the cluster was in the contact points, we won't discover
|
||||
@@ -2550,13 +2569,14 @@ class ControlConnection(object):
|
||||
if local_row.get("schema_version"):
|
||||
versions[local_row.get("schema_version")].add(local_address)
|
||||
|
||||
lbp = self._cluster.load_balancing_policy
|
||||
for row in peers_result:
|
||||
schema_ver = row.get('schema_version')
|
||||
if not schema_ver:
|
||||
continue
|
||||
addr = self._rpc_from_peer_row(row)
|
||||
peer = self._cluster.metadata.get_host(addr)
|
||||
if peer and peer.is_up:
|
||||
if peer and peer.is_up and lbp.distance(peer) != HostDistance.IGNORED:
|
||||
versions[schema_ver].add(addr)
|
||||
|
||||
if len(versions) == 1:
|
||||
@@ -2609,7 +2629,13 @@ class ControlConnection(object):
|
||||
self.refresh_node_list_and_token_map(force_token_rebuild=True)
|
||||
|
||||
def on_remove(self, host):
|
||||
self.refresh_node_list_and_token_map(force_token_rebuild=True)
|
||||
c = self._connection
|
||||
if c and c.host == host.address:
|
||||
log.debug("[control connection] Control connection host (%s) is being removed. Reconnecting", host)
|
||||
# refresh will be done on reconnect
|
||||
self.reconnect()
|
||||
else:
|
||||
self.refresh_node_list_and_token_map(force_token_rebuild=True)
|
||||
|
||||
def get_connections(self):
|
||||
c = getattr(self, '_connection', None)
|
||||
@@ -2630,7 +2656,7 @@ def _stop_scheduler(scheduler, thread):
|
||||
thread.join()
|
||||
|
||||
|
||||
class _Scheduler(object):
|
||||
class _Scheduler(Thread):
|
||||
|
||||
_queue = None
|
||||
_scheduled_tasks = None
|
||||
@@ -2643,13 +2669,9 @@ class _Scheduler(object):
|
||||
self._count = count()
|
||||
self._executor = executor
|
||||
|
||||
t = Thread(target=self.run, name="Task Scheduler")
|
||||
t.daemon = True
|
||||
t.start()
|
||||
|
||||
# although this runs on a daemonized thread, we prefer to stop
|
||||
# it gracefully to avoid random errors during interpreter shutdown
|
||||
atexit.register(partial(_stop_scheduler, weakref.proxy(self), t))
|
||||
Thread.__init__(self, name="Task Scheduler")
|
||||
self.daemon = True
|
||||
self.start()
|
||||
|
||||
def shutdown(self):
|
||||
try:
|
||||
@@ -2659,6 +2681,7 @@ class _Scheduler(object):
|
||||
pass
|
||||
self.is_shutdown = True
|
||||
self._queue.put_nowait((0, 0, None))
|
||||
self.join()
|
||||
|
||||
def schedule(self, delay, fn, *args, **kwargs):
|
||||
self._insert_task(delay, (fn, args, tuple(kwargs.items())))
|
||||
@@ -2687,7 +2710,8 @@ class _Scheduler(object):
|
||||
while True:
|
||||
run_at, i, task = self._queue.get(block=True, timeout=None)
|
||||
if self.is_shutdown:
|
||||
log.debug("Not executing scheduled task due to Scheduler shutdown")
|
||||
if task:
|
||||
log.debug("Not executing scheduled task due to Scheduler shutdown")
|
||||
return
|
||||
if run_at <= time.time():
|
||||
self._scheduled_tasks.discard(task)
|
||||
|
@@ -601,8 +601,8 @@ class Connection(object):
|
||||
if isinstance(response, ProtocolException):
|
||||
if 'unsupported protocol version' in response.message:
|
||||
self.is_unsupported_proto_version = True
|
||||
|
||||
log.error("Closing connection %s due to protocol error: %s", self, response.summary_msg())
|
||||
else:
|
||||
log.error("Closing connection %s due to protocol error: %s", self, response.summary_msg())
|
||||
self.defunct(response)
|
||||
if callback is not None:
|
||||
callback(response)
|
||||
|
@@ -18,7 +18,7 @@ import logging
|
||||
import os
|
||||
import socket
|
||||
import sys
|
||||
from threading import Event, Lock, Thread
|
||||
from threading import Lock, Thread
|
||||
import time
|
||||
import weakref
|
||||
|
||||
@@ -36,12 +36,11 @@ try:
|
||||
except ImportError:
|
||||
ssl = None # NOQA
|
||||
|
||||
from cassandra.connection import (Connection, ConnectionShutdown,
|
||||
ConnectionException, NONBLOCKING,
|
||||
Timer, TimerManager)
|
||||
from cassandra.connection import Connection, ConnectionShutdown, NONBLOCKING, Timer, TimerManager
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
_dispatcher_map = {}
|
||||
|
||||
def _cleanup(loop_weakref):
|
||||
try:
|
||||
@@ -52,8 +51,128 @@ def _cleanup(loop_weakref):
|
||||
loop._cleanup()
|
||||
|
||||
|
||||
class _PipeWrapper(object):
|
||||
|
||||
def __init__(self, fd):
|
||||
self.fd = fd
|
||||
|
||||
def fileno(self):
|
||||
return self.fd
|
||||
|
||||
def close(self):
|
||||
os.close(self.fd)
|
||||
|
||||
|
||||
class _AsyncoreDispatcher(asyncore.dispatcher):
|
||||
|
||||
def __init__(self, socket):
|
||||
asyncore.dispatcher.__init__(self, map=_dispatcher_map)
|
||||
# inject after to avoid base class validation
|
||||
self.set_socket(socket)
|
||||
self._notified = False
|
||||
|
||||
def writable(self):
|
||||
return False
|
||||
|
||||
def validate(self):
|
||||
assert not self._notified
|
||||
self.notify_loop()
|
||||
assert self._notified
|
||||
self.loop(0.1)
|
||||
assert not self._notified
|
||||
|
||||
def loop(self, timeout):
|
||||
asyncore.loop(timeout=timeout, use_poll=True, map=_dispatcher_map, count=1)
|
||||
|
||||
|
||||
class _AsyncorePipeDispatcher(_AsyncoreDispatcher):
|
||||
|
||||
def __init__(self):
|
||||
self.read_fd, self.write_fd = os.pipe()
|
||||
_AsyncoreDispatcher.__init__(self, _PipeWrapper(self.read_fd))
|
||||
|
||||
def writable(self):
|
||||
return False
|
||||
|
||||
def handle_read(self):
|
||||
while len(os.read(self.read_fd, 4096)) == 4096:
|
||||
pass
|
||||
self._notified = False
|
||||
|
||||
def notify_loop(self):
|
||||
if not self._notified:
|
||||
self._notified = True
|
||||
os.write(self.write_fd, 'x')
|
||||
|
||||
|
||||
class _AsyncoreUDPDispatcher(_AsyncoreDispatcher):
|
||||
"""
|
||||
Experimental alternate dispatcher for avoiding busy wait in the asyncore loop. It is not used by default because
|
||||
it relies on local port binding.
|
||||
Port scanning is not implemented, so multiple clients on one host will collide. This address would need to be set per
|
||||
instance, or this could be specialized to scan until an address is found.
|
||||
|
||||
To use::
|
||||
|
||||
from cassandra.io.asyncorereactor import _AsyncoreUDPDispatcher, AsyncoreLoop
|
||||
AsyncoreLoop._loop_dispatch_class = _AsyncoreUDPDispatcher
|
||||
|
||||
"""
|
||||
bind_address = ('localhost', 10000)
|
||||
|
||||
def __init__(self):
|
||||
self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
self._socket.bind(self.bind_address)
|
||||
self._socket.setblocking(0)
|
||||
_AsyncoreDispatcher.__init__(self, self._socket)
|
||||
|
||||
def handle_read(self):
|
||||
try:
|
||||
d = self._socket.recvfrom(1)
|
||||
while d and d[1]:
|
||||
d = self._socket.recvfrom(1)
|
||||
except socket.error as e:
|
||||
pass
|
||||
self._notified = False
|
||||
|
||||
def notify_loop(self):
|
||||
if not self._notified:
|
||||
self._notified = True
|
||||
self._socket.sendto(b'', self.bind_address)
|
||||
|
||||
def loop(self, timeout):
|
||||
asyncore.loop(timeout=timeout, use_poll=False, map=_dispatcher_map, count=1)
|
||||
|
||||
|
||||
class _BusyWaitDispatcher(object):
|
||||
|
||||
max_write_latency = 0.001
|
||||
"""
|
||||
Timeout pushed down to asyncore select/poll. Dictates the amount of time it will sleep before coming back to check
|
||||
if anything is writable.
|
||||
"""
|
||||
|
||||
def notify_loop(self):
|
||||
pass
|
||||
|
||||
def loop(self, timeout):
|
||||
if not _dispatcher_map:
|
||||
time.sleep(0.005)
|
||||
count = timeout // self.max_write_latency
|
||||
asyncore.loop(timeout=self.max_write_latency, use_poll=True, map=_dispatcher_map, count=count)
|
||||
|
||||
def validate(self):
|
||||
pass
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
|
||||
class AsyncoreLoop(object):
|
||||
|
||||
timer_resolution = 0.1 # used as the max interval to be in the io loop before returning to service timeouts
|
||||
|
||||
_loop_dispatch_class = _AsyncorePipeDispatcher if os.name != 'nt' else _BusyWaitDispatcher
|
||||
|
||||
def __init__(self):
|
||||
self._pid = os.getpid()
|
||||
@@ -65,6 +184,16 @@ class AsyncoreLoop(object):
|
||||
|
||||
self._timers = TimerManager()
|
||||
|
||||
try:
|
||||
dispatcher = self._loop_dispatch_class()
|
||||
dispatcher.validate()
|
||||
log.debug("Validated loop dispatch with %s", self._loop_dispatch_class)
|
||||
except Exception:
|
||||
log.exception("Failed validating loop dispatch with %s. Using busy wait execution instead.", self._loop_dispatch_class)
|
||||
dispatcher.close()
|
||||
dispatcher = _BusyWaitDispatcher()
|
||||
self._loop_dispatcher = dispatcher
|
||||
|
||||
atexit.register(partial(_cleanup, weakref.ref(self)))
|
||||
|
||||
def maybe_start(self):
|
||||
@@ -84,15 +213,16 @@ class AsyncoreLoop(object):
|
||||
self._thread.daemon = True
|
||||
self._thread.start()
|
||||
|
||||
def wake_loop(self):
|
||||
self._loop_dispatcher.notify_loop()
|
||||
|
||||
def _run_loop(self):
|
||||
log.debug("Starting asyncore event loop")
|
||||
with self._loop_lock:
|
||||
while not self._shutdown:
|
||||
try:
|
||||
asyncore.loop(timeout=0.001, use_poll=True, count=100)
|
||||
self._loop_dispatcher.loop(self.timer_resolution)
|
||||
self._timers.service_timeouts()
|
||||
if not asyncore.socket_map:
|
||||
time.sleep(0.005)
|
||||
except Exception:
|
||||
log.debug("Asyncore event loop stopped unexepectedly", exc_info=True)
|
||||
break
|
||||
@@ -154,13 +284,12 @@ class AsyncoreConnection(Connection, asyncore.dispatcher):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
Connection.__init__(self, *args, **kwargs)
|
||||
asyncore.dispatcher.__init__(self)
|
||||
|
||||
self.deque = deque()
|
||||
self.deque_lock = Lock()
|
||||
|
||||
self._connect_socket()
|
||||
asyncore.dispatcher.__init__(self, self._socket)
|
||||
asyncore.dispatcher.__init__(self, self._socket, _dispatcher_map)
|
||||
|
||||
self._writable = True
|
||||
self._readable = True
|
||||
@@ -254,6 +383,7 @@ class AsyncoreConnection(Connection, asyncore.dispatcher):
|
||||
with self.deque_lock:
|
||||
self.deque.extend(chunks)
|
||||
self._writable = True
|
||||
self._loop.wake_loop()
|
||||
|
||||
def writable(self):
|
||||
return self._writable
|
||||
|
@@ -1397,12 +1397,18 @@ class TokenMap(object):
|
||||
|
||||
def rebuild_keyspace(self, keyspace, build_if_absent=False):
|
||||
with self._rebuild_lock:
|
||||
current = self.tokens_to_hosts_by_ks.get(keyspace, None)
|
||||
if (build_if_absent and current is None) or (not build_if_absent and current is not None):
|
||||
ks_meta = self._metadata.keyspaces.get(keyspace)
|
||||
if ks_meta:
|
||||
replica_map = self.replica_map_for_keyspace(self._metadata.keyspaces[keyspace])
|
||||
self.tokens_to_hosts_by_ks[keyspace] = replica_map
|
||||
try:
|
||||
current = self.tokens_to_hosts_by_ks.get(keyspace, None)
|
||||
if (build_if_absent and current is None) or (not build_if_absent and current is not None):
|
||||
ks_meta = self._metadata.keyspaces.get(keyspace)
|
||||
if ks_meta:
|
||||
replica_map = self.replica_map_for_keyspace(self._metadata.keyspaces[keyspace])
|
||||
self.tokens_to_hosts_by_ks[keyspace] = replica_map
|
||||
except Exception:
|
||||
# should not happen normally, but we don't want to blow up queries because of unexpected meta state
|
||||
# bypass until new map is generated
|
||||
self.tokens_to_hosts_by_ks[keyspace] = {}
|
||||
log.exception("Failed creating a token map for keyspace '%s' with %s. PLEASE REPORT THIS: https://datastax-oss.atlassian.net/projects/PYTHON", keyspace, self.token_to_host_owner)
|
||||
|
||||
def replica_map_for_keyspace(self, ks_metadata):
|
||||
strategy = ks_metadata.replication_strategy
|
||||
|
@@ -134,12 +134,12 @@ class ErrorMessage(_MessageType, Exception):
|
||||
return subcls(code=code, message=msg, info=extra_info)
|
||||
|
||||
def summary_msg(self):
|
||||
msg = 'code=%04x [%s] message="%s"' \
|
||||
msg = 'Error from server: code=%04x [%s] message="%s"' \
|
||||
% (self.code, self.summary, self.message)
|
||||
return msg
|
||||
|
||||
def __str__(self):
|
||||
return '<ErrorMessage %s>' % self.summary_msg()
|
||||
return '<%s>' % self.summary_msg()
|
||||
__repr__ = __str__
|
||||
|
||||
@staticmethod
|
||||
@@ -544,6 +544,7 @@ class QueryMessage(_MessageType):
|
||||
if self.timestamp is not None:
|
||||
write_long(f, self.timestamp)
|
||||
|
||||
|
||||
CUSTOM_TYPE = object()
|
||||
|
||||
RESULT_KIND_VOID = 0x0001
|
||||
@@ -552,6 +553,7 @@ RESULT_KIND_SET_KEYSPACE = 0x0003
|
||||
RESULT_KIND_PREPARED = 0x0004
|
||||
RESULT_KIND_SCHEMA_CHANGE = 0x0005
|
||||
|
||||
|
||||
class ResultMessage(_MessageType):
|
||||
opcode = 0x08
|
||||
name = 'RESULT'
|
||||
|
Reference in New Issue
Block a user