WIP: Lock/unlock nodes on executor server

Currently, the nodes are locked in the scheduler/pipeline manager before
the actual build is created in the executor client. When the nodes are
locked, the corresponding NodeRequest is also deleted.

With this change, the executor will lock the nodes directly before
starting the build and unlock them when the build is completed.
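
For orientation, the following is a minimal sketch of that flow using
illustrative stand-in classes (not the real Zuul objects); the actual
implementation lives in the executor server and nodepool hunks in the
diff below:

    # Illustrative stand-ins only: the executor locks the nodes right
    # before it starts the build and returns/unlocks them when the
    # build completes, whatever the result was.

    class SketchNodepool:
        def acceptNodes(self, node_request):
            # Lock the nodes of the fulfilled node request; raise on failure.
            print("locking nodes for request %s" % node_request)

        def returnNodeSet(self, nodeset):
            # Unlock the nodes and return them to the pool.
            print("returning nodeset %s" % nodeset)


    class SketchExecutor:
        def __init__(self):
            self.nodepool = SketchNodepool()

        def execute_job(self, node_request, nodeset, run_build):
            # Lock directly before starting the build ...
            try:
                self.nodepool.acceptNodes(node_request)
            except Exception:
                return "NODE_FAILURE"
            try:
                return run_build()
            finally:
                # ... and unlock when the build is completed.
                self.nodepool.returnNodeSet(nodeset)


    if __name__ == "__main__":
        executor = SketchExecutor()
        print(executor.execute_job("200-0001", "nodeset-a", lambda: "SUCCESS"))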

This also moves the autohold processing from the scheduler to the
executor server. To make this work, the executor now also checks the
number of build attempts against the job's attempt limit and sets the
RETRY_LIMIT result when the limit is reached.
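
A small sketch of that attempt handling, assuming (as the diff below
does) that the scheduler passes the current attempt count and the job's
attempt limit in the execute arguments; the helper names here are made
up for illustration:

    HOLD_RESULTS = {"FAILURE", "RETRY_LIMIT", "POST_FAILURE", "TIMED_OUT"}

    def finalize_result(result, attempts, max_attempts):
        # A result of None means the build did not finish normally; only
        # turn it into RETRY_LIMIT once the configured number of attempts
        # is used up, otherwise the scheduler will retry the build.
        if result is None and attempts >= max_attempts:
            return "RETRY_LIMIT"
        return result

    def should_autohold(result):
        # Nodes are only held for builds that ended with a final, failed result.
        return result in HOLD_RESULTS

    print(finalize_result(None, 3, 3))      # RETRY_LIMIT
    print(should_autohold("RETRY_LIMIT"))   # True
    print(finalize_result(None, 1, 3))      # None -> will be retried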

Change-Id: I7392ce47e84dcfb8079c16e34e0ed2062ebf4136
Author: Felix Edel, 2021-02-03 14:31:26 +01:00
Parent: 8bf5bcd1bb
Commit: af0ffbfdd4
12 changed files with 403 additions and 236 deletions


@ -3210,6 +3210,15 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
job.arguments = json.dumps(args)
super(RecordingExecutorServer, self).executeJob(job)
def lockNodes(self, job, event):
locked = super().lockNodes(job, event)
if not locked:
# If the nodes could not be locked, directly remove the build
# from the list of running builds as it will never be executed.
build = self.job_builds.get(job.unique)
self.running_builds.remove(build)
return locked
def stopJob(self, job):
self.log.debug("handle stop")
parameters = json.loads(job.arguments)


@ -436,8 +436,16 @@ class TestAnsibleJob(ZuulTestCase):
def setUp(self):
super(TestAnsibleJob, self).setUp()
ansible_version = AnsibleManager().default_version
args = '{"ansible_version": "%s"}' % ansible_version
job = gear.TextJob('executor:execute', args, unique='test')
args = {
"ansible_version": ansible_version,
"nodeset": {
"name": "dummy-node",
"node_request_id": 0,
"nodes": [],
"groups": [],
},
}
job = gear.TextJob('executor:execute', json.dumps(args), unique='test')
self.test_job = zuul.executor.server.AnsibleJob(self.executor_server,
job)


@ -80,9 +80,14 @@ class TestNodepool(BaseTestCase):
nodeset = request.nodeset
for node in nodeset.getNodes():
self.assertIsNotNone(node.lock)
self.assertEqual(node.state, 'ready')
# Lock the nodes
self.nodepool.lockNodeSet(nodeset)
for node in nodeset.getNodes():
self.assertIsNotNone(node.lock)
# Mark the nodes in use
self.nodepool.useNodeSet(nodeset)
for node in nodeset.getNodes():


@ -30,7 +30,6 @@ from kazoo.exceptions import NoNodeError
import git
import testtools
from testtools.matchers import AfterPreprocessing, MatchesRegex
from zuul.scheduler import Scheduler
import fixtures
import zuul.change_matcher
@ -45,7 +44,8 @@ from tests.base import (
repack_repo,
simple_layout,
iterate_timeout,
RecordingExecutorServer, TestConnectionRegistry,
RecordingExecutorServer,
TestConnectionRegistry,
)
@ -2279,10 +2279,10 @@ class TestScheduler(ZuulTestCase):
@simple_layout('layouts/autohold.yaml')
def test_autohold_request_expiration(self):
orig_exp = Scheduler.EXPIRED_HOLD_REQUEST_TTL
orig_exp = RecordingExecutorServer.EXPIRED_HOLD_REQUEST_TTL
def reset_exp():
self.scheds.first.sched.EXPIRED_HOLD_REQUEST_TTL = orig_exp
self.executor_server.EXPIRED_HOLD_REQUEST_TTL = orig_exp
self.addCleanup(reset_exp)
@ -2313,7 +2313,7 @@ class TestScheduler(ZuulTestCase):
# Temporarily shorten hold time so that the hold request can be
# auto-deleted (which is done on another test failure). And wait
# long enough for nodes to expire and request to delete.
self.scheds.first.sched.EXPIRED_HOLD_REQUEST_TTL = 1
self.executor_server.EXPIRED_HOLD_REQUEST_TTL = 1
time.sleep(3)
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')


@ -346,6 +346,7 @@ class TestWeb(BaseTestWeb):
'nodeset': {
'groups': [],
'name': '',
'node_request_id': None,
'nodes': [{'comment': None,
'hold_job': None,
'id': None,
@ -393,6 +394,7 @@ class TestWeb(BaseTestWeb):
'nodeset': {
'groups': [],
'name': '',
'node_request_id': None,
'nodes': [{'comment': None,
'hold_job': None,
'id': None,


@ -172,7 +172,6 @@ class ExecutorClient(object):
zuul_event_id=item.event.zuul_event_id,
)
build.parameters = params
build.nodeset = nodeset
log.debug("Adding build %s of job %s to item %s",
build, job, item)
@ -198,6 +197,15 @@ class ExecutorClient(object):
# is up to date.
attempts = build.build_set.getTries(job.name)
params["zuul"]['attempts'] = attempts
params['max_attempts'] = job.attempts
# Store the nodeset in the job arguments, so we can lock it on the
# executor side.
# TODO (felix): The nodeset shouldn't be necessary anymore as we are
# now updating the nodes directly on the noderequest in the executor.
params["nodeset"] = nodeset.toDict()
params["noderequest_id"] = build.build_set.getJobNodeRequest(
job.name).id
functions = getGearmanFunctions(self.gearman)
function_name = 'executor:execute'


@ -59,9 +59,14 @@ from zuul.executor.sensors.ram import RAMSensor
from zuul.lib import commandsocket
from zuul.merger.server import BaseMergeServer, RepoLocks
from zuul.model import (
BuildCompletedEvent, BuildPausedEvent, BuildStartedEvent, BuildStatusEvent
BuildCompletedEvent,
BuildPausedEvent,
BuildStartedEvent,
BuildStatusEvent,
NodeSet,
SCHEME_GOLANG,
)
import zuul.model
from zuul.nodepool import Nodepool
from zuul.zk.event_queues import PipelineResultEventQueue
BUFFER_LINES_FOR_SYNTAX = 200
@ -827,12 +832,14 @@ class AnsibleJob(object):
self.ansible_version = self.arguments.get('ansible_version')
# TODO(corvus): Remove default setting after 4.3.0; this is to handle
# scheduler/executor version skew.
self.scheme = self.arguments.get('workspace_scheme',
zuul.model.SCHEME_GOLANG)
self.scheme = self.arguments.get('workspace_scheme', SCHEME_GOLANG)
self.log = get_annotated_logger(
logger, self.zuul_event_id, build=job.unique)
self.executor_server = executor_server
self.job = job
self.nodeset = NodeSet.fromDict(self.arguments["nodeset"])
self.node_request = self.executor_server.nodepool.zk_nodepool.getNodeRequest(
self.arguments["noderequest_id"])
self.jobdir = None
self.proc = None
self.proc_lock = threading.Lock()
@ -996,8 +1003,9 @@ class AnsibleJob(object):
self.executor_server.finishJob(self.job.unique)
except Exception:
self.log.exception("Error finalizing job thread:")
self.log.info("Job execution took: %.3f seconds" % (
time.monotonic() - self.time_starting_build))
self.log.info("Job execution took: %.3f seconds",
self.end_time - self.time_starting_build)
def _base_job_data(self):
return {
@ -1840,7 +1848,7 @@ class AnsibleJob(object):
root,
self.executor_server.merge_root,
logger=self.log,
scheme=zuul.model.SCHEME_GOLANG)
scheme=SCHEME_GOLANG)
merger.checkoutBranch(
project.connection_name, project.name,
branch,
@ -1880,7 +1888,7 @@ class AnsibleJob(object):
root,
self.jobdir.src_root,
logger=self.log,
scheme=zuul.model.SCHEME_GOLANG,
scheme=SCHEME_GOLANG,
cache_scheme=self.scheme)
break
@ -1890,7 +1898,7 @@ class AnsibleJob(object):
root,
self.executor_server.merge_root,
logger=self.log,
scheme=zuul.model.SCHEME_GOLANG)
scheme=SCHEME_GOLANG)
# If we don't have this repo yet prepared we need to restore
# the repo state. Otherwise we have speculative merges in the
@ -2671,6 +2679,9 @@ class ExecutorServer(BaseMergeServer):
_job_class = AnsibleJob
_repo_locks_class = RepoLocks
# Number of seconds past node expiration a hold request will remain
EXPIRED_HOLD_REQUEST_TTL = 24 * 60 * 60
def __init__(
self,
config,
@ -2823,6 +2834,7 @@ class ExecutorServer(BaseMergeServer):
self.process_merge_jobs = get_default(self.config, 'executor',
'merge_jobs', True)
self.nodepool = Nodepool(self.zk_client, self.hostname, self.statsd)
self.result_events = PipelineResultEventQueue.createRegistry(
self.zk_client
@ -3149,18 +3161,43 @@ class ExecutorServer(BaseMergeServer):
def executeJob(self, job):
args = json.loads(job.arguments)
zuul_event_id = args.get('zuul_event_id')
log = get_annotated_logger(self.log, zuul_event_id)
log = get_annotated_logger(self.log, zuul_event_id, build=job.unique)
log.debug("Got %s job: %s", job.name, job.unique)
if self.statsd:
base_key = 'zuul.executor.{hostname}'
self.statsd.incr(base_key + '.builds')
# Make sure the job is added to the job workers, as we have to look it
# up from there in the completeBuild() method to return the NodeSet.
self.job_workers[job.unique] = self._job_class(self, job)
if not self.lockNodes(job, event=zuul_event_id):
# Don't start the build if the nodes could not be locked
return
# Run manageLoad before starting the thread mostly for the
# benefit of the unit tests to make the calculation of the
# number of starting jobs more deterministic.
self.manageLoad()
self.job_workers[job.unique].run()
def lockNodes(self, job, event=None):
log = get_annotated_logger(self.log, event, build=job.unique)
try:
log.debug("Locking nodeset")
ansible_job = self.job_workers[job.unique]
self.nodepool.acceptNodes(ansible_job.node_request)
return True
except Exception:
data = dict(
result="NODE_FAILURE", exception=traceback.format_exc()
)
self.completeBuild(job, data)
# Remove the job from the job_workers as it was never started and
# thus won't finish on its own.
self.finishJob(job.unique)
return False
def run_governor(self):
while not self.governor_stop_event.wait(10):
try:
@ -3254,6 +3291,141 @@ class ExecutorServer(BaseMergeServer):
except Exception:
log.exception("Exception sending stop command to worker:")
def _handleExpiredHoldRequest(self, request):
'''
Check if a hold request is expired and delete it if it is.
The 'expiration' attribute will be set to the clock time when the
hold request was used for the last time. If this is NOT set, then
the request is still active.
If a node expiration time is set on the request, and the request is
expired, *and* we've waited for a defined period past the node
expiration (EXPIRED_HOLD_REQUEST_TTL), then we will delete the hold
request.
:param request: The hold request
:returns: True if it is expired, False otherwise.
'''
if not request.expired:
return False
if not request.node_expiration:
# Request has been used up but there is no node expiration, so
# we don't auto-delete it.
return True
elapsed = time.time() - request.expired
if elapsed < self.EXPIRED_HOLD_REQUEST_TTL + request.node_expiration:
# Haven't reached our defined expiration lifetime, so don't
# auto-delete it yet.
return True
try:
self.nodepool.zk_nodepool.lockHoldRequest(request)
self.log.info("Removing expired hold request %s", request)
self.nodepool.zk_nodepool.deleteHoldRequest(request)
except Exception:
self.log.exception(
"Failed to delete expired hold request %s", request
)
finally:
try:
self.nodepool.zk_nodepool.unlockHoldRequest(request)
except Exception:
pass
return True
def _getAutoholdRequest(self, ansible_job):
args = ansible_job.arguments
autohold_key_base = (
args["zuul"]["tenant"],
args["zuul"]["project"]["canonical_name"],
args["zuul"]["job"],
)
class Scope(object):
"""Enum defining a precedence/priority of autohold requests.
Autohold requests for specific refs should be fulfilled first,
before those for changes, and generic jobs.
Matching algorithm goes over all existing autohold requests, and
returns one with the highest number (in case of duplicated
requests the last one wins).
"""
NONE = 0
JOB = 1
CHANGE = 2
REF = 3
# Do a partial match of the autohold key against all autohold
# requests, ignoring the last element of the key (ref filter),
# and finally do a regex match between ref filter from
# the autohold request and the build's change ref to check
# if it matches. Lastly, make sure that we match the most
# specific autohold request by comparing "scopes"
# of requests - the most specific is selected.
autohold = None
scope = Scope.NONE
self.log.debug("Checking build autohold key %s", autohold_key_base)
for request_id in self.nodepool.zk_nodepool.getHoldRequests():
request = self.nodepool.zk_nodepool.getHoldRequest(request_id)
if not request:
continue
if self._handleExpiredHoldRequest(request):
continue
ref_filter = request.ref_filter
if request.current_count >= request.max_count:
# This request has been used the max number of times
continue
elif not (
request.tenant == autohold_key_base[0]
and request.project == autohold_key_base[1]
and request.job == autohold_key_base[2]
):
continue
elif not re.match(ref_filter, args["zuul"]["ref"]):
continue
if ref_filter == ".*":
candidate_scope = Scope.JOB
elif ref_filter.endswith(".*"):
candidate_scope = Scope.CHANGE
else:
candidate_scope = Scope.REF
self.log.debug(
"Build autohold key %s matched scope %s",
autohold_key_base,
candidate_scope,
)
if candidate_scope > scope:
scope = candidate_scope
autohold = request
return autohold
def _processAutohold(self, ansible_job, result):
# We explicitly only want to hold nodes for jobs if they have
# failed / retry_limit / post_failure and have an autohold request.
hold_list = ["FAILURE", "RETRY_LIMIT", "POST_FAILURE", "TIMED_OUT"]
if result not in hold_list:
return False
request = self._getAutoholdRequest(ansible_job)
if request is not None:
self.log.debug("Got autohold %s", request)
self.nodepool.holdNodeSet(
ansible_job.node_request.nodeset, request, ansible_job
)
return True
return False
def startBuild(self, job: gear.TextJob, data: Dict) -> None:
# Mark the gearman job as started, as we are still using it for the
# actual job execution. The data, however, will be passed to the
@ -3307,7 +3479,47 @@ class ExecutorServer(BaseMergeServer):
# result dict for that.
result["end_time"] = time.time()
# NOTE (felix): We store the end_time on the ansible job to calculate
# the in-use duration of locked nodes when the nodeset is returned.
ansible_job = self.job_workers[job.unique]
ansible_job.end_time = time.monotonic()
params = json.loads(job.arguments)
# If the result is None, check if the build has reached its max
# attempts and if so set the result to RETRY_LIMIT.
# NOTE (felix): This must be done in order to correctly process the
# autohold in the next step, since we only want to hold the node if the
# build has reached a final result.
if result.get("result") is None:
attempts = params["zuul"]["attempts"]
max_attempts = params["max_attempts"]
if attempts >= max_attempts:
result["result"] = "RETRY_LIMIT"
zuul_event_id = params["zuul_event_id"]
log = get_annotated_logger(self.log, zuul_event_id, build=job.unique)
# Provide the hold information back to the scheduler via the build
# result.
try:
held = self._processAutohold(ansible_job, result.get("result"))
result["held"] = held
log.info("Held status set to %s", held)
except Exception:
log.exception("Unable to process autohold for %s", ansible_job)
# Regardless of any other conditions which might cause us not to pass
# the build result on to the scheduler/pipeline manager, make sure we
# return the nodes to nodepool.
try:
self.nodepool.returnNodeSet(
ansible_job.node_request.nodeset, ansible_job, zuul_event_id=zuul_event_id
)
except Exception:
self.log.exception(
"Unable to return nodeset %s", ansible_job.node_request.nodeset
)
tenant_name = params["zuul"]["tenant"]
pipeline_name = params["zuul"]["pipeline"]


@ -635,10 +635,6 @@ class PipelineManager(metaclass=ABCMeta):
for job in jobs:
log.debug("Found job %s for change %s", job, item.change)
try:
nodeset = item.current_build_set.getJobNodeSet(job.name)
self.sched.nodepool.useNodeSet(
nodeset, build_set=item.current_build_set,
event=item.event)
self.sched.executor.execute(
job, item, self.pipeline,
build_set.dependent_changes,
@ -1437,7 +1433,6 @@ class PipelineManager(metaclass=ABCMeta):
if request.failed or not request.fulfilled:
log.info("Node request %s: failure for %s",
request, request.job.name)
build_set.item.setNodeRequestFailure(request.job)
self._resumeBuilds(request.build_set)
tenant = build_set.item.pipeline.tenant
tenant.semaphore_handler.release(build_set.item, request.job)


@ -609,6 +609,7 @@ class Node(ConfigObject):
self.username = None
self.hold_expiration = None
self.resources = None
self.allocated_to = None
@property
def state(self):
@ -717,6 +718,7 @@ class NodeSet(ConfigObject):
self.name = name or ''
self.nodes = OrderedDict()
self.groups = OrderedDict()
self.node_request_id = None
def __ne__(self, other):
return not self.__eq__(other)
@ -728,9 +730,11 @@ class NodeSet(ConfigObject):
self.nodes == other.nodes)
def toDict(self):
d = {}
d['name'] = self.name
d['nodes'] = []
d = {
"node_request_id": self.node_request_id,
"name": self.name,
"nodes": [],
}
for node in self.nodes.values():
d['nodes'].append(node.toDict(internal_attributes=True))
d['groups'] = []
@ -741,6 +745,7 @@ class NodeSet(ConfigObject):
@classmethod
def fromDict(cls, data):
nodeset = cls(data["name"])
nodeset.node_request_id = data["node_request_id"]
for node in data["nodes"]:
nodeset.addNode(Node.fromDict(node))
for group in data["groups"]:
@ -749,6 +754,7 @@ class NodeSet(ConfigObject):
def copy(self):
n = NodeSet(self.name)
n.node_request_id = self.node_request_id
for name, node in self.nodes.items():
n.addNode(Node(node.name, node.label))
for name, group in self.groups.items():
@ -863,6 +869,10 @@ class NodeRequest(object):
def toDict(self):
# Start with any previously read data
d = self._zk_data.copy()
# This is just the nodeset structure without data, but we need it to
# update the nodes on the executor side when the node request is
# fulfilled.
d["nodeset"] = self.nodeset.toDict()
nodes = [n.label for n in self.nodeset.getNodes()]
# These are immutable once set
d.setdefault('node_types', nodes)
@ -882,6 +892,32 @@ class NodeRequest(object):
self.state_time = data['state_time']
self.relative_priority = data.get('relative_priority', 0)
@classmethod
def fromDict(cls, data):
request = cls(
requestor=data["requestor"],
# TODO (felix): How to get the build_set and the job? They
# shouldn't be relevant for the current implementation, but we
# should know that they are missing.
build_set=None,
job=None,
nodeset=NodeSet.fromDict(data["nodeset"]),
relative_priority=data.get("relative_priority", 0),
)
request.created_time = data["created_time"]
request.provider = data["provider"]
# Set _state directly to bypass the setter and avoid overwriting the
# state_time.
request._state = data["state"]
request.state_time = data["state_time"]
request.event_id = data["event_id"]
request._zk_data = data
return request
class Secret(ConfigObject):
"""A collection of private data.
@ -2040,7 +2076,6 @@ class Build(object):
self.worker = Worker()
self.node_labels = []
self.node_name = None
self.nodeset = None
self.zuul_event_id = zuul_event_id
def __repr__(self):
@ -2261,7 +2296,7 @@ class BuildSet(object):
if job_name in self.nodesets:
raise Exception("Prior node request for %s" % (job_name))
self.nodesets[job_name] = nodeset
del self.node_requests[job_name]
#del self.node_requests[job_name]
def getTries(self, job_name):
return self.tries.get(job_name, 0)
@ -2942,14 +2977,6 @@ class QueueItem(object):
fakebuild.result = 'SKIPPED'
self.addBuild(fakebuild)
def setNodeRequestFailure(self, job):
fakebuild = Build(job, self.current_build_set, None)
fakebuild.start_time = time.time()
fakebuild.end_time = time.time()
self.addBuild(fakebuild)
fakebuild.result = 'NODE_FAILURE'
self.setResult(fakebuild)
def setDequeuedNeedingChange(self):
self.dequeued_needing_change = True
self._setAllJobsSkipped()


@ -128,6 +128,10 @@ class Nodepool(object):
# the executor side.
self.sched.onNodesProvisioned(req)
del self.requests[req.uid]
# Store the node_request id on the nodeset for later evaluation
nodeset.node_request_id = req.id
return req
def cancelRequest(self, request):
@ -173,7 +177,7 @@ class Nodepool(object):
except Exception:
log.exception("Unable to unlock node request %s", request)
def holdNodeSet(self, nodeset, request, build):
def holdNodeSet(self, nodeset, request, ansible_job):
'''
Perform a hold on the given set of nodes.
@ -197,7 +201,7 @@ class Nodepool(object):
self.zk_nodepool.storeNode(node)
request.nodes.append(dict(
build=build.uuid,
build=ansible_job.job.unique,
nodes=[node.id for node in nodes],
))
request.current_count += 1
@ -225,21 +229,22 @@ class Nodepool(object):
# _doBuildCompletedEvent, we always want to try to unlock it.
self.zk_nodepool.unlockHoldRequest(request)
def useNodeSet(self, nodeset, build_set=None, event=None):
self.log.info("Setting nodeset %s in use" % (nodeset,))
def useNodeSet(self, nodeset, ansible_job=None, event=None):
self.log.info("Setting nodeset %s in use", nodeset)
resources = defaultdict(int)
for node in nodeset.getNodes():
if node.lock is None:
raise Exception("Node %s is not locked" % (node,))
raise Exception("Node %s is not locked", node)
node.state = model.STATE_IN_USE
self.zk_nodepool.storeNode(node)
if node.resources:
add_resources(resources, node.resources)
if build_set and resources:
if ansible_job and resources:
args = ansible_job.arguments
# we have the job arguments and thus also tenant and project so we
# can emit project-specific resource usage stats
tenant_name = build_set.item.layout.tenant.name
project_name = build_set.item.change.project.canonical_name
tenant_name = args["zuul"]["tenant"]
project_name = args["zuul"]["project"]["canonical_name"]
self.current_resources_by_tenant.setdefault(
tenant_name, defaultdict(int))
@ -252,24 +257,11 @@ class Nodepool(object):
resources)
self.emitStatsResources()
def returnNodeSet(self, nodeset, build=None, zuul_event_id=None):
def returnNodeSet(self, nodeset, ansible_job=None, zuul_event_id=None):
log = get_annotated_logger(self.log, zuul_event_id)
log.info("Returning nodeset %s", nodeset)
resources = defaultdict(int)
duration = None
project = None
tenant = None
if build:
project = build.build_set.item.change.project
tenant = build.build_set.item.pipeline.tenant.name
if (build and build.start_time and build.end_time and
build.build_set and build.build_set.item and
build.build_set.item.change and
build.build_set.item.change.project):
duration = build.end_time - build.start_time
log.info("Nodeset %s with %s nodes was in use "
"for %s seconds for build %s for project %s",
nodeset, len(nodeset.nodes), duration, build, project)
for node in nodeset.getNodes():
if node.lock is None:
log.error("Node %s is not locked", node)
@ -285,21 +277,33 @@ class Nodepool(object):
"while unlocking:", node)
self._unlockNodes(nodeset.getNodes())
if not ansible_job:
return
args = ansible_job.arguments
project = args["zuul"]["project"]["canonical_name"]
tenant = args["zuul"]["tenant"]
duration = 0
if ansible_job.end_time and ansible_job.time_starting_build:
duration = ansible_job.end_time - ansible_job.time_starting_build
log.info("Nodeset %s with %s nodes was in use "
"for %s seconds for build %s for project %s",
nodeset, len(nodeset.nodes), duration, ansible_job, project)
# When returning a nodeset we need to update the gauges if we have a
# build. Further we calculate resource*duration and increment their
# tenant or project specific counters. With that we have both the
# current value and also counters to be able to perform accounting.
if tenant and project and resources:
project_name = project.canonical_name
if resources:
subtract_resources(
self.current_resources_by_tenant[tenant], resources)
subtract_resources(
self.current_resources_by_project[project_name], resources)
self.current_resources_by_project[project], resources)
self.emitStatsResources()
if duration:
self.emitStatsResourceCounters(
tenant, project_name, resources, duration)
tenant, project, resources, duration)
def unlockNodeSet(self, nodeset):
self._unlockNodes(nodeset.getNodes())
@ -312,15 +316,12 @@ class Nodepool(object):
self.log.exception("Error unlocking node:")
def lockNodeSet(self, nodeset, request_id):
self._lockNodes(nodeset.getNodes(), request_id)
def _lockNodes(self, nodes, request_id):
# Try to lock all of the supplied nodes. If any lock fails,
# try to unlock any which have already been locked before
# re-raising the error.
locked_nodes = []
try:
for node in nodes:
for node in nodeset.getNodes():
if node.allocated_to != request_id:
raise Exception("Node %s allocated to %s, not %s" %
(node.id, node.allocated_to, request_id))
@ -338,6 +339,9 @@ class Nodepool(object):
# node.
log.debug("Updating node request %s", request)
# Update the node_request id on the nodeset
request.nodeset.node_request_id = request.id
if request.uid not in self.requests:
log.debug("Request %s is unknown", request.uid)
return False
@ -372,7 +376,7 @@ class Nodepool(object):
return True
def acceptNodes(self, request, request_id):
def acceptNodes(self, request):
log = get_annotated_logger(self.log, request.event_id)
# Called by the scheduler when it wants to accept and lock
@ -382,11 +386,19 @@ class Nodepool(object):
log.info("Accepting node request %s", request)
# TODO (felix): I think this shouldn't be a problem anymore as the
# node request is now directly retrieved from ZooKeeper before
# accepting the nodes on the executor.
"""
if request_id != request.id:
log.info("Skipping node accept for %s (resubmitted as %s)",
request_id, request.id)
return False
"""
# TODO (felix): The canceled might also not be necessary anymore as the
# executor won't be able to retrieve the NodeRequest from ZooKeeper if
# it was deleted.
if request.canceled:
log.info("Ignoring canceled node request %s", request)
# The request was already deleted when it was canceled
@ -419,13 +431,13 @@ class Nodepool(object):
# request probably doesn't make sense at this point in time as it
# is likely to directly fail again. So just log the problem
# with zookeeper and fail here.
log.exception("Error getting node request %s:", request_id)
log.exception("Error getting node request %s:", request)
request.failed = True
return True
locked = False
if request.fulfilled:
# If the request suceeded, try to lock the nodes.
# If the request succeeded, try to lock the nodes.
try:
self.lockNodeSet(request.nodeset, request.id)
locked = True
@ -446,4 +458,6 @@ class Nodepool(object):
# them.
if locked:
self.unlockNodeSet(request.nodeset)
if request.failed:
raise Exception("Accepting nodes failed")
return True


@ -18,7 +18,6 @@
import json
import logging
import os
import re
import socket
import sys
import threading
@ -110,9 +109,6 @@ class Scheduler(threading.Thread):
_cleanup_interval = 60 * 60
_merger_client_class = MergeClient
# Number of seconds past node expiration a hold request will remain
EXPIRED_HOLD_REQUEST_TTL = 24 * 60 * 60
def __init__(self, config, connections, app, testonly=False):
threading.Thread.__init__(self)
self.daemon = True
@ -1539,131 +1535,6 @@ class Scheduler(threading.Thread):
return
pipeline.manager.onBuildPaused(build)
def _handleExpiredHoldRequest(self, request):
'''
Check if a hold request is expired and delete it if it is.
The 'expiration' attribute will be set to the clock time when the
hold request was used for the last time. If this is NOT set, then
the request is still active.
If a node expiration time is set on the request, and the request is
expired, *and* we've waited for a defined period past the node
expiration (EXPIRED_HOLD_REQUEST_TTL), then we will delete the hold
request.
:returns: True if it is expired, False otherwise.
'''
if not request.expired:
return False
if not request.node_expiration:
# Request has been used up but there is no node expiration, so
# we don't auto-delete it.
return True
elapsed = time.time() - request.expired
if elapsed < self.EXPIRED_HOLD_REQUEST_TTL + request.node_expiration:
# Haven't reached our defined expiration lifetime, so don't
# auto-delete it yet.
return True
try:
self.zk_nodepool.lockHoldRequest(request)
self.log.info("Removing expired hold request %s", request)
self.zk_nodepool.deleteHoldRequest(request)
except Exception:
self.log.exception(
"Failed to delete expired hold request %s", request)
finally:
try:
self.zk_nodepool.unlockHoldRequest(request)
except Exception:
pass
return True
def _getAutoholdRequest(self, build):
change = build.build_set.item.change
autohold_key_base = (build.pipeline.tenant.name,
change.project.canonical_name,
build.job.name)
class Scope(object):
"""Enum defining a precedence/priority of autohold requests.
Autohold requests for specific refs should be fulfilled first,
before those for changes, and generic jobs.
Matching algorithm goes over all existing autohold requests, and
returns one with the highest number (in case of duplicated
requests the last one wins).
"""
NONE = 0
JOB = 1
CHANGE = 2
REF = 3
# Do a partial match of the autohold key against all autohold
# requests, ignoring the last element of the key (ref filter),
# and finally do a regex match between ref filter from
# the autohold request and the build's change ref to check
# if it matches. Lastly, make sure that we match the most
# specific autohold request by comparing "scopes"
# of requests - the most specific is selected.
autohold = None
scope = Scope.NONE
self.log.debug("Checking build autohold key %s", autohold_key_base)
for request_id in self.zk_nodepool.getHoldRequests():
request = self.zk_nodepool.getHoldRequest(request_id)
if not request:
continue
if self._handleExpiredHoldRequest(request):
continue
ref_filter = request.ref_filter
if request.current_count >= request.max_count:
# This request has been used the max number of times
continue
elif not (request.tenant == autohold_key_base[0] and
request.project == autohold_key_base[1] and
request.job == autohold_key_base[2]):
continue
elif not re.match(ref_filter, change.ref):
continue
if ref_filter == ".*":
candidate_scope = Scope.JOB
elif ref_filter.endswith(".*"):
candidate_scope = Scope.CHANGE
else:
candidate_scope = Scope.REF
self.log.debug("Build autohold key %s matched scope %s",
autohold_key_base, candidate_scope)
if candidate_scope > scope:
scope = candidate_scope
autohold = request
return autohold
def _processAutohold(self, build):
# We explicitly only want to hold nodes for jobs if they have
# failed / retry_limit / post_failure and have an autohold request.
hold_list = ["FAILURE", "RETRY_LIMIT", "POST_FAILURE", "TIMED_OUT"]
if build.result not in hold_list:
return False
request = self._getAutoholdRequest(build)
if request is not None:
self.log.debug("Got autohold %s", request)
self.nodepool.holdNodeSet(build.nodeset, request, build)
return True
return False
def _doBuildCompletedEvent(self, event):
# Get the local build object from the executor client
build = self.executor.builds.get(event.build)
@ -1682,13 +1553,8 @@ class Scheduler(threading.Thread):
build.error_detail = event_result.get("error_detail")
if result is None:
if (
build.build_set.getTries(build.job.name) >= build.job.attempts
):
result = "RETRY_LIMIT"
else:
build.retry = True
if result in ("DISCONNECT", "ABORTED"):
build.retry = True
if result == "ABORTED":
# Always retry if the executor just went away
build.retry = True
if result == "MERGER_FAILURE":
@ -1709,7 +1575,6 @@ class Scheduler(threading.Thread):
result_data = event_result.get("data", {})
warnings = event_result.get("warnings", [])
log.info("Build complete, result %s, warnings %s", result, warnings)
if build.retry:
@ -1724,25 +1589,15 @@ class Scheduler(threading.Thread):
build.end_time = event_result["end_time"]
build.result_data = result_data
build.build_set.warning_messages.extend(warnings)
build.held = event_result.get("held")
build.result = result
self._reportBuildStats(build)
# Regardless of any other conditions which might cause us not
# to pass this on to the pipeline manager, make sure we return
# the nodes to nodepool.
try:
build.held = self._processAutohold(build)
self.log.debug(
'build "%s" held status set to %s', build, build.held
)
except Exception:
log.exception("Unable to process autohold for %s", build)
try:
self.nodepool.returnNodeSet(build.nodeset, build=build,
zuul_event_id=build.zuul_event_id)
except Exception:
log.exception("Unable to return nodeset %s" % build.nodeset)
# The build is completed and the nodes were already returned by the
# executor. For consistency, also remove the node request from the
# build set.
build.build_set.removeJobNodeRequest(build.job.name)
# The test suite expects the build to be removed from the
# internal dict after it's added to the report queue.
@ -1790,14 +1645,9 @@ class Scheduler(threading.Thread):
def _doNodesProvisionedEvent(self, event):
request = event.request
request_id = event.request_id
build_set = request.build_set
log = get_annotated_logger(self.log, request.event_id)
ready = self.nodepool.acceptNodes(request, request_id)
if not ready:
return
if build_set is not build_set.item.current_build_set:
log.warning("Build set %s is not current "
"for node request %s", build_set, request)
@ -1919,7 +1769,7 @@ class Scheduler(threading.Thread):
nodeset = buildset.getJobNodeSet(job_name)
if nodeset:
self.nodepool.returnNodeSet(
nodeset, build=build, zuul_event_id=item.event)
nodeset, zuul_event_id=item.event)
# In the unlikely case that a build is removed and
# later added back, make sure we clear out the nodeset


@ -21,7 +21,7 @@ from kazoo.recipe.cache import TreeEvent
from kazoo.recipe.lock import Lock
import zuul.model
from zuul.model import HoldRequest
from zuul.model import HoldRequest, NodeRequest
from zuul.zk import ZooKeeperClient, ZooKeeperBase
from zuul.zk.exceptions import LockException
@ -400,6 +400,41 @@ class ZooKeeperNodepool(ZooKeeperBase):
self.kazoo_client.DataWatch(path, callback)
def getNodeRequest(self, node_request_id):
"""
Retrieve a NodeRequest from a given path in ZooKeeper
"""
# Create an empty NodeRequest object which will be updated with the
# current data in ZooKeeper. This also ensures that the nodes are
# updated.
"""
obj = NodeRequest(
requestor=None,
build_set=None,
job=None,
nodeset=None,
relative_priority=0,
event=None
)
obj.id = node_request_id
"""
path = f"{self.REQUEST_ROOT}/{node_request_id}"
try:
data, stat = self.kazoo_client.get(path)
except NoNodeError:
return None
if not data:
return None
obj = NodeRequest.fromDict(json.loads(data.decode("utf-8")))
self.updateNodeRequest(obj, data)
obj.id = node_request_id
obj.stat = stat
return obj
def deleteNodeRequest(self, node_request):
"""
Delete a request for nodes.
@ -455,6 +490,7 @@ class ZooKeeperNodepool(ZooKeeperBase):
request_nodes[i].id = nodeid
self._updateNode(request_nodes[i])
node_request.updateFromDict(data)
#node_request.stat = stat
def storeNode(self, node):
"""
@ -509,15 +545,16 @@ class ZooKeeperNodepool(ZooKeeperBase):
"""
Unlock a node.
The node must already have been locked.
:param Node node: The node which should be unlocked.
"""
if node.lock is None:
raise LockException("Node %s does not hold a lock" % (node,))
node.lock.release()
node.lock = None
# This could happen if acquiring the lock failed and shouldn't be
# treated as an error.
self.log.warning("Node %s does not hold a lock", node)
else:
node.lock.release()
node.lock = None
def lockNodeRequest(self, request, blocking=True, timeout=None):
"""