Remove nodeset from NodeRequest

To make it simpler for schedulers to handle nodes-provisioned events
for node requests that they may not have in their local pipeline
state, we need to simplify the pipeline storage of node requests.
That starts by removing the nodeset object as an attribute of the
NodeRequest object.  This means the scheduler can work with a node
request object without relying on having the associated nodeset.  It
also simplifies the ZooKeeper code that deserializes NodeRequests
(since it no longer has to create fake NodeSet objects).  Finally, it
simplifies what must be stored in the pipeline and queue item
structures, which will also come in handy later.
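
As a rough sketch of the resulting request shape (illustrative only;
the attribute names mirror the model.py changes below, but this class
is a simplified stand-in, not the real zuul.model.NodeRequest):

    class SimplifiedNodeRequest:
        def __init__(self, requestor, job_name, labels, provider):
            self.requestor = requestor
            self.job_name = job_name
            # Only the list of node labels is kept; no NodeSet object.
            self.labels = labels
            # Node IDs reported back by nodepool, filled in from the
            # request's znode data ('nodes') when it is updated from
            # ZooKeeper.
            self.nodes = []
            self.provider = provider

        def toDict(self):
            # node_types is derived directly from the labels, so no
            # NodeSet is needed to serialize the request.
            return {
                'node_types': list(self.labels),
                'requestor': self.requestor,
                'provider': self.provider,
            }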

Two tests designed to verify that the request->nodeset magic
deserialization worked have been removed since they are no longer
applicable.

Change-Id: I70ae083765d5cd9a4fd1afc2442bf22d6c52ba0b
James E. Blair
2021-08-25 16:56:33 -07:00
parent e577ec90bd
commit dbe13ce076
13 changed files with 119 additions and 214 deletions

View File

@@ -2880,8 +2880,8 @@ class FakeBuild(object):
# the complexity around multi-node jobs here
# (self.nodes[0].label?)
self.node = None
if len(self.parameters.get('nodes')) == 1:
self.node = self.parameters['nodes'][0]['label']
if len(self.parameters['nodeset']['nodes']) == 1:
self.node = self.parameters['nodeset']['nodes'][0]['label']
self.unique = self.parameters['zuul']['build']
self.pipeline = self.parameters['zuul']['pipeline']
self.project = self.parameters['zuul']['project']['name']

View File

@@ -80,25 +80,26 @@ class TestNodepool(TestNodepoolBase):
self.assertEqual(request.state, 'fulfilled')
# Accept the nodes
accepted = self.nodepool.checkNodeRequest(request, request.id)
self.assertTrue(accepted)
new_nodeset = self.nodepool.checkNodeRequest(
request, request.id, nodeset)
self.assertIsNotNone(new_nodeset)
# acceptNodes will be called on the executor, but only if the
# noderequest was accepted before.
self.nodepool.acceptNodes(request)
nodeset = request.nodeset
executor_nodeset = nodeset.copy()
self.nodepool.acceptNodes(request, executor_nodeset)
for node in nodeset.getNodes():
for node in executor_nodeset.getNodes():
self.assertIsNotNone(node.lock)
self.assertEqual(node.state, 'ready')
# Mark the nodes in use
self.nodepool.useNodeSet(nodeset)
for node in nodeset.getNodes():
self.nodepool.useNodeSet(executor_nodeset)
for node in executor_nodeset.getNodes():
self.assertEqual(node.state, 'in-use')
# Return the nodes
self.nodepool.returnNodeSet(nodeset)
for node in nodeset.getNodes():
self.nodepool.returnNodeSet(executor_nodeset)
for node in executor_nodeset.getNodes():
self.assertIsNone(node.lock)
self.assertEqual(node.state, 'used')
@@ -151,12 +152,16 @@ class TestNodepool(TestNodepoolBase):
self.assertEqual(request.state, 'fulfilled')
# Accept the nodes, passing a different ID
accepted = self.nodepool.checkNodeRequest(request, "invalid")
self.assertFalse(accepted)
new_nodeset = self.nodepool.checkNodeRequest(
request, "invalid", nodeset)
self.assertIsNone(new_nodeset)
# Don't call acceptNodes here as the node request wasn't accepted.
nodeset = request.nodeset
for node in nodeset.getNodes():
# Nothing we have done has returned an updated nodeset with
# real node records, so we need to do that ourselves to verify
# they are still unused.
for node_id, node in zip(request.nodes, nodeset.getNodes()):
self.nodepool.zk_nodepool.updateNode(node, node_id)
self.assertIsNone(node.lock)
self.assertEqual(node.state, 'ready')
@@ -177,10 +182,10 @@ class TestNodepool(TestNodepoolBase):
self.zk_nodepool.deleteNodeRequest(request)
# Accept the nodes
accepted = self.nodepool.checkNodeRequest(request, request.id)
self.assertFalse(accepted)
new_nodeset = self.nodepool.checkNodeRequest(
request, request.id, nodeset)
self.assertIsNone(new_nodeset)
# Don't call acceptNodes here as the node request wasn't accepted.
nodeset = request.nodeset
for node in nodeset.getNodes():
self.assertIsNone(node.lock)
@@ -206,70 +211,6 @@ class TestNodepool(TestNodepoolBase):
self.assertEqual(request2.state, 'fulfilled')
self.assertTrue(request2.state_time < request1.state_time)
def test_get_node_request_with_nodeset(self):
# Test that we are able to deserialize a node request from ZK and
# update the node information while providing a valid NodeSet.
nodeset = model.NodeSet()
nodeset.addNode(model.Node(['controller', 'foo'], 'ubuntu-xenial'))
nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
job = model.Job('testjob')
job.nodeset = nodeset
request = self.nodepool.requestNodes(
"test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
self.waitForRequests()
self.assertEqual(len(self.provisioned_requests), 1)
self.assertEqual(request.state, 'fulfilled')
# Look up the node request from ZooKeeper while providing the original
# nodeset.
restored_request = self.zk_nodepool.getNodeRequest(request.id, nodeset)
# As we've provided the original nodeset when retrieving the node
# request from ZooKeeper, they should be the same
self.assertEqual(restored_request.nodeset, nodeset)
# And the nodeset should contain the original data
restored_nodes = restored_request.nodeset.getNodes()
self.assertEqual(restored_nodes[0].name, ['controller', 'foo'])
self.assertEqual(restored_nodes[1].name, ['compute'])
def test_get_node_request_without_nodeset(self):
# Test that we are able to deserialize a node request from ZK and
# update the node information without providing a NodeSet object.
# This is used in case something went wrong when processing the
# NodesProvisionedEvents in the scheduler and the original NodeRequest
# and/or NodeSet objects are not available anymore.
nodeset = model.NodeSet()
nodeset.addNode(model.Node(['controller', 'foo'], 'ubuntu-xenial'))
nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
job = model.Job('testjob')
job.nodeset = nodeset
request = self.nodepool.requestNodes(
"test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
self.waitForRequests()
self.assertEqual(len(self.provisioned_requests), 1)
self.assertEqual(request.state, 'fulfilled')
# Look up the node request from ZooKeeper while providing no nodeset
# will result in a fake nodeset being created for the node update.
restored_request = self.zk_nodepool.getNodeRequest(request.id)
# As we didn't provide a nodeset, the nodepool client will create a
# fake one to look up the node information from nodepool.
self.assertFalse(nodeset == restored_request.nodeset)
restored_nodes = restored_request.nodeset.getNodes()
self.assertEqual(len(restored_nodes), 2)
self.assertEqual(restored_nodes[0].label, 'ubuntu-xenial')
self.assertEqual(restored_nodes[1].label, 'ubuntu-xenial')
# As the nodes were faked, they don't have the same names as the
# original ones from the config
self.assertEqual(restored_nodes[0].name, "ubuntu-xenial-0")
self.assertEqual(restored_nodes[1].name, "ubuntu-xenial-1")
class TestNodepoolResubmit(TestNodepoolBase):
def setUp(self):
@@ -291,7 +232,8 @@ class TestNodepoolResubmit(TestNodepoolBase):
self.disconnect_event.wait()
self.zk_client.client.stop()
self.zk_client.client.start()
self.nodepool.checkNodeRequest(self.request, self.request.id)
self.nodepool.checkNodeRequest(
self.request, self.request.id, self.nodeset)
def test_node_request_disconnect_late(self):
# Test that node requests are re-submitted after a disconnect
@@ -304,6 +246,7 @@ class TestNodepoolResubmit(TestNodepoolBase):
nodeset = model.NodeSet()
nodeset.addNode(model.Node(['controller'], 'ubuntu-xenial'))
nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
self.nodeset = nodeset
job = model.Job('testjob')
job.nodeset = nodeset
self.request = self.nodepool.requestNodes(

View File

@@ -3336,8 +3336,8 @@ class TestScheduler(ZuulTestCase):
p = self.history[0].parameters
self.assertEqual(p['timeout'], 40)
self.assertEqual(len(p['nodes']), 1)
self.assertEqual(p['nodes'][0]['label'], 'new')
self.assertEqual(len(p['nodeset']['nodes']), 1)
self.assertEqual(p['nodeset']['nodes'][0]['label'], 'new')
self.assertEqual([x['path'] for x in p['pre_playbooks']],
['base-pre', 'py27-pre'])
self.assertEqual([x['path'] for x in p['post_playbooks']],
@@ -3347,8 +3347,8 @@ class TestScheduler(ZuulTestCase):
p = self.history[1].parameters
self.assertEqual(p['timeout'], 50)
self.assertEqual(len(p['nodes']), 1)
self.assertEqual(p['nodes'][0]['label'], 'old')
self.assertEqual(len(p['nodeset']['nodes']), 1)
self.assertEqual(p['nodeset']['nodes'][0]['label'], 'old')
self.assertEqual([x['path'] for x in p['pre_playbooks']],
['base-pre', 'py27-pre', 'py27-diablo-pre'])
self.assertEqual([x['path'] for x in p['post_playbooks']],
@@ -3359,8 +3359,8 @@ class TestScheduler(ZuulTestCase):
p = self.history[2].parameters
self.assertEqual(p['timeout'], 40)
self.assertEqual(len(p['nodes']), 1)
self.assertEqual(p['nodes'][0]['label'], 'new')
self.assertEqual(len(p['nodeset']['nodes']), 1)
self.assertEqual(p['nodeset']['nodes'][0]['label'], 'new')
self.assertEqual([x['path'] for x in p['pre_playbooks']],
['base-pre', 'py27-pre', 'py27-essex-pre'])
self.assertEqual([x['path'] for x in p['post_playbooks']],

View File

@@ -1020,15 +1020,6 @@ class TestWeb(BaseTestWeb):
'projects': [],
'branch': 'master',
'cleanup_playbooks': [],
'groups': [],
'nodes': [{
'comment': None,
'hold_job': None,
'id': None,
'label': 'label1',
'name': ['controller'],
'state': 'unknown'
}],
'nodeset': {
'groups': [],
'name': '',

View File

@@ -61,7 +61,7 @@ class ExecutorClient(object):
job, uuid, nodeset, item.change, dependent_changes)
params = zuul.executor.common.construct_build_params(
uuid, self.sched, nodeset,
uuid, self.sched,
job, item, pipeline, dependent_changes, merger_items,
redact_secrets_and_keys=False)
# TODO: deprecate and remove this variable?
@@ -111,9 +111,10 @@ class ExecutorClient(object):
# availability zone we can get executor_zone from only the first
# node.
executor_zone = None
if params["nodes"] and params["nodes"][0].get('attributes'):
executor_zone = params[
"nodes"][0]['attributes'].get('executor-zone')
if len(nodeset.nodes):
node = nodeset.getNodes()[0]
if node.attributes:
executor_zone = node.attributes.get('executor-zone')
zone_known = False
if executor_zone:

View File

@@ -17,7 +17,7 @@ import os
from zuul.lib import strings
def construct_build_params(uuid, sched, nodeset, job, item, pipeline,
def construct_build_params(uuid, sched, job, item, pipeline,
dependent_changes=[], merger_items=[],
redact_secrets_and_keys=True):
"""Returns a list of all the parameters needed to build a job.
@@ -124,16 +124,7 @@ def construct_build_params(uuid, sched, nodeset, job, item, pipeline,
params['cleanup_playbooks'] = [make_playbook(x)
for x in job.cleanup_run]
# TODO(corvus): Remove nodes and groups since they're included in
# nodeset
nodes = []
for node in nodeset.getNodes():
n = node.toDict()
n.update(dict(name=node.name, label=node.label))
nodes.append(n)
params['nodes'] = nodes
params['groups'] = [group.toDict() for group in nodeset.getGroups()]
params["nodeset"] = nodeset.toDict()
params["nodeset"] = job.nodeset.toDict()
params['ssh_keys'] = []
if pipeline.post_review:
if redact_secrets_and_keys:

View File

@@ -1104,8 +1104,7 @@ class AnsibleJob(object):
if node_request_id:
zk_nodepool = self.executor_server.nodepool.zk_nodepool
self.node_request = zk_nodepool.getNodeRequest(
self.arguments["noderequest_id"], self.nodeset
)
self.arguments["noderequest_id"])
if self.node_request is None:
self.log.error(
@@ -1122,10 +1121,11 @@ class AnsibleJob(object):
if self.node_request:
self.log.debug("Locking nodeset")
try:
self.executor_server.nodepool.acceptNodes(self.node_request)
self.executor_server.nodepool.acceptNodes(
self.node_request, self.nodeset)
except Exception:
self.log.exception(
"Error locking nodeset %s", self.node_request.nodeset
"Error locking nodeset %s", self.nodeset
)
raise NodeRequestError
@@ -1133,13 +1133,13 @@ class AnsibleJob(object):
if self.node_request:
try:
self.executor_server.nodepool.returnNodeSet(
self.node_request.nodeset,
self.nodeset,
self,
zuul_event_id=self.zuul_event_id,
)
except Exception:
self.log.exception(
"Unable to return nodeset %s", self.node_request.nodeset
"Unable to return nodeset %s", self.nodeset
)
def _base_job_data(self):
@@ -1347,9 +1347,7 @@ class AnsibleJob(object):
# start to run tasks on nodes (prepareVars in particular uses
# Ansible to freeze hostvars).
if self.node_request:
self.executor_server.nodepool.useNodeSet(
self.node_request.nodeset, self
)
self.executor_server.nodepool.useNodeSet(self.nodeset, self)
# This prepares each playbook and the roles needed for each.
self.preparePlaybooks(args)
@@ -3786,7 +3784,7 @@ class ExecutorServer(BaseMergeServer):
if request is not None:
self.log.debug("Got autohold %s", request)
self.nodepool.holdNodeSet(
ansible_job.node_request.nodeset, request, ansible_job
ansible_job.nodeset, request, ansible_job
)
return True
return False

View File

@@ -1501,11 +1501,12 @@ class PipelineManager(metaclass=ABCMeta):
repo_state[connection] = event.repo_state[connection]
build_set.repo_state_state = build_set.COMPLETE
def onNodesProvisioned(self, request, build_set):
def onNodesProvisioned(self, request, nodeset, build_set):
# TODOv3(jeblair): handle provisioning failure here
log = get_annotated_logger(self.log, request.event_id)
build_set.jobNodeRequestComplete(request.job_name, request.nodeset)
if nodeset is not None:
build_set.jobNodeRequestComplete(request.job_name, nodeset)
# TODO (felix): Check if the failed is still needed as the
# NodesProvisionedEvents are now in ZooKeeper.
if request.failed or not request.fulfilled:
@@ -1519,7 +1520,7 @@ class PipelineManager(metaclass=ABCMeta):
log.info("Completed node request %s for job %s of item %s "
"with nodes %s",
request, request.job_name, build_set.item, request.nodeset)
request, request.job_name, build_set.item, request.nodes)
def reportItem(self, item):
log = get_annotated_logger(self.log, item.event)

View File

@@ -619,6 +619,7 @@ class Node(ConfigObject):
self.hold_expiration = None
self.resources = None
self.allocated_to = None
self.attributes = {}
@property
def state(self):
@@ -798,14 +799,15 @@ class NodeRequest(object):
"""A request for a set of nodes."""
def __init__(self, requestor, build_set_uuid, tenant_name, pipeline_name,
job_name, nodeset, provider, relative_priority,
job_name, labels, provider, relative_priority,
event_id=None):
self.requestor = requestor
self.build_set_uuid = build_set_uuid
self.tenant_name = tenant_name
self.pipeline_name = pipeline_name
self.job_name = job_name
self.nodeset = nodeset
self.labels = labels
self.nodes = []
self._state = STATE_REQUESTED
self.requested_time = time.time()
self.state_time = time.time()
@@ -826,7 +828,7 @@ class NodeRequest(object):
# Reset the node request for re-submission
self._zk_data = {}
# Remove any real node information
self.nodeset = self.nodeset.copy()
self.nodes = []
self.id = None
self.state = STATE_REQUESTED
self.stat = None
@@ -849,7 +851,7 @@ class NodeRequest(object):
self.state_time = time.time()
def __repr__(self):
return '<NodeRequest %s %s>' % (self.id, self.nodeset)
return '<NodeRequest %s %s>' % (self.id, self.labels)
def toDict(self):
"""
@@ -868,9 +870,7 @@ class NodeRequest(object):
"pipeline_name": self.pipeline_name,
"job_name": self.job_name,
}
nodes = [n.label for n in self.nodeset.getNodes()]
# These are immutable once set
d.setdefault('node_types', nodes)
d.setdefault('node_types', self.labels)
d.setdefault('requestor', self.requestor)
d.setdefault('created_time', self.created_time)
d.setdefault('provider', self.provider)
@@ -893,6 +893,7 @@ class NodeRequest(object):
# to errors at other places where we rely on that info.
if 'tenant_name' in data:
self.tenant_name = data['tenant_name']
self.nodes = data.get('nodes', [])
@classmethod
def fromDict(cls, data):
@@ -910,7 +911,7 @@ class NodeRequest(object):
tenant_name=requestor_data["tenant_name"],
pipeline_name=requestor_data["pipeline_name"],
job_name=requestor_data["job_name"],
nodeset=None,
labels=data["node_types"],
provider=data["provider"],
relative_priority=data.get("relative_priority", 0),
)

View File

@@ -70,13 +70,13 @@ class Nodepool(object):
if dt:
pipe.timing(key, dt)
for node in request.nodeset.getNodes():
pipe.incr(key + '.label.%s' % node.label)
for label in request.labels:
pipe.incr(key + '.label.%s' % label)
if dt:
pipe.timing(key + '.label.%s' % node.label, dt)
pipe.incr(key + '.size.%s' % len(request.nodeset.nodes))
pipe.timing(key + '.label.%s' % label, dt)
pipe.incr(key + '.size.%s' % len(request.labels))
if dt:
pipe.timing(key + '.size.%s' % len(request.nodeset.nodes), dt)
pipe.timing(key + '.size.%s' % len(request.labels), dt)
pipe.send()
def emitStatsResources(self):
@@ -112,19 +112,17 @@ class Nodepool(object):
def requestNodes(self, build_set_uuid, job, tenant_name, pipeline_name,
provider, priority, relative_priority, event=None):
log = get_annotated_logger(self.log, event)
# Create a copy of the nodeset to represent the actual nodes
# returned by nodepool.
nodeset = job.nodeset.copy()
labels = [n.label for n in job.nodeset.getNodes()]
if event:
event_id = event.zuul_event_id
else:
event_id = None
req = model.NodeRequest(self.hostname, build_set_uuid, tenant_name,
pipeline_name, job.name, nodeset, provider,
pipeline_name, job.name, labels, provider,
relative_priority, event_id)
self.requests[req.uid] = req
if nodeset.nodes:
if job.nodeset.nodes:
self.zk_nodepool.submitNodeRequest(
req, priority, self._updateNodeRequest)
# Logged after submission so that we have the request id
@@ -314,7 +312,7 @@ class Nodepool(object):
except Exception:
log.exception("Exception storing node %s "
"while unlocking:", node)
self._unlockNodes(nodeset.getNodes())
self.unlockNodeSet(nodeset)
if not ansible_job:
return
@@ -354,25 +352,26 @@ class Nodepool(object):
except Exception:
self.log.exception("Error unlocking node:")
def lockNodeSet(self, nodeset, request_id):
def lockNodes(self, request, nodeset):
# Try to lock all of the supplied nodes. If any lock fails,
# try to unlock any which have already been locked before
# re-raising the error.
locked_nodes = []
try:
for node in nodeset.getNodes():
if node.allocated_to != request_id:
for node_id, node in zip(request.nodes, nodeset.getNodes()):
self.zk_nodepool.updateNode(node, node_id)
if node.allocated_to != request.id:
raise Exception("Node %s allocated to %s, not %s" %
(node.id, node.allocated_to, request_id))
(node.id, node.allocated_to, request.id))
self.log.debug("Locking node %s" % (node,))
self.zk_nodepool.lockNode(node, timeout=30)
# Check the allocated_to again to ensure that nodepool didn't
# re-allocate the nodes to a different node request while we
# were locking them.
if node.allocated_to != request_id:
if node.allocated_to != request.id:
raise Exception(
"Node %s was reallocated during locking %s, not %s" %
(node.id, node.allocated_to, request_id))
(node.id, node.allocated_to, request.id))
locked_nodes.append(node)
except Exception:
self.log.exception("Error locking nodes:")
@@ -420,34 +419,35 @@ class Nodepool(object):
return True
def checkNodeRequest(self, request, request_id):
def checkNodeRequest(self, request, request_id, job_nodeset):
"""
Called by the scheduler when it wants to accept a node request for
potential use of its nodes. The nodes themselves will be accepted and
locked by the executor when the corresponding job is started.
:returns: False if there is a problem with the request (canceled or
retrying), True if it is ready to be acted upon (success or
failure).
:returns: A new NodeSet object which contains information from
nodepool about the actual allocated nodes.
"""
log = get_annotated_logger(self.log, request.event_id)
log.info("Accepting node request %s", request)
# A copy of the nodeset with information about the real nodes
nodeset = job_nodeset.copy()
if request_id != request.id:
log.info("Skipping node accept for %s (resubmitted as %s)",
request_id, request.id)
return False
return None
if request.canceled:
log.info("Ignoring canceled node request %s", request)
# The request was already deleted when it was canceled
return False
return None
# If we didn't request nodes and the request is fulfilled then just
# return. We don't have to do anything in this case. Further, don't even
# ask ZK for the request as empty requests are not put into ZK.
if not request.nodeset.nodes and request.fulfilled:
return True
if not request.labels and request.fulfilled:
return nodeset
# Make sure the request still exists. It's possible it could have
# disappeared if we lost the ZK session between when the fulfillment
@@ -464,7 +464,10 @@ class Nodepool(object):
request.reset()
self.zk_nodepool.submitNodeRequest(
request, priority, self._updateNodeRequest)
return False
return None
else:
for node_id, node in zip(request.nodes, nodeset.getNodes()):
self.zk_nodepool.updateNode(node, node_id)
except Exception:
# If we cannot retrieve the node request from ZK we probably lost
# the connection and thus the ZK session. Resubmitting the node
@@ -473,16 +476,18 @@ class Nodepool(object):
# with zookeeper and fail here.
log.exception("Error getting node request %s:", request_id)
request.failed = True
return True
return nodeset
return True
return nodeset
def acceptNodes(self, request):
def acceptNodes(self, request, nodeset):
# Accept the nodes supplied by request, mutate nodeset with
# the real node information.
locked = False
if request.fulfilled:
# If the request succeeded, try to lock the nodes.
try:
self.lockNodeSet(request.nodeset, request.id)
nodes = self.lockNodes(request, nodeset)
locked = True
except Exception:
log = get_annotated_logger(self.log, request.event_id)
@@ -495,6 +500,7 @@ class Nodepool(object):
if request.failed:
raise Exception("Accepting nodes failed")
return nodes
def deleteNodeRequest(self, request, locked=False):
log = get_annotated_logger(self.log, request.event_id)
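
For orientation, the reworked checkNodeRequest/acceptNodes contract
above implies roughly the following call flow. This is a sketch based
on the scheduler and executor diffs later in this commit; the wrapper
function names and parameters are illustrative stand-ins, not Zuul
APIs:

    def handle_nodes_provisioned(sched, request, request_id, build_set,
                                 pipeline):
        # Scheduler side: look up the job's configured nodeset and let
        # checkNodeRequest() return a copy populated with real node data.
        job = build_set.item.getJob(request.job_name)
        nodeset = sched.nodepool.checkNodeRequest(
            request, request_id, job.nodeset)
        if nodeset is None:
            return
        pipeline.manager.onNodesProvisioned(request, nodeset, build_set)

    def accept_nodes_on_executor(nodepool, node_request, job_nodeset):
        # Executor side: lock and accept the nodes against the executor's
        # own copy of the job's nodeset before the job starts to run.
        executor_nodeset = job_nodeset.copy()
        nodepool.acceptNodes(node_request, executor_nodeset)
        return executor_nodeset

The key point is that the nodeset carrying real node data now flows as
a return value and argument rather than living on the request itself.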

View File

@@ -540,13 +540,10 @@ class RPCListener(RPCListenerBase):
if not job:
gear_job.sendWorkComplete(json.dumps(None))
return
# TODO: check if this is frozen?
nodeset = job.nodeset
job.setBase(tenant.layout)
uuid = '0' * 32
params = zuul.executor.common.construct_build_params(
uuid, self.sched, nodeset,
job, item, pipeline)
uuid, self.sched, job, item, pipeline)
gear_job.sendWorkComplete(json.dumps(params, cls=ZuulJSONEncoder))
def handle_allowed_labels_get(self, job):

View File

@@ -2049,26 +2049,26 @@ class Scheduler(threading.Thread):
if not request:
return
ready = self.nodepool.checkNodeRequest(request, request_id)
if not ready:
log = get_annotated_logger(self.log, request.event_id)
job = build_set.item.getJob(request.job_name)
if job is None:
log.warning("Item %s does not contain job %s "
"for node request %s",
build_set.item, request.job_name, request)
build_set.removeJobNodeRequest(request.job_name)
return
log = get_annotated_logger(self.log, request.event_id)
nodeset = self.nodepool.checkNodeRequest(request, request_id,
job.nodeset)
if nodeset is None:
return
# If the request failed, we must directly delete it as the nodes will
# never be accepted.
if request.state == STATE_FAILED:
self.nodepool.deleteNodeRequest(request)
if request.job_name not in [x.name for x in build_set.item.getJobs()]:
log.warning("Item %s does not contain job %s "
"for node request %s",
build_set.item, request.job_name, request)
build_set.removeJobNodeRequest(request.job_name)
self.nodepool.deleteNodeRequest(request)
return
pipeline.manager.onNodesProvisioned(request, build_set)
pipeline.manager.onNodesProvisioned(request, nodeset, build_set)
def formatStatusJSON(self, tenant_name):
# TODOv3(jeblair): use tenants

View File

@@ -19,7 +19,7 @@ from kazoo.exceptions import NoNodeError, LockTimeout
from kazoo.recipe.lock import Lock
import zuul.model
from zuul.model import HoldRequest, Node, NodeRequest, NodeSet
from zuul.model import HoldRequest, NodeRequest
from zuul.zk import ZooKeeperClient, ZooKeeperBase
from zuul.zk.exceptions import LockException
@@ -343,14 +343,11 @@ class ZooKeeperNodepool(ZooKeeperBase):
self.kazoo_client.DataWatch(path, callback)
def getNodeRequest(self, node_request_id, nodeset=None):
def getNodeRequest(self, node_request_id):
"""
Retrieve a NodeRequest from a given path in ZooKeeper.
The serialized version of the NodeRequest doesn't contain a NodeSet, so
we have to add this to the request manually. The nodeset provided to
this method will be set on the NodeRequest before updating it. This
will ensure that all nodes are updated as well.
:param str node_request_id: The ID of the node request to retrieve.
"""
path = f"{self.REQUEST_ROOT}/{node_request_id}"
@@ -365,28 +362,6 @@ class ZooKeeperNodepool(ZooKeeperBase):
json_data = json.loads(data.decode("utf-8"))
obj = NodeRequest.fromDict(json_data)
if nodeset is None:
# If no NodeSet is provided, we create one on-the-fly based on the
# list of labels (node_types) stored in the NodeRequest's znode
# data.
# This is necessary as the logic in the updateNodeRequest() method
# below will update each node "in-place" and thus we have to ensure
# that the NodeRequest has a valid NodeSet with all nodes available
# in advance.
# This is only used to return the nodes to nodepool in case
# something went wrong and the original NodeRequest and/or NodeSet
# objects are not available anymore.
nodeset = NodeSet()
for i, node_type in enumerate(json_data["node_types"]):
node = Node(name=f"{node_type}-{i}", label=node_type)
nodeset.addNode(node)
obj.nodeset = nodeset
# Using updateNodeRequest() here will ensure that the nodes are also
# updated. Doing the update in here directly rather than calling it
# from the outside afterwards, saves us one call to ZooKeeper as the
# data we just retrieved can directly be reused.
self.updateNodeRequest(obj, data)
obj.id = node_request_id
obj.stat = stat
return obj
@@ -441,10 +416,6 @@ class ZooKeeperNodepool(ZooKeeperBase):
path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
data, stat = self.kazoo_client.get(path)
data = json.loads(data.decode('utf8'))
request_nodes = list(node_request.nodeset.getNodes())
for i, nodeid in enumerate(data.get('nodes', [])):
request_nodes[i].id = nodeid
self._updateNode(request_nodes[i])
node_request.updateFromDict(data)
def storeNode(self, node):
@@ -459,13 +430,18 @@ class ZooKeeperNodepool(ZooKeeperBase):
path = '%s/%s' % (self.NODES_ROOT, node.id)
self.kazoo_client.set(path, json.dumps(node.toDict()).encode('utf8'))
def _updateNode(self, node):
def updateNode(self, node, node_id):
"""
Refresh an existing node.
:param Node node: The node to update.
:param str node_id: The ID of the node to update. The
existing node.id attribute (if any) will be ignored
and replaced with this.
"""
node_path = '%s/%s' % (self.NODES_ROOT, node.id)
node_path = '%s/%s' % (self.NODES_ROOT, node_id)
node.id = node_id
node_data, node_stat = self.kazoo_client.get(node_path)
node_data = json.loads(node_data.decode('utf8'))
node.updateFromDict(node_data)
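
The same node-refresh pattern recurs throughout this change (in
lockNodes, checkNodeRequest, and the tests); in sketch form, with the
wrapper function name being illustrative only:

    def refresh_nodes(zk_nodepool, request, nodeset):
        # Pair each node ID reported by nodepool with the corresponding
        # Node object in the caller's nodeset copy and refresh it from
        # ZooKeeper.
        for node_id, node in zip(request.nodes, nodeset.getNodes()):
            zk_nodepool.updateNode(node, node_id)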