diff --git a/setup.cfg b/setup.cfg
index 758024dff..c96636842 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -27,6 +27,13 @@ setup-hooks =
 packages =
+taskflow.persistence =
+    memory = taskflow.persistence.backends.impl_memory:MemoryBackend
+    mysql = taskflow.persistence.backends.impl_sqlalchemy:SQLAlchemyBackend
+    postgresql = taskflow.persistence.backends.impl_sqlalchemy:SQLAlchemyBackend
+    sqlite = taskflow.persistence.backends.impl_sqlalchemy:SQLAlchemyBackend
 cover-erase = true
 verbosity = 2
diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py
index 038373b1f..92ceb5a8c 100644
--- a/taskflow/engines/action_engine/engine.py
+++ b/taskflow/engines/action_engine/engine.py
@@ -27,6 +27,8 @@ from taskflow.engines.action_engine import task_action
 from taskflow.patterns import linear_flow as lf
 from taskflow.patterns import unordered_flow as uf
+from taskflow.persistence import utils as p_utils
 from taskflow import decorators
 from taskflow import exceptions as exc
 from taskflow import states
@@ -140,9 +142,13 @@ class SingleThreadedTranslator(Translator):
 class SingleThreadedActionEngine(ActionEngine):
     translator_cls = SingleThreadedTranslator
-    def __init__(self, flow, flow_detail=None):
+    def __init__(self, flow, flow_detail=None, book=None, backend=None):
+        if flow_detail is None:
+            flow_detail = p_utils.create_flow_detail(flow,
+                                                     book=book,
+                                                     backend=backend)
         ActionEngine.__init__(self, flow,
-                              storage=t_storage.Storage(flow_detail))
+                              storage=t_storage.Storage(flow_detail, backend))
 class MultiThreadedTranslator(Translator):
@@ -168,9 +174,15 @@ class MultiThreadedTranslator(Translator):
 class MultiThreadedActionEngine(ActionEngine):
     translator_cls = MultiThreadedTranslator
-    def __init__(self, flow, flow_detail=None, thread_pool=None):
+    def __init__(self, flow, flow_detail=None, book=None, backend=None,
+                 thread_pool=None):
+        if flow_detail is None:
+            flow_detail = p_utils.create_flow_detail(flow,
+                                                     book=book,
+                                                     backend=backend)
         ActionEngine.__init__(self, flow,
-                              storage=t_storage.ThreadSafeStorage(flow_detail))
+                              storage=t_storage.ThreadSafeStorage(flow_detail,
+                                                                  backend))
         if thread_pool:
             self._thread_pool = thread_pool
             self._owns_thread_pool = False
diff --git a/taskflow/persistence/__init__.py b/taskflow/persistence/__init__.py
index 3a554a5f7..fbe1837c3 100644
--- a/taskflow/persistence/__init__.py
+++ b/taskflow/persistence/__init__.py
@@ -2,7 +2,7 @@
 # vim: tabstop=4 shiftwidth=4 softtabstop=4
-#    Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
+#    Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved.
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 #    not use this file except in compliance with the License. You may obtain
diff --git a/taskflow/persistence/backends/__init__.py b/taskflow/persistence/backends/__init__.py
index 3a554a5f7..18ba20988 100644
--- a/taskflow/persistence/backends/__init__.py
+++ b/taskflow/persistence/backends/__init__.py
@@ -2,7 +2,7 @@
 # vim: tabstop=4 shiftwidth=4 softtabstop=4
-#    Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
+#    Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved.
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 #    not use this file except in compliance with the License. You may obtain
@@ -15,3 +15,27 @@
 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 #    License for the specific language governing permissions and limitations
 #    under the License.
