Add nodepool request framework

This does not actually talk to nodepool, but this adds the nodepool
request flow to the pipeline managers, and establishes a nodepool
class for zuul to interact with nodepool directly.

Change-Id: I41c4d8f86e140786d590698f1a0048c0011382dd
This commit is contained in:
James E. Blair
2016-04-08 17:47:58 -07:00
committed by James E. Blair
parent 78c301afed
commit 8d692398f1
8 changed files with 181 additions and 4 deletions

View File

@@ -55,6 +55,7 @@ import zuul.lib.connections
import zuul.merger.client
import zuul.merger.merger
import zuul.merger.server
import zuul.nodepool
import zuul.reporter.gerrit
import zuul.reporter.smtp
import zuul.source.gerrit
@@ -1003,9 +1004,11 @@ class ZuulTestCase(BaseTestCase):
self.config, self.sched, self.swift)
self.merge_client = zuul.merger.client.MergeClient(
self.config, self.sched)
self.nodepool = zuul.nodepool.Nodepool(self.sched)
self.sched.setLauncher(self.launcher)
self.sched.setMerger(self.merge_client)
self.sched.setNodepool(self.nodepool)
self.webapp = zuul.webapp.WebApp(
self.sched, port=0, listen_address='127.0.0.1')

View File

@@ -16,3 +16,6 @@
- job:
name:
python27
nodes:
- name: controller
image: ubuntu-trusty

View File

@@ -149,6 +149,7 @@ class Server(zuul.cmd.ZuulApp):
import zuul.scheduler
import zuul.launcher.client
import zuul.merger.client
import zuul.nodepool
import zuul.lib.swift
import zuul.webapp
import zuul.rpclistener
@@ -168,6 +169,7 @@ class Server(zuul.cmd.ZuulApp):
gearman = zuul.launcher.client.LaunchClient(self.config, self.sched,
self.swift)
merger = zuul.merger.client.MergeClient(self.config, self.sched)
nodepool = zuul.nodepool.Nodepool(self.sched)
if self.config.has_option('zuul', 'status_expiry'):
cache_expiry = self.config.getint('zuul', 'status_expiry')
@@ -192,6 +194,7 @@ class Server(zuul.cmd.ZuulApp):
self.configure_connections()
self.sched.setLauncher(gearman)
self.sched.setMerger(merger)
self.sched.setNodepool(nodepool)
self.log.info('Starting scheduler')
self.sched.start()

View File

