Switch to ZooKeeper backed NodesProvisionedEvents

This puts the NodesProvisionedEvents into ZooKeeper. With this, all
result events are now in ZooKeeper.

To make the NodesProvisionedEvent serializable, we cannot store the
whole NodeRequest object in the event anymore. Instead we are using the
request.id, job name and the buildset UUID to look up the corresponding
NodeRequest object from the buildset when the NodesProvisionedEvent is
handled.

As a result of this we have to find a way to return the NodeSet even in
case the NodeRequest or the buildset cannot be found anymore (e.g. due
to a gate reset). In this case we look up the NodeRequest directly from
ZooKeeper and provide a faked NodeSet which allows us to retrieve the
node information from Zookeeper (via the update mechanism of the
NodeRequest in the Nodepool client).

Finally, we can get rid of the local result event queue in the scheduler
as now all result events are in ZooKeeper. Together with that the
`zuul.scheduler.eventqueues.result` gauge was also removed.

Change-Id: Ib5e0f13d25a21ebad908d38f0201e92b704a1c85
This commit is contained in:
Felix Edel
2021-07-05 13:58:01 +02:00
committed by James E. Blair
parent 95ee8e8150
commit 8f8f4f1898
10 changed files with 139 additions and 80 deletions

View File

@@ -474,11 +474,6 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
Holds metrics about the event queue lengths in the Zuul scheduler.
.. stat:: result
:type: gauge
The size of the current result event queue.
.. stat:: management
:type: gauge

View File

@@ -0,0 +1,3 @@
---
upgrade:
- The `zuul.scheduler.eventqueues.result` gauge was removed

View File

@@ -4176,7 +4176,6 @@ class SchedulerTestApp:
# longer use global management events.
self.event_queues = [
self.sched.reconfigure_event_queue,
self.sched.result_event_queue,
]
def start(self, validate_tenants: list):

View File

@@ -202,3 +202,67 @@ class TestNodepool(BaseTestCase):
self.assertEqual(request1.state, 'fulfilled')
self.assertEqual(request2.state, 'fulfilled')
self.assertTrue(request2.state_time < request1.state_time)
def test_get_node_request_with_nodeset(self):
# Test that we are able to deserialize a node request from ZK and
# update the node information while providing a valid NodeSet.
nodeset = model.NodeSet()
nodeset.addNode(model.Node(['controller', 'foo'], 'ubuntu-xenial'))
nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
job = model.Job('testjob')
job.nodeset = nodeset
request = self.nodepool.requestNodes(
"test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
self.waitForRequests()
self.assertEqual(len(self.provisioned_requests), 1)
self.assertEqual(request.state, 'fulfilled')
# Look up the node request from ZooKeeper while providing the original
# nodeset.
restored_request = self.zk_nodepool.getNodeRequest(request.id, nodeset)
# As we've provided the origial nodeset when retrieving the node
# request from ZooKeeper, they should be the same
self.assertEqual(restored_request.nodeset, nodeset)
# And the nodeset should contain the original data
restored_nodes = restored_request.nodeset.getNodes()
self.assertEqual(restored_nodes[0].name, ['controller', 'foo'])
self.assertEqual(restored_nodes[1].name, ['compute'])
def test_get_node_request_without_nodeset(self):
# Test that we are able to deserialize a node request from ZK and
# update the node information without providing a NodeSet object.
# This is used in case something went wrong when processing the
# NodesProvisionedEvents in the scheduler and the original NodeRequest
# and/or NodeSet objects are not available anymore.
nodeset = model.NodeSet()
nodeset.addNode(model.Node(['controller', 'foo'], 'ubuntu-xenial'))
nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
job = model.Job('testjob')
job.nodeset = nodeset
request = self.nodepool.requestNodes(
"test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
self.waitForRequests()
self.assertEqual(len(self.provisioned_requests), 1)
self.assertEqual(request.state, 'fulfilled')
# Look up the node request from ZooKeeper while providing no nodeset
# will result in a fake nodeset being created for the node update.
restored_request = self.zk_nodepool.getNodeRequest(request.id)
# As we didn't provide a nodeset, the nodepool client will create a
# fake one to look up the node information from nodepool.
self.assertFalse(nodeset == restored_request.nodeset)
restored_nodes = restored_request.nodeset.getNodes()
self.assertEqual(len(restored_nodes), 2)
self.assertEqual(restored_nodes[0].label, 'ubuntu-xenial')
self.assertEqual(restored_nodes[1].label, 'ubuntu-xenial')
# As the nodes were faked, they don't have the same name like the
# original ones from the config
self.assertEqual(restored_nodes[0].name, "ubuntu-xenial-0")
self.assertEqual(restored_nodes[1].name, "ubuntu-xenial-1")

View File

@@ -308,9 +308,6 @@ paths:
items:
$ref: '#/components/schemas/pipelineStatus'
type: array
result_event_queue:
description: The number of completed events
type: integer
trigger_event_queue:
description: The number of running events
type: integer

View File

@@ -171,10 +171,7 @@ class StatusPage extends React.Component {
}</span> events,&nbsp;
<span>{status.management_event_queue ?
status.management_event_queue.length : '0'
}</span> management events,&nbsp;
<span>{status.result_event_queue ?
status.result_event_queue.length : '0'
}</span> results.
}</span> management events.
</p>
)
}

