Parallelize github event processing

The GitHub driver does an event pre processing before adding a trigger
event to the scheduler. This is currently done single threaded to
ensure that the trigger events are enqueued in order into the
scheduler. A problem is that this pre processing can take a few
seconds which limits the rate of events we can process. In order to
parallelize this while keeping the order of the trigger events we need
to do two things.

We keep consuming the event queue single threaded but instead of
processing and forwarding them directly to the scheduler we process
them in a thread pool and put the futures into a result queue. This
second queue can then again be processed single threaded and maintains
the correct ordering of the events.

Second updating the change cache currently assumes that it runs single
threaded. In order to avoid data races we need to lock this by the
change.

Change-Id: I08f31b99e2a58e51cef0de89edd98c957d7db87f
This commit is contained in:
Tobias Henkel 2019-06-03 20:30:35 +02:00
parent 48c049db79
commit 1fab39cc4b
No known key found for this signature in database
GPG Key ID: 03750DEC158E5FA2
2 changed files with 73 additions and 12 deletions

View File

@ -2584,6 +2584,10 @@ class ZuulTestCase(BaseTestCase):
self.configure_connections()
self.sched.registerConnections(self.connections)
if hasattr(self, 'fake_github'):
self.event_queues.append(
self.fake_github.github_event_connector._event_forward_queue)
self.executor_server = RecordingExecutorServer(
self.config, self.connections,
jobdir_root=self.test_root,

View File

@ -215,19 +215,21 @@ class GithubEventProcessor(object):
self.connector = connector
self.connection = connector.connection
self.ts, self.body, self.event_type, self.delivery = event_tuple
logger = logging.getLogger("zuul.GithubEventConnector")
logger = logging.getLogger("zuul.GithubEventProcessor")
self.zuul_event_id = self.delivery
self.log = get_annotated_logger(logger, self.zuul_event_id)
self.event = None
def run(self):
self.log.debug("Starting event processing, queue length %s",
self.connection.getEventQueueSize())
try:
self._handle_event()
self._process_event()
finally:
self.log.debug("Finished event processing")
return self.event
def _handle_event(self):
def _process_event(self):
if self.connector._stopped:
return
@ -294,8 +296,7 @@ class GithubEventProcessor(object):
event.branch_protected = True
event.project_hostname = self.connection.canonical_hostname
self.connection.logEvent(event)
self.connection.sched.addEvent(event)
self.event = event
def _event_push(self):
base_repo = self.body.get('repository')
@ -470,33 +471,67 @@ class GithubEventProcessor(object):
return user
class GithubEventConnector(threading.Thread):
class GithubEventConnector:
"""Move events from GitHub into the scheduler"""
log = logging.getLogger("zuul.GithubEventConnector")
def __init__(self, connection):
super(GithubEventConnector, self).__init__()
self.daemon = True
self.connection = connection
self._stopped = False
self._event_dispatcher = threading.Thread(
name='GithubEventDispatcher', target=self.run_event_dispatcher,
daemon=True)
self._event_forwarder = threading.Thread(
name='GithubEventForwarder', target=self.run_event_forwarder,
daemon=True)
self._thread_pool = concurrent.futures.ThreadPoolExecutor()
self._event_forward_queue = queue.Queue()
def stop(self):
self._stopped = True
self.connection.addEvent(None)
self._event_dispatcher.join()
def run(self):
self._event_forward_queue.put(None)
self._event_forwarder.join()
self._thread_pool.shutdown()
def start(self):
self._event_forwarder.start()
self._event_dispatcher.start()
def run_event_dispatcher(self):
while True:
if self._stopped:
return
try:
data = self.connection.getEvent()
GithubEventProcessor(self, data).run()
processor = GithubEventProcessor(self, data)
future = self._thread_pool.submit(processor.run)
self._event_forward_queue.put(future)
except Exception:
self.log.exception("Exception moving GitHub event:")
finally:
self.connection.eventDone()
def run_event_forwarder(self):
while True:
if self._stopped:
return
try:
future = self._event_forward_queue.get()
if future is None:
return
event = future.result()
if event:
self.connection.logEvent(event)
self.connection.sched.addEvent(event)
except Exception:
self.log.exception("Exception moving GitHub event:")
finally:
self._event_forward_queue.task_done()
class GithubUser(collections.Mapping):
log = logging.getLogger('zuul.GithubUser')
@ -542,6 +577,7 @@ class GithubConnection(BaseConnection):
super(GithubConnection, self).__init__(
driver, connection_name, connection_config)
self._change_cache = {}
self._change_update_lock = {}
self._project_branch_cache_include_unprotected = {}
self._project_branch_cache_exclude_unprotected = {}
self.projects = {}
@ -644,7 +680,6 @@ class GithubConnection(BaseConnection):
def _stop_event_connector(self):
if self.github_event_connector:
self.github_event_connector.stop()
self.github_event_connector.join()
def _createGithubClient(self, zuul_event_id=None):
if self.server != 'github.com':
@ -917,8 +952,30 @@ class GithubConnection(BaseConnection):
change.number = number
change.patchset = patchset
self._change_cache[key] = change
try:
# This can be called multi-threaded during github event
# preprocessing. In order to avoid data races perform locking
# by cached key. Try to acquire the lock non-blocking at first.
# If the lock is already taken we're currently updating the very
# same chnange right now and would likely get the same data again.
lock = self._change_update_lock.setdefault(key, threading.Lock())
if lock.acquire(blocking=False):
try:
self._updateChange(change, event)
finally:
# We need to remove the lock here again so we don't leak
# them.
lock.release()
del self._change_update_lock[key]
else:
# We didn't get the lock so we don't need to update the same
# change again, but to be correct we should at least wait until
# the other thread is done updating the change.
log = get_annotated_logger(self.log, event)
log.debug("Change %s is currently being updated, "
"waiting for it to finish", change)
with lock:
log.debug('Finished updating change %s', change)
except Exception:
if key in self._change_cache:
del self._change_cache[key]