@@ -53,6 +53,10 @@ class JobParser(object):
'logserver-prefix': str,
}
node = {vs.Required('name'): str,
vs.Required('image'): str,
}
job = {vs.Required('name'): str,
'parent': str,
'queue-name': str,
@@ -67,6 +71,7 @@ class JobParser(object):
'files': to_list(str),
'swift': to_list(swift),
'irrelevant-files': to_list(str),
'nodes': [node],
'timeout': int,
'_project_source': str, # used internally
'_project_name': str, # used internally
@@ -494,7 +499,7 @@ class TenantParser(object):
# Note: this is an ordered list -- we wait for cat jobs to
# complete in the order they were launched which is the
# same order they were defined in the main config file.
# This is important for correct inheritence.
# This is important for correct inheritance.
TenantParser.log.debug("Waiting for cat job %s" % (job,))
job.wait()
for fn in ['zuul.yaml', '.zuul.yaml']:

View File

@@ -331,6 +331,19 @@ class BasePipelineManager(object):
self.dequeueItem(item)
self.reportStats(item)
def provisionNodes(self, item):
jobs = self.pipeline.findJobsToRequest(item)
if not jobs:
return False
build_set = item.current_build_set
self.log.debug("Requesting nodes for change %s" % item.change)
for job in jobs:
req = self.sched.nodepool.requestNodes(build_set, job)
self.log.debug("Adding node request %s for job %s to item %s" %
(req, job, item))
build_set.setJobNodeRequest(job.name, req)
return True
def _launchJobs(self, item, jobs):
self.log.debug("Launching jobs for change %s" % item.change)
dependent_items = self.getDependentItems(item)
@@ -360,6 +373,9 @@ class BasePipelineManager(object):
old_build_set = item.current_build_set
if prime and item.current_build_set.ref:
item.resetAllBuilds()
for req in old_build_set.node_requests.values():
self.sched.nodepool.cancelRequest(req)
old_build_set.node_requests = {}
for build in old_build_set.getBuilds():
try:
self.sched.launcher.cancel(build)
@@ -420,8 +436,10 @@ class BasePipelineManager(object):
if actionable:
if not item.current_build_set.ref:
item.current_build_set.setConfiguration()
if actionable and self.launchJobs(item):
changed = True
if self.provisionNodes(item):
changed = True
if self.launchJobs(item):
changed = True
if self.pipeline.didAnyJobFail(item):
failing_reasons.append("at least one job failed")
if (not item.live) and (not item.items_behind):
@@ -522,6 +540,14 @@ class BasePipelineManager(object):
self.log.info("Unable to merge change %s" % item.change)
self.pipeline.setUnableToMerge(item)
def onNodesProvisioned(self, event):
request = event.request
build_set = request.build_set
build_set.jobNodeRequestComplete(request.job.name, request,
request.nodes)
self.log.info("Completed node request %s for job %s of item %s" %
(request, request.job.name, build_set.item))
def reportItem(self, item):
if not item.reported:
# _reportItem() returns True if it failed to report.

View File

@@ -185,6 +185,33 @@ class Pipeline(object):
return []
return self._findJobsToRun(tree.job_trees, item, mutex)
def _findJobsToRequest(self, job_trees, item):
toreq = []
for tree in job_trees:
job = tree.job
if job:
if not job.changeMatches(item.change):
continue
nodes = item.current_build_set.getJobNodes(job.name)
if nodes is None:
req = item.current_build_set.getJobNodeRequest(job.name)
if req is None:
toreq.append(job)
# If there is no job, this is a null job tree, and we should
# run all of its jobs.
if not job:
toreq.extend(self._findJobsToRequest(
tree.job_trees, item))
return toreq
def findJobsToRequest(self, item):
if not item.live:
return []
tree = item.job_tree
if not tree:
return []
return self._findJobsToRequest(tree.job_trees, item)
def haveAllJobsStarted(self, item):
for job in self.getJobs(item):
build = item.current_build_set.getBuild(job.name)
@@ -456,7 +483,7 @@ class Job(object):
attributes = dict(
timeout=None,
# variables={},
# nodes=[],
nodes=[],
# auth={},
workspace=None,
pre_run=None,
@@ -650,6 +677,8 @@ class BuildSet(object):
self.unable_to_merge = False
self.failing_reasons = []
self.merge_state = self.NEW
self.nodes = {} # job -> nodes
self.node_requests = {} # job -> reqs
def __repr__(self):
return '<BuildSet item: %s #builds: %s merge state: %s>' % (
@@ -688,6 +717,24 @@ class BuildSet(object):
keys.sort()
return [self.builds.get(x) for x in keys]
def getJobNodes(self, job_name):
# Return None if not provisioned; [] if no nodes required
return self.nodes.get(job_name)
def setJobNodeRequest(self, job_name, req):
if job_name in self.node_requests:
raise Exception("Prior node request for %s" % (job_name))
self.node_requests[job_name] = req
def getJobNodeRequest(self, job_name):
return self.node_requests.get(job_name)
def jobNodeRequestComplete(self, job_name, req, nodes):
if job_name in self.nodes:
raise Exception("Prior node request for %s" % (job_name))
self.nodes[job_name] = nodes
del self.node_requests[job_name]
class QueueItem(object):
"""A changish inside of a Pipeline queue"""

53
zuul/nodepool.py Normal file
View File

@@ -0,0 +1,53 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from uuid import uuid4
class Node(object):
def __init__(self, name, image):
self.name = name
self.image = image
class Request(object):
def __init__(self, build_set, job, nodes):
self.build_set = build_set
self.job = job
self.nodes = nodes
self.id = uuid4().hex
class Nodepool(object):
def __init__(self, scheduler):
self.requests = {}
self.sched = scheduler
def requestNodes(self, build_set, job):
nodes = job.nodes
nodes = [Node(node['name'], node['image']) for node in nodes]
req = Request(build_set, job, nodes)
self.requests[req.id] = req
self._requestComplete(req.id)
return req
def cancelRequest(self, request):
if request in self.requests:
self.requests.remove(request)
def returnNodes(self, nodes, used=True):
pass
def _requestComplete(self, id):
req = self.requests[id]
del self.requests[id]
self.sched.onNodesProvisioned(req)

View File

@@ -199,6 +199,17 @@ class MergeCompletedEvent(ResultEvent):
self.commit = commit
class NodesProvisionedEvent(ResultEvent):
"""Nodes have been provisioned for a build_set
:arg BuildSet build_set: The build_set which has nodes.
:arg list of Node objects nodes: The provisioned nodes
"""
def __init__(self, request):
self.request = request
def toList(item):
if not item:
return []
@@ -269,6 +280,9 @@ class Scheduler(threading.Thread):
def setMerger(self, merger):
self.merger = merger
def setNodepool(self, nodepool):
self.nodepool = nodepool
def getProject(self, name, create_foreign=False):
self.layout_lock.acquire()
p = None
@@ -352,6 +366,13 @@ class Scheduler(threading.Thread):
self.result_event_queue.put(event)
self.wake_event.set()
def onNodesProvisioned(self, req):
self.log.debug("Adding nodes provisioned event for build set: %s" %
req.build_set)
event = NodesProvisionedEvent(req)
self.result_event_queue.put(event)
self.wake_event.set()
def reconfigure(self, config):
self.log.debug("Prepare to reconfigure")
event = ReconfigureEvent(config)
@@ -718,6 +739,8 @@ class Scheduler(threading.Thread):
self._doBuildCompletedEvent(event)
elif isinstance(event, MergeCompletedEvent):
self._doMergeCompletedEvent(event)
elif isinstance(event, NodesProvisionedEvent):
self._doNodesProvisionedEvent(event)
else:
self.log.error("Unable to handle event %s" % event)
finally:
@@ -773,6 +796,20 @@ class Scheduler(threading.Thread):
return
pipeline.manager.onMergeCompleted(event)
def _doNodesProvisionedEvent(self, event):
request = event.request
build_set = request.build_set
if build_set is not build_set.item.current_build_set:
self.log.warning("Build set %s is not current" % (build_set,))
self.nodepool.returnNodes(request.nodes, used=False)
return
pipeline = build_set.item.pipeline
if not pipeline:
self.log.warning("Build set %s is not associated with a pipeline" %
(build_set,))
return
pipeline.manager.onNodesProvisioned(event)
def formatStatusJSON(self):
# TODOv3(jeblair): use tenants
if self.config.has_option('zuul', 'url_pattern'):