diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py index c9bd01c30a..1db4fae286 100644 --- a/zuul/driver/gerrit/gerritconnection.py +++ b/zuul/driver/gerrit/gerritconnection.py @@ -340,6 +340,9 @@ class GerritWatcher(threading.Thread): gerrit_connection.sched.zk_client, gerrit_connection.connection_name, "watcher") + self._connection_lost_event = threading.Event() + gerrit_connection.sched.zk_client.on_connection_lost_listeners.append( + self._connection_lost_event.set) self.keepalive = keepalive self._stopped = False @@ -357,8 +360,10 @@ class GerritWatcher(threading.Thread): def _listen(self, stdout, stderr): poll = select.poll() poll.register(stdout.channel) - while not self._stopped: + while not (self._stopped or self._connection_lost_event.is_set()): ret = poll.poll(self.poll_timeout) + if self._connection_lost_event.is_set(): + return for (fd, event) in ret: if fd == stdout.channel.fileno(): if event == select.POLLIN: @@ -410,6 +415,7 @@ class GerritWatcher(threading.Thread): self.log.exception("Exception on ssh event stream with %s:", self.gerrit_connection.connection_name) self._stop_event.wait(5) + self._connection_lost_event.clear() def stop(self): self.log.debug("Stopping watcher") @@ -431,6 +437,9 @@ class GerritPoller(threading.Thread): connection.sched.zk_client, connection.connection_name, "poller") self._stopped = False self._stop_event = threading.Event() + self._connection_lost_event = threading.Event() + connection.sched.zk_client.on_connection_lost_listeners.append( + self._connection_lost_event.set) def _makePendingCheckEvent(self, change, uuid, check): return {'type': 'pending-check', @@ -484,10 +493,10 @@ class GerritPoller(threading.Thread): def _run(self): last_start = time.time() - while not self._stopped: + while not (self._stopped or self._connection_lost_event.is_set()): next_start = last_start + self.poll_interval self._stop_event.wait(max(next_start - time.time(), 0)) - if self._stopped: + if self._stopped or self._connection_lost_event.is_set(): break last_start = time.time() @@ -502,6 +511,7 @@ class GerritPoller(threading.Thread): except Exception: self.log.exception("Exception on Gerrit poll with %s:", self.connection.connection_name) + self._connection_lost_event.clear() def stop(self): self.log.debug("Stopping watcher") diff --git a/zuul/driver/git/gitwatcher.py b/zuul/driver/git/gitwatcher.py index cf780a351b..8556e49c2f 100644 --- a/zuul/driver/git/gitwatcher.py +++ b/zuul/driver/git/gitwatcher.py @@ -60,6 +60,9 @@ class GitWatcher(threading.Thread): connection.sched.zk_client, connection.connection_name, election_name) + self._connection_lost_event = threading.Event() + connection.sched.zk_client.on_connection_lost_listeners.append( + self._connection_lost_event.set) # This is used by the test framework self._event_count = 0 self._pause = False @@ -139,7 +142,7 @@ class GitWatcher(threading.Thread): self._event_count += 1 def _run(self): - while not self._stopped: + while not (self._stopped or self._connection_lost_event.is_set()): if not self._pause: self._poll() # Polling wait delay @@ -153,6 +156,7 @@ class GitWatcher(threading.Thread): self.watcher_election.run(self._run) except Exception: self.log.exception("Unexpected issue in _run loop:") + self._connection_lost_event.clear() def stop(self): self.log.debug("Stopping watcher") diff --git a/zuul/zk/__init__.py b/zuul/zk/__init__.py index 6f19ed6049..df3f5e1832 100644 --- a/zuul/zk/__init__.py +++ b/zuul/zk/__init__.py @@ -61,6 +61,7 @@ class ZooKeeperClient(object): self._last_retry_log: int = 0 self.on_connect_listeners: List[Callable[[], None]] = [] self.on_disconnect_listeners: List[Callable[[], None]] = [] + self.on_connection_lost_listeners: List[Callable[[], None]] = [] def _connectionListener(self, state): """ @@ -70,6 +71,11 @@ class ZooKeeperClient(object): """ if state == KazooState.LOST: self.log.debug("ZooKeeper connection: LOST") + for listener in self.on_connection_lost_listeners: + try: + listener() + except Exception: + self.log.exception("Exception calling listener:") elif state == KazooState.SUSPENDED: self.log.debug("ZooKeeper connection: SUSPENDED") else: