zuul/zuul/driver/timer/__init__.py

# Copyright 2012 Hewlett-Packard Development Company, L.P.
# Copyright 2013 OpenStack Foundation
# Copyright 2016 Red Hat, Inc.
# Copyright 2023 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import json
import logging
import random
import threading
import time
from collections import defaultdict
from uuid import uuid4

from apscheduler.schedulers.background import BackgroundScheduler
from opentelemetry import trace

from zuul.driver import Driver, TriggerInterface
from zuul.driver.timer import timertrigger
from zuul.driver.timer import timermodel
from zuul.driver.timer.timermodel import TimerTriggerEvent
from zuul.driver.timer.crontrigger import ZuulCronTrigger
from zuul.lib.logutil import get_annotated_logger
from zuul.zk.election import SessionAwareElection


class TimerDriver(Driver, TriggerInterface):
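    """Trigger driver for periodic (timer) pipelines.

    Parses the cron-like timespecs configured on timer triggers,
    registers them with an APScheduler background scheduler, and
    dispatches a TimerTriggerEvent for each matching project branch.
    Every scheduler registers the jobs, but only the scheduler that
    currently holds the timer election in ZooKeeper dispatches the
    events, so each trigger fires on a single scheduler.
    """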
name = 'timer'
election_root = "/zuul/scheduler/timer-election"
log = logging.getLogger("zuul.TimerDriver")
tracer = trace.get_tracer("zuul")

    def __init__(self):
self.log.debug("Starting apscheduler")
self.apsched = BackgroundScheduler()
self.apsched.start()
self.tenant_jobs = {}
self.election = None
self.election_thread = None
self.election_won = False
        # Mapping of locks: canonical project name -> lock
        # The locks are used to avoid concurrent update errors when
        # many periodic pipelines trigger simultaneously.
self.project_update_locks = defaultdict(threading.Lock)
self.stop_event = threading.Event()
self.stopped = False

    def registerScheduler(self, scheduler):
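        """Attach the scheduler and start contending for the timer election."""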
self.sched = scheduler
self.election = SessionAwareElection(
self.sched.zk_client.client, self.election_root)
self.election_thread = threading.Thread(name="TimerElection",
target=self._runElection,
daemon=True)
self.election_thread.start()

    def _runElection(self):
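        """Run the timer election loop until the driver is stopped."""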
self.log.debug("Starting timer election loop")
while not self.stopped:
try:
self.log.info("Running timer election")
self.election.run(self._electionInner)
except Exception:
self.log.exception("Error in timer election:")
self.log.debug("Exiting timer election loop")

    def _electionInner(self):
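        """Mark this scheduler as the election winner and hold the election
        until the stop event is set."""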
try:
self.election_won = True
self.log.info("Won timer election")
self.stop_event.wait()
finally:
self.election_won = False
self.stop_event.clear()
self.log.debug("Timer election tenure ended")

    def reconfigure(self, tenant):
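        """Update the scheduled cron jobs to match the tenant's current layout."""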
if self.stopped:
return
if not self.apsched:
# Handle possible reuse of the driver without connection objects.
self.log.debug("Starting apscheduler on reconfigure")
self.apsched = BackgroundScheduler()
self.apsched.start()
self._addJobs(tenant)

    def _removeJobs(self, tenant, new_jobs):
# Compare existing jobs to new jobs and remove any that should
# not be present.
existing_jobs = self.tenant_jobs.get(tenant.name)
if not existing_jobs:
return
to_remove = set(existing_jobs.keys()) - set(new_jobs.keys())
for key in to_remove:
job = existing_jobs[key]
job.remove()

    def _addJobs(self, tenant):
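        """Parse the timer trigger timespecs for a tenant and schedule cron jobs.

        Timespecs use an extended cron format with five to seven
        whitespace-separated fields:

            minute hour day month day_of_week [second] [jitter]

        For example, '0 4 * * *' fires daily at 04:00, and
        '0 4 * * * * 60' additionally spreads start times over a
        60 second jitter window.
        """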
jobs = {}
for pipeline in tenant.layout.pipelines.values():
for ef in pipeline.manager.event_filters:
if not isinstance(ef.trigger, timertrigger.TimerTrigger):
continue
for timespec in ef.timespecs:
parts = timespec.split()
if len(parts) < 5 or len(parts) > 7:
                        self.log.error(
                            "Unable to parse time value '%s' "
                            "defined in pipeline %s",
                            timespec, pipeline.name)
continue
try:
cron_args = dict(
minute=parts[0],
hour=parts[1],
day=parts[2],
month=parts[3],
day_of_week=parts[4],
second=None,
)
if len(parts) > 5:
cron_args['second'] = parts[5]
if len(parts) > 6:
jitter = int(parts[6])
else:
jitter = None
# Trigger any value errors by creating a
# throwaway object.
ZuulCronTrigger(jitter=jitter, **cron_args)
except ValueError:
self.log.exception(
"Unable to create CronTrigger "
"for value '%s' defined in "
"pipeline %s",
timespec,
pipeline.name)
continue
self._addJobsInner(tenant, pipeline,
cron_args, jitter, timespec,
jobs)
self._removeJobs(tenant, jobs)
self.tenant_jobs[tenant.name] = jobs

    def _addJobsInner(self, tenant, pipeline, cron_args, jitter,
timespec, jobs):
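        """Create (or reuse) an APScheduler job for every branch of each
        project that uses this pipeline, applying a deterministic
        per-project-branch jitter when one is configured."""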
# jobs is a dict of args->job that we mutate
existing_jobs = self.tenant_jobs.get(tenant.name, {})
for project_name, pcs in tenant.layout.project_configs.items():
            # The timer trigger operates on branch heads and doesn't
            # need speculative layouts to decide whether it should be
            # enqueued, so cached data is sufficient to determine
            # whether it needs to run.
pcst = tenant.layout.getAllProjectConfigs(project_name)
            if not any(pipeline.name in pc.pipelines for pc in pcst):
continue
try:
for branch in tenant.getProjectBranches(project_name):
args = (tenant.name, pipeline.name, project_name,
branch, timespec,)
existing_job = existing_jobs.get(args)
if jitter:
# Resolve jitter here so that it is the same
# on every scheduler for a given
# project-branch, assuming the same
# configuration.
prng_init = dict(
tenant=tenant.name,
project=project_name,
branch=branch,
)
prng_seed = json.dumps(prng_init, sort_keys=True)
prng = random.Random(prng_seed)
job_jitter = prng.uniform(0, jitter)
else:
job_jitter = None
if existing_job:
job = existing_job
else:
# The 'misfire_grace_time' argument is set to
# None to disable checking if the job missed
# its run time window. This ensures we don't
# miss a trigger when the job is delayed due
# to e.g. high scheduler load. Those short
# delays are not a problem for our trigger
# use-case.
trigger = ZuulCronTrigger(
jitter=job_jitter, **cron_args)
job = self.apsched.add_job(
self._onTrigger, trigger=trigger,
args=args, misfire_grace_time=None)
jobs[job.args] = job
except Exception:
self.log.exception("Unable to create APScheduler job for "
"%s %s %s",
tenant, pipeline, project_name)

    def _onTrigger(self, tenant_name, pipeline_name, project_name, branch,
timespec):
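        """APScheduler callback; dispatch the event only if this scheduler
        currently holds the timer election."""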
if not self.election_won:
return
if not self.election.is_still_valid():
self.stop_event.set()
return
try:
attributes = {
"timespec": timespec,
}
with self.tracer.start_as_current_span(
"TimerEvent", attributes=attributes):
self._dispatchEvent(tenant_name, pipeline_name, project_name,
branch, timespec)
except Exception:
self.stop_event.set()
self.log.exception("Error when dispatching timer event")

    def _dispatchEvent(self, tenant_name, pipeline_name, project_name,
branch, timespec):
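        """Build a TimerTriggerEvent, refresh the branch in the change cache,
        and add the event to the pipeline's trigger event queue."""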
self.log.debug('Got trigger for tenant %s and pipeline %s '
'project %s branch %s with timespec %s',
tenant_name, pipeline_name, project_name,
branch, timespec)
try:
tenant = self.sched.abide.tenants[tenant_name]
(trusted, project) = tenant.getProject(project_name)
event = TimerTriggerEvent()
event.type = 'timer'
event.timespec = timespec
event.forced_pipeline = pipeline_name
event.project_hostname = project.canonical_hostname
event.project_name = project.name
event.ref = 'refs/heads/%s' % branch
event.branch = branch
event.zuul_event_id = str(uuid4().hex)
event.timestamp = time.time()
event.arrived_at_scheduler_timestamp = event.timestamp
# Refresh the branch in order to update the item in the
# change cache.
change_key = project.source.getChangeKey(event)
with self.project_update_locks[project.canonical_name]:
project.source.getChange(change_key, refresh=True,
event=event)
log = get_annotated_logger(self.log, event)
log.debug("Adding event")
self.sched.pipeline_trigger_events[tenant.name][
pipeline_name
].put(self.name, event)
except Exception:
self.log.exception("Error dispatching timer event for "
"tenant %s project %s branch %s",
tenant_name, project_name, branch)

    def stop(self):
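        """Stop the driver: shut down APScheduler, cancel the election, and
        join the election thread."""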
self.log.debug("Stopping timer driver")
self.stopped = True
self.stop_event.set()
if self.apsched:
self.log.debug("Stopping apscheduler")
self.apsched.shutdown()
self.apsched = None
self.log.debug("Stopped apscheduler")
if self.election:
self.log.debug("Stopping election")
self.election.cancel()
self.log.debug("Stopped election")
if self.election_thread:
self.log.debug("Stopping election thread")
self.election_thread.join()
self.log.debug("Stopped election thread")
self.log.debug("Stopped timer driver")

    def getTrigger(self, connection_name, config=None):
return timertrigger.TimerTrigger(self, config)

    def getTriggerSchema(self):
return timertrigger.getSchema()

    def getTriggerEventClass(self):
return timermodel.TimerTriggerEvent