Use a seeded PRNG when setting timer triggers

In the single scheduler days, we went to some effort to make sure
that we preserved apscheduler timer trigger jobs across reconfigurations
because to do otherwise risks missing zuul triggers if we reconfigure
within the jitter interval of a timer job.

In other words, if a timer trigger is set for midnight with a jitter
of 60s, and we reconfigure the tenant at midnight plus 10 seconds, if
the jitter would have caused the trigger to fire at midnight plus 30
seconds and the apscheduler job is canceled and recreated but this
time the jitter would cause it to fire at midnight plus 5 seconds,
the job would not fire.

Our mitigation for this was to preserve the apscheduler jobs across
reconfigurations, assuming nothing has changed.

In the multi-scheduler world, a similar problem arises when we perform
a rolling restart during a jitter interval.  Fixing this by recording
all of the jobs in ZK would be costly (as we create an apscheduler job
for each project-branch participating in a timer pipeline).  But we
can address the bulk of the problem just by ensuring that the same
jitter values are used on all schedulers.  This reduces the window for
missed jobs down to the time it takes for the schedulers to run the
timer trigger election (which, assuming an orderly shutdown, is very
fast).

In APScheduler the jitter value is calculated for every event.  This
new approach calculates a fixed jitter for a job, and will use that
value every time the job fires.  So if a job ends up running at
midnight plus 10 seconds one day, and no configuration changes,
it will run at midnight plus 10 seconds the next day.  This is
sufficient to achieve what we want to do with jitter: avoid starting
all the jobs at once.

We subclass the apscheduler CronTrigger to implement this custom
fixed jitter behavior and use a PRNG seeded with the tenant, project
and branch so that each scheduler will use the same value for each
combination, assuming they have the same configuration (and they
should).

Docs are updated to highlight that the same jitter value may be reused,
but no release note is added since the behavior change is not
significant.

Change-Id: Idd9d3a8cfa791860e46e4fc508566417f5d5a9bf
This commit is contained in:
James E. Blair 2023-09-12 15:48:27 -07:00
parent 27a4beb698
commit be4ab1bb38
4 changed files with 137 additions and 24 deletions

View File

@ -31,11 +31,16 @@ Zuul implements the timer using `apscheduler`_, Please check the
The time specification in cron syntax. Only the 5 part syntax
is supported, not the symbolic names. Example: ``0 0 * * *``
runs at midnight.
An optional 6th part specifies seconds. The optional 7th part specifies
a jitter in seconds. This delays the trigger randomly, limited by
the specified value. Example ``0 0 * * * * 60`` runs at
midnight or randomly up to 60 seconds later. The jitter is
applied individually to each project-branch combination.
An optional 6th part specifies seconds. The optional 7th part
specifies a jitter in seconds. This delays the trigger randomly,
limited by the specified value. Example ``0 0 * * * * 60`` runs
at midnight or randomly up to 60 seconds later. The jitter is
applied individually to each project-branch combination. While
the jitter is initialized to a random value, the same value will
often be used for a given project-branch combination (in other
words, it is not guaranteed to vary from one run of the timer
trigger to the next).
.. warning::
Be aware the day-of-week value differs from from cron.

View File

@ -1,5 +1,5 @@
# Copyright 2021 BMW Group
# Copyright 2021 Acme Gating, LLC
# Copyright 2021-2023 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import time
import zuul.model
from tests.base import iterate_timeout, ZuulTestCase, simple_layout
@ -610,6 +612,56 @@ class TestScaleOutScheduler(ZuulTestCase):
self.fake_nodepool.unpause()
self.waitUntilSettled()
@simple_layout('layouts/timer-jitter.yaml')
def test_timer_multi_scheduler(self):
# Test that two schedulers create exactly the same timer jobs
# including jitter.
self.create_branch('org/project', 'stable')
self.fake_gerrit.addEvent(
self.fake_gerrit.getFakeBranchCreatedEvent(
'org/project', 'stable'))
self.waitUntilSettled()
timer1 = self.scheds.first.sched.connections.drivers['timer']
timer1_jobs = timer1.apsched.get_jobs()
sched2 = self.createScheduler()
sched2.start()
self.assertEqual(len(self.scheds), 2)
timer1.stop()
self.waitUntilSettled(matcher=[sched2])
timer2 = sched2.connections.drivers['timer']
for _ in iterate_timeout(10, "until jobs registered"):
timer2_jobs = timer2.apsched.get_jobs()
if timer2_jobs:
break
for x in range(len(timer1_jobs)):
self.log.debug("Timer jitter: %s %s",
timer1_jobs[x].trigger._zuul_jitter,
timer2_jobs[x].trigger._zuul_jitter)
self.assertEqual(timer1_jobs[x].trigger._zuul_jitter,
timer2_jobs[x].trigger._zuul_jitter)
if x:
# Assert that we're not applying the same jitter to
# every job. Since we're dealing with a PRNG here,
# this could fail and be a false negative, but that's
# unlikely to happen often.
self.assertNotEqual(timer1_jobs[x - 1].trigger._zuul_jitter,
timer1_jobs[x].trigger._zuul_jitter)
self.commitConfigUpdate('org/common-config', 'layouts/no-timer.yaml')
self.scheds.execute(lambda app: app.sched.reconfigure(app.config))
self.waitUntilSettled()
# If APScheduler is in mid-event when we remove the job, we
# can end up with one more event firing, so give it an extra
# second to settle.
time.sleep(1)
self.waitUntilSettled()
class TestSOSCircularDependencies(ZuulTestCase):
# Those tests are testing specific interactions between multiple

View File

@ -1,6 +1,7 @@
# Copyright 2012 Hewlett-Packard Development Company, L.P.
# Copyright 2013 OpenStack Foundation
# Copyright 2016 Red Hat, Inc.
# Copyright 2023 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -14,20 +15,22 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
import logging
import threading
import time
import random
from collections import defaultdict
from uuid import uuid4
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from opentelemetry import trace
from zuul.driver import Driver, TriggerInterface
from zuul.driver.timer import timertrigger
from zuul.driver.timer import timermodel
from zuul.driver.timer.timermodel import TimerTriggerEvent
from zuul.driver.timer.crontrigger import ZuulCronTrigger
from zuul.lib.logutil import get_annotated_logger
from zuul.zk.election import SessionAwareElection
@ -118,22 +121,23 @@ class TimerDriver(Driver, TriggerInterface):
timespec,
pipeline.name))
continue
minute, hour, dom, month, dow = parts[:5]
# default values
second = None
jitter = None
if len(parts) > 5:
second = parts[5]
if len(parts) > 6:
jitter = parts[6]
try:
jitter = int(jitter) if jitter is not None else None
trigger = CronTrigger(day=dom, day_of_week=dow,
hour=hour, minute=minute,
second=second, jitter=jitter)
cron_args = dict(
minute=parts[0],
hour=parts[2],
day=parts[3],
day_of_week=parts[4],
second=None,
)
if len(parts) > 5:
cron_args['second'] = parts[5]
if len(parts) > 6:
jitter = int(parts[6])
else:
jitter = None
# Trigger any value errors by creating a
# throwaway object.
ZuulCronTrigger(jitter=jitter, **cron_args)
except ValueError:
self.log.exception(
"Unable to create CronTrigger "
@ -143,12 +147,14 @@ class TimerDriver(Driver, TriggerInterface):
pipeline.name)
continue
self._addJobsInner(tenant, pipeline, trigger, timespec,
self._addJobsInner(tenant, pipeline,
cron_args, jitter, timespec,
jobs)
self._removeJobs(tenant, jobs)
self.tenant_jobs[tenant.name] = jobs
def _addJobsInner(self, tenant, pipeline, trigger, timespec, jobs):
def _addJobsInner(self, tenant, pipeline, cron_args, jitter,
timespec, jobs):
# jobs is a dict of args->job that we mutate
existing_jobs = self.tenant_jobs.get(tenant.name, {})
for project_name, pcs in tenant.layout.project_configs.items():
@ -165,6 +171,22 @@ class TimerDriver(Driver, TriggerInterface):
args = (tenant.name, pipeline.name, project_name,
branch, timespec,)
existing_job = existing_jobs.get(args)
if jitter:
# Resolve jitter here so that it is the same
# on every scheduler for a given
# project-branch, assuming the same
# configuration.
prng_init = dict(
tenant=tenant.name,
project=project_name,
branch=branch,
)
prng_seed = json.dumps(prng_init, sort_keys=True)
prng = random.Random(prng_seed)
job_jitter = prng.uniform(0, jitter)
else:
job_jitter = None
if existing_job:
job = existing_job
else:
@ -175,6 +197,8 @@ class TimerDriver(Driver, TriggerInterface):
# to e.g. high scheduler load. Those short
# delays are not a problem for our trigger
# use-case.
trigger = ZuulCronTrigger(
jitter=job_jitter, **cron_args)
job = self.apsched.add_job(
self._onTrigger, trigger=trigger,
args=args, misfire_grace_time=None)

View File

@ -0,0 +1,32 @@
# Copyright 2023 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from apscheduler.triggers.cron import CronTrigger
class ZuulCronTrigger(CronTrigger):
def __init__(self, *args, **kw):
self._zuul_jitter = kw.pop('jitter')
super().__init__(*args, **kw)
def get_next_fire_time(self, previous_fire_time, now):
next_time = super().get_next_fire_time(previous_fire_time, now)
if self._zuul_jitter:
next_time = next_time + datetime.timedelta(
seconds=self._zuul_jitter)
if self.end_date:
next_time = min(next_time, self.end_date)
return next_time