Implement launcher relative priority handling

This change adds support for revising the relative priority of nodeset
requests similar to how this is already possible for Nodepool.

Similar to build/merge requests, we are not allowed to modify the nodeset
request after it has been created. To still allow revision of the
relative priority we will create a 'revision' subnode, which contains
the updated relative priority of the request.

The treecache was updated to handle the 'revision' subnode and update
the nodeset request accordingly.

More extensive tests covering the pipeline manager's adjustment of the
relative priority are added in a follow-up change to make review easier.

Change-Id: I8b154cd3626b1e0251c142e37b0a2e2ed7e9c0d5
This commit is contained in:
Simon Westphahl
2025-03-07 14:49:20 +01:00
parent 54964b0386
commit 2dc9ec644d
9 changed files with 334 additions and 36 deletions

View File

@ -17,6 +17,7 @@ import math
import os
import textwrap
import time
import uuid
from collections import defaultdict
from unittest import mock
@ -1034,6 +1035,58 @@ class TestLauncher(LauncherBaseTestCase):
except NoNodeError:
break
@simple_layout('layouts/nodepool-multi-provider.yaml',
enable_nodepool=True)
@driver_config('test_launcher', quotas={
'instances': 1,
})
def test_relative_priority(self):
"""Nodeset requests are fulfilled in relative-priority order."""
# Test that revising the relative priority changes the order in
# which queued nodeset requests are fulfilled.
self.waitUntilSettled()
client = LauncherClient(self.zk_client, None)
# Create a request so the following requests can't be fulfilled
# due to instance quota.
request0 = self.requestNodes(["debian-normal"])
nodes0 = self.getNodes(request0)
self.assertEqual(1, len(nodes0))
requests = []
ctx = self.createZKContext(None)
for _ in range(2):
request = model.NodesetRequest.new(
ctx,
tenant_name="tenant-one",
pipeline_name="test",
buildset_uuid=uuid.uuid4().hex,
job_uuid=uuid.uuid4().hex,
job_name="foobar",
labels=["debian-normal"],
priority=100,
request_time=time.time(),
zuul_event_id=uuid.uuid4().hex,
span_info=None,
)
requests.append(request)
# Revise relative priority, so that the last request has
# the highest relative priority (a lower value wins).
request1_p2, request2_p1 = requests
client.reviseRequest(request1_p2, relative_priority=2)
client.reviseRequest(request2_p1, relative_priority=1)
# Delete the initial request to free up the instance
request0.delete(ctx)
# Last request should be fulfilled
for _ in iterate_timeout(10, "request to be fulfilled"):
request2_p1.refresh(ctx)
if request2_p1.state == request2_p1.State.FULFILLED:
break
# Lower priority request should not be fulfilled
request1_p2.refresh(ctx)
self.assertEqual(request1_p2.State.ACCEPTED, request1_p2.state)
class TestMinReadyLauncher(LauncherBaseTestCase):
tenant_config_file = "config/launcher-min-ready/main.yaml"

View File

