Merge "Store failed provider node data on request"
This commit is contained in:
commit
d4c73a2f02
@ -690,10 +690,9 @@ class TestLauncher(LauncherBaseTestCase):
|
||||
ctx = self.createZKContext(None)
|
||||
request = self.requestNodes(["debian-invalid"])
|
||||
self.assertEqual(request.state, model.NodesetRequest.State.FAILED)
|
||||
provider_nodes = request.provider_nodes[0]
|
||||
self.assertEqual(len(provider_nodes), 2)
|
||||
provider_node_data = request.provider_node_data[0]
|
||||
self.assertEqual(len(provider_node_data['failed_providers']), 1)
|
||||
self.assertEqual(len(request.nodes), 1)
|
||||
self.assertEqual(provider_nodes[-1], request.nodes[-1])
|
||||
|
||||
provider_nodes = []
|
||||
for node_id in request.nodes:
|
||||
@ -720,10 +719,9 @@ class TestLauncher(LauncherBaseTestCase):
|
||||
ctx = self.createZKContext(None)
|
||||
request = self.requestNodes(["debian-normal"])
|
||||
self.assertEqual(request.state, model.NodesetRequest.State.FAILED)
|
||||
provider_nodes = request.provider_nodes[0]
|
||||
self.assertEqual(len(provider_nodes), 2)
|
||||
provider_node_data = request.provider_node_data[0]
|
||||
self.assertEqual(len(provider_node_data['failed_providers']), 1)
|
||||
self.assertEqual(len(request.nodes), 1)
|
||||
self.assertEqual(provider_nodes[-1], request.nodes[-1])
|
||||
|
||||
provider_nodes = []
|
||||
for node_id in request.nodes:
|
||||
@ -755,10 +753,9 @@ class TestLauncher(LauncherBaseTestCase):
|
||||
ctx = self.createZKContext(None)
|
||||
request = self.requestNodes(["debian-normal"])
|
||||
self.assertEqual(request.state, model.NodesetRequest.State.FAILED)
|
||||
provider_nodes = request.provider_nodes[0]
|
||||
self.assertEqual(len(provider_nodes), 2)
|
||||
provider_node_data = request.provider_node_data[0]
|
||||
self.assertEqual(len(provider_node_data['failed_providers']), 1)
|
||||
self.assertEqual(len(request.nodes), 1)
|
||||
self.assertEqual(provider_nodes[-1], request.nodes[-1])
|
||||
|
||||
provider_nodes = []
|
||||
for node_id in request.nodes:
|
||||
@ -806,10 +803,9 @@ class TestLauncher(LauncherBaseTestCase):
|
||||
ctx = self.createZKContext(None)
|
||||
request = self.requestNodes(["debian-normal"], timeout=30)
|
||||
self.assertEqual(request.state, model.NodesetRequest.State.FAILED)
|
||||
provider_nodes = request.provider_nodes[0]
|
||||
self.assertEqual(len(provider_nodes), 2)
|
||||
provider_node_data = request.provider_node_data[0]
|
||||
self.assertEqual(len(provider_node_data['failed_providers']), 1)
|
||||
self.assertEqual(len(request.nodes), 1)
|
||||
self.assertEqual(provider_nodes[-1], request.nodes[-1])
|
||||
|
||||
provider_nodes = []
|
||||
for node_id in request.nodes:
|
||||
@ -841,8 +837,8 @@ class TestLauncher(LauncherBaseTestCase):
|
||||
ctx = self.createZKContext(None)
|
||||
request = self.requestNodes(["debian-normal"])
|
||||
self.assertEqual(request.state, model.NodesetRequest.State.FULFILLED)
|
||||
provider_nodes = request.provider_nodes[0]
|
||||
self.assertEqual(len(provider_nodes), 1)
|
||||
provider_node_data = request.provider_node_data[0]
|
||||
self.assertEqual(len(provider_node_data['failed_providers']), 0)
|
||||
self.assertEqual(len(request.nodes), 1)
|
||||
|
||||
node = model.ProviderNode.fromZK(
|
||||
|
@ -2752,10 +2752,10 @@ class TestLauncherApi(ZooKeeperBaseTestCase):
|
||||
break
|
||||
|
||||
# Accept and update the nodeset request
|
||||
request.updateAttributes(
|
||||
context,
|
||||
state=model.NodesetRequest.State.ACCEPTED,
|
||||
provider_nodes=[[n.uuid] for n in provider_nodes])
|
||||
with request.activeContext(context):
|
||||
for n in provider_nodes:
|
||||
request.addProviderNode(n)
|
||||
request.state = model.NodesetRequest.State.ACCEPTED
|
||||
|
||||
# "Fulfill" requested provider nodes
|
||||
for node in self.api.getMatchingProviderNodes():
|
||||
|
@ -18,7 +18,6 @@ import concurrent.futures
|
||||
import collections
|
||||
import errno
|
||||
import fcntl
|
||||
import itertools
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
@ -837,15 +836,15 @@ class Launcher:
|
||||
# Make sure we have all associated provider nodes in the cache
|
||||
return all(
|
||||
self.api.getProviderNode(n)
|
||||
for n in itertools.chain.from_iterable(request.provider_nodes)
|
||||
for n in request.nodes
|
||||
)
|
||||
|
||||
def _acceptRequest(self, request, log, ready_nodes):
|
||||
log.debug("Accepting request %s", request)
|
||||
# Create provider nodes for the requested labels
|
||||
provider_nodes = []
|
||||
label_providers = self._selectProviders(request, log)
|
||||
with self.createZKContext(request._lock, log) as ctx:
|
||||
with (self.createZKContext(request._lock, log) as ctx,
|
||||
request.activeContext(ctx)):
|
||||
for i, (label, provider) in enumerate(label_providers):
|
||||
# TODO: sort by age? use old nodes first? random to reduce
|
||||
# chance of thundering herd?
|
||||
@ -892,12 +891,8 @@ class Launcher:
|
||||
node = self._requestNode(
|
||||
label, request, provider, log, ctx)
|
||||
log.debug("Requested node %s", node.uuid)
|
||||
provider_nodes.append([node.uuid])
|
||||
|
||||
request.updateAttributes(
|
||||
ctx,
|
||||
state=model.NodesetRequest.State.ACCEPTED,
|
||||
provider_nodes=provider_nodes)
|
||||
request.addProviderNode(node)
|
||||
request.state = model.NodesetRequest.State.ACCEPTED
|
||||
|
||||
def _selectProviders(self, request, log):
|
||||
providers = self.tenant_providers.get(request.tenant_name)
|
||||
@ -905,40 +900,73 @@ class Launcher:
|
||||
raise NodesetRequestError(
|
||||
f"No provider for tenant {request.tenant_name}")
|
||||
|
||||
existing_nodes = [
|
||||
self.api.getProviderNode(n)
|
||||
for n in itertools.chain.from_iterable(request.provider_nodes)
|
||||
]
|
||||
provider_failures = collections.Counter(
|
||||
n.provider for n in existing_nodes
|
||||
if n.state == n.State.FAILED)
|
||||
# Start with a randomized list of providers so that different
|
||||
# requsets may have different behavior.
|
||||
random.shuffle(providers)
|
||||
|
||||
# Sort that list by quota
|
||||
providers.sort(key=lambda p: self.getQuotaPercentage(p))
|
||||
|
||||
# A list of providers that could work for all labels in the request
|
||||
providers_for_all_labels = set(providers)
|
||||
# A list of providers for each label in the request, indexed
|
||||
# by label index number
|
||||
providers_for_label = {}
|
||||
# Which providers are used by existing nodes in this request
|
||||
existing_providers = collections.Counter()
|
||||
|
||||
label_providers = []
|
||||
for i, label_name in enumerate(request.labels):
|
||||
# Get a list of candidate providers along with their
|
||||
# approximate quota usage.
|
||||
provider_failures = collections.Counter()
|
||||
pnd = request.getProviderNodeData(i)
|
||||
if pnd:
|
||||
provider_failures.update(pnd['failed_providers'])
|
||||
if pnd['uuid']:
|
||||
existing_node = self.api.getProviderNode(pnd['uuid'])
|
||||
# Special case for the current node just failed
|
||||
if existing_node.state == existing_node.State.FAILED:
|
||||
provider_failures[existing_node.provider] += 1
|
||||
existing_providers[existing_node.provider] += 1
|
||||
candidate_providers = [
|
||||
(p, self.getQuotaPercentage(p))
|
||||
for p in providers
|
||||
p for p in providers
|
||||
if p.hasLabel(label_name)
|
||||
and provider_failures[p.canonical_name] < p.launch_attempts
|
||||
]
|
||||
providers_for_label[i] = candidate_providers
|
||||
providers_for_all_labels &= set(candidate_providers)
|
||||
|
||||
# Turn the reduced set union of providers that work for all
|
||||
# labels back into an ordered list.
|
||||
providers_for_all_labels = [
|
||||
p for p in providers if p in providers_for_all_labels]
|
||||
|
||||
main_provider = None
|
||||
most_common = existing_providers.most_common(1)
|
||||
if most_common:
|
||||
main_provider = most_common[0][0]
|
||||
elif providers_for_all_labels:
|
||||
main_provider = providers_for_all_labels[0]
|
||||
|
||||
label_providers = []
|
||||
for i, label_name in enumerate(request.labels):
|
||||
candidate_providers = providers_for_label[i]
|
||||
if not candidate_providers:
|
||||
raise NodesetRequestError(
|
||||
f"No provider found for label {label_name}")
|
||||
|
||||
# TODO: make provider selection more sophisticated
|
||||
# Start by randomizing them
|
||||
random.shuffle(candidate_providers)
|
||||
# Then sort by quota used; providers under quota will
|
||||
# still be randomized with other providers within 10% of
|
||||
# the same quota usage values.
|
||||
candidate_providers.sort(key=lambda x: x[1])
|
||||
provider = candidate_providers[0][0]
|
||||
log.debug("Selected provider %s from candidate providers: %s",
|
||||
provider, candidate_providers)
|
||||
if main_provider in candidate_providers:
|
||||
provider = main_provider
|
||||
log.debug(
|
||||
"Selected request main provider %s "
|
||||
"from candidate providers: %s",
|
||||
provider, candidate_providers)
|
||||
else:
|
||||
provider = candidate_providers[0]
|
||||
log.debug(
|
||||
"Selected provider %s "
|
||||
"from candidate providers: %s",
|
||||
provider, candidate_providers)
|
||||
label = provider.labels[label_name]
|
||||
label_providers.append((label, provider))
|
||||
|
||||
return label_providers
|
||||
|
||||
def _requestNode(self, label, request, provider, log, ctx):
|
||||
@ -978,14 +1006,25 @@ class Launcher:
|
||||
for i, node_id in enumerate(request.nodes):
|
||||
node = self.api.getProviderNode(node_id)
|
||||
if node.state in (node.State.FAILED, node.State.TEMPFAILED):
|
||||
# In all cases, delete the old node
|
||||
# if this was a tempfail, retry again as normal
|
||||
# otherwise, add to the failed providers list in the request
|
||||
label_providers = self._selectProviders(request, log)
|
||||
label, provider = label_providers[i]
|
||||
if node.state == node.State.FAILED:
|
||||
add_failed_provider = node.provider
|
||||
else:
|
||||
add_failed_provider = None
|
||||
log.info("Retrying request with provider %s", provider)
|
||||
with self.createZKContext(request._lock, log) as ctx:
|
||||
node = self._requestNode(
|
||||
label, request, provider, log, ctx)
|
||||
with request.activeContext(ctx):
|
||||
request.provider_nodes[i].append(node.uuid)
|
||||
request.updateProviderNode(
|
||||
i,
|
||||
uuid=node.uuid,
|
||||
add_failed_provider=add_failed_provider,
|
||||
)
|
||||
|
||||
requested_nodes.append(node)
|
||||
|
||||
@ -1073,7 +1112,9 @@ class Launcher:
|
||||
# longer exists
|
||||
(node.request_id is not None and not request)
|
||||
# ... it is failed/outdated
|
||||
or node.state in (node.State.FAILED, node.State.OUTDATED)
|
||||
or node.state in (node.State.FAILED,
|
||||
node.State.TEMPFAILED,
|
||||
node.State.OUTDATED)
|
||||
):
|
||||
try:
|
||||
self._cleanupNode(node, log)
|
||||
|
@ -2451,10 +2451,8 @@ class NodesetRequest(zkobject.LockableZKObject):
|
||||
request_time=time.time(),
|
||||
zuul_event_id="",
|
||||
span_info=None,
|
||||
# A list of list with the attempted provider nodes. The last
|
||||
# item in the contained lists is the current attempt. E.g.
|
||||
# [[label-A-failed, label-A-building], [label-b-ready], ...]
|
||||
provider_nodes=[],
|
||||
# A dict of info about the node we have assigned to each label
|
||||
provider_node_data=[],
|
||||
# Attributes that are not serialized
|
||||
lock=None,
|
||||
is_locked=False,
|
||||
@ -2462,13 +2460,33 @@ class NodesetRequest(zkobject.LockableZKObject):
|
||||
_lscores=None,
|
||||
)
|
||||
|
||||
def addProviderNode(self, provider_node):
|
||||
self.provider_node_data.append(dict(
|
||||
uuid=provider_node.uuid,
|
||||
failed_providers=[],
|
||||
))
|
||||
|
||||
def updateProviderNode(self, index,
|
||||
uuid=None,
|
||||
add_failed_provider=None):
|
||||
data = self.provider_node_data[index]
|
||||
if uuid is not None:
|
||||
data['uuid'] = uuid
|
||||
if add_failed_provider is not None:
|
||||
data['failed_providers'].append(add_failed_provider)
|
||||
|
||||
def getProviderNodeData(self, index):
|
||||
if index < len(self.provider_node_data):
|
||||
return self.provider_node_data[index]
|
||||
return None
|
||||
|
||||
@property
|
||||
def fulfilled(self):
|
||||
return self.state == self.State.FULFILLED
|
||||
|
||||
@property
|
||||
def nodes(self):
|
||||
return [n[-1] for n in self.provider_nodes]
|
||||
return [n['uuid'] for n in self.provider_node_data]
|
||||
|
||||
@property
|
||||
def created_time(self):
|
||||
@ -2494,7 +2512,7 @@ class NodesetRequest(zkobject.LockableZKObject):
|
||||
request_time=self.request_time,
|
||||
zuul_event_id=self.zuul_event_id,
|
||||
span_info=self.span_info,
|
||||
provider_nodes=self.provider_nodes,
|
||||
provider_node_data=self.provider_node_data,
|
||||
)
|
||||
return json.dumps(data, sort_keys=True).encode("utf-8")
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user