Implement Zookeeper backed connection event queues

Connection evens will be stored in
`/zuul/events/connection/<connection_name>/events/<sequence>`.

The implementation utilizes Kazoo's leader election recipe to ensure
that there is only one scheduler processing events for a given
connection at the same time.

Elections use `/zuul/events/connection/<connection_name>/election` as
the root path.

Change-Id: I70ff1b522475a4873999b89e2f5a9707e87a8906
This commit is contained in:
Simon Westphahl 2021-02-12 14:08:18 +01:00
parent 0e9635fa51
commit 1ef47dbbd2
3 changed files with 178 additions and 73 deletions

View File

@ -57,10 +57,6 @@ class DummyEvent(model.AbstractEvent):
return cls()
class DummyPrefix:
value = "dummy"
class DummyEventQueue(event_queues.ZooKeeperEventQueue):
def put(self, event):
@ -77,7 +73,7 @@ class TestEventQueue(EventQueueBaseTestCase):
def setUp(self):
super().setUp()
self.queue = DummyEventQueue(self.zk_client, "root", DummyPrefix())
self.queue = DummyEventQueue(self.zk_client, "root")
def test_missing_ack_ref(self):
# Every event from a ZK event queue should have an ack_ref
@ -429,3 +425,44 @@ class TestEventWatchers(EventQueueBaseTestCase):
result_queues["other-tenant"]["post"].put(DummyResultEvent())
self._wait_for_event(event)
class TestConnectionEventQueue(EventQueueBaseTestCase):
def test_connection_events(self):
# Test enqueue/dequeue of the connection event queue.
queue = event_queues.ConnectionEventQueue(self.zk_client, "dummy")
self.assertEqual(len(queue), 0)
self.assertFalse(queue.hasEvents())
payload = {"message": "hello world!"}
queue.put(payload)
queue.put(payload)
self.assertEqual(len(queue), 2)
self.assertTrue(queue.hasEvents())
acked = 0
for event in queue:
self.assertIsInstance(event, model.ConnectionEvent)
self.assertEqual(event, payload)
queue.ack(event)
acked += 1
self.assertEqual(acked, 2)
self.assertEqual(len(queue), 0)
self.assertFalse(queue.hasEvents())
def test_event_watch(self):
# Test the registered function is called on new events.
queue = event_queues.ConnectionEventQueue(self.zk_client, "dummy")
event = threading.Event()
queue.registerEventWatch(event.set)
self.assertFalse(event.is_set())
queue.put({"message": "hello world!"})
for _ in iterate_timeout(5, "event set"):
if event.is_set():
break

View File

@ -14,7 +14,7 @@
# under the License.
import abc
from collections import OrderedDict, defaultdict
from collections import OrderedDict, defaultdict, UserDict
import copy
import json
import logging
@ -3489,6 +3489,15 @@ class AbstractEvent(abc.ABC):
return event
class ConnectionEvent(AbstractEvent, UserDict):
def toDict(self):
return self.data
def updateFromDict(self, d):
self.data.update(d)
class ManagementEvent(AbstractEvent):
"""An event that should be processed within the main queue run loop"""

View File

