Merge "placement: genericize on resource providers"

This commit is contained in:
Jenkins 2016-11-27 11:19:38 +00:00 committed by Gerrit Code Review
commit 7b0db52b45
2 changed files with 83 additions and 71 deletions

View File

@ -264,29 +264,29 @@ class SchedulerReportClient(object):
self._resource_providers[uuid] = rp
return rp
def _get_inventory(self, compute_node):
url = '/resource_providers/%s/inventories' % compute_node.uuid
def _get_inventory(self, rp_uuid):
url = '/resource_providers/%s/inventories' % rp_uuid
result = self.get(url)
if not result:
return {'inventories': {}}
return result.json()
def _update_inventory_attempt(self, compute_node):
"""Update the inventory for this compute node if needed.
def _update_inventory_attempt(self, rp_uuid, inv_data):
"""Update the inventory for this resource provider if needed.
:param compute_node: The objects.ComputeNode for the operation
:param rp_uuid: The resource provider UUID for the operation
:param inv_data: The new inventory for the resource provider
:returns: True if the inventory was updated (or did not need to be),
False otherwise.
"""
inv_data = _compute_node_to_inventory_dict(compute_node)
curr = self._get_inventory(compute_node)
curr = self._get_inventory(rp_uuid)
# Update our generation immediately, if possible. Even if there
# are no inventories we should always have a generation but let's
# be careful.
server_gen = curr.get('resource_provider_generation')
if server_gen:
my_rp = self._resource_providers[compute_node.uuid]
my_rp = self._resource_providers[rp_uuid]
if server_gen != my_rp.generation:
LOG.debug('Updating our resource provider generation '
'from %(old)i to %(new)i',
@ -298,26 +298,28 @@ class SchedulerReportClient(object):
if inv_data == curr.get('inventories', {}):
return True
cur_rp_gen = self._resource_providers[compute_node.uuid].generation
cur_rp_gen = self._resource_providers[rp_uuid].generation
payload = {
'resource_provider_generation': cur_rp_gen,
'inventories': inv_data,
}
url = '/resource_providers/%s/inventories' % compute_node.uuid
url = '/resource_providers/%s/inventories' % rp_uuid
result = self.put(url, payload)
if result.status_code == 409:
LOG.info(_LI('Inventory update conflict for %s'),
compute_node.uuid)
rp_uuid)
# Invalidate our cache and re-fetch the resource provider
# to be sure to get the latest generation.
del self._resource_providers[compute_node.uuid]
self._ensure_resource_provider(compute_node.uuid,
compute_node.hypervisor_hostname)
del self._resource_providers[rp_uuid]
# NOTE(jaypipes): We don't need to pass a name parameter to
# _ensure_resource_provider() because we know the resource provider
# record already exists. We're just reloading the record here.
self._ensure_resource_provider(rp_uuid)
return False
elif not result:
LOG.warning(_LW('Failed to update inventory for '
LOG.warning(_LW('Failed to update inventory for resource provider '
'%(uuid)s: %(status)i %(text)s'),
{'uuid': compute_node.uuid,
{'uuid': rp_uuid,
'status': result.status_code,
'text': result.text})
return False
@ -325,9 +327,9 @@ class SchedulerReportClient(object):
if result.status_code != 200:
LOG.info(
_LI('Received unexpected response code %(code)i while '
'trying to update inventory for compute node %(uuid)s'
'trying to update inventory for resource provider %(uuid)s'
': %(text)s'),
{'uuid': compute_node.uuid,
{'uuid': rp_uuid,
'code': result.status_code,
'text': result.text})
return False
@ -336,15 +338,15 @@ class SchedulerReportClient(object):
updated_inventories_result = result.json()
new_gen = updated_inventories_result['resource_provider_generation']
self._resource_providers[compute_node.uuid].generation = new_gen
self._resource_providers[rp_uuid].generation = new_gen
LOG.debug('Updated inventory for %s at generation %i',
compute_node.uuid, new_gen)
rp_uuid, new_gen)
return True
@safe_connect
def _update_inventory(self, compute_node):
def _update_inventory(self, rp_uuid, inv_data):
for attempt in (1, 2, 3):
if compute_node.uuid not in self._resource_providers:
if rp_uuid not in self._resource_providers:
# NOTE(danms): Either we failed to fetch/create the RP
# on our first attempt, or a previous attempt had to
# invalidate the cache, and we were unable to refresh
@ -352,7 +354,7 @@ class SchedulerReportClient(object):
LOG.warning(_LW(
'Unable to refresh my resource provider record'))
return False
if self._update_inventory_attempt(compute_node):
if self._update_inventory_attempt(rp_uuid, inv_data):
return True
time.sleep(1)
return False
@ -365,26 +367,27 @@ class SchedulerReportClient(object):
compute_node.save()
self._ensure_resource_provider(compute_node.uuid,
compute_node.hypervisor_hostname)
self._update_inventory(compute_node)
inv_data = _compute_node_to_inventory_dict(compute_node)
self._update_inventory(compute_node.uuid, inv_data)
def _get_allocations_for_instance(self, compute_node, instance):
def _get_allocations_for_instance(self, rp_uuid, instance):
url = '/allocations/%s' % instance.uuid
resp = self.get(url)
if not resp:
return {}
else:
# NOTE(cdent): This trims to just the allocations being
# used on this compute node. In the future when there
# used on this resource provider. In the future when there
# are shared resources there might be other providers.
return resp.json()['allocations'].get(
compute_node.uuid, {}).get('resources', {})
rp_uuid, {}).get('resources', {})
@safe_connect
def _allocate_for_instance(self, compute_node, instance):
def _allocate_for_instance(self, rp_uuid, instance):
url = '/allocations/%s' % instance.uuid
my_allocations = _instance_to_allocations_dict(instance)
current_allocations = self._get_allocations_for_instance(compute_node,
current_allocations = self._get_allocations_for_instance(rp_uuid,
instance)
if current_allocations == my_allocations:
allocstr = ','.join(['%s=%s' % (k, v)
@ -397,7 +400,7 @@ class SchedulerReportClient(object):
'allocations': [
{
'resource_provider': {
'uuid': compute_node.uuid,
'uuid': rp_uuid,
},
'resources': my_allocations,
},
@ -435,13 +438,13 @@ class SchedulerReportClient(object):
def update_instance_allocation(self, compute_node, instance, sign):
if sign > 0:
self._allocate_for_instance(compute_node, instance)
self._allocate_for_instance(compute_node.uuid, instance)
else:
self._delete_allocation_for_instance(instance.uuid)
@safe_connect
def _get_allocations(self, compute_node):
url = '/resource_providers/%s/allocations' % compute_node.uuid
def _get_allocations(self, rp_uuid):
url = '/resource_providers/%s/allocations' % rp_uuid
resp = self.get(url)
if not resp:
return {}
@ -449,7 +452,7 @@ class SchedulerReportClient(object):
return resp.json()['allocations']
def remove_deleted_instances(self, compute_node, instance_uuids):
allocations = self._get_allocations(compute_node)
allocations = self._get_allocations(compute_node.uuid)
if allocations is None:
allocations = {}

View File

@ -98,6 +98,16 @@ class SchedulerReportClientTestCase(test.NoDBTestCase):
super(SchedulerReportClientTestCase, self).setUp()
self.context = context.get_admin_context()
self.ks_sess_mock = mock.Mock()
self.compute_node = objects.ComputeNode(
uuid=uuids.compute_node,
hypervisor_hostname='foo',
vcpus=8,
cpu_allocation_ratio=16.0,
memory_mb=1024,
ram_allocation_ratio=1.5,
local_gb=10,
disk_allocation_ratio=1.0,
)
with test.nested(
mock.patch('keystoneauth1.session.Session',
@ -398,40 +408,17 @@ class SchedulerReportClientTestCase(test.NoDBTestCase):
'_ensure_resource_provider')
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'_update_inventory_attempt')
def test_update_resource_stats_rp_fail(self, mock_ui, mock_erp):
cn = mock.MagicMock()
@mock.patch('nova.objects.ComputeNode.save')
def test_update_resource_stats(self, mock_save, mock_ui, mock_erp):
cn = self.compute_node
self.client.update_resource_stats(cn)
cn.save.assert_called_once_with()
mock_save.assert_called_once_with()
mock_erp.assert_called_once_with(cn.uuid, cn.hypervisor_hostname)
self.assertFalse(mock_ui.called)
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'_ensure_resource_provider')
@mock.patch.object(objects.ComputeNode, 'save')
def test_update_resource_stats_saves(self, mock_save, mock_ensure):
cn = objects.ComputeNode(context=self.context,
uuid=uuids.compute_node,
hypervisor_hostname='host1')
self.client.update_resource_stats(cn)
mock_save.assert_called_once_with()
mock_ensure.assert_called_once_with(uuids.compute_node, 'host1')
class TestInventory(SchedulerReportClientTestCase):
def setUp(self):
super(TestInventory, self).setUp()
self.compute_node = objects.ComputeNode(
uuid=uuids.compute_node,
hypervisor_hostname='foo',
vcpus=8,
cpu_allocation_ratio=16.0,
memory_mb=1024,
ram_allocation_ratio=1.5,
local_gb=10,
disk_allocation_ratio=1.0,
)
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'get')
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
@ -463,7 +450,10 @@ class TestInventory(SchedulerReportClientTestCase):
}
}
result = self.client._update_inventory_attempt(compute_node)
inv_data = report._compute_node_to_inventory_dict(compute_node)
result = self.client._update_inventory_attempt(
compute_node.uuid, inv_data
)
self.assertTrue(result)
exp_url = '/resource_providers/%s/inventories' % uuid
@ -541,7 +531,10 @@ class TestInventory(SchedulerReportClientTestCase):
},
}
}
result = self.client._update_inventory_attempt(compute_node)
inv_data = report._compute_node_to_inventory_dict(compute_node)
result = self.client._update_inventory_attempt(
compute_node.uuid, inv_data
)
self.assertTrue(result)
exp_url = '/resource_providers/%s/inventories' % uuid
mock_get.assert_called_once_with(exp_url)
@ -569,13 +562,16 @@ class TestInventory(SchedulerReportClientTestCase):
mock_get.return_value = {}
mock_put.return_value.status_code = 409
result = self.client._update_inventory_attempt(compute_node)
inv_data = report._compute_node_to_inventory_dict(compute_node)
result = self.client._update_inventory_attempt(
compute_node.uuid, inv_data
)
self.assertFalse(result)
# Invalidated the cache
self.assertNotIn(uuid, self.client._resource_providers)
# Refreshed our resource provider
mock_ensure.assert_called_once_with(uuid, 'foo')
mock_ensure.assert_called_once_with(uuid)
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'_get_inventory')
@ -593,7 +589,10 @@ class TestInventory(SchedulerReportClientTestCase):
mock_get.return_value = {}
mock_put.return_value.status_code = 234
result = self.client._update_inventory_attempt(compute_node)
inv_data = report._compute_node_to_inventory_dict(compute_node)
result = self.client._update_inventory_attempt(
compute_node.uuid, inv_data
)
self.assertFalse(result)
# No cache invalidation
@ -619,7 +618,10 @@ class TestInventory(SchedulerReportClientTestCase):
# Thanks py3
mock_put.return_value.__bool__.return_value = False
result = self.client._update_inventory_attempt(compute_node)
inv_data = report._compute_node_to_inventory_dict(compute_node)
result = self.client._update_inventory_attempt(
compute_node.uuid, inv_data
)
self.assertFalse(result)
# No cache invalidation
@ -639,7 +641,9 @@ class TestInventory(SchedulerReportClientTestCase):
mock_update.side_effect = (False, True)
self.client._resource_providers[cn.uuid] = True
result = self.client._update_inventory(cn)
result = self.client._update_inventory(
cn.uuid, mock.sentinel.inv_data
)
self.assertTrue(result)
# Only slept once
@ -658,15 +662,20 @@ class TestInventory(SchedulerReportClientTestCase):
mock_update.side_effect = (False, False, False)
self.client._resource_providers[cn.uuid] = True
result = self.client._update_inventory(cn)
result = self.client._update_inventory(
cn.uuid, mock.sentinel.inv_data
)
self.assertFalse(result)
# Slept three times
mock_sleep.assert_has_calls([mock.call(1), mock.call(1), mock.call(1)])
# Three attempts to update
mock_update.assert_has_calls([mock.call(cn), mock.call(cn),
mock.call(cn)])
mock_update.assert_has_calls([
mock.call(cn.uuid, mock.sentinel.inv_data),
mock.call(cn.uuid, mock.sentinel.inv_data),
mock.call(cn.uuid, mock.sentinel.inv_data),
])
# Slept three times
mock_sleep.assert_has_calls([mock.call(1), mock.call(1), mock.call(1)])