Add host_failure workflow for 'reserved_host' recovery_method
Added workflow for evacuating instances from failed_host using 'reserved_host' recovery method. Partially-Implements: blueprint implement-recovery-methods Change-Id: I6dc2b35cf40c4506beec866c0a660179b19cd3ca
This commit is contained in:
@@ -62,9 +62,10 @@ notification_opts = [
|
|||||||
"the notification will be considered as duplicate and "
|
"the notification will be considered as duplicate and "
|
||||||
"it will be ignored."
|
"it will be ignored."
|
||||||
),
|
),
|
||||||
cfg.IntOpt('wait_period_after_service_disabled',
|
cfg.IntOpt('wait_period_after_service_update',
|
||||||
default=180,
|
default=180,
|
||||||
help='Wait until service is disabled'),
|
help='Number of seconds to wait after a service is enabled '
|
||||||
|
'or disabled.'),
|
||||||
cfg.IntOpt('wait_period_after_evacuation',
|
cfg.IntOpt('wait_period_after_evacuation',
|
||||||
default=90,
|
default=90,
|
||||||
help='Wait until instance is evacuated'),
|
help='Wait until instance is evacuated'),
|
||||||
|
@@ -40,7 +40,7 @@ LOG = logging.getLogger(__name__)
|
|||||||
class NotificationDriver(object):
|
class NotificationDriver(object):
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def execute_host_failure(self, context, host_name, recovery_method,
|
def execute_host_failure(self, context, host_name, recovery_method,
|
||||||
notification_uuid):
|
notification_uuid, reserved_host_list=None):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
|
@@ -53,7 +53,8 @@ class MasakariTask(task.Task):
|
|||||||
class SpecialFormatter(formatters.FailureFormatter):
|
class SpecialFormatter(formatters.FailureFormatter):
|
||||||
|
|
||||||
# Exception is an excepted case, don't include traceback in log if fails.
|
# Exception is an excepted case, don't include traceback in log if fails.
|
||||||
_NO_TRACE_EXCEPTIONS = (exception.SkipInstanceRecoveryException,)
|
_NO_TRACE_EXCEPTIONS = (exception.SkipInstanceRecoveryException,
|
||||||
|
exception.SkipHostRecoveryException)
|
||||||
|
|
||||||
def __init__(self, engine):
|
def __init__(self, engine):
|
||||||
super(SpecialFormatter, self).__init__(engine)
|
super(SpecialFormatter, self).__init__(engine)
|
||||||
|
@@ -40,7 +40,7 @@ class TaskFlowDriver(driver.NotificationDriver):
|
|||||||
super(TaskFlowDriver, self).__init__()
|
super(TaskFlowDriver, self).__init__()
|
||||||
|
|
||||||
def execute_host_failure(self, context, host_name, recovery_method,
|
def execute_host_failure(self, context, host_name, recovery_method,
|
||||||
notification_uuid):
|
notification_uuid, reserved_host_list=None):
|
||||||
novaclient = nova.API()
|
novaclient = nova.API()
|
||||||
# get flow for host failure
|
# get flow for host failure
|
||||||
process_what = {
|
process_what = {
|
||||||
@@ -54,9 +54,14 @@ class TaskFlowDriver(driver.NotificationDriver):
|
|||||||
process_what)
|
process_what)
|
||||||
elif recovery_method == (
|
elif recovery_method == (
|
||||||
fields.FailoverSegmentRecoveryMethod.RESERVED_HOST):
|
fields.FailoverSegmentRecoveryMethod.RESERVED_HOST):
|
||||||
raise NotImplementedError(_("Flow not implemented for "
|
if not reserved_host_list:
|
||||||
"recovery_method"),
|
msg = _('No reserved_hosts available for evacuation.')
|
||||||
recovery_method)
|
LOG.info(msg)
|
||||||
|
raise exception.ReservedHostsUnavailable(message=msg)
|
||||||
|
|
||||||
|
process_what['reserved_host_list'] = reserved_host_list
|
||||||
|
flow_engine = host_failure.get_rh_flow(
|
||||||
|
novaclient, process_what)
|
||||||
elif recovery_method == (
|
elif recovery_method == (
|
||||||
fields.FailoverSegmentRecoveryMethod.AUTO_PRIORITY):
|
fields.FailoverSegmentRecoveryMethod.AUTO_PRIORITY):
|
||||||
raise NotImplementedError(_("Flow not implemented for "
|
raise NotImplementedError(_("Flow not implemented for "
|
||||||
@@ -77,7 +82,10 @@ class TaskFlowDriver(driver.NotificationDriver):
|
|||||||
# taskflow sends out and redirect them to a more useful log for
|
# taskflow sends out and redirect them to a more useful log for
|
||||||
# masakari's debugging (or error reporting) usage.
|
# masakari's debugging (or error reporting) usage.
|
||||||
with base.DynamicLogListener(flow_engine, logger=LOG):
|
with base.DynamicLogListener(flow_engine, logger=LOG):
|
||||||
flow_engine.run()
|
try:
|
||||||
|
flow_engine.run()
|
||||||
|
except exception.LockAlreadyAcquired as ex:
|
||||||
|
raise exception.HostRecoveryFailureException(ex.message)
|
||||||
|
|
||||||
def execute_instance_failure(self, context, instance_uuid,
|
def execute_instance_failure(self, context, instance_uuid,
|
||||||
notification_uuid):
|
notification_uuid):
|
||||||
|
@@ -21,11 +21,13 @@ from oslo_service import loopingcall
|
|||||||
from oslo_utils import strutils
|
from oslo_utils import strutils
|
||||||
import taskflow.engines
|
import taskflow.engines
|
||||||
from taskflow.patterns import linear_flow
|
from taskflow.patterns import linear_flow
|
||||||
|
from taskflow import retry
|
||||||
|
|
||||||
import masakari.conf
|
import masakari.conf
|
||||||
from masakari.engine.drivers.taskflow import base
|
from masakari.engine.drivers.taskflow import base
|
||||||
from masakari import exception
|
from masakari import exception
|
||||||
from masakari.i18n import _, _LI
|
from masakari.i18n import _, _LI
|
||||||
|
from masakari import utils
|
||||||
|
|
||||||
|
|
||||||
CONF = masakari.conf.CONF
|
CONF = masakari.conf.CONF
|
||||||
@@ -48,8 +50,8 @@ class DisableComputeServiceTask(base.MasakariTask):
|
|||||||
# Sleep until nova-compute service is marked as disabled.
|
# Sleep until nova-compute service is marked as disabled.
|
||||||
msg = _LI("Sleeping %(wait)s sec before starting recovery "
|
msg = _LI("Sleeping %(wait)s sec before starting recovery "
|
||||||
"thread until nova recognizes the node down.")
|
"thread until nova recognizes the node down.")
|
||||||
LOG.info(msg, {'wait': CONF.wait_period_after_service_disabled})
|
LOG.info(msg, {'wait': CONF.wait_period_after_service_update})
|
||||||
eventlet.sleep(CONF.wait_period_after_service_disabled)
|
eventlet.sleep(CONF.wait_period_after_service_update)
|
||||||
|
|
||||||
|
|
||||||
class PrepareHAEnabledInstancesTask(base.MasakariTask):
|
class PrepareHAEnabledInstancesTask(base.MasakariTask):
|
||||||
@@ -74,34 +76,72 @@ class PrepareHAEnabledInstancesTask(base.MasakariTask):
|
|||||||
[instance for instance in instance_list if
|
[instance for instance in instance_list if
|
||||||
strutils.bool_from_string(instance.metadata.get('HA_Enabled',
|
strutils.bool_from_string(instance.metadata.get('HA_Enabled',
|
||||||
False))])
|
False))])
|
||||||
|
if not instance_list:
|
||||||
|
msg = _('No instances to evacuate on host: %s.') % host_name
|
||||||
|
LOG.info(msg)
|
||||||
|
raise exception.SkipHostRecoveryException(message=msg)
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"instance_list": instance_list,
|
"instance_list": instance_list,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
class AutoEvacuationInstancesTask(base.MasakariTask):
|
class EvacuateInstancesTask(base.MasakariTask):
|
||||||
default_provides = set(["instance_list"])
|
default_provides = set(["instance_list"])
|
||||||
|
|
||||||
def __init__(self, novaclient):
|
def __init__(self, novaclient):
|
||||||
requires = ["instance_list"]
|
requires = ["instance_list"]
|
||||||
super(AutoEvacuationInstancesTask, self).__init__(addons=[ACTION],
|
super(EvacuateInstancesTask, self).__init__(addons=[ACTION],
|
||||||
requires=requires)
|
requires=requires)
|
||||||
self.novaclient = novaclient
|
self.novaclient = novaclient
|
||||||
|
|
||||||
def execute(self, context, instance_list):
|
def execute(self, context, instance_list, reserved_host=None):
|
||||||
for instance in instance_list:
|
def _do_evacuate(context, instance_list, reserved_host=None):
|
||||||
vm_state = getattr(instance, "OS-EXT-STS:vm_state")
|
if reserved_host:
|
||||||
if vm_state in ['active', 'error', 'resized', 'stopped']:
|
self.novaclient.enable_disable_service(
|
||||||
# Evacuate API only evacuates an instance in
|
context, reserved_host.name, enable=True)
|
||||||
# active, stop or error state. If an instance is in
|
|
||||||
# resized status, masakari resets the instance
|
# Sleep until nova-compute service is marked as enabled.
|
||||||
# state to *error* to evacuate it.
|
msg = _LI("Sleeping %(wait)s sec before starting recovery "
|
||||||
if vm_state == 'resized':
|
"thread until nova recognizes the node up.")
|
||||||
self.novaclient.reset_instance_state(
|
LOG.info(msg, {
|
||||||
context, instance.id)
|
'wait': CONF.wait_period_after_service_update})
|
||||||
# evacuate the instances to new host
|
eventlet.sleep(CONF.wait_period_after_service_update)
|
||||||
self.novaclient.evacuate_instance(context, instance.id)
|
|
||||||
|
# Set reserved property of reserved_host to False
|
||||||
|
reserved_host.reserved = False
|
||||||
|
reserved_host.save()
|
||||||
|
|
||||||
|
for instance in instance_list:
|
||||||
|
vm_state = getattr(instance, "OS-EXT-STS:vm_state")
|
||||||
|
if vm_state in ['active', 'error', 'resized', 'stopped']:
|
||||||
|
# Evacuate API only evacuates an instance in
|
||||||
|
# active, stop or error state. If an instance is in
|
||||||
|
# resized status, masakari resets the instance
|
||||||
|
# state to *error* to evacuate it.
|
||||||
|
if vm_state == 'resized':
|
||||||
|
self.novaclient.reset_instance_state(
|
||||||
|
context, instance.id)
|
||||||
|
|
||||||
|
# evacuate the instance
|
||||||
|
self.novaclient.evacuate_instance(
|
||||||
|
context, instance.id,
|
||||||
|
target=reserved_host.name if reserved_host else None)
|
||||||
|
|
||||||
|
lock_name = reserved_host.name if reserved_host else None
|
||||||
|
|
||||||
|
@utils.synchronized(lock_name)
|
||||||
|
def do_evacuate_with_reserved_host(context, instance_list,
|
||||||
|
reserved_host):
|
||||||
|
_do_evacuate(context, instance_list, reserved_host=reserved_host)
|
||||||
|
|
||||||
|
if lock_name:
|
||||||
|
do_evacuate_with_reserved_host(context, instance_list,
|
||||||
|
reserved_host)
|
||||||
|
else:
|
||||||
|
# No need to acquire lock on reserved_host when recovery_method is
|
||||||
|
# 'auto' as the selection of compute host will be decided by nova.
|
||||||
|
_do_evacuate(context, instance_list)
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"instance_list": instance_list,
|
"instance_list": instance_list,
|
||||||
@@ -112,7 +152,7 @@ class ConfirmEvacuationTask(base.MasakariTask):
|
|||||||
def __init__(self, novaclient):
|
def __init__(self, novaclient):
|
||||||
requires = ["instance_list", "host_name"]
|
requires = ["instance_list", "host_name"]
|
||||||
super(ConfirmEvacuationTask, self).__init__(addons=[ACTION],
|
super(ConfirmEvacuationTask, self).__init__(addons=[ACTION],
|
||||||
requires=requires)
|
requires=requires)
|
||||||
self.novaclient = novaclient
|
self.novaclient = novaclient
|
||||||
|
|
||||||
def execute(self, context, instance_list, host_name):
|
def execute(self, context, instance_list, host_name):
|
||||||
@@ -152,7 +192,7 @@ class ConfirmEvacuationTask(base.MasakariTask):
|
|||||||
'instances': failed_evacuation_instances,
|
'instances': failed_evacuation_instances,
|
||||||
'host_name': host_name
|
'host_name': host_name
|
||||||
}
|
}
|
||||||
raise exception.AutoRecoveryFailureException(message=msg)
|
raise exception.HostRecoveryFailureException(message=msg)
|
||||||
|
|
||||||
|
|
||||||
def get_auto_flow(novaclient, process_what):
|
def get_auto_flow(novaclient, process_what):
|
||||||
@@ -171,7 +211,33 @@ def get_auto_flow(novaclient, process_what):
|
|||||||
|
|
||||||
auto_evacuate_flow.add(DisableComputeServiceTask(novaclient),
|
auto_evacuate_flow.add(DisableComputeServiceTask(novaclient),
|
||||||
PrepareHAEnabledInstancesTask(novaclient),
|
PrepareHAEnabledInstancesTask(novaclient),
|
||||||
AutoEvacuationInstancesTask(novaclient),
|
EvacuateInstancesTask(novaclient),
|
||||||
ConfirmEvacuationTask(novaclient))
|
ConfirmEvacuationTask(novaclient))
|
||||||
|
|
||||||
return taskflow.engines.load(auto_evacuate_flow, store=process_what)
|
return taskflow.engines.load(auto_evacuate_flow, store=process_what)
|
||||||
|
|
||||||
|
|
||||||
|
def get_rh_flow(novaclient, process_what):
|
||||||
|
"""Constructs and returns the engine entrypoint flow.
|
||||||
|
|
||||||
|
This flow will do the following:
|
||||||
|
|
||||||
|
1. Disable compute service on source host
|
||||||
|
2. Get all HA_Enabled instances.
|
||||||
|
3. Evacuate all the HA_Enabled instances using reserved_host.
|
||||||
|
4. Confirm evacuation of instances.
|
||||||
|
"""
|
||||||
|
flow_name = ACTION.replace(":", "_") + "_engine"
|
||||||
|
nested_flow = linear_flow.Flow(flow_name)
|
||||||
|
|
||||||
|
rh_flow = linear_flow.Flow(
|
||||||
|
"retry_%s" % flow_name, retry=retry.ParameterizedForEach(
|
||||||
|
rebind=['reserved_host_list'], provides='reserved_host'))
|
||||||
|
|
||||||
|
rh_flow.add(PrepareHAEnabledInstancesTask(novaclient),
|
||||||
|
EvacuateInstancesTask(novaclient),
|
||||||
|
ConfirmEvacuationTask(novaclient))
|
||||||
|
|
||||||
|
nested_flow.add(DisableComputeServiceTask(novaclient), rh_flow)
|
||||||
|
|
||||||
|
return taskflow.engines.load(nested_flow, store=process_what)
|
||||||
|
@@ -70,8 +70,9 @@ class ConfirmComputeNodeDisabledTask(base.MasakariTask):
|
|||||||
try:
|
try:
|
||||||
# add a timeout to the periodic call.
|
# add a timeout to the periodic call.
|
||||||
periodic_call.start(interval=CONF.verify_interval)
|
periodic_call.start(interval=CONF.verify_interval)
|
||||||
etimeout.with_timeout(CONF.wait_period_after_service_disabled,
|
etimeout.with_timeout(
|
||||||
periodic_call.wait)
|
CONF.wait_period_after_service_update,
|
||||||
|
periodic_call.wait)
|
||||||
except etimeout.Timeout:
|
except etimeout.Timeout:
|
||||||
msg = _("Failed to disable service %(process_name)s") % {
|
msg = _("Failed to disable service %(process_name)s") % {
|
||||||
'process_name': process_name
|
'process_name': process_name
|
||||||
|
@@ -145,12 +145,24 @@ class MasakariManager(manager.Manager):
|
|||||||
host_obj.update(update_data)
|
host_obj.update(update_data)
|
||||||
host_obj.save()
|
host_obj.save()
|
||||||
|
|
||||||
|
reserved_host_list = None
|
||||||
|
if not recovery_method == (
|
||||||
|
fields.FailoverSegmentRecoveryMethod.AUTO):
|
||||||
|
reserved_host_list = objects.HostList.get_all(
|
||||||
|
context, filters={
|
||||||
|
'failover_segment_id': host_obj.failover_segment_id,
|
||||||
|
'reserved': True})
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.driver.execute_host_failure(
|
self.driver.execute_host_failure(
|
||||||
context, host_name, recovery_method,
|
context, host_name, recovery_method,
|
||||||
notification.notification_uuid)
|
notification.notification_uuid,
|
||||||
except (exception.MasakariException,
|
reserved_host_list=reserved_host_list)
|
||||||
exception.AutoRecoveryFailureException):
|
except exception.SkipHostRecoveryException:
|
||||||
|
notification_status = fields.NotificationStatus.FINISHED
|
||||||
|
except (exception.HostRecoveryFailureException,
|
||||||
|
exception.ReservedHostsUnavailable,
|
||||||
|
exception.MasakariException):
|
||||||
notification_status = fields.NotificationStatus.ERROR
|
notification_status = fields.NotificationStatus.ERROR
|
||||||
else:
|
else:
|
||||||
LOG.warning(_LW("Invalid event: %(event)s received for "
|
LOG.warning(_LW("Invalid event: %(event)s received for "
|
||||||
@@ -163,7 +175,7 @@ class MasakariManager(manager.Manager):
|
|||||||
return notification_status
|
return notification_status
|
||||||
|
|
||||||
def _process_notification(self, context, notification):
|
def _process_notification(self, context, notification):
|
||||||
@utils.synchronized(notification.source_host_uuid)
|
@utils.synchronized(notification.source_host_uuid, blocking=True)
|
||||||
def do_process_notification(notification):
|
def do_process_notification(notification):
|
||||||
LOG.info(_LI('Processing notification %(notification_uuid)s of '
|
LOG.info(_LI('Processing notification %(notification_uuid)s of '
|
||||||
'type: %(type)s'), {
|
'type: %(type)s'), {
|
||||||
|
@@ -307,8 +307,8 @@ class HostOnMaintenanceError(Invalid):
|
|||||||
code = http.CONFLICT
|
code = http.CONFLICT
|
||||||
|
|
||||||
|
|
||||||
class AutoRecoveryFailureException(MasakariException):
|
class HostRecoveryFailureException(MasakariException):
|
||||||
msg_fmt = _('Failed to execute auto recovery method.')
|
msg_fmt = _('Failed to execute host recovery.')
|
||||||
|
|
||||||
|
|
||||||
class InstanceRecoveryFailureException(MasakariException):
|
class InstanceRecoveryFailureException(MasakariException):
|
||||||
@@ -323,6 +323,10 @@ class SkipProcessRecoveryException(MasakariException):
|
|||||||
msg_fmt = _('Skipping execution of process recovery workflow.')
|
msg_fmt = _('Skipping execution of process recovery workflow.')
|
||||||
|
|
||||||
|
|
||||||
|
class SkipHostRecoveryException(MasakariException):
|
||||||
|
msg_fmt = _('Skipping execution of host recovery workflow.')
|
||||||
|
|
||||||
|
|
||||||
class ProcessRecoveryFailureException(MasakariException):
|
class ProcessRecoveryFailureException(MasakariException):
|
||||||
msg_fmt = _('Failed to execute process recovery workflow.')
|
msg_fmt = _('Failed to execute process recovery workflow.')
|
||||||
|
|
||||||
@@ -340,3 +344,11 @@ class FailoverSegmentInUse(Conflict):
|
|||||||
class HostInUse(Conflict):
|
class HostInUse(Conflict):
|
||||||
msg_fmt = _("Host %(uuid)s can't be updated as it is in-use to process "
|
msg_fmt = _("Host %(uuid)s can't be updated as it is in-use to process "
|
||||||
"notifications.")
|
"notifications.")
|
||||||
|
|
||||||
|
|
||||||
|
class ReservedHostsUnavailable(MasakariException):
|
||||||
|
msg_fmt = _('No reserved_hosts available for evacuation.')
|
||||||
|
|
||||||
|
|
||||||
|
class LockAlreadyAcquired(MasakariException):
|
||||||
|
msg_fmt = _('Lock is already acquired on %(resource)s.')
|
||||||
|
@@ -25,6 +25,7 @@ from masakari import conf
|
|||||||
from masakari import context
|
from masakari import context
|
||||||
from masakari.engine.drivers.taskflow import host_failure
|
from masakari.engine.drivers.taskflow import host_failure
|
||||||
from masakari import exception
|
from masakari import exception
|
||||||
|
from masakari.objects import host as host_obj
|
||||||
from masakari import test
|
from masakari import test
|
||||||
from masakari.tests.unit import fakes
|
from masakari.tests.unit import fakes
|
||||||
|
|
||||||
@@ -37,10 +38,10 @@ class HostFailureTestCase(test.TestCase):
|
|||||||
super(HostFailureTestCase, self).setUp()
|
super(HostFailureTestCase, self).setUp()
|
||||||
self.ctxt = context.get_admin_context()
|
self.ctxt = context.get_admin_context()
|
||||||
# overriding 'wait_period_after_evacuation' and
|
# overriding 'wait_period_after_evacuation' and
|
||||||
# 'wait_period_after_service_disabled' to 2 seconds to
|
# 'wait_period_after_service_update' to 2 seconds to
|
||||||
# reduce the wait period.
|
# reduce the wait period.
|
||||||
self.override_config("wait_period_after_evacuation", 2)
|
self.override_config("wait_period_after_evacuation", 2)
|
||||||
self.override_config("wait_period_after_service_disabled", 2)
|
self.override_config("wait_period_after_service_update", 2)
|
||||||
self.override_config("evacuate_all_instances",
|
self.override_config("evacuate_all_instances",
|
||||||
False, "host_failure")
|
False, "host_failure")
|
||||||
self.instance_host = "fake-host"
|
self.instance_host = "fake-host"
|
||||||
@@ -59,11 +60,13 @@ class HostFailureTestCase(test.TestCase):
|
|||||||
|
|
||||||
def _test_disable_compute_service(self):
|
def _test_disable_compute_service(self):
|
||||||
task = host_failure.DisableComputeServiceTask(self.novaclient)
|
task = host_failure.DisableComputeServiceTask(self.novaclient)
|
||||||
with mock.patch.object(fakes.FakeNovaClient.Services,
|
with mock.patch.object(
|
||||||
"disable") as mock_disable:
|
self.novaclient,
|
||||||
|
"enable_disable_service") as mock_enable_disable_service:
|
||||||
task.execute(self.ctxt, self.instance_host)
|
task.execute(self.ctxt, self.instance_host)
|
||||||
mock_disable.assert_called_once_with(self.instance_host,
|
|
||||||
"nova-compute")
|
mock_enable_disable_service.assert_called_once_with(
|
||||||
|
self.ctxt, self.instance_host)
|
||||||
|
|
||||||
def _test_instance_list(self):
|
def _test_instance_list(self):
|
||||||
task = host_failure.PrepareHAEnabledInstancesTask(self.novaclient)
|
task = host_failure.PrepareHAEnabledInstancesTask(self.novaclient)
|
||||||
@@ -80,10 +83,21 @@ class HostFailureTestCase(test.TestCase):
|
|||||||
|
|
||||||
return instance_list
|
return instance_list
|
||||||
|
|
||||||
def _auto_evacuate_instances(self, instance_list):
|
def _evacuate_instances(self, instance_list, reserved_host=None):
|
||||||
task = host_failure.AutoEvacuationInstancesTask(self.novaclient)
|
task = host_failure.EvacuateInstancesTask(self.novaclient)
|
||||||
instance_list = task.execute(
|
if reserved_host:
|
||||||
self.ctxt, instance_list['instance_list'])
|
with mock.patch.object(
|
||||||
|
self.novaclient,
|
||||||
|
"enable_disable_service") as mock_enable_disable_service:
|
||||||
|
instance_list = task.execute(self.ctxt,
|
||||||
|
instance_list['instance_list'],
|
||||||
|
reserved_host=reserved_host)
|
||||||
|
|
||||||
|
mock_enable_disable_service.assert_called_once_with(
|
||||||
|
self.ctxt, reserved_host.name, enable=True)
|
||||||
|
else:
|
||||||
|
instance_list = task.execute(
|
||||||
|
self.ctxt, instance_list['instance_list'])
|
||||||
|
|
||||||
return instance_list
|
return instance_list
|
||||||
|
|
||||||
@@ -95,7 +109,7 @@ class HostFailureTestCase(test.TestCase):
|
|||||||
self._verify_instance_evacuated()
|
self._verify_instance_evacuated()
|
||||||
|
|
||||||
@mock.patch('masakari.compute.nova.novaclient')
|
@mock.patch('masakari.compute.nova.novaclient')
|
||||||
def test_host_failure_flow(self, _mock_novaclient):
|
def test_host_failure_flow_for_auto_recovery(self, _mock_novaclient):
|
||||||
_mock_novaclient.return_value = self.fake_client
|
_mock_novaclient.return_value = self.fake_client
|
||||||
self.override_config("evacuate_all_instances",
|
self.override_config("evacuate_all_instances",
|
||||||
True, "host_failure")
|
True, "host_failure")
|
||||||
@@ -111,15 +125,43 @@ class HostFailureTestCase(test.TestCase):
|
|||||||
# execute PrepareHAEnabledInstancesTask
|
# execute PrepareHAEnabledInstancesTask
|
||||||
instance_list = self._test_instance_list()
|
instance_list = self._test_instance_list()
|
||||||
|
|
||||||
# execute AutoEvacuationInstancesTask
|
# execute EvacuateInstancesTask
|
||||||
instance_list = self._auto_evacuate_instances(
|
instance_list = self._evacuate_instances(instance_list)
|
||||||
instance_list)
|
|
||||||
|
|
||||||
# execute ConfirmEvacuationTask
|
# execute ConfirmEvacuationTask
|
||||||
self._test_confirm_evacuate_task(instance_list)
|
self._test_confirm_evacuate_task(instance_list)
|
||||||
|
|
||||||
@mock.patch('masakari.compute.nova.novaclient')
|
@mock.patch('masakari.compute.nova.novaclient')
|
||||||
def test_auto_evacuate_instances_task(self, _mock_novaclient):
|
def test_host_failure_flow_for_reserved_host_recovery(
|
||||||
|
self, _mock_novaclient):
|
||||||
|
_mock_novaclient.return_value = self.fake_client
|
||||||
|
self.override_config("evacuate_all_instances",
|
||||||
|
True, "host_failure")
|
||||||
|
|
||||||
|
# create test data
|
||||||
|
self.fake_client.servers.create(id="1", host=self.instance_host,
|
||||||
|
ha_enabled=True)
|
||||||
|
self.fake_client.servers.create(id="2", host=self.instance_host)
|
||||||
|
reserved_host = fakes.create_fake_host(name="fake-reserved-host",
|
||||||
|
reserved=True)
|
||||||
|
|
||||||
|
# execute DisableComputeServiceTask
|
||||||
|
self._test_disable_compute_service()
|
||||||
|
|
||||||
|
# execute PrepareHAEnabledInstancesTask
|
||||||
|
instance_list = self._test_instance_list()
|
||||||
|
|
||||||
|
# execute EvacuateInstancesTask
|
||||||
|
with mock.patch.object(host_obj.Host, "save") as mock_save:
|
||||||
|
instance_list = self._evacuate_instances(
|
||||||
|
instance_list, reserved_host=reserved_host)
|
||||||
|
self.assertEqual(1, mock_save.call_count)
|
||||||
|
|
||||||
|
# execute ConfirmEvacuationTask
|
||||||
|
self._test_confirm_evacuate_task(instance_list)
|
||||||
|
|
||||||
|
@mock.patch('masakari.compute.nova.novaclient')
|
||||||
|
def test_evacuate_instances_task(self, _mock_novaclient):
|
||||||
_mock_novaclient.return_value = self.fake_client
|
_mock_novaclient.return_value = self.fake_client
|
||||||
|
|
||||||
# create test data
|
# create test data
|
||||||
@@ -134,8 +176,8 @@ class HostFailureTestCase(test.TestCase):
|
|||||||
# execute PrepareHAEnabledInstancesTask
|
# execute PrepareHAEnabledInstancesTask
|
||||||
instance_list = self._test_instance_list()
|
instance_list = self._test_instance_list()
|
||||||
|
|
||||||
# execute AutoEvacuationInstancesTask
|
# execute EvacuateInstancesTask
|
||||||
task = host_failure.AutoEvacuationInstancesTask(self.novaclient)
|
task = host_failure.EvacuateInstancesTask(self.novaclient)
|
||||||
# mock evacuate method of FakeNovaClient to confirm that evacuate
|
# mock evacuate method of FakeNovaClient to confirm that evacuate
|
||||||
# method is called.
|
# method is called.
|
||||||
with mock.patch.object(fakes.FakeNovaClient.ServerManager,
|
with mock.patch.object(fakes.FakeNovaClient.ServerManager,
|
||||||
@@ -157,8 +199,8 @@ class HostFailureTestCase(test.TestCase):
|
|||||||
|
|
||||||
# execute PrepareHAEnabledInstancesTask
|
# execute PrepareHAEnabledInstancesTask
|
||||||
task = host_failure.PrepareHAEnabledInstancesTask(self.novaclient)
|
task = host_failure.PrepareHAEnabledInstancesTask(self.novaclient)
|
||||||
instance_list = task.execute(self.ctxt, self.instance_host)
|
self.assertRaises(exception.SkipHostRecoveryException, task.execute,
|
||||||
self.assertEqual(0, len(instance_list['instance_list']))
|
self.ctxt, self.instance_host)
|
||||||
|
|
||||||
@mock.patch('masakari.compute.nova.novaclient')
|
@mock.patch('masakari.compute.nova.novaclient')
|
||||||
def test_host_failure_flow_evacuation_failed(self, _mock_novaclient):
|
def test_host_failure_flow_evacuation_failed(self, _mock_novaclient):
|
||||||
@@ -172,8 +214,8 @@ class HostFailureTestCase(test.TestCase):
|
|||||||
"instance_list": self.fake_client.servers.list()
|
"instance_list": self.fake_client.servers.list()
|
||||||
}
|
}
|
||||||
|
|
||||||
# execute AutoEvacuationInstancesTask
|
# execute EvacuateInstancesTask
|
||||||
instance_list = self._auto_evacuate_instances(
|
instance_list = self._evacuate_instances(
|
||||||
instance_list)
|
instance_list)
|
||||||
|
|
||||||
def fake_get_server(context, host):
|
def fake_get_server(context, host):
|
||||||
@@ -186,7 +228,7 @@ class HostFailureTestCase(test.TestCase):
|
|||||||
# execute ConfirmEvacuationTask
|
# execute ConfirmEvacuationTask
|
||||||
task = host_failure.ConfirmEvacuationTask(self.novaclient)
|
task = host_failure.ConfirmEvacuationTask(self.novaclient)
|
||||||
self.assertRaises(
|
self.assertRaises(
|
||||||
exception.AutoRecoveryFailureException, task.execute,
|
exception.HostRecoveryFailureException, task.execute,
|
||||||
self.ctxt, instance_list['instance_list'],
|
self.ctxt, instance_list['instance_list'],
|
||||||
self.instance_host)
|
self.instance_host)
|
||||||
|
|
||||||
@@ -205,8 +247,8 @@ class HostFailureTestCase(test.TestCase):
|
|||||||
"instance_list": self.fake_client.servers.list()
|
"instance_list": self.fake_client.servers.list()
|
||||||
}
|
}
|
||||||
|
|
||||||
# execute AutoEvacuationInstancesTask
|
# execute EvacuateInstancesTask
|
||||||
instance_list = self._auto_evacuate_instances(
|
instance_list = self._evacuate_instances(
|
||||||
instance_list)
|
instance_list)
|
||||||
|
|
||||||
# execute ConfirmEvacuationTask
|
# execute ConfirmEvacuationTask
|
||||||
@@ -227,8 +269,8 @@ class HostFailureTestCase(test.TestCase):
|
|||||||
"instance_list": self.fake_client.servers.list()
|
"instance_list": self.fake_client.servers.list()
|
||||||
}
|
}
|
||||||
|
|
||||||
# execute AutoEvacuationInstancesTask
|
# execute EvacuateInstancesTask
|
||||||
instance_list = self._auto_evacuate_instances(
|
instance_list = self._evacuate_instances(
|
||||||
instance_list)
|
instance_list)
|
||||||
|
|
||||||
# execute ConfirmEvacuationTask
|
# execute ConfirmEvacuationTask
|
||||||
@@ -249,9 +291,23 @@ class HostFailureTestCase(test.TestCase):
|
|||||||
"instance_list": self.fake_client.servers.list()
|
"instance_list": self.fake_client.servers.list()
|
||||||
}
|
}
|
||||||
|
|
||||||
# execute AutoEvacuationInstancesTask
|
# execute EvacuateInstancesTask
|
||||||
instance_list = self._auto_evacuate_instances(
|
instance_list = self._evacuate_instances(
|
||||||
instance_list)
|
instance_list)
|
||||||
|
|
||||||
# execute ConfirmEvacuationTask
|
# execute ConfirmEvacuationTask
|
||||||
self._test_confirm_evacuate_task(instance_list)
|
self._test_confirm_evacuate_task(instance_list)
|
||||||
|
|
||||||
|
@mock.patch('masakari.compute.nova.novaclient')
|
||||||
|
def test_host_failure_flow_no_instances_on_host(self, _mock_novaclient):
|
||||||
|
_mock_novaclient.return_value = self.fake_client
|
||||||
|
self.override_config("evacuate_all_instances",
|
||||||
|
True, "host_failure")
|
||||||
|
|
||||||
|
# execute DisableComputeServiceTask
|
||||||
|
self._test_disable_compute_service()
|
||||||
|
|
||||||
|
# execute PrepareHAEnabledInstancesTask
|
||||||
|
task = host_failure.PrepareHAEnabledInstancesTask(self.novaclient)
|
||||||
|
self.assertRaises(exception.SkipHostRecoveryException, task.execute,
|
||||||
|
self.ctxt, self.instance_host)
|
||||||
|
@@ -36,9 +36,9 @@ class ProcessFailureTestCase(test.TestCase):
|
|||||||
self.service_host = "fake-host"
|
self.service_host = "fake-host"
|
||||||
self.novaclient = nova.API()
|
self.novaclient = nova.API()
|
||||||
self.fake_client = fakes.FakeNovaClient()
|
self.fake_client = fakes.FakeNovaClient()
|
||||||
# overriding 'wait_period_after_service_disabled' to 2 seconds to
|
# overriding 'wait_period_after_service_update' to 2 seconds
|
||||||
# reduce the wait period.
|
# to reduce the wait period.
|
||||||
self.override_config('wait_period_after_service_disabled', 2)
|
self.override_config('wait_period_after_service_update', 2)
|
||||||
|
|
||||||
@mock.patch('masakari.compute.nova.novaclient')
|
@mock.patch('masakari.compute.nova.novaclient')
|
||||||
def test_compute_process_failure_flow(self, _mock_novaclient):
|
def test_compute_process_failure_flow(self, _mock_novaclient):
|
||||||
|
@@ -216,15 +216,17 @@ class EngineManagerUnitTestCase(test.NoDBTestCase):
|
|||||||
|
|
||||||
@mock.patch.object(host_obj.Host, "get_by_uuid")
|
@mock.patch.object(host_obj.Host, "get_by_uuid")
|
||||||
@mock.patch.object(host_obj.Host, "save")
|
@mock.patch.object(host_obj.Host, "save")
|
||||||
|
@mock.patch.object(host_obj.HostList, "get_all")
|
||||||
@mock.patch("masakari.engine.drivers.taskflow."
|
@mock.patch("masakari.engine.drivers.taskflow."
|
||||||
"TaskFlowDriver.execute_host_failure")
|
"TaskFlowDriver.execute_host_failure")
|
||||||
@mock.patch.object(notification_obj.Notification, "save")
|
@mock.patch.object(notification_obj.Notification, "save")
|
||||||
def test_process_notification_type_compute_host_event_stopped(
|
def test_process_notification_type_compute_host_event_stopped(
|
||||||
self, mock_notification_save, mock_host_failure,
|
self, mock_notification_save, mock_host_failure, mock_get_all,
|
||||||
mock_host_save, mock_host_obj):
|
mock_host_save, mock_host_obj):
|
||||||
notification = self._get_compute_host_type_notification()
|
notification = self._get_compute_host_type_notification()
|
||||||
mock_host_failure.side_effect = self._fake_notification_workflow()
|
mock_host_failure.side_effect = self._fake_notification_workflow()
|
||||||
fake_host = fakes.create_fake_host()
|
fake_host = fakes.create_fake_host()
|
||||||
|
mock_get_all.return_value = None
|
||||||
fake_host.failover_segment = fakes.create_fake_failover_segment()
|
fake_host.failover_segment = fakes.create_fake_failover_segment()
|
||||||
mock_host_obj.return_value = fake_host
|
mock_host_obj.return_value = fake_host
|
||||||
self.engine.process_notification(self.context,
|
self.engine.process_notification(self.context,
|
||||||
@@ -233,22 +235,76 @@ class EngineManagerUnitTestCase(test.NoDBTestCase):
|
|||||||
mock_host_failure.assert_called_once_with(
|
mock_host_failure.assert_called_once_with(
|
||||||
self.context,
|
self.context,
|
||||||
fake_host.name, fake_host.failover_segment.recovery_method,
|
fake_host.name, fake_host.failover_segment.recovery_method,
|
||||||
notification.notification_uuid)
|
notification.notification_uuid, reserved_host_list=None)
|
||||||
|
|
||||||
@mock.patch.object(host_obj.Host, "get_by_uuid")
|
@mock.patch.object(host_obj.Host, "get_by_uuid")
|
||||||
@mock.patch.object(host_obj.Host, "save")
|
@mock.patch.object(host_obj.Host, "save")
|
||||||
|
@mock.patch.object(host_obj.HostList, "get_all")
|
||||||
|
@mock.patch.object(notification_obj.Notification, "save")
|
||||||
|
def test_process_notification_host_failure_without_reserved_hosts(
|
||||||
|
self, mock_notification_save, mock_get_all,
|
||||||
|
mock_host_save, mock_host_obj):
|
||||||
|
reserved_host_list = []
|
||||||
|
mock_get_all.return_value = reserved_host_list
|
||||||
|
|
||||||
|
fake_host = fakes.create_fake_host()
|
||||||
|
fake_host.failover_segment = fakes.create_fake_failover_segment(
|
||||||
|
recovery_method='reserved_host')
|
||||||
|
mock_host_obj.return_value = fake_host
|
||||||
|
|
||||||
|
notification = self._get_compute_host_type_notification()
|
||||||
|
|
||||||
|
self.engine.process_notification(self.context,
|
||||||
|
notification=notification)
|
||||||
|
|
||||||
|
self.assertEqual("error", notification.status)
|
||||||
|
|
||||||
|
@mock.patch.object(host_obj.Host, "get_by_uuid")
|
||||||
|
@mock.patch.object(host_obj.Host, "save")
|
||||||
|
@mock.patch.object(host_obj.HostList, "get_all")
|
||||||
|
@mock.patch("masakari.engine.drivers.taskflow."
|
||||||
|
"TaskFlowDriver.execute_host_failure")
|
||||||
|
@mock.patch.object(notification_obj.Notification, "save")
|
||||||
|
def test_process_notification_host_failure_with_reserved_hosts(
|
||||||
|
self, mock_notification_save, mock_host_failure, mock_get_all,
|
||||||
|
mock_host_save, mock_host_obj):
|
||||||
|
reserved_host_list = [fakes.create_fake_host(reserved=True)]
|
||||||
|
mock_get_all.return_value = reserved_host_list
|
||||||
|
|
||||||
|
fake_host = fakes.create_fake_host()
|
||||||
|
fake_host.failover_segment = fakes.create_fake_failover_segment(
|
||||||
|
recovery_method='reserved_host')
|
||||||
|
mock_host_obj.return_value = fake_host
|
||||||
|
|
||||||
|
notification = self._get_compute_host_type_notification()
|
||||||
|
mock_host_failure.side_effect = self._fake_notification_workflow()
|
||||||
|
|
||||||
|
self.engine.process_notification(self.context,
|
||||||
|
notification=notification)
|
||||||
|
|
||||||
|
self.assertEqual("finished", notification.status)
|
||||||
|
mock_host_failure.assert_called_once_with(
|
||||||
|
self.context,
|
||||||
|
fake_host.name, fake_host.failover_segment.recovery_method,
|
||||||
|
notification.notification_uuid,
|
||||||
|
reserved_host_list=reserved_host_list)
|
||||||
|
|
||||||
|
@mock.patch.object(host_obj.Host, "get_by_uuid")
|
||||||
|
@mock.patch.object(host_obj.Host, "save")
|
||||||
|
@mock.patch.object(host_obj.HostList, "get_all")
|
||||||
@mock.patch("masakari.engine.drivers.taskflow."
|
@mock.patch("masakari.engine.drivers.taskflow."
|
||||||
"TaskFlowDriver.execute_host_failure")
|
"TaskFlowDriver.execute_host_failure")
|
||||||
@mock.patch.object(notification_obj.Notification, "save")
|
@mock.patch.object(notification_obj.Notification, "save")
|
||||||
def test_process_notification_type_compute_host_recovery_exception(
|
def test_process_notification_type_compute_host_recovery_exception(
|
||||||
self, mock_notification_save, mock_host_failure,
|
self, mock_notification_save, mock_host_failure, mock_get_all,
|
||||||
mock_host_save, mock_host_obj):
|
mock_host_save, mock_host_obj):
|
||||||
notification = self._get_compute_host_type_notification()
|
notification = self._get_compute_host_type_notification()
|
||||||
fake_host = fakes.create_fake_host()
|
fake_host = fakes.create_fake_host()
|
||||||
|
mock_get_all.return_value = None
|
||||||
fake_host.failover_segment = fakes.create_fake_failover_segment()
|
fake_host.failover_segment = fakes.create_fake_failover_segment()
|
||||||
mock_host_obj.return_value = fake_host
|
mock_host_obj.return_value = fake_host
|
||||||
mock_host_failure.side_effect = self._fake_notification_workflow(
|
mock_host_failure.side_effect = self._fake_notification_workflow(
|
||||||
exc=exception.AutoRecoveryFailureException)
|
exc=exception.HostRecoveryFailureException)
|
||||||
self.engine.process_notification(self.context,
|
self.engine.process_notification(self.context,
|
||||||
notification=notification)
|
notification=notification)
|
||||||
self.assertEqual("error", notification.status)
|
self.assertEqual("error", notification.status)
|
||||||
|
@@ -41,8 +41,6 @@ CONF = masakari.conf.CONF
|
|||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
synchronized = lockutils.synchronized_with_prefix('masakari-')
|
|
||||||
|
|
||||||
|
|
||||||
def utf8(value):
|
def utf8(value):
|
||||||
"""Try to turn a string into utf-8 if possible.
|
"""Try to turn a string into utf-8 if possible.
|
||||||
@@ -267,3 +265,27 @@ def validate_integer(value, name, min_value=None, max_value=None):
|
|||||||
'max_value': max_value})
|
'max_value': max_value})
|
||||||
)
|
)
|
||||||
return value
|
return value
|
||||||
|
|
||||||
|
|
||||||
|
def synchronized(name, semaphores=None, blocking=False):
|
||||||
|
def wrap(f):
|
||||||
|
@six.wraps(f)
|
||||||
|
def inner(*args, **kwargs):
|
||||||
|
lock_name = 'masakari-%s' % name
|
||||||
|
int_lock = lockutils.internal_lock(lock_name,
|
||||||
|
semaphores=semaphores)
|
||||||
|
LOG.debug("Acquiring lock: %(lock_name)s on resource: "
|
||||||
|
"%(resource)s", {'lock_name': lock_name,
|
||||||
|
'resource': f.__name__})
|
||||||
|
|
||||||
|
if not int_lock.acquire(blocking=blocking):
|
||||||
|
raise exception.LockAlreadyAcquired(resource=name)
|
||||||
|
try:
|
||||||
|
return f(*args, **kwargs)
|
||||||
|
finally:
|
||||||
|
LOG.debug("Releasing lock: %(lock_name)s on resource: "
|
||||||
|
"%(resource)s", {'lock_name': lock_name,
|
||||||
|
'resource': f.__name__})
|
||||||
|
int_lock.release()
|
||||||
|
return inner
|
||||||
|
return wrap
|
||||||
|
@@ -0,0 +1,8 @@
|
|||||||
|
---
|
||||||
|
features:
|
||||||
|
- |
|
||||||
|
Implemented workflow for 'reserved_host' recovery method in case of host
|
||||||
|
failure. Now operator can create or update failover segment with
|
||||||
|
'reserved_host' recovery method along with the existing 'auto' method.
|
||||||
|
When 'reserved_host' recovery_method is set to a failover segment,
|
||||||
|
operators should also add one or more hosts with reserved flag set as True.
|
Reference in New Issue
Block a user