From 67f43a2f7eba4536024304e52a4f155070f5e191 Mon Sep 17 00:00:00 2001 From: Julien Danjou Date: Wed, 22 Jul 2015 12:38:18 +0200 Subject: [PATCH] sqlalchemy: switch to new oslo.db facade The SQLAlchemyIndexer is a class with methods, which is not the standalone-function based calling style that the new enginefacade was designed for; additionally, it does not use the global engine scope and instead has its own, which is even disconnectable (do we need that?) So an interim PerInstanceFacade object is produced locally which ideally should be replaced with first-class patterns in oslo.db, supporting context on an methods with "self" as well as supporting self-local transaction facades (if we really need that?). Change-Id: I4e60cca3434e4290145a6a38ce6f27a6c7b1d2be Co-Authored-By: Mike Bayer --- gnocchi/indexer/alembic/env.py | 16 +- gnocchi/indexer/sqlalchemy.py | 408 +++++++++--------- .../indexer/sqlalchemy/test_migrations.py | 2 +- 3 files changed, 221 insertions(+), 205 deletions(-) diff --git a/gnocchi/indexer/alembic/env.py b/gnocchi/indexer/alembic/env.py index dc577200d..cf636cfae 100644 --- a/gnocchi/indexer/alembic/env.py +++ b/gnocchi/indexer/alembic/env.py @@ -71,16 +71,16 @@ def run_migrations_online(): conf = config.conf indexer = sqlalchemy.SQLAlchemyIndexer(conf) indexer.connect() - connectable = indexer.engine_facade.get_engine() + with indexer.facade.writer_connection() as connectable: - with connectable.connect() as connection: - context.configure( - connection=connection, - target_metadata=target_metadata - ) + with connectable.connect() as connection: + context.configure( + connection=connection, + target_metadata=target_metadata + ) - with context.begin_transaction(): - context.run_migrations() + with context.begin_transaction(): + context.run_migrations() indexer.disconnect() diff --git a/gnocchi/indexer/sqlalchemy.py b/gnocchi/indexer/sqlalchemy.py index 41e7ff818..0ed8865ab 100644 --- a/gnocchi/indexer/sqlalchemy.py +++ b/gnocchi/indexer/sqlalchemy.py @@ -17,12 +17,13 @@ from __future__ import absolute_import import itertools import operator import os.path +import threading import uuid import oslo_db.api from oslo_db import exception +from oslo_db.sqlalchemy import enginefacade from oslo_db.sqlalchemy import models -from oslo_db.sqlalchemy import session from oslo_db.sqlalchemy import utils as oslo_db_utils import six import sqlalchemy @@ -63,6 +64,44 @@ def get_resource_mappers(ext): 'history': resource_history_ext} +class PerInstanceFacade(object): + def __init__(self, conf): + self.trans = enginefacade.transaction_context() + self.trans.configure( + **dict(conf.database.items()) + ) + self._context = threading.local() + + def independent_writer(self): + return self.trans.independent.writer.using(self._context) + + def independent_reader(self): + return self.trans.independent.reader.using(self._context) + + def writer_connection(self): + return self.trans.connection.writer.using(self._context) + + def reader_connection(self): + return self.trans.connection.reader.using(self._context) + + def writer(self): + return self.trans.writer.using(self._context) + + def reader(self): + return self.trans.reader.using(self._context) + + def get_engine(self): + # TODO(mbayer): add get_engine() to enginefacade + if not self.trans._factory._started: + self.trans._factory._start() + return self.trans._factory._writer_engine + + def dispose(self): + # TODO(mbayer): add dispose() to enginefacade + if self.trans._factory._started: + self.trans._factory._writer_engine.dispose() + + class SQLAlchemyIndexer(indexer.IndexerDriver): resources = extension.ExtensionManager('gnocchi.indexer.resources') @@ -72,12 +111,10 @@ class SQLAlchemyIndexer(indexer.IndexerDriver): def __init__(self, conf): conf.set_override("connection", conf.indexer.url, "database") self.conf = conf - - def connect(self): - self.engine_facade = session.EngineFacade.from_config(self.conf) + self.facade = PerInstanceFacade(conf) def disconnect(self): - self.engine_facade.get_engine().dispose() + self.facade.dispose() def _get_alembic_config(self): from alembic import config @@ -88,6 +125,9 @@ class SQLAlchemyIndexer(indexer.IndexerDriver): self.conf.database.connection) return cfg + def get_engine(self): + return self.facade.get_engine() + def upgrade(self, nocreate=False): from alembic import command from alembic import migration @@ -97,14 +137,14 @@ class SQLAlchemyIndexer(indexer.IndexerDriver): if nocreate: command.upgrade(cfg, "head") else: - engine = self.engine_facade.get_engine() - ctxt = migration.MigrationContext.configure(engine.connect()) - current_version = ctxt.get_current_revision() - if current_version is None: - Base.metadata.create_all(engine) - command.stamp(cfg, "head") - else: - command.upgrade(cfg, "head") + with self.facade.writer_connection() as connection: + ctxt = migration.MigrationContext.configure(connection) + current_version = ctxt.get_current_revision() + if current_version is None: + Base.metadata.create_all(connection) + command.stamp(cfg, "head") + else: + command.upgrade(cfg, "head") def _resource_type_to_class(self, resource_type, purpose="resource"): if resource_type not in self._RESOURCE_CLASS_MAPPER: @@ -112,42 +152,36 @@ class SQLAlchemyIndexer(indexer.IndexerDriver): return self._RESOURCE_CLASS_MAPPER[resource_type][purpose] def list_archive_policies(self): - session = self.engine_facade.get_session() - aps = list(session.query(ArchivePolicy).all()) - session.expunge_all() - return aps + with self.facade.independent_reader() as session: + return list(session.query(ArchivePolicy).all()) def get_archive_policy(self, name): - session = self.engine_facade.get_session() - ap = session.query(ArchivePolicy).get(name) - session.expunge_all() - return ap + with self.facade.independent_reader() as session: + return session.query(ArchivePolicy).get(name) def delete_archive_policy(self, name): - session = self.engine_facade.get_session() - try: - if session.query(ArchivePolicy).filter( - ArchivePolicy.name == name).delete() == 0: - raise indexer.NoSuchArchivePolicy(name) - except exception.DBReferenceError as e: - if (e.constraint == - 'fk_metric_archive_policy_name_archive_policy_name'): - raise indexer.ArchivePolicyInUse(name) - raise + with self.facade.writer() as session: + try: + if session.query(ArchivePolicy).filter( + ArchivePolicy.name == name).delete() == 0: + raise indexer.NoSuchArchivePolicy(name) + except exception.DBReferenceError as e: + if (e.constraint == + 'fk_metric_archive_policy_name_archive_policy_name'): + raise indexer.ArchivePolicyInUse(name) + raise def get_metrics(self, uuids, active_only=True, with_resource=False): if not uuids: return [] - session = self.engine_facade.get_session() - query = session.query(Metric).filter(Metric.id.in_(uuids)) - if active_only: - query = query.filter(Metric.status == 'active') - if with_resource: - query = query.options(sqlalchemy.orm.joinedload('resource')) + with self.facade.independent_reader() as session: + query = session.query(Metric).filter(Metric.id.in_(uuids)) + if active_only: + query = query.filter(Metric.status == 'active') + if with_resource: + query = query.options(sqlalchemy.orm.joinedload('resource')) - metrics = list(query.all()) - session.expunge_all() - return metrics + return list(query.all()) def create_archive_policy(self, archive_policy): ap = ArchivePolicy( @@ -156,33 +190,27 @@ class SQLAlchemyIndexer(indexer.IndexerDriver): definition=archive_policy.definition, aggregation_methods=list(archive_policy.aggregation_methods), ) - session = self.engine_facade.get_session() - session.add(ap) try: - session.flush() + with self.facade.writer() as session: + session.add(ap) except exception.DBDuplicateEntry: raise indexer.ArchivePolicyAlreadyExists(archive_policy.name) - session.expunge_all() return ap def list_archive_policy_rules(self): - session = self.engine_facade.get_session() - aps = session.query(ArchivePolicyRule).order_by( - ArchivePolicyRule.metric_pattern.desc()).all() - session.expunge_all() - return aps + with self.facade.independent_reader() as session: + return session.query(ArchivePolicyRule).order_by( + ArchivePolicyRule.metric_pattern.desc()).all() def get_archive_policy_rule(self, name): - session = self.engine_facade.get_session() - ap = session.query(ArchivePolicyRule).get(name) - session.expunge_all() - return ap + with self.facade.independent_reader() as session: + return session.query(ArchivePolicyRule).get(name) def delete_archive_policy_rule(self, name): - session = self.engine_facade.get_session() - if session.query(ArchivePolicyRule).filter( - ArchivePolicyRule.name == name).delete() == 0: - raise indexer.NoSuchArchivePolicyRule(name) + with self.facade.writer() as session: + if session.query(ArchivePolicyRule).filter( + ArchivePolicyRule.name == name).delete() == 0: + raise indexer.NoSuchArchivePolicyRule(name) def create_archive_policy_rule(self, name, metric_pattern, archive_policy_name): @@ -191,13 +219,11 @@ class SQLAlchemyIndexer(indexer.IndexerDriver): archive_policy_name=archive_policy_name, metric_pattern=metric_pattern ) - session = self.engine_facade.get_session() - session.add(apr) try: - session.flush() + with self.facade.writer() as session: + session.add(apr) except exception.DBDuplicateEntry: raise indexer.ArchivePolicyRuleAlreadyExists(name) - session.expunge_all() return apr def create_metric(self, id, created_by_user_id, created_by_project_id, @@ -209,35 +235,31 @@ class SQLAlchemyIndexer(indexer.IndexerDriver): archive_policy_name=archive_policy_name, name=name, resource_id=resource_id) - session = self.engine_facade.get_session() - session.add(m) try: - session.flush() + with self.facade.writer() as session: + session.add(m) except exception.DBReferenceError as e: if (e.constraint == 'fk_metric_archive_policy_name_archive_policy_name'): raise indexer.NoSuchArchivePolicy(archive_policy_name) raise - session.expunge_all() return m def list_metrics(self, user_id=None, project_id=None, details=False, status='active', **kwargs): - session = self.engine_facade.get_session() - q = session.query(Metric).filter( - Metric.status == status).order_by(Metric.id) - if user_id is not None: - q = q.filter(Metric.created_by_user_id == user_id) - if project_id is not None: - q = q.filter(Metric.created_by_project_id == project_id) - for attr in kwargs: - q = q.filter(getattr(Metric, attr) == kwargs[attr]) - if details: - q = q.options(sqlalchemy.orm.joinedload('resource')) + with self.facade.independent_reader() as session: + q = session.query(Metric).filter( + Metric.status == status).order_by(Metric.id) + if user_id is not None: + q = q.filter(Metric.created_by_user_id == user_id) + if project_id is not None: + q = q.filter(Metric.created_by_project_id == project_id) + for attr in kwargs: + q = q.filter(getattr(Metric, attr) == kwargs[attr]) + if details: + q = q.options(sqlalchemy.orm.joinedload('resource')) - metrics = list(q.all()) - session.expunge_all() - return metrics + return list(q.all()) def create_resource(self, resource_type, id, created_by_user_id, created_by_project_id, @@ -248,19 +270,19 @@ class SQLAlchemyIndexer(indexer.IndexerDriver): if (started_at is not None and ended_at is not None and started_at > ended_at): - raise ValueError("Start timestamp cannot be after end timestamp") - r = resource_cls( - id=id, - type=resource_type, - created_by_user_id=created_by_user_id, - created_by_project_id=created_by_project_id, - user_id=user_id, - project_id=project_id, - started_at=started_at, - ended_at=ended_at, - **kwargs) - session = self.engine_facade.get_session() - with session.begin(): + raise ValueError( + "Start timestamp cannot be after end timestamp") + with self.facade.writer() as session: + r = resource_cls( + id=id, + type=resource_type, + created_by_user_id=created_by_user_id, + created_by_project_id=created_by_project_id, + user_id=user_id, + project_id=project_id, + started_at=started_at, + ended_at=ended_at, + **kwargs) session.add(r) try: session.flush() @@ -273,11 +295,10 @@ class SQLAlchemyIndexer(indexer.IndexerDriver): if metrics is not None: self._set_metrics_for_resource(session, r, metrics) - # NOTE(jd) Force load of metrics :) - r.metrics + # NOTE(jd) Force load of metrics :) + r.metrics - session.expunge_all() - return r + return r @oslo_db.api.retry_on_deadlock def update_resource(self, resource_type, @@ -288,9 +309,8 @@ class SQLAlchemyIndexer(indexer.IndexerDriver): resource_cls = self._resource_type_to_class(resource_type) resource_history_cls = self._resource_type_to_class(resource_type, "history") - session = self.engine_facade.get_session() - try: - with session.begin(): + with self.facade.writer() as session: + try: # NOTE(sileht): We use FOR UPDATE that is not galera friendly, # but they are no other way to cleanly patch a resource and # store the history that safe when two concurrent calls are @@ -315,7 +335,7 @@ class SQLAlchemyIndexer(indexer.IndexerDriver): # Update the resource if ended_at is not _marker: # NOTE(jd) MySQL does not honor checks. I hate it. - engine = self.engine_facade.get_engine() + engine = session.connection() if engine.dialect.name == "mysql": if r.started_at is not None and ended_at is not None: if r.started_at > ended_at: @@ -338,17 +358,18 @@ class SQLAlchemyIndexer(indexer.IndexerDriver): Metric.status == 'active').update( {"resource_id": None}) self._set_metrics_for_resource(session, r, metrics) - except exception.DBConstraintError as e: - if e.check_name == "ck_started_before_ended": - raise indexer.ResourceValueError( - resource_type, "ended_at", ended_at) - raise - # NOTE(jd) Force load of metrics – do it outside the session! - r.metrics + session.flush() + except exception.DBConstraintError as e: + if e.check_name == "ck_started_before_ended": + raise indexer.ResourceValueError( + resource_type, "ended_at", ended_at) + raise - session.expunge_all() - return r + # NOTE(jd) Force load of metrics – do it outside the session! + r.metrics + + return r @staticmethod def _set_metrics_for_resource(session, r, metrics): @@ -389,8 +410,7 @@ class SQLAlchemyIndexer(indexer.IndexerDriver): session.expire(r, ['metrics']) def delete_resource(self, resource_id): - session = self.engine_facade.get_session() - with session.begin(): + with self.facade.writer() as session: # We are going to delete the resource; the on delete will set the # resource_id of the attached metrics to NULL, we just have to mark # their status as 'delete' @@ -403,15 +423,13 @@ class SQLAlchemyIndexer(indexer.IndexerDriver): def get_resource(self, resource_type, resource_id, with_metrics=False): resource_cls = self._resource_type_to_class(resource_type) - session = self.engine_facade.get_session() - q = session.query( - resource_cls).filter( - resource_cls.id == resource_id) - if with_metrics: - q = q.options(sqlalchemy.orm.joinedload('metrics')) - r = q.first() - session.expunge_all() - return r + with self.facade.independent_reader() as session: + q = session.query( + resource_cls).filter( + resource_cls.id == resource_id) + if with_metrics: + q = q.options(sqlalchemy.orm.joinedload('metrics')) + return q.first() def _get_history_result_mapper(self, resource_type): resource_cls = self._resource_type_to_class(resource_type) @@ -461,97 +479,95 @@ class SQLAlchemyIndexer(indexer.IndexerDriver): sorts=None): sorts = sorts or [] - session = self.engine_facade.get_session() - if history: target_cls = self._get_history_result_mapper(resource_type) else: target_cls = self._resource_type_to_class(resource_type) - q = session.query(target_cls) + with self.facade.independent_reader() as session: + q = session.query(target_cls) + + if attribute_filter: + engine = session.connection() + try: + f = QueryTransformer.build_filter(engine.dialect.name, + target_cls, + attribute_filter) + except indexer.QueryAttributeError as e: + # NOTE(jd) The QueryAttributeError does not know about + # resource_type, so convert it + raise indexer.ResourceAttributeError(resource_type, + e.attribute) + + q = q.filter(f) + + # transform the api-wg representation to the oslo.db one + sort_keys = [] + sort_dirs = [] + for sort in sorts: + sort_key, __, sort_dir = sort.partition(":") + sort_keys.append(sort_key.strip()) + sort_dirs.append(sort_dir or 'asc') + + # paginate_query require at list one uniq column + if 'id' not in sort_keys: + sort_keys.append('id') + sort_dirs.append('asc') + + if marker: + resource_marker = self.get_resource(resource_type, marker) + if resource_marker is None: + raise indexer.InvalidPagination( + "Invalid marker: `%s'" % marker) + else: + resource_marker = None - if attribute_filter: - engine = self.engine_facade.get_engine() try: - f = QueryTransformer.build_filter(engine.dialect.name, - target_cls, - attribute_filter) - except indexer.QueryAttributeError as e: - # NOTE(jd) The QueryAttributeError does not know about - # resource_type, so convert it - raise indexer.ResourceAttributeError(resource_type, - e.attribute) + q = oslo_db_utils.paginate_query(q, target_cls, limit=limit, + sort_keys=sort_keys, + marker=resource_marker, + sort_dirs=sort_dirs) + except (exception.InvalidSortKey, ValueError) as e: + raise indexer.InvalidPagination(e) - q = q.filter(f) + # Always include metrics + q = q.options(sqlalchemy.orm.joinedload("metrics")) + all_resources = q.all() - # transform the api-wg representation to the oslo.db one - sort_keys = [] - sort_dirs = [] - for sort in sorts: - sort_key, __, sort_dir = sort.partition(":") - sort_keys.append(sort_key.strip()) - sort_dirs.append(sort_dir or 'asc') - - # paginate_query require at list one uniq column - if 'id' not in sort_keys: - sort_keys.append('id') - sort_dirs.append('asc') - - if marker: - resource_marker = self.get_resource(resource_type, marker) - if resource_marker is None: - raise indexer.InvalidPagination( - "Invalid marker: `%s'" % marker) - else: - resource_marker = None - - try: - q = oslo_db_utils.paginate_query(q, target_cls, limit=limit, - sort_keys=sort_keys, - marker=resource_marker, - sort_dirs=sort_dirs) - except (exception.InvalidSortKey, ValueError) as e: - raise indexer.InvalidPagination(e) - - # Always include metrics - q = q.options(sqlalchemy.orm.joinedload("metrics")) - all_resources = q.all() - - if details: - grouped_by_type = itertools.groupby( - all_resources, lambda r: (r.revision != -1, r.type)) - all_resources = [] - for (is_history, type), resources in grouped_by_type: - if type == 'generic': - # No need for a second query - all_resources.extend(resources) - else: - if is_history: - target_cls = self._resource_type_to_class(type, - "history") - f = target_cls.revision.in_( - [r.revision for r in resources]) + if details: + grouped_by_type = itertools.groupby( + all_resources, lambda r: (r.revision != -1, r.type)) + all_resources = [] + for (is_history, type), resources in grouped_by_type: + if type == 'generic': + # No need for a second query + all_resources.extend(resources) else: - target_cls = self._resource_type_to_class(type) - f = target_cls.id.in_([r.id for r in resources]) + if is_history: + target_cls = self._resource_type_to_class( + type, "history") + f = target_cls.revision.in_( + [r.revision for r in resources]) + else: + target_cls = self._resource_type_to_class(type) + f = target_cls.id.in_([r.id for r in resources]) - q = session.query(target_cls).filter(f) - # Always include metrics - q = q.options(sqlalchemy.orm.joinedload('metrics')) - all_resources.extend(q.all()) - session.expunge_all() - return all_resources + q = session.query(target_cls).filter(f) + # Always include metrics + q = q.options(sqlalchemy.orm.joinedload('metrics')) + all_resources.extend(q.all()) + return all_resources def expunge_metric(self, id): - session = self.engine_facade.get_session() - if session.query(Metric).filter(Metric.id == id).delete() == 0: - raise indexer.NoSuchMetric(id) + with self.facade.writer() as session: + if session.query(Metric).filter(Metric.id == id).delete() == 0: + raise indexer.NoSuchMetric(id) def delete_metric(self, id): - session = self.engine_facade.get_session() - if session.query(Metric).filter( - Metric.id == id).update({"status": "delete"}) == 0: - raise indexer.NoSuchMetric(id) + with self.facade.writer() as session: + if session.query(Metric).filter( + Metric.id == id).update({"status": "delete"}) == 0: + raise indexer.NoSuchMetric(id) class QueryTransformer(object): diff --git a/gnocchi/tests/indexer/sqlalchemy/test_migrations.py b/gnocchi/tests/indexer/sqlalchemy/test_migrations.py index 0b917e395..63f22f47d 100644 --- a/gnocchi/tests/indexer/sqlalchemy/test_migrations.py +++ b/gnocchi/tests/indexer/sqlalchemy/test_migrations.py @@ -40,7 +40,7 @@ class ModelsMigrationsSync( return sqlalchemy_base.Base.metadata def get_engine(self): - return self.index.engine_facade.get_engine() + return self.index.get_engine() @staticmethod def db_sync(engine):