Merge "Separate StackWatch out into it's own module"

This commit is contained in:
Jenkins 2014-12-19 14:20:52 +00:00 committed by Gerrit Code Review
commit caf6a1b060
4 changed files with 222 additions and 160 deletions

View File

@ -20,7 +20,6 @@ import eventlet
from oslo.config import cfg from oslo.config import cfg
from oslo import messaging from oslo import messaging
from oslo.serialization import jsonutils from oslo.serialization import jsonutils
from oslo.utils import timeutils
from osprofiler import profiler from osprofiler import profiler
import requests import requests
import six import six
@ -44,6 +43,7 @@ from heat.engine import event as evt
from heat.engine import parameter_groups from heat.engine import parameter_groups
from heat.engine import properties from heat.engine import properties
from heat.engine import resources from heat.engine import resources
from heat.engine import service_stack_watch
from heat.engine import stack as parser from heat.engine import stack as parser
from heat.engine import stack_lock from heat.engine import stack_lock
from heat.engine import template as templatem from heat.engine import template as templatem
@ -216,88 +216,6 @@ class ThreadGroupManager(object):
event.send(message) event.send(message)
class StackWatch(object):
def __init__(self, thread_group_mgr):
self.thread_group_mgr = thread_group_mgr
def start_watch_task(self, stack_id, cnxt):
def stack_has_a_watchrule(sid):
wrs = db_api.watch_rule_get_all_by_stack(cnxt, sid)
now = timeutils.utcnow()
start_watch_thread = False
for wr in wrs:
# reset the last_evaluated so we don't fire off alarms when
# the engine has not been running.
db_api.watch_rule_update(cnxt, wr.id, {'last_evaluated': now})
if wr.state != rpc_api.WATCH_STATE_CEILOMETER_CONTROLLED:
start_watch_thread = True
children = db_api.stack_get_all_by_owner_id(cnxt, sid)
for child in children:
if stack_has_a_watchrule(child.id):
start_watch_thread = True
return start_watch_thread
if stack_has_a_watchrule(stack_id):
self.thread_group_mgr.add_timer(
stack_id,
self.periodic_watcher_task,
sid=stack_id)
def check_stack_watches(self, sid):
# Retrieve the stored credentials & create context
# Require tenant_safe=False to the stack_get to defeat tenant
# scoping otherwise we fail to retrieve the stack
LOG.debug("Periodic watcher task for stack %s" % sid)
admin_context = context.get_admin_context()
db_stack = db_api.stack_get(admin_context, sid, tenant_safe=False,
eager_load=True)
if not db_stack:
LOG.error(_LE("Unable to retrieve stack %s for periodic task"),
sid)
return
stack = parser.Stack.load(admin_context, stack=db_stack,
use_stored_context=True)
# recurse into any nested stacks.
children = db_api.stack_get_all_by_owner_id(admin_context, sid)
for child in children:
self.check_stack_watches(child.id)
# Get all watchrules for this stack and evaluate them
try:
wrs = db_api.watch_rule_get_all_by_stack(admin_context, sid)
except Exception as ex:
LOG.warn(_LW('periodic_task db error watch rule removed? %(ex)s'),
ex)
return
def run_alarm_action(stack, actions, details):
for action in actions:
action(details=details)
for res in stack.itervalues():
res.metadata_update()
for wr in wrs:
rule = watchrule.WatchRule.load(stack.context, watch=wr)
actions = rule.evaluate()
if actions:
self.thread_group_mgr.start(sid, run_alarm_action, stack,
actions, rule.get_details())
def periodic_watcher_task(self, sid):
"""
Periodic task, created for each stack, triggers watch-rule
evaluation for all rules defined for the stack
sid = stack ID
"""
self.check_stack_watches(sid)
@profiler.trace_cls("rpc") @profiler.trace_cls("rpc")
class EngineListener(service.Service): class EngineListener(service.Service):
''' '''
@ -384,7 +302,8 @@ class EngineService(service.Service):
# so we need to create a ThreadGroupManager here for the periodic tasks # so we need to create a ThreadGroupManager here for the periodic tasks
if self.thread_group_mgr is None: if self.thread_group_mgr is None:
self.thread_group_mgr = ThreadGroupManager() self.thread_group_mgr = ThreadGroupManager()
self.stack_watch = StackWatch(self.thread_group_mgr) self.stack_watch = service_stack_watch.StackWatch(
self.thread_group_mgr)
# Create a periodic_watcher_task per-stack # Create a periodic_watcher_task per-stack
admin_context = context.get_admin_context() admin_context = context.get_admin_context()

View File

@ -0,0 +1,107 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.utils import timeutils
from heat.common import context
from heat.common.i18n import _LE
from heat.common.i18n import _LW
from heat.db import api as db_api
from heat.engine import stack
from heat.engine import watchrule
from heat.openstack.common import log as logging
from heat.rpc import api as rpc_api
LOG = logging.getLogger(__name__)
class StackWatch(object):
def __init__(self, thread_group_mgr):
self.thread_group_mgr = thread_group_mgr
def start_watch_task(self, stack_id, cnxt):
def stack_has_a_watchrule(sid):
wrs = db_api.watch_rule_get_all_by_stack(cnxt, sid)
now = timeutils.utcnow()
start_watch_thread = False
for wr in wrs:
# reset the last_evaluated so we don't fire off alarms when
# the engine has not been running.
db_api.watch_rule_update(cnxt, wr.id, {'last_evaluated': now})
if wr.state != rpc_api.WATCH_STATE_CEILOMETER_CONTROLLED:
start_watch_thread = True
children = db_api.stack_get_all_by_owner_id(cnxt, sid)
for child in children:
if stack_has_a_watchrule(child.id):
start_watch_thread = True
return start_watch_thread
if stack_has_a_watchrule(stack_id):
self.thread_group_mgr.add_timer(
stack_id,
self.periodic_watcher_task,
sid=stack_id)
def check_stack_watches(self, sid):
# Retrieve the stored credentials & create context
# Require tenant_safe=False to the stack_get to defeat tenant
# scoping otherwise we fail to retrieve the stack
LOG.debug("Periodic watcher task for stack %s" % sid)
admin_context = context.get_admin_context()
db_stack = db_api.stack_get(admin_context, sid, tenant_safe=False,
eager_load=True)
if not db_stack:
LOG.error(_LE("Unable to retrieve stack %s for periodic task"),
sid)
return
stk = stack.Stack.load(admin_context, stack=db_stack,
use_stored_context=True)
# recurse into any nested stacks.
children = db_api.stack_get_all_by_owner_id(admin_context, sid)
for child in children:
self.check_stack_watches(child.id)
# Get all watchrules for this stack and evaluate them
try:
wrs = db_api.watch_rule_get_all_by_stack(admin_context, sid)
except Exception as ex:
LOG.warn(_LW('periodic_task db error watch rule removed? %(ex)s'),
ex)
return
def run_alarm_action(stk, actions, details):
for action in actions:
action(details=details)
for res in stk.itervalues():
res.metadata_update()
for wr in wrs:
rule = watchrule.WatchRule.load(stk.context, watch=wr)
actions = rule.evaluate()
if actions:
self.thread_group_mgr.start(sid, run_alarm_action, stk,
actions, rule.get_details())
def periodic_watcher_task(self, sid):
"""
Periodic task, created for each stack, triggers watch-rule
evaluation for all rules defined for the stack
sid = stack ID
"""
self.check_stack_watches(sid)

View File

