Add a logical timestamp to management events

Create a 'zuul_event_ltime' attribute for management events, which is
the Zookeeper creation transaction ID of an event's Znode. In case the
event's ltime is already set it won't be change, which will be the case
for forwarded events.

This timestamp can e.g. be used later on to determine if a project's
config cache in Zookeeper is up-to-date (happend before relation).

Change-Id: I922fef7ddc833565a97abe798fdecdb22b0d8dcb
This commit is contained in:
Simon Westphahl 2021-05-04 14:10:23 +02:00 committed by James E. Blair
parent 3061107fdc
commit eedb4f4447
3 changed files with 48 additions and 6 deletions

View File

@ -62,7 +62,7 @@ class DummyEventQueue(event_queues.ZooKeeperEventQueue):
self._put(event.toDict())
def __iter__(self):
for data, ack_ref in self._iterEvents():
for data, ack_ref, _ in self._iterEvents():
event = DummyEvent.fromDict(data)
event.ack_ref = ack_ref
yield event
@ -266,6 +266,39 @@ class TestManagementEventQueue(EventQueueBaseTestCase):
queue.ack(event)
self.assertFalse(queue.hasEvents())
def test_event_ltime(self):
global_queue = event_queues.GlobalManagementEventQueue(self.zk_client)
registry = event_queues.PipelineManagementEventQueue.createRegistry(
self.zk_client
)
event = model.ReconfigureEvent(None)
global_queue.put(event, needs_result=False)
self.assertTrue(global_queue.hasEvents())
pipeline_queue = registry["tenant"]["pipeline"]
self.assertIsInstance(
pipeline_queue, event_queues.ManagementEventQueue
)
processed_events = 0
for event in global_queue:
processed_events += 1
event_ltime = event.zuul_event_ltime
self.assertGreater(event_ltime, -1)
# Forward event to pipeline management event queue
pipeline_queue.put(event)
self.assertEqual(processed_events, 1)
self.assertTrue(pipeline_queue.hasEvents())
processed_events = 0
for event in pipeline_queue:
pipeline_queue.ack(event)
processed_events += 1
self.assertEqual(event.zuul_event_ltime, event_ltime)
self.assertEqual(processed_events, 1)
def test_pipeline_management_events(self):
# Test that when a management event is forwarded from the
# global to the a pipeline-specific queue, it is not

View File

@ -3519,6 +3519,10 @@ class ManagementEvent(AbstractEvent):
def __init__(self):
self.traceback = None
self.zuul_event_id = None
# Logical timestamp of the event (Zookeeper creation transaction ID).
# This will be automatically set when the event is consumed from
# the event queue in case it is None.
self.zuul_event_ltime = None
# Opaque identifier in order to report the result of an event
self.result_ref = None
@ -3528,10 +3532,12 @@ class ManagementEvent(AbstractEvent):
def toDict(self):
return {
"zuul_event_id": self.zuul_event_id,
"zuul_event_ltime": self.zuul_event_ltime,
}
def updateFromDict(self, d):
self.zuul_event_id = d.get("zuul_event_id")
self.zuul_event_ltime = d.get("zuul_event_ltime")
class ReconfigureEvent(ManagementEvent):

View File

@ -202,7 +202,7 @@ class ZooKeeperEventQueue(ZooKeeperSimpleBase, Iterable):
self.log.exception("Malformed event data in %s", path)
self._remove(path)
continue
yield event, EventAckRef(path, zstat.version)
yield event, EventAckRef(path, zstat.version), zstat
def _remove(self, path, version=UNKNOWN_ZVERSION):
with suppress(NoNodeError):
@ -334,7 +334,7 @@ class ManagementEventQueue(SchedulerEventQueue):
def __iter__(self):
event_list = []
for data, ack_ref in self._iterEvents():
for data, ack_ref, zstat in self._iterEvents():
try:
event_class = MANAGEMENT_EVENT_TYPE_MAP[data["event_type"]]
event_data = data["event_data"]
@ -346,6 +346,9 @@ class ManagementEventQueue(SchedulerEventQueue):
event = event_class.fromDict(event_data)
event.ack_ref = ack_ref
event.result_ref = result_path
# Initialize the logical timestamp if not valid
if event.zuul_event_ltime is None:
event.zuul_event_ltime = zstat.creation_transaction_id
with suppress(ValueError):
other_event = event_list[event_list.index(event)]
@ -464,7 +467,7 @@ class PipelineResultEventQueue(SchedulerEventQueue):
self._put(data)
def __iter__(self):
for data, ack_ref in self._iterEvents():
for data, ack_ref, _ in self._iterEvents():
try:
event_class = RESULT_EVENT_TYPE_MAP[data["event_type"]]
event_data = data["event_data"]
@ -494,7 +497,7 @@ class TriggerEventQueue(SchedulerEventQueue):
self._put(data)
def __iter__(self):
for data, ack_ref in self._iterEvents():
for data, ack_ref, _ in self._iterEvents():
try:
event_class = self.connections.getTriggerEventClass(
data["driver_name"]
@ -577,7 +580,7 @@ class ConnectionEventQueue(ZooKeeperEventQueue):
self._put(data)
def __iter__(self):
for data, ack_ref in self._iterEvents():
for data, ack_ref, _ in self._iterEvents():
if not data:
self.log.warning("Malformed event found: %s", data)
self._remove(ack_ref.path)