From 603b8269114a5703f21e05ca11c833346c7d5ecb Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Fri, 17 Jun 2022 15:43:19 -0700 Subject: [PATCH] Add --wait-for-init scheduler option This instructs the scheduler to wait until all tenants have been initialized before processing pipelines. This can be useful for large systems with excess scheduler capacity to speed up a rolling restart. This also removes an unused instance variable from SchedulerTestManager. Change-Id: I19e733c881d1abf636674bf572f4764a0d018a8a --- .../notes/wait-for-init-934370422b22b442.yaml | 8 +++++++ tests/base.py | 21 +++++++++++-------- tests/unit/test_scheduler.py | 17 +++++++++++++++ zuul/cmd/scheduler.py | 7 ++++++- zuul/scheduler.py | 7 ++++++- 5 files changed, 49 insertions(+), 11 deletions(-) create mode 100644 releasenotes/notes/wait-for-init-934370422b22b442.yaml diff --git a/releasenotes/notes/wait-for-init-934370422b22b442.yaml b/releasenotes/notes/wait-for-init-934370422b22b442.yaml new file mode 100644 index 0000000000..9033131f18 --- /dev/null +++ b/releasenotes/notes/wait-for-init-934370422b22b442.yaml @@ -0,0 +1,8 @@ +--- +features: + - | + The scheduler now accepts an argument `--wait-for-init` which will + cause it to wait until all tenants have been initialized before it + begins processing pipelines. This may help large systems with + excess scheduler capacity perform a rolling restart of schedulers + more quickly. diff --git a/tests/base.py b/tests/base.py index 504a9718de..2076fc2add 100644 --- a/tests/base.py +++ b/tests/base.py @@ -4379,11 +4379,12 @@ class SchedulerTestApp: def __init__(self, log, config, changes, additional_event_queues, upstream_root, poller_events, git_url_with_auth, add_cleanup, validate_tenants, - instance_id): + wait_for_init, instance_id): self.log = log self.config = config self.changes = changes self.validate_tenants = validate_tenants + self.wait_for_init = wait_for_init # Register connections from the config using fakes self.connections = TestConnectionRegistry( @@ -4397,7 +4398,8 @@ class SchedulerTestApp: ) self.connections.configure(self.config) - self.sched = TestScheduler(self.config, self.connections, self) + self.sched = TestScheduler(self.config, self.connections, self, + wait_for_init) self.sched.log = logging.getLogger(f"zuul.Scheduler-{instance_id}") self.sched._stats_interval = 1 @@ -4463,13 +4465,12 @@ class SchedulerTestApp: class SchedulerTestManager: - def __init__(self, validate_tenants): + def __init__(self, validate_tenants, wait_for_init): self.instances = [] - self.validate_tenants = validate_tenants def create(self, log, config, changes, additional_event_queues, - upstream_root, poller_events, - git_url_with_auth, add_cleanup, validate_tenants): + upstream_root, poller_events, git_url_with_auth, + add_cleanup, validate_tenants, wait_for_init): # Since the config contains a regex we cannot use copy.deepcopy() # as this will raise an exception with Python <3.7 config_data = StringIO() @@ -4490,7 +4491,7 @@ class SchedulerTestManager: additional_event_queues, upstream_root, poller_events, git_url_with_auth, add_cleanup, - validate_tenants, instance_id) + validate_tenants, wait_for_init, instance_id) self.instances.append(app) return app @@ -4593,6 +4594,7 @@ class ZuulTestCase(BaseTestCase): git_url_with_auth: bool = False log_console_port: int = 19885 validate_tenants = None + wait_for_init = None scheduler_count = SCHEDULER_COUNT def __getattr__(self, name): @@ -4757,7 +4759,8 @@ class ZuulTestCase(BaseTestCase): self.history = self.executor_server.build_history self.builds = self.executor_server.running_builds - self.scheds = SchedulerTestManager(self.validate_tenants) + self.scheds = SchedulerTestManager(self.validate_tenants, + self.wait_for_init) for _ in range(self.scheduler_count): self.createScheduler() @@ -4776,7 +4779,7 @@ class ZuulTestCase(BaseTestCase): self.log, self.config, self.changes, self.additional_event_queues, self.upstream_root, self.poller_events, self.git_url_with_auth, - self.addCleanup, self.validate_tenants) + self.addCleanup, self.validate_tenants, self.wait_for_init) def createZKContext(self, lock=None): if lock is None: diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index 5cc70b9cfc..f3ee0d28e5 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -8793,3 +8793,20 @@ class TestEventProcessing(ZuulTestCase): dict(name='tagjob', result='SUCCESS'), dict(name='checkjob', result='SUCCESS', changes='1,1'), ], ordered=False) + + +class TestWaitForInit(ZuulTestCase): + tenant_config_file = 'config/single-tenant/main.yaml' + wait_for_init = True + + def setUp(self): + with self.assertLogs('zuul.Scheduler-0', level='DEBUG') as full_logs: + super().setUp() + self.assertRegexInList('Waiting for tenant initialization', + full_logs.output) + + def test_wait_for_init(self): + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + A.addApproval('Code-Review', 2) + self.fake_gerrit.addEvent(A.addApproval('Approved', 1)) + self.waitUntilSettled() diff --git a/zuul/cmd/scheduler.py b/zuul/cmd/scheduler.py index 3ab93670f6..7ed30b68ef 100755 --- a/zuul/cmd/scheduler.py +++ b/zuul/cmd/scheduler.py @@ -39,6 +39,10 @@ class Scheduler(zuul.cmd.ZuulDaemonApp): 'listed, all tenants will be validated. ' 'Note: this requires ZooKeeper and ' 'will distribute work to mergers.') + parser.add_argument('--wait-for-init', dest='wait_for_init', + action='store_true', + help='Wait until all tenants are fully loaded ' + 'before beginning to process events.') self.addSubCommands(parser, zuul.scheduler.COMMANDS) return parser @@ -82,7 +86,8 @@ class Scheduler(zuul.cmd.ZuulDaemonApp): self.configure_connections(require_sql=True) self.sched = zuul.scheduler.Scheduler(self.config, - self.connections, self) + self.connections, self, + self.args.wait_for_init) if self.args.validate_tenants is None: self.connections.registerScheduler(self.sched) self.connections.load(self.sched.zk_client, diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 2b1ac8c771..5c1808d261 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -184,9 +184,11 @@ class Scheduler(threading.Thread): _merger_client_class = MergeClient _executor_client_class = ExecutorClient - def __init__(self, config, connections, app, testonly=False): + def __init__(self, config, connections, app, wait_for_init, + testonly=False): threading.Thread.__init__(self) self.daemon = True + self.wait_for_init = wait_for_init self.hostname = socket.getfqdn() self.primed_event = threading.Event() # Wake up the main run loop @@ -1901,6 +1903,9 @@ class Scheduler(threading.Thread): self.log.debug("Statsd enabled") else: self.log.debug("Statsd not configured") + if self.wait_for_init: + self.log.debug("Waiting for tenant initialization") + self.primed_event.wait() while True: self.log.debug("Run handler sleeping") self.wake_event.wait()