Move sqlalchemy alarms driver code to alarm tree

This change move the alarms related code of sqlalchemy driver to
the alarm storage subtree.

Partial implements blueprint dedicated-alarm-database

Change-Id: I70047a321581ee8b12533a8a5834d26e9d203532
This commit is contained in:
Mehdi Abaakouk 2014-07-21 10:48:51 +02:00
parent a77dd2b540
commit 74cdcbfcae
6 changed files with 458 additions and 327 deletions

View File

@ -0,0 +1,308 @@
#
# Author: John Tran <jhtran@att.com>
# Julien Danjou <julien@danjou.info>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""SQLAlchemy storage backend."""
from __future__ import absolute_import
import os
from oslo.config import cfg
from sqlalchemy import desc
from ceilometer.alarm.storage import base
from ceilometer.alarm.storage import models as alarm_api_models
from ceilometer.openstack.common.db.sqlalchemy import migration
import ceilometer.openstack.common.db.sqlalchemy.session as sqlalchemy_session
from ceilometer.openstack.common import log
from ceilometer.storage.sqlalchemy import models
from ceilometer.storage.sqlalchemy import utils as sql_utils
from ceilometer import utils
LOG = log.getLogger(__name__)
AVAILABLE_CAPABILITIES = {
'alarms': {'query': {'simple': True,
'complex': True},
'history': {'query': {'simple': True,
'complex': True}}},
}
AVAILABLE_STORAGE_CAPABILITIES = {
'storage': {'production_ready': True},
}
class Connection(base.Connection):
"""Put the data into a SQLAlchemy database.
Tables::
- meter
- meter definition
- { id: meter def id
name: meter name
type: meter type
unit: meter unit
}
- sample
- the raw incoming data
- { id: sample id
meter_id: meter id (->meter.id)
user_id: user uuid
project_id: project uuid
resource_id: resource uuid
source_id: source id
resource_metadata: metadata dictionaries
volume: sample volume
timestamp: datetime
message_signature: message signature
message_id: message uuid
}
"""
CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES,
AVAILABLE_CAPABILITIES)
STORAGE_CAPABILITIES = utils.update_nested(
base.Connection.STORAGE_CAPABILITIES,
AVAILABLE_STORAGE_CAPABILITIES,
)
def __init__(self, url):
self._engine_facade = sqlalchemy_session.EngineFacade.from_config(
url,
cfg.CONF # TODO(Alexei_987) Remove access to global CONF object
)
def upgrade(self):
path = os.path.join(os.path.abspath(os.path.dirname(__file__)),
'..', '..', 'storage', 'sqlalchemy',
'migrate_repo')
migration.db_sync(self._engine_facade.get_engine(), path)
def clear(self):
engine = self._engine_facade.get_engine()
for table in reversed(models.Base.metadata.sorted_tables):
engine.execute(table.delete())
self._engine_facade._session_maker.close_all()
engine.dispose()
def _retrieve_data(self, filter_expr, orderby, limit, table):
if limit == 0:
return []
session = self._engine_facade.get_session()
query = session.query(table)
transformer = sql_utils.QueryTransformer(table, query)
if filter_expr is not None:
transformer.apply_filter(filter_expr)
transformer.apply_options(orderby,
limit)
retrieve = {models.Alarm: self._retrieve_alarms,
models.AlarmChange: self._retrieve_alarm_history}
return retrieve[table](transformer.get_query())
@staticmethod
def _row_to_alarm_model(row):
return alarm_api_models.Alarm(alarm_id=row.alarm_id,
enabled=row.enabled,
type=row.type,
name=row.name,
description=row.description,
timestamp=row.timestamp,
user_id=row.user_id,
project_id=row.project_id,
state=row.state,
state_timestamp=row.state_timestamp,
ok_actions=row.ok_actions,
alarm_actions=row.alarm_actions,
insufficient_data_actions=(
row.insufficient_data_actions),
rule=row.rule,
time_constraints=row.time_constraints,
repeat_actions=row.repeat_actions)
def _retrieve_alarms(self, query):
return (self._row_to_alarm_model(x) for x in query.all())
def get_alarms(self, name=None, user=None, state=None, meter=None,
project=None, enabled=None, alarm_id=None, pagination=None):
"""Yields a lists of alarms that match filters
:param user: Optional ID for user that owns the resource.
:param state: Optional string for alarm state.
:param meter: Optional string for alarms associated with meter.
:param project: Optional ID for project that owns the resource.
:param enabled: Optional boolean to list disable alarm.
:param alarm_id: Optional alarm_id to return one alarm.
:param pagination: Optional pagination query.
"""
if pagination:
raise NotImplementedError('Pagination not implemented')
session = self._engine_facade.get_session()
query = session.query(models.Alarm)
if name is not None:
query = query.filter(models.Alarm.name == name)
if enabled is not None:
query = query.filter(models.Alarm.enabled == enabled)
if user is not None:
query = query.filter(models.Alarm.user_id == user)
if project is not None:
query = query.filter(models.Alarm.project_id == project)
if alarm_id is not None:
query = query.filter(models.Alarm.alarm_id == alarm_id)
if state is not None:
query = query.filter(models.Alarm.state == state)
alarms = self._retrieve_alarms(query)
# TODO(cmart): improve this by using sqlalchemy.func factory
if meter is not None:
alarms = filter(lambda row:
row.rule.get('meter_name', None) == meter,
alarms)
return alarms
def create_alarm(self, alarm):
"""Create an alarm.
:param alarm: The alarm to create.
"""
session = self._engine_facade.get_session()
with session.begin():
alarm_row = models.Alarm(alarm_id=alarm.alarm_id)
alarm_row.update(alarm.as_dict())
session.add(alarm_row)
return self._row_to_alarm_model(alarm_row)
def update_alarm(self, alarm):
"""Update an alarm.
:param alarm: the new Alarm to update
"""
session = self._engine_facade.get_session()
with session.begin():
alarm_row = session.merge(models.Alarm(alarm_id=alarm.alarm_id))
alarm_row.update(alarm.as_dict())
return self._row_to_alarm_model(alarm_row)
def delete_alarm(self, alarm_id):
"""Delete an alarm
:param alarm_id: ID of the alarm to delete
"""
session = self._engine_facade.get_session()
with session.begin():
session.query(models.Alarm).filter(
models.Alarm.alarm_id == alarm_id).delete()
@staticmethod
def _row_to_alarm_change_model(row):
return alarm_api_models.AlarmChange(event_id=row.event_id,
alarm_id=row.alarm_id,
type=row.type,
detail=row.detail,
user_id=row.user_id,
project_id=row.project_id,
on_behalf_of=row.on_behalf_of,
timestamp=row.timestamp)
def query_alarms(self, filter_expr=None, orderby=None, limit=None):
"""Yields a lists of alarms that match filter."""
return self._retrieve_data(filter_expr, orderby, limit, models.Alarm)
def _retrieve_alarm_history(self, query):
return (self._row_to_alarm_change_model(x) for x in query.all())
def query_alarm_history(self, filter_expr=None, orderby=None, limit=None):
"""Return an iterable of model.AlarmChange objects."""
return self._retrieve_data(filter_expr,
orderby,
limit,
models.AlarmChange)
def get_alarm_changes(self, alarm_id, on_behalf_of,
user=None, project=None, type=None,
start_timestamp=None, start_timestamp_op=None,
end_timestamp=None, end_timestamp_op=None):
"""Yields list of AlarmChanges describing alarm history
Changes are always sorted in reverse order of occurrence, given
the importance of currency.
Segregation for non-administrative users is done on the basis
of the on_behalf_of parameter. This allows such users to have
visibility on both the changes initiated by themselves directly
(generally creation, rule changes, or deletion) and also on those
changes initiated on their behalf by the alarming service (state
transitions after alarm thresholds are crossed).
:param alarm_id: ID of alarm to return changes for
:param on_behalf_of: ID of tenant to scope changes query (None for
administrative user, indicating all projects)
:param user: Optional ID of user to return changes for
:param project: Optional ID of project to return changes for
:project type: Optional change type
:param start_timestamp: Optional modified timestamp start range
:param start_timestamp_op: Optional timestamp start range operation
:param end_timestamp: Optional modified timestamp end range
:param end_timestamp_op: Optional timestamp end range operation
"""
session = self._engine_facade.get_session()
query = session.query(models.AlarmChange)
query = query.filter(models.AlarmChange.alarm_id == alarm_id)
if on_behalf_of is not None:
query = query.filter(
models.AlarmChange.on_behalf_of == on_behalf_of)
if user is not None:
query = query.filter(models.AlarmChange.user_id == user)
if project is not None:
query = query.filter(models.AlarmChange.project_id == project)
if type is not None:
query = query.filter(models.AlarmChange.type == type)
if start_timestamp:
if start_timestamp_op == 'gt':
query = query.filter(
models.AlarmChange.timestamp > start_timestamp)
else:
query = query.filter(
models.AlarmChange.timestamp >= start_timestamp)
if end_timestamp:
if end_timestamp_op == 'le':
query = query.filter(
models.AlarmChange.timestamp <= end_timestamp)
else:
query = query.filter(
models.AlarmChange.timestamp < end_timestamp)
query = query.order_by(desc(models.AlarmChange.timestamp))
return self._retrieve_alarm_history(query)
def record_alarm_change(self, alarm_change):
"""Record alarm change event."""
session = self._engine_facade.get_session()
with session.begin():
alarm_change_row = models.AlarmChange(
event_id=alarm_change['event_id'])
alarm_change_row.update(alarm_change)
session.add(alarm_change_row)

