diff --git a/tests/base.py b/tests/base.py index 4a1cbe1f09..99fc9664e0 100644 --- a/tests/base.py +++ b/tests/base.py @@ -4032,14 +4032,12 @@ class SchedulerTestApp: self.event_queues = [ self.sched.result_event_queue, - self.sched.management_event_queue ] def start(self, validate_tenants: list): self.sched.start() self.sched.executor.gearman.waitForServer() - self.sched.reconfigure( - self.config, validate_tenants=validate_tenants) + self.sched.prime(self.config, validate_tenants=validate_tenants) def fullReconfigure(self): try: @@ -4916,10 +4914,16 @@ class ZuulTestCase(BaseTestCase): def __areZooKeeperEventQueuesEmpty(self, matcher=None) -> bool: for sched in map(lambda app: app.sched, self.scheds.filter(matcher)): + if sched.management_events.hasEvents(): + return False if sched.trigger_events.hasEvents(): return False for tenant in sched.abide.tenants.values(): for pipeline_name in tenant.layout.pipelines: + if sched.pipeline_management_events[tenant.name][ + pipeline_name + ].hasEvents(): + return False if sched.pipeline_trigger_events[tenant.name][ pipeline_name ].hasEvents(): diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index dbe3bc5704..839acb5a5b 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -18,6 +18,7 @@ import json import textwrap import os +import re import shutil import socket import time @@ -28,6 +29,7 @@ from kazoo.exceptions import NoNodeError import git import testtools +from testtools.matchers import AfterPreprocessing, MatchesRegex from zuul.scheduler import Scheduler import zuul.change_matcher @@ -3396,25 +3398,28 @@ class TestScheduler(ZuulTestCase): tenant = self.scheds.first.sched.abide.tenants['tenant-one'] (trusted, project) = tenant.getProject('org/project') - self.scheds.first.sched.run_handler_lock.acquire() - self.assertEqual( - self.scheds.first.sched.management_event_queue.qsize(), 0) + with self.scheds.first.sched.run_handler_lock: + mgmt_queue_size = len(self.scheds.first.sched.management_events) + self.assertEqual(mgmt_queue_size, 0) - self.scheds.first.sched.reconfigureTenant(tenant, project, None) - self.assertEqual( - self.scheds.first.sched.management_event_queue.qsize(), 1) + self.scheds.first.sched.reconfigureTenant(tenant, project, None) + mgmt_queue_size = len(self.scheds.first.sched.management_events) + self.assertEqual(mgmt_queue_size, 1) - self.scheds.first.sched.reconfigureTenant(tenant, project, None) - # The second event should have been combined with the first - # so we should still only have one entry. - self.assertEqual( - self.scheds.first.sched.management_event_queue.qsize(), 1) + self.scheds.first.sched.reconfigureTenant(tenant, project, None) + mgmt_queue_size = len(self.scheds.first.sched.management_events) + self.assertEqual(mgmt_queue_size, 2) + + # The second event should be combined with the first so we should + # only see the merged entry when consuming from the queue. + mgmt_events = list(self.scheds.first.sched.management_events) + self.assertEqual(len(mgmt_events), 1) + self.assertEqual(len(mgmt_events[0].merged_events), 1) - self.scheds.first.sched.run_handler_lock.release() self.waitUntilSettled() - self.assertEqual( - self.scheds.first.sched.management_event_queue.qsize(), 0) + mgmt_queue_size = len(self.scheds.first.sched.management_events) + self.assertEqual(mgmt_queue_size, 0) def test_live_reconfiguration(self): "Test that live reconfiguration works" @@ -4641,17 +4646,23 @@ class TestScheduler(ZuulTestCase): self.fake_gerrit.addEvent(D.getPatchsetCreatedEvent(1)) self.waitUntilSettled() + matcher = AfterPreprocessing( + str, + MatchesRegex( + '.*Change 4,1 does not belong to project "org/project1"', + re.DOTALL, + ), + annotate=False + ) with testtools.ExpectedException( - zuul.rpcclient.RPCFailure, - 'Change 4,1 does not belong to project "org/project1"'): - r = client.dequeue( + zuul.rpcclient.RPCFailure, matcher): + client.dequeue( tenant='tenant-one', pipeline='check', project='org/project1', change='4,1', - ref=None) - self.waitUntilSettled() - self.assertEqual(r, False) + ref=None + ) self.executor_server.hold_jobs_in_build = False self.executor_server.release() diff --git a/zuul/cmd/scheduler.py b/zuul/cmd/scheduler.py index 18928f724e..04ce746b7b 100755 --- a/zuul/cmd/scheduler.py +++ b/zuul/cmd/scheduler.py @@ -143,8 +143,7 @@ class Scheduler(zuul.cmd.ZuulDaemonApp): self.log.info('Starting scheduler') try: self.sched.start() - self.sched.reconfigure(self.config, - validate_tenants=self.args.validate_tenants) + self.sched.prime(self.config, self.args.validate_tenants) except Exception: self.log.exception("Error starting Zuul:") # TODO(jeblair): If we had all threads marked as daemon, diff --git a/zuul/model.py b/zuul/model.py index 66d51709e3..35046c1bc2 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -23,7 +23,6 @@ from itertools import chain import re2 import struct -import threading import time from uuid import uuid4 import urllib.parse @@ -3490,28 +3489,15 @@ class AbstractEvent(abc.ABC): class ManagementEvent(AbstractEvent): """An event that should be processed within the main queue run loop""" + def __init__(self): - self._wait_event = threading.Event() - self._exc_info = None self.traceback = None self.zuul_event_id = None # Opaque identifier in order to report the result of an event self.result_ref = None - def exception(self, exc_info): - self._exc_info = exc_info - self._wait_event.set() - - def done(self): - self._wait_event.set() - - def wait(self, timeout=None): - self._wait_event.wait(timeout) - if self._exc_info: - # sys.exc_info returns (type, value, traceback) - type_, exception_instance, traceback = self._exc_info - raise exception_instance.with_traceback(traceback) - return self._wait_event.is_set() + def exception(self, tb: str): + self.traceback = tb def toDict(self): return { @@ -3524,48 +3510,25 @@ class ManagementEvent(AbstractEvent): class ReconfigureEvent(ManagementEvent): """Reconfigure the scheduler. The layout will be (re-)loaded from - the path specified in the configuration. + the path specified in the configuration.""" - :arg ConfigParser config: the new configuration - """ - def __init__(self, config, validate_tenants=None): + def __init__(self, validate_tenants=None): super(ReconfigureEvent, self).__init__() - self.config = config self.validate_tenants = validate_tenants def toDict(self): d = super().toDict() - # Note: config is not JSON serializable and will be removed - # before this is serialized into ZK. - d["config"] = self.config d["validate_tenants"] = self.validate_tenants return d @classmethod def fromDict(cls, data): - return cls(data.get("config"), data.get("validate_tenants")) + return cls(data.get("validate_tenants")) class SmartReconfigureEvent(ManagementEvent): """Reconfigure the scheduler. The layout will be (re-)loaded from - the path specified in the configuration. - - :arg ConfigParser config: the new configuration - """ - def __init__(self, config, smart=False): - super().__init__() - self.config = config - - def toDict(self): - d = super().toDict() - # Note: config is not JSON serializable and will be removed - # before this is serialized into ZK. - d["config"] = self.config - return d - - @classmethod - def fromDict(cls, data): - return cls(data.get("config")) + the path specified in the configuration.""" class TenantReconfigureEvent(ManagementEvent): @@ -3573,15 +3536,15 @@ class TenantReconfigureEvent(ManagementEvent): the path specified in the configuration. :arg str tenant_name: the tenant to reconfigure - :arg Project project: if supplied, clear the cached configuration + :arg str project_name: if supplied, clear the cached configuration from this project first - :arg Branch branch: if supplied along with project, only remove the + :arg str branch_name: if supplied along with project, only remove the configuration of the specific branch from the cache """ - def __init__(self, tenant_name, project, branch): + def __init__(self, tenant_name, project_name, branch_name): super(TenantReconfigureEvent, self).__init__() self.tenant_name = tenant_name - self.project_branches = set([(project, branch)]) + self.project_branches = set([(project_name, branch_name)]) self.merged_events = [] def __ne__(self, other): @@ -3718,13 +3681,13 @@ class EnqueueEvent(ManagementEvent): def toDict(self): d = super().toDict() - d["trigger_event"] = self.trigger_event + d["trigger_event"] = self.trigger_event.toDict() return d @classmethod def fromDict(cls, data): return cls( - data.get("trigger_event"), + TriggerEvent.fromDict(data["trigger_event"]), ) diff --git a/zuul/scheduler.py b/zuul/scheduler.py index e56aaba015..8c268ecd21 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -64,8 +64,10 @@ from zuul.zk import ZooKeeperClient from zuul.zk.components import ZooKeeperComponentRegistry from zuul.zk.event_queues import ( GlobalEventWatcher, + GlobalManagementEventQueue, GlobalTriggerEventQueue, PipelineEventWatcher, + PipelineManagementEventQueue, PipelineTriggerEventQueue, ) from zuul.zk.nodepool import ZooKeeperNodepool @@ -145,13 +147,18 @@ class Scheduler(threading.Thread): ) self.result_event_queue = queue.Queue() - self.management_event_queue = zuul.lib.queue.MergedQueue() self.global_watcher = GlobalEventWatcher( self.zk_client, self.wake_event.set ) self.pipeline_watcher = PipelineEventWatcher( self.zk_client, self.wake_event.set ) + self.management_events = GlobalManagementEventQueue(self.zk_client) + self.pipeline_management_events = ( + PipelineManagementEventQueue.createRegistry( + self.zk_client + ) + ) self.trigger_events = GlobalTriggerEventQueue( self.zk_client, self.connections ) @@ -316,7 +323,7 @@ class Scheduler(threading.Thread): self.statsd.gauge('zuul.scheduler.eventqueues.result', self.result_event_queue.qsize()) self.statsd.gauge('zuul.scheduler.eventqueues.management', - self.management_event_queue.qsize()) + len(self.management_events)) def addTriggerEvent(self, driver_name, event): event.arrived_at_scheduler_timestamp = time.time() @@ -324,9 +331,7 @@ class Scheduler(threading.Thread): self.wake_event.set() def addManagementEvent(self, event): - self.management_event_queue.put(event) - self.wake_event.set() - event.wait() + self.management_events.put(event, needs_result=False) def addResultEvent(self, event): self.result_event_queue.put(event) @@ -416,9 +421,10 @@ class Scheduler(threading.Thread): "%s due to event %s in project %s", tenant.name, event, project) branch = event.branch if event is not None else None - event = TenantReconfigureEvent(tenant.name, project, branch) - self.management_event_queue.put(event) - self.wake_event.set() + event = TenantReconfigureEvent( + tenant.name, project.canonical_name, branch + ) + self.management_events.put(event, needs_result=False) def fullReconfigureCommandHandler(self): self._zuul_app.fullReconfigure() @@ -438,16 +444,24 @@ class Scheduler(threading.Thread): self.repl.stop() self.repl = None - def reconfigure(self, config, smart=False, validate_tenants=None): + def prime(self, config, validate_tenants=None): + self.log.debug("Priming scheduler config") + event = ReconfigureEvent(validate_tenants=validate_tenants) + self._doReconfigureEvent(event) + self.log.debug("Config priming complete") + self.last_reconfigured = int(time.time()) + + def reconfigure(self, config, smart=False): self.log.debug("Submitting reconfiguration event") if smart: - event = SmartReconfigureEvent(config) + event = SmartReconfigureEvent() else: - event = ReconfigureEvent(config, validate_tenants=validate_tenants) - self.management_event_queue.put(event) - self.wake_event.set() + event = ReconfigureEvent() + + result = self.management_events.put(event) + self.log.debug("Waiting for reconfiguration") - event.wait() + result.wait() self.log.debug("Reconfiguration complete") self.last_reconfigured = int(time.time()) # TODOv3(jeblair): reconfigure time should be per-tenant @@ -546,27 +560,24 @@ class Scheduler(threading.Thread): def promote(self, tenant_name, pipeline_name, change_ids): event = PromoteEvent(tenant_name, pipeline_name, change_ids) - self.management_event_queue.put(event) - self.wake_event.set() + result = self.management_events.put(event) self.log.debug("Waiting for promotion") - event.wait() + result.wait() self.log.debug("Promotion complete") def dequeue(self, tenant_name, pipeline_name, project_name, change, ref): event = DequeueEvent( tenant_name, pipeline_name, project_name, change, ref) - self.management_event_queue.put(event) - self.wake_event.set() + result = self.management_events.put(event) self.log.debug("Waiting for dequeue") - event.wait() + result.wait() self.log.debug("Dequeue complete") def enqueue(self, trigger_event): event = EnqueueEvent(trigger_event) - self.management_event_queue.put(event) - self.wake_event.set() + result = self.management_events.put(event) self.log.debug("Waiting for enqueue") - event.wait() + result.wait() self.log.debug("Enqueue complete") def _get_time_database_dir(self): @@ -616,7 +627,7 @@ class Scheduler(threading.Thread): # This is called in the scheduler loop after another thread submits # a request self.layout_lock.acquire() - self.config = event.config + self.config = self._zuul_app.config try: self.log.info("Full reconfiguration beginning") start = time.monotonic() @@ -666,7 +677,7 @@ class Scheduler(threading.Thread): # a request reconfigured_tenants = [] with self.layout_lock: - self.config = event.config + self.config = self._zuul_app.config self.log.info("Smart reconfiguration beginning") start = time.monotonic() @@ -726,11 +737,11 @@ class Scheduler(threading.Thread): start = time.monotonic() # If a change landed to a project, clear out the cached # config of the changed branch before reconfiguring. - for (project, branch) in event.project_branches: + for project_name, branch_name in event.project_branches: self.log.debug("Clearing unparsed config: %s @%s", - project.canonical_name, branch) - self.abide.clearUnparsedBranchCache(project.canonical_name, - branch) + project_name, branch_name) + self.abide.clearUnparsedBranchCache(project_name, + branch_name) old_tenant = self.abide.tenants[event.tenant_name] loader = configloader.ConfigLoader( self.connections, self, self.merger, @@ -1033,8 +1044,10 @@ class Scheduler(threading.Thread): self.log.debug("Run handler awake") self.run_handler_lock.acquire() try: - while (not self.management_event_queue.empty() and - not self._stopped): + if not self._stopped: + self.process_global_management_queue() + + if not self._stopped: self.process_management_queue() # Give result events priority -- they let us stop builds, @@ -1207,18 +1220,77 @@ class Scheduler(threading.Thread): if pipeline.manager.eventMatches(event, change): pipeline.manager.addChange(change, event) + def process_global_management_queue(self): + for event in self.management_events: + event_forwarded = False + try: + if isinstance(event, ReconfigureEvent): + self._doReconfigureEvent(event) + elif isinstance(event, SmartReconfigureEvent): + self._doSmartReconfigureEvent(event) + elif isinstance(event, TenantReconfigureEvent): + self._doTenantReconfigureEvent(event) + elif isinstance(event, (PromoteEvent, DequeueEvent)): + try: + tenant = self.abide.tenants[event.tenant_name] + pipeline = tenant.layout.pipelines[event.pipeline_name] + self.pipeline_management_events[tenant.name][ + pipeline.name + ].put(event) + event_forwarded = True + except Exception: + event.exception( + "".join( + traceback.format_exception(*sys.exc_info()) + ) + ) + elif isinstance(event, EnqueueEvent): + try: + trigger_event = event.trigger_event + tenant = self.abide.tenants[trigger_event.tenant_name] + pipeline = tenant.layout.pipelines[ + trigger_event.forced_pipeline + ] + self.pipeline_management_events[tenant.name][ + pipeline.name + ].put(event) + event_forwarded = True + except Exception: + event.exception( + "".join( + traceback.format_exception(*sys.exc_info()) + ) + ) + else: + self.log.error("Unable to handle event %s", event) + finally: + if event_forwarded: + self.management_events.ackWithoutResult(event) + else: + self.management_events.ack(event) + def process_management_queue(self): - self.log.debug("Fetching management event") - event = self.management_event_queue.get() - self.log.debug("Processing management event %s" % event) + for tenant in self.abide.tenants.values(): + for pipeline in tenant.layout.pipelines.values(): + for event in self.pipeline_management_events[tenant.name][ + pipeline.name + ]: + if self._stopped: + return + log = get_annotated_logger( + self.log, event.zuul_event_id + ) + log.debug("Processing management event %s", event) + try: + self._process_management_event(event) + finally: + self.pipeline_management_events[tenant.name][ + pipeline.name + ].ack(event) + + def _process_management_event(self, event): try: - if isinstance(event, ReconfigureEvent): - self._doReconfigureEvent(event) - elif isinstance(event, SmartReconfigureEvent): - self._doSmartReconfigureEvent(event) - elif isinstance(event, TenantReconfigureEvent): - self._doTenantReconfigureEvent(event) - elif isinstance(event, PromoteEvent): + if isinstance(event, PromoteEvent): self._doPromoteEvent(event) elif isinstance(event, DequeueEvent): self._doDequeueEvent(event) @@ -1226,15 +1298,11 @@ class Scheduler(threading.Thread): self._doEnqueueEvent(event.trigger_event) else: self.log.error("Unable to handle event %s" % event) - event.done() except Exception: self.log.exception("Exception in management event:") - type, val, tb = sys.exc_info() - # Remove local variables from the traceback to prevent leaking - # large objects. - traceback.clear_frames(tb) - event.exception((type, val, tb)) - self.management_event_queue.task_done() + event.exception( + "".join(traceback.format_exception(*sys.exc_info())) + ) def process_result_queue(self): self.log.debug("Fetching result event") @@ -1537,8 +1605,7 @@ class Scheduler(threading.Thread): data['result_event_queue']['length'] = \ self.result_event_queue.qsize() data['management_event_queue'] = {} - data['management_event_queue']['length'] = \ - self.management_event_queue.qsize() + data['management_event_queue']['length'] = len(self.management_events) if self.last_reconfigured: data['last_reconfigured'] = self.last_reconfigured * 1000