Merge "Implement Zookeeper backed connection event queues"
This commit is contained in:
commit
461f0605a4
|
@ -57,10 +57,6 @@ class DummyEvent(model.AbstractEvent):
|
|||
return cls()
|
||||
|
||||
|
||||
class DummyPrefix:
|
||||
value = "dummy"
|
||||
|
||||
|
||||
class DummyEventQueue(event_queues.ZooKeeperEventQueue):
|
||||
|
||||
def put(self, event):
|
||||
|
@ -77,7 +73,7 @@ class TestEventQueue(EventQueueBaseTestCase):
|
|||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.queue = DummyEventQueue(self.zk_client, "root", DummyPrefix())
|
||||
self.queue = DummyEventQueue(self.zk_client, "root")
|
||||
|
||||
def test_missing_ack_ref(self):
|
||||
# Every event from a ZK event queue should have an ack_ref
|
||||
|
@ -429,3 +425,44 @@ class TestEventWatchers(EventQueueBaseTestCase):
|
|||
|
||||
result_queues["other-tenant"]["post"].put(DummyResultEvent())
|
||||
self._wait_for_event(event)
|
||||
|
||||
|
||||
class TestConnectionEventQueue(EventQueueBaseTestCase):
|
||||
|
||||
def test_connection_events(self):
|
||||
# Test enqueue/dequeue of the connection event queue.
|
||||
queue = event_queues.ConnectionEventQueue(self.zk_client, "dummy")
|
||||
|
||||
self.assertEqual(len(queue), 0)
|
||||
self.assertFalse(queue.hasEvents())
|
||||
|
||||
payload = {"message": "hello world!"}
|
||||
queue.put(payload)
|
||||
queue.put(payload)
|
||||
|
||||
self.assertEqual(len(queue), 2)
|
||||
self.assertTrue(queue.hasEvents())
|
||||
|
||||
acked = 0
|
||||
for event in queue:
|
||||
self.assertIsInstance(event, model.ConnectionEvent)
|
||||
self.assertEqual(event, payload)
|
||||
queue.ack(event)
|
||||
acked += 1
|
||||
|
||||
self.assertEqual(acked, 2)
|
||||
self.assertEqual(len(queue), 0)
|
||||
self.assertFalse(queue.hasEvents())
|
||||
|
||||
def test_event_watch(self):
|
||||
# Test the registered function is called on new events.
|
||||
queue = event_queues.ConnectionEventQueue(self.zk_client, "dummy")
|
||||
|
||||
event = threading.Event()
|
||||
queue.registerEventWatch(event.set)
|
||||
self.assertFalse(event.is_set())
|
||||
|
||||
queue.put({"message": "hello world!"})
|
||||
for _ in iterate_timeout(5, "event set"):
|
||||
if event.is_set():
|
||||
break
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
# under the License.
|
||||
|
||||
import abc
|
||||
from collections import OrderedDict, defaultdict
|
||||
from collections import OrderedDict, defaultdict, UserDict
|
||||
import copy
|
||||
import json
|
||||
import logging
|
||||
|
@ -3490,6 +3490,15 @@ class AbstractEvent(abc.ABC):
|
|||
return event
|
||||
|
||||
|
||||
class ConnectionEvent(AbstractEvent, UserDict):
|
||||
|
||||
def toDict(self):
|
||||
return self.data
|
||||
|
||||
def updateFromDict(self, d):
|
||||
self.data.update(d)
|
||||
|
||||
|
||||
class ManagementEvent(AbstractEvent):
|
||||
"""An event that should be processed within the main queue run loop"""
|
||||
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
# under the License.
|
||||
|
||||
import enum
|
||||
import functools
|
||||
import json
|
||||
import logging
|
||||
import threading
|
||||
|
@ -49,6 +50,7 @@ MANAGEMENT_EVENT_TYPE_MAP = {
|
|||
|
||||
TENANT_ROOT = "/zuul/events/tenant"
|
||||
SCHEDULER_GLOBAL_ROOT = "/zuul/events/scheduler-global"
|
||||
CONNECTION_ROOT = "/zuul/events/connection"
|
||||
|
||||
# This is the path to the serialized from of the event in ZK (along
|
||||
# with the version when it was read (which should not change since
|
||||
|
@ -133,6 +135,79 @@ class PipelineEventWatcher(ZooKeeperBase):
|
|||
|
||||
|
||||
class ZooKeeperEventQueue(ZooKeeperBase, Iterable):
|
||||
"""Abstract API for events via ZooKeeper"""
|
||||
|
||||
log = logging.getLogger("zuul.zk.event_queues.ZooKeeperEventQueue")
|
||||
|
||||
def __init__(self, client, event_root):
|
||||
super().__init__(client)
|
||||
self.event_root = event_root
|
||||
self.kazoo_client.ensure_path(self.event_root)
|
||||
|
||||
def _listEvents(self):
|
||||
return self.kazoo_client.get_children(self.event_root)
|
||||
|
||||
def __len__(self):
|
||||
try:
|
||||
return len(self._listEvents())
|
||||
except NoNodeError:
|
||||
return 0
|
||||
|
||||
def hasEvents(self):
|
||||
return bool(len(self))
|
||||
|
||||
def ack(self, event):
|
||||
# Event.ack_ref is an EventAckRef, previously attached to an
|
||||
# event object when it was de-serialized.
|
||||
if not event.ack_ref:
|
||||
raise RuntimeError("Cannot ack event %s without reference", event)
|
||||
try:
|
||||
self.kazoo_client.delete(
|
||||
event.ack_ref.path,
|
||||
version=event.ack_ref.version,
|
||||
recursive=True,
|
||||
)
|
||||
except NoNodeError:
|
||||
self.log.warning("Event %s was already acknowledged", event)
|
||||
|
||||
@property
|
||||
def _event_create_path(self):
|
||||
return f"{self.event_root}/"
|
||||
|
||||
def _put(self, data):
|
||||
return self.kazoo_client.create(
|
||||
self._event_create_path,
|
||||
json.dumps(data).encode("utf-8"),
|
||||
sequence=True,
|
||||
makepath=True,
|
||||
)
|
||||
|
||||
def _iterEvents(self):
|
||||
try:
|
||||
# We need to sort this ourself, since Kazoo doesn't guarantee any
|
||||
# ordering of the returned children.
|
||||
events = sorted(self._listEvents())
|
||||
except NoNodeError:
|
||||
return
|
||||
|
||||
for event_id in events:
|
||||
path = "/".join((self.event_root, event_id))
|
||||
# TODO: implement sharding of large events
|
||||
data, zstat = self.kazoo_client.get(path)
|
||||
try:
|
||||
event = json.loads(data)
|
||||
except json.JSONDecodeError:
|
||||
self.log.exception("Malformed event data in %s", path)
|
||||
self._remove(path)
|
||||
continue
|
||||
yield event, EventAckRef(path, zstat.version)
|
||||
|
||||
def _remove(self, path, version=UNKNOWN_ZVERSION):
|
||||
with suppress(NoNodeError):
|
||||
self.kazoo_client.delete(path, version=version, recursive=True)
|
||||
|
||||
|
||||
class SchedulerEventQueue(ZooKeeperEventQueue):
|
||||
"""Abstract API for tenant specific events via ZooKeeper
|
||||
|
||||
The lifecycle of a global (not pipeline-specific) event is:
|
||||
|
@ -160,75 +235,22 @@ class ZooKeeperEventQueue(ZooKeeperBase, Iterable):
|
|||
|
||||
"""
|
||||
|
||||
log = logging.getLogger("zuul.zk.event_queues.ZooKeeperEventQueue")
|
||||
log = logging.getLogger("zuul.zk.event_queues.SchedulerEventQueue")
|
||||
|
||||
def __init__(self, client, event_root, event_prefix):
|
||||
super().__init__(client)
|
||||
super().__init__(client, event_root)
|
||||
self.event_prefix = event_prefix
|
||||
self.event_root = event_root
|
||||
self.kazoo_client.ensure_path(self.event_root)
|
||||
|
||||
def __len__(self):
|
||||
try:
|
||||
return len(
|
||||
[e for e in self.kazoo_client.get_children(self.event_root)
|
||||
if e.startswith(self.event_prefix.value)]
|
||||
)
|
||||
except NoNodeError:
|
||||
return 0
|
||||
def _listEvents(self):
|
||||
return [
|
||||
e
|
||||
for e in self.kazoo_client.get_children(self.event_root)
|
||||
if e.startswith(self.event_prefix.value)
|
||||
]
|
||||
|
||||
def hasEvents(self):
|
||||
return bool(len(self))
|
||||
|
||||
def ack(self, event):
|
||||
# Event.ack_ref is an EventAckRef, previously attached to an
|
||||
# event object when it was de-serialized.
|
||||
if not event.ack_ref:
|
||||
raise RuntimeError("Cannot ack event %s without reference", event)
|
||||
try:
|
||||
self.kazoo_client.delete(
|
||||
event.ack_ref.path,
|
||||
version=event.ack_ref.version,
|
||||
recursive=True,
|
||||
)
|
||||
except NoNodeError:
|
||||
self.log.warning("Event %s was already acknowledged", event)
|
||||
|
||||
def _put(self, data):
|
||||
event_path = "{}/{}-".format(self.event_root, self.event_prefix.value)
|
||||
return self.kazoo_client.create(
|
||||
event_path,
|
||||
json.dumps(data).encode("utf-8"),
|
||||
sequence=True,
|
||||
makepath=True,
|
||||
)
|
||||
|
||||
def _iterEvents(self):
|
||||
try:
|
||||
events = self.kazoo_client.get_children(self.event_root)
|
||||
except NoNodeError:
|
||||
return
|
||||
|
||||
# We need to sort this ourself, since Kazoo doesn't guarantee any
|
||||
# ordering of the returned children.
|
||||
events = sorted(
|
||||
e for e in events if e.startswith(self.event_prefix.value)
|
||||
)
|
||||
for event_id in events:
|
||||
path = "/".join((self.event_root, event_id))
|
||||
# TODO: implement sharding of large events
|
||||
data, zstat = self.kazoo_client.get(path)
|
||||
try:
|
||||
event = json.loads(data)
|
||||
except json.JSONDecodeError:
|
||||
self.log.exception("Malformed event data in %s", path)
|
||||
self._remove(path)
|
||||
continue
|
||||
yield event, EventAckRef(path, zstat.version)
|
||||
|
||||
def _remove(self, path, version=UNKNOWN_ZVERSION):
|
||||
with suppress(NoNodeError):
|
||||
self.kazoo_client.delete(path, version=version, recursive=True)
|
||||
@property
|
||||
def _event_create_path(self) -> str:
|
||||
return "{}/{}-".format(self.event_root, self.event_prefix.value)
|
||||
|
||||
|
||||
class ManagementEventResultFuture(ZooKeeperBase):
|
||||
|
@ -278,7 +300,7 @@ class ManagementEventResultFuture(ZooKeeperBase):
|
|||
return True
|
||||
|
||||
|
||||
class ManagementEventQueue(ZooKeeperEventQueue):
|
||||
class ManagementEventQueue(SchedulerEventQueue):
|
||||
"""Management events via ZooKeeper"""
|
||||
|
||||
RESULTS_ROOT = "/zuul/results/management"
|
||||
|
@ -399,7 +421,7 @@ class GlobalManagementEventQueue(ManagementEventQueue):
|
|||
super(ManagementEventQueue, self).ack(merged_event)
|
||||
|
||||
|
||||
class PipelineResultEventQueue(ZooKeeperEventQueue):
|
||||
class PipelineResultEventQueue(SchedulerEventQueue):
|
||||
"""Result events via ZooKeeper"""
|
||||
|
||||
log = logging.getLogger("zuul.zk.event_queues.PipelineResultEventQueue")
|
||||
|
@ -447,7 +469,7 @@ class PipelineResultEventQueue(ZooKeeperEventQueue):
|
|||
yield event
|
||||
|
||||
|
||||
class TriggerEventQueue(ZooKeeperEventQueue):
|
||||
class TriggerEventQueue(SchedulerEventQueue):
|
||||
"""Trigger events via ZooKeeper"""
|
||||
|
||||
log = logging.getLogger("zuul.zk.event_queues.TriggerEventQueue")
|
||||
|
@ -516,3 +538,40 @@ class PipelineTriggerEventQueue(TriggerEventQueue):
|
|||
return DefaultKeyDict(
|
||||
lambda p: cls(client, tenant_name, p, connections)
|
||||
)
|
||||
|
||||
|
||||
class ConnectionEventQueue(ZooKeeperEventQueue):
|
||||
"""Connection events via ZooKeeper"""
|
||||
|
||||
log = logging.getLogger("zuul.zk.event_queues.ConnectionEventQueue")
|
||||
|
||||
def __init__(self, client, connection_name):
|
||||
event_root = "/".join((CONNECTION_ROOT, connection_name, "events"))
|
||||
super().__init__(client, event_root)
|
||||
self.election_root = "/".join(
|
||||
(CONNECTION_ROOT, connection_name, "election")
|
||||
)
|
||||
self.kazoo_client.ensure_path(self.election_root)
|
||||
self.election = self.kazoo_client.Election(self.election_root)
|
||||
|
||||
def _eventWatch(self, callback, event_list):
|
||||
if event_list:
|
||||
return callback()
|
||||
|
||||
def registerEventWatch(self, callback):
|
||||
self.kazoo_client.ChildrenWatch(
|
||||
self.event_root, functools.partial(self._eventWatch, callback)
|
||||
)
|
||||
|
||||
def put(self, data):
|
||||
self._put(data)
|
||||
|
||||
def __iter__(self):
|
||||
for data, ack_ref in self._iterEvents():
|
||||
if not data:
|
||||
self.log.warning("Malformed event found: %s", data)
|
||||
self._remove(ack_ref.path)
|
||||
continue
|
||||
event = model.ConnectionEvent.fromDict(data)
|
||||
event.ack_ref = ack_ref
|
||||
yield event
|
||||
|
|
Loading…
Reference in New Issue