Silence control conn errors around shutdown
This commit is contained in:
@@ -1440,7 +1440,7 @@ class ControlConnection(object):
|
|||||||
if self._is_shutdown:
|
if self._is_shutdown:
|
||||||
return
|
return
|
||||||
|
|
||||||
self._cluster.executor.submit(self._reconnect)
|
self._submit(self._reconnect)
|
||||||
|
|
||||||
def _reconnect(self):
|
def _reconnect(self):
|
||||||
log.debug("[control connection] Attempting to reconnect")
|
log.debug("[control connection] Attempting to reconnect")
|
||||||
@@ -1479,6 +1479,14 @@ class ControlConnection(object):
|
|||||||
self._reconnection_handler = new_handler
|
self._reconnection_handler = new_handler
|
||||||
return old
|
return old
|
||||||
|
|
||||||
|
def _submit(self, *args, **kwargs):
|
||||||
|
try:
|
||||||
|
if not self._cluster._is_shutdown:
|
||||||
|
return self._cluster.executor.submit(*args, **kwargs)
|
||||||
|
except ReferenceError:
|
||||||
|
pass
|
||||||
|
return None
|
||||||
|
|
||||||
def shutdown(self):
|
def shutdown(self):
|
||||||
with self._lock:
|
with self._lock:
|
||||||
if self._is_shutdown:
|
if self._is_shutdown:
|
||||||
@@ -1504,6 +1512,9 @@ class ControlConnection(object):
|
|||||||
self._signal_error()
|
self._signal_error()
|
||||||
|
|
||||||
def _refresh_schema(self, connection, keyspace=None, table=None):
|
def _refresh_schema(self, connection, keyspace=None, table=None):
|
||||||
|
if self._cluster._is_shutdown:
|
||||||
|
return
|
||||||
|
|
||||||
self.wait_for_schema_agreement(connection)
|
self.wait_for_schema_agreement(connection)
|
||||||
|
|
||||||
where_clause = ""
|
where_clause = ""
|
||||||
@@ -1645,9 +1656,9 @@ class ControlConnection(object):
|
|||||||
table = event['table'] or None
|
table = event['table'] or None
|
||||||
if event['change_type'] in ("CREATED", "DROPPED"):
|
if event['change_type'] in ("CREATED", "DROPPED"):
|
||||||
keyspace = keyspace if table else None
|
keyspace = keyspace if table else None
|
||||||
self._cluster.executor.submit(self.refresh_schema, keyspace)
|
self._submit(self.refresh_schema, keyspace)
|
||||||
elif event['change_type'] == "UPDATED":
|
elif event['change_type'] == "UPDATED":
|
||||||
self._cluster.executor.submit(self.refresh_schema, keyspace, table)
|
self._submit(self.refresh_schema, keyspace, table)
|
||||||
|
|
||||||
def wait_for_schema_agreement(self, connection=None):
|
def wait_for_schema_agreement(self, connection=None):
|
||||||
# Each schema change typically generates two schema refreshes, one
|
# Each schema change typically generates two schema refreshes, one
|
||||||
|
|||||||
Reference in New Issue
Block a user