Add ZK session-aware elections

This creates a session-aware election class which will set a flag
that indicates it has lost the underlying lock.  We can check this
flag when iterating to make sure that we don't continue to attempt
to operate when we have lost the lock underlying an election.

Some drivers had connection lost handling for the EventReceiverElection
at the driver level.  Those are updated to use the handling at the
election level for consistency as well as brevity.

Change-Id: I776f88d015acdfbf1487a85d8473cd174917e90f
This commit is contained in:
James E. Blair 2021-09-06 15:51:16 -07:00
parent aee6ef6f7f
commit 65cac91e6c
5 changed files with 53 additions and 24 deletions

View File

@ -155,7 +155,7 @@ class GerritEventConnector(threading.Thread):
time.sleep(1) time.sleep(1)
def _run(self): def _run(self):
while not self._stopped: while not self._stopped and self.event_queue.election.is_still_valid():
for event in self.event_queue: for event in self.event_queue:
try: try:
self._handleEvent(event) self._handleEvent(event)
@ -343,9 +343,6 @@ class GerritWatcher(threading.Thread):
gerrit_connection.sched.zk_client, gerrit_connection.sched.zk_client,
gerrit_connection.connection_name, gerrit_connection.connection_name,
"watcher") "watcher")
self._connection_lost_event = threading.Event()
gerrit_connection.sched.zk_client.on_connection_lost_listeners.append(
self._connection_lost_event.set)
self.keepalive = keepalive self.keepalive = keepalive
self._stopped = False self._stopped = False
@ -363,9 +360,9 @@ class GerritWatcher(threading.Thread):
def _listen(self, stdout, stderr): def _listen(self, stdout, stderr):
poll = select.poll() poll = select.poll()
poll.register(stdout.channel) poll.register(stdout.channel)
while not (self._stopped or self._connection_lost_event.is_set()): while not self._stopped and self.watcher_election.is_still_valid():
ret = poll.poll(self.poll_timeout) ret = poll.poll(self.poll_timeout)
if self._connection_lost_event.is_set(): if not self.watcher_election.is_still_valid():
return return
for (fd, event) in ret: for (fd, event) in ret:
if fd == stdout.channel.fileno(): if fd == stdout.channel.fileno():
@ -418,7 +415,6 @@ class GerritWatcher(threading.Thread):
self.log.exception("Exception on ssh event stream with %s:", self.log.exception("Exception on ssh event stream with %s:",
self.gerrit_connection.connection_name) self.gerrit_connection.connection_name)
self._stop_event.wait(5) self._stop_event.wait(5)
self._connection_lost_event.clear()
def stop(self): def stop(self):
self.log.debug("Stopping watcher") self.log.debug("Stopping watcher")
@ -440,9 +436,6 @@ class GerritPoller(threading.Thread):
connection.sched.zk_client, connection.connection_name, "poller") connection.sched.zk_client, connection.connection_name, "poller")
self._stopped = False self._stopped = False
self._stop_event = threading.Event() self._stop_event = threading.Event()
self._connection_lost_event = threading.Event()
connection.sched.zk_client.on_connection_lost_listeners.append(
self._connection_lost_event.set)
def _makePendingCheckEvent(self, change, uuid, check): def _makePendingCheckEvent(self, change, uuid, check):
return {'type': 'pending-check', return {'type': 'pending-check',
@ -497,7 +490,7 @@ class GerritPoller(threading.Thread):
def _poll(self): def _poll(self):
next_start = self._last_start + self.poll_interval next_start = self._last_start + self.poll_interval
self._stop_event.wait(max(next_start - time.time(), 0)) self._stop_event.wait(max(next_start - time.time(), 0))
if self._stopped or self._connection_lost_event.is_set(): if self._stopped or not self.poller_election.is_still_valid():
return return
self._last_start = time.time() self._last_start = time.time()
self._poll_checkers() self._poll_checkers()
@ -506,7 +499,7 @@ class GerritPoller(threading.Thread):
def _run(self): def _run(self):
self._last_start = time.time() self._last_start = time.time()
while not (self._stopped or self._connection_lost_event.is_set()): while not self._stopped and self.poller_election.is_still_valid():
# during tests, a sub-class _poll method is used to send # during tests, a sub-class _poll method is used to send
# notifications # notifications
self._poll() self._poll()
@ -519,7 +512,6 @@ class GerritPoller(threading.Thread):
self.log.exception("Exception on Gerrit poll with %s:", self.log.exception("Exception on Gerrit poll with %s:",
self.connection.connection_name) self.connection.connection_name)
time.sleep(1) time.sleep(1)
self._connection_lost_event.clear()
def stop(self): def stop(self):
self.log.debug("Stopping watcher") self.log.debug("Stopping watcher")

View File

@ -60,9 +60,6 @@ class GitWatcher(threading.Thread):
connection.sched.zk_client, connection.sched.zk_client,
connection.connection_name, connection.connection_name,
election_name) election_name)
self._connection_lost_event = threading.Event()
connection.sched.zk_client.on_connection_lost_listeners.append(
self._connection_lost_event.set)
# This is used by the test framework # This is used by the test framework
self._event_count = 0 self._event_count = 0
self._pause = False self._pause = False
@ -142,7 +139,7 @@ class GitWatcher(threading.Thread):
self._event_count += 1 self._event_count += 1
def _run(self): def _run(self):
while not (self._stopped or self._connection_lost_event.is_set()): while not self._stopped and self.watcher_election.is_still_valid():
if not self._pause: if not self._pause:
# during tests, a sub-class _poll method is used to send # during tests, a sub-class _poll method is used to send
# notifications # notifications
@ -158,7 +155,6 @@ class GitWatcher(threading.Thread):
self.watcher_election.run(self._run) self.watcher_election.run(self._run)
except Exception: except Exception:
self.log.exception("Unexpected issue in _run loop:") self.log.exception("Unexpected issue in _run loop:")
self._connection_lost_event.clear()
def stop(self): def stop(self):
self.log.debug("Stopping watcher") self.log.debug("Stopping watcher")

