680 lines
22 KiB
Python
680 lines
22 KiB
Python
# Copyright 2021 BMW Group
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import random
|
|
import string
|
|
import threading
|
|
|
|
import testtools
|
|
|
|
from zuul import model
|
|
from zuul.driver import Driver, TriggerInterface
|
|
from zuul.lib.connections import ConnectionRegistry
|
|
from zuul.zk import ZooKeeperClient, event_queues, sharding
|
|
|
|
from tests.base import BaseTestCase, iterate_timeout
|
|
|
|
|
|
class EventQueueBaseTestCase(BaseTestCase):
|
|
|
|
def setUp(self):
|
|
super().setUp()
|
|
self.setupZK()
|
|
|
|
self.zk_client = ZooKeeperClient(
|
|
self.zk_chroot_fixture.zk_hosts,
|
|
tls_cert=self.zk_chroot_fixture.zookeeper_cert,
|
|
tls_key=self.zk_chroot_fixture.zookeeper_key,
|
|
tls_ca=self.zk_chroot_fixture.zookeeper_ca
|
|
)
|
|
self.addCleanup(self.zk_client.disconnect)
|
|
self.zk_client.connect()
|
|
|
|
self.connections = ConnectionRegistry()
|
|
self.addCleanup(self.connections.stop)
|
|
|
|
|
|
class DummyEvent(model.AbstractEvent):
|
|
|
|
def toDict(self):
|
|
return {}
|
|
|
|
def updateFromDict(self):
|
|
pass
|
|
|
|
@classmethod
|
|
def fromDict(cls, d):
|
|
return cls()
|
|
|
|
|
|
class DummyEventQueue(event_queues.ZooKeeperEventQueue):
|
|
|
|
def put(self, event):
|
|
self._put(event.toDict())
|
|
|
|
def __iter__(self):
|
|
for data, ack_ref, _ in self._iterEvents():
|
|
event = DummyEvent.fromDict(data)
|
|
event.ack_ref = ack_ref
|
|
yield event
|
|
|
|
|
|
class TestEventQueue(EventQueueBaseTestCase):
|
|
|
|
def setUp(self):
|
|
super().setUp()
|
|
self.queue = DummyEventQueue(self.zk_client, "root")
|
|
|
|
def test_missing_ack_ref(self):
|
|
# Every event from a ZK event queue should have an ack_ref
|
|
# attached to it when it is deserialized; ensure that an error
|
|
# is raised if we try to ack an event without one.
|
|
with testtools.ExpectedException(RuntimeError):
|
|
self.queue.ack(DummyEvent())
|
|
|
|
def test_double_ack(self):
|
|
# Test that if we ack an event twice, an exception isn't
|
|
# raised.
|
|
self.queue.put(DummyEvent())
|
|
self.assertEqual(len(self.queue), 1)
|
|
|
|
event = next(iter(self.queue))
|
|
self.queue.ack(event)
|
|
self.assertEqual(len(self.queue), 0)
|
|
|
|
# Should not raise an exception
|
|
self.queue.ack(event)
|
|
|
|
def test_invalid_json_ignored(self):
|
|
# Test that invalid json is automatically removed.
|
|
event_path = self.queue._put({})
|
|
self.zk_client.client.set(event_path, b"{ invalid")
|
|
|
|
self.assertEqual(len(self.queue), 1)
|
|
self.assertEqual(list(self.queue._iterEvents()), [])
|
|
self.assertEqual(len(self.queue), 0)
|
|
|
|
|
|
class DummyTriggerEvent(model.TriggerEvent):
|
|
pass
|
|
|
|
|
|
class DummyDriver(Driver, TriggerInterface):
|
|
name = driver_name = "dummy"
|
|
|
|
def getTrigger(self, connection, config=None):
|
|
pass
|
|
|
|
def getTriggerSchema(self):
|
|
pass
|
|
|
|
def getTriggerEventClass(self):
|
|
return DummyTriggerEvent
|
|
|
|
|
|
class TestTriggerEventQueue(EventQueueBaseTestCase):
|
|
|
|
def setUp(self):
|
|
super().setUp()
|
|
self.driver = DummyDriver()
|
|
self.connections.registerDriver(self.driver)
|
|
|
|
def test_sharded_tenant_trigger_events(self):
|
|
# Test enqueue/dequeue of the tenant trigger event queue.
|
|
queue = event_queues.TenantTriggerEventQueue(
|
|
self.zk_client, self.connections, "tenant"
|
|
)
|
|
|
|
self.assertEqual(len(queue), 0)
|
|
self.assertFalse(queue.hasEvents())
|
|
|
|
event = DummyTriggerEvent()
|
|
data = {'test': ''.join(
|
|
random.choice(string.ascii_letters + string.digits)
|
|
for x in range(sharding.NODE_BYTE_SIZE_LIMIT * 2))}
|
|
event.data = data
|
|
|
|
queue.put(self.driver.driver_name, event)
|
|
queue.put(self.driver.driver_name, event)
|
|
|
|
self.assertEqual(len(queue), 2)
|
|
self.assertTrue(queue.hasEvents())
|
|
|
|
processed = 0
|
|
for event in queue:
|
|
self.assertIsInstance(event, DummyTriggerEvent)
|
|
processed += 1
|
|
|
|
self.assertEqual(processed, 2)
|
|
self.assertEqual(len(queue), 2)
|
|
self.assertTrue(queue.hasEvents())
|
|
|
|
acked = 0
|
|
for event in queue:
|
|
queue.ack(event)
|
|
self.assertEqual(event.data, data)
|
|
acked += 1
|
|
|
|
self.assertEqual(acked, 2)
|
|
self.assertEqual(len(queue), 0)
|
|
self.assertFalse(queue.hasEvents())
|
|
|
|
def test_lost_sharded_tenant_trigger_events(self):
|
|
# Test cleanup when we write out side-channel data but not an
|
|
# event.
|
|
queue = event_queues.TenantTriggerEventQueue(
|
|
self.zk_client, self.connections, "tenant"
|
|
)
|
|
|
|
self.assertEqual(len(queue), 0)
|
|
self.assertFalse(queue.hasEvents())
|
|
|
|
event = DummyTriggerEvent()
|
|
data = {'test': ''.join(
|
|
random.choice(string.ascii_letters + string.digits)
|
|
for x in range(sharding.NODE_BYTE_SIZE_LIMIT * 2))}
|
|
event.data = data
|
|
|
|
queue.put(self.driver.driver_name, event)
|
|
queue.put(self.driver.driver_name, event)
|
|
|
|
self.assertEqual(len(queue), 2)
|
|
self.assertTrue(queue.hasEvents())
|
|
|
|
# Delete the first event (but not its sharded data)
|
|
events = list(queue)
|
|
self.zk_client.client.delete(events[0].ack_ref.path)
|
|
# There should still be 2 data nodes
|
|
self.assertEqual(len(
|
|
self.zk_client.client.get_children(queue.data_root)), 2)
|
|
|
|
# Clean up lost data
|
|
queue.cleanup(age=0)
|
|
# There should only be one data node remaining
|
|
self.assertEqual(len(
|
|
self.zk_client.client.get_children(queue.data_root)), 1)
|
|
|
|
# Ack the second event
|
|
queue.ack(events[1])
|
|
|
|
# Assert there are no side channel data nodes
|
|
self.assertEqual(len(
|
|
self.zk_client.client.get_children(queue.data_root)), 0)
|
|
|
|
def test_tenant_trigger_events(self):
|
|
# Test enqueue/dequeue of the tenant trigger event queue.
|
|
queue = event_queues.TenantTriggerEventQueue(
|
|
self.zk_client, self.connections, "tenant"
|
|
)
|
|
|
|
self.assertEqual(len(queue), 0)
|
|
self.assertFalse(queue.hasEvents())
|
|
|
|
event = DummyTriggerEvent()
|
|
queue.put(self.driver.driver_name, event)
|
|
queue.put(self.driver.driver_name, event)
|
|
|
|
self.assertEqual(len(queue), 2)
|
|
self.assertTrue(queue.hasEvents())
|
|
|
|
processed = 0
|
|
for event in queue:
|
|
self.assertIsInstance(event, DummyTriggerEvent)
|
|
processed += 1
|
|
|
|
self.assertEqual(processed, 2)
|
|
self.assertEqual(len(queue), 2)
|
|
self.assertTrue(queue.hasEvents())
|
|
|
|
acked = 0
|
|
for event in queue:
|
|
queue.ack(event)
|
|
acked += 1
|
|
|
|
self.assertEqual(acked, 2)
|
|
self.assertEqual(len(queue), 0)
|
|
self.assertFalse(queue.hasEvents())
|
|
|
|
def test_pipeline_trigger_events(self):
|
|
# Test enqueue/dequeue of pipeline-specific trigger event
|
|
# queues.
|
|
registry = event_queues.PipelineTriggerEventQueue.createRegistry(
|
|
self.zk_client, self.connections
|
|
)
|
|
|
|
queue = registry["tenant"]["pipeline"]
|
|
self.assertIsInstance(queue, event_queues.TriggerEventQueue)
|
|
|
|
self.assertEqual(len(queue), 0)
|
|
self.assertFalse(queue.hasEvents())
|
|
|
|
event = DummyTriggerEvent()
|
|
queue.put(self.driver.driver_name, event)
|
|
|
|
self.assertEqual(len(queue), 1)
|
|
self.assertTrue(queue.hasEvents())
|
|
|
|
other_queue = registry["other_tenant"]["pipeline"]
|
|
self.assertEqual(len(other_queue), 0)
|
|
self.assertFalse(other_queue.hasEvents())
|
|
|
|
acked = 0
|
|
for event in queue:
|
|
self.assertIsInstance(event, DummyTriggerEvent)
|
|
queue.ack(event)
|
|
acked += 1
|
|
|
|
self.assertEqual(acked, 1)
|
|
self.assertEqual(len(queue), 0)
|
|
self.assertFalse(queue.hasEvents())
|
|
|
|
|
|
class TestManagementEventQueue(EventQueueBaseTestCase):
|
|
|
|
def test_management_events(self):
|
|
# Test enqueue/dequeue of the tenant management event queue.
|
|
queue = event_queues.TenantManagementEventQueue(
|
|
self.zk_client, "tenant")
|
|
|
|
self.assertEqual(len(queue), 0)
|
|
self.assertFalse(queue.hasEvents())
|
|
|
|
event = model.ReconfigureEvent()
|
|
result_future = queue.put(event, needs_result=False)
|
|
self.assertIsNone(result_future)
|
|
|
|
result_future = queue.put(event)
|
|
self.assertIsNotNone(result_future)
|
|
|
|
self.assertEqual(len(queue), 2)
|
|
self.assertTrue(queue.hasEvents())
|
|
self.assertFalse(result_future.wait(0.1))
|
|
|
|
acked = 0
|
|
for event in queue:
|
|
self.assertIsInstance(event, model.ReconfigureEvent)
|
|
queue.ack(event)
|
|
acked += 1
|
|
|
|
self.assertEqual(acked, 2)
|
|
self.assertTrue(result_future.wait(5))
|
|
self.assertEqual(len(queue), 0)
|
|
self.assertFalse(queue.hasEvents())
|
|
|
|
def test_management_event_error(self):
|
|
# Test that management event errors are reported.
|
|
queue = event_queues.TenantManagementEventQueue(
|
|
self.zk_client, "tenant")
|
|
event = model.ReconfigureEvent()
|
|
result_future = queue.put(event)
|
|
|
|
acked = 0
|
|
for event in queue:
|
|
event.traceback = "hello traceback"
|
|
queue.ack(event)
|
|
acked += 1
|
|
|
|
self.assertEqual(acked, 1)
|
|
with testtools.ExpectedException(RuntimeError, msg="hello traceback"):
|
|
self.assertFalse(result_future.wait(5))
|
|
|
|
def test_event_merge(self):
|
|
# Test that similar management events (eg, reconfiguration of
|
|
# two projects) can be merged.
|
|
queue = event_queues.TenantManagementEventQueue(
|
|
self.zk_client, "tenant")
|
|
event = model.TenantReconfigureEvent("tenant", "project", "master")
|
|
queue.put(event, needs_result=False)
|
|
event = model.TenantReconfigureEvent("tenant", "other", "branch")
|
|
queue.put(event, needs_result=False)
|
|
|
|
self.assertEqual(len(queue), 2)
|
|
events = list(queue)
|
|
|
|
self.assertEqual(len(events), 1)
|
|
event = events[0]
|
|
self.assertEqual(len(event.merged_events), 1)
|
|
self.assertEqual(
|
|
event.project_branches,
|
|
set([("project", "master"), ("other", "branch")])
|
|
)
|
|
|
|
queue.ack(event)
|
|
self.assertFalse(queue.hasEvents())
|
|
|
|
def test_event_ltime(self):
|
|
tenant_queue = event_queues.TenantManagementEventQueue(
|
|
self.zk_client, "tenant")
|
|
registry = event_queues.PipelineManagementEventQueue.createRegistry(
|
|
self.zk_client
|
|
)
|
|
|
|
event = model.ReconfigureEvent()
|
|
tenant_queue.put(event, needs_result=False)
|
|
self.assertTrue(tenant_queue.hasEvents())
|
|
|
|
pipeline_queue = registry["tenant"]["pipeline"]
|
|
self.assertIsInstance(
|
|
pipeline_queue, event_queues.ManagementEventQueue
|
|
)
|
|
processed_events = 0
|
|
for event in tenant_queue:
|
|
processed_events += 1
|
|
event_ltime = event.zuul_event_ltime
|
|
self.assertGreater(event_ltime, -1)
|
|
# Forward event to pipeline management event queue
|
|
pipeline_queue.put(event)
|
|
|
|
self.assertEqual(processed_events, 1)
|
|
self.assertTrue(pipeline_queue.hasEvents())
|
|
|
|
processed_events = 0
|
|
for event in pipeline_queue:
|
|
pipeline_queue.ack(event)
|
|
processed_events += 1
|
|
self.assertEqual(event.zuul_event_ltime, event_ltime)
|
|
|
|
self.assertEqual(processed_events, 1)
|
|
|
|
def test_pipeline_management_events(self):
|
|
# Test that when a management event is forwarded from the
|
|
# tenant to the a pipeline-specific queue, it is not
|
|
# prematurely acked and the future returns correctly.
|
|
tenant_queue = event_queues.TenantManagementEventQueue(
|
|
self.zk_client, "tenant")
|
|
registry = event_queues.PipelineManagementEventQueue.createRegistry(
|
|
self.zk_client
|
|
)
|
|
|
|
event = model.PromoteEvent('tenant', 'check', ['1234,1'])
|
|
result_future = tenant_queue.put(event, needs_result=False)
|
|
self.assertIsNone(result_future)
|
|
|
|
result_future = tenant_queue.put(event)
|
|
self.assertIsNotNone(result_future)
|
|
|
|
self.assertEqual(len(tenant_queue), 2)
|
|
self.assertTrue(tenant_queue.hasEvents())
|
|
|
|
pipeline_queue = registry["tenant"]["pipeline"]
|
|
self.assertIsInstance(
|
|
pipeline_queue, event_queues.ManagementEventQueue
|
|
)
|
|
acked = 0
|
|
for event in tenant_queue:
|
|
self.assertIsInstance(event, model.PromoteEvent)
|
|
# Forward event to pipeline management event queue
|
|
pipeline_queue.put(event)
|
|
tenant_queue.ackWithoutResult(event)
|
|
acked += 1
|
|
|
|
self.assertEqual(acked, 2)
|
|
# Event was just forwarded and since we expect a result, the
|
|
# future should not be completed yet.
|
|
self.assertFalse(result_future.wait(0.1))
|
|
|
|
self.assertEqual(len(tenant_queue), 0)
|
|
self.assertFalse(tenant_queue.hasEvents())
|
|
|
|
self.assertEqual(len(pipeline_queue), 2)
|
|
self.assertTrue(pipeline_queue.hasEvents())
|
|
|
|
acked = 0
|
|
for event in pipeline_queue:
|
|
self.assertIsInstance(event, model.PromoteEvent)
|
|
pipeline_queue.ack(event)
|
|
acked += 1
|
|
|
|
self.assertEqual(acked, 2)
|
|
self.assertTrue(result_future.wait(5))
|
|
self.assertEqual(len(pipeline_queue), 0)
|
|
self.assertFalse(pipeline_queue.hasEvents())
|
|
|
|
def test_management_events_client(self):
|
|
# Test management events from a second client
|
|
|
|
queue = event_queues.TenantManagementEventQueue(
|
|
self.zk_client, "tenant")
|
|
self.assertEqual(len(queue), 0)
|
|
self.assertFalse(queue.hasEvents())
|
|
|
|
# This client will submit a reconfigure event and wait for it.
|
|
external_client = ZooKeeperClient(
|
|
self.zk_chroot_fixture.zk_hosts,
|
|
tls_cert=self.zk_chroot_fixture.zookeeper_cert,
|
|
tls_key=self.zk_chroot_fixture.zookeeper_key,
|
|
tls_ca=self.zk_chroot_fixture.zookeeper_ca)
|
|
self.addCleanup(external_client.disconnect)
|
|
external_client.connect()
|
|
|
|
external_queue = event_queues.TenantManagementEventQueue(
|
|
external_client, "tenant")
|
|
|
|
event = model.ReconfigureEvent()
|
|
result_future = external_queue.put(event)
|
|
self.assertIsNotNone(result_future)
|
|
|
|
self.assertEqual(len(queue), 1)
|
|
self.assertTrue(queue.hasEvents())
|
|
self.assertFalse(result_future.wait(0.1))
|
|
|
|
acked = 0
|
|
for event in queue:
|
|
self.assertIsInstance(event, model.ReconfigureEvent)
|
|
queue.ack(event)
|
|
acked += 1
|
|
|
|
self.assertEqual(acked, 1)
|
|
self.assertTrue(result_future.wait(5))
|
|
self.assertEqual(len(queue), 0)
|
|
self.assertFalse(queue.hasEvents())
|
|
|
|
def test_management_events_client_disconnect(self):
|
|
# Test management events from a second client which
|
|
# disconnects before the event is complete.
|
|
|
|
queue = event_queues.TenantManagementEventQueue(
|
|
self.zk_client, "tenant")
|
|
self.assertEqual(len(queue), 0)
|
|
self.assertFalse(queue.hasEvents())
|
|
|
|
# This client will submit a reconfigure event and disconnect
|
|
# before it's complete.
|
|
external_client = ZooKeeperClient(
|
|
self.zk_chroot_fixture.zk_hosts,
|
|
tls_cert=self.zk_chroot_fixture.zookeeper_cert,
|
|
tls_key=self.zk_chroot_fixture.zookeeper_key,
|
|
tls_ca=self.zk_chroot_fixture.zookeeper_ca)
|
|
self.addCleanup(external_client.disconnect)
|
|
external_client.connect()
|
|
|
|
external_queue = event_queues.TenantManagementEventQueue(
|
|
external_client, "tenant")
|
|
|
|
# Submit the event
|
|
event = model.ReconfigureEvent()
|
|
result_future = external_queue.put(event)
|
|
self.assertIsNotNone(result_future)
|
|
|
|
# Make sure the event is in the queue and the result node exists
|
|
self.assertEqual(len(queue), 1)
|
|
self.assertTrue(queue.hasEvents())
|
|
self.assertFalse(result_future.wait(0.1))
|
|
self.assertEqual(len(
|
|
self.zk_client.client.get_children('/zuul/results/management')), 1)
|
|
|
|
# Disconnect the originating client
|
|
external_client.disconnect()
|
|
# Ensure the result node is gone
|
|
self.assertEqual(len(
|
|
self.zk_client.client.get_children('/zuul/results/management')), 0)
|
|
|
|
# Process the event
|
|
acked = 0
|
|
for event in queue:
|
|
self.assertIsInstance(event, model.ReconfigureEvent)
|
|
queue.ack(event)
|
|
acked += 1
|
|
|
|
# Make sure the event has been processed and we didn't
|
|
# re-create the result node.
|
|
self.assertEqual(acked, 1)
|
|
self.assertEqual(len(queue), 0)
|
|
self.assertFalse(queue.hasEvents())
|
|
self.assertEqual(len(
|
|
self.zk_client.client.get_children('/zuul/results/management')), 0)
|
|
|
|
|
|
class TestResultEventQueue(EventQueueBaseTestCase):
|
|
|
|
def test_pipeline_result_events(self):
|
|
# Test enqueue/dequeue of result events.
|
|
registry = event_queues.PipelineResultEventQueue.createRegistry(
|
|
self.zk_client
|
|
)
|
|
|
|
queue = registry["tenant"]["pipeline"]
|
|
self.assertIsInstance(queue, event_queues.PipelineResultEventQueue)
|
|
|
|
self.assertEqual(len(queue), 0)
|
|
self.assertFalse(queue.hasEvents())
|
|
|
|
event = model.BuildStartedEvent(
|
|
"build", "buildset", "job", "build_request_path", {})
|
|
queue.put(event)
|
|
|
|
self.assertEqual(len(queue), 1)
|
|
self.assertTrue(queue.hasEvents())
|
|
|
|
other_queue = registry["other_tenant"]["pipeline"]
|
|
self.assertEqual(len(other_queue), 0)
|
|
self.assertFalse(other_queue.hasEvents())
|
|
|
|
acked = 0
|
|
for event in queue:
|
|
self.assertIsInstance(event, model.BuildStartedEvent)
|
|
queue.ack(event)
|
|
acked += 1
|
|
|
|
self.assertEqual(acked, 1)
|
|
self.assertEqual(len(queue), 0)
|
|
self.assertFalse(queue.hasEvents())
|
|
|
|
|
|
class TestEventWatchers(EventQueueBaseTestCase):
|
|
|
|
def setUp(self):
|
|
super().setUp()
|
|
self.driver = DummyDriver()
|
|
self.connections.registerDriver(self.driver)
|
|
|
|
def _wait_for_event(self, event):
|
|
for _ in iterate_timeout(5, "event set"):
|
|
if event.is_set():
|
|
break
|
|
|
|
def test_tenant_event_watcher(self):
|
|
event = threading.Event()
|
|
event_queues.EventWatcher(self.zk_client, event.set)
|
|
|
|
management_queue = (
|
|
event_queues.TenantManagementEventQueue.createRegistry(
|
|
self.zk_client)
|
|
)
|
|
trigger_queue = event_queues.TenantTriggerEventQueue.createRegistry(
|
|
self.zk_client, self.connections
|
|
)
|
|
self.assertFalse(event.is_set())
|
|
|
|
management_queue["tenant"].put(model.ReconfigureEvent(),
|
|
needs_result=False)
|
|
self._wait_for_event(event)
|
|
event.clear()
|
|
|
|
trigger_queue["tenant"].put(self.driver.driver_name,
|
|
DummyTriggerEvent())
|
|
self._wait_for_event(event)
|
|
|
|
def test_pipeline_event_watcher(self):
|
|
event = threading.Event()
|
|
event_queues.EventWatcher(self.zk_client, event.set)
|
|
|
|
management_queues = (
|
|
event_queues.PipelineManagementEventQueue.createRegistry(
|
|
self.zk_client
|
|
)
|
|
)
|
|
trigger_queues = event_queues.PipelineTriggerEventQueue.createRegistry(
|
|
self.zk_client, self.connections
|
|
)
|
|
result_queues = event_queues.PipelineResultEventQueue.createRegistry(
|
|
self.zk_client
|
|
)
|
|
self.assertFalse(event.is_set())
|
|
|
|
management_queues["tenant"]["check"].put(model.ReconfigureEvent())
|
|
self._wait_for_event(event)
|
|
event.clear()
|
|
|
|
trigger_queues["tenant"]["gate"].put(self.driver.driver_name,
|
|
DummyTriggerEvent())
|
|
self._wait_for_event(event)
|
|
event.clear()
|
|
|
|
result_event = model.BuildStartedEvent(
|
|
"build", "buildset", "job", "build_request_path", {})
|
|
result_queues["other-tenant"]["post"].put(result_event)
|
|
self._wait_for_event(event)
|
|
|
|
|
|
class TestConnectionEventQueue(EventQueueBaseTestCase):
|
|
|
|
def test_connection_events(self):
|
|
# Test enqueue/dequeue of the connection event queue.
|
|
queue = event_queues.ConnectionEventQueue(self.zk_client, "dummy")
|
|
|
|
self.assertEqual(len(queue), 0)
|
|
self.assertFalse(queue.hasEvents())
|
|
|
|
payload = {"message": "hello world!"}
|
|
queue.put(payload)
|
|
queue.put(payload)
|
|
|
|
self.assertEqual(len(queue), 2)
|
|
self.assertTrue(queue.hasEvents())
|
|
|
|
acked = 0
|
|
for event in queue:
|
|
self.assertIsInstance(event, model.ConnectionEvent)
|
|
self.assertEqual(event, payload)
|
|
queue.ack(event)
|
|
acked += 1
|
|
|
|
self.assertEqual(acked, 2)
|
|
self.assertEqual(len(queue), 0)
|
|
self.assertFalse(queue.hasEvents())
|
|
|
|
def test_event_watch(self):
|
|
# Test the registered function is called on new events.
|
|
queue = event_queues.ConnectionEventQueue(self.zk_client, "dummy")
|
|
|
|
event = threading.Event()
|
|
queue.registerEventWatch(event.set)
|
|
self.assertFalse(event.is_set())
|
|
|
|
queue.put({"message": "hello world!"})
|
|
for _ in iterate_timeout(5, "event set"):
|
|
if event.is_set():
|
|
break
|