Switch from global to tenant event queues

With the way the global management and trigger event queues are
currently implemented we can't continue processing other events during a
reconfiguration and end up serializing reconfigurations.

The implementation is diverging from the spec here, as we did not
anticipate the need for tenant event queues back then.

We also add the following gauge metrics:
* zuul.tenant.<tenant>.trigger_events
* zuul.tenant.<tenant>.management_events

The `zuul.scheduler.eventqueues.trigger` gauge was removed together with
the global trigger event queue.

Change-Id: I1e0d872daf0fc4dd90e22b70c334e3e8152aa5b2
This commit is contained in:
Simon Westphahl 2021-06-22 14:12:03 +02:00
parent f44cc4fa83
commit c95d1b6d18
8 changed files with 224 additions and 143 deletions

View File

@ -49,6 +49,16 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
:stat:`zuul.tenant.<tenant>.event_enqueue_processing_time` and any
driver-specific pre-processing of the event.
.. stat:: zuul.tenant.<tenant>.management_events
:type: gauge
The size of the tenant's management event queue.
.. stat:: zuul.tenant.<tenant>.trigger_events
:type: gauge
The size of the tenant's trigger event queue.
.. stat:: zuul.tenant.<tenant>.pipeline
Holds metrics specific to jobs. This hierarchy includes:
@ -464,11 +474,6 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
Holds metrics about the event queue lengths in the Zuul scheduler.
.. stat:: trigger
:type: gauge
The size of the current trigger event queue.
.. stat:: result
:type: gauge
@ -477,7 +482,7 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
.. stat:: management
:type: gauge
The size of the current management event queue.
The size of the current reconfiguration event queue.
.. stat:: connection.<connection-name>
:type: gauge

View File

@ -4170,9 +4170,11 @@ class SchedulerTestApp:
if validate_tenants is None:
self.connections.registerScheduler(self.sched)
# TODO (felix): Can be removed when the nodes provisioned events are
# switched to ZooKeeper.
# TODO (felix, swestphahl): Can be removed when the nodes
# provisioned events are switched to ZooKeeper and after we no
# longer use global management events.
self.event_queues = [
self.sched.reconfigure_event_queue,
self.sched.result_event_queue,
]
@ -5080,11 +5082,11 @@ class ZuulTestCase(BaseTestCase):
for connection_name in sched.connections.connections:
if self.connection_event_queues[connection_name].hasEvents():
return False
if sched.management_events.hasEvents():
return False
if sched.trigger_events.hasEvents():
return False
for tenant in sched.abide.tenants.values():
if sched.management_events[tenant.name].hasEvents():
return False
if sched.trigger_events[tenant.name].hasEvents():
return False
for pipeline_name in tenant.layout.pipelines:
if sched.pipeline_management_events[tenant.name][
pipeline_name

View File

@ -611,7 +611,7 @@ class TestUnparsedConfigCache(ZuulTestCase):
event = model.TenantReconfigureEvent(
tenant.name, project.canonical_name, branch_name=None)
event.zuul_event_ltime = ltime
sched.management_events.put(event, needs_result=False)
sched.management_events[tenant.name].put(event, needs_result=False)
self.waitUntilSettled()
# As the cache should be valid, we only expect a cat job for

View File

@ -128,10 +128,10 @@ class TestTriggerEventQueue(EventQueueBaseTestCase):
self.driver = DummyDriver()
self.connections.registerDriver(self.driver)
def test_global_trigger_events(self):
# Test enqueue/dequeue of the global trigger event queue.
queue = event_queues.GlobalTriggerEventQueue(
self.zk_client, self.connections
def test_tenant_trigger_events(self):
# Test enqueue/dequeue of the tenant trigger event queue.
queue = event_queues.TenantTriggerEventQueue(
self.zk_client, self.connections, "tenant"
)
self.assertEqual(len(queue), 0)
@ -199,8 +199,9 @@ class TestTriggerEventQueue(EventQueueBaseTestCase):
class TestManagementEventQueue(EventQueueBaseTestCase):
def test_management_events(self):
# Test enqueue/dequeue of the global management event queue.
queue = event_queues.GlobalManagementEventQueue(self.zk_client)
# Test enqueue/dequeue of the tenant management event queue.
queue = event_queues.TenantManagementEventQueue(
self.zk_client, "tenant")
self.assertEqual(len(queue), 0)
self.assertFalse(queue.hasEvents())
@ -229,7 +230,8 @@ class TestManagementEventQueue(EventQueueBaseTestCase):
def test_management_event_error(self):
# Test that management event errors are reported.
queue = event_queues.GlobalManagementEventQueue(self.zk_client)
queue = event_queues.TenantManagementEventQueue(
self.zk_client, "tenant")
event = model.ReconfigureEvent(None)
result_future = queue.put(event)
@ -246,7 +248,8 @@ class TestManagementEventQueue(EventQueueBaseTestCase):
def test_event_merge(self):
# Test that similar management events (eg, reconfiguration of
# two projects) can be merged.
queue = event_queues.GlobalManagementEventQueue(self.zk_client)
queue = event_queues.TenantManagementEventQueue(
self.zk_client, "tenant")
event = model.TenantReconfigureEvent("tenant", "project", "master")
queue.put(event, needs_result=False)
event = model.TenantReconfigureEvent("tenant", "other", "branch")
@ -267,21 +270,22 @@ class TestManagementEventQueue(EventQueueBaseTestCase):
self.assertFalse(queue.hasEvents())
def test_event_ltime(self):
global_queue = event_queues.GlobalManagementEventQueue(self.zk_client)
tenant_queue = event_queues.TenantManagementEventQueue(
self.zk_client, "tenant")
registry = event_queues.PipelineManagementEventQueue.createRegistry(
self.zk_client
)
event = model.ReconfigureEvent(None)
global_queue.put(event, needs_result=False)
self.assertTrue(global_queue.hasEvents())
tenant_queue.put(event, needs_result=False)
self.assertTrue(tenant_queue.hasEvents())
pipeline_queue = registry["tenant"]["pipeline"]
self.assertIsInstance(
pipeline_queue, event_queues.ManagementEventQueue
)
processed_events = 0
for event in global_queue:
for event in tenant_queue:
processed_events += 1
event_ltime = event.zuul_event_ltime
self.assertGreater(event_ltime, -1)
@ -301,33 +305,34 @@ class TestManagementEventQueue(EventQueueBaseTestCase):
def test_pipeline_management_events(self):
# Test that when a management event is forwarded from the
# global to the a pipeline-specific queue, it is not
# tenant to the a pipeline-specific queue, it is not
# prematurely acked and the future returns correctly.
global_queue = event_queues.GlobalManagementEventQueue(self.zk_client)
tenant_queue = event_queues.TenantManagementEventQueue(
self.zk_client, "tenant")
registry = event_queues.PipelineManagementEventQueue.createRegistry(
self.zk_client
)
event = model.PromoteEvent('tenant', 'check', ['1234,1'])
result_future = global_queue.put(event, needs_result=False)
result_future = tenant_queue.put(event, needs_result=False)
self.assertIsNone(result_future)
result_future = global_queue.put(event)
result_future = tenant_queue.put(event)
self.assertIsNotNone(result_future)
self.assertEqual(len(global_queue), 2)
self.assertTrue(global_queue.hasEvents())
self.assertEqual(len(tenant_queue), 2)
self.assertTrue(tenant_queue.hasEvents())
pipeline_queue = registry["tenant"]["pipeline"]
self.assertIsInstance(
pipeline_queue, event_queues.ManagementEventQueue
)
acked = 0
for event in global_queue:
for event in tenant_queue:
self.assertIsInstance(event, model.PromoteEvent)
# Forward event to pipeline management event queue
pipeline_queue.put(event)
global_queue.ackWithoutResult(event)
tenant_queue.ackWithoutResult(event)
acked += 1
self.assertEqual(acked, 2)
@ -335,8 +340,8 @@ class TestManagementEventQueue(EventQueueBaseTestCase):
# future should not be completed yet.
self.assertFalse(result_future.wait(0.1))
self.assertEqual(len(global_queue), 0)
self.assertFalse(global_queue.hasEvents())
self.assertEqual(len(tenant_queue), 0)
self.assertFalse(tenant_queue.hasEvents())
self.assertEqual(len(pipeline_queue), 2)
self.assertTrue(pipeline_queue.hasEvents())
@ -355,7 +360,8 @@ class TestManagementEventQueue(EventQueueBaseTestCase):
def test_management_events_client(self):
# Test management events from a second client
queue = event_queues.GlobalManagementEventQueue(self.zk_client)
queue = event_queues.TenantManagementEventQueue(
self.zk_client, "tenant")
self.assertEqual(len(queue), 0)
self.assertFalse(queue.hasEvents())
@ -368,8 +374,8 @@ class TestManagementEventQueue(EventQueueBaseTestCase):
self.addCleanup(external_client.disconnect)
external_client.connect()
external_queue = event_queues.GlobalManagementEventQueue(
external_client)
external_queue = event_queues.TenantManagementEventQueue(
external_client, "tenant")
event = model.ReconfigureEvent(None)
result_future = external_queue.put(event)
@ -394,7 +400,8 @@ class TestManagementEventQueue(EventQueueBaseTestCase):
# Test management events from a second client which
# disconnects before the event is complete.
queue = event_queues.GlobalManagementEventQueue(self.zk_client)
queue = event_queues.TenantManagementEventQueue(
self.zk_client, "tenant")
self.assertEqual(len(queue), 0)
self.assertFalse(queue.hasEvents())
@ -408,8 +415,8 @@ class TestManagementEventQueue(EventQueueBaseTestCase):
self.addCleanup(external_client.disconnect)
external_client.connect()
external_queue = event_queues.GlobalManagementEventQueue(
external_client)
external_queue = event_queues.TenantManagementEventQueue(
external_client, "tenant")
# Submit the event
event = model.ReconfigureEvent(None)
@ -492,28 +499,31 @@ class TestEventWatchers(EventQueueBaseTestCase):
if event.is_set():
break
def test_global_event_watcher(self):
def test_tenant_event_watcher(self):
event = threading.Event()
event_queues.GlobalEventWatcher(self.zk_client, event.set)
event_queues.EventWatcher(self.zk_client, event.set)
management_queue = event_queues.GlobalManagementEventQueue(
self.zk_client
management_queue = (
event_queues.TenantManagementEventQueue.createRegistry(
self.zk_client)
)
trigger_queue = event_queues.GlobalTriggerEventQueue(
trigger_queue = event_queues.TenantTriggerEventQueue.createRegistry(
self.zk_client, self.connections
)
self.assertFalse(event.is_set())
management_queue.put(model.ReconfigureEvent(None), needs_result=False)
management_queue["tenant"].put(model.ReconfigureEvent(None),
needs_result=False)
self._wait_for_event(event)
event.clear()
trigger_queue.put(self.driver.driver_name, DummyTriggerEvent())
trigger_queue["tenant"].put(self.driver.driver_name,
DummyTriggerEvent())
self._wait_for_event(event)
def test_pipeline_event_watcher(self):
event = threading.Event()
event_queues.PipelineEventWatcher(self.zk_client, event.set)
event_queues.EventWatcher(self.zk_client, event.set)
management_queues = (
event_queues.PipelineManagementEventQueue.createRegistry(

View File

@ -437,6 +437,10 @@ class TestScheduler(ZuulTestCase):
# Per-driver per-connection
self.assertReportedStat('zuul.event.gerrit.gerrit.comment-added',
value='1', kind='c')
self.assertReportedStat(
'zuul.tenant.tenant-one.trigger_events', value='0', kind='g')
self.assertReportedStat(
'zuul.tenant.tenant-one.management_events', value='0', kind='g')
self.assertReportedStat(
'zuul.tenant.tenant-one.pipeline.gate.current_changes',
value='1', kind='g')
@ -3463,27 +3467,30 @@ class TestScheduler(ZuulTestCase):
tenant = self.scheds.first.sched.abide.tenants['tenant-one']
(trusted, project) = tenant.getProject('org/project')
management_queue = self.scheds.first.sched.management_events[
'tenant-one']
with self.scheds.first.sched.run_handler_lock:
mgmt_queue_size = len(self.scheds.first.sched.management_events)
mgmt_queue_size = len(management_queue)
self.assertEqual(mgmt_queue_size, 0)
self.scheds.first.sched.reconfigureTenant(tenant, project, None)
mgmt_queue_size = len(self.scheds.first.sched.management_events)
mgmt_queue_size = len(management_queue)
self.assertEqual(mgmt_queue_size, 1)
self.scheds.first.sched.reconfigureTenant(tenant, project, None)
mgmt_queue_size = len(self.scheds.first.sched.management_events)
mgmt_queue_size = len(management_queue)
self.assertEqual(mgmt_queue_size, 2)
# The second event should be combined with the first so we should
# only see the merged entry when consuming from the queue.
mgmt_events = list(self.scheds.first.sched.management_events)
mgmt_events = list(management_queue)
self.assertEqual(len(mgmt_events), 1)
self.assertEqual(len(mgmt_events[0].merged_events), 1)
self.waitUntilSettled()
mgmt_queue_size = len(self.scheds.first.sched.management_events)
mgmt_queue_size = len(management_queue)
self.assertEqual(mgmt_queue_size, 0)
def test_live_reconfiguration(self):

View File

@ -753,7 +753,7 @@ class GithubEventConnector:
continue
self.connection.logEvent(event)
if isinstance(event, DequeueEvent):
self.connection.sched.addManagementEvent(event)
self.connection.sched.addChangeManagementEvent(event)
else:
self.connection.sched.addTriggerEvent(
self.connection.driver_name, event

View File

@ -76,10 +76,9 @@ from zuul.zk.components import (
)
from zuul.zk.config_cache import UnparsedConfigCache
from zuul.zk.event_queues import (
GlobalEventWatcher,
GlobalManagementEventQueue,
GlobalTriggerEventQueue,
PipelineEventWatcher,
EventWatcher,
TenantManagementEventQueue,
TenantTriggerEventQueue,
PipelineManagementEventQueue,
PipelineResultEventQueue,
PipelineTriggerEventQueue,
@ -165,20 +164,21 @@ class Scheduler(threading.Thread):
self.component_registry = ComponentRegistry(self.zk_client)
self.unparsed_config_cache = UnparsedConfigCache(self.zk_client)
# TODO (swestphahl): Remove after we've refactored reconfigurations
# to be performed on the tenant level.
self.reconfigure_event_queue = NamedQueue("ReconfigureEventQueue")
self.result_event_queue = NamedQueue("ResultEventQueue")
self.global_watcher = GlobalEventWatcher(
self.event_watcher = EventWatcher(
self.zk_client, self.wake_event.set
)
self.pipeline_watcher = PipelineEventWatcher(
self.zk_client, self.wake_event.set
)
self.management_events = GlobalManagementEventQueue(self.zk_client)
self.management_events = TenantManagementEventQueue.createRegistry(
self.zk_client)
self.pipeline_management_events = (
PipelineManagementEventQueue.createRegistry(
self.zk_client
)
)
self.trigger_events = GlobalTriggerEventQueue(
self.trigger_events = TenantTriggerEventQueue.createRegistry(
self.zk_client, self.connections
)
self.pipeline_trigger_events = (
@ -396,12 +396,10 @@ class Scheduler(threading.Thread):
merge_running += running
self.statsd.gauge('zuul.mergers.jobs_running', merge_running)
self.statsd.gauge('zuul.mergers.jobs_queued', merge_queue)
self.statsd.gauge('zuul.scheduler.eventqueues.trigger',
len(self.trigger_events))
self.statsd.gauge('zuul.scheduler.eventqueues.result',
self.result_event_queue.qsize())
self.statsd.gauge('zuul.scheduler.eventqueues.management',
len(self.management_events))
self.reconfigure_event_queue.qsize())
base = 'zuul.scheduler.eventqueues.connection'
for connection in self.connections.connections.values():
queue = connection.getEventQueue()
@ -410,6 +408,10 @@ class Scheduler(threading.Thread):
len(queue))
for tenant in self.abide.tenants.values():
self.statsd.gauge(f"zuul.tenant.{tenant.name}.management_events",
len(self.management_events[tenant.name]))
self.statsd.gauge(f"zuul.tenant.{tenant.name}.trigger_events",
len(self.trigger_events[tenant.name]))
trigger_event_queues = self.pipeline_trigger_events[tenant.name]
result_event_queues = self.pipeline_result_events[tenant.name]
management_event_queues = (
@ -489,11 +491,27 @@ class Scheduler(threading.Thread):
def addTriggerEvent(self, driver_name, event):
event.arrived_at_scheduler_timestamp = time.time()
self.trigger_events.put(driver_name, event)
self.wake_event.set()
for tenant in self.abide.tenants.values():
trusted, project = tenant.getProject(event.canonical_project_name)
def addManagementEvent(self, event):
self.management_events.put(event, needs_result=False)
if project is None:
continue
self.trigger_events[tenant.name].put(driver_name, event)
def addChangeManagementEvent(self, event):
tenant_name = event.tenant_name
pipeline_name = event.pipeline_name
tenant = self.abide.tenants.get(tenant_name)
if tenant is None:
raise ValueError(f'Unknown tenant {event.tenant_name}')
pipeline = tenant.layout.pipelines.get(pipeline_name)
if pipeline is None:
raise ValueError(f'Unknown pipeline {event.pipeline_name}')
self.pipeline_management_events[tenant_name][pipeline_name].put(
event, needs_result=False
)
def addResultEvent(self, event):
self.result_event_queue.put(event)
@ -577,7 +595,7 @@ class Scheduler(threading.Thread):
event = TenantReconfigureEvent(
tenant.name, project.canonical_name, branch
)
self.management_events.put(event, needs_result=False)
self.management_events[tenant.name].put(event, needs_result=False)
def fullReconfigureCommandHandler(self):
self._zuul_app.fullReconfigure()
@ -611,10 +629,12 @@ class Scheduler(threading.Thread):
else:
event = ReconfigureEvent()
result = self.management_events.put(event)
event.ack_ref = threading.Event()
self.reconfigure_event_queue.put(event)
self.wake_event.set()
self.log.debug("Waiting for reconfiguration")
result.wait()
event.ack_ref.wait()
self.log.debug("Reconfiguration complete")
self.last_reconfigured = int(time.time())
# TODOv3(jeblair): reconfigure time should be per-tenant
@ -713,7 +733,7 @@ class Scheduler(threading.Thread):
def promote(self, tenant_name, pipeline_name, change_ids):
event = PromoteEvent(tenant_name, pipeline_name, change_ids)
result = self.management_events.put(event)
result = self.management_events[tenant_name].put(event)
self.log.debug("Waiting for promotion")
result.wait()
self.log.debug("Promotion complete")
@ -732,7 +752,7 @@ class Scheduler(threading.Thread):
event = DequeueEvent(tenant_name, pipeline_name,
project.canonical_hostname, project.name,
change, ref)
result = self.management_events.put(event)
result = self.management_events[tenant_name].put(event)
self.log.debug("Waiting for dequeue")
result.wait()
self.log.debug("Dequeue complete")
@ -752,7 +772,7 @@ class Scheduler(threading.Thread):
event = EnqueueEvent(tenant_name, pipeline_name,
project.canonical_hostname, project.name,
change, ref, oldrev, newrev)
result = self.management_events.put(event)
result = self.management_events[tenant_name].put(event)
self.log.debug("Waiting for enqueue")
result.wait()
self.log.debug("Enqueue complete")
@ -1299,9 +1319,6 @@ class Scheduler(threading.Thread):
self.log.debug("Run handler awake")
self.run_handler_lock.acquire()
try:
if not self._stopped:
self.process_global_management_queue()
if not self._stopped:
self.process_management_queue()
@ -1310,9 +1327,6 @@ class Scheduler(threading.Thread):
if not self._stopped:
self.process_result_queue()
if not self._stopped:
self.process_global_trigger_queue()
if not self._stopped:
self.process_trigger_queue()
@ -1353,19 +1367,17 @@ class Scheduler(threading.Thread):
"End maintain connection cache for: %s" % connection)
self.log.debug("Connection cache size: %s" % len(relevant))
def process_global_trigger_queue(self):
for event in self.trigger_events:
def process_tenant_trigger_queue(self, tenant):
for event in self.trigger_events[tenant.name]:
log = get_annotated_logger(self.log, event.zuul_event_id)
log.debug("Forwarding trigger event %s", event)
try:
for tenant in self.abide.tenants.values():
try:
self._forward_trigger_event(event, tenant)
except Exception:
log.exception("Unable to forward event %s "
"to tenant %s", event, tenant.name)
self._forward_trigger_event(event, tenant)
except Exception:
log.exception("Unable to forward event %s "
"to tenant %s", event, tenant.name)
finally:
self.trigger_events.ack(event)
self.trigger_events[tenant.name].ack(event)
def _forward_trigger_event(self, event, tenant):
log = get_annotated_logger(self.log, event.zuul_event_id)
@ -1434,6 +1446,10 @@ class Scheduler(threading.Thread):
def process_trigger_queue(self):
for tenant in self.abide.tenants.values():
if self._stopped:
return
self.process_tenant_trigger_queue(tenant)
for pipeline in tenant.layout.pipelines.values():
for event in self.pipeline_trigger_events[tenant.name][
pipeline.name
@ -1473,15 +1489,11 @@ class Scheduler(threading.Thread):
if pipeline.manager.eventMatches(event, change):
pipeline.manager.addChange(change, event)
def process_global_management_queue(self):
for event in self.management_events:
def process_tenant_management_queue(self, tenant):
for event in self.management_events[tenant.name]:
event_forwarded = False
try:
if isinstance(event, ReconfigureEvent):
self._doReconfigureEvent(event)
elif isinstance(event, SmartReconfigureEvent):
self._doSmartReconfigureEvent(event)
elif isinstance(event, TenantReconfigureEvent):
if isinstance(event, TenantReconfigureEvent):
self._doTenantReconfigureEvent(event)
elif isinstance(event, (PromoteEvent, ChangeManagementEvent)):
event_forwarded = self._forward_management_event(event)
@ -1489,9 +1501,10 @@ class Scheduler(threading.Thread):
self.log.error("Unable to handle event %s", event)
finally:
if event_forwarded:
self.management_events.ackWithoutResult(event)
self.management_events[tenant.name].ackWithoutResult(
event)
else:
self.management_events.ack(event)
self.management_events[tenant.name].ack(event)
def _forward_management_event(self, event):
event_forwarded = False
@ -1515,7 +1528,26 @@ class Scheduler(threading.Thread):
return event_forwarded
def process_management_queue(self):
while not self.reconfigure_event_queue.empty() and not self._stopped:
self.log.debug("Fetching reconfiguration event")
event = self.reconfigure_event_queue.get()
try:
if isinstance(event, ReconfigureEvent):
self._doReconfigureEvent(event)
elif isinstance(event, SmartReconfigureEvent):
self._doSmartReconfigureEvent(event)
else:
self.log.error("Unable to handle event %s", event)
finally:
if event.ack_ref:
event.ack_ref.set()
self.reconfigure_event_queue.task_done()
for tenant in self.abide.tenants.values():
if self._stopped:
return
self.process_tenant_management_queue(tenant)
for pipeline in tenant.layout.pipelines.values():
for event in self.pipeline_management_events[tenant.name][
pipeline.name
@ -1965,12 +1997,15 @@ class Scheduler(threading.Thread):
websocket_url = get_default(self.config, 'web', 'websocket_url', None)
data['trigger_event_queue'] = {}
data['trigger_event_queue']['length'] = len(self.trigger_events)
data['trigger_event_queue']['length'] = len(
self.trigger_events[tenant_name])
data['result_event_queue'] = {}
data['result_event_queue']['length'] = \
self.result_event_queue.qsize()
data['management_event_queue'] = {}
data['management_event_queue']['length'] = len(self.management_events)
data['management_event_queue']['length'] = len(
self.management_events[tenant_name]
) + self.reconfigure_event_queue.qsize()
data['connection_event_queues'] = {}
for connection in self.connections.connections.values():
queue = connection.getEventQueue()

View File

@ -51,7 +51,6 @@ MANAGEMENT_EVENT_TYPE_MAP = {
}
TENANT_ROOT = "/zuul/events/tenant"
SCHEDULER_GLOBAL_ROOT = "/zuul/events/scheduler-global"
CONNECTION_ROOT = "/zuul/events/connection"
# This is the path to the serialized from of the event in ZK (along
@ -71,26 +70,9 @@ class EventPrefix(enum.Enum):
TRIGGER = "300"
class GlobalEventWatcher(ZooKeeperSimpleBase):
class EventWatcher(ZooKeeperSimpleBase):
log = logging.getLogger("zuul.zk.event_queues.EventQueueWatcher")
def __init__(self, client, callback):
super().__init__(client)
self.callback = callback
self.kazoo_client.ensure_path(SCHEDULER_GLOBAL_ROOT)
self.kazoo_client.ChildrenWatch(
SCHEDULER_GLOBAL_ROOT, self._eventWatch
)
def _eventWatch(self, event_list):
if event_list:
self.callback()
class PipelineEventWatcher(ZooKeeperSimpleBase):
log = logging.getLogger("zuul.zk.event_queues.EventQueueWatcher")
log = logging.getLogger("zuul.zk.event_queues.EventWatcher")
def __init__(self, client, callback):
super().__init__(client)
@ -100,6 +82,11 @@ class PipelineEventWatcher(ZooKeeperSimpleBase):
self.kazoo_client.ensure_path(TENANT_ROOT)
self.kazoo_client.ChildrenWatch(TENANT_ROOT, self._tenantWatch)
def _makePipelineWatcher(self, tenant_name):
def watch(children, event=None):
return self._pipelineWatch(tenant_name, children)
return watch
def _tenantWatch(self, tenants):
for tenant_name in tenants:
tenant_path = "/".join((TENANT_ROOT, tenant_name))
@ -107,23 +94,27 @@ class PipelineEventWatcher(ZooKeeperSimpleBase):
if tenant_path in self.watched_tenants:
continue
events_path = f"{tenant_path}/events"
self.kazoo_client.ensure_path(events_path)
self.kazoo_client.ChildrenWatch(
tenant_path,
lambda p: self._pipelineWatch(tenant_name, p),
)
events_path, self._eventWatch, send_event=True)
pipelines_path = f"{tenant_path}/pipelines"
self.kazoo_client.ensure_path(pipelines_path)
self.kazoo_client.ChildrenWatch(
pipelines_path, self._makePipelineWatcher(tenant_name))
self.watched_tenants.add(tenant_path)
def _pipelineWatch(self, tenant_name, pipelines):
for pipeline_name in pipelines:
pipeline_path = "/".join((TENANT_ROOT, tenant_name, pipeline_name))
pipeline_path = "/".join((TENANT_ROOT, tenant_name, "pipelines",
pipeline_name))
if pipeline_path in self.watched_pipelines:
continue
self.kazoo_client.ChildrenWatch(
pipeline_path,
self._eventWatch,
send_event=True,
)
pipeline_path, self._eventWatch, send_event=True)
self.watched_pipelines.add(pipeline_path)
def _eventWatch(self, event_list, event=None):
@ -394,7 +385,8 @@ class PipelineManagementEventQueue(ManagementEventQueue):
)
def __init__(self, client, tenant_name, pipeline_name):
event_root = "/".join((TENANT_ROOT, tenant_name, pipeline_name))
event_root = "/".join((TENANT_ROOT, tenant_name, "pipelines",
pipeline_name))
super().__init__(client, event_root, EventPrefix.MANAGEMENT)
@classmethod
@ -416,11 +408,12 @@ class PipelineManagementEventQueue(ManagementEventQueue):
return DefaultKeyDict(lambda p: cls(client, tenant_name, p))
class GlobalManagementEventQueue(ManagementEventQueue):
log = logging.getLogger("zuul.zk.event_queues.GlobalManagementEventQueue")
class TenantManagementEventQueue(ManagementEventQueue):
log = logging.getLogger("zuul.zk.event_queues.TenantManagementEventQueue")
def __init__(self, client):
super().__init__(client, SCHEDULER_GLOBAL_ROOT, EventPrefix.MANAGEMENT)
def __init__(self, client, tenant_name):
event_root = "/".join((TENANT_ROOT, tenant_name, "events"))
super().__init__(client, event_root, EventPrefix.MANAGEMENT)
def ackWithoutResult(self, event):
"""
@ -431,6 +424,19 @@ class GlobalManagementEventQueue(ManagementEventQueue):
for merged_event in event.merged_events:
super(ManagementEventQueue, self).ack(merged_event)
@classmethod
def createRegistry(cls, client):
"""Create a tenant queue registry
Returns a dictionary of: tenant_name -> EventQueue
Queues are dynamically created with the originally supplied ZK
client as they are accessed via the registry (so new tenants
show up automatically).
"""
return DefaultKeyDict(lambda t: cls(client, t))
class PipelineResultEventQueue(SchedulerEventQueue):
"""Result events via ZooKeeper"""
@ -438,7 +444,8 @@ class PipelineResultEventQueue(SchedulerEventQueue):
log = logging.getLogger("zuul.zk.event_queues.PipelineResultEventQueue")
def __init__(self, client, tenant_name, pipeline_name):
event_root = "/".join((TENANT_ROOT, tenant_name, pipeline_name))
event_root = "/".join((TENANT_ROOT, tenant_name, "pipelines",
pipeline_name))
super().__init__(client, event_root, EventPrefix.RESULT)
@classmethod
@ -513,18 +520,33 @@ class TriggerEventQueue(SchedulerEventQueue):
yield event
class GlobalTriggerEventQueue(TriggerEventQueue):
log = logging.getLogger("zuul.zk.event_queues.GlobalTriggerEventQueue")
class TenantTriggerEventQueue(TriggerEventQueue):
log = logging.getLogger("zuul.zk.event_queues.TenantTriggerEventQueue")
def __init__(self, client, connections):
super().__init__(client, SCHEDULER_GLOBAL_ROOT, connections)
def __init__(self, client, connections, tenant_name):
event_root = "/".join((TENANT_ROOT, tenant_name, "events"))
super().__init__(client, event_root, connections)
@classmethod
def createRegistry(cls, client, connections):
"""Create a tenant queue registry
Returns a dictionary of: tenant_name -> EventQueue
Queues are dynamically created with the originally supplied ZK
client as they are accessed via the registry (so new tenants
show up automatically).
"""
return DefaultKeyDict(lambda t: cls(client, connections, t))
class PipelineTriggerEventQueue(TriggerEventQueue):
log = logging.getLogger("zuul.zk.event_queues.PipelineTriggerEventQueue")
def __init__(self, client, tenant_name, pipeline_name, connections):
event_root = "/".join((TENANT_ROOT, tenant_name, pipeline_name))
event_root = "/".join((TENANT_ROOT, tenant_name, "pipelines",
pipeline_name))
super().__init__(client, event_root, connections)
@classmethod