Merge "Parallelize github event processing"
This commit is contained in:
commit
65e76d47d6
|
@ -2916,6 +2916,10 @@ class ZuulTestCase(BaseTestCase):
|
|||
self.configure_connections()
|
||||
self.sched.registerConnections(self.connections)
|
||||
|
||||
if hasattr(self, 'fake_github'):
|
||||
self.event_queues.append(
|
||||
self.fake_github.github_event_connector._event_forward_queue)
|
||||
|
||||
self.executor_server = RecordingExecutorServer(
|
||||
self.config, self.connections,
|
||||
jobdir_root=self.test_root,
|
||||
|
|
|
@ -364,19 +364,21 @@ class GithubEventProcessor(object):
|
|||
self.connector = connector
|
||||
self.connection = connector.connection
|
||||
self.ts, self.body, self.event_type, self.delivery = event_tuple
|
||||
logger = logging.getLogger("zuul.GithubEventConnector")
|
||||
logger = logging.getLogger("zuul.GithubEventProcessor")
|
||||
self.zuul_event_id = self.delivery
|
||||
self.log = get_annotated_logger(logger, self.zuul_event_id)
|
||||
self.event = None
|
||||
|
||||
def run(self):
|
||||
self.log.debug("Starting event processing, queue length %s",
|
||||
self.connection.getEventQueueSize())
|
||||
try:
|
||||
self._handle_event()
|
||||
self._process_event()
|
||||
finally:
|
||||
self.log.debug("Finished event processing")
|
||||
return self.event
|
||||
|
||||
def _handle_event(self):
|
||||
def _process_event(self):
|
||||
if self.connector._stopped:
|
||||
return
|
||||
|
||||
|
@ -415,38 +417,42 @@ class GithubEventProcessor(object):
|
|||
self.log.exception('Exception when handling event:')
|
||||
|
||||
if event:
|
||||
event.delivery = self.delivery
|
||||
event.zuul_event_id = self.delivery
|
||||
project = self.connection.source.getProject(event.project_name)
|
||||
if event.change_number:
|
||||
self.connection._getChange(project,
|
||||
event.change_number,
|
||||
event.patch_number,
|
||||
refresh=True,
|
||||
event=event)
|
||||
self.log.debug("Refreshed change %s,%s",
|
||||
event.change_number, event.patch_number)
|
||||
|
||||
# If this event references a branch and we're excluding unprotected
|
||||
# branches, we might need to check whether the branch is now
|
||||
# protected.
|
||||
if event.branch:
|
||||
b = self.connection.getBranch(project.name, event.branch)
|
||||
if b is not None:
|
||||
branch_protected = b.get('protected')
|
||||
self.connection.checkBranchCache(
|
||||
project, event.branch, branch_protected, self.log)
|
||||
event.branch_protected = branch_protected
|
||||
else:
|
||||
# This can happen if the branch was deleted in GitHub. In
|
||||
# this case we assume that the branch COULD have been
|
||||
# protected before. The cache update is handled by the
|
||||
# push event, so we don't touch the cache here again.
|
||||
event.branch_protected = True
|
||||
# Note we limit parallel requests per installation id to avoid
|
||||
# triggering abuse detection.
|
||||
with self.connection.get_request_lock(installation_id):
|
||||
event.delivery = self.delivery
|
||||
event.zuul_event_id = self.delivery
|
||||
project = self.connection.source.getProject(event.project_name)
|
||||
if event.change_number:
|
||||
self.connection._getChange(project,
|
||||
event.change_number,
|
||||
event.patch_number,
|
||||
refresh=True,
|
||||
event=event)
|
||||
self.log.debug("Refreshed change %s,%s",
|
||||
event.change_number, event.patch_number)
|
||||
|
||||
# If this event references a branch and we're excluding
|
||||
# unprotected branches, we might need to check whether the
|
||||
# branch is now protected.
|
||||
if event.branch:
|
||||
b = self.connection.getBranch(project.name, event.branch)
|
||||
if b is not None:
|
||||
branch_protected = b.get('protected')
|
||||
self.connection.checkBranchCache(
|
||||
project, event.branch, branch_protected, self.log)
|
||||
event.branch_protected = branch_protected
|
||||
else:
|
||||
# This can happen if the branch was deleted in GitHub.
|
||||
# In this case we assume that the branch COULD have
|
||||
# been protected before. The cache update is handled by
|
||||
# the push event, so we don't touch the cache here
|
||||
# again.
|
||||
event.branch_protected = True
|
||||
|
||||
event.project_hostname = self.connection.canonical_hostname
|
||||
self.connection.logEvent(event)
|
||||
self.connection.sched.addEvent(event)
|
||||
self.event = event
|
||||
|
||||
def _event_push(self):
|
||||
base_repo = self.body.get('repository')
|
||||
|
@ -621,33 +627,67 @@ class GithubEventProcessor(object):
|
|||
return user
|
||||
|
||||
|
||||
class GithubEventConnector(threading.Thread):
|
||||
class GithubEventConnector:
|
||||
"""Move events from GitHub into the scheduler"""
|
||||
|
||||
log = logging.getLogger("zuul.GithubEventConnector")
|
||||
|
||||
def __init__(self, connection):
|
||||
super(GithubEventConnector, self).__init__()
|
||||
self.daemon = True
|
||||
self.connection = connection
|
||||
self._stopped = False
|
||||
self._event_dispatcher = threading.Thread(
|
||||
name='GithubEventDispatcher', target=self.run_event_dispatcher,
|
||||
daemon=True)
|
||||
self._event_forwarder = threading.Thread(
|
||||
name='GithubEventForwarder', target=self.run_event_forwarder,
|
||||
daemon=True)
|
||||
self._thread_pool = concurrent.futures.ThreadPoolExecutor()
|
||||
self._event_forward_queue = queue.Queue()
|
||||
|
||||
def stop(self):
|
||||
self._stopped = True
|
||||
self.connection.addEvent(None)
|
||||
self._event_dispatcher.join()
|
||||
|
||||
def run(self):
|
||||
self._event_forward_queue.put(None)
|
||||
self._event_forwarder.join()
|
||||
self._thread_pool.shutdown()
|
||||
|
||||
def start(self):
|
||||
self._event_forwarder.start()
|
||||
self._event_dispatcher.start()
|
||||
|
||||
def run_event_dispatcher(self):
|
||||
while True:
|
||||
if self._stopped:
|
||||
return
|
||||
try:
|
||||
data = self.connection.getEvent()
|
||||
GithubEventProcessor(self, data).run()
|
||||
processor = GithubEventProcessor(self, data)
|
||||
future = self._thread_pool.submit(processor.run)
|
||||
self._event_forward_queue.put(future)
|
||||
except Exception:
|
||||
self.log.exception("Exception moving GitHub event:")
|
||||
finally:
|
||||
self.connection.eventDone()
|
||||
|
||||
def run_event_forwarder(self):
|
||||
while True:
|
||||
if self._stopped:
|
||||
return
|
||||
try:
|
||||
future = self._event_forward_queue.get()
|
||||
if future is None:
|
||||
return
|
||||
event = future.result()
|
||||
if event:
|
||||
self.connection.logEvent(event)
|
||||
self.connection.sched.addEvent(event)
|
||||
except Exception:
|
||||
self.log.exception("Exception moving GitHub event:")
|
||||
finally:
|
||||
self._event_forward_queue.task_done()
|
||||
|
||||
|
||||
class GithubUser(collections.Mapping):
|
||||
log = logging.getLogger('zuul.GithubUser')
|
||||
|
@ -692,6 +732,7 @@ class GithubConnection(BaseConnection):
|
|||
super(GithubConnection, self).__init__(
|
||||
driver, connection_name, connection_config)
|
||||
self._change_cache = {}
|
||||
self._change_update_lock = {}
|
||||
self._project_branch_cache_include_unprotected = {}
|
||||
self._project_branch_cache_exclude_unprotected = {}
|
||||
self.projects = {}
|
||||
|
@ -703,6 +744,10 @@ class GithubConnection(BaseConnection):
|
|||
self.event_queue = queue.Queue()
|
||||
self._sha_pr_cache = GithubShaCache()
|
||||
|
||||
self._request_locks = {}
|
||||
self.max_threads_per_installation = int(self.connection_config.get(
|
||||
'max_threads_per_installation', 1))
|
||||
|
||||
# Logging of rate limit is optional as this does additional requests
|
||||
rate_limit_logging = self.connection_config.get(
|
||||
'rate_limit_logging', 'true')
|
||||
|
@ -794,7 +839,6 @@ class GithubConnection(BaseConnection):
|
|||
def _stop_event_connector(self):
|
||||
if self.github_event_connector:
|
||||
self.github_event_connector.stop()
|
||||
self.github_event_connector.join()
|
||||
|
||||
def _createGithubClient(self, zuul_event_id=None):
|
||||
if self.server != 'github.com':
|
||||
|
@ -989,6 +1033,11 @@ class GithubConnection(BaseConnection):
|
|||
for project_name in project_names:
|
||||
self.installation_map[project_name] = inst_id
|
||||
|
||||
def get_request_lock(self, installation_id):
|
||||
return self._request_locks.setdefault(
|
||||
installation_id, threading.Semaphore(
|
||||
value=self.max_threads_per_installation))
|
||||
|
||||
def addEvent(self, data, event=None, delivery=None):
|
||||
return self.event_queue.put((time.time(), data, event, delivery))
|
||||
|
||||
|
@ -1076,7 +1125,29 @@ class GithubConnection(BaseConnection):
|
|||
change.patchset = patchset
|
||||
self._change_cache[key] = change
|
||||
try:
|
||||
self._updateChange(change, event)
|
||||
# This can be called multi-threaded during github event
|
||||
# preprocessing. In order to avoid data races perform locking
|
||||
# by cached key. Try to acquire the lock non-blocking at first.
|
||||
# If the lock is already taken we're currently updating the very
|
||||
# same chnange right now and would likely get the same data again.
|
||||
lock = self._change_update_lock.setdefault(key, threading.Lock())
|
||||
if lock.acquire(blocking=False):
|
||||
try:
|
||||
self._updateChange(change, event)
|
||||
finally:
|
||||
# We need to remove the lock here again so we don't leak
|
||||
# them.
|
||||
lock.release()
|
||||
del self._change_update_lock[key]
|
||||
else:
|
||||
# We didn't get the lock so we don't need to update the same
|
||||
# change again, but to be correct we should at least wait until
|
||||
# the other thread is done updating the change.
|
||||
log = get_annotated_logger(self.log, event)
|
||||
log.debug("Change %s is currently being updated, "
|
||||
"waiting for it to finish", change)
|
||||
with lock:
|
||||
log.debug('Finished updating change %s', change)
|
||||
except Exception:
|
||||
if key in self._change_cache:
|
||||
del self._change_cache[key]
|
||||
|
|
Loading…
Reference in New Issue