Periodically cleanup leaked pipeline state

So far we did not cleanup the pipeline state and event queues of deleted
pipelines. To fix that we'll remove the data of pipelines that are no
longer part of the tenant layout in a periodic cleanup task as part of
the general cleanup.

To support the use-case when a pipeline is added back we also need to
initialize the event queues during a tenant reconfiguration. The local
event queue registry usually takes care of creating the queues when the
even queue is first accessed. However, the old event queue object could
still be cached in the registry when we remove and re-add a pipeline.

For the same use-case we also need to remove the pipeline from the list
of watches in the event watcher. Otherwise we won't re-create the
children watch when the pipeline is added.

Change-Id: I02127fe462cc390c81330e717be55780bc2535eb
This commit is contained in:
Simon Westphahl 2023-01-19 15:59:01 +01:00
parent 19668e0bc7
commit a2b114e1a3
No known key found for this signature in database
4 changed files with 132 additions and 0 deletions

View File

@ -637,6 +637,36 @@ class TestEventWatchers(EventQueueBaseTestCase):
result_queues["other-tenant"]["post"].put(result_event)
self._wait_for_event(event)
def test_pipeline_event_watcher_recreate(self):
event = threading.Event()
watcher = event_queues.EventWatcher(self.zk_client, event.set)
management_queues = (
event_queues.PipelineManagementEventQueue.createRegistry(
self.zk_client
)
)
self.assertFalse(event.is_set())
management_queues["tenant"]["check"].put(model.ReconfigureEvent())
self._wait_for_event(event)
# Wait for the watch to be fully established to avoid race
# conditions, since the event watcher will also ensure that the
# trigger and result event paths exist.
for _ in iterate_timeout(5, "all watches to be established"):
if watcher.watched_pipelines:
break
self.zk_client.client.delete(
event_queues.PIPELINE_NAME_ROOT.format(
tenant="tenant", pipeline="check"), recursive=True)
event.clear()
management_queues["tenant"]["check"].initialize()
management_queues["tenant"]["check"].put(model.ReconfigureEvent())
self._wait_for_event(event)
class TestConnectionEventQueue(EventQueueBaseTestCase):

View File

