From 2e6cfff81824617f2a3d358e920cc71653f6b51e Mon Sep 17 00:00:00 2001 From: Simon Westphahl Date: Tue, 3 Nov 2020 15:24:51 +0100 Subject: [PATCH] Switch to Zookeeper backed trigger event queues Trigger events will now be dispatched via Zookeeper. The event queues are namespaced by tenant since the event processing will later require a tenant lock in a multi scheduler deployment. Gitlab events hold their labels as a non-serializable set attribute; this change adjusts them to be held in a list (but set operations are still used for de-duplication). Change-Id: Ie54fc16488ab8cbc15f97d003f36c12b8a648ed4 --- tests/base.py | 21 +- tests/unit/test_zuultrigger.py | 77 +++---- zuul/cmd/scheduler.py | 2 - zuul/driver/gerrit/gerritconnection.py | 4 +- zuul/driver/git/gitconnection.py | 2 +- zuul/driver/github/githubconnection.py | 20 +- zuul/driver/gitlab/gitlabconnection.py | 6 +- zuul/driver/gitlab/gitlabmodel.py | 2 +- zuul/driver/pagure/pagureconnection.py | 4 +- zuul/driver/timer/__init__.py | 2 +- zuul/driver/zuul/__init__.py | 4 +- zuul/scheduler.py | 308 ++++++++++++------------- 12 files changed, 224 insertions(+), 228 deletions(-) diff --git a/tests/base.py b/tests/base.py index d2b2331d83..4a1cbe1f09 100644 --- a/tests/base.py +++ b/tests/base.py @@ -4032,7 +4032,6 @@ class SchedulerTestApp: self.event_queues = [ self.sched.result_event_queue, - self.sched.trigger_event_queue, self.sched.management_event_queue ] @@ -4041,7 +4040,6 @@ class SchedulerTestApp: self.sched.executor.gearman.waitForServer() self.sched.reconfigure( self.config, validate_tenants=validate_tenants) - self.sched.wakeUp() def fullReconfigure(self): try: @@ -4916,6 +4914,18 @@ class ZuulTestCase(BaseTestCase): for event_queue in self.additional_event_queues: event_queue.join() + def __areZooKeeperEventQueuesEmpty(self, matcher=None) -> bool: + for sched in map(lambda app: app.sched, self.scheds.filter(matcher)): + if sched.trigger_events.hasEvents(): + return False + for tenant in sched.abide.tenants.values(): + for pipeline_name in tenant.layout.pipelines: + if sched.pipeline_trigger_events[tenant.name][ + pipeline_name + ].hasEvents(): + return False + return True + def waitUntilSettled(self, msg="", matcher=None) -> None: self.log.debug("Waiting until settled... (%s)", msg) start = time.time() @@ -4928,6 +4938,10 @@ class ZuulTestCase(BaseTestCase): for event_queue in self.__event_queues(matcher): self.log.error(" %s: %s" % (event_queue, event_queue.empty())) + self.log.error( + "All ZK event queues empty: %s", + self.__areZooKeeperEventQueuesEmpty(matcher), + ) self.log.error("All builds waiting: %s" % (self.__areAllBuildsWaiting(matcher),)) self.log.error("All merge jobs waiting: %s" % @@ -4953,7 +4967,8 @@ class ZuulTestCase(BaseTestCase): self.__eventQueuesJoin(matcher) self.scheds.execute( lambda app: app.sched.run_handler_lock.acquire()) - if (self.__areAllMergeJobsWaiting(matcher) and + if (self.__areZooKeeperEventQueuesEmpty(matcher) and + self.__areAllMergeJobsWaiting(matcher) and self.__haveAllBuildsReported(matcher) and self.__areAllBuildsWaiting(matcher) and self.__areAllNodeRequestsComplete(matcher) and diff --git a/tests/unit/test_zuultrigger.py b/tests/unit/test_zuultrigger.py index 00ee2efdef..f25d75c53b 100644 --- a/tests/unit/test_zuultrigger.py +++ b/tests/unit/test_zuultrigger.py @@ -12,6 +12,8 @@ # License for the specific language governing permissions and limitations # under the License. +from unittest import mock + from tests.base import ZuulTestCase, ZuulGithubAppTestCase from zuul.driver.zuul.zuulmodel import ZuulTriggerEvent @@ -63,29 +65,24 @@ class TestZuulTriggerParentChangeEnqueued(ZuulTestCase): # Now directly enqueue a change into the check. As no pipeline reacts # on parent-change-enqueued from pipeline check no # parent-change-enqueued event is expected. - zuultrigger_event_count = 0 + _add_trigger_event = self.scheds.first.sched.addTriggerEvent - def counting_put(*args, **kwargs): - nonlocal zuultrigger_event_count - if isinstance(args[0], ZuulTriggerEvent): - zuultrigger_event_count += 1 - self.scheds.first.sched.trigger_event_queue\ - .put_orig(*args, **kwargs) + def addTriggerEvent(driver_name, event): + self.assertNotIsInstance(event, ZuulTriggerEvent) + _add_trigger_event(driver_name, event) - self.scheds.first.sched.trigger_event_queue.put_orig = \ - self.scheds.first.sched.trigger_event_queue.put - self.scheds.first.sched.trigger_event_queue.put = counting_put + with mock.patch.object( + self.scheds.first.sched, "addTriggerEvent", addTriggerEvent + ): + C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C') + C.addApproval('Verified', -1) + D = self.fake_gerrit.addFakeChange('org/project', 'master', 'D') + D.addApproval('Verified', -1) + D.setDependsOn(C, 1) + self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1)) - C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C') - C.addApproval('Verified', -1) - D = self.fake_gerrit.addFakeChange('org/project', 'master', 'D') - D.addApproval('Verified', -1) - D.setDependsOn(C, 1) - self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1)) - - self.waitUntilSettled() - self.assertEqual(len(self.history), 4) - self.assertEqual(zuultrigger_event_count, 0) + self.waitUntilSettled() + self.assertEqual(len(self.history), 4) class TestZuulTriggerParentChangeEnqueuedGithub(ZuulGithubAppTestCase): @@ -145,31 +142,31 @@ class TestZuulTriggerParentChangeEnqueuedGithub(ZuulGithubAppTestCase): # on parent-change-enqueued from pipeline check no # parent-change-enqueued event is expected. self.waitUntilSettled() - zuultrigger_event_count = 0 - def counting_put(*args, **kwargs): - nonlocal zuultrigger_event_count - if isinstance(args[0], ZuulTriggerEvent): - zuultrigger_event_count += 1 - self.scheds.first.sched.trigger_event_queue\ - .put_orig(*args, **kwargs) + _add_trigger_event = self.scheds.first.sched.addTriggerEvent - self.scheds.first.sched.trigger_event_queue.put_orig = \ - self.scheds.first.sched.trigger_event_queue.put - self.scheds.first.sched.trigger_event_queue.put = counting_put + def addTriggerEvent(driver_name, event): + self.assertNotIsInstance(event, ZuulTriggerEvent) + _add_trigger_event(driver_name, event) - C = self.fake_github.openFakePullRequest('org/project', 'master', 'C') - C.addLabel('for-check') # should go to check + with mock.patch.object( + self.scheds.first.sched, "addTriggerEvent", addTriggerEvent + ): + C = self.fake_github.openFakePullRequest( + 'org/project', 'master', 'C' + ) + C.addLabel('for-check') # should go to check - msg = "Depends-On: https://github.com/org/project1/pull/%s" % C.number - D = self.fake_github.openFakePullRequest( - 'org/project', 'master', 'D', body=msg) - D.addLabel('for-check') # should go to check - self.fake_github.emitEvent(C.getPullRequestOpenedEvent()) + msg = "Depends-On: https://github.com/org/project1/pull/{}".format( + C.number + ) + D = self.fake_github.openFakePullRequest( + 'org/project', 'master', 'D', body=msg) + D.addLabel('for-check') # should go to check + self.fake_github.emitEvent(C.getPullRequestOpenedEvent()) - self.waitUntilSettled() - self.assertEqual(len(self.history), 4) - self.assertEqual(zuultrigger_event_count, 0) + self.waitUntilSettled() + self.assertEqual(len(self.history), 4) # After starting recording installation containing org2/project # should not be contacted diff --git a/zuul/cmd/scheduler.py b/zuul/cmd/scheduler.py index d1ec602147..18928f724e 100755 --- a/zuul/cmd/scheduler.py +++ b/zuul/cmd/scheduler.py @@ -71,7 +71,6 @@ class Scheduler(zuul.cmd.ZuulDaemonApp): self.log.exception("Reconfiguration failed:") def exit_handler(self, signum, frame): - self.sched.exit() self.sched.join() self.stop_gear_server() sys.exit(0) @@ -146,7 +145,6 @@ class Scheduler(zuul.cmd.ZuulDaemonApp): self.sched.start() self.sched.reconfigure(self.config, validate_tenants=self.args.validate_tenants) - self.sched.wakeUp() except Exception: self.log.exception("Error starting Zuul:") # TODO(jeblair): If we had all threads marked as daemon, diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py index 1d0bab26fc..d815a8b45b 100644 --- a/zuul/driver/gerrit/gerritconnection.py +++ b/zuul/driver/gerrit/gerritconnection.py @@ -258,7 +258,9 @@ class GerritEventConnector(threading.Thread): self._getChange(event) self.connection.logEvent(event) - self.connection.sched.addEvent(event) + self.connection.sched.addTriggerEvent( + self.connection.driver_name, event + ) def _getChange(self, event): # Grab the change if we are managing the project or if it exists in the diff --git a/zuul/driver/git/gitconnection.py b/zuul/driver/git/gitconnection.py index a20ef7df97..be0992419f 100644 --- a/zuul/driver/git/gitconnection.py +++ b/zuul/driver/git/gitconnection.py @@ -151,7 +151,7 @@ class GitConnection(BaseConnection): self.getChange(event) self.logEvent(event) # Pass the event to the scheduler - self.sched.addEvent(event) + self.sched.addTriggerEvent(self.driver_name, event) def onLoad(self): self.log.debug("Starting Git Watcher") diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py index 1e047f4683..b2657b145a 100644 --- a/zuul/driver/github/githubconnection.py +++ b/zuul/driver/github/githubconnection.py @@ -694,13 +694,7 @@ class GithubEventProcessor(object): return event def _get_sender(self, body): - login = body.get('sender').get('login') - if login: - # TODO(tobiash): it might be better to plumb in the installation id - project = body.get('repository', {}).get('full_name') - user = self.connection.getUser(login, project) - self.log.debug("Got user %s", user) - return user + return body.get('sender').get('login') class GithubEventConnector: @@ -756,9 +750,15 @@ class GithubEventConnector: if future is None: return event = future.result() - if event: - self.connection.logEvent(event) - self.connection.sched.addEvent(event) + if not event: + return + self.connection.logEvent(event) + if isinstance(event, DequeueEvent): + self.connection.sched.addManagementEvent(event) + else: + self.connection.sched.addTriggerEvent( + self.connection.driver_name, event + ) except Exception: self.log.exception("Exception moving GitHub event:") finally: diff --git a/zuul/driver/gitlab/gitlabconnection.py b/zuul/driver/gitlab/gitlabconnection.py index 9e36f71b6d..d8a654fbef 100644 --- a/zuul/driver/gitlab/gitlabconnection.py +++ b/zuul/driver/gitlab/gitlabconnection.py @@ -146,7 +146,7 @@ class GitlabEventConnector(threading.Thread): label["title"] for label in body["changes"]["labels"]["current"]] new_labels = set(current_labels) - set(previous_labels) - event.labels = new_labels + event.labels = list(new_labels) elif attrs['action'] in ('approved', 'unapproved'): event.action = attrs['action'] else: @@ -237,7 +237,9 @@ class GitlabEventConnector(threading.Thread): self.connection.checkBranchCache(event.project_name, event) self.connection.logEvent(event) - self.connection.sched.addEvent(event) + self.connection.sched.addTriggerEvent( + self.connection.driver_name, event + ) def run(self): while True: diff --git a/zuul/driver/gitlab/gitlabmodel.py b/zuul/driver/gitlab/gitlabmodel.py index 88f3774889..5e83eb384d 100644 --- a/zuul/driver/gitlab/gitlabmodel.py +++ b/zuul/driver/gitlab/gitlabmodel.py @@ -88,7 +88,7 @@ class GitlabTriggerEvent(TriggerEvent): r = [super(GitlabTriggerEvent, self)._repr()] if self.action: r.append("action:%s" % self.action) - r.append("project:%s" % self.canonical_project_name) + r.append("project:%s" % self.project_name) if self.change_number: r.append("mr:%s" % self.change_number) if self.labels: diff --git a/zuul/driver/pagure/pagureconnection.py b/zuul/driver/pagure/pagureconnection.py index d4bca3b1d0..737279b1fb 100644 --- a/zuul/driver/pagure/pagureconnection.py +++ b/zuul/driver/pagure/pagureconnection.py @@ -255,7 +255,9 @@ class PagureEventConnector(threading.Thread): event=event) event.project_hostname = self.connection.canonical_hostname self.connection.logEvent(event) - self.connection.sched.addEvent(event) + self.connection.sched.addTriggerEvent( + self.connection.driver_name, event + ) def _event_base(self, body, pull_data_field='pullrequest'): event = PagureTriggerEvent() diff --git a/zuul/driver/timer/__init__.py b/zuul/driver/timer/__init__.py index cc7552545a..d2ac1fa924 100644 --- a/zuul/driver/timer/__init__.py +++ b/zuul/driver/timer/__init__.py @@ -130,7 +130,7 @@ class TimerDriver(Driver, TriggerInterface): event.timestamp = time.time() log = get_annotated_logger(self.log, event) log.debug("Adding event") - self.sched.addEvent(event) + self.sched.addTriggerEvent(self.name, event) def stop(self): if self.apsched: diff --git a/zuul/driver/zuul/__init__.py b/zuul/driver/zuul/__init__.py index 93ffd53144..f1d1e2ce9a 100644 --- a/zuul/driver/zuul/__init__.py +++ b/zuul/driver/zuul/__init__.py @@ -96,7 +96,7 @@ class ZuulDriver(Driver, TriggerInterface): event.ref = change.ref event.zuul_event_id = str(uuid4().hex) event.timestamp = time.time() - self.sched.addEvent(event) + self.sched.addTriggerEvent(self.name, event) def _createParentChangeEnqueuedEvents(self, change, pipeline, tenant, event): @@ -133,7 +133,7 @@ class ZuulDriver(Driver, TriggerInterface): event.patch_number = change.patchset event.ref = change.ref event.zuul_event_id = str(uuid4().hex) - self.sched.addEvent(event) + self.sched.addTriggerEvent(self.name, event) def getTrigger(self, connection_name, config=None): return zuultrigger.ZuulTrigger(self, config) diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 8cdb9ecead..e56aaba015 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -18,7 +18,6 @@ import json import logging import os -import pickle import re import queue import socket @@ -53,19 +52,22 @@ from zuul.model import ( EnqueueEvent, FilesChangesCompletedEvent, HoldRequest, - ManagementEvent, MergeCompletedEvent, NodesProvisionedEvent, PromoteEvent, ReconfigureEvent, - ResultEvent, SmartReconfigureEvent, Tenant, TenantReconfigureEvent, - TriggerEvent, ) from zuul.zk import ZooKeeperClient from zuul.zk.components import ZooKeeperComponentRegistry +from zuul.zk.event_queues import ( + GlobalEventWatcher, + GlobalTriggerEventQueue, + PipelineEventWatcher, + PipelineTriggerEventQueue, +) from zuul.zk.nodepool import ZooKeeperNodepool COMMANDS = ['full-reconfigure', 'smart-reconfigure', 'stop', 'repl', 'norepl'] @@ -113,7 +115,6 @@ class Scheduler(threading.Thread): 'repl': self.start_repl, 'norepl': self.stop_repl, } - self._hibernate = False self._stopped = False self._zuul_app = app @@ -143,9 +144,22 @@ class Scheduler(threading.Thread): ) ) - self.trigger_event_queue = queue.Queue() self.result_event_queue = queue.Queue() self.management_event_queue = zuul.lib.queue.MergedQueue() + self.global_watcher = GlobalEventWatcher( + self.zk_client, self.wake_event.set + ) + self.pipeline_watcher = PipelineEventWatcher( + self.zk_client, self.wake_event.set + ) + self.trigger_events = GlobalTriggerEventQueue( + self.zk_client, self.connections + ) + self.pipeline_trigger_events = ( + PipelineTriggerEventQueue.createRegistry( + self.zk_client, self.connections + ) + ) self.abide = model.Abide() self.unparsed_abide = model.UnparsedAbideConfig() @@ -298,38 +312,23 @@ class Scheduler(threading.Thread): self.statsd.gauge('zuul.mergers.jobs_running', merge_running) self.statsd.gauge('zuul.mergers.jobs_queued', merge_queue) self.statsd.gauge('zuul.scheduler.eventqueues.trigger', - self.trigger_event_queue.qsize()) + len(self.trigger_events)) self.statsd.gauge('zuul.scheduler.eventqueues.result', self.result_event_queue.qsize()) self.statsd.gauge('zuul.scheduler.eventqueues.management', self.management_event_queue.qsize()) - def addEvent(self, event): - # Check the event type and put it in the corresponding queue - if isinstance(event, TriggerEvent): - return self._addTriggerEvent(event) - - if isinstance(event, ManagementEvent): - return self._addManagementEvent(event) - - if isinstance(event, ResultEvent): - return self._addResultEvent(event) - - self.log.warning( - "Unable to found appropriate queue for event %s", event - ) - - def _addTriggerEvent(self, event): + def addTriggerEvent(self, driver_name, event): event.arrived_at_scheduler_timestamp = time.time() - self.trigger_event_queue.put(event) + self.trigger_events.put(driver_name, event) self.wake_event.set() - def _addManagementEvent(self, event): + def addManagementEvent(self, event): self.management_event_queue.put(event) self.wake_event.set() event.wait() - def _addResultEvent(self, event): + def addResultEvent(self, event): self.result_event_queue.put(event) self.wake_event.set() @@ -570,17 +569,6 @@ class Scheduler(threading.Thread): event.wait() self.log.debug("Enqueue complete") - def exit(self): - self.log.debug("Prepare to exit") - self._hibernate = True - self.wake_event.set() - self.log.debug("Waiting for exit") - - def _get_queue_pickle_file(self): - state_dir = get_default(self.config, 'scheduler', 'state_dir', - '/var/lib/zuul', expand_user=True) - return os.path.join(state_dir, 'queue.pickle') - def _get_time_database_dir(self): state_dir = get_default(self.config, 'scheduler', 'state_dir', '/var/lib/zuul', expand_user=True) @@ -602,60 +590,6 @@ class Scheduler(threading.Thread): "current mode is %o" % (key_dir, mode)) return key_dir - def _save_queue(self) -> None: - # TODO JK: Remove when queues in ZK - pickle_file = self._get_queue_pickle_file() - events = [] - while not self.trigger_event_queue.empty(): - events.append(self.trigger_event_queue.get()) - self.log.debug("Queue length is %s" % len(events)) - if events: - self.log.debug("Saving queue") - pickle.dump(events, open(pickle_file, 'wb')) - - def _load_queue(self) -> None: - # TODO JK: Remove when queues in ZK - pickle_file = self._get_queue_pickle_file() - if os.path.exists(pickle_file): - self.log.debug("Loading queue") - events = pickle.load(open(pickle_file, 'rb')) - self.log.debug("Queue length is %s" % len(events)) - for event in events: - self.trigger_event_queue.put(event) - else: - self.log.debug("No queue file found") - - def _delete_queue(self) -> None: - # TODO JK: Remove when queues in ZK - pickle_file = self._get_queue_pickle_file() - if os.path.exists(pickle_file): - self.log.debug("Deleting saved queue") - os.unlink(pickle_file) - - def wakeUp(self) -> None: - """ - Wakes up scheduler by loading pickled queue. - - TODO JK: Remove when queues in ZK - """ - try: - self._load_queue() - except Exception: - self.log.exception("Unable to load queue") - try: - self._delete_queue() - except Exception: - self.log.exception("Unable to delete saved queue") - self.log.debug("Resuming queue processing") - self.wake_event.set() - - def _doHibernate(self) -> None: - # TODO JK: Remove when queues in ZK - if self._hibernate: - self.log.debug("Exiting") - self._save_queue() - os._exit(0) - def _checkTenantSourceConf(self, config): tenant_config = None script = False @@ -1109,13 +1043,11 @@ class Scheduler(threading.Thread): not self._stopped): self.process_result_queue() - if not self._hibernate: - while (not self.trigger_event_queue.empty() and - not self._stopped): - self.process_event_queue() + if not self._stopped: + self.process_global_trigger_queue() - if self._hibernate and self._areAllBuildsComplete(): - self._doHibernate() + if not self._stopped: + self.process_trigger_queue() for tenant in self.abide.tenants.values(): for pipeline in tenant.layout.pipelines.values(): @@ -1154,72 +1086,126 @@ class Scheduler(threading.Thread): "End maintain connection cache for: %s" % connection) self.log.debug("Connection cache size: %s" % len(relevant)) - def process_event_queue(self): - self.log.debug("Fetching trigger event") - event = self.trigger_event_queue.get() - log = get_annotated_logger(self.log, event.zuul_event_id) - log.debug("Processing trigger event %s" % event) + def process_global_trigger_queue(self): + for event in self.trigger_events: + log = get_annotated_logger( + self.log, event.zuul_event_id + ) + log.debug("Forwarding trigger event %s", event) + try: + for tenant in self.abide.tenants.values(): + self._forward_trigger_event(event, tenant) + finally: + self.trigger_events.ack(event) + + def _forward_trigger_event(self, event, tenant): + log = get_annotated_logger( + self.log, event.zuul_event_id + ) + _, project = tenant.getProject( + event.canonical_project_name + ) + if project is None: + return + try: - full_project_name = ('/'.join([event.project_hostname, - event.project_name])) - for tenant in self.abide.tenants.values(): - (trusted, project) = tenant.getProject(full_project_name) - if project is None: - continue - try: - change = project.source.getChange(event) - except exceptions.ChangeNotFound as e: - log.debug("Unable to get change %s from source %s", - e.change, project.source) - continue - reconfigure_tenant = False - if ((event.branch_updated and - hasattr(change, 'files') and - change.updatesConfig(tenant)) or - (event.branch_deleted and - self.abide.hasUnparsedBranchCache(project.canonical_name, - event.branch))): - reconfigure_tenant = True + change = project.source.getChange(event) + except exceptions.ChangeNotFound as e: + log.debug("Unable to get change %s from source %s", + e.change, project.source) + return - # The branch_created attribute is also true when a tag is - # created. Since we load config only from branches only trigger - # a tenant reconfiguration if the branch is set as well. - if event.branch_created and event.branch: - reconfigure_tenant = True + reconfigure_tenant = False + if ((event.branch_updated and + hasattr(change, 'files') and + change.updatesConfig(tenant)) or + (event.branch_deleted and + self.abide.hasUnparsedBranchCache(project.canonical_name, + event.branch))): + reconfigure_tenant = True - # If the driver knows the branch but we don't have a config, we - # also need to reconfigure. This happens if a GitHub branch - # was just configured as protected without a push in between. - if (event.branch in project.source.getProjectBranches( - project, tenant) - and not self.abide.hasUnparsedBranchCache( - project.canonical_name, event.branch)): - reconfigure_tenant = True + # The branch_created attribute is also true when a tag is + # created. Since we load config only from branches only trigger + # a tenant reconfiguration if the branch is set as well. + if event.branch_created and event.branch: + reconfigure_tenant = True - # If the branch is unprotected and unprotected branches - # are excluded from the tenant for that project skip reconfig. - if (reconfigure_tenant and not - event.branch_protected and - tenant.getExcludeUnprotectedBranches(project)): + # If the driver knows the branch but we don't have a config, we + # also need to reconfigure. This happens if a GitHub branch + # was just configured as protected without a push in between. + if (event.branch in project.source.getProjectBranches( + project, tenant) + and not self.abide.hasUnparsedBranchCache( + project.canonical_name, event.branch)): + reconfigure_tenant = True - reconfigure_tenant = False + # If the branch is unprotected and unprotected branches + # are excluded from the tenant for that project skip reconfig. + if (reconfigure_tenant and not + event.branch_protected and + tenant.getExcludeUnprotectedBranches(project)): - if reconfigure_tenant: - # The change that just landed updates the config - # or a branch was just created or deleted. Clear - # out cached data for this project and perform a - # reconfiguration. - self.reconfigureTenant(tenant, change.project, event) - for pipeline in tenant.layout.pipelines.values(): - if event.isPatchsetCreated(): - pipeline.manager.removeOldVersionsOfChange( - change, event) - elif event.isChangeAbandoned(): - pipeline.manager.removeAbandonedChange(change, event) - if pipeline.manager.eventMatches(event, change): - pipeline.manager.addChange(change, event) - finally: - self.trigger_event_queue.task_done() + reconfigure_tenant = False + + if reconfigure_tenant: + # The change that just landed updates the config + # or a branch was just created or deleted. Clear + # out cached data for this project and perform a + # reconfiguration. + self.reconfigureTenant(tenant, change.project, event) + + for pipeline in tenant.layout.pipelines.values(): + if ( + pipeline.manager.eventMatches(event, change) + or pipeline.manager.isChangeAlreadyInPipeline(change) + or pipeline.manager.findOldVersionOfChangeAlreadyInQueue( + change + ) + ): + self.pipeline_trigger_events[tenant.name][ + pipeline.name + ].put(event.driver_name, event) + + def process_trigger_queue(self): + for tenant in self.abide.tenants.values(): + for pipeline in tenant.layout.pipelines.values(): + for event in self.pipeline_trigger_events[tenant.name][ + pipeline.name + ]: + if self._stopped: + return + log = get_annotated_logger( + self.log, event.zuul_event_id + ) + log.debug("Processing trigger event %s", event) + try: + self._process_trigger_event(tenant, pipeline, event) + finally: + self.pipeline_trigger_events[tenant.name][ + pipeline.name + ].ack(event) + + def _process_trigger_event(self, tenant, pipeline, event): + log = get_annotated_logger( + self.log, event.zuul_event_id + ) + trusted, project = tenant.getProject(event.canonical_project_name) + if project is None: + return + try: + change = project.source.getChange(event) + except exceptions.ChangeNotFound as e: + log.debug("Unable to get change %s from source %s", + e.change, project.source) + return + + if event.isPatchsetCreated(): + pipeline.manager.removeOldVersionsOfChange( + change, event) + elif event.isChangeAbandoned(): + pipeline.manager.removeAbandonedChange(change, event) + if pipeline.manager.eventMatches(event, change): + pipeline.manager.addChange(change, event) def process_management_queue(self): self.log.debug("Fetching management event") @@ -1545,14 +1531,8 @@ class Scheduler(threading.Thread): data['zuul_version'] = self.zuul_version websocket_url = get_default(self.config, 'web', 'websocket_url', None) - if self._hibernate: - data['message'] = 'Queue only mode: preparing to hibernate,' \ - ' queue length: %s'\ - % self.trigger_event_queue.qsize() - data['trigger_event_queue'] = {} - data['trigger_event_queue']['length'] = \ - self.trigger_event_queue.qsize() + data['trigger_event_queue']['length'] = len(self.trigger_events) data['result_event_queue'] = {} data['result_event_queue']['length'] = \ self.result_event_queue.qsize()