""" Connection pooling and host management. """ import logging import time from threading import Lock, RLock, Condition import traceback import weakref from cassandra import AuthenticationFailed from cassandra.connection import MAX_STREAM_PER_CONNECTION, ConnectionException log = logging.getLogger(__name__) class NoConnectionsAvailable(Exception): """ All existing connections to a given host are busy, or there are no open connections. """ pass class Host(object): """ Represents a single Cassandra node. """ address = None """ The IP address or hostname of the node. """ monitor = None """ A :class:`.HealthMonitor` instance that tracks whether this node is up or down. """ _datacenter = None _rack = None _reconnection_handler = None def __init__(self, inet_address, conviction_policy_factory): if inet_address is None: raise ValueError("inet_address may not be None") if conviction_policy_factory is None: raise ValueError("conviction_policy_factory may not be None") self.address = inet_address self.monitor = HealthMonitor(conviction_policy_factory(self)) self._reconnection_lock = Lock() @property def datacenter(self): """ The datacenter the node is in. """ return self._datacenter @property def rack(self): """ The rack the node is in. """ return self._rack def set_location_info(self, datacenter, rack): """ Sets the datacenter and rack for this node. Intended for internal use (by the control connection, which periodically checks the ring topology) only. """ self._datacenter = datacenter self._rack = rack def get_and_set_reconnection_handler(self, new_handler): """ Atomically replaces the reconnection handler for this host. Intended for internal use only. """ with self._reconnection_lock: old = self._reconnection_handler self._reconnection_handler = new_handler return old def __eq__(self, other): if not isinstance(other, Host): return False return self.address == other.address def __str__(self): return self.address def __repr__(self): dc = (" %s" % (self._datacenter,)) if self._datacenter else "" return "<%s: %s%s>" % (self.__class__.__name__, self.address, dc) class _ReconnectionHandler(object): """ Abstract class for attempting reconnections with a given schedule and scheduler. """ _cancelled = False def __init__(self, scheduler, schedule, callback, *callback_args, **callback_kwargs): self.scheduler = scheduler self.schedule = schedule self.callback = callback self.callback_args = callback_args self.callback_kwargs = callback_kwargs def start(self): if self._cancelled: return # TODO cancel previous reconnection handlers? That's probably the job # of whatever created this. first_delay = self.schedule.next() self.scheduler.schedule(first_delay, self.run) def run(self): if self._cancelled: self.callback(*(self.callback_args), **(self.callback_kwargs)) try: self.on_reconnection(self.try_reconnect()) except Exception, exc: next_delay = self.schedule.next() if self.on_exception(exc, next_delay): self.scheduler.schedule(next_delay, self.run) else: self.callback(*(self.callback_args), **(self.callback_kwargs)) def cancel(self): self._cancelled = True def try_reconnect(self): """ Subclasses must implement this method. It should attempt to open a new Connection and return it; if a failure occurs, an Exception should be raised. """ raise NotImplementedError() def on_reconnection(self, connection): """ Called when a new Connection is successfully opened. Nothing is done by default. """ pass def on_exception(self, exc, next_delay): """ Called when an Exception is raised when trying to connect. 


class _ReconnectionHandler(object):
    """
    Abstract class for attempting reconnections with a given
    schedule and scheduler.
    """

    _cancelled = False

    def __init__(self, scheduler, schedule, callback, *callback_args, **callback_kwargs):
        self.scheduler = scheduler
        self.schedule = schedule
        self.callback = callback
        self.callback_args = callback_args
        self.callback_kwargs = callback_kwargs

    def start(self):
        if self._cancelled:
            return

        # TODO cancel previous reconnection handlers? That's probably the job
        # of whatever created this.
        first_delay = next(self.schedule)
        self.scheduler.schedule(first_delay, self.run)

    def run(self):
        if self._cancelled:
            self.callback(*(self.callback_args), **(self.callback_kwargs))
            return

        try:
            self.on_reconnection(self.try_reconnect())
        except Exception as exc:
            next_delay = next(self.schedule)
            if self.on_exception(exc, next_delay):
                self.scheduler.schedule(next_delay, self.run)
        else:
            self.callback(*(self.callback_args), **(self.callback_kwargs))

    def cancel(self):
        self._cancelled = True

    def try_reconnect(self):
        """
        Subclasses must implement this method.  It should attempt to open a
        new Connection and return it; if a failure occurs, an Exception
        should be raised.
        """
        raise NotImplementedError()

    def on_reconnection(self, connection):
        """
        Called when a new Connection is successfully opened.  Nothing is
        done by default.
        """
        pass

    def on_exception(self, exc, next_delay):
        """
        Called when an Exception is raised while trying to connect.
        `exc` is the Exception that was raised and `next_delay` is the
        number of seconds (as a float) that the handler will wait before
        attempting to connect again.

        Subclasses should return :const:`False` if no more attempts to
        connect should be made, :const:`True` otherwise.  The default
        behavior is to always retry unless the error is an
        :exc:`.AuthenticationFailed` instance.
        """
        if isinstance(exc, AuthenticationFailed):
            return False
        else:
            return True


class _HostReconnectionHandler(_ReconnectionHandler):

    def __init__(self, host, connection_factory, *args, **kwargs):
        _ReconnectionHandler.__init__(self, *args, **kwargs)
        self.host = host
        self.connection_factory = connection_factory

    def try_reconnect(self):
        return self.connection_factory()

    def on_reconnection(self, connection):
        self.host.monitor.reset()

    def on_exception(self, exc, next_delay):
        if isinstance(exc, AuthenticationFailed):
            return False
        else:
            log.warn("Error attempting to reconnect to %s: %s", self.host, exc)
            log.debug(traceback.format_exc())
            return True
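

# --- Illustrative sketch (not part of the driver) ---------------------------
# A minimal, hypothetical example of how a _ReconnectionHandler subclass
# cooperates with a scheduler and a delay schedule.  `InlineScheduler` and
# `FlakyReconnectionHandler` are stand-ins invented for this sketch; the real
# driver supplies its own scheduler whose schedule(delay, fn) method runs `fn`
# after `delay` seconds on a background thread, and real schedules come from a
# reconnection policy (typically with exponential backoff).

def _example_reconnection_flow():
    class InlineScheduler(object):
        # Ignores the requested delay and runs the callback immediately,
        # which keeps this sketch synchronous and easy to follow.
        def schedule(self, delay, fn, *args, **kwargs):
            fn(*args, **kwargs)

    events = []

    class FlakyReconnectionHandler(_ReconnectionHandler):
        # Fails twice, then pretends a connection was opened.
        def try_reconnect(self):
            events.append("attempt")
            if events.count("attempt") < 3:
                raise ConnectionException("node still unreachable")
            return "fake-connection"

        def on_reconnection(self, connection):
            events.append("reconnected")

    # A flat two-second schedule; the default on_exception() keeps retrying
    # unless the error is an AuthenticationFailed.
    schedule = iter([2.0, 2.0, 2.0, 2.0])
    handler = FlakyReconnectionHandler(
        InlineScheduler(), schedule, events.append, "finished")
    handler.start()
    return events  # ["attempt", "attempt", "attempt", "reconnected", "finished"]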


class HealthMonitor(object):
    """
    Monitors whether a particular host is marked as up or down.
    This class is primarily intended for internal use, although
    applications may find it useful to check whether a given node
    is up or down.
    """

    is_up = True
    """
    A boolean representing the current state of the node.
    """

    def __init__(self, conviction_policy):
        self._conviction_policy = conviction_policy
        self._host = conviction_policy.host
        # self._listeners will hold, among other things, references to
        # Cluster objects.  To allow those to be GC'ed (and shutdown) even
        # though we've implemented __del__, use weak references.
        self._listeners = weakref.WeakSet()
        self._lock = RLock()

    def register(self, listener):
        with self._lock:
            self._listeners.add(listener)

    def unregister(self, listener):
        with self._lock:
            self._listeners.remove(listener)

    def set_up(self):
        if self.is_up:
            return

        self._conviction_policy.reset()
        log.info("Host %s is considered up", self._host)

        with self._lock:
            listeners = self._listeners.copy()

        for listener in listeners:
            listener.on_up(self._host)

        self.is_up = True

    def set_down(self):
        if not self.is_up:
            return

        self.is_up = False
        log.info("Host %s is considered down", self._host)

        with self._lock:
            listeners = self._listeners.copy()

        for listener in listeners:
            listener.on_down(self._host)

    def reset(self):
        return self.set_up()

    def signal_connection_failure(self, connection_exc):
        is_down = self._conviction_policy.add_failure(connection_exc)
        if is_down:
            self.set_down()
        return is_down
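

# --- Illustrative sketch (not part of the driver) ---------------------------
# How an application might observe up/down transitions through a Host's
# HealthMonitor.  `AlwaysConvictPolicy` and `LoggingListener` are hypothetical
# stand-ins; the real conviction policy is provided through the Cluster
# configuration.  Note that listeners are kept in a WeakSet, so the caller
# must hold a strong reference to the listener for as long as it should
# receive notifications.

def _example_health_monitor_listener():
    class AlwaysConvictPolicy(object):
        # Stand-in conviction policy: any connection failure convicts the host.
        def __init__(self, host):
            self.host = host

        def add_failure(self, connection_exc):
            return True

        def reset(self):
            pass

    class LoggingListener(object):
        def on_up(self, host):
            log.info("Listener: host %s came back up", host)

        def on_down(self, host):
            log.info("Listener: host %s went down", host)

    host = Host("127.0.0.1", AlwaysConvictPolicy)
    listener = LoggingListener()
    host.monitor.register(listener)

    # A connection failure convicts the host and fires on_down() ...
    host.monitor.signal_connection_failure(ConnectionException("connection reset"))
    assert not host.monitor.is_up

    # ... and a successful reconnection resets it, firing on_up().
    host.monitor.reset()
    assert host.monitor.is_up
    return host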


_MAX_SIMULTANEOUS_CREATION = 1


class HostConnectionPool(object):

    host = None
    host_distance = None

    is_shutdown = False
    open_count = 0
    _scheduled_for_creation = 0

    def __init__(self, host, host_distance, session):
        self.host = host
        self.host_distance = host_distance

        self._session = weakref.proxy(session)
        self._lock = RLock()
        self._conn_available_condition = Condition()

        core_conns = session.cluster.get_core_connections_per_host(host_distance)
        self._connections = [session.cluster.connection_factory(host.address)
                             for i in range(core_conns)]
        self._trash = set()
        self.open_count = core_conns

    def borrow_connection(self, timeout):
        if self.is_shutdown:
            raise ConnectionException(
                "Pool for %s is shutdown" % (self.host,), self.host)

        conns = self._connections
        if not conns:
            # handled specially just for simpler code
            log.debug("Detected empty pool, opening core conns to %s" % (self.host,))
            core_conns = self._session.cluster.get_core_connections_per_host(self.host_distance)
            with self._lock:
                # we check the length of self._connections again
                # along with self._scheduled_for_creation while holding the lock
                # in case multiple threads hit this condition at the same time
                to_create = core_conns - (len(self._connections) + self._scheduled_for_creation)
                for i in range(to_create):
                    self._scheduled_for_creation += 1
                    self._session.submit(self._create_new_connection)

            # in_flight is incremented by _wait_for_conn
            conn = self._wait_for_conn(timeout)
            conn.set_keyspace(self._session.keyspace)
            return conn
        else:
            # note: it would be nice to push changes to these config settings
            # to pools instead of doing a new lookup on every
            # borrow_connection() call
            max_reqs = self._session.cluster.get_max_requests_per_connection(self.host_distance)
            max_conns = self._session.cluster.get_max_connections_per_host(self.host_distance)

            least_busy = min(conns, key=lambda c: c.in_flight)

            # to avoid another thread closing this connection while
            # trashing it (through the return_connection process), hold
            # the connection lock from this point until we've incremented
            # its in_flight count
            with least_busy.lock:
                # if we have too many requests on this connection but we still
                # have space to open a new connection against this host, go ahead
                # and schedule the creation of a new connection
                if least_busy.in_flight >= max_reqs and len(self._connections) < max_conns:
                    self._maybe_spawn_new_connection()

                if least_busy.in_flight >= MAX_STREAM_PER_CONNECTION:
                    # once we release the lock, wait for another connection
                    need_to_wait = True
                else:
                    need_to_wait = False
                    least_busy.in_flight += 1

            if need_to_wait:
                # _wait_for_conn will increment in_flight on the conn
                least_busy = self._wait_for_conn(timeout)

            least_busy.set_keyspace(self._session.keyspace)
            return least_busy

    def _maybe_spawn_new_connection(self):
        with self._lock:
            if self._scheduled_for_creation >= _MAX_SIMULTANEOUS_CREATION:
                return
            self._scheduled_for_creation += 1

        log.debug("Submitting task for creation of new Connection to %s" % (self.host,))
        self._session.submit(self._create_new_connection)

    def _create_new_connection(self):
        try:
            self._add_conn_if_under_max()
        except Exception:
            log.exception("Unexpectedly failed to create new connection")
        finally:
            with self._lock:
                self._scheduled_for_creation -= 1

    def _add_conn_if_under_max(self):
        max_conns = self._session.cluster.get_max_connections_per_host(self.host_distance)
        with self._lock:
            if self.is_shutdown:
                return False
            if self.open_count >= max_conns:
                return False
            self.open_count += 1

        try:
            conn = self._session.cluster.connection_factory(self.host.address)
            with self._lock:
                new_connections = self._connections[:] + [conn]
                self._connections = new_connections
            self._signal_available_conn()
            return True
        except ConnectionException as exc:
            log.exception("Failed to add new connection to pool for host %s" % (self.host,))
            with self._lock:
                self.open_count -= 1
            if self.host.monitor.signal_connection_failure(exc):
                self.shutdown()
            return False
        except AuthenticationFailed:
            with self._lock:
                self.open_count -= 1
            return False

    def _await_available_conn(self, timeout):
        with self._conn_available_condition:
            self._conn_available_condition.wait(timeout)

    def _signal_available_conn(self):
        with self._conn_available_condition:
            self._conn_available_condition.notify()

    def _signal_all_available_conn(self):
        with self._conn_available_condition:
            self._conn_available_condition.notify_all()

    def _wait_for_conn(self, timeout):
        start = time.time()
        remaining = timeout
        while remaining > 0:
            # wait on our condition for the possibility that a connection
            # is useable
            self._await_available_conn(remaining)

            # self.shutdown() may trigger the above Condition
            if self.is_shutdown:
                raise ConnectionException("Pool is shutdown")

            conns = self._connections
            if conns:
                least_busy = min(conns, key=lambda c: c.in_flight)
                with least_busy.lock:
                    if least_busy.in_flight < MAX_STREAM_PER_CONNECTION:
                        least_busy.in_flight += 1
                        return least_busy

            remaining = timeout - (time.time() - start)

        raise NoConnectionsAvailable()

    def return_connection(self, connection):
        with connection.lock:
            connection.in_flight -= 1
            in_flight = connection.in_flight

        if connection.is_defunct or connection.is_closed:
            is_down = self.host.monitor.signal_connection_failure(connection.last_error)
            if is_down:
                self.shutdown()
            else:
                self._replace(connection)
        else:
            if connection in self._trash:
                with connection.lock:
                    if in_flight == 0:
                        with self._lock:
                            self._trash.remove(connection)
                        connection.close()
                return

            core_conns = self._session.cluster.get_core_connections_per_host(self.host_distance)
            min_reqs = self._session.cluster.get_min_requests_per_connection(self.host_distance)
            # we can use in_flight here without holding the connection lock
            # because the fact that in_flight dipped below the min at some
            # point is enough to start the trashing procedure
            if len(self._connections) > core_conns and in_flight <= min_reqs:
                self._maybe_trash_connection(connection)
            else:
                self._signal_available_conn()

    def _maybe_trash_connection(self, connection):
        core_conns = self._session.cluster.get_core_connections_per_host(self.host_distance)
        did_trash = False
        with self._lock:
            if connection not in self._connections:
                return

            if self.open_count > core_conns:
                did_trash = True
                self.open_count -= 1

                new_connections = self._connections[:]
                new_connections.remove(connection)
                self._connections = new_connections

                with connection.lock:
                    if connection.in_flight == 0:
                        connection.close()
                        # skip adding it to the trash if we're already closing it
                        return

                self._trash.add(connection)

        if did_trash:
            log.debug("Trashed connection to %s" % (self.host,))

    def _replace(self, connection):
        should_replace = False
        with self._lock:
            if connection in self._connections:
                new_connections = self._connections[:]
                new_connections.remove(connection)
                self._connections = new_connections
                self.open_count -= 1
                should_replace = True

        if should_replace:
            log.debug("Replacing connection to %s" % (self.host,))

            def close_and_replace():
                connection.close()
                self._add_conn_if_under_max()

            self._session.submit(close_and_replace)
        else:
            # just close it
            log.debug("Closing connection to %s" % (self.host,))
            connection.close()

    def shutdown(self):
        with self._lock:
            if self.is_shutdown:
                return
            else:
                self.is_shutdown = True

        self._signal_all_available_conn()
        for conn in self._connections:
            conn.close()
            self.open_count -= 1

        reconnector = self.host.get_and_set_reconnection_handler(None)
        if reconnector:
            reconnector.cancel()

    def ensure_core_connections(self):
        if self.is_shutdown:
            return

        core_conns = self._session.cluster.get_core_connections_per_host(self.host_distance)
        with self._lock:
            to_create = core_conns - (len(self._connections) + self._scheduled_for_creation)
            for i in range(to_create):
                self._scheduled_for_creation += 1
                self._session.submit(self._create_new_connection)
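

# --- Illustrative sketch (not part of the driver) ---------------------------
# The intended calling pattern for HostConnectionPool: borrow a connection,
# use it, and always return it so in_flight counts stay accurate and surplus
# connections can be trashed or replaced.  `pool` is assumed to be an existing
# HostConnectionPool owned by a Session, and `do_request` is a hypothetical
# placeholder for whatever work is performed on the borrowed connection.

def _example_borrow_and_return(pool, do_request, timeout=5.0):
    try:
        connection = pool.borrow_connection(timeout)
    except NoConnectionsAvailable:
        # Every connection stayed saturated (or was still opening) for the
        # full timeout.
        log.debug("No connection available for host %s", pool.host)
        raise

    try:
        return do_request(connection)
    finally:
        # return_connection() decrements in_flight and may trash or replace
        # the connection depending on its state and the pool configuration.
        pool.return_connection(connection)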