@ -31,7 +31,6 @@ import six
from heat.common import exception from heat.common import exception
from heat.common import identifier from heat.common import identifier
from heat.common import template_format from heat.common import template_format
from heat.common import urlfetch
from heat.db import api as db_api from heat.db import api as db_api
from heat.engine.clients.os import glance from heat.engine.clients.os import glance
from heat.engine.clients.os import keystone from heat.engine.clients.os import keystone
@ -42,6 +41,7 @@ from heat.engine import properties
from heat.engine import resource as res from heat.engine import resource as res
from heat.engine.resources import instance as instances from heat.engine.resources import instance as instances
from heat.engine import service from heat.engine import service
from heat.engine import service_stack_watch
from heat.engine import stack as parser from heat.engine import stack as parser
from heat.engine import stack_lock from heat.engine import stack_lock
from heat.engine import template as templatem from heat.engine import template as templatem
@ -106,38 +106,6 @@ wp_template_no_default = '''
} }
''' '''
nested_alarm_template = '''
HeatTemplateFormatVersion: '2012-12-12'
Resources:
the_nested:
Type: AWS::CloudFormation::Stack
Properties:
TemplateURL: https://server.test/alarm.template
'''
alarm_template = '''
{
"AWSTemplateFormatVersion" : "2010-09-09",
"Description" : "alarming",
"Resources" : {
"service_alarm": {
"Type": "AWS::CloudWatch::Alarm",
"Properties": {
"EvaluationPeriods": "1",
"AlarmActions": [],
"AlarmDescription": "do the thing",
"Namespace": "dev/null",
"Period": "300",
"ComparisonOperator": "GreaterThanThreshold",
"Statistic": "SampleCount",
"Threshold": "2",
"MetricName": "ServiceFailure"
}
}
}
}
'''
policy_template = ''' policy_template = '''
{ {
"AWSTemplateFormatVersion" : "2010-09-09", "AWSTemplateFormatVersion" : "2010-09-09",
@ -1633,7 +1601,7 @@ class StackServiceTest(common.HeatTestCase):
res._register_class('ResourceWithPropsType', res._register_class('ResourceWithPropsType',
generic_rsrc.ResourceWithProps) generic_rsrc.ResourceWithProps)
@mock.patch.object(service.StackWatch, 'start_watch_task') @mock.patch.object(service_stack_watch.StackWatch, 'start_watch_task')
@mock.patch.object(service.db_api, 'stack_get_all') @mock.patch.object(service.db_api, 'stack_get_all')
@mock.patch.object(service.service.Service, 'start') @mock.patch.object(service.service.Service, 'start')
def test_start_watches_all_stacks(self, mock_super_start, mock_get_all, def test_start_watches_all_stacks(self, mock_super_start, mock_get_all,
@ -2633,48 +2601,6 @@ class StackServiceTest(common.HeatTestCase):
self.m.VerifyAll() self.m.VerifyAll()
@stack_context('periodic_watch_task_not_created')
def test_periodic_watch_task_not_created(self):
self.eng.thread_group_mgr.groups[self.stack.id] = DummyThreadGroup()
self.eng.stack_watch.start_watch_task(self.stack.id, self.ctx)
self.assertEqual(
[], self.eng.thread_group_mgr.groups[self.stack.id].threads)
def test_periodic_watch_task_created(self):
stack = get_stack('period_watch_task_created',
utils.dummy_context(),
alarm_template)
self.stack = stack
self.m.ReplayAll()
stack.store()
stack.create()
self.eng.thread_group_mgr.groups[stack.id] = DummyThreadGroup()
self.eng.stack_watch.start_watch_task(stack.id, self.ctx)
expected = [self.eng.stack_watch.periodic_watcher_task]
observed = self.eng.thread_group_mgr.groups[stack.id].threads
self.assertEqual(expected, observed)
self.stack.delete()
def test_periodic_watch_task_created_nested(self):
self.m.StubOutWithMock(urlfetch, 'get')
urlfetch.get('https://server.test/alarm.template').MultipleTimes().\
AndReturn(alarm_template)
self.m.ReplayAll()
stack = get_stack('period_watch_task_created_nested',
utils.dummy_context(),
nested_alarm_template)
setup_keystone_mocks(self.m, stack)
self.stack = stack
self.m.ReplayAll()
stack.store()
stack.create()
self.eng.thread_group_mgr.groups[stack.id] = DummyThreadGroup()
self.eng.stack_watch.start_watch_task(stack.id, self.ctx)
self.assertEqual([self.eng.stack_watch.periodic_watcher_task],
self.eng.thread_group_mgr.groups[stack.id].threads)
self.stack.delete()
@stack_context('service_show_watch_test_stack', False) @stack_context('service_show_watch_test_stack', False)
def test_show_watch(self): def test_show_watch(self):
# Insert two dummy watch rules into the DB # Insert two dummy watch rules into the DB

View File

@ -0,0 +1,110 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from heat.engine import service_stack_watch
from heat.rpc import api as rpc_api
from heat.tests import common
from heat.tests import utils
class StackServiceWatcherTest(common.HeatTestCase):
def setUp(self):
super(StackServiceWatcherTest, self).setUp()
self.ctx = utils.dummy_context(tenant_id='stack_service_test_tenant')
self.patch('heat.engine.service.warnings')
@mock.patch.object(service_stack_watch.db_api, 'stack_get_all_by_owner_id')
@mock.patch.object(service_stack_watch.db_api,
'watch_rule_get_all_by_stack')
@mock.patch.object(service_stack_watch.db_api, 'watch_rule_update')
def test_periodic_watch_task_not_created(self, watch_rule_update,
watch_rule_get_all_by_stack,
stack_get_all_by_owner_id):
"""If there is no cloud watch lite alarm, then don't create
a periodic task for it.
"""
stack_id = 83
watch_rule_get_all_by_stack.return_value = []
stack_get_all_by_owner_id.return_value = []
tg = mock.Mock()
sw = service_stack_watch.StackWatch(tg)
sw.start_watch_task(stack_id, self.ctx)
# assert that add_timer is NOT called.
self.assertEqual([], tg.add_timer.call_args_list)
@mock.patch.object(service_stack_watch.db_api, 'stack_get_all_by_owner_id')
@mock.patch.object(service_stack_watch.db_api,
'watch_rule_get_all_by_stack')
@mock.patch.object(service_stack_watch.db_api, 'watch_rule_update')
def test_periodic_watch_task_created(self, watch_rule_update,
watch_rule_get_all_by_stack,
stack_get_all_by_owner_id):
"""If there is no cloud watch lite alarm, then DO create
a periodic task for it.
"""
stack_id = 86
wr1 = mock.Mock()
wr1.id = 4
wr1.state = rpc_api.WATCH_STATE_NODATA
watch_rule_get_all_by_stack.return_value = [wr1]
stack_get_all_by_owner_id.return_value = []
tg = mock.Mock()
sw = service_stack_watch.StackWatch(tg)
sw.start_watch_task(stack_id, self.ctx)
# assert that add_timer IS called.
self.assertEqual([mock.call(stack_id, sw.periodic_watcher_task,
sid=stack_id)],
tg.add_timer.call_args_list)
@mock.patch.object(service_stack_watch.db_api, 'stack_get_all_by_owner_id')
@mock.patch.object(service_stack_watch.db_api,
'watch_rule_get_all_by_stack')
@mock.patch.object(service_stack_watch.db_api, 'watch_rule_update')
def test_periodic_watch_task_created_nested(self, watch_rule_update,
watch_rule_get_all_by_stack,
stack_get_all_by_owner_id):
stack_id = 90
def my_wr_get(cnxt, sid):
if sid == stack_id:
return []
wr1 = mock.Mock()
wr1.id = 4
wr1.state = rpc_api.WATCH_STATE_NODATA
return [wr1]
watch_rule_get_all_by_stack.side_effect = my_wr_get
def my_nested_get(cnxt, sid):
if sid == stack_id:
nested_stack = mock.Mock()
nested_stack.id = 55
return [nested_stack]
return []
stack_get_all_by_owner_id.side_effect = my_nested_get
tg = mock.Mock()
sw = service_stack_watch.StackWatch(tg)
sw.start_watch_task(stack_id, self.ctx)
# assert that add_timer IS called.
self.assertEqual([mock.call(stack_id, sw.periodic_watcher_task,
sid=stack_id)],
tg.add_timer.call_args_list)