From 848aaddb1ac0b930e2782a9b44d528947723712c Mon Sep 17 00:00:00 2001
From: Joshua Harlow
Date: Wed, 4 Jun 2014 12:49:54 -0700
Subject: [PATCH] Add in default transaction isolation levels

Apply a default transaction isolation level for mysql and postgresql to
help avoid the consistency issues that can happen when two transactions
operate on the same set of records at the same time.

Closes-Bug: 1326507
Change-Id: I1819722889d0d66d938641af6aa6f79fcfd2deb4
---
 .../persistence/backends/impl_sqlalchemy.py | 34 ++++++++++++++++---
 1 file changed, 29 insertions(+), 5 deletions(-)

diff --git a/taskflow/persistence/backends/impl_sqlalchemy.py b/taskflow/persistence/backends/impl_sqlalchemy.py
index 2f2cffcb..0cdea01f 100644
--- a/taskflow/persistence/backends/impl_sqlalchemy.py
+++ b/taskflow/persistence/backends/impl_sqlalchemy.py
@@ -95,6 +95,17 @@ POSTGRES_GONE_WAY_AWAY_ERRORS = (
 # These connection urls mean sqlite is being used as an in-memory DB.
 SQLITE_IN_MEMORY = ('sqlite://', 'sqlite:///', 'sqlite:///:memory:')
 
+# Transaction isolation levels that will be automatically applied; we prefer
+# strong read committed isolation levels to avoid merging and using dirty
+# data...
+#
+# See: http://en.wikipedia.org/wiki/Isolation_(database_systems)
+DEFAULT_TXN_ISOLATION_LEVELS = {
+    'mysql': 'READ COMMITTED',
+    'postgresql': 'READ COMMITTED',
+    'postgres': 'READ COMMITTED',
+}
+
 
 def _in_any(reason, err_haystack):
     """Checks if any elements of the haystack are in the given reason."""
@@ -189,6 +200,24 @@ class SQLAlchemyBackend(base.Backend):
                                 ('pool_timeout', 'pool_timeout')]:
             if lookup_key in conf:
                 engine_args[k] = misc.as_int(conf.pop(lookup_key))
+        if 'isolation_level' not in conf:
+            # Check driver name exact matches first, then try driver name
+            # partial matches...
+            txn_isolation_levels = conf.pop('isolation_levels',
+                                            DEFAULT_TXN_ISOLATION_LEVELS)
+            level_applied = False
+            for (driver, level) in six.iteritems(txn_isolation_levels):
+                if driver == e_url.drivername:
+                    engine_args['isolation_level'] = level
+                    level_applied = True
+                    break
+            if not level_applied:
+                for (driver, level) in six.iteritems(txn_isolation_levels):
+                    if e_url.drivername.find(driver) != -1:
+                        engine_args['isolation_level'] = level
+                        break
+        else:
+            engine_args['isolation_level'] = conf.pop('isolation_level')
         # If the configuration dict specifies any additional engine args
         # or engine arg overrides make sure we merge them in.
         engine_args.update(conf.pop('engine_args', {}))
@@ -384,11 +413,6 @@ class Connection(base.Connection):
     def _save_logbook(self, session, lb):
         try:
             lb_m = _logbook_get_model(lb.uuid, session=session)
-            # NOTE(harlowja): Merge them (note that this doesn't provide
-            # 100% correct update semantics due to how databases have
-            # MVCC). This is where a stored procedure or a better backing
-            # store would handle this better by allowing this merge logic
-            # to exist in the database itself.
             lb_m = _logbook_merge(lb_m, lb)
         except exc.NotFound:
             lb_m = _convert_lb_to_internal(lb)
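
Note (illustrative, not part of the commit): the selection logic the second
hunk adds to the engine-creation path is easier to follow outside the diff,
so here is a small standalone sketch of the same ordering. An explicit
'isolation_level' in the configuration always wins; otherwise an exact
driver-name match against the defaults table is tried before a partial
(substring) match. The pick_isolation_level helper name and the use of plain
dict iteration instead of six.iteritems are hypothetical, for illustration
only.

# Illustrative sketch only; mirrors the ordering the patch introduces, it is
# not taskflow code.
DEFAULT_TXN_ISOLATION_LEVELS = {
    'mysql': 'READ COMMITTED',
    'postgresql': 'READ COMMITTED',
    'postgres': 'READ COMMITTED',
}


def pick_isolation_level(drivername, conf):
    """Hypothetical helper mimicking the patched engine-argument logic."""
    if 'isolation_level' in conf:
        # An explicit setting always bypasses the defaults.
        return conf['isolation_level']
    levels = conf.get('isolation_levels', DEFAULT_TXN_ISOLATION_LEVELS)
    # Exact driver name matches first...
    if drivername in levels:
        return levels[drivername]
    # ...then fall back to partial (substring) matches.
    for (driver, level) in levels.items():
        if drivername.find(driver) != -1:
            return level
    # No default applies (for example sqlite); leave it to the dialect.
    return None


assert pick_isolation_level('mysql', {}) == 'READ COMMITTED'
assert pick_isolation_level('mysql+pymysql', {}) == 'READ COMMITTED'
assert pick_isolation_level('sqlite', {}) is None
assert pick_isolation_level(
    'postgresql', {'isolation_level': 'SERIALIZABLE'}) == 'SERIALIZABLE'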
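
A hedged usage sketch of how the new defaults would surface to a backend
user. The SQLAlchemyBackend constructor and the 'connection' configuration
key reflect how this backend is normally configured, but the connection URLs
and the REPEATABLE READ override below are made-up values for illustration,
and whether the engine is built eagerly or lazily is not shown here.

from taskflow.persistence.backends import impl_sqlalchemy

# No 'isolation_level' supplied: with this patch the 'mysql' entry from
# DEFAULT_TXN_ISOLATION_LEVELS ('READ COMMITTED') should be applied when
# the engine is created.
backend = impl_sqlalchemy.SQLAlchemyBackend({
    'connection': 'mysql://user:password@localhost/taskflow',
})

# An explicit 'isolation_level' bypasses the defaults entirely.
backend = impl_sqlalchemy.SQLAlchemyBackend({
    'connection': 'postgresql://user:password@localhost/taskflow',
    'isolation_level': 'REPEATABLE READ',
})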