Split ZK event queues by type

We're currently storing all of (trigger, result, management) events
in a single queue per tenant or per-pipeline.  This means that in
order to iterate over just the management events, we need to
filter the list by name.

By splitting the event queues, we will be able to deal with smaller
numbers of events at a time, and can more quickly calculate event
queue length.

Change-Id: I2c01e51a6f9c55ba521d26416db16dfba068246d
This commit is contained in:
James E. Blair 2021-07-19 14:31:54 -07:00
parent a2b8b975d0
commit fc78352b18
1 changed files with 88 additions and 78 deletions

View File

@ -49,7 +49,26 @@ MANAGEMENT_EVENT_TYPE_MAP = {
"TenantReconfigureEvent": model.TenantReconfigureEvent,
}
# /zuul/events/tenant TENANT_ROOT
# /{tenant} TENANT_NAME_ROOT
# /management TENANT_MANAGEMENT_ROOT
# /trigger TENANT_TRIGGER_ROOT
# /pipelines PIPELINE_ROOT
# /{pipeline} PIPELINE_NAME_ROOT
# /management PIPELINE_MANAGEMENT_ROOT
# /trigger PIPELINE_TRIGGER_ROOT
# /result PIPELINE_RESULT_ROOT
TENANT_ROOT = "/zuul/events/tenant"
TENANT_NAME_ROOT = TENANT_ROOT + "/{tenant}"
TENANT_MANAGEMENT_ROOT = TENANT_NAME_ROOT + "/management"
TENANT_TRIGGER_ROOT = TENANT_NAME_ROOT + "/trigger"
PIPELINE_ROOT = TENANT_NAME_ROOT + "/pipelines"
PIPELINE_NAME_ROOT = PIPELINE_ROOT + "/{pipeline}"
PIPELINE_MANAGEMENT_ROOT = PIPELINE_NAME_ROOT + "/management"
PIPELINE_TRIGGER_ROOT = PIPELINE_NAME_ROOT + "/trigger"
PIPELINE_RESULT_ROOT = PIPELINE_NAME_ROOT + "/result"
CONNECTION_ROOT = "/zuul/events/connection"
# This is the path to the serialized from of the event in ZK (along
@ -88,33 +107,39 @@ class EventWatcher(ZooKeeperSimpleBase):
def _tenantWatch(self, tenants):
for tenant_name in tenants:
tenant_path = "/".join((TENANT_ROOT, tenant_name))
if tenant_path in self.watched_tenants:
if tenant_name in self.watched_tenants:
continue
events_path = f"{tenant_path}/events"
self.kazoo_client.ensure_path(events_path)
self.kazoo_client.ChildrenWatch(
events_path, self._eventWatch, send_event=True)
for path in (TENANT_MANAGEMENT_ROOT,
TENANT_TRIGGER_ROOT):
path = path.format(tenant=tenant_name)
self.kazoo_client.ensure_path(path)
self.kazoo_client.ChildrenWatch(
path, self._eventWatch, send_event=True)
pipelines_path = f"{tenant_path}/pipelines"
pipelines_path = PIPELINE_ROOT.format(tenant=tenant_name)
self.kazoo_client.ensure_path(pipelines_path)
self.kazoo_client.ChildrenWatch(
pipelines_path, self._makePipelineWatcher(tenant_name))
self.watched_tenants.add(tenant_path)
self.watched_tenants.add(tenant_name)
def _pipelineWatch(self, tenant_name, pipelines):
for pipeline_name in pipelines:
pipeline_path = "/".join((TENANT_ROOT, tenant_name, "pipelines",
pipeline_name))
if pipeline_path in self.watched_pipelines:
key = (tenant_name, pipeline_name)
if key in self.watched_pipelines:
continue
self.kazoo_client.ChildrenWatch(
pipeline_path, self._eventWatch, send_event=True)
self.watched_pipelines.add(pipeline_path)
for path in (PIPELINE_MANAGEMENT_ROOT,
PIPELINE_TRIGGER_ROOT,
PIPELINE_RESULT_ROOT):
path = path.format(tenant=tenant_name,
pipeline=pipeline_name)
self.kazoo_client.ensure_path(path)
self.kazoo_client.ChildrenWatch(
path, self._eventWatch, send_event=True)
self.watched_pipelines.add(key)
def _eventWatch(self, event_list, event=None):
if event is None:
@ -127,7 +152,32 @@ class EventWatcher(ZooKeeperSimpleBase):
class ZooKeeperEventQueue(ZooKeeperSimpleBase, Iterable):
"""Abstract API for events via ZooKeeper"""
"""Abstract API for events via ZooKeeper
The lifecycle of a global (not pipeline-specific) event is:
* Serialized form of event added to ZK queue.
* During queue processing, events are de-serialized and
AbstractEvent subclasses are instantiated. An EventAckRef is
associated with the event instance in order to maintain the link
to the serialized form.
* When event processing is complete, the EventAckRef is used to
delete the original event. If the event requires a result
(e.g., a management event that returns data) the result will be
written to a pre-determined location. A future can watch for
the result to appear at that location.
Pipeline specific events usually start out as global events, but
upon processing, may be forwarded to pipeline-specific queues. In
these cases, the original event will be deleted as above, and a
new, identical event will be created in the pipeline-specific
queue. If the event expects a result, no result will be written
upon forwarding; the result will only be written when the
forwarded event is processed.
"""
log = logging.getLogger("zuul.zk.event_queues.ZooKeeperEventQueue")
@ -141,7 +191,8 @@ class ZooKeeperEventQueue(ZooKeeperSimpleBase, Iterable):
def __len__(self):
try:
return len(self._listEvents())
data, stat = self.kazoo_client.get(self.event_root)
return stat.children_count
except NoNodeError:
return 0
@ -199,52 +250,6 @@ class ZooKeeperEventQueue(ZooKeeperSimpleBase, Iterable):
self.kazoo_client.delete(path, version=version, recursive=True)
class SchedulerEventQueue(ZooKeeperEventQueue):
"""Abstract API for tenant specific events via ZooKeeper
The lifecycle of a global (not pipeline-specific) event is:
* Serialized form of event added to ZK queue.
* During queue processing, events are de-serialized and
AbstractEvent subclasses are instantiated. An EventAckRef is
associated with the event instance in order to maintain the link
to the serialized form.
* When event processing is complete, the EventAckRef is used to
delete the original event. If the event requires a result
(e.g., a management event that returns data) the result will be
written to a pre-determined location. A future can watch for
the result to appear at that location.
Pipeline specific events usually start out as global events, but
upon processing, may be forwarded to pipeline-specific queues. In
these cases, the original event will be deleted as above, and a
new, identical event will be created in the pipeline-specific
queue. If the event expects a result, no result will be written
upon forwarding; the result will only be written when the
forwarded event is processed.
"""
log = logging.getLogger("zuul.zk.event_queues.SchedulerEventQueue")
def __init__(self, client, event_root, event_prefix):
super().__init__(client, event_root)
self.event_prefix = event_prefix
def _listEvents(self):
return [
e
for e in self.kazoo_client.get_children(self.event_root)
if e.startswith(self.event_prefix.value)
]
@property
def _event_create_path(self) -> str:
return "{}/{}-".format(self.event_root, self.event_prefix.value)
class ManagementEventResultFuture(ZooKeeperSimpleBase):
"""Returned when a management event is put into a queue."""
@ -292,7 +297,7 @@ class ManagementEventResultFuture(ZooKeeperSimpleBase):
return True
class ManagementEventQueue(SchedulerEventQueue):
class ManagementEventQueue(ZooKeeperEventQueue):
"""Management events via ZooKeeper"""
RESULTS_ROOT = "/zuul/results/management"
@ -384,9 +389,10 @@ class PipelineManagementEventQueue(ManagementEventQueue):
)
def __init__(self, client, tenant_name, pipeline_name):
event_root = "/".join((TENANT_ROOT, tenant_name, "pipelines",
pipeline_name))
super().__init__(client, event_root, EventPrefix.MANAGEMENT)
event_root = PIPELINE_MANAGEMENT_ROOT.format(
tenant=tenant_name,
pipeline=pipeline_name)
super().__init__(client, event_root)
@classmethod
def createRegistry(cls, client):
@ -411,8 +417,9 @@ class TenantManagementEventQueue(ManagementEventQueue):
log = logging.getLogger("zuul.zk.event_queues.TenantManagementEventQueue")
def __init__(self, client, tenant_name):
event_root = "/".join((TENANT_ROOT, tenant_name, "events"))
super().__init__(client, event_root, EventPrefix.MANAGEMENT)
event_root = TENANT_MANAGEMENT_ROOT.format(
tenant=tenant_name)
super().__init__(client, event_root)
def ackWithoutResult(self, event):
"""
@ -437,15 +444,16 @@ class TenantManagementEventQueue(ManagementEventQueue):
return DefaultKeyDict(lambda t: cls(client, t))
class PipelineResultEventQueue(SchedulerEventQueue):
class PipelineResultEventQueue(ZooKeeperEventQueue):
"""Result events via ZooKeeper"""
log = logging.getLogger("zuul.zk.event_queues.PipelineResultEventQueue")
def __init__(self, client, tenant_name, pipeline_name):
event_root = "/".join((TENANT_ROOT, tenant_name, "pipelines",
pipeline_name))
super().__init__(client, event_root, EventPrefix.RESULT)
event_root = PIPELINE_RESULT_ROOT.format(
tenant=tenant_name,
pipeline=pipeline_name)
super().__init__(client, event_root)
@classmethod
def createRegistry(cls, client):
@ -486,14 +494,14 @@ class PipelineResultEventQueue(SchedulerEventQueue):
yield event
class TriggerEventQueue(SchedulerEventQueue):
class TriggerEventQueue(ZooKeeperEventQueue):
"""Trigger events via ZooKeeper"""
log = logging.getLogger("zuul.zk.event_queues.TriggerEventQueue")
def __init__(self, client, event_root, connections):
self.connections = connections
super().__init__(client, event_root, EventPrefix.TRIGGER)
super().__init__(client, event_root)
def put(self, driver_name, event):
data = {
@ -523,7 +531,8 @@ class TenantTriggerEventQueue(TriggerEventQueue):
log = logging.getLogger("zuul.zk.event_queues.TenantTriggerEventQueue")
def __init__(self, client, connections, tenant_name):
event_root = "/".join((TENANT_ROOT, tenant_name, "events"))
event_root = TENANT_TRIGGER_ROOT.format(
tenant=tenant_name)
super().__init__(client, event_root, connections)
@classmethod
@ -544,8 +553,9 @@ class PipelineTriggerEventQueue(TriggerEventQueue):
log = logging.getLogger("zuul.zk.event_queues.PipelineTriggerEventQueue")
def __init__(self, client, tenant_name, pipeline_name, connections):
event_root = "/".join((TENANT_ROOT, tenant_name, "pipelines",
pipeline_name))
event_root = PIPELINE_TRIGGER_ROOT.format(
tenant=tenant_name,
pipeline=pipeline_name)
super().__init__(client, event_root, connections)
@classmethod