Merge "Annotate node request processing with event id"

This commit is contained in:
Zuul 2019-05-22 22:04:02 +00:00 committed by Gerrit Code Review
commit dd954db3cc
4 changed files with 61 additions and 44 deletions

View File

@@ -378,19 +378,21 @@ class PipelineManager(object):
change.commit_needs_changes = dependencies
def provisionNodes(self, item):
log = get_annotated_logger(self.log, item.event)
jobs = item.findJobsToRequest(item.pipeline.tenant.semaphore_handler)
if not jobs:
return False
build_set = item.current_build_set
self.log.debug("Requesting nodes for change %s" % item.change)
log.debug("Requesting nodes for change %s", item.change)
if self.sched.use_relative_priority:
priority = item.getNodePriority()
else:
priority = 0
for job in jobs:
req = self.sched.nodepool.requestNodes(build_set, job, priority)
self.log.debug("Adding node request %s for job %s to item %s" %
(req, job, item))
req = self.sched.nodepool.requestNodes(
build_set, job, priority, event=item.event)
log.debug("Adding node request %s for job %s to item %s",
req, job, item)
build_set.setJobNodeRequest(job.name, req)
return True
@@ -928,21 +930,22 @@ class PipelineManager(object):
def onNodesProvisioned(self, event):
# TODOv3(jeblair): handle provisioning failure here
request = event.request
log = get_annotated_logger(self.log, request.event_id)
build_set = request.build_set
build_set.jobNodeRequestComplete(request.job.name,
request.nodeset)
if request.failed or not request.fulfilled:
self.log.info("Node request %s: failure for %s" %
(request, request.job.name,))
log.info("Node request %s: failure for %s",
request, request.job.name)
build_set.item.setNodeRequestFailure(request.job)
self._resumeBuilds(request.build_set)
tenant = build_set.item.pipeline.tenant
tenant.semaphore_handler.release(build_set.item, request.job)
self.log.info("Completed node request %s for job %s of item %s "
"with nodes %s" %
(request, request.job, build_set.item,
request.nodeset))
log.info("Completed node request %s for job %s of item %s "
"with nodes %s",
request, request.job, build_set.item, request.nodeset)
def reportItem(self, item):
if not item.reported:

View File

@@ -704,7 +704,8 @@ class NodeSet(ConfigObject):
class NodeRequest(object):
"""A request for a set of nodes."""
def __init__(self, requestor, build_set, job, nodeset, relative_priority):
def __init__(self, requestor, build_set, job, nodeset, relative_priority,
event=None):
self.requestor = requestor
self.build_set = build_set
self.job = job
@@ -719,6 +720,10 @@
self.provider = self._getPausedParentProvider()
self.id = None
self._zk_data = {} # Data that we read back from ZK
if event is not None:
self.event_id = event.zuul_event_id
else:
self.event_id = None
# Zuul internal flags (not stored in ZK so they are not
# overwritten).
self.failed = False
@@ -786,6 +791,7 @@
d['state'] = self.state
d['state_time'] = self.state_time
d['relative_priority'] = self.relative_priority
d['event_id'] = self.event_id
return d
def updateFromDict(self, data):

View File

