Merge "Verify nodes and requests are not leaked" into feature/zuulv3

Authored by Jenkins on 2017-01-18 21:24:07 +00:00; committed by Gerrit Code Review
commit 09d6e4a9e0
7 changed files with 94 additions and 29 deletions

View File

@@ -874,6 +874,7 @@ class FakeSwiftClientConnection(swiftclient.client.Connection):
 class FakeNodepool(object):
     REQUEST_ROOT = '/nodepool/requests'
+    NODE_ROOT = '/nodepool/nodes'

     log = logging.getLogger("zuul.test.FakeNodepool")
@@ -918,6 +919,28 @@ class FakeNodepool(object):
             reqs.append(data)
         return reqs

+    def getNodes(self):
+        try:
+            nodeids = self.client.get_children(self.NODE_ROOT)
+        except kazoo.exceptions.NoNodeError:
+            return []
+        nodes = []
+        for oid in sorted(nodeids):
+            path = self.NODE_ROOT + '/' + oid
+            data, stat = self.client.get(path)
+            data = json.loads(data)
+            data['_oid'] = oid
+            try:
+                lockfiles = self.client.get_children(path + '/lock')
+            except kazoo.exceptions.NoNodeError:
+                lockfiles = []
+            if lockfiles:
+                data['_lock'] = True
+            else:
+                data['_lock'] = False
+            nodes.append(data)
+        return nodes
+
     def makeNode(self, request_id, node_type):
         now = time.time()
         path = '/nodepool/nodes/'
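
As an aside, the ZooKeeper layout that getNodes() walks can be inspected directly with kazoo. Below is a minimal standalone sketch (not part of the commit), assuming a reachable test ZooKeeper at a hypothetical 127.0.0.1:2181 and the /nodepool/nodes root used above; the function name list_nodepool_nodes is illustrative.

import json

import kazoo.exceptions
from kazoo.client import KazooClient

NODE_ROOT = '/nodepool/nodes'


def list_nodepool_nodes(hosts='127.0.0.1:2181'):
    # Connect to the (test) ZooKeeper cluster; the hosts value is an assumption.
    client = KazooClient(hosts=hosts)
    client.start()
    try:
        try:
            node_ids = client.get_children(NODE_ROOT)
        except kazoo.exceptions.NoNodeError:
            return []  # No nodes have been created yet.
        nodes = []
        for oid in sorted(node_ids):
            path = NODE_ROOT + '/' + oid
            data, _stat = client.get(path)
            node = json.loads(data.decode('utf8'))
            node['_oid'] = oid
            # A node counts as locked if its lock znode has children;
            # lock holders keep ephemeral child znodes under .../lock.
            try:
                lock_holders = client.get_children(path + '/lock')
            except kazoo.exceptions.NoNodeError:
                lock_holders = []
            node['_lock'] = bool(lock_holders)
            nodes.append(node)
        return nodes
    finally:
        client.stop()

The lock check mirrors kazoo's Lock recipe, which keeps ephemeral child znodes under the .../lock path while a lock is held or contended, so any children mean the node is still in use.
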
@@ -1257,9 +1280,12 @@ class ZuulTestCase(BaseTestCase):
         self.rpc.start()
         self.launch_client.gearman.waitForServer()

-        self.addCleanup(self.assertFinalState)
         self.addCleanup(self.shutdown)

+    def tearDown(self):
+        super(ZuulTestCase, self).tearDown()
+        self.assertFinalState()
+
     def configure_connections(self):
         # Register connections from the config
         self.smtp_messages = []
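
Moving the check from addCleanup(self.assertFinalState) into tearDown() also changes when it runs: unittest calls tearDown() before any functions registered with addCleanup(), so the final-state assertions (including the new nodepool checks below) now execute while the fake services are still up, ahead of the shutdown cleanup. A minimal standard-library sketch of that ordering, with illustrative class and method names:

import unittest


class OrderingExample(unittest.TestCase):
    def setUp(self):
        self.shut_down = False
        # Functions registered here run after tearDown(), newest first.
        self.addCleanup(self._shutdown)

    def _shutdown(self):
        self.shut_down = True

    def tearDown(self):
        # tearDown() runs before the cleanups, so the fake services are
        # still available here, the spot the final-state check moved to.
        self.assertFalse(self.shut_down)
        super(OrderingExample, self).tearDown()

    def test_ordering(self):
        pass


if __name__ == '__main__':
    unittest.main()
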
@@ -1366,6 +1392,17 @@ class ZuulTestCase(BaseTestCase):
         self.addCommitToRepo(project, 'add content from fixture',
                              files, branch='master', tag='init')

+    def assertNodepoolState(self):
+        # Make sure that there are no pending requests
+        requests = self.fake_nodepool.getNodeRequests()
+        self.assertEqual(len(requests), 0)
+
+        nodes = self.fake_nodepool.getNodes()
+        for node in nodes:
+            self.assertFalse(node['_lock'], "Node %s is locked" %
+                             (node['_oid'],))
+
     def assertFinalState(self):
         # Make sure that git.Repo objects have been garbage collected.
         repos = []
@@ -1375,6 +1412,7 @@ class ZuulTestCase(BaseTestCase):
                 repos.append(obj)
         self.assertEqual(len(repos), 0)
         self.assertEmptyQueues()
+        self.assertNodepoolState()
         ipm = zuul.manager.independent.IndependentPipelineManager
         for tenant in self.sched.abide.tenants.values():
             for pipeline in tenant.layout.pipelines.values():

View File

@@ -25,14 +25,8 @@ from tests.base import ZuulTestCase
 class TestWebapp(ZuulTestCase):
     tenant_config_file = 'config/single-tenant/main.yaml'

-    def _cleanup(self):
-        self.launch_server.hold_jobs_in_build = False
-        self.launch_server.release()
-        self.waitUntilSettled()
-
     def setUp(self):
         super(TestWebapp, self).setUp()
-        self.addCleanup(self._cleanup)
         self.launch_server.hold_jobs_in_build = True

         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         A.addApproval('code-review', 2)
@@ -43,6 +37,12 @@ class TestWebapp(ZuulTestCase):
         self.waitUntilSettled()

         self.port = self.webapp.server.socket.getsockname()[1]

+    def tearDown(self):
+        self.launch_server.hold_jobs_in_build = False
+        self.launch_server.release()
+        self.waitUntilSettled()
+        super(TestWebapp, self).tearDown()
+
     def test_webapp_status(self):
         "Test that we can filter to only certain changes in the webapp."

View File

@@ -471,14 +471,13 @@ class LaunchClient(object):
             data = getJobData(job)
             build.node_labels = data.get('node_labels', [])
             build.node_name = data.get('node_name')
-            if not build.canceled:
-                if result is None:
-                    result = data.get('result')
-                if result is None:
-                    build.retry = True
-                self.log.info("Build %s complete, result %s" %
-                              (job, result))
-                self.sched.onBuildCompleted(build, result)
+            if result is None:
+                result = data.get('result')
+            if result is None:
+                build.retry = True
+            self.log.info("Build %s complete, result %s" %
+                          (job, result))
+            self.sched.onBuildCompleted(build, result)
             # The test suite expects the build to be removed from the
             # internal dict after it's added to the report queue.
             del self.builds[job.unique]

View File

@@ -395,6 +395,7 @@ class PipelineManager(object):
         for req in old_build_set.node_requests.values():
             self.sched.nodepool.cancelRequest(req)
         old_build_set.node_requests = {}
+        canceled_jobs = set()
         for build in old_build_set.getBuilds():
             was_running = False
             try:
@@ -405,13 +406,18 @@ class PipelineManager(object):
             if not was_running:
                 try:
                     nodeset = build.build_set.getJobNodeSet(build.job.name)
-                    self.nodepool.returnNodeset(nodeset)
+                    self.sched.nodepool.returnNodeset(nodeset)
                 except Exception:
                     self.log.exception("Unable to return nodeset %s for "
                                        "canceled build request %s" %
                                        (nodeset, build))
             build.result = 'CANCELED'
             canceled = True
+            canceled_jobs.add(build.job.name)
+        for jobname, nodeset in old_build_set.nodesets.items()[:]:
+            if jobname in canceled_jobs:
+                continue
+            self.sched.nodepool.returnNodeset(nodeset)
         for item_behind in item.items_behind:
             self.log.debug("Canceling jobs for change %s, behind change %s" %
                            (item_behind.change, item.change))
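
The new canceled_jobs bookkeeping covers jobs whose builds did not pass through the cancel loop above but whose nodesets would otherwise leak when the build set is discarded. A rough standalone sketch of the same sweep, with a plain dict and a callable standing in for Zuul's build set and nodepool (names are illustrative):

def return_leftover_nodesets(nodesets, canceled_jobs, return_nodeset):
    """Return every nodeset whose job was not already handled above.

    nodesets: dict mapping job name -> nodeset
    canceled_jobs: set of job names whose nodesets were already returned
    return_nodeset: callable that hands a nodeset back to the pool
    """
    # Iterate over a copy so returning a nodeset may safely mutate the dict.
    for jobname, nodeset in list(nodesets.items()):
        if jobname in canceled_jobs:
            continue
        return_nodeset(nodeset)


if __name__ == '__main__':
    # Toy usage: only the job that was not canceled gives its nodes back here.
    returned = []
    return_leftover_nodesets(
        {'py27': 'nodeset-1', 'docs': 'nodeset-2'},
        canceled_jobs={'py27'},
        return_nodeset=returned.append)
    assert returned == ['nodeset-2']
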
@@ -609,11 +615,15 @@ class PipelineManager(object):
         self.log.debug("Item %s status is now:\n %s" %
                        (item, item.formatStatus()))

-        try:
-            nodeset = build.build_set.getJobNodeSet(build.job.name)
-            self.nodepool.returnNodeset(nodeset)
-        except Exception:
-            self.log.exception("Unable to return nodeset %s" % (nodeset,))
+        if build.retry:
+            build.build_set.removeJobNodeSet(build.job.name)
+
+        # If any jobs were skipped as a result of this build, return
+        # their nodes.
+        for build in build.build_set.getBuilds():
+            if build.result == 'SKIPPED':
+                nodeset = build.build_set.getJobNodeSet(build.job.name)
+                self.sched.nodepool.returnNodeset(nodeset)
+
         return True
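
Two behaviours meet in this hunk: a retried build forgets its recorded nodeset so the next attempt requests fresh nodes, and jobs marked SKIPPED because of this result hand their unused nodes back immediately. A small illustrative sketch of the skipped-job sweep, using stand-in objects rather than Zuul's real model classes:

class FakeBuild(object):
    def __init__(self, job_name, result):
        self.job_name = job_name
        self.result = result


def return_skipped_nodes(builds, nodesets, return_nodeset):
    """Give back nodes held for jobs that ended up SKIPPED.

    builds: iterable of objects with .job_name and .result
    nodesets: dict mapping job name -> nodeset
    return_nodeset: callable handing a nodeset back to the pool
    """
    for build in builds:
        if build.result == 'SKIPPED' and build.job_name in nodesets:
            return_nodeset(nodesets[build.job_name])


if __name__ == '__main__':
    returned = []
    builds = [FakeBuild('unit', 'FAILURE'), FakeBuild('deploy', 'SKIPPED')]
    return_skipped_nodes(builds, {'deploy': 'nodeset-d'}, returned.append)
    assert returned == ['nodeset-d']
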

View File

@@ -762,6 +762,11 @@ class BuildSet(object):
         # required
         return self.nodesets.get(job_name)

+    def removeJobNodeSet(self, job_name):
+        if job_name not in self.nodesets:
+            raise Exception("No job set for %s" % (job_name))
+        del self.nodesets[job_name]
+
     def setJobNodeRequest(self, job_name, req):
         if job_name in self.node_requests:
             raise Exception("Prior node request for %s" % (job_name))

View File

@@ -28,14 +28,15 @@ class Nodepool(object):
         nodeset = job.nodeset.copy()
         req = NodeRequest(build_set, job, nodeset)
         self.requests[req.uid] = req

-        self.log.debug("Submitting node request: %s" % (req,))
         self.sched.zk.submitNodeRequest(req, self._updateNodeRequest)
+        # Logged after submission so that we have the request id
+        self.log.info("Submitted node request %s" % (req,))
         return req

     def cancelRequest(self, request):
-        self.log.debug("Canceling node request: %s" % (request,))
+        self.log.info("Canceling node request %s" % (request,))
         if request.uid in self.requests:
             try:
                 self.sched.zk.deleteNodeRequest(request)
@@ -44,6 +45,7 @@ class Nodepool(object):
             del self.requests[request.uid]

     def useNodeset(self, nodeset):
+        self.log.info("Setting nodeset %s in use" % (nodeset,))
         for node in nodeset.getNodes():
             if node.lock is None:
                 raise Exception("Node %s is not locked" % (node,))
@@ -51,6 +53,7 @@ class Nodepool(object):
             self.sched.zk.storeNode(node)

     def returnNodeset(self, nodeset):
+        self.log.info("Returning nodeset %s" % (nodeset,))
         for node in nodeset.getNodes():
             if node.lock is None:
                 raise Exception("Node %s is not locked" % (node,))
@@ -79,7 +82,7 @@ class Nodepool(object):
         locked_nodes = []
         try:
             for node in nodes:
-                self.log.debug("Locking node: %s" % (node,))
+                self.log.debug("Locking node %s" % (node,))
                 self.sched.zk.lockNode(node)
                 locked_nodes.append(node)
         except Exception:
@@ -90,7 +93,7 @@ class Nodepool(object):
     def _updateNodeRequest(self, request, deleted):
         # Return False to indicate that we should stop watching the
         # node.
-        self.log.debug("Updating node request: %s" % (request,))
+        self.log.debug("Updating node request %s" % (request,))

         if request.uid not in self.requests:
             return False
@@ -114,7 +117,7 @@ class Nodepool(object):
         # Called by the scheduler when it wants to accept and lock
         # nodes for (potential) use.

-        self.log.debug("Accepting node request: %s" % (request,))
+        self.log.info("Accepting node request %s" % (request,))

         # First, try to lock the nodes.
         locked = False
@@ -127,7 +130,7 @@ class Nodepool(object):

         # Regardless of whether locking succeeded, delete the
         # request.
-        self.log.debug("Deleting node request: %s" % (request,))
+        self.log.debug("Deleting node request %s" % (request,))
         try:
             self.sched.zk.deleteNodeRequest(request)
         except Exception:

View File

@@ -767,9 +767,19 @@ class Scheduler(threading.Thread):
     def _doBuildCompletedEvent(self, event):
         build = event.build

+        # Regardless of any other conditions which might cause us not
+        # to pass this on to the pipeline manager, make sure we return
+        # the nodes to nodepool.
+        try:
+            nodeset = build.build_set.getJobNodeSet(build.job.name)
+            self.nodepool.returnNodeset(nodeset)
+        except Exception:
+            self.log.exception("Unable to return nodeset %s" % (nodeset,))
+
         if build.build_set is not build.build_set.item.current_build_set:
-            self.log.warning("Build %s is not in the current build set" %
-                             (build,))
+            self.log.debug("Build %s is not in the current build set" %
+                           (build,))
             return
         pipeline = build.build_set.item.pipeline
         if not pipeline:
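
The scheduler hunk moves the node return ahead of the early-exit guard, so nodes go back to nodepool even when the completed build no longer belongs to the current build set. The general shape, releasing the resource before any condition that might return early, looks roughly like this (stand-in function and parameter names, not the scheduler's real API):

import logging

log = logging.getLogger('example')


def on_build_completed(build, nodepool, current_build_set):
    # Return the nodes first; nothing below may skip this step.
    nodeset = None
    try:
        nodeset = build.build_set.getJobNodeSet(build.job.name)
        nodepool.returnNodeset(nodeset)
    except Exception:
        log.exception("Unable to return nodeset %s" % (nodeset,))

    if build.build_set is not current_build_set:
        # Stale result (e.g. the change was updated); the nodes are already
        # back in the pool, so it is safe to stop here.
        log.debug("Build %s is not in the current build set" % (build,))
        return

    # ... continue with normal result processing ...
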