Merge "Factor out allocation candidate generation strategy"

This commit is contained in:
Zuul 2025-01-08 21:44:29 +00:00 committed by Gerrit Code Review
commit 93674ecfa5

View File

@ -687,7 +687,44 @@ def _consolidate_allocation_requests(areqs, rw_ctx):
mappings=mappings)
def _get_areq_list_generators(areq_lists_by_anchor, all_suffixes):
"""Returns a generator for each anchor provider that generates viable
candidates (areq_lists) for the given anchor
"""
return [
# We're using itertools.product to go from this:
# areq_lists_by_suffix = {
# '': [areq__A, areq__B, ...],
# '1': [areq_1_A, areq_1_B, ...],
# ...
# '42': [areq_42_A, areq_42_B, ...],
# }
# to this:
# [ [areq__A, areq_1_A, ..., areq_42_A], Each of these lists is one
# [areq__A, areq_1_A, ..., areq_42_B], solution to return.
# [areq__A, areq_1_B, ..., areq_42_A], Each solution contains one
# [areq__A, areq_1_B, ..., areq_42_B], AllocationRequest from each
# [areq__B, areq_1_A, ..., areq_42_A], RequestGroup. So taken as a
# [areq__B, areq_1_A, ..., areq_42_B], whole, each list is a viable
# [areq__B, areq_1_B, ..., areq_42_A], (preliminary) candidate to
# [areq__B, areq_1_B, ..., areq_42_B], return.
# ...,
# ]
itertools.product(*list(areq_lists_by_suffix.values()))
for areq_lists_by_suffix in areq_lists_by_anchor.values()
# Filter out any entries that don't have allocation requests for
# *all* suffixes (i.e. all RequestGroups)
if set(areq_lists_by_suffix) == all_suffixes
]
def _generate_areq_lists(areq_lists_by_anchor, all_suffixes):
generators = _get_areq_list_generators(areq_lists_by_anchor, all_suffixes)
return itertools.chain(*generators)
# TODO(efried): Move _merge_candidates to rw_ctx?
def _merge_candidates(candidates, rw_ctx):
"""Given a dict, keyed by RequestGroup suffix, of allocation_requests,
produce a single tuple of (allocation_requests, provider_summaries) that
@ -732,73 +769,40 @@ def _merge_candidates(candidates, rw_ctx):
all_suffixes = set(candidates)
num_granular_groups = len(all_suffixes - set(['']))
max_a_c = rw_ctx.config.placement.max_allocation_candidates
for areq_lists_by_suffix in areq_lists_by_anchor.values():
# Filter out any entries that don't have allocation requests for
# *all* suffixes (i.e. all RequestGroups)
if set(areq_lists_by_suffix) != all_suffixes:
for areq_list in _generate_areq_lists(areq_lists_by_anchor, all_suffixes):
# At this point, each AllocationRequest in areq_list is still
# marked as use_same_provider. This is necessary to filter by group
# policy, which enforces how these interact with each other.
# TODO(efried): Move _satisfies_group_policy to rw_ctx?
if not _satisfies_group_policy(
areq_list, rw_ctx.group_policy, num_granular_groups):
continue
# We're using itertools.product to go from this:
# areq_lists_by_suffix = {
# '': [areq__A, areq__B, ...],
# '1': [areq_1_A, areq_1_B, ...],
# ...
# '42': [areq_42_A, areq_42_B, ...],
# }
# to this:
# [ [areq__A, areq_1_A, ..., areq_42_A], Each of these lists is one
# [areq__A, areq_1_A, ..., areq_42_B], areq_list in the loop below.
# [areq__A, areq_1_B, ..., areq_42_A], each areq_list contains one
# [areq__A, areq_1_B, ..., areq_42_B], AllocationRequest from each
# [areq__B, areq_1_A, ..., areq_42_A], RequestGroup. So taken as a
# [areq__B, areq_1_A, ..., areq_42_B], whole, each list is a viable
# [areq__B, areq_1_B, ..., areq_42_A], (preliminary) candidate to
# [areq__B, areq_1_B, ..., areq_42_B], return.
if not _satisfies_same_subtree(areq_list, rw_ctx):
continue
# Now we go from this (where 'arr' is AllocationRequestResource):
# [ areq__B(arrX, arrY, arrZ),
# areq_1_A(arrM, arrN),
# ...,
# areq_42_B(arrQ)
# ]
# This loops on each merged candidate where a candidate is represented
# by the areq_list containing the allocations that fulfills each
# request groups
for areq_list in itertools.product(
*list(areq_lists_by_suffix.values())):
# At this point, each AllocationRequest in areq_list is still
# marked as use_same_provider. This is necessary to filter by group
# policy, which enforces how these interact with each other.
# TODO(efried): Move _satisfies_group_policy to rw_ctx?
if not _satisfies_group_policy(
areq_list, rw_ctx.group_policy, num_granular_groups):
continue
if not _satisfies_same_subtree(areq_list, rw_ctx):
continue
# Now we go from this (where 'arr' is AllocationRequestResource):
# [ areq__B(arrX, arrY, arrZ),
# areq_1_A(arrM, arrN),
# ...,
# areq_42_B(arrQ)
# ]
# to this:
# areq_combined(arrX, arrY, arrZ, arrM, arrN, arrQ)
# Note that the information telling us which RequestGroup led to
# which piece of the AllocationRequest has been lost from the outer
# layer of the data structure (the key of areq_lists_by_suffix).
# => We needed that to be present for the previous filter; we need
# it to be *absent* for the next one.
# => However, it still exists embedded in each
# AllocationRequestResource. That's needed to construct the
# mappings for the output.
areq = _consolidate_allocation_requests(areq_list, rw_ctx)
# Since we sourced this AllocationRequest from multiple
# *independent* queries, it's possible that the combined result
# now exceeds capacity where amounts of the same RP+RC were
# folded together. So do a final capacity check/filter.
if rw_ctx.exceeds_capacity(areq):
continue
areqs.add(areq)
if max_a_c >= 0 and len(areqs) >= max_a_c:
# This duplicated check will go away in the next patch that
# refactors this logic a bit.
break
# to this:
# areq_combined(arrX, arrY, arrZ, arrM, arrN, arrQ)
# Note that the information telling us which RequestGroup led to
# which piece of the AllocationRequest has been lost from the outer
# layer of the data structure (the key of areq_lists_by_suffix).
# => We needed that to be present for the previous filter; we need
# it to be *absent* for the next one.
# => However, it still exists embedded in each
# AllocationRequestResource. That's needed to construct the
# mappings for the output.
areq = _consolidate_allocation_requests(areq_list, rw_ctx)
# Since we sourced this AllocationRequest from multiple
# *independent* queries, it's possible that the combined result
# now exceeds capacity where amounts of the same RP+RC were
# folded together. So do a final capacity check/filter.
if rw_ctx.exceeds_capacity(areq):
continue
areqs.add(areq)
if max_a_c >= 0 and len(areqs) >= max_a_c:
break