View File

@@ -1482,12 +1482,13 @@ class PipelineManager(metaclass=ABCMeta):
repo_state[connection] = event.repo_state[connection]
build_set.repo_state_state = build_set.COMPLETE
def onNodesProvisioned(self, event, build_set):
def onNodesProvisioned(self, request, build_set):
# TODOv3(jeblair): handle provisioning failure here
request = event.request
log = get_annotated_logger(self.log, request.event_id)
build_set.jobNodeRequestComplete(request.job_name, request.nodeset)
# TODO (felix): Check if the failed is still needed as the
# NodesProvisionedEvents are now in ZooKeeper.
if request.failed or not request.fulfilled:
log.info("Node request %s: failure for %s",
request, request.job_name)

View File

@@ -871,6 +871,7 @@ class NodeRequest(object):
self._state = data['state']
self.state_time = data['state_time']
self.relative_priority = data.get('relative_priority', 0)
self.event_id = data['event_id']
@classmethod
def fromDict(cls, data):
@@ -4096,20 +4097,26 @@ class NodesProvisionedEvent(ResultEvent):
:arg NodeRequest request: The fulfilled node request.
"""
def __init__(self, request):
self.request = request
self.build_set_uuid = request.build_set_uuid
self.request_id = request.id
def __init__(self, request_id, job_name, build_set_uuid):
self.request_id = request_id
# We have to use the job_name to look up empty node requests from the
# buildset (as empty node requests don't have an id).
self.job_name = job_name
self.build_set_uuid = build_set_uuid
def toDict(self):
return {
"request": self.request,
"request_id": self.request_id,
"job_name": self.job_name,
"build_set_uuid": self.build_set_uuid,
}
@classmethod
def fromDict(cls, data):
return cls(
data.get("request"),
data.get("request_id"),
data.get("job_name"),
data.get("build_set_uuid"),
)

View File

@@ -163,7 +163,6 @@ class Scheduler(threading.Thread):
# TODO (swestphahl): Remove after we've refactored reconfigurations
# to be performed on the tenant level.
self.reconfigure_event_queue = NamedQueue("ReconfigureEventQueue")
self.result_event_queue = NamedQueue("ResultEventQueue")
self.event_watcher = EventWatcher(
self.zk_client, self.wake_event.set
)
@@ -405,8 +404,6 @@ class Scheduler(threading.Thread):
merge_running += running
self.statsd.gauge('zuul.mergers.jobs_running', merge_running)
self.statsd.gauge('zuul.mergers.jobs_queued', merge_queue)
self.statsd.gauge('zuul.scheduler.eventqueues.result',
self.result_event_queue.qsize())
self.statsd.gauge('zuul.scheduler.eventqueues.management',
self.reconfigure_event_queue.qsize())
base = 'zuul.scheduler.eventqueues.connection'
@@ -588,9 +585,10 @@ class Scheduler(threading.Thread):
self.pipeline_result_events[tenant_name][pipeline_name].put(event)
def onNodesProvisioned(self, req):
event = NodesProvisionedEvent(req)
self.result_event_queue.put(event)
self.wake_event.set()
tenant_name = req.tenant_name
pipeline_name = req.pipeline_name
event = NodesProvisionedEvent(req.id, req.job_name, req.build_set_uuid)
self.pipeline_result_events[tenant_name][pipeline_name].put(event)
def reconfigureTenant(self, tenant, project, event):
self.log.debug("Submitting tenant reconfiguration event for "
@@ -1344,11 +1342,6 @@ class Scheduler(threading.Thread):
if not self._stopped:
self.process_reconfigure_queue()
# TODO(swestphahl): Remove legacy result event queue after
# NodesProvisionedEvents are dispatched via Zookeeper.
if not self._stopped:
self.process_result_queue()
# Process tenant management events separate from other events
# as they might reload the tenant.
for tenant in self.abide.tenants.values():
@@ -1622,21 +1615,6 @@ class Scheduler(threading.Thread):
pipeline.name
].ack(event)
def process_result_queue(self):
# TODO (felix): The old result event queue is still used for the nodes
# provisioned results and will be removed once we move those to ZK as
# well.
while not self.result_event_queue.empty() and not self._stopped:
self.log.debug("Fetching result event")
event = self.result_event_queue.get()
try:
if isinstance(event, NodesProvisionedEvent):
self._doNodesProvisionedEvent(event)
else:
self.log.error("Unable to handle event %s", event)
finally:
self.result_event_queue.task_done()
def _process_result_event(self, event, pipeline):
if isinstance(event, BuildStartedEvent):
self._doBuildStartedEvent(event)
@@ -1650,6 +1628,8 @@ class Scheduler(threading.Thread):
self._doMergeCompletedEvent(event, pipeline)
elif isinstance(event, FilesChangesCompletedEvent):
self._doFilesChangesCompletedEvent(event, pipeline)
elif isinstance(event, NodesProvisionedEvent):
self._doNodesProvisionedEvent(event, pipeline)
else:
self.log.error("Unable to handle event %s", event)
@@ -1839,40 +1819,41 @@ class Scheduler(threading.Thread):
return
pipeline.manager.onFilesChangesCompleted(event, build_set)
def _doNodesProvisionedEvent(self, event):
request = event.request
def _doNodesProvisionedEvent(self, event, pipeline):
request_id = event.request_id
tenant_name = request.tenant_name
pipeline_name = request.pipeline_name
log = get_annotated_logger(self.log, request.event_id)
# Look up the buildset to access the local node request object
build_set = self._getBuildSetFromPipeline(event, pipeline)
if not build_set:
# Directly look up the node request in ZK and provide a dummy
# nodeset, so we can return the nodes to nodepool.
request = self.zk_nodepool.getNodeRequest(request_id)
if request.fulfilled:
self.nodepool.returnNodeSet(request.nodeset,
zuul_event_id=request.event_id)
return
request = build_set.getJobNodeRequest(event.job_name)
if not request:
# Directly look up the node request in ZK and provide a dummy
# nodeset, so we can return the nodes to nodepool.
request = self.zk_nodepool.getNodeRequest(request_id)
if request.fulfilled:
self.nodepool.returnNodeSet(request.nodeset,
zuul_event_id=request.event_id)
return
ready = self.nodepool.checkNodeRequest(request, request_id)
if not ready:
return
log = get_annotated_logger(self.log, request.event_id)
# If the request failed, we must directly delete it as the nodes will
# never be accepted.
if request.state == STATE_FAILED:
self.nodepool.deleteNodeRequest(request)
# Look up the pipeline by its name
# TODO (felix): The pipeline lookup can be removed once the
# NodesProvisionedEvents are in ZooKeeper.
pipeline = None
tenant = self.abide.tenants.get(tenant_name)
for pl in tenant.layout.pipelines.values():
if pl.name == pipeline_name:
pipeline = pl
break
build_set = self._getBuildSetFromPipeline(event, pipeline)
if not build_set:
if request.fulfilled:
self.nodepool.returnNodeSet(request.nodeset,
zuul_event_id=request.event_id)
return
if request.job_name not in [x.name for x in build_set.item.getJobs()]:
log.warning("Item %s does not contain job %s "
"for node request %s",
@@ -1883,7 +1864,7 @@ class Scheduler(threading.Thread):
zuul_event_id=request.event_id)
return
pipeline.manager.onNodesProvisioned(event, build_set)
pipeline.manager.onNodesProvisioned(request, build_set)
def formatStatusJSON(self, tenant_name):
# TODOv3(jeblair): use tenants
@@ -1895,9 +1876,6 @@ class Scheduler(threading.Thread):
data['trigger_event_queue'] = {}
data['trigger_event_queue']['length'] = len(
self.trigger_events[tenant_name])
data['result_event_queue'] = {}
data['result_event_queue']['length'] = \
self.result_event_queue.qsize()
data['management_event_queue'] = {}
data['management_event_queue']['length'] = len(
self.management_events[tenant_name]

View File

@@ -21,7 +21,7 @@ from kazoo.recipe.cache import TreeEvent
from kazoo.recipe.lock import Lock
import zuul.model
from zuul.model import HoldRequest, NodeRequest
from zuul.model import HoldRequest, Node, NodeRequest, NodeSet
from zuul.zk import ZooKeeperClient, ZooKeeperBase
from zuul.zk.exceptions import LockException
@@ -400,14 +400,14 @@ class ZooKeeperNodepool(ZooKeeperBase):
self.kazoo_client.DataWatch(path, callback)
def getNodeRequest(self, node_request_id, nodeset):
def getNodeRequest(self, node_request_id, nodeset=None):
"""
Retrieve a NodeRequest from a given path in ZooKeeper.
The nodeset provided to this method will be set on the NodeRequest
before updating it. This will ensure that all nodes are updated as
well.
The serialized version of the NodeRequest doesn't contain a NodeSet, so
we have to add this to the request manually. The nodeset provided to
this method will be set on the NodeRequest before updating it. This
will ensure that all nodes are updated as well.
"""
path = f"{self.REQUEST_ROOT}/{node_request_id}"
@@ -419,7 +419,25 @@ class ZooKeeperNodepool(ZooKeeperBase):
if not data:
return None
obj = NodeRequest.fromDict(json.loads(data.decode("utf-8")))
json_data = json.loads(data.decode("utf-8"))
obj = NodeRequest.fromDict(json_data)
if nodeset is None:
# If no NodeSet is provided, we create one on-the-fly based on the
# list of labels (node_types) stored in the NodeRequest's znode
# data.
# This is necessary as the logic in the updateNodeRequest() method
# below will update each node "in-place" an thus, we have to ensure
# that the NodeRequest has a valid NodeSet with all nodes available
# in advance.
# This is only used to return the nodes to nodepool in case
# something went wrong and the original NodeRequest and/or NodeSet
# objects are not available anymore.
nodeset = NodeSet()
for i, node_type in enumerate(json_data["node_types"]):
node = Node(name=f"{node_type}-{i}", label=node_type)
nodeset.addNode(node)
obj.nodeset = nodeset
# Using updateNodeRequest() here will ensure that the nodes are also
# updated. Doing the update in here directly rather than calling it