Fix race condition between evacuation and its confirmation

Masakari can face a race condition where after evacuation of an
instance to other host user might perform some actions on that
instance which gives wrong instance vm_state to ConfirmEvacuationTask
that results into notification failure.

To fix this issue this patch proposes to lock the instance before
evacuation till its confirmation so that any normal user will not
be able to perform any actions on it. To achieve this the
ConfirmEvacuationTask is completly removed and the confirmation is
done in the EvacuateInstancesTask itself by per instance.
Evacuating an instance and confirming it's evacuation immediately
can reduce the performance so this patch uses the
eventlet.greenpool.GreenPool which executes the complete evacuation
and confirmation of an instance in a separate thread.
To check if the server is already locked or not upgraded the
novaclient's NOVA_API_VERSION from 2.1 to 2.9  as the 'locked'
property is available in nova api_version 2.9 and above.

This patch introduces a new config option
'host_failure_recovery_threads' which will be the number of threads
to be used for evacuating and confirming the instances evacuation.
The default value for this config option is 3.

Closes-Bug: #1693728
Change-Id: Ib5145878633fd424bca5bcbd5cfed13d20362f94
This commit is contained in:
dineshbhor 2017-05-23 13:50:37 +05:30 committed by Dinesh Bhor
parent 7edf071253
commit 25d33d2cb1
7 changed files with 204 additions and 151 deletions

View File

@ -39,7 +39,7 @@ CONF.import_group('keystone_authtoken', 'keystonemiddleware.auth_token')
LOG = logging.getLogger(__name__)
NOVA_API_VERSION = "2.1"
NOVA_API_VERSION = "2.9"
nova_extensions = [ext for ext in
nova_client.discover_extensions(NOVA_API_VERSION)
@ -221,3 +221,19 @@ class API(object):
"aggregate '%(aggregate_name)s'.")
LOG.info(msg, {'host_name': host, 'aggregate_name': aggregate.name})
return nova.aggregates.add_host(aggregate.id, host)
@translate_nova_exception
def lock_server(self, context, uuid):
"""Lock a server."""
nova = novaclient(context)
msg = ('Call lock server command for instance %(uuid)s')
LOG.info(msg, {'uuid': uuid})
return nova.servers.lock(uuid)
@translate_nova_exception
def unlock_server(self, context, uuid):
"""Unlock a server."""
nova = novaclient(context)
msg = ('Call unlock server command for instance %(uuid)s')
LOG.info(msg, {'uuid': uuid})
return nova.servers.unlock(uuid)

View File

@ -90,6 +90,12 @@ notification_opts = [
"generated_time, then it is considered that notification "
"is ignored by the messaging queue and will be processed "
"by 'process_unfinished_notifications' periodic task."),
cfg.IntOpt('host_failure_recovery_threads',
default=3,
min=1,
help="Number of threads to be used for evacuating and "
"confirming instances during execution of host_failure "
"workflow."),
]

View File

@ -14,6 +14,7 @@
# under the License.
import eventlet
from eventlet import greenpool
from eventlet import timeout as etimeout
from oslo_log import log as logging
@ -87,7 +88,6 @@ class PrepareHAEnabledInstancesTask(base.MasakariTask):
class EvacuateInstancesTask(base.MasakariTask):
default_provides = set(["instance_list"])
def __init__(self, novaclient):
requires = ["host_name", "instance_list"]
@ -95,9 +95,77 @@ class EvacuateInstancesTask(base.MasakariTask):
requires=requires)
self.novaclient = novaclient
def _evacuate_and_confirm(self, context, instance, host_name,
failed_evacuation_instances, reserved_host=None):
vm_state = getattr(instance, "OS-EXT-STS:vm_state")
if vm_state in ['active', 'error', 'resized', 'stopped']:
# Before locking the instance check whether it is already locked
# by user, if yes don't lock the instance
instance_already_locked = self.novaclient.get_server(
context, instance.id).locked
if not instance_already_locked:
# lock the instance so that until evacuation and confirmation
# is not complete, user won't be able to perform any actions
# on the instance.
self.novaclient.lock_server(context, instance.id)
def _wait_for_evacuation():
new_instance = self.novaclient.get_server(context, instance.id)
instance_host = getattr(new_instance,
"OS-EXT-SRV-ATTR:hypervisor_hostname")
old_vm_state = getattr(instance, "OS-EXT-STS:vm_state")
new_vm_state = getattr(new_instance, "OS-EXT-STS:vm_state")
if instance_host != host_name:
if ((old_vm_state == 'error' and
new_vm_state == 'active') or
old_vm_state == new_vm_state):
raise loopingcall.LoopingCallDone()
try:
# Nova evacuates an instance only when vm_state is in active,
# stopped or error state. If an instance is in resized
# vm_state, masakari resets the instance state to *error* so
# that the instance can be evacuated.
if vm_state == 'resized':
self.novaclient.reset_instance_state(context, instance.id)
# evacuate the instance
self.novaclient.evacuate_instance(
context, instance.id,
target=reserved_host.name if reserved_host else None)
periodic_call = loopingcall.FixedIntervalLoopingCall(
_wait_for_evacuation)
try:
# add a timeout to the periodic call.
periodic_call.start(interval=CONF.verify_interval)
etimeout.with_timeout(
CONF.wait_period_after_evacuation,
periodic_call.wait)
except etimeout.Timeout:
# Instance is not evacuated in the expected time_limit.
failed_evacuation_instances.append(instance.id)
finally:
# stop the periodic call, in case of exceptions or
# Timeout.
periodic_call.stop()
except Exception:
# Exception is raised while resetting instance state or
# evacuating the instance itself.
failed_evacuation_instances.append(instance.id)
finally:
if not instance_already_locked:
# Unlock the server after evacuation and confirmation
self.novaclient.unlock_server(context, instance.id)
def execute(self, context, host_name, instance_list, reserved_host=None):
def _do_evacuate(context, host_name, instance_list,
reserved_host=None):
failed_evacuation_instances = []
if reserved_host:
if CONF.host_failure.add_reserved_host_to_aggregate:
# Assign reserved_host to an aggregate to which the failed
@ -128,21 +196,21 @@ class EvacuateInstancesTask(base.MasakariTask):
reserved_host.reserved = False
reserved_host.save()
thread_pool = greenpool.GreenPool(
CONF.host_failure_recovery_threads)
for instance in instance_list:
vm_state = getattr(instance, "OS-EXT-STS:vm_state")
if vm_state in ['active', 'error', 'resized', 'stopped']:
# Evacuate API only evacuates an instance in
# active, stop or error state. If an instance is in
# resized status, masakari resets the instance
# state to *error* to evacuate it.
if vm_state == 'resized':
self.novaclient.reset_instance_state(
context, instance.id)
thread_pool.spawn_n(self._evacuate_and_confirm, context,
instance, host_name,
failed_evacuation_instances, reserved_host)
thread_pool.waitall()
# evacuate the instance
self.novaclient.evacuate_instance(
context, instance.id,
target=reserved_host.name if reserved_host else None)
if failed_evacuation_instances:
msg = _("Failed to evacuate instances %(instances)s from "
"host %(host_name)s.") % {
'instances': failed_evacuation_instances,
'host_name': host_name
}
raise exception.HostRecoveryFailureException(message=msg)
lock_name = reserved_host.name if reserved_host else None
@ -160,57 +228,6 @@ class EvacuateInstancesTask(base.MasakariTask):
# 'auto' as the selection of compute host will be decided by nova.
_do_evacuate(context, host_name, instance_list)
return {
"instance_list": instance_list,
}
class ConfirmEvacuationTask(base.MasakariTask):
def __init__(self, novaclient):
requires = ["instance_list", "host_name"]
super(ConfirmEvacuationTask, self).__init__(addons=[ACTION],
requires=requires)
self.novaclient = novaclient
def execute(self, context, instance_list, host_name):
failed_evacuation_instances = []
for instance in instance_list:
def _wait_for_evacuation():
new_instance = self.novaclient.get_server(context, instance.id)
instance_host = getattr(new_instance,
"OS-EXT-SRV-ATTR:hypervisor_hostname")
old_vm_state = getattr(instance, "OS-EXT-STS:vm_state")
new_vm_state = getattr(new_instance,
"OS-EXT-STS:vm_state")
if instance_host != host_name:
if ((old_vm_state == 'error' and
new_vm_state == 'active') or
old_vm_state == new_vm_state):
raise loopingcall.LoopingCallDone()
periodic_call = loopingcall.FixedIntervalLoopingCall(
_wait_for_evacuation)
try:
# add a timeout to the periodic call.
periodic_call.start(interval=CONF.verify_interval)
etimeout.with_timeout(CONF.wait_period_after_evacuation,
periodic_call.wait)
except etimeout.Timeout:
# Instance is not evacuated in the expected time_limit.
failed_evacuation_instances.append(instance.id)
finally:
# stop the periodic call, in case of exceptions or Timeout.
periodic_call.stop()
if failed_evacuation_instances:
msg = _("Failed to evacuate instances %(instances)s from "
"host %(host_name)s.") % {
'instances': failed_evacuation_instances,
'host_name': host_name
}
raise exception.HostRecoveryFailureException(message=msg)
def get_auto_flow(novaclient, process_what):
"""Constructs and returns the engine entrypoint flow.
@ -228,8 +245,7 @@ def get_auto_flow(novaclient, process_what):
auto_evacuate_flow.add(DisableComputeServiceTask(novaclient),
PrepareHAEnabledInstancesTask(novaclient),
EvacuateInstancesTask(novaclient),
ConfirmEvacuationTask(novaclient))
EvacuateInstancesTask(novaclient))
return taskflow.engines.load(auto_evacuate_flow, store=process_what)
@ -252,8 +268,7 @@ def get_rh_flow(novaclient, process_what):
rebind=['reserved_host_list'], provides='reserved_host'))
rh_flow.add(PrepareHAEnabledInstancesTask(novaclient),
EvacuateInstancesTask(novaclient),
ConfirmEvacuationTask(novaclient))
EvacuateInstancesTask(novaclient))
nested_flow.add(DisableComputeServiceTask(novaclient), rh_flow)

View File

@ -261,3 +261,23 @@ class NovaApiTestCase(test.TestCase):
mock_novaclient.assert_called_once_with(self.ctx)
mock_aggregates.add_host.assert_called_once_with(
mock_aggregate.id, 'fake_host')
@mock.patch('masakari.compute.nova.novaclient')
def test_lock_server(self, mock_novaclient):
uuid = uuidsentinel.fake_server
mock_servers = mock.MagicMock()
mock_novaclient.return_value = mock.MagicMock(servers=mock_servers)
self.api.lock_server(self.ctx, uuid)
mock_novaclient.assert_called_once_with(self.ctx)
mock_servers.lock.assert_called_once_with(uuidsentinel.fake_server)
@mock.patch('masakari.compute.nova.novaclient')
def test_unlock_server(self, mock_novaclient):
uuid = uuidsentinel.fake_server
mock_servers = mock.MagicMock()
mock_novaclient.return_value = mock.MagicMock(servers=mock_servers)
self.api.unlock_server(self.ctx, uuid)
mock_novaclient.assert_called_once_with(self.ctx)
mock_servers.unlock.assert_called_once_with(uuidsentinel.fake_server)

View File

@ -32,6 +32,9 @@ from masakari.tests.unit import fakes
CONF = conf.CONF
@mock.patch.object(nova.API, "enable_disable_service")
@mock.patch.object(nova.API, "lock_server")
@mock.patch.object(nova.API, "unlock_server")
class HostFailureTestCase(test.TestCase):
def setUp(self):
@ -58,14 +61,11 @@ class HostFailureTestCase(test.TestCase):
getattr(instance,
'OS-EXT-SRV-ATTR:hypervisor_hostname'))
def _test_disable_compute_service(self):
def _test_disable_compute_service(self, mock_enable_disable):
task = host_failure.DisableComputeServiceTask(self.novaclient)
with mock.patch.object(
self.novaclient,
"enable_disable_service") as mock_enable_disable_service:
task.execute(self.ctxt, self.instance_host)
task.execute(self.ctxt, self.instance_host)
mock_enable_disable_service.assert_called_once_with(
mock_enable_disable.assert_called_once_with(
self.ctxt, self.instance_host)
def _test_instance_list(self):
@ -83,33 +83,26 @@ class HostFailureTestCase(test.TestCase):
return instance_list
def _evacuate_instances(self, instance_list, reserved_host=None):
def _evacuate_instances(self, instance_list, mock_enable_disable,
reserved_host=None):
task = host_failure.EvacuateInstancesTask(self.novaclient)
if reserved_host:
with mock.patch.object(
self.novaclient,
"enable_disable_service") as mock_enable_disable_service:
instance_list = task.execute(self.ctxt, self.instance_host,
instance_list['instance_list'],
reserved_host=reserved_host)
task.execute(self.ctxt, self.instance_host,
instance_list['instance_list'],
reserved_host=reserved_host)
mock_enable_disable_service.assert_called_once_with(
self.ctxt, reserved_host.name, enable=True)
self.assertTrue(mock_enable_disable.called)
else:
instance_list = task.execute(
task.execute(
self.ctxt, self.instance_host, instance_list['instance_list'])
return instance_list
def _test_confirm_evacuate_task(self, instance_list):
task = host_failure.ConfirmEvacuationTask(self.novaclient)
task.execute(self.ctxt, instance_list['instance_list'],
self.instance_host)
# make sure instance is active and has different host
self._verify_instance_evacuated()
@mock.patch('masakari.compute.nova.novaclient')
def test_host_failure_flow_for_auto_recovery(self, _mock_novaclient):
def test_host_failure_flow_for_auto_recovery(
self, _mock_novaclient, mock_unlock, mock_lock,
mock_enable_disable):
_mock_novaclient.return_value = self.fake_client
self.override_config("evacuate_all_instances",
True, "host_failure")
@ -120,20 +113,18 @@ class HostFailureTestCase(test.TestCase):
self.fake_client.servers.create(id="2", host=self.instance_host)
# execute DisableComputeServiceTask
self._test_disable_compute_service()
self._test_disable_compute_service(mock_enable_disable)
# execute PrepareHAEnabledInstancesTask
instance_list = self._test_instance_list()
# execute EvacuateInstancesTask
instance_list = self._evacuate_instances(instance_list)
# execute ConfirmEvacuationTask
self._test_confirm_evacuate_task(instance_list)
self._evacuate_instances(instance_list, mock_enable_disable)
@mock.patch('masakari.compute.nova.novaclient')
def test_host_failure_flow_for_reserved_host_recovery(
self, _mock_novaclient):
self, _mock_novaclient, mock_unlock, mock_lock,
mock_enable_disable):
_mock_novaclient.return_value = self.fake_client
self.override_config("evacuate_all_instances",
True, "host_failure")
@ -150,50 +141,44 @@ class HostFailureTestCase(test.TestCase):
hosts=[self.instance_host])
# execute DisableComputeServiceTask
self._test_disable_compute_service()
self._test_disable_compute_service(mock_enable_disable)
# execute PrepareHAEnabledInstancesTask
instance_list = self._test_instance_list()
# execute EvacuateInstancesTask
with mock.patch.object(host_obj.Host, "save") as mock_save:
instance_list = self._evacuate_instances(
instance_list, reserved_host=reserved_host)
self._evacuate_instances(
instance_list, mock_enable_disable,
reserved_host=reserved_host)
self.assertEqual(1, mock_save.call_count)
self.assertIn(reserved_host.name,
self.fake_client.aggregates.get('1').hosts)
# execute ConfirmEvacuationTask
self._test_confirm_evacuate_task(instance_list)
@mock.patch('masakari.compute.nova.novaclient')
def test_evacuate_instances_task(self, _mock_novaclient):
def test_evacuate_instances_task(self, _mock_novaclient, mock_unlock,
mock_lock, mock_enable_disable):
_mock_novaclient.return_value = self.fake_client
# create test data
self.fake_client.servers.create(id="1", host=self.instance_host,
ha_enabled=True)
vm_state="error", ha_enabled=True)
self.fake_client.servers.create(id="2", host=self.instance_host,
ha_enabled=True)
vm_state="error", ha_enabled=True)
# execute DisableComputeServiceTask
self._test_disable_compute_service()
self._test_disable_compute_service(mock_enable_disable)
# execute PrepareHAEnabledInstancesTask
instance_list = self._test_instance_list()
# execute EvacuateInstancesTask
task = host_failure.EvacuateInstancesTask(self.novaclient)
# mock evacuate method of FakeNovaClient to confirm that evacuate
# method is called.
with mock.patch.object(fakes.FakeNovaClient.ServerManager,
"evacuate") as mock_evacuate:
task.execute(self.ctxt, self.instance_host,
instance_list['instance_list'])
self.assertEqual(2, mock_evacuate.call_count)
self._evacuate_instances(instance_list, mock_enable_disable)
@mock.patch('masakari.compute.nova.novaclient')
def test_host_failure_flow_no_ha_enabled_instances(self, _mock_novaclient):
def test_host_failure_flow_no_ha_enabled_instances(
self, _mock_novaclient, mock_unlock, mock_lock,
mock_enable_disable):
_mock_novaclient.return_value = self.fake_client
# create test data
@ -201,7 +186,7 @@ class HostFailureTestCase(test.TestCase):
self.fake_client.servers.create(id="2", host=self.instance_host)
# execute DisableComputeServiceTask
self._test_disable_compute_service()
self._test_disable_compute_service(mock_enable_disable)
# execute PrepareHAEnabledInstancesTask
task = host_failure.PrepareHAEnabledInstancesTask(self.novaclient)
@ -209,21 +194,19 @@ class HostFailureTestCase(test.TestCase):
self.ctxt, self.instance_host)
@mock.patch('masakari.compute.nova.novaclient')
def test_host_failure_flow_evacuation_failed(self, _mock_novaclient):
def test_host_failure_flow_evacuation_failed(
self, _mock_novaclient, mock_unlock, mock_lock,
mock_enable_disable):
_mock_novaclient.return_value = self.fake_client
# create ha_enabled test data
server = self.fake_client.servers.create(id="1",
server = self.fake_client.servers.create(id="1", vm_state='active',
host=self.instance_host,
ha_enabled=True)
instance_list = {
"instance_list": self.fake_client.servers.list()
}
# execute EvacuateInstancesTask
instance_list = self._evacuate_instances(
instance_list)
def fake_get_server(context, host):
# assume that while evacuating instance goes into error state
fake_server = copy.deepcopy(server)
@ -231,15 +214,15 @@ class HostFailureTestCase(test.TestCase):
return fake_server
with mock.patch.object(self.novaclient, "get_server", fake_get_server):
# execute ConfirmEvacuationTask
task = host_failure.ConfirmEvacuationTask(self.novaclient)
# execute EvacuateInstancesTask
self.assertRaises(
exception.HostRecoveryFailureException, task.execute,
self.ctxt, instance_list['instance_list'],
self.instance_host)
exception.HostRecoveryFailureException,
self._evacuate_instances, instance_list, mock_enable_disable)
@mock.patch('masakari.compute.nova.novaclient')
def test_host_failure_flow_resized_instance(self, _mock_novaclient):
def test_host_failure_flow_resized_instance(
self, _mock_novaclient, mock_unlock, mock_lock,
mock_enable_disable):
_mock_novaclient.return_value = self.fake_client
# create ha_enabled test data
@ -254,14 +237,12 @@ class HostFailureTestCase(test.TestCase):
}
# execute EvacuateInstancesTask
instance_list = self._evacuate_instances(
instance_list)
# execute ConfirmEvacuationTask
self._test_confirm_evacuate_task(instance_list)
self._evacuate_instances(instance_list, mock_enable_disable)
@mock.patch('masakari.compute.nova.novaclient')
def test_host_failure_flow_shutdown_instance(self, _mock_novaclient):
def test_host_failure_flow_shutdown_instance(
self, _mock_novaclient, mock_unlock, mock_lock,
mock_enable_disable):
_mock_novaclient.return_value = self.fake_client
# create ha_enabled test data
@ -276,14 +257,12 @@ class HostFailureTestCase(test.TestCase):
}
# execute EvacuateInstancesTask
instance_list = self._evacuate_instances(
instance_list)
# execute ConfirmEvacuationTask
self._test_confirm_evacuate_task(instance_list)
self._evacuate_instances(instance_list, mock_enable_disable)
@mock.patch('masakari.compute.nova.novaclient')
def test_host_failure_flow_instance_in_error(self, _mock_novaclient):
def test_host_failure_flow_instance_in_error(
self, _mock_novaclient, mock_unlock, mock_lock,
mock_enable_disable):
_mock_novaclient.return_value = self.fake_client
# create ha_enabled test data
@ -298,20 +277,18 @@ class HostFailureTestCase(test.TestCase):
}
# execute EvacuateInstancesTask
instance_list = self._evacuate_instances(
instance_list)
# execute ConfirmEvacuationTask
self._test_confirm_evacuate_task(instance_list)
self._evacuate_instances(instance_list, mock_enable_disable)
@mock.patch('masakari.compute.nova.novaclient')
def test_host_failure_flow_no_instances_on_host(self, _mock_novaclient):
def test_host_failure_flow_no_instances_on_host(
self, _mock_novaclient, mock_unlock, mock_lock,
mock_enable_disable):
_mock_novaclient.return_value = self.fake_client
self.override_config("evacuate_all_instances",
True, "host_failure")
# execute DisableComputeServiceTask
self._test_disable_compute_service()
self._test_disable_compute_service(mock_enable_disable)
# execute PrepareHAEnabledInstancesTask
task = host_failure.PrepareHAEnabledInstancesTask(self.novaclient)

View File

@ -24,13 +24,14 @@ NOW = timeutils.utcnow().replace(microsecond=0)
class FakeNovaClient(object):
class Server(object):
def __init__(self, id=None, uuid=None, host=None, vm_state=None,
ha_enabled=None):
ha_enabled=None, locked=False):
self.id = id
self.uuid = uuid or uuidutils.generate_uuid()
self.host = host
setattr(self, 'OS-EXT-SRV-ATTR:hypervisor_hostname', host)
setattr(self, 'OS-EXT-STS:vm_state', vm_state)
self.metadata = {"HA_Enabled": ha_enabled}
self.locked = locked
class ServerManager(object):
def __init__(self):

View File

@ -0,0 +1,18 @@
---
fixes:
- |
Fixes `bug 1693728`_ which will fix the race condition where after
evacuation of an instance to other host user might perform some actions on
that instance which gives wrong instance vm_state to ConfirmEvacuationTask
that results into notification failure.
To fix this issue, following config option is added under ``DEFAULT``
section in 'masakari.conf' file::
[DEFAULT]
host_failure_recovery_threads = 3
This config option decides the number of threads going to be used for
evacuating the instances.
.. _`bug 1693728`: https://bugs.launchpad.net/masakari/+bug/1693728