View File

@ -20,20 +20,13 @@ from __future__ import absolute_import
import datetime
import operator
import os
import types
from oslo.config import cfg
from sqlalchemy import and_
from sqlalchemy import asc
from sqlalchemy import desc
from sqlalchemy import distinct
from sqlalchemy import func
from sqlalchemy import not_
from sqlalchemy import or_
from sqlalchemy.orm import aliased
from ceilometer.alarm.storage import base as alarm_base
from ceilometer.alarm.storage import models as alarm_api_models
from ceilometer.openstack.common.db import exception as dbexc
from ceilometer.openstack.common.db.sqlalchemy import migration
import ceilometer.openstack.common.db.sqlalchemy.session as sqlalchemy_session
@ -44,19 +37,12 @@ from ceilometer import storage
from ceilometer.storage import base
from ceilometer.storage import models as api_models
from ceilometer.storage.sqlalchemy import models
from ceilometer.storage.sqlalchemy import utils as sql_utils
from ceilometer import utils
LOG = log.getLogger(__name__)
META_TYPE_MAP = {bool: models.MetaBool,
str: models.MetaText,
unicode: models.MetaText,
types.NoneType: models.MetaText,
int: models.MetaBigInt,
long: models.MetaBigInt,
float: models.MetaFloat}
STANDARD_AGGREGATES = dict(
avg=func.avg(models.Sample.volume).label('avg'),
sum=func.sum(models.Sample.volume).label('sum'),
@ -103,10 +89,6 @@ AVAILABLE_CAPABILITIES = {
'stddev': True,
'cardinality': True}}
},
'alarms': {'query': {'simple': True,
'complex': True},
'history': {'query': {'simple': True,
'complex': True}}},
'events': {'query': {'simple': True}},
}
@ -126,7 +108,7 @@ def apply_metaquery_filter(session, query, metaquery):
for k, value in metaquery.iteritems():
key = k[9:] # strip out 'metadata.' prefix
try:
_model = META_TYPE_MAP[type(value)]
_model = sql_utils.META_TYPE_MAP[type(value)]
except KeyError:
raise NotImplementedError('Query on %(key)s is of %(value)s '
'type and is not supported' %
@ -221,9 +203,7 @@ class Connection(base.Connection):
}
"""
CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES,
utils.update_nested(
alarm_base.Connection.CAPABILITIES,
AVAILABLE_CAPABILITIES))
AVAILABLE_CAPABILITIES)
STORAGE_CAPABILITIES = utils.update_nested(
base.Connection.STORAGE_CAPABILITIES,
AVAILABLE_STORAGE_CAPABILITIES,
@ -297,7 +277,7 @@ class Connection(base.Connection):
if isinstance(rmetadata, dict):
for key, v in utils.dict_to_keyval(rmetadata):
try:
_model = META_TYPE_MAP[type(v)]
_model = sql_utils.META_TYPE_MAP[type(v)]
except KeyError:
LOG.warn(_("Unknown metadata type. Key (%s) will "
"not be queryable."), key)
@ -509,34 +489,24 @@ class Connection(base.Connection):
query = session.query(table)
query = make_query_from_filter(session, query, sample_filter,
require_meter=False)
transformer = QueryTransformer(table, query)
transformer = sql_utils.QueryTransformer(table, query)
transformer.apply_options(None,
limit)
return self._retrieve_samples(transformer.get_query())
def _retrieve_data(self, filter_expr, orderby, limit, table):
def query_samples(self, filter_expr=None, orderby=None, limit=None):
if limit == 0:
return []
session = self._engine_facade.get_session()
query = session.query(table)
transformer = QueryTransformer(table, query)
query = session.query(models.MeterSample)
transformer = sql_utils.QueryTransformer(models.MeterSample, query)
if filter_expr is not None:
transformer.apply_filter(filter_expr)
transformer.apply_options(orderby,
limit)
retrieve = {models.MeterSample: self._retrieve_samples,
models.Alarm: self._retrieve_alarms,
models.AlarmChange: self._retrieve_alarm_history}
return retrieve[table](transformer.get_query())
def query_samples(self, filter_expr=None, orderby=None, limit=None):
return self._retrieve_data(filter_expr,
orderby,
limit,
models.MeterSample)
return self._retrieve_samples(transformer.get_query())
@staticmethod
def _get_aggregate_functions(aggregate):
@ -678,197 +648,6 @@ class Connection(base.Connection):
aggregate=aggregate
)
@staticmethod
def _row_to_alarm_model(row):
return alarm_api_models.Alarm(alarm_id=row.alarm_id,
enabled=row.enabled,
type=row.type,
name=row.name,
description=row.description,
timestamp=row.timestamp,
user_id=row.user_id,
project_id=row.project_id,
state=row.state,
state_timestamp=row.state_timestamp,
ok_actions=row.ok_actions,
alarm_actions=row.alarm_actions,
insufficient_data_actions=(
row.insufficient_data_actions),
rule=row.rule,
time_constraints=row.time_constraints,
repeat_actions=row.repeat_actions)
def _retrieve_alarms(self, query):
return (self._row_to_alarm_model(x) for x in query.all())
def get_alarms(self, name=None, user=None, state=None, meter=None,
project=None, enabled=None, alarm_id=None, pagination=None):
"""Yields a lists of alarms that match filters
:param user: Optional ID for user that owns the resource.
:param state: Optional string for alarm state.
:param meter: Optional string for alarms associated with meter.
:param project: Optional ID for project that owns the resource.
:param enabled: Optional boolean to list disable alarm.
:param alarm_id: Optional alarm_id to return one alarm.
:param pagination: Optional pagination query.
"""
if pagination:
raise NotImplementedError('Pagination not implemented')
session = self._engine_facade.get_session()
query = session.query(models.Alarm)
if name is not None:
query = query.filter(models.Alarm.name == name)
if enabled is not None:
query = query.filter(models.Alarm.enabled == enabled)
if user is not None:
query = query.filter(models.Alarm.user_id == user)
if project is not None:
query = query.filter(models.Alarm.project_id == project)
if alarm_id is not None:
query = query.filter(models.Alarm.alarm_id == alarm_id)
if state is not None:
query = query.filter(models.Alarm.state == state)
alarms = self._retrieve_alarms(query)
# TODO(cmart): improve this by using sqlalchemy.func factory
if meter is not None:
alarms = filter(lambda row:
row.rule.get('meter_name', None) == meter,
alarms)
return alarms
def create_alarm(self, alarm):
"""Create an alarm.
:param alarm: The alarm to create.
"""
session = self._engine_facade.get_session()
with session.begin():
alarm_row = models.Alarm(alarm_id=alarm.alarm_id)
alarm_row.update(alarm.as_dict())
session.add(alarm_row)
return self._row_to_alarm_model(alarm_row)
def update_alarm(self, alarm):
"""Update an alarm.
:param alarm: the new Alarm to update
"""
session = self._engine_facade.get_session()
with session.begin():
alarm_row = session.merge(models.Alarm(alarm_id=alarm.alarm_id))
alarm_row.update(alarm.as_dict())
return self._row_to_alarm_model(alarm_row)
def delete_alarm(self, alarm_id):
"""Delete an alarm
:param alarm_id: ID of the alarm to delete
"""
session = self._engine_facade.get_session()
with session.begin():
session.query(models.Alarm).filter(
models.Alarm.alarm_id == alarm_id).delete()
@staticmethod
def _row_to_alarm_change_model(row):
return alarm_api_models.AlarmChange(event_id=row.event_id,
alarm_id=row.alarm_id,
type=row.type,
detail=row.detail,
user_id=row.user_id,
project_id=row.project_id,
on_behalf_of=row.on_behalf_of,
timestamp=row.timestamp)
def query_alarms(self, filter_expr=None, orderby=None, limit=None):
"""Yields a lists of alarms that match filter."""
return self._retrieve_data(filter_expr, orderby, limit, models.Alarm)
def _retrieve_alarm_history(self, query):
return (self._row_to_alarm_change_model(x) for x in query.all())
def query_alarm_history(self, filter_expr=None, orderby=None, limit=None):
"""Return an iterable of model.AlarmChange objects."""
return self._retrieve_data(filter_expr,
orderby,
limit,
models.AlarmChange)
def get_alarm_changes(self, alarm_id, on_behalf_of,
user=None, project=None, type=None,
start_timestamp=None, start_timestamp_op=None,
end_timestamp=None, end_timestamp_op=None):
"""Yields list of AlarmChanges describing alarm history
Changes are always sorted in reverse order of occurrence, given
the importance of currency.
Segregation for non-administrative users is done on the basis
of the on_behalf_of parameter. This allows such users to have
visibility on both the changes initiated by themselves directly
(generally creation, rule changes, or deletion) and also on those
changes initiated on their behalf by the alarming service (state
transitions after alarm thresholds are crossed).
:param alarm_id: ID of alarm to return changes for
:param on_behalf_of: ID of tenant to scope changes query (None for
administrative user, indicating all projects)
:param user: Optional ID of user to return changes for
:param project: Optional ID of project to return changes for
:project type: Optional change type
:param start_timestamp: Optional modified timestamp start range
:param start_timestamp_op: Optional timestamp start range operation
:param end_timestamp: Optional modified timestamp end range
:param end_timestamp_op: Optional timestamp end range operation
"""
session = self._engine_facade.get_session()
query = session.query(models.AlarmChange)
query = query.filter(models.AlarmChange.alarm_id == alarm_id)
if on_behalf_of is not None:
query = query.filter(
models.AlarmChange.on_behalf_of == on_behalf_of)
if user is not None:
query = query.filter(models.AlarmChange.user_id == user)
if project is not None:
query = query.filter(models.AlarmChange.project_id == project)
if type is not None:
query = query.filter(models.AlarmChange.type == type)
if start_timestamp:
if start_timestamp_op == 'gt':
query = query.filter(
models.AlarmChange.timestamp > start_timestamp)
else:
query = query.filter(
models.AlarmChange.timestamp >= start_timestamp)
if end_timestamp:
if end_timestamp_op == 'le':
query = query.filter(
models.AlarmChange.timestamp <= end_timestamp)
else:
query = query.filter(
models.AlarmChange.timestamp < end_timestamp)
query = query.order_by(desc(models.AlarmChange.timestamp))
return self._retrieve_alarm_history(query)
def record_alarm_change(self, alarm_change):
"""Record alarm change event."""
session = self._engine_facade.get_session()
with session.begin():
alarm_change_row = models.AlarmChange(
event_id=alarm_change['event_id'])
alarm_change_row.update(alarm_change)
session.add(alarm_change_row)
def _get_or_create_trait_type(self, trait_type, data_type, session=None):
"""Find if this trait already exists in the database.
@ -1143,91 +922,3 @@ class Connection(base.Connection):
yield api_models.Trait(name=type.desc,
dtype=type.data_type,
value=trait.get_value())
class QueryTransformer(object):
operators = {"=": operator.eq,
"<": operator.lt,
">": operator.gt,
"<=": operator.le,
"=<": operator.le,
">=": operator.ge,
"=>": operator.ge,
"!=": operator.ne,
"in": lambda field_name, values: field_name.in_(values)}
complex_operators = {"or": or_,
"and": and_,
"not": not_}
ordering_functions = {"asc": asc,
"desc": desc}
def __init__(self, table, query):
self.table = table
self.query = query
def _handle_complex_op(self, complex_op, nodes):
op = self.complex_operators[complex_op]
if op == not_:
nodes = [nodes]
element_list = []
for node in nodes:
element = self._transform(node)
element_list.append(element)
return op(*element_list)
def _handle_simple_op(self, simple_op, nodes):
op = self.operators[simple_op]
field_name = nodes.keys()[0]
value = nodes.values()[0]
if field_name.startswith('resource_metadata.'):
return self._handle_metadata(op, field_name, value)
else:
return op(getattr(self.table, field_name), value)
def _handle_metadata(self, op, field_name, value):
if op == self.operators["in"]:
raise NotImplementedError('Metadata query with in '
'operator is not implemented')
field_name = field_name[len('resource_metadata.'):]
meta_table = META_TYPE_MAP[type(value)]
meta_alias = aliased(meta_table)
on_clause = and_(self.table.id == meta_alias.id,
meta_alias.meta_key == field_name)
# outer join is needed to support metaquery
# with or operator on non existent metadata field
# see: test_query_non_existing_metadata_with_result
# test case.
self.query = self.query.outerjoin(meta_alias, on_clause)
return op(meta_alias.value, value)
def _transform(self, sub_tree):
operator = sub_tree.keys()[0]
nodes = sub_tree.values()[0]
if operator in self.complex_operators:
return self._handle_complex_op(operator, nodes)
else:
return self._handle_simple_op(operator, nodes)
def apply_filter(self, expression_tree):
condition = self._transform(expression_tree)
self.query = self.query.filter(condition)
def apply_options(self, orderby, limit):
self._apply_order_by(orderby)
if limit is not None:
self.query = self.query.limit(limit)
def _apply_order_by(self, orderby):
if orderby is not None:
for field in orderby:
ordering_function = self.ordering_functions[field.values()[0]]
self.query = self.query.order_by(ordering_function(
getattr(self.table, field.keys()[0])))
else:
self.query = self.query.order_by(desc(self.table.timestamp))
def get_query(self):
return self.query

