From 1fab39cc4b131505adc23d19faf112981f122f01 Mon Sep 17 00:00:00 2001
From: Tobias Henkel
Date: Mon, 3 Jun 2019 20:30:35 +0200
Subject: [PATCH] Parallelize github event processing

The GitHub driver performs event pre-processing before adding a
trigger event to the scheduler. This is currently done single-threaded
to ensure that the trigger events are enqueued into the scheduler in
order. The problem is that this pre-processing can take a few seconds,
which limits the rate at which we can process events.

In order to parallelize this while keeping the trigger events in
order, we need to do two things.

First, we keep consuming the event queue single-threaded, but instead
of processing the events and forwarding them directly to the scheduler
we process them in a thread pool and put the futures into a result
queue. This second queue is again consumed single-threaded, which
maintains the correct ordering of the events.

Second, updating the change cache currently assumes that it runs
single-threaded. In order to avoid data races we need to lock these
updates per change.

Change-Id: I08f31b99e2a58e51cef0de89edd98c957d7db87f
---
 tests/base.py                          |  4 ++
 zuul/driver/github/githubconnection.py | 81 ++++++++++++++++++++++----
 2 files changed, 73 insertions(+), 12 deletions(-)

diff --git a/tests/base.py b/tests/base.py
index e396be8d5a..bd2d3176d6 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -2584,6 +2584,10 @@ class ZuulTestCase(BaseTestCase):
         self.configure_connections()
         self.sched.registerConnections(self.connections)
 
+        if hasattr(self, 'fake_github'):
+            self.event_queues.append(
+                self.fake_github.github_event_connector._event_forward_queue)
+
         self.executor_server = RecordingExecutorServer(
             self.config, self.connections,
             jobdir_root=self.test_root,
diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py
index 69bc8fffef..e30a1af5b1 100644
--- a/zuul/driver/github/githubconnection.py
+++ b/zuul/driver/github/githubconnection.py
@@ -215,19 +215,21 @@ class GithubEventProcessor(object):
         self.connector = connector
         self.connection = connector.connection
         self.ts, self.body, self.event_type, self.delivery = event_tuple
-        logger = logging.getLogger("zuul.GithubEventConnector")
+        logger = logging.getLogger("zuul.GithubEventProcessor")
         self.zuul_event_id = self.delivery
         self.log = get_annotated_logger(logger, self.zuul_event_id)
+        self.event = None
 
     def run(self):
         self.log.debug("Starting event processing, queue length %s",
                        self.connection.getEventQueueSize())
         try:
-            self._handle_event()
+            self._process_event()
         finally:
             self.log.debug("Finished event processing")
+        return self.event
 
-    def _handle_event(self):
+    def _process_event(self):
         if self.connector._stopped:
             return
 
@@ -294,8 +296,7 @@ class GithubEventProcessor(object):
                 event.branch_protected = True
 
             event.project_hostname = self.connection.canonical_hostname
-            self.connection.logEvent(event)
-            self.connection.sched.addEvent(event)
+            self.event = event
 
     def _event_push(self):
         base_repo = self.body.get('repository')
@@ -470,33 +471,67 @@ class GithubEventProcessor(object):
         return user
 
 
-class GithubEventConnector(threading.Thread):
+class GithubEventConnector:
     """Move events from GitHub into the scheduler"""
 
     log = logging.getLogger("zuul.GithubEventConnector")
 
     def __init__(self, connection):
-        super(GithubEventConnector, self).__init__()
-        self.daemon = True
         self.connection = connection
         self._stopped = False
+        self._event_dispatcher = threading.Thread(
+            name='GithubEventDispatcher', target=self.run_event_dispatcher,
+            daemon=True)
+        self._event_forwarder = threading.Thread(
+            name='GithubEventForwarder', target=self.run_event_forwarder,
+            daemon=True)
+        self._thread_pool = concurrent.futures.ThreadPoolExecutor()
+        self._event_forward_queue = queue.Queue()
 
     def stop(self):
         self._stopped = True
         self.connection.addEvent(None)
+        self._event_dispatcher.join()
 
-    def run(self):
+        self._event_forward_queue.put(None)
+        self._event_forwarder.join()
+        self._thread_pool.shutdown()
+
+    def start(self):
+        self._event_forwarder.start()
+        self._event_dispatcher.start()
+
+    def run_event_dispatcher(self):
         while True:
             if self._stopped:
                 return
             try:
                 data = self.connection.getEvent()
-                GithubEventProcessor(self, data).run()
+                processor = GithubEventProcessor(self, data)
+                future = self._thread_pool.submit(processor.run)
+                self._event_forward_queue.put(future)
             except Exception:
                 self.log.exception("Exception moving GitHub event:")
             finally:
                 self.connection.eventDone()
 
+    def run_event_forwarder(self):
+        while True:
+            if self._stopped:
+                return
+            try:
+                future = self._event_forward_queue.get()
+                if future is None:
+                    return
+                event = future.result()
+                if event:
+                    self.connection.logEvent(event)
+                    self.connection.sched.addEvent(event)
+            except Exception:
+                self.log.exception("Exception moving GitHub event:")
+            finally:
+                self._event_forward_queue.task_done()
+
 
 class GithubUser(collections.Mapping):
     log = logging.getLogger('zuul.GithubUser')
@@ -542,6 +577,7 @@ class GithubConnection(BaseConnection):
         super(GithubConnection, self).__init__(
             driver, connection_name, connection_config)
         self._change_cache = {}
+        self._change_update_lock = {}
         self._project_branch_cache_include_unprotected = {}
         self._project_branch_cache_exclude_unprotected = {}
         self.projects = {}
@@ -644,7 +680,6 @@ class GithubConnection(BaseConnection):
     def _stop_event_connector(self):
         if self.github_event_connector:
             self.github_event_connector.stop()
-            self.github_event_connector.join()
 
     def _createGithubClient(self, zuul_event_id=None):
         if self.server != 'github.com':
@@ -918,7 +953,29 @@ class GithubConnection(BaseConnection):
             change.patchset = patchset
             self._change_cache[key] = change
         try:
-            self._updateChange(change, event)
+            # This can be called multi-threaded during github event
+            # preprocessing. In order to avoid data races perform locking
+            # by cached key. Try to acquire the lock non-blocking at first.
+            # If the lock is already taken we're currently updating the very
+            # same change right now and would likely get the same data again.
+            lock = self._change_update_lock.setdefault(key, threading.Lock())
+            if lock.acquire(blocking=False):
+                try:
+                    self._updateChange(change, event)
+                finally:
+                    # We need to remove the lock here again so we don't leak
+                    # them.
+                    lock.release()
+                    del self._change_update_lock[key]
+            else:
+                # We didn't get the lock so we don't need to update the same
+                # change again, but to be correct we should at least wait until
+                # the other thread is done updating the change.
+                log = get_annotated_logger(self.log, event)
+                log.debug("Change %s is currently being updated, "
+                          "waiting for it to finish", change)
+                with lock:
+                    log.debug('Finished updating change %s', change)
         except Exception:
             if key in self._change_cache:
                 del self._change_cache[key]
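
The core of the change is the ordering argument from the commit message: events are processed concurrently, but their futures are enqueued and consumed in arrival order. The standalone sketch below (not Zuul code; `process`, `dispatch` and `forward` are made-up names) illustrates that pattern with the same standard-library pieces the patch uses, `ThreadPoolExecutor` and `queue.Queue`.

```python
import concurrent.futures
import queue
import threading
import time


def process(event):
    # Stand-in for GithubEventProcessor.run(); the sleep simulates the
    # variable per-event preprocessing cost.
    time.sleep(0.05 * (event % 3))
    return "processed-%d" % event


pool = concurrent.futures.ThreadPoolExecutor()
forward_queue = queue.Queue()
results = []


def dispatch(events):
    # Consume the raw event stream single-threaded, hand the actual
    # processing to the pool, and enqueue the futures in arrival order.
    for event in events:
        forward_queue.put(pool.submit(process, event))
    forward_queue.put(None)  # sentinel: no more events


def forward():
    # Block on each future in FIFO order; this is what keeps the forwarded
    # results ordered even though processing finishes out of order.
    while True:
        future = forward_queue.get()
        if future is None:
            break
        results.append(future.result())


dispatcher = threading.Thread(target=dispatch, args=(range(10),))
forwarder = threading.Thread(target=forward)
forwarder.start()
dispatcher.start()
dispatcher.join()
forwarder.join()
pool.shutdown()
print(results)  # results come out in the original event order
```

Because the forwarder waits on each future in submission order, results are emitted in the original event order even when later events finish processing first; the parallelism only shortens the total wall-clock time.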
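The second half of the change, locking change-cache updates per change, can be sketched in isolation as well. This is only an illustration of the approach taken in the `_getChange` hunk above; `_cache`, `_update_locks`, `fetch` and `update_cached` are hypothetical names, not Zuul APIs.

```python
import threading

_cache = {}
_update_locks = {}


def fetch(key):
    # Stand-in for the expensive per-change lookup (e.g. GitHub API calls).
    return "data-for-%s" % key


def update_cached(key):
    # One lock per cache key; dict.setdefault is atomic under CPython's GIL.
    lock = _update_locks.setdefault(key, threading.Lock())
    if lock.acquire(blocking=False):
        try:
            _cache[key] = fetch(key)
        finally:
            # Release first, then drop the per-key lock so it is not leaked.
            lock.release()
            _update_locks.pop(key, None)
    else:
        # Another thread is updating this key right now and would fetch the
        # same data, so skip the redundant update but wait until it finishes.
        with lock:
            pass
    return _cache.get(key)


print(update_cached("org/project#123"))
```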