@ -13,6 +13,7 @@
# under the License.
import enum
import functools
import json
import logging
import threading
@ -49,6 +50,7 @@ MANAGEMENT_EVENT_TYPE_MAP = {
TENANT_ROOT = "/zuul/events/tenant"
SCHEDULER_GLOBAL_ROOT = "/zuul/events/scheduler-global"
CONNECTION_ROOT = "/zuul/events/connection"
# This is the path to the serialized from of the event in ZK (along
# with the version when it was read (which should not change since
@ -133,6 +135,79 @@ class PipelineEventWatcher(ZooKeeperBase):
class ZooKeeperEventQueue(ZooKeeperBase, Iterable):
"""Abstract API for events via ZooKeeper"""
log = logging.getLogger("zuul.zk.event_queues.ZooKeeperEventQueue")
def __init__(self, client, event_root):
super().__init__(client)
self.event_root = event_root
self.kazoo_client.ensure_path(self.event_root)
def _listEvents(self):
return self.kazoo_client.get_children(self.event_root)
def __len__(self):
try:
return len(self._listEvents())
except NoNodeError:
return 0
def hasEvents(self):
return bool(len(self))
def ack(self, event):
# Event.ack_ref is an EventAckRef, previously attached to an
# event object when it was de-serialized.
if not event.ack_ref:
raise RuntimeError("Cannot ack event %s without reference", event)
try:
self.kazoo_client.delete(
event.ack_ref.path,
version=event.ack_ref.version,
recursive=True,
)
except NoNodeError:
self.log.warning("Event %s was already acknowledged", event)
@property
def _event_create_path(self):
return f"{self.event_root}/"
def _put(self, data):
return self.kazoo_client.create(
self._event_create_path,
json.dumps(data).encode("utf-8"),
sequence=True,
makepath=True,
)
def _iterEvents(self):
try:
# We need to sort this ourself, since Kazoo doesn't guarantee any
# ordering of the returned children.
events = sorted(self._listEvents())
except NoNodeError:
return
for event_id in events:
path = "/".join((self.event_root, event_id))
# TODO: implement sharding of large events
data, zstat = self.kazoo_client.get(path)
try:
event = json.loads(data)
except json.JSONDecodeError:
self.log.exception("Malformed event data in %s", path)
self._remove(path)
continue
yield event, EventAckRef(path, zstat.version)
def _remove(self, path, version=UNKNOWN_ZVERSION):
with suppress(NoNodeError):
self.kazoo_client.delete(path, version=version, recursive=True)
class SchedulerEventQueue(ZooKeeperEventQueue):
"""Abstract API for tenant specific events via ZooKeeper
The lifecycle of a global (not pipeline-specific) event is:
@ -160,75 +235,22 @@ class ZooKeeperEventQueue(ZooKeeperBase, Iterable):
"""
log = logging.getLogger("zuul.zk.event_queues.ZooKeeperEventQueue")
log = logging.getLogger("zuul.zk.event_queues.SchedulerEventQueue")
def __init__(self, client, event_root, event_prefix):
super().__init__(client)
super().__init__(client, event_root)
self.event_prefix = event_prefix
self.event_root = event_root
self.kazoo_client.ensure_path(self.event_root)
def __len__(self):
try:
return len(
[e for e in self.kazoo_client.get_children(self.event_root)
if e.startswith(self.event_prefix.value)]
)
except NoNodeError:
return 0
def _listEvents(self):
return [
e
for e in self.kazoo_client.get_children(self.event_root)
if e.startswith(self.event_prefix.value)
]
def hasEvents(self):
return bool(len(self))
def ack(self, event):
# Event.ack_ref is an EventAckRef, previously attached to an
# event object when it was de-serialized.
if not event.ack_ref:
raise RuntimeError("Cannot ack event %s without reference", event)
try:
self.kazoo_client.delete(
event.ack_ref.path,
version=event.ack_ref.version,
recursive=True,
)
except NoNodeError:
self.log.warning("Event %s was already acknowledged", event)
def _put(self, data):
event_path = "{}/{}-".format(self.event_root, self.event_prefix.value)
return self.kazoo_client.create(
event_path,
json.dumps(data).encode("utf-8"),
sequence=True,
makepath=True,
)
def _iterEvents(self):
try:
events = self.kazoo_client.get_children(self.event_root)
except NoNodeError:
return
# We need to sort this ourself, since Kazoo doesn't guarantee any
# ordering of the returned children.
events = sorted(
e for e in events if e.startswith(self.event_prefix.value)
)
for event_id in events:
path = "/".join((self.event_root, event_id))
# TODO: implement sharding of large events
data, zstat = self.kazoo_client.get(path)
try:
event = json.loads(data)
except json.JSONDecodeError:
self.log.exception("Malformed event data in %s", path)
self._remove(path)
continue
yield event, EventAckRef(path, zstat.version)
def _remove(self, path, version=UNKNOWN_ZVERSION):
with suppress(NoNodeError):
self.kazoo_client.delete(path, version=version, recursive=True)
@property
def _event_create_path(self) -> str:
return "{}/{}-".format(self.event_root, self.event_prefix.value)
class ManagementEventResultFuture(ZooKeeperBase):
@ -278,7 +300,7 @@ class ManagementEventResultFuture(ZooKeeperBase):
return True
class ManagementEventQueue(ZooKeeperEventQueue):
class ManagementEventQueue(SchedulerEventQueue):
"""Management events via ZooKeeper"""
RESULTS_ROOT = "/zuul/results/management"
@ -399,7 +421,7 @@ class GlobalManagementEventQueue(ManagementEventQueue):
super(ManagementEventQueue, self).ack(merged_event)
class PipelineResultEventQueue(ZooKeeperEventQueue):
class PipelineResultEventQueue(SchedulerEventQueue):
"""Result events via ZooKeeper"""
log = logging.getLogger("zuul.zk.event_queues.PipelineResultEventQueue")
@ -447,7 +469,7 @@ class PipelineResultEventQueue(ZooKeeperEventQueue):
yield event
class TriggerEventQueue(ZooKeeperEventQueue):
class TriggerEventQueue(SchedulerEventQueue):
"""Trigger events via ZooKeeper"""
log = logging.getLogger("zuul.zk.event_queues.TriggerEventQueue")
@ -516,3 +538,40 @@ class PipelineTriggerEventQueue(TriggerEventQueue):
return DefaultKeyDict(
lambda p: cls(client, tenant_name, p, connections)
)
class ConnectionEventQueue(ZooKeeperEventQueue):
"""Connection events via ZooKeeper"""
log = logging.getLogger("zuul.zk.event_queues.ConnectionEventQueue")
def __init__(self, client, connection_name):
event_root = "/".join((CONNECTION_ROOT, connection_name, "events"))
super().__init__(client, event_root)
self.election_root = "/".join(
(CONNECTION_ROOT, connection_name, "election")
)
self.kazoo_client.ensure_path(self.election_root)
self.election = self.kazoo_client.Election(self.election_root)
def _eventWatch(self, callback, event_list):
if event_list:
return callback()
def registerEventWatch(self, callback):
self.kazoo_client.ChildrenWatch(
self.event_root, functools.partial(self._eventWatch, callback)
)
def put(self, data):
self._put(data)
def __iter__(self):
for data, ack_ref in self._iterEvents():
if not data:
self.log.warning("Malformed event found: %s", data)
self._remove(ack_ref.path)
continue
event = model.ConnectionEvent.fromDict(data)
event.ack_ref = ack_ref
yield event