Don't use the AnsibleJob in the nodepool client
This change follows up on a few TODOs left by the lock/unlock nodes on
executor change.

When locking the nodes on the executor, we used the AnsibleJob as a
replacement for the old build parameter that was provided to the
nodepool client methods when they were still called by the scheduler.
However, the AnsibleJob class should only be used internally by the
executor, so we now provide all parameters directly to the nodepool
methods.

This also annotates the logger in the updated nodepool client methods
and fixes an outdated method signature in
test_scheduler.TestSemaphore.test_semaphore_zk_error.

Remove the two TODO comments about storing timestamps on the build
request in ZooKeeper, as that no longer makes sense: it sounded like a
good idea in the beginning, but with the current solution the scheduler
doesn't need to care about the build request anymore after it has been
submitted (except for canceling/cleanup purposes), and the result data
is self-contained.

Change-Id: I2d1005f69904c6ace8f79523133f382af0024c52
parent 65cac91e6c
commit 38776452bb
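Before the hunks, a restatement of the API change: the nodepool client
methods now accept plain values (build request, tenant name, project name,
duration, Zuul event id) instead of an AnsibleJob. Below is a minimal,
self-contained sketch of the new surface; the signatures are taken from the
Nodepool class hunks further down, while the class name, method bodies, and
argument values are illustrative placeholders.

    class NodepoolClientSketch:
        # Signatures mirror the updated Nodepool client methods in the
        # hunks below; the bodies are placeholders for illustration.
        def useNodeSet(self, nodeset, tenant_name, project_name,
                       zuul_event_id=None):
            print(f"use {nodeset} for {tenant_name}/{project_name}")

        def returnNodeSet(self, nodeset, build, tenant_name, project_name,
                          duration, zuul_event_id=None):
            print(f"return {nodeset} after {duration}s")

        def holdNodeSet(self, nodeset, request, build, duration,
                        zuul_event_id=None):
            print(f"hold {nodeset} for request {request}")

    client = NodepoolClientSketch()
    client.useNodeSet("nodeset-1", "tenant-a", "org/project")
    client.returnNodeSet("nodeset-1", build=None, tenant_name="tenant-a",
                         project_name="org/project", duration=42.0)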
@@ -4296,9 +4296,8 @@ class SchedulerTestApp:
         if validate_tenants is None:
             self.connections.registerScheduler(self.sched)
 
-        # TODO (felix, swestphahl): Can be removed when the nodes
-        # provisioned events are switched to ZooKeeper and after we no
-        # longer use global management events.
+        # TODO (swestphahl): Can be removed when we no longer use global
+        # management events.
         self.event_queues = [
             self.sched.reconfigure_event_queue,
         ]
@@ -4666,8 +4665,8 @@ class ZuulTestCase(BaseTestCase):
             self.fake_sql, self.addCleanup, self.validate_tenants)
 
     def __event_queues(self, matcher) -> List[Queue]:
-        # TODO (felix): Can be removed when the nodes provisioned events are
-        # switched to ZooKeeper.
+        # TODO (swestphahl): Can be removed when we no longer use global
+        # management events.
         sched_queues = map(lambda app: app.event_queues,
                            self.scheds.filter(matcher))
         return [item for sublist in sched_queues for item in sublist] + \
@@ -74,12 +74,14 @@ class TestNodepoolIntegration(BaseTestCase):
             self.assertEqual(node.state, model.STATE_READY)
 
         # Mark the nodes in use
-        self.nodepool.useNodeSet(nodeset)
+        self.nodepool.useNodeSet(nodeset, tenant_name=None, project_name=None)
        for node in nodeset.getNodes():
             self.assertEqual(node.state, model.STATE_IN_USE)
 
         # Return the nodes
-        self.nodepool.returnNodeSet(nodeset)
+        self.nodepool.returnNodeSet(
+            nodeset, build=None, tenant_name=None, project_name=None,
+            duration=0)
         for node in nodeset.getNodes():
             self.assertIsNone(node.lock)
             self.assertEqual(node.state, model.STATE_USED)
@@ -91,12 +91,15 @@ class TestNodepool(TestNodepoolBase):
             self.assertEqual(node.state, 'ready')
 
         # Mark the nodes in use
-        self.nodepool.useNodeSet(executor_nodeset)
+        self.nodepool.useNodeSet(
+            executor_nodeset, tenant_name=None, project_name=None)
         for node in executor_nodeset.getNodes():
             self.assertEqual(node.state, 'in-use')
 
         # Return the nodes
-        self.nodepool.returnNodeSet(executor_nodeset)
+        self.nodepool.returnNodeSet(
+            executor_nodeset, build=None, tenant_name=None, project_name=None,
+            duration=0)
         for node in executor_nodeset.getNodes():
             self.assertIsNone(node.lock)
             self.assertEqual(node.state, 'used')
@@ -7890,7 +7890,7 @@ class TestSemaphore(ZuulTestCase):
         # Simulate a single zk error in useNodeSet
         orig_useNodeSet = self.scheds.first.sched.nodepool.useNodeSet
 
-        def broken_use_nodeset(nodeset, build_set=None, event=None):
+        def broken_use_nodeset(nodeset, tenant_name, project_name):
             # restore original useNodeSet
             self.scheds.first.sched.nodepool.useNodeSet = orig_useNodeSet
             raise NoNodeError()
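This test uses a one-shot fault injection: the replacement restores the
original method before raising, so only the first call fails. A minimal
standalone sketch of that pattern (the names here are illustrative, not
taken from the Zuul code):

    class Client:
        def use_nodeset(self, nodeset):
            return f"using {nodeset}"

    client = Client()
    orig = client.use_nodeset

    def broken_once(nodeset):
        # Restore the original before raising, so only this call fails.
        client.use_nodeset = orig
        raise RuntimeError("simulated ZooKeeper error")

    client.use_nodeset = broken_once
    try:
        client.use_nodeset("nodes")      # first call: raises
    except RuntimeError:
        pass
    print(client.use_nodeset("nodes"))   # second call: "using nodes"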
@@ -1134,10 +1134,16 @@ class AnsibleJob(object):
 
     def unlockNodes(self):
         if self.node_request:
+            tenant_name = self.arguments["zuul"]["tenant"]
+            project_name = self.arguments["zuul"]["project"]["canonical_name"]
+            duration = self.end_time - self.time_starting_build
             try:
                 self.executor_server.nodepool.returnNodeSet(
                     self.nodeset,
-                    self,
+                    self.build_request,
+                    tenant_name,
+                    project_name,
+                    duration,
                     zuul_event_id=self.zuul_event_id,
                 )
             except Exception:
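Note that the duration is the difference of two monotonic timestamps
(end_time is set via time.monotonic() in completeBuild below, and
time_starting_build appears to be taken the same way when the build starts),
so it is not skewed by wall-clock adjustments. A tiny self-contained
illustration of the idea:

    import time

    time_starting_build = time.monotonic()  # taken when the build starts
    # ... the build runs ...
    end_time = time.monotonic()              # taken when the build completes
    duration = end_time - time_starting_build  # seconds; immune to clock jumps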
@@ -1350,7 +1356,10 @@ class AnsibleJob(object):
         # start to run tasks on nodes (prepareVars in particular uses
         # Ansible to freeze hostvars).
         if self.node_request:
-            self.executor_server.nodepool.useNodeSet(self.nodeset, self)
+            tenant_name = self.arguments["zuul"]["tenant"]
+            project_name = self.arguments["zuul"]["project"]["canonical_name"]
+            self.executor_server.nodepool.useNodeSet(
+                self.nodeset, tenant_name, project_name, self.zuul_event_id)
 
         # This prepares each playbook and the roles needed for each.
         self.preparePlaybooks(args)
@@ -3734,8 +3743,7 @@ class ExecutorServer(BaseMergeServer):
 
         return True
 
-    def _getAutoholdRequest(self, ansible_job):
-        args = ansible_job.arguments
+    def _getAutoholdRequest(self, args):
         autohold_key_base = (
             args["zuul"]["tenant"],
             args["zuul"]["project"]["canonical_name"],
@@ -3807,26 +3815,23 @@ class ExecutorServer(BaseMergeServer):
 
         return autohold
 
-    def _processAutohold(self, ansible_job, result):
+    def _processAutohold(self, ansible_job, duration, result):
         # We explicitly only want to hold nodes for jobs if they have
         # failed / retry_limit / post_failure and have an autohold request.
         hold_list = ["FAILURE", "RETRY_LIMIT", "POST_FAILURE", "TIMED_OUT"]
         if result not in hold_list:
             return False
 
-        request = self._getAutoholdRequest(ansible_job)
+        request = self._getAutoholdRequest(ansible_job.arguments)
         if request is not None:
             self.log.debug("Got autohold %s", request)
             self.nodepool.holdNodeSet(
-                ansible_job.nodeset, request, ansible_job
-            )
+                ansible_job.nodeset, request, ansible_job.build_request,
+                duration, ansible_job.zuul_event_id)
             return True
         return False
 
     def startBuild(self, build_request, data):
-        # TODO (felix): Once the builds are stored in ZooKeeper, we can store
-        # the start_time directly on the build. But for now we have to use the
-        # data dict for that.
         data["start_time"] = time.time()
 
         event = BuildStartedEvent(build_request.uuid, data)
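The hold decision itself is a plain filter on the build result; only
failure-like terminal results are autohold candidates. A compact,
self-contained restatement of the check in _processAutohold above:

    HOLD_RESULTS = {"FAILURE", "RETRY_LIMIT", "POST_FAILURE", "TIMED_OUT"}

    def should_consider_hold(result):
        # Mirrors the hold_list check above: anything else (SUCCESS,
        # None, ...) never triggers an autohold.
        return result in HOLD_RESULTS

    assert should_consider_hold("FAILURE")
    assert not should_consider_hold("SUCCESS")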
@@ -3859,9 +3864,6 @@ class ExecutorServer(BaseMergeServer):
             return
 
     def completeBuild(self, build_request, result):
-        # TODO (felix): Once the builds are stored in ZooKeeper, we can store
-        # the end_time directly on the build. But for now we have to use the
-        # result dict for that.
         result["end_time"] = time.time()
 
         log = get_annotated_logger(self.log, build_request.event_id,
@@ -3873,6 +3875,7 @@ class ExecutorServer(BaseMergeServer):
         ansible_job = self.job_workers.get(build_request.uuid)
         if ansible_job:
             ansible_job.end_time = time.monotonic()
+            duration = ansible_job.end_time - ansible_job.time_starting_build
 
         params = ansible_job.arguments
         # If the result is None, check if the build has reached
@@ -3890,11 +3893,13 @@ class ExecutorServer(BaseMergeServer):
         # Provide the hold information back to the scheduler via the build
         # result.
         try:
-            held = self._processAutohold(ansible_job, result.get("result"))
+            held = self._processAutohold(ansible_job, duration,
+                                         result.get("result"))
             result["held"] = held
             log.info("Held status set to %s", held)
         except Exception:
-            log.exception("Unable to process autohold for %s", ansible_job)
+            log.exception("Unable to process autohold for %s",
+                          build_request)
 
         def update_build_request(log, build_request):
             try:
@@ -249,33 +249,27 @@ class Nodepool(object):
         except Exception:
             log.exception("Unable to unlock node request %s", request)
 
-    # TODO (felix): Switch back to use a build object here rather than the
-    # ansible_job once it's available via ZK.
-    def holdNodeSet(self, nodeset, request, ansible_job):
+    def holdNodeSet(self, nodeset, request, build, duration,
+                    zuul_event_id=None):
         '''
         Perform a hold on the given set of nodes.
 
         :param NodeSet nodeset: The object containing the set of nodes to hold.
         :param HoldRequest request: Hold request associated with the NodeSet
         '''
-        self.log.info("Holding nodeset %s" % (nodeset,))
+        log = get_annotated_logger(self.log, zuul_event_id)
+        log.info("Holding nodeset %s", nodeset)
         resources = defaultdict(int)
         nodes = nodeset.getNodes()
 
-        args = ansible_job.arguments
-        project = args["zuul"]["project"]["canonical_name"]
-        tenant = args["zuul"]["tenant"]
-        duration = 0
-        if ansible_job.end_time and ansible_job.time_starting_build:
-            duration = ansible_job.end_time - ansible_job.time_starting_build
-        self.log.info(
+        log.info(
             "Nodeset %s with %s nodes was in use for %s seconds for build %s "
             "for project %s",
-            nodeset, len(nodeset.nodes), duration, ansible_job, project)
+            nodeset, len(nodeset.nodes), duration, build, request.project)
 
         for node in nodes:
             if node.lock is None:
-                raise Exception("Node %s is not locked" % (node,))
+                raise Exception(f"Node {node} is not locked")
             if node.resources:
                 self.addResources(resources, node.resources)
             node.state = model.STATE_HOLD
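holdNodeSet now uses get_annotated_logger so its log lines carry the Zuul
event id and can be correlated across components. A minimal sketch of that
concept built on logging.LoggerAdapter (an illustration only, not Zuul's
actual implementation):

    import logging

    def annotated_logger_sketch(logger, event_id):
        # Prefix every record with the event id so all log lines that
        # belong to one Zuul event can be grepped together.
        class EventAdapter(logging.LoggerAdapter):
            def process(self, msg, kwargs):
                return "[e: %s] %s" % (self.extra["event_id"], msg), kwargs

        return EventAdapter(logger, {"event_id": event_id})

    logging.basicConfig(level=logging.INFO)
    log = annotated_logger_sketch(logging.getLogger("zuul.nodepool"), "abc123")
    log.info("Holding nodeset %s", "my-nodeset")
    # prints: INFO:zuul.nodepool:[e: abc123] Holding nodeset my-nodeset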
@@ -289,7 +283,7 @@ class Nodepool(object):
             self.zk_nodepool.storeNode(node)
 
         request.nodes.append(dict(
-            build=ansible_job.build_request.uuid,
+            build=build.uuid,
             nodes=[node.id for node in nodes],
         ))
         request.current_count += 1
@@ -311,30 +305,25 @@ class Nodepool(object):
             # just get used more than the original count specified.
             # It's possible to leak some held nodes, though, which would
             # require manual node deletes.
-            self.log.exception("Unable to update hold request %s:", request)
+            log.exception("Unable to update hold request %s:", request)
         finally:
             # Although any exceptions thrown here are handled higher up in
             # _doBuildCompletedEvent, we always want to try to unlock it.
             self.zk_nodepool.unlockHoldRequest(request)
 
-        if tenant and project and resources and duration:
+        if resources and duration:
             self.emitStatsResourceCounters(
-                tenant, project, resources, duration)
+                request.tenant, request.project, resources, duration)
 
-    # TODO (felix): Switch back to use a build object here rather than the
-    # ansible_job once it's available via ZK.
-    def useNodeSet(self, nodeset, ansible_job=None):
-        self.log.info("Setting nodeset %s in use", nodeset)
-        user_data = None
-        if ansible_job:
-            args = ansible_job.arguments
-            tenant_name = args["zuul"]["tenant"]
-            project_name = args["zuul"]["project"]["canonical_name"]
-            user_data = dict(
-                zuul_system=self.system_id,
-                tenant_name=tenant_name,
-                project_name=project_name,
-            )
+    def useNodeSet(self, nodeset, tenant_name, project_name,
+                   zuul_event_id=None):
+        log = get_annotated_logger(self.log, zuul_event_id)
+        log.info("Setting nodeset %s in use", nodeset)
+        user_data = dict(
+            zuul_system=self.system_id,
+            tenant_name=tenant_name,
+            project_name=project_name,
+        )
         for node in nodeset.getNodes():
             if node.lock is None:
                 raise Exception("Node %s is not locked", node)
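With the new signature, useNodeSet unconditionally labels every node with a
user_data dict (Zuul system id, tenant, project) before writing it back to
ZooKeeper, instead of only doing so when an ansible_job was passed. A small
sketch of what ends up on a node; the Node class here is a simplified
stand-in and the values are illustrative placeholders:

    class Node:
        # Simplified stand-in for the Zuul node model object.
        def __init__(self, node_id):
            self.id = node_id
            self.user_data = None

    node = Node("0000000001")
    node.user_data = dict(
        zuul_system="<system-id>",        # illustrative placeholder
        tenant_name="example-tenant",     # illustrative placeholder
        project_name="example/project",   # illustrative placeholder
    )
    # The node, including user_data, is then stored back to ZooKeeper so
    # other components can tell which system/tenant/project is using it.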
@@ -342,9 +331,8 @@ class Nodepool(object):
             node.user_data = user_data
             self.zk_nodepool.storeNode(node)
 
-    # TODO (felix): Switch back to use a build object here rather than the
-    # ansible_job once it's available via ZK.
-    def returnNodeSet(self, nodeset, ansible_job=None, zuul_event_id=None):
+    def returnNodeSet(self, nodeset, build, tenant_name, project_name,
+                      duration, zuul_event_id=None):
         log = get_annotated_logger(self.log, zuul_event_id)
         log.info("Returning nodeset %s", nodeset)
         resources = defaultdict(int)
@@ -364,22 +352,13 @@ class Nodepool(object):
                                   "while unlocking:", node)
         self.unlockNodeSet(nodeset)
 
-        if not ansible_job:
-            return
-
-        args = ansible_job.arguments
-        project = args["zuul"]["project"]["canonical_name"]
-        tenant = args["zuul"]["tenant"]
-        duration = 0
-        if ansible_job.end_time and ansible_job.time_starting_build:
-            duration = ansible_job.end_time - ansible_job.time_starting_build
         log.info("Nodeset %s with %s nodes was in use "
                  "for %s seconds for build %s for project %s",
-                 nodeset, len(nodeset.nodes), duration, ansible_job, project)
+                 nodeset, len(nodeset.nodes), duration, build, project_name)
 
         if resources and duration:
             self.emitStatsResourceCounters(
-                tenant, project, resources, duration)
+                tenant_name, project_name, resources, duration)
 
     def unlockNodeSet(self, nodeset):
         self._unlockNodes(nodeset.getNodes())
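Both returnNodeSet and holdNodeSet accumulate per-node resources into a
defaultdict and, when the nodeset was actually used (duration > 0), emit
usage counters for the tenant and project. A rough, self-contained sketch of
that accounting, assuming the counters are scaled by usage time as the
duration parameter suggests (add_resources and the print are simplified
stand-ins for addResources and emitStatsResourceCounters):

    from collections import defaultdict

    def add_resources(target, source):
        # Sum each resource counter (e.g. cores, ram) into a running total.
        for key, value in source.items():
            target[key] += value

    node_resources = [
        {"cores": 4, "ram": 8192},   # illustrative node resource records
        {"cores": 2, "ram": 4096},
    ]
    resources = defaultdict(int)
    for res in node_resources:
        add_resources(resources, res)

    duration = 300.0  # seconds the nodeset was in use
    if resources and duration:
        for key, value in resources.items():
            # Stand-in for emitStatsResourceCounters: one counter per
            # resource, scaled by how long the nodes were in use.
            print(f"tenant.project.{key}: {value * duration}")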