Merge "Better start/stop handling for cells"
This commit is contained in:
@@ -65,7 +65,7 @@ class CellsManager(manager.Manager):
|
||||
|
||||
Scheduling requests get passed to the scheduler class.
|
||||
"""
|
||||
RPC_API_VERSION = '1.11'
|
||||
RPC_API_VERSION = '1.12'
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
# Mostly for tests.
|
||||
@@ -417,3 +417,14 @@ class CellsManager(manager.Manager):
|
||||
for response in responses:
|
||||
migrations += response.value_or_raise()
|
||||
return migrations
|
||||
|
||||
def start_instance(self, ctxt, instance):
|
||||
"""Start an instance in its cell."""
|
||||
self.msg_runner.start_instance(ctxt, instance)
|
||||
|
||||
def stop_instance(self, ctxt, instance, do_cast=True):
|
||||
"""Stop an instance in its cell."""
|
||||
response = self.msg_runner.stop_instance(ctxt, instance,
|
||||
do_cast=do_cast)
|
||||
if not do_cast:
|
||||
return response.value_or_raise()
|
||||
|
||||
@@ -797,8 +797,34 @@ class _TargetedMessageMethods(_BaseMessageMethods):
|
||||
instance, console_port, console_type)
|
||||
|
||||
def get_migrations(self, message, filters):
|
||||
context = message.ctxt
|
||||
return self.compute_api.get_migrations(context, filters)
|
||||
return self.compute_api.get_migrations(message.ctxt, filters)
|
||||
|
||||
def _call_compute_api_with_obj(self, ctxt, instance, method, *args,
|
||||
**kwargs):
|
||||
try:
|
||||
# NOTE(comstud): We need to refresh the instance from this
|
||||
# cell's view in the DB.
|
||||
instance.refresh(ctxt)
|
||||
except exception.InstanceNotFound:
|
||||
with excutils.save_and_reraise_exception():
|
||||
# Must be a race condition. Let's try to resolve it by
|
||||
# telling the top level cells that this instance doesn't
|
||||
# exist.
|
||||
instance = {'uuid': instance.uuid}
|
||||
self.msg_runner.instance_destroy_at_top(ctxt,
|
||||
instance)
|
||||
fn = getattr(self.compute_api, method, None)
|
||||
return fn(ctxt, instance, *args, **kwargs)
|
||||
|
||||
def start_instance(self, message, instance):
|
||||
"""Start an instance via compute_api.start()."""
|
||||
self._call_compute_api_with_obj(message.ctxt, instance, 'start')
|
||||
|
||||
def stop_instance(self, message, instance):
|
||||
"""Stop an instance via compute_api.stop()."""
|
||||
do_cast = not message.need_response
|
||||
return self._call_compute_api_with_obj(message.ctxt, instance,
|
||||
'stop', do_cast=do_cast)
|
||||
|
||||
|
||||
class _BroadcastMessageMethods(_BaseMessageMethods):
|
||||
@@ -1473,6 +1499,34 @@ class MessageRunner(object):
|
||||
need_response=True)
|
||||
return message.process()
|
||||
|
||||
def _instance_action(self, ctxt, instance, method, extra_kwargs=None,
|
||||
need_response=False):
|
||||
"""Call instance_<method> in correct cell for instance."""
|
||||
cell_name = instance.cell_name
|
||||
if not cell_name:
|
||||
LOG.warn(_("No cell_name for %(method)s() from API"),
|
||||
dict(method=method), instance=instance)
|
||||
return
|
||||
method_kwargs = {'instance': instance}
|
||||
if extra_kwargs:
|
||||
method_kwargs.update(extra_kwargs)
|
||||
message = _TargetedMessage(self, ctxt, method, method_kwargs,
|
||||
'down', cell_name,
|
||||
need_response=need_response)
|
||||
return message.process()
|
||||
|
||||
def start_instance(self, ctxt, instance):
|
||||
"""Start an instance in its cell."""
|
||||
self._instance_action(ctxt, instance, 'start_instance')
|
||||
|
||||
def stop_instance(self, ctxt, instance, do_cast=True):
|
||||
"""Stop an instance in its cell."""
|
||||
if do_cast:
|
||||
self._instance_action(ctxt, instance, 'stop_instance')
|
||||
else:
|
||||
return self._instance_action(ctxt, instance, 'stop_instance',
|
||||
need_response=True)
|
||||
|
||||
@staticmethod
|
||||
def get_message_types():
|
||||
return _CELL_MESSAGE_TYPE_TO_MESSAGE_CLS.keys()
|
||||
|
||||
@@ -67,6 +67,7 @@ class CellsAPI(rpc_proxy.RpcProxy):
|
||||
1.9 - Adds get_capacities()
|
||||
1.10 - Adds bdm_update_or_create_at_top(), and bdm_destroy_at_top()
|
||||
1.11 - Adds get_migrations()
|
||||
1.12 - Adds instance_start() and instance_stop()
|
||||
'''
|
||||
BASE_RPC_API_VERSION = '1.0'
|
||||
|
||||
@@ -357,3 +358,27 @@ class CellsAPI(rpc_proxy.RpcProxy):
|
||||
"""Get all migrations applying the filters."""
|
||||
return self.call(ctxt, self.make_msg('get_migrations',
|
||||
filters=filters), version='1.11')
|
||||
|
||||
def start_instance(self, ctxt, instance):
|
||||
"""Start an instance in its cell.
|
||||
|
||||
This method takes a new-world instance object.
|
||||
"""
|
||||
if not CONF.cells.enable:
|
||||
return
|
||||
self.cast(ctxt,
|
||||
self.make_msg('start_instance', instance=instance),
|
||||
version='1.12')
|
||||
|
||||
def stop_instance(self, ctxt, instance, do_cast=True):
|
||||
"""Stop an instance in its cell.
|
||||
|
||||
This method takes a new-world instance object.
|
||||
"""
|
||||
if not CONF.cells.enable:
|
||||
return
|
||||
method = do_cast and self.cast or self.call
|
||||
return method(ctxt,
|
||||
self.make_msg('stop_instance', instance=instance,
|
||||
do_cast=do_cast),
|
||||
version='1.12')
|
||||
|
||||
@@ -284,17 +284,14 @@ class ComputeCellsAPI(compute_api.API):
|
||||
def stop(self, context, instance, do_cast=True):
|
||||
"""Stop an instance."""
|
||||
super(ComputeCellsAPI, self).stop(context, instance)
|
||||
if do_cast:
|
||||
self._cast_to_cells(context, instance, 'stop', do_cast=True)
|
||||
else:
|
||||
return self._call_to_cells(context, instance, 'stop',
|
||||
do_cast=False)
|
||||
return self.cells_rpcapi.stop_instance(context, instance,
|
||||
do_cast=do_cast)
|
||||
|
||||
@validate_cell
|
||||
def start(self, context, instance):
|
||||
"""Start an instance."""
|
||||
super(ComputeCellsAPI, self).start(context, instance)
|
||||
self._cast_to_cells(context, instance, 'start')
|
||||
self.cells_rpcapi.start_instance(context, instance)
|
||||
|
||||
@validate_cell
|
||||
def reboot(self, context, instance, *args, **kwargs):
|
||||
|
||||
@@ -602,3 +602,18 @@ class CellsManagerClassTestCase(test.TestCase):
|
||||
|
||||
response = self.cells_manager.get_migrations(self.ctxt, filters)
|
||||
self.assertEqual(migrations, response)
|
||||
|
||||
def test_start_instance(self):
|
||||
self.mox.StubOutWithMock(self.msg_runner, 'start_instance')
|
||||
self.msg_runner.start_instance(self.ctxt, 'fake-instance')
|
||||
self.mox.ReplayAll()
|
||||
self.cells_manager.start_instance(self.ctxt, instance='fake-instance')
|
||||
|
||||
def test_stop_instance(self):
|
||||
self.mox.StubOutWithMock(self.msg_runner, 'stop_instance')
|
||||
self.msg_runner.stop_instance(self.ctxt, 'fake-instance',
|
||||
do_cast='meow')
|
||||
self.mox.ReplayAll()
|
||||
self.cells_manager.stop_instance(self.ctxt,
|
||||
instance='fake-instance',
|
||||
do_cast='meow')
|
||||
|
||||
@@ -27,6 +27,7 @@ from nova.objects import base as objects_base
|
||||
from nova.objects import instance as instance_obj
|
||||
from nova.openstack.common import rpc
|
||||
from nova.openstack.common import timeutils
|
||||
from nova.openstack.common import uuidutils
|
||||
from nova import test
|
||||
from nova.tests.cells import fakes
|
||||
from nova.tests import fake_instance_actions
|
||||
@@ -1069,6 +1070,79 @@ class CellsTargetedMethodsTestCase(test.TestCase):
|
||||
|
||||
self.assertEqual(0, len(responses))
|
||||
|
||||
def test_call_compute_api_with_obj(self):
|
||||
instance = instance_obj.Instance()
|
||||
instance.uuid = uuidutils.generate_uuid()
|
||||
self.mox.StubOutWithMock(instance, 'refresh')
|
||||
# Using 'snapshot' for this test, because it
|
||||
# takes args and kwargs.
|
||||
self.mox.StubOutWithMock(self.tgt_compute_api, 'snapshot')
|
||||
instance.refresh(self.ctxt)
|
||||
self.tgt_compute_api.snapshot(
|
||||
self.ctxt, instance, 'name',
|
||||
extra_properties='props').AndReturn('foo')
|
||||
|
||||
self.mox.ReplayAll()
|
||||
result = self.tgt_methods_cls._call_compute_api_with_obj(
|
||||
self.ctxt, instance, 'snapshot', 'name',
|
||||
extra_properties='props')
|
||||
self.assertEqual('foo', result)
|
||||
|
||||
def test_call_compute_with_obj_unknown_instance(self):
|
||||
instance = instance_obj.Instance()
|
||||
instance.uuid = uuidutils.generate_uuid()
|
||||
instance.vm_state = vm_states.ACTIVE
|
||||
instance.task_state = None
|
||||
self.mox.StubOutWithMock(instance, 'refresh')
|
||||
self.mox.StubOutWithMock(self.tgt_msg_runner,
|
||||
'instance_destroy_at_top')
|
||||
|
||||
instance.refresh(self.ctxt).AndRaise(
|
||||
exception.InstanceNotFound(instance_id=instance.uuid))
|
||||
|
||||
self.tgt_msg_runner.instance_destroy_at_top(self.ctxt,
|
||||
{'uuid': instance.uuid})
|
||||
|
||||
self.mox.ReplayAll()
|
||||
self.assertRaises(exception.InstanceNotFound,
|
||||
self.tgt_methods_cls._call_compute_api_with_obj,
|
||||
self.ctxt, instance, 'snapshot', 'name')
|
||||
|
||||
def _test_instance_action_method(self, method, args, kwargs,
|
||||
expected_args, expected_kwargs,
|
||||
expect_result):
|
||||
class FakeMessage(object):
|
||||
pass
|
||||
|
||||
message = FakeMessage()
|
||||
message.ctxt = self.ctxt
|
||||
message.need_response = expect_result
|
||||
|
||||
meth_cls = self.tgt_methods_cls
|
||||
self.mox.StubOutWithMock(meth_cls, '_call_compute_api_with_obj')
|
||||
|
||||
meth_cls._call_compute_api_with_obj(
|
||||
self.ctxt, 'fake-instance', method,
|
||||
*expected_args, **expected_kwargs).AndReturn('meow')
|
||||
|
||||
self.mox.ReplayAll()
|
||||
|
||||
result = getattr(meth_cls, '%s_instance' % method)(
|
||||
message, 'fake-instance', *args, **kwargs)
|
||||
if expect_result:
|
||||
self.assertEqual('meow', result)
|
||||
|
||||
def test_start_instance(self):
|
||||
self._test_instance_action_method('start', (), {}, (), {}, False)
|
||||
|
||||
def test_stop_instance_cast(self):
|
||||
self._test_instance_action_method('stop', (), {}, (),
|
||||
{'do_cast': True}, False)
|
||||
|
||||
def test_stop_instance_call(self):
|
||||
self._test_instance_action_method('stop', (), {}, (),
|
||||
{'do_cast': False}, True)
|
||||
|
||||
|
||||
class CellsBroadcastMethodsTestCase(test.TestCase):
|
||||
"""Test case for _BroadcastMessageMethods class. Most of these
|
||||
|
||||
@@ -461,3 +461,36 @@ class CellsAPITestCase(test.TestCase):
|
||||
expected_args = {'filters': filters}
|
||||
self._check_result(call_info, 'get_migrations', expected_args,
|
||||
version="1.11")
|
||||
|
||||
def test_start_instance(self):
|
||||
call_info = self._stub_rpc_method('cast', None)
|
||||
|
||||
self.cells_rpcapi.start_instance(
|
||||
self.fake_context, 'fake-instance')
|
||||
|
||||
expected_args = {'instance': 'fake-instance'}
|
||||
self._check_result(call_info, 'start_instance',
|
||||
expected_args, version='1.12')
|
||||
|
||||
def test_stop_instance_cast(self):
|
||||
call_info = self._stub_rpc_method('cast', None)
|
||||
|
||||
self.cells_rpcapi.stop_instance(
|
||||
self.fake_context, 'fake-instance', do_cast=True)
|
||||
|
||||
expected_args = {'instance': 'fake-instance',
|
||||
'do_cast': True}
|
||||
self._check_result(call_info, 'stop_instance',
|
||||
expected_args, version='1.12')
|
||||
|
||||
def test_stop_instance_call(self):
|
||||
call_info = self._stub_rpc_method('call', 'fake_response')
|
||||
|
||||
result = self.cells_rpcapi.stop_instance(
|
||||
self.fake_context, 'fake-instance', do_cast=False)
|
||||
|
||||
expected_args = {'instance': 'fake-instance',
|
||||
'do_cast': False}
|
||||
self._check_result(call_info, 'stop_instance',
|
||||
expected_args, version='1.12')
|
||||
self.assertEqual(result, 'fake_response')
|
||||
|
||||
@@ -124,9 +124,10 @@ class _ComputeAPIUnitTestMixIn(object):
|
||||
self.context, instance)
|
||||
|
||||
if self.is_cells:
|
||||
self.mox.StubOutWithMock(self.compute_api, '_cast_to_cells')
|
||||
self.compute_api._cast_to_cells(
|
||||
self.context, instance, 'start')
|
||||
self.mox.StubOutWithMock(self.compute_api.cells_rpcapi,
|
||||
'start_instance')
|
||||
self.compute_api.cells_rpcapi.start_instance(
|
||||
self.context, instance)
|
||||
|
||||
self.mox.ReplayAll()
|
||||
|
||||
@@ -167,9 +168,10 @@ class _ComputeAPIUnitTestMixIn(object):
|
||||
self.context, instance, cast=True)
|
||||
|
||||
if self.is_cells:
|
||||
self.mox.StubOutWithMock(self.compute_api, '_cast_to_cells')
|
||||
self.compute_api._cast_to_cells(
|
||||
self.context, instance, 'stop', do_cast=True)
|
||||
self.mox.StubOutWithMock(self.compute_api.cells_rpcapi,
|
||||
'stop_instance')
|
||||
self.compute_api.cells_rpcapi.stop_instance(
|
||||
self.context, instance, do_cast=True)
|
||||
|
||||
self.mox.ReplayAll()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user