From 1ef47dbbd28c206110a7154d9dc6dd42d4af1c5f Mon Sep 17 00:00:00 2001 From: Simon Westphahl Date: Fri, 12 Feb 2021 14:08:18 +0100 Subject: [PATCH] Implement Zookeeper backed connection event queues Connection evens will be stored in `/zuul/events/connection//events/`. The implementation utilizes Kazoo's leader election recipe to ensure that there is only one scheduler processing events for a given connection at the same time. Elections use `/zuul/events/connection//election` as the root path. Change-Id: I70ff1b522475a4873999b89e2f5a9707e87a8906 --- tests/unit/test_event_queues.py | 47 +++++++- zuul/model.py | 11 +- zuul/zk/event_queues.py | 193 +++++++++++++++++++++----------- 3 files changed, 178 insertions(+), 73 deletions(-) diff --git a/tests/unit/test_event_queues.py b/tests/unit/test_event_queues.py index 562f62d984..14df4d18f9 100644 --- a/tests/unit/test_event_queues.py +++ b/tests/unit/test_event_queues.py @@ -57,10 +57,6 @@ class DummyEvent(model.AbstractEvent): return cls() -class DummyPrefix: - value = "dummy" - - class DummyEventQueue(event_queues.ZooKeeperEventQueue): def put(self, event): @@ -77,7 +73,7 @@ class TestEventQueue(EventQueueBaseTestCase): def setUp(self): super().setUp() - self.queue = DummyEventQueue(self.zk_client, "root", DummyPrefix()) + self.queue = DummyEventQueue(self.zk_client, "root") def test_missing_ack_ref(self): # Every event from a ZK event queue should have an ack_ref @@ -429,3 +425,44 @@ class TestEventWatchers(EventQueueBaseTestCase): result_queues["other-tenant"]["post"].put(DummyResultEvent()) self._wait_for_event(event) + + +class TestConnectionEventQueue(EventQueueBaseTestCase): + + def test_connection_events(self): + # Test enqueue/dequeue of the connection event queue. + queue = event_queues.ConnectionEventQueue(self.zk_client, "dummy") + + self.assertEqual(len(queue), 0) + self.assertFalse(queue.hasEvents()) + + payload = {"message": "hello world!"} + queue.put(payload) + queue.put(payload) + + self.assertEqual(len(queue), 2) + self.assertTrue(queue.hasEvents()) + + acked = 0 + for event in queue: + self.assertIsInstance(event, model.ConnectionEvent) + self.assertEqual(event, payload) + queue.ack(event) + acked += 1 + + self.assertEqual(acked, 2) + self.assertEqual(len(queue), 0) + self.assertFalse(queue.hasEvents()) + + def test_event_watch(self): + # Test the registered function is called on new events. + queue = event_queues.ConnectionEventQueue(self.zk_client, "dummy") + + event = threading.Event() + queue.registerEventWatch(event.set) + self.assertFalse(event.is_set()) + + queue.put({"message": "hello world!"}) + for _ in iterate_timeout(5, "event set"): + if event.is_set(): + break diff --git a/zuul/model.py b/zuul/model.py index 99f781e0bc..a9d4fa49a4 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -14,7 +14,7 @@ # under the License. import abc -from collections import OrderedDict, defaultdict +from collections import OrderedDict, defaultdict, UserDict import copy import json import logging @@ -3489,6 +3489,15 @@ class AbstractEvent(abc.ABC): return event +class ConnectionEvent(AbstractEvent, UserDict): + + def toDict(self): + return self.data + + def updateFromDict(self, d): + self.data.update(d) + + class ManagementEvent(AbstractEvent): """An event that should be processed within the main queue run loop""" diff --git a/zuul/zk/event_queues.py b/zuul/zk/event_queues.py index e5554a765f..7d280797f8 100644 --- a/zuul/zk/event_queues.py +++ b/zuul/zk/event_queues.py @@ -13,6 +13,7 @@ # under the License. import enum +import functools import json import logging import threading @@ -49,6 +50,7 @@ MANAGEMENT_EVENT_TYPE_MAP = { TENANT_ROOT = "/zuul/events/tenant" SCHEDULER_GLOBAL_ROOT = "/zuul/events/scheduler-global" +CONNECTION_ROOT = "/zuul/events/connection" # This is the path to the serialized from of the event in ZK (along # with the version when it was read (which should not change since @@ -133,6 +135,79 @@ class PipelineEventWatcher(ZooKeeperBase): class ZooKeeperEventQueue(ZooKeeperBase, Iterable): + """Abstract API for events via ZooKeeper""" + + log = logging.getLogger("zuul.zk.event_queues.ZooKeeperEventQueue") + + def __init__(self, client, event_root): + super().__init__(client) + self.event_root = event_root + self.kazoo_client.ensure_path(self.event_root) + + def _listEvents(self): + return self.kazoo_client.get_children(self.event_root) + + def __len__(self): + try: + return len(self._listEvents()) + except NoNodeError: + return 0 + + def hasEvents(self): + return bool(len(self)) + + def ack(self, event): + # Event.ack_ref is an EventAckRef, previously attached to an + # event object when it was de-serialized. + if not event.ack_ref: + raise RuntimeError("Cannot ack event %s without reference", event) + try: + self.kazoo_client.delete( + event.ack_ref.path, + version=event.ack_ref.version, + recursive=True, + ) + except NoNodeError: + self.log.warning("Event %s was already acknowledged", event) + + @property + def _event_create_path(self): + return f"{self.event_root}/" + + def _put(self, data): + return self.kazoo_client.create( + self._event_create_path, + json.dumps(data).encode("utf-8"), + sequence=True, + makepath=True, + ) + + def _iterEvents(self): + try: + # We need to sort this ourself, since Kazoo doesn't guarantee any + # ordering of the returned children. + events = sorted(self._listEvents()) + except NoNodeError: + return + + for event_id in events: + path = "/".join((self.event_root, event_id)) + # TODO: implement sharding of large events + data, zstat = self.kazoo_client.get(path) + try: + event = json.loads(data) + except json.JSONDecodeError: + self.log.exception("Malformed event data in %s", path) + self._remove(path) + continue + yield event, EventAckRef(path, zstat.version) + + def _remove(self, path, version=UNKNOWN_ZVERSION): + with suppress(NoNodeError): + self.kazoo_client.delete(path, version=version, recursive=True) + + +class SchedulerEventQueue(ZooKeeperEventQueue): """Abstract API for tenant specific events via ZooKeeper The lifecycle of a global (not pipeline-specific) event is: @@ -160,75 +235,22 @@ class ZooKeeperEventQueue(ZooKeeperBase, Iterable): """ - log = logging.getLogger("zuul.zk.event_queues.ZooKeeperEventQueue") + log = logging.getLogger("zuul.zk.event_queues.SchedulerEventQueue") def __init__(self, client, event_root, event_prefix): - super().__init__(client) + super().__init__(client, event_root) self.event_prefix = event_prefix - self.event_root = event_root - self.kazoo_client.ensure_path(self.event_root) - def __len__(self): - try: - return len( - [e for e in self.kazoo_client.get_children(self.event_root) - if e.startswith(self.event_prefix.value)] - ) - except NoNodeError: - return 0 + def _listEvents(self): + return [ + e + for e in self.kazoo_client.get_children(self.event_root) + if e.startswith(self.event_prefix.value) + ] - def hasEvents(self): - return bool(len(self)) - - def ack(self, event): - # Event.ack_ref is an EventAckRef, previously attached to an - # event object when it was de-serialized. - if not event.ack_ref: - raise RuntimeError("Cannot ack event %s without reference", event) - try: - self.kazoo_client.delete( - event.ack_ref.path, - version=event.ack_ref.version, - recursive=True, - ) - except NoNodeError: - self.log.warning("Event %s was already acknowledged", event) - - def _put(self, data): - event_path = "{}/{}-".format(self.event_root, self.event_prefix.value) - return self.kazoo_client.create( - event_path, - json.dumps(data).encode("utf-8"), - sequence=True, - makepath=True, - ) - - def _iterEvents(self): - try: - events = self.kazoo_client.get_children(self.event_root) - except NoNodeError: - return - - # We need to sort this ourself, since Kazoo doesn't guarantee any - # ordering of the returned children. - events = sorted( - e for e in events if e.startswith(self.event_prefix.value) - ) - for event_id in events: - path = "/".join((self.event_root, event_id)) - # TODO: implement sharding of large events - data, zstat = self.kazoo_client.get(path) - try: - event = json.loads(data) - except json.JSONDecodeError: - self.log.exception("Malformed event data in %s", path) - self._remove(path) - continue - yield event, EventAckRef(path, zstat.version) - - def _remove(self, path, version=UNKNOWN_ZVERSION): - with suppress(NoNodeError): - self.kazoo_client.delete(path, version=version, recursive=True) + @property + def _event_create_path(self) -> str: + return "{}/{}-".format(self.event_root, self.event_prefix.value) class ManagementEventResultFuture(ZooKeeperBase): @@ -278,7 +300,7 @@ class ManagementEventResultFuture(ZooKeeperBase): return True -class ManagementEventQueue(ZooKeeperEventQueue): +class ManagementEventQueue(SchedulerEventQueue): """Management events via ZooKeeper""" RESULTS_ROOT = "/zuul/results/management" @@ -399,7 +421,7 @@ class GlobalManagementEventQueue(ManagementEventQueue): super(ManagementEventQueue, self).ack(merged_event) -class PipelineResultEventQueue(ZooKeeperEventQueue): +class PipelineResultEventQueue(SchedulerEventQueue): """Result events via ZooKeeper""" log = logging.getLogger("zuul.zk.event_queues.PipelineResultEventQueue") @@ -447,7 +469,7 @@ class PipelineResultEventQueue(ZooKeeperEventQueue): yield event -class TriggerEventQueue(ZooKeeperEventQueue): +class TriggerEventQueue(SchedulerEventQueue): """Trigger events via ZooKeeper""" log = logging.getLogger("zuul.zk.event_queues.TriggerEventQueue") @@ -516,3 +538,40 @@ class PipelineTriggerEventQueue(TriggerEventQueue): return DefaultKeyDict( lambda p: cls(client, tenant_name, p, connections) ) + + +class ConnectionEventQueue(ZooKeeperEventQueue): + """Connection events via ZooKeeper""" + + log = logging.getLogger("zuul.zk.event_queues.ConnectionEventQueue") + + def __init__(self, client, connection_name): + event_root = "/".join((CONNECTION_ROOT, connection_name, "events")) + super().__init__(client, event_root) + self.election_root = "/".join( + (CONNECTION_ROOT, connection_name, "election") + ) + self.kazoo_client.ensure_path(self.election_root) + self.election = self.kazoo_client.Election(self.election_root) + + def _eventWatch(self, callback, event_list): + if event_list: + return callback() + + def registerEventWatch(self, callback): + self.kazoo_client.ChildrenWatch( + self.event_root, functools.partial(self._eventWatch, callback) + ) + + def put(self, data): + self._put(data) + + def __iter__(self): + for data, ack_ref in self._iterEvents(): + if not data: + self.log.warning("Malformed event found: %s", data) + self._remove(ack_ref.path) + continue + event = model.ConnectionEvent.fromDict(data) + event.ack_ref = ack_ref + yield event