Add the basic CloudWatch feature

Fix watch db tables and silly programming errors.
Get basic posting of data to the metadata server working.
Add watch_rule_get_all().
Check for alarms in a periodic task.
Delete watch_data when the rule is deleted.
Add a last_evaluated field to the watch_rule.
Remove an unused option from watch_data_get().
Take better account of the sample period
- still much to be done here (evaluation periods).
Add some useful stats to cfn-push-stats.
Fix how the metric is accessed.
Fix a divide by zero.

Change-Id: Iaf98499d0e3ac6d6f951ea38b3b0f409669258da
Signed-off-by: Angus Salkeld <asalkeld@redhat.com>
Angus Salkeld 2012-05-25 19:24:29 +10:00
parent a16a8bb9c2
commit 1d5aec19d7
11 changed files with 599 additions and 0 deletions
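Roughly, the flow this commit implements is: an instance runs cfn-push-stats,
which POSTs a JSON metric sample to the metadata server's new
/stats/:watch_name/data/ route; the engine stores it as watch_data, and a
periodic task later evaluates each watch_rule against the collected samples.
A minimal Python sketch of the posting side, assuming a metadata server at
http://heat-metadata:8000 (hostname, port and watch name are illustrative;
the real script shells out to curl via CommandRunner):

    # post one metric sample to a watch rule named HttpFailureAlarm
    import json
    import urllib2

    payload = {'Namespace': 'system/linux',
               'ServiceFailure': {'Value': 1, 'Units': 'Counter'}}

    req = urllib2.Request(
        'http://heat-metadata:8000/stats/HttpFailureAlarm/data/',
        data=json.dumps(payload))
    urllib2.urlopen(req)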

heat/cfntools/cfn-push-stats (new executable file)

@@ -0,0 +1,158 @@
#!/usr/bin/env python
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Implements cfn-push-stats CloudFormation functionality
"""
import argparse
import json
import logging
import os
import psutil
import random
import sys
#
# --aws-credential-file=PATH Specifies the location of the file with AWS
# credentials.
# --aws-access-key-id=VALUE Specifies the AWS access key ID to use to
# identify the caller.
# --aws-secret-key=VALUE Specifies the AWS secret key to use to sign
# the request.
# --from-cron Specifies that this script is running from cron.
#
# Examples
#
# To perform a simple test run without posting data to Amazon CloudWatch
#
# ./mon-put-instance-data.pl --mem-util --verify --verbose
#
# To set a five-minute cron schedule to report memory and disk space utilization to CloudWatch
#
# */5 * * * * ~/aws-scripts-mon/mon-put-instance-data.pl --mem-util --disk-space-util --disk-path=/ --from-cron
#
if os.path.exists('/opt/aws/bin'):
    sys.path.insert(0, '/opt/aws/bin')
    from cfn_helper import *
else:
    from heat.cfntools.cfn_helper import *

KILO = 1024
MEGA = 1048576
GIGA = 1073741824
unit_map = {'bytes': 1,
            'kilobytes': KILO,
            'megabytes': MEGA,
            'gigabytes': GIGA}
description = " "
parser = argparse.ArgumentParser(description=description)
parser.add_argument('-v', '--verbose', action="store_true",
                    help="Verbose logging", required=False)
parser.add_argument('--service-failure', required=False, action="store_true",
                    help='Reports a service failure.')
parser.add_argument('--mem-util', required=False, action="store_true",
                    help='Reports memory utilization in percentages.')
parser.add_argument('--mem-used', required=False, action="store_true",
                    help='Reports memory used (excluding cache and buffers) in megabytes.')
parser.add_argument('--mem-avail', required=False, action="store_true",
                    help='Reports available memory (including cache and buffers) in megabytes.')
parser.add_argument('--swap-util', required=False, action="store_true",
                    help='Reports swap utilization in percentages.')
parser.add_argument('--swap-used', required=False, action="store_true",
                    help='Reports allocated swap space in megabytes.')
parser.add_argument('--disk-space-util', required=False, action="store_true",
                    help='Reports disk space utilization in percentages.')
parser.add_argument('--disk-space-used', required=False, action="store_true",
                    help='Reports allocated disk space in gigabytes.')
parser.add_argument('--disk-space-avail', required=False, action="store_true",
                    help='Reports available disk space in gigabytes.')
parser.add_argument('--memory-units', required=False, default='megabytes',
                    help='Specifies units for memory metrics.')
parser.add_argument('--disk-units', required=False, default='megabytes',
                    help='Specifies units for disk metrics.')
parser.add_argument('--disk-path', required=False, default='/',
                    help='Selects the disk by the path on which to report.')
parser.add_argument('url',
                    help='the URL to post to')
args = parser.parse_args()
log_format = '%(levelname)s [%(asctime)s] %(message)s'
logging.basicConfig(format=log_format, level=logging.DEBUG)
logger = logging.getLogger('cfn-push-stats')
data = {'Namespace': 'system/linux'}
# service failure
# ===============
if args.service_failure:
    data['ServiceFailure'] = {
        'Value': 1,
        'Units': 'Counter'}

# memory space
# ============
if args.mem_util or args.mem_used or args.mem_avail:
    mem = psutil.phymem_usage()
    if args.mem_util:
        data['MemoryUtilization'] = {
            'Value': mem.percent,
            'Units': 'Percent'}
    if args.mem_used:
        data['MemoryUsed'] = {
            'Value': mem.used / unit_map[args.memory_units],
            'Units': args.memory_units}
    if args.mem_avail:
        data['MemoryAvailable'] = {
            'Value': mem.free / unit_map[args.memory_units],
            'Units': args.memory_units}

# swap space
# ==========
if args.swap_util or args.swap_used:
    swap = psutil.virtmem_usage()
    if args.swap_util:
        data['SwapUtilization'] = {
            'Value': swap.percent,
            'Units': 'Percent'}
    if args.swap_used:
        data['SwapUsed'] = {
            'Value': swap.used / unit_map[args.memory_units],
            'Units': args.memory_units}

# disk space
# ==========
if args.disk_space_util or args.disk_space_used or args.disk_space_avail:
    disk = psutil.disk_usage(args.disk_path)
    if args.disk_space_util:
        data['DiskSpaceUtilization'] = {
            'Value': disk.percent,
            'Units': 'Percent'}
    if args.disk_space_used:
        data['DiskSpaceUsed'] = {
            'Value': disk.used / unit_map[args.disk_units],
            'Units': args.disk_units}
    if args.disk_space_avail:
        data['DiskSpaceAvailable'] = {
            'Value': disk.free / unit_map[args.disk_units],
            'Units': args.disk_units}

logger.info(str(data))

cmd_str = "curl -X POST -H \'Content-Type:\' --data-binary \'%s\' %s" % \
          (json.dumps(data), args.url)
cmd = CommandRunner(cmd_str)
cmd.run()
print cmd.stdout
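
For illustration, a run such as "cfn-push-stats --mem-util --swap-util <url>"
would POST a JSON body along these lines (values invented for the example):

    {"Namespace": "system/linux",
     "MemoryUtilization": {"Value": 37.2, "Units": "Percent"},
     "SwapUtilization": {"Value": 0.5, "Units": "Percent"}}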


@@ -125,3 +125,31 @@ def event_get_all_by_stack(context, stack_id):

def event_create(context, values):
    return IMPL.event_create(context, values)


def watch_rule_get(context, watch_rule_name):
    return IMPL.watch_rule_get(context, watch_rule_name)


def watch_rule_get_all(context):
    return IMPL.watch_rule_get_all(context)


def watch_rule_create(context, values):
    return IMPL.watch_rule_create(context, values)


def watch_rule_delete(context, watch_rule_name):
    return IMPL.watch_rule_delete(context, watch_rule_name)


def watch_data_create(context, values):
    return IMPL.watch_data_create(context, values)


def watch_data_get_all(context, watch_id):
    return IMPL.watch_data_get_all(context, watch_id)


def watch_data_delete(context, watch_name):
    return IMPL.watch_data_delete(context, watch_name)


@@ -195,3 +195,66 @@ def event_create(context, values):
    event_ref.update(values)
    event_ref.save()
    return event_ref


def watch_rule_get(context, watch_rule_name):
    result = model_query(context, models.WatchRule).\
        filter_by(name=watch_rule_name).first()
    return result


def watch_rule_get_all(context):
    results = model_query(context, models.WatchRule).all()
    return results


def watch_rule_create(context, values):
    obj_ref = models.WatchRule()
    obj_ref.update(values)
    obj_ref.save()
    return obj_ref


def watch_rule_delete(context, watch_name):
    wr = model_query(context, models.WatchRule).\
        filter_by(name=watch_name).first()
    if not wr:
        raise Exception('Attempt to delete a watch_rule named %s '
                        'that does not exist' % watch_name)

    # remove the rule's data points before the rule itself
    session = Session.object_session(wr)
    for d in wr.watch_data:
        session.delete(d)
    session.delete(wr)
    session.flush()


def watch_data_create(context, values):
    obj_ref = models.WatchData()
    obj_ref.update(values)
    obj_ref.save()
    return obj_ref


def watch_data_get_all(context, watch_id):
    # get all data points recorded for this watch rule
    results = model_query(context, models.WatchData).\
        filter_by(watch_rule_id=watch_id).all()
    return results


def watch_data_delete(context, watch_name):
    wr = model_query(context, models.WatchRule).\
        filter_by(name=watch_name).first()
    if not wr:
        raise Exception('Attempt to delete watch_data for a watch_rule '
                        'named %s that does not exist' % watch_name)

    session = Session.object_session(wr)
    for d in wr.watch_data:
        session.delete(d)
    session.flush()
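
As a rough illustration of how the engine is expected to drive this layer
(the context ctxt, the stack name and the rule values here are hypothetical;
the real callers live in heat/engine):

    rule = {'MetricName': 'MemoryUtilization',
            'Statistic': 'Average',
            'Period': '60',
            'EvaluationPeriods': '1',
            'Threshold': '50',
            'ComparisonOperator': 'GreaterThanThreshold'}
    wr = watch_rule_create(ctxt, {'name': 'MemAlarm',
                                  'rule': rule,
                                  'state': 'NORMAL',
                                  'stack_name': 'wordpress'})

    # one sample as posted by cfn-push-stats
    watch_data_create(ctxt, {'data': {'MemoryUtilization':
                                      {'Value': 63, 'Units': 'Percent'}},
                             'watch_rule_id': wr.id})

    assert len(watch_data_get_all(ctxt, wr.id)) == 1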


@@ -0,0 +1,57 @@
from sqlalchemy import *
from migrate import *


def upgrade(migrate_engine):
    meta = MetaData()
    meta.bind = migrate_engine

    watch_rule = Table(
        'watch_rule', meta,
        Column('id', Integer, primary_key=True),
        Column('created_at', DateTime(timezone=False)),
        Column('updated_at', DateTime(timezone=False)),
        Column('stack_name', String(length=255, convert_unicode=False,
                                    assert_unicode=None,
                                    unicode_error=None,
                                    _warn_on_bytestring=False)),
        Column('name', String(length=255, convert_unicode=False,
                              assert_unicode=None,
                              unicode_error=None,
                              _warn_on_bytestring=False)),
        Column('state', String(length=255, convert_unicode=False,
                               assert_unicode=None,
                               unicode_error=None,
                               _warn_on_bytestring=False)),
        Column('rule', Text()),
        Column('last_evaluated', DateTime(timezone=False)),
    )
    try:
        watch_rule.create()
    except Exception:
        meta.drop_all(tables=[watch_rule])
        raise

    watch_data = Table(
        'watch_data', meta,
        Column('id', Integer, primary_key=True),
        Column('created_at', DateTime(timezone=False)),
        Column('updated_at', DateTime(timezone=False)),
        Column('data', Text()),
        Column('watch_rule_id', Integer, ForeignKey("watch_rule.id"),
               nullable=False),
    )
    try:
        watch_data.create()
    except Exception:
        meta.drop_all(tables=[watch_rule, watch_data])
        raise


def downgrade(migrate_engine):
    meta = MetaData()
    meta.bind = migrate_engine

    # drop watch_data first; it has a foreign key to watch_rule
    watch_data = Table('watch_data', meta, autoload=True)
    watch_data.drop()
    watch_rule = Table('watch_rule', meta, autoload=True)
    watch_rule.drop()


@@ -182,3 +182,29 @@ class Resource(BASE, HeatBase):

    stack = relationship(Stack, backref=backref('resources'))
    depends_on = Column(Integer)


class WatchRule(BASE, HeatBase):
    """Represents a watch_rule created by the heat engine."""

    __tablename__ = 'watch_rule'
    id = Column(Integer, primary_key=True)
    name = Column('name', String, nullable=False)
    rule = Column('rule', Json)
    state = Column('state', String)
    stack_name = Column('stack_name', String)
    last_evaluated = Column(DateTime, default=timeutils.utcnow)


class WatchData(BASE, HeatBase):
    """Represents a watch_data created by the heat engine."""

    __tablename__ = 'watch_data'
    id = Column(Integer, primary_key=True)
    data = Column('data', Json)
    watch_rule_id = Column(Integer, ForeignKey('watch_rule.id'),
                           nullable=False)
    watch_rule = relationship(WatchRule, backref=backref('watch_data'))


@@ -0,0 +1,92 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
import logging
import json
import os
from heat.common import exception
from heat.db import api as db_api
from heat.engine.resources import Resource
logger = logging.getLogger('heat.engine.cloud_watch')


class CloudWatchAlarm(Resource):
    properties_schema = {
        'ComparisonOperator': {'Type': 'String',
                               'AllowedValues': ['GreaterThanOrEqualToThreshold',
                                                 'GreaterThanThreshold',
                                                 'LessThanThreshold',
                                                 'LessThanOrEqualToThreshold']},
        'EvaluationPeriods': {'Type': 'String'},
        'MetricName': {'Type': 'String'},
        'Namespace': {'Type': 'String'},
        'Period': {'Type': 'String'},
        'Statistic': {'Type': 'String',
                      'AllowedValues': ['SampleCount', 'Average', 'Sum',
                                        'Minimum', 'Maximum']},
        'Threshold': {'Type': 'String'},
        'Units': {'Type': 'String',
                  'AllowedValues': ['Seconds', 'Microseconds', 'Milliseconds',
                                    'Bytes', 'Kilobytes', 'Megabytes',
                                    'Gigabytes', 'Terabytes', 'Bits',
                                    'Kilobits', 'Megabits', 'Gigabits',
                                    'Terabits', 'Percent', 'Count',
                                    'Bytes/Second', 'Kilobytes/Second',
                                    'Megabytes/Second', 'Gigabytes/Second',
                                    'Terabytes/Second', 'Bits/Second',
                                    'Kilobits/Second', 'Megabits/Second',
                                    'Gigabits/Second', 'Terabits/Second',
                                    'Count/Second', None]}}

    def __init__(self, name, json_snippet, stack):
        super(CloudWatchAlarm, self).__init__(name, json_snippet, stack)
        self.instance_id = ''

    def validate(self):
        '''
        Validate the Properties
        '''
        return Resource.validate(self)

    def create(self):
        if self.state is not None:
            return
        self.state_set(self.CREATE_IN_PROGRESS)
        Resource.create(self)

        wr_values = {
            'name': self.name,
            'rule': self.t['Properties'],
            'state': 'NORMAL',
            'stack_name': self.stack.name
        }
        wr = db_api.watch_rule_create(None, wr_values)
        self.instance_id = wr.id

        self.state_set(self.CREATE_COMPLETE)

    def delete(self):
        if self.state == self.DELETE_IN_PROGRESS or \
                self.state == self.DELETE_COMPLETE:
            return
        self.state_set(self.DELETE_IN_PROGRESS)

        db_api.watch_rule_delete(None, self.name)

        Resource.delete(self)
        self.state_set(self.DELETE_COMPLETE)

    def FnGetRefId(self):
        return unicode(self.name)


@@ -16,6 +16,7 @@
import contextlib
from copy import deepcopy
import datetime
import functools
import os
import socket
@@ -30,6 +31,7 @@ from heat.common import config
from heat.engine import parser
from heat.engine import resources
from heat.db import api as db_api
from heat.openstack.common import timeutils
logger = logging.getLogger('heat.engine.manager')
@@ -324,3 +326,125 @@ class EngineManager(manager.Manager):
        pt.template = t
        pt.save()
        return [None, metadata]

    def do_data_cmp(self, rule, data, threshold):
        op = rule['ComparisonOperator']
        if op == 'GreaterThanThreshold':
            return data > threshold
        elif op == 'GreaterThanOrEqualToThreshold':
            return data >= threshold
        elif op == 'LessThanThreshold':
            return data < threshold
        elif op == 'LessThanOrEqualToThreshold':
            return data <= threshold
        else:
            return False

    def do_data_calc(self, rule, rolling, metric):
        stat = rule['Statistic']
        if stat == 'Maximum':
            if metric > rolling:
                return metric
            else:
                return rolling
        elif stat == 'Minimum':
            if metric < rolling:
                return metric
            else:
                return rolling
        else:
            return metric + rolling
    @manager.periodic_task
    def _periodic_watcher_task(self, context):
        now = timeutils.utcnow()
        wrs = db_api.watch_rule_get_all(context)
        for wr in wrs:
            logger.debug('_periodic_watcher_task %s' % wr.name)

            # has enough time passed to evaluate the rule again?
            dt_period = datetime.timedelta(seconds=int(wr.rule['Period']))
            if now < (wr.last_evaluated + dt_period):
                continue

            # get the dataset for this rule
            periods = int(wr.rule['EvaluationPeriods'])

            # TODO: fix this
            # initial assumption: all samples are in this period
            period = int(wr.rule['Period'])

            #wds = db_api.watch_data_get_all(context, wr.id)
            wds = wr.watch_data

            stat = wr.rule['Statistic']
            data = 0
            samples = 0
            for d in wds:
                if d.created_at < wr.last_evaluated:
                    logger.debug('ignoring old data %s: %s < %s' %
                                 (wr.rule['MetricName'],
                                  str(d.created_at),
                                  str(wr.last_evaluated)))
                    continue
                samples = samples + 1
                metric = 1
                data = samples
                if stat != 'SampleCount':
                    metric = int(d.data[wr.rule['MetricName']]['Value'])
                    data = self.do_data_calc(wr.rule, data, metric)
                logger.debug('%s: %d/%d' % (wr.rule['MetricName'],
                                            metric, data))

            if stat == 'Average' and samples > 0:
                data = data / samples

            alarming = self.do_data_cmp(wr.rule, data,
                                        int(wr.rule['Threshold']))
            logger.debug('%s: %d/%d => %d (current state:%s)' %
                         (wr.rule['MetricName'],
                          int(wr.rule['Threshold']),
                          data, alarming, wr.state))
            if alarming and wr.state != 'ALARM':
                wr.state = 'ALARM'
                wr.save()
                logger.info('ALARM> stack:%s, watch_name:%s',
                            wr.stack_name, wr.name)
                #s = db_api.stack_get(None, wr.stack_name)
                #if s:
                #    ps = parser.Stack(s.name,
                #                      s.raw_template.parsed_template.template,
                #                      s.id,
                #                      params)
                #    for a in wr.rule['AlarmActions']:
                #        ps.resources[a].alarm()
            elif not alarming and wr.state == 'ALARM':
                wr.state = 'NORMAL'
                wr.save()
                logger.info('NORMAL> stack:%s, watch_name:%s',
                            wr.stack_name, wr.name)

            # remember when this rule was last evaluated
            wr.last_evaluated = now
            wr.save()
    def create_watch_data(self, context, watch_name, stats_data):
        '''
        This could be used by CloudWatch and WaitConditions
        and treat HA service events like any other CloudWatch datum.
        '''
        wr = db_api.watch_rule_get(context, watch_name)
        if wr is None:
            return ['No such watch rule: %s' % watch_name, None]

        if wr.rule['MetricName'] not in stats_data:
            return ['MetricName %s missing' % wr.rule['MetricName'], None]

        watch_data = {
            'data': stats_data,
            'watch_rule_id': wr.id
        }
        wd = db_api.watch_data_create(context, watch_data)
        return [None, wd.data]
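
To make the evaluation concrete, a stand-alone sketch of the same arithmetic
for the HttpFailureAlarm rule used in the example template below (sample
values are invented; the real task reads them from watch_data rows):

    rule = {'MetricName': 'ProcessRestartCount',
            'Statistic': 'Maximum',
            'Threshold': '3',
            'ComparisonOperator': 'GreaterThanThreshold'}

    samples = [1, 4, 2]     # metric values posted since last_evaluated

    data = 0
    for metric in samples:
        # 'Maximum' keeps the largest sample, as do_data_calc() does
        data = metric if metric > data else data

    alarming = data > int(rule['Threshold'])    # 'GreaterThanThreshold'
    # data == 4, threshold == 3, so alarming is True and the rule
    # would move from NORMAL to ALARM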


@@ -19,6 +19,7 @@ import logging
import sys
from heat.common import exception
from heat.engine import checkeddict
from heat.engine import cloud_watch
from heat.engine import eip
from heat.engine import instance
from heat.engine import resources
@@ -27,6 +28,7 @@ from heat.engine import user
from heat.engine import volume
from heat.engine import wait_condition
from heat.db import api as db_api
logger = logging.getLogger(__file__)


@@ -52,6 +52,14 @@ class API(wsgi.Router):
        mapper.connect('/events/',
                       controller=metadata_controller, action='create_event',
                       conditions=dict(method=['POST']))

        mapper.connect('/stats/:watch_name/data/',
                       controller=metadata_controller,
                       action='create_watch_data',
                       conditions=dict(method=['POST']))
        # mapper.connect('/stats/:watch_name/data/',
        #                controller=metadata_controller,
        #                action='list_watch_data',
        #                conditions=dict(method=['GET']))

        # TODO(shadower): make sure all responses are JSON-encoded
        # currently, calling an unknown route uses the default handler which


@@ -106,6 +106,27 @@ class MetadataController:
            return json_error(400, error)
        return json_response(201, event)

    def create_watch_data(self, req, body, watch_name):
        con = context.get_admin_context()
        [error, watch_data] = rpc.call(con, 'engine',
                                       {'method': 'create_watch_data',
                                        'args': {'watch_name': watch_name,
                                                 'stats_data': body}})
        if error:
            return json_error(400, error)
        return json_response(201, watch_data)

    def list_watch_data(self, req, watch_name):
        con = context.get_admin_context()
        data = rpc.call(con, 'engine',
                        {'method': 'list_watch_data',
                         'args': {'watch_name': watch_name}})

        if data:
            return data
        else:
            return json_error(404,
                              'The watch "%s" does not exist.' % watch_name)


def create_resource(options):
    """


@@ -97,6 +97,26 @@
  },

  "Resources" : {
    "WebServerRestartPolicy" : {
      "Type" : "HEAT::Recovery::EscalationPolicy",
      "Properties" : {
        "Instance" : { "Ref" : "WikiDatabase" },
      }
    },
    "HttpFailureAlarm": {
      "Type": "AWS::CloudWatch::Alarm",
      "Properties": {
        "AlarmDescription": "Restart the WikiDatabase if httpd fails > 3 times in 10 minutes",
        "MetricName": "ProcessRestartCount",
        "Namespace": "HEAT",
        "Statistic": "Maximum",
        "Period": "300",
        "EvaluationPeriods": "2",
        "Threshold": "3",
        "AlarmActions": [ { "Ref": "WebServerRestartPolicy" } ],
        "ComparisonOperator": "GreaterThanThreshold"
      }
    },
    "WikiDatabase": {
      "Type": "AWS::EC2::Instance",
      "Metadata" : {