enable sql metadata query

explode metadata key/values to their own tables/rows (based on type).
build a key string using dot notation similar to other nosql db
and filter based on that.

Blueprint: sqlalchemy-metadata-query
Related-Bug: #1093625

Change-Id: I2076e67b79448f98124a57b62b5bfed7aa8ae2ad
This commit is contained in:
Gordon Chung 2013-10-02 15:45:26 -04:00
parent 4b2275db19
commit 1570462507
7 changed files with 239 additions and 11 deletions

View File

@ -18,10 +18,12 @@
"""SQLAlchemy storage backend."""
from __future__ import absolute_import
import datetime
import operator
import os
import types
from sqlalchemy import and_
from sqlalchemy import func
from sqlalchemy import desc
from sqlalchemy.orm import aliased
@ -39,6 +41,10 @@ from ceilometer.storage.sqlalchemy.models import AlarmChange
from ceilometer.storage.sqlalchemy.models import Base
from ceilometer.storage.sqlalchemy.models import Event
from ceilometer.storage.sqlalchemy.models import Meter
from ceilometer.storage.sqlalchemy.models import MetaBool
from ceilometer.storage.sqlalchemy.models import MetaFloat
from ceilometer.storage.sqlalchemy.models import MetaInt
from ceilometer.storage.sqlalchemy.models import MetaText
from ceilometer.storage.sqlalchemy.models import Project
from ceilometer.storage.sqlalchemy.models import Resource
from ceilometer.storage.sqlalchemy.models import Source
@ -100,7 +106,40 @@ class SQLAlchemyStorage(base.StorageEngine):
return Connection(conf)
def make_query_from_filter(query, sample_filter, require_meter=True):
META_TYPE_MAP = {bool: MetaBool,
str: MetaText,
unicode: MetaText,
types.NoneType: MetaText,
int: MetaInt,
long: MetaInt,
float: MetaFloat}
def apply_metaquery_filter(session, query, metaquery):
"""Apply provided metaquery filter to existing query.
:param session: session used for original query
:param query: Query instance
:param metaquery: dict with metadata to match on.
"""
for k, v in metaquery.iteritems():
key = k[9:] # strip out 'metadata.' prefix
try:
_model = META_TYPE_MAP[type(v)]
except KeyError:
raise NotImplementedError(_('Query on %(key)s is of %(value)s '
'type and is not supported') %
{"key": k, "value": type(v)})
else:
meta_q = session.query(_model).\
filter(and_(_model.meta_key == key,
_model.value == v)).subquery()
query = query.filter_by(id=meta_q.c.id)
return query
def make_query_from_filter(session, query, sample_filter, require_meter=True):
"""Return a query dictionary based on the settings in the filter.
:param filter: SampleFilter instance
@ -134,7 +173,8 @@ def make_query_from_filter(query, sample_filter, require_meter=True):
query = query.filter_by(resource_id=sample_filter.resource)
if sample_filter.metaquery:
raise NotImplementedError(_('metaquery not implemented'))
query = apply_metaquery_filter(session, query,
sample_filter.metaquery)
return query
@ -217,6 +257,21 @@ class Connection(base.Connection):
meter.message_id = data['message_id']
session.flush()
if rmetadata:
if isinstance(rmetadata, dict):
for key, v in utils.dict_to_keyval(rmetadata):
try:
_model = META_TYPE_MAP[type(v)]
except KeyError:
LOG.warn(_("Unknown metadata type. Key (%s) will "
"not be queryable."), key)
else:
session.add(_model(id=meter.id,
meta_key=key,
value=v))
session.flush()
@staticmethod
def clear_expired_metering_data(ttl):
"""Clear expired data from the backend storage system according to the
@ -294,8 +349,6 @@ class Connection(base.Connection):
# just fail.
if pagination:
raise NotImplementedError(_('Pagination not implemented'))
if metaquery:
raise NotImplementedError(_('metaquery not implemented'))
# (thomasm) We need to get the max timestamp first, since that's the
# most accurate. We also need to filter down in the subquery to
@ -319,6 +372,11 @@ class Connection(base.Connection):
ts_subquery = ts_subquery.filter(
Meter.sources.any(id=source))
if metaquery:
ts_subquery = apply_metaquery_filter(session,
ts_subquery,
metaquery)
# Here we limit the samples being used to a specific time period,
# if requested.
if start_timestamp:
@ -397,8 +455,6 @@ class Connection(base.Connection):
if pagination:
raise NotImplementedError(_('Pagination not implemented'))
if metaquery:
raise NotImplementedError(_('metaquery not implemented'))
session = sqlalchemy_session.get_session()
@ -422,6 +478,11 @@ class Connection(base.Connection):
query_meter = session.query(Meter).\
join(subquery_meter, Meter.id == subquery_meter.c.id)
if metaquery:
query_meter = apply_metaquery_filter(session,
query_meter,
metaquery)
alias_meter = aliased(Meter, query_meter.subquery())
query = session.query(Resource, alias_meter).join(
alias_meter, Resource.id == alias_meter.resource_id)
@ -457,7 +518,7 @@ class Connection(base.Connection):
session = sqlalchemy_session.get_session()
query = session.query(Meter)
query = make_query_from_filter(query, sample_filter,
query = make_query_from_filter(session, query, sample_filter,
require_meter=False)
if limit:
query = query.limit(limit)
@ -509,7 +570,7 @@ class Connection(base.Connection):
if groupby:
query = query.group_by(*group_attributes)
return make_query_from_filter(query, sample_filter)
return make_query_from_filter(session, query, sample_filter)
@staticmethod
def _stats_result_to_model(result, period, period_start,

View File

@ -0,0 +1,78 @@
#
# Copyright 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
from sqlalchemy import Boolean
from sqlalchemy import Column
from sqlalchemy import Float
from sqlalchemy import ForeignKey
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import Text
from sqlalchemy.sql import select
from ceilometer import utils
tables = [('metadata_text', Text, True),
('metadata_bool', Boolean, False),
('metadata_int', Integer, False),
('metadata_float', Float, False)]
def upgrade(migrate_engine):
meta = MetaData(bind=migrate_engine)
meter = Table('meter', meta, autoload=True)
meta_tables = {}
for t_name, t_type, t_nullable in tables:
meta_tables[t_name] = Table(
t_name, meta,
Column('id', Integer, ForeignKey('meter.id'), primary_key=True),
Column('meta_key', String(255), index=True, primary_key=True),
Column('value', t_type, nullable=t_nullable),
mysql_engine='InnoDB',
mysql_charset='utf8',
)
meta_tables[t_name].create()
for row in select([meter]).execute():
meter_id = row['id']
rmeta = json.loads(row['resource_metadata'])
for key, v in utils.dict_to_keyval(rmeta):
if isinstance(v, basestring) or v is None:
meta_tables['metadata_text'].insert().values(id=meter_id,
meta_key=key,
value=v)
elif isinstance(v, bool):
meta_tables['metadata_bool'].insert().values(id=meter_id,
meta_key=key,
value=v)
elif isinstance(v, (int, long)):
meta_tables['metadata_int'].insert().values(id=meter_id,
meta_key=key,
value=v)
elif isinstance(v, float):
meta_tables['metadata_float'].insert().values(id=meter_id,
meta_key=key,
value=v)
def downgrade(migrate_engine):
meta = MetaData(bind=migrate_engine)
for t in tables:
table = Table(t[0], meta, autoload=True)
table.drop()

View File

@ -141,6 +141,54 @@ class Source(Base):
id = Column(String(255), primary_key=True)
class MetaText(Base):
"""Metering text metadata."""
__tablename__ = 'metadata_text'
__table_args__ = (
Index('ix_meta_text_key', 'meta_key'),
)
id = Column(Integer, ForeignKey('meter.id'), primary_key=True)
meta_key = Column(String(255), primary_key=True)
value = Column(Text)
class MetaBool(Base):
"""Metering boolean metadata."""
__tablename__ = 'metadata_bool'
__table_args__ = (
Index('ix_meta_bool_key', 'meta_key'),
)
id = Column(Integer, ForeignKey('meter.id'), primary_key=True)
meta_key = Column(String(255), primary_key=True)
value = Column(Boolean)
class MetaInt(Base):
"""Metering integer metadata."""
__tablename__ = 'metadata_int'
__table_args__ = (
Index('ix_meta_int_key', 'meta_key'),
)
id = Column(Integer, ForeignKey('meter.id'), primary_key=True)
meta_key = Column(String(255), primary_key=True)
value = Column(Integer, default=False)
class MetaFloat(Base):
"""Metering float metadata."""
__tablename__ = 'metadata_float'
__table_args__ = (
Index('ix_meta_float_key', 'meta_key'),
)
id = Column(Integer, ForeignKey('meter.id'), primary_key=True)
meta_key = Column(String(255), primary_key=True)
value = Column(Float, default=False)
class Meter(Base):
"""Metering data."""

View File

@ -79,3 +79,27 @@ def stringify_timestamps(data):
isa_timestamp = lambda v: isinstance(v, datetime.datetime)
return dict((k, v.isoformat() if isa_timestamp(v) else v)
for (k, v) in data.iteritems())
def dict_to_keyval(value, key_base=None):
"""Expand a given dict to its corresponding key-value pairs.
Generated keys are fully qualified, delimited using dot notation.
ie. key = 'key.child_key.grandchild_key[0]'
"""
val_iter, key_func = None, None
if isinstance(value, dict):
val_iter = value.iteritems()
key_func = lambda k: key_base + '.' + k if key_base else k
elif isinstance(value, (tuple, list)):
val_iter = enumerate(value)
key_func = lambda k: key_base + '[%d]' % k
if val_iter:
for k, v in val_iter:
key_gen = key_func(k)
if isinstance(v, dict) or isinstance(v, (tuple, list)):
for key_gen, v in dict_to_keyval(v, key_gen):
yield key_gen, v
else:
yield key_gen, v

View File

@ -43,8 +43,8 @@ The following is a table indicating the status of each database drivers:
Driver API querying API statistics Alarms
================== ============================= =================== ======
MongoDB Yes Yes Yes
MySQL Yes, except metadata querying Yes Yes
PostgreSQL Yes, except metadata querying Yes Yes
MySQL Yes Yes Yes
PostgreSQL Yes Yes Yes
HBase Yes Yes, except groupby No
DB2 Yes Yes No
================== ============================= =================== ======

View File

@ -252,6 +252,7 @@ class TestListMeters(FunctionalTest,
set(['meter.mine']))
self.assertEqual(set(r['resource_metadata']['is_public'] for r
in data), set(['False']))
# FIXME(gordc): verify no false positive (Bug#1236496)
def test_list_meters_query_string_metadata(self):
data = self.get_json('/meters/meter.test',

View File

@ -70,3 +70,19 @@ class TestUtils(tests_base.TestCase):
def test_decimal_to_dt_with_none_parameter(self):
self.assertEqual(utils.decimal_to_dt(None), None)
def test_dict_to_kv(self):
data = {'a': 'A',
'b': 'B',
'nested': {'a': 'A',
'b': 'B',
},
'nested2': [{'c': 'A'}, {'c': 'B'}]
}
pairs = list(utils.dict_to_keyval(data))
self.assertEqual(pairs, [('a', 'A'),
('b', 'B'),
('nested2[0].c', 'A'),
('nested2[1].c', 'B'),
('nested.a', 'A'),
('nested.b', 'B')])