From 94ba853fe85e670c4374429efa894c33bfe66806 Mon Sep 17 00:00:00 2001 From: Simon Westphahl Date: Wed, 7 Apr 2021 09:53:09 +0200 Subject: [PATCH] Stop active event gathering on connection loss In case of active event gathering we need to exit the event receiver loop if we lose the connection to Zookeeper. This is to avoid duplicate events. Change-Id: I23b8416fff1c0ce6d3a67059283d41fdf29d7f96 --- zuul/driver/gerrit/gerritconnection.py | 16 +++++++++++++--- zuul/driver/git/gitwatcher.py | 6 +++++- zuul/zk/__init__.py | 6 ++++++ 3 files changed, 24 insertions(+), 4 deletions(-) diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py index c9bd01c30a..1db4fae286 100644 --- a/zuul/driver/gerrit/gerritconnection.py +++ b/zuul/driver/gerrit/gerritconnection.py @@ -340,6 +340,9 @@ class GerritWatcher(threading.Thread): gerrit_connection.sched.zk_client, gerrit_connection.connection_name, "watcher") + self._connection_lost_event = threading.Event() + gerrit_connection.sched.zk_client.on_connection_lost_listeners.append( + self._connection_lost_event.set) self.keepalive = keepalive self._stopped = False @@ -357,8 +360,10 @@ class GerritWatcher(threading.Thread): def _listen(self, stdout, stderr): poll = select.poll() poll.register(stdout.channel) - while not self._stopped: + while not (self._stopped or self._connection_lost_event.is_set()): ret = poll.poll(self.poll_timeout) + if self._connection_lost_event.is_set(): + return for (fd, event) in ret: if fd == stdout.channel.fileno(): if event == select.POLLIN: @@ -410,6 +415,7 @@ class GerritWatcher(threading.Thread): self.log.exception("Exception on ssh event stream with %s:", self.gerrit_connection.connection_name) self._stop_event.wait(5) + self._connection_lost_event.clear() def stop(self): self.log.debug("Stopping watcher") @@ -431,6 +437,9 @@ class GerritPoller(threading.Thread): connection.sched.zk_client, connection.connection_name, "poller") self._stopped = False self._stop_event = threading.Event() + self._connection_lost_event = threading.Event() + connection.sched.zk_client.on_connection_lost_listeners.append( + self._connection_lost_event.set) def _makePendingCheckEvent(self, change, uuid, check): return {'type': 'pending-check', @@ -484,10 +493,10 @@ class GerritPoller(threading.Thread): def _run(self): last_start = time.time() - while not self._stopped: + while not (self._stopped or self._connection_lost_event.is_set()): next_start = last_start + self.poll_interval self._stop_event.wait(max(next_start - time.time(), 0)) - if self._stopped: + if self._stopped or self._connection_lost_event.is_set(): break last_start = time.time() @@ -502,6 +511,7 @@ class GerritPoller(threading.Thread): except Exception: self.log.exception("Exception on Gerrit poll with %s:", self.connection.connection_name) + self._connection_lost_event.clear() def stop(self): self.log.debug("Stopping watcher") diff --git a/zuul/driver/git/gitwatcher.py b/zuul/driver/git/gitwatcher.py index cf780a351b..8556e49c2f 100644 --- a/zuul/driver/git/gitwatcher.py +++ b/zuul/driver/git/gitwatcher.py @@ -60,6 +60,9 @@ class GitWatcher(threading.Thread): connection.sched.zk_client, connection.connection_name, election_name) + self._connection_lost_event = threading.Event() + connection.sched.zk_client.on_connection_lost_listeners.append( + self._connection_lost_event.set) # This is used by the test framework self._event_count = 0 self._pause = False @@ -139,7 +142,7 @@ class GitWatcher(threading.Thread): self._event_count += 1 def _run(self): - while not self._stopped: + while not (self._stopped or self._connection_lost_event.is_set()): if not self._pause: self._poll() # Polling wait delay @@ -153,6 +156,7 @@ class GitWatcher(threading.Thread): self.watcher_election.run(self._run) except Exception: self.log.exception("Unexpected issue in _run loop:") + self._connection_lost_event.clear() def stop(self): self.log.debug("Stopping watcher") diff --git a/zuul/zk/__init__.py b/zuul/zk/__init__.py index c085d21205..7b76375df9 100644 --- a/zuul/zk/__init__.py +++ b/zuul/zk/__init__.py @@ -61,6 +61,7 @@ class ZooKeeperClient(object): self._last_retry_log: int = 0 self.on_connect_listeners: List[Callable[[], None]] = [] self.on_disconnect_listeners: List[Callable[[], None]] = [] + self.on_connection_lost_listeners: List[Callable[[], None]] = [] def _connectionListener(self, state): """ @@ -70,6 +71,11 @@ class ZooKeeperClient(object): """ if state == KazooState.LOST: self.log.debug("ZooKeeper connection: LOST") + for listener in self.on_connection_lost_listeners: + try: + listener() + except Exception: + self.log.exception("Exception calling listener:") elif state == KazooState.SUSPENDED: self.log.debug("ZooKeeper connection: SUSPENDED") else: