Move cluster status updates into driver

This is an alternative implementation to:

https://review.openstack.org/#/c/397961

This version implements an earlier proposal from the
spec that adds a driver method for synchronizing
cluster state. This method is optional so that drivers
that do not wish to leverage the existing periodic
synchronization task can do so in whatever manner
they wish and Magnum will not force them to do anything
unnecessarily.

1. add an update_cluster_status method to the driver
   interface
2. implment update_cluster_status for Heat drivers
   using the existing tested logic
3. Remove cluster status updates from the cluster conductor
   in favor of the periodic sync_cluster_status task - this
   should avoid timeouts and race conditions possible in the
   previous implementation
4. Update the periodic sync_cluster_status method to use
   the driver to update cluster status rather than calling
   Heat directly

Change-Id: Iae0ec7af2542343cc51e85f0efd21086d693e540
Partial-Blueprint: bp-driver-consolodation
This commit is contained in:
Randall Burt 2016-12-01 19:32:42 -06:00
parent 7890725c52
commit 759c1b3b2b
10 changed files with 292 additions and 403 deletions

View File

@ -14,7 +14,6 @@
from heatclient import exc
from oslo_log import log as logging
from oslo_service import loopingcall
from pycadf import cadftaxonomy as taxonomy
import six
@ -26,7 +25,6 @@ from magnum.conductor import scale_manager
from magnum.conductor import utils as conductor_utils
import magnum.conf
from magnum.drivers.common import driver
from magnum.drivers.heat import driver as heat_driver
from magnum.i18n import _
from magnum.i18n import _LI
from magnum import objects
@ -58,10 +56,8 @@ class Handler(object):
conductor_utils.notify_about_cluster_operation(
context, taxonomy.ACTION_CREATE, taxonomy.OUTCOME_PENDING)
# Get driver
ct = conductor_utils.retrieve_cluster_template(context, cluster)
cluster_driver = driver.Driver.get_driver(ct.server_type,
ct.cluster_distro,
ct.coe)
cluster_driver = driver.Driver.get_driver_for_cluster(context,
cluster)
# Create cluster
cluster_driver.create_cluster(context, cluster, create_timeout)
cluster.status = fields.ClusterStatus.CREATE_IN_PROGRESS
@ -80,7 +76,6 @@ class Handler(object):
raise
cluster.create()
self._poll_and_check(osc, cluster, cluster_driver)
return cluster
def cluster_update(self, context, cluster, rollback=False):
@ -134,8 +129,6 @@ class Handler(object):
raise
cluster.save()
self._poll_and_check(osc, cluster, cluster_driver)
return cluster
def cluster_delete(self, context, uuid):
@ -179,12 +172,4 @@ class Handler(object):
raise
cluster.save()
self._poll_and_check(osc, cluster, cluster_driver)
return None
def _poll_and_check(self, osc, cluster, cluster_driver):
# TODO(randall): this is a temporary hack. Next patch will sort the
# status update checking
poller = heat_driver.HeatPoller(osc, cluster, cluster_driver)
lc = loopingcall.FixedIntervalLoopingCall(f=poller.poll_and_check)
lc.start(CONF.cluster_heat.wait_interval, True)

View File

@ -21,6 +21,7 @@ from pkg_resources import iter_entry_points
from stevedore import driver
from magnum.common import exception
from magnum.objects import cluster_template
CONF = cfg.CONF
@ -135,6 +136,21 @@ class Driver(object):
return driver.DriverManager("magnum.drivers",
driver_info['entry_point_name']).driver()
@classmethod
def get_driver_for_cluster(cls, context, cluster):
ct = cluster_template.ClusterTemplate.get_by_uuid(
context, cluster.cluster_template_id)
return cls.get_driver(ct.server_type, ct.cluster_distro, ct.coe)
def update_cluster_status(self, context, cluster):
'''Update the cluster status based on underlying orchestration
This is an optional method if your implementation does not need
to poll the orchestration for status updates (for example, your
driver uses some notification-based mechanism instead).
'''
return
@abc.abstractproperty
def provides(self):
'''return a list of (server_type, os, coe) tuples

View File

@ -17,10 +17,10 @@ import six
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import loopingcall
from oslo_utils import importutils
from heatclient.common import template_utils
from heatclient import exc as heatexc
from magnum.common import clients
from magnum.common import exception
@ -72,6 +72,10 @@ class HeatDriver(driver.Driver):
raise NotImplementedError("Must implement 'get_template_definition'")
def update_cluster_status(self, context, cluster):
poller = HeatPoller(clients.OpenStackClients(context), cluster, self)
poller.poll_and_check()
def create_cluster(self, context, cluster, cluster_create_timeout):
stack = self._create_stack(context, clients.OpenStackClients(context),
cluster, cluster_create_timeout)
@ -147,11 +151,21 @@ class HeatDriver(driver.Driver):
class HeatPoller(object):
status_to_event = {
fields.ClusterStatus.DELETE_COMPLETE: taxonomy.ACTION_DELETE,
fields.ClusterStatus.CREATE_COMPLETE: taxonomy.ACTION_CREATE,
fields.ClusterStatus.UPDATE_COMPLETE: taxonomy.ACTION_UPDATE,
fields.ClusterStatus.ROLLBACK_COMPLETE: taxonomy.ACTION_UPDATE,
fields.ClusterStatus.CREATE_FAILED: taxonomy.ACTION_CREATE,
fields.ClusterStatus.DELETE_FAILED: taxonomy.ACTION_DELETE,
fields.ClusterStatus.UPDATE_FAILED: taxonomy.ACTION_UPDATE,
fields.ClusterStatus.ROLLBACK_FAILED: taxonomy.ACTION_UPDATE
}
def __init__(self, openstack_client, cluster, cluster_driver):
self.openstack_client = openstack_client
self.context = self.openstack_client.context
self.cluster = cluster
self.attempts = 0
self.cluster_template = conductor_utils.retrieve_cluster_template(
self.context, cluster)
self.template_def = cluster_driver.get_template_definition()
@ -159,34 +173,29 @@ class HeatPoller(object):
def poll_and_check(self):
# TODO(yuanying): temporary implementation to update api_address,
# node_addresses and cluster status
stack = self.openstack_client.heat().stacks.get(self.cluster.stack_id)
self.attempts += 1
status_to_event = {
fields.ClusterStatus.DELETE_COMPLETE: taxonomy.ACTION_DELETE,
fields.ClusterStatus.CREATE_COMPLETE: taxonomy.ACTION_CREATE,
fields.ClusterStatus.UPDATE_COMPLETE: taxonomy.ACTION_UPDATE,
fields.ClusterStatus.ROLLBACK_COMPLETE: taxonomy.ACTION_UPDATE,
fields.ClusterStatus.CREATE_FAILED: taxonomy.ACTION_CREATE,
fields.ClusterStatus.DELETE_FAILED: taxonomy.ACTION_DELETE,
fields.ClusterStatus.UPDATE_FAILED: taxonomy.ACTION_UPDATE,
fields.ClusterStatus.ROLLBACK_FAILED: taxonomy.ACTION_UPDATE
}
try:
stack = self.openstack_client.heat().stacks.get(
self.cluster.stack_id)
except heatexc.NotFound:
self._sync_missing_heat_stack()
return
# poll_and_check is detached and polling long time to check status,
# so another user/client can call delete cluster/stack.
if stack.stack_status == fields.ClusterStatus.DELETE_COMPLETE:
self._delete_complete()
# TODO(randall): Move the status notification up the stack
conductor_utils.notify_about_cluster_operation(
self.context, status_to_event[stack.stack_status],
self.context, self.status_to_event[stack.stack_status],
taxonomy.OUTCOME_SUCCESS)
raise loopingcall.LoopingCallDone()
if stack.stack_status in (fields.ClusterStatus.CREATE_COMPLETE,
fields.ClusterStatus.UPDATE_COMPLETE):
self._sync_cluster_and_template_status(stack)
# TODO(randall): Move the status notification up the stack
conductor_utils.notify_about_cluster_operation(
self.context, status_to_event[stack.stack_status],
self.context, self.status_to_event[stack.stack_status],
taxonomy.OUTCOME_SUCCESS)
raise loopingcall.LoopingCallDone()
elif stack.stack_status != self.cluster.status:
self._sync_cluster_status(stack)
@ -197,30 +206,10 @@ class HeatPoller(object):
fields.ClusterStatus.ROLLBACK_FAILED):
self._sync_cluster_and_template_status(stack)
self._cluster_failed(stack)
# TODO(randall): Move the status notification up the stack
conductor_utils.notify_about_cluster_operation(
self.context, status_to_event[stack.stack_status],
self.context, self.status_to_event[stack.stack_status],
taxonomy.OUTCOME_FAILURE)
raise loopingcall.LoopingCallDone()
# only check max attempts when the stack is being created when
# the timeout hasn't been set. If the timeout has been set then
# the loop will end when the stack completes or the timeout occurs
if stack.stack_status == fields.ClusterStatus.CREATE_IN_PROGRESS:
if (stack.timeout_mins is None and
self.attempts > cfg.CONF.cluster_heat.max_attempts):
LOG.error(_LE('Cluster check exit after %(attempts)s attempts,'
'stack_id: %(id)s, stack_status: %(status)s') %
{'attempts': cfg.CONF.cluster_heat.max_attempts,
'id': self.cluster.stack_id,
'status': stack.stack_status})
raise loopingcall.LoopingCallDone()
else:
if self.attempts > cfg.CONF.cluster_heat.max_attempts:
LOG.error(_LE('Cluster check exit after %(attempts)s attempts,'
'stack_id: %(id)s, stack_status: %(status)s') %
{'attempts': cfg.CONF.cluster_heat.max_attempts,
'id': self.cluster.stack_id,
'status': stack.stack_status})
raise loopingcall.LoopingCallDone()
def _delete_complete(self):
LOG.info(_LI('Cluster has been deleted, stack_id: %s')
@ -271,3 +260,26 @@ class HeatPoller(object):
{'cluster_status': stack.stack_status,
'stack_id': self.cluster.stack_id,
'reason': self.cluster.status_reason})
def _sync_missing_heat_stack(self):
if self.cluster.status == fields.ClusterStatus.DELETE_IN_PROGRESS:
self._delete_complete()
elif self.cluster.status == fields.ClusterStatus.CREATE_IN_PROGRESS:
self._sync_missing_stack(fields.ClusterStatus.CREATE_FAILED)
elif self.cluster.status == fields.ClusterStatus.UPDATE_IN_PROGRESS:
self._sync_missing_stack(fields.ClusterStatus.UPDATE_FAILED)
def _sync_missing_stack(self, new_status):
self.cluster.status = new_status
self.cluster.status_reason = _("Stack with id %s not found in "
"Heat.") % self.cluster.stack_id
self.cluster.save()
# TODO(randall): Move the status notification up the stack
conductor_utils.notify_about_cluster_operation(
self.context, self.status_to_event[self.cluster.status],
taxonomy.OUTCOME_FAILURE)
LOG.info(_LI("Cluster with id %(id)s has been set to "
"%(status)s due to stack with id %(sid)s "
"not found in Heat."),
{'id': self.cluster.id, 'status': self.cluster.status,
'sid': self.cluster.stack_id})

View File

@ -15,22 +15,17 @@
import functools
from heatclient import exc as heat_exc
from oslo_log import log
from oslo_service import loopingcall
from oslo_service import periodic_task
import six
from magnum.common import clients
from magnum.common import context
from magnum.common import exception
from magnum.common import rpc
from magnum.conductor import monitors
import magnum.conf
from magnum.i18n import _
from magnum.i18n import _LI
from magnum.drivers.common import driver
from magnum.i18n import _LW
from magnum import objects
from magnum.objects import fields
CONF = magnum.conf.CONF
@ -47,6 +42,28 @@ def set_context(func):
return handler
class ClusterUpdateJob(object):
def __init__(self, ctx, cluster):
self.ctx = ctx
self.cluster = cluster
def update_status(self):
LOG.debug("Updating status for cluster %s", self.cluster.id)
# get the driver for the cluster
cdriver = driver.Driver.get_driver_for_cluster(self.ctx, self.cluster)
# ask the driver to sync status
cdriver.update_cluster_status(self.ctx, self.cluster)
# end the "loop"
LOG.debug("Status for cluster %s updated to %s (%s)",
self.cluster.id, self.cluster.status,
self.cluster.status_reason)
# if we're done with it, delete it
if self.cluster.status == objects.fields.ClusterStatus.DELETE_COMPLETE:
self.cluster.destroy()
raise loopingcall.LoopingCallDone()
class MagnumPeriodicTasks(periodic_task.PeriodicTasks):
'''Magnum periodic Task class
@ -67,138 +84,44 @@ class MagnumPeriodicTasks(periodic_task.PeriodicTasks):
super(MagnumPeriodicTasks, self).__init__(conf)
self.notifier = rpc.get_notifier()
@periodic_task.periodic_task(run_immediately=True)
@periodic_task.periodic_task(spacing=10, run_immediately=True)
@set_context
def sync_cluster_status(self, ctx):
try:
LOG.debug('Starting to sync up cluster status')
osc = clients.OpenStackClients(ctx)
status = [fields.ClusterStatus.CREATE_IN_PROGRESS,
fields.ClusterStatus.UPDATE_IN_PROGRESS,
fields.ClusterStatus.DELETE_IN_PROGRESS,
fields.ClusterStatus.ROLLBACK_IN_PROGRESS]
# get all the clusters that are IN_PROGRESS
status = [objects.fields.ClusterStatus.CREATE_IN_PROGRESS,
objects.fields.ClusterStatus.UPDATE_IN_PROGRESS,
objects.fields.ClusterStatus.DELETE_IN_PROGRESS,
objects.fields.ClusterStatus.ROLLBACK_IN_PROGRESS]
filters = {'status': status}
clusters = objects.Cluster.list(ctx, filters=filters)
if not clusters:
return
sid_to_cluster_mapping = {cluster.stack_id:
cluster for cluster in clusters}
cluster_stack_ids = sid_to_cluster_mapping.keys()
if CONF.periodic_global_stack_list:
stacks = osc.heat().stacks.list(
global_tenant=True, filters={'id': cluster_stack_ids})
else:
ret = self._get_cluster_stacks(
clusters, sid_to_cluster_mapping, cluster_stack_ids)
[stacks, clusters, cluster_stack_ids,
sid_to_cluster_mapping] = ret
sid_to_stack_mapping = {s.id: s for s in stacks}
# intersection of clusters magnum has and heat has
for sid in (six.viewkeys(sid_to_cluster_mapping) &
six.viewkeys(sid_to_stack_mapping)):
stack = sid_to_stack_mapping[sid]
cluster = sid_to_cluster_mapping[sid]
self._sync_existing_cluster(cluster, stack)
# the stacks that magnum has but heat doesn't have
for sid in (six.viewkeys(sid_to_cluster_mapping) -
six.viewkeys(sid_to_stack_mapping)):
cluster = sid_to_cluster_mapping[sid]
self._sync_missing_heat_stack(cluster)
# synchronize with underlying orchestration
for cluster in clusters:
job = ClusterUpdateJob(ctx, cluster)
# though this call isn't really looping, we use this
# abstraction anyway to avoid dealing directly with eventlet
# hooey
lc = loopingcall.FixedIntervalLoopingCall(f=job.update_status)
lc.start(1, stop_on_exception=True)
except Exception as e:
LOG.warning(_LW(
"Ignore error [%s] when syncing up cluster status."
), e, exc_info=True)
def _get_cluster_stacks(
self, clusters, sid_to_cluster_mapping, cluster_stack_ids):
stacks = []
_clusters = clusters
_sid_to_cluster_mapping = sid_to_cluster_mapping
_cluster_stack_ids = cluster_stack_ids
for cluster in _clusters:
try:
# Create client with cluster's trustee user context
bosc = clients.OpenStackClients(
context.make_cluster_context(cluster))
stack = bosc.heat().stacks.get(cluster.stack_id)
stacks.append(stack)
# No need to do anything in this case
except heat_exc.HTTPNotFound:
pass
except Exception as e:
# Any other exception means we do not perform any
# action on this cluster in the current sync run, so remove
# it from all records.
LOG.warning(
_LW("Exception while attempting to retrieve "
"Heat stack %(stack_id)s for cluster %(cluster_id)s. "
"Traceback follows."),
{'stack_id': cluster.stack_id, 'cluster_id': cluster.id})
LOG.warning(e)
_sid_to_cluster_mapping.pop(cluster.stack_id)
_cluster_stack_ids.remove(cluster.stack_id)
_clusters.remove(cluster)
return [stacks, _clusters, _cluster_stack_ids, _sid_to_cluster_mapping]
def _sync_existing_cluster(self, cluster, stack):
if cluster.status != stack.stack_status:
old_status = cluster.status
cluster.status = stack.stack_status
cluster.status_reason = stack.stack_status_reason
cluster.save()
LOG.info(_LI("Sync up cluster with id %(id)s from "
"%(old_status)s to %(status)s."),
{'id': cluster.id, 'old_status': old_status,
'status': cluster.status})
def _sync_missing_heat_stack(self, cluster):
if cluster.status == fields.ClusterStatus.DELETE_IN_PROGRESS:
self._sync_deleted_stack(cluster)
elif cluster.status == fields.ClusterStatus.CREATE_IN_PROGRESS:
self._sync_missing_stack(cluster,
fields.ClusterStatus.CREATE_FAILED)
elif cluster.status == fields.ClusterStatus.UPDATE_IN_PROGRESS:
self._sync_missing_stack(cluster,
fields.ClusterStatus.UPDATE_FAILED)
def _sync_deleted_stack(self, cluster):
try:
cluster.destroy()
except exception.ClusterNotFound:
LOG.info(_LI('The cluster %s has been deleted by others.'),
cluster.uuid)
else:
LOG.info(_LI("cluster with id %(id)s not found in heat "
"with stack id %(sid)s, with status_reason: "
"%(reason)s."), {'id': cluster.id,
'sid': cluster.stack_id,
'reason': cluster.status_reason})
def _sync_missing_stack(self, cluster, new_status):
cluster.status = new_status
cluster.status_reason = _("Stack with id %s not found in "
"Heat.") % cluster.stack_id
cluster.save()
LOG.info(_LI("Cluster with id %(id)s has been set to "
"%(status)s due to stack with id %(sid)s "
"not found in Heat."),
{'id': cluster.id, 'status': cluster.status,
'sid': cluster.stack_id})
@periodic_task.periodic_task(run_immediately=True)
@set_context
def _send_cluster_metrics(self, ctx):
LOG.debug('Starting to send cluster metrics')
for cluster in objects.Cluster.list(ctx):
if cluster.status not in [fields.ClusterStatus.CREATE_COMPLETE,
fields.ClusterStatus.UPDATE_COMPLETE]:
if cluster.status not in (
objects.fields.ClusterStatus.CREATE_COMPLETE,
objects.fields.ClusterStatus.UPDATE_COMPLETE):
continue
monitor = monitors.create_monitor(ctx, cluster)

