diff --git a/zuul/zk/event_queues.py b/zuul/zk/event_queues.py index 089690564c..503d0cda98 100644 --- a/zuul/zk/event_queues.py +++ b/zuul/zk/event_queues.py @@ -49,7 +49,26 @@ MANAGEMENT_EVENT_TYPE_MAP = { "TenantReconfigureEvent": model.TenantReconfigureEvent, } +# /zuul/events/tenant TENANT_ROOT +# /{tenant} TENANT_NAME_ROOT +# /management TENANT_MANAGEMENT_ROOT +# /trigger TENANT_TRIGGER_ROOT +# /pipelines PIPELINE_ROOT +# /{pipeline} PIPELINE_NAME_ROOT +# /management PIPELINE_MANAGEMENT_ROOT +# /trigger PIPELINE_TRIGGER_ROOT +# /result PIPELINE_RESULT_ROOT + TENANT_ROOT = "/zuul/events/tenant" +TENANT_NAME_ROOT = TENANT_ROOT + "/{tenant}" +TENANT_MANAGEMENT_ROOT = TENANT_NAME_ROOT + "/management" +TENANT_TRIGGER_ROOT = TENANT_NAME_ROOT + "/trigger" +PIPELINE_ROOT = TENANT_NAME_ROOT + "/pipelines" +PIPELINE_NAME_ROOT = PIPELINE_ROOT + "/{pipeline}" +PIPELINE_MANAGEMENT_ROOT = PIPELINE_NAME_ROOT + "/management" +PIPELINE_TRIGGER_ROOT = PIPELINE_NAME_ROOT + "/trigger" +PIPELINE_RESULT_ROOT = PIPELINE_NAME_ROOT + "/result" + CONNECTION_ROOT = "/zuul/events/connection" # This is the path to the serialized from of the event in ZK (along @@ -88,33 +107,39 @@ class EventWatcher(ZooKeeperSimpleBase): def _tenantWatch(self, tenants): for tenant_name in tenants: - tenant_path = "/".join((TENANT_ROOT, tenant_name)) - - if tenant_path in self.watched_tenants: + if tenant_name in self.watched_tenants: continue - events_path = f"{tenant_path}/events" - self.kazoo_client.ensure_path(events_path) - self.kazoo_client.ChildrenWatch( - events_path, self._eventWatch, send_event=True) + for path in (TENANT_MANAGEMENT_ROOT, + TENANT_TRIGGER_ROOT): + path = path.format(tenant=tenant_name) + self.kazoo_client.ensure_path(path) + self.kazoo_client.ChildrenWatch( + path, self._eventWatch, send_event=True) - pipelines_path = f"{tenant_path}/pipelines" + pipelines_path = PIPELINE_ROOT.format(tenant=tenant_name) self.kazoo_client.ensure_path(pipelines_path) self.kazoo_client.ChildrenWatch( pipelines_path, self._makePipelineWatcher(tenant_name)) - self.watched_tenants.add(tenant_path) + self.watched_tenants.add(tenant_name) def _pipelineWatch(self, tenant_name, pipelines): for pipeline_name in pipelines: - pipeline_path = "/".join((TENANT_ROOT, tenant_name, "pipelines", - pipeline_name)) - if pipeline_path in self.watched_pipelines: + key = (tenant_name, pipeline_name) + if key in self.watched_pipelines: continue - self.kazoo_client.ChildrenWatch( - pipeline_path, self._eventWatch, send_event=True) - self.watched_pipelines.add(pipeline_path) + for path in (PIPELINE_MANAGEMENT_ROOT, + PIPELINE_TRIGGER_ROOT, + PIPELINE_RESULT_ROOT): + path = path.format(tenant=tenant_name, + pipeline=pipeline_name) + + self.kazoo_client.ensure_path(path) + self.kazoo_client.ChildrenWatch( + path, self._eventWatch, send_event=True) + self.watched_pipelines.add(key) def _eventWatch(self, event_list, event=None): if event is None: @@ -127,7 +152,32 @@ class EventWatcher(ZooKeeperSimpleBase): class ZooKeeperEventQueue(ZooKeeperSimpleBase, Iterable): - """Abstract API for events via ZooKeeper""" + """Abstract API for events via ZooKeeper + + The lifecycle of a global (not pipeline-specific) event is: + + * Serialized form of event added to ZK queue. + + * During queue processing, events are de-serialized and + AbstractEvent subclasses are instantiated. An EventAckRef is + associated with the event instance in order to maintain the link + to the serialized form. + + * When event processing is complete, the EventAckRef is used to + delete the original event. If the event requires a result + (e.g., a management event that returns data) the result will be + written to a pre-determined location. A future can watch for + the result to appear at that location. + + Pipeline specific events usually start out as global events, but + upon processing, may be forwarded to pipeline-specific queues. In + these cases, the original event will be deleted as above, and a + new, identical event will be created in the pipeline-specific + queue. If the event expects a result, no result will be written + upon forwarding; the result will only be written when the + forwarded event is processed. + + """ log = logging.getLogger("zuul.zk.event_queues.ZooKeeperEventQueue") @@ -141,7 +191,8 @@ class ZooKeeperEventQueue(ZooKeeperSimpleBase, Iterable): def __len__(self): try: - return len(self._listEvents()) + data, stat = self.kazoo_client.get(self.event_root) + return stat.children_count except NoNodeError: return 0 @@ -199,52 +250,6 @@ class ZooKeeperEventQueue(ZooKeeperSimpleBase, Iterable): self.kazoo_client.delete(path, version=version, recursive=True) -class SchedulerEventQueue(ZooKeeperEventQueue): - """Abstract API for tenant specific events via ZooKeeper - - The lifecycle of a global (not pipeline-specific) event is: - - * Serialized form of event added to ZK queue. - - * During queue processing, events are de-serialized and - AbstractEvent subclasses are instantiated. An EventAckRef is - associated with the event instance in order to maintain the link - to the serialized form. - - * When event processing is complete, the EventAckRef is used to - delete the original event. If the event requires a result - (e.g., a management event that returns data) the result will be - written to a pre-determined location. A future can watch for - the result to appear at that location. - - Pipeline specific events usually start out as global events, but - upon processing, may be forwarded to pipeline-specific queues. In - these cases, the original event will be deleted as above, and a - new, identical event will be created in the pipeline-specific - queue. If the event expects a result, no result will be written - upon forwarding; the result will only be written when the - forwarded event is processed. - - """ - - log = logging.getLogger("zuul.zk.event_queues.SchedulerEventQueue") - - def __init__(self, client, event_root, event_prefix): - super().__init__(client, event_root) - self.event_prefix = event_prefix - - def _listEvents(self): - return [ - e - for e in self.kazoo_client.get_children(self.event_root) - if e.startswith(self.event_prefix.value) - ] - - @property - def _event_create_path(self) -> str: - return "{}/{}-".format(self.event_root, self.event_prefix.value) - - class ManagementEventResultFuture(ZooKeeperSimpleBase): """Returned when a management event is put into a queue.""" @@ -292,7 +297,7 @@ class ManagementEventResultFuture(ZooKeeperSimpleBase): return True -class ManagementEventQueue(SchedulerEventQueue): +class ManagementEventQueue(ZooKeeperEventQueue): """Management events via ZooKeeper""" RESULTS_ROOT = "/zuul/results/management" @@ -384,9 +389,10 @@ class PipelineManagementEventQueue(ManagementEventQueue): ) def __init__(self, client, tenant_name, pipeline_name): - event_root = "/".join((TENANT_ROOT, tenant_name, "pipelines", - pipeline_name)) - super().__init__(client, event_root, EventPrefix.MANAGEMENT) + event_root = PIPELINE_MANAGEMENT_ROOT.format( + tenant=tenant_name, + pipeline=pipeline_name) + super().__init__(client, event_root) @classmethod def createRegistry(cls, client): @@ -411,8 +417,9 @@ class TenantManagementEventQueue(ManagementEventQueue): log = logging.getLogger("zuul.zk.event_queues.TenantManagementEventQueue") def __init__(self, client, tenant_name): - event_root = "/".join((TENANT_ROOT, tenant_name, "events")) - super().__init__(client, event_root, EventPrefix.MANAGEMENT) + event_root = TENANT_MANAGEMENT_ROOT.format( + tenant=tenant_name) + super().__init__(client, event_root) def ackWithoutResult(self, event): """ @@ -437,15 +444,16 @@ class TenantManagementEventQueue(ManagementEventQueue): return DefaultKeyDict(lambda t: cls(client, t)) -class PipelineResultEventQueue(SchedulerEventQueue): +class PipelineResultEventQueue(ZooKeeperEventQueue): """Result events via ZooKeeper""" log = logging.getLogger("zuul.zk.event_queues.PipelineResultEventQueue") def __init__(self, client, tenant_name, pipeline_name): - event_root = "/".join((TENANT_ROOT, tenant_name, "pipelines", - pipeline_name)) - super().__init__(client, event_root, EventPrefix.RESULT) + event_root = PIPELINE_RESULT_ROOT.format( + tenant=tenant_name, + pipeline=pipeline_name) + super().__init__(client, event_root) @classmethod def createRegistry(cls, client): @@ -486,14 +494,14 @@ class PipelineResultEventQueue(SchedulerEventQueue): yield event -class TriggerEventQueue(SchedulerEventQueue): +class TriggerEventQueue(ZooKeeperEventQueue): """Trigger events via ZooKeeper""" log = logging.getLogger("zuul.zk.event_queues.TriggerEventQueue") def __init__(self, client, event_root, connections): self.connections = connections - super().__init__(client, event_root, EventPrefix.TRIGGER) + super().__init__(client, event_root) def put(self, driver_name, event): data = { @@ -523,7 +531,8 @@ class TenantTriggerEventQueue(TriggerEventQueue): log = logging.getLogger("zuul.zk.event_queues.TenantTriggerEventQueue") def __init__(self, client, connections, tenant_name): - event_root = "/".join((TENANT_ROOT, tenant_name, "events")) + event_root = TENANT_TRIGGER_ROOT.format( + tenant=tenant_name) super().__init__(client, event_root, connections) @classmethod @@ -544,8 +553,9 @@ class PipelineTriggerEventQueue(TriggerEventQueue): log = logging.getLogger("zuul.zk.event_queues.PipelineTriggerEventQueue") def __init__(self, client, tenant_name, pipeline_name, connections): - event_root = "/".join((TENANT_ROOT, tenant_name, "pipelines", - pipeline_name)) + event_root = PIPELINE_TRIGGER_ROOT.format( + tenant=tenant_name, + pipeline=pipeline_name) super().__init__(client, event_root, connections) @classmethod