Several merger cleanups

This change contains several merger-related cleanups which seem
distinct but are intertwined.

* Ensure that the merger API space in ZK is empty at the end of all
  tests.  This assures us that we aren't leaking anything (see the
  sketch after this list).
* Move some ZK utility functions into the base test class.
* Remove the extra zk_client created in the component registry test
  since we can use the normal ZK client.
* The null result value in the merger server is initialized earlier so
  that it is always defined when the exception handler uses it.
* The test_branch_delete_full_reconfiguration test leaked a result
  node because one of its cat jobs fails; the remaining cat jobs are
  still run, but their results are ignored.
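
The emptiness check is just a recursive walk of each merger API root
in ZK, asserting that nothing is left underneath.  A minimal sketch of
the idea, given a connected kazoo client "zk" (this mirrors the
getZKTree/assertCleanZooKeeper helpers added in the diff below; the
names here are illustrative):

    # Sketch of the emptiness assertion; the real helpers are in the diff.
    def zk_tree(zk, root):
        items = []
        for child in zk.get_children(root):
            path = '/'.join([root, child])
            items.append(path)
            items.extend(zk_tree(zk, path))
        return items

    # e.g., nothing may remain under the merger results root
    assert zk_tree(zk, '/zuul/merger/results') == []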

To address the last point, we need to change the cat job handling.
Currently, when a cat job fails, the exception bubbles up and we
simply ignore all of the remaining jobs.  The mergers will still run
them and write their results to ZK, but no one will ever look at
those results.  That would be fine, except that we created a "waiter"
node in ZK to indicate that we want to see those results; as long as
it exists, the results won't be deleted by the garbage collector, yet
since we are no longer waiting for them, we won't delete them either.
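
In other words, the waiter node is what marks result data as still
wanted.  A minimal sketch of the garbage collection rule (the names
here are illustrative, not the exact cleanup code):

    # Sketch only: prune result data that no longer has a waiter node.
    def cleanup_leaked_results(kazoo_client, result_root, waiter_root):
        for uuid in kazoo_client.get_children(result_root):
            # A scheduler advertises interest in a result by creating a
            # waiter node with the same uuid; without one, the result is
            # treated as leaked and removed.
            if not kazoo_client.exists(f"{waiter_root}/{uuid}"):
                kazoo_client.delete(f"{result_root}/{uuid}",
                                    recursive=True)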

To correct that, we store the merge job request path on the job
future.  Then, when the first cat job fails, we "cancel" all of the
cat jobs.  That entails deleting the merge job request if we are able
to (to save the mergers from doing useless work), and regardless of
whether that succeeds, deleting the waiter node in ZK.  If a cat job
happens to be running (and with more than one job, as in this test
case, one likely is), it will eventually complete and write its
result data.  But since we have removed the waiter node, the periodic
cleanup task will detect that data as leaked and delete it.
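
Condensed, the client-side cancel path looks roughly like this
(simplified from the MergeClient and job future changes in the diff
below; a sketch, not the exact code):

    # Simplified sketch of the cancel flow; see the diff for details.
    def cancel(self, job):
        try:
            # Best effort: drop the request so mergers skip useless work.
            request = self.merger_api.get(job.request_path)
            if request and self.merger_api.lock(request, blocking=False):
                try:
                    self.merger_api.remove(request)
                finally:
                    self.merger_api.unlock(request)
        finally:
            # Always remove the waiter node so that any result which is
            # still written gets collected as leaked data later.
            job.cancel()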

Change-Id: I49a459debf5a6c032adc60b66bbd8f6a5901bebe
Author:  James E. Blair
Date:    2021-08-19 14:41:57 -07:00
Parent:  9432c968ba
Commit:  6a0b5c419c

9 changed files with 96 additions and 91 deletions


@@ -4210,6 +4210,35 @@ class BaseTestCase(testtools.TestCase):
             ChrootedKazooFixture(self.id())
         )
 
+    def getZKWatches(self):
+        chroot = self.zk_chroot_fixture.zookeeper_chroot
+        data = self.zk_client.client.command(b'wchp')
+        ret = {}
+        sessions = None
+        for line in data.split('\n'):
+            if line.startswith('\t'):
+                if sessions is not None:
+                    sessions.append(line.strip())
+            else:
+                line = line.strip()
+                if not line:
+                    continue
+                if line.startswith(chroot):
+                    line = line[len(chroot):]
+                    sessions = []
+                    ret[line] = sessions
+                else:
+                    sessions = None
+        return ret
+
+    def getZKTree(self, root):
+        items = []
+        for x in self.zk_client.client.get_children(root):
+            path = '/'.join([root, x])
+            items.append(path)
+            items.extend(self.getZKTree(path))
+        return items
+
 
 class SymLink(object):
     def __init__(self, target):
@@ -4953,6 +4982,7 @@ class ZuulTestCase(BaseTestCase):
         self.assertNodepoolState()
         self.assertNoGeneratedKeys()
         self.assertSQLState()
+        self.assertCleanZooKeeper()
         ipm = zuul.manager.independent.IndependentPipelineManager
         for tenant in self.scheds.first.sched.abide.tenants.values():
             for pipeline in tenant.layout.pipelines.values():
@@ -5386,6 +5416,16 @@ class ZuulTestCase(BaseTestCase):
                     self.assertEqual(len(pipeline_queue.queue), 0,
                                      "Pipelines queues should be empty")
 
+    def assertCleanZooKeeper(self):
+        # Make sure there are no extraneous ZK nodes
+        client = self.merger_api
+        self.assertEqual(self.getZKTree(client.REQUEST_ROOT), [])
+        self.assertEqual(self.getZKTree(client.PARAM_ROOT), [])
+        self.assertEqual(self.getZKTree(client.RESULT_ROOT), [])
+        self.assertEqual(self.getZKTree(client.RESULT_DATA_ROOT), [])
+        self.assertEqual(self.getZKTree(client.WAITER_ROOT), [])
+        self.assertEqual(self.getZKTree(client.LOCK_ROOT), [])
+
     def assertReportedStat(self, key, value=None, kind=None):
         """Check statsd output


@@ -14,7 +14,6 @@
 import configparser
 
 from zuul.lib.fingergw import FingerGateway
-from zuul.zk import ZooKeeperClient
 from zuul.zk.components import BaseComponent, ComponentRegistry
 
 from tests.base import iterate_timeout, ZuulTestCase, ZuulWebFixture
@@ -26,15 +25,6 @@ class TestComponentRegistry(ZuulTestCase):
     def setUp(self):
         super().setUp()
 
-        self.host = '::'
-        self.zk_client = ZooKeeperClient(
-            self.zk_chroot_fixture.zk_hosts,
-            tls_cert=self.zk_chroot_fixture.zookeeper_cert,
-            tls_key=self.zk_chroot_fixture.zookeeper_key,
-            tls_ca=self.zk_chroot_fixture.zookeeper_ca,
-        )
-        self.addCleanup(self.zk_client.disconnect)
-        self.zk_client.connect()
         self.component_registry = ComponentRegistry(self.zk_client)
 
     def assertComponentAttr(self, component_name, attr_name,
@@ -114,7 +104,7 @@ class TestComponentRegistry(ZuulTestCase):
         config.read_dict(self.config)
         config.read_dict({
             'fingergw': {
-                'listen_address': self.host,
+                'listen_address': '::',
                 'port': '0',
                 'hostname': 'janine',
             }


@@ -485,6 +485,8 @@ class TestBranchDeletion(ZuulTestCase):
         self.assertHistory([
             dict(name='base', result='SUCCESS', changes='2,1')])
 
+        self.scheds.first.sched.merger.merger_api.cleanup(0)
+
 
 class TestBranchTag(ZuulTestCase):
     tenant_config_file = 'config/branch-tag/main.yaml'


@@ -341,35 +341,6 @@ class TestComponentRegistry(ZooKeeperBaseTestCase):
 
 
 class TestExecutorApi(ZooKeeperBaseTestCase):
-    def _get_zk_tree(self, root):
-        items = []
-        for x in self.zk_client.client.get_children(root):
-            path = '/'.join([root, x])
-            items.append(path)
-            items.extend(self._get_zk_tree(path))
-        return items
-
-    def _get_watches(self):
-        chroot = self.zk_chroot_fixture.zookeeper_chroot
-        data = self.zk_client.client.command(b'wchp')
-        ret = {}
-        sessions = None
-        for line in data.split('\n'):
-            if line.startswith('\t'):
-                if sessions is not None:
-                    sessions.append(line.strip())
-            else:
-                line = line.strip()
-                if not line:
-                    continue
-                if line.startswith(chroot):
-                    line = line[len(chroot):]
-                    sessions = []
-                    ret[line] = sessions
-                else:
-                    sessions = None
-        return ret
-
     def test_build_request(self):
         # Test the lifecycle of a build request
         request_queue = queue.Queue()
@@ -450,13 +421,13 @@ class TestExecutorApi(ZooKeeperBaseTestCase):
         server.unlock(a)
         self.assertEqual(client.get(a.path).state, BuildRequest.COMPLETED)
 
         for _ in iterate_timeout(5, "Wait for watches to be registered"):
-            if self._get_watches():
+            if self.getZKWatches():
                 break
 
         # Scheduler removes build request on completion
         client.remove(sched_a)
-        self.assertEqual(set(self._get_zk_tree('/zuul/executor')),
+        self.assertEqual(set(self.getZKTree('/zuul/executor')),
                          set(['/zuul/executor/unzoned',
                               '/zuul/executor/unzoned/locks',
                               '/zuul/executor/unzoned/params',
@@ -464,7 +435,7 @@ class TestExecutorApi(ZooKeeperBaseTestCase):
                               '/zuul/executor/unzoned/result-data',
                               '/zuul/executor/unzoned/results',
                               '/zuul/executor/unzoned/waiters']))
-        self.assertEqual(self._get_watches(), {})
+        self.assertEqual(self.getZKWatches(), {})
 
     def test_build_request_remove(self):
         # Test the scheduler forcibly removing a request (perhaps the
@@ -713,43 +684,14 @@ class TestExecutorApi(ZooKeeperBaseTestCase):
 
 
 class TestMergerApi(ZooKeeperBaseTestCase):
-    def _get_zk_tree(self, root):
-        items = []
-        for x in self.zk_client.client.get_children(root):
-            path = '/'.join([root, x])
-            items.append(path)
-            items.extend(self._get_zk_tree(path))
-        return items
-
-    def _get_watches(self):
-        chroot = self.zk_chroot_fixture.zookeeper_chroot
-        data = self.zk_client.client.command(b'wchp')
-        ret = {}
-        sessions = None
-        for line in data.split('\n'):
-            if line.startswith('\t'):
-                if sessions is not None:
-                    sessions.append(line.strip())
-            else:
-                line = line.strip()
-                if not line:
-                    continue
-                if line.startswith(chroot):
-                    line = line[len(chroot):]
-                    sessions = []
-                    ret[line] = sessions
-                else:
-                    sessions = None
-        return ret
-
     def _assertEmptyRoots(self, client):
-        self.assertEqual(self._get_zk_tree(client.REQUEST_ROOT), [])
-        self.assertEqual(self._get_zk_tree(client.PARAM_ROOT), [])
-        self.assertEqual(self._get_zk_tree(client.RESULT_ROOT), [])
-        self.assertEqual(self._get_zk_tree(client.RESULT_DATA_ROOT), [])
-        self.assertEqual(self._get_zk_tree(client.WAITER_ROOT), [])
-        self.assertEqual(self._get_zk_tree(client.LOCK_ROOT), [])
-        self.assertEqual(self._get_watches(), {})
+        self.assertEqual(self.getZKTree(client.REQUEST_ROOT), [])
+        self.assertEqual(self.getZKTree(client.PARAM_ROOT), [])
+        self.assertEqual(self.getZKTree(client.RESULT_ROOT), [])
+        self.assertEqual(self.getZKTree(client.RESULT_DATA_ROOT), [])
+        self.assertEqual(self.getZKTree(client.WAITER_ROOT), [])
+        self.assertEqual(self.getZKTree(client.LOCK_ROOT), [])
+        self.assertEqual(self.getZKWatches(), {})
 
     def test_merge_request(self):
         # Test the lifecycle of a merge request
@@ -895,12 +837,12 @@ class TestMergerApi(ZooKeeperBaseTestCase):
         result_data = {'result': 'ok'}
         server.reportResult(a, result_data)
 
-        self.assertEqual(set(self._get_zk_tree(client.RESULT_ROOT)),
+        self.assertEqual(set(self.getZKTree(client.RESULT_ROOT)),
                          set(['/zuul/merger/results/A']))
-        self.assertEqual(set(self._get_zk_tree(client.RESULT_DATA_ROOT)),
+        self.assertEqual(set(self.getZKTree(client.RESULT_DATA_ROOT)),
                          set(['/zuul/merger/result-data/A',
                               '/zuul/merger/result-data/A/0000000000']))
-        self.assertEqual(self._get_zk_tree(client.WAITER_ROOT),
+        self.assertEqual(self.getZKTree(client.WAITER_ROOT),
                          ['/zuul/merger/waiters/A'])
 
         # Merger removes and unlocks merge request on completion
@@ -994,12 +936,12 @@ class TestMergerApi(ZooKeeperBaseTestCase):
         server.remove(a)
         server.unlock(a)
 
-        self.assertEqual(set(self._get_zk_tree(client.RESULT_ROOT)),
+        self.assertEqual(set(self.getZKTree(client.RESULT_ROOT)),
                          set(['/zuul/merger/results/A']))
-        self.assertEqual(set(self._get_zk_tree(client.RESULT_DATA_ROOT)),
+        self.assertEqual(set(self.getZKTree(client.RESULT_DATA_ROOT)),
                          set(['/zuul/merger/result-data/A',
                               '/zuul/merger/result-data/A/0000000000']))
-        self.assertEqual(self._get_zk_tree(client.WAITER_ROOT),
+        self.assertEqual(self.getZKTree(client.WAITER_ROOT),
                          ['/zuul/merger/waiters/A'])
 
         # Scheduler "disconnects"


@@ -1818,10 +1818,18 @@ class TenantParser(object):
                     job.ltime = ltime
                     job.source_context = source_context
                     jobs.append(job)
 
         # Remove project from scope, so it's not accidentally used in
         # the following section.
         del project
 
+        try:
+            self._processCatJobs(abide, tenant, loading_errors, jobs)
+        except Exception:
+            self.log.debug("Error processing cat jobs, canceling")
+            for job in jobs:
+                try:
+                    self.log.debug("Canceling cat job %s", job)
+                    self.merger.cancel(job)
+                except Exception:
+                    self.log.exception("Unable to cancel job %s", job)
+
+    def _processCatJobs(self, abide, tenant, loading_errors, jobs):
         for job in jobs:
             self.log.debug("Waiting for cat job %s" % (job,))
             job.wait(self.merger.git_timeout)


@@ -153,3 +153,17 @@ class MergeClient(object):
             # update will fail in this case and we can simply ignore
             # the exception.
             return
+
+    def cancel(self, job):
+        try:
+            # Try to remove the request first
+            request = self.merger_api.get(job.request_path)
+            if request:
+                if self.merger_api.lock(request, blocking=False):
+                    try:
+                        self.merger_api.remove(request)
+                    finally:
+                        self.executor_api.unlock(request)
+        finally:
+            # Regardless of that, remove the waiter node
+            job.cancel()


@@ -198,6 +198,7 @@ class BaseMergeServer(metaclass=ABCMeta):
             except Exception:
                 self.log.exception("Error in merge thread:")
                 time.sleep(5)
+                self.merger_loop_wake_event.set()
 
     def _runMergeJob(self, merge_request):
         log = get_annotated_logger(
@@ -207,6 +208,7 @@ class BaseMergeServer(metaclass=ABCMeta):
         if not self.merger_api.lock(merge_request, blocking=False):
             return
 
+        result = None
         try:
             merge_request.state = MergeRequest.RUNNING
             params = self.merger_api.getParams(merge_request)
@@ -215,7 +217,7 @@ class BaseMergeServer(metaclass=ABCMeta):
             # don't loop over and try to lock it again and again.
            self.merger_api.update(merge_request)
 
            self.log.debug("Next executed merge job: %s", merge_request)
-            result = None
            try:
                result = self.executeMergeJob(merge_request, params)
            except Exception:


@@ -445,9 +445,10 @@ class JobResultFuture(EventResultFuture):
     log = logging.getLogger("zuul.JobResultFuture")
 
-    def __init__(self, client, result_path, waiter_path):
+    def __init__(self, client, request_path, result_path, waiter_path):
         super().__init__(client, result_path)
+        self.request_path = request_path
         self._waiter_path = waiter_path
         self._result_data_path = None
 
         self.merged = None
@@ -486,6 +487,12 @@ class JobResultFuture(EventResultFuture):
         self.item_in_branches = self.data.get("item_in_branches", [])
         return res
 
+    def cancel(self):
+        # Remove our waiter node so that if a result is ever reported,
+        # it will be garbage collected.
+        with suppress(NoNodeError):
+            self.kazoo_client.delete(self._waiter_path)
+
 
 class ManagementEventResultFuture(EventResultFuture):
     """Returned when a management event is put into a queue."""


@@ -212,7 +212,8 @@ class JobRequestQueue(ZooKeeperSimpleBase):
             [self.WAITER_ROOT, request.uuid]
         )
         self.kazoo_client.create(waiter_path, ephemeral=True)
-        result = JobResultFuture(self.client, result_path, waiter_path)
+        result = JobResultFuture(self.client, request.path,
+                                 result_path, waiter_path)
         request.result_path = result_path
 
         log.debug("Submitting job request to ZooKeeper %s", request)