Separate StackWatch out into it's own module
In the interest in small modules that have a matching unit test file. Part of blueprint decouple-nested Change-Id: Ia40310007113b1111883a20db99cb7f754ae43bf
This commit is contained in:
parent
9f736de757
commit
0191ce10c2
@ -20,7 +20,6 @@ import eventlet
|
||||
from oslo.config import cfg
|
||||
from oslo import messaging
|
||||
from oslo.serialization import jsonutils
|
||||
from oslo.utils import timeutils
|
||||
from osprofiler import profiler
|
||||
import requests
|
||||
import six
|
||||
@ -44,6 +43,7 @@ from heat.engine import event as evt
|
||||
from heat.engine import parameter_groups
|
||||
from heat.engine import properties
|
||||
from heat.engine import resources
|
||||
from heat.engine import service_stack_watch
|
||||
from heat.engine import stack as parser
|
||||
from heat.engine import stack_lock
|
||||
from heat.engine import template as templatem
|
||||
@ -216,88 +216,6 @@ class ThreadGroupManager(object):
|
||||
event.send(message)
|
||||
|
||||
|
||||
class StackWatch(object):
|
||||
def __init__(self, thread_group_mgr):
|
||||
self.thread_group_mgr = thread_group_mgr
|
||||
|
||||
def start_watch_task(self, stack_id, cnxt):
|
||||
|
||||
def stack_has_a_watchrule(sid):
|
||||
wrs = db_api.watch_rule_get_all_by_stack(cnxt, sid)
|
||||
|
||||
now = timeutils.utcnow()
|
||||
start_watch_thread = False
|
||||
for wr in wrs:
|
||||
# reset the last_evaluated so we don't fire off alarms when
|
||||
# the engine has not been running.
|
||||
db_api.watch_rule_update(cnxt, wr.id, {'last_evaluated': now})
|
||||
|
||||
if wr.state != rpc_api.WATCH_STATE_CEILOMETER_CONTROLLED:
|
||||
start_watch_thread = True
|
||||
|
||||
children = db_api.stack_get_all_by_owner_id(cnxt, sid)
|
||||
for child in children:
|
||||
if stack_has_a_watchrule(child.id):
|
||||
start_watch_thread = True
|
||||
|
||||
return start_watch_thread
|
||||
|
||||
if stack_has_a_watchrule(stack_id):
|
||||
self.thread_group_mgr.add_timer(
|
||||
stack_id,
|
||||
self.periodic_watcher_task,
|
||||
sid=stack_id)
|
||||
|
||||
def check_stack_watches(self, sid):
|
||||
# Retrieve the stored credentials & create context
|
||||
# Require tenant_safe=False to the stack_get to defeat tenant
|
||||
# scoping otherwise we fail to retrieve the stack
|
||||
LOG.debug("Periodic watcher task for stack %s" % sid)
|
||||
admin_context = context.get_admin_context()
|
||||
db_stack = db_api.stack_get(admin_context, sid, tenant_safe=False,
|
||||
eager_load=True)
|
||||
if not db_stack:
|
||||
LOG.error(_LE("Unable to retrieve stack %s for periodic task"),
|
||||
sid)
|
||||
return
|
||||
stack = parser.Stack.load(admin_context, stack=db_stack,
|
||||
use_stored_context=True)
|
||||
|
||||
# recurse into any nested stacks.
|
||||
children = db_api.stack_get_all_by_owner_id(admin_context, sid)
|
||||
for child in children:
|
||||
self.check_stack_watches(child.id)
|
||||
|
||||
# Get all watchrules for this stack and evaluate them
|
||||
try:
|
||||
wrs = db_api.watch_rule_get_all_by_stack(admin_context, sid)
|
||||
except Exception as ex:
|
||||
LOG.warn(_LW('periodic_task db error watch rule removed? %(ex)s'),
|
||||
ex)
|
||||
return
|
||||
|
||||
def run_alarm_action(stack, actions, details):
|
||||
for action in actions:
|
||||
action(details=details)
|
||||
for res in stack.itervalues():
|
||||
res.metadata_update()
|
||||
|
||||
for wr in wrs:
|
||||
rule = watchrule.WatchRule.load(stack.context, watch=wr)
|
||||
actions = rule.evaluate()
|
||||
if actions:
|
||||
self.thread_group_mgr.start(sid, run_alarm_action, stack,
|
||||
actions, rule.get_details())
|
||||
|
||||
def periodic_watcher_task(self, sid):
|
||||
"""
|
||||
Periodic task, created for each stack, triggers watch-rule
|
||||
evaluation for all rules defined for the stack
|
||||
sid = stack ID
|
||||
"""
|
||||
self.check_stack_watches(sid)
|
||||
|
||||
|
||||
@profiler.trace_cls("rpc")
|
||||
class EngineListener(service.Service):
|
||||
'''
|
||||
@ -384,7 +302,8 @@ class EngineService(service.Service):
|
||||
# so we need to create a ThreadGroupManager here for the periodic tasks
|
||||
if self.thread_group_mgr is None:
|
||||
self.thread_group_mgr = ThreadGroupManager()
|
||||
self.stack_watch = StackWatch(self.thread_group_mgr)
|
||||
self.stack_watch = service_stack_watch.StackWatch(
|
||||
self.thread_group_mgr)
|
||||
|
||||
# Create a periodic_watcher_task per-stack
|
||||
admin_context = context.get_admin_context()
|
||||
|
107
heat/engine/service_stack_watch.py
Normal file
107
heat/engine/service_stack_watch.py
Normal file
@ -0,0 +1,107 @@
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo.utils import timeutils
|
||||
|
||||
from heat.common import context
|
||||
from heat.common.i18n import _LE
|
||||
from heat.common.i18n import _LW
|
||||
from heat.db import api as db_api
|
||||
from heat.engine import stack
|
||||
from heat.engine import watchrule
|
||||
from heat.openstack.common import log as logging
|
||||
from heat.rpc import api as rpc_api
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class StackWatch(object):
|
||||
def __init__(self, thread_group_mgr):
|
||||
self.thread_group_mgr = thread_group_mgr
|
||||
|
||||
def start_watch_task(self, stack_id, cnxt):
|
||||
|
||||
def stack_has_a_watchrule(sid):
|
||||
wrs = db_api.watch_rule_get_all_by_stack(cnxt, sid)
|
||||
|
||||
now = timeutils.utcnow()
|
||||
start_watch_thread = False
|
||||
for wr in wrs:
|
||||
# reset the last_evaluated so we don't fire off alarms when
|
||||
# the engine has not been running.
|
||||
db_api.watch_rule_update(cnxt, wr.id, {'last_evaluated': now})
|
||||
|
||||
if wr.state != rpc_api.WATCH_STATE_CEILOMETER_CONTROLLED:
|
||||
start_watch_thread = True
|
||||
|
||||
children = db_api.stack_get_all_by_owner_id(cnxt, sid)
|
||||
for child in children:
|
||||
if stack_has_a_watchrule(child.id):
|
||||
start_watch_thread = True
|
||||
|
||||
return start_watch_thread
|
||||
|
||||
if stack_has_a_watchrule(stack_id):
|
||||
self.thread_group_mgr.add_timer(
|
||||
stack_id,
|
||||
self.periodic_watcher_task,
|
||||
sid=stack_id)
|
||||
|
||||
def check_stack_watches(self, sid):
|
||||
# Retrieve the stored credentials & create context
|
||||
# Require tenant_safe=False to the stack_get to defeat tenant
|
||||
# scoping otherwise we fail to retrieve the stack
|
||||
LOG.debug("Periodic watcher task for stack %s" % sid)
|
||||
admin_context = context.get_admin_context()
|
||||
db_stack = db_api.stack_get(admin_context, sid, tenant_safe=False,
|
||||
eager_load=True)
|
||||
if not db_stack:
|
||||
LOG.error(_LE("Unable to retrieve stack %s for periodic task"),
|
||||
sid)
|
||||
return
|
||||
stk = stack.Stack.load(admin_context, stack=db_stack,
|
||||
use_stored_context=True)
|
||||
|
||||
# recurse into any nested stacks.
|
||||
children = db_api.stack_get_all_by_owner_id(admin_context, sid)
|
||||
for child in children:
|
||||
self.check_stack_watches(child.id)
|
||||
|
||||
# Get all watchrules for this stack and evaluate them
|
||||
try:
|
||||
wrs = db_api.watch_rule_get_all_by_stack(admin_context, sid)
|
||||
except Exception as ex:
|
||||
LOG.warn(_LW('periodic_task db error watch rule removed? %(ex)s'),
|
||||
ex)
|
||||
return
|
||||
|
||||
def run_alarm_action(stk, actions, details):
|
||||
for action in actions:
|
||||
action(details=details)
|
||||
for res in stk.itervalues():
|
||||
res.metadata_update()
|
||||
|
||||
for wr in wrs:
|
||||
rule = watchrule.WatchRule.load(stk.context, watch=wr)
|
||||
actions = rule.evaluate()
|
||||
if actions:
|
||||
self.thread_group_mgr.start(sid, run_alarm_action, stk,
|
||||
actions, rule.get_details())
|
||||
|
||||
def periodic_watcher_task(self, sid):
|
||||
"""
|
||||
Periodic task, created for each stack, triggers watch-rule
|
||||
evaluation for all rules defined for the stack
|
||||
sid = stack ID
|
||||
"""
|
||||
self.check_stack_watches(sid)
|
@ -31,7 +31,6 @@ import six
|
||||
from heat.common import exception
|
||||
from heat.common import identifier
|
||||
from heat.common import template_format
|
||||
from heat.common import urlfetch
|
||||
from heat.db import api as db_api
|
||||
from heat.engine.clients.os import glance
|
||||
from heat.engine.clients.os import keystone
|
||||
@ -42,6 +41,7 @@ from heat.engine import properties
|
||||
from heat.engine import resource as res
|
||||
from heat.engine.resources import instance as instances
|
||||
from heat.engine import service
|
||||
from heat.engine import service_stack_watch
|
||||
from heat.engine import stack as parser
|
||||
from heat.engine import stack_lock
|
||||
from heat.engine import template as templatem
|
||||
@ -106,38 +106,6 @@ wp_template_no_default = '''
|
||||
}
|
||||
'''
|
||||
|
||||
nested_alarm_template = '''
|
||||
HeatTemplateFormatVersion: '2012-12-12'
|
||||
Resources:
|
||||
the_nested:
|
||||
Type: AWS::CloudFormation::Stack
|
||||
Properties:
|
||||
TemplateURL: https://server.test/alarm.template
|
||||
'''
|
||||
|
||||
alarm_template = '''
|
||||
{
|
||||
"AWSTemplateFormatVersion" : "2010-09-09",
|
||||
"Description" : "alarming",
|
||||
"Resources" : {
|
||||
"service_alarm": {
|
||||
"Type": "AWS::CloudWatch::Alarm",
|
||||
"Properties": {
|
||||
"EvaluationPeriods": "1",
|
||||
"AlarmActions": [],
|
||||
"AlarmDescription": "do the thing",
|
||||
"Namespace": "dev/null",
|
||||
"Period": "300",
|
||||
"ComparisonOperator": "GreaterThanThreshold",
|
||||
"Statistic": "SampleCount",
|
||||
"Threshold": "2",
|
||||
"MetricName": "ServiceFailure"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
'''
|
||||
|
||||
policy_template = '''
|
||||
{
|
||||
"AWSTemplateFormatVersion" : "2010-09-09",
|
||||
@ -1633,7 +1601,7 @@ class StackServiceTest(common.HeatTestCase):
|
||||
res._register_class('ResourceWithPropsType',
|
||||
generic_rsrc.ResourceWithProps)
|
||||
|
||||
@mock.patch.object(service.StackWatch, 'start_watch_task')
|
||||
@mock.patch.object(service_stack_watch.StackWatch, 'start_watch_task')
|
||||
@mock.patch.object(service.db_api, 'stack_get_all')
|
||||
@mock.patch.object(service.service.Service, 'start')
|
||||
def test_start_watches_all_stacks(self, mock_super_start, mock_get_all,
|
||||
@ -2633,48 +2601,6 @@ class StackServiceTest(common.HeatTestCase):
|
||||
|
||||
self.m.VerifyAll()
|
||||
|
||||
@stack_context('periodic_watch_task_not_created')
|
||||
def test_periodic_watch_task_not_created(self):
|
||||
self.eng.thread_group_mgr.groups[self.stack.id] = DummyThreadGroup()
|
||||
self.eng.stack_watch.start_watch_task(self.stack.id, self.ctx)
|
||||
self.assertEqual(
|
||||
[], self.eng.thread_group_mgr.groups[self.stack.id].threads)
|
||||
|
||||
def test_periodic_watch_task_created(self):
|
||||
stack = get_stack('period_watch_task_created',
|
||||
utils.dummy_context(),
|
||||
alarm_template)
|
||||
self.stack = stack
|
||||
self.m.ReplayAll()
|
||||
stack.store()
|
||||
stack.create()
|
||||
self.eng.thread_group_mgr.groups[stack.id] = DummyThreadGroup()
|
||||
self.eng.stack_watch.start_watch_task(stack.id, self.ctx)
|
||||
expected = [self.eng.stack_watch.periodic_watcher_task]
|
||||
observed = self.eng.thread_group_mgr.groups[stack.id].threads
|
||||
self.assertEqual(expected, observed)
|
||||
self.stack.delete()
|
||||
|
||||
def test_periodic_watch_task_created_nested(self):
|
||||
self.m.StubOutWithMock(urlfetch, 'get')
|
||||
urlfetch.get('https://server.test/alarm.template').MultipleTimes().\
|
||||
AndReturn(alarm_template)
|
||||
self.m.ReplayAll()
|
||||
|
||||
stack = get_stack('period_watch_task_created_nested',
|
||||
utils.dummy_context(),
|
||||
nested_alarm_template)
|
||||
setup_keystone_mocks(self.m, stack)
|
||||
self.stack = stack
|
||||
self.m.ReplayAll()
|
||||
stack.store()
|
||||
stack.create()
|
||||
self.eng.thread_group_mgr.groups[stack.id] = DummyThreadGroup()
|
||||
self.eng.stack_watch.start_watch_task(stack.id, self.ctx)
|
||||
self.assertEqual([self.eng.stack_watch.periodic_watcher_task],
|
||||
self.eng.thread_group_mgr.groups[stack.id].threads)
|
||||
self.stack.delete()
|
||||
|
||||
@stack_context('service_show_watch_test_stack', False)
|
||||
def test_show_watch(self):
|
||||
# Insert two dummy watch rules into the DB
|
||||
|
110
heat/tests/test_engine_service_stack_watch.py
Normal file
110
heat/tests/test_engine_service_stack_watch.py
Normal file
@ -0,0 +1,110 @@
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from heat.engine import service_stack_watch
|
||||
from heat.rpc import api as rpc_api
|
||||
from heat.tests import common
|
||||
from heat.tests import utils
|
||||
|
||||
|
||||
class StackServiceWatcherTest(common.HeatTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(StackServiceWatcherTest, self).setUp()
|
||||
|
||||
self.ctx = utils.dummy_context(tenant_id='stack_service_test_tenant')
|
||||
self.patch('heat.engine.service.warnings')
|
||||
|
||||
@mock.patch.object(service_stack_watch.db_api, 'stack_get_all_by_owner_id')
|
||||
@mock.patch.object(service_stack_watch.db_api,
|
||||
'watch_rule_get_all_by_stack')
|
||||
@mock.patch.object(service_stack_watch.db_api, 'watch_rule_update')
|
||||
def test_periodic_watch_task_not_created(self, watch_rule_update,
|
||||
watch_rule_get_all_by_stack,
|
||||
stack_get_all_by_owner_id):
|
||||
"""If there is no cloud watch lite alarm, then don't create
|
||||
a periodic task for it.
|
||||
"""
|
||||
stack_id = 83
|
||||
watch_rule_get_all_by_stack.return_value = []
|
||||
stack_get_all_by_owner_id.return_value = []
|
||||
tg = mock.Mock()
|
||||
sw = service_stack_watch.StackWatch(tg)
|
||||
sw.start_watch_task(stack_id, self.ctx)
|
||||
|
||||
# assert that add_timer is NOT called.
|
||||
self.assertEqual([], tg.add_timer.call_args_list)
|
||||
|
||||
@mock.patch.object(service_stack_watch.db_api, 'stack_get_all_by_owner_id')
|
||||
@mock.patch.object(service_stack_watch.db_api,
|
||||
'watch_rule_get_all_by_stack')
|
||||
@mock.patch.object(service_stack_watch.db_api, 'watch_rule_update')
|
||||
def test_periodic_watch_task_created(self, watch_rule_update,
|
||||
watch_rule_get_all_by_stack,
|
||||
stack_get_all_by_owner_id):
|
||||
"""If there is no cloud watch lite alarm, then DO create
|
||||
a periodic task for it.
|
||||
"""
|
||||
stack_id = 86
|
||||
wr1 = mock.Mock()
|
||||
wr1.id = 4
|
||||
wr1.state = rpc_api.WATCH_STATE_NODATA
|
||||
|
||||
watch_rule_get_all_by_stack.return_value = [wr1]
|
||||
stack_get_all_by_owner_id.return_value = []
|
||||
tg = mock.Mock()
|
||||
sw = service_stack_watch.StackWatch(tg)
|
||||
sw.start_watch_task(stack_id, self.ctx)
|
||||
|
||||
# assert that add_timer IS called.
|
||||
self.assertEqual([mock.call(stack_id, sw.periodic_watcher_task,
|
||||
sid=stack_id)],
|
||||
tg.add_timer.call_args_list)
|
||||
|
||||
@mock.patch.object(service_stack_watch.db_api, 'stack_get_all_by_owner_id')
|
||||
@mock.patch.object(service_stack_watch.db_api,
|
||||
'watch_rule_get_all_by_stack')
|
||||
@mock.patch.object(service_stack_watch.db_api, 'watch_rule_update')
|
||||
def test_periodic_watch_task_created_nested(self, watch_rule_update,
|
||||
watch_rule_get_all_by_stack,
|
||||
stack_get_all_by_owner_id):
|
||||
stack_id = 90
|
||||
|
||||
def my_wr_get(cnxt, sid):
|
||||
if sid == stack_id:
|
||||
return []
|
||||
wr1 = mock.Mock()
|
||||
wr1.id = 4
|
||||
wr1.state = rpc_api.WATCH_STATE_NODATA
|
||||
return [wr1]
|
||||
|
||||
watch_rule_get_all_by_stack.side_effect = my_wr_get
|
||||
|
||||
def my_nested_get(cnxt, sid):
|
||||
if sid == stack_id:
|
||||
nested_stack = mock.Mock()
|
||||
nested_stack.id = 55
|
||||
return [nested_stack]
|
||||
return []
|
||||
|
||||
stack_get_all_by_owner_id.side_effect = my_nested_get
|
||||
tg = mock.Mock()
|
||||
sw = service_stack_watch.StackWatch(tg)
|
||||
sw.start_watch_task(stack_id, self.ctx)
|
||||
|
||||
# assert that add_timer IS called.
|
||||
self.assertEqual([mock.call(stack_id, sw.periodic_watcher_task,
|
||||
sid=stack_id)],
|
||||
tg.add_timer.call_args_list)
|
Loading…
Reference in New Issue
Block a user