@@ -13,6 +13,7 @@
import logging
from zuul import model
from zuul.lib.logutil import get_annotated_logger
from zuul.zk import LockException
@@ -59,34 +60,36 @@ class Nodepool(object):
pipe.gauge('zuul.nodepool.current_requests', len(self.requests))
pipe.send()
def requestNodes(self, build_set, job, relative_priority):
def requestNodes(self, build_set, job, relative_priority, event=None):
log = get_annotated_logger(self.log, event)
# Create a copy of the nodeset to represent the actual nodes
# returned by nodepool.
nodeset = job.nodeset.copy()
req = model.NodeRequest(self.sched.hostname, build_set, job,
nodeset, relative_priority)
nodeset, relative_priority, event=event)
self.requests[req.uid] = req
if nodeset.nodes:
self.sched.zk.submitNodeRequest(req, self._updateNodeRequest)
# Logged after submission so that we have the request id
self.log.info("Submitted node request %s" % (req,))
log.info("Submitted node request %s", req)
self.emitStats(req)
else:
self.log.info("Fulfilling empty node request %s" % (req,))
log.info("Fulfilling empty node request %s", req)
req.state = model.STATE_FULFILLED
self.sched.onNodesProvisioned(req)
del self.requests[req.uid]
return req
def cancelRequest(self, request):
self.log.info("Canceling node request %s" % (request,))
log = get_annotated_logger(self.log, request.event_id)
log.info("Canceling node request %s", request)
if request.uid in self.requests:
request.canceled = True
try:
self.sched.zk.deleteNodeRequest(request)
except Exception:
self.log.exception("Error deleting node request:")
log.exception("Error deleting node request:")
def reviseRequest(self, request, relative_priority=None):
'''Attempt to update the node request, if it is not currently being
@@ -97,28 +100,29 @@ class Nodepool(object):
priority to set on the request.
'''
log = get_annotated_logger(self.log, request.event_id)
if relative_priority is None:
return
try:
self.sched.zk.lockNodeRequest(request, blocking=False)
except LockException:
# It may be locked by nodepool, which is fine.
self.log.debug("Unable to revise locked node request %s", request)
log.debug("Unable to revise locked node request %s", request)
return False
try:
old_priority = request.relative_priority
request.relative_priority = relative_priority
self.sched.zk.storeNodeRequest(request)
self.log.debug("Revised relative priority of "
"node request %s from %s to %s",
request, old_priority, relative_priority)
log.debug("Revised relative priority of "
"node request %s from %s to %s",
request, old_priority, relative_priority)
except Exception:
self.log.exception("Unable to update node request %s", request)
log.exception("Unable to update node request %s", request)
finally:
try:
self.sched.zk.unlockNodeRequest(request)
except Exception:
self.log.exception("Unable to unlock node request %s", request)
log.exception("Unable to unlock node request %s", request)
def holdNodeSet(self, nodeset, autohold_key):
'''
@@ -218,12 +222,13 @@ class Nodepool(object):
raise
def _updateNodeRequest(self, request, deleted):
log = get_annotated_logger(self.log, request.event_id)
# Return False to indicate that we should stop watching the
# node.
self.log.debug("Updating node request %s" % (request,))
log.debug("Updating node request %s", request)
if request.uid not in self.requests:
self.log.debug("Request %s is unknown" % (request.uid,))
log.debug("Request %s is unknown", request.uid)
return False
if request.canceled:
@@ -233,11 +238,11 @@
# TODOv3(jeblair): handle allocation failure
if deleted:
self.log.debug("Resubmitting lost node request %s" % (request,))
log.debug("Resubmitting lost node request %s", request)
request.id = None
self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
self.log.info("Node request %s %s" % (request, request.state))
log.info("Node request %s %s", request, request.state)
# Give our results to the scheduler.
self.sched.onNodesProvisioned(request)
@@ -251,20 +256,22 @@ class Nodepool(object):
return True
def acceptNodes(self, request, request_id):
log = get_annotated_logger(self.log, request.event_id)
# Called by the scheduler when it wants to accept and lock
# nodes for (potential) use. Return False if there is a
# problem with the request (canceled or retrying), True if it
# is ready to be acted upon (success or failure).
self.log.info("Accepting node request %s" % (request,))
log.info("Accepting node request %s", request)
if request_id != request.id:
self.log.info("Skipping node accept for %s (resubmitted as %s)",
request_id, request.id)
log.info("Skipping node accept for %s (resubmitted as %s)",
request_id, request.id)
return False
if request.canceled:
self.log.info("Ignoring canceled node request %s" % (request,))
log.info("Ignoring canceled node request %s", request)
# The request was already deleted when it was canceled
return False
@@ -281,8 +288,8 @@ class Nodepool(object):
# nodes in that situation.
try:
if not self.sched.zk.nodeRequestExists(request):
self.log.info("Request %s no longer exists, resubmitting",
request.id)
log.info("Request %s no longer exists, resubmitting",
request.id)
request.id = None
request.state = model.STATE_REQUESTED
self.requests[request.uid] = request
@@ -295,7 +302,7 @@ class Nodepool(object):
# request probably doesn't make sense at this point in time as it
# is likely to directly fail again. So just log the problem
# with zookeeper and fail here.
self.log.exception("Error getting node request %s:" % request_id)
log.exception("Error getting node request %s:", request_id)
request.failed = True
return True
@@ -306,16 +313,16 @@ class Nodepool(object):
self.lockNodeSet(request.nodeset, request.id)
locked = True
except Exception:
self.log.exception("Error locking nodes:")
log.exception("Error locking nodes:")
request.failed = True
# Regardless of whether locking (or even the request)
# succeeded, delete the request.
self.log.debug("Deleting node request %s" % (request,))
log.debug("Deleting node request %s", request)
try:
self.sched.zk.deleteNodeRequest(request)
except Exception:
self.log.exception("Error deleting node request:")
log.exception("Error deleting node request:")
request.failed = True
# If deleting the request failed, and we did lock the
# nodes, unlock the nodes since we're not going to use

View File

@@ -1327,29 +1327,30 @@ class Scheduler(threading.Thread):
request = event.request
request_id = event.request_id
build_set = request.build_set
log = get_annotated_logger(self.log, request.event_id)
ready = self.nodepool.acceptNodes(request, request_id)
if not ready:
return
if build_set is not build_set.item.current_build_set:
self.log.warning("Build set %s is not current "
"for node request %s", build_set, request)
log.warning("Build set %s is not current "
"for node request %s", build_set, request)
if request.fulfilled:
self.nodepool.returnNodeSet(request.nodeset)
return
if request.job.name not in [x.name for x in build_set.item.getJobs()]:
self.log.warning("Item %s does not contain job %s "
"for node request %s",
build_set.item, request.job.name, request)
log.warning("Item %s does not contain job %s "
"for node request %s",
build_set.item, request.job.name, request)
build_set.removeJobNodeRequest(request.job.name)
if request.fulfilled:
self.nodepool.returnNodeSet(request.nodeset)
return
pipeline = build_set.item.pipeline
if not pipeline:
self.log.warning("Build set %s is not associated with a pipeline "
"for node request %s", build_set, request)
log.warning("Build set %s is not associated with a pipeline "
"for node request %s", build_set, request)
if request.fulfilled:
self.nodepool.returnNodeSet(request.nodeset)
return