Launcher: defer requests with insufficient quota
We currently accept requests as fast as we can and assign new nodes to them, and then allow those nodes to wait for quota before we try to actually start building them. This means that when at capacity, the system quickly stabilizes in a state where all requests are accepted and many nodes are waiting in the start state.

Once a request is accepted, we no longer consider its priority, which reduces the effect of request priority (including relative priority).

To correct this, have the accept processing method defer requests if the chosen provider is at quota. But if we have already started assigning nodes to a request, continue to do so (and if the provider is at quota, those nodes will continue to wait in the start state). This lets multi-node jobs get a foothold in the queue once their first label can be fulfilled.

Change-Id: I494b2d42fff96ab5d26e52296ee03b01e9ed3d5d
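The decision described above can be summarized by the following standalone sketch. It is illustrative only: should_defer and its arguments are hypothetical stand-ins, not Zuul's actual classes or signatures; the real logic lives in Launcher._acceptRequest in the diff below.

# Hypothetical, simplified sketch of the accept-time deferral decision.
def should_defer(assigned_nodes, provider_has_quota):
    # A request that already has nodes keeps being processed; its nodes
    # simply wait in the start state until quota frees up.
    if assigned_nodes:
        return False
    try:
        has_quota = provider_has_quota()
    except Exception as e:
        # Mirror the launcher's behavior: a quota lookup failure fails
        # the request rather than deferring it.
        raise RuntimeError("Unable to determine quota") from e
    # No nodes assigned yet and no quota available: defer the request
    # (it stays in the REQUESTED state and keeps its priority).
    return not has_quota

# Example usage:
assert should_defer([], lambda: False) is True            # fresh request, no quota
assert should_defer([], lambda: True) is False            # fresh request, quota free
assert should_defer(["node-1"], lambda: False) is False   # already has a node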
@@ -147,7 +147,7 @@ class BaseCloudDriverTest(ZuulTestCase):
         with testtools.ExpectedException(Exception):
             self.waitForNodeRequest(request2, 10)
         request2 = client.getRequest(request2.uuid)
-        self.assertEqual(request2.state, model.NodesetRequest.State.ACCEPTED)
+        self.assertEqual(request2.state, model.NodesetRequest.State.REQUESTED)

         client.returnNodeset(nodeset1)
         self.waitUntilSettled()
@@ -26,6 +26,7 @@ from zuul import model
 import zuul.driver.aws.awsendpoint
 from zuul.launcher.client import LauncherClient

+import cachetools
 import fixtures
 import responses
 import testtools
@@ -903,7 +904,9 @@ class TestLauncher(LauncherBaseTestCase):

     @simple_layout('layouts/nodepool.yaml', enable_nodepool=True)
     @okay_tracebacks('_getQuotaForInstanceType')
-    def test_failed_node(self):
+    @mock.patch('zuul.launcher.server.Launcher.doesProviderHaveQuotaForLabel',
+                return_value=True)
+    def test_failed_node(self, mock_quota):
         # Test a node failure outside of the create state machine
         ctx = self.createZKContext(None)
         request = self.requestNodes(["debian-invalid"])
@@ -1257,13 +1260,12 @@ class TestLauncher(LauncherBaseTestCase):
             except NoNodeError:
                 break

-    @simple_layout('layouts/nodepool-multi-provider.yaml',
+    @simple_layout('layouts/nodepool.yaml',
                    enable_nodepool=True)
     @driver_config('test_launcher', quotas={
         'instances': 1,
     })
     def test_relative_priority(self):
-        # Test that we spread quota use out among multiple providers
         self.waitUntilSettled()

         client = LauncherClient(self.zk_client, None)
@@ -1273,6 +1275,10 @@ class TestLauncher(LauncherBaseTestCase):
         nodes0 = self.getNodes(request0)
         self.assertEqual(1, len(nodes0))

+        # Make sure the next requests always have current quota info
+        self.launcher._provider_quota_cache = cachetools.TTLCache(
+            maxsize=8192, ttl=0)
+
         requests = []
         ctx = self.createZKContext(None)
         for _ in range(2):
@@ -1291,14 +1297,19 @@ class TestLauncher(LauncherBaseTestCase):
             )
             requests.append(request)

+        # Allow the main loop to run to verify that we defer the
+        # requests
+        time.sleep(2)
         # Revise relative priority, so that the last requests has
         # the highest relative priority.
-        request1_p2, request2_p1 = requests
-        client.reviseRequest(request1_p2, relative_priority=2)
-        client.reviseRequest(request2_p1, relative_priority=1)
+        with self.launcher._run_lock:
+            request1_p2, request2_p1 = requests
+            client.reviseRequest(request1_p2, relative_priority=2)
+            client.reviseRequest(request2_p1, relative_priority=1)

-        # Delete the initial request to free up the instance
-        request0.delete(ctx)
+            # Delete the initial request to free up the instance
+            request0.delete(ctx)
+
         # Last request should be fulfilled
         for _ in iterate_timeout(10, "request to be fulfilled"):
             request2_p1.refresh(ctx)
@@ -1307,7 +1318,7 @@ class TestLauncher(LauncherBaseTestCase):

         # Lower priority request should not be fulfilled
         request1_p2.refresh(ctx)
-        self.assertEqual(request1_p2.State.ACCEPTED, request1_p2.state)
+        self.assertEqual(request1_p2.State.REQUESTED, request1_p2.state)


 class TestLauncherUpload(LauncherBaseTestCase):
@@ -488,9 +488,9 @@ class OpenstackProviderEndpoint(BaseProviderEndpoint):
             volume = None
         return quota_from_limits(compute, volume)

-    def getQuotaForLabel(self, label):
-        flavor = self._findFlavor(label.flavor_name, label.min_ram)
-        return quota_from_flavor(flavor, label=label)
+    def getQuotaForLabel(self, label, flavor):
+        os_flavor = self._findFlavorByName(flavor.flavor_name)
+        return quota_from_flavor(os_flavor, label=label)

     def getAZs(self):
         # TODO: This is currently unused; it's unclear if we will need
@@ -1036,6 +1036,7 @@ class Launcher:
         )

         self.nodescan_worker = NodescanWorker()
+        self._run_lock = threading.Lock()
         self.launcher_thread = threading.Thread(
             target=self.run,
             name="Launcher",
@@ -1070,7 +1071,8 @@ class Launcher:
         while self._running:
             loop_start = time.monotonic()
             try:
-                self._run()
+                with self._run_lock:
+                    self._run()
             except Exception:
                 self.log.exception("Error in main thread:")
             loop_duration = time.monotonic() - loop_start
@@ -1142,7 +1144,7 @@ class Launcher:
         )

     def _acceptRequest(self, request, log, ready_nodes):
-        log.debug("Accepting request %s", request)
+        log.debug("Considering request %s", request)
         # Create provider nodes for the requested labels
         label_providers = self._selectProviders(request, log)
         with (self.createZKContext(request._lock, log) as ctx,
@@ -1195,11 +1197,36 @@ class Launcher:
                     finally:
                         node.releaseLock(ctx)
                 else:
+                    # If we have not assigned any nodes to this
+                    # request we don't have to proceed if there is no
+                    # quota.
+                    if not bool(request.provider_node_data):
+                        try:
+                            has_quota = self.doesProviderHaveQuotaForLabel(
+                                provider, label, log)
+                        except Exception:
+                            self.log.exception(
+                                "Error checking quota for label %s "
+                                "in provider %s", label, provider)
+                            raise NodesetRequestError(
+                                "Unable to determine quota")
+                        if not has_quota:
+                            log.debug("Deferring request %s "
+                                      "due to insufficient quota", request)
+                            # TODO: We may want to consider all the
+                            # labels at once so that if we can
+                            # fulfill any label immediately, we
+                            # accept the request; currently this will
+                            # happen only if we can fulfill the first
+                            # label immediately.
+                            return
                     node = self._requestNode(
                         label, request, provider, log, ctx)
                     log.debug("Requested node %s", node.uuid)
                     request.addProviderNode(node)
-            request.state = model.NodesetRequest.State.ACCEPTED
+            if request.provider_node_data:
+                log.debug("Accepting request %s", request)
+                request.state = model.NodesetRequest.State.ACCEPTED

     def _selectProviders(self, request, log):
         providers = self.tenant_providers.get(request.tenant_name)
@@ -1208,7 +1235,7 @@ class Launcher:
                 f"No provider for tenant {request.tenant_name}")

         # Start with a randomized list of providers so that different
-        # requsets may have different behavior.
+        # requests may have different behavior.
         random.shuffle(providers)

         # Sort that list by quota
@@ -2292,6 +2319,16 @@ class Launcher:
         pct = round(pct, 1)
         return pct

+    def doesProviderHaveQuotaForLabel(self, provider, label, log):
+        total = self.getProviderQuota(provider).copy()
+        log.debug("Provider %s quota before Zuul: %s", provider, total)
+        total.subtract(self.getQuotaUsed(provider))
+        log.debug("Provider %s quota including Zuul: %s", provider, total)
+        label_quota = provider.getQuotaForLabel(label)
+        total.subtract(label_quota)
+        log.debug("Label %s required quota: %s", label, label_quota)
+        return total.nonNegative()
+
     def doesProviderHaveQuotaForNode(self, provider, node, log):
         total = self.getProviderQuota(provider).copy()
         log.debug("Provider %s quota before Zuul: %s", provider, total)