Support aodh-evaluator built-in active/active deployment mode
Support deploying aodh-evaluator in active/active mode by leveraging a database non-locking mechanism. With this feature, multiple aodh-evaluator processes can run without depending on etcd or ZooKeeper. The main change is in aodh/evaluator/__init__.py: to use non-locking DB updates, a new column named 'evaluate_timestamp' is introduced to the 'alarm' table. All other code changes follow from the changed 'storage_conn.get_alarms()' implementation. Change-Id: I666817cdcf9083f2ad5871d157900cb6dc5a86e0
parent 3bb8de94f7
commit 1349706e2e
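The heart of the change is a compare-and-swap claim on each alarm row: an evaluator only processes an alarm if it can atomically bump `evaluate_timestamp` from the value it last read. Below is a minimal sketch of that pattern in plain SQLAlchemy Core; it is an illustration only, not the aodh code itself (the real implementation is `Connection.conditional_update()` further down in this diff, and the table here is a cut-down stand-in for aodh's `alarm` table).

```python
# Minimal sketch of the non-locking claim, assuming any SQL backend
# reachable through SQLAlchemy; illustrative, not the aodh implementation.
import datetime

import sqlalchemy as sa

engine = sa.create_engine("sqlite://")
meta = sa.MetaData()
alarm = sa.Table(
    "alarm", meta,
    sa.Column("alarm_id", sa.String(128), primary_key=True),
    sa.Column("evaluate_timestamp", sa.DateTime),
)
meta.create_all(engine)


def try_claim(conn, alarm_id, seen_timestamp):
    """Atomically claim an alarm for evaluation.

    The UPDATE matches only if evaluate_timestamp still holds the value
    this evaluator read earlier; if a concurrent evaluator already bumped
    it, rowcount is 0 and the caller skips the alarm.
    """
    result = conn.execute(
        alarm.update()
        .where(alarm.c.alarm_id == alarm_id)
        .where(alarm.c.evaluate_timestamp == seen_timestamp)
        .values(evaluate_timestamp=datetime.datetime.utcnow())
    )
    return result.rowcount != 0
```

Because the claim is a single UPDATE statement, no distributed lock (etcd, ZooKeeper) is needed: the database's own row-level atomicity serializes competing evaluators.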
@@ -50,7 +50,7 @@ class AlarmEventRule(base.AlarmRule):
     def validate_alarm(cls, alarm):
         super(AlarmEventRule, cls).validate_alarm(alarm)
         for i in alarm.event_rule.query:
-            i._get_value_as_type()
+            i.get_value()
             try:
                 _q_validator({"field": i.field, "op": i.op,
                               "value": i.type})
@@ -76,6 +76,12 @@ severity_kind_enum = wtypes.Enum(str, *severity_kind)
 ALARM_REASON_DEFAULT = "Not evaluated yet"
 ALARM_REASON_MANUAL = "Manually set via API"
 
+ALARM_QUERY_FIELDS_ALLOWED = set([
+    'all_projects', 'user_id', 'project_id', 'type', 'name', 'enabled',
+    'state', 'severity', 'timestamp', 'repeat_actions'
+])
+ALARM_QUERY_OPS_ALLOWED = set(['eq'])
+
 
 class OverQuota(base.ClientSideError):
     def __init__(self, data):
@@ -101,14 +107,14 @@ def is_over_quota(conn, project_id, user_id):
     # Start by checking for user quota
     user_alarm_quota = pecan.request.cfg.api.user_alarm_quota
     if user_alarm_quota is not None:
-        user_alarms = list(conn.get_alarms(user=user_id))
+        user_alarms = conn.get_alarms(user_id=user_id)
         over_quota = len(user_alarms) >= user_alarm_quota
 
     # If the user quota isn't reached, we check for the project quota
     if not over_quota:
         project_alarm_quota = pecan.request.cfg.api.project_alarm_quota
         if project_alarm_quota is not None:
-            project_alarms = list(conn.get_alarms(project=project_id))
+            project_alarms = conn.get_alarms(project_id=project_id)
             over_quota = len(project_alarms) >= project_alarm_quota
 
     return over_quota
@@ -260,6 +266,9 @@ class Alarm(base.Base):
                                default='low')
     "The severity of the alarm"
 
+    evaluate_timestamp = datetime.datetime
+    "The latest alarm evaluation time"
+
     def __init__(self, rule=None, time_constraints=None, **kwargs):
         super(Alarm, self).__init__(**kwargs)
 
@@ -548,14 +557,16 @@ class AlarmController(rest.RestController):
         self._id = alarm_id
 
     def _enforce_rbac(self, rbac_directive):
-        # TODO(sileht): We should be able to relax this since we
-        # pass the alarm object to the enforcer.
-        auth_project = rbac.get_limited_to_project(pecan.request.headers,
-                                                   pecan.request.enforcer)
-        alarms = list(pecan.request.storage.get_alarms(alarm_id=self._id,
-                                                       project=auth_project))
+        auth_project = pecan.request.headers.get('X-Project-Id')
+
+        filters = {'alarm_id': self._id}
+        if not rbac.is_admin(pecan.request.headers):
+            filters['project_id'] = auth_project
+
+        alarms = pecan.request.storage.get_alarms(**filters)
         if not alarms:
-            raise base.AlarmNotFound(alarm=self._id, auth_project=auth_project)
+            raise base.AlarmNotFound(alarm=self._id, auth_project=None)
 
         alarm = alarms[0]
         target = {'user_id': alarm.user_id,
                   'project_id': alarm.project_id}
@@ -850,12 +861,50 @@ class AlarmsController(rest.RestController):
                      pecan.request.enforcer, target)
 
         q = q or []
-        # Timestamp is not supported field for Simple Alarm queries
-        kwargs = v2_utils.query_to_kwargs(
-            q, pecan.request.storage.get_alarms,
-            allow_timestamps=False)
+        filters = {}
+
+        # Check field
+        keys = set([query.field for query in q])
+        if not keys.issubset(ALARM_QUERY_FIELDS_ALLOWED):
+            raise wsme.exc.InvalidInput(
+                'field', keys,
+                'only fields %s are allowed' % ALARM_QUERY_FIELDS_ALLOWED
+            )
+        # Check op
+        ops = set([query.op for query in q])
+        if any([op not in ALARM_QUERY_OPS_ALLOWED for op in ops]):
+            raise wsme.exc.InvalidInput(
+                'op', ops,
+                'only operations %s are allowed' % ALARM_QUERY_OPS_ALLOWED
+            )
+
+        if 'all_projects' in keys:
+            if v2_utils.get_query_value(q, 'all_projects', 'boolean'):
+                rbac.enforce('get_alarms:all_projects', pecan.request.headers,
+                             pecan.request.enforcer, target)
+            keys.remove('all_projects')
+        else:
+            project_id = pecan.request.headers.get('X-Project-Id')
+            is_admin = rbac.is_admin(pecan.request.headers)
+
+            if not v2_utils.is_field_exist(q, 'project_id'):
+                q.append(
+                    base.Query(field='project_id', op='eq', value=project_id)
+                )
+            else:
+                request_project = v2_utils.get_query_value(q, 'project_id')
+                if not is_admin and request_project != project_id:
+                    raise base.ProjectNotAuthorized(request_project)
+
+        for query in q:
+            if query.field in keys:
+                filters[query.field] = {query.op: query.get_value(query.type)}
 
         if sort or limit or marker:
-            kwargs['pagination'] = v2_utils.get_pagination_options(
+            filters['pagination'] = v2_utils.get_pagination_options(
                 sort, limit, marker, models.Alarm)
 
+        LOG.debug('Getting alarms from database, filters: %s', filters)
+
         return [Alarm.from_db_model_scrubbed(m)
-                for m in pecan.request.storage.get_alarms(**kwargs)]
+                for m in pecan.request.storage.get_alarms(**filters)]
@@ -153,7 +153,7 @@ class Query(Base):
     def as_dict(self):
         return self.as_dict_from_keys(['field', 'op', 'type', 'value'])
 
-    def _get_value_as_type(self, forced_type=None):
+    def get_value(self, forced_type=None):
         """Convert metadata value to the specified data type.
 
         This method is called during metadata query to help convert the
@@ -157,9 +157,9 @@ def validate_query(query, db_func, internal_keys=None,
         if key in valid_keys or _is_field_metadata(i.field):
             if operator == 'eq':
                 if key == 'enabled':
-                    i._get_value_as_type('boolean')
+                    i.get_value('boolean')
                 elif _is_field_metadata(key):
-                    i._get_value_as_type()
+                    i.get_value()
             else:
                 raise wsme.exc.InvalidInput('op', i.op,
                                             'unimplemented operator for '
@@ -235,7 +235,7 @@ def query_to_kwargs(query, db_func, internal_keys=None,
             if i.field == 'search_offset':
                 stamp['search_offset'] = i.value
             elif i.field == 'enabled':
-                kwargs[i.field] = i._get_value_as_type('boolean')
+                kwargs[i.field] = i.get_value('boolean')
             else:
                 key = translation.get(i.field, i.field)
                 kwargs[key] = i.value
@@ -320,3 +320,33 @@ def get_pagination_options(sort, limit, marker, api_model):
     return {'limit': limit,
             'marker': marker,
             'sort': sorts}
+
+
+def get_query_value(queries, field, type=None):
+    """Get value of the specified query field.
+
+    :param queries: A list of Query object.
+    :param field: Field name.
+    """
+    for q in queries:
+        if q.field == field:
+            return q.get_value(type)
+
+    raise wsme.exc.InvalidInput(
+        'field',
+        field,
+        "field %s is not provided" % field
+    )
+
+
+def is_field_exist(queries, field):
+    """Check if a given field exists in a query list.
+
+    :param queries: A list of Query object.
+    :param field: Field name.
+    """
+    for q in queries:
+        if q.field == field:
+            return True
+
+    return False
@@ -57,6 +57,17 @@ rules = [
             }
         ]
     ),
+    policy.DocumentedRuleDefault(
+        name="telemetry:get_alarms:all_projects",
+        check_str=RULE_CONTEXT_IS_ADMIN,
+        description='Get alarms of all projects.',
+        operations=[
+            {
+                'path': '/v2/alarms',
+                'method': 'GET'
+            }
+        ]
+    ),
     policy.DocumentedRuleDefault(
         name="telemetry:query_alarm",
         check_str=RULE_ADMIN_OR_OWNER,
@@ -105,3 +105,7 @@ def get_limited_to_project(headers, enforcer):
 
     """
    return get_limited_to(headers, enforcer)[1]
+
+
+def is_admin(headers):
+    return 'admin' in headers.get('X-Roles', "").split(",")
@@ -220,10 +220,12 @@ class PartitionCoordinator(object):
         `tooz`. We then hash all the objects into buckets and return only
         the ones that hashed into *our* bucket.
         """
-        if not group_id:
+        if not group_id or not self.is_active():
             return universal_set
 
+        if group_id not in self._groups:
+            self.join_group(group_id)
+
         try:
             members = self._get_members(group_id)
             LOG.debug('Members of group: %s, Me: %s', members, self._my_id)
@@ -38,6 +38,7 @@ from aodh import messaging
 from aodh import queue
 from aodh import storage
 from aodh.storage import models
+from aodh.storage.sqlalchemy import models as sql_models
 
 LOG = log.getLogger(__name__)
 
@@ -249,6 +250,7 @@ class AlarmEvaluationService(cotyledon.Service):
             alarms = self._assigned_alarms()
             LOG.info('initiating evaluation cycle on %d alarms',
                      len(alarms))
+
             for alarm in alarms:
                 self._evaluate_alarm(alarm)
         except Exception:
@@ -257,23 +259,49 @@ class AlarmEvaluationService(cotyledon.Service):
     def _evaluate_alarm(self, alarm):
         """Evaluate the alarms assigned to this evaluator."""
         if alarm.type not in self.evaluators:
-            LOG.debug('skipping alarm %s: type unsupported', alarm.alarm_id)
+            LOG.warning('Skipping alarm %s, unsupported type: %s',
+                        alarm.alarm_id, alarm.type)
             return
 
-        LOG.debug('evaluating alarm %s', alarm.alarm_id)
+        # If the coordinator is not available, fallback to database
+        # non-locking mechanism in order to support aodh-evaluator
+        # active/active deployment.
+        if not self.partition_coordinator.is_active():
+            modified = self.storage_conn.conditional_update(
+                sql_models.Alarm,
+                {'evaluate_timestamp': timeutils.utcnow()},
+                {
+                    'alarm_id': alarm.alarm_id,
+                    'evaluate_timestamp': alarm.evaluate_timestamp
+                },
+            )
+            if not modified:
+                LOG.debug(
+                    'Alarm %s has been already handled by another evaluator',
+                    alarm.alarm_id
+                )
+                return
+
+        LOG.debug('Evaluating alarm %s', alarm.alarm_id)
         try:
             self.evaluators[alarm.type].obj.evaluate(alarm)
         except Exception:
             LOG.exception('Failed to evaluate alarm %s', alarm.alarm_id)
 
     def _assigned_alarms(self):
         # NOTE(r-mibu): The 'event' type alarms will be evaluated by the
         # event-driven alarm evaluator, so this periodical evaluator skips
         # those alarms.
-        all_alarms = self.storage_conn.get_alarms(enabled=True,
-                                                  exclude=dict(type='event'))
-        all_alarms = list(all_alarms)
-        all_alarm_ids = [a.alarm_id for a in all_alarms]
-        selected = self.partition_coordinator.extract_my_subset(
-            self.PARTITIONING_GROUP_NAME, all_alarm_ids)
-        return list(filter(lambda a: a.alarm_id in selected, all_alarms))
+        before = (timeutils.utcnow() - datetime.timedelta(
+            seconds=self.conf.evaluation_interval / 2))
+        selected = self.storage_conn.get_alarms(
+            enabled=True,
+            type={'ne': 'event'},
+            evaluate_timestamp={'lt': before},
+        )
+
+        if self.partition_coordinator.is_active():
+            all_alarm_ids = [a.alarm_id for a in selected]
+            selected_ids = self.partition_coordinator.extract_my_subset(
+                self.PARTITIONING_GROUP_NAME, all_alarm_ids
+            )
+            selected = [a for a in selected if a.alarm_id in selected_ids]
+
+        return selected
@@ -193,8 +193,8 @@ class EventAlarmEvaluator(evaluator.Evaluator):
         # this function update only alarms changed from the last access.
         alarms = {a.alarm_id: Alarm(a) for a in
                   self._storage_conn.get_alarms(enabled=True,
-                                                alarm_type='event',
-                                                project=project)}
+                                                type='event',
+                                                project_id=project)}
 
         if self.conf.event_alarm_cache_ttl:
             self.caches[project] = {
@@ -72,8 +72,15 @@ def prepare_service(argv=None, config_files=None):
     conf = cfg.ConfigOpts()
     oslo_i18n.enable_lazy()
     log.register_options(conf)
-    log_levels = (conf.default_log_levels +
-                  ['futurist=INFO', 'keystoneclient=INFO'])
+    log_levels = (
+        conf.default_log_levels +
+        [
+            'futurist=INFO',
+            'keystoneclient=INFO',
+            'oslo_db.sqlalchemy=WARN',
+            'cotyledon=INFO'
+        ]
+    )
     log.set_defaults(default_log_levels=log_levels)
     defaults.set_cors_middleware_defaults()
     db_options.set_defaults(conf)
@@ -91,24 +91,8 @@ class Connection(object):
         """Migrate the database to `version` or the most recent version."""
 
     @staticmethod
-    def get_alarms(name=None, user=None, state=None, meter=None,
-                   project=None, enabled=None, alarm_id=None,
-                   alarm_type=None, severity=None, exclude=None,
-                   pagination=None):
-        """Yields a lists of alarms that match filters.
-
-        :param name: Optional name for alarm.
-        :param user: Optional ID for user that owns the resource.
-        :param state: Optional string for alarm state.
-        :param meter: Optional string for alarms associated with meter.
-        :param project: Optional ID for project that owns the resource.
-        :param enabled: Optional boolean to list disable alarm.
-        :param alarm_id: Optional alarm_id to return one alarm.
-        :param alarm_type: Optional alarm type.
-        :param severity: Optional alarm severity.
-        :param exclude: Optional dict for inequality constraint.
-        :param pagination: Pagination parameters.
-        """
+    def get_alarms(*args, **kwargs):
+        """Yields a lists of alarms that match filters."""
         raise aodh.NotImplementedError('Alarms not implemented')
 
     @staticmethod
@@ -25,7 +25,6 @@ from oslo_db.sqlalchemy import utils as oslo_sql_utils
 from oslo_log import log
 from oslo_utils import importutils
 from oslo_utils import timeutils
-import six
 import sqlalchemy
 from sqlalchemy import asc
 from sqlalchemy import desc
@@ -58,6 +57,41 @@ AVAILABLE_STORAGE_CAPABILITIES = {
 }
 
 
+def apply_filters(query, model, **filters):
+    filter_dict = {}
+
+    for key, value in filters.items():
+        column_attr = getattr(model, key)
+
+        if isinstance(value, dict):
+            if 'in' in value:
+                query = query.filter(column_attr.in_(value['in']))
+            elif 'nin' in value:
+                query = query.filter(~column_attr.in_(value['nin']))
+            elif 'ne' in value:
+                query = query.filter(column_attr != value['ne'])
+            elif 'gt' in value:
+                query = query.filter(column_attr > value['gt'])
+            elif 'ge' in value:
+                query = query.filter(column_attr >= value['ge'])
+            elif 'lt' in value:
+                query = query.filter(column_attr < value['lt'])
+            elif 'le' in value:
+                query = query.filter(column_attr <= value['le'])
+            elif 'eq' in value:
+                query = query.filter(column_attr == value['eq'])
+            elif 'has' in value:
+                like_pattern = '%{0}%'.format(value['has'])
+                query = query.filter(column_attr.like(like_pattern))
+        else:
+            filter_dict[key] = value
+
+    if filter_dict:
+        query = query.filter_by(**filter_dict)
+
+    return query
+
+
 class Connection(base.Connection):
     """Put the data into a SQLAlchemy database. """
     CAPABILITIES = base.update_nested(base.Connection.CAPABILITIES,
@@ -147,28 +181,30 @@ class Connection(base.Connection):
 
     @staticmethod
     def _row_to_alarm_model(row):
-        return alarm_api_models.Alarm(alarm_id=row.alarm_id,
-                                      enabled=row.enabled,
-                                      type=row.type,
-                                      name=row.name,
-                                      description=row.description,
-                                      timestamp=row.timestamp,
-                                      user_id=row.user_id,
-                                      project_id=row.project_id,
-                                      state=row.state,
-                                      state_timestamp=row.state_timestamp,
-                                      state_reason=row.state_reason,
-                                      ok_actions=row.ok_actions,
-                                      alarm_actions=row.alarm_actions,
-                                      insufficient_data_actions=(
-                                          row.insufficient_data_actions),
-                                      rule=row.rule,
-                                      time_constraints=row.time_constraints,
-                                      repeat_actions=row.repeat_actions,
-                                      severity=row.severity)
+        return alarm_api_models.Alarm(
+            alarm_id=row.alarm_id,
+            enabled=row.enabled,
+            type=row.type,
+            name=row.name,
+            description=row.description,
+            timestamp=row.timestamp,
+            user_id=row.user_id,
+            project_id=row.project_id,
+            state=row.state,
+            state_timestamp=row.state_timestamp,
+            state_reason=row.state_reason,
+            ok_actions=row.ok_actions,
+            alarm_actions=row.alarm_actions,
+            insufficient_data_actions=(row.insufficient_data_actions),
+            rule=row.rule,
+            time_constraints=row.time_constraints,
+            repeat_actions=row.repeat_actions,
+            severity=row.severity,
+            evaluate_timestamp=row.evaluate_timestamp
+        )
 
     def _retrieve_alarms(self, query):
-        return (self._row_to_alarm_model(x) for x in query.all())
+        return [self._row_to_alarm_model(x) for x in query.all()]
 
     @staticmethod
     def _get_pagination_query(session, query, pagination, api_model, model):
@@ -204,50 +240,15 @@ class Connection(base.Connection):
         return oslo_sql_utils.paginate_query(
             query, model, limit, sort_keys, sort_dirs=sort_dirs, marker=marker)
 
-    def get_alarms(self, name=None, user=None, state=None, meter=None,
-                   project=None, enabled=None, alarm_id=None,
-                   alarm_type=None, severity=None, exclude=None,
-                   pagination=None):
-        """Yields a lists of alarms that match filters.
-
-        :param name: Optional name for alarm.
-        :param user: Optional ID for user that owns the resource.
-        :param state: Optional string for alarm state.
-        :param meter: Optional string for alarms associated with meter.
-        :param project: Optional ID for project that owns the resource.
-        :param enabled: Optional boolean to list disable alarm.
-        :param alarm_id: Optional alarm_id to return one alarm.
-        :param alarm_type: Optional alarm type.
-        :param severity: Optional alarm severity.
-        :param exclude: Optional dict for inequality constraint.
-        :param pagination: Pagination query parameters.
-        """
-
+    def get_alarms(self, meter=None, pagination=None, **kwargs):
+        """Yields a lists of alarms that match filters."""
+        pagination = pagination or {}
         session = self._engine_facade.get_session()
         query = session.query(models.Alarm)
-        if name is not None:
-            query = query.filter(models.Alarm.name == name)
-        if enabled is not None:
-            query = query.filter(models.Alarm.enabled == enabled)
-        if user is not None:
-            query = query.filter(models.Alarm.user_id == user)
-        if project is not None:
-            query = query.filter(models.Alarm.project_id == project)
-        if alarm_id is not None:
-            query = query.filter(models.Alarm.alarm_id == alarm_id)
-        if state is not None:
-            query = query.filter(models.Alarm.state == state)
-        if alarm_type is not None:
-            query = query.filter(models.Alarm.type == alarm_type)
-        if severity is not None:
-            query = query.filter(models.Alarm.severity == severity)
-        if exclude is not None:
-            for key, value in six.iteritems(exclude):
-                query = query.filter(getattr(models.Alarm, key) != value)
-
+        query = apply_filters(query, models.Alarm, **kwargs)
         query = self._get_pagination_query(
             session, query, pagination, alarm_api_models.Alarm, models.Alarm)
 
         alarms = self._retrieve_alarms(query)
 
         # TODO(cmart): improve this by using sqlalchemy.func factory
|
||||
.delete())
|
||||
LOG.info("%d alarm histories are removed from database",
|
||||
deleted_rows)
|
||||
|
||||
def conditional_update(self, model, values, expected_values, filters=None):
|
||||
"""Compare-and-swap conditional update SQLAlchemy implementation."""
|
||||
filters = filters or {}
|
||||
filters.update(expected_values)
|
||||
|
||||
session = self._engine_facade.get_session()
|
||||
query = session.query(model)
|
||||
if filters:
|
||||
query = query.filter_by(**filters)
|
||||
|
||||
update_args = {'synchronize_session': False}
|
||||
|
||||
result = query.update(values, **update_args)
|
||||
return 0 != result
|
||||
|
@@ -68,12 +68,14 @@ class Alarm(base.Model):
     :param repeat_actions: Is the actions should be triggered on each
                            alarm evaluation.
     :param severity: Alarm level (low/moderate/critical)
+    :param evaluate_timestamp: The timestamp when the alarm is finished
+                               evaluating.
     """
     def __init__(self, alarm_id, type, enabled, name, description,
                  timestamp, user_id, project_id, state, state_timestamp,
                  state_reason, ok_actions, alarm_actions,
                  insufficient_data_actions, repeat_actions, rule,
-                 time_constraints, severity=None):
+                 time_constraints, severity=None, evaluate_timestamp=None):
         if not isinstance(timestamp, datetime.datetime):
             raise TypeError(_("timestamp should be datetime object"))
         if not isinstance(state_timestamp, datetime.datetime):
@@ -97,7 +99,8 @@ class Alarm(base.Model):
                          repeat_actions=repeat_actions,
                          rule=rule,
                          time_constraints=time_constraints,
-                         severity=severity)
+                         severity=severity,
+                         evaluate_timestamp=evaluate_timestamp)
 
 
 class AlarmChange(base.Model):
@@ -0,0 +1,36 @@
+# Copyright 2019 Catalyst Cloud Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Add evaluate_timestamp column to alarm table
+
+Revision ID: 006
+Revises: 6ae0d05d9451
+Create Date: 2019-12-05 11:23:42.379029
+"""
+
+# revision identifiers, used by Alembic.
+revision = '006'
+down_revision = '6ae0d05d9451'
+
+from alembic import op
+from oslo_utils import timeutils
+import sqlalchemy as sa
+
+
+def upgrade():
+    op.add_column(
+        'alarm',
+        sa.Column('evaluate_timestamp', sa.DateTime(), nullable=True,
+                  server_default=str(timeutils.utcnow()))
+    )
@@ -106,6 +106,8 @@ class Alarm(Base):
     rule = Column(JSONEncodedDict)
     time_constraints = Column(JSONEncodedDict)
 
+    evaluate_timestamp = Column(DateTime, default=lambda: timeutils.utcnow())
+
 
 class AlarmChange(Base):
     """Define AlarmChange data."""
@@ -71,6 +71,14 @@ class BaseTestCase(base.BaseTestCase):
         except (TypeError, AttributeError):
             self.fail("%s doesn't have length" % type(obj))
 
+    def assertDictContains(self, parent, child):
+        """Checks whether child dict is a subset of parent.
+
+        assertDictContainsSubset() in standard Python 2.7 has been
+        deprecated since Python 3.2
+        """
+        self.assertEqual(parent, dict(parent, **child))
+
     @staticmethod
     def path_get(project_file=None):
         root = os.path.abspath(os.path.join(os.path.dirname(__file__),
@@ -14,6 +14,7 @@
 # under the License.
 """Tests alarm operation."""
 
+import copy
 import datetime
 import json as jsonlib
 import operator
@@ -206,9 +207,6 @@ class TestAlarms(TestAlarmsBase):
                                  'value': isotime}],
                              expect_errors=True)
         self.assertEqual(resp.status_code, 400)
-        self.assertEqual(resp.json['error_message']['faultstring'],
-                         'Unknown argument: "timestamp": '
-                         'not valid for this resource')
 
     def test_alarms_query_with_state(self):
         alarm = models.Alarm(name='disabled',
@@ -260,14 +258,38 @@ class TestAlarms(TestAlarmsBase):
         self.assertEqual(set(['gnocchi_aggregation_by_metrics_threshold']),
                          set(alarm['type'] for alarm in alarms))
 
+    def test_list_alarms_all_projects_by_admin(self):
+        auth_headers = copy.copy(self.auth_headers)
+        auth_headers['X-Roles'] = 'admin'
+
+        alarms = self.get_json(
+            '/alarms',
+            headers=auth_headers,
+            q=[{'field': 'all_projects', 'op': 'eq', 'value': 'true'}]
+        )
+
+        self.assertEqual(3, len(alarms))
+
+    def test_list_alarms_all_projects_forbidden(self):
+        response = self.get_json(
+            '/alarms',
+            headers=self.auth_headers,
+            q=[{'field': 'all_projects', 'op': 'eq', 'value': 'true'}],
+            expect_errors=True,
+            status=401
+        )
+
+        faultstring = 'RBAC Authorization Failed'
+        self.assertIn(faultstring,
+                      response.json['error_message']['faultstring'])
+
     def test_get_not_existing_alarm(self):
         resp = self.get_json('/alarms/alarm-id-3',
                              headers=self.auth_headers,
                              expect_errors=True)
         self.assertEqual(404, resp.status_code)
-        self.assertEqual('Alarm alarm-id-3 not found in project %s' %
-                         self.auth_headers["X-Project-Id"],
-                         resp.json['error_message']['faultstring'])
+        self.assertIn('Alarm alarm-id-3 not found',
+                      resp.json['error_message']['faultstring'])
 
     def test_get_alarm(self):
         alarms = self.get_json('/alarms',
@@ -344,13 +366,11 @@ class TestAlarms(TestAlarmsBase):
                                      expect_errors=True,
                                      status=400,
                                      headers=self.auth_headers)
-            faultstring = ('Invalid input for field/attribute op. '
-                           'Value: \'%(op)s\'. unimplemented operator '
-                           'for %(field)s' % {'field': field, 'op': op})
-            self.assertEqual(faultstring,
-                             response.json['error_message']['faultstring'])
+            faultstring = 'Invalid input for field/attribute op'
+            self.assertIn(faultstring,
+                          response.json['error_message']['faultstring'])
 
-        _test('project', 'ne')
+        _test('project_id', 'ne')
 
     def test_get_alarm_project_filter_normal_user(self):
@@ -364,7 +384,6 @@ class TestAlarms(TestAlarmsBase):
                                    'value': project}])
             self.assertEqual(3, len(alarms))
 
-        _test('project')
         _test('project_id')
 
     def test_get_alarm_other_project_normal_user(self):
@@ -376,11 +395,10 @@ class TestAlarms(TestAlarmsBase):
                                      expect_errors=True,
                                      status=401,
                                      headers=self.auth_headers)
-            faultstring = 'Not Authorized to access project other-project'
-            self.assertEqual(faultstring,
-                             response.json['error_message']['faultstring'])
+            faultstring = 'Not Authorized to access'
+            self.assertIn(faultstring,
+                          response.json['error_message']['faultstring'])
 
-        _test('project')
         _test('project_id')
 
     def test_get_alarm_forbiden(self):
@@ -35,121 +35,121 @@ class TestQuery(base.BaseTestCase):
         self.useFixture(fixtures.MonkeyPatch(
             'pecan.response', mock.MagicMock()))
 
-    def test_get_value_as_type_with_integer(self):
+    def test_get_value_with_integer(self):
         query = v2_base.Query(field='metadata.size',
                               op='eq',
                               value='123',
                               type='integer')
         expected = 123
-        self.assertEqual(expected, query._get_value_as_type())
+        self.assertEqual(expected, query.get_value())
 
-    def test_get_value_as_type_with_float(self):
+    def test_get_value_with_float(self):
         query = v2_base.Query(field='metadata.size',
                               op='eq',
                               value='123.456',
                               type='float')
         expected = 123.456
-        self.assertEqual(expected, query._get_value_as_type())
+        self.assertEqual(expected, query.get_value())
 
-    def test_get_value_as_type_with_boolean(self):
+    def test_get_value_with_boolean(self):
         query = v2_base.Query(field='metadata.is_public',
                               op='eq',
                               value='True',
                               type='boolean')
         expected = True
-        self.assertEqual(expected, query._get_value_as_type())
+        self.assertEqual(expected, query.get_value())
 
-    def test_get_value_as_type_with_string(self):
+    def test_get_value_with_string(self):
         query = v2_base.Query(field='metadata.name',
                               op='eq',
                               value='linux',
                               type='string')
         expected = 'linux'
-        self.assertEqual(expected, query._get_value_as_type())
+        self.assertEqual(expected, query.get_value())
 
-    def test_get_value_as_type_with_datetime(self):
+    def test_get_value_with_datetime(self):
         query = v2_base.Query(field='metadata.date',
                               op='eq',
                               value='2014-01-01T05:00:00',
                               type='datetime')
-        self.assertIsInstance(query._get_value_as_type(), datetime.datetime)
-        self.assertIsNone(query._get_value_as_type().tzinfo)
+        self.assertIsInstance(query.get_value(), datetime.datetime)
+        self.assertIsNone(query.get_value().tzinfo)
 
-    def test_get_value_as_type_with_integer_without_type(self):
+    def test_get_value_with_integer_without_type(self):
         query = v2_base.Query(field='metadata.size',
                               op='eq',
                               value='123')
         expected = 123
-        self.assertEqual(expected, query._get_value_as_type())
+        self.assertEqual(expected, query.get_value())
 
-    def test_get_value_as_type_with_float_without_type(self):
+    def test_get_value_with_float_without_type(self):
         query = v2_base.Query(field='metadata.size',
                               op='eq',
                               value='123.456')
         expected = 123.456
-        self.assertEqual(expected, query._get_value_as_type())
+        self.assertEqual(expected, query.get_value())
 
-    def test_get_value_as_type_with_boolean_without_type(self):
+    def test_get_value_with_boolean_without_type(self):
         query = v2_base.Query(field='metadata.is_public',
                               op='eq',
                               value='True')
         expected = True
-        self.assertEqual(expected, query._get_value_as_type())
+        self.assertEqual(expected, query.get_value())
 
-    def test_get_value_as_type_with_string_without_type(self):
+    def test_get_value_with_string_without_type(self):
         query = v2_base.Query(field='metadata.name',
                               op='eq',
                               value='linux')
         expected = 'linux'
-        self.assertEqual(expected, query._get_value_as_type())
+        self.assertEqual(expected, query.get_value())
 
-    def test_get_value_as_type_with_bad_type(self):
+    def test_get_value_with_bad_type(self):
         query = v2_base.Query(field='metadata.size',
                               op='eq',
                               value='123.456',
                               type='blob')
-        self.assertRaises(wsme.exc.ClientSideError, query._get_value_as_type)
+        self.assertRaises(wsme.exc.ClientSideError, query.get_value)
 
-    def test_get_value_as_type_with_bad_value(self):
+    def test_get_value_with_bad_value(self):
         query = v2_base.Query(field='metadata.size',
                               op='eq',
                               value='fake',
                               type='integer')
-        self.assertRaises(wsme.exc.ClientSideError, query._get_value_as_type)
+        self.assertRaises(wsme.exc.ClientSideError, query.get_value)
 
-    def test_get_value_as_type_integer_expression_without_type(self):
+    def test_get_value_integer_expression_without_type(self):
         # bug 1221736
         query = v2_base.Query(field='should_be_a_string',
                               op='eq',
                               value='WWW-Layer-4a80714f')
         expected = 'WWW-Layer-4a80714f'
-        self.assertEqual(expected, query._get_value_as_type())
+        self.assertEqual(expected, query.get_value())
 
-    def test_get_value_as_type_boolean_expression_without_type(self):
+    def test_get_value_boolean_expression_without_type(self):
         # bug 1221736
         query = v2_base.Query(field='should_be_a_string',
                               op='eq',
                               value='True or False')
         expected = 'True or False'
-        self.assertEqual(expected, query._get_value_as_type())
+        self.assertEqual(expected, query.get_value())
 
-    def test_get_value_as_type_with_syntax_error(self):
+    def test_get_value_with_syntax_error(self):
         # bug 1221736
         value = 'WWW-Layer-4a80714f-0232-4580-aa5e-81494d1a4147-uolhh25p5xxm'
         query = v2_base.Query(field='group_id',
                               op='eq',
                               value=value)
         expected = value
-        self.assertEqual(expected, query._get_value_as_type())
+        self.assertEqual(expected, query.get_value())
 
-    def test_get_value_as_type_with_syntax_error_colons(self):
+    def test_get_value_with_syntax_error_colons(self):
         # bug 1221736
         value = 'Ref::StackId'
         query = v2_base.Query(field='field_name',
                               op='eq',
                               value=value)
         expected = value
-        self.assertEqual(expected, query._get_value_as_type())
+        self.assertEqual(expected, query.get_value())
 
 
 class TestQueryToKwArgs(tests_base.BaseTestCase):
@@ -320,13 +320,8 @@ class TestQueryToKwArgs(tests_base.BaseTestCase):
         q = [v2_base.Query(field='abc',
                            op='eq',
                            value='abc')]
-        exc = self.assertRaises(
+        self.assertRaises(
             wsme.exc.UnknownArgument,
             utils.query_to_kwargs, q,
-            alarm_storage_base.Connection.get_alarms)
-        valid_keys = ['alarm_id', 'enabled', 'exclude', 'meter', 'name',
-                      'project', 'severity', 'state', 'type', 'user']
-        msg = ("unrecognized field in query: %s, "
-               "valid keys: %s") % (q, valid_keys)
-        expected_exc = wsme.exc.UnknownArgument('abc', msg)
-        self.assertEqual(str(expected_exc), str(exc))
+            alarm_storage_base.Connection.get_alarms
+        )
@@ -166,13 +166,12 @@ class AlarmTest(AlarmTestBase):
 
     def test_list_by_type(self):
         self.add_some_alarms()
-        alarms = list(self.alarm_conn.get_alarms(alarm_type=ALARM_TYPE))
+        alarms = list(self.alarm_conn.get_alarms(type=ALARM_TYPE))
         self.assertEqual(3, len(alarms))
 
     def test_list_excluded_by_name(self):
         self.add_some_alarms()
-        exclude = {'name': 'yellow-alert'}
-        alarms = list(self.alarm_conn.get_alarms(exclude=exclude))
+        alarms = list(self.alarm_conn.get_alarms(name={'ne': 'yellow-alert'}))
         self.assertEqual(2, len(alarms))
         alarm_names = sorted([a.name for a in alarms])
         self.assertEqual(['orange-alert', 'red-alert'], alarm_names)
@@ -49,3 +49,11 @@ class TestEvaluatorBase(base.BaseTestCase):
     def _assert_all_alarms(self, state):
         for alarm in self.alarms:
             self.assertEqual(state, alarm.state)
+
+    def assertDictContains(self, parent, child):
+        """Checks whether child dict is a subset of parent.
+
+        assertDictContainsSubset() in standard Python 2.7 has been
+        deprecated since Python 3.2
+        """
+        self.assertEqual(parent, dict(parent, **child))
@@ -100,8 +100,8 @@ class TestEventAlarmEvaluate(base.TestEvaluatorBase):
 
         if expect_db_queries is not None:
             expected = [mock.call(enabled=True,
-                                  alarm_type='event',
-                                  project=p) for p in expect_db_queries]
+                                  type='event',
+                                  project_id=p) for p in expect_db_queries]
             self.assertEqual(expected,
                              self.storage_conn.get_alarms.call_args_list)
 
@@ -94,7 +94,7 @@ class TestAlarmEvaluationService(tests_base.BaseTestCase):
         alarm = mock.Mock(type='gnocchi_aggregation_by_metrics_threshold',
                           alarm_id="alarm_id1")
         self._fake_pc.extract_my_subset.return_value = ["alarm_id1"]
-        self._fake_pc.is_active.return_value = False
+        self._fake_pc.is_active.side_effect = [False, False, True, True]
         self._fake_conn.get_alarms.return_value = [alarm]
         self.threshold_eval.evaluate.side_effect = [Exception('Boom!'), None]
 
@@ -116,7 +116,7 @@ class TestAlarmEvaluationService(tests_base.BaseTestCase):
         ]
         self.threshold_eval.evaluate.side_effect = [Exception('Boom!'), None]
 
-        self._fake_pc.is_active.return_value = False
+        self._fake_pc.is_active.side_effect = [False, False, True, True, True]
         self._fake_pc.extract_my_subset.return_value = ['a', 'b']
         self._fake_conn.get_alarms.return_value = alarms
 
@@ -150,6 +150,42 @@ class TestAlarmEvaluationService(tests_base.BaseTestCase):
         svc = evaluator.AlarmEvaluationService(0, self.CONF)
         self.addCleanup(svc.terminate)
         time.sleep(1)
-        expected = [({'enabled': True, 'exclude': {'type': 'event'}},)]
-        self.assertEqual(expected,
-                         svc.storage_conn.get_alarms.call_args_list)
+
+        child = {'enabled': True, 'type': {'ne': 'event'}}
+        self.assertDictContains(svc.storage_conn.get_alarms.call_args[1],
+                                child)
+
+    def test_evaluation_cycle_no_coordination(self):
+        alarm = mock.Mock(type='gnocchi_aggregation_by_metrics_threshold',
+                          alarm_id="alarm_id1")
+
+        self._fake_pc.is_active.return_value = False
+        self._fake_conn.get_alarms.return_value = [alarm]
+        self._fake_conn.conditional_update.return_value = True
+
+        svc = evaluator.AlarmEvaluationService(0, self.CONF)
+        self.addCleanup(svc.terminate)
+
+        time.sleep(1)
+
+        target = svc.partition_coordinator.extract_my_subset
+        self.assertEqual(0, target.call_count)
+
+        self.threshold_eval.evaluate.assert_called_once_with(alarm)
+
+    def test_evaluation_cycle_no_coordination_alarm_modified(self):
+        alarm = mock.Mock(type='gnocchi_aggregation_by_metrics_threshold',
+                          alarm_id="alarm_id1")
+
+        self._fake_pc.is_active.return_value = False
+        self._fake_conn.get_alarms.return_value = [alarm]
+        self._fake_conn.conditional_update.return_value = False
+
+        svc = evaluator.AlarmEvaluationService(0, self.CONF)
+        self.addCleanup(svc.terminate)
+
+        time.sleep(1)
+
+        target = svc.partition_coordinator.extract_my_subset
+        self.assertEqual(0, target.call_count)
+        self.assertEqual(0, self.threshold_eval.evaluate.call_count)
@@ -0,0 +1,5 @@
+---
+features:
+  - |
+    The admin user can fetch alarms of all the projects, e.g.
+    ``curl -X GET "${aodh_prefix}/v2/alarms?q.field=all_projects&q.op=eq&q.value=true" -H "X-Auth-Token: $token"``
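For completeness, the same all-projects query issued from Python rather than curl; `aodh_prefix` and `token` are assumptions standing in for a deployment's aodh endpoint and a valid admin token, and the snippet is illustrative rather than part of this change:

```python
# Hypothetical helper mirroring the curl example above.
import requests


def list_all_alarms(aodh_prefix, token):
    resp = requests.get(
        "%s/v2/alarms" % aodh_prefix,
        headers={"X-Auth-Token": token},
        # q.field/q.op/q.value is the aodh v2 simple-query encoding
        params={"q.field": "all_projects", "q.op": "eq",
                "q.value": "true"},
    )
    resp.raise_for_status()
    return resp.json()
```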
@@ -0,0 +1,5 @@
+---
+features:
+  - Support deploying aodh-evaluator in active/active mode by leveraging a
+    database non-locking mechanism. With this feature, multiple
+    aodh-evaluator processes can run without depending on etcd or ZooKeeper.
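Putting the release notes together: each evaluator periodically asks the storage layer for alarms that are due and then claims them one by one with `conditional_update()`. A condensed sketch of the selection step, assuming the filter-dict `get_alarms()` API introduced by this commit (`conn` stands in for the storage connection and `interval` for the `evaluation_interval` option):

```python
# Sketch of how an active/active evaluator picks work; it mirrors
# _assigned_alarms() in aodh/evaluator/__init__.py shown above.
import datetime


def assigned_alarms(conn, interval):
    # An alarm is due when its last evaluation is older than half an
    # evaluation interval; alarms just claimed by another evaluator
    # carry a fresh evaluate_timestamp and fall out of this window.
    before = datetime.datetime.utcnow() - datetime.timedelta(
        seconds=interval / 2)
    return conn.get_alarms(
        enabled=True,
        type={'ne': 'event'},               # event alarms are event-driven
        evaluate_timestamp={'lt': before},
    )
```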