Reuse cache result for sharing providers capacity
Getting sharing providers in _get_providers_with_shared_capacity() was heavier than the main function _get_providers_with_resource() to search the requested resource, but they had similar algorithm being a bit redundant. This patch changes _get_providers_with_shared_capacity() to reuse the result of _get_providers_with_resource() for performance optimization. Change-Id: I190c13c61314da172623c7f9fded7e4ed3d83f80 Story: 2005712 Task: 31038
This commit is contained in:
parent
f8bbda15e7
commit
7b8e2a8a6b
@ -136,12 +136,13 @@ class AllocationCandidates(object):
|
||||
def _get_by_requests(cls, context, requests, limit=None,
|
||||
group_policy=None, nested_aware=True):
|
||||
has_trees = res_ctx.has_provider_trees(context)
|
||||
sharing = res_ctx.get_sharing_providers(context)
|
||||
|
||||
candidates = {}
|
||||
for suffix, request in requests.items():
|
||||
try:
|
||||
rg_ctx = res_ctx.RequestGroupSearchContext(
|
||||
context, request, has_trees)
|
||||
context, request, has_trees, sharing)
|
||||
except exception.ResourceProviderNotFound:
|
||||
return [], []
|
||||
|
||||
|
@ -14,7 +14,6 @@
|
||||
import collections
|
||||
import os_traits
|
||||
from oslo_log import log as logging
|
||||
import six
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy import sql
|
||||
|
||||
@ -47,7 +46,7 @@ class RequestGroupSearchContext(object):
|
||||
"""An adapter object that represents the search for allocation candidates
|
||||
for a single request group.
|
||||
"""
|
||||
def __init__(self, context, request, has_trees):
|
||||
def __init__(self, context, request, has_trees, sharing):
|
||||
"""Initializes the object retrieving and caching matching providers
|
||||
for each conditions like resource and aggregates from database.
|
||||
|
||||
@ -119,20 +118,9 @@ class RequestGroupSearchContext(object):
|
||||
raise exception.ResourceProviderNotFound()
|
||||
self._rps_with_resource[rc_id] = provs_with_resource
|
||||
|
||||
# a dict, keyed by resource class ID, of the set of resource
|
||||
# provider IDs that share some inventory for each resource class
|
||||
# This is only used for unnumbered request group path where
|
||||
# use_same_provider is set to False
|
||||
self._sharing_providers = {}
|
||||
if not self.use_same_provider:
|
||||
for rc_id, amount in self.resources.items():
|
||||
# We may want to have a concept of "sharable resource class"
|
||||
# so that we can skip this lookup.
|
||||
# if not rc_id in (sharable_rc_ids):
|
||||
# continue
|
||||
self._sharing_providers[rc_id] = \
|
||||
get_providers_with_shared_capacity(
|
||||
context, rc_id, amount, self.member_of)
|
||||
# a set of resource provider IDs that share some inventory for some
|
||||
# resource class.
|
||||
self._sharing_providers = sharing
|
||||
|
||||
# bool indicating there is some level of nesting in the environment
|
||||
self.has_trees = has_trees
|
||||
@ -143,7 +131,8 @@ class RequestGroupSearchContext(object):
|
||||
the requested resource class (if there isn't, we take faster, simpler
|
||||
code paths)
|
||||
"""
|
||||
return any(self._sharing_providers.values())
|
||||
# NOTE: This could be refactored to see the requested resources
|
||||
return bool(self._sharing_providers)
|
||||
|
||||
@property
|
||||
def exists_nested(self):
|
||||
@ -154,7 +143,13 @@ class RequestGroupSearchContext(object):
|
||||
return self.has_trees
|
||||
|
||||
def get_rps_with_shared_capacity(self, rc_id):
|
||||
return self._sharing_providers.get(rc_id)
|
||||
sharing_in_aggs = self._sharing_providers
|
||||
if self.rps_in_aggs:
|
||||
sharing_in_aggs &= self.rps_in_aggs
|
||||
if not sharing_in_aggs:
|
||||
return set()
|
||||
rps_with_resource = set(p[0] for p in self._rps_with_resource[rc_id])
|
||||
return sharing_in_aggs & rps_with_resource
|
||||
|
||||
def get_rps_with_resource(self, rc_id):
|
||||
return self._rps_with_resource.get(rc_id)
|
||||
@ -823,10 +818,9 @@ def get_provider_ids_for_traits_and_aggs(rg_ctx):
|
||||
|
||||
|
||||
@db_api.placement_context_manager.reader
|
||||
def get_providers_with_shared_capacity(ctx, rc_id, amount, member_of=None):
|
||||
def get_sharing_providers(ctx, rp_ids=None):
|
||||
"""Returns a list of resource provider IDs (internal IDs, not UUIDs)
|
||||
that have capacity for a requested amount of a resource and indicate that
|
||||
they share resource via an aggregate association.
|
||||
that indicate that they share resource via an aggregate association.
|
||||
|
||||
Shared resource providers are marked with a standard trait called
|
||||
MISC_SHARES_VIA_AGGREGATE. This indicates that the provider allows its
|
||||
@ -853,19 +847,10 @@ def get_providers_with_shared_capacity(ctx, rc_id, amount, member_of=None):
|
||||
as potential fits for the requested set of resources.
|
||||
|
||||
To facilitate that matching query, this function returns all providers that
|
||||
indicate they share their inventory with providers in some aggregate and
|
||||
have enough capacity for the requested amount of a resource.
|
||||
indicate they share their inventory with providers in some aggregate.
|
||||
|
||||
To follow the example above, if we were to call
|
||||
get_providers_with_shared_capacity(ctx, "DISK_GB", 100), we would want to
|
||||
get back the ID for the NFS_SHARE resource provider.
|
||||
|
||||
:param rc_id: Internal ID of the requested resource class.
|
||||
:param amount: Amount of the requested resource.
|
||||
:param member_of: When present, contains a list of lists of aggregate
|
||||
uuids that are used to filter the returned list of
|
||||
resource providers that *directly* belong to the
|
||||
aggregates referenced.
|
||||
:param rp_ids: When present, returned resource providers are limited to
|
||||
only those in this value
|
||||
"""
|
||||
# The SQL we need to generate here looks like this:
|
||||
#
|
||||
@ -873,80 +858,26 @@ def get_providers_with_shared_capacity(ctx, rc_id, amount, member_of=None):
|
||||
# FROM resource_providers AS rp
|
||||
# INNER JOIN resource_provider_traits AS rpt
|
||||
# ON rp.id = rpt.resource_provider_id
|
||||
# INNER JOIN traits AS t
|
||||
# ON rpt.trait_id = t.id
|
||||
# AND t.name = "MISC_SHARES_VIA_AGGREGATE"
|
||||
# INNER JOIN inventories AS inv
|
||||
# ON rp.id = inv.resource_provider_id
|
||||
# AND inv.resource_class_id = $rc_id
|
||||
# LEFT JOIN (
|
||||
# SELECT resource_provider_id, SUM(used) as used
|
||||
# FROM allocations
|
||||
# WHERE resource_class_id = $rc_id
|
||||
# GROUP BY resource_provider_id
|
||||
# ) AS usage
|
||||
# ON rp.id = usage.resource_provider_id
|
||||
# WHERE COALESCE(usage.used, 0) + $amount <= (
|
||||
# inv.total - inv.reserved) * inv.allocation_ratio
|
||||
# ) AND
|
||||
# inv.min_unit <= $amount AND
|
||||
# inv.max_unit >= $amount AND
|
||||
# $amount % inv.step_size = 0
|
||||
# GROUP BY rp.id
|
||||
# AND rpt.trait_id = ${"MISC_SHARES_VIA_AGGREGATE" trait id}
|
||||
# WHERE rp.id IN $(RP_IDs)
|
||||
|
||||
sharing_trait = trait_obj.Trait.get_by_name(
|
||||
ctx, os_traits.MISC_SHARES_VIA_AGGREGATE)
|
||||
|
||||
rp_tbl = sa.alias(_RP_TBL, name='rp')
|
||||
inv_tbl = sa.alias(_INV_TBL, name='inv')
|
||||
t_tbl = sa.alias(_TRAIT_TBL, name='t')
|
||||
rpt_tbl = sa.alias(_RP_TRAIT_TBL, name='rpt')
|
||||
|
||||
rp_to_rpt_join = sa.join(
|
||||
rp_tbl, rpt_tbl,
|
||||
rp_tbl.c.id == rpt_tbl.c.resource_provider_id,
|
||||
sa.and_(rp_tbl.c.id == rpt_tbl.c.resource_provider_id,
|
||||
rpt_tbl.c.trait_id == sharing_trait.id)
|
||||
)
|
||||
|
||||
rpt_to_t_join = sa.join(
|
||||
rp_to_rpt_join, t_tbl,
|
||||
sa.and_(
|
||||
rpt_tbl.c.trait_id == t_tbl.c.id,
|
||||
# The traits table wants unicode trait names, but os_traits
|
||||
# presents native str, so we need to cast.
|
||||
t_tbl.c.name == six.text_type(os_traits.MISC_SHARES_VIA_AGGREGATE),
|
||||
),
|
||||
)
|
||||
sel = sa.select([rp_tbl.c.id]).select_from(rp_to_rpt_join)
|
||||
if rp_ids:
|
||||
sel = sel.where(rp_tbl.c.id.in_(rp_ids))
|
||||
|
||||
rp_to_inv_join = sa.join(
|
||||
rpt_to_t_join, inv_tbl,
|
||||
sa.and_(
|
||||
rpt_tbl.c.resource_provider_id == inv_tbl.c.resource_provider_id,
|
||||
inv_tbl.c.resource_class_id == rc_id,
|
||||
),
|
||||
)
|
||||
|
||||
usage = _usage_select([rc_id])
|
||||
|
||||
inv_to_usage_join = sa.outerjoin(
|
||||
rp_to_inv_join, usage,
|
||||
inv_tbl.c.resource_provider_id == usage.c.resource_provider_id,
|
||||
)
|
||||
|
||||
where_conds = _capacity_check_clause(amount, usage, inv_tbl=inv_tbl)
|
||||
|
||||
# If 'member_of' has values, do a separate lookup to identify the
|
||||
# resource providers that meet the member_of constraints.
|
||||
if member_of:
|
||||
rps_in_aggs = provider_ids_matching_aggregates(ctx, member_of)
|
||||
if not rps_in_aggs:
|
||||
# Short-circuit. The user either asked for a non-existing
|
||||
# aggregate or there were no resource providers that matched
|
||||
# the requirements...
|
||||
return []
|
||||
where_conds.append(rp_tbl.c.id.in_(rps_in_aggs))
|
||||
|
||||
sel = sa.select([rp_tbl.c.id]).select_from(inv_to_usage_join)
|
||||
sel = sel.where(where_conds)
|
||||
sel = sel.group_by(rp_tbl.c.id)
|
||||
|
||||
return [r[0] for r in ctx.session.execute(sel)]
|
||||
return set(r[0] for r in ctx.session.execute(sel))
|
||||
|
||||
|
||||
@db_api.placement_context_manager.reader
|
||||
|
@ -42,8 +42,9 @@ def _req_group_search_context(context, **kwargs):
|
||||
in_tree=kwargs.get('in_tree', None),
|
||||
)
|
||||
has_trees = res_ctx.has_provider_trees(context)
|
||||
sharing = res_ctx.get_sharing_providers(context)
|
||||
rg_ctx = res_ctx.RequestGroupSearchContext(
|
||||
context, request, has_trees)
|
||||
context, request, has_trees, sharing)
|
||||
|
||||
return rg_ctx
|
||||
|
||||
|
@ -18,6 +18,7 @@ from oslo_utils.fixture import uuidsentinel
|
||||
|
||||
from placement.db.sqlalchemy import models
|
||||
from placement import exception
|
||||
from placement import lib as placement_lib
|
||||
from placement.objects import allocation as alloc_obj
|
||||
from placement.objects import inventory as inv_obj
|
||||
from placement.objects import research_context as res_ctx
|
||||
@ -1088,31 +1089,47 @@ class SharedProviderTestCase(tb.PlacementDbBaseTestCase):
|
||||
allocation_ratio=1.5)
|
||||
if cn is cn1:
|
||||
tb.add_inventory(cn, orc.DISK_GB, 2000,
|
||||
min_unit=10,
|
||||
max_unit=100,
|
||||
min_unit=100,
|
||||
max_unit=2000,
|
||||
step_size=10)
|
||||
|
||||
# Create the shared storage pool
|
||||
ss = self._create_provider('shared storage')
|
||||
ss1 = self._create_provider('shared storage 1')
|
||||
ss2 = self._create_provider('shared storage 2')
|
||||
|
||||
# Give the shared storage pool some inventory of DISK_GB
|
||||
tb.add_inventory(ss, orc.DISK_GB, 2000,
|
||||
min_unit=10,
|
||||
max_unit=100,
|
||||
step_size=10)
|
||||
|
||||
# Mark the shared storage pool as having inventory shared among any
|
||||
# provider associated via aggregate
|
||||
tb.set_traits(ss, "MISC_SHARES_VIA_AGGREGATE")
|
||||
for ss, disk_amount in ((ss1, 2000), (ss2, 1000)):
|
||||
tb.add_inventory(ss, orc.DISK_GB, disk_amount,
|
||||
min_unit=100,
|
||||
max_unit=2000,
|
||||
step_size=10)
|
||||
# Mark the shared storage pool as having inventory shared among
|
||||
# any provider associated via aggregate
|
||||
tb.set_traits(ss, "MISC_SHARES_VIA_AGGREGATE")
|
||||
|
||||
# OK, now that has all been set up, let's verify that we get the ID of
|
||||
# the shared storage pool when we ask for DISK_GB
|
||||
got_ids = res_ctx.get_providers_with_shared_capacity(
|
||||
self.ctx,
|
||||
orc.STANDARDS.index(orc.DISK_GB),
|
||||
100,
|
||||
)
|
||||
self.assertEqual([ss.id], got_ids)
|
||||
# the shared storage pool
|
||||
got_ids = res_ctx.get_sharing_providers(self.ctx)
|
||||
self.assertEqual(set([ss1.id, ss2.id]), got_ids)
|
||||
|
||||
request = placement_lib.RequestGroup(
|
||||
use_same_provider=False,
|
||||
resources={orc.VCPU: 2,
|
||||
orc.MEMORY_MB: 256,
|
||||
orc.DISK_GB: 1500})
|
||||
has_trees = res_ctx.has_provider_trees(self.ctx)
|
||||
sharing = res_ctx.get_sharing_providers(self.ctx)
|
||||
rg_ctx = res_ctx.RequestGroupSearchContext(
|
||||
self.ctx, request, has_trees, sharing)
|
||||
|
||||
VCPU_ID = orc.STANDARDS.index(orc.VCPU)
|
||||
DISK_GB_ID = orc.STANDARDS.index(orc.DISK_GB)
|
||||
|
||||
rps_sharing_vcpu = rg_ctx.get_rps_with_shared_capacity(VCPU_ID)
|
||||
self.assertEqual(set(), rps_sharing_vcpu)
|
||||
|
||||
rps_sharing_dist = rg_ctx.get_rps_with_shared_capacity(DISK_GB_ID)
|
||||
self.assertEqual(set([ss1.id]), rps_sharing_dist)
|
||||
|
||||
|
||||
# We don't want to waste time sleeping in these tests. It would add
|
||||
|
Loading…
x
Reference in New Issue
Block a user