Implementation of Zookeeper event watcher

With the event queues in Zookeeper we need a way to set the scheduler's
wake event in case there are new events available.

Change-Id: I4dfd2bbee1fc9b8f738c4795cdc0e86e6bf95ce5
This commit is contained in:
Simon Westphahl 2020-11-19 13:22:39 +01:00
parent b23b9d7844
commit be8d216629
2 changed files with 131 additions and 1 deletions

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import threading
from unittest.mock import patch
import testtools
@ -21,7 +22,7 @@ from zuul.driver import Driver, TriggerInterface
from zuul.lib.connections import ConnectionRegistry
from zuul.zk import ZooKeeperClient, event_queues
from tests.base import BaseTestCase
from tests.base import BaseTestCase, iterate_timeout
class EventQueueBaseTestCase(BaseTestCase):
@ -365,3 +366,66 @@ class TestResultEventQueue(EventQueueBaseTestCase):
self.assertEqual(acked, 1)
self.assertEqual(len(queue), 0)
self.assertFalse(queue.hasEvents())
@patch.dict(event_queues.RESULT_EVENT_TYPE_MAP,
{"DummyResultEvent": DummyResultEvent})
class TestEventWatchers(EventQueueBaseTestCase):
def setUp(self):
super().setUp()
self.driver = DummyDriver()
self.connections.registerDriver(self.driver)
def _wait_for_event(self, event):
for _ in iterate_timeout(5, "event set"):
if event.is_set():
break
def test_global_event_watcher(self):
event = threading.Event()
event_queues.GlobalEventWatcher(self.zk_client, event.set)
management_queue = event_queues.GlobalManagementEventQueue(
self.zk_client
)
trigger_queue = event_queues.GlobalTriggerEventQueue(
self.zk_client, self.connections
)
self.assertFalse(event.is_set())
management_queue.put(model.ReconfigureEvent(None), needs_result=False)
self._wait_for_event(event)
event.clear()
trigger_queue.put(self.driver.driver_name, DummyTriggerEvent())
self._wait_for_event(event)
def test_pipeline_event_watcher(self):
event = threading.Event()
event_queues.PipelineEventWatcher(self.zk_client, event.set)
management_queues = (
event_queues.PipelineManagementEventQueue.createRegistry(
self.zk_client
)
)
trigger_queues = event_queues.PipelineTriggerEventQueue.createRegistry(
self.zk_client, self.connections
)
result_queues = event_queues.PipelineResultEventQueue.createRegistry(
self.zk_client
)
self.assertFalse(event.is_set())
management_queues["tenant"]["check"].put(model.ReconfigureEvent(None))
self._wait_for_event(event)
event.clear()
trigger_queues["tenant"]["gate"].put(self.driver.driver_name,
DummyTriggerEvent())
self._wait_for_event(event)
event.clear()
result_queues["other-tenant"]["post"].put(DummyResultEvent())
self._wait_for_event(event)

View File

@ -23,6 +23,7 @@ from collections.abc import Iterable
from contextlib import suppress
from kazoo.exceptions import NoNodeError
from kazoo.protocol.states import EventType
from zuul import model
from zuul.lib.collections import DefaultKeyDict
@ -66,6 +67,71 @@ class EventPrefix(enum.Enum):
TRIGGER = "300"
class GlobalEventWatcher(ZooKeeperBase):
log = logging.getLogger("zuul.zk.event_queues.EventQueueWatcher")
def __init__(self, client, callback):
super().__init__(client)
self.callback = callback
self.kazoo_client.ensure_path(SCHEDULER_GLOBAL_ROOT)
self.kazoo_client.ChildrenWatch(
SCHEDULER_GLOBAL_ROOT, self._eventWatch
)
def _eventWatch(self, event_list):
if event_list:
self.callback()
class PipelineEventWatcher(ZooKeeperBase):
log = logging.getLogger("zuul.zk.event_queues.EventQueueWatcher")
def __init__(self, client, callback):
super().__init__(client)
self.callback = callback
self.watched_tenants = set()
self.watched_pipelines = set()
self.kazoo_client.ensure_path(TENANT_ROOT)
self.kazoo_client.ChildrenWatch(TENANT_ROOT, self._tenantWatch)
def _tenantWatch(self, tenants):
for tenant_name in tenants:
tenant_path = "/".join((TENANT_ROOT, tenant_name))
if tenant_path in self.watched_tenants:
continue
self.kazoo_client.ChildrenWatch(
tenant_path,
lambda p: self._pipelineWatch(tenant_name, p),
)
self.watched_tenants.add(tenant_path)
def _pipelineWatch(self, tenant_name, pipelines):
for pipeline_name in pipelines:
pipeline_path = "/".join((TENANT_ROOT, tenant_name, pipeline_name))
if pipeline_path in self.watched_pipelines:
continue
self.kazoo_client.ChildrenWatch(
pipeline_path,
self._eventWatch,
send_event=True,
)
self.watched_pipelines.add(pipeline_path)
def _eventWatch(self, event_list, event=None):
if event is None:
# Handle initial call when the watch is created. If there are
# already events in the queue we trigger the callback.
if event_list:
self.callback()
elif event.type == EventType.CHILD:
self.callback()
class ZooKeeperEventQueue(ZooKeeperBase, Iterable):
"""Abstract API for tenant specific events via ZooKeeper