@ -30,11 +30,12 @@ from kazoo.protocol.states import KazooState
class SimpleTreeCacheObject:
def __init__(self, key, data, zstat):
def __init__(self, root, key, data, zstat):
self.key = key
self.data = json.loads(data)
self._zstat = zstat
self.path = '/'.join(key)
self.path = '/'.join((root.rstrip("/"), *key))
self.children = {}
def _updateFromRaw(self, data, zstat, context=None):
self.data = json.loads(data)
@ -43,13 +44,29 @@ class SimpleTreeCacheObject:
class SimpleTreeCache(ZuulTreeCache):
def objectFromRaw(self, key, data, zstat):
return SimpleTreeCacheObject(key, data, zstat)
return SimpleTreeCacheObject(self.root, key, data, zstat)
def updateFromRaw(self, obj, key, data, zstat):
obj._updateFromRaw(data, zstat, None)
def parsePath(self, path):
return tuple(path.split('/'))
object_path = path[len(self.root):].strip("/")
parts = object_path.split('/')
if not parts:
return None
return tuple(parts)
class SimpleSubnodeTreeCache(SimpleTreeCache):
    """Tree cache that keeps subnode data on the parent cached object.

    Paths with more than one key component are not cached as separate
    objects; their raw data is stored in the parent object's
    ``children`` dict instead.
    """

    def preCacheHook(self, event, exists, data=None, stat=None):
        parts = self.parsePath(event.path)
        # parsePath may return None for paths outside the cache root;
        # single-component paths are regular objects handled normally.
        if parts is None or len(parts) <= 1:
            return None
        cache_key = (parts[0],)
        # The parent object may already be gone (e.g. events arriving
        # after a recursive delete), so don't assume it is cached.
        parent = self._cached_objects.get(cache_key)
        if parent is not None:
            if exists:
                parent.children[parts] = data
            else:
                # Use a default so a duplicate delete event is a no-op
                # instead of raising KeyError.
                parent.children.pop(parts, None)
        # Subnodes never become cached objects of their own.
        return self.STOP_OBJECT_UPDATE
class TestTreeCache(BaseTestCase):
@ -123,7 +140,7 @@ class TestTreeCache(BaseTestCase):
'/test/foo': {},
})
# Simulate a change happening while the state was suspendede
# Simulate a change happening while the state was suspended
cache._cached_paths.add('/test/bar')
cache._sessionListener(KazooState.SUSPENDED)
cache._sessionListener(KazooState.CONNECTED)
@ -151,6 +168,53 @@ class TestTreeCache(BaseTestCase):
'/foo' in object_paths):
break
def test_tree_cache_subnode(self):
"""Subnode data is cached on the parent object and kept in sync
across session SUSPENDED/LOST events and recursive deletes."""
client = self.zk_client.client
data = b'{}'
client.create('/test', data)
client.create('/test/foo', data)
cache = SimpleSubnodeTreeCache(self.zk_client, "/test")
self.waitForCache(cache, {
'/test/foo': {},
})
foo = cache._cached_objects[('foo',)]
# Subnode that is processed and cached as part of foo
oof_data = b'{"value":1}'
oof_key = ('foo', 'oof')
client.create('/test/foo/oof', oof_data)
for _ in iterate_timeout(10, 'cache to sync'):
if foo.children.get(oof_key) == oof_data:
break
self.assertEqual(cache._cached_paths, {'/test/foo', '/test/foo/oof'})
# Simulate a change happening while the state was suspended
foo.children[oof_key] = b"outdated"
cache._sessionListener(KazooState.SUSPENDED)
cache._sessionListener(KazooState.CONNECTED)
# Reconnect must re-read the subnode and replace the stale data.
for _ in iterate_timeout(10, 'cache to sync'):
if foo.children[oof_key] == oof_data:
break
# Simulate a change happening while the state was lost
cache._cached_paths.add('/test/foo/bar')
bar_key = ('foo', 'bar')
foo.children[bar_key] = b"deleted"
cache._sessionListener(KazooState.LOST)
cache._sessionListener(KazooState.CONNECTED)
# After the full re-sync the nonexistent subnode must be dropped.
for _ in iterate_timeout(10, 'cache to sync'):
if bar_key not in foo.children:
break
self.assertEqual(cache._cached_paths, {'/test/foo', '/test/foo/oof'})
# Recursively delete foo and make sure the cache is empty afterwards
client.delete("/test/foo", recursive=True)
self.waitForCache(cache, {})
self.assertEqual(cache._cached_paths, set())
self.assertEqual(cache._cached_objects, {})
def test_tree_cache_qsize_warning(self):
with self.assertLogs('zuul.zk.ZooKeeper', level='DEBUG') as logs:
cache = SimpleTreeCache(self.zk_client, "/test")

View File

