Remove stack watch service

This removes the stack watch RPC API and related code.

Change-Id: Ib89bcc3ff6a542f49467e2ad6c7e2a716a0dc2b4
Partial-Bug: #1743707
changes/62/534662/5
rabi 5 years ago
parent 5bd856627a
commit 8db1b3ea41
  1. 1
      heat/api/aws/exception.py
  2. 4
      heat/cmd/engine.py
  3. 3
      heat/common/config.py
  4. 5
      heat/common/exception.py
  5. 3
      heat/engine/resource.py
  6. 25
      heat/engine/resources/openstack/aodh/alarm.py
  7. 182
      heat/engine/resources/openstack/heat/cloud_watch.py
  8. 143
      heat/engine/service.py
  9. 109
      heat/engine/service_stack_watch.py
  10. 396
      heat/engine/watchrule.py
  11. 60
      heat/objects/watch_data.py
  12. 87
      heat/objects/watch_rule.py
  13. 60
      heat/rpc/api.py
  14. 54
      heat/rpc/client.py
  15. 270
      heat/tests/engine/service/test_stack_watch.py
  16. 60
      heat/tests/openstack/aodh/test_alarm.py
  17. 120
      heat/tests/openstack/heat/test_cloudwatch.py
  18. 161
      heat/tests/openstack/heat/test_cw_alarm.py
  19. 118
      heat/tests/test_engine_service_stack_watch.py
  20. 1
      heat/tests/test_metadata_refresh.py
  21. 17
      heat/tests/test_rpc_client.py
  22. 7
      heat/tests/test_signal.py
  23. 978
      heat/tests/test_watch.py

@ -298,7 +298,6 @@ def map_remote_error(ex):
'ResourceActionNotSupported',
'ResourceNotFound',
'ResourceNotAvailable',
'WatchRuleNotFound',
'StackValidationFailed',
'InvalidSchemaError',
'InvalidTemplateReference',

@ -74,10 +74,6 @@ def launch_engine(setup_logging=True):
launcher = service.launch(cfg.CONF, srv, workers=workers,
restart_method='mutate')
if cfg.CONF.enable_cloud_watch_lite:
# We create the periodic tasks here, which mean they are created
# only in the parent process when num_engine_workers>1 is specified
srv.create_periodic_tasks()
return launcher

