Refactor SQL backend to improve write speed

- drop the sourceassoc table, as its data is not accessible via the API
- drop the resource table, since its data can be retrieved from the sample table

Change-Id: I2d4a5175734cafce6a439ad736c47691e6e7e847
Implements: blueprint big-data-sql
Closes-Bug: #1305332
Closes-Bug: #1257908
Author: Gordon Chung, 2014-05-20 17:48:46 -04:00 (committed by gordon chung)
Commit: ac26db7cfa (parent: 08eea70257)
4 changed files with 188 additions and 128 deletions
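
The heart of the change: each sample row now carries its source_id and resource_id directly, so recording a sample is a single INSERT instead of a get-or-create against the source and resource tables plus a sourceassoc association row. A rough before/after sketch of the write path, distilled from the diff below (illustrative, not the verbatim code):

    # before: up to three lookup/upsert round trips per sample, each a
    # potential duplicate-retry point under concurrent writers
    source = create_or_update(session, Source, data['source'])
    resource = create_or_update(session, Resource, data['resource_id'], source,
                                user_id=data['user_id'],
                                project_id=data['project_id'],
                                resource_metadata=data['resource_metadata'])
    sample = Sample(meter_id=meter.id, resource=resource)
    sample.sources.append(source)          # writes a sourceassoc row

    # after: one denormalized row per sample
    sample = Sample(meter_id=meter.id)
    sample.resource_id = data['resource_id']
    sample.source_id = data['source']
    session.add(sample)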


@@ -154,8 +154,8 @@ def make_query_from_filter(session, query, sample_filter, require_meter=True):
    elif require_meter:
        raise RuntimeError('Missing required meter specifier')
    if sample_filter.source:
-        query = query.filter(models.Sample.sources.any(
-            id=sample_filter.source))
+        query = query.filter(
+            models.Sample.source_id == sample_filter.source)
    if sample_filter.start:
        ts_start = sample_filter.start
        if sample_filter.start_timestamp_op == 'gt':
@@ -192,8 +192,6 @@ class Connection(base.Connection):
    Tables::

-        - source
-          - { id: source id }
        - meter
          - meter definition
          - { id: meter def id
@@ -207,26 +205,14 @@ class Connection(base.Connection):
              meter_id: meter id (->meter.id)
              user_id: user uuid
              project_id: project uuid
-             resource_id: resource uuid (->resource.id)
+             resource_id: resource uuid
+             source_id: source id
              resource_metadata: metadata dictionaries
              volume: sample volume
              timestamp: datetime
              message_signature: message signature
              message_id: message uuid
              }
-        - resource
-          - the metadata for resources
-          - { id: resource uuid
-              resource_metadata: metadata dictionaries
-              project_id: project uuid
-              user_id: user uuid
-              }
-        - sourceassoc
-          - the relationships
-          - { sample_id: sample id (->sample.id)
-              resource_id: resource uuid (->resource.id)
-              source_id: source id (->source.id)
-              }
    """

    CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES,
                                       AVAILABLE_CAPABILITIES)
@@ -249,34 +235,12 @@ class Connection(base.Connection):
        self._engine_facade._session_maker.close_all()
        engine.dispose()

-    @staticmethod
-    def _create_or_update(session, model_class, _id, source=None, **kwargs):
-        if not _id:
-            return None
-        try:
-            with session.begin(subtransactions=True):
-                obj = session.query(model_class).get(str(_id))
-                if obj is None:
-                    obj = model_class(id=str(_id))
-                    session.add(obj)
-                if source and not filter(lambda x: x.id == source.id,
-                                         obj.sources):
-                    obj.sources.append(source)
-                for k in kwargs:
-                    setattr(obj, k, kwargs[k])
-        except dbexc.DBDuplicateEntry:
-            # requery the object from the db if this is an other
-            # parallel/previous call of record_metering_data that
-            # have successfully created this object
-            obj = Connection._create_or_update(session, model_class,
-                                               _id, source, **kwargs)
-        return obj

    @staticmethod
    def _create_meter(session, name, type, unit):
        try:
-            with session.begin(subtransactions=True):
+            nested = session.connection().dialect.name != 'sqlite'
+            with session.begin(nested=nested,
+                               subtransactions=not nested):
                obj = session.query(models.Meter)\
                    .filter(models.Meter.name == name)\
                    .filter(models.Meter.type == type)\
@@ -285,6 +249,7 @@ class Connection(base.Connection):
                    obj = models.Meter(name=name, type=type, unit=unit)
                    session.add(obj)
        except dbexc.DBDuplicateEntry:
+            # retry function to pick up duplicate committed object
            obj = Connection._create_meter(session, name, type, unit)
        return obj
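
The nested=... switch above makes every backend except SQLite wrap the meter lookup in a SAVEPOINT, so a concurrent duplicate insert rolls back only the savepoint while the surrounding sample transaction survives; the except branch then re-queries to pick up the row the other writer committed. A generic sketch of this get-or-create idiom (hypothetical helper distilled from the code above, not part of the change):

    def _get_or_create(session, model, **keys):
        # SQLite cannot use a SAVEPOINT on this code path, so fall back
        # to a plain subtransaction on that backend.
        nested = session.connection().dialect.name != 'sqlite'
        try:
            with session.begin(nested=nested, subtransactions=not nested):
                obj = session.query(model).filter_by(**keys).first()
                if obj is None:
                    obj = model(**keys)
                    session.add(obj)
        except dbexc.DBDuplicateEntry:
            # a concurrent writer won the race; fetch its committed row
            obj = _get_or_create(session, model, **keys)
        return obj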
@@ -297,26 +262,15 @@ class Connection(base.Connection):
        """
        session = self._engine_facade.get_session()
        with session.begin():
-            # Record the updated resource metadata
-            rmetadata = data['resource_metadata']
-            source = self._create_or_update(session, models.Source,
-                                            data['source'])
-            resource = self._create_or_update(session, models.Resource,
-                                              data['resource_id'], source,
-                                              user_id=data['user_id'],
-                                              project_id=data['project_id'],
-                                              resource_metadata=rmetadata)

            # Record the raw data for the sample.
+            rmetadata = data['resource_metadata']
            meter = self._create_meter(session,
                                       data['counter_name'],
                                       data['counter_type'],
                                       data['counter_unit'])
-            sample = models.Sample(meter_id=meter.id,
-                                   resource=resource)
+            sample = models.Sample(meter_id=meter.id)
            session.add(sample)
-            if not filter(lambda x: x.id == source.id, sample.sources):
-                sample.sources.append(source)
+            sample.resource_id = data['resource_id']
            sample.project_id = data['project_id']
            sample.user_id = data['user_id']
            sample.timestamp = data['timestamp']
@@ -324,6 +278,7 @@ class Connection(base.Connection):
            sample.volume = data['counter_volume']
            sample.message_signature = data['message_signature']
            sample.message_id = data['message_id']
+            sample.source_id = data['source']
            session.flush()

            if rmetadata:
@@ -355,13 +310,6 @@ class Connection(base.Connection):
            for sample_obj in sample_query.all():
                session.delete(sample_obj)

-            query = session.query(models.Resource)\
-                .filter(~models.Resource.id.in_(
-                    session.query(models.Sample.resource_id).group_by(
-                        models.Sample.resource_id)))
-            for res_obj in query.all():
-                session.delete(res_obj)

    def get_resources(self, user=None, project=None, source=None,
                      start_timestamp=None, start_timestamp_op=None,
                      end_timestamp=None, end_timestamp_op=None,
@@ -388,12 +336,10 @@ class Connection(base.Connection):
        # TODO(gordc) this should be merged with make_query_from_filter
        for column, value in [(models.Sample.resource_id, resource),
                              (models.Sample.user_id, user),
-                              (models.Sample.project_id, project)]:
+                              (models.Sample.project_id, project),
+                              (models.Sample.source_id, source)]:
            if value:
                query = query.filter(column == value)
-        if source:
-            query = query.filter(
-                models.Sample.sources.any(id=source))
        if metaquery:
            query = apply_metaquery_filter(session, query, metaquery)
        if start_timestamp:
@@ -438,7 +384,7 @@ class Connection(base.Connection):
                project_id=sample.project_id,
                first_sample_timestamp=min_q.first().timestamp,
                last_sample_timestamp=sample.timestamp,
-                source=sample.sources[0].id,
+                source=sample.source_id,
                user_id=sample.user_id,
                metadata=sample.resource_metadata
            )
@@ -464,12 +410,10 @@ class Connection(base.Connection):
        # TODO(gordc) this should be merged with make_query_from_filter
        for column, value in [(models.Sample.resource_id, resource),
                              (models.Sample.user_id, user),
-                              (models.Sample.project_id, project)]:
+                              (models.Sample.project_id, project),
+                              (models.Sample.source_id, source)]:
            if value:
                query = query.filter(column == value)
-        if source is not None:
-            query = query.filter(
-                models.Sample.sources.any(id=source))
        if metaquery:
            query = apply_metaquery_filter(session, query, metaquery)
        return query
@@ -500,7 +444,7 @@ class Connection(base.Connection):
                unit=sample.counter_unit,
                resource_id=sample.resource_id,
                project_id=sample.project_id,
-                source=sample.sources[0].id,
+                source=sample.source_id,
                user_id=sample.user_id)

    def _retrieve_samples(self, query):
@@ -511,10 +455,7 @@ class Connection(base.Connection):
            # the sample was inserted. It is an implementation
            # detail that should not leak outside of the driver.
            yield api_models.Sample(
-                # Replace 'sources' with 'source' to meet the caller's
-                # expectation, Sample.sources contains one and only one
-                # source in the current implementation.
-                source=s.sources[0].id,
+                source=s.source_id,
                counter_name=s.counter_name,
                counter_type=s.counter_type,
                counter_unit=s.counter_unit,
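
For reference, the shape of the dict consumed by record_metering_data above; the keys are exactly those read in the diff, while the values (and conn, assumed to be a Connection instance) are illustrative:

    import datetime

    data = {
        'source': 'openstack',                # stored as sample.source_id
        'counter_name': 'cpu_util',
        'counter_type': 'gauge',
        'counter_unit': '%',
        'counter_volume': 42.0,
        'user_id': 'some-user-uuid',
        'project_id': 'some-project-uuid',
        'resource_id': 'some-resource-uuid',  # stored as sample.resource_id
        'timestamp': datetime.datetime.utcnow(),
        'message_signature': 'sig',
        'message_id': 'some-message-uuid',
        'resource_metadata': {'display_name': 'vm1'},
    }
    conn.record_metering_data(data)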


@@ -0,0 +1,166 @@
# -*- encoding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from migrate import ForeignKeyConstraint, UniqueConstraint
import sqlalchemy as sa
from ceilometer.storage.sqlalchemy import migration
TABLES = ['sample', 'resource', 'source', 'sourceassoc']
DROP_TABLES = ['resource', 'source', 'sourceassoc']
INDEXES = {
"sample": (('resource_id', 'resource', 'id'),),
"sourceassoc": (('sample_id', 'sample', 'id'),
('resource_id', 'resource', 'id'),
('source_id', 'source', 'id'))
}
def upgrade(migrate_engine):
meta = sa.MetaData(bind=migrate_engine)
load_tables = dict((table_name, sa.Table(table_name, meta,
autoload=True))
for table_name in TABLES)
# drop foreign keys
if migrate_engine.name != 'sqlite':
for table_name, indexes in INDEXES.items():
table = load_tables[table_name]
for column, ref_table_name, ref_column_name in indexes:
ref_table = load_tables[ref_table_name]
params = {'columns': [table.c[column]],
'refcolumns': [ref_table.c[ref_column_name]]}
fk_table_name = table_name
if migrate_engine.name == "mysql":
params['name'] = "_".join(('fk', fk_table_name, column))
elif (migrate_engine.name == "postgresql" and
table_name == 'sample'):
# fk was not renamed in script 030
params['name'] = "_".join(('meter', column, 'fkey'))
fkey = ForeignKeyConstraint(**params)
fkey.drop()
# create source field in sample
sample = load_tables['sample']
sample.create_column(sa.Column('source_id', sa.String(255)))
# move source values to samples
sourceassoc = load_tables['sourceassoc']
query = sa.select([sourceassoc.c.sample_id, sourceassoc.c.source_id])\
.where(sourceassoc.c.sample_id.isnot(None))
for sample_id, source_id in migration.paged(query):
sample.update().where(sample_id == sample.c.id)\
.values({'source_id': source_id}).execute()
# drop tables
for table_name in DROP_TABLES:
sa.Table(table_name, meta, autoload=True).drop()
def downgrade(migrate_engine):
meta = sa.MetaData(bind=migrate_engine)
sample = sa.Table('sample', meta, autoload=True)
resource = sa.Table(
'resource', meta,
sa.Column('id', sa.String(255), primary_key=True),
sa.Column('resource_metadata', sa.Text),
sa.Column('user_id', sa.String(255)),
sa.Column('project_id', sa.String(255)),
sa.Index('ix_resource_project_id', 'project_id'),
sa.Index('ix_resource_user_id', 'user_id'),
sa.Index('resource_user_id_project_id_key', 'user_id', 'project_id'),
mysql_engine='InnoDB',
mysql_charset='utf8',
)
resource.create()
source = sa.Table(
'source', meta,
sa.Column('id', sa.String(255), primary_key=True),
mysql_engine='InnoDB',
mysql_charset='utf8',
)
source.create()
sourceassoc = sa.Table(
'sourceassoc', meta,
sa.Column('sample_id', sa.Integer),
sa.Column('resource_id', sa.String(255)),
sa.Column('source_id', sa.String(255)),
sa.Index('idx_sr', 'source_id', 'resource_id'),
sa.Index('idx_ss', 'source_id', 'sample_id'),
mysql_engine='InnoDB',
mysql_charset='utf8',
)
sourceassoc.create()
params = {}
if migrate_engine.name == "mysql":
params = {'name': 'uniq_sourceassoc0sample_id'}
uc = UniqueConstraint('sample_id', table=sourceassoc, **params)
uc.create()
# reload source/resource tables.
# NOTE(gordc): fine to skip non-id attributes in table since
# they're constantly updated and not used by api
for table, col in [(source, 'source_id'), (resource, 'resource_id')]:
q = sa.select([sample.c[col]]).distinct()
# NOTE(sileht): workaround for
# https://bitbucket.org/zzzeek/sqlalchemy/
# issue/3044/insert-from-select-union_all
q.select = lambda: q
sql_ins = table.insert().from_select([table.c.id], q)
try:
migrate_engine.execute(sql_ins)
except TypeError:
# from select is empty
pass
# reload sourceassoc table
for ref_col, col in [('id', 'sample_id'), ('resource_id', 'resource_id')]:
q = sa.select([sample.c.source_id, sample.c[ref_col]]).distinct()
q.select = lambda: q
sql_ins = sourceassoc.insert().from_select([sourceassoc.c.source_id,
sourceassoc.c[col]], q)
try:
migrate_engine.execute(sql_ins)
except TypeError:
# from select is empty
pass
sample.c.source_id.drop()
load_tables = dict((table_name, sa.Table(table_name, meta,
autoload=True))
for table_name in TABLES)
# add foreign keys
if migrate_engine.name != 'sqlite':
for table_name, indexes in INDEXES.items():
table = load_tables[table_name]
for column, ref_table_name, ref_column_name in indexes:
ref_table = load_tables[ref_table_name]
params = {'columns': [table.c[column]],
'refcolumns': [ref_table.c[ref_column_name]]}
fk_table_name = table_name
if migrate_engine.name == "mysql":
params['name'] = "_".join(('fk', fk_table_name, column))
elif (migrate_engine.name == "postgresql" and
table_name == 'sample'):
# fk was not renamed in script 030
params['name'] = "_".join(('meter', column, 'fkey'))
fkey = ForeignKeyConstraint(**params)
fkey.create()
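
The upgrade path above relies on migration.paged(query) to walk sourceassoc without loading the whole table at once; that helper is not part of this diff, but a minimal sketch of the behavior it is assumed to provide looks like this:

    def paged(query, size=1000):
        """Yield rows of a bound select in fixed-size pages (sketch)."""
        offset = 0
        while True:
            page = query.offset(offset).limit(size).execute().fetchall()
            if not page:
                return
            for row in page:
                yield row
            offset += size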


@@ -20,7 +20,7 @@ SQLAlchemy models for Ceilometer data.
import json

-from sqlalchemy import Column, Integer, String, Table, ForeignKey, \
+from sqlalchemy import Column, Integer, String, ForeignKey, \
    Index, UniqueConstraint, BigInteger, join
from sqlalchemy import Float, Boolean, Text, DateTime
from sqlalchemy.dialects.mysql import DECIMAL
@@ -99,25 +99,6 @@ class CeilometerBase(object):
Base = declarative_base(cls=CeilometerBase)

-sourceassoc = Table('sourceassoc', Base.metadata,
-                    Column('sample_id', Integer,
-                           ForeignKey("sample.id")),
-                    Column('resource_id', String(255),
-                           ForeignKey("resource.id")),
-                    Column('source_id', String(255),
-                           ForeignKey("source.id")))
-
-Index('idx_sr', sourceassoc.c['source_id'], sourceassoc.c['resource_id']),
-Index('idx_ss', sourceassoc.c['source_id'], sourceassoc.c['sample_id']),
-Index('ix_sourceassoc_source_id', sourceassoc.c['source_id'])
-UniqueConstraint(sourceassoc.c['sample_id'], name='uniq_sourceassoc0sample_id')
-
-
-class Source(Base):
-    __tablename__ = 'source'
-    id = Column(String(255), primary_key=True)


class MetaText(Base):
    """Metering text metadata."""
@@ -193,7 +174,7 @@ class Sample(Base):
    meter_id = Column(Integer, ForeignKey('meter.id'))
    user_id = Column(String(255))
    project_id = Column(String(255))
-    resource_id = Column(String(255), ForeignKey('resource.id'))
+    resource_id = Column(String(255))
    resource_metadata = Column(JSONEncodedDict())
    volume = Column(Float(53))
    timestamp = Column(PreciseTimestamp(), default=lambda: timeutils.utcnow())
@@ -201,7 +182,7 @@ class Sample(Base):
                         default=lambda: timeutils.utcnow())
    message_signature = Column(String(1000))
    message_id = Column(String(1000))
-    sources = relationship("Source", secondary=lambda: sourceassoc)
+    source_id = Column(String(255))
    meta_text = relationship("MetaText", backref="sample",
                             cascade="all, delete-orphan")
    meta_float = relationship("MetaFloat", backref="sample",
@@ -226,22 +207,6 @@ class MeterSample(Base):
    counter_type = column_property(meter.c.type)
    counter_unit = column_property(meter.c.unit)
    counter_volume = column_property(sample.c.volume)
-    sources = relationship("Source", secondary=lambda: sourceassoc)
-
-
-class Resource(Base):
-    __tablename__ = 'resource'
-    __table_args__ = (
-        Index('ix_resource_project_id', 'project_id'),
-        Index('ix_resource_user_id', 'user_id'),
-        Index('resource_user_id_project_id_key', 'user_id', 'project_id')
-    )
-    id = Column(String(255), primary_key=True)
-    sources = relationship("Source", secondary=lambda: sourceassoc)
-    resource_metadata = Column(JSONEncodedDict())
-    user_id = Column(String(255))
-    project_id = Column(String(255))
-    samples = relationship("Sample", backref='resource')


class Alarm(Base):

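With the Resource model and sourceassoc gone, resource views are now derived from sample rows: the newest sample for a resource supplies its current metadata and source. Roughly (illustrative sketch; session and resource_id are assumed):

    latest = (session.query(Sample)
              .filter(Sample.resource_id == resource_id)
              .order_by(Sample.timestamp.desc())
              .first())
    resource_metadata = latest.resource_metadata
    source = latest.source_id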

@@ -194,18 +194,6 @@ class RelationshipTest(scenarios.DBTestBase):
                         .group_by(sql_models.Sample.id)
                         )).count())

-    @patch.object(timeutils, 'utcnow')
-    def test_clear_metering_data_associations(self, mock_utcnow):
-        mock_utcnow.return_value = datetime.datetime(2012, 7, 2, 10, 45)
-        self.conn.clear_expired_metering_data(3 * 60)
-        session = self.conn._engine_facade.get_session()
-        self.assertEqual(0, session.query(sql_models.sourceassoc)
-                         .filter(~sql_models.sourceassoc.c.sample_id.in_(
-                             session.query(sql_models.Sample.id)
-                             .group_by(sql_models.Sample.id)
-                         )).count())


class CapabilitiesTest(test_base.BaseTestCase):
# Check the returned capabilities list, which is specific to each DB