@ -52,6 +52,7 @@ from tests.base import (
skipIfMultiScheduler,
)
from zuul.zk.change_cache import ChangeKey
from zuul.zk.event_queues import PIPELINE_NAME_ROOT
from zuul.zk.layout import LayoutState
from zuul.zk.locks import management_queue_lock
from zuul.zk import zkobject
@ -6508,6 +6509,33 @@ For CI problems and help debugging, contact ci@example.org"""
self.assertEqual(A.data['status'], 'MERGED')
self.assertEqual(B.data['status'], 'MERGED')
def test_leaked_pipeline_cleanup(self):
self.waitUntilSettled()
sched = self.scheds.first.sched
pipeline_state_path = "/zuul/tenant/tenant-one/pipeline/invalid"
self.zk_client.client.ensure_path(pipeline_state_path)
# Create the ZK path as a side-effect of getting the event queue.
sched.pipeline_management_events["tenant-one"]["invalid"]
pipeline_event_queue_path = PIPELINE_NAME_ROOT.format(
tenant="tenant-one", pipeline="invalid")
self.assertIsNotNone(self.zk_client.client.exists(pipeline_state_path))
# Wait for the event watcher to create the event queues
for _ in iterate_timeout(30, "create event queues"):
for event_queue in ("management", "trigger", "result"):
if self.zk_client.client.exists(
f"{pipeline_event_queue_path}/{event_queue}") is None:
break
else:
break
sched._runLeakedPipelineCleanup()
self.assertIsNone(
self.zk_client.client.exists(pipeline_event_queue_path))
self.assertIsNone(self.zk_client.client.exists(pipeline_state_path))
class TestChangeQueues(ZuulTestCase):
tenant_config_file = 'config/change-queues/main.yaml'

View File

@ -23,6 +23,7 @@ import sys
import threading
import time
import traceback
import urllib.parse
import uuid
from contextlib import suppress
from zuul.vendor.contextlib import nullcontext
@ -99,6 +100,8 @@ from zuul.zk.event_queues import (
PipelineManagementEventQueue,
PipelineResultEventQueue,
PipelineTriggerEventQueue,
PIPELINE_ROOT,
PIPELINE_NAME_ROOT,
TENANT_ROOT,
)
from zuul.zk.exceptions import LockException
@ -711,6 +714,7 @@ class Scheduler(threading.Thread):
self._runMergerApiCleanup()
self._runLayoutDataCleanup()
self._runBlobStoreCleanup()
self._runLeakedPipelineCleanup()
self.maintainConnectionCache()
except Exception:
self.log.exception("Error in general cleanup:")
@ -753,6 +757,54 @@ class Scheduler(threading.Thread):
except Exception:
self.log.exception("Error in layout data cleanup:")
def _runLeakedPipelineCleanup(self):
for tenant in self.abide.tenants.values():
try:
with tenant_read_lock(self.zk_client, tenant.name,
blocking=False):
if not self.isTenantLayoutUpToDate(tenant.name):
self.log.debug(
"Skipping leaked pipeline cleanup for tenant %s",
tenant.name)
continue
valid_pipelines = tenant.layout.pipelines.values()
valid_state_paths = set(
p.state.getPath() for p in valid_pipelines)
valid_event_root_paths = set(
PIPELINE_NAME_ROOT.format(
tenant=p.tenant.name, pipeline=p.name)
for p in valid_pipelines)
safe_tenant = urllib.parse.quote_plus(tenant.name)
state_root = f"/zuul/tenant/{safe_tenant}/pipeline"
event_root = PIPELINE_ROOT.format(tenant=tenant.name)
all_state_paths = set(
f"{state_root}/{p}" for p in
self.zk_client.client.get_children(state_root))
all_event_root_paths = set(
f"{event_root}/{p}" for p in
self.zk_client.client.get_children(event_root))
leaked_state_paths = all_state_paths - valid_state_paths
leaked_event_root_paths = (
all_event_root_paths - valid_event_root_paths)
for leaked_path in (
leaked_state_paths | leaked_event_root_paths):
self.log.info("Removing leaked pipeline path %s",
leaked_path)
try:
self.zk_client.client.delete(leaked_path,
recursive=True)
except Exception:
self.log.exception(
"Error removing leaked pipeline path %s in "
"tenant %s", leaked_path, tenant.name)
except LockException:
# We'll cleanup this tenant on the next iteration
pass
def _runBlobStoreCleanup(self):
self.log.debug("Starting blob store cleanup")
try:
@ -1708,10 +1760,18 @@ class Scheduler(threading.Thread):
with old_pipeline.manager.currentContext(context):
self._reconfigureDeletePipeline(old_pipeline)
self.management_events[tenant.name].initialize()
self.trigger_events[tenant.name].initialize()
self.connections.reconfigureDrivers(tenant)
# TODOv3(jeblair): remove postconfig calls?
for pipeline in tenant.layout.pipelines.values():
self.pipeline_management_events[tenant.name][
pipeline.name].initialize()
self.pipeline_trigger_events[tenant.name][
pipeline.name].initialize()
self.pipeline_result_events[tenant.name
][pipeline.name].initialize()
for trigger in pipeline.triggers:
trigger.postConfig(pipeline)
for reporter in pipeline.actions:

View File

@ -144,6 +144,17 @@ class EventWatcher(ZooKeeperSimpleBase):
self.watched_tenants.add(tenant_name)
def _pipelineWatch(self, tenant_name, pipelines):
# Remove pipelines that no longer exists from the watch list so
# we re-register the children watch in case the pipeline is
# added again.
for watched_tenant, pipeline_name in list(self.watched_pipelines):
if watched_tenant != tenant_name:
continue
if pipeline_name in pipelines:
continue
with suppress(KeyError):
self.watched_pipelines.remove((tenant_name, pipeline_name))
for pipeline_name in pipelines:
key = (tenant_name, pipeline_name)
if key in self.watched_pipelines:
@ -205,6 +216,9 @@ class ZooKeeperEventQueue(ZooKeeperSimpleBase, Iterable):
self.queue_root = queue_root
self.event_root = f'{queue_root}/queue'
self.data_root = f'{queue_root}/data'
self.initialize()
def initialize(self):
self.kazoo_client.ensure_path(self.event_root)
self.kazoo_client.ensure_path(self.data_root)