@ -182,6 +182,9 @@ engine_opts = [
' for stack locking.')),
cfg.BoolOpt('enable_cloud_watch_lite',
default=False,
deprecated_for_removal=True,
deprecated_reason='Heat CloudWatch Service has been removed.',
deprecated_since='10.0.0',
help=_('Enable the legacy OS::Heat::CWLiteAlarm resource.')),
cfg.BoolOpt('enable_stack_abandon',
default=False,

@ -304,11 +304,6 @@ class ClientNotAvailable(HeatException):
msg_fmt = _("The client (%(client_name)s) is not available.")
class WatchRuleNotFound(EntityNotFound):
    """Error reporting that a named watch rule could not be found.

    Keep this for AWS compatibility.
    """

    msg_fmt = _("The Watch Rule (%(watch_name)s) could not be found.")
class ResourceFailure(HeatExceptionWithPath):
def __init__(self, exception_or_error, resource, action=None):
self.resource = resource

@ -2449,9 +2449,6 @@ class Resource(status.ResourceStatus):
# this is from Ceilometer.
auto = '%(previous)s to %(current)s (%(reason)s)' % details
return 'alarm state changed from %s' % auto
elif 'state' in details:
# this is from watchrule
return 'alarm state changed to %(state)s' % details
return 'Unknown'

@ -13,14 +13,12 @@
import six
from heat.common import exception
from heat.common.i18n import _
from heat.engine import constraints
from heat.engine import properties
from heat.engine.resources import alarm_base
from heat.engine.resources.openstack.heat import none_resource
from heat.engine import support
from heat.engine import watchrule
class AodhAlarm(alarm_base.BaseAlarm):
@ -178,17 +176,6 @@ class AodhAlarm(alarm_base.BaseAlarm):
alarm = self.client().alarm.create(props)
self.resource_id_set(alarm['alarm_id'])
# the watchrule below is for backwards compatibility.
# 1) so we don't create watch tasks unnecessarily
# 2) to support CW stats post, we will redirect the request
# to ceilometer.
wr = watchrule.WatchRule(context=self.context,
watch_name=self.physical_resource_name(),
rule=dict(self.properties),
stack_id=self.stack.id)
wr.state = wr.CEILOMETER_CONTROLLED
wr.store()
def handle_update(self, json_snippet, tmpl_diff, prop_diff):
if prop_diff:
new_props = json_snippet.properties(self.properties_schema,
@ -209,19 +196,7 @@ class AodhAlarm(alarm_base.BaseAlarm):
return record_reality
def handle_delete(self):
try:
wr = watchrule.WatchRule.load(
self.context, watch_name=self.physical_resource_name())
wr.destroy()
except exception.EntityNotFound:
pass
return super(AodhAlarm, self).handle_delete()
def handle_check(self):
watch_name = self.physical_resource_name()
watchrule.WatchRule.load(self.context, watch_name=watch_name)
self.client().alarm.get(self.resource_id)

@ -13,193 +13,23 @@
from oslo_config import cfg
from heat.common import exception
from heat.common.i18n import _
from heat.engine import constraints
from heat.engine import properties
from heat.engine import resource
from heat.engine.resources.openstack.heat import none_resource
from heat.engine import support
from heat.engine import watchrule
class CloudWatchAlarm(resource.Resource):
PROPERTIES = (
COMPARISON_OPERATOR, ALARM_DESCRIPTION, EVALUATION_PERIODS,
METRIC_NAME, NAMESPACE, PERIOD, STATISTIC, ALARM_ACTIONS,
OKACTIONS, DIMENSIONS, INSUFFICIENT_DATA_ACTIONS, THRESHOLD,
UNITS,
) = (
'ComparisonOperator', 'AlarmDescription', 'EvaluationPeriods',
'MetricName', 'Namespace', 'Period', 'Statistic', 'AlarmActions',
'OKActions', 'Dimensions', 'InsufficientDataActions', 'Threshold',
'Units',
)
properties_schema = {
COMPARISON_OPERATOR: properties.Schema(
properties.Schema.STRING,
_('Operator used to compare the specified Statistic with '
'Threshold.'),
constraints=[
constraints.AllowedValues(['GreaterThanOrEqualToThreshold',
'GreaterThanThreshold',
'LessThanThreshold',
'LessThanOrEqualToThreshold']),
],
required=True,
update_allowed=True
),
ALARM_DESCRIPTION: properties.Schema(
properties.Schema.STRING,
_('Description for the alarm.'),
update_allowed=True
),
EVALUATION_PERIODS: properties.Schema(
properties.Schema.STRING,
_('Number of periods to evaluate over.'),
required=True,
update_allowed=True
),
METRIC_NAME: properties.Schema(
properties.Schema.STRING,
_('Metric name watched by the alarm.'),
required=True
),
NAMESPACE: properties.Schema(
properties.Schema.STRING,
_('Namespace for the metric.'),
required=True
),
PERIOD: properties.Schema(
properties.Schema.STRING,
_('Period (seconds) to evaluate over.'),
required=True,
update_allowed=True
),
STATISTIC: properties.Schema(
properties.Schema.STRING,
_('Metric statistic to evaluate.'),
constraints=[
constraints.AllowedValues(['SampleCount', 'Average', 'Sum',
'Minimum', 'Maximum']),
],
required=True,
update_allowed=True
),
ALARM_ACTIONS: properties.Schema(
properties.Schema.LIST,
_('A list of actions to execute when state transitions to alarm.'),
update_allowed=True
),
OKACTIONS: properties.Schema(
properties.Schema.LIST,
_('A list of actions to execute when state transitions to ok.'),
update_allowed=True
),
DIMENSIONS: properties.Schema(
properties.Schema.LIST,
_('A list of dimensions (arbitrary name/value pairs) associated '
'with the metric.')
),
INSUFFICIENT_DATA_ACTIONS: properties.Schema(
properties.Schema.LIST,
_('A list of actions to execute when state transitions to '
'insufficient-data.'),
update_allowed=True
),
THRESHOLD: properties.Schema(
properties.Schema.STRING,
_('Threshold to evaluate against.'),
required=True,
update_allowed=True
),
UNITS: properties.Schema(
properties.Schema.STRING,
_('Unit for the metric.'),
constraints=[
constraints.AllowedValues(['Seconds', 'Microseconds',
'Milliseconds', 'Bytes',
'Kilobytes', 'Megabytes',
'Gigabytes', 'Terabytes', 'Bits',
'Kilobits', 'Megabits',
'Gigabits', 'Terabits', 'Percent',
'Count', 'Bytes/Second',
'Kilobytes/Second',
'Megabytes/Second',
'Gigabytes/Second',
'Terabytes/Second', 'Bits/Second',
'Kilobits/Second',
'Megabits/Second',
'Gigabits/Second',
'Terabits/Second', 'Count/Second',
None]),
],
update_allowed=True
),
}
strict_dependency = False
class CloudWatchAlarm(none_resource.NoneResource):
support_status = support.SupportStatus(
status=support.HIDDEN,
message=_('OS::Heat::CWLiteAlarm is deprecated, '
'use OS::Aodh::Alarm instead.'),
message=_('OS::Heat::CWLiteAlarm resource has been removed '
'since version 10.0.0. Existing stacks can still '
'use it, where it would do nothing for update/delete.'),
version='5.0.0',
previous_status=support.SupportStatus(
status=support.DEPRECATED,
version='2014.2'
)
version='2014.2')
)
def handle_create(self):
wr = watchrule.WatchRule(context=self.context,
watch_name=self.physical_resource_name(),
rule=dict(self.properties),
stack_id=self.stack.id)
wr.store()
def handle_update(self, json_snippet, tmpl_diff, prop_diff):
# If Properties has changed, update self.properties, so we
# get the new values during any subsequent adjustment
if prop_diff:
self.properties = json_snippet.properties(self.properties_schema,
self.context)
loader = watchrule.WatchRule.load
wr = loader(self.context,
watch_name=self.physical_resource_name())
wr.rule = dict(self.properties)
wr.store()
def handle_delete(self):
try:
wr = watchrule.WatchRule.load(
self.context, watch_name=self.physical_resource_name())
wr.destroy()
except exception.EntityNotFound:
pass
def handle_suspend(self):
wr = watchrule.WatchRule.load(self.context,
watch_name=self.physical_resource_name())
wr.state_set(wr.SUSPENDED)
def handle_resume(self):
wr = watchrule.WatchRule.load(self.context,
watch_name=self.physical_resource_name())
# Just set to NODATA, which will be re-evaluated next periodic task
wr.state_set(wr.NODATA)
def handle_check(self):
watch_name = self.physical_resource_name()
watchrule.WatchRule.load(self.context, watch_name=watch_name)
def get_reference_id(self):
return self.physical_resource_name_or_FnGetRefId()
def physical_resource_name(self):
return '%s-%s' % (self.stack.name, self.name)
def resource_mapping():
cfg.CONF.import_opt('enable_cloud_watch_lite', 'heat.common.config')

@ -15,7 +15,6 @@ import collections
import datetime
import functools
import itertools
import os
import pydoc
import socket
@ -52,22 +51,18 @@ from heat.engine import parameter_groups
from heat.engine import properties
from heat.engine import resources
from heat.engine import service_software_config
from heat.engine import service_stack_watch
from heat.engine import stack as parser
from heat.engine import stack_lock
from heat.engine import stk_defn
from heat.engine import support
from heat.engine import template as templatem
from heat.engine import update
from heat.engine import watchrule
from heat.engine import worker
from heat.objects import event as event_object
from heat.objects import resource as resource_objects
from heat.objects import service as service_objects
from heat.objects import snapshot as snapshot_object
from heat.objects import stack as stack_object
from heat.objects import watch_data
from heat.objects import watch_rule
from heat.rpc import api as rpc_api
from heat.rpc import worker_api as rpc_worker_api
@ -322,7 +317,6 @@ class EngineService(service.ServiceBase):
# The following are initialized here, but assigned in start() which
# happens after the fork when spawning multiple worker processes
self.stack_watch = None
self.listener = None
self.worker_service = None
self.engine_id = None
@ -341,35 +335,6 @@ class EngineService(service.ServiceBase):
'Please keep the same if you do not want to '
'delegate subset roles when upgrading.')
def create_periodic_tasks(self):
    """Set up the stack watcher and schedule creation of its watch tasks.

    A StackWatch bound to the service's thread group manager is stored on
    ``self.stack_watch``; the per-stack watch tasks themselves are created
    from a thread added to ``self.manage_thread_grp``.
    """
    LOG.debug("Starting periodic watch tasks pid=%s", os.getpid())

    # Note with multiple workers, the parent process hasn't called start()
    # so we need to create a ThreadGroupManager here for the periodic tasks
    if self.thread_group_mgr is None:
        self.thread_group_mgr = ThreadGroupManager()
    watcher = service_stack_watch.StackWatch(self.thread_group_mgr)
    self.stack_watch = watcher

    def create_watch_tasks():
        # Keep retrying every 5 seconds until one full pass over all
        # stacks succeeds.
        while True:
            try:
                # Create a periodic_watcher_task per-stack
                admin_cnxt = context.get_admin_context()
                all_stacks = stack_object.Stack.get_all(admin_cnxt,
                                                        show_hidden=True)
                for db_stack in all_stacks:
                    self.stack_watch.start_watch_task(db_stack.id,
                                                      admin_cnxt)
                LOG.info("Watch tasks created")
                return
            except Exception as e:
                LOG.error("Watch task creation attempt failed, %s", e)
                eventlet.sleep(5)

    if self.manage_thread_grp is None:
        self.manage_thread_grp = threadgroup.ThreadGroup()
    self.manage_thread_grp.add_thread(create_watch_tasks)
