diff --git a/ceilometer/alarm/storage/impl_sqlalchemy.py b/ceilometer/alarm/storage/impl_sqlalchemy.py new file mode 100644 index 00000000..c7d87d99 --- /dev/null +++ b/ceilometer/alarm/storage/impl_sqlalchemy.py @@ -0,0 +1,308 @@ +# +# Author: John Tran +# Julien Danjou +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""SQLAlchemy storage backend.""" + +from __future__ import absolute_import +import os + +from oslo.config import cfg +from sqlalchemy import desc + +from ceilometer.alarm.storage import base +from ceilometer.alarm.storage import models as alarm_api_models +from ceilometer.openstack.common.db.sqlalchemy import migration +import ceilometer.openstack.common.db.sqlalchemy.session as sqlalchemy_session +from ceilometer.openstack.common import log +from ceilometer.storage.sqlalchemy import models +from ceilometer.storage.sqlalchemy import utils as sql_utils +from ceilometer import utils + +LOG = log.getLogger(__name__) + +AVAILABLE_CAPABILITIES = { + 'alarms': {'query': {'simple': True, + 'complex': True}, + 'history': {'query': {'simple': True, + 'complex': True}}}, +} + + +AVAILABLE_STORAGE_CAPABILITIES = { + 'storage': {'production_ready': True}, +} + + +class Connection(base.Connection): + """Put the data into a SQLAlchemy database. + + Tables:: + + - meter + - meter definition + - { id: meter def id + name: meter name + type: meter type + unit: meter unit + } + - sample + - the raw incoming data + - { id: sample id + meter_id: meter id (->meter.id) + user_id: user uuid + project_id: project uuid + resource_id: resource uuid + source_id: source id + resource_metadata: metadata dictionaries + volume: sample volume + timestamp: datetime + message_signature: message signature + message_id: message uuid + } + """ + CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES, + AVAILABLE_CAPABILITIES) + STORAGE_CAPABILITIES = utils.update_nested( + base.Connection.STORAGE_CAPABILITIES, + AVAILABLE_STORAGE_CAPABILITIES, + ) + + def __init__(self, url): + self._engine_facade = sqlalchemy_session.EngineFacade.from_config( + url, + cfg.CONF # TODO(Alexei_987) Remove access to global CONF object + ) + + def upgrade(self): + path = os.path.join(os.path.abspath(os.path.dirname(__file__)), + '..', '..', 'storage', 'sqlalchemy', + 'migrate_repo') + migration.db_sync(self._engine_facade.get_engine(), path) + + def clear(self): + engine = self._engine_facade.get_engine() + for table in reversed(models.Base.metadata.sorted_tables): + engine.execute(table.delete()) + self._engine_facade._session_maker.close_all() + engine.dispose() + + def _retrieve_data(self, filter_expr, orderby, limit, table): + if limit == 0: + return [] + + session = self._engine_facade.get_session() + query = session.query(table) + transformer = sql_utils.QueryTransformer(table, query) + if filter_expr is not None: + transformer.apply_filter(filter_expr) + + transformer.apply_options(orderby, + limit) + + retrieve = {models.Alarm: self._retrieve_alarms, + models.AlarmChange: self._retrieve_alarm_history} + return 
retrieve[table](transformer.get_query()) + + @staticmethod + def _row_to_alarm_model(row): + return alarm_api_models.Alarm(alarm_id=row.alarm_id, + enabled=row.enabled, + type=row.type, + name=row.name, + description=row.description, + timestamp=row.timestamp, + user_id=row.user_id, + project_id=row.project_id, + state=row.state, + state_timestamp=row.state_timestamp, + ok_actions=row.ok_actions, + alarm_actions=row.alarm_actions, + insufficient_data_actions=( + row.insufficient_data_actions), + rule=row.rule, + time_constraints=row.time_constraints, + repeat_actions=row.repeat_actions) + + def _retrieve_alarms(self, query): + return (self._row_to_alarm_model(x) for x in query.all()) + + def get_alarms(self, name=None, user=None, state=None, meter=None, + project=None, enabled=None, alarm_id=None, pagination=None): + """Yields a lists of alarms that match filters + + :param user: Optional ID for user that owns the resource. + :param state: Optional string for alarm state. + :param meter: Optional string for alarms associated with meter. + :param project: Optional ID for project that owns the resource. + :param enabled: Optional boolean to list disable alarm. + :param alarm_id: Optional alarm_id to return one alarm. + :param pagination: Optional pagination query. + """ + + if pagination: + raise NotImplementedError('Pagination not implemented') + + session = self._engine_facade.get_session() + query = session.query(models.Alarm) + if name is not None: + query = query.filter(models.Alarm.name == name) + if enabled is not None: + query = query.filter(models.Alarm.enabled == enabled) + if user is not None: + query = query.filter(models.Alarm.user_id == user) + if project is not None: + query = query.filter(models.Alarm.project_id == project) + if alarm_id is not None: + query = query.filter(models.Alarm.alarm_id == alarm_id) + if state is not None: + query = query.filter(models.Alarm.state == state) + + alarms = self._retrieve_alarms(query) + + # TODO(cmart): improve this by using sqlalchemy.func factory + if meter is not None: + alarms = filter(lambda row: + row.rule.get('meter_name', None) == meter, + alarms) + + return alarms + + def create_alarm(self, alarm): + """Create an alarm. + + :param alarm: The alarm to create. + """ + session = self._engine_facade.get_session() + with session.begin(): + alarm_row = models.Alarm(alarm_id=alarm.alarm_id) + alarm_row.update(alarm.as_dict()) + session.add(alarm_row) + + return self._row_to_alarm_model(alarm_row) + + def update_alarm(self, alarm): + """Update an alarm. 
+ + :param alarm: the new Alarm to update + """ + session = self._engine_facade.get_session() + with session.begin(): + alarm_row = session.merge(models.Alarm(alarm_id=alarm.alarm_id)) + alarm_row.update(alarm.as_dict()) + + return self._row_to_alarm_model(alarm_row) + + def delete_alarm(self, alarm_id): + """Delete an alarm + + :param alarm_id: ID of the alarm to delete + """ + session = self._engine_facade.get_session() + with session.begin(): + session.query(models.Alarm).filter( + models.Alarm.alarm_id == alarm_id).delete() + + @staticmethod + def _row_to_alarm_change_model(row): + return alarm_api_models.AlarmChange(event_id=row.event_id, + alarm_id=row.alarm_id, + type=row.type, + detail=row.detail, + user_id=row.user_id, + project_id=row.project_id, + on_behalf_of=row.on_behalf_of, + timestamp=row.timestamp) + + def query_alarms(self, filter_expr=None, orderby=None, limit=None): + """Yields a lists of alarms that match filter.""" + return self._retrieve_data(filter_expr, orderby, limit, models.Alarm) + + def _retrieve_alarm_history(self, query): + return (self._row_to_alarm_change_model(x) for x in query.all()) + + def query_alarm_history(self, filter_expr=None, orderby=None, limit=None): + """Return an iterable of model.AlarmChange objects.""" + return self._retrieve_data(filter_expr, + orderby, + limit, + models.AlarmChange) + + def get_alarm_changes(self, alarm_id, on_behalf_of, + user=None, project=None, type=None, + start_timestamp=None, start_timestamp_op=None, + end_timestamp=None, end_timestamp_op=None): + """Yields list of AlarmChanges describing alarm history + + Changes are always sorted in reverse order of occurrence, given + the importance of currency. + + Segregation for non-administrative users is done on the basis + of the on_behalf_of parameter. This allows such users to have + visibility on both the changes initiated by themselves directly + (generally creation, rule changes, or deletion) and also on those + changes initiated on their behalf by the alarming service (state + transitions after alarm thresholds are crossed). 
+ + :param alarm_id: ID of alarm to return changes for + :param on_behalf_of: ID of tenant to scope changes query (None for + administrative user, indicating all projects) + :param user: Optional ID of user to return changes for + :param project: Optional ID of project to return changes for + :project type: Optional change type + :param start_timestamp: Optional modified timestamp start range + :param start_timestamp_op: Optional timestamp start range operation + :param end_timestamp: Optional modified timestamp end range + :param end_timestamp_op: Optional timestamp end range operation + """ + session = self._engine_facade.get_session() + query = session.query(models.AlarmChange) + query = query.filter(models.AlarmChange.alarm_id == alarm_id) + + if on_behalf_of is not None: + query = query.filter( + models.AlarmChange.on_behalf_of == on_behalf_of) + if user is not None: + query = query.filter(models.AlarmChange.user_id == user) + if project is not None: + query = query.filter(models.AlarmChange.project_id == project) + if type is not None: + query = query.filter(models.AlarmChange.type == type) + if start_timestamp: + if start_timestamp_op == 'gt': + query = query.filter( + models.AlarmChange.timestamp > start_timestamp) + else: + query = query.filter( + models.AlarmChange.timestamp >= start_timestamp) + if end_timestamp: + if end_timestamp_op == 'le': + query = query.filter( + models.AlarmChange.timestamp <= end_timestamp) + else: + query = query.filter( + models.AlarmChange.timestamp < end_timestamp) + + query = query.order_by(desc(models.AlarmChange.timestamp)) + return self._retrieve_alarm_history(query) + + def record_alarm_change(self, alarm_change): + """Record alarm change event.""" + session = self._engine_facade.get_session() + with session.begin(): + alarm_change_row = models.AlarmChange( + event_id=alarm_change['event_id']) + alarm_change_row.update(alarm_change) + session.add(alarm_change_row) diff --git a/ceilometer/storage/impl_sqlalchemy.py b/ceilometer/storage/impl_sqlalchemy.py index 12bf6cc8..0e133c31 100644 --- a/ceilometer/storage/impl_sqlalchemy.py +++ b/ceilometer/storage/impl_sqlalchemy.py @@ -20,20 +20,13 @@ from __future__ import absolute_import import datetime import operator import os -import types from oslo.config import cfg from sqlalchemy import and_ -from sqlalchemy import asc -from sqlalchemy import desc from sqlalchemy import distinct from sqlalchemy import func -from sqlalchemy import not_ -from sqlalchemy import or_ from sqlalchemy.orm import aliased -from ceilometer.alarm.storage import base as alarm_base -from ceilometer.alarm.storage import models as alarm_api_models from ceilometer.openstack.common.db import exception as dbexc from ceilometer.openstack.common.db.sqlalchemy import migration import ceilometer.openstack.common.db.sqlalchemy.session as sqlalchemy_session @@ -44,19 +37,12 @@ from ceilometer import storage from ceilometer.storage import base from ceilometer.storage import models as api_models from ceilometer.storage.sqlalchemy import models +from ceilometer.storage.sqlalchemy import utils as sql_utils from ceilometer import utils LOG = log.getLogger(__name__) -META_TYPE_MAP = {bool: models.MetaBool, - str: models.MetaText, - unicode: models.MetaText, - types.NoneType: models.MetaText, - int: models.MetaBigInt, - long: models.MetaBigInt, - float: models.MetaFloat} - STANDARD_AGGREGATES = dict( avg=func.avg(models.Sample.volume).label('avg'), sum=func.sum(models.Sample.volume).label('sum'), @@ -103,10 +89,6 @@ AVAILABLE_CAPABILITIES = { 
'stddev': True, 'cardinality': True}} }, - 'alarms': {'query': {'simple': True, - 'complex': True}, - 'history': {'query': {'simple': True, - 'complex': True}}}, 'events': {'query': {'simple': True}}, } @@ -126,7 +108,7 @@ def apply_metaquery_filter(session, query, metaquery): for k, value in metaquery.iteritems(): key = k[9:] # strip out 'metadata.' prefix try: - _model = META_TYPE_MAP[type(value)] + _model = sql_utils.META_TYPE_MAP[type(value)] except KeyError: raise NotImplementedError('Query on %(key)s is of %(value)s ' 'type and is not supported' % @@ -221,9 +203,7 @@ class Connection(base.Connection): } """ CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES, - utils.update_nested( - alarm_base.Connection.CAPABILITIES, - AVAILABLE_CAPABILITIES)) + AVAILABLE_CAPABILITIES) STORAGE_CAPABILITIES = utils.update_nested( base.Connection.STORAGE_CAPABILITIES, AVAILABLE_STORAGE_CAPABILITIES, @@ -297,7 +277,7 @@ class Connection(base.Connection): if isinstance(rmetadata, dict): for key, v in utils.dict_to_keyval(rmetadata): try: - _model = META_TYPE_MAP[type(v)] + _model = sql_utils.META_TYPE_MAP[type(v)] except KeyError: LOG.warn(_("Unknown metadata type. Key (%s) will " "not be queryable."), key) @@ -509,34 +489,24 @@ class Connection(base.Connection): query = session.query(table) query = make_query_from_filter(session, query, sample_filter, require_meter=False) - transformer = QueryTransformer(table, query) + transformer = sql_utils.QueryTransformer(table, query) transformer.apply_options(None, limit) return self._retrieve_samples(transformer.get_query()) - def _retrieve_data(self, filter_expr, orderby, limit, table): + def query_samples(self, filter_expr=None, orderby=None, limit=None): if limit == 0: return [] session = self._engine_facade.get_session() - query = session.query(table) - transformer = QueryTransformer(table, query) + query = session.query(models.MeterSample) + transformer = sql_utils.QueryTransformer(models.MeterSample, query) if filter_expr is not None: transformer.apply_filter(filter_expr) transformer.apply_options(orderby, limit) - - retrieve = {models.MeterSample: self._retrieve_samples, - models.Alarm: self._retrieve_alarms, - models.AlarmChange: self._retrieve_alarm_history} - return retrieve[table](transformer.get_query()) - - def query_samples(self, filter_expr=None, orderby=None, limit=None): - return self._retrieve_data(filter_expr, - orderby, - limit, - models.MeterSample) + return self._retrieve_samples(transformer.get_query()) @staticmethod def _get_aggregate_functions(aggregate): @@ -678,197 +648,6 @@ class Connection(base.Connection): aggregate=aggregate ) - @staticmethod - def _row_to_alarm_model(row): - return alarm_api_models.Alarm(alarm_id=row.alarm_id, - enabled=row.enabled, - type=row.type, - name=row.name, - description=row.description, - timestamp=row.timestamp, - user_id=row.user_id, - project_id=row.project_id, - state=row.state, - state_timestamp=row.state_timestamp, - ok_actions=row.ok_actions, - alarm_actions=row.alarm_actions, - insufficient_data_actions=( - row.insufficient_data_actions), - rule=row.rule, - time_constraints=row.time_constraints, - repeat_actions=row.repeat_actions) - - def _retrieve_alarms(self, query): - return (self._row_to_alarm_model(x) for x in query.all()) - - def get_alarms(self, name=None, user=None, state=None, meter=None, - project=None, enabled=None, alarm_id=None, pagination=None): - """Yields a lists of alarms that match filters - - :param user: Optional ID for user that owns the resource. 
- :param state: Optional string for alarm state. - :param meter: Optional string for alarms associated with meter. - :param project: Optional ID for project that owns the resource. - :param enabled: Optional boolean to list disable alarm. - :param alarm_id: Optional alarm_id to return one alarm. - :param pagination: Optional pagination query. - """ - - if pagination: - raise NotImplementedError('Pagination not implemented') - - session = self._engine_facade.get_session() - query = session.query(models.Alarm) - if name is not None: - query = query.filter(models.Alarm.name == name) - if enabled is not None: - query = query.filter(models.Alarm.enabled == enabled) - if user is not None: - query = query.filter(models.Alarm.user_id == user) - if project is not None: - query = query.filter(models.Alarm.project_id == project) - if alarm_id is not None: - query = query.filter(models.Alarm.alarm_id == alarm_id) - if state is not None: - query = query.filter(models.Alarm.state == state) - - alarms = self._retrieve_alarms(query) - - # TODO(cmart): improve this by using sqlalchemy.func factory - if meter is not None: - alarms = filter(lambda row: - row.rule.get('meter_name', None) == meter, - alarms) - - return alarms - - def create_alarm(self, alarm): - """Create an alarm. - - :param alarm: The alarm to create. - """ - session = self._engine_facade.get_session() - with session.begin(): - alarm_row = models.Alarm(alarm_id=alarm.alarm_id) - alarm_row.update(alarm.as_dict()) - session.add(alarm_row) - - return self._row_to_alarm_model(alarm_row) - - def update_alarm(self, alarm): - """Update an alarm. - - :param alarm: the new Alarm to update - """ - session = self._engine_facade.get_session() - with session.begin(): - alarm_row = session.merge(models.Alarm(alarm_id=alarm.alarm_id)) - alarm_row.update(alarm.as_dict()) - - return self._row_to_alarm_model(alarm_row) - - def delete_alarm(self, alarm_id): - """Delete an alarm - - :param alarm_id: ID of the alarm to delete - """ - session = self._engine_facade.get_session() - with session.begin(): - session.query(models.Alarm).filter( - models.Alarm.alarm_id == alarm_id).delete() - - @staticmethod - def _row_to_alarm_change_model(row): - return alarm_api_models.AlarmChange(event_id=row.event_id, - alarm_id=row.alarm_id, - type=row.type, - detail=row.detail, - user_id=row.user_id, - project_id=row.project_id, - on_behalf_of=row.on_behalf_of, - timestamp=row.timestamp) - - def query_alarms(self, filter_expr=None, orderby=None, limit=None): - """Yields a lists of alarms that match filter.""" - return self._retrieve_data(filter_expr, orderby, limit, models.Alarm) - - def _retrieve_alarm_history(self, query): - return (self._row_to_alarm_change_model(x) for x in query.all()) - - def query_alarm_history(self, filter_expr=None, orderby=None, limit=None): - """Return an iterable of model.AlarmChange objects.""" - return self._retrieve_data(filter_expr, - orderby, - limit, - models.AlarmChange) - - def get_alarm_changes(self, alarm_id, on_behalf_of, - user=None, project=None, type=None, - start_timestamp=None, start_timestamp_op=None, - end_timestamp=None, end_timestamp_op=None): - """Yields list of AlarmChanges describing alarm history - - Changes are always sorted in reverse order of occurrence, given - the importance of currency. - - Segregation for non-administrative users is done on the basis - of the on_behalf_of parameter. 
This allows such users to have - visibility on both the changes initiated by themselves directly - (generally creation, rule changes, or deletion) and also on those - changes initiated on their behalf by the alarming service (state - transitions after alarm thresholds are crossed). - - :param alarm_id: ID of alarm to return changes for - :param on_behalf_of: ID of tenant to scope changes query (None for - administrative user, indicating all projects) - :param user: Optional ID of user to return changes for - :param project: Optional ID of project to return changes for - :project type: Optional change type - :param start_timestamp: Optional modified timestamp start range - :param start_timestamp_op: Optional timestamp start range operation - :param end_timestamp: Optional modified timestamp end range - :param end_timestamp_op: Optional timestamp end range operation - """ - session = self._engine_facade.get_session() - query = session.query(models.AlarmChange) - query = query.filter(models.AlarmChange.alarm_id == alarm_id) - - if on_behalf_of is not None: - query = query.filter( - models.AlarmChange.on_behalf_of == on_behalf_of) - if user is not None: - query = query.filter(models.AlarmChange.user_id == user) - if project is not None: - query = query.filter(models.AlarmChange.project_id == project) - if type is not None: - query = query.filter(models.AlarmChange.type == type) - if start_timestamp: - if start_timestamp_op == 'gt': - query = query.filter( - models.AlarmChange.timestamp > start_timestamp) - else: - query = query.filter( - models.AlarmChange.timestamp >= start_timestamp) - if end_timestamp: - if end_timestamp_op == 'le': - query = query.filter( - models.AlarmChange.timestamp <= end_timestamp) - else: - query = query.filter( - models.AlarmChange.timestamp < end_timestamp) - - query = query.order_by(desc(models.AlarmChange.timestamp)) - return self._retrieve_alarm_history(query) - - def record_alarm_change(self, alarm_change): - """Record alarm change event.""" - session = self._engine_facade.get_session() - with session.begin(): - alarm_change_row = models.AlarmChange( - event_id=alarm_change['event_id']) - alarm_change_row.update(alarm_change) - session.add(alarm_change_row) - def _get_or_create_trait_type(self, trait_type, data_type, session=None): """Find if this trait already exists in the database. 
@@ -1143,91 +922,3 @@ class Connection(base.Connection): yield api_models.Trait(name=type.desc, dtype=type.data_type, value=trait.get_value()) - - -class QueryTransformer(object): - operators = {"=": operator.eq, - "<": operator.lt, - ">": operator.gt, - "<=": operator.le, - "=<": operator.le, - ">=": operator.ge, - "=>": operator.ge, - "!=": operator.ne, - "in": lambda field_name, values: field_name.in_(values)} - - complex_operators = {"or": or_, - "and": and_, - "not": not_} - - ordering_functions = {"asc": asc, - "desc": desc} - - def __init__(self, table, query): - self.table = table - self.query = query - - def _handle_complex_op(self, complex_op, nodes): - op = self.complex_operators[complex_op] - if op == not_: - nodes = [nodes] - element_list = [] - for node in nodes: - element = self._transform(node) - element_list.append(element) - return op(*element_list) - - def _handle_simple_op(self, simple_op, nodes): - op = self.operators[simple_op] - field_name = nodes.keys()[0] - value = nodes.values()[0] - if field_name.startswith('resource_metadata.'): - return self._handle_metadata(op, field_name, value) - else: - return op(getattr(self.table, field_name), value) - - def _handle_metadata(self, op, field_name, value): - if op == self.operators["in"]: - raise NotImplementedError('Metadata query with in ' - 'operator is not implemented') - - field_name = field_name[len('resource_metadata.'):] - meta_table = META_TYPE_MAP[type(value)] - meta_alias = aliased(meta_table) - on_clause = and_(self.table.id == meta_alias.id, - meta_alias.meta_key == field_name) - # outer join is needed to support metaquery - # with or operator on non existent metadata field - # see: test_query_non_existing_metadata_with_result - # test case. - self.query = self.query.outerjoin(meta_alias, on_clause) - return op(meta_alias.value, value) - - def _transform(self, sub_tree): - operator = sub_tree.keys()[0] - nodes = sub_tree.values()[0] - if operator in self.complex_operators: - return self._handle_complex_op(operator, nodes) - else: - return self._handle_simple_op(operator, nodes) - - def apply_filter(self, expression_tree): - condition = self._transform(expression_tree) - self.query = self.query.filter(condition) - - def apply_options(self, orderby, limit): - self._apply_order_by(orderby) - if limit is not None: - self.query = self.query.limit(limit) - - def _apply_order_by(self, orderby): - if orderby is not None: - for field in orderby: - ordering_function = self.ordering_functions[field.values()[0]] - self.query = self.query.order_by(ordering_function( - getattr(self.table, field.keys()[0]))) - else: - self.query = self.query.order_by(desc(self.table.timestamp)) - - def get_query(self): - return self.query diff --git a/ceilometer/storage/sqlalchemy/utils.py b/ceilometer/storage/sqlalchemy/utils.py new file mode 100644 index 00000000..38d6e190 --- /dev/null +++ b/ceilometer/storage/sqlalchemy/utils.py @@ -0,0 +1,124 @@ +# Author: John Tran +# Julien Danjou +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+# + +import operator +import types + +from sqlalchemy import and_ +from sqlalchemy import asc +from sqlalchemy import desc +from sqlalchemy import not_ +from sqlalchemy import or_ +from sqlalchemy.orm import aliased + +from ceilometer.storage.sqlalchemy import models + + +META_TYPE_MAP = {bool: models.MetaBool, + str: models.MetaText, + unicode: models.MetaText, + types.NoneType: models.MetaText, + int: models.MetaBigInt, + long: models.MetaBigInt, + float: models.MetaFloat} + + +class QueryTransformer(object): + operators = {"=": operator.eq, + "<": operator.lt, + ">": operator.gt, + "<=": operator.le, + "=<": operator.le, + ">=": operator.ge, + "=>": operator.ge, + "!=": operator.ne, + "in": lambda field_name, values: field_name.in_(values)} + + complex_operators = {"or": or_, + "and": and_, + "not": not_} + + ordering_functions = {"asc": asc, + "desc": desc} + + def __init__(self, table, query): + self.table = table + self.query = query + + def _handle_complex_op(self, complex_op, nodes): + op = self.complex_operators[complex_op] + if op == not_: + nodes = [nodes] + element_list = [] + for node in nodes: + element = self._transform(node) + element_list.append(element) + return op(*element_list) + + def _handle_simple_op(self, simple_op, nodes): + op = self.operators[simple_op] + field_name = nodes.keys()[0] + value = nodes.values()[0] + if field_name.startswith('resource_metadata.'): + return self._handle_metadata(op, field_name, value) + else: + return op(getattr(self.table, field_name), value) + + def _handle_metadata(self, op, field_name, value): + if op == self.operators["in"]: + raise NotImplementedError('Metadata query with in ' + 'operator is not implemented') + + field_name = field_name[len('resource_metadata.'):] + meta_table = META_TYPE_MAP[type(value)] + meta_alias = aliased(meta_table) + on_clause = and_(self.table.id == meta_alias.id, + meta_alias.meta_key == field_name) + # outer join is needed to support metaquery + # with or operator on non existent metadata field + # see: test_query_non_existing_metadata_with_result + # test case. 
+ self.query = self.query.outerjoin(meta_alias, on_clause) + return op(meta_alias.value, value) + + def _transform(self, sub_tree): + operator = sub_tree.keys()[0] + nodes = sub_tree.values()[0] + if operator in self.complex_operators: + return self._handle_complex_op(operator, nodes) + else: + return self._handle_simple_op(operator, nodes) + + def apply_filter(self, expression_tree): + condition = self._transform(expression_tree) + self.query = self.query.filter(condition) + + def apply_options(self, orderby, limit): + self._apply_order_by(orderby) + if limit is not None: + self.query = self.query.limit(limit) + + def _apply_order_by(self, orderby): + if orderby is not None: + for field in orderby: + ordering_function = self.ordering_functions[field.values()[0]] + self.query = self.query.order_by(ordering_function( + getattr(self.table, field.keys()[0]))) + else: + self.query = self.query.order_by(desc(self.table.timestamp)) + + def get_query(self): + return self.query diff --git a/ceilometer/tests/storage/test_get_connection.py b/ceilometer/tests/storage/test_get_connection.py index 0e9d044c..60469ea3 100644 --- a/ceilometer/tests/storage/test_get_connection.py +++ b/ceilometer/tests/storage/test_get_connection.py @@ -18,11 +18,11 @@ """ from ceilometer.alarm.storage import impl_log as impl_log_alarm +from ceilometer.alarm.storage import impl_sqlalchemy as impl_sqlalchemy_alarm from ceilometer.openstack.common.fixture import config from ceilometer.openstack.common import test from ceilometer import storage from ceilometer.storage import impl_log -from ceilometer.storage import impl_sqlalchemy import six @@ -65,4 +65,4 @@ class ConnectionConfigTest(test.BaseTestCase): conn = storage.get_connection_from_config(self.CONF, 'metering')._conn self.assertIsInstance(conn, impl_log.Connection) conn = storage.get_connection_from_config(self.CONF, 'alarm')._conn - self.assertIsInstance(conn, impl_sqlalchemy.Connection) + self.assertIsInstance(conn, impl_sqlalchemy_alarm.Connection) diff --git a/ceilometer/tests/storage/test_impl_sqlalchemy.py b/ceilometer/tests/storage/test_impl_sqlalchemy.py index 59162944..c1c232d7 100644 --- a/ceilometer/tests/storage/test_impl_sqlalchemy.py +++ b/ceilometer/tests/storage/test_impl_sqlalchemy.py @@ -27,6 +27,7 @@ import repr import mock +from ceilometer.alarm.storage import impl_sqlalchemy as impl_sqla_alarm from ceilometer.openstack.common import timeutils from ceilometer.storage import impl_sqlalchemy from ceilometer.storage import models @@ -226,16 +227,23 @@ class CapabilitiesTest(test_base.BaseTestCase): 'stddev': True, 'cardinality': True}} }, - 'alarms': {'query': {'simple': True, - 'complex': True}, - 'history': {'query': {'simple': True, - 'complex': True}}}, 'events': {'query': {'simple': True}} } actual_capabilities = impl_sqlalchemy.Connection.get_capabilities() self.assertEqual(expected_capabilities, actual_capabilities) + def test_alarm_capabilities(self): + expected_capabilities = { + 'alarms': {'query': {'simple': True, + 'complex': True}, + 'history': {'query': {'simple': True, + 'complex': True}}}, + } + + actual_capabilities = impl_sqla_alarm.Connection.get_capabilities() + self.assertEqual(expected_capabilities, actual_capabilities) + def test_storage_capabilities(self): expected_capabilities = { 'storage': {'production_ready': True}, diff --git a/setup.cfg b/setup.cfg index fd7ff3fb..3cdf6fc7 100644 --- a/setup.cfg +++ b/setup.cfg @@ -164,9 +164,9 @@ ceilometer.poll.central = ceilometer.alarm.storage = log = 
ceilometer.alarm.storage.impl_log:Connection mongodb = ceilometer.alarm.storage.impl_mongodb:Connection - mysql = ceilometer.storage.impl_sqlalchemy:Connection - postgresql = ceilometer.storage.impl_sqlalchemy:Connection - sqlite = ceilometer.storage.impl_sqlalchemy:Connection + mysql = ceilometer.alarm.storage.impl_sqlalchemy:Connection + postgresql = ceilometer.alarm.storage.impl_sqlalchemy:Connection + sqlite = ceilometer.alarm.storage.impl_sqlalchemy:Connection hbase = ceilometer.alarm.storage.impl_hbase:Connection db2 = ceilometer.alarm.storage.impl_db2:Connection
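
The QueryTransformer moved into ceilometer/storage/sqlalchemy/utils.py is the piece both backends now share: query_samples on the metering side and query_alarms/query_alarm_history on the alarm side all hand it the same nested {operator: operands} filter tree. The following is a standalone sketch of that walk, not ceilometer code: the DemoAlarm model and its columns are made up for illustration, and it assumes a SQLAlchemy release of the era this tree targets (sqlalchemy.ext.declarative available).

    import operator

    from sqlalchemy import Column, DateTime, String, and_, not_, or_
    from sqlalchemy.ext.declarative import declarative_base

    Base = declarative_base()


    class DemoAlarm(Base):
        # Illustrative stand-in for ceilometer.storage.sqlalchemy.models.Alarm.
        __tablename__ = 'demo_alarm'
        alarm_id = Column(String(128), primary_key=True)
        user_id = Column(String(128))
        state = Column(String(64))
        timestamp = Column(DateTime)


    SIMPLE_OPS = {'=': operator.eq, '<': operator.lt, '>': operator.gt,
                  '<=': operator.le, '>=': operator.ge, '!=': operator.ne,
                  'in': lambda col, values: col.in_(values)}
    COMPLEX_OPS = {'and': and_, 'or': or_, 'not': not_}


    def transform(table, node):
        """Turn one {op: operands} node into a SQLAlchemy condition."""
        op, operands = list(node.items())[0]
        if op in COMPLEX_OPS:
            # 'not' takes a single child; 'and'/'or' take a list of children.
            operands = [operands] if op == 'not' else operands
            return COMPLEX_OPS[op](*[transform(table, n) for n in operands])
        field, value = list(operands.items())[0]
        return SIMPLE_OPS[op](getattr(table, field), value)


    filter_expr = {'and': [{'=': {'user_id': 'u-1'}},
                           {'or': [{'=': {'state': 'alarm'}},
                                   {'=': {'state': 'insufficient data'}}]}]}
    print(transform(DemoAlarm, filter_expr))
    # roughly: demo_alarm.user_id = :user_id_1 AND
    #          (demo_alarm.state = :state_1 OR demo_alarm.state = :state_2)

The real class additionally rewrites resource_metadata.* field names through an outer join on the per-type metadata tables, which the sketch omits.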
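
get_alarm_changes keeps the pre-existing range semantics on AlarmChange.timestamp: start_timestamp_op == 'gt' makes the lower bound exclusive (anything else means >=), and end_timestamp_op == 'le' makes the upper bound inclusive (anything else means <), so the default is the half-open interval [start, end). A pure-Python sketch of the resulting predicate, with made-up dates:

    import datetime


    def in_range(ts, start=None, start_op=None, end=None, end_op=None):
        # Mirrors the filter chain built on models.AlarmChange.timestamp.
        if start is not None:
            if not (ts > start if start_op == 'gt' else ts >= start):
                return False
        if end is not None:
            if not (ts <= end if end_op == 'le' else ts < end):
                return False
        return True


    start = datetime.datetime(2014, 1, 1)
    end = datetime.datetime(2014, 1, 2)
    print(in_range(start, start=start, end=end))                 # True: default >=
    print(in_range(start, start=start, start_op='gt', end=end))  # False: strict >
    print(in_range(end, start=start, end=end))                   # False: default <
    print(in_range(end, start=start, end=end, end_op='le'))      # True: inclusive <=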
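
Both Connection classes build CAPABILITIES by deep-merging their AVAILABLE_CAPABILITIES over the base-class defaults via ceilometer.utils.update_nested; after this split the alarm flags live only on the alarm driver, which is what the new test_alarm_capabilities asserts. The helper below is an illustrative stand-in, not the real ceilometer.utils.update_nested, and the base defaults of False are an assumption for the example:

    import copy
    import pprint


    def update_nested(original, updates):
        # Stand-in: recursively overlay 'updates' onto a copy of 'original'
        # without discarding sibling keys.
        merged = copy.deepcopy(original)
        for key, value in updates.items():
            if isinstance(value, dict) and isinstance(merged.get(key), dict):
                merged[key] = update_nested(merged[key], value)
            else:
                merged[key] = value
        return merged


    base_caps = {'alarms': {'query': {'simple': False, 'complex': False},
                            'history': {'query': {'simple': False,
                                                  'complex': False}}}}
    available_caps = {'alarms': {'query': {'simple': True, 'complex': True},
                                 'history': {'query': {'simple': True,
                                                       'complex': True}}}}
    pprint.pprint(update_nested(base_caps, available_caps))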
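
META_TYPE_MAP, now exported from ceilometer/storage/sqlalchemy/utils.py instead of being private to impl_sqlalchemy, is a plain type-keyed dispatch: the Python type of the metadata value selects which Meta* model the comparison (or the record_metering_data insert) targets. That also helps explain why _handle_metadata rejects the 'in' operator, since a list of values has no single type to pick a table with. A trimmed sketch of the lookup; the real map also covers unicode and long because this tree is Python 2:

    # Class names match ceilometer/storage/sqlalchemy/models.py; only the
    # Python 3-friendly subset of the real map is shown.
    META_TYPE_MAP = {bool: 'MetaBool',
                     str: 'MetaText',
                     type(None): 'MetaText',
                     int: 'MetaBigInt',
                     float: 'MetaFloat'}

    for value in (True, 'm1.small', 512, 0.75, None):
        print('%r -> %s' % (value, META_TYPE_MAP[type(value)]))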
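
With the setup.cfg change, the mysql/postgresql/sqlite entries in the ceilometer.alarm.storage namespace resolve to the new alarm driver rather than the metering one. A sketch of checking that resolution, mirroring the updated ConnectionConfigTest; it assumes a tree with this patch installed (so the entry points are registered), and the 'alarm_connection' option name in the database group is an assumption about this era's storage options rather than something shown in the diff:

    from oslo.config import cfg

    from ceilometer.alarm.storage import impl_sqlalchemy as impl_sqla_alarm
    from ceilometer import storage

    # Assumed option names: 'connection' for the metering store and
    # 'alarm_connection' for the alarm store; adjust to your tree.
    cfg.CONF.set_override('connection', 'log://', group='database')
    cfg.CONF.set_override('alarm_connection', 'sqlite://', group='database')

    conn = storage.get_connection_from_config(cfg.CONF, 'alarm')._conn
    print(type(conn))
    assert isinstance(conn, impl_sqla_alarm.Connection)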