View File

@ -0,0 +1,124 @@
# Author: John Tran <jhtran@att.com>
# Julien Danjou <julien@danjou.info>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import operator
import types
from sqlalchemy import and_
from sqlalchemy import asc
from sqlalchemy import desc
from sqlalchemy import not_
from sqlalchemy import or_
from sqlalchemy.orm import aliased
from ceilometer.storage.sqlalchemy import models
META_TYPE_MAP = {bool: models.MetaBool,
str: models.MetaText,
unicode: models.MetaText,
types.NoneType: models.MetaText,
int: models.MetaBigInt,
long: models.MetaBigInt,
float: models.MetaFloat}
class QueryTransformer(object):
operators = {"=": operator.eq,
"<": operator.lt,
">": operator.gt,
"<=": operator.le,
"=<": operator.le,
">=": operator.ge,
"=>": operator.ge,
"!=": operator.ne,
"in": lambda field_name, values: field_name.in_(values)}
complex_operators = {"or": or_,
"and": and_,
"not": not_}
ordering_functions = {"asc": asc,
"desc": desc}
def __init__(self, table, query):
self.table = table
self.query = query
def _handle_complex_op(self, complex_op, nodes):
op = self.complex_operators[complex_op]
if op == not_:
nodes = [nodes]
element_list = []
for node in nodes:
element = self._transform(node)
element_list.append(element)
return op(*element_list)
def _handle_simple_op(self, simple_op, nodes):
op = self.operators[simple_op]
field_name = nodes.keys()[0]
value = nodes.values()[0]
if field_name.startswith('resource_metadata.'):
return self._handle_metadata(op, field_name, value)
else:
return op(getattr(self.table, field_name), value)
def _handle_metadata(self, op, field_name, value):
if op == self.operators["in"]:
raise NotImplementedError('Metadata query with in '
'operator is not implemented')
field_name = field_name[len('resource_metadata.'):]
meta_table = META_TYPE_MAP[type(value)]
meta_alias = aliased(meta_table)
on_clause = and_(self.table.id == meta_alias.id,
meta_alias.meta_key == field_name)
# outer join is needed to support metaquery
# with or operator on non existent metadata field
# see: test_query_non_existing_metadata_with_result
# test case.
self.query = self.query.outerjoin(meta_alias, on_clause)
return op(meta_alias.value, value)
def _transform(self, sub_tree):
operator = sub_tree.keys()[0]
nodes = sub_tree.values()[0]
if operator in self.complex_operators:
return self._handle_complex_op(operator, nodes)
else:
return self._handle_simple_op(operator, nodes)
def apply_filter(self, expression_tree):
condition = self._transform(expression_tree)
self.query = self.query.filter(condition)
def apply_options(self, orderby, limit):
self._apply_order_by(orderby)
if limit is not None:
self.query = self.query.limit(limit)
def _apply_order_by(self, orderby):
if orderby is not None:
for field in orderby:
ordering_function = self.ordering_functions[field.values()[0]]
self.query = self.query.order_by(ordering_function(
getattr(self.table, field.keys()[0])))
else:
self.query = self.query.order_by(desc(self.table.timestamp))
def get_query(self):
return self.query

