From 1349706e2ee6c2735c224b93e442a998a221118d Mon Sep 17 00:00:00 2001 From: Lingxian Kong Date: Thu, 5 Dec 2019 15:17:56 +1300 Subject: [PATCH] Support aodh-evaluator built-in active/active deployment mode Support to deploy aodh-evaluator in active/active mode by leveraging database non-locking mechanism. With this feature, there could be multiple aodh-evaluator processes running without dependency of etcd or zookeeper. The main change is in aodh/evaluator/__init__.py, in order to use non-locking db update, a new column named 'evaluate_timestamp' is introduced to 'alarm' table. all other code is changed because of 'storage_conn.get_alarms()' implementation changed. Change-Id: I666817cdcf9083f2ad5871d157900cb6dc5a86e0 --- aodh/api/controllers/v2/alarm_rules/event.py | 2 +- aodh/api/controllers/v2/alarms.py | 79 ++++++++-- aodh/api/controllers/v2/base.py | 2 +- aodh/api/controllers/v2/utils.py | 36 ++++- aodh/api/policies.py | 11 ++ aodh/api/rbac.py | 4 + aodh/coordination.py | 4 +- aodh/evaluator/__init__.py | 52 +++++-- aodh/evaluator/event.py | 4 +- aodh/service.py | 11 +- aodh/storage/base.py | 20 +-- aodh/storage/impl_sqlalchemy.py | 136 ++++++++++-------- aodh/storage/models.py | 7 +- .../006_add_evaluate_timestamp_to_alarm.py | 36 +++++ aodh/storage/sqlalchemy/models.py | 2 + aodh/tests/base.py | 8 ++ .../functional/api/v2/test_alarm_scenarios.py | 52 ++++--- aodh/tests/functional/api/v2/test_query.py | 73 +++++----- .../storage/test_storage_scenarios.py | 5 +- aodh/tests/unit/evaluator/base.py | 8 ++ aodh/tests/unit/evaluator/test_event.py | 4 +- aodh/tests/unit/test_evaluator.py | 46 +++++- ...ctive-aodh-evaluator-a935577e17a211ea.yaml | 5 + ...ects-alarms-by-admin-3ecccf2217d711ea.yaml | 5 + 24 files changed, 429 insertions(+), 183 deletions(-) create mode 100644 aodh/storage/sqlalchemy/alembic/versions/006_add_evaluate_timestamp_to_alarm.py create mode 100644 releasenotes/notes/ussuri-support-builtin-active-active-aodh-evaluator-a935577e17a211ea.yaml 
create mode 100644 releasenotes/notes/ussuri-support-query-all-projects-alarms-by-admin-3ecccf2217d711ea.yaml diff --git a/aodh/api/controllers/v2/alarm_rules/event.py b/aodh/api/controllers/v2/alarm_rules/event.py index 7c217e458..f633bc74c 100644 --- a/aodh/api/controllers/v2/alarm_rules/event.py +++ b/aodh/api/controllers/v2/alarm_rules/event.py @@ -50,7 +50,7 @@ class AlarmEventRule(base.AlarmRule): def validate_alarm(cls, alarm): super(AlarmEventRule, cls).validate_alarm(alarm) for i in alarm.event_rule.query: - i._get_value_as_type() + i.get_value() try: _q_validator({"field": i.field, "op": i.op, "value": i.type}) diff --git a/aodh/api/controllers/v2/alarms.py b/aodh/api/controllers/v2/alarms.py index 8a38a2bfa..dcc4f8d1c 100644 --- a/aodh/api/controllers/v2/alarms.py +++ b/aodh/api/controllers/v2/alarms.py @@ -76,6 +76,12 @@ severity_kind_enum = wtypes.Enum(str, *severity_kind) ALARM_REASON_DEFAULT = "Not evaluated yet" ALARM_REASON_MANUAL = "Manually set via API" +ALARM_QUERY_FIELDS_ALLOWED = set([ + 'all_projects', 'user_id', 'project_id', 'type', 'name', 'enabled', + 'state', 'severity', 'timestamp', 'repeat_actions' +]) +ALARM_QUERY_OPS_ALLOWED = set(['eq']) + class OverQuota(base.ClientSideError): def __init__(self, data): @@ -101,14 +107,14 @@ def is_over_quota(conn, project_id, user_id): # Start by checking for user quota user_alarm_quota = pecan.request.cfg.api.user_alarm_quota if user_alarm_quota is not None: - user_alarms = list(conn.get_alarms(user=user_id)) + user_alarms = conn.get_alarms(user_id=user_id) over_quota = len(user_alarms) >= user_alarm_quota # If the user quota isn't reached, we check for the project quota if not over_quota: project_alarm_quota = pecan.request.cfg.api.project_alarm_quota if project_alarm_quota is not None: - project_alarms = list(conn.get_alarms(project=project_id)) + project_alarms = conn.get_alarms(project_id=project_id) over_quota = len(project_alarms) >= project_alarm_quota return over_quota @@ -260,6 +266,9 @@ 
class Alarm(base.Base): default='low') "The severity of the alarm" + evaluate_timestamp = datetime.datetime + "The latest alarm evaluation time" + def __init__(self, rule=None, time_constraints=None, **kwargs): super(Alarm, self).__init__(**kwargs) @@ -548,14 +557,16 @@ class AlarmController(rest.RestController): self._id = alarm_id def _enforce_rbac(self, rbac_directive): - # TODO(sileht): We should be able to relax this since we - # pass the alarm object to the enforcer. - auth_project = rbac.get_limited_to_project(pecan.request.headers, - pecan.request.enforcer) - alarms = list(pecan.request.storage.get_alarms(alarm_id=self._id, - project=auth_project)) + auth_project = pecan.request.headers.get('X-Project-Id') + + filters = {'alarm_id': self._id} + if not rbac.is_admin(pecan.request.headers): + filters['project_id'] = auth_project + + alarms = pecan.request.storage.get_alarms(**filters) if not alarms: - raise base.AlarmNotFound(alarm=self._id, auth_project=auth_project) + raise base.AlarmNotFound(alarm=self._id, auth_project=None) + alarm = alarms[0] target = {'user_id': alarm.user_id, 'project_id': alarm.project_id} @@ -850,12 +861,50 @@ class AlarmsController(rest.RestController): pecan.request.enforcer, target) q = q or [] - # Timestamp is not supported field for Simple Alarm queries - kwargs = v2_utils.query_to_kwargs( - q, pecan.request.storage.get_alarms, - allow_timestamps=False) + filters = {} + + # Check field + keys = set([query.field for query in q]) + if not keys.issubset(ALARM_QUERY_FIELDS_ALLOWED): + raise wsme.exc.InvalidInput( + 'field', keys, + 'only fields %s are allowed' % ALARM_QUERY_FIELDS_ALLOWED + ) + # Check op + ops = set([query.op for query in q]) + if any([op not in ALARM_QUERY_OPS_ALLOWED for op in ops]): + raise wsme.exc.InvalidInput( + 'op', ops, + 'only operations %s are allowed' % ALARM_QUERY_OPS_ALLOWED + ) + + if 'all_projects' in keys: + if v2_utils.get_query_value(q, 'all_projects', 'boolean'): + 
rbac.enforce('get_alarms:all_projects', pecan.request.headers, + pecan.request.enforcer, target) + keys.remove('all_projects') + else: + project_id = pecan.request.headers.get('X-Project-Id') + is_admin = rbac.is_admin(pecan.request.headers) + + if not v2_utils.is_field_exist(q, 'project_id'): + q.append( + base.Query(field='project_id', op='eq', value=project_id) + ) + else: + request_project = v2_utils.get_query_value(q, 'project_id') + if not is_admin and request_project != project_id: + raise base.ProjectNotAuthorized(request_project) + + for query in q: + if query.field in keys: + filters[query.field] = {query.op: query.get_value(query.type)} + if sort or limit or marker: - kwargs['pagination'] = v2_utils.get_pagination_options( + filters['pagination'] = v2_utils.get_pagination_options( sort, limit, marker, models.Alarm) + + LOG.debug('Getting alarms from database, filters: %s', filters) + return [Alarm.from_db_model_scrubbed(m) - for m in pecan.request.storage.get_alarms(**kwargs)] + for m in pecan.request.storage.get_alarms(**filters)] diff --git a/aodh/api/controllers/v2/base.py b/aodh/api/controllers/v2/base.py index 2f84d137c..888ec3378 100644 --- a/aodh/api/controllers/v2/base.py +++ b/aodh/api/controllers/v2/base.py @@ -153,7 +153,7 @@ class Query(Base): def as_dict(self): return self.as_dict_from_keys(['field', 'op', 'type', 'value']) - def _get_value_as_type(self, forced_type=None): + def get_value(self, forced_type=None): """Convert metadata value to the specified data type. 
This method is called during metadata query to help convert the diff --git a/aodh/api/controllers/v2/utils.py b/aodh/api/controllers/v2/utils.py index 6547e9cb6..76f690328 100644 --- a/aodh/api/controllers/v2/utils.py +++ b/aodh/api/controllers/v2/utils.py @@ -157,9 +157,9 @@ def validate_query(query, db_func, internal_keys=None, if key in valid_keys or _is_field_metadata(i.field): if operator == 'eq': if key == 'enabled': - i._get_value_as_type('boolean') + i.get_value('boolean') elif _is_field_metadata(key): - i._get_value_as_type() + i.get_value() else: raise wsme.exc.InvalidInput('op', i.op, 'unimplemented operator for ' @@ -235,7 +235,7 @@ def query_to_kwargs(query, db_func, internal_keys=None, if i.field == 'search_offset': stamp['search_offset'] = i.value elif i.field == 'enabled': - kwargs[i.field] = i._get_value_as_type('boolean') + kwargs[i.field] = i.get_value('boolean') else: key = translation.get(i.field, i.field) kwargs[key] = i.value @@ -320,3 +320,33 @@ def get_pagination_options(sort, limit, marker, api_model): return {'limit': limit, 'marker': marker, 'sort': sorts} + + +def get_query_value(queries, field, type=None): + """Get value of the specified query field. + + :param queries: A list of Query object. + :param field: Field name. + """ + for q in queries: + if q.field == field: + return q.get_value(type) + + raise wsme.exc.InvalidInput( + 'field', + field, + "field %s is not provided" % field + ) + + +def is_field_exist(queries, field): + """Check if a given field exists in a query list. + + :param queries: A list of Query object. + :param field: Field name. 
+ """ + for q in queries: + if q.field == field: + return True + + return False diff --git a/aodh/api/policies.py b/aodh/api/policies.py index 2f11c1476..989f57c35 100644 --- a/aodh/api/policies.py +++ b/aodh/api/policies.py @@ -57,6 +57,17 @@ rules = [ } ] ), + policy.DocumentedRuleDefault( + name="telemetry:get_alarms:all_projects", + check_str=RULE_CONTEXT_IS_ADMIN, + description='Get alarms of all projects.', + operations=[ + { + 'path': '/v2/alarms', + 'method': 'GET' + } + ] + ), policy.DocumentedRuleDefault( name="telemetry:query_alarm", check_str=RULE_ADMIN_OR_OWNER, diff --git a/aodh/api/rbac.py b/aodh/api/rbac.py index 7b592b07b..7f8dfec82 100644 --- a/aodh/api/rbac.py +++ b/aodh/api/rbac.py @@ -105,3 +105,7 @@ def get_limited_to_project(headers, enforcer): """ return get_limited_to(headers, enforcer)[1] + + +def is_admin(headers): + return 'admin' in headers.get('X-Roles', "").split(",") diff --git a/aodh/coordination.py b/aodh/coordination.py index e0956fc28..db33b9cbb 100644 --- a/aodh/coordination.py +++ b/aodh/coordination.py @@ -220,10 +220,12 @@ class PartitionCoordinator(object): `tooz`. We then hash all the objects into buckets and return only the ones that hashed into *our* bucket. 
""" - if not group_id: + if not group_id or not self.is_active(): return universal_set + if group_id not in self._groups: self.join_group(group_id) + try: members = self._get_members(group_id) LOG.debug('Members of group: %s, Me: %s', members, self._my_id) diff --git a/aodh/evaluator/__init__.py b/aodh/evaluator/__init__.py index 78bbdfa4e..91b6c07ba 100644 --- a/aodh/evaluator/__init__.py +++ b/aodh/evaluator/__init__.py @@ -38,6 +38,7 @@ from aodh import messaging from aodh import queue from aodh import storage from aodh.storage import models +from aodh.storage.sqlalchemy import models as sql_models LOG = log.getLogger(__name__) @@ -249,6 +250,7 @@ class AlarmEvaluationService(cotyledon.Service): alarms = self._assigned_alarms() LOG.info('initiating evaluation cycle on %d alarms', len(alarms)) + for alarm in alarms: self._evaluate_alarm(alarm) except Exception: @@ -257,23 +259,49 @@ class AlarmEvaluationService(cotyledon.Service): def _evaluate_alarm(self, alarm): """Evaluate the alarms assigned to this evaluator.""" if alarm.type not in self.evaluators: - LOG.debug('skipping alarm %s: type unsupported', alarm.alarm_id) + LOG.warning('Skipping alarm %s, unsupported type: %s', + alarm.alarm_id, alarm.type) return - LOG.debug('evaluating alarm %s', alarm.alarm_id) + # If the coordinator is not available, fallback to database non-locking + # mechanism in order to support aodh-evaluator active/active + # deployment. 
+ if not self.partition_coordinator.is_active(): + modified = self.storage_conn.conditional_update( + sql_models.Alarm, + {'evaluate_timestamp': timeutils.utcnow()}, + { + 'alarm_id': alarm.alarm_id, + 'evaluate_timestamp': alarm.evaluate_timestamp + }, + ) + if not modified: + LOG.debug( + 'Alarm %s has been already handled by another evaluator', + alarm.alarm_id + ) + return + + LOG.debug('Evaluating alarm %s', alarm.alarm_id) try: self.evaluators[alarm.type].obj.evaluate(alarm) except Exception: LOG.exception('Failed to evaluate alarm %s', alarm.alarm_id) def _assigned_alarms(self): - # NOTE(r-mibu): The 'event' type alarms will be evaluated by the - # event-driven alarm evaluator, so this periodical evaluator skips - # those alarms. - all_alarms = self.storage_conn.get_alarms(enabled=True, - exclude=dict(type='event')) - all_alarms = list(all_alarms) - all_alarm_ids = [a.alarm_id for a in all_alarms] - selected = self.partition_coordinator.extract_my_subset( - self.PARTITIONING_GROUP_NAME, all_alarm_ids) - return list(filter(lambda a: a.alarm_id in selected, all_alarms)) + before = (timeutils.utcnow() - datetime.timedelta( + seconds=self.conf.evaluation_interval / 2)) + selected = self.storage_conn.get_alarms( + enabled=True, + type={'ne': 'event'}, + evaluate_timestamp={'lt': before}, + ) + + if self.partition_coordinator.is_active(): + all_alarm_ids = [a.alarm_id for a in selected] + selected_ids = self.partition_coordinator.extract_my_subset( + self.PARTITIONING_GROUP_NAME, all_alarm_ids + ) + selected = [a for a in selected if a.alarm_id in selected_ids] + + return selected diff --git a/aodh/evaluator/event.py b/aodh/evaluator/event.py index 2b4dd1e18..9ee8f74b2 100644 --- a/aodh/evaluator/event.py +++ b/aodh/evaluator/event.py @@ -193,8 +193,8 @@ class EventAlarmEvaluator(evaluator.Evaluator): # this function update only alarms changed from the last access. 
alarms = {a.alarm_id: Alarm(a) for a in self._storage_conn.get_alarms(enabled=True, - alarm_type='event', - project=project)} + type='event', + project_id=project)} if self.conf.event_alarm_cache_ttl: self.caches[project] = { diff --git a/aodh/service.py b/aodh/service.py index 1321374c5..e83198791 100644 --- a/aodh/service.py +++ b/aodh/service.py @@ -72,8 +72,15 @@ def prepare_service(argv=None, config_files=None): conf = cfg.ConfigOpts() oslo_i18n.enable_lazy() log.register_options(conf) - log_levels = (conf.default_log_levels + - ['futurist=INFO', 'keystoneclient=INFO']) + log_levels = ( + conf.default_log_levels + + [ + 'futurist=INFO', + 'keystoneclient=INFO', + 'oslo_db.sqlalchemy=WARN', + 'cotyledon=INFO' + ] + ) log.set_defaults(default_log_levels=log_levels) defaults.set_cors_middleware_defaults() db_options.set_defaults(conf) diff --git a/aodh/storage/base.py b/aodh/storage/base.py index 6399d538b..b6e66e3a7 100644 --- a/aodh/storage/base.py +++ b/aodh/storage/base.py @@ -91,24 +91,8 @@ class Connection(object): """Migrate the database to `version` or the most recent version.""" @staticmethod - def get_alarms(name=None, user=None, state=None, meter=None, - project=None, enabled=None, alarm_id=None, - alarm_type=None, severity=None, exclude=None, - pagination=None): - """Yields a lists of alarms that match filters. - - :param name: Optional name for alarm. - :param user: Optional ID for user that owns the resource. - :param state: Optional string for alarm state. - :param meter: Optional string for alarms associated with meter. - :param project: Optional ID for project that owns the resource. - :param enabled: Optional boolean to list disable alarm. - :param alarm_id: Optional alarm_id to return one alarm. - :param alarm_type: Optional alarm type. - :param severity: Optional alarm severity. - :param exclude: Optional dict for inequality constraint. - :param pagination: Pagination parameters. 
- """ + def get_alarms(*args, **kwargs): + """Yields a lists of alarms that match filters.""" raise aodh.NotImplementedError('Alarms not implemented') @staticmethod diff --git a/aodh/storage/impl_sqlalchemy.py b/aodh/storage/impl_sqlalchemy.py index 4ace54016..5bfbace3c 100644 --- a/aodh/storage/impl_sqlalchemy.py +++ b/aodh/storage/impl_sqlalchemy.py @@ -25,7 +25,6 @@ from oslo_db.sqlalchemy import utils as oslo_sql_utils from oslo_log import log from oslo_utils import importutils from oslo_utils import timeutils -import six import sqlalchemy from sqlalchemy import asc from sqlalchemy import desc @@ -58,6 +57,41 @@ AVAILABLE_STORAGE_CAPABILITIES = { } +def apply_filters(query, model, **filters): + filter_dict = {} + + for key, value in filters.items(): + column_attr = getattr(model, key) + + if isinstance(value, dict): + if 'in' in value: + query = query.filter(column_attr.in_(value['in'])) + elif 'nin' in value: + query = query.filter(~column_attr.in_(value['nin'])) + elif 'ne' in value: + query = query.filter(column_attr != value['ne']) + elif 'gt' in value: + query = query.filter(column_attr > value['gt']) + elif 'ge' in value: + query = query.filter(column_attr >= value['ge']) + elif 'lt' in value: + query = query.filter(column_attr < value['lt']) + elif 'le' in value: + query = query.filter(column_attr <= value['le']) + elif 'eq' in value: + query = query.filter(column_attr == value['eq']) + elif 'has' in value: + like_pattern = '%{0}%'.format(value['has']) + query = query.filter(column_attr.like(like_pattern)) + else: + filter_dict[key] = value + + if filter_dict: + query = query.filter_by(**filter_dict) + + return query + + class Connection(base.Connection): """Put the data into a SQLAlchemy database. 
""" CAPABILITIES = base.update_nested(base.Connection.CAPABILITIES, @@ -147,28 +181,30 @@ class Connection(base.Connection): @staticmethod def _row_to_alarm_model(row): - return alarm_api_models.Alarm(alarm_id=row.alarm_id, - enabled=row.enabled, - type=row.type, - name=row.name, - description=row.description, - timestamp=row.timestamp, - user_id=row.user_id, - project_id=row.project_id, - state=row.state, - state_timestamp=row.state_timestamp, - state_reason=row.state_reason, - ok_actions=row.ok_actions, - alarm_actions=row.alarm_actions, - insufficient_data_actions=( - row.insufficient_data_actions), - rule=row.rule, - time_constraints=row.time_constraints, - repeat_actions=row.repeat_actions, - severity=row.severity) + return alarm_api_models.Alarm( + alarm_id=row.alarm_id, + enabled=row.enabled, + type=row.type, + name=row.name, + description=row.description, + timestamp=row.timestamp, + user_id=row.user_id, + project_id=row.project_id, + state=row.state, + state_timestamp=row.state_timestamp, + state_reason=row.state_reason, + ok_actions=row.ok_actions, + alarm_actions=row.alarm_actions, + insufficient_data_actions=(row.insufficient_data_actions), + rule=row.rule, + time_constraints=row.time_constraints, + repeat_actions=row.repeat_actions, + severity=row.severity, + evaluate_timestamp=row.evaluate_timestamp + ) def _retrieve_alarms(self, query): - return (self._row_to_alarm_model(x) for x in query.all()) + return [self._row_to_alarm_model(x) for x in query.all()] @staticmethod def _get_pagination_query(session, query, pagination, api_model, model): @@ -204,50 +240,15 @@ class Connection(base.Connection): return oslo_sql_utils.paginate_query( query, model, limit, sort_keys, sort_dirs=sort_dirs, marker=marker) - def get_alarms(self, name=None, user=None, state=None, meter=None, - project=None, enabled=None, alarm_id=None, - alarm_type=None, severity=None, exclude=None, - pagination=None): - """Yields a lists of alarms that match filters. 
- - :param name: Optional name for alarm. - :param user: Optional ID for user that owns the resource. - :param state: Optional string for alarm state. - :param meter: Optional string for alarms associated with meter. - :param project: Optional ID for project that owns the resource. - :param enabled: Optional boolean to list disable alarm. - :param alarm_id: Optional alarm_id to return one alarm. - :param alarm_type: Optional alarm type. - :param severity: Optional alarm severity. - :param exclude: Optional dict for inequality constraint. - :param pagination: Pagination query parameters. - """ - + def get_alarms(self, meter=None, pagination=None, **kwargs): + """Yields a lists of alarms that match filters.""" pagination = pagination or {} session = self._engine_facade.get_session() query = session.query(models.Alarm) - if name is not None: - query = query.filter(models.Alarm.name == name) - if enabled is not None: - query = query.filter(models.Alarm.enabled == enabled) - if user is not None: - query = query.filter(models.Alarm.user_id == user) - if project is not None: - query = query.filter(models.Alarm.project_id == project) - if alarm_id is not None: - query = query.filter(models.Alarm.alarm_id == alarm_id) - if state is not None: - query = query.filter(models.Alarm.state == state) - if alarm_type is not None: - query = query.filter(models.Alarm.type == alarm_type) - if severity is not None: - query = query.filter(models.Alarm.severity == severity) - if exclude is not None: - for key, value in six.iteritems(exclude): - query = query.filter(getattr(models.Alarm, key) != value) - + query = apply_filters(query, models.Alarm, **kwargs) query = self._get_pagination_query( session, query, pagination, alarm_api_models.Alarm, models.Alarm) + alarms = self._retrieve_alarms(query) # TODO(cmart): improve this by using sqlalchemy.func factory @@ -416,3 +417,18 @@ class Connection(base.Connection): .delete()) LOG.info("%d alarm histories are removed from database", 
deleted_rows) + + def conditional_update(self, model, values, expected_values, filters=None): + """Compare-and-swap conditional update SQLAlchemy implementation.""" + filters = filters or {} + filters.update(expected_values) + + session = self._engine_facade.get_session() + query = session.query(model) + if filters: + query = query.filter_by(**filters) + + update_args = {'synchronize_session': False} + + result = query.update(values, **update_args) + return 0 != result diff --git a/aodh/storage/models.py b/aodh/storage/models.py index 43358054f..ad2425d3d 100644 --- a/aodh/storage/models.py +++ b/aodh/storage/models.py @@ -68,12 +68,14 @@ class Alarm(base.Model): :param repeat_actions: Is the actions should be triggered on each alarm evaluation. :param severity: Alarm level (low/moderate/critical) + :param evaluate_timestamp: The timestamp when the alarm is finished + evaluating. """ def __init__(self, alarm_id, type, enabled, name, description, timestamp, user_id, project_id, state, state_timestamp, state_reason, ok_actions, alarm_actions, insufficient_data_actions, repeat_actions, rule, - time_constraints, severity=None): + time_constraints, severity=None, evaluate_timestamp=None): if not isinstance(timestamp, datetime.datetime): raise TypeError(_("timestamp should be datetime object")) if not isinstance(state_timestamp, datetime.datetime): @@ -97,7 +99,8 @@ class Alarm(base.Model): repeat_actions=repeat_actions, rule=rule, time_constraints=time_constraints, - severity=severity) + severity=severity, + evaluate_timestamp=evaluate_timestamp) class AlarmChange(base.Model): diff --git a/aodh/storage/sqlalchemy/alembic/versions/006_add_evaluate_timestamp_to_alarm.py b/aodh/storage/sqlalchemy/alembic/versions/006_add_evaluate_timestamp_to_alarm.py new file mode 100644 index 000000000..5c89670ba --- /dev/null +++ b/aodh/storage/sqlalchemy/alembic/versions/006_add_evaluate_timestamp_to_alarm.py @@ -0,0 +1,36 @@ +# Copyright 2019 Catalyst Cloud Ltd. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Add evaluate_timestamp column to alarm table + +Revision ID: 006 +Revises: 6ae0d05d9451 +Create Date: 2019-12-05 11:23:42.379029 +""" + +# revision identifiers, used by Alembic. +revision = '006' +down_revision = '6ae0d05d9451' + +from alembic import op +from oslo_utils import timeutils +import sqlalchemy as sa + + +def upgrade(): + op.add_column( + 'alarm', + sa.Column('evaluate_timestamp', sa.DateTime(), nullable=True, + server_default=str(timeutils.utcnow())) + ) diff --git a/aodh/storage/sqlalchemy/models.py b/aodh/storage/sqlalchemy/models.py index f8d5eb2ba..b0830289f 100644 --- a/aodh/storage/sqlalchemy/models.py +++ b/aodh/storage/sqlalchemy/models.py @@ -106,6 +106,8 @@ class Alarm(Base): rule = Column(JSONEncodedDict) time_constraints = Column(JSONEncodedDict) + evaluate_timestamp = Column(DateTime, default=lambda: timeutils.utcnow()) + class AlarmChange(Base): """Define AlarmChange data.""" diff --git a/aodh/tests/base.py b/aodh/tests/base.py index 2a4942e0c..1c0811c34 100644 --- a/aodh/tests/base.py +++ b/aodh/tests/base.py @@ -71,6 +71,14 @@ class BaseTestCase(base.BaseTestCase): except (TypeError, AttributeError): self.fail("%s doesn't have length" % type(obj)) + def assertDictContains(self, parent, child): + """Checks whether child dict is a subset of parent. 
+ + assertDictContainsSubset() in standard Python 2.7 has been deprecated + since Python 3.2 + """ + self.assertEqual(parent, dict(parent, **child)) + @staticmethod def path_get(project_file=None): root = os.path.abspath(os.path.join(os.path.dirname(__file__), diff --git a/aodh/tests/functional/api/v2/test_alarm_scenarios.py b/aodh/tests/functional/api/v2/test_alarm_scenarios.py index 0d29edd36..79206771f 100644 --- a/aodh/tests/functional/api/v2/test_alarm_scenarios.py +++ b/aodh/tests/functional/api/v2/test_alarm_scenarios.py @@ -14,6 +14,7 @@ # under the License. """Tests alarm operation.""" +import copy import datetime import json as jsonlib import operator @@ -206,9 +207,6 @@ class TestAlarms(TestAlarmsBase): 'value': isotime}], expect_errors=True) self.assertEqual(resp.status_code, 400) - self.assertEqual(resp.json['error_message']['faultstring'], - 'Unknown argument: "timestamp": ' - 'not valid for this resource') def test_alarms_query_with_state(self): alarm = models.Alarm(name='disabled', @@ -260,14 +258,38 @@ class TestAlarms(TestAlarmsBase): self.assertEqual(set(['gnocchi_aggregation_by_metrics_threshold']), set(alarm['type'] for alarm in alarms)) + def test_list_alarms_all_projects_by_admin(self): + auth_headers = copy.copy(self.auth_headers) + auth_headers['X-Roles'] = 'admin' + + alarms = self.get_json( + '/alarms', + headers=auth_headers, + q=[{'field': 'all_projects', 'op': 'eq', 'value': 'true'}] + ) + + self.assertEqual(3, len(alarms)) + + def test_list_alarms_all_projects_forbidden(self): + response = self.get_json( + '/alarms', + headers=self.auth_headers, + q=[{'field': 'all_projects', 'op': 'eq', 'value': 'true'}], + expect_errors=True, + status=401 + ) + + faultstring = 'RBAC Authorization Failed' + self.assertIn(faultstring, + response.json['error_message']['faultstring']) + def test_get_not_existing_alarm(self): resp = self.get_json('/alarms/alarm-id-3', headers=self.auth_headers, expect_errors=True) self.assertEqual(404, resp.status_code) 
- self.assertEqual('Alarm alarm-id-3 not found in project %s' % - self.auth_headers["X-Project-Id"], - resp.json['error_message']['faultstring']) + self.assertIn('Alarm alarm-id-3 not found', + resp.json['error_message']['faultstring']) def test_get_alarm(self): alarms = self.get_json('/alarms', @@ -344,13 +366,11 @@ class TestAlarms(TestAlarmsBase): expect_errors=True, status=400, headers=self.auth_headers) - faultstring = ('Invalid input for field/attribute op. ' - 'Value: \'%(op)s\'. unimplemented operator ' - 'for %(field)s' % {'field': field, 'op': op}) - self.assertEqual(faultstring, - response.json['error_message']['faultstring']) - _test('project', 'ne') + faultstring = 'Invalid input for field/attribute op' + self.assertIn(faultstring, + response.json['error_message']['faultstring']) + _test('project_id', 'ne') def test_get_alarm_project_filter_normal_user(self): @@ -364,7 +384,6 @@ class TestAlarms(TestAlarmsBase): 'value': project}]) self.assertEqual(3, len(alarms)) - _test('project') _test('project_id') def test_get_alarm_other_project_normal_user(self): @@ -376,11 +395,10 @@ class TestAlarms(TestAlarmsBase): expect_errors=True, status=401, headers=self.auth_headers) - faultstring = 'Not Authorized to access project other-project' - self.assertEqual(faultstring, - response.json['error_message']['faultstring']) + faultstring = 'Not Authorized to access' + self.assertIn(faultstring, + response.json['error_message']['faultstring']) - _test('project') _test('project_id') def test_get_alarm_forbiden(self): diff --git a/aodh/tests/functional/api/v2/test_query.py b/aodh/tests/functional/api/v2/test_query.py index 0e244f4f3..f4e4d0cf2 100644 --- a/aodh/tests/functional/api/v2/test_query.py +++ b/aodh/tests/functional/api/v2/test_query.py @@ -35,121 +35,121 @@ class TestQuery(base.BaseTestCase): self.useFixture(fixtures.MonkeyPatch( 'pecan.response', mock.MagicMock())) - def test_get_value_as_type_with_integer(self): + def test_get_value_with_integer(self): 
query = v2_base.Query(field='metadata.size', op='eq', value='123', type='integer') expected = 123 - self.assertEqual(expected, query._get_value_as_type()) + self.assertEqual(expected, query.get_value()) - def test_get_value_as_type_with_float(self): + def test_get_value_with_float(self): query = v2_base.Query(field='metadata.size', op='eq', value='123.456', type='float') expected = 123.456 - self.assertEqual(expected, query._get_value_as_type()) + self.assertEqual(expected, query.get_value()) - def test_get_value_as_type_with_boolean(self): + def test_get_value_with_boolean(self): query = v2_base.Query(field='metadata.is_public', op='eq', value='True', type='boolean') expected = True - self.assertEqual(expected, query._get_value_as_type()) + self.assertEqual(expected, query.get_value()) - def test_get_value_as_type_with_string(self): + def test_get_value_with_string(self): query = v2_base.Query(field='metadata.name', op='eq', value='linux', type='string') expected = 'linux' - self.assertEqual(expected, query._get_value_as_type()) + self.assertEqual(expected, query.get_value()) - def test_get_value_as_type_with_datetime(self): + def test_get_value_with_datetime(self): query = v2_base.Query(field='metadata.date', op='eq', value='2014-01-01T05:00:00', type='datetime') - self.assertIsInstance(query._get_value_as_type(), datetime.datetime) - self.assertIsNone(query._get_value_as_type().tzinfo) + self.assertIsInstance(query.get_value(), datetime.datetime) + self.assertIsNone(query.get_value().tzinfo) - def test_get_value_as_type_with_integer_without_type(self): + def test_get_value_with_integer_without_type(self): query = v2_base.Query(field='metadata.size', op='eq', value='123') expected = 123 - self.assertEqual(expected, query._get_value_as_type()) + self.assertEqual(expected, query.get_value()) - def test_get_value_as_type_with_float_without_type(self): + def test_get_value_with_float_without_type(self): query = v2_base.Query(field='metadata.size', op='eq', 
value='123.456') expected = 123.456 - self.assertEqual(expected, query._get_value_as_type()) + self.assertEqual(expected, query.get_value()) - def test_get_value_as_type_with_boolean_without_type(self): + def test_get_value_with_boolean_without_type(self): query = v2_base.Query(field='metadata.is_public', op='eq', value='True') expected = True - self.assertEqual(expected, query._get_value_as_type()) + self.assertEqual(expected, query.get_value()) - def test_get_value_as_type_with_string_without_type(self): + def test_get_value_with_string_without_type(self): query = v2_base.Query(field='metadata.name', op='eq', value='linux') expected = 'linux' - self.assertEqual(expected, query._get_value_as_type()) + self.assertEqual(expected, query.get_value()) - def test_get_value_as_type_with_bad_type(self): + def test_get_value_with_bad_type(self): query = v2_base.Query(field='metadata.size', op='eq', value='123.456', type='blob') - self.assertRaises(wsme.exc.ClientSideError, query._get_value_as_type) + self.assertRaises(wsme.exc.ClientSideError, query.get_value) - def test_get_value_as_type_with_bad_value(self): + def test_get_value_with_bad_value(self): query = v2_base.Query(field='metadata.size', op='eq', value='fake', type='integer') - self.assertRaises(wsme.exc.ClientSideError, query._get_value_as_type) + self.assertRaises(wsme.exc.ClientSideError, query.get_value) - def test_get_value_as_type_integer_expression_without_type(self): + def test_get_value_integer_expression_without_type(self): # bug 1221736 query = v2_base.Query(field='should_be_a_string', op='eq', value='WWW-Layer-4a80714f') expected = 'WWW-Layer-4a80714f' - self.assertEqual(expected, query._get_value_as_type()) + self.assertEqual(expected, query.get_value()) - def test_get_value_as_type_boolean_expression_without_type(self): + def test_get_value_boolean_expression_without_type(self): # bug 1221736 query = v2_base.Query(field='should_be_a_string', op='eq', value='True or False') expected = 'True or False' 
- self.assertEqual(expected, query._get_value_as_type()) + self.assertEqual(expected, query.get_value()) - def test_get_value_as_type_with_syntax_error(self): + def test_get_value_with_syntax_error(self): # bug 1221736 value = 'WWW-Layer-4a80714f-0232-4580-aa5e-81494d1a4147-uolhh25p5xxm' query = v2_base.Query(field='group_id', op='eq', value=value) expected = value - self.assertEqual(expected, query._get_value_as_type()) + self.assertEqual(expected, query.get_value()) - def test_get_value_as_type_with_syntax_error_colons(self): + def test_get_value_with_syntax_error_colons(self): # bug 1221736 value = 'Ref::StackId' query = v2_base.Query(field='field_name', op='eq', value=value) expected = value - self.assertEqual(expected, query._get_value_as_type()) + self.assertEqual(expected, query.get_value()) class TestQueryToKwArgs(tests_base.BaseTestCase): @@ -320,13 +320,8 @@ class TestQueryToKwArgs(tests_base.BaseTestCase): q = [v2_base.Query(field='abc', op='eq', value='abc')] - exc = self.assertRaises( + self.assertRaises( wsme.exc.UnknownArgument, utils.query_to_kwargs, q, - alarm_storage_base.Connection.get_alarms) - valid_keys = ['alarm_id', 'enabled', 'exclude', 'meter', 'name', - 'project', 'severity', 'state', 'type', 'user'] - msg = ("unrecognized field in query: %s, " - "valid keys: %s") % (q, valid_keys) - expected_exc = wsme.exc.UnknownArgument('abc', msg) - self.assertEqual(str(expected_exc), str(exc)) + alarm_storage_base.Connection.get_alarms + ) diff --git a/aodh/tests/functional/storage/test_storage_scenarios.py b/aodh/tests/functional/storage/test_storage_scenarios.py index 2fffe29f9..b20bfdb3a 100644 --- a/aodh/tests/functional/storage/test_storage_scenarios.py +++ b/aodh/tests/functional/storage/test_storage_scenarios.py @@ -166,13 +166,12 @@ class AlarmTest(AlarmTestBase): def test_list_by_type(self): self.add_some_alarms() - alarms = list(self.alarm_conn.get_alarms(alarm_type=ALARM_TYPE)) + alarms = list(self.alarm_conn.get_alarms(type=ALARM_TYPE)) 
self.assertEqual(3, len(alarms)) def test_list_excluded_by_name(self): self.add_some_alarms() - exclude = {'name': 'yellow-alert'} - alarms = list(self.alarm_conn.get_alarms(exclude=exclude)) + alarms = list(self.alarm_conn.get_alarms(name={'ne': 'yellow-alert'})) self.assertEqual(2, len(alarms)) alarm_names = sorted([a.name for a in alarms]) self.assertEqual(['orange-alert', 'red-alert'], alarm_names) diff --git a/aodh/tests/unit/evaluator/base.py b/aodh/tests/unit/evaluator/base.py index 1363819b4..f1c01ef81 100644 --- a/aodh/tests/unit/evaluator/base.py +++ b/aodh/tests/unit/evaluator/base.py @@ -49,3 +49,11 @@ class TestEvaluatorBase(base.BaseTestCase): def _assert_all_alarms(self, state): for alarm in self.alarms: self.assertEqual(state, alarm.state) + + def assertDictContains(self, parent, child): + """Checks whether child dict is a subset of parent. + + assertDictContainsSubset() in standard Python 2.7 has been deprecated + since Python 3.2 + """ + self.assertEqual(parent, dict(parent, **child)) diff --git a/aodh/tests/unit/evaluator/test_event.py b/aodh/tests/unit/evaluator/test_event.py index 4d682d530..c81bae8ca 100644 --- a/aodh/tests/unit/evaluator/test_event.py +++ b/aodh/tests/unit/evaluator/test_event.py @@ -100,8 +100,8 @@ class TestEventAlarmEvaluate(base.TestEvaluatorBase): if expect_db_queries is not None: expected = [mock.call(enabled=True, - alarm_type='event', - project=p) for p in expect_db_queries] + type='event', + project_id=p) for p in expect_db_queries] self.assertEqual(expected, self.storage_conn.get_alarms.call_args_list) diff --git a/aodh/tests/unit/test_evaluator.py b/aodh/tests/unit/test_evaluator.py index ae4246000..65ffeb68a 100644 --- a/aodh/tests/unit/test_evaluator.py +++ b/aodh/tests/unit/test_evaluator.py @@ -94,7 +94,7 @@ class TestAlarmEvaluationService(tests_base.BaseTestCase): alarm = mock.Mock(type='gnocchi_aggregation_by_metrics_threshold', alarm_id="alarm_id1") self._fake_pc.extract_my_subset.return_value = 
["alarm_id1"] - self._fake_pc.is_active.return_value = False + self._fake_pc.is_active.side_effect = [False, False, True, True] self._fake_conn.get_alarms.return_value = [alarm] self.threshold_eval.evaluate.side_effect = [Exception('Boom!'), None] @@ -116,7 +116,7 @@ class TestAlarmEvaluationService(tests_base.BaseTestCase): ] self.threshold_eval.evaluate.side_effect = [Exception('Boom!'), None] - self._fake_pc.is_active.return_value = False + self._fake_pc.is_active.side_effect = [False, False, True, True, True] self._fake_pc.extract_my_subset.return_value = ['a', 'b'] self._fake_conn.get_alarms.return_value = alarms @@ -150,6 +150,42 @@ class TestAlarmEvaluationService(tests_base.BaseTestCase): svc = evaluator.AlarmEvaluationService(0, self.CONF) self.addCleanup(svc.terminate) time.sleep(1) - expected = [({'enabled': True, 'exclude': {'type': 'event'}},)] - self.assertEqual(expected, - svc.storage_conn.get_alarms.call_args_list) + + child = {'enabled': True, 'type': {'ne': 'event'}} + self.assertDictContains(svc.storage_conn.get_alarms.call_args[1], + child) + + def test_evaluation_cycle_no_coordination(self): + alarm = mock.Mock(type='gnocchi_aggregation_by_metrics_threshold', + alarm_id="alarm_id1") + + self._fake_pc.is_active.return_value = False + self._fake_conn.get_alarms.return_value = [alarm] + self._fake_conn.conditional_update.return_value = True + + svc = evaluator.AlarmEvaluationService(0, self.CONF) + self.addCleanup(svc.terminate) + + time.sleep(1) + + target = svc.partition_coordinator.extract_my_subset + self.assertEqual(0, target.call_count) + + self.threshold_eval.evaluate.assert_called_once_with(alarm) + + def test_evaluation_cycle_no_coordination_alarm_modified(self): + alarm = mock.Mock(type='gnocchi_aggregation_by_metrics_threshold', + alarm_id="alarm_id1") + + self._fake_pc.is_active.return_value = False + self._fake_conn.get_alarms.return_value = [alarm] + self._fake_conn.conditional_update.return_value = False + + svc = 
evaluator.AlarmEvaluationService(0, self.CONF) + self.addCleanup(svc.terminate) + + time.sleep(1) + + target = svc.partition_coordinator.extract_my_subset + self.assertEqual(0, target.call_count) + self.assertEqual(0, self.threshold_eval.evaluate.call_count) diff --git a/releasenotes/notes/ussuri-support-builtin-active-active-aodh-evaluator-a935577e17a211ea.yaml b/releasenotes/notes/ussuri-support-builtin-active-active-aodh-evaluator-a935577e17a211ea.yaml new file mode 100644 index 000000000..d3e9adef7 --- /dev/null +++ b/releasenotes/notes/ussuri-support-builtin-active-active-aodh-evaluator-a935577e17a211ea.yaml @@ -0,0 +1,5 @@ +--- +features: + - Support to deploy aodh-evaluator in active/active mode by leveraging + database non-locking mechanism. With this feature, there could be multiple + aodh-evaluator processes running without dependency of etcd or zookeeper. diff --git a/releasenotes/notes/ussuri-support-query-all-projects-alarms-by-admin-3ecccf2217d711ea.yaml b/releasenotes/notes/ussuri-support-query-all-projects-alarms-by-admin-3ecccf2217d711ea.yaml new file mode 100644 index 000000000..24c361a14 --- /dev/null +++ b/releasenotes/notes/ussuri-support-query-all-projects-alarms-by-admin-3ecccf2217d711ea.yaml @@ -0,0 +1,5 @@ +--- +features: + - | + The admin user can fetch alarms of all the projects, e.g. + ``curl -X GET "${aodh_prefix}/v2/alarms?q.field=all_projects&q.op=eq&q.value=true" X-Auth-Token:$token``