Stop active event gathering on connection loss

In case of active event gathering we need to exit the event receiver
loop if we lose the connection to Zookeeper. This is to avoid duplicate
events.

Change-Id: I23b8416fff1c0ce6d3a67059283d41fdf29d7f96
This commit is contained in:
Simon Westphahl 2021-04-07 09:53:09 +02:00
parent ebcbb544be
commit 94ba853fe8
3 changed files with 24 additions and 4 deletions

View File

@ -340,6 +340,9 @@ class GerritWatcher(threading.Thread):
gerrit_connection.sched.zk_client,
gerrit_connection.connection_name,
"watcher")
self._connection_lost_event = threading.Event()
gerrit_connection.sched.zk_client.on_connection_lost_listeners.append(
self._connection_lost_event.set)
self.keepalive = keepalive
self._stopped = False
@ -357,8 +360,10 @@ class GerritWatcher(threading.Thread):
def _listen(self, stdout, stderr):
poll = select.poll()
poll.register(stdout.channel)
while not self._stopped:
while not (self._stopped or self._connection_lost_event.is_set()):
ret = poll.poll(self.poll_timeout)
if self._connection_lost_event.is_set():
return
for (fd, event) in ret:
if fd == stdout.channel.fileno():
if event == select.POLLIN:
@ -410,6 +415,7 @@ class GerritWatcher(threading.Thread):
self.log.exception("Exception on ssh event stream with %s:",
self.gerrit_connection.connection_name)
self._stop_event.wait(5)
self._connection_lost_event.clear()
def stop(self):
self.log.debug("Stopping watcher")
@ -431,6 +437,9 @@ class GerritPoller(threading.Thread):
connection.sched.zk_client, connection.connection_name, "poller")
self._stopped = False
self._stop_event = threading.Event()
self._connection_lost_event = threading.Event()
connection.sched.zk_client.on_connection_lost_listeners.append(
self._connection_lost_event.set)
def _makePendingCheckEvent(self, change, uuid, check):
return {'type': 'pending-check',
@ -484,10 +493,10 @@ class GerritPoller(threading.Thread):
def _run(self):
last_start = time.time()
while not self._stopped:
while not (self._stopped or self._connection_lost_event.is_set()):
next_start = last_start + self.poll_interval
self._stop_event.wait(max(next_start - time.time(), 0))
if self._stopped:
if self._stopped or self._connection_lost_event.is_set():
break
last_start = time.time()
@ -502,6 +511,7 @@ class GerritPoller(threading.Thread):
except Exception:
self.log.exception("Exception on Gerrit poll with %s:",
self.connection.connection_name)
self._connection_lost_event.clear()
def stop(self):
self.log.debug("Stopping watcher")

View File

@ -60,6 +60,9 @@ class GitWatcher(threading.Thread):
connection.sched.zk_client,
connection.connection_name,
election_name)
self._connection_lost_event = threading.Event()
connection.sched.zk_client.on_connection_lost_listeners.append(
self._connection_lost_event.set)
# This is used by the test framework
self._event_count = 0
self._pause = False
@ -139,7 +142,7 @@ class GitWatcher(threading.Thread):
self._event_count += 1
def _run(self):
while not self._stopped:
while not (self._stopped or self._connection_lost_event.is_set()):
if not self._pause:
self._poll()
# Polling wait delay
@ -153,6 +156,7 @@ class GitWatcher(threading.Thread):
self.watcher_election.run(self._run)
except Exception:
self.log.exception("Unexpected issue in _run loop:")
self._connection_lost_event.clear()
def stop(self):
self.log.debug("Stopping watcher")

View File

@ -61,6 +61,7 @@ class ZooKeeperClient(object):
self._last_retry_log: int = 0
self.on_connect_listeners: List[Callable[[], None]] = []
self.on_disconnect_listeners: List[Callable[[], None]] = []
self.on_connection_lost_listeners: List[Callable[[], None]] = []
def _connectionListener(self, state):
"""
@ -70,6 +71,11 @@ class ZooKeeperClient(object):
"""
if state == KazooState.LOST:
self.log.debug("ZooKeeper connection: LOST")
for listener in self.on_connection_lost_listeners:
try:
listener()
except Exception:
self.log.exception("Exception calling listener:")
elif state == KazooState.SUSPENDED:
self.log.debug("ZooKeeper connection: SUSPENDED")
else: