Implementation of ZooKeeper-backed event queues

Event queues are namespaced by tenant and event type (management, result
and trigger). Events will be stored as sequential nodes in ZooKeeper
using the following base path:

    /zuul/events/tenants/<tenant-name>/{management,result,trigger}/

This allows us to use a simple per-tenant lock that must be held while
processing events and pipelines in a multi-scheduler deployment.
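
As a rough illustration (not part of this change), a multi-scheduler
deployment could take such a per-tenant lock with kazoo before touching
that tenant's queues; the lock path and identifier below are hypothetical:

    from kazoo.client import KazooClient

    zk = KazooClient(hosts="localhost:2181")
    zk.start()

    tenant = "example-tenant"
    # Hypothetical lock path; the real layout is defined by the scheduler.
    lock = zk.Lock("/zuul/locks/tenants/%s" % tenant, "scheduler-1")
    with lock:
        # Process management, result and trigger events for this tenant
        # while no other scheduler operates on its pipelines.
        pass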

Change-Id: I9c4835e6a6ac5a83d9f63af146c8dddea9789108
Simon Westphahl 2020-10-30 15:50:29 +01:00
parent 4457ca1086
commit b23b9d7844
4 changed files with 854 additions and 0 deletions


@ -0,0 +1,367 @@
# Copyright 2021 BMW Group
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest.mock import patch
import testtools
from zuul import model
from zuul.driver import Driver, TriggerInterface
from zuul.lib.connections import ConnectionRegistry
from zuul.zk import ZooKeeperClient, event_queues
from tests.base import BaseTestCase
class EventQueueBaseTestCase(BaseTestCase):
def setUp(self):
super().setUp()
self.setupZK()
self.zk_client = ZooKeeperClient(
self.zk_chroot_fixture.zk_hosts,
tls_cert=self.zk_chroot_fixture.zookeeper_cert,
tls_key=self.zk_chroot_fixture.zookeeper_key,
tls_ca=self.zk_chroot_fixture.zookeeper_ca
)
self.addCleanup(self.zk_client.disconnect)
self.zk_client.connect()
self.connections = ConnectionRegistry()
self.addCleanup(self.connections.stop)
class DummyEvent(model.AbstractEvent):
def toDict(self):
return {}
def updateFromDict(self):
pass
@classmethod
def fromDict(cls, d):
return cls()
class DummyPrefix:
value = "dummy"
class DummyEventQueue(event_queues.ZooKeeperEventQueue):
def put(self, event):
self._put(event.toDict())
def __iter__(self):
for data, ack_ref in self._iterEvents():
event = DummyEvent.fromDict(data)
event.ack_ref = ack_ref
yield event
class TestEventQueue(EventQueueBaseTestCase):
def setUp(self):
super().setUp()
self.queue = DummyEventQueue(self.zk_client, "root", DummyPrefix())
def test_missing_ack_ref(self):
# Every event from a ZK event queue should have an ack_ref
# attached to it when it is deserialized; ensure that an error
# is raised if we try to ack an event without one.
with testtools.ExpectedException(RuntimeError):
self.queue.ack(DummyEvent())
def test_double_ack(self):
# Test that if we ack an event twice, an exception isn't
# raised.
self.queue.put(DummyEvent())
self.assertEqual(len(self.queue), 1)
event = next(iter(self.queue))
self.queue.ack(event)
self.assertEqual(len(self.queue), 0)
# Should not raise an exception
self.queue.ack(event)
def test_invalid_json_ignored(self):
# Test that invalid JSON is automatically removed.
event_path = self.queue._put({})
self.zk_client.client.set(event_path, b"{ invalid")
self.assertEqual(len(self.queue), 1)
self.assertEqual(list(self.queue._iterEvents()), [])
self.assertEqual(len(self.queue), 0)
class DummyTriggerEvent(model.TriggerEvent):
pass
class DummyDriver(Driver, TriggerInterface):
name = driver_name = "dummy"
def getTrigger(self, connection, config=None):
pass
def getTriggerSchema(self):
pass
def getTriggerEventClass(self):
return DummyTriggerEvent
class TestTriggerEventQueue(EventQueueBaseTestCase):
def setUp(self):
super().setUp()
self.driver = DummyDriver()
self.connections.registerDriver(self.driver)
def test_global_trigger_events(self):
# Test enqueue/dequeue of the global trigger event queue.
queue = event_queues.GlobalTriggerEventQueue(
self.zk_client, self.connections
)
self.assertEqual(len(queue), 0)
self.assertFalse(queue.hasEvents())
event = DummyTriggerEvent()
queue.put(self.driver.driver_name, event)
queue.put(self.driver.driver_name, event)
self.assertEqual(len(queue), 2)
self.assertTrue(queue.hasEvents())
processed = 0
for event in queue:
self.assertIsInstance(event, DummyTriggerEvent)
processed += 1
self.assertEqual(processed, 2)
self.assertEqual(len(queue), 2)
self.assertTrue(queue.hasEvents())
acked = 0
for event in queue:
queue.ack(event)
acked += 1
self.assertEqual(acked, 2)
self.assertEqual(len(queue), 0)
self.assertFalse(queue.hasEvents())
def test_pipeline_trigger_events(self):
# Test enqueue/dequeue of pipeline-specific trigger event
# queues.
registry = event_queues.PipelineTriggerEventQueue.createRegistry(
self.zk_client, self.connections
)
queue = registry["tenant"]["pipeline"]
self.assertIsInstance(queue, event_queues.TriggerEventQueue)
self.assertEqual(len(queue), 0)
self.assertFalse(queue.hasEvents())
event = DummyTriggerEvent()
queue.put(self.driver.driver_name, event)
self.assertEqual(len(queue), 1)
self.assertTrue(queue.hasEvents())
other_queue = registry["other_tenant"]["pipeline"]
self.assertEqual(len(other_queue), 0)
self.assertFalse(other_queue.hasEvents())
acked = 0
for event in queue:
self.assertIsInstance(event, DummyTriggerEvent)
queue.ack(event)
acked += 1
self.assertEqual(acked, 1)
self.assertEqual(len(queue), 0)
self.assertFalse(queue.hasEvents())
class TestManagementEventQueue(EventQueueBaseTestCase):
def test_management_events(self):
# Test enqueue/dequeue of the global management event queue.
queue = event_queues.GlobalManagementEventQueue(self.zk_client)
self.assertEqual(len(queue), 0)
self.assertFalse(queue.hasEvents())
event = model.ReconfigureEvent(None)
result_future = queue.put(event, needs_result=False)
self.assertIsNone(result_future)
result_future = queue.put(event)
self.assertIsNotNone(result_future)
self.assertEqual(len(queue), 2)
self.assertTrue(queue.hasEvents())
self.assertFalse(result_future.wait(0.1))
acked = 0
for event in queue:
self.assertIsInstance(event, model.ReconfigureEvent)
queue.ack(event)
acked += 1
self.assertEqual(acked, 2)
self.assertTrue(result_future.wait(5))
self.assertEqual(len(queue), 0)
self.assertFalse(queue.hasEvents())
def test_management_event_error(self):
# Test that management event errors are reported.
queue = event_queues.GlobalManagementEventQueue(self.zk_client)
event = model.ReconfigureEvent(None)
result_future = queue.put(event)
acked = 0
for event in queue:
event.traceback = "hello traceback"
queue.ack(event)
acked += 1
self.assertEqual(acked, 1)
with testtools.ExpectedException(RuntimeError, msg="hello traceback"):
self.assertFalse(result_future.wait(5))
def test_event_merge(self):
# Test that similar management events (e.g., reconfiguration of
# two projects) can be merged.
queue = event_queues.GlobalManagementEventQueue(self.zk_client)
event = model.TenantReconfigureEvent("tenant", "project", "master")
queue.put(event, needs_result=False)
event = model.TenantReconfigureEvent("tenant", "other", "branch")
queue.put(event, needs_result=False)
self.assertEqual(len(queue), 2)
events = list(queue)
self.assertEqual(len(events), 1)
event = events[0]
self.assertEqual(len(event.merged_events), 1)
self.assertEqual(
event.project_branches,
set([("project", "master"), ("other", "branch")])
)
queue.ack(event)
self.assertFalse(queue.hasEvents())
def test_pipeline_management_events(self):
# Test that when a management event is forwarded from the
# global to a pipeline-specific queue, it is not
# prematurely acked and the future returns correctly.
global_queue = event_queues.GlobalManagementEventQueue(self.zk_client)
registry = event_queues.PipelineManagementEventQueue.createRegistry(
self.zk_client
)
event = model.PromoteEvent('tenant', 'check', ['1234,1'])
result_future = global_queue.put(event, needs_result=False)
self.assertIsNone(result_future)
result_future = global_queue.put(event)
self.assertIsNotNone(result_future)
self.assertEqual(len(global_queue), 2)
self.assertTrue(global_queue.hasEvents())
pipeline_queue = registry["tenant"]["pipeline"]
self.assertIsInstance(
pipeline_queue, event_queues.ManagementEventQueue
)
acked = 0
for event in global_queue:
self.assertIsInstance(event, model.PromoteEvent)
# Forward event to pipeline management event queue
pipeline_queue.put(event)
global_queue.ackWithoutResult(event)
acked += 1
self.assertEqual(acked, 2)
# Event was just forwarded and since we expect a result, the
# future should not be completed yet.
self.assertFalse(result_future.wait(0.1))
self.assertEqual(len(global_queue), 0)
self.assertFalse(global_queue.hasEvents())
self.assertEqual(len(pipeline_queue), 2)
self.assertTrue(pipeline_queue.hasEvents())
acked = 0
for event in pipeline_queue:
self.assertIsInstance(event, model.PromoteEvent)
pipeline_queue.ack(event)
acked += 1
self.assertEqual(acked, 2)
self.assertTrue(result_future.wait(5))
self.assertEqual(len(pipeline_queue), 0)
self.assertFalse(pipeline_queue.hasEvents())
# TODO: use actual model.ResultEvent once it inherits from
# AbstractEvent and implements serialization.
class DummyResultEvent(model.ResultEvent, DummyEvent):
pass
@patch.dict(event_queues.RESULT_EVENT_TYPE_MAP,
{"DummyResultEvent": DummyResultEvent})
class TestResultEventQueue(EventQueueBaseTestCase):
def test_pipeline_result_events(self):
# Test enqueue/dequeue of result events.
registry = event_queues.PipelineResultEventQueue.createRegistry(
self.zk_client
)
queue = registry["tenant"]["pipeline"]
self.assertIsInstance(queue, event_queues.PipelineResultEventQueue)
self.assertEqual(len(queue), 0)
self.assertFalse(queue.hasEvents())
event = DummyResultEvent()
queue.put(event)
self.assertEqual(len(queue), 1)
self.assertTrue(queue.hasEvents())
other_queue = registry["other_tenant"]["pipeline"]
self.assertEqual(len(other_queue), 0)
self.assertFalse(other_queue.hasEvents())
acked = 0
for event in queue:
self.assertIsInstance(event, DummyResultEvent)
queue.ack(event)
acked += 1
self.assertEqual(acked, 1)
self.assertEqual(len(queue), 0)
self.assertFalse(queue.hasEvents())

zuul/lib/collections.py Normal file

@ -0,0 +1,26 @@
# Copyright 2020 BMW Group
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from collections import defaultdict


class DefaultKeyDict(defaultdict):
    """A defaultdict with the key passed to the default factory."""

    def __init__(self, default_factory):
        self.factory = default_factory

    def __missing__(self, key):
        item = self[key] = self.factory(key)
        return item
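
For illustration only (not part of the change), this is how the queue
registries below use DefaultKeyDict to create per-tenant/per-pipeline
entries lazily on first access; the tenant and pipeline names are made up:

    from zuul.lib.collections import DefaultKeyDict

    # Nested mapping: tenant -> pipeline -> value, created on first access.
    registry = DefaultKeyDict(
        lambda tenant: DefaultKeyDict(lambda pipeline: (tenant, pipeline))
    )
    assert registry["tenant-a"]["check"] == ("tenant-a", "check")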

zuul/model.py

@ -3470,6 +3470,9 @@ class Change(Branch):
class AbstractEvent(abc.ABC):
"""Base class defining the interface for all events."""
# Opaque identifier in order to acknowledge an event
ack_ref = None
@abc.abstractmethod
def toDict(self):
pass
@ -3490,7 +3493,10 @@ class ManagementEvent(AbstractEvent):
def __init__(self):
self._wait_event = threading.Event()
self._exc_info = None
self.traceback = None
self.zuul_event_id = None
# Opaque identifier in order to report the result of an event
self.result_ref = None
def exception(self, exc_info):
self._exc_info = exc_info
@ -3576,6 +3582,7 @@ class TenantReconfigureEvent(ManagementEvent):
super(TenantReconfigureEvent, self).__init__()
self.tenant_name = tenant_name
self.project_branches = set([(project, branch)])
self.merged_events = []
def __ne__(self, other):
return not self.__eq__(other)
@ -3591,6 +3598,7 @@ class TenantReconfigureEvent(ManagementEvent):
if self.tenant_name != other.tenant_name:
raise Exception("Can not merge events from different tenants")
self.project_branches |= other.project_branches
self.merged_events.append(other)
def toDict(self):
d = super().toDict()
@ -3840,6 +3848,7 @@ class TriggerEvent(AbstractEvent):
self.zuul_event_id = None
self.timestamp = None
self.arrived_at_scheduler_timestamp = None
self.driver_name = None
def toDict(self):
return {

zuul/zk/event_queues.py Normal file

@ -0,0 +1,452 @@
# Copyright 2020 BMW Group
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import enum
import json
import logging
import threading
import time
import uuid
from collections import namedtuple
from collections.abc import Iterable
from contextlib import suppress
from kazoo.exceptions import NoNodeError
from zuul import model
from zuul.lib.collections import DefaultKeyDict
from zuul.zk import ZooKeeperBase
RESULT_EVENT_TYPE_MAP = {
"BuildCompletedEvent": model.BuildCompletedEvent,
"BuildPausedEvent": model.BuildPausedEvent,
"BuildStartedEvent": model.BuildStartedEvent,
"FilesChangesCompletedEvent": model.FilesChangesCompletedEvent,
"MergeCompletedEvent": model.MergeCompletedEvent,
"NodesProvisionedEvent": model.NodesProvisionedEvent,
}
MANAGEMENT_EVENT_TYPE_MAP = {
"DequeueEvent": model.DequeueEvent,
"EnqueueEvent": model.EnqueueEvent,
"PromoteEvent": model.PromoteEvent,
"ReconfigureEvent": model.ReconfigureEvent,
"SmartReconfigureEvent": model.SmartReconfigureEvent,
"TenantReconfigureEvent": model.TenantReconfigureEvent,
}
TENANT_ROOT = "/zuul/events/tenant"
SCHEDULER_GLOBAL_ROOT = "/zuul/events/scheduler-global"
# This is the path to the serialized form of the event in ZK, along
# with the version at which it was read (which should not change, since
# events in the queue are immutable). When processing of the event is
# complete, this is the path that should be deleted in order to
# acknowledge it and prevent re-processing. Instances of this are
# dynamically created and attached to de-serialized Event instances.
EventAckRef = namedtuple("EventAckRef", ("path", "version"))
UNKNOWN_ZVERSION = -1
class EventPrefix(enum.Enum):
MANAGEMENT = "100"
RESULT = "200"
TRIGGER = "300"
class ZooKeeperEventQueue(ZooKeeperBase, Iterable):
"""Abstract API for tenant specific events via ZooKeeper
The lifecycle of a global (not pipeline-specific) event is:
* Serialized form of event added to ZK queue.
* During queue processing, events are de-serialized and
AbstractEvent subclasses are instantiated. An EventAckRef is
associated with the event instance in order to maintain the link
to the serialized form.
* When event processing is complete, the EventAckRef is used to
delete the original event. If the event requires a result
(e.g., a management event that returns data) the result will be
written to a pre-determined location. A future can watch for
the result to appear at that location.
Pipeline-specific events usually start out as global events, but
upon processing, may be forwarded to pipeline-specific queues. In
these cases, the original event will be deleted as above, and a
new, identical event will be created in the pipeline-specific
queue. If the event expects a result, no result will be written
upon forwarding; the result will only be written when the
forwarded event is processed.
"""
log = logging.getLogger("zuul.zk.event_queues.ZooKeeperEventQueue")
def __init__(self, client, event_root, event_prefix):
super().__init__(client)
self.event_prefix = event_prefix
self.event_root = event_root
self.kazoo_client.ensure_path(self.event_root)
def __len__(self):
try:
return len(
[e for e in self.kazoo_client.get_children(self.event_root)
if e.startswith(self.event_prefix.value)]
)
except NoNodeError:
return 0
def hasEvents(self):
return bool(len(self))
def ack(self, event):
# Event.ack_ref is an EventAckRef, previously attached to an
# event object when it was de-serialized.
if not event.ack_ref:
raise RuntimeError("Cannot ack event %s without reference", event)
try:
self.kazoo_client.delete(
event.ack_ref.path,
version=event.ack_ref.version,
recursive=True,
)
except NoNodeError:
self.log.warning("Event %s was already acknowledged", event)
def _put(self, data):
event_path = "{}/{}-".format(self.event_root, self.event_prefix.value)
return self.kazoo_client.create(
event_path,
json.dumps(data).encode("utf-8"),
sequence=True,
makepath=True,
)
def _iterEvents(self):
try:
events = self.kazoo_client.get_children(self.event_root)
except NoNodeError:
return
# We need to sort this ourselves, since Kazoo doesn't guarantee any
# ordering of the returned children.
events = sorted(
e for e in events if e.startswith(self.event_prefix.value)
)
for event_id in events:
path = "/".join((self.event_root, event_id))
# TODO: implement sharding of large events
data, zstat = self.kazoo_client.get(path)
try:
event = json.loads(data)
except json.JSONDecodeError:
self.log.exception("Malformed event data in %s", path)
self._remove(path)
continue
yield event, EventAckRef(path, zstat.version)
def _remove(self, path, version=UNKNOWN_ZVERSION):
with suppress(NoNodeError):
self.kazoo_client.delete(path, version=version, recursive=True)
class ManagementEventResultFuture(ZooKeeperBase):
"""Returned when a management event is put into a queue."""
log = logging.getLogger("zuul.zk.event_queues.MangementEventResultFuture")
def __init__(self, client, result_path):
super().__init__(client)
self._result_path = result_path
self._wait_event = threading.Event()
self.kazoo_client.DataWatch(self._result_path, self._resultCallback)
def _resultCallback(self, data=None, stat=None):
if data is None:
# Ignore events without any data
return None
self._wait_event.set()
# Stop the watch if we got a result
return False
def wait(self, timeout=None):
"""Wait until the result for this event has been written."""
# Note that due to event forwarding, the only way to confirm
# that an event has been processed is to check for a result;
# the original event may have been deleted when forwarded to a
# different queue.
try:
if not self._wait_event.wait(timeout):
return False
try:
data, _ = self.kazoo_client.get(self._result_path)
result = json.loads(data.decode("utf-8"))
except json.JSONDecodeError:
self.log.exception(
"Malformed result data in %s", self._result_path
)
raise
tb = result.get("traceback")
if tb is not None:
# TODO: raise some kind of ManagementEventException here
raise RuntimeError(tb)
finally:
with suppress(NoNodeError):
self.kazoo_client.delete(self._result_path)
return True
class ManagementEventQueue(ZooKeeperEventQueue):
"""Management events via ZooKeeper"""
RESULTS_ROOT = "/zuul/results/management"
log = logging.getLogger("zuul.zk.event_queues.ManagementEventQueue")
def put(self, event, needs_result=True):
result_path = None
# If this event is forwarded it might have a result ref that
# we need to forward.
if event.result_ref:
result_path = event.result_ref
elif needs_result:
result_path = "/".join((self.RESULTS_ROOT, str(uuid.uuid4())))
data = {
"event_type": type(event).__name__,
"event_data": event.toDict(),
"result_path": result_path,
}
self._put(data)
if needs_result and result_path:
return ManagementEventResultFuture(self.client, result_path)
return None
def __iter__(self):
event_list = []
for data, ack_ref in self._iterEvents():
try:
event_class = MANAGEMENT_EVENT_TYPE_MAP[data["event_type"]]
event_data = data["event_data"]
result_path = data["result_path"]
except KeyError:
self.log.warning("Malformed event found: %s", data)
self._remove(ack_ref.path, ack_ref.version)
continue
event = event_class.fromDict(event_data)
event.ack_ref = ack_ref
event.result_ref = result_path
with suppress(ValueError):
other_event = event_list[event_list.index(event)]
if isinstance(other_event, model.TenantReconfigureEvent):
other_event.merge(event)
continue
event_list.append(event)
yield from event_list
def ack(self, event):
"""Acknowledge the event (by deleting it from the queue)"""
# Note: the result is reported first to ensure that the
# originator of the event, which may be waiting on a future,
# receives a result; otherwise this event is considered
# unprocessed and remains on the queue.
self._reportResult(event)
super().ack(event)
if isinstance(event, model.TenantReconfigureEvent):
for merged_event in event.merged_events:
merged_event.traceback = event.traceback
self._reportResult(merged_event)
super().ack(merged_event)
def _reportResult(self, event):
if not event.result_ref:
return
# TODO: Add a cleanup thread that deletes old events.
result_data = {"traceback": event.traceback,
"timestamp": time.monotonic()}
self.kazoo_client.create(
event.result_ref,
json.dumps(result_data).encode("utf-8"),
makepath=True,
)
class PipelineManagementEventQueue(ManagementEventQueue):
log = logging.getLogger(
"zuul.zk.event_queues.PipelineManagementEventQueue"
)
def __init__(self, client, tenant_name, pipeline_name):
event_root = "/".join((TENANT_ROOT, tenant_name, pipeline_name))
super().__init__(client, event_root, EventPrefix.MANAGEMENT)
@classmethod
def createRegistry(cls, client):
"""Create a tenant/pipeline queue registry
Returns a nested dictionary of:
tenant_name -> pipeline_name -> EventQueue
Queues are dynamically created with the originally supplied ZK
client as they are accessed via the registry (so new tenants
or pipelines show up automatically).
"""
return DefaultKeyDict(lambda t: cls._createRegistry(client, t))
@classmethod
def _createRegistry(cls, client, tenant_name):
return DefaultKeyDict(lambda p: cls(client, tenant_name, p))
class GlobalManagementEventQueue(ManagementEventQueue):
log = logging.getLogger("zuul.zk.event_queues.GlobalManagementEventQueue")
def __init__(self, client):
super().__init__(client, SCHEDULER_GLOBAL_ROOT, EventPrefix.MANAGEMENT)
def ackWithoutResult(self, event):
"""
Used to ack a management event when forwarding to a pipeline queue
"""
super(ManagementEventQueue, self).ack(event)
if isinstance(event, model.TenantReconfigureEvent):
for merged_event in event.merged_events:
super(ManagementEventQueue, self).ack(merged_event)
class PipelineResultEventQueue(ZooKeeperEventQueue):
"""Result events via ZooKeeper"""
log = logging.getLogger("zuul.zk.event_queues.PipelineResultEventQueue")
def __init__(self, client, tenant_name, pipeline_name):
event_root = "/".join((TENANT_ROOT, tenant_name, pipeline_name))
super().__init__(client, event_root, EventPrefix.RESULT)
@classmethod
def createRegistry(cls, client):
"""Create a tenant/pipeline queue registry
Returns a nested dictionary of:
tenant_name -> pipeline_name -> EventQueue
Queues are dynamically created with the originally supplied ZK
client as they are accessed via the registry (so new tenants
or pipelines show up automatically).
"""
return DefaultKeyDict(lambda t: cls._createRegistry(client, t))
@classmethod
def _createRegistry(cls, client, tenant_name):
return DefaultKeyDict(lambda p: cls(client, tenant_name, p))
def put(self, event):
data = {
"event_type": type(event).__name__,
"event_data": event.toDict(),
}
self._put(data)
def __iter__(self):
for data, ack_ref in self._iterEvents():
try:
event_class = RESULT_EVENT_TYPE_MAP[data["event_type"]]
event_data = data["event_data"]
except KeyError:
self.log.warning("Malformed event found: %s", data)
self._remove(ack_ref.path, ack_ref.version)
continue
event = event_class.fromDict(event_data)
event.ack_ref = ack_ref
yield event
class TriggerEventQueue(ZooKeeperEventQueue):
"""Trigger events via ZooKeeper"""
log = logging.getLogger("zuul.zk.event_queues.TriggerEventQueue")
def __init__(self, client, event_root, connections):
self.connections = connections
super().__init__(client, event_root, EventPrefix.TRIGGER)
def put(self, driver_name, event):
data = {
"driver_name": driver_name,
"event_data": event.toDict(),
}
self._put(data)
def __iter__(self):
for data, ack_ref in self._iterEvents():
try:
event_class = self.connections.getTriggerEventClass(
data["driver_name"]
)
event_data = data["event_data"]
except KeyError:
self.log.warning("Malformed event found: %s", data)
self._remove(ack_ref.path, ack_ref.version)
continue
event = event_class.fromDict(event_data)
event.ack_ref = ack_ref
event.driver_name = data["driver_name"]
yield event
class GlobalTriggerEventQueue(TriggerEventQueue):
log = logging.getLogger("zuul.zk.event_queues.GlobalTriggerEventQueue")
def __init__(self, client, connections):
super().__init__(client, SCHEDULER_GLOBAL_ROOT, connections)
class PipelineTriggerEventQueue(TriggerEventQueue):
log = logging.getLogger("zuul.zk.event_queues.PipelineTriggerEventQueue")
def __init__(self, client, tenant_name, pipeline_name, connections):
event_root = "/".join((TENANT_ROOT, tenant_name, pipeline_name))
super().__init__(client, event_root, connections)
@classmethod
def createRegistry(cls, client, connections):
"""Create a tenant/pipeline queue registry
Returns a nested dictionary of:
tenant_name -> pipeline_name -> EventQueue
Queues are dynamically created with the originally supplied ZK
client and connection registry as they are accessed via the
queue registry (so new tenants or pipelines show up
automatically).
"""
return DefaultKeyDict(
lambda t: cls._createRegistry(client, t, connections)
)
@classmethod
def _createRegistry(cls, client, tenant_name, connections):
return DefaultKeyDict(
lambda p: cls(client, tenant_name, p, connections)
)