def start(self):
self.engine_id = service_utils.generate_engine_id()
if self.thread_group_mgr is None:
@ -819,14 +784,6 @@ class EngineService(service.ServiceBase):
elif stack.status != stack.FAILED:
stack.create(msg_queue=msg_queue)
if (stack.action in (stack.CREATE, stack.ADOPT)
and stack.status == stack.COMPLETE):
if self.stack_watch:
# Schedule a periodic watcher task for this stack
self.stack_watch.start_watch_task(stack.id, cnxt)
else:
LOG.info("Stack create failed, status %s", stack.status)
convergence = cfg.CONF.convergence_engine
stack = self._parse_template_and_validate_stack(
@ -2173,106 +2130,6 @@ class EngineService(service.ServiceBase):
data = snapshot_object.Snapshot.get_all(cnxt, s.id)
return [api.format_snapshot(snapshot) for snapshot in data]
@context.request_context
def create_watch_data(self, cnxt, watch_name, stats_data):
    """Creates data for CloudWatch and WaitConditions.

    This could be used by CloudWatch and WaitConditions
    and treat HA service events like any other CloudWatch.

    :param cnxt: RPC context.
    :param watch_name: Name of the watch to feed; when falsy, every stored
                       rule accepted by ``rule_can_use_sample`` is fed.
    :param stats_data: The sample data handed to each matching rule.
    :returns: ``stats_data``, unchanged.
    :raises exception.EntityNotFound: if no rule received the data.
    """
    def matching_rules():
        # Lazy on purpose: each rule is loaded right before it is fed.
        if watch_name:
            yield watchrule.WatchRule.load(cnxt, watch_name)
            return
        for db_rule in watch_rule.WatchRule.get_all(cnxt):
            if watchrule.rule_can_use_sample(db_rule, stats_data):
                yield watchrule.WatchRule.load(cnxt, watch=db_rule)

    fed_any = False
    for matched in matching_rules():
        matched.create_watch_data(stats_data)
        fed_any = True

    if fed_any:
        return stats_data

    missing_name = 'Unknown' if watch_name is None else watch_name
    raise exception.EntityNotFound(entity='Watch Rule',
                                   name=missing_name)
@context.request_context
def show_watch(self, cnxt, watch_name):
    """Return the attributes of one watch/alarm.

    :param cnxt: RPC context.
    :param watch_name: Name of the watch you want to see, or None to see
                       all.
    :returns: A list with one ``api.format_watch`` result per watch, or
              None when listing all rule names fails.
    """
    if not watch_name:
        try:
            names = [db_rule.name
                     for db_rule in watch_rule.WatchRule.get_all(cnxt)]
        except Exception as ex:
            LOG.warning('show_watch (all) db error %s', ex)
            return
    else:
        names = [watch_name]

    # Load every rule first, then format them, as two separate passes.
    loaded = [watchrule.WatchRule.load(cnxt, name) for name in names]
    return [api.format_watch(rule) for rule in loaded]
@context.request_context
def show_watch_metric(self, cnxt, metric_namespace=None, metric_name=None):
    """Return the datapoints for a metric.

    :param cnxt: RPC context.
    :param metric_namespace: Name of the namespace you want to see, or None
                             to see all.
    :param metric_name: Name of the metric you want to see, or None to see
                        all.
    :returns: A list of ``api.format_watch_data`` results, or None when a
              filter was requested or the data could not be read.
    """
    # DB API and schema does not yet allow us to easily query by
    # namespace/metric, but we will want this at some point
    # for now, the API can query all metric data and filter locally
    no_filter = metric_namespace is None and metric_name is None
    if not no_filter:
        LOG.error("Filtering by namespace/metric not yet supported")
        return

    try:
        samples = watch_data.WatchData.get_all(cnxt)
        names_by_rule_id = {}
        for db_rule in watch_rule.WatchRule.get_all(cnxt):
            names_by_rule_id[db_rule.id] = db_rule.name
    except Exception as ex:
        LOG.warning('show_metric (all) db error %s', ex)
        return

    return [api.format_watch_data(sample, names_by_rule_id)
            for sample in samples]
@context.request_context
def set_watch_state(self, cnxt, watch_name, state):
    """Temporarily set the state of a given watch.

    :param cnxt: RPC context.
    :param watch_name: Name of the watch.
    :param state: State (must be one defined in WatchRule class).
    :returns: The formatted watch with its state value replaced by
              ``state``, or None for a Ceilometer-controlled watch.
    """
    rule = watchrule.WatchRule.load(cnxt, watch_name)
    if rule.state == rpc_api.WATCH_STATE_CEILOMETER_CONTROLLED:
        return

    # Each returned action is started in the thread group of the stack
    # that owns the rule.
    for pending_action in rule.set_watch_state(state):
        self.thread_group_mgr.start(rule.stack_id, pending_action)

    # Return the watch with the state overridden to indicate success
    # We do not update the timestamps as we are not modifying the DB
    formatted = api.format_watch(rule)
    formatted[rpc_api.WATCH_STATE_VALUE] = state
    return formatted
@context.request_context
def show_software_config(self, cnxt, config_id):
return self.software_config.show_software_config(cnxt, config_id)

@ -1,109 +0,0 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from oslo_utils import timeutils
from heat.common import context
from heat.engine import stack
from heat.engine import stk_defn
from heat.engine import watchrule
from heat.objects import stack as stack_object
from heat.objects import watch_rule as watch_rule_object
from heat.rpc import api as rpc_api
LOG = logging.getLogger(__name__)
class StackWatch(object):
    """Schedules and runs periodic watch-rule evaluation for stacks."""

    def __init__(self, thread_group_mgr):
        """Keep the thread group manager used to run timers and actions.

        :param thread_group_mgr: object providing ``add_timer`` and
                                 ``start``.
        """
        self.thread_group_mgr = thread_group_mgr

    def start_watch_task(self, stack_id, cnxt):
        """Add a periodic watcher timer for a stack if it needs one.

        A timer is added when the stack, or any stack it owns
        (recursively), has a watch rule that is not Ceilometer-controlled.

        :param stack_id: ID of the stack to watch.
        :param cnxt: context used for the DB lookups and updates.
        """

        def stack_has_a_watchrule(sid):
            wrs = watch_rule_object.WatchRule.get_all_by_stack(cnxt, sid)

            now = timeutils.utcnow()
            start_watch_thread = False
            for wr in wrs:
                # reset the last_evaluated so we don't fire off alarms when
                # the engine has not been running.
                watch_rule_object.WatchRule.update_by_id(
                    cnxt, wr.id,
                    {'last_evaluated': now})

                if wr.state != rpc_api.WATCH_STATE_CEILOMETER_CONTROLLED:
                    start_watch_thread = True

            # Every child is visited (no short-circuit) so that its rules
            # get their last_evaluated reset as well.
            children = stack_object.Stack.get_all_by_owner_id(cnxt, sid)
            for child in children:
                if stack_has_a_watchrule(child.id):
                    start_watch_thread = True

            return start_watch_thread

        if stack_has_a_watchrule(stack_id):
            self.thread_group_mgr.add_timer(
                stack_id,
                self.periodic_watcher_task,
                sid=stack_id)

    @staticmethod
    def _run_alarm_action(stk, actions, details):
        """Call each action, then refresh metadata of the stack's resources."""
        for action in actions:
            action(details=details)
        for res in stk._explicit_dependencies():
            res.metadata_update()
            stk_defn.update_resource_data(stk.defn, res.name,
                                          res.node_data())

    def check_stack_watches(self, sid):
        """Evaluate the watch rules of a stack and of its nested stacks.

        :param sid: ID of the stack whose rules are evaluated.
        """
        # Use admin_context for stack_get to defeat tenant
        # scoping otherwise we fail to retrieve the stack
        LOG.debug("Periodic watcher task for stack %s", sid)
        admin_context = context.get_admin_context()
        db_stack = stack_object.Stack.get_by_id(admin_context,
                                                sid)
        if not db_stack:
            LOG.error("Unable to retrieve stack %s for periodic task", sid)
            return

        # recurse into any nested stacks.
        children = stack_object.Stack.get_all_by_owner_id(admin_context, sid)
        for child in children:
            self.check_stack_watches(child.id)

        # Get all watchrules for this stack and evaluate them
        try:
            wrs = watch_rule_object.WatchRule.get_all_by_stack(admin_context,
                                                               sid)
        except Exception as ex:
            LOG.warning('periodic_task db error watch rule removed? %s', ex)
            return

        if not wrs:
            # Nothing to evaluate, so skip loading the full stack.
            return

        stk = stack.Stack.load(admin_context, stack=db_stack,
                               use_stored_context=True)

        for wr in wrs:
            rule = watchrule.WatchRule.load(stk.context, watch=wr)
            actions = rule.evaluate()
            if actions:
                self.thread_group_mgr.start(sid, self._run_alarm_action,
                                            stk, actions,
                                            rule.get_details())

    def periodic_watcher_task(self, sid):
        """Evaluate all watch-rules defined for stack ID.

        Periodic task, created for each stack, triggers watch-rule evaluation
        for all rules defined for the stack sid = stack ID.
        """
        self.check_stack_watches(sid)

@ -1,396 +0,0 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from oslo_log import log as logging
from oslo_utils import timeutils
from heat.common import exception
from heat.common.i18n import _
from heat.engine import stack
from heat.engine import timestamp
from heat.objects import stack as stack_object
from heat.objects import watch_data as watch_data_objects
from heat.objects import watch_rule as watch_rule_objects
from heat.rpc import api as rpc_api
LOG = logging.getLogger(__name__)
class WatchRule(object):
    """Engine-side model of a single watch (alarm) rule.

    Persists itself through ``watch_rule_objects.WatchRule`` and evaluates
    the rule's statistic over the samples held in ``watch_data``.
    """

    WATCH_STATES = (
        ALARM,
        NORMAL,
        NODATA,
        SUSPENDED,
        CEILOMETER_CONTROLLED,
    ) = (
        rpc_api.WATCH_STATE_ALARM,
        rpc_api.WATCH_STATE_OK,
        rpc_api.WATCH_STATE_NODATA,
        rpc_api.WATCH_STATE_SUSPENDED,
        rpc_api.WATCH_STATE_CEILOMETER_CONTROLLED,
    )
    # Rule key holding the actions for a state. SUSPENDED and
    # CEILOMETER_CONTROLLED have no entry, so they never yield actions.
    ACTION_MAP = {ALARM: 'AlarmActions',
                  NORMAL: 'OKActions',
                  NODATA: 'InsufficientDataActions'}

    created_at = timestamp.Timestamp(watch_rule_objects.WatchRule.get_by_id,
                                     'created_at')
    updated_at = timestamp.Timestamp(watch_rule_objects.WatchRule.get_by_id,
                                     'updated_at')

    def __init__(self, context, watch_name, rule, stack_id=None,
                 state=NODATA, wid=None, watch_data=None,
                 last_evaluated=None):
        """Build a rule from its definition.

        :param context: request context used for DB and client access.
        :param watch_name: name of the watch.
        :param rule: mapping with the rule definition; its 'Period' (or
                     'period') entry, in seconds, sets the evaluation
                     window and defaults to 0 when absent.
        :param stack_id: ID of the stack the rule belongs to.
        :param state: initial state, NODATA by default.
        :param wid: DB id of an already stored rule, or None.
        :param watch_data: samples attached to the rule.
        :param last_evaluated: time of the last evaluation; defaults to now.
        """
        self.context = context
        self.now = timeutils.utcnow()
        self.name = watch_name
        self.state = state
        self.rule = rule
        self.stack_id = stack_id
        period = 0
        if 'Period' in rule:
            period = int(rule['Period'])
        elif 'period' in rule:
            period = int(rule['period'])
        self.timeperiod = datetime.timedelta(seconds=period)
        self.id = wid
        self.watch_data = watch_data or []
        self.last_evaluated = last_evaluated or timeutils.utcnow()

    @classmethod
    def load(cls, context, watch_name=None, watch=None):
        """Load the watchrule object.

        The object can be loaded either from the DB by name or from an existing
        DB object.

        :raises exception.EntityNotFound: if no DB object is available.
        """
        if watch is None:
            try:
                watch = watch_rule_objects.WatchRule.get_by_name(context,
                                                                 watch_name)
            except Exception as ex:
                # A failed lookup is logged and then reported as not found.
                LOG.warning('WatchRule.load (%(watch_name)s) db error %(ex)s',
                            {'watch_name': watch_name, 'ex': ex})
        if watch is None:
            raise exception.EntityNotFound(entity='Watch Rule',
                                           name=watch_name)
        return cls(context=context,
                   watch_name=watch.name,
                   rule=watch.rule,
                   stack_id=watch.stack_id,
                   state=watch.state,
                   wid=watch.id,
                   watch_data=watch.watch_data,
                   last_evaluated=watch.last_evaluated)

    def store(self):
        """Store the watchrule in the database.

        If self.id is set, we update the existing rule; otherwise a new row
        is created and its id is recorded on self.id.
        """
        wr_values = {
            'name': self.name,
            'rule': self.rule,
            'state': self.state,
            'stack_id': self.stack_id
        }

        if self.id is None:
            wr = watch_rule_objects.WatchRule.create(self.context, wr_values)
            self.id = wr.id
        else:
            watch_rule_objects.WatchRule.update_by_id(self.context, self.id,
                                                      wr_values)

    def destroy(self):
        """Delete the watchrule from the database."""
        if self.id is not None:
            watch_rule_objects.WatchRule.delete(self.context, self.id)

    def do_data_cmp(self, data, threshold):
        """Compare data with threshold using the rule's ComparisonOperator.

        :returns: the comparison result, or False for an unknown operator.
        """
        op = self.rule['ComparisonOperator']
        if op == 'GreaterThanThreshold':
            return data > threshold
        elif op == 'GreaterThanOrEqualToThreshold':
            return data >= threshold
        elif op == 'LessThanThreshold':
            return data < threshold
        elif op == 'LessThanOrEqualToThreshold':
            return data <= threshold
        else:
            return False

    def _samples_in_period(self):
        """Yield the samples created within the last timeperiod."""
        oldest = self.now - self.timeperiod
        for sample in self.watch_data:
            if sample.created_at >= oldest:
                yield sample

    def _metric_value(self, sample):
        """Return the sample's value for the rule's metric as a float."""
        return float(sample.data[self.rule['MetricName']]['Value'])

    def _state_for(self, data):
        """Return ALARM if data breaches the rule's Threshold, else NORMAL."""
        if self.do_data_cmp(data, float(self.rule['Threshold'])):
            return self.ALARM
        return self.NORMAL

    def do_Maximum(self):
        """Evaluate the largest value in the period; NODATA if none."""
        values = [self._metric_value(s) for s in self._samples_in_period()]
        if not values:
            return self.NODATA
        return self._state_for(max(values))

    def do_Minimum(self):
        """Evaluate the smallest value in the period; NODATA if none."""
        values = [self._metric_value(s) for s in self._samples_in_period()]
        if not values:
            return self.NODATA
        return self._state_for(min(values))

    def do_SampleCount(self):
        """Count all samples within the specified period."""
        count = sum(1 for _ in self._samples_in_period())
        return self._state_for(count)

    def do_Average(self):
        """Evaluate the mean value in the period; NODATA if none."""
        values = [self._metric_value(s) for s in self._samples_in_period()]
        if not values:
            return self.NODATA
        return self._state_for(sum(values) / len(values))

    def do_Sum(self):
        """Evaluate the sum of the values in the period (0 if none)."""
        data = 0
        oldest = self.now - self.timeperiod
        for d in self.watch_data:
            if d.created_at < oldest:
                LOG.debug('ignoring %s', str(d.data))
                continue
            data = data + self._metric_value(d)
        return self._state_for(data)

    def get_alarm_state(self):
        """Run the ``do_<Statistic>`` method named by the rule."""
        fn = getattr(self, 'do_%s' % self.rule['Statistic'])
        return fn()

    def evaluate(self):
        """Run the rule if it is due.

        :returns: the actions for the new state, or an empty list when the
                  rule is suspended, Ceilometer-controlled or not yet due.
        """
        if self.state in [self.CEILOMETER_CONTROLLED, self.SUSPENDED]:
            return []
        # has enough time progressed to run the rule
        self.now = timeutils.utcnow()
        if self.now < (self.last_evaluated + self.timeperiod):
            return []
        return self.run_rule()

    def get_details(self):
        """Return the alarm name and current state as a dict."""
        return {'alarm': self.name,
                'state': self.state}

    def run_rule(self):
        """Compute the new state, store it and return its actions."""
        new_state = self.get_alarm_state()
        actions = self.rule_actions(new_state)
        self.state = new_state
        self.last_evaluated = self.now
        self.store()
        return actions

    def rule_actions(self, new_state):
        """Return the signal callables to run for new_state.

        :returns: a list of ``signal`` methods of the resources referenced
                  by the rule for this state; empty when the state has no
                  actions or the stack is being deleted or not COMPLETE.
        """
        LOG.info('WATCH: stack:%(stack)s, watch_name:%(watch_name)s, '
                 'new_state:%(new_state)s', {'stack': self.stack_id,
                                             'watch_name': self.name,
                                             'new_state': new_state})
        # .get(): states without an ACTION_MAP entry (SUSPENDED,
        # CEILOMETER_CONTROLLED) mean "no action" rather than a KeyError.
        action_key = self.ACTION_MAP.get(new_state)
        if action_key is None or action_key not in self.rule:
            LOG.info('no action for new state %s', new_state)
            return []

        s = stack_object.Stack.get_by_id(
            self.context,
            self.stack_id)
        stk = stack.Stack.load(self.context, stack=s)
        if stk.action == stk.DELETE or stk.status != stk.COMPLETE:
            LOG.warning("Could not process watch state %s for stack",
                        new_state)
            return []
        return [stk.resource_by_refid(refid).signal
                for refid in self.rule[action_key]]

    def _to_ceilometer(self, data):
        """Create one ceilometer gauge sample per metric found in data."""
        clients = self.context.clients
        sample = {}
        sample['counter_type'] = 'gauge'

        for k, d in data.items():
            if k == 'Namespace':
                continue
            sample['counter_name'] = k
            sample['counter_volume'] = d['Value']
            sample['counter_unit'] = d['Unit']
            dims = d.get('Dimensions', {})
            if isinstance(dims, list):
                # An empty list carries no dimensions; avoid IndexError.
                dims = dims[0] if dims else {}
            sample['resource_metadata'] = dims
            sample['resource_id'] = dims.get('InstanceId')
            LOG.debug('new sample:%(k)s data:%(sample)s', {
                      'k': k, 'sample': sample})
            clients.client('ceilometer').samples.create(**sample)

    def create_watch_data(self, data):
        """Record a metric sample for this rule.

        Ceilometer-controlled rules forward the sample to ceilometer;
        suspended rules and samples lacking the rule's MetricName are
        ignored.
        """
        if self.state == self.CEILOMETER_CONTROLLED:
            # this is a short term measure for those that have cfn-push-stats
            # within their templates, but want to use Ceilometer alarms.

            self._to_ceilometer(data)
            return

        if self.state == self.SUSPENDED:
            LOG.debug('Ignoring metric data for %s, SUSPENDED state',
                      self.name)
            return []

        if self.rule['MetricName'] not in data:
            # Our simplified cloudwatch implementation only expects a single
            # Metric associated with each alarm, but some cfn-push-stats
            # options, e.g --haproxy try to push multiple metrics when we
            # actually only care about one (the one we're alarming on)
            # so just ignore any data which doesn't contain MetricName
            LOG.debug('Ignoring metric data (only accept %(metric)s) '
                      ': %(data)s', {'metric': self.rule['MetricName'],
                                     'data': data})
            return

        watch_data = {
            'data': data,
            'watch_rule_id': self.id
        }
        wd = watch_data_objects.WatchData.create(self.context, watch_data)
        LOG.debug('new watch:%(name)s data:%(data)s',
                  {'name': self.name, 'data': str(wd.data)})

    def state_set(self, state):
        """Persistently store the watch state."""
        if state not in self.WATCH_STATES:
            raise ValueError(_("Invalid watch state %s") % state)

        self.state = state
        self.store()

    def set_watch_state(self, state):
        """Temporarily set the watch state.

        :returns: list of functions to be scheduled in the stack ThreadGroup
                  for the specified state.
        :raises ValueError: if state is not one of WATCH_STATES.
        """

        if state not in self.WATCH_STATES:
            raise ValueError(_('Unknown watch state %s') % state)

        actions = []
        if state != self.state:
            actions = self.rule_actions(state)
            if actions:
                LOG.debug("Overriding state %(self_state)s for watch "
                          "%(name)s with %(state)s",
                          {'self_state': self.state, 'name': self.name,
                           'state': state})
            else:
                LOG.warning("Unable to override state %(state)s for "
                            "watch %(name)s", {'state': self.state,
                                               'name': self.name})
        return actions
def rule_can_use_sample(wr, stats_data):
    """Tell whether a sample is relevant to a watch rule.

    :param wr: watch rule exposing ``state`` and a ``rule`` mapping.
    :param stats_data: mapping of metric name to metric data; a
                       'Namespace' key is never treated as a metric.
    :returns: True when the rule is not suspended, its metric is present
              in stats_data and every dimension required by the rule
              equals the matching dimension of the sample.
    """
    def match_dimensions(rule, data):
        return all(k in data and v == data[k] for k, v in rule.items())

    if wr.state == WatchRule.SUSPENDED:
        return False
    if wr.state == WatchRule.CEILOMETER_CONTROLLED:
        metric = wr.rule['meter_name']
        # Only the last dotted component of each metadata key is compared.
        rule_dims = {k.split('.')[-1]: v
                     for k, v in wr.rule.get('matching_metadata',
                                             {}).items()}
    else:
        metric = wr.rule['MetricName']
        rule_dims = {d['Name']: d['Value']
                     for d in wr.rule.get('Dimensions', [])}

    if metric not in stats_data or metric == 'Namespace':
        return False

    data_dims = stats_data[metric].get('Dimensions', {})
    if isinstance(data_dims, list):
        # Only the first entry is considered; an empty list means the
        # sample has no dimensions (previously an IndexError).
        data_dims = data_dims[0] if data_dims else {}
    return match_dimensions(rule_dims, data_dims)

@ -1,60 +0,0 @@
# Copyright 2014 Intel Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""WatchData object."""
from oslo_versionedobjects import base
from oslo_versionedobjects import fields
from heat.db.sqlalchemy import api as db_api
from heat.objects import base as heat_base
from heat.objects import fields as heat_fields
class WatchData(heat_base.HeatObject, base.VersionedObjectDictCompat):
    """Versioned object wrapping the watch data DB API calls."""

    fields = {
        'id': fields.IntegerField(),
        'data': heat_fields.JsonField(nullable=True),
        'watch_rule_id': fields.StringField(),
        'created_at': fields.DateTimeField(read_only=True),
        'updated_at': fields.DateTimeField(nullable=True),
    }

    @staticmethod
    def _from_db_object(context, rule, db_data):
        """Copy every declared field from db_data into rule and return it."""
        for name in rule.fields:
            rule[name] = db_data[name]
        rule._context = context
        rule.obj_reset_changes()
        return rule

    @classmethod
    def create(cls, context, values):
        """Create a watch data row from values and return it as an object."""
        created = db_api.watch_data_create(context, values)
        return cls._from_db_object(context, cls(), created)

    @classmethod
    def get_all(cls, context):
        """Return all watch data rows as a list of objects."""
        rows = db_api.watch_data_get_all(context)
        return [cls._from_db_object(context, cls(), row) for row in rows]

    @classmethod
    def get_all_by_watch_rule_id(cls, context, watch_rule_id):
        """Return a generator of the objects belonging to one watch rule.

        The DB query runs immediately; only the object conversion is lazy.
        """
        rows = db_api.watch_data_get_all_by_watch_rule_id(context,
                                                          watch_rule_id)
        return (cls._from_db_object(context, cls(), row) for row in rows)

@ -1,87 +0,0 @@
# Copyright 2014 Intel Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""WatchRule object."""
from oslo_versionedobjects import base
from oslo_versionedobjects import fields
from heat.db.sqlalchemy import api as db_api
from heat.objects import base as heat_base
from heat.objects import fields as heat_fields
from heat.objects import watch_data
class WatchRule(heat_base.HeatObject, base.VersionedObjectDictCompat):
    """Versioned object for watch-rule records obtained through ``db_api``.

    Every field except ``watch_data`` is copied straight from the DB
    record; ``watch_data`` is loaded separately through
    ``watch_data.WatchData``.
    """

    fields = {
        'id': fields.IntegerField(),
        'name': fields.StringField(nullable=True),
        'rule': heat_fields.JsonField(nullable=True),
        'state': fields.StringField(nullable=True),
        'last_evaluated': fields.DateTimeField(nullable=True),
        'stack_id': fields.StringField(),
        'watch_data': fields.ListOfObjectsField(watch_data.WatchData),
        'created_at': fields.DateTimeField(read_only=True),
        'updated_at': fields.DateTimeField(nullable=True),
    }

    @staticmethod
    def _from_db_object(context, rule, db_rule):
        """Fill ``rule`` from ``db_rule`` and return it.

        :param context: request context stored on the object as ``_context``.
        :param rule: the (empty) object to populate.
        :param db_rule: record indexable by ``'id'`` and by every field
                        name other than ``'watch_data'``.
        :returns: the same ``rule`` instance, with its change tracking reset.
        """
        # NOTE(review): db_rule is indexed unconditionally; this presumes
        # db_api never hands back None -- confirm against the DB layer.
        for name in rule.fields:
            if name != 'watch_data':
                rule[name] = db_rule[name]
                continue
            # watch_data is not read from the record itself: it is fetched
            # through WatchData, keyed by this rule's id.
            rule[name] = watch_data.WatchData.get_all_by_watch_rule_id(
                context, db_rule['id'])
        rule._context = context
        # Loading from the DB must not leave the fields flagged as changed.
        rule.obj_reset_changes()
        return rule

    @classmethod
    def get_by_id(cls, context, rule_id):
        """Wrap the record returned by ``db_api.watch_rule_get``."""
        found = db_api.watch_rule_get(context, rule_id)
        return cls._from_db_object(context, cls(), found)

    @classmethod
    def get_by_name(cls, context, watch_rule_name):
        """Wrap the record returned by ``db_api.watch_rule_get_by_name``."""
        found = db_api.watch_rule_get_by_name(context, watch_rule_name)
        return cls._from_db_object(context, cls(), found)

    @classmethod
    def get_all(cls, context):
        """Return a list with one object per ``watch_rule_get_all`` record."""
        records = db_api.watch_rule_get_all(context)
        return [cls._from_db_object(context, cls(), record)
                for record in records]

    @classmethod
    def get_all_by_stack(cls, context, stack_id):
        """Return a list of objects for ``watch_rule_get_all_by_stack``."""
        records = db_api.watch_rule_get_all_by_stack(context, stack_id)
        return [cls._from_db_object(context, cls(), record)
                for record in records]

    @classmethod
    def update_by_id(cls, context, watch_id, values):
        """Forward to ``db_api.watch_rule_update``; returns ``None``."""
        db_api.watch_rule_update(context, watch_id, values)

    @classmethod
    def create(cls, context, values):
        """Create a record via ``db_api.watch_rule_create`` and wrap it."""
        new_rule = cls()
        created = db_api.watch_rule_create(context, values)
        return cls._from_db_object(context, new_rule, created)

    @classmethod
    def delete(cls, context, watch_id):
        """Forward to ``db_api.watch_rule_delete``; returns ``None``."""
        db_api.watch_rule_delete(context, watch_id)

@ -127,66 +127,6 @@ NOTIFY_KEYS = (
STACK_TAGS,
)
# Keys of the watch representation exposed to the API via RPC.
# Each constant is bound explicitly so the name/value pairing is visible
# at a glance; WATCH_KEYS then collects them in their canonical order.
WATCH_ACTIONS_ENABLED = 'actions_enabled'
WATCH_ALARM_ACTIONS = 'actions'
WATCH_TOPIC = 'topic'
WATCH_UPDATED_TIME = 'updated_time'
WATCH_DESCRIPTION = 'description'
WATCH_NAME = 'name'
WATCH_COMPARISON = 'comparison'
WATCH_DIMENSIONS = 'dimensions'
WATCH_PERIODS = 'periods'
WATCH_INSUFFICIENT_ACTIONS = 'insufficient_actions'
WATCH_METRIC_NAME = 'metric_name'
WATCH_NAMESPACE = 'namespace'
WATCH_OK_ACTIONS = 'ok_actions'
WATCH_PERIOD = 'period'
WATCH_STATE_REASON = 'state_reason'
WATCH_STATE_REASON_DATA = 'state_reason_data'
WATCH_STATE_UPDATED_TIME = 'state_updated_time'
WATCH_STATE_VALUE = 'state_value'
WATCH_STATISTIC = 'statistic'
WATCH_THRESHOLD = 'threshold'
WATCH_UNIT = 'unit'
WATCH_STACK_ID = 'stack_id'

WATCH_KEYS = (
    WATCH_ACTIONS_ENABLED,
    WATCH_ALARM_ACTIONS,
    WATCH_TOPIC,
    WATCH_UPDATED_TIME,
    WATCH_DESCRIPTION,
    WATCH_NAME,
    WATCH_COMPARISON,
    WATCH_DIMENSIONS,
    WATCH_PERIODS,
    WATCH_INSUFFICIENT_ACTIONS,
    WATCH_METRIC_NAME,
    WATCH_NAMESPACE,
    WATCH_OK_ACTIONS,
    WATCH_PERIOD,
    WATCH_STATE_REASON,
    WATCH_STATE_REASON_DATA,
    WATCH_STATE_UPDATED_TIME,
    WATCH_STATE_VALUE,
    WATCH_STATISTIC,
    WATCH_THRESHOLD,
    WATCH_UNIT,
    WATCH_STACK_ID,
)
# Alternate representation of a watch rule, aligned with the DB format.
# FIXME: these names follow AWS naming for compatibility with the current
# cfn-push-stats & metadata server. Once cfn-push-stats has been ported to
# use the Cloudwatch server and/or metric collection has moved into
# ceilometer, these should just be WATCH_KEYS -- or each field should be
# stored separately in the DB watch_data table if watch data stays in the
# heat DB.
RULE_ACTIONS_ENABLED = 'ActionsEnabled'
RULE_ALARM_ACTIONS = 'AlarmActions'
RULE_TOPIC = 'AlarmArn'
RULE_UPDATED_TIME = 'AlarmConfigurationUpdatedTimestamp'
RULE_DESCRIPTION = 'AlarmDescription'
RULE_NAME = 'AlarmName'
RULE_COMPARISON = 'ComparisonOperator'
RULE_DIMENSIONS = 'Dimensions'
RULE_PERIODS = 'EvaluationPeriods'
RULE_INSUFFICIENT_ACTIONS = 'InsufficientDataActions'
RULE_METRIC_NAME = 'MetricName'
RULE_NAMESPACE = 'Namespace'
RULE_OK_ACTIONS = 'OKActions'
RULE_PERIOD = 'Period'
RULE_STATE_REASON = 'StateReason'
RULE_STATE_REASON_DATA = 'StateReasonData'
RULE_STATE_UPDATED_TIME = 'StateUpdatedTimestamp'
RULE_STATE_VALUE = 'StateValue'
RULE_STATISTIC = 'Statistic'
RULE_THRESHOLD = 'Threshold'
RULE_UNIT = 'Unit'
RULE_STACK_NAME = 'StackName'

WATCH_RULE_KEYS = (
    RULE_ACTIONS_ENABLED,
    RULE_ALARM_ACTIONS,
    RULE_TOPIC,
    RULE_UPDATED_TIME,
    RULE_DESCRIPTION,
    RULE_NAME,
    RULE_COMPARISON,
    RULE_DIMENSIONS,
    RULE_PERIODS,
    RULE_INSUFFICIENT_ACTIONS,
    RULE_METRIC_NAME,
    RULE_NAMESPACE,
    RULE_OK_ACTIONS,
    RULE_PERIOD,
    RULE_STATE_REASON,
    RULE_STATE_REASON_DATA,
    RULE_STATE_UPDATED_TIME,
    RULE_STATE_VALUE,
    RULE_STATISTIC,
    RULE_THRESHOLD,
    RULE_UNIT,
    RULE_STACK_NAME,
)
# State values a watch can take. Note that WATCH_STATE_OK is spelled
# 'NORMAL' on the wire, not 'OK'.
WATCH_STATE_OK = 'NORMAL'
WATCH_STATE_ALARM = 'ALARM'
WATCH_STATE_NODATA = 'NODATA'
WATCH_STATE_SUSPENDED = 'SUSPENDED'
WATCH_STATE_CEILOMETER_CONTROLLED = 'CEILOMETER_CONTROLLED'

WATCH_STATES = (
    WATCH_STATE_OK,
    WATCH_STATE_ALARM,
    WATCH_STATE_NODATA,
    WATCH_STATE_SUSPENDED,
    WATCH_STATE_CEILOMETER_CONTROLLED,
)
# Keys used for watch data. WATCH_DATA_ALARM carries the key
# 'watch_name' rather than anything containing "alarm".
WATCH_DATA_ALARM = 'watch_name'
WATCH_DATA_METRIC = 'metric_name'
WATCH_DATA_TIME = 'timestamp'
WATCH_DATA_NAMESPACE = 'namespace'
WATCH_DATA = 'data'

WATCH_DATA_KEYS = (
    WATCH_DATA_ALARM,
    WATCH_DATA_METRIC,
    WATCH_DATA_TIME,
    WATCH_DATA_NAMESPACE,
    WATCH_DATA,
)
VALIDATE_PARAM_KEYS = (
PARAM_TYPE, PARAM_DEFAULT, PARAM_NO_ECHO,
PARAM_ALLOWED_VALUES, PARAM_ALLOWED_PATTERN, PARAM_MAX_LENGTH,

@ -676,60 +676,6 @@ class EngineClient(object):
resource_status_reason=resource_status_reason),
version='1.26')
def create_watch_data(self, ctxt, watch_name, stats_data):
    """Create data for CloudWatch and WaitConditions.

    This could be used by CloudWatch and WaitConditions, treating HA
    service events like any other CloudWatch.

    :param ctxt: RPC context.
    :param watch_name: Name of the watch/alarm.
    :param stats_data: The data to post.
    :returns: whatever ``self.call`` returns for the
              ``create_watch_data`` message.
    """
    request = self.make_msg('create_watch_data',
                            watch_name=watch_name,
                            stats_data=stats_data)
    return self.call(ctxt, request)