@ -2856,6 +2856,53 @@ class TestLauncherApi(ZooKeeperBaseTestCase):
if not provider_nodes:
break
def test_nodeset_request_revision(self):
    """A request's relative priority can be revised via the 'revision'
    subnode without modifying the request znode itself."""
    labels = ["foo", "bar"]
    context = ZKContext(self.zk_client, None, None, self.log)
    request = model.NodesetRequest.new(
        context,
        tenant_name="tenant",
        pipeline_name="check",
        buildset_uuid=uuid.uuid4().hex,
        job_uuid=uuid.uuid4().hex,
        job_name="foobar",
        labels=labels,
        priority=100,
        request_time=time.time(),
        zuul_event_id=None,
        span_info=None,
        _relative_priority=100,
    )
    self.assertEqual(100, request.relative_priority)

    # Wait for request to show up in the cache
    for _ in iterate_timeout(10, "request to show up"):
        request_list = self.api.getNodesetRequests()
        if len(request_list):
            break
    self.assertEqual(len(request_list), 1)

    for req in request_list:
        cached_request = self.api.getNodesetRequest(req.uuid)
        # Fix: assert on the *cached* request; the original re-checked
        # the local request object, which was already verified above.
        self.assertEqual(100, cached_request.relative_priority)

    # Create a revision subnode; the cache should apply it to the
    # cached request object.
    model.NodesetRequestRevision.new(
        context, request=request, relative_priority=50)
    for _ in iterate_timeout(10, "revision to be applied"):
        if cached_request.relative_priority == 50:
            break

    # Relative priority of the local object should still be the
    # initial value until we refresh it.
    self.assertEqual(100, request.relative_priority)

    # Refresh request inc. revision
    request.refresh(context)
    self.assertEqual(50, request.relative_priority)

    # Update revision
    request.revise(context, relative_priority=10)
    self.assertEqual(10, request.relative_priority)
def test_node_quota_cache(self):
context = ZKContext(self.zk_client, None, None, self.log)

View File

@ -39,7 +39,8 @@ class LauncherClient:
self.zk_client = zk_client
self.stop_event = stop_event
def requestNodeset(self, item, job, priority, preferred_provider):
def requestNodeset(self, item, job, priority, relative_priority,
preferred_provider):
log = get_annotated_logger(self.log, item.event)
labels = [n.label for n in job.nodeset.getNodes()]
@ -62,7 +63,7 @@ class LauncherClient:
job_name=job.name,
labels=labels,
priority=priority,
# relative_priority,
_relative_priority=relative_priority,
request_time=request_time,
zuul_event_id=item.event.zuul_event_id,
span_info=span_info,
@ -70,6 +71,16 @@ class LauncherClient:
log.info("Submitted nodeset request %s", request)
return request
def reviseRequest(self, request, relative_priority):
    """Update the relative priority of a nodeset request.

    The revision is written to a subnode of the request; if the
    request no longer exists in ZooKeeper this is a no-op.
    """
    log = get_annotated_logger(self.log, request)
    try:
        with self.createZKContext(None, self.log) as ctx:
            request.revise(ctx, relative_priority=relative_priority)
    except NoNodeError:
        # Request vanished (e.g. already fulfilled and cleaned up);
        # nothing left to revise.
        return
    log.info("Revised nodeset request %s; relative_priority=%s",
             request, relative_priority)
def getRequest(self, request_id):
try:
with self.createZKContext(None, self.log) as ctx:

View File

