Merge "Use ephemeral nodes for management result events"
This commit is contained in:
commit
19ed5d1cee
|
@ -320,6 +320,98 @@ class TestManagementEventQueue(EventQueueBaseTestCase):
|
|||
self.assertEqual(len(pipeline_queue), 0)
|
||||
self.assertFalse(pipeline_queue.hasEvents())
|
||||
|
||||
def test_management_events_client(self):
|
||||
# Test management events from a second client
|
||||
|
||||
queue = event_queues.GlobalManagementEventQueue(self.zk_client)
|
||||
self.assertEqual(len(queue), 0)
|
||||
self.assertFalse(queue.hasEvents())
|
||||
|
||||
# This client will submit a reconfigure event and wait for it.
|
||||
external_client = ZooKeeperClient(
|
||||
self.zk_chroot_fixture.zk_hosts,
|
||||
tls_cert=self.zk_chroot_fixture.zookeeper_cert,
|
||||
tls_key=self.zk_chroot_fixture.zookeeper_key,
|
||||
tls_ca=self.zk_chroot_fixture.zookeeper_ca)
|
||||
self.addCleanup(external_client.disconnect)
|
||||
external_client.connect()
|
||||
|
||||
external_queue = event_queues.GlobalManagementEventQueue(
|
||||
external_client)
|
||||
|
||||
event = model.ReconfigureEvent(None)
|
||||
result_future = external_queue.put(event)
|
||||
self.assertIsNotNone(result_future)
|
||||
|
||||
self.assertEqual(len(queue), 1)
|
||||
self.assertTrue(queue.hasEvents())
|
||||
self.assertFalse(result_future.wait(0.1))
|
||||
|
||||
acked = 0
|
||||
for event in queue:
|
||||
self.assertIsInstance(event, model.ReconfigureEvent)
|
||||
queue.ack(event)
|
||||
acked += 1
|
||||
|
||||
self.assertEqual(acked, 1)
|
||||
self.assertTrue(result_future.wait(5))
|
||||
self.assertEqual(len(queue), 0)
|
||||
self.assertFalse(queue.hasEvents())
|
||||
|
||||
def test_management_events_client_disconnect(self):
|
||||
# Test management events from a second client which
|
||||
# disconnects before the event is complete.
|
||||
|
||||
queue = event_queues.GlobalManagementEventQueue(self.zk_client)
|
||||
self.assertEqual(len(queue), 0)
|
||||
self.assertFalse(queue.hasEvents())
|
||||
|
||||
# This client will submit a reconfigure event and disconnect
|
||||
# before it's complete.
|
||||
external_client = ZooKeeperClient(
|
||||
self.zk_chroot_fixture.zk_hosts,
|
||||
tls_cert=self.zk_chroot_fixture.zookeeper_cert,
|
||||
tls_key=self.zk_chroot_fixture.zookeeper_key,
|
||||
tls_ca=self.zk_chroot_fixture.zookeeper_ca)
|
||||
self.addCleanup(external_client.disconnect)
|
||||
external_client.connect()
|
||||
|
||||
external_queue = event_queues.GlobalManagementEventQueue(
|
||||
external_client)
|
||||
|
||||
# Submit the event
|
||||
event = model.ReconfigureEvent(None)
|
||||
result_future = external_queue.put(event)
|
||||
self.assertIsNotNone(result_future)
|
||||
|
||||
# Make sure the event is in the queue and the result node exists
|
||||
self.assertEqual(len(queue), 1)
|
||||
self.assertTrue(queue.hasEvents())
|
||||
self.assertFalse(result_future.wait(0.1))
|
||||
self.assertEqual(len(
|
||||
self.zk_client.client.get_children('/zuul/results/management')), 1)
|
||||
|
||||
# Disconnect the originating client
|
||||
external_client.disconnect()
|
||||
# Ensure the result node is gone
|
||||
self.assertEqual(len(
|
||||
self.zk_client.client.get_children('/zuul/results/management')), 0)
|
||||
|
||||
# Process the event
|
||||
acked = 0
|
||||
for event in queue:
|
||||
self.assertIsInstance(event, model.ReconfigureEvent)
|
||||
queue.ack(event)
|
||||
acked += 1
|
||||
|
||||
# Make sure the event has been processed and we didn't
|
||||
# re-create the result node.
|
||||
self.assertEqual(acked, 1)
|
||||
self.assertEqual(len(queue), 0)
|
||||
self.assertFalse(queue.hasEvents())
|
||||
self.assertEqual(len(
|
||||
self.zk_client.client.get_children('/zuul/results/management')), 0)
|
||||
|
||||
|
||||
# TODO: use actual model.ResultEvent once it inherits from
|
||||
# AbstractEvent and implements serialization.
|
||||
|
|
|
@ -279,9 +279,9 @@ class ManagementEventResultFuture(ZooKeeperBase):
|
|||
# that an event has been processed is to check for a result;
|
||||
# the original event may have been deleted when forwaded to a
|
||||
# different queue.
|
||||
if not self._wait_event.wait(timeout):
|
||||
return False
|
||||
try:
|
||||
if not self._wait_event.wait(timeout):
|
||||
return False
|
||||
try:
|
||||
data, _ = self.kazoo_client.get(self._result_path)
|
||||
result = json.loads(data.decode("utf-8"))
|
||||
|
@ -321,6 +321,10 @@ class ManagementEventQueue(SchedulerEventQueue):
|
|||
"event_data": event.toDict(),
|
||||
"result_path": result_path,
|
||||
}
|
||||
if needs_result and not event.result_ref:
|
||||
# The event was not forwarded, create the result ref
|
||||
self.kazoo_client.create(result_path, None,
|
||||
makepath=True, ephemeral=True)
|
||||
self._put(data)
|
||||
if needs_result and result_path:
|
||||
return ManagementEventResultFuture(self.client, result_path)
|
||||
|
@ -367,14 +371,16 @@ class ManagementEventQueue(SchedulerEventQueue):
|
|||
if not event.result_ref:
|
||||
return
|
||||
|
||||
# TODO: Add a cleanup thread that deletes old events.
|
||||
result_data = {"traceback": event.traceback,
|
||||
"timestamp": time.monotonic()}
|
||||
self.kazoo_client.create(
|
||||
event.result_ref,
|
||||
json.dumps(result_data).encode("utf-8"),
|
||||
makepath=True,
|
||||
)
|
||||
try:
|
||||
self.kazoo_client.set(
|
||||
event.result_ref,
|
||||
json.dumps(result_data).encode("utf-8"),
|
||||
)
|
||||
except NoNodeError:
|
||||
self.log.warning(f"No result node found for {event}; "
|
||||
"client may have disconnected")
|
||||
|
||||
|
||||
class PipelineManagementEventQueue(ManagementEventQueue):
|
||||
|
|
Loading…
Reference in New Issue