Merge "Fix ZK-related race condition in github driver"
This commit is contained in:
@@ -41,7 +41,6 @@ from github3.session import AppInstallationTokenAuth
|
||||
|
||||
from zuul.connection import CachedBranchConnection
|
||||
from zuul.driver.github.graphql import GraphQLClient
|
||||
from zuul.lib.queue import NamedQueue
|
||||
from zuul.web.handler import BaseWebController
|
||||
from zuul.lib.logutil import get_annotated_logger
|
||||
from zuul.model import Ref, Branch, Tag, Project
|
||||
@@ -653,11 +652,8 @@ class GithubEventConnector:
|
||||
self._event_dispatcher = threading.Thread(
|
||||
name='GithubEventDispatcher', target=self.run_event_dispatcher,
|
||||
daemon=True)
|
||||
self._event_forwarder = threading.Thread(
|
||||
name='GithubEventForwarder', target=self.run_event_forwarder,
|
||||
daemon=True)
|
||||
self._thread_pool = concurrent.futures.ThreadPoolExecutor()
|
||||
self._event_forward_queue = NamedQueue("GithubEventForwardQueue")
|
||||
self._event_forward_queue = collections.deque()
|
||||
|
||||
def stop(self):
|
||||
self._stopped = True
|
||||
@@ -665,12 +661,9 @@ class GithubEventConnector:
|
||||
self.event_queue.election.cancel()
|
||||
self._event_dispatcher.join()
|
||||
|
||||
self._event_forward_queue.put(None)
|
||||
self._event_forwarder.join()
|
||||
self._thread_pool.shutdown()
|
||||
|
||||
def start(self):
|
||||
self._event_forwarder.start()
|
||||
self._event_dispatcher.start()
|
||||
|
||||
def _onNewEvent(self):
|
||||
@@ -680,49 +673,80 @@ class GithubEventConnector:
|
||||
|
||||
def run_event_dispatcher(self):
|
||||
self.event_queue.registerEventWatch(self._onNewEvent)
|
||||
# Set the wake event so we get an initial run
|
||||
self._dispatcher_wake_event.set()
|
||||
while not self._stopped:
|
||||
try:
|
||||
self.event_queue.election.run(self._dispatchEvents)
|
||||
self.event_queue.election.run(self._dispatchEventsMain)
|
||||
except Exception:
|
||||
self.log.exception("Exception handling GitHub event:")
|
||||
# In case we caught an exception with events in progress,
|
||||
# reset these in case we run the loop again.
|
||||
self._events_in_progress = set()
|
||||
self._event_forward_queue = collections.deque()
|
||||
|
||||
def _dispatchEvents(self):
|
||||
while not self._stopped:
|
||||
# We need to create a copy of the in-progress set in order to
|
||||
# prevent a race between the dispatcher and forwarder thread.
|
||||
# This could happen if a previously seen event finished and was
|
||||
# removed from the set between start of the iteration and the
|
||||
# in-progress check for this event.
|
||||
in_progress = set(self._events_in_progress)
|
||||
for event in self.event_queue:
|
||||
if event.ack_ref in in_progress:
|
||||
continue
|
||||
etuple = self._eventAsTuple(event)
|
||||
log = get_annotated_logger(self.log, etuple.delivery)
|
||||
log.debug("Github Webhook Received")
|
||||
log.debug("X-Github-Event: %s", etuple.event_type)
|
||||
processor = GithubEventProcessor(self, etuple, event)
|
||||
future = self._thread_pool.submit(processor.run)
|
||||
def _dispatchEventsMain(self):
|
||||
while True:
|
||||
# We can start processing events as long as we're running;
|
||||
# if we are stopping, then we need to continue this loop
|
||||
# until previously processed events are completed but not
|
||||
# start processing any new events.
|
||||
if self._dispatcher_wake_event.is_set() and not self._stopped:
|
||||
self._dispatcher_wake_event.clear()
|
||||
self._dispatchEvents()
|
||||
|
||||
# Events are acknowledged in the event forwarder loop after
|
||||
# pre-processing. This way we can ensure that no events are
|
||||
# lost.
|
||||
self._events_in_progress.add(event.ack_ref)
|
||||
self._event_forward_queue.put(future)
|
||||
# Now process the futures from this or any previous
|
||||
# iterations of the loop.
|
||||
if len(self._event_forward_queue):
|
||||
self._forwardEvents()
|
||||
|
||||
# If there are no futures, we can sleep until there are
|
||||
# new events (or stop altogether); otherwise we need to
|
||||
# continue processing futures.
|
||||
if not len(self._event_forward_queue):
|
||||
if self._stopped:
|
||||
return
|
||||
self._dispatcher_wake_event.wait(10)
|
||||
self._dispatcher_wake_event.clear()
|
||||
self._dispatcher_wake_event.wait(10)
|
||||
else:
|
||||
# Sleep a small amount of time to give the futures
|
||||
# time to complete.
|
||||
self._dispatcher_wake_event.wait(0.1)
|
||||
|
||||
def run_event_forwarder(self):
|
||||
while True:
|
||||
def _dispatchEvents(self):
|
||||
# This is the first half of the event dispatcher. It reads
|
||||
# events from the webhook event queue and passes them to a
|
||||
# concurrent executor for pre-processing.
|
||||
for event in self.event_queue:
|
||||
if self._stopped:
|
||||
return
|
||||
break
|
||||
if event.ack_ref in self._events_in_progress:
|
||||
continue
|
||||
etuple = self._eventAsTuple(event)
|
||||
log = get_annotated_logger(self.log, etuple.delivery)
|
||||
log.debug("Github Webhook Received")
|
||||
log.debug("X-Github-Event: %s", etuple.event_type)
|
||||
processor = GithubEventProcessor(self, etuple, event)
|
||||
future = self._thread_pool.submit(processor.run)
|
||||
|
||||
# Events are acknowledged in the event forwarder loop after
|
||||
# pre-processing. This way we can ensure that no events are
|
||||
# lost.
|
||||
self._events_in_progress.add(event.ack_ref)
|
||||
self._event_forward_queue.append(future)
|
||||
|
||||
def _forwardEvents(self):
|
||||
# This is the second half of the event dispatcher. It
|
||||
# collects pre-processed events from the concurrent executor
|
||||
# and forwards them to the scheduler queues.
|
||||
while True:
|
||||
try:
|
||||
future = self._event_forward_queue.get()
|
||||
if future is None:
|
||||
if not len(self._event_forward_queue):
|
||||
return
|
||||
# Peek at the next event and see if it's done or if we
|
||||
# need to wait for the next loop iteration.
|
||||
if not self._event_forward_queue[0].done():
|
||||
return
|
||||
future = self._event_forward_queue.popleft()
|
||||
event, connection_event = future.result()
|
||||
try:
|
||||
if not event:
|
||||
@@ -740,9 +764,6 @@ class GithubEventConnector:
|
||||
self._events_in_progress.remove(connection_event.ack_ref)
|
||||
except Exception:
|
||||
self.log.exception("Exception moving GitHub event:")
|
||||
finally:
|
||||
# Ack task in forward queue
|
||||
self._event_forward_queue.task_done()
|
||||
|
||||
@staticmethod
|
||||
def _eventAsTuple(event):
|
||||
|
||||
Reference in New Issue
Block a user