Annotate node request processing with event id
Adding the event ids to the nodeset processing makes it much easier to trace events through the system. This also adds the event id as metadata to the node request so nodepool can continue annotating logs while working on the node request. Change-Id: I2fac44564552d1fd303b76b994f4c4c16c1370b1
This commit is contained in:
parent
e161ffb3bc
commit
c825e4769f
|
@ -378,19 +378,21 @@ class PipelineManager(object):
|
||||||
change.commit_needs_changes = dependencies
|
change.commit_needs_changes = dependencies
|
||||||
|
|
||||||
def provisionNodes(self, item):
|
def provisionNodes(self, item):
|
||||||
|
log = get_annotated_logger(self.log, item.event)
|
||||||
jobs = item.findJobsToRequest(item.pipeline.tenant.semaphore_handler)
|
jobs = item.findJobsToRequest(item.pipeline.tenant.semaphore_handler)
|
||||||
if not jobs:
|
if not jobs:
|
||||||
return False
|
return False
|
||||||
build_set = item.current_build_set
|
build_set = item.current_build_set
|
||||||
self.log.debug("Requesting nodes for change %s" % item.change)
|
log.debug("Requesting nodes for change %s", item.change)
|
||||||
if self.sched.use_relative_priority:
|
if self.sched.use_relative_priority:
|
||||||
priority = item.getNodePriority()
|
priority = item.getNodePriority()
|
||||||
else:
|
else:
|
||||||
priority = 0
|
priority = 0
|
||||||
for job in jobs:
|
for job in jobs:
|
||||||
req = self.sched.nodepool.requestNodes(build_set, job, priority)
|
req = self.sched.nodepool.requestNodes(
|
||||||
self.log.debug("Adding node request %s for job %s to item %s" %
|
build_set, job, priority, event=item.event)
|
||||||
(req, job, item))
|
log.debug("Adding node request %s for job %s to item %s",
|
||||||
|
req, job, item)
|
||||||
build_set.setJobNodeRequest(job.name, req)
|
build_set.setJobNodeRequest(job.name, req)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
@ -928,21 +930,22 @@ class PipelineManager(object):
|
||||||
def onNodesProvisioned(self, event):
|
def onNodesProvisioned(self, event):
|
||||||
# TODOv3(jeblair): handle provisioning failure here
|
# TODOv3(jeblair): handle provisioning failure here
|
||||||
request = event.request
|
request = event.request
|
||||||
|
log = get_annotated_logger(self.log, request.event_id)
|
||||||
|
|
||||||
build_set = request.build_set
|
build_set = request.build_set
|
||||||
build_set.jobNodeRequestComplete(request.job.name,
|
build_set.jobNodeRequestComplete(request.job.name,
|
||||||
request.nodeset)
|
request.nodeset)
|
||||||
if request.failed or not request.fulfilled:
|
if request.failed or not request.fulfilled:
|
||||||
self.log.info("Node request %s: failure for %s" %
|
log.info("Node request %s: failure for %s",
|
||||||
(request, request.job.name,))
|
request, request.job.name)
|
||||||
build_set.item.setNodeRequestFailure(request.job)
|
build_set.item.setNodeRequestFailure(request.job)
|
||||||
self._resumeBuilds(request.build_set)
|
self._resumeBuilds(request.build_set)
|
||||||
tenant = build_set.item.pipeline.tenant
|
tenant = build_set.item.pipeline.tenant
|
||||||
tenant.semaphore_handler.release(build_set.item, request.job)
|
tenant.semaphore_handler.release(build_set.item, request.job)
|
||||||
|
|
||||||
self.log.info("Completed node request %s for job %s of item %s "
|
log.info("Completed node request %s for job %s of item %s "
|
||||||
"with nodes %s" %
|
"with nodes %s",
|
||||||
(request, request.job, build_set.item,
|
request, request.job, build_set.item, request.nodeset)
|
||||||
request.nodeset))
|
|
||||||
|
|
||||||
def reportItem(self, item):
|
def reportItem(self, item):
|
||||||
if not item.reported:
|
if not item.reported:
|
||||||
|
|
|
@ -704,7 +704,8 @@ class NodeSet(ConfigObject):
|
||||||
class NodeRequest(object):
|
class NodeRequest(object):
|
||||||
"""A request for a set of nodes."""
|
"""A request for a set of nodes."""
|
||||||
|
|
||||||
def __init__(self, requestor, build_set, job, nodeset, relative_priority):
|
def __init__(self, requestor, build_set, job, nodeset, relative_priority,
|
||||||
|
event=None):
|
||||||
self.requestor = requestor
|
self.requestor = requestor
|
||||||
self.build_set = build_set
|
self.build_set = build_set
|
||||||
self.job = job
|
self.job = job
|
||||||
|
@ -719,6 +720,10 @@ class NodeRequest(object):
|
||||||
self.provider = self._getPausedParentProvider()
|
self.provider = self._getPausedParentProvider()
|
||||||
self.id = None
|
self.id = None
|
||||||
self._zk_data = {} # Data that we read back from ZK
|
self._zk_data = {} # Data that we read back from ZK
|
||||||
|
if event is not None:
|
||||||
|
self.event_id = event.zuul_event_id
|
||||||
|
else:
|
||||||
|
self.event_id = None
|
||||||
# Zuul internal flags (not stored in ZK so they are not
|
# Zuul internal flags (not stored in ZK so they are not
|
||||||
# overwritten).
|
# overwritten).
|
||||||
self.failed = False
|
self.failed = False
|
||||||
|
@ -786,6 +791,7 @@ class NodeRequest(object):
|
||||||
d['state'] = self.state
|
d['state'] = self.state
|
||||||
d['state_time'] = self.state_time
|
d['state_time'] = self.state_time
|
||||||
d['relative_priority'] = self.relative_priority
|
d['relative_priority'] = self.relative_priority
|
||||||
|
d['event_id'] = self.event_id
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def updateFromDict(self, data):
|
def updateFromDict(self, data):
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from zuul import model
|
from zuul import model
|
||||||
|
from zuul.lib.logutil import get_annotated_logger
|
||||||
from zuul.zk import LockException
|
from zuul.zk import LockException
|
||||||
|
|
||||||
|
|
||||||
|
@ -59,34 +60,36 @@ class Nodepool(object):
|
||||||
pipe.gauge('zuul.nodepool.current_requests', len(self.requests))
|
pipe.gauge('zuul.nodepool.current_requests', len(self.requests))
|
||||||
pipe.send()
|
pipe.send()
|
||||||
|
|
||||||
def requestNodes(self, build_set, job, relative_priority):
|
def requestNodes(self, build_set, job, relative_priority, event=None):
|
||||||
|
log = get_annotated_logger(self.log, event)
|
||||||
# Create a copy of the nodeset to represent the actual nodes
|
# Create a copy of the nodeset to represent the actual nodes
|
||||||
# returned by nodepool.
|
# returned by nodepool.
|
||||||
nodeset = job.nodeset.copy()
|
nodeset = job.nodeset.copy()
|
||||||
req = model.NodeRequest(self.sched.hostname, build_set, job,
|
req = model.NodeRequest(self.sched.hostname, build_set, job,
|
||||||
nodeset, relative_priority)
|
nodeset, relative_priority, event=event)
|
||||||
self.requests[req.uid] = req
|
self.requests[req.uid] = req
|
||||||
|
|
||||||
if nodeset.nodes:
|
if nodeset.nodes:
|
||||||
self.sched.zk.submitNodeRequest(req, self._updateNodeRequest)
|
self.sched.zk.submitNodeRequest(req, self._updateNodeRequest)
|
||||||
# Logged after submission so that we have the request id
|
# Logged after submission so that we have the request id
|
||||||
self.log.info("Submitted node request %s" % (req,))
|
log.info("Submitted node request %s", req)
|
||||||
self.emitStats(req)
|
self.emitStats(req)
|
||||||
else:
|
else:
|
||||||
self.log.info("Fulfilling empty node request %s" % (req,))
|
log.info("Fulfilling empty node request %s", req)
|
||||||
req.state = model.STATE_FULFILLED
|
req.state = model.STATE_FULFILLED
|
||||||
self.sched.onNodesProvisioned(req)
|
self.sched.onNodesProvisioned(req)
|
||||||
del self.requests[req.uid]
|
del self.requests[req.uid]
|
||||||
return req
|
return req
|
||||||
|
|
||||||
def cancelRequest(self, request):
|
def cancelRequest(self, request):
|
||||||
self.log.info("Canceling node request %s" % (request,))
|
log = get_annotated_logger(self.log, request.event_id)
|
||||||
|
log.info("Canceling node request %s", request)
|
||||||
if request.uid in self.requests:
|
if request.uid in self.requests:
|
||||||
request.canceled = True
|
request.canceled = True
|
||||||
try:
|
try:
|
||||||
self.sched.zk.deleteNodeRequest(request)
|
self.sched.zk.deleteNodeRequest(request)
|
||||||
except Exception:
|
except Exception:
|
||||||
self.log.exception("Error deleting node request:")
|
log.exception("Error deleting node request:")
|
||||||
|
|
||||||
def reviseRequest(self, request, relative_priority=None):
|
def reviseRequest(self, request, relative_priority=None):
|
||||||
'''Attempt to update the node request, if it is not currently being
|
'''Attempt to update the node request, if it is not currently being
|
||||||
|
@ -97,28 +100,29 @@ class Nodepool(object):
|
||||||
priority to set on the request.
|
priority to set on the request.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
log = get_annotated_logger(self.log, request.event_id)
|
||||||
if relative_priority is None:
|
if relative_priority is None:
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
self.sched.zk.lockNodeRequest(request, blocking=False)
|
self.sched.zk.lockNodeRequest(request, blocking=False)
|
||||||
except LockException:
|
except LockException:
|
||||||
# It may be locked by nodepool, which is fine.
|
# It may be locked by nodepool, which is fine.
|
||||||
self.log.debug("Unable to revise locked node request %s", request)
|
log.debug("Unable to revise locked node request %s", request)
|
||||||
return False
|
return False
|
||||||
try:
|
try:
|
||||||
old_priority = request.relative_priority
|
old_priority = request.relative_priority
|
||||||
request.relative_priority = relative_priority
|
request.relative_priority = relative_priority
|
||||||
self.sched.zk.storeNodeRequest(request)
|
self.sched.zk.storeNodeRequest(request)
|
||||||
self.log.debug("Revised relative priority of "
|
log.debug("Revised relative priority of "
|
||||||
"node request %s from %s to %s",
|
"node request %s from %s to %s",
|
||||||
request, old_priority, relative_priority)
|
request, old_priority, relative_priority)
|
||||||
except Exception:
|
except Exception:
|
||||||
self.log.exception("Unable to update node request %s", request)
|
log.exception("Unable to update node request %s", request)
|
||||||
finally:
|
finally:
|
||||||
try:
|
try:
|
||||||
self.sched.zk.unlockNodeRequest(request)
|
self.sched.zk.unlockNodeRequest(request)
|
||||||
except Exception:
|
except Exception:
|
||||||
self.log.exception("Unable to unlock node request %s", request)
|
log.exception("Unable to unlock node request %s", request)
|
||||||
|
|
||||||
def holdNodeSet(self, nodeset, autohold_key):
|
def holdNodeSet(self, nodeset, autohold_key):
|
||||||
'''
|
'''
|
||||||
|
@ -218,12 +222,13 @@ class Nodepool(object):
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def _updateNodeRequest(self, request, deleted):
|
def _updateNodeRequest(self, request, deleted):
|
||||||
|
log = get_annotated_logger(self.log, request.event_id)
|
||||||
# Return False to indicate that we should stop watching the
|
# Return False to indicate that we should stop watching the
|
||||||
# node.
|
# node.
|
||||||
self.log.debug("Updating node request %s" % (request,))
|
log.debug("Updating node request %s", request)
|
||||||
|
|
||||||
if request.uid not in self.requests:
|
if request.uid not in self.requests:
|
||||||
self.log.debug("Request %s is unknown" % (request.uid,))
|
log.debug("Request %s is unknown", request.uid)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
if request.canceled:
|
if request.canceled:
|
||||||
|
@ -233,11 +238,11 @@ class Nodepool(object):
|
||||||
|
|
||||||
# TODOv3(jeblair): handle allocation failure
|
# TODOv3(jeblair): handle allocation failure
|
||||||
if deleted:
|
if deleted:
|
||||||
self.log.debug("Resubmitting lost node request %s" % (request,))
|
log.debug("Resubmitting lost node request %s", request)
|
||||||
request.id = None
|
request.id = None
|
||||||
self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
|
self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
|
||||||
elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
|
elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
|
||||||
self.log.info("Node request %s %s" % (request, request.state))
|
log.info("Node request %s %s", request, request.state)
|
||||||
|
|
||||||
# Give our results to the scheduler.
|
# Give our results to the scheduler.
|
||||||
self.sched.onNodesProvisioned(request)
|
self.sched.onNodesProvisioned(request)
|
||||||
|
@ -251,20 +256,22 @@ class Nodepool(object):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def acceptNodes(self, request, request_id):
|
def acceptNodes(self, request, request_id):
|
||||||
|
log = get_annotated_logger(self.log, request.event_id)
|
||||||
|
|
||||||
# Called by the scheduler when it wants to accept and lock
|
# Called by the scheduler when it wants to accept and lock
|
||||||
# nodes for (potential) use. Return False if there is a
|
# nodes for (potential) use. Return False if there is a
|
||||||
# problem with the request (canceled or retrying), True if it
|
# problem with the request (canceled or retrying), True if it
|
||||||
# is ready to be acted upon (success or failure).
|
# is ready to be acted upon (success or failure).
|
||||||
|
|
||||||
self.log.info("Accepting node request %s" % (request,))
|
log.info("Accepting node request %s", request)
|
||||||
|
|
||||||
if request_id != request.id:
|
if request_id != request.id:
|
||||||
self.log.info("Skipping node accept for %s (resubmitted as %s)",
|
log.info("Skipping node accept for %s (resubmitted as %s)",
|
||||||
request_id, request.id)
|
request_id, request.id)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
if request.canceled:
|
if request.canceled:
|
||||||
self.log.info("Ignoring canceled node request %s" % (request,))
|
log.info("Ignoring canceled node request %s", request)
|
||||||
# The request was already deleted when it was canceled
|
# The request was already deleted when it was canceled
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@ -281,8 +288,8 @@ class Nodepool(object):
|
||||||
# nodes in that situation.
|
# nodes in that situation.
|
||||||
try:
|
try:
|
||||||
if not self.sched.zk.nodeRequestExists(request):
|
if not self.sched.zk.nodeRequestExists(request):
|
||||||
self.log.info("Request %s no longer exists, resubmitting",
|
log.info("Request %s no longer exists, resubmitting",
|
||||||
request.id)
|
request.id)
|
||||||
request.id = None
|
request.id = None
|
||||||
request.state = model.STATE_REQUESTED
|
request.state = model.STATE_REQUESTED
|
||||||
self.requests[request.uid] = request
|
self.requests[request.uid] = request
|
||||||
|
@ -295,7 +302,7 @@ class Nodepool(object):
|
||||||
# request probably doesn't make sense at this point in time as it
|
# request probably doesn't make sense at this point in time as it
|
||||||
# is likely to directly fail again. So just log the problem
|
# is likely to directly fail again. So just log the problem
|
||||||
# with zookeeper and fail here.
|
# with zookeeper and fail here.
|
||||||
self.log.exception("Error getting node request %s:" % request_id)
|
log.exception("Error getting node request %s:", request_id)
|
||||||
request.failed = True
|
request.failed = True
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
@ -306,16 +313,16 @@ class Nodepool(object):
|
||||||
self.lockNodeSet(request.nodeset, request.id)
|
self.lockNodeSet(request.nodeset, request.id)
|
||||||
locked = True
|
locked = True
|
||||||
except Exception:
|
except Exception:
|
||||||
self.log.exception("Error locking nodes:")
|
log.exception("Error locking nodes:")
|
||||||
request.failed = True
|
request.failed = True
|
||||||
|
|
||||||
# Regardless of whether locking (or even the request)
|
# Regardless of whether locking (or even the request)
|
||||||
# succeeded, delete the request.
|
# succeeded, delete the request.
|
||||||
self.log.debug("Deleting node request %s" % (request,))
|
log.debug("Deleting node request %s", request)
|
||||||
try:
|
try:
|
||||||
self.sched.zk.deleteNodeRequest(request)
|
self.sched.zk.deleteNodeRequest(request)
|
||||||
except Exception:
|
except Exception:
|
||||||
self.log.exception("Error deleting node request:")
|
log.exception("Error deleting node request:")
|
||||||
request.failed = True
|
request.failed = True
|
||||||
# If deleting the request failed, and we did lock the
|
# If deleting the request failed, and we did lock the
|
||||||
# nodes, unlock the nodes since we're not going to use
|
# nodes, unlock the nodes since we're not going to use
|
||||||
|
|
|
@ -1327,29 +1327,30 @@ class Scheduler(threading.Thread):
|
||||||
request = event.request
|
request = event.request
|
||||||
request_id = event.request_id
|
request_id = event.request_id
|
||||||
build_set = request.build_set
|
build_set = request.build_set
|
||||||
|
log = get_annotated_logger(self.log, request.event_id)
|
||||||
|
|
||||||
ready = self.nodepool.acceptNodes(request, request_id)
|
ready = self.nodepool.acceptNodes(request, request_id)
|
||||||
if not ready:
|
if not ready:
|
||||||
return
|
return
|
||||||
|
|
||||||
if build_set is not build_set.item.current_build_set:
|
if build_set is not build_set.item.current_build_set:
|
||||||
self.log.warning("Build set %s is not current "
|
log.warning("Build set %s is not current "
|
||||||
"for node request %s", build_set, request)
|
"for node request %s", build_set, request)
|
||||||
if request.fulfilled:
|
if request.fulfilled:
|
||||||
self.nodepool.returnNodeSet(request.nodeset)
|
self.nodepool.returnNodeSet(request.nodeset)
|
||||||
return
|
return
|
||||||
if request.job.name not in [x.name for x in build_set.item.getJobs()]:
|
if request.job.name not in [x.name for x in build_set.item.getJobs()]:
|
||||||
self.log.warning("Item %s does not contain job %s "
|
log.warning("Item %s does not contain job %s "
|
||||||
"for node request %s",
|
"for node request %s",
|
||||||
build_set.item, request.job.name, request)
|
build_set.item, request.job.name, request)
|
||||||
build_set.removeJobNodeRequest(request.job.name)
|
build_set.removeJobNodeRequest(request.job.name)
|
||||||
if request.fulfilled:
|
if request.fulfilled:
|
||||||
self.nodepool.returnNodeSet(request.nodeset)
|
self.nodepool.returnNodeSet(request.nodeset)
|
||||||
return
|
return
|
||||||
pipeline = build_set.item.pipeline
|
pipeline = build_set.item.pipeline
|
||||||
if not pipeline:
|
if not pipeline:
|
||||||
self.log.warning("Build set %s is not associated with a pipeline "
|
log.warning("Build set %s is not associated with a pipeline "
|
||||||
"for node request %s", build_set, request)
|
"for node request %s", build_set, request)
|
||||||
if request.fulfilled:
|
if request.fulfilled:
|
||||||
self.nodepool.returnNodeSet(request.nodeset)
|
self.nodepool.returnNodeSet(request.nodeset)
|
||||||
return
|
return
|
||||||
|
|
Loading…
Reference in New Issue