Switch to ZooKeeper-backed management event queues

Management events will now be dispatched via ZooKeeper. The event queues
are namespaced by tenant since the event processing will later require a
tenant lock in a multi-scheduler deployment.

Events are considered immutable once in the queue, which eliminates the
need for a separate read/write lock. Because of this, tenant
reconfiguration events are merged on the fly when consuming from the
queue instead of being merged on insert.

Change-Id: Ia79089ce87ab9f4921c38b4542bbf2ea3e655055
This commit is contained in:
Simon Westphahl 2020-11-06 12:10:49 +01:00
parent 2e6cfff818
commit 50cf6a994a
5 changed files with 169 additions and 125 deletions

View File

@ -4032,14 +4032,12 @@ class SchedulerTestApp:
self.event_queues = [
self.sched.result_event_queue,
self.sched.management_event_queue
]
def start(self, validate_tenants: list):
self.sched.start()
self.sched.executor.gearman.waitForServer()
self.sched.reconfigure(
self.config, validate_tenants=validate_tenants)
self.sched.prime(self.config, validate_tenants=validate_tenants)
def fullReconfigure(self):
try:
@ -4916,10 +4914,16 @@ class ZuulTestCase(BaseTestCase):
def __areZooKeeperEventQueuesEmpty(self, matcher=None) -> bool:
for sched in map(lambda app: app.sched, self.scheds.filter(matcher)):
if sched.management_events.hasEvents():
return False
if sched.trigger_events.hasEvents():
return False
for tenant in sched.abide.tenants.values():
for pipeline_name in tenant.layout.pipelines:
if sched.pipeline_management_events[tenant.name][
pipeline_name
].hasEvents():
return False
if sched.pipeline_trigger_events[tenant.name][
pipeline_name
].hasEvents():

View File

@ -18,6 +18,7 @@ import json
import textwrap
import os
import re
import shutil
import socket
import time
@ -28,6 +29,7 @@ from kazoo.exceptions import NoNodeError
import git
import testtools
from testtools.matchers import AfterPreprocessing, MatchesRegex
from zuul.scheduler import Scheduler
import zuul.change_matcher
@ -3396,25 +3398,28 @@ class TestScheduler(ZuulTestCase):
tenant = self.scheds.first.sched.abide.tenants['tenant-one']
(trusted, project) = tenant.getProject('org/project')
self.scheds.first.sched.run_handler_lock.acquire()
self.assertEqual(
self.scheds.first.sched.management_event_queue.qsize(), 0)
with self.scheds.first.sched.run_handler_lock:
mgmt_queue_size = len(self.scheds.first.sched.management_events)
self.assertEqual(mgmt_queue_size, 0)
self.scheds.first.sched.reconfigureTenant(tenant, project, None)
self.assertEqual(
self.scheds.first.sched.management_event_queue.qsize(), 1)
self.scheds.first.sched.reconfigureTenant(tenant, project, None)
mgmt_queue_size = len(self.scheds.first.sched.management_events)
self.assertEqual(mgmt_queue_size, 1)
self.scheds.first.sched.reconfigureTenant(tenant, project, None)
# The second event should have been combined with the first
# so we should still only have one entry.
self.assertEqual(
self.scheds.first.sched.management_event_queue.qsize(), 1)
self.scheds.first.sched.reconfigureTenant(tenant, project, None)
mgmt_queue_size = len(self.scheds.first.sched.management_events)
self.assertEqual(mgmt_queue_size, 2)
# The second event should be combined with the first so we should
# only see the merged entry when consuming from the queue.
mgmt_events = list(self.scheds.first.sched.management_events)
self.assertEqual(len(mgmt_events), 1)
self.assertEqual(len(mgmt_events[0].merged_events), 1)
self.scheds.first.sched.run_handler_lock.release()
self.waitUntilSettled()
self.assertEqual(
self.scheds.first.sched.management_event_queue.qsize(), 0)
mgmt_queue_size = len(self.scheds.first.sched.management_events)
self.assertEqual(mgmt_queue_size, 0)
def test_live_reconfiguration(self):
"Test that live reconfiguration works"
@ -4641,17 +4646,23 @@ class TestScheduler(ZuulTestCase):
self.fake_gerrit.addEvent(D.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
matcher = AfterPreprocessing(
str,
MatchesRegex(
'.*Change 4,1 does not belong to project "org/project1"',
re.DOTALL,
),
annotate=False
)
with testtools.ExpectedException(
zuul.rpcclient.RPCFailure,
'Change 4,1 does not belong to project "org/project1"'):
r = client.dequeue(
zuul.rpcclient.RPCFailure, matcher):
client.dequeue(
tenant='tenant-one',
pipeline='check',
project='org/project1',
change='4,1',
ref=None)
self.waitUntilSettled()
self.assertEqual(r, False)
ref=None
)
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()

View File

@ -143,8 +143,7 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
self.log.info('Starting scheduler')
try:
self.sched.start()
self.sched.reconfigure(self.config,
validate_tenants=self.args.validate_tenants)
self.sched.prime(self.config, self.args.validate_tenants)
except Exception:
self.log.exception("Error starting Zuul:")
# TODO(jeblair): If we had all threads marked as daemon,

View File

@ -23,7 +23,6 @@ from itertools import chain
import re2
import struct
import threading
import time
from uuid import uuid4
import urllib.parse
@ -3490,28 +3489,15 @@ class AbstractEvent(abc.ABC):
class ManagementEvent(AbstractEvent):
"""An event that should be processed within the main queue run loop"""
def __init__(self):
self._wait_event = threading.Event()
self._exc_info = None
self.traceback = None
self.zuul_event_id = None
# Opaque identifier in order to report the result of an event
self.result_ref = None
def exception(self, exc_info):
self._exc_info = exc_info
self._wait_event.set()
def done(self):
self._wait_event.set()
def wait(self, timeout=None):
self._wait_event.wait(timeout)
if self._exc_info:
# sys.exc_info returns (type, value, traceback)
type_, exception_instance, traceback = self._exc_info
raise exception_instance.with_traceback(traceback)
return self._wait_event.is_set()
def exception(self, tb: str):
self.traceback = tb
def toDict(self):
return {
@ -3524,48 +3510,25 @@ class ManagementEvent(AbstractEvent):
class ReconfigureEvent(ManagementEvent):
"""Reconfigure the scheduler. The layout will be (re-)loaded from
the path specified in the configuration.
the path specified in the configuration."""
:arg ConfigParser config: the new configuration
"""
def __init__(self, config, validate_tenants=None):
def __init__(self, validate_tenants=None):
super(ReconfigureEvent, self).__init__()
self.config = config
self.validate_tenants = validate_tenants
def toDict(self):
d = super().toDict()
# Note: config is not JSON serializable and will be removed
# before this is serialized into ZK.
d["config"] = self.config
d["validate_tenants"] = self.validate_tenants
return d
@classmethod
def fromDict(cls, data):
return cls(data.get("config"), data.get("validate_tenants"))
return cls(data.get("validate_tenants"))
class SmartReconfigureEvent(ManagementEvent):
"""Reconfigure the scheduler. The layout will be (re-)loaded from
the path specified in the configuration.
:arg ConfigParser config: the new configuration
"""
def __init__(self, config, smart=False):
super().__init__()
self.config = config
def toDict(self):
d = super().toDict()
# Note: config is not JSON serializable and will be removed
# before this is serialized into ZK.
d["config"] = self.config
return d
@classmethod
def fromDict(cls, data):
return cls(data.get("config"))
the path specified in the configuration."""
class TenantReconfigureEvent(ManagementEvent):
@ -3573,15 +3536,15 @@ class TenantReconfigureEvent(ManagementEvent):
the path specified in the configuration.
:arg str tenant_name: the tenant to reconfigure
:arg Project project: if supplied, clear the cached configuration
:arg str project_name: if supplied, clear the cached configuration
from this project first
:arg Branch branch: if supplied along with project, only remove the
:arg str branch_name: if supplied along with project, only remove the
configuration of the specific branch from the cache
"""
def __init__(self, tenant_name, project, branch):
def __init__(self, tenant_name, project_name, branch_name):
super(TenantReconfigureEvent, self).__init__()
self.tenant_name = tenant_name
self.project_branches = set([(project, branch)])
self.project_branches = set([(project_name, branch_name)])
self.merged_events = []
def __ne__(self, other):
@ -3718,13 +3681,13 @@ class EnqueueEvent(ManagementEvent):
def toDict(self):
d = super().toDict()
d["trigger_event"] = self.trigger_event
d["trigger_event"] = self.trigger_event.toDict()
return d
@classmethod
def fromDict(cls, data):
return cls(
data.get("trigger_event"),
TriggerEvent.fromDict(data["trigger_event"]),
)

View File

@ -64,8 +64,10 @@ from zuul.zk import ZooKeeperClient
from zuul.zk.components import ZooKeeperComponentRegistry
from zuul.zk.event_queues import (
GlobalEventWatcher,
GlobalManagementEventQueue,
GlobalTriggerEventQueue,
PipelineEventWatcher,
PipelineManagementEventQueue,
PipelineTriggerEventQueue,
)
from zuul.zk.nodepool import ZooKeeperNodepool
@ -145,13 +147,18 @@ class Scheduler(threading.Thread):
)
self.result_event_queue = queue.Queue()
self.management_event_queue = zuul.lib.queue.MergedQueue()
self.global_watcher = GlobalEventWatcher(
self.zk_client, self.wake_event.set
)
self.pipeline_watcher = PipelineEventWatcher(
self.zk_client, self.wake_event.set
)
self.management_events = GlobalManagementEventQueue(self.zk_client)
self.pipeline_management_events = (
PipelineManagementEventQueue.createRegistry(
self.zk_client
)
)
self.trigger_events = GlobalTriggerEventQueue(
self.zk_client, self.connections
)
@ -316,7 +323,7 @@ class Scheduler(threading.Thread):
self.statsd.gauge('zuul.scheduler.eventqueues.result',
self.result_event_queue.qsize())
self.statsd.gauge('zuul.scheduler.eventqueues.management',
self.management_event_queue.qsize())
len(self.management_events))
def addTriggerEvent(self, driver_name, event):
event.arrived_at_scheduler_timestamp = time.time()
@ -324,9 +331,7 @@ class Scheduler(threading.Thread):
self.wake_event.set()
def addManagementEvent(self, event):
self.management_event_queue.put(event)
self.wake_event.set()
event.wait()
self.management_events.put(event, needs_result=False)
def addResultEvent(self, event):
self.result_event_queue.put(event)
@ -416,9 +421,10 @@ class Scheduler(threading.Thread):
"%s due to event %s in project %s",
tenant.name, event, project)
branch = event.branch if event is not None else None
event = TenantReconfigureEvent(tenant.name, project, branch)
self.management_event_queue.put(event)
self.wake_event.set()
event = TenantReconfigureEvent(
tenant.name, project.canonical_name, branch
)
self.management_events.put(event, needs_result=False)
def fullReconfigureCommandHandler(self):
self._zuul_app.fullReconfigure()
@ -438,16 +444,24 @@ class Scheduler(threading.Thread):
self.repl.stop()
self.repl = None
def reconfigure(self, config, smart=False, validate_tenants=None):
def prime(self, config, validate_tenants=None):
    """Run a full reconfiguration directly, bypassing the event queue.

    A ReconfigureEvent is built and passed straight to
    ``_doReconfigureEvent`` instead of being put on the management
    event queue.  Once that call returns, the current time (in whole
    seconds) is stored in ``last_reconfigured``.

    :arg config: accepted for the caller's convenience; not read by
        this method body
    :arg validate_tenants: handed through to the ReconfigureEvent
    """
    self.log.debug("Priming scheduler config")
    self._doReconfigureEvent(
        ReconfigureEvent(validate_tenants=validate_tenants)
    )
    self.log.debug("Config priming complete")
    finished_at = time.time()
    self.last_reconfigured = int(finished_at)
def reconfigure(self, config, smart=False):
self.log.debug("Submitting reconfiguration event")
if smart:
event = SmartReconfigureEvent(config)
event = SmartReconfigureEvent()
else:
event = ReconfigureEvent(config, validate_tenants=validate_tenants)
self.management_event_queue.put(event)
self.wake_event.set()
event = ReconfigureEvent()
result = self.management_events.put(event)
self.log.debug("Waiting for reconfiguration")
event.wait()
result.wait()
self.log.debug("Reconfiguration complete")
self.last_reconfigured = int(time.time())
# TODOv3(jeblair): reconfigure time should be per-tenant
@ -546,27 +560,24 @@ class Scheduler(threading.Thread):
def promote(self, tenant_name, pipeline_name, change_ids):
event = PromoteEvent(tenant_name, pipeline_name, change_ids)
self.management_event_queue.put(event)
self.wake_event.set()
result = self.management_events.put(event)
self.log.debug("Waiting for promotion")
event.wait()
result.wait()
self.log.debug("Promotion complete")
def dequeue(self, tenant_name, pipeline_name, project_name, change, ref):
event = DequeueEvent(
tenant_name, pipeline_name, project_name, change, ref)
self.management_event_queue.put(event)
self.wake_event.set()
result = self.management_events.put(event)
self.log.debug("Waiting for dequeue")
event.wait()
result.wait()
self.log.debug("Dequeue complete")
def enqueue(self, trigger_event):
event = EnqueueEvent(trigger_event)
self.management_event_queue.put(event)
self.wake_event.set()
result = self.management_events.put(event)
self.log.debug("Waiting for enqueue")
event.wait()
result.wait()
self.log.debug("Enqueue complete")
def _get_time_database_dir(self):
@ -616,7 +627,7 @@ class Scheduler(threading.Thread):
# This is called in the scheduler loop after another thread submits
# a request
self.layout_lock.acquire()
self.config = event.config
self.config = self._zuul_app.config
try:
self.log.info("Full reconfiguration beginning")
start = time.monotonic()
@ -666,7 +677,7 @@ class Scheduler(threading.Thread):
# a request
reconfigured_tenants = []
with self.layout_lock:
self.config = event.config
self.config = self._zuul_app.config
self.log.info("Smart reconfiguration beginning")
start = time.monotonic()
@ -726,11 +737,11 @@ class Scheduler(threading.Thread):
start = time.monotonic()
# If a change landed to a project, clear out the cached
# config of the changed branch before reconfiguring.
for (project, branch) in event.project_branches:
for project_name, branch_name in event.project_branches:
self.log.debug("Clearing unparsed config: %s @%s",
project.canonical_name, branch)
self.abide.clearUnparsedBranchCache(project.canonical_name,
branch)
project_name, branch_name)
self.abide.clearUnparsedBranchCache(project_name,
branch_name)
old_tenant = self.abide.tenants[event.tenant_name]
loader = configloader.ConfigLoader(
self.connections, self, self.merger,
@ -1033,8 +1044,10 @@ class Scheduler(threading.Thread):
self.log.debug("Run handler awake")
self.run_handler_lock.acquire()
try:
while (not self.management_event_queue.empty() and
not self._stopped):
if not self._stopped:
self.process_global_management_queue()
if not self._stopped:
self.process_management_queue()
# Give result events priority -- they let us stop builds,
@ -1207,18 +1220,77 @@ class Scheduler(threading.Thread):
if pipeline.manager.eventMatches(event, change):
pipeline.manager.addChange(change, event)
def process_global_management_queue(self):
    """Consume the global management event queue.

    Full, smart and tenant reconfiguration events are executed right
    here.  Promote, dequeue and enqueue events are not executed by this
    method: they are put on the management queue registered for the
    tenant and pipeline they refer to.  If that lookup or hand-over
    fails, the formatted traceback is attached to the event instead.

    Each event is acknowledged on the global queue in a ``finally``
    clause, so the acknowledgement also happens when a handler raises.
    Forwarded events are acknowledged with ``ackWithoutResult``
    (presumably because the result is reported once the pipeline queue
    consumer has processed them -- verify against the queue
    implementation); every other event is acknowledged with ``ack``.
    """
    for event in self.management_events:
        # Becomes True only after the event has been put on a pipeline
        # management queue.
        forwarded = False
        try:
            if isinstance(event, ReconfigureEvent):
                self._doReconfigureEvent(event)
            elif isinstance(event, SmartReconfigureEvent):
                self._doSmartReconfigureEvent(event)
            elif isinstance(event, TenantReconfigureEvent):
                self._doTenantReconfigureEvent(event)
            elif isinstance(event, (PromoteEvent, DequeueEvent)):
                # These events name their tenant and pipeline directly.
                try:
                    tenant = self.abide.tenants[event.tenant_name]
                    pipeline = tenant.layout.pipelines[event.pipeline_name]
                    tenant_queues = self.pipeline_management_events[
                        tenant.name
                    ]
                    tenant_queues[pipeline.name].put(event)
                    forwarded = True
                except Exception:
                    event.exception(traceback.format_exc())
            elif isinstance(event, EnqueueEvent):
                # The target is described by the wrapped trigger event.
                try:
                    trigger = event.trigger_event
                    tenant = self.abide.tenants[trigger.tenant_name]
                    pipeline = tenant.layout.pipelines[
                        trigger.forced_pipeline
                    ]
                    tenant_queues = self.pipeline_management_events[
                        tenant.name
                    ]
                    tenant_queues[pipeline.name].put(event)
                    forwarded = True
                except Exception:
                    event.exception(traceback.format_exc())
            else:
                self.log.error("Unable to handle event %s", event)
        finally:
            acknowledge = (
                self.management_events.ackWithoutResult
                if forwarded
                else self.management_events.ack
            )
            acknowledge(event)
def process_management_queue(self):
self.log.debug("Fetching management event")
event = self.management_event_queue.get()
self.log.debug("Processing management event %s" % event)
for tenant in self.abide.tenants.values():
for pipeline in tenant.layout.pipelines.values():
for event in self.pipeline_management_events[tenant.name][
pipeline.name
]:
if self._stopped:
return
log = get_annotated_logger(
self.log, event.zuul_event_id
)
log.debug("Processing management event %s", event)
try:
self._process_management_event(event)
finally:
self.pipeline_management_events[tenant.name][
pipeline.name
].ack(event)
def _process_management_event(self, event):
try:
if isinstance(event, ReconfigureEvent):
self._doReconfigureEvent(event)
elif isinstance(event, SmartReconfigureEvent):
self._doSmartReconfigureEvent(event)
elif isinstance(event, TenantReconfigureEvent):
self._doTenantReconfigureEvent(event)
elif isinstance(event, PromoteEvent):
if isinstance(event, PromoteEvent):
self._doPromoteEvent(event)
elif isinstance(event, DequeueEvent):
self._doDequeueEvent(event)
@ -1226,15 +1298,11 @@ class Scheduler(threading.Thread):
self._doEnqueueEvent(event.trigger_event)
else:
self.log.error("Unable to handle event %s" % event)
event.done()
except Exception:
self.log.exception("Exception in management event:")
type, val, tb = sys.exc_info()
# Remove local variables from the traceback to prevent leaking
# large objects.
traceback.clear_frames(tb)
event.exception((type, val, tb))
self.management_event_queue.task_done()
event.exception(
"".join(traceback.format_exception(*sys.exc_info()))
)
def process_result_queue(self):
self.log.debug("Fetching result event")
@ -1537,8 +1605,7 @@ class Scheduler(threading.Thread):
data['result_event_queue']['length'] = \
self.result_event_queue.qsize()
data['management_event_queue'] = {}
data['management_event_queue']['length'] = \
self.management_event_queue.qsize()
data['management_event_queue']['length'] = len(self.management_events)
if self.last_reconfigured:
data['last_reconfigured'] = self.last_reconfigured * 1000