Parallelize github event processing
The GitHub driver does an event pre processing before adding a trigger event to the scheduler. This is currently done single threaded to ensure that the trigger events are enqueued in order into the scheduler. A problem is that this pre processing can take a few seconds which limits the rate of events we can process. In order to parallelize this while keeping the order of the trigger events we need to do two things. We keep consuming the event queue single threaded but instead of processing and forwarding them directly to the scheduler we process them in a thread pool and put the futures into a result queue. This second queue can then again be processed single threaded and maintains the correct ordering of the events. Second updating the change cache currently assumes that it runs single threaded. In order to avoid data races we need to lock this by the change. In order to avoid triggering the abuse detection algorithms of GitHub limit the parallel processing per installation id. This behavior can be adjusted by an undocumented switch 'max_threads_per_installation' which can be used to relax this restriction in case this is needed (e.g. for large installations that still would result in queueing). Change-Id: I352dd452dfe08b09451c88b047d3de190e4e6bd8
This commit is contained in:
parent
2029ac77ba
commit
3e98c70127
|
@ -2913,6 +2913,10 @@ class ZuulTestCase(BaseTestCase):
|
|||
self.configure_connections()
|
||||
self.sched.registerConnections(self.connections)
|
||||
|
||||
if hasattr(self, 'fake_github'):
|
||||
self.event_queues.append(
|
||||
self.fake_github.github_event_connector._event_forward_queue)
|
||||
|
||||
self.executor_server = RecordingExecutorServer(
|
||||
self.config, self.connections,
|
||||
jobdir_root=self.test_root,
|
||||
|
|
|
@ -364,19 +364,21 @@ class GithubEventProcessor(object):
|
|||
self.connector = connector
|
||||
self.connection = connector.connection
|
||||
self.ts, self.body, self.event_type, self.delivery = event_tuple
|
||||
logger = logging.getLogger("zuul.GithubEventConnector")
|
||||
logger = logging.getLogger("zuul.GithubEventProcessor")
|
||||
self.zuul_event_id = self.delivery
|
||||
self.log = get_annotated_logger(logger, self.zuul_event_id)
|
||||
self.event = None
|
||||
|
||||
def run(self):
|
||||
self.log.debug("Starting event processing, queue length %s",
|
||||
self.connection.getEventQueueSize())
|
||||
try:
|
||||
self._handle_event()
|
||||
self._process_event()
|
||||
finally:
|
||||
self.log.debug("Finished event processing")
|
||||
return self.event
|
||||
|
||||
def _handle_event(self):
|
||||
def _process_event(self):
|
||||
if self.connector._stopped:
|
||||
return
|
||||
|
||||
|
@ -415,38 +417,42 @@ class GithubEventProcessor(object):
|
|||
self.log.exception('Exception when handling event:')
|
||||
|
||||
if event:
|
||||
event.delivery = self.delivery
|
||||
event.zuul_event_id = self.delivery
|
||||
project = self.connection.source.getProject(event.project_name)
|
||||
if event.change_number:
|
||||
self.connection._getChange(project,
|
||||
event.change_number,
|
||||
event.patch_number,
|
||||
refresh=True,
|
||||
event=event)
|
||||
self.log.debug("Refreshed change %s,%s",
|
||||
event.change_number, event.patch_number)
|
||||
|
||||
# If this event references a branch and we're excluding unprotected
|
||||
# branches, we might need to check whether the branch is now
|
||||
# protected.
|
||||
if event.branch:
|
||||
b = self.connection.getBranch(project.name, event.branch)
|
||||
if b is not None:
|
||||
branch_protected = b.get('protected')
|
||||
self.connection.checkBranchCache(
|
||||
project, event.branch, branch_protected, self.log)
|
||||
event.branch_protected = branch_protected
|
||||
else:
|
||||
# This can happen if the branch was deleted in GitHub. In
|
||||
# this case we assume that the branch COULD have been
|
||||
# protected before. The cache update is handled by the
|
||||
# push event, so we don't touch the cache here again.
|
||||
event.branch_protected = True
|
||||
# Note we limit parallel requests per installation id to avoid
|
||||
# triggering abuse detection.
|
||||
with self.connection.get_request_lock(installation_id):
|
||||
event.delivery = self.delivery
|
||||
event.zuul_event_id = self.delivery
|
||||
project = self.connection.source.getProject(event.project_name)
|
||||
if event.change_number:
|
||||
self.connection._getChange(project,
|
||||
event.change_number,
|
||||
event.patch_number,
|
||||
refresh=True,
|
||||
event=event)
|
||||
self.log.debug("Refreshed change %s,%s",
|
||||
event.change_number, event.patch_number)
|
||||
|
||||
# If this event references a branch and we're excluding
|
||||
# unprotected branches, we might need to check whether the
|
||||
# branch is now protected.
|
||||
if event.branch:
|
||||
b = self.connection.getBranch(project.name, event.branch)
|
||||
if b is not None:
|
||||
branch_protected = b.get('protected')
|
||||
self.connection.checkBranchCache(
|
||||
project, event.branch, branch_protected, self.log)
|
||||
event.branch_protected = branch_protected
|
||||
else:
|
||||
# This can happen if the branch was deleted in GitHub.
|
||||
# In this case we assume that the branch COULD have
|
||||
# been protected before. The cache update is handled by
|
||||
# the push event, so we don't touch the cache here
|
||||
# again.
|
||||
event.branch_protected = True
|
||||
|
||||
event.project_hostname = self.connection.canonical_hostname
|
||||
self.connection.logEvent(event)
|
||||
self.connection.sched.addEvent(event)
|
||||
self.event = event
|
||||
|
||||
def _event_push(self):
|
||||
base_repo = self.body.get('repository')
|
||||
|
@ -621,33 +627,67 @@ class GithubEventProcessor(object):
|
|||
return user
|
||||
|
||||
|
||||
class GithubEventConnector(threading.Thread):
|
||||
class GithubEventConnector:
|
||||
"""Move events from GitHub into the scheduler"""
|
||||
|
||||
log = logging.getLogger("zuul.GithubEventConnector")
|
||||
|
||||
def __init__(self, connection):
|
||||
super(GithubEventConnector, self).__init__()
|
||||
self.daemon = True
|
||||
self.connection = connection
|
||||
self._stopped = False
|
||||
self._event_dispatcher = threading.Thread(
|
||||
name='GithubEventDispatcher', target=self.run_event_dispatcher,
|
||||
daemon=True)
|
||||
self._event_forwarder = threading.Thread(
|
||||
name='GithubEventForwarder', target=self.run_event_forwarder,
|
||||
daemon=True)
|
||||
self._thread_pool = concurrent.futures.ThreadPoolExecutor()
|
||||
self._event_forward_queue = queue.Queue()
|
||||
|
||||
def stop(self):
|
||||
self._stopped = True
|
||||
self.connection.addEvent(None)
|
||||
self._event_dispatcher.join()
|
||||
|
||||
def run(self):
|
||||
self._event_forward_queue.put(None)
|
||||
self._event_forwarder.join()
|
||||
self._thread_pool.shutdown()
|
||||
|
||||
def start(self):
|
||||
self._event_forwarder.start()
|
||||
self._event_dispatcher.start()
|
||||
|
||||
def run_event_dispatcher(self):
|
||||
while True:
|
||||
if self._stopped:
|
||||
return
|
||||
try:
|
||||
data = self.connection.getEvent()
|
||||
GithubEventProcessor(self, data).run()
|
||||
processor = GithubEventProcessor(self, data)
|
||||
future = self._thread_pool.submit(processor.run)
|
||||
self._event_forward_queue.put(future)
|
||||
except Exception:
|
||||
self.log.exception("Exception moving GitHub event:")
|
||||
finally:
|
||||
self.connection.eventDone()
|
||||
|
||||
def run_event_forwarder(self):
|
||||
while True:
|
||||
if self._stopped:
|
||||
return
|
||||
try:
|
||||
future = self._event_forward_queue.get()
|
||||
if future is None:
|
||||
return
|
||||
event = future.result()
|
||||
if event:
|
||||
self.connection.logEvent(event)
|
||||
self.connection.sched.addEvent(event)
|
||||
except Exception:
|
||||
self.log.exception("Exception moving GitHub event:")
|
||||
finally:
|
||||
self._event_forward_queue.task_done()
|
||||
|
||||
|
||||
class GithubUser(collections.Mapping):
|
||||
log = logging.getLogger('zuul.GithubUser')
|
||||
|
@ -692,6 +732,7 @@ class GithubConnection(BaseConnection):
|
|||
super(GithubConnection, self).__init__(
|
||||
driver, connection_name, connection_config)
|
||||
self._change_cache = {}
|
||||
self._change_update_lock = {}
|
||||
self._project_branch_cache_include_unprotected = {}
|
||||
self._project_branch_cache_exclude_unprotected = {}
|
||||
self.projects = {}
|
||||
|
@ -703,6 +744,10 @@ class GithubConnection(BaseConnection):
|
|||
self.event_queue = queue.Queue()
|
||||
self._sha_pr_cache = GithubShaCache()
|
||||
|
||||
self._request_locks = {}
|
||||
self.max_threads_per_installation = int(self.connection_config.get(
|
||||
'max_threads_per_installation', 1))
|
||||
|
||||
# Logging of rate limit is optional as this does additional requests
|
||||
rate_limit_logging = self.connection_config.get(
|
||||
'rate_limit_logging', 'true')
|
||||
|
@ -794,7 +839,6 @@ class GithubConnection(BaseConnection):
|
|||
def _stop_event_connector(self):
|
||||
if self.github_event_connector:
|
||||
self.github_event_connector.stop()
|
||||
self.github_event_connector.join()
|
||||
|
||||
def _createGithubClient(self, zuul_event_id=None):
|
||||
if self.server != 'github.com':
|
||||
|
@ -989,6 +1033,11 @@ class GithubConnection(BaseConnection):
|
|||
for project_name in project_names:
|
||||
self.installation_map[project_name] = inst_id
|
||||
|
||||
def get_request_lock(self, installation_id):
|
||||
return self._request_locks.setdefault(
|
||||
installation_id, threading.Semaphore(
|
||||
value=self.max_threads_per_installation))
|
||||
|
||||
def addEvent(self, data, event=None, delivery=None):
|
||||
return self.event_queue.put((time.time(), data, event, delivery))
|
||||
|
||||
|
@ -1076,7 +1125,29 @@ class GithubConnection(BaseConnection):
|
|||
change.patchset = patchset
|
||||
self._change_cache[key] = change
|
||||
try:
|
||||
self._updateChange(change, event)
|
||||
# This can be called multi-threaded during github event
|
||||
# preprocessing. In order to avoid data races perform locking
|
||||
# by cached key. Try to acquire the lock non-blocking at first.
|
||||
# If the lock is already taken we're currently updating the very
|
||||
# same chnange right now and would likely get the same data again.
|
||||
lock = self._change_update_lock.setdefault(key, threading.Lock())
|
||||
if lock.acquire(blocking=False):
|
||||
try:
|
||||
self._updateChange(change, event)
|
||||
finally:
|
||||
# We need to remove the lock here again so we don't leak
|
||||
# them.
|
||||
lock.release()
|
||||
del self._change_update_lock[key]
|
||||
else:
|
||||
# We didn't get the lock so we don't need to update the same
|
||||
# change again, but to be correct we should at least wait until
|
||||
# the other thread is done updating the change.
|
||||
log = get_annotated_logger(self.log, event)
|
||||
log.debug("Change %s is currently being updated, "
|
||||
"waiting for it to finish", change)
|
||||
with lock:
|
||||
log.debug('Finished updating change %s', change)
|
||||
except Exception:
|
||||
if key in self._change_cache:
|
||||
del self._change_cache[key]
|
||||
|
|
Loading…
Reference in New Issue