Merge "Add a logical timestamp to management events"
This commit is contained in:
@@ -62,7 +62,7 @@ class DummyEventQueue(event_queues.ZooKeeperEventQueue):
|
||||
self._put(event.toDict())
|
||||
|
||||
def __iter__(self):
|
||||
for data, ack_ref in self._iterEvents():
|
||||
for data, ack_ref, _ in self._iterEvents():
|
||||
event = DummyEvent.fromDict(data)
|
||||
event.ack_ref = ack_ref
|
||||
yield event
|
||||
@@ -266,6 +266,39 @@ class TestManagementEventQueue(EventQueueBaseTestCase):
|
||||
queue.ack(event)
|
||||
self.assertFalse(queue.hasEvents())
|
||||
|
||||
def test_event_ltime(self):
|
||||
global_queue = event_queues.GlobalManagementEventQueue(self.zk_client)
|
||||
registry = event_queues.PipelineManagementEventQueue.createRegistry(
|
||||
self.zk_client
|
||||
)
|
||||
|
||||
event = model.ReconfigureEvent(None)
|
||||
global_queue.put(event, needs_result=False)
|
||||
self.assertTrue(global_queue.hasEvents())
|
||||
|
||||
pipeline_queue = registry["tenant"]["pipeline"]
|
||||
self.assertIsInstance(
|
||||
pipeline_queue, event_queues.ManagementEventQueue
|
||||
)
|
||||
processed_events = 0
|
||||
for event in global_queue:
|
||||
processed_events += 1
|
||||
event_ltime = event.zuul_event_ltime
|
||||
self.assertGreater(event_ltime, -1)
|
||||
# Forward event to pipeline management event queue
|
||||
pipeline_queue.put(event)
|
||||
|
||||
self.assertEqual(processed_events, 1)
|
||||
self.assertTrue(pipeline_queue.hasEvents())
|
||||
|
||||
processed_events = 0
|
||||
for event in pipeline_queue:
|
||||
pipeline_queue.ack(event)
|
||||
processed_events += 1
|
||||
self.assertEqual(event.zuul_event_ltime, event_ltime)
|
||||
|
||||
self.assertEqual(processed_events, 1)
|
||||
|
||||
def test_pipeline_management_events(self):
|
||||
# Test that when a management event is forwarded from the
|
||||
# global to the a pipeline-specific queue, it is not
|
||||
|
||||
@@ -3575,6 +3575,10 @@ class ManagementEvent(AbstractEvent):
|
||||
def __init__(self):
|
||||
self.traceback = None
|
||||
self.zuul_event_id = None
|
||||
# Logical timestamp of the event (Zookeeper creation transaction ID).
|
||||
# This will be automatically set when the event is consumed from
|
||||
# the event queue in case it is None.
|
||||
self.zuul_event_ltime = None
|
||||
# Opaque identifier in order to report the result of an event
|
||||
self.result_ref = None
|
||||
|
||||
@@ -3584,10 +3588,12 @@ class ManagementEvent(AbstractEvent):
|
||||
def toDict(self):
|
||||
return {
|
||||
"zuul_event_id": self.zuul_event_id,
|
||||
"zuul_event_ltime": self.zuul_event_ltime,
|
||||
}
|
||||
|
||||
def updateFromDict(self, d):
|
||||
self.zuul_event_id = d.get("zuul_event_id")
|
||||
self.zuul_event_ltime = d.get("zuul_event_ltime")
|
||||
|
||||
|
||||
class ReconfigureEvent(ManagementEvent):
|
||||
|
||||
@@ -202,7 +202,7 @@ class ZooKeeperEventQueue(ZooKeeperSimpleBase, Iterable):
|
||||
self.log.exception("Malformed event data in %s", path)
|
||||
self._remove(path)
|
||||
continue
|
||||
yield event, EventAckRef(path, zstat.version)
|
||||
yield event, EventAckRef(path, zstat.version), zstat
|
||||
|
||||
def _remove(self, path, version=UNKNOWN_ZVERSION):
|
||||
with suppress(NoNodeError):
|
||||
@@ -334,7 +334,7 @@ class ManagementEventQueue(SchedulerEventQueue):
|
||||
|
||||
def __iter__(self):
|
||||
event_list = []
|
||||
for data, ack_ref in self._iterEvents():
|
||||
for data, ack_ref, zstat in self._iterEvents():
|
||||
try:
|
||||
event_class = MANAGEMENT_EVENT_TYPE_MAP[data["event_type"]]
|
||||
event_data = data["event_data"]
|
||||
@@ -346,6 +346,9 @@ class ManagementEventQueue(SchedulerEventQueue):
|
||||
event = event_class.fromDict(event_data)
|
||||
event.ack_ref = ack_ref
|
||||
event.result_ref = result_path
|
||||
# Initialize the logical timestamp if not valid
|
||||
if event.zuul_event_ltime is None:
|
||||
event.zuul_event_ltime = zstat.creation_transaction_id
|
||||
|
||||
with suppress(ValueError):
|
||||
other_event = event_list[event_list.index(event)]
|
||||
@@ -464,7 +467,7 @@ class PipelineResultEventQueue(SchedulerEventQueue):
|
||||
self._put(data)
|
||||
|
||||
def __iter__(self):
|
||||
for data, ack_ref in self._iterEvents():
|
||||
for data, ack_ref, _ in self._iterEvents():
|
||||
try:
|
||||
event_class = RESULT_EVENT_TYPE_MAP[data["event_type"]]
|
||||
event_data = data["event_data"]
|
||||
@@ -494,7 +497,7 @@ class TriggerEventQueue(SchedulerEventQueue):
|
||||
self._put(data)
|
||||
|
||||
def __iter__(self):
|
||||
for data, ack_ref in self._iterEvents():
|
||||
for data, ack_ref, _ in self._iterEvents():
|
||||
try:
|
||||
event_class = self.connections.getTriggerEventClass(
|
||||
data["driver_name"]
|
||||
@@ -577,7 +580,7 @@ class ConnectionEventQueue(ZooKeeperEventQueue):
|
||||
self._put(data)
|
||||
|
||||
def __iter__(self):
|
||||
for data, ack_ref in self._iterEvents():
|
||||
for data, ack_ref, _ in self._iterEvents():
|
||||
if not data:
|
||||
self.log.warning("Malformed event found: %s", data)
|
||||
self._remove(ack_ref.path)
|
||||
|
||||
Reference in New Issue
Block a user