VM state management and error states

This implements the blueprint nova-vm-state-management.
It provides the following functionality:
- Filters compute API calls according to the state of the VM
(defined in compute/state_checker).
- Sets the ERROR state if the scheduler cannot allocate the VM on any host.
- Handles create/delete concurrency in the compute manager.

Change-Id: Ie6d016b7d4781f70bb5967f204fa88a6412bd727
This commit is contained in:
David Subiros
2011-11-16 17:31:29 +00:00
committed by Vishvananda Ishaya
parent 598753663d
commit 17a0fbe271
4 changed files with 267 additions and 33 deletions

View File

@@ -250,6 +250,11 @@ class InvalidParameterValue(Invalid):
message = _("%(err)s")
class InstanceInvalidState(Invalid):
    """Signals that a method was refused because of the instance state.

    The message template expects the ``instance_uuid``, ``state`` and
    ``method`` substitution keys.
    """

    message = _(
        "Instance %(instance_uuid)s in state %(state)s. Cannot "
        "%(method)s while the instance is in this state.")
class InstanceNotRunning(Invalid):
    """Signals that the instance named by ``instance_id`` is not running."""

    message = _(
        "Instance %(instance_id)s is not running.")

View File

@@ -25,12 +25,13 @@ import functools
from nova.compute import vm_states
from nova import db
from nova import exception
from nova import flags
from nova import log as logging
from nova import manager
from nova import rpc
from nova import utils
from nova.scheduler import zone_manager
from nova import utils
LOG = logging.getLogger('nova.scheduler.manager')
FLAGS = flags.FLAGS
@@ -101,17 +102,24 @@ class SchedulerManager(manager.Manager):
# Scheduler methods are responsible for casting.
try:
return real_meth(*args, **kwargs)
except Exception as e:
# If this affects a particular instance, move that
# instance to the ERROR state
if 'instance_id' in kwargs:
instance_id = kwargs['instance_id']
LOG.warning(_("Failed to %(driver_method)s: %(e)s. "
"Putting instance %(instance_id)s into "
except exception.NoValidHost as ex:
self._set_instance_error(method, context, ex, *args, **kwargs)
except Exception as ex:
with utils.save_and_reraise_exception():
self._set_instance_error(method, context, ex, *args, **kwargs)
# NOTE (David Subiros) : If the exception is raised during the run_instance
# method, the DB record probably does not exist yet.
def _set_instance_error(self, method, context, ex, *args, **kwargs):
"""Sets VM to Error state"""
LOG.warning(_("Failed to schedule_%(method)s: %(ex)s") % locals())
if method == "start_instance" or method == "run_instance":
instance_id = kwargs['instance_id']
if instance_id:
LOG.warning(_("Setting instance %(instance_id)s to "
"ERROR state.") % locals())
db.instance_update(context, kwargs['instance_id'],
dict(vm_state=vm_states.ERROR))
raise
db.instance_update(context, instance_id,
{'vm_state': vm_states.ERROR})
# NOTE (masumotok) : This method should be moved to nova.api.ec2.admin.
# Based on bexar design summit discussion,

View File

@@ -21,6 +21,7 @@ Tests For Scheduler
import datetime
import mox
import stubout
from novaclient import v1_1 as novaclient
from novaclient import exceptions as novaclient_exceptions
@@ -38,9 +39,9 @@ from nova.scheduler import driver
from nova.scheduler import manager
from nova.scheduler.simple import SimpleScheduler
from nova.compute import power_state
from nova.compute import task_states
from nova.compute import vm_states
FLAGS = flags.FLAGS
flags.DECLARE('max_cores', 'nova.scheduler.simple')
flags.DECLARE('stub_network', 'nova.compute.manager')
@@ -143,6 +144,10 @@ class SchedulerTestCase(test.TestCase):
driver = 'nova.tests.scheduler.test_scheduler.TestDriver'
self.flags(scheduler_driver=driver)
def tearDown(self):
    """Undo everything installed on ``self.stubs``, then run base cleanup."""
    installed_stubs = self.stubs
    installed_stubs.UnsetAll()
    super(SchedulerTestCase, self).tearDown()
def _create_compute_service(self):
"""Create compute-manager(ComputeNode and Service record)."""
ctxt = context.get_admin_context()
@@ -205,6 +210,41 @@ class SchedulerTestCase(test.TestCase):
return False
return True
def _assert_state(self, state_dict):
    """Check that the single stored instance matches ``state_dict``.

    Exactly one instance must exist in the database. Only the
    'vm_state', 'task_state' and 'power_state' entries that are present
    in ``state_dict`` are compared; any other key is ignored.
    """
    admin_ctxt = context.get_admin_context()
    found = db.instance_get_all(admin_ctxt)
    self.assertEqual(len(found), 1)

    instance = found[0]
    for field in ('vm_state', 'task_state', 'power_state'):
        if field in state_dict:
            self.assertEqual(state_dict[field], instance[field])
def test_no_valid_host_exception_on_start(self):
    """A NoValidHost failure in start_instance leaves the VM in ERROR.

    The driver's ``schedule`` is stubbed to raise NoValidHost. The call
    to ``start_instance`` is made outside any assertRaises, so it must
    return without raising; afterwards the instance must have vm_state
    ERROR while its task_state is still STARTING.
    """
    def raise_no_valid_host(context, topic, *args, **kwargs):
        raise exception.NoValidHost(_("Test NoValidHost exception"))

    sched_manager = manager.SchedulerManager()
    stopped_instance = _create_instance(vm_state=vm_states.STOPPED,
                                        task_state=task_states.STARTING)

    # Replace the driver entry point so every scheduling attempt fails.
    self.stubs = stubout.StubOutForTesting()
    self.stubs.Set(TestDriver, 'schedule', raise_no_valid_host)
    self.mox.StubOutWithMock(rpc, 'cast', use_mock_anything=True)

    admin_ctxt = context.get_admin_context()
    sched_manager.start_instance(admin_ctxt, 'topic',
                                 instance_id=stopped_instance['id'])

    # The instance must now be in the ERROR state.
    expected = {
        'vm_state': vm_states.ERROR,
        'task_state': task_states.STARTING,
    }
    self._assert_state(expected)
def test_show_host_resources_no_project(self):
"""No instance are running on the given host."""
@@ -247,21 +287,6 @@ class SchedulerTestCase(test.TestCase):
db.instance_destroy(ctxt, i_ref1['id'])
db.instance_destroy(ctxt, i_ref2['id'])
def test_exception_puts_instance_in_error_state(self):
    """A failure raised through ``_schedule`` leaves the instance in ERROR."""
    sched_manager = manager.SchedulerManager()
    admin_ctxt = context.get_admin_context()
    created = _create_instance()

    self.assertRaises(Exception, sched_manager._schedule,
                      'failing_method', admin_ctxt, 'scheduler',
                      instance_id=created['uuid'])

    # Re-read the record so the assertion sees the stored state.
    refreshed = db.instance_get(admin_ctxt, created['id'])
    self.assertEqual(refreshed['vm_state'], vm_states.ERROR)
class SimpleDriverTestCase(test.TestCase):
"""Test case for simple driver"""

View File

@@ -19,7 +19,6 @@
"""
Tests For Compute
"""
from copy import copy
from webob import exc
@@ -30,6 +29,7 @@ from nova import compute
from nova.compute import instance_types
from nova.compute import manager as compute_manager
from nova.compute import power_state
from nova.compute import state_checker
from nova.compute import task_states
from nova.compute import vm_states
from nova import context
@@ -111,6 +111,8 @@ class BaseTestCase(test.TestCase):
self.project_id = 'fake'
self.context = context.RequestContext(self.user_id, self.project_id)
test_notifier.NOTIFICATIONS = []
self.mox = mox.Mox()
self.total_waits = 0
def fake_show(meh, context, id):
return {'id': 1, 'min_disk': None, 'min_ram': None,
@@ -120,7 +122,14 @@ class BaseTestCase(test.TestCase):
self.stubs.Set(rpc, 'call', rpc_call_wrapper)
self.stubs.Set(rpc, 'cast', rpc_cast_wrapper)
def _create_fake_instance(self, params=None):
def tearDown(self):
    """Release mox stubs and destroy every instance left in the database."""
    self.mox.UnsetStubs()
    for leftover in db.instance_get_all(self.context.elevated()):
        db.instance_destroy(self.context.elevated(), leftover['id'])
    super(BaseTestCase, self).tearDown()
def _create_fake_instance(self, params=None, type_name='m1.tiny'):
"""Create a test instance"""
if not params:
params = {}
@@ -131,12 +140,16 @@ class BaseTestCase(test.TestCase):
inst['launch_time'] = '10'
inst['user_id'] = self.user_id
inst['project_id'] = self.project_id
type_id = instance_types.get_instance_type_by_name('m1.tiny')['id']
type_id = instance_types.get_instance_type_by_name(type_name)['id']
inst['instance_type_id'] = type_id
inst['ami_launch_index'] = 0
inst.update(params)
return db.instance_create(self.context, inst)
def _create_instance(self, params=None, type_name='m1.tiny'):
    """Create a test instance and return only its uuid."""
    created = self._create_fake_instance(params, type_name=type_name)
    return created['uuid']
def _create_instance_type(self, params=None):
"""Create a test instance type"""
if not params:
@@ -195,6 +208,77 @@ class ComputeTestCase(BaseTestCase):
finally:
db.instance_destroy(self.context, instance['id'])
def _assert_state(self, state_dict):
    """Compare the one stored instance against the expected states.

    Fails unless exactly one instance exists. Of the keys in
    ``state_dict`` only 'vm_state', 'task_state' and 'power_state' are
    checked, each against the same-named field of that instance.
    """
    stored = db.instance_get_all(context.get_admin_context())
    self.assertEqual(len(stored), 1)

    only_instance = stored[0]
    for state_key in ('vm_state', 'task_state', 'power_state'):
        if state_key not in state_dict:
            continue
        self.assertEqual(state_dict[state_key], only_instance[state_key])
def test_fail_to_schedule_persists(self):
    """ERROR vm_state with SCHEDULING task_state survives a periodic poll."""
    failed_state = {
        'vm_state': vm_states.ERROR,
        'task_state': task_states.SCHEDULING,
    }
    self._create_instance(params=dict(failed_state))

    # The state must still be the failed one after the periodic poll.
    self.compute.periodic_tasks(context.get_admin_context())
    self._assert_state(failed_state)
def test_run_instance_setup_block_device_mapping_fail(self):
    """Block device mapping failure test.

    ``_setup_block_device_mapping`` is stubbed to raise. run_instance
    must raise, and the instance must be in the ERROR vm_state with the
    BLOCK_DEVICE_MAPPING task_state both right after the failure and
    after a periodic poll.
    """
    def failing_mapping(*args, **kwargs):
        raise Exception("Failed to block device mapping")

    self.stubs.Set(nova.compute.manager.ComputeManager,
                   '_setup_block_device_mapping', failing_mapping)

    instance_uuid = self._create_instance()
    self.assertRaises(Exception, self.compute.run_instance,
                      self.context, instance_uuid)

    expected = {
        'vm_state': vm_states.ERROR,
        'task_state': task_states.BLOCK_DEVICE_MAPPING,
    }
    self._assert_state(expected)

    # The state must still be the failed one after the periodic poll.
    self.compute.periodic_tasks(context.get_admin_context())
    self._assert_state(expected)
def test_run_instance_spawn_fail(self):
    """Spawn failure test.

    The driver's ``spawn`` is stubbed to raise. run_instance must raise,
    and the instance must be in the ERROR vm_state with the SPAWNING
    task_state both right after the failure and after a periodic poll.
    """
    def failing_spawn(*args, **kwargs):
        raise Exception("Failed to spawn")

    self.stubs.Set(self.compute.driver, 'spawn', failing_spawn)

    instance_uuid = self._create_instance()
    self.assertRaises(Exception, self.compute.run_instance,
                      self.context, instance_uuid)

    expected = {
        'vm_state': vm_states.ERROR,
        'task_state': task_states.SPAWNING,
    }
    self._assert_state(expected)

    # The state must still be the failed one after the periodic poll.
    self.compute.periodic_tasks(context.get_admin_context())
    self._assert_state(expected)
def test_can_terminate_on_error_state(self):
    """An instance whose vm_state is ERROR can still be terminated."""
    admin_ctxt = context.get_admin_context()

    # Failed-to-schedule case: create the instance directly in ERROR.
    error_params = {'vm_state': vm_states.ERROR}
    instance_uuid = self._create_instance(params=error_params)

    self.compute.terminate_instance(self.context, instance_uuid)

    # After termination the record must no longer be retrievable.
    self.assertRaises(exception.InstanceNotFound,
                      db.instance_get_by_uuid,
                      admin_ctxt, instance_uuid)
def test_run_terminate(self):
"""Make sure it is possible to run and terminate instance"""
instance = self._create_fake_instance()
@@ -1157,6 +1241,108 @@ class ComputeAPITestCase(BaseTestCase):
'properties': {'kernel_id': 1, 'ramdisk_id': 1},
}
def test_check_vm_state_filtered_function(self):
    """Test the check_vm_state mechanism for filtered functions.

    Checks that filtered_function is rejected in the blocked states only
    while the api_check_vm_states flag is True, and runs otherwise.
    Note that filtered_function takes the same number of arguments as
    the real functions that are decorated in the compute api.
    """
    @compute.api.check_vm_state('filtered_function')
    def filtered_function(api, context, instance_ref):
        LOG.debug("filtered_function executed")
        return True

    def call_filtered(instance_ref):
        return filtered_function(self.compute_api, self.context,
                                 instance_ref)

    def expect_executed(instance_ref):
        self.flags(api_check_vm_states=True)
        self.assertTrue(call_filtered(instance_ref))

    def expect_rejected(instance_ref):
        # With the check enabled the call must be refused ...
        self.flags(api_check_vm_states=True)
        self.assertRaises(exception.InstanceInvalidState,
                          filtered_function,
                          self.compute_api, self.context, instance_ref)
        # ... and with the check disabled the same call must go through.
        self.flags(api_check_vm_states=False)
        self.assertTrue(call_filtered(instance_ref))

    self._execute_allowed_and_blocked('filtered_function',
                                      expect_executed,
                                      expect_rejected)
def test_check_vm_state_non_filtered_function(self):
    """Test the check_vm_state mechanism for non filtered functions.

    A function decorated with check_vm_state under a name that is not
    listed in any of the blocked dictionaries must always be executed,
    so the same "executed" check is used for every scenario.
    """
    @compute.api.check_vm_state('non_filtered_function')
    def non_filtered_function(api, context, instance_ref):
        LOG.debug("non_filtered_function executed")
        return True

    def expect_executed(instance_ref):
        self.flags(api_check_vm_states=True)
        outcome = non_filtered_function(self.compute_api, self.context,
                                        instance_ref)
        self.assertTrue(outcome)

    # The function must run in the allowed and the blocked scenarios.
    self._execute_allowed_and_blocked('non_filtered_function',
                                      expect_executed,
                                      expect_executed)
def _execute_allowed_and_blocked(self, func_name, f_allowed, f_blocked):
    """Run ``f_allowed``/``f_blocked`` against allowed and blocked states.

    Fake block lists are installed on ``nova.compute.state_checker`` so
    that 'filtered_function' is blocked for one task_state, one vm_state
    and one power/vm/task state combination. For each of those three
    categories an instance in the blocked state is created and handed to
    ``f_blocked``, then an instance in an allowed state is created and
    handed to ``f_allowed``. These callbacks have to run the function
    under test and assert the expected result themselves.

    ``func_name`` is accepted but not read by this helper.
    """
    # Blocked and allowed states for each category.
    blocked_task = task_states.SCHEDULING
    allowed_task = task_states.NETWORKING
    blocked_vm = vm_states.BUILDING
    allowed_vm = vm_states.RESCUED
    blocked_comb = {'power_state': power_state.RUNNING,
                    'vm_state': vm_states.ACTIVE, 'task_state': None}
    allowed_comb = {'power_state': power_state.RUNNING,
                    'vm_state': vm_states.PAUSED, 'task_state': None}

    # Fake lists are used so that every kind of check gets exercised.
    fake_lists = (
        ('block_for_task_state', {'filtered_function': [blocked_task]}),
        ('block_for_vm_state', {'filtered_function': [blocked_vm]}),
        ('block_for_combination', {'filtered_function': [blocked_comb]}),
    )
    for attr_name, fake_list in fake_lists:
        self.stubs.Set(nova.compute.state_checker, attr_name, fake_list)

    # Each scenario: instance parameters and the check that must hold.
    scenarios = (
        ({'task_state': blocked_task}, f_blocked),
        ({'task_state': allowed_task}, f_allowed),
        ({'vm_state': blocked_vm}, f_blocked),
        ({'vm_state': allowed_vm}, f_allowed),
        (blocked_comb, f_blocked),
        (allowed_comb, f_allowed),
    )
    for instance_params, check in scenarios:
        i_ref = self._create_fake_instance(params=instance_params)
        check(i_ref)
def test_create_with_too_little_ram(self):
"""Test an instance type with too little memory"""
@@ -1416,10 +1602,14 @@ class ComputeAPITestCase(BaseTestCase):
db.instance_destroy(self.context, instance_uuid)
def test_resume(self):
"""Ensure instance can be resumed"""
"""Ensure instance can be resumed (if suspended)"""
instance = self._create_fake_instance()
instance_uuid = instance['uuid']
instance_id = instance['id']
self.compute.run_instance(self.context, instance_uuid )
db.instance_update(self.context, instance_id,
{'vm_state': vm_states.SUSPENDED})
instance = db.instance_get(self.context, instance_id)
self.assertEqual(instance['task_state'], None)
@@ -1578,6 +1768,7 @@ class ComputeAPITestCase(BaseTestCase):
params = {'vm_state': vm_states.RESCUED, 'task_state': None}
db.instance_update(self.context, instance_uuid, params)
instance = db.instance_get_by_uuid(self.context, instance_uuid)
self.compute_api.unrescue(self.context, instance)
instance = db.instance_get_by_uuid(self.context, instance_uuid)
@@ -1625,7 +1816,7 @@ class ComputeAPITestCase(BaseTestCase):
db.instance_update(self.context, instance_uuid, instance_values)
instance = self.compute_api.get(self.context, instance_uuid)
self.assertRaises(exception.InstanceBackingUp,
self.assertRaises(exception.InstanceInvalidState,
self.compute_api.backup,
self.context,
instance,
@@ -1643,7 +1834,7 @@ class ComputeAPITestCase(BaseTestCase):
db.instance_update(self.context, instance_uuid, instance_values)
instance = self.compute_api.get(self.context, instance_uuid)
self.assertRaises(exception.InstanceSnapshotting,
self.assertRaises(exception.InstanceInvalidState,
self.compute_api.snapshot,
self.context,
instance,
@@ -1663,6 +1854,11 @@ class ComputeAPITestCase(BaseTestCase):
migration_ref = db.migration_create(context,
{'instance_uuid': instance['uuid'],
'status': 'finished'})
# set the state that the instance gets when resize finishes
db.instance_update(self.context, instance['uuid'],
{'task_state': task_states.RESIZE_VERIFY,
'vm_state': vm_states.ACTIVE})
instance = db.instance_get_by_uuid(context, instance['uuid'])
self.compute_api.confirm_resize(context, instance)
self.compute.terminate_instance(context, instance['uuid'])