Set instance host field after resource claim
Set the 'host' field on the instance after the resource tracker on the
compute node has accepted the build. The field is set once resources are
confirmed to be available, while the COMPUTE_RESOURCES_SEMAPHORE is held.
The semaphore ensures the resource usage values stay consistent even if
the update_available_resource periodic audit task runs.

bug 1060255

Change-Id: I92105ec14924960ac8ef7ca8c810783085314e10
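In outline: the claim path now owns the host assignment, so an instance only ever appears on a host whose usage totals already account for it. A minimal sketch of the pattern, using illustrative names rather than nova's real tracker API (nova's semaphore is actually a synchronized-decorator lock name, simplified here to a plain lock):

    import threading

    COMPUTE_RESOURCES_SEMAPHORE = threading.Lock()


    class TrackerSketch(object):
        """Illustrative stand-in for the resource tracker; not nova code."""

        def __init__(self, host, total_ram_mb):
            self.host = host
            self.total_ram_mb = total_ram_mb
            self.free_ram_mb = total_ram_mb

        def begin_resource_claim(self, instance):
            with COMPUTE_RESOURCES_SEMAPHORE:
                if instance['memory_mb'] > self.free_ram_mb:
                    return None  # claim rejected; 'host' is never set
                self.free_ram_mb -= instance['memory_mb']
                # Tag the instance while the lock is held so an audit can
                # never see usage totals that disagree with the set of
                # instances assigned to this host:
                instance['host'] = self.host
                return instance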
parent 9d4ecc2c55
commit 5fd7a9dba1
@@ -488,10 +488,6 @@ class ComputeManager(manager.SchedulerDependentManager):
         limits = filter_properties.get('limits', {})
         with self.resource_tracker.resource_claim(context, instance,
                 limits):
-            # Resources are available to build this instance here,
-            # mark it as belonging to this host:
-            self._instance_update(context, instance['uuid'],
-                    host=self.host, launched_on=self.host)
 
             block_device_info = self._prep_block_device(context,
                     instance)
@@ -23,6 +23,7 @@ from nova.compute import vm_states
 from nova import db
 from nova import exception
 from nova import flags
+from nova import notifications
 from nova.openstack.common import cfg
 from nova.openstack.common import importutils
 from nova.openstack.common import jsonutils
@@ -152,8 +153,18 @@ class ResourceTracker(object):
            failed.
         """
         if self.disabled:
+            # compute_driver doesn't support resource tracking, just
+            # set the 'host' field and continue the build:
+            instance_ref = self._set_instance_host(context,
+                    instance_ref['uuid'])
             return
 
+        # sanity check:
+        if instance_ref['host']:
+            LOG.warning(_("Host field should not be set on the instance "
+                          "until resources have been claimed."),
+                        instance=instance_ref)
+
         if not limits:
             limits = {}
 
@@ -184,6 +195,8 @@ class ResourceTracker(object):
         if not self._can_claim_cpu(vcpus, vcpu_limit):
             return
 
+        instance_ref = self._set_instance_host(context, instance_ref['uuid'])
+
         # keep track of this claim until we know whether the compute operation
         # was successful/completed:
         claim = Claim(instance_ref, timeout)
@@ -196,6 +209,17 @@ class ResourceTracker(object):
         self._update(context, self.compute_node)
         return claim
 
+    def _set_instance_host(self, context, instance_uuid):
+        """Tag the instance as belonging to this host.  This should be done
+        while the COMPUTE_RESOURCES_SEMAPHORE is being held so the resource
+        claim will not be lost if the audit process starts.
+        """
+        values = {'host': self.host, 'launched_on': self.host}
+        (old_ref, instance_ref) = db.instance_update_and_get_original(context,
+                instance_uuid, values)
+        notifications.send_update(context, old_ref, instance_ref)
+        return instance_ref
+
     def _can_claim_memory(self, memory_mb, memory_mb_limit):
         """Test if memory needed for a claim can be safely allocated"""
         # Installed memory and usage info:
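The docstring's concern is the update_available_resource audit: it recomputes usage from the instances whose 'host' field points at this node, so the tag and the usage totals must change under the same lock. Continuing the sketch above (illustrative helper, not nova's actual signature):

    def audit_sketch(tracker, instances_by_host):
        # Recompute usage from the instances currently tagged with this
        # host; the lock guarantees no claim is half-applied while we look:
        with COMPUTE_RESOURCES_SEMAPHORE:
            used = sum(inst['memory_mb']
                       for inst in instances_by_host(tracker.host))
            tracker.free_ram_mb = tracker.total_ram_mb - used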
@@ -68,7 +68,7 @@ class ChanceScheduler(driver.Scheduler):
             host = self._schedule(context, 'compute', request_spec,
                     filter_properties)
             updated_instance = driver.instance_update_db(context,
-                    instance_uuid, host)
+                    instance_uuid)
             self.compute_rpcapi.run_instance(context,
                     instance=updated_instance, host=host,
                     requested_networks=requested_networks,
@@ -101,13 +101,13 @@ def cast_to_volume_host(context, host, method, **kwargs):
     LOG.debug(_("Casted '%(method)s' to volume '%(host)s'") % locals())
 
 
-def instance_update_db(context, instance_uuid, host):
-    '''Set the host and scheduled_at fields of an Instance.
+def instance_update_db(context, instance_uuid):
+    '''Clear the host and set the scheduled_at field of an Instance.
 
     :returns: An Instance with the updated fields set properly.
     '''
     now = timeutils.utcnow()
-    values = {'host': host, 'scheduled_at': now}
+    values = {'host': None, 'scheduled_at': now}
     return db.instance_update(context, instance_uuid, values)
 
 
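With instance_update_db no longer taking a host, the division of labor becomes: the scheduler stamps scheduled_at and leaves 'host' as None, the target node is named only in the RPC cast, and 'host' is filled in by the resource tracker once the claim succeeds. A condensed, illustrative version of that handoff, reusing the sketch above:

    def handoff_sketch(instance, chosen_host, trackers):
        # Scheduler side: the instance record no longer names a host.
        instance['host'] = None
        # Compute side (reached via an RPC cast to chosen_host in nova):
        claimed = trackers[chosen_host].begin_resource_claim(instance)
        # instance['host'] == chosen_host only if the claim succeeded.
        return claimed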
@@ -116,7 +116,7 @@ def cast_to_compute_host(context, host, method, **kwargs):
 
     instance_uuid = kwargs.get('instance_uuid', None)
     if instance_uuid:
-        instance_update_db(context, instance_uuid, host)
+        instance_update_db(context, instance_uuid)
 
     rpc.cast(context,
              rpc.queue_get_for(context, 'compute', host),
@@ -142,8 +142,7 @@ class FilterScheduler(driver.Scheduler):
                 'scheduler.run_instance.scheduled', notifier.INFO,
                 payload)
 
-        updated_instance = driver.instance_update_db(context,
-                instance_uuid, weighted_host.host_state.host)
+        updated_instance = driver.instance_update_db(context, instance_uuid)
 
         self.compute_rpcapi.run_instance(context, instance=updated_instance,
                 host=weighted_host.host_state.host,
@@ -22,6 +22,7 @@ import uuid
 from nova.compute import resource_tracker
 from nova.compute import task_states
 from nova.compute import vm_states
+from nova import context
 from nova import db
 from nova import exception
 from nova.openstack.common import log as logging
@@ -32,14 +33,6 @@ from nova.virt import driver
 LOG = logging.getLogger(__name__)
 
 
-class FakeContext(object):
-    def __init__(self, is_admin=False):
-        self.is_admin = is_admin
-
-    def elevated(self):
-        return FakeContext(is_admin=True)
-
-
 class UnsupportedVirtDriver(driver.ComputeDriver):
     """Pretend version of a lame virt driver"""
     def get_available_resource(self):
@@ -81,11 +74,13 @@ class BaseTestCase(test.TestCase):
         self.flags(reserved_host_disk_mb=0,
                    reserved_host_memory_mb=0)
 
-        self.context = FakeContext()
+        self.context = context.RequestContext('fake', 'fake')
 
-        self._instances = []
+        self._instances = {}
         self.stubs.Set(db, 'instance_get_all_by_host',
-                lambda c, h: self._instances)
+                lambda c, h: self._instances.values())
+        self.stubs.Set(db, 'instance_update_and_get_original',
+                self._fake_instance_update_and_get_original)
 
     def _create_compute_node(self, values=None):
         compute = {
@@ -122,8 +117,10 @@ class BaseTestCase(test.TestCase):
         return service
 
     def _fake_instance(self, *args, **kwargs):
+
+        instance_uuid = str(uuid.uuid1())
         instance = {
-            'uuid': str(uuid.uuid1()),
+            'uuid': instance_uuid,
             'vm_state': vm_states.BUILDING,
             'task_state': None,
             'memory_mb': 2,
@@ -136,9 +133,17 @@ class BaseTestCase(test.TestCase):
         }
         instance.update(kwargs)
 
-        self._instances.append(instance)
+        self._instances[instance_uuid] = instance
         return instance
 
+    def _fake_instance_update_and_get_original(self, context, instance_uuid,
+            values):
+        instance = self._instances[instance_uuid]
+        instance.update(values)
+        # the test doesn't care what the original instance values are, it's
+        # only used in the subsequent notification:
+        return (instance, instance)
+
     def _tracker(self, unsupported=False):
         host = "fakehost"
 
@@ -168,7 +173,8 @@ class UnsupportedDriverTestCase(BaseTestCase):
 
     def testDisabledClaim(self):
         # basic claim:
-        claim = self.tracker.begin_resource_claim(self.context, 1, 1)
+        instance = self._fake_instance()
+        claim = self.tracker.begin_resource_claim(self.context, instance)
         self.assertEqual(None, claim)
 
     def testDisabledInstanceClaim(self):
@@ -200,7 +206,7 @@ class UnsupportedDriverTestCase(BaseTestCase):
 class MissingServiceTestCase(BaseTestCase):
     def setUp(self):
         super(MissingServiceTestCase, self).setUp()
-        self.context = FakeContext(is_admin=True)
+        self.context = context.get_admin_context()
         self.tracker = self._tracker()
 
     def testMissingService(self):
@@ -90,8 +90,8 @@ class ChanceSchedulerTestCase(test_scheduler.SchedulerTestCase):
         self.driver.hosts_up(ctxt_elevated, 'compute').AndReturn(
                 ['host1', 'host2', 'host3', 'host4'])
         random.random().AndReturn(.5)
-        driver.instance_update_db(ctxt, instance1['uuid'],
-                'host3').WithSideEffects(inc_launch_index).AndReturn(instance1)
+        driver.instance_update_db(ctxt, instance1['uuid']).WithSideEffects(
+                inc_launch_index).AndReturn(instance1)
         compute_rpcapi.ComputeAPI.run_instance(ctxt, host='host3',
                 instance=instance1, requested_networks=None,
                 injected_files=None, admin_password=None, is_first_time=None,
@@ -102,8 +102,8 @@ class ChanceSchedulerTestCase(test_scheduler.SchedulerTestCase):
         self.driver.hosts_up(ctxt_elevated, 'compute').AndReturn(
                 ['host1', 'host2', 'host3', 'host4'])
         random.random().AndReturn(.2)
-        driver.instance_update_db(ctxt, instance2['uuid'],
-                'host1').WithSideEffects(inc_launch_index).AndReturn(instance2)
+        driver.instance_update_db(ctxt, instance2['uuid']).WithSideEffects(
+                inc_launch_index).AndReturn(instance2)
         compute_rpcapi.ComputeAPI.run_instance(ctxt, host='host1',
                 instance=instance2, requested_networks=None,
                 injected_files=None, admin_password=None, is_first_time=None,
@@ -712,7 +712,7 @@ class SchedulerDriverModuleTestCase(test.TestCase):
 
         timeutils.utcnow().AndReturn('fake-now')
         db.instance_update(self.context, 'fake_uuid',
-                {'host': host, 'scheduled_at': 'fake-now'})
+                {'host': None, 'scheduled_at': 'fake-now'})
         rpc.queue_get_for(self.context, 'compute', host).AndReturn(queue)
         rpc.cast(self.context, queue,
                 {'method': method,