Merge "Stop active event gathering on connection loss"

This commit is contained in:
Zuul 2021-04-08 15:34:36 +00:00 committed by Gerrit Code Review
commit 1c0f0d0719
3 changed files with 24 additions and 4 deletions

View File

@@ -340,6 +340,9 @@ class GerritWatcher(threading.Thread):
gerrit_connection.sched.zk_client, gerrit_connection.sched.zk_client,
gerrit_connection.connection_name, gerrit_connection.connection_name,
"watcher") "watcher")
self._connection_lost_event = threading.Event()
gerrit_connection.sched.zk_client.on_connection_lost_listeners.append(
self._connection_lost_event.set)
self.keepalive = keepalive self.keepalive = keepalive
self._stopped = False self._stopped = False
@@ -357,8 +360,10 @@ class GerritWatcher(threading.Thread):
def _listen(self, stdout, stderr): def _listen(self, stdout, stderr):
poll = select.poll() poll = select.poll()
poll.register(stdout.channel) poll.register(stdout.channel)
while not self._stopped: while not (self._stopped or self._connection_lost_event.is_set()):
ret = poll.poll(self.poll_timeout) ret = poll.poll(self.poll_timeout)
if self._connection_lost_event.is_set():
return
for (fd, event) in ret: for (fd, event) in ret:
if fd == stdout.channel.fileno(): if fd == stdout.channel.fileno():
if event == select.POLLIN: if event == select.POLLIN:
@@ -410,6 +415,7 @@ class GerritWatcher(threading.Thread):
self.log.exception("Exception on ssh event stream with %s:", self.log.exception("Exception on ssh event stream with %s:",
self.gerrit_connection.connection_name) self.gerrit_connection.connection_name)
self._stop_event.wait(5) self._stop_event.wait(5)
self._connection_lost_event.clear()
def stop(self): def stop(self):
self.log.debug("Stopping watcher") self.log.debug("Stopping watcher")
@@ -431,6 +437,9 @@ class GerritPoller(threading.Thread):
connection.sched.zk_client, connection.connection_name, "poller") connection.sched.zk_client, connection.connection_name, "poller")
self._stopped = False self._stopped = False
self._stop_event = threading.Event() self._stop_event = threading.Event()
self._connection_lost_event = threading.Event()
connection.sched.zk_client.on_connection_lost_listeners.append(
self._connection_lost_event.set)
def _makePendingCheckEvent(self, change, uuid, check): def _makePendingCheckEvent(self, change, uuid, check):
return {'type': 'pending-check', return {'type': 'pending-check',
@@ -484,10 +493,10 @@ class GerritPoller(threading.Thread):
def _run(self): def _run(self):
last_start = time.time() last_start = time.time()
while not self._stopped: while not (self._stopped or self._connection_lost_event.is_set()):
next_start = last_start + self.poll_interval next_start = last_start + self.poll_interval
self._stop_event.wait(max(next_start - time.time(), 0)) self._stop_event.wait(max(next_start - time.time(), 0))
if self._stopped: if self._stopped or self._connection_lost_event.is_set():
break break
last_start = time.time() last_start = time.time()
@@ -502,6 +511,7 @@ class GerritPoller(threading.Thread):
except Exception: except Exception:
self.log.exception("Exception on Gerrit poll with %s:", self.log.exception("Exception on Gerrit poll with %s:",
self.connection.connection_name) self.connection.connection_name)
self._connection_lost_event.clear()
def stop(self): def stop(self):
self.log.debug("Stopping watcher") self.log.debug("Stopping watcher")

View File

@@ -60,6 +60,9 @@ class GitWatcher(threading.Thread):
connection.sched.zk_client, connection.sched.zk_client,
connection.connection_name, connection.connection_name,
election_name) election_name)
self._connection_lost_event = threading.Event()
connection.sched.zk_client.on_connection_lost_listeners.append(
self._connection_lost_event.set)
# This is used by the test framework # This is used by the test framework
self._event_count = 0 self._event_count = 0
self._pause = False self._pause = False
@@ -139,7 +142,7 @@ class GitWatcher(threading.Thread):
self._event_count += 1 self._event_count += 1
def _run(self): def _run(self):
while not self._stopped: while not (self._stopped or self._connection_lost_event.is_set()):
if not self._pause: if not self._pause:
self._poll() self._poll()
# Polling wait delay # Polling wait delay
@@ -153,6 +156,7 @@ class GitWatcher(threading.Thread):
self.watcher_election.run(self._run) self.watcher_election.run(self._run)
except Exception: except Exception:
self.log.exception("Unexpected issue in _run loop:") self.log.exception("Unexpected issue in _run loop:")
self._connection_lost_event.clear()
def stop(self): def stop(self):
self.log.debug("Stopping watcher") self.log.debug("Stopping watcher")

View File

@@ -61,6 +61,7 @@ class ZooKeeperClient(object):
self._last_retry_log: int = 0 self._last_retry_log: int = 0
self.on_connect_listeners: List[Callable[[], None]] = [] self.on_connect_listeners: List[Callable[[], None]] = []
self.on_disconnect_listeners: List[Callable[[], None]] = [] self.on_disconnect_listeners: List[Callable[[], None]] = []
self.on_connection_lost_listeners: List[Callable[[], None]] = []
def _connectionListener(self, state): def _connectionListener(self, state):
""" """
@@ -70,6 +71,11 @@ class ZooKeeperClient(object):
""" """
if state == KazooState.LOST: if state == KazooState.LOST:
self.log.debug("ZooKeeper connection: LOST") self.log.debug("ZooKeeper connection: LOST")
for listener in self.on_connection_lost_listeners:
try:
listener()
except Exception:
self.log.exception("Exception calling listener:")
elif state == KazooState.SUSPENDED: elif state == KazooState.SUSPENDED:
self.log.debug("ZooKeeper connection: SUSPENDED") self.log.debug("ZooKeeper connection: SUSPENDED")
else: else: