diff --git a/taskflow/persistence/backends/impl_sqlalchemy.py b/taskflow/persistence/backends/impl_sqlalchemy.py index bcbadab1..a5ea6061 100644 --- a/taskflow/persistence/backends/impl_sqlalchemy.py +++ b/taskflow/persistence/backends/impl_sqlalchemy.py @@ -96,6 +96,17 @@ POSTGRES_GONE_WAY_AWAY_ERRORS = ( # These connection urls mean sqlite is being used as an in-memory DB. SQLITE_IN_MEMORY = ('sqlite://', 'sqlite:///', 'sqlite:///:memory:') +# Transacation isolation levels that will be automatically applied, we prefer +# strong read committed isolation levels to avoid merging and using dirty +# data... +# +# See: http://en.wikipedia.org/wiki/Isolation_(database_systems) +DEFAULT_TXN_ISOLATION_LEVELS = { + 'mysql': 'READ COMMITTED', + 'postgresql': 'READ COMMITTED', + 'postgres': 'READ COMMITTED', +} + def _in_any(reason, err_haystack): """Checks if any elements of the haystack are in the given reason.""" @@ -194,6 +205,24 @@ class SQLAlchemyBackend(base.Backend): ('pool_timeout', 'pool_timeout')]: if lookup_key in conf: engine_args[k] = misc.as_int(conf.pop(lookup_key)) + if 'isolation_level' not in conf: + # Check driver name exact matches first, then try driver name + # partial matches... + txn_isolation_levels = conf.pop('isolation_levels', + DEFAULT_TXN_ISOLATION_LEVELS) + level_applied = False + for (driver, level) in six.iteritems(txn_isolation_levels): + if driver == e_url.drivername: + engine_args['isolation_level'] = level + level_applied = True + break + if not level_applied: + for (driver, level) in six.iteritems(txn_isolation_levels): + if e_url.drivername.find(driver) != -1: + engine_args['isolation_level'] = level + break + else: + engine_args['isolation_level'] = conf.pop('isolation_level') # If the configuration dict specifies any additional engine args # or engine arg overrides make sure we merge them in. engine_args.update(conf.pop('engine_args', {})) @@ -395,11 +424,6 @@ class Connection(base.Connection): def _save_logbook(self, session, lb): try: lb_m = _logbook_get_model(lb.uuid, session=session) - # NOTE(harlowja): Merge them (note that this doesn't provide - # 100% correct update semantics due to how databases have - # MVCC). This is where a stored procedure or a better backing - # store would handle this better by allowing this merge logic - # to exist in the database itself. lb_m = _logbook_merge(lb_m, lb) except exc.NotFound: lb_m = _convert_lb_to_internal(lb)