View File

@ -121,8 +121,11 @@ class Nodepool(object):
# completed event is added to the zk queue. # completed event is added to the zk queue.
try: try:
if self.election_won: if self.election_won:
self.emitStats(request) if self.election.is_still_valid():
self._sendNodesProvisionedEvent(request) self.emitStats(request)
self._sendNodesProvisionedEvent(request)
else:
self.stop_watcher_event.set()
except Exception: except Exception:
# If there are any errors moving the event, re-run the # If there are any errors moving the event, re-run the
# election. # election.

37
zuul/zk/election.py Normal file
View File

@ -0,0 +1,37 @@
# Copyright 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kazoo.protocol.states import KazooState
from kazoo.recipe.election import Election
class SessionAwareElection(Election):
def __init__(self, client, path, identifier=None):
self._zuul_session_expired = False
super().__init__(client, path, identifier)
def run(self, func, *args, **kwargs):
self._zuul_session_expired = False
self.lock.client.add_listener(self._zuul_session_watcher)
try:
return super().run(func, *args, **kwargs)
finally:
self.lock.client.remove_listener(self._zuul_session_watcher)
def _zuul_session_watcher(self, state):
if state == KazooState.LOST:
self._zuul_session_expired = True
def is_still_valid(self):
return not self._zuul_session_expired

View File

@ -25,11 +25,11 @@ from contextlib import suppress
from kazoo.exceptions import NoNodeError from kazoo.exceptions import NoNodeError
from kazoo.protocol.states import EventType from kazoo.protocol.states import EventType
from kazoo.recipe.election import Election
from zuul import model from zuul import model
from zuul.lib.collections import DefaultKeyDict from zuul.lib.collections import DefaultKeyDict
from zuul.zk import ZooKeeperSimpleBase, sharding from zuul.zk import ZooKeeperSimpleBase, sharding
from zuul.zk.election import SessionAwareElection
RESULT_EVENT_TYPE_MAP = { RESULT_EVENT_TYPE_MAP = {
"BuildCompletedEvent": model.BuildCompletedEvent, "BuildCompletedEvent": model.BuildCompletedEvent,
@ -808,7 +808,8 @@ class ConnectionEventQueue(ZooKeeperEventQueue):
(CONNECTION_ROOT, connection_name, "election") (CONNECTION_ROOT, connection_name, "election")
) )
self.kazoo_client.ensure_path(self.election_root) self.kazoo_client.ensure_path(self.election_root)
self.election = self.kazoo_client.Election(self.election_root) self.election = SessionAwareElection(
self.kazoo_client, self.election_root)
def _eventWatch(self, callback, event_list): def _eventWatch(self, callback, event_list):
if event_list: if event_list:
@ -837,7 +838,7 @@ class ConnectionEventQueue(ZooKeeperEventQueue):
yield event yield event
class EventReceiverElection(Election): class EventReceiverElection(SessionAwareElection):
"""Election for a singleton event receiver.""" """Election for a singleton event receiver."""
def __init__(self, client, connection_name, receiver_name): def __init__(self, client, connection_name, receiver_name):
@ -847,7 +848,7 @@ class EventReceiverElection(Election):
super().__init__(client.client, self.election_root) super().__init__(client.client, self.election_root)
class NodepoolEventElection(Election): class NodepoolEventElection(SessionAwareElection):
"""Election for the nodepool completion event processor.""" """Election for the nodepool completion event processor."""
def __init__(self, client): def __init__(self, client):