Fix ZK-related race condition in github driver
This change is an alternative to I6f6d7a02ca8358c26ca1afa1d1bfaf9341ca2c3f. At a high level, our plan for ZooKeeper queues is that anyone can append to a queue without a lock, but a queue would only have one processor at a time which is able to remove items from the queue. In the case of the github driver, that is generally true, except that we have two threads doing that work; the first processes queue items, and the second actually removes them. The two threads in some respects act like two simultaneous "owners" of the queue. This cas cause a problem within a single process (in that the second thread can change the queue out from under the first) but also potentially in multiple processes if we don't drain events from the second thread before stopping the first (the second thread could continue to remove events out from under a *different* process in that case). To address both of these, we move all of the event handling into a single thread. This maintains our "single-owner" paradigm, while also maintaining the benefits of the concurrent processing. We start each cycle of the dispatch loop by reading events from the queue and beginning new pre-processing of those events, and we finish each cycle by collecting and acking any events whose pre-processing is complete. The events_in_progress set no longer has to contend with a second thread, so we don't need to make a copy of it anymore. Change-Id: I42f49a60ac3494c39f62cf63e35b78fb5fc0d804
This commit is contained in:
parent
c861ec6dbc
commit
9af4e07532
|
@ -4350,10 +4350,6 @@ class ZuulTestCase(BaseTestCase):
|
|||
self.poller_events, self.git_url_with_auth, self.source_only,
|
||||
self.fake_sql, self.addCleanup, self.validate_tenants)
|
||||
|
||||
if hasattr(self, 'fake_github'):
|
||||
self.additional_event_queues.append(
|
||||
self.fake_github.github_event_connector._event_forward_queue)
|
||||
|
||||
self.merge_server = None
|
||||
|
||||
# Cleanups are run in reverse order
|
||||
|
|
|
@ -41,7 +41,6 @@ from github3.session import AppInstallationTokenAuth
|
|||
|
||||
from zuul.connection import CachedBranchConnection
|
||||
from zuul.driver.github.graphql import GraphQLClient
|
||||
from zuul.lib.queue import NamedQueue
|
||||
from zuul.web.handler import BaseWebController
|
||||
from zuul.lib.logutil import get_annotated_logger
|
||||
from zuul.model import Ref, Branch, Tag, Project
|
||||
|
@ -652,11 +651,8 @@ class GithubEventConnector:
|
|||
self._event_dispatcher = threading.Thread(
|
||||
name='GithubEventDispatcher', target=self.run_event_dispatcher,
|
||||
daemon=True)
|
||||
self._event_forwarder = threading.Thread(
|
||||
name='GithubEventForwarder', target=self.run_event_forwarder,
|
||||
daemon=True)
|
||||
self._thread_pool = concurrent.futures.ThreadPoolExecutor()
|
||||
self._event_forward_queue = NamedQueue("GithubEventForwardQueue")
|
||||
self._event_forward_queue = collections.deque()
|
||||
|
||||
def stop(self):
|
||||
self._stopped = True
|
||||
|
@ -664,12 +660,9 @@ class GithubEventConnector:
|
|||
self.event_queue.election.cancel()
|
||||
self._event_dispatcher.join()
|
||||
|
||||
self._event_forward_queue.put(None)
|
||||
self._event_forwarder.join()
|
||||
self._thread_pool.shutdown()
|
||||
|
||||
def start(self):
|
||||
self._event_forwarder.start()
|
||||
self._event_dispatcher.start()
|
||||
|
||||
def _onNewEvent(self):
|
||||
|
@ -679,49 +672,80 @@ class GithubEventConnector:
|
|||
|
||||
def run_event_dispatcher(self):
|
||||
self.event_queue.registerEventWatch(self._onNewEvent)
|
||||
# Set the wake event so we get an initial run
|
||||
self._dispatcher_wake_event.set()
|
||||
while not self._stopped:
|
||||
try:
|
||||
self.event_queue.election.run(self._dispatchEvents)
|
||||
self.event_queue.election.run(self._dispatchEventsMain)
|
||||
except Exception:
|
||||
self.log.exception("Exception handling GitHub event:")
|
||||
# In case we caught an exception with events in progress,
|
||||
# reset these in case we run the loop again.
|
||||
self._events_in_progress = set()
|
||||
self._event_forward_queue = collections.deque()
|
||||
|
||||
def _dispatchEvents(self):
|
||||
while not self._stopped:
|
||||
# We need to create a copy of the in-progress set in order to
|
||||
# prevent a race between the dispatcher and forwarder thread.
|
||||
# This could happen if a previously seen event finished and was
|
||||
# removed from the set between start of the iteration and the
|
||||
# in-progress check for this event.
|
||||
in_progress = set(self._events_in_progress)
|
||||
for event in self.event_queue:
|
||||
if event.ack_ref in in_progress:
|
||||
continue
|
||||
etuple = self._eventAsTuple(event)
|
||||
log = get_annotated_logger(self.log, etuple.delivery)
|
||||
log.debug("Github Webhook Received")
|
||||
log.debug("X-Github-Event: %s", etuple.event_type)
|
||||
processor = GithubEventProcessor(self, etuple, event)
|
||||
future = self._thread_pool.submit(processor.run)
|
||||
def _dispatchEventsMain(self):
|
||||
while True:
|
||||
# We can start processing events as long as we're running;
|
||||
# if we are stopping, then we need to continue this loop
|
||||
# until previously processed events are completed but not
|
||||
# start processing any new events.
|
||||
if self._dispatcher_wake_event.is_set() and not self._stopped:
|
||||
self._dispatcher_wake_event.clear()
|
||||
self._dispatchEvents()
|
||||
|
||||
# Events are acknowledged in the event forwarder loop after
|
||||
# pre-processing. This way we can ensure that no events are
|
||||
# lost.
|
||||
self._events_in_progress.add(event.ack_ref)
|
||||
self._event_forward_queue.put(future)
|
||||
# Now process the futures from this or any previous
|
||||
# iterations of the loop.
|
||||
if len(self._event_forward_queue):
|
||||
self._forwardEvents()
|
||||
|
||||
# If there are no futures, we can sleep until there are
|
||||
# new events (or stop altogether); otherwise we need to
|
||||
# continue processing futures.
|
||||
if not len(self._event_forward_queue):
|
||||
if self._stopped:
|
||||
return
|
||||
self._dispatcher_wake_event.wait(10)
|
||||
self._dispatcher_wake_event.clear()
|
||||
self._dispatcher_wake_event.wait(10)
|
||||
else:
|
||||
# Sleep a small amount of time to give the futures
|
||||
# time to complete.
|
||||
self._dispatcher_wake_event.wait(0.1)
|
||||
|
||||
def run_event_forwarder(self):
|
||||
while True:
|
||||
def _dispatchEvents(self):
|
||||
# This is the first half of the event dispatcher. It reads
|
||||
# events from the webhook event queue and passes them to a
|
||||
# concurrent executor for pre-processing.
|
||||
for event in self.event_queue:
|
||||
if self._stopped:
|
||||
return
|
||||
break
|
||||
if event.ack_ref in self._events_in_progress:
|
||||
continue
|
||||
etuple = self._eventAsTuple(event)
|
||||
log = get_annotated_logger(self.log, etuple.delivery)
|
||||
log.debug("Github Webhook Received")
|
||||
log.debug("X-Github-Event: %s", etuple.event_type)
|
||||
processor = GithubEventProcessor(self, etuple, event)
|
||||
future = self._thread_pool.submit(processor.run)
|
||||
|
||||
# Events are acknowledged in the event forwarder loop after
|
||||
# pre-processing. This way we can ensure that no events are
|
||||
# lost.
|
||||
self._events_in_progress.add(event.ack_ref)
|
||||
self._event_forward_queue.append(future)
|
||||
|
||||
def _forwardEvents(self):
|
||||
# This is the second half of the event dispatcher. It
|
||||
# collects pre-processed events from the concurrent executor
|
||||
# and forwards them to the scheduler queues.
|
||||
while True:
|
||||
try:
|
||||
future = self._event_forward_queue.get()
|
||||
if future is None:
|
||||
if not len(self._event_forward_queue):
|
||||
return
|
||||
# Peek at the next event and see if it's done or if we
|
||||
# need to wait for the next loop iteration.
|
||||
if not self._event_forward_queue[0].done():
|
||||
return
|
||||
future = self._event_forward_queue.popleft()
|
||||
event, connection_event = future.result()
|
||||
try:
|
||||
if not event:
|
||||
|
@ -739,9 +763,6 @@ class GithubEventConnector:
|
|||
self._events_in_progress.remove(connection_event.ack_ref)
|
||||
except Exception:
|
||||
self.log.exception("Exception moving GitHub event:")
|
||||
finally:
|
||||
# Ack task in forward queue
|
||||
self._event_forward_queue.task_done()
|
||||
|
||||
@staticmethod
|
||||
def _eventAsTuple(event):
|
||||
|
|
Loading…
Reference in New Issue