rt: explicitly pass compute node to _update()

This patch is another intermediate step toward having a single
resource tracker object instance contain a set of ComputeNode objects
representing the compute nodes that the nova-compute daemon manages.

We modify the ResourceTracker._update() method to take an explicit
objects.ComputeNode object instead of reading the
ResourceTracker.compute_node attribute. We also make
ResourceTracker._update_usage() take a nodename parameter so that, in
the next patch, we can change the ResourceTracker.compute_node
attribute to a dict of compute node objects keyed by nodename.
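For illustration only (this sketch is not part of the patch), here is
roughly where the series is headed once compute_node becomes a dict
keyed by nodename. SketchResourceTracker and FakeComputeNode are
stand-in names, not Nova classes, and the scheduler push and change
detection are reduced to placeholders:

    # Minimal, self-contained sketch of the target design, assuming a
    # dict of per-node objects; the real ResourceTracker delegates the
    # scheduler push and change detection to other Nova components.
    import copy

    class FakeComputeNode(object):
        def __init__(self, nodename, memory_mb_used=0):
            self.nodename = nodename
            self.memory_mb_used = memory_mb_used

    class SketchResourceTracker(object):
        def __init__(self, host):
            self.host = host
            self.compute_nodes = {}  # nodename -> compute node object
            self.old_resources = {}  # nodename -> last pushed snapshot

        def _resource_change(self, compute_node):
            # Compare against the snapshot we last sent to the scheduler.
            old = self.old_resources.get(compute_node.nodename)
            if old is None or vars(old) != vars(compute_node):
                self.old_resources[compute_node.nodename] = copy.deepcopy(
                    compute_node)
                return True
            return False

        def _update(self, context, compute_node):
            # The node to persist is passed in explicitly; nothing here
            # reads a single self.compute_node attribute.
            if not self._resource_change(compute_node):
                return
            print('would push %s to the scheduler' % compute_node.nodename)

        def _update_usage(self, usage, nodename, sign=1):
            # nodename selects which tracked node to adjust.
            cn = self.compute_nodes[nodename]
            cn.memory_mb_used += sign * usage['memory_mb']
            self._update(None, cn)

Calling _update() twice with unchanged resources results in a single
scheduler push, which is the behavior the _resource_change() guard in
this patch preserves.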

Change-Id: I12cea81378ba774e3983dc7c4ed506e995696f2e
Jay Pipes 2016-11-15 18:50:02 -05:00
parent d62783eb06
commit 7b95b4d607
3 changed files with 94 additions and 135 deletions

nova/compute/resource_tracker.py

@@ -142,9 +142,10 @@ class ResourceTracker(object):
                   "GB", {'flavor': instance.flavor.root_gb,
                          'overhead': overhead.get('disk_gb', 0)})
 
+        cn = self.compute_node
         pci_requests = objects.InstancePCIRequests.get_by_instance_uuid(
                 context, instance.uuid)
-        claim = claims.Claim(context, instance, self, self.compute_node,
+        claim = claims.Claim(context, instance, self, cn,
                              pci_requests, overhead=overhead, limits=limits)
 
         # self._set_instance_host_and_node() will save instance to the DB
@@ -166,7 +167,7 @@ class ResourceTracker(object):
         elevated = context.elevated()
         # persist changes to the compute node:
-        self._update(elevated)
+        self._update(elevated, cn)
 
         return claim
@@ -231,6 +232,8 @@ class ResourceTracker(object):
                   "GB", {'flavor': instance.flavor.root_gb,
                          'overhead': overhead.get('disk_gb', 0)})
 
+        cn = self.compute_node
+
         # TODO(moshele): we are recreating the pci requests even if
         # there was no change on resize. This will cause allocating
         # the old/new pci device in the resize phase. In the future
@@ -247,7 +250,7 @@ class ResourceTracker(object):
                 if request.alias_name is None:
                     new_pci_requests.requests.append(request)
         claim = claims.MoveClaim(context, instance, new_instance_type,
-                                 image_meta, self, self.compute_node,
+                                 image_meta, self, cn,
                                  new_pci_requests, overhead=overhead,
                                  limits=limits)
@@ -281,7 +284,7 @@ class ResourceTracker(object):
         self._update_usage_from_migration(context, instance, migration,
                                           nodename)
         elevated = context.elevated()
-        self._update(elevated)
+        self._update(elevated, cn)
 
         return claim
@@ -352,7 +355,7 @@ class ResourceTracker(object):
             instance.clear_numa_topology()
         self._unset_instance_host_and_node(instance)
 
-        self._update(context.elevated())
+        self._update(context.elevated(), self.compute_node)
 
     @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
     def drop_move_claim(self, context, instance, nodename,
@@ -378,10 +381,10 @@ class ResourceTracker(object):
             if pci_devices:
                 for pci_device in pci_devices:
                     self.pci_tracker.free_device(pci_device, instance)
-            self._update_usage(usage, sign=-1)
+            self._update_usage(usage, nodename, sign=-1)
 
             ctxt = context.elevated()
-            self._update(ctxt)
+            self._update(ctxt, self.compute_node)
 
     @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
     def update_usage(self, context, instance, nodename):
@@ -397,7 +400,7 @@ class ResourceTracker(object):
         # claim first:
         if uuid in self.tracked_instances:
             self._update_usage_from_instance(context, instance, nodename)
-            self._update(context.elevated())
+            self._update(context.elevated(), self.compute_node)
 
     @property
     def disabled(self):
@@ -422,7 +425,7 @@ class ResourceTracker(object):
         # if there is already a compute node just use resources
         # to initialize
         if self.compute_node:
-            self._copy_resources(resources)
+            self._copy_resources(self.compute_node, resources)
             self._setup_pci_tracker(context, resources)
             self.scheduler_client.update_resource_stats(self.compute_node)
             return
@@ -431,7 +434,7 @@ class ResourceTracker(object):
         # database. If we get one we use resources to initialize
         self.compute_node = self._get_compute_node(context, nodename)
         if self.compute_node:
-            self._copy_resources(resources)
+            self._copy_resources(self.compute_node, resources)
             self._setup_pci_tracker(context, resources)
             self.scheduler_client.update_resource_stats(self.compute_node)
             return
@@ -441,7 +444,7 @@ class ResourceTracker(object):
         # to be initialized with resource values.
         self.compute_node = objects.ComputeNode(context)
         self.compute_node.host = self.host
-        self._copy_resources(resources)
+        self._copy_resources(self.compute_node, resources)
         self.compute_node.create()
         LOG.info(_LI('Compute_service record created for '
                      '%(host)s:%(node)s'),
@@ -462,22 +465,20 @@ class ResourceTracker(object):
             dev_pools_obj = self.pci_tracker.stats.to_device_pools_obj()
             self.compute_node.pci_device_pools = dev_pools_obj
 
-    def _copy_resources(self, resources):
-        """Copy resource values to initialize compute_node and related
-        data structures.
-        """
+    def _copy_resources(self, compute_node, resources):
+        """Copy resource values to supplied compute_node."""
         # purge old stats and init with anything passed in by the driver
         self.stats.clear()
         self.stats.digest_stats(resources.get('stats'))
-        self.compute_node.stats = copy.deepcopy(self.stats)
+        compute_node.stats = copy.deepcopy(self.stats)
 
         # update the allocation ratios for the related ComputeNode object
-        self.compute_node.ram_allocation_ratio = self.ram_allocation_ratio
-        self.compute_node.cpu_allocation_ratio = self.cpu_allocation_ratio
-        self.compute_node.disk_allocation_ratio = self.disk_allocation_ratio
+        compute_node.ram_allocation_ratio = self.ram_allocation_ratio
+        compute_node.cpu_allocation_ratio = self.cpu_allocation_ratio
+        compute_node.disk_allocation_ratio = self.disk_allocation_ratio
 
         # now copy rest to compute_node
-        self.compute_node.update_from_virt_driver(resources)
+        compute_node.update_from_virt_driver(resources)
 
     def _get_host_metrics(self, context, nodename):
         """Get the metrics from monitors and
@@ -589,7 +590,7 @@ class ResourceTracker(object):
         # Detect and account for orphaned instances that may exist on the
         # hypervisor, but are not in the DB:
         orphans = self._find_orphaned_instances()
-        self._update_usage_from_orphans(orphans)
+        self._update_usage_from_orphans(orphans, nodename)
 
         # NOTE(yjiang5): Because pci device tracker status is not cleared in
         # this periodic task, and also because the resource tracker is not
@@ -607,7 +608,7 @@ class ResourceTracker(object):
             self.compute_node.metrics = jsonutils.dumps(metrics)
 
         # update the compute_node
-        self._update(context)
+        self._update(context, self.compute_node)
         LOG.debug('Compute_service record updated for %(host)s:%(node)s',
                   {'host': self.host, 'node': nodename})
@@ -694,23 +695,23 @@ class ResourceTracker(object):
                      'used_vcpus': ucpu,
                      'pci_stats': pci_stats})
 
-    def _resource_change(self):
+    def _resource_change(self, compute_node):
         """Check to see if any resources have changed."""
-        if not obj_base.obj_equal_prims(self.compute_node, self.old_resources):
-            self.old_resources = copy.deepcopy(self.compute_node)
+        if not obj_base.obj_equal_prims(compute_node, self.old_resources):
+            self.old_resources = copy.deepcopy(compute_node)
             return True
         return False
 
-    def _update(self, context):
+    def _update(self, context, compute_node):
         """Update partial stats locally and populate them to Scheduler."""
-        if not self._resource_change():
+        if not self._resource_change(compute_node):
             return
         # Persist the stats to the Scheduler
-        self.scheduler_client.update_resource_stats(self.compute_node)
+        self.scheduler_client.update_resource_stats(compute_node)
         if self.pci_tracker:
             self.pci_tracker.save(context)
 
-    def _update_usage(self, usage, sign=1):
+    def _update_usage(self, usage, nodename, sign=1):
         mem_usage = usage['memory_mb']
         disk_usage = usage.get('root_gb', 0)
@@ -815,7 +816,7 @@ class ResourceTracker(object):
         if self.pci_tracker and sign:
             self.pci_tracker.update_pci_for_instance(
                 context, instance, sign=sign)
-        self._update_usage(usage)
+        self._update_usage(usage, nodename)
         if self.pci_tracker:
             obj = self.pci_tracker.stats.to_device_pools_obj()
             self.compute_node.pci_device_pools = obj
@@ -901,7 +902,8 @@ class ResourceTracker(object):
             self.scheduler_client.reportclient.update_instance_allocation(
                 self.compute_node, instance, sign)
             # new instance, update compute node resource usage:
-            self._update_usage(self._get_usage_dict(instance), sign=sign)
+            self._update_usage(self._get_usage_dict(instance), nodename,
+                               sign=sign)
 
         self.compute_node.current_workload = self.stats.calculate_workload()
         if self.pci_tracker:
@@ -959,7 +961,7 @@ class ResourceTracker(object):
 
         return orphans
 
-    def _update_usage_from_orphans(self, orphans):
+    def _update_usage_from_orphans(self, orphans, nodename):
         """Include orphaned instances in usage."""
         for orphan in orphans:
             memory_mb = orphan['memory_mb']
@@ -970,7 +972,7 @@ class ResourceTracker(object):
 
             # just record memory usage for the orphan
             usage = {'memory_mb': memory_mb}
-            self._update_usage(usage)
+            self._update_usage(usage, nodename)
 
     def _verify_resources(self, resources):
         resource_keys = ["vcpus", "memory_mb", "local_gb", "cpu_info",

nova/tests/unit/compute/fake_resource_tracker.py

@@ -19,5 +19,5 @@ from nova.compute import resource_tracker
 class FakeResourceTracker(resource_tracker.ResourceTracker):
     """Version without a DB requirement."""
 
-    def _update(self, context):
+    def _update(self, context, compute_node):
         pass

nova/tests/unit/compute/test_resource_tracker.py

@@ -517,9 +517,9 @@ class TestUpdateAvailableResources(BaseTestCase):
             'running_vms': 0
         }
         _update_compute_node(expected_resources, **vals)
-        update_mock.assert_called_once_with(mock.sentinel.ctx)
+        actual_resources = update_mock.call_args[0][1]
         self.assertTrue(obj_base.obj_equal_prims(expected_resources,
-                                                 self.rt.compute_node))
+                                                 actual_resources))
 
     @mock.patch('nova.objects.InstancePCIRequests.get_by_instance',
                 return_value=objects.InstancePCIRequests(requests=[]))
@@ -557,9 +557,9 @@ class TestUpdateAvailableResources(BaseTestCase):
             'running_vms': 0
         }
         _update_compute_node(expected_resources, **vals)
-        update_mock.assert_called_once_with(mock.sentinel.ctx)
+        actual_resources = update_mock.call_args[0][1]
         self.assertTrue(obj_base.obj_equal_prims(expected_resources,
-                                                 self.rt.compute_node))
+                                                 actual_resources))
 
     @mock.patch('nova.objects.InstancePCIRequests.get_by_instance',
                 return_value=objects.InstancePCIRequests(requests=[]))
@@ -605,9 +605,9 @@ class TestUpdateAvailableResources(BaseTestCase):
             'running_vms': 1  # One active instance
         }
         _update_compute_node(expected_resources, **vals)
-        update_mock.assert_called_once_with(mock.sentinel.ctx)
+        actual_resources = update_mock.call_args[0][1]
         self.assertTrue(obj_base.obj_equal_prims(expected_resources,
-                                                 self.rt.compute_node))
+                                                 actual_resources))
 
     @mock.patch('nova.objects.InstancePCIRequests.get_by_instance',
                 return_value=objects.InstancePCIRequests(requests=[]))
@@ -669,9 +669,9 @@ class TestUpdateAvailableResources(BaseTestCase):
             'running_vms': 0
         }
         _update_compute_node(expected_resources, **vals)
-        update_mock.assert_called_once_with(mock.sentinel.ctx)
+        actual_resources = update_mock.call_args[0][1]
         self.assertTrue(obj_base.obj_equal_prims(expected_resources,
-                                                 self.rt.compute_node))
+                                                 actual_resources))
 
     @mock.patch('nova.objects.InstancePCIRequests.get_by_instance',
                 return_value=objects.InstancePCIRequests(requests=[]))
@@ -732,9 +732,9 @@ class TestUpdateAvailableResources(BaseTestCase):
             'running_vms': 0
         }
         _update_compute_node(expected_resources, **vals)
-        update_mock.assert_called_once_with(mock.sentinel.ctx)
+        actual_resources = update_mock.call_args[0][1]
         self.assertTrue(obj_base.obj_equal_prims(expected_resources,
-                                                 self.rt.compute_node))
+                                                 actual_resources))
 
     @mock.patch('nova.objects.InstancePCIRequests.get_by_instance',
                 return_value=objects.InstancePCIRequests(requests=[]))
@@ -792,9 +792,9 @@ class TestUpdateAvailableResources(BaseTestCase):
             'running_vms': 0
         }
         _update_compute_node(expected_resources, **vals)
-        update_mock.assert_called_once_with(mock.sentinel.ctx)
+        actual_resources = update_mock.call_args[0][1]
         self.assertTrue(obj_base.obj_equal_prims(expected_resources,
-                                                 self.rt.compute_node))
+                                                 actual_resources))
 
     @mock.patch('nova.objects.InstancePCIRequests.get_by_instance',
                 return_value=objects.InstancePCIRequests(requests=[]))
@@ -848,9 +848,9 @@ class TestUpdateAvailableResources(BaseTestCase):
             'running_vms': 0
         }
         _update_compute_node(expected_resources, **vals)
-        update_mock.assert_called_once_with(mock.sentinel.ctx)
+        actual_resources = update_mock.call_args[0][1]
         self.assertTrue(obj_base.obj_equal_prims(expected_resources,
-                                                 self.rt.compute_node))
+                                                 actual_resources))
 
     @mock.patch('nova.objects.InstancePCIRequests.get_by_instance',
                 return_value=objects.InstancePCIRequests(requests=[]))
@@ -917,9 +917,9 @@ class TestUpdateAvailableResources(BaseTestCase):
             'running_vms': 2
         }
         _update_compute_node(expected_resources, **vals)
-        update_mock.assert_called_once_with(mock.sentinel.ctx)
+        actual_resources = update_mock.call_args[0][1]
         self.assertTrue(obj_base.obj_equal_prims(expected_resources,
-                                                 self.rt.compute_node))
+                                                 actual_resources))
 
 
 class TestInitComputeNode(BaseTestCase):
@@ -1057,84 +1057,40 @@ class TestUpdateComputeNode(BaseTestCase):
         # This is the same set of resources as the fixture, deliberately. We
         # are checking below to see that update_resource_stats() is not
         # needlessly called when the resources don't actually change.
-        compute = objects.ComputeNode(
-            host_ip='1.1.1.1',
-            numa_topology=None,
-            metrics='[]',
-            cpu_info='',
-            hypervisor_hostname=_NODENAME,
-            free_disk_gb=6,
-            hypervisor_version=0,
-            local_gb=6,
-            free_ram_mb=512,
-            memory_mb_used=0,
-            pci_device_pools=objects.PciDevicePoolList(),
-            vcpus_used=0,
-            hypervisor_type='fake',
-            local_gb_used=0,
-            memory_mb=512,
-            current_workload=0,
-            vcpus=4,
-            running_vms=0,
-            cpu_allocation_ratio=16.0,
-            ram_allocation_ratio=1.5,
-            disk_allocation_ratio=1.0,
-        )
-        self.rt.compute_node = compute
-        self.rt._update(mock.sentinel.ctx)
+        orig_compute = _COMPUTE_NODE_FIXTURES[0].obj_clone()
+        self.rt.compute_node = orig_compute
+        self.rt.old_resources = orig_compute
 
         self.assertFalse(self.rt.disabled)
         self.assertFalse(service_mock.called)
 
-        # The above call to _update() will populate the
-        # RT.old_resources collection with the resources. Here, we check that
-        # if we call _update() again with the same resources, that
-        # the scheduler client won't be called again to update those
-        # (unchanged) resources for the compute node
-        self.sched_client_mock.reset_mock()
+        new_compute = orig_compute.obj_clone()
+
+        # Here, we check that if we call _update() with the same resources that
+        # are already stored in the resource tracker, that the scheduler client
+        # won't be called again to update those (unchanged) resources for the
+        # compute node
         urs_mock = self.sched_client_mock.update_resource_stats
-        self.rt._update(mock.sentinel.ctx)
+        self.rt._update(mock.sentinel.ctx, new_compute)
         self.assertFalse(urs_mock.called)
 
     @mock.patch('nova.objects.Service.get_by_compute_host')
     def test_existing_compute_node_updated_new_resources(self, service_mock):
         self._setup_rt()
 
+        orig_compute = _COMPUTE_NODE_FIXTURES[0].obj_clone()
+        self.rt.compute_node = orig_compute
+        self.rt.old_resources = orig_compute
+
         # Deliberately changing local_gb_used, vcpus_used, and memory_mb_used
         # below to be different from the compute node fixture's base usages.
         # We want to check that the code paths update the stored compute node
         # usage records with what is supplied to _update().
-        compute = objects.ComputeNode(
-            host=_HOSTNAME,
-            host_ip='1.1.1.1',
-            numa_topology=None,
-            metrics='[]',
-            cpu_info='',
-            hypervisor_hostname=_NODENAME,
-            free_disk_gb=2,
-            hypervisor_version=0,
-            local_gb=6,
-            free_ram_mb=384,
-            memory_mb_used=128,
-            pci_device_pools=objects.PciDevicePoolList(),
-            vcpus_used=2,
-            hypervisor_type='fake',
-            local_gb_used=4,
-            memory_mb=512,
-            current_workload=0,
-            vcpus=4,
-            running_vms=0,
-            cpu_allocation_ratio=16.0,
-            ram_allocation_ratio=1.5,
-            disk_allocation_ratio=1.0,
-        )
-        self.rt.compute_node = compute
-        self.rt._update(mock.sentinel.ctx)
+        new_compute = orig_compute.obj_clone()
+        new_compute.memory_mb_used = 128
+        new_compute.vcpus_used = 2
+        new_compute.local_gb_used = 4
 
         self.assertFalse(self.rt.disabled)
         self.assertFalse(service_mock.called)
         urs_mock = self.sched_client_mock.update_resource_stats
-        urs_mock.assert_called_once_with(self.rt.compute_node)
+        self.rt._update(mock.sentinel.ctx, new_compute)
+        urs_mock.assert_called_once_with(new_compute)
 
 
 class TestInstanceClaim(BaseTestCase):
@@ -1223,9 +1179,9 @@ class TestInstanceClaim(BaseTestCase):
         with mock.patch.object(self.instance, 'save'):
             self.rt.instance_claim(self.ctx, self.instance, _NODENAME,
                                    None)
-        update_mock.assert_called_once_with(self.elevated)
-        self.assertTrue(obj_base.obj_equal_prims(expected,
-                                                 self.rt.compute_node))
+        cn = self.rt.compute_node
+        update_mock.assert_called_once_with(self.elevated, cn)
+        self.assertTrue(obj_base.obj_equal_prims(expected, cn))
 
     @mock.patch('nova.objects.InstancePCIRequests.get_by_instance_uuid')
     @mock.patch('nova.objects.MigrationList.get_in_progress_by_host_and_node')
@@ -1258,9 +1214,9 @@ class TestInstanceClaim(BaseTestCase):
         with mock.patch.object(self.instance, 'save'):
             self.rt.instance_claim(self.ctx, self.instance, _NODENAME,
                                    None)
-        update_mock.assert_called_once_with(self.elevated)
-        self.assertTrue(obj_base.obj_equal_prims(expected,
-                                                 self.rt.compute_node))
+        cn = self.rt.compute_node
+        update_mock.assert_called_once_with(self.elevated, cn)
+        self.assertTrue(obj_base.obj_equal_prims(expected, cn))
 
         expected_updated = copy.deepcopy(_COMPUTE_NODE_FIXTURES[0])
         vals = {
@@ -1313,9 +1269,9 @@ class TestInstanceClaim(BaseTestCase):
         with mock.patch.object(self.instance, 'save'):
             self.rt.instance_claim(self.ctx, self.instance, _NODENAME,
                                    None)
-        update_mock.assert_called_once_with(self.elevated)
-        self.assertTrue(obj_base.obj_equal_prims(expected,
-                                                 self.rt.compute_node))
+        cn = self.rt.compute_node
+        update_mock.assert_called_once_with(self.elevated, cn)
+        self.assertTrue(obj_base.obj_equal_prims(expected, cn))
 
         self.assertEqual(self.rt.host, self.instance.host)
         self.assertEqual(self.rt.host, self.instance.launched_on)
@@ -1371,10 +1327,10 @@ class TestInstanceClaim(BaseTestCase):
         with mock.patch.object(self.instance, 'save'):
             self.rt.instance_claim(self.ctx, self.instance, _NODENAME,
                                    None)
-        update_mock.assert_called_once_with(self.elevated)
+        cn = self.rt.compute_node
+        update_mock.assert_called_once_with(self.elevated, cn)
         pci_stats_mock.assert_called_once_with([request])
-        self.assertTrue(obj_base.obj_equal_prims(expected,
-                                                 self.rt.compute_node))
+        self.assertTrue(obj_base.obj_equal_prims(expected, cn))
 
     @mock.patch('nova.objects.InstancePCIRequests.get_by_instance_uuid')
     @mock.patch('nova.objects.MigrationList.get_in_progress_by_host_and_node')
@@ -1492,8 +1448,9 @@ class TestInstanceClaim(BaseTestCase):
         with mock.patch.object(self.instance, 'save'):
             self.rt.instance_claim(self.ctx, self.instance, _NODENAME,
                                    limits)
-        update_mock.assert_called_once_with(self.ctx.elevated())
         updated_compute_node = self.rt.compute_node
+        update_mock.assert_called_once_with(self.ctx.elevated(),
+                                            updated_compute_node)
         new_numa = updated_compute_node.numa_topology
         new_numa = objects.NUMATopology.obj_from_db_obj(new_numa)
         self.assertEqualNUMAHostTopology(expected_numa, new_numa)
@@ -1580,8 +1537,8 @@ class TestResize(BaseTestCase):
             None  # move_type is None for resize...
         )
         self.assertIsInstance(claim, claims.MoveClaim)
-        self.assertTrue(obj_base.obj_equal_prims(expected,
-                                                 self.rt.compute_node))
+        cn = self.rt.compute_node
+        self.assertTrue(obj_base.obj_equal_prims(expected, cn))
         self.assertEqual(1, len(self.rt.tracked_migrations))
 
         # Now abort the resize claim and check that the resources have been set
@@ -1591,9 +1548,9 @@ class TestResize(BaseTestCase):
             claim.abort()
 
         drop_migr_mock.assert_called_once_with()
-        self.assertEqual(1, self.rt.compute_node.vcpus_used)
-        self.assertEqual(1, self.rt.compute_node.local_gb_used)
-        self.assertEqual(128, self.rt.compute_node.memory_mb_used)
+        self.assertEqual(1, cn.vcpus_used)
+        self.assertEqual(1, cn.local_gb_used)
+        self.assertEqual(128, cn.memory_mb_used)
         self.assertEqual(0, len(self.rt.tracked_migrations))
 
     @mock.patch('nova.objects.InstancePCIRequests.get_by_instance_uuid',
@@ -2124,7 +2081,7 @@ class TestUpdateUsageFromInstance(BaseTestCase):
                                                 _NODENAME)
 
         mock_update_usage.assert_called_once_with(
-            self.rt._get_usage_dict(self.instance), sign=1)
+            self.rt._get_usage_dict(self.instance), _NODENAME, sign=1)
 
     @mock.patch('nova.compute.resource_tracker.ResourceTracker.'
                 '_update_usage')
@@ -2137,7 +2094,7 @@ class TestUpdateUsageFromInstance(BaseTestCase):
                                                 _NODENAME)
 
         mock_update_usage.assert_called_once_with(
-            self.rt._get_usage_dict(self.instance), sign=-1)
+            self.rt._get_usage_dict(self.instance), _NODENAME, sign=-1)
 
     @mock.patch('nova.compute.resource_tracker.ResourceTracker.'
                 '_update_usage')
@@ -2147,7 +2104,7 @@ class TestUpdateUsageFromInstance(BaseTestCase):
                                                 _NODENAME)
 
         mock_update_usage.assert_called_once_with(
-            self.rt._get_usage_dict(self.instance), sign=1)
+            self.rt._get_usage_dict(self.instance), _NODENAME, sign=1)
 
     @mock.patch('nova.compute.resource_tracker.ResourceTracker.'
                 '_update_usage')
@@ -2160,7 +2117,7 @@ class TestUpdateUsageFromInstance(BaseTestCase):
                                               self.instance, _NODENAME, True)
 
         mock_update_usage.assert_called_once_with(
-            self.rt._get_usage_dict(self.instance), sign=-1)
+            self.rt._get_usage_dict(self.instance), _NODENAME, sign=-1)
 
 
 class TestInstanceInResizeState(test.NoDBTestCase):