@ -1186,7 +1186,8 @@ class PipelineManager(metaclass=ABCMeta):
provider = self._getPausedParentProvider(build_set, job)
priority = self._calculateNodeRequestPriority(build_set, job)
item = build_set.item
req = self.sched.launcher.requestNodeset(item, job, priority, provider)
req = self.sched.launcher.requestNodeset(
item, job, priority, relative_priority, provider)
log.debug("Adding nodeset request %s for job %s to item %s",
req, job, item)
build_set.setJobNodeRequestID(job, req.uuid)
@ -1991,24 +1992,40 @@ class PipelineManager(metaclass=ABCMeta):
(item, failing_reasons))
if (item.live and not dequeued
and self.sched.globals.use_relative_priority):
for job, request_id in \
item.current_build_set.getNodeRequests():
node_request = self.sched.nodepool.zk_nodepool.getNodeRequest(
request_id, cached=True)
if not node_request:
continue
if node_request.state != model.STATE_REQUESTED:
# If the node request was locked and accepted by a
# provider, we can no longer update the relative priority.
continue
priority = self.getNodePriority(
item,
item.getChangeForJob(job))
if node_request.relative_priority != priority:
self.sched.nodepool.reviseRequest(
node_request, priority)
for job, request_id in item.current_build_set.getNodeRequests():
if self.sched.nodepool.isNodeRequestID(request_id):
self._reviseNodeRequest(request_id, item, job)
else:
self._reviseNodesetRequest(request_id, item, job)
return (changed, nnfi)
def _reviseNodeRequest(self, request_id, item, job):
    """Update the relative priority of a Nodepool node request."""
    nodepool = self.sched.nodepool
    node_request = nodepool.zk_nodepool.getNodeRequest(
        request_id, cached=True)
    # If the node request was locked and accepted by a provider, we
    # can no longer update the relative priority.
    if not node_request or node_request.state != model.STATE_REQUESTED:
        return
    new_priority = self.getNodePriority(
        item, item.getChangeForJob(job))
    if new_priority != node_request.relative_priority:
        nodepool.reviseRequest(node_request, new_priority)
def _reviseNodesetRequest(self, request_id, item, job):
    """Update the relative priority of a launcher nodeset request."""
    launcher = self.sched.launcher
    nodeset_request = launcher.getRequest(request_id)
    if not nodeset_request:
        return
    if nodeset_request.state not in nodeset_request.REVISE_STATES:
        # Once a launcher has accepted the request, the relative
        # priority can no longer be updated.
        return
    new_priority = self.getNodePriority(
        item, item.getChangeForJob(job))
    if new_priority != nodeset_request.relative_priority:
        launcher.reviseRequest(nodeset_request, new_priority)
def processQueue(self, tenant_lock):
# Do whatever needs to be done for each change in the queue
self.log.debug("Starting queue processor: %s" % self.pipeline.name)

View File

@ -2417,6 +2417,7 @@ class NodesetInfo:
)
@total_ordering
class NodesetRequest(zkobject.LockableZKObject):
class State(StrEnum):
@ -2432,12 +2433,20 @@ class NodesetRequest(zkobject.LockableZKObject):
State.FAILED,
)
REVISE_STATES = (
State.REQUESTED,
State.ACCEPTED,
State.TEST_HOLD,
)
ROOT = "/zuul/nodeset"
REQUESTS_PATH = "requests"
LOCKS_PATH = "locks"
def __init__(self):
super().__init__()
revision = NodesetRequestRevision()
revision._set(request=self)
self._set(
uuid=uuid4().hex,
state=self.State.REQUESTED,
@ -2451,15 +2460,33 @@ class NodesetRequest(zkobject.LockableZKObject):
request_time=time.time(),
zuul_event_id="",
span_info=None,
# Revisable attributes
_relative_priority=0,
# A dict of info about the node we have assigned to each label
provider_node_data=[],
# Attributes that are not serialized
lock=None,
is_locked=False,
_revision=revision,
# Attributes set by the launcher
_lscores=None,
)
@property
def relative_priority(self):
    """Effective relative priority of this request.

    A revision znode (if one exists in ZooKeeper) overrides the value
    the request was originally created with.
    """
    revision = self._revision
    if revision.getZKVersion() is None:
        return self._relative_priority
    return revision.relative_priority