+import logging
+import urlparse
+from stevedore import driver
+from taskflow import exceptions as exc
+# NOTE(harlowja): this is the entrypoint namespace, not the module namespace.
+BACKEND_NAMESPACE = 'taskflow.persistence'
+LOG = logging.getLogger(__name__)
+def fetch(conf, namespace=BACKEND_NAMESPACE):
+    backend_name = urlparse.urlparse(conf['connection']).scheme
+    LOG.debug('Looking for %r backend driver in %r', backend_name, namespace)
+    try:
+        mgr = driver.DriverManager(namespace, backend_name,
+                                   invoke_on_load=True,
+                                   invoke_kwds={'conf': conf})
+        return mgr.driver
+    except RuntimeError as e:
+        raise exc.NotFound("Could not find backend %s: %s" % (backend_name, e))
diff --git a/taskflow/persistence/backends/api.py b/taskflow/persistence/backends/api.py
deleted file mode 100644
index 8845e0227..000000000
--- a/taskflow/persistence/backends/api.py
+++ /dev/null
@@ -1,62 +0,0 @@
-# -*- coding: utf-8 -*-
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-#    Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved.
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#         http://www.apache.org/licenses/LICENSE-2.0
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-import threading
-from taskflow import exceptions as exc
-from taskflow.openstack.common import importutils
-    'memory': 'taskflow.persistence.backends.memory.api',
-    'sqlalchemy': 'taskflow.persistence.backends.sqlalchemy.api',
-    # TODO(harlowja): we likely need to allow more customization here so that
-    # its easier for a user of this library to alter the impl to there own
-    # choicing, aka, cinder has its own DB, or heat may want to write this
-    # information into swift, we need a way to accomodate that.
-_BACKEND_MAPPING_LOCK = threading.RLock()
-_BACKEND_LOCK = threading.RLock()
-def register(backend, module):
-    """Register a new (or override an old) backend type.
-    Instead of being restricted to the existing types that are pre-registered
-    in taskflow it is useful to allow others to either override those types
-    or add new ones (since all backend types can not be predicted ahead of
-    time).
-    """
-        _BACKEND_MAPPING[backend] = str(module)
-def fetch(backend):
-    """Fetch a backend impl. for a given backend type."""
-        if backend not in _BACKEND_MAPPING:
-            raise exc.NotFound("Unknown backend %s requested" % (backend))
-        mod = _BACKEND_MAPPING.get(backend, backend)
-    with _BACKEND_LOCK:
-        if mod in _BACKENDS:
-            return _BACKENDS[mod]
-        backend_mod = importutils.import_module(mod)
-        backend_impl = backend_mod.get_backend()
-        _BACKENDS[mod] = backend_impl
-        return backend_impl
diff --git a/taskflow/persistence/backends/base.py b/taskflow/persistence/backends/base.py
new file mode 100644
index 000000000..070d73719
--- /dev/null
+++ b/taskflow/persistence/backends/base.py
@@ -0,0 +1,104 @@
+# -*- coding: utf-8 -*-
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#    Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved.
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#         http://www.apache.org/licenses/LICENSE-2.0
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+import abc
+class Backend(object):
+    """Base class for persistence backends."""
+    __metaclass__ = abc.ABCMeta
+    def __init__(self, conf):
+        self._conf = conf
+    @abc.abstractmethod
+    def get_connection(self):
+        """Return a Connection instance based on the configuration settings."""
+        pass
+    @abc.abstractmethod
+    def close(self):
+        """Closes any resources this backend has open."""
+        pass
+class Connection(object):
+    """Base class for backend connections."""
+    __metaclass__ = abc.ABCMeta
+    @abc.abstractproperty
+    def backend(self):
+        """Returns the backend this connection is associated with."""
+        pass
+    @abc.abstractmethod
+    def close(self):
+        """Closes any resources this connection has open."""
+        pass
+    @abc.abstractmethod
+    def upgrade(self):
+        """Migrate the persistence backend to the most recent version."""
+        pass
+    @abc.abstractmethod
+    def clear_all(self):
+        """Clear all entries from this backend."""
+        pass
+    @abc.abstractmethod
+    def update_task_details(self, task_detail):
+        """Updates a given task details and returns the updated version.
+        NOTE(harlowja): the details that is to be updated must already have
+        been created by saving a flow details with the given task detail inside
+        of it.
+        """
+        pass
+    @abc.abstractmethod
+    def update_flow_details(self, flow_detail):
+        """Updates a given flow details and returns the updated version.
+        NOTE(harlowja): the details that is to be updated must already have
+        been created by saving a logbook with the given flow detail inside
+        of it.
+        """
+        pass
+    @abc.abstractmethod
+    def save_logbook(self, book):
+        """Saves a logbook, and all its contained information."""
+        pass
+    @abc.abstractmethod
+    def destroy_logbook(self, book_uuid):
+        """Deletes/destroys a logbook matching the given uuid."""
+        pass
+    @abc.abstractmethod
+    def get_logbook(self, book_uuid):
+        """Fetches a logbook object matching the given uuid."""
+        pass
+    @abc.abstractmethod
+    def get_logbooks(self):
+        """Return an iterable of logbook objects."""
+        pass
diff --git a/taskflow/persistence/backends/impl_memory.py b/taskflow/persistence/backends/impl_memory.py
new file mode 100644
index 000000000..9ba041add
--- /dev/null
+++ b/taskflow/persistence/backends/impl_memory.py
@@ -0,0 +1,204 @@
+# -*- coding: utf-8 -*-
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#    Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
+#    Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#         http://www.apache.org/licenses/LICENSE-2.0
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+"""Implementation of in-memory backend."""
+import copy
+import logging
+import threading
+import weakref
+from taskflow import decorators
+from taskflow import exceptions as exc
+from taskflow.openstack.common import timeutils
+from taskflow.persistence.backends import base
+LOG = logging.getLogger(__name__)
+# TODO(harlowja): we likely need to figure out a better place to put these
+# rather than globals.
+_LOG_BOOKS = {}
+# For now this will be a pretty big lock, since it is not expected that saves
+# will be that frequent this seems ok for the time being. I imagine that this
+# can be done better but it will require much more careful usage of a dict as
+# a key/value map. Aka I wish python had a concurrent dict that was safe and
+# known good to use.
+_SAVE_LOCK = threading.RLock()
+_READ_LOCK = threading.RLock()
+def _copy(obj):
+    return copy.deepcopy(obj)
+class MemoryBackend(base.Backend):
+    def get_connection(self):
+        return Connection(self)
+    def close(self):
+        pass
+class Connection(base.Connection):
+    def __init__(self, backend):
+        self._read_lock = _READ_LOCK
+        self._save_locks = _READ_SAVE_ORDER
+        self._backend = weakref.proxy(backend)
+    def upgrade(self):
+        pass
+    @property
+    def backend(self):
+        return self._backend
+    def close(self):
+        pass
+    @decorators.locked(lock="_save_locks")
+    def clear_all(self):
+        count = 0
+        for uuid in list(_LOG_BOOKS.iterkeys()):
+            self.destroy_logbook(uuid)
+            count += 1
+        return count
+    @decorators.locked(lock="_save_locks")
+    def destroy_logbook(self, book_uuid):
+        try:
+            # Do the same cascading delete that the sql layer does.
+            lb = _LOG_BOOKS.pop(book_uuid)
+            for fd in lb:
+                _FLOW_DETAILS.pop(fd.uuid, None)
+                for td in fd:
+                    _TASK_DETAILS.pop(td.uuid, None)
+        except KeyError:
+            raise exc.NotFound("No logbook found with id: %s" % book_uuid)
+    @decorators.locked(lock="_save_locks")
+    def update_task_details(self, task_detail):
+        try:
+            return _task_details_merge(_TASK_DETAILS[task_detail.uuid],
+                                       task_detail)
+        except KeyError:
+            raise exc.NotFound("No task details found with id: %s"
+                               % task_detail.uuid)
+    @decorators.locked(lock="_save_locks")
+    def update_flow_details(self, flow_detail):
+        try:
+            e_fd = _flow_details_merge(_FLOW_DETAILS[flow_detail.uuid],
+                                       flow_detail)
+            for task_detail in flow_detail:
+                if e_fd.find(task_detail.uuid) is None:
+                    _TASK_DETAILS[task_detail.uuid] = _copy(task_detail)
+                    e_fd.add(task_detail)
+                if task_detail.uuid not in _TASK_DETAILS:
+                    _TASK_DETAILS[task_detail.uuid] = _copy(task_detail)
+                task_detail.update(self.update_task_details(task_detail))
+            return e_fd
+        except KeyError:
+            raise exc.NotFound("No flow details found with id: %s"
+                               % flow_detail.uuid)
+    @decorators.locked(lock="_save_locks")
+    def save_logbook(self, book):
+        # Get a existing logbook model (or create it if it isn't there).
+        try:
+            e_lb = _logbook_merge(_LOG_BOOKS[book.uuid], book)
+            # Add anything in to the new logbook that isn't already
+            # in the existing logbook.
+            for flow_detail in book:
+                if e_lb.find(flow_detail.uuid) is None:
+                    _FLOW_DETAILS[flow_detail.uuid] = _copy(flow_detail)
+                    e_lb.add(flow_detail)
+                if flow_detail.uuid not in _FLOW_DETAILS:
+                    _FLOW_DETAILS[flow_detail.uuid] = _copy(flow_detail)
+                flow_detail.update(self.update_flow_details(flow_detail))
+            # TODO(harlowja): figure out a better way to set this property
+            # without actually setting a 'private' property.
+            e_lb._updated_at = timeutils.utcnow()
+        except KeyError:
+            # Ok the one given is now the one we will save
+            e_lb = _copy(book)
+            # TODO(harlowja): figure out a better way to set this property
+            # without actually setting a 'private' property.
+            e_lb._created_at = timeutils.utcnow()
+            # Record all the pieces as being saved.
+            _LOG_BOOKS[e_lb.uuid] = e_lb
+            for flow_detail in e_lb:
+                _FLOW_DETAILS[flow_detail.uuid] = _copy(flow_detail)
+                flow_detail.update(self.update_flow_details(flow_detail))
+        return e_lb
+    @decorators.locked(lock='_read_lock')
+    def get_logbook(self, book_uuid):
+        try:
+            return _LOG_BOOKS[book_uuid]
+        except KeyError:
+            raise exc.NotFound("No logbook found with id: %s" % book_uuid)
+    def get_logbooks(self):
+        # NOTE(harlowja): don't hold the lock while iterating
+        with self._read_lock:
+            books = list(_LOG_BOOKS.values())
+        for lb in books:
+            yield lb
+# Merging + other helper functions.
+def _task_details_merge(td_e, td_new):
+    if td_e is td_new:
+        return td_e
+    if td_e.state != td_new.state:
+        td_e.state = td_new.state
+    if td_e.results != td_new.results:
+        td_e.results = td_new.results
+    if td_e.exception != td_new.exception:
+        td_e.exception = td_new.exception
+    if td_e.stacktrace != td_new.stacktrace:
+        td_e.stacktrace = td_new.stacktrace
+    if td_e.meta != td_new.meta:
+        td_e.meta = td_new.meta
+    return td_e
+def _flow_details_merge(fd_e, fd_new):
+    if fd_e is fd_new:
+        return fd_e
+    if fd_e.meta != fd_new.meta:
+        fd_e.meta = fd_new.meta
+    if fd_e.state != fd_new.state:
+        fd_e.state = fd_new.state
+    return fd_e
+def _logbook_merge(lb_e, lb_new):
+    if lb_e is lb_new:
+        return lb_e
+    if lb_e.meta != lb_new.meta:
+        lb_e.meta = lb_new.meta
+    return lb_e
diff --git a/taskflow/persistence/backends/impl_sqlalchemy.py b/taskflow/persistence/backends/impl_sqlalchemy.py
new file mode 100644
index 000000000..1456cbf57
--- /dev/null
+++ b/taskflow/persistence/backends/impl_sqlalchemy.py
@@ -0,0 +1,519 @@
+# -*- coding: utf-8 -*-
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#    Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
+#    Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#         http://www.apache.org/licenses/LICENSE-2.0
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+"""Implementation of a SQLAlchemy storage backend."""
+from __future__ import absolute_import
+import contextlib
+import copy
+import logging
+import time
+import weakref
+import sqlalchemy as sa
+from sqlalchemy import exceptions as sa_exc
+from sqlalchemy import orm as sa_orm
+from sqlalchemy import pool as sa_pool
+from taskflow import exceptions as exc
+from taskflow.persistence.backends import base
+from taskflow.persistence.backends.sqlalchemy import migration
+from taskflow.persistence.backends.sqlalchemy import models
+from taskflow.persistence import logbook
+from taskflow.utils import misc
+LOG = logging.getLogger(__name__)
+# NOTE(harlowja): This is all very similar to what oslo-incubator uses but is
+# not based on using oslo.cfg and its global configuration (which should not be
+# used in libraries such as taskflow).
+# TODO(harlowja): once oslo.db appears we should be able to use that instead
+# since it's not supposed to have any usage of oslo.cfg in it when it
+# materializes as a library.
+# See: http://dev.mysql.com/doc/refman/5.0/en/error-messages-client.html
+    # Lost connection to MySQL server at '%s', system error: %d
+    '2006',
+    # Can't connect to MySQL server on '%s' (%d)
+    '2003',
+    # Can't connect to local MySQL server through socket '%s' (%d)
+    '2002',
+    # Lost connection to MySQL server at '%s', system error: %d
+    '2006',
+    # Lost connection to MySQL server during query
+    '2013',
+    # Commands out of sync; you can't run this command now
+    '2014',
+    # Can't open shared memory; no answer from server (%lu)
+    '2045',
+    # Lost connection to MySQL server at '%s', system error: %d
+    '2055',
+# See: http://www.postgresql.org/docs/9.1/static/errcodes-appendix.html
+    # connection_exception
+    '08000',
+    # connection_does_not_exist
+    '08003',
+    # connection_failure
+    '08006',
+    # sqlclient_unable_to_establish_sqlconnection
+    '08001',
+    # sqlserver_rejected_establishment_of_sqlconnection
+    '08004',
+    # Just couldn't connect (postgres errors are pretty weird)
+    'could not connect to server',
+    # Server terminated while in progress (postgres errors are pretty weird)
+    'server closed the connection unexpectedly',
+    'terminating connection due to administrator command',
+# These connection urls mean sqlite is being used as an in-memory DB
+SQLITE_IN_MEMORY = ("sqlite://", 'sqlite:///:memory:')
+def _in_any(reason, err_haystack):
+    """Checks if any elements of the haystack are in the given reason"""
+    for err in err_haystack:
+        if reason.find(str(err)) != -1:
+            return True
+    return False
+def _is_db_connection_error(reason):
+    return _in_any(reason, list(MY_SQL_CONN_ERRORS + POSTGRES_CONN_ERRORS))
+def _thread_yield(dbapi_con, con_record):
+    """Ensure other greenthreads get a chance to be executed.
+    If we use eventlet.monkey_patch(), eventlet.greenthread.sleep(0) will
+    execute instead of time.sleep(0).
+    Force a context switch. With common database backends (eg MySQLdb and
+    sqlite), there is no implicit yield caused by network I/O since they are
+    implemented by C libraries that eventlet cannot monkey patch.
+    """
+    time.sleep(0)
+def _ping_listener(dbapi_conn, connection_rec, connection_proxy):
+    """Ensures that MySQL connections checked out of the pool are alive.
+    Modified + borrowed from: http://bit.ly/14BYaW6
+    """
+    try:
+        dbapi_conn.cursor().execute('select 1')
+    except dbapi_conn.OperationalError as ex:
+        if _in_any(str(ex.args[0]), MY_SQL_GONE_WAY_AWAY_ERRORS):
+            LOG.warn('Got mysql server has gone away: %s', ex)
+            raise sa_exc.DisconnectionError("Database server went away")
+        elif _in_any(str(ex.args[0]), POSTGRES_GONE_WAY_AWAY_ERRORS):
+            LOG.warn('Got postgres server has gone away: %s', ex)
+            raise sa_exc.DisconnectionError("Database server went away")
+        else:
+            raise
+class SQLAlchemyBackend(base.Backend):
+    def __init__(self, conf):
+        super(SQLAlchemyBackend, self).__init__(conf)
+        self._engine = None
+        self._session_maker = None
+    def _test_connected(self, engine, max_retries=0):
+        def test_connect(failures):
+            try:
+                # See if we can make a connection happen.
+                #
+                # NOTE(harlowja): note that even though we are connecting
+                # once it does not mean that we will be able to connect in
+                # the future, so this is more of a sanity test and is not
+                # complete connection insurance.
+                with contextlib.closing(engine.connect()):
+                    pass
+            except sa_exc.OperationalError as ex:
+                if _is_db_connection_error(str(ex.args[0])):
+                    failures.append(misc.Failure())
+                    return False
+            return True
+        failures = []
+        if test_connect(failures):
+            return engine
+        # Sorry it didn't work out...
+        if max_retries <= 0:
+            failures[-1].reraise()
+        # Go through the exponential backoff loop and see if we can connect
+        # after a given number of backoffs (with a backoff sleeping period
+        # between each attempt)...
+        attempts_left = max_retries
+        for sleepy_secs in misc.ExponentialBackoff(attempts=max_retries):
+            LOG.warn("SQL connection failed due to '%s', %s attempts left.",
+                     failures[-1].exc, attempts_left)
+            LOG.info("Attempting to test the connection again in %s seconds.",
+                     sleepy_secs)
+            time.sleep(sleepy_secs)
+            if test_connect(failures):
+                return engine
+            attempts_left -= 1
+        # Sorry it didn't work out...
+        failures[-1].reraise()
+    def _create_engine(self):
+        # NOTE(harlowja): copy the internal one so that we don't modify it via
+        # all the popping that will happen below.
+        conf = copy.deepcopy(self._conf)
+        engine_args = {
+            'echo': misc.as_bool(conf.pop('echo', False)),
+            'convert_unicode': misc.as_bool(conf.pop('convert_unicode', True)),
+            'pool_recycle': 3600,
+        }
+        try:
+            idle_timeout = misc.as_int(conf.pop('idle_timeout', None))
+            engine_args['pool_recycle'] = idle_timeout
+        except TypeError:
+            pass
+        sql_connection = conf.pop('connection')
+        e_url = sa.engine.url.make_url(sql_connection)
+        if 'sqlite' in e_url.drivername:
+            engine_args["poolclass"] = sa_pool.NullPool
+            # Adjustments for in-memory sqlite usage
+            if sql_connection.lower().strip() in SQLITE_IN_MEMORY:
+                engine_args["poolclass"] = sa_pool.StaticPool
+                engine_args["connect_args"] = {'check_same_thread': False}
+        else:
+            for (k, lookup_key) in [('pool_size', 'max_pool_size'),
+                                    ('max_overflow', 'max_overflow'),
+                                    ('pool_timeout', 'pool_timeout')]:
+                try:
+                    engine_args[k] = misc.as_int(conf.pop(lookup_key, None))
+                except TypeError:
+                    pass
+        # If the configuration dict specifies any additional engine args
+        # or engine arg overrides make sure we merge them in.
+        engine_args.update(conf.pop('engine_args', {}))
+        engine = sa.create_engine(sql_connection, **engine_args)
+        if misc.as_bool(conf.pop('checkin_yield', True)):
+            sa.event.listen(engine, 'checkin', _thread_yield)
+        if 'mysql' in e_url.drivername:
+            if misc.as_bool(conf.pop('checkout_ping', True)):
+                sa.event.listen(engine, 'checkout', _ping_listener)
+        try:
+            max_retries = misc.as_int(conf.pop('max_retries', None))
+        except TypeError:
+            max_retries = 0
+        return self._test_connected(engine, max_retries=max_retries)
+    @property
+    def engine(self):
+        if self._engine is None:
+            self._engine = self._create_engine()
+        return self._engine
+    def _get_session_maker(self):
+        if self._session_maker is None:
+            self._session_maker = sa_orm.sessionmaker(bind=self.engine,
+                                                      autocommit=True)
+        return self._session_maker
+    def get_connection(self):
+        return Connection(self, self._get_session_maker())
+    def close(self):
+        if self._session_maker is not None:
+            self._session_maker.close_all()
+        if self._engine is not None:
+            self._engine.dispose()
+        self._engine = None
+        self._session_maker = None
+class Connection(base.Connection):
+    def __init__(self, backend, session_maker):
+        self._backend = weakref.proxy(backend)
+        self._session_maker = session_maker
+        self._engine = backend.engine
+    @property
+    def backend(self):
+        return self._backend
+    def _run_in_session(self, functor, *args, **kwargs):
+        """Runs a function in a session and makes sure that sqlalchemy
+        exceptions aren't emitted from that sessions actions (as that would
+        expose the underlying backends exception model).
+        """
+        try:
+            session = self._make_session()
+            with session.begin():
+                return functor(session, *args, **kwargs)
+        except sa_exc.SQLAlchemyError as e:
+            raise exc.StorageError("Failed running database session: %s" % e,
+                                   e)
+    def _make_session(self):
+        try:
+            return self._session_maker()
+        except sa_exc.SQLAlchemyError as e:
+            raise exc.StorageError("Failed creating database session: %s"
+                                   % e, e)
+    def upgrade(self):
+        try:
+            with contextlib.closing(self._engine.connect()) as conn:
+                migration.db_sync(conn)
+        except sa_exc.SQLAlchemyError as e:
+            raise exc.StorageError("Failed upgrading database version: %s" % e,
+                                   e)
+    def _clear_all(self, session):
+        # NOTE(harlowja): due to how we have our relationship setup and
+        # cascading deletes are enabled, this will cause all associated
+        # task details and flow details to automatically be purged.
+        try:
+            return session.query(models.LogBook).delete()
+        except sa_exc.DBAPIError as e:
+            raise exc.StorageError("Failed clearing all entries: %s" % e, e)
+    def clear_all(self):
+        return self._run_in_session(self._clear_all)
+    def _update_task_details(self, session, td):
+        # Must already exist since a tasks details has a strong connection to
+        # a flow details, and tasks details can not be saved on there own since
+        # they *must* have a connection to an existing flow details.
+        td_m = _task_details_get_model(td.uuid, session=session)
+        td_m = _taskdetails_merge(td_m, td)
+        td_m = session.merge(td_m)
+        return _convert_td_to_external(td_m)
+    def update_task_details(self, task_detail):
+        return self._run_in_session(self._update_task_details, td=task_detail)
+    def _update_flow_details(self, session, fd):
+        # Must already exist since a flow details has a strong connection to
+        # a logbook, and flow details can not be saved on there own since they
+        # *must* have a connection to an existing logbook.
+        fd_m = _flow_details_get_model(fd.uuid, session=session)
+        fd_m = _flowdetails_merge(fd_m, fd)
+        fd_m = session.merge(fd_m)
+        return _convert_fd_to_external(fd_m)
+    def update_flow_details(self, flow_detail):
+        return self._run_in_session(self._update_flow_details, fd=flow_detail)
+    def _destroy_logbook(self, session, lb_id):
+        try:
+            lb = _logbook_get_model(lb_id, session=session)
+            session.delete(lb)
+        except sa_exc.DBAPIError as e:
+            raise exc.StorageError("Failed destroying"
+                                   " logbook %s: %s" % (lb_id, e), e)
+    def destroy_logbook(self, book_uuid):
+        return self._run_in_session(self._destroy_logbook, lb_id=book_uuid)
+    def _save_logbook(self, session, lb):
+        try:
+            lb_m = _logbook_get_model(lb.uuid, session=session)
+            # NOTE(harlowja): Merge them (note that this doesn't provide
+            # 100% correct update semantics due to how databases have
+            # MVCC). This is where a stored procedure or a better backing
+            # store would handle this better by allowing this merge logic
+            # to exist in the database itself.
+            lb_m = _logbook_merge(lb_m, lb)
+        except exc.NotFound:
+            lb_m = _convert_lb_to_internal(lb)
+        try:
+            lb_m = session.merge(lb_m)
+            return _convert_lb_to_external(lb_m)
+        except sa_exc.DBAPIError as e:
+            raise exc.StorageError("Failed saving logbook %s: %s" %
+                                   (lb.uuid, e), e)
+    def save_logbook(self, book):
+        return self._run_in_session(self._save_logbook, lb=book)
+    def get_logbook(self, book_uuid):
+        session = self._make_session()
+        try:
+            lb = _logbook_get_model(book_uuid, session=session)
+            return _convert_lb_to_external(lb)
+        except sa_exc.DBAPIError as e:
+            raise exc.StorageError("Failed getting logbook %s: %s"
+                                   % (book_uuid, e), e)
+    def get_logbooks(self):
+        session = self._make_session()
+        try:
+            raw_books = session.query(models.LogBook).all()
+            books = [_convert_lb_to_external(lb) for lb in raw_books]
+        except sa_exc.DBAPIError as e:
+            raise exc.StorageError("Failed getting logbooks: %s" % e, e)
+        for lb in books:
+            yield lb
+    def close(self):
+        pass
+# Internal <-> external model + merging + other helper functions.
+def _convert_fd_to_external(fd):
+    fd_c = logbook.FlowDetail(fd.name, uuid=fd.uuid)
+    fd_c.meta = fd.meta
+    fd_c.state = fd.state
+    for td in fd.taskdetails:
+        fd_c.add(_convert_td_to_external(td))
+    return fd_c
+def _convert_fd_to_internal(fd, parent_uuid):
+    fd_m = models.FlowDetail(name=fd.name, uuid=fd.uuid,
+                             parent_uuid=parent_uuid, meta=fd.meta,
+                             state=fd.state)
+    fd_m.taskdetails = []
+    for td in fd:
+        fd_m.taskdetails.append(_convert_td_to_internal(td, fd_m.uuid))
+    return fd_m
+def _convert_td_to_internal(td, parent_uuid):
+    return models.TaskDetail(name=td.name, uuid=td.uuid,
+                             state=td.state, results=td.results,
+                             exception=td.exception, meta=td.meta,
+                             stacktrace=td.stacktrace,
+                             version=td.version, parent_uuid=parent_uuid)
+def _convert_td_to_external(td):
+    # Convert from sqlalchemy model -> external model, this allows us
+    # to change the internal sqlalchemy model easily by forcing a defined
+    # interface (that isn't the sqlalchemy model itself).
+    td_c = logbook.TaskDetail(td.name, uuid=td.uuid)
+    td_c.state = td.state
+    td_c.results = td.results
+    td_c.exception = td.exception
+    td_c.stacktrace = td.stacktrace
+    td_c.meta = td.meta
+    td_c.version = td.version
+    return td_c
+def _convert_lb_to_external(lb_m):
+    """Don't expose the internal sqlalchemy ORM model to the external api."""
+    lb_c = logbook.LogBook(lb_m.name, lb_m.uuid,
+                           updated_at=lb_m.updated_at,
+                           created_at=lb_m.created_at)
+    lb_c.meta = lb_m.meta
+    for fd_m in lb_m.flowdetails:
+        lb_c.add(_convert_fd_to_external(fd_m))
+    return lb_c
+def _convert_lb_to_internal(lb_c):
+    """Don't expose the external model to the sqlalchemy ORM model."""
+    lb_m = models.LogBook(uuid=lb_c.uuid, meta=lb_c.meta, name=lb_c.name)
+    lb_m.flowdetails = []
+    for fd_c in lb_c:
+        lb_m.flowdetails.append(_convert_fd_to_internal(fd_c, lb_c.uuid))
+    return lb_m
+def _logbook_get_model(lb_id, session):
+    entry = session.query(models.LogBook).filter_by(uuid=lb_id).first()
+    if entry is None:
+        raise exc.NotFound("No logbook found with id: %s" % lb_id)
+    return entry
+def _flow_details_get_model(f_id, session):
+    entry = session.query(models.FlowDetail).filter_by(uuid=f_id).first()
+    if entry is None:
+        raise exc.NotFound("No flow details found with id: %s" % f_id)
+    return entry
+def _task_details_get_model(t_id, session):
+    entry = session.query(models.TaskDetail).filter_by(uuid=t_id).first()
+    if entry is None:
+        raise exc.NotFound("No task details found with id: %s" % t_id)
+    return entry
+def _logbook_merge(lb_m, lb):
+    if lb_m.meta != lb.meta:
+        lb_m.meta = lb.meta
+    for fd in lb:
+        existing_fd = False
+        for fd_m in lb_m.flowdetails:
+            if fd_m.uuid == fd.uuid:
+                existing_fd = True
+                fd_m = _flowdetails_merge(fd_m, fd)
+        if not existing_fd:
+            lb_m.flowdetails.append(_convert_fd_to_internal(fd, lb_m.uuid))
+    return lb_m
+def _flowdetails_merge(fd_m, fd):
+    if fd_m.meta != fd.meta:
+        fd_m.meta = fd.meta
+    if fd_m.state != fd.state:
+        fd_m.state = fd.state
+    for td in fd:
+        existing_td = False
+        for td_m in fd_m.taskdetails:
+            if td_m.uuid == td.uuid:
+                existing_td = True
+                td_m = _taskdetails_merge(td_m, td)
+                break
+        if not existing_td:
+            td_m = _convert_td_to_internal(td, fd_m.uuid)
+            fd_m.taskdetails.append(td_m)
+    return fd_m
+def _taskdetails_merge(td_m, td):
+    if td_m.state != td.state:
+        td_m.state = td.state
+    if td_m.results != td.results:
+        td_m.results = td.results
+    if td_m.exception != td.exception:
+        td_m.exception = td.exception
+    if td_m.stacktrace != td.stacktrace:
+        td_m.stacktrace = td.stacktrace
+    if td_m.meta != td.meta:
+        td_m.meta = td.meta
+    return td_m
diff --git a/taskflow/persistence/backends/memory/__init__.py b/taskflow/persistence/backends/memory/__init__.py
deleted file mode 100644
index 275e7e13c..000000000
--- a/taskflow/persistence/backends/memory/__init__.py
+++ /dev/null
@@ -1,18 +0,0 @@
-# -*- coding: utf-8 -*-
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-#    Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
-#    Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#         http://www.apache.org/licenses/LICENSE-2.0
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
diff --git a/taskflow/persistence/backends/memory/api.py b/taskflow/persistence/backends/memory/api.py
deleted file mode 100644
index 06375d909..000000000
--- a/taskflow/persistence/backends/memory/api.py
+++ /dev/null
@@ -1,166 +0,0 @@
-# -*- coding: utf-8 -*-
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-#    Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
-#    Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#         http://www.apache.org/licenses/LICENSE-2.0
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-"""Implementation of in-memory backend."""
-import copy
-import logging
-import sys
-import threading
-from taskflow import exceptions as exc
-from taskflow.openstack.common import timeutils
-from taskflow.utils import threading_utils
-LOG = logging.getLogger(__name__)
-# TODO(harlowja): we likely need to figure out a better place to put these
-# rather than globals.
-# For now this will be a pretty big lock, since it is not expected that saves
-# will be that frequent this seems ok for the time being. I imagine that this
-# can be done better but it will require much more careful usage of a dict as
-# a key/value map. Aka I wish python had a concurrent dict that was safe and
-# known good to use.
-SAVE_LOCK = threading.RLock()
-READ_LOCK = threading.RLock()
-def get_backend():
-    """The backend is this module itself."""
-    return sys.modules[__name__]
-def _taskdetails_merge(td_e, td_new):
-    """Merges an existing taskdetails with a new one."""
-    if td_e.state != td_new.state:
-        td_e.state = td_new.state
-    if td_e.results != td_new.results:
-        td_e.results = td_new.results
-    if td_e.exception != td_new.exception:
-        td_e.exception = td_new.exception
-    if td_e.stacktrace != td_new.stacktrace:
-        td_e.stacktrace = td_new.stacktrace
-    if td_e.meta != td_new.meta:
-        td_e.meta = td_new.meta
-    return td_e
-def taskdetails_save(td):
-    with threading_utils.MultiLock(READ_SAVE_ORDER):
-        try:
-            return _taskdetails_merge(TASK_DETAILS[td.uuid], td)
-        except KeyError:
-            raise exc.NotFound("No task details found with id: %s" % td.uuid)
-def flowdetails_save(fd):
-    try:
-        with threading_utils.MultiLock(READ_SAVE_ORDER):
-            e_fd = FLOW_DETAILS[fd.uuid]
-            if e_fd.meta != fd.meta:
-                e_fd.meta = fd.meta
-            if e_fd.state != fd.state:
-                e_fd.state = fd.state
-            for td in fd:
-                if td not in e_fd:
-                    td = copy.deepcopy(td)
-                    TASK_DETAILS[td.uuid] = td
-                    e_fd.add(td)
-                else:
-                    # Previously added but not saved into the taskdetails
-                    # 'permanent' storage.
-                    if td.uuid not in TASK_DETAILS:
-                        TASK_DETAILS[td.uuid] = copy.deepcopy(td)
-                    taskdetails_save(td)
-            return e_fd
-    except KeyError:
-        raise exc.NotFound("No flow details found with id: %s" % fd.uuid)
-def clear_all():
-    with threading_utils.MultiLock(READ_SAVE_ORDER):
-        count = 0
-        for lb_id in list(LOG_BOOKS.iterkeys()):
-            logbook_destroy(lb_id)
-            count += 1
-        return count
-def logbook_get(lb_id):
-    try:
-        with READ_LOCK:
-            return LOG_BOOKS[lb_id]
-    except KeyError:
-        raise exc.NotFound("No logbook found with id: %s" % lb_id)
-def logbook_destroy(lb_id):
-    try:
-        with threading_utils.MultiLock(READ_SAVE_ORDER):
-            # Do the same cascading delete that the sql layer does.
-            lb = LOG_BOOKS.pop(lb_id)
-            for fd in lb:
-                FLOW_DETAILS.pop(fd.uuid, None)
-                for td in fd:
-                    TASK_DETAILS.pop(td.uuid, None)
-    except KeyError:
-        raise exc.NotFound("No logbook found with id: %s" % lb_id)
-def logbook_save(lb):
-    # Acquire all the locks that will be needed to perform this operation with
-    # out being affected by other threads doing it at the same time.
-    with threading_utils.MultiLock(READ_SAVE_ORDER):
-        # Get a existing logbook model (or create it if it isn't there).
-        try:
-            backing_lb = LOG_BOOKS[lb.uuid]
-            if backing_lb.meta != lb.meta:
-                backing_lb.meta = lb.meta
-            # Add anything on to the existing loaded logbook that isn't already
-            # in the existing logbook.
-            for fd in lb:
-                if fd not in backing_lb:
-                    FLOW_DETAILS[fd.uuid] = copy.deepcopy(fd)
-                    backing_lb.add(flowdetails_save(fd))
-                else:
-                    # Previously added but not saved into the flowdetails
-                    # 'permanent' storage.
-                    if fd.uuid not in FLOW_DETAILS:
-                        FLOW_DETAILS[fd.uuid] = copy.deepcopy(fd)
-                    flowdetails_save(fd)
-            # TODO(harlowja): figure out a better way to set this property
-            # without actually letting others set it external.
-            backing_lb._updated_at = timeutils.utcnow()
-        except KeyError:
-            backing_lb = copy.deepcopy(lb)
-            # TODO(harlowja): figure out a better way to set this property
-            # without actually letting others set it external.
-            backing_lb._created_at = timeutils.utcnow()
-            # Record all the pieces as being saved.
-            LOG_BOOKS[lb.uuid] = backing_lb
-            for fd in backing_lb:
-                FLOW_DETAILS[fd.uuid] = fd
-                for td in fd:
-                    TASK_DETAILS[td.uuid] = td
-        return backing_lb
diff --git a/taskflow/persistence/backends/sqlalchemy/api.py b/taskflow/persistence/backends/sqlalchemy/api.py
deleted file mode 100644
index 1dc043765..000000000
--- a/taskflow/persistence/backends/sqlalchemy/api.py
+++ /dev/null
@@ -1,246 +0,0 @@
-# -*- coding: utf-8 -*-
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-#    Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
-#    Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#         http://www.apache.org/licenses/LICENSE-2.0
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-"""Implementation of a SQLAlchemy storage backend."""
-import logging
-import sys
-from sqlalchemy import exceptions as sql_exc
-from taskflow import exceptions as exc
-from taskflow.openstack.common.db.sqlalchemy import session as db_session
-from taskflow.persistence.backends.sqlalchemy import models
-from taskflow.persistence import flowdetail
-from taskflow.persistence import logbook
-from taskflow.persistence import taskdetail
-LOG = logging.getLogger(__name__)
-def get_backend():
-    """The backend is this module itself."""
-    return sys.modules[__name__]
-def _convert_fd_to_external(fd):
-    fd_c = flowdetail.FlowDetail(fd.name, uuid=fd.uuid, backend='sqlalchemy')
-    fd_c.meta = fd.meta
-    fd_c.state = fd.state
-    for td in fd.taskdetails:
-        fd_c.add(_convert_td_to_external(td))
-    return fd_c
-def _convert_fd_to_internal(fd, lb_uuid):
-    fd_m = models.FlowDetail(name=fd.name, uuid=fd.uuid, parent_uuid=lb_uuid,
-                             meta=fd.meta, state=fd.state)
-    fd_m.taskdetails = []
-    for td in fd:
-        fd_m.taskdetails.append(_convert_td_to_internal(td, fd_m.uuid))
-    return fd_m
-def _convert_td_to_internal(td, parent_uuid):
-    return models.TaskDetail(name=td.name, uuid=td.uuid,
-                             state=td.state, results=td.results,
-                             exception=td.exception, meta=td.meta,
-                             stacktrace=td.stacktrace,
-                             version=td.version, parent_uuid=parent_uuid)
-def _convert_td_to_external(td):
-    # Convert from sqlalchemy model -> external model, this allows us
-    # to change the internal sqlalchemy model easily by forcing a defined
-    # interface (that isn't the sqlalchemy model itself).
-    td_c = taskdetail.TaskDetail(td.name, uuid=td.uuid, backend='sqlalchemy')
-    td_c.state = td.state
-    td_c.results = td.results
-    td_c.exception = td.exception
-    td_c.stacktrace = td.stacktrace
-    td_c.meta = td.meta
-    td_c.version = td.version
-    return td_c
-def _convert_lb_to_external(lb_m):
-    """Don't expose the internal sqlalchemy ORM model to the external api."""
-    lb_c = logbook.LogBook(lb_m.name, lb_m.uuid,
-                           updated_at=lb_m.updated_at,
-                           created_at=lb_m.created_at,
-                           backend='sqlalchemy')
-    lb_c.meta = lb_m.meta
-    for fd_m in lb_m.flowdetails:
-        lb_c.add(_convert_fd_to_external(fd_m))
-    return lb_c
-def _convert_lb_to_internal(lb_c):
-    """Don't expose the external model to the sqlalchemy ORM model."""
-    lb_m = models.LogBook(uuid=lb_c.uuid, meta=lb_c.meta, name=lb_c.name)
-    lb_m.flowdetails = []
-    for fd_c in lb_c:
-        lb_m.flowdetails.append(_convert_fd_to_internal(fd_c, lb_c.uuid))
-    return lb_m
-def _logbook_get_model(lb_id, session):
-    entry = session.query(models.LogBook).filter_by(uuid=lb_id).first()
-    if entry is None:
-        raise exc.NotFound("No logbook found with id: %s" % lb_id)
-    return entry
-def _flow_details_get_model(f_id, session):
-    entry = session.query(models.FlowDetail).filter_by(uuid=f_id).first()
-    if entry is None:
-        raise exc.NotFound("No flow details found with id: %s" % f_id)
-    return entry
-def _task_details_get_model(t_id, session):
-    entry = session.query(models.TaskDetail).filter_by(uuid=t_id).first()
-    if entry is None:
-        raise exc.NotFound("No task details found with id: %s" % t_id)
-    return entry
-def _taskdetails_merge(td_m, td):
-    if td_m.state != td.state:
-        td_m.state = td.state
-    if td_m.results != td.results:
-        td_m.results = td.results
-    if td_m.exception != td.exception:
-        td_m.exception = td.exception
-    if td_m.stacktrace != td.stacktrace:
-        td_m.stacktrace = td.stacktrace
-    if td_m.meta != td.meta:
-        td_m.meta = td.meta
-    return td_m
-def clear_all():
-    session = db_session.get_session()
-    with session.begin():
-        # NOTE(harlowja): due to how we have our relationship setup and
-        # cascading deletes are enabled, this will cause all associated task
-        # details and flow details to automatically be purged.
-        try:
-            return session.query(models.LogBook).delete()
-        except sql_exc.DBAPIError as e:
-            raise exc.StorageError("Failed clearing all entries: %s" % e, e)
-def taskdetails_save(td):
-    # Must already exist since a tasks details has a strong connection to
-    # a flow details, and tasks details can not be saved on there own since
-    # they *must* have a connection to an existing flow details.
-    session = db_session.get_session()
-    with session.begin():
-        td_m = _task_details_get_model(td.uuid, session=session)
-        td_m = _taskdetails_merge(td_m, td)
-        td_m = session.merge(td_m)
-        return _convert_td_to_external(td_m)
-def flowdetails_save(fd):
-    # Must already exist since a flow details has a strong connection to
-    # a logbook, and flow details can not be saved on there own since they
-    # *must* have a connection to an existing logbook.
-    session = db_session.get_session()
-    with session.begin():
-        fd_m = _flow_details_get_model(fd.uuid, session=session)
-        if fd_m.meta != fd.meta:
-            fd_m.meta = fd.meta
-        if fd_m.state != fd.state:
-            fd_m.state = fd.state
-        for td in fd:
-            updated = False
-            for td_m in fd_m.taskdetails:
-                if td_m.uuid == td.uuid:
-                    updated = True
-                    td_m = _taskdetails_merge(td_m, td)
-                    break
-            if not updated:
-                fd_m.taskdetails.append(_convert_td_to_internal(td, fd_m.uuid))
-        fd_m = session.merge(fd_m)
-        return _convert_fd_to_external(fd_m)
-def logbook_destroy(lb_id):
-    session = db_session.get_session()
-    with session.begin():
-        try:
-            lb = _logbook_get_model(lb_id, session=session)
-            session.delete(lb)
-        except sql_exc.DBAPIError as e:
-            raise exc.StorageError("Failed destroying"
-                                   " logbook %s: %s" % (lb_id, e), e)
-def logbook_save(lb):
-    session = db_session.get_session()
-    with session.begin():
-        try:
-            lb_m = _logbook_get_model(lb.uuid, session=session)
-            # NOTE(harlowja): Merge them (note that this doesn't provide 100%
-            # correct update semantics due to how databases have MVCC). This
-            # is where a stored procedure or a better backing store would
-            # handle this better (something more suited to this type of data).
-            for fd in lb:
-                existing_fd = False
-                for fd_m in lb_m.flowdetails:
-                    if fd_m.uuid == fd.uuid:
-                        existing_fd = True
-                        if fd_m.meta != fd.meta:
-                            fd_m.meta = fd.meta
-                        if fd_m.state != fd.state:
-                            fd_m.state = fd.state
-                        for td in fd:
-                            existing_td = False
-                            for td_m in fd_m.taskdetails:
-                                if td_m.uuid == td.uuid:
-                                    existing_td = True
-                                    td_m = _taskdetails_merge(td_m, td)
-                                    break
-                            if not existing_td:
-                                td_m = _convert_td_to_internal(td, fd_m.uuid)
-                                fd_m.taskdetails.append(td_m)
-                if not existing_fd:
-                    lb_m.flowdetails.append(_convert_fd_to_internal(fd,
-                                                                    lb_m.uuid))
-        except exc.NotFound:
-            lb_m = _convert_lb_to_internal(lb)
-        try:
-            lb_m = session.merge(lb_m)
-            return _convert_lb_to_external(lb_m)
-        except sql_exc.DBAPIError as e:
-            raise exc.StorageError("Failed saving"
-                                   " logbook %s: %s" % (lb.uuid, e), e)
-def logbook_get(lb_id):
-    session = db_session.get_session()
-    try:
-        lb_m = _logbook_get_model(lb_id, session=session)
-        return _convert_lb_to_external(lb_m)
-    except sql_exc.DBAPIError as e:
-        raise exc.StorageError("Failed getting"
-                               " logbook %s: %s" % (lb_id, e), e)
diff --git a/taskflow/persistence/backends/sqlalchemy/migration.py b/taskflow/persistence/backends/sqlalchemy/migration.py
index 89c27c29c..52ac7c957 100644
--- a/taskflow/persistence/backends/sqlalchemy/migration.py
+++ b/taskflow/persistence/backends/sqlalchemy/migration.py
@@ -20,25 +20,25 @@
 import os
-from oslo.config import cfg
-import alembic
-from alembic import config as alembic_config
-CONF = cfg.CONF
-                'taskflow.openstack.common.db.sqlalchemy.session',
-                group='database')
+from alembic import config as a_config
+from alembic import environment as a_env
+from alembic import script as a_script
 def _alembic_config():
     path = os.path.join(os.path.dirname(__file__), 'alembic', 'alembic.ini')
-    config = alembic_config.Config(path)
-    if not config.get_main_option('url'):
-        config.set_main_option('sqlalchemy.url', CONF.database.connection)
-    return config
+    return a_config.Config(path)
-def db_sync():
+def db_sync(connection, revision='head'):
+    script = a_script.ScriptDirectory.from_config(_alembic_config())
+    def upgrade(rev, context):
+        return script._upgrade_revs(revision, rev)
     config = _alembic_config()
-    alembic.command.upgrade(config, "head")
+    with a_env.EnvironmentContext(config, script, fn=upgrade, as_sql=False,
+                                  starting_rev=None, destination_rev=revision,
+                                  tag=None) as context:
+        context.configure(connection=connection)
+        context.run_migrations()
diff --git a/taskflow/persistence/backends/sqlalchemy/models.py b/taskflow/persistence/backends/sqlalchemy/models.py
index bc3e48181..59894ca70 100644
--- a/taskflow/persistence/backends/sqlalchemy/models.py
+++ b/taskflow/persistence/backends/sqlalchemy/models.py
@@ -25,6 +25,7 @@ from sqlalchemy.orm import relationship
 from sqlalchemy import types as types
 from taskflow.openstack.common.db.sqlalchemy import models as c_models
 from taskflow.openstack.common import jsonutils
 from taskflow.openstack.common import uuidutils
@@ -41,8 +42,7 @@ class Json(types.TypeDecorator, types.MutableType):
         return jsonutils.loads(value)
-class ModelBase(c_models.ModelBase,
-                c_models.TimestampMixin):
+class ModelBase(c_models.ModelBase, c_models.TimestampMixin):
     """Base model for all taskflow objects"""
     uuid = Column(String, default=uuidutils.generate_uuid,
                   primary_key=True, nullable=False, unique=True)
diff --git a/taskflow/persistence/flowdetail.py b/taskflow/persistence/flowdetail.py
deleted file mode 100644
index 2cad3a2d8..000000000
--- a/taskflow/persistence/flowdetail.py
+++ /dev/null
@@ -1,100 +0,0 @@
-# -*- coding: utf-8 -*-
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-#    Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved.
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#         http://www.apache.org/licenses/LICENSE-2.0
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-from taskflow.persistence.backends import api as b_api
-class FlowDetail(object):
-    """This class contains an append-only list of task detail entries for a
-    given flow along with any metadata associated with that flow.
-    The data contained within this class need *not* backed by the backend
-    storage in real time. The data in this class will only be guaranteed to be
-    persisted when the logbook that contains this flow detail is saved or when
-    the save() method is called directly.
-    """
-    def __init__(self, name, uuid, backend='memory'):
-        self._uuid = uuid
-        self._name = name
-        self._taskdetails = []
-        self.state = None
-        # Any other metadata to include about this flow while storing. For
-        # example timing information could be stored here, other misc. flow
-        # related items (edge connections)...
-        self.meta = None
-        self.backend = backend
-    def _get_backend(self):
-        if not self.backend:
-            return None
-        return b_api.fetch(self.backend)
-    def add(self, td):
-        self._taskdetails.append(td)
-        # When added the backend that the task details is using will be
-        # automatically switched to whatever backend this flow details is
-        # using.
-        if td.backend != self.backend:
-            td.backend = self.backend
-    def find(self, td_uuid):
-        for self_td in self:
-            if self_td.uuid == td_uuid:
-                return self_td
-        return None
-    def find_by_name(self, td_name):
-        for self_td in self:
-            if self_td.name == td_name:
-                return self_td
-        return None
-    def save(self):
-        """Saves *most* of the components of this given object.
-        This will immediately and atomically save the attributes of this flow
-        details object to a backing store providing by the backing api.
-        The underlying storage must contain an existing flow details that this
-        save operation will merge with and then reflect those changes in this
-        object.
-        """
-        backend = self._get_backend()
-        if not backend:
-            raise NotImplementedError("No saving backend provided")
-        fd_u = backend.flowdetails_save(self)
-        if fd_u is self:
-            return
-        self.meta = fd_u.meta
-        self.state = fd_u.state
-        self._taskdetails = fd_u._taskdetails
-    @property
-    def uuid(self):
-        return self._uuid
-    @property
-    def name(self):
-        return self._name
-    def __iter__(self):
-        for td in self._taskdetails:
-            yield td
-    def __len__(self):
-        return len(self._taskdetails)
diff --git a/taskflow/persistence/logbook.py b/taskflow/persistence/logbook.py
index dcced943a..ca28d74c2 100644
--- a/taskflow/persistence/logbook.py
+++ b/taskflow/persistence/logbook.py
@@ -2,7 +2,8 @@
 # vim: tabstop=4 shiftwidth=4 softtabstop=4
-#    Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved.
+#    Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
+#    Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 #    not use this file except in compliance with the License. You may obtain
@@ -16,8 +17,11 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
+import logging
 from taskflow.openstack.common import uuidutils
-from taskflow.persistence.backends import api as b_api
+LOG = logging.getLogger(__name__)
 class LogBook(object):
@@ -27,10 +31,9 @@ class LogBook(object):
     The data contained within this class need *not* backed by the backend
     storage in real time. The data in this class will only be guaranteed to be
-    persisted when the logbook is saved.
+    persisted when a save occurs via some backend connection.
-    def __init__(self, name, uuid=None, updated_at=None, created_at=None,
-                 backend='memory'):
+    def __init__(self, name, uuid=None, updated_at=None, created_at=None):
         if uuid:
             self._uuid = uuid
@@ -39,14 +42,8 @@ class LogBook(object):
         self._flowdetails = []
         self._updated_at = updated_at
         self._created_at = created_at
-        self.backend = backend
         self.meta = None
-    def _get_backend(self):
-        if not self.backend:
-            return None
-        return b_api.fetch(self.backend)
     def created_at(self):
         return self._created_at
@@ -55,59 +52,12 @@ class LogBook(object):
     def updated_at(self):
         return self._updated_at
-    def save(self):
-        """Saves all the components of the given logbook.
-        This will immediately and atomically save all the entries of the given
-        logbook, including any flow details, and any associated task details,
-        that may be contained in this logbook to a backing store providing by
-        the backing api.
-        If the logbook is the underlying storage contains an existing logbook
-        then this save operation will merge the different flow details objects
-        to that logbook and then reflect those changes in this logbook.
-        """
-        backend = self._get_backend()
-        if not backend:
-            raise NotImplementedError("No saving backend provided")
-        s_book = backend.logbook_save(self)
-        if s_book is self:
-            return
-        # Alter the internal variables to reflect what was saved (which may
-        # have new additions if there was a merge with pre-existing data).
-        self._name = s_book._name
-        self._flowdetails = s_book._flowdetails
-        self._updated_at = s_book._updated_at
-        self._created_at = s_book._created_at
-        self.meta = s_book.meta
-    def delete(self):
-        """Deletes all traces of the given logbook.
-        This will delete the logbook entry, any associated flow detail entries
-        and any associated task detail entries associated with those flow
-        detail entries immediately via the backing api (using a atomic
-        transaction).
-        """
-        backend = self._get_backend()
-        if not backend:
-            raise NotImplementedError("No deleting backend provided")
-        backend.logbook_destroy(self.uuid)
     def add(self, flow_detail):
         """Adds a new entry to the underlying logbook.
         Does not *guarantee* that the details will be immediatly saved.
-        # When added the backend that the flow details (and any owned task
-        # details) is using will be automatically switched to whatever backend
-        # this logbook is using.
-        if flow_detail.backend != self.backend:
-            flow_detail.backend = self.backend
-        for task_detail in flow_detail:
-            if task_detail.backend != self.backend:
-                task_detail.backend = self.backend
     def find(self, flow_uuid):
         for fd in self._flowdetails:
@@ -131,6 +81,115 @@ class LogBook(object):
         return len(self._flowdetails)
-def load(lb_id, backend='memory'):
-    """Loads a given logbook (if it exists) from the given backend type."""
-    return b_api.fetch(backend).logbook_get(lb_id)
+class FlowDetail(object):
+    """This class contains an append-only list of task detail entries for a
+    given flow along with any metadata associated with that flow.
+    The data contained within this class need *not* backed by the backend
+    storage in real time. The data in this class will only be guaranteed to be
+    persisted when a save/update occurs via some backend connection.
+    """
+    def __init__(self, name, uuid):
+        self._uuid = uuid
+        self._name = name
+        self._taskdetails = []
+        self.state = None
+        # Any other metadata to include about this flow while storing. For
+        # example timing information could be stored here, other misc. flow
+        # related items (edge connections)...
+        self.meta = None
+    def update(self, fd):
+        """Updates the objects state to be the same as the given one."""
+        if fd is self:
+            return
+        self._taskdetails = list(fd._taskdetails)
+        self.state = fd.state
+        self.meta = fd.meta
+    def add(self, td):
+        self._taskdetails.append(td)
+    def find(self, td_uuid):
+        for self_td in self:
+            if self_td.uuid == td_uuid:
+                return self_td
+        return None
+    def find_by_name(self, td_name):
+        for self_td in self:
+            if self_td.name == td_name:
+                return self_td
+        return None
+    @property
+    def uuid(self):
+        return self._uuid
+    @property
+    def name(self):
+        return self._name
+    def __iter__(self):
+        for td in self._taskdetails:
+            yield td
+    def __len__(self):
+        return len(self._taskdetails)
+class TaskDetail(object):
+    """This class contains an entry that contains the persistance of a task
+    after or before (or during) it is running including any results it may have
+    produced, any state that it may be in (failed for example), any exception
+    that occured when running and any associated stacktrace that may have
+    occuring during that exception being thrown and any other metadata that
+    should be stored along-side the details about this task.
+    The data contained within this class need *not* backed by the backend
+    storage in real time. The data in this class will only be guaranteed to be
+    persisted when a save/update occurs via some backend connection.
+    """
+    def __init__(self, name, uuid):
+        self._uuid = uuid
+        self._name = name
+        # TODO(harlowja): decide if these should be passed in and therefore
+        # immutable or let them be assigned?
+        #
+        # The state the task was last in.
+        self.state = None
+        # The results it may have produced (useful for reverting).
+        self.results = None
+        # An exception that it may have thrown (or part of it), useful for
+        # knowing what failed.
+        self.exception = None
+        # Any stack trace the exception may have had, useful for debugging or
+        # examining the failure in more depth.
+        self.stacktrace = None
+        # Any other metadata to include about this task while storing. For
+        # example timing information could be stored here, other misc. task
+        # related items.
+        self.meta = None
+        # The version of the task this task details was associated with which
+        # is quite useful for determining what versions of tasks this detail
+        # information can be associated with.
+        self.version = None
+    def update(self, td):
+        """Updates the objects state to be the same as the given one."""
+        if td is self:
+            return
+        self.state = td.state
+        self.meta = td.meta
+        self.stacktrace = td.stacktrace
+        self.exception = td.exception
+        self.results = td.results
+        self.version = td.version
+    @property
+    def uuid(self):
+        return self._uuid
+    @property
+    def name(self):
+        return self._name
diff --git a/taskflow/persistence/taskdetail.py b/taskflow/persistence/taskdetail.py
deleted file mode 100644
index ba135c99a..000000000
--- a/taskflow/persistence/taskdetail.py
+++ /dev/null
@@ -1,94 +0,0 @@
-# -*- coding: utf-8 -*-
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-#    Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved.
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#         http://www.apache.org/licenses/LICENSE-2.0
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-from taskflow.persistence.backends import api as b_api
-class TaskDetail(object):
-    """This class contains an entry that contains the persistance of a task
-    after or before (or during) it is running including any results it may have
-    produced, any state that it may be in (failed for example), any exception
-    that occured when running and any associated stacktrace that may have
-    occuring during that exception being thrown and any other metadata that
-    should be stored along-side the details about this task.
-    The data contained within this class need *not* backed by the backend
-    storage in real time. The data in this class will only be guaranteed to be
-    persisted when the logbook that contains this task detail is saved or when
-    the save() method is called directly.
-    """
-    def __init__(self, name, uuid, backend='memory'):
-        self._uuid = uuid
-        self._name = name
-        self.backend = backend
-        # TODO(harlowja): decide if these should be passed in and therefore
-        # immutable or let them be assigned?
-        #
-        # The state the task was last in.
-        self.state = None
-        # The results it may have produced (useful for reverting).
-        self.results = None
-        # An exception that it may have thrown (or part of it), useful for
-        # knowing what failed.
-        self.exception = None
-        # Any stack trace the exception may have had, useful for debugging or
-        # examining the failure in more depth.
-        self.stacktrace = None
-        # Any other metadata to include about this task while storing. For
-        # example timing information could be stored here, other misc. task
-        # related items.
-        self.meta = None
-        # The version of the task this task details was associated with which
-        # is quite useful for determining what versions of tasks this detail
-        # information can be associated with.
-        self.version = None
-    def save(self):
-        """Saves *most* of the components of this given object.
-        This will immediately and atomically save the attributes of this task
-        details object to a backing store providing by the backing api.
-        The underlying storage must contain an existing task details that this
-        save operation will merge with and then reflect those changes in this
-        object.
-        """
-        backend = self._get_backend()
-        if not backend:
-            raise NotImplementedError("No saving backend provided")
-        td_u = backend.taskdetails_save(self)
-        if td_u is self:
-            return
-        self.meta = td_u.meta
-        self.exception = td_u.exception
-        self.results = td_u.results
-        self.stacktrace = td_u.stacktrace
-        self.state = td_u.state
-    def _get_backend(self):
-        if not self.backend:
-            return None
-        return b_api.fetch(self.backend)
-    @property
-    def uuid(self):
-        return self._uuid
-    @property
-    def name(self):
-        return self._name
diff --git a/taskflow/persistence/utils.py b/taskflow/persistence/utils.py
new file mode 100644
index 000000000..8bd477ba1
--- /dev/null
+++ b/taskflow/persistence/utils.py
@@ -0,0 +1,84 @@
+# -*- coding: utf-8 -*-
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#    Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#         http://www.apache.org/licenses/LICENSE-2.0
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+import contextlib
+import logging
+from taskflow.openstack.common import uuidutils
+from taskflow.persistence import logbook
+LOG = logging.getLogger(__name__)
+def temporary_log_book(backend):
+    """Creates a temporary logbook for temporary usage in the given backend.
+    Mainly useful for tests and other use cases where a temporary logbook
+    is needed for a short-period of time.
+    """
+    book = logbook.LogBook('tmp')
+    with contextlib.closing(backend.get_connection()) as conn:
+        conn.save_logbook(book)
+        return book
+def temporary_flow_detail(backend):
+    """Creates a temporary flow detail and logbook for temporary usage in
+    the given backend.
+    Mainly useful for tests and other use cases where a temporary flow detail
+    is needed for a short-period of time.
+    """
+    flow_id = uuidutils.generate_uuid()
+    book = temporary_log_book(backend)
+    with contextlib.closing(backend.get_connection()) as conn:
+        book.add(logbook.FlowDetail(name='tmp-flow-detail', uuid=flow_id))
+        conn.save_logbook(book)
+        # Return the one from the saved logbook instead of the local one so
+        # that the freshest version is given back.
+        return (book, book.find(flow_id))
+def create_flow_detail(flow, book=None, backend=None):
+    """Creates a flow detail for the given flow and adds it to the provided
+    logbook (if provided) and then uses the given backend (if provided) to
+    save the logbook then returns the created flow detail.
+    """
+    try:
+        flow_name = getattr(flow, 'name')
+    except AttributeError:
+        LOG.warn("Flow %s does not have a name attribute, creating one.", flow)
+        flow_name = uuidutils.generate_uuid()
+    try:
+        flow_id = getattr(flow, 'uuid')
+    except AttributeError:
+        LOG.warn("Flow %s does not have a uuid attribute, creating one.", flow)
+        flow_id = uuidutils.generate_uuid()
+    flow_detail = logbook.FlowDetail(name=flow_name, uuid=flow_id)
+    if book is not None:
+        book.add(flow_detail)
+        if backend is not None:
+            with contextlib.closing(backend.get_connection()) as conn:
+                conn.save_logbook(book)
+        # Return the one from the saved logbook instead of the local one so
+        # that the freshest version is given back
+        return book.find(flow_id)
+    else:
+        if backend is not None:
+            LOG.warn("Can not save %s without a provided logbook", flow)
+        return flow_detail
diff --git a/taskflow/storage.py b/taskflow/storage.py
index 5b8acdf84..054e27106 100644
--- a/taskflow/storage.py
+++ b/taskflow/storage.py
@@ -16,32 +16,14 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
+import contextlib
 from taskflow import exceptions
 from taskflow.openstack.common import uuidutils
-from taskflow.persistence import flowdetail
 from taskflow.persistence import logbook
-from taskflow.persistence import taskdetail
 from taskflow import states
 from taskflow.utils import threading_utils
-def temporary_flow_detail():
-    """Creates flow detail class for temporary usage
-    Creates in-memory logbook and flow detail in it. Should
-    be useful for tests and other use cases where persistence
-    is not needed
-    """
-    lb = logbook.LogBook('tmp', backend='memory')
-    fd = flowdetail.FlowDetail(
-        name='tmp', uuid=uuidutils.generate_uuid(),
-        backend='memory')
-    lb.add(fd)
-    lb.save()
-    fd.save()
-    return fd
@@ -54,16 +36,17 @@ class Storage(object):
     injector_name = '_TaskFlow_INJECTOR'
-    def __init__(self, flow_detail=None):
+    def __init__(self, flow_detail, backend=None):
         self._result_mappings = {}
         self._reverse_mapping = {}
+        self._backend = backend
+        self._flowdetail = flow_detail
-        if flow_detail is None:
-            # TODO(imelnikov): this is useful mainly for tests;
-            #  maybe we should make flow_detail required parameter?
-            self._flowdetail = temporary_flow_detail()
-        else:
-            self._flowdetail = flow_detail
+    def _with_connection(self, functor, *args, **kwargs):
+        if self._backend is None:
+            return
+        with contextlib.closing(self._backend.get_connection()) as conn:
+            functor(conn, *args, **kwargs)
     def add_task(self, uuid, task_name):
         """Add the task to storage
@@ -72,12 +55,18 @@ class Storage(object):
         Task state is set to PENDING.
         # TODO(imelnikov): check that task with same uuid or
-        #   task name does not exist
-        td = taskdetail.TaskDetail(name=task_name, uuid=uuid)
+        # task name does not exist
+        td = logbook.TaskDetail(name=task_name, uuid=uuid)
         td.state = states.PENDING
-        self._flowdetail.save()
-        td.save()
+        self._with_connection(self._save_flow_detail)
+        self._with_connection(self._save_task_detail, task_detail=td)
+    def _save_flow_detail(self, conn):
+        # NOTE(harlowja): we need to update our contained flow detail if
+        # the result of the update actually added more (aka another process
+        # added item to the flow detail).
+        self._flowdetail.update(conn.update_flow_details(self._flowdetail))
     def get_uuid_by_name(self, task_name):
         """Get uuid of task with given name"""
@@ -93,11 +82,17 @@ class Storage(object):
             raise exceptions.NotFound("Unknown task: %r" % uuid)
         return td
+    def _save_task_detail(self, conn, task_detail):
+        # NOTE(harlowja): we need to update our contained task detail if
+        # the result of the update actually added more (aka another process
+        # is also modifying the task detail).
+        task_detail.update(conn.update_task_details(task_detail))
     def set_task_state(self, uuid, state):
         """Set task state"""
         td = self._taskdetail_by_uuid(uuid)
         td.state = state
-        td.save()
+        self._with_connection(self._save_task_detail, task_detail=td)
     def get_task_state(self, uuid):
         """Get state of task with given uuid"""
@@ -108,7 +103,7 @@ class Storage(object):
         td = self._taskdetail_by_uuid(uuid)
         td.state = state
         td.results = data
-        td.save()
+        self._with_connection(self._save_task_detail, task_detail=td)
     def get(self, uuid):
         """Get result for task with id 'uuid' to storage"""
@@ -122,7 +117,7 @@ class Storage(object):
         td = self._taskdetail_by_uuid(uuid)
         td.results = None
         td.state = state
-        td.save()
+        self._with_connection(self._save_task_detail, task_detail=td)
     def inject(self, pairs):
         """Add values into storage
@@ -202,7 +197,7 @@ class Storage(object):
     def set_flow_state(self, state):
         """Set flowdetails state and save it"""
         self._flowdetail.state = state
-        self._flowdetail.save()
+        self._with_connection(self._save_flow_detail)
     def get_flow_state(self):
         """Set state from flowdetails"""
diff --git a/taskflow/test.py b/taskflow/test.py
index 63da745df..2f1ec49b9 100644
--- a/taskflow/test.py
+++ b/taskflow/test.py
@@ -18,38 +18,12 @@
 import unittest2
-from oslo.config import cfg
-CONF = cfg.CONF
 class TestCase(unittest2.TestCase):
     """Test case base class for all unit tests."""
     def setUp(self):
-        """Run before each test method to initialize test environment."""
         super(TestCase, self).setUp()
-        self.overriden = []
-        self.addCleanup(self._clear_attrs)
     def tearDown(self):
         super(TestCase, self).tearDown()
-        self._reset_flags()
-    def _reset_flags(self):
-        for k, group in self.overriden:
-            CONF.clear_override(k, group=group)
-    def _clear_attrs(self):
-        # Delete attributes that don't start with _ so they don't pin
-        # memory around unnecessarily for the duration of the test
-        # suite
-        for key in [k for k in self.__dict__.keys() if k[0] != '_']:
-            del self.__dict__[key]
-    def flags(self, **kw):
-        """Override flag variables for a test."""
-        group = kw.pop('group', None)
-        for k, v in kw.iteritems():
-            CONF.set_override(k, v, group)
-            self.overriden.append((k, group))
diff --git a/taskflow/tests/unit/persistence/base.py b/taskflow/tests/unit/persistence/base.py
index f5efdf8c9..f362f0c76 100644
--- a/taskflow/tests/unit/persistence/base.py
+++ b/taskflow/tests/unit/persistence/base.py
@@ -16,34 +16,33 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
+import contextlib
 from taskflow import exceptions as exc
 from taskflow.openstack.common import uuidutils
-from taskflow.persistence import flowdetail
 from taskflow.persistence import logbook
-from taskflow.persistence import taskdetail
 class PersistenceTestMixin(object):
-    def _get_backend():
+    def _get_connection():
         raise NotImplementedError()
-    def test_logbook_simple_save(self):
+    def test_logbook_save_retrieve(self):
         lb_id = uuidutils.generate_uuid()
         lb_meta = {'1': 2}
         lb_name = 'lb-%s' % (lb_id)
-        lb = logbook.LogBook(name=lb_name, uuid=lb_id,
-                             backend=self._get_backend())
+        lb = logbook.LogBook(name=lb_name, uuid=lb_id)
         lb.meta = lb_meta
         # Should not already exist
-        self.assertRaises(exc.NotFound, logbook.load, lb_id,
-                          backend=self._get_backend())
+        with contextlib.closing(self._get_connection()) as conn:
+            self.assertRaises(exc.NotFound, conn.get_logbook, lb_id)
+            conn.save_logbook(lb)
-        lb.save()
-        del lb
-        lb = None
-        lb = logbook.load(lb_id, backend=self._get_backend())
+        # Make sure we can reload it (and all of its attributes are what
+        # we expect them to be).
+        with contextlib.closing(self._get_connection()) as conn:
+            lb = conn.get_logbook(lb_id)
         self.assertEquals(lb_name, lb.name)
         self.assertEquals(0, len(lb))
         self.assertEquals(lb_meta, lb.meta)
@@ -53,109 +52,104 @@ class PersistenceTestMixin(object):
     def test_flow_detail_save(self):
         lb_id = uuidutils.generate_uuid()
         lb_name = 'lb-%s' % (lb_id)
-        lb = logbook.LogBook(name=lb_name, uuid=lb_id,
-                             backend=self._get_backend())
-        fd = flowdetail.FlowDetail('test', uuid=uuidutils.generate_uuid())
+        lb = logbook.LogBook(name=lb_name, uuid=lb_id)
+        fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid())
         # Ensure we can't save it since its owning logbook hasn't been
-        # saved.
-        self.assertRaises(exc.NotFound, fd.save)
+        # saved (flow details can not exist on there own without a connection
+        # to a logbook).
+        with contextlib.closing(self._get_connection()) as conn:
+            self.assertRaises(exc.NotFound, conn.get_logbook, lb_id)
+            self.assertRaises(exc.NotFound, conn.update_flow_details, fd)
-        # Ok now we should be able to save it
-        lb.save()
-        fd.save()
+        # Ok now we should be able to save both.
+        with contextlib.closing(self._get_connection()) as conn:
+            conn.save_logbook(lb)
+            conn.update_flow_details(fd)
     def test_task_detail_save(self):
         lb_id = uuidutils.generate_uuid()
         lb_name = 'lb-%s' % (lb_id)
-        lb = logbook.LogBook(name=lb_name, uuid=lb_id,
-                             backend=self._get_backend())
-        fd = flowdetail.FlowDetail('test', uuid=uuidutils.generate_uuid())
+        lb = logbook.LogBook(name=lb_name, uuid=lb_id)
+        fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid())
-        td = taskdetail.TaskDetail("detail-1", uuid=uuidutils.generate_uuid())
+        td = logbook.TaskDetail("detail-1", uuid=uuidutils.generate_uuid())
         # Ensure we can't save it since its owning logbook hasn't been
-        # saved.
-        self.assertRaises(exc.NotFound, fd.save)
-        self.assertRaises(exc.NotFound, td.save)
+        # saved (flow details/task details can not exist on there own without
+        # there parent existing).
+        with contextlib.closing(self._get_connection()) as conn:
+            self.assertRaises(exc.NotFound, conn.update_flow_details, fd)
+            self.assertRaises(exc.NotFound, conn.update_task_details, td)
-        # Ok now we should be able to save it
-        lb.save()
-        fd.save()
-        td.save()
+        # Ok now we should be able to save them.
+        with contextlib.closing(self._get_connection()) as conn:
+            conn.save_logbook(lb)
+            conn.update_flow_details(fd)
+            conn.update_task_details(td)
     def test_logbook_merge_flow_detail(self):
         lb_id = uuidutils.generate_uuid()
         lb_name = 'lb-%s' % (lb_id)
-        lb = logbook.LogBook(name=lb_name, uuid=lb_id,
-                             backend=self._get_backend())
-        fd = flowdetail.FlowDetail('test', uuid=uuidutils.generate_uuid())
+        lb = logbook.LogBook(name=lb_name, uuid=lb_id)
+        fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid())
-        lb.save()
-        lb2 = logbook.LogBook(name=lb_name, uuid=lb_id,
-                              backend=self._get_backend())
-        fd = flowdetail.FlowDetail('test2', uuid=uuidutils.generate_uuid())
-        lb2.add(fd)
-        lb2.save()
-        lb3 = logbook.load(lb_id, backend=self._get_backend())
-        self.assertEquals(2, len(lb3))
+        with contextlib.closing(self._get_connection()) as conn:
+            conn.save_logbook(lb)
+        lb2 = logbook.LogBook(name=lb_name, uuid=lb_id)
+        fd2 = logbook.FlowDetail('test2', uuid=uuidutils.generate_uuid())
+        lb2.add(fd2)
+        with contextlib.closing(self._get_connection()) as conn:
+            conn.save_logbook(lb2)
+        with contextlib.closing(self._get_connection()) as conn:
+            lb3 = conn.get_logbook(lb_id)
+            self.assertEquals(2, len(lb3))
     def test_logbook_add_flow_detail(self):
         lb_id = uuidutils.generate_uuid()
         lb_name = 'lb-%s' % (lb_id)
-        lb = logbook.LogBook(name=lb_name, uuid=lb_id,
-                             backend=self._get_backend())
-        fd = flowdetail.FlowDetail('test', uuid=uuidutils.generate_uuid())
+        lb = logbook.LogBook(name=lb_name, uuid=lb_id)
+        fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid())
-        lb.save()
-        lb2 = logbook.load(lb_id, backend=self._get_backend())
-        self.assertEquals(1, len(lb2))
-        self.assertEquals(1, len(lb))
-        self.assertEquals(fd.name, lb2.find(fd.uuid).name)
+        with contextlib.closing(self._get_connection()) as conn:
+            conn.save_logbook(lb)
+        with contextlib.closing(self._get_connection()) as conn:
+            lb2 = conn.get_logbook(lb_id)
+            self.assertEquals(1, len(lb2))
+            self.assertEquals(1, len(lb))
+            self.assertEquals(fd.name, lb2.find(fd.uuid).name)
     def test_logbook_add_task_detail(self):
         lb_id = uuidutils.generate_uuid()
         lb_name = 'lb-%s' % (lb_id)
-        lb = logbook.LogBook(name=lb_name, uuid=lb_id,
-                             backend=self._get_backend())
-        fd = flowdetail.FlowDetail('test', uuid=uuidutils.generate_uuid())
-        td = taskdetail.TaskDetail("detail-1", uuid=uuidutils.generate_uuid())
+        lb = logbook.LogBook(name=lb_name, uuid=lb_id)
+        fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid())
+        td = logbook.TaskDetail("detail-1", uuid=uuidutils.generate_uuid())
-        lb.save()
-        lb2 = logbook.load(lb_id, backend=self._get_backend())
-        self.assertEquals(1, len(lb2))
-        tasks = 0
-        for fd in lb:
-            tasks += len(fd)
-        self.assertEquals(1, tasks)
+        with contextlib.closing(self._get_connection()) as conn:
+            conn.save_logbook(lb)
+        with contextlib.closing(self._get_connection()) as conn:
+            lb2 = conn.get_logbook(lb_id)
+            self.assertEquals(1, len(lb2))
+            tasks = 0
+            for fd in lb:
+                tasks += len(fd)
+            self.assertEquals(1, tasks)
     def test_logbook_delete(self):
         lb_id = uuidutils.generate_uuid()
         lb_name = 'lb-%s' % (lb_id)
-        lb = logbook.LogBook(name=lb_name, uuid=lb_id,
-                             backend=self._get_backend())
-        # Ensure we can't delete it since it hasn't been saved
-        self.assertRaises(exc.NotFound, lb.delete)
-        lb.save()
-        lb2 = logbook.load(lb_id, backend=self._get_backend())
-        self.assertIsNotNone(lb2)
-        lb.delete()
-        self.assertRaises(exc.NotFound, lb.delete)
+        lb = logbook.LogBook(name=lb_name, uuid=lb_id)
+        with contextlib.closing(self._get_connection()) as conn:
+            self.assertRaises(exc.NotFound, conn.destroy_logbook, lb_id)
+        with contextlib.closing(self._get_connection()) as conn:
+            conn.save_logbook(lb)
+        with contextlib.closing(self._get_connection()) as conn:
+            lb2 = conn.get_logbook(lb_id)
+            self.assertIsNotNone(lb2)
+        with contextlib.closing(self._get_connection()) as conn:
+            conn.destroy_logbook(lb_id)
+            self.assertRaises(exc.NotFound, conn.destroy_logbook, lb_id)
diff --git a/taskflow/tests/unit/persistence/test_memory_persistence.py b/taskflow/tests/unit/persistence/test_memory_persistence.py
index 0d940b356..da088c13f 100644
--- a/taskflow/tests/unit/persistence/test_memory_persistence.py
+++ b/taskflow/tests/unit/persistence/test_memory_persistence.py
@@ -16,15 +16,16 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
-from taskflow.persistence.backends import api as b_api
+from taskflow.persistence.backends import impl_memory
 from taskflow import test
 from taskflow.tests.unit.persistence import base
 class MemoryPersistenceTest(test.TestCase, base.PersistenceTestMixin):
-    def _get_backend(self):
-        return 'memory'
+    def _get_connection(self):
+        return impl_memory.MemoryBackend({}).get_connection()
     def tearDown(self):
-        b_api.fetch(self._get_backend()).clear_all()
+        conn = self._get_connection()
+        conn.clear_all()
         super(MemoryPersistenceTest, self).tearDown()
diff --git a/taskflow/tests/unit/persistence/test_sql_persistence.py b/taskflow/tests/unit/persistence/test_sql_persistence.py
index 44a7b0cf0..ba60f5a97 100644
--- a/taskflow/tests/unit/persistence/test_sql_persistence.py
+++ b/taskflow/tests/unit/persistence/test_sql_persistence.py
@@ -19,31 +19,29 @@
 import os
 import tempfile
-from taskflow.openstack.common.db.sqlalchemy import session
-from taskflow.persistence.backends import api as b_api
-from taskflow.persistence.backends.sqlalchemy import migration
+from taskflow.persistence.backends import impl_sqlalchemy
 from taskflow import test
 from taskflow.tests.unit.persistence import base
 class SqlPersistenceTest(test.TestCase, base.PersistenceTestMixin):
     """Inherits from the base test and sets up a sqlite temporary db."""
-    def _get_backend(self):
-        return 'sqlalchemy'
-    def setupDatabase(self):
-        _handle, db_location = tempfile.mkstemp()
-        db_uri = "sqlite:///%s" % (db_location)
-        session.set_defaults(db_uri, db_location)
-        migration.db_sync()
-        return db_location
+    def _get_connection(self):
+        conf = {
+            'connection': self.db_uri,
+        }
+        conn = impl_sqlalchemy.SQLAlchemyBackend(conf).get_connection()
+        return conn
     def setUp(self):
         super(SqlPersistenceTest, self).setUp()
-        self.db_location = self.setupDatabase()
+        self.db_location = tempfile.mktemp(suffix='.db')
+        self.db_uri = "sqlite:///%s" % (self.db_location)
+        # Ensure upgraded to the right schema
+        conn = self._get_connection()
+        conn.upgrade()
     def tearDown(self):
-        b_api.fetch(self._get_backend()).clear_all()
         super(SqlPersistenceTest, self).tearDown()
         if self.db_location and os.path.isfile(self.db_location):
diff --git a/taskflow/tests/unit/test_action_engine.py b/taskflow/tests/unit/test_action_engine.py
index 0a3eb6eda..1314c9edc 100644
--- a/taskflow/tests/unit/test_action_engine.py
+++ b/taskflow/tests/unit/test_action_engine.py
@@ -16,21 +16,22 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
+import contextlib
 from multiprocessing import pool
 import time
 from taskflow.patterns import linear_flow as lf
 from taskflow.patterns import unordered_flow as uf
+from taskflow.engines.action_engine import engine as eng
 from taskflow import exceptions
-from taskflow.persistence import taskdetail
+from taskflow.persistence.backends import impl_memory
+from taskflow.persistence import logbook
+from taskflow.persistence import utils as p_utils
 from taskflow import states
-from taskflow import storage
 from taskflow import task
 from taskflow import test
-from taskflow.engines.action_engine import engine as eng
 class TestTask(task.Task):
@@ -103,8 +104,17 @@ class EngineTestBase(object):
     def setUp(self):
         super(EngineTestBase, self).setUp()
         self.values = []
+        self.backend = impl_memory.MemoryBackend(conf={})
+        self.book = p_utils.temporary_log_book(self.backend)
-    def _make_engine(self, _flow, _flow_detail=None):
+    def tearDown(self):
+        super(EngineTestBase, self).tearDown()
+        with contextlib.closing(self.backend) as be:
+            with contextlib.closing(be.get_connection()) as conn:
+                conn.clear_all()
+        self.book = None
+    def _make_engine(self, flow, flow_detail=None):
         raise NotImplementedError()
@@ -114,7 +124,6 @@ class EngineTaskTest(EngineTestBase):
         flow = lf.Flow('test-1')
         flow.add(TestTask(self.values, name='task1'))
         engine = self._make_engine(flow)
-        engine.compile()
         self.assertEquals(self.values, ['task1'])
@@ -405,13 +414,15 @@ class EngineParallelFlowTest(EngineTestBase):
         # Create FlowDetail as if we already run task1
-        fd = storage.temporary_flow_detail()
-        td = taskdetail.TaskDetail(name='task1', uuid='42')
+        _lb, fd = p_utils.temporary_flow_detail(self.backend)
+        td = logbook.TaskDetail(name='task1', uuid='42')
         td.state = states.SUCCESS
         td.results = 17
-        fd.save()
-        td.save()
+        with contextlib.closing(self.backend.get_connection()) as conn:
+            fd.update(conn.update_flow_details(fd))
+            td.update(conn.update_task_details(td))
         engine = self._make_engine(flow, fd)
@@ -425,22 +436,30 @@ class SingleThreadedEngineTest(EngineTaskTest,
     def _make_engine(self, flow, flow_detail=None):
-        return eng.SingleThreadedActionEngine(flow, flow_detail=flow_detail)
+        if flow_detail is None:
+            flow_detail = p_utils.create_flow_detail(flow, self.book,
+                                                     self.backend)
+        return eng.SingleThreadedActionEngine(flow, backend=self.backend,
+                                              flow_detail=flow_detail)
 class MultiThreadedEngineTest(EngineTaskTest,
-    def _make_engine(self, flow, flow_detail=None):
-        return eng.MultiThreadedActionEngine(flow, flow_detail=flow_detail)
+    def _make_engine(self, flow, flow_detail=None, thread_pool=None):
+        if flow_detail is None:
+            flow_detail = p_utils.create_flow_detail(flow, self.book,
+                                                     self.backend)
+        return eng.MultiThreadedActionEngine(flow, backend=self.backend,
+                                             flow_detail=flow_detail,
+                                             thread_pool=thread_pool)
     def test_using_common_pool(self):
         flow = TestTask(self.values, name='task1')
         thread_pool = pool.ThreadPool()
-        e1 = eng.MultiThreadedActionEngine(flow, thread_pool=thread_pool)
-        e2 = eng.MultiThreadedActionEngine(flow, thread_pool=thread_pool)
+        e1 = self._make_engine(flow, thread_pool=thread_pool)
+        e2 = self._make_engine(flow, thread_pool=thread_pool)
         self.assertIs(e1.thread_pool, e2.thread_pool)
     def test_parallel_revert_specific(self):
diff --git a/taskflow/tests/unit/test_decorators.py b/taskflow/tests/unit/test_decorators.py
index 037ab2b50..06a9338d3 100644
--- a/taskflow/tests/unit/test_decorators.py
+++ b/taskflow/tests/unit/test_decorators.py
@@ -23,13 +23,11 @@ from taskflow import test
 from taskflow.engines.action_engine import engine as eng
-def _make_engine(flow):
-    e = eng.SingleThreadedActionEngine(flow)
-    e.compile()
-    return e
 class WrapableObjectsTest(test.TestCase):
+    def _make_engine(self, flow):
+        e = eng.SingleThreadedActionEngine(flow)
+        e.compile()
+        return e
     def test_simple_function(self):
         values = []
@@ -52,7 +50,7 @@ class WrapableObjectsTest(test.TestCase):
         with self.assertRaisesRegexp(RuntimeError, '^Woot'):
-            e = _make_engine(flow)
+            e = self._make_engine(flow)
         self.assertEquals(values, ['one', 'fail', 'revert one'])
@@ -80,7 +78,7 @@ class WrapableObjectsTest(test.TestCase):
         with self.assertRaisesRegexp(RuntimeError, '^Woot'):
-            e = _make_engine(flow)
+            e = self._make_engine(flow)
         self.assertEquals(tasks.values, ['one', 'fail'])
@@ -106,7 +104,7 @@ class WrapableObjectsTest(test.TestCase):
         with self.assertRaisesRegexp(RuntimeError, '^Woot'):
-            e = _make_engine(flow)
+            e = self._make_engine(flow)
         self.assertEquals(values, ['one', 'fail'])
@@ -133,6 +131,6 @@ class WrapableObjectsTest(test.TestCase):
         with self.assertRaisesRegexp(RuntimeError, '^Woot'):
-            e = _make_engine(flow)
+            e = self._make_engine(flow)
         self.assertEquals(MyTasks.values, ['one', 'fail'])
diff --git a/taskflow/tests/unit/test_linear_flow.py b/taskflow/tests/unit/test_linear_flow.py
index 21715e36b..f0645cc4b 100644
--- a/taskflow/tests/unit/test_linear_flow.py
+++ b/taskflow/tests/unit/test_linear_flow.py
@@ -19,24 +19,22 @@
 import collections
 from taskflow import decorators
+from taskflow.engines.action_engine import engine as eng
 from taskflow import exceptions as exc
+from taskflow.patterns import linear_flow as lw
 from taskflow import states
 from taskflow import test
-from taskflow.patterns import linear_flow as lw
 from taskflow.tests import utils
-from taskflow.engines.action_engine import engine as eng
-def _make_engine(flow):
-    e = eng.SingleThreadedActionEngine(flow)
-    e.compile()
-    e.storage.inject([('context', {})])
-    return e
 class LinearFlowTest(test.TestCase):
+    def _make_engine(self, flow):
+        e = eng.SingleThreadedActionEngine(flow)
+        e.compile()
+        e.storage.inject([('context', {})])
+        return e
     def make_reverting_task(self, token, blowup=False):
         def do_revert(context, *args, **kwargs):
@@ -67,7 +65,7 @@ class LinearFlowTest(test.TestCase):
         wf = lw.Flow("the-test-action")
-        e = _make_engine(wf)
+        e = self._make_engine(wf)
         data = e.storage.fetch_all()
         self.assertIn('a', data)
@@ -92,7 +90,7 @@ class LinearFlowTest(test.TestCase):
-        e = _make_engine(wf)
+        e = self._make_engine(wf)
         self.assertEquals(2, len(e.storage.fetch('context')))
@@ -111,7 +109,7 @@ class LinearFlowTest(test.TestCase):
         wf.add(self.make_reverting_task(2, False))
         wf.add(self.make_reverting_task(1, True))
-        e = _make_engine(wf)
+        e = self._make_engine(wf)
         e.notifier.register('*', listener)
         e.task_notifier.register('*', task_listener)
         self.assertRaises(Exception, e.run)
@@ -148,7 +146,7 @@ class LinearFlowTest(test.TestCase):
         wf = lw.Flow("the-test-action")
-        e = _make_engine(wf)
+        e = self._make_engine(wf)
         e.notifier.register('*', listener)
@@ -159,7 +157,7 @@ class LinearFlowTest(test.TestCase):
         for i in range(0, 10):
-        e = _make_engine(wf)
+        e = self._make_engine(wf)
         capture_func, captured = self._capture_states()
         e.task_notifier.register('*', capture_func)
@@ -189,7 +187,7 @@ class LinearFlowTest(test.TestCase):
         wf.add(self.make_reverting_task(2, True))
         capture_func, captured = self._capture_states()
-        e = _make_engine(wf)
+        e = self._make_engine(wf)
         e.task_notifier.register('*', capture_func)
         self.assertRaises(Exception, e.run)
@@ -225,7 +223,7 @@ class LinearFlowTest(test.TestCase):
         wf = lw.Flow("the-test-action")
-        e = _make_engine(wf)
+        e = self._make_engine(wf)
         self.assertRaises(exc.NotFound, e.run)
     def test_flow_bad_order(self):
@@ -239,7 +237,7 @@ class LinearFlowTest(test.TestCase):
         no_req_task = utils.ProvidesRequiresTask('test-2', requires=['c'],
-        e = _make_engine(wf)
+        e = self._make_engine(wf)
         self.assertRaises(exc.NotFound, e.run)
     def test_flow_set_order(self):
@@ -250,7 +248,7 @@ class LinearFlowTest(test.TestCase):
                                           requires=set(['a', 'b']),
-        e = _make_engine(wf)
+        e = self._make_engine(wf)
         run_context = e.storage.fetch('context')
         ordering = run_context[utils.ORDER_KEY]
@@ -283,7 +281,7 @@ class LinearFlowTest(test.TestCase):
-        e = _make_engine(wf)
+        e = self._make_engine(wf)
         run_context = e.storage.fetch('context')
         ordering = run_context[utils.ORDER_KEY]
diff --git a/taskflow/tests/unit/test_storage.py b/taskflow/tests/unit/test_storage.py
index b0cf0e537..8e7ed3d65 100644
--- a/taskflow/tests/unit/test_storage.py
+++ b/taskflow/tests/unit/test_storage.py
@@ -16,21 +16,38 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
+import contextlib
 from taskflow import exceptions
+from taskflow.persistence.backends import impl_memory
+from taskflow.persistence import utils as p_utils
 from taskflow import states
 from taskflow import storage
 from taskflow import test
 class StorageTest(test.TestCase):
+    def setUp(self):
+        super(StorageTest, self).setUp()
+        self.backend = impl_memory.MemoryBackend(conf={})
+    def _get_storage(self):
+        lb, flow_detail = p_utils.temporary_flow_detail(self.backend)
+        return storage.Storage(backend=self.backend, flow_detail=flow_detail)
+    def tearDown(self):
+        super(StorageTest, self).tearDown()
+        with contextlib.closing(self.backend) as be:
+            with contextlib.closing(be.get_connection()) as conn:
+                conn.clear_all()
     def test_add_task(self):
-        s = storage.Storage()
+        s = self._get_storage()
         s.add_task('42', 'my task')
         self.assertEquals(s.get_task_state('42'), states.PENDING)
     def test_save_and_get(self):
-        s = storage.Storage()
+        s = self._get_storage()
         s.add_task('42', 'my task')
         s.save('42', 5)
         self.assertEquals(s.get('42'), 5)
@@ -38,20 +55,20 @@ class StorageTest(test.TestCase):
         self.assertEquals(s.get_task_state('42'), states.SUCCESS)
     def test_save_and_get_other_state(self):
-        s = storage.Storage()
+        s = self._get_storage()
         s.add_task('42', 'my task')
         s.save('42', 5, states.FAILURE)
         self.assertEquals(s.get('42'), 5)
         self.assertEquals(s.get_task_state('42'), states.FAILURE)
     def test_get_non_existing_var(self):
-        s = storage.Storage()
+        s = self._get_storage()
         s.add_task('42', 'my task')
         with self.assertRaises(exceptions.NotFound):
     def test_reset(self):
-        s = storage.Storage()
+        s = self._get_storage()
         s.add_task('42', 'my task')
         s.save('42', 5)
@@ -60,12 +77,12 @@ class StorageTest(test.TestCase):
     def test_reset_unknown_task(self):
-        s = storage.Storage()
+        s = self._get_storage()
         s.add_task('42', 'my task')
         self.assertEquals(s.reset('42'), None)
     def test_fetch_by_name(self):
-        s = storage.Storage()
+        s = self._get_storage()
         s.add_task('42', 'my task')
         name = 'my result'
         s.set_result_mapping('42', {name: None})
@@ -74,13 +91,13 @@ class StorageTest(test.TestCase):
         self.assertEquals(s.fetch_all(), {name: 5})
     def test_fetch_unknown_name(self):
-        s = storage.Storage()
+        s = self._get_storage()
         with self.assertRaisesRegexp(exceptions.NotFound,
                                      "^Name 'xxx' is not mapped"):
     def test_fetch_result_not_ready(self):
-        s = storage.Storage()
+        s = self._get_storage()
         s.add_task('42', 'my task')
         name = 'my result'
         s.set_result_mapping('42', {name: None})
@@ -89,7 +106,7 @@ class StorageTest(test.TestCase):
         self.assertEquals(s.fetch_all(), {})
     def test_save_multiple_results(self):
-        s = storage.Storage()
+        s = self._get_storage()
         s.add_task('42', 'my task')
         s.set_result_mapping('42', {'foo': 0, 'bar': 1, 'whole': None})
         s.save('42', ('spam', 'eggs'))
@@ -100,14 +117,14 @@ class StorageTest(test.TestCase):
     def test_mapping_none(self):
-        s = storage.Storage()
+        s = self._get_storage()
         s.add_task('42', 'my task')
         s.set_result_mapping('42', None)
         s.save('42', 5)
         self.assertEquals(s.fetch_all(), {})
     def test_inject(self):
-        s = storage.Storage()
+        s = self._get_storage()
         s.inject({'foo': 'bar', 'spam': 'eggs'})
         self.assertEquals(s.fetch('spam'), 'eggs')
         self.assertEquals(s.fetch_all(), {
@@ -116,48 +133,49 @@ class StorageTest(test.TestCase):
     def test_fetch_meapped_args(self):
-        s = storage.Storage()
+        s = self._get_storage()
         s.inject({'foo': 'bar', 'spam': 'eggs'})
         self.assertEquals(s.fetch_mapped_args({'viking': 'spam'}),
                           {'viking': 'eggs'})
     def test_fetch_not_found_args(self):
-        s = storage.Storage()
+        s = self._get_storage()
         s.inject({'foo': 'bar', 'spam': 'eggs'})
         with self.assertRaises(exceptions.NotFound):
             s.fetch_mapped_args({'viking': 'helmet'})
     def test_set_and_get_task_state(self):
-        s = storage.Storage()
+        s = self._get_storage()
         state = states.PENDING
         s.add_task('42', 'my task')
         s.set_task_state('42', state)
         self.assertEquals(s.get_task_state('42'), state)
     def test_get_state_of_unknown_task(self):
-        s = storage.Storage()
+        s = self._get_storage()
         with self.assertRaisesRegexp(exceptions.NotFound, '^Unknown'):
     def test_task_by_name(self):
-        s = storage.Storage()
+        s = self._get_storage()
         s.add_task('42', 'my task')
         self.assertEquals(s.get_uuid_by_name('my task'), '42')
     def test_unknown_task_by_name(self):
-        s = storage.Storage()
+        s = self._get_storage()
         with self.assertRaisesRegexp(exceptions.NotFound,
                                      '^Unknown task name:'):
     def test_get_flow_state(self):
-        fd = storage.temporary_flow_detail()
+        lb, fd = p_utils.temporary_flow_detail(backend=self.backend)
         fd.state = states.INTERRUPTED
-        fd.save()
-        s = storage.Storage(fd)
+        with contextlib.closing(self.backend.get_connection()) as conn:
+            fd.update(conn.update_flow_details(fd))
+        s = storage.Storage(flow_detail=fd, backend=self.backend)
         self.assertEquals(s.get_flow_state(), states.INTERRUPTED)
     def test_set_and_get_flow_state(self):
-        s = storage.Storage()
+        s = self._get_storage()
         self.assertEquals(s.get_flow_state(), states.SUCCESS)
diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py
index 00496bbd2..f536d450b 100644
--- a/taskflow/utils/misc.py
+++ b/taskflow/utils/misc.py
@@ -38,6 +38,49 @@ def get_task_version(task):
     return task_version
+class ExponentialBackoff(object):
+    def __init__(self, attempts, exponent=2):
+        self.attempts = int(attempts)
+        self.exponent = exponent
+    def __iter__(self):
+        if self.attempts <= 0:
+            raise StopIteration()
+        for i in xrange(0, self.attempts):
+            yield self.exponent ** i
+    def __str__(self):
+        return "ExponentialBackoff: %s" % ([str(v) for v in self])
+def as_bool(val):
+    if isinstance(val, bool):
+        return val
+    if isinstance(val, basestring):
+        if val.lower() in ('f', 'false', '0', 'n', 'no'):
+            return False
+        if val.lower() in ('t', 'true', '1', 'y', 'yes'):
+            return True
+    return bool(val)
+def as_int(obj, quiet=False):
+    # Try "2" -> 2
+    try:
+        return int(obj)
+    except (ValueError, TypeError):
+        pass
+    # Try "2.5" -> 2
+    try:
+        return int(float(obj))
+    except (ValueError, TypeError):
+        pass
+    # Eck, not sure what this is then.
+    if not quiet:
+        raise TypeError("Can not translate %s to an integer." % (obj))
+    return obj
 def is_version_compatible(version_1, version_2):
     """Checks for major version compatibility of two *string" versions."""
diff --git a/tools/pip-requires b/tools/pip-requires
index 3f7082a9c..4018c4cc9 100644
--- a/tools/pip-requires
+++ b/tools/pip-requires
@@ -15,3 +15,5 @@ alembic>=0.4.1
+# Used for backend storage engine loading