View File

@ -18,11 +18,11 @@
"""
from ceilometer.alarm.storage import impl_log as impl_log_alarm
from ceilometer.alarm.storage import impl_sqlalchemy as impl_sqlalchemy_alarm
from ceilometer.openstack.common.fixture import config
from ceilometer.openstack.common import test
from ceilometer import storage
from ceilometer.storage import impl_log
from ceilometer.storage import impl_sqlalchemy
import six
@ -65,4 +65,4 @@ class ConnectionConfigTest(test.BaseTestCase):
conn = storage.get_connection_from_config(self.CONF, 'metering')._conn
self.assertIsInstance(conn, impl_log.Connection)
conn = storage.get_connection_from_config(self.CONF, 'alarm')._conn
self.assertIsInstance(conn, impl_sqlalchemy.Connection)
self.assertIsInstance(conn, impl_sqlalchemy_alarm.Connection)

View File

@ -27,6 +27,7 @@ import repr
import mock
from ceilometer.alarm.storage import impl_sqlalchemy as impl_sqla_alarm
from ceilometer.openstack.common import timeutils
from ceilometer.storage import impl_sqlalchemy
from ceilometer.storage import models
@ -226,16 +227,23 @@ class CapabilitiesTest(test_base.BaseTestCase):
'stddev': True,
'cardinality': True}}
},
'alarms': {'query': {'simple': True,
'complex': True},
'history': {'query': {'simple': True,
'complex': True}}},
'events': {'query': {'simple': True}}
}
actual_capabilities = impl_sqlalchemy.Connection.get_capabilities()
self.assertEqual(expected_capabilities, actual_capabilities)
def test_alarm_capabilities(self):
expected_capabilities = {
'alarms': {'query': {'simple': True,
'complex': True},
'history': {'query': {'simple': True,
'complex': True}}},
}
actual_capabilities = impl_sqla_alarm.Connection.get_capabilities()
self.assertEqual(expected_capabilities, actual_capabilities)
def test_storage_capabilities(self):
expected_capabilities = {
'storage': {'production_ready': True},

View File

@ -164,9 +164,9 @@ ceilometer.poll.central =
ceilometer.alarm.storage =
log = ceilometer.alarm.storage.impl_log:Connection
mongodb = ceilometer.alarm.storage.impl_mongodb:Connection
mysql = ceilometer.storage.impl_sqlalchemy:Connection
postgresql = ceilometer.storage.impl_sqlalchemy:Connection
sqlite = ceilometer.storage.impl_sqlalchemy:Connection
mysql = ceilometer.alarm.storage.impl_sqlalchemy:Connection
postgresql = ceilometer.alarm.storage.impl_sqlalchemy:Connection
sqlite = ceilometer.alarm.storage.impl_sqlalchemy:Connection
hbase = ceilometer.alarm.storage.impl_hbase:Connection
db2 = ceilometer.alarm.storage.impl_db2:Connection