def show_watch(self, ctxt, watch_name):
    """Return the attributes of one watch/alarm, or of all of them.

    ``watch_name`` has no default, so callers wanting every watch must
    pass ``None`` explicitly.

    :param ctxt: RPC context.
    :param watch_name: Name of the watch/alarm you want to see,
                       or None to see all.
    :returns: whatever ``self.call`` returns for the ``show_watch``
              message.
    """
    request = self.make_msg('show_watch', watch_name=watch_name)
    return self.call(ctxt, request)
def show_watch_metric(self, ctxt, metric_namespace=None, metric_name=None):
    """Return the datapoints for a metric.

    Returns the datapoints associated with a specified metric, or with
    all metrics if no metric_name is passed.

    :param ctxt: RPC context.
    :param metric_namespace: Name of the namespace you want to see,
                             or None to see all.
    :param metric_name: Name of the metric you want to see,
                        or None to see all.
    :returns: whatever ``self.call`` returns for the
              ``show_watch_metric`` message.
    """
    request = self.make_msg('show_watch_metric',
                            metric_namespace=metric_namespace,
                            metric_name=metric_name)
    return self.call(ctxt, request)
def set_watch_state(self, ctxt, watch_name, state):
    """Temporarily set the state of a given watch.

    :param ctxt: RPC context.
    :param watch_name: Name of the watch.
    :param state: State (must be one defined in WatchRule class).
    :returns: whatever ``self.call`` returns for the
              ``set_watch_state`` message.
    """
    request = self.make_msg('set_watch_state',
                            watch_name=watch_name,
                            state=state)
    return self.call(ctxt, request)
def get_revision(self, ctxt):
    """Send an argument-less ``get_revision`` message and return the reply.

    :param ctxt: RPC context.
    :returns: whatever ``self.call`` returns for the ``get_revision``
              message.
    """
    request = self.make_msg('get_revision')
    return self.call(ctxt, request)

@ -1,270 +0,0 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain