Add the instance update calls from Compute
Now that the Scheduler has RPC calls to receive updates from compute whenever a compute node's instances change significantly, we need to add the calls on the compute side that send those updates.

DocImpact
We add a new CONF option 'scheduler_instance_sync_interval'. It is an integer representing the interval (in seconds) that the compute node waits between sending a sync message to the Scheduler, and it defaults to 120 (2 minutes). If the CONF option 'scheduler_tracks_instance_changes' is False, the sync calls are not made at all. If an operator notices in the logs that out-of-sync situations are rare, this value can be increased to lower the number of RPC messages being sent; likewise, if sync issues turn out to be a problem, it can be lowered to check more frequently.

Partially-Implements: blueprint isolate-scheduler-db
(https://blueprints.launchpad.net/nova/+spec/isolate-scheduler-db)

Change-Id: I5e93ec949e2fa0e29c52beb20201696caad85776
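For illustration, the operator-facing knobs this change touches would look roughly like this in nova.conf (both options appear to be registered in the [DEFAULT] group; the values shown are the defaults, so the snippet only makes the out-of-the-box behavior explicit):

    [DEFAULT]
    # Compute pushes instance create/delete/resize updates to the scheduler.
    scheduler_tracks_instance_changes = True
    # Seconds between periodic syncs of the full instance UUID list; raise it
    # if out-of-sync situations are rare, lower it to reconcile more often.
    scheduler_instance_sync_interval = 120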
--- a/nova/compute/manager.py
+++ b/nova/compute/manager.py
@@ -82,6 +82,7 @@ from nova.openstack.common import periodic_task
 from nova import paths
 from nova import rpc
 from nova import safe_utils
+from nova.scheduler import client as scheduler_client
 from nova.scheduler import rpcapi as scheduler_rpcapi
 from nova import utils
 from nova.virt import block_device as driver_block_device
@@ -171,7 +172,14 @@ interval_opts = [
     cfg.IntOpt('block_device_allocate_retries_interval',
                default=3,
                help='Waiting time interval (seconds) between block'
-                    ' device allocation retries on failures')
+                    ' device allocation retries on failures'),
+    cfg.IntOpt('scheduler_instance_sync_interval',
+               default=120,
+               help='Waiting time interval (seconds) between sending the '
+                    'scheduler a list of current instance UUIDs to verify '
+                    'that its view of instances is in sync with nova. If the '
+                    'CONF option `scheduler_tracks_instance_changes` is '
+                    'False, changing this option will have no effect.'),
 ]
 
 timeout_opts = [
@@ -241,7 +249,8 @@ CONF.import_opt('html5_proxy_base_url', 'nova.rdp', group='rdp')
 CONF.import_opt('enabled', 'nova.console.serial', group='serial_console')
 CONF.import_opt('base_url', 'nova.console.serial', group='serial_console')
 CONF.import_opt('destroy_after_evacuate', 'nova.utils', group='workarounds')
-
+CONF.import_opt('scheduler_tracks_instance_changes',
+                'nova.scheduler.host_manager')
 
 LOG = logging.getLogger(__name__)
 
@@ -632,10 +641,12 @@ class ComputeManager(manager.Manager):
         self.consoleauth_rpcapi = consoleauth.rpcapi.ConsoleAuthAPI()
         self.cells_rpcapi = cells_rpcapi.CellsAPI()
         self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
+        self.scheduler_client = scheduler_client.SchedulerClient()
         self._resource_tracker_dict = {}
         self.instance_events = InstanceEvents()
         self._sync_power_pool = eventlet.GreenPool()
         self._syncs_in_progress = {}
+        self.send_instance_updates = CONF.scheduler_tracks_instance_changes
         if CONF.max_concurrent_builds != 0:
             self._build_semaphore = eventlet.semaphore.Semaphore(
                 CONF.max_concurrent_builds)
@@ -864,6 +875,7 @@ class ComputeManager(manager.Manager):
         else:
             self.consoleauth_rpcapi.delete_tokens_for_instance(context,
                                                                instance.uuid)
+        self._delete_scheduler_instance_info(context, instance.uuid)
 
     def _init_instance(self, context, instance):
         '''Initialize this instance during service init.'''
@@ -1217,6 +1229,7 @@ class ComputeManager(manager.Manager):
         finally:
             if CONF.defer_iptables_apply:
                 self.driver.filter_defer_apply_off()
+            self._update_scheduler_instance_info(context, instances)
 
     def cleanup_host(self):
         self.driver.cleanup_host(host=self.host)
@@ -1969,6 +1982,40 @@ class ComputeManager(manager.Manager):
         instance.save(expected_task_state=task_states.SPAWNING)
         return instance
 
+    def _update_scheduler_instance_info(self, context, instance):
+        """Sends an InstanceList with created or updated Instance objects to
+        the Scheduler client.
+
+        In the case of init_host, the value passed will already be an
+        InstanceList. Other calls will send individual Instance objects that
+        have been created or resized. In this case, we create an InstanceList
+        object containing that Instance.
+        """
+        if not self.send_instance_updates:
+            return
+        if isinstance(instance, objects.Instance):
+            instance = objects.InstanceList(objects=[instance])
+        context = context.elevated()
+        self.scheduler_client.update_instance_info(context, self.host,
+                                                   instance)
+
+    def _delete_scheduler_instance_info(self, context, instance_uuid):
+        """Sends the uuid of the deleted Instance to the Scheduler client."""
+        if not self.send_instance_updates:
+            return
+        context = context.elevated()
+        self.scheduler_client.delete_instance_info(context, self.host,
+                                                   instance_uuid)
+
+    @periodic_task.periodic_task(spacing=CONF.scheduler_instance_sync_interval)
+    def _sync_scheduler_instance_info(self, context):
+        if not self.send_instance_updates:
+            return
+        context = context.elevated()
+        instances = objects.InstanceList.get_by_host(context, self.host)
+        uuids = [instance.uuid for instance in instances]
+        self.scheduler_client.sync_instance_info(context, self.host, uuids)
+
     def _notify_about_instance_usage(self, context, instance, event_suffix,
                                      network_info=None, system_metadata=None,
                                      extra_usage_info=None, fault=None):
@@ -2275,6 +2322,7 @@ class ComputeManager(manager.Manager):
             self._notify_about_instance_usage(context, instance,
                     'create.end', fault=e)
 
+        self._update_scheduler_instance_info(context, instance)
         self._notify_about_instance_usage(context, instance, 'create.end',
                 extra_usage_info={'message': _('Success')},
                 network_info=network_info)
@@ -2939,7 +2987,7 @@ class ComputeManager(manager.Manager):
             instance.progress = 0
             instance.save()
             self.stop_instance(context, instance)
-
+        self._update_scheduler_instance_info(context, instance)
         self._notify_about_instance_usage(
                 context, instance, "rebuild.end",
                 network_info=network_info,
@@ -3985,6 +4033,7 @@ class ComputeManager(manager.Manager):
         instance.launched_at = timeutils.utcnow()
         instance.save(expected_task_state=task_states.RESIZE_FINISH)
 
+        self._update_scheduler_instance_info(context, instance)
         self._notify_about_instance_usage(
             context, instance, "finish_resize.end",
             network_info=network_info)
@@ -4298,6 +4347,7 @@ class ComputeManager(manager.Manager):
         instance.task_state = None
         instance.save(expected_task_state=[task_states.SHELVING,
                                            task_states.SHELVING_OFFLOADING])
+        self._delete_scheduler_instance_info(context, instance.uuid)
         self._notify_about_instance_usage(context, instance,
                 'shelve_offload.end')
 
@@ -4383,6 +4433,7 @@ class ComputeManager(manager.Manager):
             self._unshelve_instance_key_restore(instance, scrubbed_keys)
         self._update_instance_after_spawn(context, instance)
         instance.save(expected_task_state=task_states.SPAWNING)
+        self._update_scheduler_instance_info(context, instance)
         self._notify_about_instance_usage(context, instance, 'unshelve.end')
 
     @messaging.expected_exceptions(NotImplementedError)
@@ -5245,6 +5296,7 @@ class ComputeManager(manager.Manager):
         # host even before next periodic task.
         self.update_available_resource(ctxt)
 
+        self._update_scheduler_instance_info(ctxt, instance)
         self._notify_about_instance_usage(ctxt, instance,
                                           "live_migration._post.end",
                                           network_info=network_info)
--- a/nova/tests/unit/api/ec2/test_cinder_cloud.py
+++ b/nova/tests/unit/api/ec2/test_cinder_cloud.py
@@ -26,6 +26,7 @@ from nova.api.ec2 import cloud
 from nova.api.ec2 import ec2utils
 from nova.compute import api as compute_api
 from nova.compute import flavors
+from nova.compute import manager as compute_manager
 from nova.compute import utils as compute_utils
 from nova import context
 from nova import db
@@ -153,6 +154,12 @@ class CinderCloudTestCase(test.TestCase):
         self.volume_api = volume.API()
         self.volume_api.reset_fake_api(self.context)
 
+        self.stubs.Set(compute_manager.ComputeManager,
+                       '_update_scheduler_instance_info', dumb)
+        self.stubs.Set(compute_manager.ComputeManager,
+                       '_delete_scheduler_instance_info', dumb)
+        self.stubs.Set(compute_manager.ComputeManager,
+                       '_sync_scheduler_instance_info', dumb)
         self.useFixture(cast_as_call.CastAsCall(self.stubs))
 
         # make sure we can map ami-00000001/2 to a uuid in FakeImageService
--- a/nova/tests/unit/api/ec2/test_cloud.py
+++ b/nova/tests/unit/api/ec2/test_cloud.py
@@ -39,6 +39,7 @@ from nova.api.metadata import password
 from nova import availability_zones
 from nova.compute import api as compute_api
 from nova.compute import flavors
+from nova.compute import manager as compute_manager
 from nova.compute import power_state
 from nova.compute import rpcapi as compute_rpcapi
 from nova.compute import utils as compute_utils
@@ -201,6 +202,12 @@ class CloudTestCase(test.TestCase):
                                               is_admin=True)
         self.volume_api = volume.API()
 
+        self.stubs.Set(compute_manager.ComputeManager,
+                       '_update_scheduler_instance_info', dumb)
+        self.stubs.Set(compute_manager.ComputeManager,
+                       '_delete_scheduler_instance_info', dumb)
+        self.stubs.Set(compute_manager.ComputeManager,
+                       '_sync_scheduler_instance_info', dumb)
         self.useFixture(cast_as_call.CastAsCall(self.stubs))
 
         # make sure we can map ami-00000001/2 to a uuid in FakeImageService
--- a/nova/tests/unit/compute/test_compute_mgr.py
+++ b/nova/tests/unit/compute/test_compute_mgr.py
@@ -26,6 +26,7 @@ from oslo_utils import importutils
 from oslo_utils import timeutils
 from oslo_utils import uuidutils
 
+import nova
 from nova.compute import build_results
 from nova.compute import manager
 from nova.compute import power_state
@@ -383,7 +384,6 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase):
 
     def test_init_host(self):
         our_host = self.compute.host
-        fake_context = 'fake-context'
         inst = fake_instance.fake_db_instance(
                 vm_state=vm_states.ACTIVE,
                 info_cache=dict(test_instance_info_cache.fake_info_cache,
@@ -393,19 +393,19 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase):
 
         def _do_mock_calls(defer_iptables_apply):
             self.compute.driver.init_host(host=our_host)
-            context.get_admin_context().AndReturn(fake_context)
+            context.get_admin_context().AndReturn(self.context)
             db.instance_get_all_by_host(
-                fake_context, our_host, columns_to_join=['info_cache'],
+                self.context, our_host, columns_to_join=['info_cache'],
                 use_slave=False
                 ).AndReturn(startup_instances)
             if defer_iptables_apply:
                 self.compute.driver.filter_defer_apply_on()
-            self.compute._destroy_evacuated_instances(fake_context)
-            self.compute._init_instance(fake_context,
+            self.compute._destroy_evacuated_instances(self.context)
+            self.compute._init_instance(self.context,
                                         mox.IsA(objects.Instance))
-            self.compute._init_instance(fake_context,
+            self.compute._init_instance(self.context,
                                         mox.IsA(objects.Instance))
-            self.compute._init_instance(fake_context,
+            self.compute._init_instance(self.context,
                                         mox.IsA(objects.Instance))
             if defer_iptables_apply:
                 self.compute.driver.filter_defer_apply_off()
@@ -460,7 +460,6 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase):
     def test_init_host_with_deleted_migration(self):
         our_host = self.compute.host
         not_our_host = 'not-' + our_host
-        fake_context = 'fake-context'
 
         deleted_instance = fake_instance.fake_instance_obj(
                 self.context, host=not_our_host, uuid='fake-uuid')
@@ -475,8 +474,8 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase):
         self.mox.StubOutWithMock(self.compute, '_get_instance_nw_info')
 
         self.compute.driver.init_host(host=our_host)
-        context.get_admin_context().AndReturn(fake_context)
-        db.instance_get_all_by_host(fake_context, our_host,
+        context.get_admin_context().AndReturn(self.context)
+        db.instance_get_all_by_host(self.context, our_host,
                                     columns_to_join=['info_cache'],
                                     use_slave=False
                                     ).AndReturn([])
@@ -484,13 +483,13 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase):
 
         # simulate failed instance
         self.compute._get_instances_on_driver(
-            fake_context, {'deleted': False}).AndReturn([deleted_instance])
-        self.compute._get_instance_nw_info(fake_context, deleted_instance
+            self.context, {'deleted': False}).AndReturn([deleted_instance])
+        self.compute._get_instance_nw_info(self.context, deleted_instance
             ).AndRaise(exception.InstanceNotFound(
                 instance_id=deleted_instance['uuid']))
         # ensure driver.destroy is called so that driver may
         # clean up any dangling files
-        self.compute.driver.destroy(fake_context, deleted_instance,
+        self.compute.driver.destroy(self.context, deleted_instance,
             mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg())
 
         self.mox.ReplayAll()
@@ -1034,8 +1033,6 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase):
         self.assertIsNone(init_return)
 
     def test_get_instances_on_driver(self):
-        fake_context = context.get_admin_context()
-
         driver_instances = []
         for x in xrange(10):
             driver_instances.append(fake_instance.fake_db_instance())
@@ -1047,7 +1044,7 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase):
         self.compute.driver.list_instance_uuids().AndReturn(
             [inst['uuid'] for inst in driver_instances])
         db.instance_get_all_by_filters(
-            fake_context,
+            self.context,
             {'uuid': [inst['uuid'] for
                       inst in driver_instances]},
             'created_at', 'desc', columns_to_join=None,
@@ -1057,17 +1054,16 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase):
 
         self.mox.ReplayAll()
 
-        result = self.compute._get_instances_on_driver(fake_context)
+        result = self.compute._get_instances_on_driver(self.context)
         self.assertEqual([x['uuid'] for x in driver_instances],
                          [x['uuid'] for x in result])
 
     @mock.patch('nova.virt.driver.ComputeDriver.list_instance_uuids')
     @mock.patch('nova.db.api.instance_get_all_by_filters')
     def test_get_instances_on_driver_empty(self, mock_list, mock_db):
-        fake_context = context.get_admin_context()
         mock_list.return_value = []
 
-        result = self.compute._get_instances_on_driver(fake_context)
+        result = self.compute._get_instances_on_driver(self.context)
         # instance_get_all_by_filters should not be called
         self.assertEqual(0, mock_db.call_count)
         self.assertEqual([],
@@ -1078,7 +1074,6 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase):
         # 'list_instance_uuids'
         self.compute.host = 'host'
         filters = {'host': self.compute.host}
-        fake_context = context.get_admin_context()
 
         self.flags(instance_name_template='inst-%i')
 
@@ -1102,14 +1097,14 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase):
         self.compute.driver.list_instances().AndReturn(
             [inst['name'] for inst in driver_instances])
         db.instance_get_all_by_filters(
-            fake_context, filters,
+            self.context, filters,
             'created_at', 'desc', columns_to_join=None,
             limit=None, marker=None,
             use_slave=True).AndReturn(all_instances)
 
         self.mox.ReplayAll()
 
-        result = self.compute._get_instances_on_driver(fake_context, filters)
+        result = self.compute._get_instances_on_driver(self.context, filters)
         self.assertEqual([x['uuid'] for x in driver_instances],
                          [x['uuid'] for x in result])
 
@@ -2326,6 +2321,70 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase):
 
         self.assertFalse(log_mock.called)
 
+    @mock.patch.object(nova.scheduler.client.SchedulerClient,
+                       'update_instance_info')
+    def test_update_scheduler_instance_info(self, mock_update):
+        instance = objects.Instance(uuid='fake')
+        self.compute._update_scheduler_instance_info(self.context, instance)
+        self.assertEqual(mock_update.call_count, 1)
+        args = mock_update.call_args[0]
+        self.assertNotEqual(args[0], self.context)
+        self.assertIsInstance(args[0], self.context.__class__)
+        self.assertEqual(args[1], self.compute.host)
+        # Send a single instance; check that the method converts to an
+        # InstanceList
+        self.assertIsInstance(args[2], objects.InstanceList)
+        self.assertEqual(args[2].objects[0], instance)
+
+    @mock.patch.object(nova.scheduler.client.SchedulerClient,
+                       'delete_instance_info')
+    def test_delete_scheduler_instance_info(self, mock_delete):
+        self.compute._delete_scheduler_instance_info(self.context,
+                                                     mock.sentinel.inst_uuid)
+        self.assertEqual(mock_delete.call_count, 1)
+        args = mock_delete.call_args[0]
+        self.assertNotEqual(args[0], self.context)
+        self.assertIsInstance(args[0], self.context.__class__)
+        self.assertEqual(args[1], self.compute.host)
+        self.assertEqual(args[2], mock.sentinel.inst_uuid)
+
+    @mock.patch.object(nova.objects.InstanceList, 'get_by_host')
+    @mock.patch.object(nova.scheduler.client.SchedulerClient,
+                       'sync_instance_info')
+    def test_sync_scheduler_instance_info(self, mock_sync, mock_get_by_host):
+        inst1 = objects.Instance(uuid='fake1')
+        inst2 = objects.Instance(uuid='fake2')
+        inst3 = objects.Instance(uuid='fake3')
+        mock_get_by_host.return_value = objects.InstanceList(
+            objects=[inst1, inst2, inst3])
+        self.compute._sync_scheduler_instance_info(self.context)
+        self.assertEqual(mock_sync.call_count, 1)
+        args = mock_sync.call_args[0]
+        exp_uuids = [inst.uuid for inst in [inst1, inst2, inst3]]
+        self.assertIsInstance(args[0], self.context.__class__)
+        self.assertEqual(args[1], self.compute.host)
+        self.assertEqual(args[2], exp_uuids)
+
+    @mock.patch.object(nova.scheduler.client.SchedulerClient,
+                       'sync_instance_info')
+    @mock.patch.object(nova.scheduler.client.SchedulerClient,
+                       'delete_instance_info')
+    @mock.patch.object(nova.scheduler.client.SchedulerClient,
+                       'update_instance_info')
+    def test_scheduler_info_updates_off(self, mock_update, mock_delete,
+                                        mock_sync):
+        mgr = self.compute
+        mgr.send_instance_updates = False
+        mgr._update_scheduler_instance_info(self.context,
+                                            mock.sentinel.instance)
+        mgr._delete_scheduler_instance_info(self.context,
+                                            mock.sentinel.instance_uuid)
+        mgr._sync_scheduler_instance_info(self.context)
+        # None of the calls should have been made
+        self.assertFalse(mock_update.called)
+        self.assertFalse(mock_delete.called)
+        self.assertFalse(mock_sync.called)
+
 
 class ComputeManagerBuildInstanceTestCase(test.NoDBTestCase):
     def setUp(self):
@@ -3323,13 +3382,15 @@ class ComputeManagerBuildInstanceTestCase(test.NoDBTestCase):
             self.assertIsNotNone(args[1].launched_at)
 
         with contextlib.nested(
+                mock.patch.object(self.compute,
+                    '_update_scheduler_instance_info'),
                 mock.patch.object(self.compute.driver, 'spawn'),
                 mock.patch.object(self.compute,
                     '_build_networks_for_instance', return_value=[]),
                 mock.patch.object(self.instance, 'save'),
                 mock.patch.object(self.compute, '_notify_about_instance_usage',
                     side_effect=fake_notify)
-        ) as (mock_spawn, mock_networks, mock_save, mock_notify):
+        ) as (mock_upd, mock_spawn, mock_networks, mock_save, mock_notify):
            self.compute._build_and_run_instance(self.context, self.instance,
                 self.image, self.injected_files, self.admin_pass,
                 self.requested_networks, self.security_groups,