Check provider generation and retry on conflict

Update the aggregate-related scheduler report client methods to use
placement microversion 1.19, which returns the provider generation in
GET /rps/{u}/aggregates and handles generation conflicts in PUT
/rps/{u}/aggregates. Helper methods that previously returned aggregates
and traits now also return the generation, which is fed through to
subsequent calls as appropriate. As a result, the generation kwarg is
no longer needed in _refresh_associations, so it is removed.
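
For illustration, a minimal sketch of the new helper shape at 1.19
(field and key names as in the report client diff below; @safe_connect
and the error paths are elided):

    import collections

    AggInfo = collections.namedtuple('AggInfo', ['aggregates', 'generation'])

    def _get_provider_aggregates(self, context, rp_uuid):
        # GET at microversion 1.19 so the body carries the generation too.
        resp = self.get('/resource_providers/%s/aggregates' % rp_uuid,
                        version='1.19', global_request_id=context.global_id)
        data = resp.json()
        return AggInfo(aggregates=set(data['aggregates']),
                       generation=data['resource_provider_generation'])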

Doing this exposes the race described in the cited bug, so we add a
retry decorator to the resource tracker's _update and the report
client's aggregate_{add|remove}_host methods.
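
Both retry sites use the same bounded pattern: re-run the whole method
up to four times, but only when the failure is a generation conflict. A
minimal sketch of that decorator usage (arguments as in the diffs
below; the method body is elided):

    import retrying

    from nova import exception

    @retrying.retry(stop_max_attempt_number=4,
                    retry_on_exception=lambda e: isinstance(
                        e, exception.ResourceProviderUpdateConflict))
    def _update(self, context, compute_node):
        # A retried invocation re-reads the provider (and thus its current
        # generation) before redriving the conflicting write; any other
        # exception propagates immediately without a retry.
        ...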

Related to blueprint placement-aggregate-generation
Closes-Bug: #1779931

Change-Id: I3c5fbb18297db71e682fcddb5bf4536595d92383
Eric Fried 2018-07-03 14:34:00 -05:00
parent d9e04c4ff0
commit 3518ccb665
6 changed files with 425 additions and 100 deletions

View File

@@ -5306,7 +5306,8 @@ class AggregateAPI(base.Base):
host_name, aggregate.uuid)
except (exception.ResourceProviderNotFound,
exception.ResourceProviderAggregateRetrievalFailed,
exception.ResourceProviderUpdateFailed) as err:
exception.ResourceProviderUpdateFailed,
exception.ResourceProviderUpdateConflict) as err:
# NOTE(jaypipes): We don't want a failure perform the mirroring
# action in the placement service to be returned to the user (they
# probably don't know anything about the placement service and
@@ -5371,7 +5372,8 @@ class AggregateAPI(base.Base):
host_name, aggregate.uuid)
except (exception.ResourceProviderNotFound,
exception.ResourceProviderAggregateRetrievalFailed,
exception.ResourceProviderUpdateFailed) as err:
exception.ResourceProviderUpdateFailed,
exception.ResourceProviderUpdateConflict) as err:
# NOTE(jaypipes): We don't want a failure perform the mirroring
# action in the placement service to be returned to the user (they
# probably don't know anything about the placement service and

View File

@@ -20,6 +20,7 @@ model.
"""
import collections
import copy
import retrying
from oslo_log import log as logging
from oslo_serialization import jsonutils
@@ -858,6 +859,9 @@ class ResourceTracker(object):
return True
return False
@retrying.retry(stop_max_attempt_number=4,
retry_on_exception=lambda e: isinstance(
e, exception.ResourceProviderUpdateConflict))
def _update(self, context, compute_node):
"""Update partial stats locally and populate them to Scheduler."""
if self._resource_change(compute_node):

View File

@@ -13,11 +13,13 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import contextlib
import copy
import functools
import random
import re
import retrying
import time
from keystoneauth1 import exceptions as ks_exc
@@ -47,10 +49,14 @@ WARN_EVERY = 10
PLACEMENT_CLIENT_SEMAPHORE = 'placement_client'
GRANULAR_AC_VERSION = '1.25'
POST_RPS_RETURNS_PAYLOAD_API_VERSION = '1.20'
AGGREGATE_GENERATION_VERSION = '1.19'
NESTED_PROVIDER_API_VERSION = '1.14'
POST_ALLOCATIONS_API_VERSION = '1.13'
ALLOCATION_PROJECT_USER = '1.12'
AggInfo = collections.namedtuple('AggInfo', ['aggregates', 'generation'])
TraitInfo = collections.namedtuple('TraitInfo', ['traits', 'generation'])
def warn_limit(self, msg):
if self._warn_count:
@@ -365,18 +371,23 @@ class SchedulerReportClient(object):
"""Queries the placement API for a resource provider's aggregates.
:param rp_uuid: UUID of the resource provider to grab aggregates for.
:return: A set() of aggregate UUIDs, which may be empty if the
specified provider has no aggregate associations.
:return: A namedtuple comprising:
* .aggregates: A set() of string aggregate UUIDs, which may
be empty if the specified provider is associated with no
aggregates.
* .generation: The resource provider generation.
:raise: ResourceProviderAggregateRetrievalFailed on errors. In
particular, we raise this exception (as opposed to returning
None or the empty set()) if the specified resource provider
does not exist.
"""
resp = self.get("/resource_providers/%s/aggregates" % rp_uuid,
version='1.1', global_request_id=context.global_id)
version=AGGREGATE_GENERATION_VERSION,
global_request_id=context.global_id)
if resp.status_code == 200:
data = resp.json()
return set(data['aggregates'])
return AggInfo(aggregates=set(data['aggregates']),
generation=data['resource_provider_generation'])
placement_req_id = get_placement_request_id(resp)
msg = ("[%(placement_req_id)s] Failed to retrieve aggregates from "
@@ -397,8 +408,10 @@ class SchedulerReportClient(object):
:param context: The security context
:param rp_uuid: UUID of the resource provider to grab traits for.
:return: A set() of string trait names, which may be empty if the
specified provider has no traits.
:return: A namedtuple comprising:
* .traits: A set() of string trait names, which may be
empty if the specified provider has no traits.
* .generation: The resource provider generation.
:raise: ResourceProviderTraitRetrievalFailed on errors. In particular,
we raise this exception (as opposed to returning None or the
empty set()) if the specified resource provider does not exist.
@@ -407,7 +420,9 @@ class SchedulerReportClient(object):
version='1.6', global_request_id=context.global_id)
if resp.status_code == 200:
return set(resp.json()['traits'])
json = resp.json()
return TraitInfo(traits=set(json['traits']),
generation=json['resource_provider_generation'])
placement_req_id = get_placement_request_id(resp)
LOG.error(
@@ -674,9 +689,8 @@ class SchedulerReportClient(object):
# NOTE(efried): _refresh_associations doesn't refresh inventory
# (yet) - see that method's docstring for the why.
self._refresh_and_get_inventory(context, rp_to_refresh['uuid'])
self._refresh_associations(
context, rp_to_refresh['uuid'],
generation=rp_to_refresh.get('generation'), force=True)
self._refresh_associations(context, rp_to_refresh['uuid'],
force=True)
return uuid
@@ -734,20 +748,14 @@ class SchedulerReportClient(object):
if curr is None:
return None
cur_gen = curr['resource_provider_generation']
# TODO(efried): This condition banks on the generation for a new RP
# starting at zero, which isn't part of the API. It also is only
# useful as an optimization on a freshly-created RP to which nothing
# has ever been done. And it's not much of an optimization, because
# updating the cache is super cheap. We should remove the condition.
if cur_gen:
curr_inv = curr['inventories']
self._provider_tree.update_inventory(rp_uuid, curr_inv,
generation=cur_gen)
self._provider_tree.update_inventory(
rp_uuid, curr['inventories'],
generation=curr['resource_provider_generation'])
return curr
def _refresh_associations(self, context, rp_uuid, generation=None,
force=False, refresh_sharing=True):
def _refresh_associations(self, context, rp_uuid, force=False,
refresh_sharing=True):
"""Refresh aggregates, traits, and (optionally) aggregate-associated
sharing providers for the specified resource provider uuid.
@@ -762,8 +770,6 @@ class SchedulerReportClient(object):
:param context: The security context
:param rp_uuid: UUID of the resource provider to check for fresh
aggregates and traits
:param generation: The resource provider generation to set. If None,
the provider's generation is not updated.
:param force: If True, force the refresh
:param refresh_sharing: If True, fetch all the providers associated
by aggregate with the specified provider,
@@ -776,7 +782,10 @@ class SchedulerReportClient(object):
"""
if force or self._associations_stale(rp_uuid):
# Refresh aggregates
aggs = self._get_provider_aggregates(context, rp_uuid)
agg_info = self._get_provider_aggregates(context, rp_uuid)
# If @safe_connect makes the above return None, this will raise
# TypeError. Good.
aggs, generation = agg_info.aggregates, agg_info.generation
msg = ("Refreshing aggregate associations for resource provider "
"%s, aggregates: %s")
LOG.debug(msg, rp_uuid, ','.join(aggs or ['None']))
@@ -787,7 +796,10 @@ class SchedulerReportClient(object):
rp_uuid, aggs, generation=generation)
# Refresh traits
traits = self._get_provider_traits(context, rp_uuid)
trait_info = self._get_provider_traits(context, rp_uuid)
# If @safe_connect makes the above return None, this will raise
# TypeError. Good.
traits, generation = trait_info.traits, trait_info.generation
msg = ("Refreshing trait associations for resource provider %s, "
"traits: %s")
LOG.debug(msg, rp_uuid, ','.join(traits or ['None']))
@@ -1250,7 +1262,7 @@ class SchedulerReportClient(object):
@safe_connect
def set_aggregates_for_provider(self, context, rp_uuid, aggregates,
use_cache=True):
use_cache=True, generation=None):
"""Replace a provider's aggregates with those specified.
The provider must exist - this method does not attempt to create it.
@@ -1261,18 +1273,55 @@ class SchedulerReportClient(object):
:param aggregates: Iterable of aggregates to set on the provider.
:param use_cache: If False, indicates not to update the cache of
resource providers.
:raises: ResourceProviderUpdateFailed on any placement API failure.
:param generation: Resource provider generation. Required if use_cache
is False.
:raises: ResourceProviderUpdateConflict if the provider's generation
doesn't match the generation in the cache. Callers may choose
to retrieve the provider and its associations afresh and
redrive this operation.
:raises: ResourceProviderUpdateFailed on any other placement API
failure.
"""
# TODO(efried): Handle generation conflicts when supported by placement
# If a generation is specified, it trumps whatever's in the cache.
# Otherwise...
if generation is None:
if use_cache:
generation = self._provider_tree.data(rp_uuid).generation
else:
# Either cache or generation is required
raise ValueError(
_("generation is required with use_cache=False"))
# Check whether aggregates need updating. We can only do this if we
# have a cache entry with a matching generation.
try:
if (self._provider_tree.data(rp_uuid).generation == generation
and not self._provider_tree.have_aggregates_changed(
rp_uuid, aggregates)):
return
except ValueError:
# Not found in the cache; proceed
pass
url = '/resource_providers/%s/aggregates' % rp_uuid
aggregates = list(aggregates) if aggregates else []
resp = self.put(url, aggregates, version='1.1',
payload = {'aggregates': aggregates,
'resource_provider_generation': generation}
resp = self.put(url, payload, version=AGGREGATE_GENERATION_VERSION,
global_request_id=context.global_id)
if resp.status_code == 200:
placement_aggs = resp.json()['aggregates']
if use_cache:
self._provider_tree.update_aggregates(rp_uuid, placement_aggs)
# Try to update the cache regardless. If use_cache=False, ignore
# any failures.
try:
data = resp.json()
self._provider_tree.update_aggregates(
rp_uuid, data['aggregates'],
generation=data['resource_provider_generation'])
except ValueError:
if use_cache:
# The entry should've been there
raise
return
# Some error occurred; log it
@@ -1286,8 +1335,24 @@ class SchedulerReportClient(object):
'status_code': resp.status_code,
'err_text': resp.text,
}
LOG.error(msg, args)
# If a conflict, invalidate the cache and raise special exception
if resp.status_code == 409:
# No reason to condition cache invalidation on use_cache - if we
# got a 409, the cache entry is still bogus if it exists; and the
# below is a no-op if it doesn't.
try:
self._provider_tree.remove(rp_uuid)
except ValueError:
pass
self._association_refresh_time.pop(rp_uuid, None)
LOG.warning(msg, args)
raise exception.ResourceProviderUpdateConflict(
uuid=rp_uuid, generation=generation, error=resp.text)
# Otherwise, raise generic exception
LOG.error(msg, args)
raise exception.ResourceProviderUpdateFailed(url=url, error=resp.text)
@safe_connect
@@ -1929,6 +1994,9 @@ class SchedulerReportClient(object):
raise exception.ResourceProviderNotFound(name_or_uuid=name)
@retrying.retry(stop_max_attempt_number=4,
retry_on_exception=lambda e: isinstance(
e, exception.ResourceProviderUpdateConflict))
def aggregate_add_host(self, context, agg_uuid, host_name):
"""Looks up a resource provider by the supplied host name, and adds the
aggregate with supplied UUID to that resource provider.
@@ -1947,6 +2015,8 @@ class SchedulerReportClient(object):
failing to get a provider's existing aggregates
:raises: `exception.ResourceProviderUpdateFailed` if there was a
failure attempting to save the provider aggregates
:raises: `exception.ResourceProviderUpdateConflict` if a concurrent
update to the provider was detected.
"""
rp = self._get_provider_by_name(context, host_name)
# NOTE(jaypipes): Unfortunately, due to @safe_connect,
@@ -1962,16 +2032,21 @@ class SchedulerReportClient(object):
# with, however, so we first grab the list of aggregates for this
# provider and add the aggregate to the list of aggregates it already
# has
existing_aggs = self._get_provider_aggregates(context, rp_uuid)
agg_info = self._get_provider_aggregates(context, rp_uuid)
# @safe_connect can make the above return None
if agg_info is None:
raise exception.PlacementAPIConnectFailure()
existing_aggs, gen = agg_info.aggregates, agg_info.generation
if agg_uuid in existing_aggs:
return
new_aggs = existing_aggs | set([agg_uuid])
# TODO(jaypipes): Send provider generation (which is in the rp dict)
# along to set_aggregates_for_provider()
self.set_aggregates_for_provider(
context, rp_uuid, new_aggs, use_cache=False)
context, rp_uuid, new_aggs, use_cache=False, generation=gen)
@retrying.retry(stop_max_attempt_number=4,
retry_on_exception=lambda e: isinstance(
e, exception.ResourceProviderUpdateConflict))
def aggregate_remove_host(self, context, agg_uuid, host_name):
"""Looks up a resource provider by the supplied host name, and removes
the aggregate with supplied UUID from that resource provider.
@@ -1990,6 +2065,8 @@ class SchedulerReportClient(object):
failing to get a provider's existing aggregates
:raises: `exception.ResourceProviderUpdateFailed` if there was a
failure attempting to save the provider aggregates
:raises: `exception.ResourceProviderUpdateConflict` if a concurrent
update to the provider was detected.
"""
rp = self._get_provider_by_name(context, host_name)
# NOTE(jaypipes): Unfortunately, due to @safe_connect,
@@ -2005,12 +2082,14 @@ class SchedulerReportClient(object):
# associated with, however, so we first grab the list of aggregates for
# this provider and remove the aggregate from the list of aggregates it
# already has
existing_aggs = self._get_provider_aggregates(context, rp_uuid)
agg_info = self._get_provider_aggregates(context, rp_uuid)
# @safe_connect can make the above return None
if agg_info is None:
raise exception.PlacementAPIConnectFailure()
existing_aggs, gen = agg_info.aggregates, agg_info.generation
if agg_uuid not in existing_aggs:
return
new_aggs = existing_aggs - set([agg_uuid])
# TODO(jaypipes): Send provider generation (which is in the rp dict)
# along to set_aggregates_for_provider()
self.set_aggregates_for_provider(
context, rp_uuid, new_aggs, use_cache=False)
context, rp_uuid, new_aggs, use_cache=False, generation=gen)

View File

@@ -881,8 +881,8 @@ class SchedulerReportClientTests(SchedulerReportClientTestBase):
self.client.update_from_provider_tree, self.context, new_tree)
# Placement didn't get updated
self.assertEqual(set(['HW_CPU_X86_AVX', 'HW_CPU_X86_AVX2']),
self.client._get_provider_traits(self.context,
uuids.root))
self.client._get_provider_traits(
self.context, uuids.root).traits)
# ...and the root was removed from the cache
self.assertFalse(self.client._provider_tree.exists(uuids.root))
@@ -940,13 +940,14 @@ class SchedulerReportClientTests(SchedulerReportClientTestBase):
resp = self.client.get('/resource_providers/%s' % uuid)
self.assertEqual(404, resp.status_code)
@mock.patch('nova.compute.provider_tree.ProviderTree.update_aggregates')
def test_non_tree_aggregate_membership(self, upd_aggs_mock):
"""There are some methods of the reportclient that do NOT interact with
the reportclient's provider_tree cache of information. These methods
are called to add and remove members from a nova host aggregate and
ensure that the placement API has a mirrored record of the resource
provider's aggregate associations.
def test_non_tree_aggregate_membership(self):
"""There are some methods of the reportclient that interact with the
reportclient's provider_tree cache of information on a best-effort
basis. These methods are called to add and remove members from a nova
host aggregate and ensure that the placement API has a mirrored record
of the resource provider's aggregate associations. We want to simulate
this use case by invoking these methods with an empty cache and making
sure it never gets populated (and we don't raise ValueError).
"""
agg_uuid = uuids.agg
with self._interceptor():
@@ -955,11 +956,17 @@ class SchedulerReportClientTests(SchedulerReportClientTestBase):
ptree = self.client.get_provider_tree_and_ensure_root(
self.context, self.compute_uuid, name=self.compute_name)
self.assertEqual([self.compute_uuid], ptree.get_provider_uuids())
# Now blow away the cache so we can ensure the use_cache=False
# behavior of aggregate_{add|remove}_host correctly ignores and/or
# doesn't attempt to populate/update it.
self.client._provider_tree.remove(self.compute_uuid)
self.assertEqual(
[], self.client._provider_tree.get_provider_uuids())
# Use the reportclient's _get_provider_aggregates() private method
# to verify no aggregates are yet associated with this provider
aggs = self.client._get_provider_aggregates(
self.context, self.compute_uuid)
self.context, self.compute_uuid).aggregates
self.assertEqual(set(), aggs)
# Now associate the compute **host name** with an aggregate and
@@ -967,30 +974,30 @@ class SchedulerReportClientTests(SchedulerReportClientTestBase):
self.client.aggregate_add_host(
self.context, agg_uuid, self.compute_name)
# Check that the ProviderTree cache that was populated above during
# get_provider_tree_and_ensure_root() hasn't been modified (since
# Check that the ProviderTree cache hasn't been modified (since
# the aggregate_add_host() method is only called from nova-api and
# we don't want to have a ProviderTree cache at that layer.
cache_data = self.client._provider_tree.data(self.compute_uuid)
self.assertNotIn(agg_uuid, cache_data.aggregates)
self.assertEqual(
[], self.client._provider_tree.get_provider_uuids())
aggs = self.client._get_provider_aggregates(
self.context, self.compute_uuid)
self.context, self.compute_uuid).aggregates
self.assertEqual(set([agg_uuid]), aggs)
# Finally, remove the association and verify it's removed in
# placement
self.client.aggregate_remove_host(
self.context, agg_uuid, self.compute_name)
cache_data = self.client._provider_tree.data(self.compute_uuid)
self.assertNotIn(agg_uuid, cache_data.aggregates)
self.assertEqual(
[], self.client._provider_tree.get_provider_uuids())
aggs = self.client._get_provider_aggregates(
self.context, self.compute_uuid)
self.context, self.compute_uuid).aggregates
self.assertEqual(set(), aggs)
# Try removing the same host and verify no error
self.client.aggregate_remove_host(
self.context, agg_uuid, self.compute_name)
upd_aggs_mock.assert_not_called()
self.assertEqual(
[], self.client._provider_tree.get_provider_uuids())
def test_alloc_cands_smoke(self):
"""Simple call to get_allocation_candidates for version checking."""

View File

@@ -1386,6 +1386,53 @@ class TestUpdateComputeNode(BaseTestCase):
exp_inv[rc_fields.ResourceClass.DISK_GB]['reserved'] = 1
self.assertEqual(exp_inv, ptree.data(new_compute.uuid).inventory)
@mock.patch('nova.objects.ComputeNode.save', new=mock.Mock())
def test_update_retry_success(self):
self._setup_rt()
orig_compute = _COMPUTE_NODE_FIXTURES[0].obj_clone()
self.rt.compute_nodes[_NODENAME] = orig_compute
self.rt.old_resources[_NODENAME] = orig_compute
# Deliberately changing local_gb to trigger updating inventory
new_compute = orig_compute.obj_clone()
new_compute.local_gb = 210000
# Emulate a driver that has implemented the update_from_provider_tree()
# virt driver method, so we hit the update_from_provider_tree path.
self.driver_mock.update_provider_tree.side_effect = lambda *a: None
ufpt_mock = self.rt.reportclient.update_from_provider_tree
ufpt_mock.side_effect = (
exc.ResourceProviderUpdateConflict(
uuid='uuid', generation=42, error='error'), None)
self.rt._update(mock.sentinel.ctx, new_compute)
self.assertEqual(2, ufpt_mock.call_count)
@mock.patch('nova.objects.ComputeNode.save', new=mock.Mock())
def test_update_retry_raises(self):
self._setup_rt()
orig_compute = _COMPUTE_NODE_FIXTURES[0].obj_clone()
self.rt.compute_nodes[_NODENAME] = orig_compute
self.rt.old_resources[_NODENAME] = orig_compute
# Deliberately changing local_gb to trigger updating inventory
new_compute = orig_compute.obj_clone()
new_compute.local_gb = 210000
# Emulate a driver that has implemented the update_from_provider_tree()
# virt driver method, so we hit the update_from_provider_tree path.
self.driver_mock.update_provider_tree.side_effect = lambda *a: None
ufpt_mock = self.rt.reportclient.update_from_provider_tree
ufpt_mock.side_effect = (
exc.ResourceProviderUpdateConflict(
uuid='uuid', generation=42, error='error'))
self.assertRaises(exc.ResourceProviderUpdateConflict,
self.rt._update, mock.sentinel.ctx, new_compute)
self.assertEqual(4, ufpt_mock.call_count)
def test_get_node_uuid(self):
self._setup_rt()
orig_compute = _COMPUTE_NODE_FIXTURES[0].obj_clone()

View File

@@ -15,6 +15,7 @@ import time
import fixtures
from keystoneauth1 import exceptions as ks_exc
import mock
from oslo_serialization import jsonutils
from six.moves.urllib import parse
import nova.conf
@@ -1192,8 +1193,10 @@ class TestProviderOperations(SchedulerReportClientTestCase):
}]
get_inv_mock.return_value = None
get_agg_mock.return_value = set([uuids.agg1])
get_trait_mock.return_value = set(['CUSTOM_GOLD'])
get_agg_mock.return_value = report.AggInfo(
aggregates=set([uuids.agg1]), generation=42)
get_trait_mock.return_value = report.TraitInfo(
traits=set(['CUSTOM_GOLD']), generation=43)
get_shr_mock.return_value = []
self.client._ensure_resource_provider(self.context, uuids.compute_node)
@@ -1216,7 +1219,9 @@ class TestProviderOperations(SchedulerReportClientTestCase):
self.client._provider_tree.has_traits(uuids.compute_node,
['CUSTOM_SILVER']))
get_shr_mock.assert_called_once_with(self.context, set([uuids.agg1]))
self.assertTrue(self.client._provider_tree.exists(uuids.compute_node))
self.assertEqual(
43,
self.client._provider_tree.data(uuids.compute_node).generation)
self.assertFalse(create_rp_mock.called)
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
@@ -1409,7 +1414,7 @@ class TestProviderOperations(SchedulerReportClientTestCase):
mock_ref_inv.assert_has_calls([mock.call(self.context, uuid)
for uuid in tree_uuids])
mock_ref_assoc.assert_has_calls(
[mock.call(self.context, uuid, generation=42, force=True)
[mock.call(self.context, uuid, force=True)
for uuid in tree_uuids])
self.assertEqual(tree_uuids,
set(self.client._provider_tree.get_provider_uuids()))
@@ -1931,11 +1936,10 @@ class TestProviderOperations(SchedulerReportClientTestCase):
def test_set_aggregates_for_provider(self):
aggs = [uuids.agg1, uuids.agg2]
resp_mock = mock.Mock(status_code=200)
resp_mock.json.return_value = {
'aggregates': aggs,
}
self.ks_adap_mock.put.return_value = resp_mock
self.ks_adap_mock.put.return_value = fake_requests.FakeResponse(
200, content=jsonutils.dumps({
'aggregates': aggs,
'resource_provider_generation': 1}))
# Prime the provider tree cache
self.client._provider_tree.new_root('rp', uuids.rp, generation=0)
@@ -1944,16 +1948,26 @@ class TestProviderOperations(SchedulerReportClientTestCase):
self.client.set_aggregates_for_provider(self.context, uuids.rp, aggs)
exp_payload = {'aggregates': aggs,
'resource_provider_generation': 0}
self.ks_adap_mock.put.assert_called_once_with(
'/resource_providers/%s/aggregates' % uuids.rp, json=aggs,
microversion='1.1',
'/resource_providers/%s/aggregates' % uuids.rp, json=exp_payload,
microversion='1.19',
headers={'X-Openstack-Request-Id': self.context.global_id})
# Cache was updated
self.assertEqual(set(aggs),
self.client._provider_tree.data(uuids.rp).aggregates)
ptree_data = self.client._provider_tree.data(uuids.rp)
self.assertEqual(set(aggs), ptree_data.aggregates)
self.assertEqual(1, ptree_data.generation)
def test_set_aggregates_for_provider_bad_args(self):
self.assertRaises(ValueError, self.client.set_aggregates_for_provider,
self.context, uuids.rp, {}, use_cache=False)
self.assertRaises(ValueError, self.client.set_aggregates_for_provider,
self.context, uuids.rp, {}, use_cache=False,
generation=None)
def test_set_aggregates_for_provider_fail(self):
self.ks_adap_mock.put.return_value = mock.Mock(status_code=503)
self.ks_adap_mock.put.return_value = fake_requests.FakeResponse(503)
# Prime the provider tree cache
self.client._provider_tree.new_root('rp', uuids.rp, generation=0)
self.assertRaises(
@@ -1964,6 +1978,49 @@ class TestProviderOperations(SchedulerReportClientTestCase):
self.assertEqual(set(),
self.client._provider_tree.data(uuids.rp).aggregates)
def test_set_aggregates_for_provider_conflict(self):
# Prime the provider tree cache
self.client._provider_tree.new_root('rp', uuids.rp, generation=0)
self.ks_adap_mock.put.return_value = fake_requests.FakeResponse(409)
self.assertRaises(
exception.ResourceProviderUpdateConflict,
self.client.set_aggregates_for_provider,
self.context, uuids.rp, [uuids.agg])
# The cache was invalidated
self.assertNotIn(uuids.rp,
self.client._provider_tree.get_provider_uuids())
self.assertNotIn(uuids.rp, self.client._association_refresh_time)
def test_set_aggregates_for_provider_short_circuit(self):
"""No-op when aggregates have not changed."""
# Prime the provider tree cache
self.client._provider_tree.new_root('rp', uuids.rp, generation=7)
self.client.set_aggregates_for_provider(self.context, uuids.rp, [])
self.ks_adap_mock.put.assert_not_called()
def test_set_aggregates_for_provider_no_short_circuit(self):
"""Don't short-circuit if generation doesn't match, even if aggs have
not changed.
"""
# Prime the provider tree cache
self.client._provider_tree.new_root('rp', uuids.rp, generation=2)
self.ks_adap_mock.put.return_value = fake_requests.FakeResponse(
200, content=jsonutils.dumps({
'aggregates': [],
'resource_provider_generation': 5}))
self.client.set_aggregates_for_provider(self.context, uuids.rp, [],
generation=4)
exp_payload = {'aggregates': [],
'resource_provider_generation': 4}
self.ks_adap_mock.put.assert_called_once_with(
'/resource_providers/%s/aggregates' % uuids.rp, json=exp_payload,
microversion='1.19',
headers={'X-Openstack-Request-Id': self.context.global_id})
# Cache was updated
ptree_data = self.client._provider_tree.data(uuids.rp)
self.assertEqual(set(), ptree_data.aggregates)
self.assertEqual(5, ptree_data.generation)
class TestAggregates(SchedulerReportClientTestCase):
def test_get_provider_aggregates_found(self):
@@ -1973,16 +2030,18 @@ class TestAggregates(SchedulerReportClientTestCase):
uuids.agg1,
uuids.agg2,
]
resp_mock.json.return_value = {'aggregates': aggs}
resp_mock.json.return_value = {'aggregates': aggs,
'resource_provider_generation': 42}
self.ks_adap_mock.get.return_value = resp_mock
result = self.client._get_provider_aggregates(self.context, uuid)
result, gen = self.client._get_provider_aggregates(self.context, uuid)
expected_url = '/resource_providers/' + uuid + '/aggregates'
self.ks_adap_mock.get.assert_called_once_with(
expected_url, microversion='1.1',
expected_url, microversion='1.19',
headers={'X-Openstack-Request-Id': self.context.global_id})
self.assertEqual(set(aggs), result)
self.assertEqual(42, gen)
@mock.patch.object(report.LOG, 'error')
def test_get_provider_aggregates_error(self, log_mock):
@@ -2002,7 +2061,7 @@ class TestAggregates(SchedulerReportClientTestCase):
expected_url = '/resource_providers/' + uuid + '/aggregates'
self.ks_adap_mock.get.assert_called_once_with(
expected_url, microversion='1.1',
expected_url, microversion='1.19',
headers={'X-Openstack-Request-Id': self.context.global_id})
self.assertTrue(log_mock.called)
self.assertEqual(uuids.request_id,
@@ -2021,10 +2080,11 @@ class TestTraits(SchedulerReportClientTestCase):
'CUSTOM_GOLD',
'CUSTOM_SILVER',
]
resp_mock.json.return_value = {'traits': traits}
resp_mock.json.return_value = {'traits': traits,
'resource_provider_generation': 42}
self.ks_adap_mock.get.return_value = resp_mock
result = self.client._get_provider_traits(self.context, uuid)
result, gen = self.client._get_provider_traits(self.context, uuid)
expected_url = '/resource_providers/' + uuid + '/traits'
self.ks_adap_mock.get.assert_called_once_with(
@@ -2032,6 +2092,7 @@ class TestTraits(SchedulerReportClientTestCase):
headers={'X-Openstack-Request-Id': self.context.global_id},
**self.trait_api_kwargs)
self.assertEqual(set(traits), result)
self.assertEqual(42, gen)
@mock.patch.object(report.LOG, 'error')
def test_get_provider_traits_error(self, log_mock):
@@ -2220,13 +2281,15 @@ class TestAssociations(SchedulerReportClientTestCase):
uuid = uuids.compute_node
# Seed the provider tree so _refresh_associations finds the provider
self.client._provider_tree.new_root('compute', uuid, generation=1)
mock_agg_get.return_value = set([uuids.agg1])
mock_trait_get.return_value = set(['CUSTOM_GOLD'])
mock_agg_get.return_value = report.AggInfo(
aggregates=set([uuids.agg1]), generation=42)
mock_trait_get.return_value = report.TraitInfo(
traits=set(['CUSTOM_GOLD']), generation=43)
self.client._refresh_associations(self.context, uuid)
mock_agg_get.assert_called_once_with(self.context, uuid)
mock_trait_get.assert_called_once_with(self.context, uuid)
mock_shr_get.assert_called_once_with(
self.context, mock_agg_get.return_value)
self.context, mock_agg_get.return_value[0])
self.assertIn(uuid, self.client._association_refresh_time)
self.assertTrue(
self.client._provider_tree.in_aggregates(uuid, [uuids.agg1]))
@@ -2236,6 +2299,7 @@ class TestAssociations(SchedulerReportClientTestCase):
self.client._provider_tree.has_traits(uuid, ['CUSTOM_GOLD']))
self.assertFalse(
self.client._provider_tree.has_traits(uuid, ['CUSTOM_SILVER']))
self.assertEqual(43, self.client._provider_tree.data(uuid).generation)
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'_get_provider_aggregates')
@@ -2250,8 +2314,10 @@ class TestAssociations(SchedulerReportClientTestCase):
uuid = uuids.compute_node
# Seed the provider tree so _refresh_associations finds the provider
self.client._provider_tree.new_root('compute', uuid, generation=1)
mock_agg_get.return_value = set([uuids.agg1])
mock_trait_get.return_value = set(['CUSTOM_GOLD'])
mock_agg_get.return_value = report.AggInfo(
aggregates=set([uuids.agg1]), generation=42)
mock_trait_get.return_value = report.TraitInfo(
traits=set(['CUSTOM_GOLD']), generation=43)
self.client._refresh_associations(self.context, uuid,
refresh_sharing=False)
mock_agg_get.assert_called_once_with(self.context, uuid)
@@ -2266,6 +2332,7 @@ class TestAssociations(SchedulerReportClientTestCase):
self.client._provider_tree.has_traits(uuid, ['CUSTOM_GOLD']))
self.assertFalse(
self.client._provider_tree.has_traits(uuid, ['CUSTOM_SILVER']))
self.assertEqual(43, self.client._provider_tree.data(uuid).generation)
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'_get_provider_aggregates')
@@ -2301,8 +2368,10 @@ class TestAssociations(SchedulerReportClientTestCase):
uuid = uuids.compute_node
# Seed the provider tree so _refresh_associations finds the provider
self.client._provider_tree.new_root('compute', uuid, generation=1)
mock_agg_get.return_value = set([])
mock_trait_get.return_value = set([])
mock_agg_get.return_value = report.AggInfo(aggregates=set([]),
generation=42)
mock_trait_get.return_value = report.TraitInfo(traits=set([]),
generation=43)
mock_shr_get.return_value = []
# Called a first time because association_refresh_time is empty.
@@ -3455,13 +3524,16 @@ class TestAggregateAddRemoveHost(SchedulerReportClientTestCase):
self, mock_get_by_name, mock_get_aggs, mock_set_aggs):
mock_get_by_name.return_value = {
'uuid': uuids.cn1,
'generation': 1,
}
agg_uuid = uuids.agg1
mock_get_aggs.return_value = set([])
mock_get_aggs.return_value = report.AggInfo(aggregates=set([]),
generation=42)
name = 'cn1'
self.client.aggregate_add_host(self.context, agg_uuid, name)
mock_set_aggs.assert_called_once_with(
self.context, uuids.cn1, set([agg_uuid]), use_cache=False)
self.context, uuids.cn1, set([agg_uuid]), use_cache=False,
generation=42)
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'set_aggregates_for_provider')
@@ -3473,21 +3545,24 @@ class TestAggregateAddRemoveHost(SchedulerReportClientTestCase):
self, mock_get_by_name, mock_get_aggs, mock_set_aggs):
mock_get_by_name.return_value = {
'uuid': uuids.cn1,
'generation': 1,
}
agg1_uuid = uuids.agg1
agg2_uuid = uuids.agg2
agg3_uuid = uuids.agg3
mock_get_aggs.return_value = set([agg1_uuid])
mock_get_aggs.return_value = report.AggInfo(
aggregates=set([agg1_uuid]), generation=42)
name = 'cn1'
self.client.aggregate_add_host(self.context, agg1_uuid, name)
mock_set_aggs.assert_not_called()
mock_get_aggs.reset_mock()
mock_set_aggs.reset_mock()
mock_get_aggs.return_value = set([agg1_uuid, agg3_uuid])
mock_get_aggs.return_value = report.AggInfo(
aggregates=set([agg1_uuid, agg3_uuid]), generation=43)
self.client.aggregate_add_host(self.context, agg2_uuid, name)
mock_set_aggs.assert_called_once_with(
self.context, uuids.cn1, set([agg1_uuid, agg2_uuid, agg3_uuid]),
use_cache=False)
use_cache=False, generation=43)
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'_get_provider_by_name')
@@ -3505,6 +3580,58 @@ class TestAggregateAddRemoveHost(SchedulerReportClientTestCase):
self.client.aggregate_add_host, self.context, agg_uuid, name)
self.mock_get.assert_not_called()
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'set_aggregates_for_provider')
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'_get_provider_aggregates')
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'_get_provider_by_name')
def test_aggregate_add_host_retry_success(
self, mock_get_by_name, mock_get_aggs, mock_set_aggs):
mock_get_by_name.return_value = {
'uuid': uuids.cn1,
'generation': 1,
}
gens = (42, 43, 44)
mock_get_aggs.side_effect = (
report.AggInfo(aggregates=set([]), generation=gen) for gen in gens)
mock_set_aggs.side_effect = (
exception.ResourceProviderUpdateConflict(
uuid='uuid', generation=42, error='error'),
exception.ResourceProviderUpdateConflict(
uuid='uuid', generation=43, error='error'),
None,
)
self.client.aggregate_add_host(self.context, uuids.agg1, 'cn1')
mock_set_aggs.assert_has_calls([mock.call(
self.context, uuids.cn1, set([uuids.agg1]), use_cache=False,
generation=gen) for gen in gens])
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'set_aggregates_for_provider')
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'_get_provider_aggregates')
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'_get_provider_by_name')
def test_aggregate_add_host_retry_raises(
self, mock_get_by_name, mock_get_aggs, mock_set_aggs):
mock_get_by_name.return_value = {
'uuid': uuids.cn1,
'generation': 1,
}
gens = (42, 43, 44, 45)
mock_get_aggs.side_effect = (
report.AggInfo(aggregates=set([]), generation=gen) for gen in gens)
mock_set_aggs.side_effect = (
exception.ResourceProviderUpdateConflict(
uuid='uuid', generation=gen, error='error') for gen in gens)
self.assertRaises(
exception.ResourceProviderUpdateConflict,
self.client.aggregate_add_host, self.context, uuids.agg1, 'cn1')
mock_set_aggs.assert_has_calls([mock.call(
self.context, uuids.cn1, set([uuids.agg1]), use_cache=False,
generation=gen) for gen in gens])
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'_get_provider_by_name')
def test_aggregate_remove_host_no_placement(self, mock_get_by_name):
@@ -3531,13 +3658,75 @@ class TestAggregateAddRemoveHost(SchedulerReportClientTestCase):
self, mock_get_by_name, mock_get_aggs, mock_set_aggs):
mock_get_by_name.return_value = {
'uuid': uuids.cn1,
'generation': 1,
}
agg_uuid = uuids.agg1
mock_get_aggs.return_value = set([agg_uuid])
mock_get_aggs.return_value = report.AggInfo(aggregates=set([agg_uuid]),
generation=42)
name = 'cn1'
self.client.aggregate_remove_host(self.context, agg_uuid, name)
mock_set_aggs.assert_called_once_with(
self.context, uuids.cn1, set([]), use_cache=False)
self.context, uuids.cn1, set([]), use_cache=False, generation=42)
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'set_aggregates_for_provider')
@@ -3549,18 +3678,75 @@ class TestAggregateAddRemoveHost(SchedulerReportClientTestCase):
self, mock_get_by_name, mock_get_aggs, mock_set_aggs):
mock_get_by_name.return_value = {
'uuid': uuids.cn1,
'generation': 1,
}
agg1_uuid = uuids.agg1
agg2_uuid = uuids.agg2
agg3_uuid = uuids.agg3
mock_get_aggs.return_value = set([])
mock_get_aggs.return_value = report.AggInfo(aggregates=set([]),
generation=42)
name = 'cn1'
self.client.aggregate_remove_host(self.context, agg2_uuid, name)
mock_set_aggs.assert_not_called()
mock_get_aggs.reset_mock()
mock_set_aggs.reset_mock()
mock_get_aggs.return_value = set([agg1_uuid, agg2_uuid, agg3_uuid])
mock_get_aggs.return_value = report.AggInfo(
aggregates=set([agg1_uuid, agg2_uuid, agg3_uuid]), generation=43)
self.client.aggregate_remove_host(self.context, agg2_uuid, name)
mock_set_aggs.assert_called_once_with(
self.context, uuids.cn1, set([agg1_uuid, agg3_uuid]),
use_cache=False)
use_cache=False, generation=43)
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'set_aggregates_for_provider')
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'_get_provider_aggregates')
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'_get_provider_by_name')
def test_aggregate_remove_host_retry_success(
self, mock_get_by_name, mock_get_aggs, mock_set_aggs):
mock_get_by_name.return_value = {
'uuid': uuids.cn1,
'generation': 1,
}
gens = (42, 43, 44)
mock_get_aggs.side_effect = (
report.AggInfo(aggregates=set([uuids.agg1]), generation=gen)
for gen in gens)
mock_set_aggs.side_effect = (
exception.ResourceProviderUpdateConflict(
uuid='uuid', generation=42, error='error'),
exception.ResourceProviderUpdateConflict(
uuid='uuid', generation=43, error='error'),
None,
)
self.client.aggregate_remove_host(self.context, uuids.agg1, 'cn1')
mock_set_aggs.assert_has_calls([mock.call(
self.context, uuids.cn1, set([]), use_cache=False,
generation=gen) for gen in gens])
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'set_aggregates_for_provider')
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'_get_provider_aggregates')
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'_get_provider_by_name')
def test_aggregate_remove_host_retry_raises(
self, mock_get_by_name, mock_get_aggs, mock_set_aggs):
mock_get_by_name.return_value = {
'uuid': uuids.cn1,
'generation': 1,
}
gens = (42, 43, 44, 45)
mock_get_aggs.side_effect = (
report.AggInfo(aggregates=set([uuids.agg1]), generation=gen)
for gen in gens)
mock_set_aggs.side_effect = (
exception.ResourceProviderUpdateConflict(
uuid='uuid', generation=gen, error='error') for gen in gens)
self.assertRaises(
exception.ResourceProviderUpdateConflict,
self.client.aggregate_remove_host, self.context, uuids.agg1, 'cn1')
mock_set_aggs.assert_has_calls([mock.call(
self.context, uuids.cn1, set([]), use_cache=False,
generation=gen) for gen in gens])