View File

@ -11,6 +11,9 @@
# under the License.
import mock
import time
from oslo_service import loopingcall
fakeAuthTokenHeaders = {'X-User-Id': u'773a902f022949619b5c2f32cd89d419',
'X-Project-Id': u'5588aebbcdc24e17a061595f80574376',
@ -91,3 +94,33 @@ class FakeAuthProtocol(mock.Mock):
super(FakeAuthProtocol, self).__init__(**kwargs)
self.app = FakeApp()
self.config = ''
class FakeLoopingCall(object):
'''Fake a looping call without the eventlet stuff
For tests, just do a simple implementation so that we can ensure the
called logic works rather than testing LoopingCall
'''
def __init__(self, **kwargs):
func = kwargs.pop("f", None)
if func is None:
raise ValueError("Must pass a callable in the -f kwarg.")
self.call_func = func
def start(self, interval, **kwargs):
intitial_delay = kwargs.pop("initial_delay", -1)
stop_on_exception = kwargs.pop("stop_on_exception", True)
if intitial_delay:
time.sleep(intitial_delay)
while True:
try:
self.call_func()
except loopingcall.LoopingCallDone:
return 0
except Exception as exc:
if stop_on_exception:
raise exc
if interval:
time.sleep(interval)

View File

@ -49,18 +49,13 @@ class TestHandler(db_base.DbTestCase):
self.cluster.create()
@patch('magnum.conductor.scale_manager.get_scale_manager')
@patch(
'magnum.conductor.handlers.cluster_conductor.Handler._poll_and_check')
@patch('magnum.drivers.common.driver.Driver.get_driver')
@patch('magnum.common.clients.OpenStackClients')
def test_update_node_count_success(
self, mock_openstack_client_class,
mock_driver, mock_poll_and_check,
mock_driver,
mock_scale_manager):
def side_effect(*args, **kwargs):
self.cluster.node_count = 2
self.cluster.save()
mock_poll_and_check.side_effect = side_effect
mock_heat_stack = mock.MagicMock()
mock_heat_stack.stack_status = cluster_status.CREATE_COMPLETE
mock_heat_client = mock.MagicMock()
@ -87,16 +82,10 @@ class TestHandler(db_base.DbTestCase):
cluster = objects.Cluster.get(self.context, self.cluster.uuid)
self.assertEqual(2, cluster.node_count)
@patch(
'magnum.conductor.handlers.cluster_conductor.Handler._poll_and_check')
@patch('magnum.common.clients.OpenStackClients')
def test_update_node_count_failure(
self, mock_openstack_client_class,
mock_poll_and_check):
def side_effect(*args, **kwargs):
self.cluster.node_count = 2
self.cluster.save()
mock_poll_and_check.side_effect = side_effect
self, mock_openstack_client_class):
mock_heat_stack = mock.MagicMock()
mock_heat_stack.stack_status = cluster_status.CREATE_FAILED
mock_heat_client = mock.MagicMock()
@ -120,18 +109,12 @@ class TestHandler(db_base.DbTestCase):
self.assertEqual(1, cluster.node_count)
@patch('magnum.conductor.scale_manager.get_scale_manager')
@patch(
'magnum.conductor.handlers.cluster_conductor.Handler._poll_and_check')
@patch('magnum.drivers.common.driver.Driver.get_driver')
@patch('magnum.common.clients.OpenStackClients')
def _test_update_cluster_status_complete(
self, expect_status, mock_openstack_client_class,
mock_driver, mock_poll_and_check,
mock_scale_manager):
def side_effect(*args, **kwargs):
self.cluster.node_count = 2
self.cluster.save()
mock_poll_and_check.side_effect = side_effect
mock_driver, mock_scale_manager):
mock_heat_stack = mock.MagicMock()
mock_heat_stack.stack_status = expect_status
mock_heat_client = mock.MagicMock()

View File

@ -14,7 +14,6 @@
import mock
from mock import patch
from oslo_service import loopingcall
from magnum.drivers.heat import driver as heat_driver
from magnum.drivers.mesos_ubuntu_v1 import driver as mesos_dr
@ -328,6 +327,6 @@ class TestClusterConductorWithMesos(base.TestCase):
mock_heat_stack.parameters = {'number_of_slaves': 2}
mock_heat_stack.stack_status = cluster_status.UPDATE_COMPLETE
self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check)
poller.poll_and_check()
self.assertEqual(2, cluster.node_count)

View File

@ -14,7 +14,6 @@
import mock
from mock import patch
from oslo_service import loopingcall
import magnum.conf
from magnum.drivers.heat import driver as heat_driver
@ -455,6 +454,6 @@ class TestClusterConductorWithSwarm(base.TestCase):
mock_heat_stack.parameters = {'number_of_nodes': 2}
mock_heat_stack.stack_status = cluster_status.UPDATE_COMPLETE
self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check)
poller.poll_and_check()
self.assertEqual(2, cluster.node_count)

View File

@ -12,7 +12,6 @@
import mock
from mock import patch
from oslo_service import loopingcall
from pycadf import cadftaxonomy as taxonomy
import magnum.conf
@ -56,19 +55,24 @@ class TestHeatPoller(base.TestCase):
def test_poll_and_check_send_notification(self):
mock_heat_stack, cluster, poller = self.setup_poll_test()
mock_heat_stack.stack_status = cluster_status.CREATE_COMPLETE
self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check)
self.assertIsNone(poller.poll_and_check())
self.assertEqual(mock_heat_stack.stack_status, cluster.status)
mock_heat_stack.stack_status = cluster_status.CREATE_FAILED
self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check)
self.assertIsNone(poller.poll_and_check())
self.assertEqual(mock_heat_stack.stack_status, cluster.status)
mock_heat_stack.stack_status = cluster_status.DELETE_COMPLETE
self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check)
self.assertIsNone(poller.poll_and_check())
self.assertEqual(mock_heat_stack.stack_status, cluster.status)
mock_heat_stack.stack_status = cluster_status.DELETE_FAILED
self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check)
self.assertIsNone(poller.poll_and_check())
self.assertEqual(mock_heat_stack.stack_status, cluster.status)
mock_heat_stack.stack_status = cluster_status.UPDATE_COMPLETE
self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check)
self.assertIsNone(poller.poll_and_check())
self.assertEqual(mock_heat_stack.stack_status, cluster.status)
mock_heat_stack.stack_status = cluster_status.UPDATE_FAILED
self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check)
self.assertIsNone(poller.poll_and_check())
self.assertEqual(mock_heat_stack.stack_status, cluster.status)
self.assertEqual(6, poller.attempts)
notifications = fake_notifier.NOTIFICATIONS
self.assertEqual(6, len(notifications))
self.assertEqual(
@ -102,9 +106,7 @@ class TestHeatPoller(base.TestCase):
cluster.status = cluster_status.CREATE_IN_PROGRESS
mock_heat_stack.stack_status = cluster_status.CREATE_IN_PROGRESS
poller.poll_and_check()
self.assertEqual(0, cluster.save.call_count)
self.assertEqual(1, poller.attempts)
def test_poll_save(self):
mock_heat_stack, cluster, poller = self.setup_poll_test()
@ -112,76 +114,70 @@ class TestHeatPoller(base.TestCase):
cluster.status = cluster_status.CREATE_IN_PROGRESS
mock_heat_stack.stack_status = cluster_status.CREATE_FAILED
mock_heat_stack.stack_status_reason = 'Create failed'
self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check)
self.assertIsNone(poller.poll_and_check())
self.assertEqual(2, cluster.save.call_count)
self.assertEqual(cluster_status.CREATE_FAILED, cluster.status)
self.assertEqual('Create failed', cluster.status_reason)
self.assertEqual(1, poller.attempts)
def test_poll_done(self):
mock_heat_stack, cluster, poller = self.setup_poll_test()
mock_heat_stack.stack_status = cluster_status.DELETE_COMPLETE
self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check)
self.assertIsNone(poller.poll_and_check())
mock_heat_stack.stack_status = cluster_status.CREATE_FAILED
self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check)
self.assertEqual(2, poller.attempts)
self.assertIsNone(poller.poll_and_check())
def test_poll_done_by_update(self):
mock_heat_stack, cluster, poller = self.setup_poll_test()
mock_heat_stack.stack_status = cluster_status.UPDATE_COMPLETE
mock_heat_stack.parameters = {'number_of_minions': 2}
self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check)
self.assertIsNone(poller.poll_and_check())
self.assertEqual(1, cluster.save.call_count)
self.assertEqual(cluster_status.UPDATE_COMPLETE, cluster.status)
self.assertEqual(2, cluster.node_count)
self.assertEqual(1, poller.attempts)
def test_poll_done_by_update_failed(self):
mock_heat_stack, cluster, poller = self.setup_poll_test()
mock_heat_stack.stack_status = cluster_status.UPDATE_FAILED
mock_heat_stack.parameters = {'number_of_minions': 2}
self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check)
self.assertIsNone(poller.poll_and_check())
self.assertEqual(2, cluster.save.call_count)
self.assertEqual(cluster_status.UPDATE_FAILED, cluster.status)
self.assertEqual(2, cluster.node_count)
self.assertEqual(1, poller.attempts)
def test_poll_done_by_rollback_complete(self):
mock_heat_stack, cluster, poller = self.setup_poll_test()
mock_heat_stack.stack_status = cluster_status.ROLLBACK_COMPLETE
mock_heat_stack.parameters = {'number_of_minions': 1}
self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check)
self.assertIsNone(poller.poll_and_check())
self.assertEqual(2, cluster.save.call_count)
self.assertEqual(cluster_status.ROLLBACK_COMPLETE, cluster.status)
self.assertEqual(1, cluster.node_count)
self.assertEqual(1, poller.attempts)
def test_poll_done_by_rollback_failed(self):
mock_heat_stack, cluster, poller = self.setup_poll_test()
mock_heat_stack.stack_status = cluster_status.ROLLBACK_FAILED
mock_heat_stack.parameters = {'number_of_minions': 1}
self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check)
self.assertIsNone(poller.poll_and_check())
self.assertEqual(2, cluster.save.call_count)
self.assertEqual(cluster_status.ROLLBACK_FAILED, cluster.status)
self.assertEqual(1, cluster.node_count)
self.assertEqual(1, poller.attempts)
def test_poll_destroy(self):
mock_heat_stack, cluster, poller = self.setup_poll_test()
mock_heat_stack.stack_status = cluster_status.DELETE_FAILED
self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check)
self.assertIsNone(poller.poll_and_check())
# Destroy method is not called when stack delete failed
self.assertEqual(0, cluster.destroy.call_count)
@ -191,77 +187,13 @@ class TestHeatPoller(base.TestCase):
self.assertEqual(cluster_status.DELETE_IN_PROGRESS, cluster.status)
mock_heat_stack.stack_status = cluster_status.DELETE_COMPLETE
self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check)
self.assertIsNone(poller.poll_and_check())
# The cluster status should still be DELETE_IN_PROGRESS, because
# the destroy() method may be failed. If success, this cluster record
# will delete directly, change status is meaningless.
self.assertEqual(cluster_status.DELETE_IN_PROGRESS, cluster.status)
self.assertEqual(cluster_status.DELETE_COMPLETE, cluster.status)
self.assertEqual(1, cluster.destroy.call_count)
def test_poll_delete_in_progress_timeout_set(self):
mock_heat_stack, cluster, poller = self.setup_poll_test()
mock_heat_stack.stack_status = cluster_status.DELETE_IN_PROGRESS
mock_heat_stack.timeout_mins = 60
# timeout only affects stack creation so expecting this
# to process normally
poller.poll_and_check()
def test_poll_delete_in_progress_max_attempts_reached(self):
mock_heat_stack, cluster, poller = self.setup_poll_test()
mock_heat_stack.stack_status = cluster_status.DELETE_IN_PROGRESS
poller.attempts = CONF.cluster_heat.max_attempts
self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check)
def test_poll_create_in_prog_max_att_reached_no_timeout(self):
mock_heat_stack, cluster, poller = self.setup_poll_test()
mock_heat_stack.stack_status = cluster_status.CREATE_IN_PROGRESS
poller.attempts = CONF.cluster_heat.max_attempts
mock_heat_stack.timeout_mins = None
self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check)
def test_poll_create_in_prog_max_att_reached_timeout_set(self):
mock_heat_stack, cluster, poller = self.setup_poll_test()
mock_heat_stack.stack_status = cluster_status.CREATE_IN_PROGRESS
poller.attempts = CONF.cluster_heat.max_attempts
mock_heat_stack.timeout_mins = 60
# since the timeout is set the max attempts gets ignored since
# the timeout will eventually stop the poller either when
# the stack gets created or the timeout gets reached
poller.poll_and_check()
def test_poll_create_in_prog_max_att_reached_timed_out(self):
mock_heat_stack, cluster, poller = self.setup_poll_test()
mock_heat_stack.stack_status = cluster_status.CREATE_FAILED
poller.attempts = CONF.cluster_heat.max_attempts
mock_heat_stack.timeout_mins = 60
self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check)
def test_poll_create_in_prog_max_att_not_reached_no_timeout(self):
mock_heat_stack, cluster, poller = self.setup_poll_test()
mock_heat_stack.stack_status = cluster_status.CREATE_IN_PROGRESS
mock_heat_stack.timeout.mins = None
poller.poll_and_check()
def test_poll_create_in_prog_max_att_not_reached_timeout_set(self):
mock_heat_stack, cluster, poller = self.setup_poll_test()
mock_heat_stack.stack_status = cluster_status.CREATE_IN_PROGRESS
mock_heat_stack.timeout_mins = 60
poller.poll_and_check()
def test_poll_create_in_prog_max_att_not_reached_timed_out(self):
mock_heat_stack, cluster, poller = self.setup_poll_test()
mock_heat_stack.stack_status = cluster_status.CREATE_FAILED
mock_heat_stack.timeout_mins = 60
self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check)
def test_poll_node_count(self):
mock_heat_stack, cluster, poller = self.setup_poll_test()
@ -276,7 +208,7 @@ class TestHeatPoller(base.TestCase):
mock_heat_stack.parameters = {'number_of_minions': 2}
mock_heat_stack.stack_status = cluster_status.UPDATE_COMPLETE
self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check)
self.assertIsNone(poller.poll_and_check())
self.assertEqual(2, cluster.node_count)

View File

@ -12,16 +12,17 @@
# License for the specific language governing permissions and limitations
# under the License.
from heatclient import exc as heat_exc
import mock
from magnum.common import context
from magnum.common.rpc_service import CONF
from magnum.db.sqlalchemy import api as dbapi
from magnum.drivers.common import driver
from magnum import objects
from magnum.objects.fields import ClusterStatus as cluster_status
from magnum.service import periodic
from magnum.tests import base
from magnum.tests import fakes
from magnum.tests.unit.db import utils
@ -36,7 +37,7 @@ class PeriodicTestCase(base.TestCase):
def setUp(self):
super(PeriodicTestCase, self).setUp()
ctx = context.make_admin_context()
self.context = context.make_admin_context()
# Can be identical for all clusters.
trust_attrs = {
@ -46,165 +47,167 @@ class PeriodicTestCase(base.TestCase):
}
trust_attrs.update({'id': 1, 'stack_id': '11',
'status': cluster_status.CREATE_IN_PROGRESS})
'status': cluster_status.CREATE_IN_PROGRESS,
'status_reason': 'no change'})
cluster1 = utils.get_test_cluster(**trust_attrs)
trust_attrs.update({'id': 2, 'stack_id': '22',
'status': cluster_status.DELETE_IN_PROGRESS})
'status': cluster_status.DELETE_IN_PROGRESS,
'status_reason': 'no change'})
cluster2 = utils.get_test_cluster(**trust_attrs)
trust_attrs.update({'id': 3, 'stack_id': '33',
'status': cluster_status.UPDATE_IN_PROGRESS})
'status': cluster_status.UPDATE_IN_PROGRESS,
'status_reason': 'no change'})
cluster3 = utils.get_test_cluster(**trust_attrs)
trust_attrs.update({'id': 4, 'stack_id': '44',
'status': cluster_status.CREATE_COMPLETE})
'status': cluster_status.DELETE_IN_PROGRESS,
'status_reason': 'no change'})
cluster4 = utils.get_test_cluster(**trust_attrs)
trust_attrs.update({'id': 5, 'stack_id': '55',
'status': cluster_status.ROLLBACK_IN_PROGRESS})
'status': cluster_status.ROLLBACK_IN_PROGRESS,
'status_reason': 'no change'})
cluster5 = utils.get_test_cluster(**trust_attrs)
self.cluster1 = objects.Cluster(ctx, **cluster1)
self.cluster2 = objects.Cluster(ctx, **cluster2)
self.cluster3 = objects.Cluster(ctx, **cluster3)
self.cluster4 = objects.Cluster(ctx, **cluster4)
self.cluster5 = objects.Cluster(ctx, **cluster5)
self.cluster1 = objects.Cluster(self.context, **cluster1)
self.cluster2 = objects.Cluster(self.context, **cluster2)
self.cluster3 = objects.Cluster(self.context, **cluster3)
self.cluster4 = objects.Cluster(self.context, **cluster4)
self.cluster5 = objects.Cluster(self.context, **cluster5)
@mock.patch.object(objects.Cluster, 'list')
@mock.patch('magnum.common.clients.OpenStackClients')
@mock.patch.object(dbapi.Connection, 'destroy_cluster')
@mock.patch.object(dbapi.Connection, 'update_cluster')
def test_sync_cluster_status_changes(self, mock_db_update, mock_db_destroy,
mock_oscc, mock_cluster_list):
mock_heat_client = mock.MagicMock()
stack1 = fake_stack(
# these tests are based on the basic behavior of our standard
# Heat-based drivers, but drivers based on other orchestration
# methods should generally behave in a similar fashion as far
# as the actual calls go. It is up to the driver implementor
# to ensure their implementation of update_cluster_status behaves
# as expected regardless of how the periodic updater task works
self.mock_heat_client = mock.MagicMock()
self.stack1 = fake_stack(
id='11', stack_status=cluster_status.CREATE_COMPLETE,
stack_status_reason='fake_reason_11')
stack3 = fake_stack(
self.stack2 = fake_stack(
id='22', stack_status=cluster_status.DELETE_IN_PROGRESS,
stack_status_reason='fake_reason_11')
self.stack3 = fake_stack(
id='33', stack_status=cluster_status.UPDATE_COMPLETE,
stack_status_reason='fake_reason_33')
stack5 = fake_stack(
self.stack5 = fake_stack(
id='55', stack_status=cluster_status.ROLLBACK_COMPLETE,
stack_status_reason='fake_reason_55')
mock_heat_client.stacks.list.return_value = [stack1, stack3, stack5]
get_stacks = {'11': stack1, '33': stack3, '55': stack5}
self.mock_heat_client.stacks.list.return_value = [
self.stack1, self.stack2, self.stack3, self.stack5]
def stack_get_sideefect(arg):
if arg == '22':
raise heat_exc.HTTPNotFound
return get_stacks[arg]
self.get_stacks = {
'11': self.stack1,
'22': self.stack2,
'33': self.stack3,
'55': self.stack5
}
self.mock_driver = mock.MagicMock(spec=driver.Driver)
def _mock_update_status(context, cluster):
try:
stack = self.get_stacks[cluster.stack_id]
except KeyError:
cluster.status_reason = "Stack %s not found" % cluster.stack_id
if cluster.status == "DELETE_IN_PROGRESS":
cluster.status = cluster_status.DELETE_COMPLETE
else:
cluster.status = cluster.status.replace("IN_PROGRESS",
"FAILED")
cluster.status = cluster.status.replace("COMPLETE",
"FAILED")
else:
if cluster.status != stack.stack_status:
cluster.status = stack.stack_status
cluster.status_reason = stack.stack_status_reason
self.mock_driver.update_cluster_status.side_effect = (
_mock_update_status)
@mock.patch('oslo_service.loopingcall.FixedIntervalLoopingCall',
new=fakes.FakeLoopingCall)
@mock.patch('magnum.drivers.common.driver.Driver.get_driver_for_cluster')
@mock.patch('magnum.objects.Cluster.list')
@mock.patch.object(dbapi.Connection, 'destroy_cluster')
def test_sync_cluster_status_changes(self, mock_db_destroy,
mock_cluster_list, mock_get_driver):
mock_heat_client.stacks.get.side_effect = stack_get_sideefect
mock_osc = mock_oscc.return_value
mock_osc.heat.return_value = mock_heat_client
mock_cluster_list.return_value = [self.cluster1, self.cluster2,
self.cluster3, self.cluster5]
mock_keystone_client = mock.MagicMock()
mock_keystone_client.client.project_id = "fake_project"
mock_osc.keystone.return_value = mock_keystone_client
self.cluster3, self.cluster4,
self.cluster5]
mock_get_driver.return_value = self.mock_driver
periodic.MagnumPeriodicTasks(CONF).sync_cluster_status(None)
self.assertEqual(cluster_status.CREATE_COMPLETE, self.cluster1.status)
self.assertEqual('fake_reason_11', self.cluster1.status_reason)
mock_db_destroy.assert_called_once_with(self.cluster2.uuid)
# make sure cluster 2 didn't change
self.assertEqual(cluster_status.DELETE_IN_PROGRESS,
self.cluster2.status)
self.assertEqual('no change', self.cluster2.status_reason)
self.assertEqual(cluster_status.UPDATE_COMPLETE, self.cluster3.status)
self.assertEqual('fake_reason_33', self.cluster3.status_reason)
mock_db_destroy.assert_called_once_with(self.cluster4.uuid)
self.assertEqual(cluster_status.ROLLBACK_COMPLETE,
self.cluster5.status)
self.assertEqual('fake_reason_55', self.cluster5.status_reason)
@mock.patch.object(objects.Cluster, 'list')
@mock.patch('magnum.common.clients.OpenStackClients')
def test_sync_auth_fail(self, mock_oscc, mock_cluster_list):
"""Tests handling for unexpected exceptions in _get_cluster_stacks()
@mock.patch('oslo_service.loopingcall.FixedIntervalLoopingCall',
new=fakes.FakeLoopingCall)
@mock.patch('magnum.drivers.common.driver.Driver.get_driver_for_cluster')
@mock.patch('magnum.objects.Cluster.list')
def test_sync_cluster_status_not_changes(self, mock_cluster_list,
mock_get_driver):
It does this by raising an a HTTPUnauthorized exception in Heat client.
The affected stack thus missing from the stack list should not lead to
cluster state changing in this case. Likewise, subsequent clusters
should still change state, despite the affected cluster being skipped.
"""
stack1 = fake_stack(id='11',
stack_status=cluster_status.CREATE_COMPLETE)
mock_heat_client = mock.MagicMock()
def stack_get_sideefect(arg):
raise heat_exc.HTTPUnauthorized
mock_heat_client.stacks.get.side_effect = stack_get_sideefect
mock_heat_client.stacks.list.return_value = [stack1]
mock_osc = mock_oscc.return_value
mock_osc.heat.return_value = mock_heat_client
mock_cluster_list.return_value = [self.cluster1]
periodic.MagnumPeriodicTasks(CONF).sync_cluster_status(None)
self.assertEqual(cluster_status.CREATE_IN_PROGRESS,
self.cluster1.status)
@mock.patch.object(objects.Cluster, 'list')
@mock.patch('magnum.common.clients.OpenStackClients')
def test_sync_cluster_status_not_changes(self, mock_oscc,
mock_cluster_list):
mock_heat_client = mock.MagicMock()
stack1 = fake_stack(id='11',
stack_status=cluster_status.CREATE_IN_PROGRESS)
stack2 = fake_stack(id='22',
stack_status=cluster_status.DELETE_IN_PROGRESS)
stack3 = fake_stack(id='33',
stack_status=cluster_status.UPDATE_IN_PROGRESS)
stack5 = fake_stack(id='55',
stack_status=cluster_status.ROLLBACK_IN_PROGRESS)
get_stacks = {'11': stack1, '22': stack2, '33': stack3, '55': stack5}
def stack_get_sideefect(arg):
if arg == '22':
raise heat_exc.HTTPNotFound
return get_stacks[arg]
mock_heat_client.stacks.get.side_effect = stack_get_sideefect
mock_heat_client.stacks.list.return_value = [stack1, stack2, stack3,
stack5]
mock_osc = mock_oscc.return_value
mock_osc.heat.return_value = mock_heat_client
self.stack1.stack_status = self.cluster1.status
self.stack2.stack_status = self.cluster2.status
self.stack3.stack_status = self.cluster3.status
self.stack5.stack_status = self.cluster5.status
mock_cluster_list.return_value = [self.cluster1, self.cluster2,
self.cluster3, self.cluster5]
mock_get_driver.return_value = self.mock_driver
periodic.MagnumPeriodicTasks(CONF).sync_cluster_status(None)
self.assertEqual(cluster_status.CREATE_IN_PROGRESS,
self.cluster1.status)
self.assertEqual('no change', self.cluster1.status_reason)
self.assertEqual(cluster_status.DELETE_IN_PROGRESS,
self.cluster2.status)
self.assertEqual('no change', self.cluster2.status_reason)
self.assertEqual(cluster_status.UPDATE_IN_PROGRESS,
self.cluster3.status)
self.assertEqual('no change', self.cluster3.status_reason)
self.assertEqual(cluster_status.ROLLBACK_IN_PROGRESS,
self.cluster5.status)
self.assertEqual('no change', self.cluster5.status_reason)
@mock.patch.object(objects.Cluster, 'list')
@mock.patch('magnum.common.clients.OpenStackClients')
@mock.patch('oslo_service.loopingcall.FixedIntervalLoopingCall',
new=fakes.FakeLoopingCall)
@mock.patch('magnum.drivers.common.driver.Driver.get_driver_for_cluster')
@mock.patch('magnum.objects.Cluster.list')
@mock.patch.object(dbapi.Connection, 'destroy_cluster')
@mock.patch.object(dbapi.Connection, 'update_cluster')
def test_sync_cluster_status_heat_not_found(self, mock_db_update,
mock_db_destroy, mock_oscc,
mock_cluster_list):
mock_heat_client = mock.MagicMock()
mock_heat_client.stacks.list.return_value = []
mock_osc = mock_oscc.return_value
mock_osc.heat.return_value = mock_heat_client
def test_sync_cluster_status_heat_not_found(self, mock_db_destroy,
mock_cluster_list,
mock_get_driver):
self.get_stacks.clear()
mock_get_driver.return_value = self.mock_driver
mock_cluster_list.return_value = [self.cluster1, self.cluster2,
self.cluster3]
mock_keystone_client = mock.MagicMock()
mock_keystone_client.client.project_id = "fake_project"
mock_osc.keystone.return_value = mock_keystone_client
self.cluster3, self.cluster4,
self.cluster5]
periodic.MagnumPeriodicTasks(CONF).sync_cluster_status(None)
self.assertEqual(cluster_status.CREATE_FAILED, self.cluster1.status)
self.assertEqual('Stack with id 11 not found in Heat.',
self.cluster1.status_reason)
mock_db_destroy.assert_called_once_with(self.cluster2.uuid)
self.assertEqual('Stack 11 not found', self.cluster1.status_reason)
self.assertEqual(cluster_status.UPDATE_FAILED, self.cluster3.status)
self.assertEqual('Stack with id 33 not found in Heat.',
self.cluster3.status_reason)
self.assertEqual('Stack 33 not found', self.cluster3.status_reason)
self.assertEqual(cluster_status.ROLLBACK_FAILED, self.cluster5.status)
self.assertEqual('Stack 55 not found', self.cluster5.status_reason)
mock_db_destroy.assert_has_calls([
mock.call(self.cluster2.uuid),
mock.call(self.cluster4.uuid)
])
self.assertEqual(2, mock_db_destroy.call_count)
@mock.patch('magnum.conductor.monitors.create_monitor')
@mock.patch('magnum.objects.Cluster.list')
@ -219,6 +222,7 @@ class PeriodicTestCase(base.TestCase):
mock_get_notifier.return_value = notifier
mock_cluster_list.return_value = [self.cluster1, self.cluster2,
self.cluster3, self.cluster4]
self.cluster4.status = cluster_status.CREATE_COMPLETE
monitor = mock.MagicMock()
monitor.get_metric_names.return_value = ['metric1', 'metric2']
monitor.compute_metric_value.return_value = 30
@ -262,6 +266,7 @@ class PeriodicTestCase(base.TestCase):
notifier = mock.MagicMock()
mock_get_notifier.return_value = notifier
mock_cluster_list.return_value = [self.cluster4]
self.cluster4.status = cluster_status.CREATE_COMPLETE
monitor = mock.MagicMock()
monitor.get_metric_names.return_value = ['metric1', 'metric2']
monitor.compute_metric_value.side_effect = Exception(
@ -292,6 +297,7 @@ class PeriodicTestCase(base.TestCase):
notifier = mock.MagicMock()
mock_get_notifier.return_value = notifier
mock_cluster_list.return_value = [self.cluster4]
self.cluster4.status = cluster_status.CREATE_COMPLETE
monitor = mock.MagicMock()
monitor.pull_data.side_effect = Exception("error on pulling data")
mock_create_monitor.return_value = monitor
@ -312,6 +318,7 @@ class PeriodicTestCase(base.TestCase):
notifier = mock.MagicMock()
mock_get_notifier.return_value = notifier
mock_cluster_list.return_value = [self.cluster4]
self.cluster4.status = cluster_status.CREATE_COMPLETE
mock_create_monitor.return_value = None
periodic.MagnumPeriodicTasks(CONF)._send_cluster_metrics(self.context)