Merge "Report compute node inventories through placement"
commit 8ee0cdbfc1
@@ -85,6 +85,15 @@ class SchedulerReportClient(object):
             url, json=data,
             endpoint_filter=self.ks_filter, raise_exc=False)
 
+    def put(self, url, data):
+        # NOTE(sdague): using json= instead of data= sets the
+        # media type to application/json for us. Placement API is
+        # more sensitive to this than other APIs in the OpenStack
+        # ecosystem.
+        return self._client.put(
+            url, json=data,
+            endpoint_filter=self.ks_filter, raise_exc=False)
+
     @safe_connect
     def _get_resource_provider(self, uuid):
         """Queries the placement API for a resource provider record with the
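The NOTE in put() above deserves a concrete illustration. With both requests and keystoneauth1 sessions, json= serializes the body and sets the Content-Type header to application/json, while passing a dict via data= would form-encode it, which the placement API rejects. A minimal sketch using the requests library directly (the URL is made up):

import requests

payload = {'resource_provider_generation': 1, 'inventories': []}

req_json = requests.Request(
    'PUT', 'http://placement.example/x', json=payload).prepare()
print(req_json.headers['Content-Type'])  # application/json

req_data = requests.Request(
    'PUT', 'http://placement.example/x', data=payload).prepare()
print(req_data.headers['Content-Type'])  # application/x-www-form-urlencoded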
@@ -191,6 +200,89 @@ class SchedulerReportClient(object):
         self._resource_providers[uuid] = rp
         return rp
 
+    def _compute_node_inventory(self, compute_node):
+        inventories = [
+            {'resource_class': 'VCPU',
+             'total': compute_node.vcpus,
+             'reserved': 0,
+             'min_unit': 1,
+             'max_unit': 1,
+             'step_size': 1,
+             'allocation_ratio': compute_node.cpu_allocation_ratio},
+            {'resource_class': 'MEMORY_MB',
+             'total': compute_node.memory_mb,
+             'reserved': CONF.reserved_host_memory_mb,
+             'min_unit': 1,
+             'max_unit': 1,
+             'step_size': 1,
+             'allocation_ratio': compute_node.ram_allocation_ratio},
+            {'resource_class': 'DISK_GB',
+             'total': compute_node.local_gb,
+             'reserved': CONF.reserved_host_disk_mb // 1024,
+             'min_unit': 1,
+             'max_unit': 1,
+             'step_size': 1,
+             'allocation_ratio': compute_node.disk_allocation_ratio},
+        ]
+        generation = self._resource_providers[compute_node.uuid].generation
+        data = {
+            'resource_provider_generation': generation,
+            'inventories': inventories,
+        }
+        return data
+
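Concretely, for a node with 8 VCPUs, 16384 MB of RAM and 100 GB of local disk, with 512 MB of RAM and 2000 MB of disk reserved (illustrative numbers, not defaults from this patch; 2000 MB converts to 1 GB of DISK_GB reserve), the helper builds a PUT body shaped like:

{
    'resource_provider_generation': 42,
    'inventories': [
        {'resource_class': 'VCPU', 'total': 8, 'reserved': 0,
         'min_unit': 1, 'max_unit': 1, 'step_size': 1,
         'allocation_ratio': 16.0},
        {'resource_class': 'MEMORY_MB', 'total': 16384, 'reserved': 512,
         'min_unit': 1, 'max_unit': 1, 'step_size': 1,
         'allocation_ratio': 1.5},
        {'resource_class': 'DISK_GB', 'total': 100, 'reserved': 1,
         'min_unit': 1, 'max_unit': 1, 'step_size': 1,
         'allocation_ratio': 1.0},
    ],
}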
+    @safe_connect
+    def _update_inventory(self, compute_node):
+        """Update the inventory for this compute node if needed.
+
+        :param compute_node: The objects.ComputeNode for the operation
+        :returns: True if the inventory was updated (or did not need to be),
+                  False otherwise.
+        """
+        url = '/resource_providers/%s/inventories' % compute_node.uuid
+        data = self._compute_node_inventory(compute_node)
+        result = self.put(url, data)
+        if result.status_code == 409:
+            # Generation conflict: re-poll the provider and then retry
+            del self._resource_providers[compute_node.uuid]
+            self._ensure_resource_provider(
+                compute_node.uuid, compute_node.hypervisor_hostname)
+            LOG.info(_LI('Retrying update inventory for %s'),
+                     compute_node.uuid)
+            # Regenerate the body with the new generation
+            data = self._compute_node_inventory(compute_node)
+            result = self.put(url, data)
+        elif not result:
+            LOG.warning(_LW('Failed to update inventory for '
+                            '%(uuid)s: %(status)i %(text)s'),
+                        {'uuid': compute_node.uuid,
+                         'status': result.status_code,
+                         'text': result.text})
+            return False
+
+        generation = data['resource_provider_generation']
+        if result.status_code == 200:
+            self._resource_providers[compute_node.uuid].generation = (
+                generation + 1)
+            LOG.debug('Updated inventory for %s at generation %i' % (
+                compute_node.uuid, generation))
+            return True
+        elif result.status_code == 409:
+            LOG.info(_LI('Double generation clash updating inventory '
+                         'for %(uuid)s at generation %(gen)i'),
+                     {'uuid': compute_node.uuid,
+                      'gen': generation})
+            return False
+
+        LOG.info(_LI('Received unexpected response code %(code)i while '
+                     'trying to update inventory for compute node %(uuid)s '
+                     'at generation %(gen)i: %(text)s'),
+                 {'uuid': compute_node.uuid,
+                  'code': result.status_code,
+                  'gen': generation,
+                  'text': result.text})
+        return False
+
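The generation field implements optimistic concurrency control: every successful write to a resource provider bumps its generation, and a writer sending a stale generation receives a 409. Below is a standalone sketch of that retry pattern, with hypothetical callables standing in for the placement calls (this is not nova code):

def update_with_retry(fetch_generation, put_inventory, max_attempts=2):
    # Optimistic concurrency: send the last-known generation; on a 409
    # conflict, re-read the current generation and try again.
    for _ in range(max_attempts):
        generation = fetch_generation()
        status_code = put_inventory(generation)
        if status_code == 200:
            return True   # accepted; the server has bumped the generation
        if status_code != 409:
            return False  # hard failure; retrying will not help
    return False          # still conflicting after max_attempts

_update_inventory() above is this pattern unrolled to exactly two attempts.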
     def update_resource_stats(self, compute_node):
         """Creates or updates stats for the supplied compute node.
 
@@ -199,3 +291,5 @@ class SchedulerReportClient(object):
         compute_node.save()
         self._ensure_resource_provider(compute_node.uuid,
                                        compute_node.hypervisor_hostname)
+        if compute_node.uuid in self._resource_providers:
+            self._update_inventory(compute_node)
@@ -278,7 +278,252 @@ class SchedulerReportClientTestCase(test.NoDBTestCase):
         # A 503 Service Unavailable should log an error and
         # _create_resource_provider() should return None
         self.assertTrue(logging_mock.called)
-        self.assertIsNone(result)
+        self.assertFalse(result)
 
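One note on the assertion change above: assertFalse() accepts any falsey value, so it passes for both the old None return and a plain False, whereas assertIsNone() only accepts None. A quick standalone illustration:

import unittest

class FalseyExample(unittest.TestCase):
    def test_falsey_values(self):
        self.assertFalse(None)    # passes: None is falsey
        self.assertFalse(False)   # passes
        self.assertFalse(0)       # passes
        # self.assertIsNone(False) would fail: False is not None

if __name__ == '__main__':
    unittest.main()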
+    def test_compute_node_inventory(self):
+        # Verify the payload this helper builds just once here, so that
+        # the following tests can call it directly without re-checking it.
+        uuid = uuids.compute_node
+        name = 'computehost'
+        compute_node = objects.ComputeNode(uuid=uuid,
+                                           hypervisor_hostname=name,
+                                           vcpus=2,
+                                           cpu_allocation_ratio=16.0,
+                                           memory_mb=1024,
+                                           ram_allocation_ratio=1.5,
+                                           local_gb=10,
+                                           disk_allocation_ratio=1.0)
+        rp = objects.ResourceProvider(uuid=uuid, name=name, generation=42)
+        self.client._resource_providers[uuid] = rp
+
+        self.flags(reserved_host_memory_mb=1000)
+        self.flags(reserved_host_disk_mb=2000)
+
+        result = self.client._compute_node_inventory(compute_node)
+
+        expected_inventories = [
+            {'resource_class': 'VCPU',
+             'total': compute_node.vcpus,
+             'reserved': 0,
+             'min_unit': 1,
+             'max_unit': 1,
+             'step_size': 1,
+             'allocation_ratio': compute_node.cpu_allocation_ratio},
+            {'resource_class': 'MEMORY_MB',
+             'total': compute_node.memory_mb,
+             'reserved': CONF.reserved_host_memory_mb,
+             'min_unit': 1,
+             'max_unit': 1,
+             'step_size': 1,
+             'allocation_ratio': compute_node.ram_allocation_ratio},
+            {'resource_class': 'DISK_GB',
+             'total': compute_node.local_gb,
+             'reserved': CONF.reserved_host_disk_mb // 1024,
+             'min_unit': 1,
+             'max_unit': 1,
+             'step_size': 1,
+             'allocation_ratio': compute_node.disk_allocation_ratio},
+        ]
+        expected = {
+            'resource_provider_generation': rp.generation,
+            'inventories': expected_inventories,
+        }
+        self.assertEqual(expected, result)
+
+    def test_update_inventory(self):
+        # Ensure _update_inventory() PUTs the inventory payload, returns
+        # True and bumps the cached provider generation on success.
+        uuid = uuids.compute_node
+        name = 'computehost'
+        compute_node = objects.ComputeNode(uuid=uuid,
+                                           hypervisor_hostname=name,
+                                           vcpus=2,
+                                           cpu_allocation_ratio=16.0,
+                                           memory_mb=1024,
+                                           ram_allocation_ratio=1.5,
+                                           local_gb=10,
+                                           disk_allocation_ratio=1.0)
+        rp = objects.ResourceProvider(uuid=uuid, name=name, generation=42)
+        # Seed the resource provider cache so the API is not called
+        self.client._resource_providers[uuid] = rp
+
+        expected_output = mock.sentinel.inventories
+        resp_mock = mock.Mock(status_code=200, json=lambda: expected_output)
+        self.ks_sess_mock.put.return_value = resp_mock
+
+        # Remember the original generation before it is updated
+        original_generation = rp.generation
+        expected_payload = self.client._compute_node_inventory(compute_node)
+
+        result = self.client._update_inventory(compute_node)
+
+        expected_url = '/resource_providers/' + uuid + '/inventories'
+        self.ks_sess_mock.put.assert_called_once_with(
+            expected_url,
+            endpoint_filter=mock.ANY,
+            json=expected_payload,
+            raise_exc=False)
+        self.assertTrue(result)
+        # Make sure the generation has been incremented
+        rp = self.client._resource_providers[compute_node.uuid]
+        self.assertEqual(original_generation + 1, rp.generation)
+
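These tests stand in for keystoneauth responses with bare mocks; only the attributes the client actually reads need to exist. A minimal sketch of the pattern, outside the nova test harness:

import mock  # on Python 3: from unittest import mock

# _update_inventory() reads status_code and text from the response, so a
# bare Mock carrying those attributes is enough to stand in for one.
session = mock.Mock()
session.put.return_value = mock.Mock(status_code=200,
                                     json=lambda: {'inventories': []})

resp = session.put('/resource_providers/some-uuid/inventories', json={})
assert resp.status_code == 200
session.put.assert_called_once_with(
    '/resource_providers/some-uuid/inventories', json={})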
+    @mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
+                '_ensure_resource_provider')
+    def test_update_inventory_conflicts_and_then_succeeds(self, ensure_mock):
+        # Ensure _update_inventory() retries on a generation conflict and
+        # succeeds on the second attempt.
+        uuid = uuids.compute_node
+        name = 'computehost'
+        compute_node = objects.ComputeNode(uuid=uuid,
+                                           hypervisor_hostname=name,
+                                           vcpus=2,
+                                           cpu_allocation_ratio=16.0,
+                                           memory_mb=1024,
+                                           ram_allocation_ratio=1.5,
+                                           local_gb=10,
+                                           disk_allocation_ratio=1.0)
+        rp = objects.ResourceProvider(uuid=uuid, name=name, generation=42)
+
+        # Re-seed the resource provider cache when the client re-polls
+        def fake_ensure_rp(uuid, name=None):
+            self.client._resource_providers[uuid] = rp
+        ensure_mock.side_effect = fake_ensure_rp
+
+        self.client._resource_providers[uuid] = rp
+
+        # Remember the original generation before it is updated
+        original_generation = rp.generation
+        expected_payload = self.client._compute_node_inventory(compute_node)
+        expected_output = mock.sentinel.inventories
+
+        conflict_mock = mock.Mock(status_code=409)
+        success_mock = mock.Mock(status_code=200, json=lambda: expected_output)
+        self.ks_sess_mock.put.side_effect = (conflict_mock, success_mock)
+
+        result = self.client._update_inventory(compute_node)
+
+        expected_url = '/resource_providers/' + uuid + '/inventories'
+        self.ks_sess_mock.put.assert_has_calls(
+            [
+                mock.call(expected_url,
+                          endpoint_filter=mock.ANY,
+                          json=expected_payload,
+                          raise_exc=False),
+                mock.call(expected_url,
+                          endpoint_filter=mock.ANY,
+                          json=expected_payload,
+                          raise_exc=False),
+            ])
+
+        self.assertTrue(result)
+        # Make sure the generation has been incremented
+        rp = self.client._resource_providers[compute_node.uuid]
+        self.assertEqual(original_generation + 1, rp.generation)
+
+    @mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
+                '_ensure_resource_provider')
+    def test_update_inventory_conflicts_and_then_fails(self, ensure_mock):
+        # Ensure _update_inventory() gives up when the retry after a
+        # generation conflict fails with another error.
+        uuid = uuids.compute_node
+        name = 'computehost'
+        compute_node = objects.ComputeNode(uuid=uuid,
+                                           hypervisor_hostname=name,
+                                           vcpus=2,
+                                           cpu_allocation_ratio=16.0,
+                                           memory_mb=1024,
+                                           ram_allocation_ratio=1.5,
+                                           local_gb=10,
+                                           disk_allocation_ratio=1.0)
+        rp = objects.ResourceProvider(uuid=uuid, name=name, generation=42)
+
+        # Re-seed the resource provider cache when the client re-polls
+        def fake_ensure_rp(uuid, name=None):
+            self.client._resource_providers[uuid] = rp
+        ensure_mock.side_effect = fake_ensure_rp
+
+        self.client._resource_providers[uuid] = rp
+
+        expected_payload = self.client._compute_node_inventory(compute_node)
+
+        conflict_mock = mock.Mock(status_code=409)
+        fail_mock = mock.Mock(status_code=400)
+        self.ks_sess_mock.put.side_effect = (conflict_mock, fail_mock)
+
+        result = self.client._update_inventory(compute_node)
+
+        expected_url = '/resource_providers/' + uuid + '/inventories'
+        self.ks_sess_mock.put.assert_has_calls(
+            [
+                mock.call(expected_url,
+                          endpoint_filter=mock.ANY,
+                          json=expected_payload,
+                          raise_exc=False),
+                mock.call(expected_url,
+                          endpoint_filter=mock.ANY,
+                          json=expected_payload,
+                          raise_exc=False),
+            ])
+
+        self.assertFalse(result)
+
+    @mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
+                '_ensure_resource_provider')
+    def test_update_inventory_conflicts_and_then_conflicts(self, ensure_mock):
+        # Ensure _update_inventory() gives up when the retry after a
+        # generation conflict hits another conflict.
+        uuid = uuids.compute_node
+        name = 'computehost'
+        compute_node = objects.ComputeNode(uuid=uuid,
+                                           hypervisor_hostname=name,
+                                           vcpus=2,
+                                           cpu_allocation_ratio=16.0,
+                                           memory_mb=1024,
+                                           ram_allocation_ratio=1.5,
+                                           local_gb=10,
+                                           disk_allocation_ratio=1.0)
+        rp = objects.ResourceProvider(uuid=uuid, name=name, generation=42)
+
+        # Re-seed the resource provider cache when the client re-polls
+        def fake_ensure_rp(uuid, name=None):
+            self.client._resource_providers[uuid] = rp
+        ensure_mock.side_effect = fake_ensure_rp
+
+        self.client._resource_providers[uuid] = rp
+
+        expected_payload = self.client._compute_node_inventory(compute_node)
+
+        conflict_mock = mock.Mock(status_code=409)
+        self.ks_sess_mock.put.return_value = conflict_mock
+
+        result = self.client._update_inventory(compute_node)
+
+        expected_url = '/resource_providers/' + uuid + '/inventories'
+        self.ks_sess_mock.put.assert_has_calls(
+            [
+                mock.call(expected_url,
+                          endpoint_filter=mock.ANY,
+                          json=expected_payload,
+                          raise_exc=False),
+                mock.call(expected_url,
+                          endpoint_filter=mock.ANY,
+                          json=expected_payload,
+                          raise_exc=False),
+            ])
+
+        self.assertFalse(result)
+
+    @mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
+                '_ensure_resource_provider')
+    @mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
+                '_update_inventory')
+    def test_update_resource_stats_rp_fail(self, mock_ui, mock_erp):
+        cn = mock.MagicMock()
+        self.client.update_resource_stats(cn)
+        cn.save.assert_called_once_with()
+        mock_erp.assert_called_once_with(cn.uuid, cn.hypervisor_hostname)
+        self.assertFalse(mock_ui.called)
+
     @mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
                 '_ensure_resource_provider')
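For context, nothing in this diff calls update_resource_stats() itself; the compute node's resource tracker drives it whenever available resources are refreshed. A hedged sketch of that call site (simplified; the real wiring goes through the scheduler client facade, and constructing the report client outside nova requires keystone configuration):

from nova.scheduler.client import report

client = report.SchedulerReportClient()

def report_node(compute_node):
    # Saves the node, ensures its resource provider exists in placement,
    # and, if the provider is now cached, PUTs the node's inventory.
    client.update_resource_stats(compute_node)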