Verify nodes and requests are not leaked

Check that at the end of every test, there are no outstanding
nodepool requests and no locked nodes.
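
The check itself just inspects the ZooKeeper tree that the fake
nodepool maintains.  As a rough standalone sketch of the idea using
kazoo (given a connected KazooClient; the paths mirror the fake's
REQUEST_ROOT and NODE_ROOT, the helper name is illustrative, and the
real assertions live in tests.base below):

    import kazoo.exceptions

    def assert_no_leaks(client):
        # No outstanding node requests may remain after a test.
        try:
            requests = client.get_children('/nodepool/requests')
        except kazoo.exceptions.NoNodeError:
            requests = []
        assert not requests, "Leaked node requests: %s" % (requests,)

        # No node may still be locked, i.e. have lock holders
        # recorded under its /lock znode.
        try:
            nodeids = client.get_children('/nodepool/nodes')
        except kazoo.exceptions.NoNodeError:
            nodeids = []
        for oid in nodeids:
            try:
                holders = client.get_children(
                    '/nodepool/nodes/%s/lock' % oid)
            except kazoo.exceptions.NoNodeError:
                holders = []
            assert not holders, "Node %s is still locked" % (oid,)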

Move the final state assertions into the tearDown method so that
they run right after the end of the test, but before any cleanup
handlers are called (cleanup handlers can interfere with the
assertion checks by, for example, deleting the ZooKeeper tree we
are trying to inspect).  Move the cleanup in test_webapp into
tearDown as well, so that the paused job used by the tests in that
class is released before the assertion check runs.
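
For context on the ordering: in the test frameworks in play here
(stdlib unittest and testtools alike), tearDown runs immediately
after the test method, and functions registered with addCleanup run
after that, in LIFO order.  A quick standalone illustration:

    import unittest

    class OrderDemo(unittest.TestCase):
        def setUp(self):
            self.addCleanup(print, 'cleanup handler (runs last)')

        def tearDown(self):
            print('tearDown (runs right after the test body)')
            super(OrderDemo, self).tearDown()

        def test_order(self):
            print('test body')

    if __name__ == '__main__':
        unittest.main()

Running this prints "test body", then the tearDown line, then the
cleanup line -- which is why assertions placed in tearDown can still
see state that a cleanup handler would tear away.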

Fix some bugs uncovered by this testing (a toy model of the common
invariant follows the list):

* Two typos.
* When we re-launch a job, we need a new nodeset, so make sure
  to remove the nodeset from the buildset after the build
  completes if we are going to retry the build.
* Always report build results to the scheduler even for non-current
  buildsets so that it can return used nodes for aborted builds.
* Have the scheduler, rather than the pipeline manager, return the
  nodeset for a completed build.  This avoids the edge case where a
  build result arrives after a configuration change that removes the
  pipeline (leaving no manager to return the nodeset).
* When canceling jobs, return nodesets for any jobs which do not yet
  have builds (such as jobs which have nodes but have not yet
  launched).
* Return nodes for skipped jobs.
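
The common invariant behind most of these fixes: every nodeset
attached to a buildset must be handed back to nodepool exactly once,
whichever exit path the build takes (completed, retried, canceled,
or skipped).  A toy model of that bookkeeping, with illustrative
names rather than Zuul's real API (only removeJobNodeSet loosely
mirrors a real method, added in the BuildSet hunk below):

    class ToyBuildSet(object):
        def __init__(self, nodesets):
            self.nodesets = dict(nodesets)  # job name -> nodeset

        def removeJobNodeSet(self, job_name):
            # Popping means a retried job must request a fresh nodeset.
            return self.nodesets.pop(job_name)

    returned = []

    def returnNodeset(nodeset):
        returned.append(nodeset)

    bs = ToyBuildSet({'job-a': 'nodes-a', 'job-b': 'nodes-b'})
    returnNodeset(bs.removeJobNodeSet('job-a'))  # completed build
    returnNodeset(bs.removeJobNodeSet('job-b'))  # skipped job
    assert not bs.nodesets
    assert sorted(returned) == ['nodes-a', 'nodes-b']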

Normalize the debug messages in nodepool.py.

Change-Id: I32f6807ac95034fc2636993824f4a45ffe7c59d8
James E. Blair 2017-01-05 11:17:28 -08:00
parent ce001e11a8
commit e18d460e47
7 changed files with 94 additions and 29 deletions

View File

@@ -874,6 +874,7 @@ class FakeSwiftClientConnection(swiftclient.client.Connection):
 class FakeNodepool(object):
     REQUEST_ROOT = '/nodepool/requests'
+    NODE_ROOT = '/nodepool/nodes'
 
     log = logging.getLogger("zuul.test.FakeNodepool")
@@ -918,6 +919,28 @@ class FakeNodepool(object):
             reqs.append(data)
         return reqs
 
+    def getNodes(self):
+        try:
+            nodeids = self.client.get_children(self.NODE_ROOT)
+        except kazoo.exceptions.NoNodeError:
+            return []
+        nodes = []
+        for oid in sorted(nodeids):
+            path = self.NODE_ROOT + '/' + oid
+            data, stat = self.client.get(path)
+            data = json.loads(data)
+            data['_oid'] = oid
+            try:
+                lockfiles = self.client.get_children(path + '/lock')
+            except kazoo.exceptions.NoNodeError:
+                lockfiles = []
+            if lockfiles:
+                data['_lock'] = True
+            else:
+                data['_lock'] = False
+            nodes.append(data)
+        return nodes
+
     def makeNode(self, request_id, node_type):
         now = time.time()
         path = '/nodepool/nodes/'
@@ -1257,9 +1280,12 @@ class ZuulTestCase(BaseTestCase):
         self.rpc.start()
         self.launch_client.gearman.waitForServer()
 
-        self.addCleanup(self.assertFinalState)
         self.addCleanup(self.shutdown)
 
+    def tearDown(self):
+        super(ZuulTestCase, self).tearDown()
+        self.assertFinalState()
+
     def configure_connections(self):
         # Register connections from the config
         self.smtp_messages = []
@@ -1366,6 +1392,17 @@ class ZuulTestCase(BaseTestCase):
             self.addCommitToRepo(project, 'add content from fixture',
                                  files, branch='master', tag='init')
 
+    def assertNodepoolState(self):
+        # Make sure that there are no pending requests
+
+        requests = self.fake_nodepool.getNodeRequests()
+        self.assertEqual(len(requests), 0)
+
+        nodes = self.fake_nodepool.getNodes()
+        for node in nodes:
+            self.assertFalse(node['_lock'], "Node %s is locked" %
+                             (node['_oid'],))
+
     def assertFinalState(self):
         # Make sure that git.Repo objects have been garbage collected.
         repos = []
@@ -1375,6 +1412,7 @@ class ZuulTestCase(BaseTestCase):
                 repos.append(obj)
         self.assertEqual(len(repos), 0)
         self.assertEmptyQueues()
+        self.assertNodepoolState()
         ipm = zuul.manager.independent.IndependentPipelineManager
         for tenant in self.sched.abide.tenants.values():
             for pipeline in tenant.layout.pipelines.values():

View File

@@ -26,14 +26,8 @@ from tests.base import ZuulTestCase
 class TestWebapp(ZuulTestCase):
     tenant_config_file = 'config/single-tenant/main.yaml'
 
-    def _cleanup(self):
-        self.launch_server.hold_jobs_in_build = False
-        self.launch_server.release()
-        self.waitUntilSettled()
-
     def setUp(self):
         super(TestWebapp, self).setUp()
-        self.addCleanup(self._cleanup)
         self.launch_server.hold_jobs_in_build = True
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         A.addApproval('code-review', 2)
@@ -44,6 +38,12 @@ class TestWebapp(ZuulTestCase):
         self.waitUntilSettled()
         self.port = self.webapp.server.socket.getsockname()[1]
 
+    def tearDown(self):
+        self.launch_server.hold_jobs_in_build = False
+        self.launch_server.release()
+        self.waitUntilSettled()
+        super(TestWebapp, self).tearDown()
+
     def test_webapp_status(self):
         "Test that we can filter to only certain changes in the webapp."

View File

@@ -471,14 +471,13 @@ class LaunchClient(object):
             data = getJobData(job)
             build.node_labels = data.get('node_labels', [])
             build.node_name = data.get('node_name')
-            if not build.canceled:
-                if result is None:
-                    result = data.get('result')
-                if result is None:
-                    build.retry = True
-                self.log.info("Build %s complete, result %s" %
-                              (job, result))
-                self.sched.onBuildCompleted(build, result)
+            if result is None:
+                result = data.get('result')
+            if result is None:
+                build.retry = True
+            self.log.info("Build %s complete, result %s" %
+                          (job, result))
+            self.sched.onBuildCompleted(build, result)
             # The test suite expects the build to be removed from the
             # internal dict after it's added to the report queue.
             del self.builds[job.unique]

View File

@@ -395,6 +395,7 @@ class PipelineManager(object):
         for req in old_build_set.node_requests.values():
             self.sched.nodepool.cancelRequest(req)
         old_build_set.node_requests = {}
+        canceled_jobs = set()
         for build in old_build_set.getBuilds():
             was_running = False
             try:
@@ -405,13 +406,18 @@ class PipelineManager(object):
             if not was_running:
                 try:
                     nodeset = build.build_set.getJobNodeSet(build.job.name)
-                    self.nodepool.returnNodeset(nodeset)
+                    self.sched.nodepool.returnNodeset(nodeset)
                 except Exception:
                     self.log.exception("Unable to return nodeset %s for "
                                        "canceled build request %s" %
                                        (nodeset, build))
             build.result = 'CANCELED'
             canceled = True
+            canceled_jobs.add(build.job.name)
+        for jobname, nodeset in old_build_set.nodesets.items()[:]:
+            if jobname in canceled_jobs:
+                continue
+            self.sched.nodepool.returnNodeset(nodeset)
         for item_behind in item.items_behind:
             self.log.debug("Canceling jobs for change %s, behind change %s" %
                            (item_behind.change, item.change))
@@ -609,11 +615,15 @@ class PipelineManager(object):
         self.log.debug("Item %s status is now:\n %s" %
                        (item, item.formatStatus()))
 
-        try:
-            nodeset = build.build_set.getJobNodeSet(build.job.name)
-            self.nodepool.returnNodeset(nodeset)
-        except Exception:
-            self.log.exception("Unable to return nodeset %s" % (nodeset,))
+        if build.retry:
+            build.build_set.removeJobNodeSet(build.job.name)
+
+        # If any jobs were skipped as a result of this build, return
+        # their nodes.
+        for build in build.build_set.getBuilds():
+            if build.result == 'SKIPPED':
+                nodeset = build.build_set.getJobNodeSet(build.job.name)
+                self.sched.nodepool.returnNodeset(nodeset)
         return True

View File

@@ -762,6 +762,11 @@ class BuildSet(object):
         # required
         return self.nodesets.get(job_name)
 
+    def removeJobNodeSet(self, job_name):
+        if job_name not in self.nodesets:
+            raise Exception("No job set for %s" % (job_name))
+        del self.nodesets[job_name]
+
     def setJobNodeRequest(self, job_name, req):
         if job_name in self.node_requests:
             raise Exception("Prior node request for %s" % (job_name))

View File

@@ -28,14 +28,15 @@ class Nodepool(object):
         nodeset = job.nodeset.copy()
         req = NodeRequest(build_set, job, nodeset)
         self.requests[req.uid] = req
-        self.log.debug("Submitting node request: %s" % (req,))
 
         self.sched.zk.submitNodeRequest(req, self._updateNodeRequest)
+        # Logged after submission so that we have the request id
+        self.log.info("Submitted node request %s" % (req,))
+
         return req
 
     def cancelRequest(self, request):
-        self.log.debug("Canceling node request: %s" % (request,))
+        self.log.info("Canceling node request %s" % (request,))
         if request.uid in self.requests:
             try:
                 self.sched.zk.deleteNodeRequest(request)
@@ -44,6 +45,7 @@ class Nodepool(object):
             del self.requests[request.uid]
 
     def useNodeset(self, nodeset):
+        self.log.info("Setting nodeset %s in use" % (nodeset,))
         for node in nodeset.getNodes():
             if node.lock is None:
                 raise Exception("Node %s is not locked" % (node,))
@@ -51,6 +53,7 @@ class Nodepool(object):
             self.sched.zk.storeNode(node)
 
     def returnNodeset(self, nodeset):
+        self.log.info("Returning nodeset %s" % (nodeset,))
         for node in nodeset.getNodes():
             if node.lock is None:
                 raise Exception("Node %s is not locked" % (node,))
@@ -79,7 +82,7 @@ class Nodepool(object):
         locked_nodes = []
         try:
             for node in nodes:
-                self.log.debug("Locking node: %s" % (node,))
+                self.log.debug("Locking node %s" % (node,))
                 self.sched.zk.lockNode(node)
                 locked_nodes.append(node)
         except Exception:
@@ -90,7 +93,7 @@ class Nodepool(object):
     def _updateNodeRequest(self, request, deleted):
         # Return False to indicate that we should stop watching the
         # node.
-        self.log.debug("Updating node request: %s" % (request,))
+        self.log.debug("Updating node request %s" % (request,))
 
         if request.uid not in self.requests:
             return False
@@ -114,7 +117,7 @@ class Nodepool(object):
         # Called by the scheduler when it wants to accept and lock
         # nodes for (potential) use.
 
-        self.log.debug("Accepting node request: %s" % (request,))
+        self.log.info("Accepting node request %s" % (request,))
 
         # First, try to lock the nodes.
         locked = False
@@ -127,7 +130,7 @@ class Nodepool(object):
 
         # Regardless of whether locking succeeded, delete the
         # request.
-        self.log.debug("Deleting node request: %s" % (request,))
+        self.log.debug("Deleting node request %s" % (request,))
         try:
             self.sched.zk.deleteNodeRequest(request)
         except Exception:

View File

@@ -767,9 +767,19 @@ class Scheduler(threading.Thread):
     def _doBuildCompletedEvent(self, event):
         build = event.build
 
+        # Regardless of any other conditions which might cause us not
+        # to pass this on to the pipeline manager, make sure we return
+        # the nodes to nodepool.
+        try:
+            nodeset = build.build_set.getJobNodeSet(build.job.name)
+            self.nodepool.returnNodeset(nodeset)
+        except Exception:
+            self.log.exception("Unable to return nodeset %s" % (nodeset,))
+
         if build.build_set is not build.build_set.item.current_build_set:
-            self.log.warning("Build %s is not in the current build set" %
-                             (build,))
+            self.log.debug("Build %s is not in the current build set" %
+                           (build,))
             return
         pipeline = build.build_set.item.pipeline
         if not pipeline: