From ac26db7cfa42f751be63a406056f4ed3e828f56d Mon Sep 17 00:00:00 2001 From: Gordon Chung Date: Tue, 20 May 2014 17:48:46 -0400 Subject: [PATCH] refactor sql backend to improve write speed - drop sourceassoc table as its data is not accessible via api - drop resource table since data can be retrieved from sample table Change-Id: I2d4a5175734cafce6a439ad736c47691e6e7e847 Implements: blueprint big-data-sql Closes-Bug: #1305332 Closes-Bug: #1257908 --- ceilometer/storage/impl_sqlalchemy.py | 97 ++-------- .../036_drop_sourceassoc_resource_tables.py | 166 ++++++++++++++++++ ceilometer/storage/sqlalchemy/models.py | 41 +---- .../tests/storage/test_impl_sqlalchemy.py | 12 -- 4 files changed, 188 insertions(+), 128 deletions(-) create mode 100644 ceilometer/storage/sqlalchemy/migrate_repo/versions/036_drop_sourceassoc_resource_tables.py diff --git a/ceilometer/storage/impl_sqlalchemy.py b/ceilometer/storage/impl_sqlalchemy.py index 235276e0..ace02320 100644 --- a/ceilometer/storage/impl_sqlalchemy.py +++ b/ceilometer/storage/impl_sqlalchemy.py @@ -154,8 +154,8 @@ def make_query_from_filter(session, query, sample_filter, require_meter=True): elif require_meter: raise RuntimeError('Missing required meter specifier') if sample_filter.source: - query = query.filter(models.Sample.sources.any( - id=sample_filter.source)) + query = query.filter( + models.Sample.source_id == sample_filter.source) if sample_filter.start: ts_start = sample_filter.start if sample_filter.start_timestamp_op == 'gt': @@ -192,8 +192,6 @@ class Connection(base.Connection): Tables:: - - source - - { id: source id } - meter - meter definition - { id: meter def id @@ -207,26 +205,14 @@ class Connection(base.Connection): meter_id: meter id (->meter.id) user_id: user uuid project_id: project uuid - resource_id: resource uuid (->resource.id) + resource_id: resource uuid + source_id: source id resource_metadata: metadata dictionaries volume: sample volume timestamp: datetime message_signature: message signature 
message_id: message uuid } - - resource - - the metadata for resources - - { id: resource uuid - resource_metadata: metadata dictionaries - project_id: project uuid - user_id: user uuid - } - - sourceassoc - - the relationships - - { sample_id: sample id (->sample.id) - resource_id: resource uuid (->resource.id) - source_id: source id (->source.id) - } """ CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES, AVAILABLE_CAPABILITIES) @@ -249,34 +235,12 @@ class Connection(base.Connection): self._engine_facade._session_maker.close_all() engine.dispose() - @staticmethod - def _create_or_update(session, model_class, _id, source=None, **kwargs): - if not _id: - return None - try: - with session.begin(subtransactions=True): - obj = session.query(model_class).get(str(_id)) - if obj is None: - obj = model_class(id=str(_id)) - session.add(obj) - - if source and not filter(lambda x: x.id == source.id, - obj.sources): - obj.sources.append(source) - for k in kwargs: - setattr(obj, k, kwargs[k]) - except dbexc.DBDuplicateEntry: - # requery the object from the db if this is an other - # parallel/previous call of record_metering_data that - # have successfully created this object - obj = Connection._create_or_update(session, model_class, - _id, source, **kwargs) - return obj - @staticmethod def _create_meter(session, name, type, unit): try: - with session.begin(subtransactions=True): + nested = session.connection().dialect.name != 'sqlite' + with session.begin(nested=nested, + subtransactions=not nested): obj = session.query(models.Meter)\ .filter(models.Meter.name == name)\ .filter(models.Meter.type == type)\ @@ -285,6 +249,7 @@ class Connection(base.Connection): obj = models.Meter(name=name, type=type, unit=unit) session.add(obj) except dbexc.DBDuplicateEntry: + # retry function to pick up duplicate committed object obj = Connection._create_meter(session, name, type, unit) return obj @@ -297,26 +262,15 @@ class Connection(base.Connection): """ session = 
self._engine_facade.get_session() with session.begin(): - # Record the updated resource metadata - rmetadata = data['resource_metadata'] - source = self._create_or_update(session, models.Source, - data['source']) - resource = self._create_or_update(session, models.Resource, - data['resource_id'], source, - user_id=data['user_id'], - project_id=data['project_id'], - resource_metadata=rmetadata) - # Record the raw data for the sample. + rmetadata = data['resource_metadata'] meter = self._create_meter(session, data['counter_name'], data['counter_type'], data['counter_unit']) - sample = models.Sample(meter_id=meter.id, - resource=resource) + sample = models.Sample(meter_id=meter.id) session.add(sample) - if not filter(lambda x: x.id == source.id, sample.sources): - sample.sources.append(source) + sample.resource_id = data['resource_id'] sample.project_id = data['project_id'] sample.user_id = data['user_id'] sample.timestamp = data['timestamp'] @@ -324,6 +278,7 @@ class Connection(base.Connection): sample.volume = data['counter_volume'] sample.message_signature = data['message_signature'] sample.message_id = data['message_id'] + sample.source_id = data['source'] session.flush() if rmetadata: @@ -355,13 +310,6 @@ class Connection(base.Connection): for sample_obj in sample_query.all(): session.delete(sample_obj) - query = session.query(models.Resource)\ - .filter(~models.Resource.id.in_( - session.query(models.Sample.resource_id).group_by( - models.Sample.resource_id))) - for res_obj in query.all(): - session.delete(res_obj) - def get_resources(self, user=None, project=None, source=None, start_timestamp=None, start_timestamp_op=None, end_timestamp=None, end_timestamp_op=None, @@ -388,12 +336,10 @@ class Connection(base.Connection): # TODO(gordc) this should be merged with make_query_from_filter for column, value in [(models.Sample.resource_id, resource), (models.Sample.user_id, user), - (models.Sample.project_id, project)]: + (models.Sample.project_id, project), + 
(models.Sample.source_id, source)]: if value: query = query.filter(column == value) - if source: - query = query.filter( - models.Sample.sources.any(id=source)) if metaquery: query = apply_metaquery_filter(session, query, metaquery) if start_timestamp: @@ -438,7 +384,7 @@ class Connection(base.Connection): project_id=sample.project_id, first_sample_timestamp=min_q.first().timestamp, last_sample_timestamp=sample.timestamp, - source=sample.sources[0].id, + source=sample.source_id, user_id=sample.user_id, metadata=sample.resource_metadata ) @@ -464,12 +410,10 @@ class Connection(base.Connection): # TODO(gordc) this should be merged with make_query_from_filter for column, value in [(models.Sample.resource_id, resource), (models.Sample.user_id, user), - (models.Sample.project_id, project)]: + (models.Sample.project_id, project), + (models.Sample.source_id, source)]: if value: query = query.filter(column == value) - if source is not None: - query = query.filter( - models.Sample.sources.any(id=source)) if metaquery: query = apply_metaquery_filter(session, query, metaquery) return query @@ -500,7 +444,7 @@ class Connection(base.Connection): unit=sample.counter_unit, resource_id=sample.resource_id, project_id=sample.project_id, - source=sample.sources[0].id, + source=sample.source_id, user_id=sample.user_id) def _retrieve_samples(self, query): @@ -511,10 +455,7 @@ class Connection(base.Connection): # the sample was inserted. It is an implementation # detail that should not leak outside of the driver. yield api_models.Sample( - # Replace 'sources' with 'source' to meet the caller's - # expectation, Sample.sources contains one and only one - # source in the current implementation. 
- source=s.sources[0].id, + source=s.source_id, counter_name=s.counter_name, counter_type=s.counter_type, counter_unit=s.counter_unit, diff --git a/ceilometer/storage/sqlalchemy/migrate_repo/versions/036_drop_sourceassoc_resource_tables.py b/ceilometer/storage/sqlalchemy/migrate_repo/versions/036_drop_sourceassoc_resource_tables.py new file mode 100644 index 00000000..4e112bc0 --- /dev/null +++ b/ceilometer/storage/sqlalchemy/migrate_repo/versions/036_drop_sourceassoc_resource_tables.py @@ -0,0 +1,166 @@ +# -*- encoding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +from migrate import ForeignKeyConstraint, UniqueConstraint +import sqlalchemy as sa + +from ceilometer.storage.sqlalchemy import migration + + +TABLES = ['sample', 'resource', 'source', 'sourceassoc'] +DROP_TABLES = ['resource', 'source', 'sourceassoc'] + +INDEXES = { + "sample": (('resource_id', 'resource', 'id'),), + "sourceassoc": (('sample_id', 'sample', 'id'), + ('resource_id', 'resource', 'id'), + ('source_id', 'source', 'id')) +} + + +def upgrade(migrate_engine): + meta = sa.MetaData(bind=migrate_engine) + load_tables = dict((table_name, sa.Table(table_name, meta, + autoload=True)) + for table_name in TABLES) + + # drop foreign keys + if migrate_engine.name != 'sqlite': + for table_name, indexes in INDEXES.items(): + table = load_tables[table_name] + for column, ref_table_name, ref_column_name in indexes: + ref_table = load_tables[ref_table_name] + params = {'columns': [table.c[column]], + 'refcolumns': [ref_table.c[ref_column_name]]} + fk_table_name = table_name + if migrate_engine.name == "mysql": + params['name'] = "_".join(('fk', fk_table_name, column)) + elif (migrate_engine.name == "postgresql" and + table_name == 'sample'): + # fk was not renamed in script 030 + params['name'] = "_".join(('meter', column, 'fkey')) + fkey = ForeignKeyConstraint(**params) + fkey.drop() + + # create source field in sample + sample = load_tables['sample'] + sample.create_column(sa.Column('source_id', sa.String(255))) + + # move source values to samples + sourceassoc = load_tables['sourceassoc'] + query = sa.select([sourceassoc.c.sample_id, sourceassoc.c.source_id])\ + .where(sourceassoc.c.sample_id.isnot(None)) + for sample_id, source_id in migration.paged(query): + sample.update().where(sample_id == sample.c.id)\ + .values({'source_id': source_id}).execute() + + # drop tables + for table_name in DROP_TABLES: + sa.Table(table_name, meta, autoload=True).drop() + + +def downgrade(migrate_engine): + meta = sa.MetaData(bind=migrate_engine) + sample = sa.Table('sample', 
meta, autoload=True) + resource = sa.Table( + 'resource', meta, + sa.Column('id', sa.String(255), primary_key=True), + sa.Column('resource_metadata', sa.Text), + sa.Column('user_id', sa.String(255)), + sa.Column('project_id', sa.String(255)), + sa.Index('ix_resource_project_id', 'project_id'), + sa.Index('ix_resource_user_id', 'user_id'), + sa.Index('resource_user_id_project_id_key', 'user_id', 'project_id'), + mysql_engine='InnoDB', + mysql_charset='utf8', + ) + resource.create() + + source = sa.Table( + 'source', meta, + sa.Column('id', sa.String(255), primary_key=True), + mysql_engine='InnoDB', + mysql_charset='utf8', + ) + source.create() + + sourceassoc = sa.Table( + 'sourceassoc', meta, + sa.Column('sample_id', sa.Integer), + sa.Column('resource_id', sa.String(255)), + sa.Column('source_id', sa.String(255)), + sa.Index('idx_sr', 'source_id', 'resource_id'), + sa.Index('idx_ss', 'source_id', 'sample_id'), + mysql_engine='InnoDB', + mysql_charset='utf8', + ) + sourceassoc.create() + + params = {} + if migrate_engine.name == "mysql": + params = {'name': 'uniq_sourceassoc0sample_id'} + uc = UniqueConstraint('sample_id', table=sourceassoc, **params) + uc.create() + + # reload source/resource tables. 
+ # NOTE(gordc): fine to skip non-id attributes in table since + # they're constantly updated and not used by api + for table, col in [(source, 'source_id'), (resource, 'resource_id')]: + q = sa.select([sample.c[col]]).distinct() + # NOTE(sileht): workaround for + # https://bitbucket.org/zzzeek/sqlalchemy/ + # issue/3044/insert-from-select-union_all + q.select = lambda: q + sql_ins = table.insert().from_select([table.c.id], q) + try: + migrate_engine.execute(sql_ins) + except TypeError: + # from select is empty + pass + + # reload sourceassoc tables + for ref_col, col in [('id', 'sample_id'), ('resource_id', 'resource_id')]: + q = sa.select([sample.c.source_id, sample.c[ref_col]]).distinct() + q.select = lambda: q + sql_ins = sourceassoc.insert().from_select([sourceassoc.c.source_id, + sourceassoc.c[col]], q) + try: + migrate_engine.execute(sql_ins) + except TypeError: + # from select is empty + pass + + sample.c.source_id.drop() + + load_tables = dict((table_name, sa.Table(table_name, meta, + autoload=True)) + for table_name in TABLES) + + # add foreign keys + if migrate_engine.name != 'sqlite': + for table_name, indexes in INDEXES.items(): + table = load_tables[table_name] + for column, ref_table_name, ref_column_name in indexes: + ref_table = load_tables[ref_table_name] + params = {'columns': [table.c[column]], + 'refcolumns': [ref_table.c[ref_column_name]]} + fk_table_name = table_name + if migrate_engine.name == "mysql": + params['name'] = "_".join(('fk', fk_table_name, column)) + elif (migrate_engine.name == "postgresql" and + table_name == 'sample'): + # fk was not renamed in script 030 + params['name'] = "_".join(('meter', column, 'fkey')) + fkey = ForeignKeyConstraint(**params) + fkey.create() diff --git a/ceilometer/storage/sqlalchemy/models.py b/ceilometer/storage/sqlalchemy/models.py index 5ea89953..8de15153 100644 --- a/ceilometer/storage/sqlalchemy/models.py +++ b/ceilometer/storage/sqlalchemy/models.py @@ -20,7 +20,7 @@ SQLAlchemy models for 
Ceilometer data. import json -from sqlalchemy import Column, Integer, String, Table, ForeignKey, \ +from sqlalchemy import Column, Integer, String, ForeignKey, \ Index, UniqueConstraint, BigInteger, join from sqlalchemy import Float, Boolean, Text, DateTime from sqlalchemy.dialects.mysql import DECIMAL @@ -99,25 +99,6 @@ class CeilometerBase(object): Base = declarative_base(cls=CeilometerBase) -sourceassoc = Table('sourceassoc', Base.metadata, - Column('sample_id', Integer, - ForeignKey("sample.id")), - Column('resource_id', String(255), - ForeignKey("resource.id")), - Column('source_id', String(255), - ForeignKey("source.id"))) - -Index('idx_sr', sourceassoc.c['source_id'], sourceassoc.c['resource_id']), -Index('idx_ss', sourceassoc.c['source_id'], sourceassoc.c['sample_id']), -Index('ix_sourceassoc_source_id', sourceassoc.c['source_id']) -UniqueConstraint(sourceassoc.c['sample_id'], name='uniq_sourceassoc0sample_id') - - -class Source(Base): - __tablename__ = 'source' - id = Column(String(255), primary_key=True) - - class MetaText(Base): """Metering text metadata.""" @@ -193,7 +174,7 @@ class Sample(Base): meter_id = Column(Integer, ForeignKey('meter.id')) user_id = Column(String(255)) project_id = Column(String(255)) - resource_id = Column(String(255), ForeignKey('resource.id')) + resource_id = Column(String(255)) resource_metadata = Column(JSONEncodedDict()) volume = Column(Float(53)) timestamp = Column(PreciseTimestamp(), default=lambda: timeutils.utcnow()) @@ -201,7 +182,7 @@ class Sample(Base): default=lambda: timeutils.utcnow()) message_signature = Column(String(1000)) message_id = Column(String(1000)) - sources = relationship("Source", secondary=lambda: sourceassoc) + source_id = Column(String(255)) meta_text = relationship("MetaText", backref="sample", cascade="all, delete-orphan") meta_float = relationship("MetaFloat", backref="sample", @@ -226,22 +207,6 @@ class MeterSample(Base): counter_type = column_property(meter.c.type) counter_unit = 
column_property(meter.c.unit) counter_volume = column_property(sample.c.volume) - sources = relationship("Source", secondary=lambda: sourceassoc) - - -class Resource(Base): - __tablename__ = 'resource' - __table_args__ = ( - Index('ix_resource_project_id', 'project_id'), - Index('ix_resource_user_id', 'user_id'), - Index('resource_user_id_project_id_key', 'user_id', 'project_id') - ) - id = Column(String(255), primary_key=True) - sources = relationship("Source", secondary=lambda: sourceassoc) - resource_metadata = Column(JSONEncodedDict()) - user_id = Column(String(255)) - project_id = Column(String(255)) - samples = relationship("Sample", backref='resource') class Alarm(Base): diff --git a/ceilometer/tests/storage/test_impl_sqlalchemy.py b/ceilometer/tests/storage/test_impl_sqlalchemy.py index 75d009d5..edf1dcc4 100644 --- a/ceilometer/tests/storage/test_impl_sqlalchemy.py +++ b/ceilometer/tests/storage/test_impl_sqlalchemy.py @@ -194,18 +194,6 @@ class RelationshipTest(scenarios.DBTestBase): .group_by(sql_models.Sample.id) )).count()) - @patch.object(timeutils, 'utcnow') - def test_clear_metering_data_associations(self, mock_utcnow): - mock_utcnow.return_value = datetime.datetime(2012, 7, 2, 10, 45) - self.conn.clear_expired_metering_data(3 * 60) - - session = self.conn._engine_facade.get_session() - self.assertEqual(0, session.query(sql_models.sourceassoc) - .filter(~sql_models.sourceassoc.c.sample_id.in_( - session.query(sql_models.Sample.id) - .group_by(sql_models.Sample.id) - )).count()) - class CapabilitiesTest(test_base.BaseTestCase): # Check the returned capabilities list, which is specific to each DB