def __lt__(self, other):
    """Order requests by priority, then relative priority, then age."""
    self_key = (self.priority, self.relative_priority, self.request_time)
    other_key = (other.priority, other.relative_priority,
                 other.request_time)
    return self_key < other_key
def revise(self, ctx, relative_priority):
"""Persist an updated relative priority via the revision subnode."""
self._revision.force(ctx, relative_priority=relative_priority)
def addProviderNode(self, provider_node):
self.provider_node_data.append(dict(
uuid=provider_node.uuid,
@ -2493,7 +2520,11 @@ class NodesetRequest(zkobject.LockableZKObject):
return self.request_time
def getPath(self):
return f"{self.ROOT}/{self.REQUESTS_PATH}/{self.uuid}"
return self._getPath(self.uuid)
@classmethod
def _getPath(cls, request_id):
return f"{cls.ROOT}/{cls.REQUESTS_PATH}/{request_id}"
def getLockPath(self):
return f"{self.ROOT}/{self.LOCKS_PATH}/{self.uuid}"
@ -2513,14 +2544,55 @@ class NodesetRequest(zkobject.LockableZKObject):
zuul_event_id=self.zuul_event_id,
span_info=self.span_info,
provider_node_data=self.provider_node_data,
_relative_priority=self.relative_priority,
)
return json.dumps(data, sort_keys=True).encode("utf-8")
def deserialize(self, raw, context, extra=None):
"""Deserialize the request, also refreshing its revision subnode."""
if context is not None:
# Pick up the current revision (revised relative priority)
# before deserializing; the revision may not exist yet,
# which is fine.
try:
self._revision.refresh(context)
except NoNodeError:
pass
return super().deserialize(raw, context, extra)
def __repr__(self):
    """Debug representation including state, labels and ZK path."""
    return (
        "<NodesetRequest uuid={}, state={}, labels={}, path={}>".format(
            self.uuid, self.state, self.labels, self.getPath()))
class NodesetRequestRevision(zkobject.ZKObject):
"""Mutable companion znode of a NodesetRequest.

NodesetRequest znodes are never modified after creation; revisable
attributes (currently only the relative priority) are stored in
this 'revision' subnode instead.
"""

# We don't want to re-create the request in case it was deleted
makepath = False

def __init__(self):
super().__init__()
self._set(
# Revised relative priority of the owning request
relative_priority=0,
# Not serialized; back-reference to the owning request
request=None,
)

def force(self, context, **kw):
"""Create or overwrite the revision znode with the given attributes."""
self._set(**kw)
if getattr(self, '_zstat', None) is None:
# No stat yet, so the revision likely doesn't exist in ZK.
try:
return self.internalCreate(context)
except NodeExistsError:
# Lost the creation race; fall through and overwrite.
pass
data = self._trySerialize(context)
self._save(context, data)

def getPath(self):
# The revision lives directly below its request znode.
return f"{self.request.getPath()}/revision"

def serialize(self, context):
data = dict(
relative_priority=self.relative_priority,
)
return json.dumps(data, sort_keys=True).encode("utf-8")
class ProviderNode(zkobject.PolymorphicZKObjectMixin,
zkobject.LockableZKObject):

View File

@ -47,6 +47,10 @@ class ZuulTreeCache(abc.ABC):
event_log = logging.getLogger("zuul.zk.cache.event")
qsize_warning_threshold = 1024
# Sentinel object that can be returned from preCacheHook to not
# update the current object.
STOP_OBJECT_UPDATE = object()
def __init__(self, zk_client, root, async_worker=True):
"""Use watchers to keep a cache of local objects up to date.
@ -312,7 +316,9 @@ class ZuulTreeCache(abc.ABC):
self._cached_paths.discard(event.path)
# Some caches have special handling for certain sub-objects
self.preCacheHook(event, exists, stat)
if (self.preCacheHook(event, exists, data, stat)
== self.STOP_OBJECT_UPDATE):
return
# If we don't actually cache this kind of object, return now
if key is None:
@ -349,7 +355,7 @@ class ZuulTreeCache(abc.ABC):
return sentinel.wait(timeout)
# Methods for subclasses:
def preCacheHook(self, event, exists, stat=None):
def preCacheHook(self, event, exists, data=None, stat=None):
"""Called before the cache is updated
This is called for any add/update/remove event under the root,
@ -362,8 +368,12 @@ class ZuulTreeCache(abc.ABC):
Otherwise, it indicates whether or not the EventType would
cause the node to exist in ZK.
Return sentinel STOP_OBJECT_UPDATE to skip the object update
step.
:param EventType event: The event.
:param bool exists: Whether the object exists in ZK.
:param bytes data: data when fetched, else None
:param ZnodeStat stat: ZNode stat when the node exists, else None
"""

View File

@ -125,7 +125,7 @@ class JobRequestCache(ZuulTreeCache):
if parts[0] == 'requests':
return (parts[0], parts[1])
def preCacheHook(self, event, exists, stat=None):
def preCacheHook(self, event, exists, data=None, stat=None):
parts = self._parsePath(event.path)
if parts is None:
return

View File

@ -16,7 +16,6 @@ import collections
import json
import logging
import threading
from operator import attrgetter
import mmh3
from kazoo.exceptions import NoNodeError
@ -63,14 +62,14 @@ class LockableZKObjectCache(ZuulTreeCache):
parts = self._parsePath(path)
if parts is None:
return None
if len(parts) != 2:
if len(parts) < 2:
return None
if parts[0] != self.items_path:
return None
item_uuid = parts[-1]
item_uuid = parts[1]
return (item_uuid,)
def preCacheHook(self, event, exists, stat=None):
def preCacheHook(self, event, exists, data=None, stat=None):
parts = self._parsePath(event.path)
if parts is None:
return
@ -116,6 +115,32 @@ class LockableZKObjectCache(ZuulTreeCache):
return set(self._cached_objects.keys())
class RequestCache(LockableZKObjectCache):
"""Request cache that also tracks lock and revision subnodes."""

def preCacheHook(self, event, exists, data=None, stat=None):
parts = self._parsePath(event.path)
if parts is None:
return
# Expecting (<self.locks_path>, <uuid>, <lock>,)
# or (<self.items_path>, <uuid>, revision,)
if len(parts) != 3:
return
object_type, request_uuid, *_ = parts
key = (request_uuid,)
request = self._cached_objects.get(key)
if not request:
# Parent request not (yet) cached; nothing to update.
return
if object_type == self.locks_path:
# Lock subnode appeared/disappeared -> update the flag.
request._set(is_locked=exists)
elif data is not None:
# Revision subnode -> apply the revised data in place.
request._revision._updateFromRaw(data, stat, None)
# Subnodes are never cached as separate objects.
return self.STOP_OBJECT_UPDATE
class NodeCache(LockableZKObjectCache):
def __init__(self, *args, **kw):
# Key -> quota, for each cached object
@ -159,7 +184,7 @@ class LauncherApi:
self.component_registry = component_registry
self.component_info = component_info
self.event_callback = event_callback
self.requests_cache = LockableZKObjectCache(
self.requests_cache = RequestCache(
self.zk_client,
self.event_callback,
root=NodesetRequest.ROOT,
@ -186,8 +211,7 @@ class LauncherApi:
c.hostname: c for c in self.component_registry.all("launcher")}
candidate_names = set(candidate_launchers.keys())
for request in sorted(self.requests_cache.getItems(),
key=attrgetter("priority")):
for request in sorted(self.requests_cache.getItems()):
if request.hasLock():
# We are holding a lock, so short-circuit here.
yield request
@ -221,7 +245,7 @@ class LauncherApi:
return self.requests_cache.getItem(request_id)
def getNodesetRequests(self):
return self.requests_cache.getItems()
return sorted(self.requests_cache.getItems())
def getMatchingProviderNodes(self):
all_launchers = {