diff --git a/keystone/openstack/common/context.py b/keystone/openstack/common/context.py
index 643e62b4e5..182b044365 100644
--- a/keystone/openstack/common/context.py
+++ b/keystone/openstack/common/context.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2011 OpenStack Foundation.
 # All Rights Reserved.
 #
@@ -23,12 +21,11 @@ context or provide additional information in their specific WSGI pipeline.
 """
 
 import itertools
-
-from keystone.openstack.common import uuidutils
+import uuid
 
 
 def generate_request_id():
-    return 'req-%s' % uuidutils.generate_uuid()
+    return 'req-%s' % str(uuid.uuid4())
 
 
 class RequestContext(object):
@@ -39,26 +36,46 @@ class RequestContext(object):
     accesses the system, as well as additional request information.
     """
 
-    def __init__(self, auth_token=None, user=None, tenant=None, is_admin=False,
-                 read_only=False, show_deleted=False, request_id=None):
+    user_idt_format = '{user} {tenant} {domain} {user_domain} {p_domain}'
+
+    def __init__(self, auth_token=None, user=None, tenant=None, domain=None,
+                 user_domain=None, project_domain=None, is_admin=False,
+                 read_only=False, show_deleted=False, request_id=None,
+                 instance_uuid=None):
         self.auth_token = auth_token
         self.user = user
         self.tenant = tenant
+        self.domain = domain
+        self.user_domain = user_domain
+        self.project_domain = project_domain
         self.is_admin = is_admin
         self.read_only = read_only
         self.show_deleted = show_deleted
+        self.instance_uuid = instance_uuid
         if not request_id:
             request_id = generate_request_id()
         self.request_id = request_id
 
     def to_dict(self):
+        user_idt = (
+            self.user_idt_format.format(user=self.user or '-',
+                                        tenant=self.tenant or '-',
+                                        domain=self.domain or '-',
+                                        user_domain=self.user_domain or '-',
+                                        p_domain=self.project_domain or '-'))
+
         return {'user': self.user,
                 'tenant': self.tenant,
+                'domain': self.domain,
+                'user_domain': self.user_domain,
+                'project_domain': self.project_domain,
                 'is_admin': self.is_admin,
                 'read_only': self.read_only,
                 'show_deleted': self.show_deleted,
                 'auth_token': self.auth_token,
-                'request_id': self.request_id}
+                'request_id': self.request_id,
+                'instance_uuid': self.instance_uuid,
+                'user_identity': user_idt}
 
 
 def get_admin_context(show_deleted=False):
diff --git a/keystone/openstack/common/crypto/utils.py b/keystone/openstack/common/crypto/utils.py
index 717989d4e1..ee738153d3 100644
--- a/keystone/openstack/common/crypto/utils.py
+++ b/keystone/openstack/common/crypto/utils.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2013 Red Hat, Inc.
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
diff --git a/keystone/openstack/common/db/__init__.py b/keystone/openstack/common/db/__init__.py
index 1b9b60dec1..5f5273f3e0 100644
--- a/keystone/openstack/common/db/__init__.py
+++ b/keystone/openstack/common/db/__init__.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2012 Cloudscaling Group, Inc
 # All Rights Reserved.
 #
diff --git a/keystone/openstack/common/db/api.py b/keystone/openstack/common/db/api.py
index a57f4bd8bc..133304c2da 100644
--- a/keystone/openstack/common/db/api.py
+++ b/keystone/openstack/common/db/api.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright (c) 2013 Rackspace Hosting
 # All Rights Reserved.
 #
diff --git a/keystone/openstack/common/db/exception.py b/keystone/openstack/common/db/exception.py
index 0f843914e6..29cf897765 100644
--- a/keystone/openstack/common/db/exception.py
+++ b/keystone/openstack/common/db/exception.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2010 United States Government as represented by the
 # Administrator of the National Aeronautics and Space Administration.
 # All Rights Reserved.
@@ -49,3 +47,8 @@ class DbMigrationError(DBError):
     """Wraps migration specific exception."""
     def __init__(self, message=None):
         super(DbMigrationError, self).__init__(str(message))
+
+
+class DBConnectionError(DBError):
+    """Wraps connection specific exception."""
+    pass
diff --git a/keystone/openstack/common/db/sqlalchemy/__init__.py b/keystone/openstack/common/db/sqlalchemy/__init__.py
index 1b9b60dec1..5f5273f3e0 100644
--- a/keystone/openstack/common/db/sqlalchemy/__init__.py
+++ b/keystone/openstack/common/db/sqlalchemy/__init__.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2012 Cloudscaling Group, Inc
 # All Rights Reserved.
 #
diff --git a/keystone/openstack/common/db/sqlalchemy/migration.py b/keystone/openstack/common/db/sqlalchemy/migration.py
index e1f69ff01d..538826dfca 100644
--- a/keystone/openstack/common/db/sqlalchemy/migration.py
+++ b/keystone/openstack/common/db/sqlalchemy/migration.py
@@ -36,7 +36,8 @@
 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
 
 import distutils.version as dist_version
 import os
diff --git a/keystone/openstack/common/db/sqlalchemy/models.py b/keystone/openstack/common/db/sqlalchemy/models.py
index 76c4acb61b..68f0257e6f 100644
--- a/keystone/openstack/common/db/sqlalchemy/models.py
+++ b/keystone/openstack/common/db/sqlalchemy/models.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright (c) 2011 X.commerce, a business unit of eBay Inc.
 # Copyright 2010 United States Government as represented by the
 # Administrator of the National Aeronautics and Space Administration.
@@ -41,13 +39,13 @@ class ModelBase(object):
         if not session:
             session = sa.get_session()
         # NOTE(boris-42): This part of code should be look like:
-        #                       sesssion.add(self)
+        #                       session.add(self)
         #                       session.flush()
         #                 But there is a bug in sqlalchemy and eventlet that
         #                 raises NoneType exception if there is no running
         #                 transaction and rollback is called. As long as
         #                 sqlalchemy has this bug we have to create transaction
-        #                 explicity.
+        #                 explicitly.
         with session.begin(subtransactions=True):
             session.add(self)
             session.flush()
diff --git a/keystone/openstack/common/db/sqlalchemy/provision.py b/keystone/openstack/common/db/sqlalchemy/provision.py
new file mode 100644
index 0000000000..97b023e9d6
--- /dev/null
+++ b/keystone/openstack/common/db/sqlalchemy/provision.py
@@ -0,0 +1,187 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Mirantis.inc
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""Provision test environment for specific DB backends"""
+
+import argparse
+import os
+import random
+import string
+
+import sqlalchemy
+
+from keystone.openstack.common.db import exception as exc
+
+
+SQL_CONNECTION = os.getenv('OS_TEST_DBAPI_ADMIN_CONNECTION', 'sqlite://')
+
+
+def _gen_credentials(*names):
+    """Generate credentials."""
+    auth_dict = {}
+    for name in names:
+        val = ''.join(random.choice(string.lowercase) for i in xrange(10))
+        auth_dict[name] = val
+    return auth_dict
+
+
+def _get_engine(uri=SQL_CONNECTION):
+    """Engine creation
+
+    By default the uri is SQL_CONNECTION which is admin credentials.
+    Call the function without arguments to get admin connection. Admin
+    connection required to create temporary user and database for each
+    particular test. Otherwise use existing connection to recreate connection
+    to the temporary database.
+    """
+    return sqlalchemy.create_engine(uri, poolclass=sqlalchemy.pool.NullPool)
+
+
+def _execute_sql(engine, sql, driver):
+    """Initialize connection, execute sql query and close it."""
+    try:
+        with engine.connect() as conn:
+            if driver == 'postgresql':
+                conn.connection.set_isolation_level(0)
+            for s in sql:
+                conn.execute(s)
+    except sqlalchemy.exc.OperationalError:
+        msg = ('%s does not match database admin '
+               'credentials or database does not exist.')
+        raise exc.DBConnectionError(msg % SQL_CONNECTION)
+
+
+def create_database(engine):
+    """Provide temporary user and database for each particular test."""
+    driver = engine.name
+
+    auth = _gen_credentials('database', 'user', 'passwd')
+
+    sqls = {
+        'mysql': [
+            "drop database if exists %(database)s;",
+            "grant all on %(database)s.* to '%(user)s'@'localhost'"
+            " identified by '%(passwd)s';",
+            "create database %(database)s;",
+        ],
+        'postgresql': [
+            "drop database if exists %(database)s;",
+            "drop user if exists %(user)s;",
+            "create user %(user)s with password '%(passwd)s';",
+            "create database %(database)s owner %(user)s;",
+        ]
+    }
+
+    if driver == 'sqlite':
+        return 'sqlite:////tmp/%s' % auth['database']
+
+    try:
+        sql_rows = sqls[driver]
+    except KeyError:
+        raise ValueError('Unsupported RDBMS %s' % driver)
+    sql_query = map(lambda x: x % auth, sql_rows)
+
+    _execute_sql(engine, sql_query, driver)
+
+    params = auth.copy()
+    params['backend'] = driver
+    return "%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s" % params
+
+
+def drop_database(engine, current_uri):
+    """Drop temporary database and user after each particular test."""
+    engine = _get_engine(current_uri)
+    admin_engine = _get_engine()
+    driver = engine.name
+    auth = {'database': engine.url.database, 'user': engine.url.username}
+
+    if driver == 'sqlite':
+        try:
+            os.remove(auth['database'])
+        except OSError:
+            pass
+        return
+
+    sqls = {
+        'mysql': [
+            "drop database if exists %(database)s;",
+            "drop user '%(user)s'@'localhost';",
+        ],
+        'postgresql': [
+            "drop database if exists %(database)s;",
+            "drop user if exists %(user)s;",
+        ]
+    }
+
+    try:
+        sql_rows = sqls[driver]
+    except KeyError:
+        raise ValueError('Unsupported RDBMS %s' % driver)
+    sql_query = map(lambda x: x % auth, sql_rows)
+
+    _execute_sql(admin_engine, sql_query, driver)
+
+
+def main():
+    """Controller to handle commands
+
+    ::create: Create test user and database with random names.
+    ::drop: Drop user and database created by previous command.
+    """
+    parser = argparse.ArgumentParser(
+        description='Controller to handle database creation and dropping'
+        ' commands.',
+        epilog='Under normal circumstances is not used directly.'
+        ' Used in .testr.conf to automate test database creation'
+        ' and dropping processes.')
+    subparsers = parser.add_subparsers(
+        help='Subcommands to manipulate temporary test databases.')
+
+    create = subparsers.add_parser(
+        'create',
+        help='Create temporary test '
+        'databases and users.')
+    create.set_defaults(which='create')
+    create.add_argument(
+        'instances_count',
+        type=int,
+        help='Number of databases to create.')
+
+    drop = subparsers.add_parser(
+        'drop',
+        help='Drop temporary test databases and users.')
+    drop.set_defaults(which='drop')
+    drop.add_argument(
+        'instances',
+        nargs='+',
+        help='List of databases uri to be dropped.')
+
+    args = parser.parse_args()
+
+    engine = _get_engine()
+    which = args.which
+
+    if which == "create":
+        for i in range(int(args.instances_count)):
+            print(create_database(engine))
+    elif which == "drop":
+        for db in args.instances:
+            drop_database(engine, db)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/keystone/openstack/common/db/sqlalchemy/session.py b/keystone/openstack/common/db/sqlalchemy/session.py
index 510ef4b20d..0c9f7dbef2 100644
--- a/keystone/openstack/common/db/sqlalchemy/session.py
+++ b/keystone/openstack/common/db/sqlalchemy/session.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2010 United States Government as represented by the
 # Administrator of the National Aeronautics and Space Administration.
 # All Rights Reserved.
@@ -109,7 +107,7 @@ Recommended ways to use sessions within this framework:
                 filter_by(id=subq.as_scalar()).\
                 update({'bar': newbar})
 
-  For reference, this emits approximagely the following SQL statement:
+  For reference, this emits approximately the following SQL statement:
 
     UPDATE bar SET bar = ${newbar}
         WHERE id=(SELECT bar_id FROM foo WHERE id = ${foo_id} LIMIT 1);
@@ -613,7 +611,7 @@ def _ping_listener(dbapi_conn, connection_rec, connection_proxy):
         dbapi_conn.cursor().execute('select 1')
     except dbapi_conn.OperationalError as ex:
         if ex.args[0] in (2006, 2013, 2014, 2045, 2055):
-            LOG.warn(_('Got mysql server has gone away: %s'), ex)
+            LOG.warning(_('Got mysql server has gone away: %s'), ex)
             raise sqla_exc.DisconnectionError("Database server went away")
         else:
             raise
@@ -695,7 +693,7 @@ def create_engine(sql_connection, sqlite_fk=False):
             remaining = 'infinite'
         while True:
             msg = _('SQL connection failed. %s attempts left.')
-            LOG.warn(msg % remaining)
+            LOG.warning(msg % remaining)
             if remaining != 'infinite':
                 remaining -= 1
             time.sleep(CONF.database.retry_interval)
@@ -754,25 +752,25 @@ def _patch_mysqldb_with_stacktrace_comments():
 
     def _do_query(self, q):
         stack = ''
-        for file, line, method, function in traceback.extract_stack():
+        for filename, line, method, function in traceback.extract_stack():
             # exclude various common things from trace
-            if file.endswith('session.py') and method == '_do_query':
+            if filename.endswith('session.py') and method == '_do_query':
                 continue
-            if file.endswith('api.py') and method == 'wrapper':
+            if filename.endswith('api.py') and method == 'wrapper':
                 continue
-            if file.endswith('utils.py') and method == '_inner':
+            if filename.endswith('utils.py') and method == '_inner':
                 continue
-            if file.endswith('exception.py') and method == '_wrap':
+            if filename.endswith('exception.py') and method == '_wrap':
                 continue
             # db/api is just a wrapper around db/sqlalchemy/api
-            if file.endswith('db/api.py'):
+            if filename.endswith('db/api.py'):
                 continue
             # only trace inside keystone
-            index = file.rfind('keystone')
+            index = filename.rfind('keystone')
             if index == -1:
                 continue
             stack += "File:%s:%s Method:%s() Line:%s | " \
-                     % (file[index:], line, method, function)
+                     % (filename[index:], line, method, function)
 
         # strip trailing " | " from stack
         if stack:
diff --git a/keystone/openstack/common/db/sqlalchemy/test_migrations.py b/keystone/openstack/common/db/sqlalchemy/test_migrations.py
index db01815858..232d4d83df 100644
--- a/keystone/openstack/common/db/sqlalchemy/test_migrations.py
+++ b/keystone/openstack/common/db/sqlalchemy/test_migrations.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2010-2011 OpenStack Foundation
 # Copyright 2012-2013 IBM Corp.
 # All Rights Reserved.
@@ -16,17 +14,18 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-
-import commands
 import ConfigParser
+import functools
 import os
-import urlparse
+import subprocess
 
+import lockfile
 import sqlalchemy
 import sqlalchemy.exc
 
-from keystone.openstack.common import lockutils
+from keystone.openstack.common.gettextutils import _
 from keystone.openstack.common import log as logging
+from keystone.openstack.common.py3kcompat import urlutils
 from keystone.openstack.common import test
 
 LOG = logging.getLogger(__name__)
@@ -93,6 +92,22 @@ def get_db_connection_info(conn_pieces):
     return (user, password, database, host)
 
 
+def _set_db_lock(lock_path=None, lock_prefix=None):
+    def decorator(f):
+        @functools.wraps(f)
+        def wrapper(*args, **kwargs):
+            try:
+                path = lock_path or os.environ.get("KEYSTONE_LOCK_PATH")
+                lock = lockfile.FileLock(os.path.join(path, lock_prefix))
+                with lock:
+                    LOG.debug(_('Got lock "%s"') % f.__name__)
+                    return f(*args, **kwargs)
+            finally:
+                LOG.debug(_('Lock released "%s"') % f.__name__)
+        return wrapper
+    return decorator
+
+
 class BaseMigrationTestCase(test.BaseTestCase):
     """Base class fort testing of migration utils."""
 
@@ -143,12 +158,13 @@ class BaseMigrationTestCase(test.BaseTestCase):
         super(BaseMigrationTestCase, self).tearDown()
 
     def execute_cmd(self, cmd=None):
-        status, output = commands.getstatusoutput(cmd)
+        process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
+                                   stderr=subprocess.STDOUT)
+        output = process.communicate()[0]
         LOG.debug(output)
-        self.assertEqual(0, status,
+        self.assertEqual(0, process.returncode,
                          "Failed to run: %s\n%s" % (cmd, output))
 
-    @lockutils.synchronized('pgadmin', 'tests-', external=True)
     def _reset_pg(self, conn_pieces):
         (user, password, database, host) = get_db_connection_info(conn_pieces)
         os.environ['PGPASSWORD'] = password
@@ -170,10 +186,11 @@ class BaseMigrationTestCase(test.BaseTestCase):
         os.unsetenv('PGPASSWORD')
         os.unsetenv('PGUSER')
 
+    @_set_db_lock(lock_prefix='migration_tests-')
     def _reset_databases(self):
         for key, engine in self.engines.items():
             conn_string = self.test_databases[key]
-            conn_pieces = urlparse.urlparse(conn_string)
+            conn_pieces = urlutils.urlparse(conn_string)
             engine.dispose()
             if conn_string.startswith('sqlite'):
                 # We can just delete the SQLite database, which is
diff --git a/keystone/openstack/common/db/sqlalchemy/utils.py b/keystone/openstack/common/db/sqlalchemy/utils.py
index 1ac5cac394..9556af33f3 100644
--- a/keystone/openstack/common/db/sqlalchemy/utils.py
+++ b/keystone/openstack/common/db/sqlalchemy/utils.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2010 United States Government as represented by the
 # Administrator of the National Aeronautics and Space Administration.
 # Copyright 2010-2011 OpenStack Foundation.
@@ -96,7 +94,7 @@ def paginate_query(query, model, limit, sort_keys, marker=None,
     if 'id' not in sort_keys:
         # TODO(justinsb): If this ever gives a false-positive, check
         # the actual primary key, rather than assuming its id
-        LOG.warn(_('Id not in sort_keys; is sort_keys unique?'))
+        LOG.warning(_('Id not in sort_keys; is sort_keys unique?'))
 
     assert(not (sort_dir and sort_dirs))
 
diff --git a/keystone/openstack/common/eventlet_backdoor.py b/keystone/openstack/common/eventlet_backdoor.py
index c4d18ddb1f..f11fecd0d5 100644
--- a/keystone/openstack/common/eventlet_backdoor.py
+++ b/keystone/openstack/common/eventlet_backdoor.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright (c) 2012 OpenStack Foundation.
 # Administrator of the National Aeronautics and Space Administration.
 # All Rights Reserved.
diff --git a/keystone/openstack/common/excutils.py b/keystone/openstack/common/excutils.py
index 28d59f90e3..ec8b2ff76c 100644
--- a/keystone/openstack/common/excutils.py
+++ b/keystone/openstack/common/excutils.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2011 OpenStack Foundation.
 # Copyright 2012, Red Hat, Inc.
 #
@@ -24,6 +22,8 @@ import sys
 import time
 import traceback
 
+import six
+
 from keystone.openstack.common.gettextutils import _  # noqa
 
 
@@ -65,7 +65,7 @@ class save_and_reraise_exception(object):
                                                      self.tb))
             return False
         if self.reraise:
-            raise self.type_, self.value, self.tb
+            six.reraise(self.type_, self.value, self.tb)
 
 
 def forever_retry_uncaught_exceptions(infunc):
@@ -77,7 +77,7 @@ def forever_retry_uncaught_exceptions(infunc):
             try:
                 return infunc(*args, **kwargs)
             except Exception as exc:
-                this_exc_message = unicode(exc)
+                this_exc_message = six.u(str(exc))
                 if this_exc_message == last_exc_message:
                     exc_count += 1
                 else:
diff --git a/keystone/openstack/common/fileutils.py b/keystone/openstack/common/fileutils.py
index f4d00259da..454cb166b9 100644
--- a/keystone/openstack/common/fileutils.py
+++ b/keystone/openstack/common/fileutils.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2011 OpenStack Foundation.
 # All Rights Reserved.
 #
@@ -19,6 +17,7 @@
 import contextlib
 import errno
 import os
+import tempfile
 
 from keystone.openstack.common import excutils
 from keystone.openstack.common.gettextutils import _  # noqa
@@ -109,3 +108,30 @@ def file_open(*args, **kwargs):
     state at all (for unit tests)
     """
     return file(*args, **kwargs)
+
+
+def write_to_tempfile(content, path=None, suffix='', prefix='tmp'):
+    """Create temporary file or use existing file.
+
+    This util is needed for creating temporary file with
+    specified content, suffix and prefix. If path is not None,
+    it will be used for writing content. If the path doesn't
+    exist it'll be created.
+
+    :param content: content for temporary file.
+    :param path: same as parameter 'dir' for mkstemp
+    :param suffix: same as parameter 'suffix' for mkstemp
+    :param prefix: same as parameter 'prefix' for mkstemp
+
+    For example: it can be used in database tests for creating
+    configuration files.
+    """
+    if path:
+        ensure_tree(path)
+
+    (fd, path) = tempfile.mkstemp(suffix=suffix, dir=path, prefix=prefix)
+    try:
+        os.write(fd, content)
+    finally:
+        os.close(fd)
+    return path
diff --git a/keystone/openstack/common/fixture/config.py b/keystone/openstack/common/fixture/config.py
index 7b044ef749..0bf90ff7a0 100644
--- a/keystone/openstack/common/fixture/config.py
+++ b/keystone/openstack/common/fixture/config.py
@@ -1,4 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
 #
 # Copyright 2013 Mirantis, Inc.
 # Copyright 2013 OpenStack Foundation
@@ -30,7 +29,7 @@ class Config(fixtures.Fixture):
     the specified configuration option group.
 
     All overrides are automatically cleared at the end of the current
-    test by the reset() method, which is registred by addCleanup().
+    test by the reset() method, which is registered by addCleanup().
     """
 
     def __init__(self, conf=cfg.CONF):
diff --git a/keystone/openstack/common/fixture/lockutils.py b/keystone/openstack/common/fixture/lockutils.py
index 3a194e75a6..3e18bbf3ff 100644
--- a/keystone/openstack/common/fixture/lockutils.py
+++ b/keystone/openstack/common/fixture/lockutils.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2011 OpenStack Foundation.
 # All Rights Reserved.
 #
diff --git a/keystone/openstack/common/fixture/mockpatch.py b/keystone/openstack/common/fixture/mockpatch.py
index cd0d6ca6b5..858e77cd06 100644
--- a/keystone/openstack/common/fixture/mockpatch.py
+++ b/keystone/openstack/common/fixture/mockpatch.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2010 United States Government as represented by the
 # Administrator of the National Aeronautics and Space Administration.
 # Copyright 2013 Hewlett-Packard Development Company, L.P.
diff --git a/keystone/openstack/common/fixture/moxstubout.py b/keystone/openstack/common/fixture/moxstubout.py
index a0e74fd11e..e8c031f08a 100644
--- a/keystone/openstack/common/fixture/moxstubout.py
+++ b/keystone/openstack/common/fixture/moxstubout.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2010 United States Government as represented by the
 # Administrator of the National Aeronautics and Space Administration.
 # Copyright 2013 Hewlett-Packard Development Company, L.P.
diff --git a/keystone/openstack/common/gettextutils.py b/keystone/openstack/common/gettextutils.py
index 7c07d70708..b248d9115f 100644
--- a/keystone/openstack/common/gettextutils.py
+++ b/keystone/openstack/common/gettextutils.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2012 Red Hat, Inc.
 # Copyright 2013 IBM Corp.
 # All Rights Reserved.
@@ -317,7 +315,7 @@ def get_available_languages(domain):
     # NOTE(luisg): Babel <1.0 used a function called list(), which was
     # renamed to locale_identifiers() in >=1.0, the requirements master list
     # requires >=0.9.6, uncapped, so defensively work with both. We can remove
-    # this check when the master list updates to >=1.0, and all projects udpate
+    # this check when the master list updates to >=1.0, and update all projects
     list_identifiers = (getattr(localedata, 'list', None) or
                         getattr(localedata, 'locale_identifiers'))
     locale_identifiers = list_identifiers()
@@ -329,13 +327,21 @@ def get_available_languages(domain):
 
 
 def get_localized_message(message, user_locale):
-    """Gets a localized version of the given message in the given locale."""
+    """Gets a localized version of the given message in the given locale.
+
+    If the message is not a Message object the message is returned as-is.
+    If the locale is None the message is translated to the default locale.
+
+    :returns: the translated message in unicode, or the original message if
+              it could not be translated
+    """
+    translated = message
     if isinstance(message, Message):
-        if user_locale:
-            message.locale = user_locale
-        return six.text_type(message)
-    else:
-        return message
+        original_locale = message.locale
+        message.locale = user_locale
+        translated = six.text_type(message)
+        message.locale = original_locale
+    return translated
 
 
 class LocaleHandler(logging.Handler):
diff --git a/keystone/openstack/common/importutils.py b/keystone/openstack/common/importutils.py
index 7a303f93f2..4fd9ae2bc2 100644
--- a/keystone/openstack/common/importutils.py
+++ b/keystone/openstack/common/importutils.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2011 OpenStack Foundation.
 # All Rights Reserved.
 #
diff --git a/keystone/openstack/common/jsonutils.py b/keystone/openstack/common/jsonutils.py
index ecea09bb6d..67578e686d 100644
--- a/keystone/openstack/common/jsonutils.py
+++ b/keystone/openstack/common/jsonutils.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2010 United States Government as represented by the
 # Administrator of the National Aeronautics and Space Administration.
 # Copyright 2011 Justin Santa Barbara
@@ -38,14 +36,19 @@ import functools
 import inspect
 import itertools
 import json
-import types
-import xmlrpclib
+try:
+    import xmlrpclib
+except ImportError:
+    # NOTE(jd): xmlrpclib is not shipped with Python 3
+    xmlrpclib = None
 
-import netaddr
 import six
 
+from keystone.openstack.common import gettextutils
+from keystone.openstack.common import importutils
 from keystone.openstack.common import timeutils
 
+netaddr = importutils.try_import("netaddr")
 
 _nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod,
                      inspect.isfunction, inspect.isgeneratorfunction,
@@ -53,7 +56,8 @@ _nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod,
                      inspect.iscode, inspect.isbuiltin, inspect.isroutine,
                      inspect.isabstract]
 
-_simple_types = (types.NoneType, int, basestring, bool, float, long)
+_simple_types = (six.string_types + six.integer_types
+                 + (type(None), bool, float))
 
 
 def to_primitive(value, convert_instances=False, convert_datetime=True,
@@ -125,11 +129,13 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
         # It's not clear why xmlrpclib created their own DateTime type, but
         # for our purposes, make it a datetime type which is explicitly
         # handled
-        if isinstance(value, xmlrpclib.DateTime):
+        if xmlrpclib and isinstance(value, xmlrpclib.DateTime):
             value = datetime.datetime(*tuple(value.timetuple())[:6])
 
         if convert_datetime and isinstance(value, datetime.datetime):
             return timeutils.strtime(value)
+        elif isinstance(value, gettextutils.Message):
+            return value.data
         elif hasattr(value, 'iteritems'):
             return recursive(dict(value.iteritems()), level=level + 1)
         elif hasattr(value, '__iter__'):
@@ -138,7 +144,7 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
             # Likely an instance of something. Watch for cycles.
             # Ignore class member vars.
             return recursive(value.__dict__, level=level + 1)
-        elif isinstance(value, netaddr.IPAddress):
+        elif netaddr and isinstance(value, netaddr.IPAddress):
             return six.text_type(value)
         else:
             if any(test(value) for test in _nasty_type_tests):
diff --git a/keystone/openstack/common/local.py b/keystone/openstack/common/local.py
index e82f17d0f3..0819d5b97c 100644
--- a/keystone/openstack/common/local.py
+++ b/keystone/openstack/common/local.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2011 OpenStack Foundation.
 # All Rights Reserved.
 #
diff --git a/keystone/openstack/common/lockutils.py b/keystone/openstack/common/lockutils.py
index c8f4e8bbde..ef50b1cf50 100644
--- a/keystone/openstack/common/lockutils.py
+++ b/keystone/openstack/common/lockutils.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2011 OpenStack Foundation.
 # All Rights Reserved.
 #
@@ -20,6 +18,10 @@ import contextlib
 import errno
 import functools
 import os
+import shutil
+import subprocess
+import sys
+import tempfile
 import threading
 import time
 import weakref
@@ -39,6 +41,7 @@ util_opts = [
     cfg.BoolOpt('disable_process_locking', default=False,
                 help='Whether to disable inter-process locks'),
     cfg.StrOpt('lock_path',
+               default=os.environ.get("KEYSTONE_LOCK_PATH"),
                help=('Directory to use for lock files.'))
 ]
 
@@ -131,6 +134,7 @@ else:
     InterProcessLock = _PosixLock
 
 _semaphores = weakref.WeakValueDictionary()
+_semaphores_lock = threading.Lock()
 
 
 @contextlib.contextmanager
@@ -153,15 +157,12 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None):
     special location for external lock files to live. If nothing is set, then
     CONF.lock_path is used as a default.
     """
-    # NOTE(soren): If we ever go natively threaded, this will be racy.
-    #              See http://stackoverflow.com/questions/5390569/dyn
-    #              amically-allocating-and-destroying-mutexes
-    sem = _semaphores.get(name, threading.Semaphore())
-    if name not in _semaphores:
-        # this check is not racy - we're already holding ref locally
-        # so GC won't remove the item and there was no IO switch
-        # (only valid in greenthreads)
-        _semaphores[name] = sem
+    with _semaphores_lock:
+        try:
+            sem = _semaphores[name]
+        except KeyError:
+            sem = threading.Semaphore()
+            _semaphores[name] = sem
 
     with sem:
         LOG.debug(_('Got semaphore "%(lock)s"'), {'lock': name})
@@ -241,13 +242,14 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
     def wrap(f):
         @functools.wraps(f)
         def inner(*args, **kwargs):
-            with lock(name, lock_file_prefix, external, lock_path):
-                LOG.debug(_('Got semaphore / lock "%(function)s"'),
+            try:
+                with lock(name, lock_file_prefix, external, lock_path):
+                    LOG.debug(_('Got semaphore / lock "%(function)s"'),
+                              {'function': f.__name__})
+                    return f(*args, **kwargs)
+            finally:
+                LOG.debug(_('Semaphore / lock released "%(function)s"'),
                           {'function': f.__name__})
-                return f(*args, **kwargs)
-
-            LOG.debug(_('Semaphore / lock released "%(function)s"'),
-                      {'function': f.__name__})
         return inner
     return wrap
 
@@ -275,3 +277,27 @@ def synchronized_with_prefix(lock_file_prefix):
     """
 
     return functools.partial(synchronized, lock_file_prefix=lock_file_prefix)
+
+
+def main(argv):
+    """Create a dir for locks and pass it to command from arguments
+
+    If you run this:
+    python -m openstack.common.lockutils python setup.py testr <etc>
+
+    a temporary directory will be created for all your locks and passed to all
+    your tests in an environment variable. The temporary dir will be deleted
+    afterwards and the return value will be preserved.
+    """
+
+    lock_dir = tempfile.mkdtemp()
+    os.environ["KEYSTONE_LOCK_PATH"] = lock_dir
+    try:
+        ret_val = subprocess.call(argv[1:])
+    finally:
+        shutil.rmtree(lock_dir, ignore_errors=True)
+    return ret_val
+
+
+if __name__ == '__main__':
+    sys.exit(main(sys.argv))
diff --git a/keystone/openstack/common/log.py b/keystone/openstack/common/log.py
index 8f3f4bd338..cf12133910 100644
--- a/keystone/openstack/common/log.py
+++ b/keystone/openstack/common/log.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2011 OpenStack Foundation.
 # Copyright 2010 United States Government as represented by the
 # Administrator of the National Aeronautics and Space Administration.
@@ -51,7 +49,7 @@ from keystone.openstack.common import local
 
 _DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
 
-_SANITIZE_KEYS = ['adminPass', 'admin_pass', 'password']
+_SANITIZE_KEYS = ['adminPass', 'admin_pass', 'password', 'admin_password']
 
 # NOTE(ldbragst): Let's build a list of regex objects using the list of
 # _SANITIZE_KEYS we already have. This way, we only have to add the new key
@@ -132,7 +130,7 @@ generic_log_opts = [
 log_opts = [
     cfg.StrOpt('logging_context_format_string',
                default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
-                       '%(name)s [%(request_id)s %(user)s %(tenant)s] '
+                       '%(name)s [%(request_id)s %(user_identity)s] '
                        '%(instance)s%(message)s',
                help='format string to use for log messages with context'),
     cfg.StrOpt('logging_default_format_string',
@@ -155,6 +153,7 @@ log_opts = [
                     'qpid=WARN',
                     'sqlalchemy=WARN',
                     'suds=INFO',
+                    'iso8601=WARN',
                 ],
                 help='list of logger=LEVEL pairs'),
     cfg.BoolOpt('publish_errors',
@@ -333,10 +332,12 @@ class ContextAdapter(BaseLoggerAdapter):
         elif instance_uuid:
             instance_extra = (CONF.instance_uuid_format
                               % {'uuid': instance_uuid})
-        extra.update({'instance': instance_extra})
+        extra['instance'] = instance_extra
 
-        extra.update({"project": self.project})
-        extra.update({"version": self.version})
+        extra.setdefault('user_identity', kwargs.pop('user_identity', None))
+
+        extra['project'] = self.project
+        extra['version'] = self.version
         extra['extra'] = extra.copy()
         return msg, kwargs
 
@@ -350,7 +351,7 @@ class JSONFormatter(logging.Formatter):
     def formatException(self, ei, strip_newlines=True):
         lines = traceback.format_exception(*ei)
         if strip_newlines:
-            lines = [itertools.ifilter(
+            lines = [moves.filter(
                 lambda x: x,
                 line.rstrip().splitlines()) for line in lines]
             lines = list(itertools.chain(*lines))
@@ -388,10 +389,10 @@ class JSONFormatter(logging.Formatter):
 
 
 def _create_logging_excepthook(product_name):
-    def logging_excepthook(type, value, tb):
+    def logging_excepthook(exc_type, value, tb):
         extra = {}
         if CONF.verbose:
-            extra['exc_info'] = (type, value, tb)
+            extra['exc_info'] = (exc_type, value, tb)
         getLogger(product_name).critical(str(value), **extra)
     return logging_excepthook
 
@@ -476,7 +477,7 @@ def _setup_logging_from_conf():
         streamlog = ColorHandler()
         log_root.addHandler(streamlog)
 
-    elif not CONF.log_file:
+    elif not logpath:
         # pass sys.stdout as a positional argument
         # python2.6 calls the argument strm, in 2.7 it's stream
         streamlog = logging.StreamHandler(sys.stdout)
diff --git a/keystone/openstack/common/log_handler.py b/keystone/openstack/common/log_handler.py
index a5f7742e9a..ea25d44a8c 100644
--- a/keystone/openstack/common/log_handler.py
+++ b/keystone/openstack/common/log_handler.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2013 IBM Corp.
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
diff --git a/keystone/openstack/common/loopingcall.py b/keystone/openstack/common/loopingcall.py
index 0801db0953..29bd427d84 100644
--- a/keystone/openstack/common/loopingcall.py
+++ b/keystone/openstack/common/loopingcall.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2010 United States Government as represented by the
 # Administrator of the National Aeronautics and Space Administration.
 # Copyright 2011 Justin Santa Barbara
diff --git a/keystone/openstack/common/network_utils.py b/keystone/openstack/common/network_utils.py
index dbed1ceb44..fdf12d7bdb 100644
--- a/keystone/openstack/common/network_utils.py
+++ b/keystone/openstack/common/network_utils.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2012 OpenStack Foundation.
 # All Rights Reserved.
 #
@@ -19,7 +17,7 @@
 Network-related utilities and helper functions.
 """
 
-import urlparse
+from keystone.openstack.common.py3kcompat import urlutils
 
 
 def parse_host_port(address, default_port=None):
@@ -72,10 +70,10 @@ def urlsplit(url, scheme='', allow_fragments=True):
 
     The parameters are the same as urlparse.urlsplit.
     """
-    scheme, netloc, path, query, fragment = urlparse.urlsplit(
+    scheme, netloc, path, query, fragment = urlutils.urlsplit(
         url, scheme, allow_fragments)
     if allow_fragments and '#' in path:
         path, fragment = path.split('#', 1)
     if '?' in path:
         path, query = path.split('?', 1)
-    return urlparse.SplitResult(scheme, netloc, path, query, fragment)
+    return urlutils.SplitResult(scheme, netloc, path, query, fragment)
diff --git a/keystone/openstack/common/notifier/rpc_notifier.py b/keystone/openstack/common/notifier/rpc_notifier.py
index dad3bef548..30b89f21c9 100644
--- a/keystone/openstack/common/notifier/rpc_notifier.py
+++ b/keystone/openstack/common/notifier/rpc_notifier.py
@@ -43,4 +43,5 @@ def notify(context, message):
             rpc.notify(context, topic, message)
         except Exception:
             LOG.exception(_("Could not send notification to %(topic)s. "
-                            "Payload=%(message)s"), locals())
+                            "Payload=%(message)s"),
+                          {"topic": topic, "message": message})
diff --git a/keystone/openstack/common/notifier/rpc_notifier2.py b/keystone/openstack/common/notifier/rpc_notifier2.py
index 7b77bf12c8..3d11644d72 100644
--- a/keystone/openstack/common/notifier/rpc_notifier2.py
+++ b/keystone/openstack/common/notifier/rpc_notifier2.py
@@ -49,4 +49,5 @@ def notify(context, message):
             rpc.notify(context, topic, message, envelope=True)
         except Exception:
             LOG.exception(_("Could not send notification to %(topic)s. "
-                            "Payload=%(message)s"), locals())
+                            "Payload=%(message)s"),
+                          {"topic": topic, "message": message})
diff --git a/keystone/openstack/common/policy.py b/keystone/openstack/common/policy.py
index a96a3d6f5c..16b848d042 100644
--- a/keystone/openstack/common/policy.py
+++ b/keystone/openstack/common/policy.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright (c) 2012 OpenStack Foundation.
 # All Rights Reserved.
 #
@@ -221,7 +219,7 @@ class Enforcer(object):
         if policy_file:
             return policy_file
 
-        raise cfg.ConfigFilesNotFoundError((CONF.policy_file,))
+        raise cfg.ConfigFilesNotFoundError((self.policy_file,))
 
     def enforce(self, rule, target, creds, do_raise=False,
                 exc=None, *args, **kwargs):
@@ -279,11 +277,10 @@ class Enforcer(object):
         return result
 
 
+@six.add_metaclass(abc.ABCMeta)
 class BaseCheck(object):
     """Abstract base class for Check classes."""
 
-    __metaclass__ = abc.ABCMeta
-
     @abc.abstractmethod
     def __str__(self):
         """String representation of the Check tree rooted at this node."""
@@ -506,7 +503,7 @@ def _parse_list_rule(rule):
             continue
 
         # Handle bare strings
-        if isinstance(inner_rule, basestring):
+        if isinstance(inner_rule, six.string_types):
             inner_rule = [inner_rule]
 
         # Parse the inner rules into Check objects
@@ -626,6 +623,7 @@ def reducer(*tokens):
     return decorator
 
 
+@six.add_metaclass(ParseStateMeta)
 class ParseState(object):
     """Implement the core of parsing the policy language.
 
@@ -638,8 +636,6 @@ class ParseState(object):
     shouldn't be that big a problem.
     """
 
-    __metaclass__ = ParseStateMeta
-
     def __init__(self):
         """Initialize the ParseState."""
 
@@ -765,7 +761,7 @@ def parse_rule(rule):
     """Parses a policy rule into a tree of Check objects."""
 
     # If the rule is a string, it's in the policy language
-    if isinstance(rule, basestring):
+    if isinstance(rule, six.string_types):
         return _parse_text_rule(rule)
     return _parse_list_rule(rule)
 
diff --git a/keystone/openstack/common/processutils.py b/keystone/openstack/common/processutils.py
new file mode 100644
index 0000000000..2fc36f0204
--- /dev/null
+++ b/keystone/openstack/common/processutils.py
@@ -0,0 +1,248 @@
+# Copyright 2011 OpenStack Foundation.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""
+System-level utilities and helper functions.
+"""
+
+import logging as stdlib_logging
+import os
+import random
+import shlex
+import signal
+
+from eventlet.green import subprocess
+from eventlet import greenthread
+
+from keystone.openstack.common.gettextutils import _  # noqa
+from keystone.openstack.common import log as logging
+
+
+LOG = logging.getLogger(__name__)
+
+
+class InvalidArgumentError(Exception):
+    def __init__(self, message=None):
+        super(InvalidArgumentError, self).__init__(message)
+
+
+class UnknownArgumentError(Exception):
+    def __init__(self, message=None):
+        super(UnknownArgumentError, self).__init__(message)
+
+
+class ProcessExecutionError(Exception):
+    def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None,
+                 description=None):
+        self.exit_code = exit_code
+        self.stderr = stderr
+        self.stdout = stdout
+        self.cmd = cmd
+        self.description = description
+
+        if description is None:
+            description = "Unexpected error while running command."
+        if exit_code is None:
+            exit_code = '-'
+        message = ("%s\nCommand: %s\nExit code: %s\nStdout: %r\nStderr: %r"
+                   % (description, cmd, exit_code, stdout, stderr))
+        super(ProcessExecutionError, self).__init__(message)
+
+
+class NoRootWrapSpecified(Exception):
+    def __init__(self, message=None):
+        super(NoRootWrapSpecified, self).__init__(message)
+
+
+def _subprocess_setup():
+    # Python installs a SIGPIPE handler by default. This is usually not what
+    # non-Python subprocesses expect.
+    signal.signal(signal.SIGPIPE, signal.SIG_DFL)
+
+
+def execute(*cmd, **kwargs):
+    """Helper method to shell out and execute a command through subprocess.
+
+    Allows optional retry.
+
+    :param cmd:             Passed to subprocess.Popen.
+    :type cmd:              string
+    :param process_input:   Send to opened process.
+    :type proces_input:     string
+    :param check_exit_code: Single bool, int, or list of allowed exit
+                            codes.  Defaults to [0].  Raise
+                            :class:`ProcessExecutionError` unless
+                            program exits with one of these code.
+    :type check_exit_code:  boolean, int, or [int]
+    :param delay_on_retry:  True | False. Defaults to True. If set to True,
+                            wait a short amount of time before retrying.
+    :type delay_on_retry:   boolean
+    :param attempts:        How many times to retry cmd.
+    :type attempts:         int
+    :param run_as_root:     True | False. Defaults to False. If set to True,
+                            the command is prefixed by the command specified
+                            in the root_helper kwarg.
+    :type run_as_root:      boolean
+    :param root_helper:     command to prefix to commands called with
+                            run_as_root=True
+    :type root_helper:      string
+    :param shell:           whether or not there should be a shell used to
+                            execute this command. Defaults to false.
+    :type shell:            boolean
+    :param loglevel:        log level for execute commands.
+    :type loglevel:         int.  (Should be stdlib_logging.DEBUG or
+                            stdlib_logging.INFO)
+    :returns:               (stdout, stderr) from process execution
+    :raises:                :class:`UnknownArgumentError` on
+                            receiving unknown arguments
+    :raises:                :class:`ProcessExecutionError`
+    """
+
+    process_input = kwargs.pop('process_input', None)
+    check_exit_code = kwargs.pop('check_exit_code', [0])
+    ignore_exit_code = False
+    delay_on_retry = kwargs.pop('delay_on_retry', True)
+    attempts = kwargs.pop('attempts', 1)
+    run_as_root = kwargs.pop('run_as_root', False)
+    root_helper = kwargs.pop('root_helper', '')
+    shell = kwargs.pop('shell', False)
+    loglevel = kwargs.pop('loglevel', stdlib_logging.DEBUG)
+
+    if isinstance(check_exit_code, bool):
+        ignore_exit_code = not check_exit_code
+        check_exit_code = [0]
+    elif isinstance(check_exit_code, int):
+        check_exit_code = [check_exit_code]
+
+    if kwargs:
+        raise UnknownArgumentError(_('Got unknown keyword args '
+                                     'to utils.execute: %r') % kwargs)
+
+    if run_as_root and hasattr(os, 'geteuid') and os.geteuid() != 0:
+        if not root_helper:
+            raise NoRootWrapSpecified(
+                message=('Command requested root, but did not specify a root '
+                         'helper.'))
+        cmd = shlex.split(root_helper) + list(cmd)
+
+    cmd = map(str, cmd)
+
+    while attempts > 0:
+        attempts -= 1
+        try:
+            LOG.log(loglevel, _('Running cmd (subprocess): %s'), ' '.join(cmd))
+            _PIPE = subprocess.PIPE  # pylint: disable=E1101
+
+            if os.name == 'nt':
+                preexec_fn = None
+                close_fds = False
+            else:
+                preexec_fn = _subprocess_setup
+                close_fds = True
+
+            obj = subprocess.Popen(cmd,
+                                   stdin=_PIPE,
+                                   stdout=_PIPE,
+                                   stderr=_PIPE,
+                                   close_fds=close_fds,
+                                   preexec_fn=preexec_fn,
+                                   shell=shell)
+            result = None
+            if process_input is not None:
+                result = obj.communicate(process_input)
+            else:
+                result = obj.communicate()
+            obj.stdin.close()  # pylint: disable=E1101
+            _returncode = obj.returncode  # pylint: disable=E1101
+            LOG.log(loglevel, _('Result was %s') % _returncode)
+            if not ignore_exit_code and _returncode not in check_exit_code:
+                (stdout, stderr) = result
+                raise ProcessExecutionError(exit_code=_returncode,
+                                            stdout=stdout,
+                                            stderr=stderr,
+                                            cmd=' '.join(cmd))
+            return result
+        except ProcessExecutionError:
+            if not attempts:
+                raise
+            else:
+                LOG.log(loglevel, _('%r failed. Retrying.'), cmd)
+                if delay_on_retry:
+                    greenthread.sleep(random.randint(20, 200) / 100.0)
+        finally:
+            # NOTE(termie): this appears to be necessary to let the subprocess
+            #               call clean something up in between calls, without
+            #               it two execute calls in a row hangs the second one
+            greenthread.sleep(0)
+
+
+def trycmd(*args, **kwargs):
+    """A wrapper around execute() to more easily handle warnings and errors.
+
+    Returns an (out, err) tuple of strings containing the output of
+    the command's stdout and stderr.  If 'err' is not empty then the
+    command can be considered to have failed.
+
+    :discard_warnings   True | False. Defaults to False. If set to True,
+                        then for succeeding commands, stderr is cleared
+
+    """
+    discard_warnings = kwargs.pop('discard_warnings', False)
+
+    try:
+        out, err = execute(*args, **kwargs)
+        failed = False
+    except ProcessExecutionError as exn:
+        out, err = '', str(exn)
+        failed = True
+
+    if not failed and discard_warnings and err:
+        # Handle commands that output to stderr but otherwise succeed
+        err = ''
+
+    return out, err
+
+
+def ssh_execute(ssh, cmd, process_input=None,
+                addl_env=None, check_exit_code=True):
+    LOG.debug(_('Running cmd (SSH): %s'), cmd)
+    if addl_env:
+        raise InvalidArgumentError(_('Environment not supported over SSH'))
+
+    if process_input:
+        # This is (probably) fixable if we need it...
+        raise InvalidArgumentError(_('process_input not supported over SSH'))
+
+    stdin_stream, stdout_stream, stderr_stream = ssh.exec_command(cmd)
+    channel = stdout_stream.channel
+
+    # NOTE(justinsb): This seems suspicious...
+    # ...other SSH clients have buffering issues with this approach
+    stdout = stdout_stream.read()
+    stderr = stderr_stream.read()
+    stdin_stream.close()
+
+    exit_status = channel.recv_exit_status()
+
+    # exit_status == -1 if no exit code was returned
+    if exit_status != -1:
+        LOG.debug(_('Result was %s') % exit_status)
+        if check_exit_code and exit_status != 0:
+            raise ProcessExecutionError(exit_code=exit_status,
+                                        stdout=stdout,
+                                        stderr=stderr,
+                                        cmd=cmd)
+
+    return (stdout, stderr)
diff --git a/keystone/openstack/common/py3kcompat/__init__.py b/keystone/openstack/common/py3kcompat/__init__.py
new file mode 100644
index 0000000000..97ae4e34a7
--- /dev/null
+++ b/keystone/openstack/common/py3kcompat/__init__.py
@@ -0,0 +1,16 @@
+#
+# Copyright 2013 Canonical Ltd.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
diff --git a/keystone/openstack/common/py3kcompat/urlutils.py b/keystone/openstack/common/py3kcompat/urlutils.py
new file mode 100644
index 0000000000..51e18111a2
--- /dev/null
+++ b/keystone/openstack/common/py3kcompat/urlutils.py
@@ -0,0 +1,63 @@
+#
+# Copyright 2013 Canonical Ltd.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+
+"""
+Python2/Python3 compatibility layer for OpenStack
+"""
+
+import six
+
+if six.PY3:
+    # python3
+    import urllib.error
+    import urllib.parse
+    import urllib.request
+
+    urlencode = urllib.parse.urlencode
+    urljoin = urllib.parse.urljoin
+    quote = urllib.parse.quote
+    parse_qsl = urllib.parse.parse_qsl
+    unquote = urllib.parse.unquote
+    urlparse = urllib.parse.urlparse
+    urlsplit = urllib.parse.urlsplit
+    urlunsplit = urllib.parse.urlunsplit
+    SplitResult = urllib.parse.SplitResult
+
+    urlopen = urllib.request.urlopen
+    URLError = urllib.error.URLError
+    pathname2url = urllib.request.pathname2url
+else:
+    # python2
+    import urllib
+    import urllib2
+    import urlparse
+
+    urlencode = urllib.urlencode
+    quote = urllib.quote
+    unquote = urllib.unquote
+
+    parse = urlparse
+    parse_qsl = parse.parse_qsl
+    urljoin = parse.urljoin
+    urlparse = parse.urlparse
+    urlsplit = parse.urlsplit
+    urlunsplit = parse.urlunsplit
+    SplitResult = parse.SplitResult
+
+    urlopen = urllib2.urlopen
+    URLError = urllib2.URLError
+    pathname2url = urllib.pathname2url
diff --git a/keystone/openstack/common/rpc/__init__.py b/keystone/openstack/common/rpc/__init__.py
index 248a7458c6..0f36870827 100644
--- a/keystone/openstack/common/rpc/__init__.py
+++ b/keystone/openstack/common/rpc/__init__.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2010 United States Government as represented by the
 # Administrator of the National Aeronautics and Space Administration.
 # All Rights Reserved.
@@ -56,13 +54,12 @@ rpc_opts = [
                help='Seconds to wait before a cast expires (TTL). '
                     'Only supported by impl_zmq.'),
     cfg.ListOpt('allowed_rpc_exception_modules',
-                default=['keystone.openstack.common.exception',
-                         'nova.exception',
+                default=['nova.exception',
                          'cinder.exception',
                          'exceptions',
                          ],
                 help='Modules of exceptions that are permitted to be recreated'
-                     'upon receiving exception data from an rpc call.'),
+                     ' upon receiving exception data from an rpc call.'),
     cfg.BoolOpt('fake_rabbit',
                 default=False,
                 help='If passed, use a fake RabbitMQ provider'),
@@ -228,7 +225,7 @@ def notify(context, topic, msg, envelope=False):
 
 
 def cleanup():
-    """Clean up resoruces in use by implementation.
+    """Clean up resources in use by implementation.
 
     Clean up any resources that have been allocated by the RPC implementation.
     This is typically open connections to a messaging service.  This function
diff --git a/keystone/openstack/common/rpc/amqp.py b/keystone/openstack/common/rpc/amqp.py
index 3bcedbdbf5..c8f23e2309 100644
--- a/keystone/openstack/common/rpc/amqp.py
+++ b/keystone/openstack/common/rpc/amqp.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2010 United States Government as represented by the
 # Administrator of the National Aeronautics and Space Administration.
 # All Rights Reserved.
@@ -20,9 +18,9 @@
 """
 Shared code between AMQP based openstack.common.rpc implementations.
 
-The code in this module is shared between the rpc implemenations based on AMQP.
-Specifically, this includes impl_kombu and impl_qpid.  impl_carrot also uses
-AMQP, but is deprecated and predates this code.
+The code in this module is shared between the rpc implementations based on
+AMQP. Specifically, this includes impl_kombu and impl_qpid. impl_carrot also
+uses AMQP, but is deprecated and predates this code.
 """
 
 import collections
@@ -189,7 +187,7 @@ class ReplyProxy(ConnectionContext):
     def __init__(self, conf, connection_pool):
         self._call_waiters = {}
         self._num_call_waiters = 0
-        self._num_call_waiters_wrn_threshhold = 10
+        self._num_call_waiters_wrn_threshold = 10
         self._reply_q = 'reply_' + uuid.uuid4().hex
         super(ReplyProxy, self).__init__(conf, connection_pool, pooled=False)
         self.declare_direct_consumer(self._reply_q, self._process_data)
@@ -208,11 +206,11 @@ class ReplyProxy(ConnectionContext):
 
     def add_call_waiter(self, waiter, msg_id):
         self._num_call_waiters += 1
-        if self._num_call_waiters > self._num_call_waiters_wrn_threshhold:
+        if self._num_call_waiters > self._num_call_waiters_wrn_threshold:
             LOG.warn(_('Number of call waiters is greater than warning '
-                       'threshhold: %d. There could be a MulticallProxyWaiter '
-                       'leak.') % self._num_call_waiters_wrn_threshhold)
-            self._num_call_waiters_wrn_threshhold *= 2
+                       'threshold: %d. There could be a MulticallProxyWaiter '
+                       'leak.') % self._num_call_waiters_wrn_threshold)
+            self._num_call_waiters_wrn_threshold *= 2
         self._call_waiters[msg_id] = waiter
 
     def del_call_waiter(self, msg_id):
@@ -241,7 +239,7 @@ def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
         _add_unique_id(msg)
         # If a reply_q exists, add the msg_id to the reply and pass the
         # reply_q to direct_send() to use it as the response queue.
-        # Otherwise use the msg_id for backward compatibilty.
+        # Otherwise use the msg_id for backward compatibility.
         if reply_q:
             msg['_msg_id'] = msg_id
             conn.direct_send(reply_q, rpc_common.serialize_msg(msg))
@@ -364,22 +362,43 @@ class CallbackWrapper(_ThreadPoolWithWait):
     Allows it to be invoked in a green thread.
     """
 
-    def __init__(self, conf, callback, connection_pool):
+    def __init__(self, conf, callback, connection_pool,
+                 wait_for_consumers=False):
         """Initiates CallbackWrapper object.
 
         :param conf: cfg.CONF instance
         :param callback: a callable (probably a function)
         :param connection_pool: connection pool as returned by
                                 get_connection_pool()
+        :param wait_for_consumers: wait for all green threads to
+                                   complete and raise the last
+                                   caught exception, if any.
+
         """
         super(CallbackWrapper, self).__init__(
             conf=conf,
             connection_pool=connection_pool,
         )
         self.callback = callback
+        self.wait_for_consumers = wait_for_consumers
+        self.exc_info = None
+
+    def _wrap(self, message_data, **kwargs):
+        """Wrap the callback invocation to catch exceptions.
+        """
+        try:
+            self.callback(message_data, **kwargs)
+        except Exception:
+            self.exc_info = sys.exc_info()
 
     def __call__(self, message_data):
-        self.pool.spawn_n(self.callback, message_data)
+        self.exc_info = None
+        self.pool.spawn_n(self._wrap, message_data)
+
+        if self.wait_for_consumers:
+            self.pool.waitall()
+            if self.exc_info:
+                raise self.exc_info[1], None, self.exc_info[2]
 
 
 class ProxyCallback(_ThreadPoolWithWait):
diff --git a/keystone/openstack/common/rpc/common.py b/keystone/openstack/common/rpc/common.py
index 3696f0fb8d..42ab16ad48 100644
--- a/keystone/openstack/common/rpc/common.py
+++ b/keystone/openstack/common/rpc/common.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2010 United States Government as represented by the
 # Administrator of the National Aeronautics and Space Administration.
 # All Rights Reserved.
@@ -29,6 +27,7 @@ from keystone.openstack.common import importutils
 from keystone.openstack.common import jsonutils
 from keystone.openstack.common import local
 from keystone.openstack.common import log as logging
+from keystone.openstack.common import versionutils
 
 
 CONF = cfg.CONF
@@ -265,7 +264,7 @@ def _safe_log(log_func, msg, msg_data):
 
     def _fix_passwords(d):
         """Sanitizes the password fields in the dictionary."""
-        for k in d.iterkeys():
+        for k in six.iterkeys(d):
             if k.lower().find('password') != -1:
                 d[k] = '<SANITIZED>'
             elif k.lower() in SANITIZE:
@@ -441,19 +440,15 @@ def client_exceptions(*exceptions):
     return outer
 
 
+# TODO(sirp): we should deprecate this in favor of
+# using `versionutils.is_compatible` directly
 def version_is_compatible(imp_version, version):
     """Determine whether versions are compatible.
 
     :param imp_version: The version implemented
     :param version: The version requested by an incoming message.
     """
-    version_parts = version.split('.')
-    imp_version_parts = imp_version.split('.')
-    if int(version_parts[0]) != int(imp_version_parts[0]):  # Major
-        return False
-    if int(version_parts[1]) > int(imp_version_parts[1]):  # Minor
-        return False
-    return True
+    return versionutils.is_compatible(version, imp_version)
 
 
 def serialize_msg(raw_msg):
diff --git a/keystone/openstack/common/rpc/dispatcher.py b/keystone/openstack/common/rpc/dispatcher.py
index d2fd5dc52f..51f0e19b68 100644
--- a/keystone/openstack/common/rpc/dispatcher.py
+++ b/keystone/openstack/common/rpc/dispatcher.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2012 Red Hat, Inc.
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
diff --git a/keystone/openstack/common/rpc/impl_fake.py b/keystone/openstack/common/rpc/impl_fake.py
index 9479ab4d62..d504689776 100644
--- a/keystone/openstack/common/rpc/impl_fake.py
+++ b/keystone/openstack/common/rpc/impl_fake.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 #    Copyright 2011 OpenStack Foundation
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -146,7 +144,7 @@ def multicall(conf, context, topic, msg, timeout=None):
     try:
         consumer = CONSUMERS[topic][0]
     except (KeyError, IndexError):
-        return iter([None])
+        raise rpc_common.Timeout("No consumers available")
     else:
         return consumer.call(context, version, method, namespace, args,
                              timeout)
diff --git a/keystone/openstack/common/rpc/impl_kombu.py b/keystone/openstack/common/rpc/impl_kombu.py
index 0e641db0d6..9cd2e2ae5f 100644
--- a/keystone/openstack/common/rpc/impl_kombu.py
+++ b/keystone/openstack/common/rpc/impl_kombu.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 #    Copyright 2011 OpenStack Foundation
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -28,6 +26,7 @@ import kombu.connection
 import kombu.entity
 import kombu.messaging
 from oslo.config import cfg
+import six
 
 from keystone.openstack.common import excutils
 from keystone.openstack.common.gettextutils import _  # noqa
@@ -146,29 +145,23 @@ class ConsumerBase(object):
         Messages that are processed without exception are ack'ed.
 
         If the message processing generates an exception, it will be
-        ack'ed if ack_on_error=True. Otherwise it will be .reject()'ed.
-        Rejection is better than waiting for the message to timeout.
-        Rejected messages are immediately requeued.
+        ack'ed if ack_on_error=True. Otherwise it will be .requeue()'ed.
         """
 
-        ack_msg = False
         try:
             msg = rpc_common.deserialize_msg(message.payload)
             callback(msg)
-            ack_msg = True
         except Exception:
             if self.ack_on_error:
-                ack_msg = True
                 LOG.exception(_("Failed to process message"
                                 " ... skipping it."))
+                message.ack()
             else:
                 LOG.exception(_("Failed to process message"
                                 " ... will requeue."))
-        finally:
-            if ack_msg:
-                message.ack()
-            else:
-                message.reject()
+                message.requeue()
+        else:
+            message.ack()
 
     def consume(self, *args, **kwargs):
         """Actually declare the consumer on the amqp channel.  This will
@@ -631,7 +624,7 @@ class Connection(object):
 
         def _declare_consumer():
             consumer = consumer_cls(self.conf, self.channel, topic, callback,
-                                    self.consumer_num.next())
+                                    six.next(self.consumer_num))
             self.consumers.append(consumer)
             return consumer
 
@@ -738,7 +731,7 @@ class Connection(object):
         it = self.iterconsume(limit=limit)
         while True:
             try:
-                it.next()
+                six.next(it)
             except StopIteration:
                 return
 
@@ -789,6 +782,7 @@ class Connection(object):
             callback=callback,
             connection_pool=rpc_amqp.get_connection_pool(self.conf,
                                                          Connection),
+            wait_for_consumers=not ack_on_error
         )
         self.proxy_callbacks.append(callback_wrapper)
         self.declare_topic_consumer(
diff --git a/keystone/openstack/common/rpc/impl_qpid.py b/keystone/openstack/common/rpc/impl_qpid.py
index 7e67c81dc0..4bd7bda099 100644
--- a/keystone/openstack/common/rpc/impl_qpid.py
+++ b/keystone/openstack/common/rpc/impl_qpid.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 #    Copyright 2011 OpenStack Foundation
 #    Copyright 2011 - 2012, Red Hat, Inc.
 #
@@ -18,11 +16,11 @@
 import functools
 import itertools
 import time
-import uuid
 
 import eventlet
 import greenlet
 from oslo.config import cfg
+import six
 
 from keystone.openstack.common import excutils
 from keystone.openstack.common.gettextutils import _  # noqa
@@ -67,6 +65,17 @@ qpid_opts = [
     cfg.BoolOpt('qpid_tcp_nodelay',
                 default=True,
                 help='Disable Nagle algorithm'),
+    # NOTE(russellb) If any additional versions are added (beyond 1 and 2),
+    # this file could probably use some additional refactoring so that the
+    # differences between each version are split into different classes.
+    cfg.IntOpt('qpid_topology_version',
+               default=1,
+               help="The qpid topology version to use.  Version 1 is what "
+                    "was originally used by impl_qpid.  Version 2 includes "
+                    "some backwards-incompatible changes that allow broker "
+                    "federation to work.  Users should update to version 2 "
+                    "when they are able to take everything down, as it "
+                    "requires a clean break."),
 ]
 
 cfg.CONF.register_opts(qpid_opts)
@@ -74,10 +83,17 @@ cfg.CONF.register_opts(qpid_opts)
 JSON_CONTENT_TYPE = 'application/json; charset=utf8'
 
 
+def raise_invalid_topology_version(conf):
+    msg = (_("Invalid value for qpid_topology_version: %d") %
+           conf.qpid_topology_version)
+    LOG.error(msg)
+    raise Exception(msg)
+
+
 class ConsumerBase(object):
     """Consumer base class."""
 
-    def __init__(self, session, callback, node_name, node_opts,
+    def __init__(self, conf, session, callback, node_name, node_opts,
                  link_name, link_opts):
         """Declare a queue on an amqp session.
 
@@ -95,26 +111,39 @@ class ConsumerBase(object):
         self.receiver = None
         self.session = None
 
-        addr_opts = {
-            "create": "always",
-            "node": {
-                "type": "topic",
-                "x-declare": {
+        if conf.qpid_topology_version == 1:
+            addr_opts = {
+                "create": "always",
+                "node": {
+                    "type": "topic",
+                    "x-declare": {
+                        "durable": True,
+                        "auto-delete": True,
+                    },
+                },
+                "link": {
                     "durable": True,
-                    "auto-delete": True,
+                    "x-declare": {
+                        "durable": False,
+                        "auto-delete": True,
+                        "exclusive": False,
+                    },
                 },
-            },
-            "link": {
-                "name": link_name,
-                "durable": True,
-                "x-declare": {
-                    "durable": False,
-                    "auto-delete": True,
-                    "exclusive": False,
+            }
+            if link_name:
+                addr_opts["link"]["name"] = link_name
+            addr_opts["node"]["x-declare"].update(node_opts)
+        elif conf.qpid_topology_version == 2:
+            addr_opts = {
+                "link": {
+                    "x-declare": {
+                        "auto-delete": True,
+                    },
                 },
-            },
-        }
-        addr_opts["node"]["x-declare"].update(node_opts)
+            }
+        else:
+            raise_invalid_topology_version()
+
         addr_opts["link"]["x-declare"].update(link_opts)
 
         self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
@@ -122,7 +151,7 @@ class ConsumerBase(object):
         self.connect(session)
 
     def connect(self, session):
-        """Declare the reciever on connect."""
+        """Declare the receiver on connect."""
         self._declare_receiver(session)
 
     def reconnect(self, session):
@@ -181,16 +210,24 @@ class DirectConsumer(ConsumerBase):
         'callback' is the callback to call when messages are received
         """
 
-        super(DirectConsumer, self).__init__(
-            session, callback,
-            "%s/%s" % (msg_id, msg_id),
-            {"type": "direct"},
-            msg_id,
-            {
-                "auto-delete": conf.amqp_auto_delete,
-                "exclusive": True,
-                "durable": conf.amqp_durable_queues,
-            })
+        link_opts = {
+            "auto-delete": conf.amqp_auto_delete,
+            "exclusive": True,
+            "durable": conf.amqp_durable_queues,
+        }
+
+        if conf.qpid_topology_version == 1:
+            node_name = "%s/%s" % (msg_id, msg_id)
+            node_opts = {"type": "direct"}
+        elif conf.qpid_topology_version == 2:
+            node_name = "amq.direct/%s" % msg_id
+            node_opts = {}
+        else:
+            raise_invalid_topology_version()
+
+        super(DirectConsumer, self).__init__(conf, session, callback,
+                                             node_name, node_opts, msg_id,
+                                             link_opts)
 
 
 class TopicConsumer(ConsumerBase):
@@ -208,14 +245,20 @@ class TopicConsumer(ConsumerBase):
         """
 
         exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
-        super(TopicConsumer, self).__init__(
-            session, callback,
-            "%s/%s" % (exchange_name, topic),
-            {}, name or topic,
-            {
-                "auto-delete": conf.amqp_auto_delete,
-                "durable": conf.amqp_durable_queues,
-            })
+        link_opts = {
+            "auto-delete": conf.amqp_auto_delete,
+            "durable": conf.amqp_durable_queues,
+        }
+
+        if conf.qpid_topology_version == 1:
+            node_name = "%s/%s" % (exchange_name, topic)
+        elif conf.qpid_topology_version == 2:
+            node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
+        else:
+            raise_invalid_topology_version()
+
+        super(TopicConsumer, self).__init__(conf, session, callback, node_name,
+                                            {}, name or topic, link_opts)
 
 
 class FanoutConsumer(ConsumerBase):
@@ -230,52 +273,53 @@ class FanoutConsumer(ConsumerBase):
         """
         self.conf = conf
 
-        super(FanoutConsumer, self).__init__(
-            session, callback,
-            "%s_fanout" % topic,
-            {"durable": False, "type": "fanout"},
-            "%s_fanout_%s" % (topic, uuid.uuid4().hex),
-            {"exclusive": True})
+        link_opts = {"exclusive": True}
 
-    def reconnect(self, session):
-        topic = self.get_node_name().rpartition('_fanout')[0]
-        params = {
-            'session': session,
-            'topic': topic,
-            'callback': self.callback,
-        }
+        if conf.qpid_topology_version == 1:
+            node_name = "%s_fanout" % topic
+            node_opts = {"durable": False, "type": "fanout"}
+        elif conf.qpid_topology_version == 2:
+            node_name = "amq.topic/fanout/%s" % topic
+            node_opts = {}
+        else:
+            raise_invalid_topology_version()
 
-        self.__init__(conf=self.conf, **params)
-
-        super(FanoutConsumer, self).reconnect(session)
+        super(FanoutConsumer, self).__init__(conf, session, callback,
+                                             node_name, node_opts, None,
+                                             link_opts)
 
 
 class Publisher(object):
     """Base Publisher class."""
 
-    def __init__(self, session, node_name, node_opts=None):
+    def __init__(self, conf, session, node_name, node_opts=None):
         """Init the Publisher class with the exchange_name, routing_key,
         and other options
         """
         self.sender = None
         self.session = session
 
-        addr_opts = {
-            "create": "always",
-            "node": {
-                "type": "topic",
-                "x-declare": {
-                    "durable": False,
-                    # auto-delete isn't implemented for exchanges in qpid,
-                    # but put in here anyway
-                    "auto-delete": True,
+        if conf.qpid_topology_version == 1:
+            addr_opts = {
+                "create": "always",
+                "node": {
+                    "type": "topic",
+                    "x-declare": {
+                        "durable": False,
+                        # auto-delete isn't implemented for exchanges in qpid,
+                        # but put in here anyway
+                        "auto-delete": True,
+                    },
                 },
-            },
-        }
-        if node_opts:
-            addr_opts["node"]["x-declare"].update(node_opts)
+            }
+            if node_opts:
+                addr_opts["node"]["x-declare"].update(node_opts)
 
-        self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
+            self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
+        elif conf.qpid_topology_version == 2:
+            self.address = node_name
+        else:
+            raise_invalid_topology_version()
 
         self.reconnect(session)
 
@@ -319,39 +363,73 @@ class DirectPublisher(Publisher):
     """Publisher class for 'direct'."""
     def __init__(self, conf, session, msg_id):
         """Init a 'direct' publisher."""
-        super(DirectPublisher, self).__init__(session, msg_id,
-                                              {"type": "direct"})
+
+        if conf.qpid_topology_version == 1:
+            node_name = msg_id
+            node_opts = {"type": "direct"}
+        elif conf.qpid_topology_version == 2:
+            node_name = "amq.direct/%s" % msg_id
+            node_opts = {}
+        else:
+            raise_invalid_topology_version()
+
+        super(DirectPublisher, self).__init__(conf, session, node_name,
+                                              node_opts)
 
 
 class TopicPublisher(Publisher):
     """Publisher class for 'topic'."""
     def __init__(self, conf, session, topic):
-        """init a 'topic' publisher.
+        """Init a 'topic' publisher.
         """
         exchange_name = rpc_amqp.get_control_exchange(conf)
-        super(TopicPublisher, self).__init__(session,
-                                             "%s/%s" % (exchange_name, topic))
+
+        if conf.qpid_topology_version == 1:
+            node_name = "%s/%s" % (exchange_name, topic)
+        elif conf.qpid_topology_version == 2:
+            node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
+        else:
+            raise_invalid_topology_version()
+
+        super(TopicPublisher, self).__init__(conf, session, node_name)
 
 
 class FanoutPublisher(Publisher):
     """Publisher class for 'fanout'."""
     def __init__(self, conf, session, topic):
-        """init a 'fanout' publisher.
+        """Init a 'fanout' publisher.
         """
-        super(FanoutPublisher, self).__init__(
-            session,
-            "%s_fanout" % topic, {"type": "fanout"})
+
+        if conf.qpid_topology_version == 1:
+            node_name = "%s_fanout" % topic
+            node_opts = {"type": "fanout"}
+        elif conf.qpid_topology_version == 2:
+            node_name = "amq.topic/fanout/%s" % topic
+            node_opts = {}
+        else:
+            raise_invalid_topology_version()
+
+        super(FanoutPublisher, self).__init__(conf, session, node_name,
+                                              node_opts)
 
 
 class NotifyPublisher(Publisher):
     """Publisher class for notifications."""
     def __init__(self, conf, session, topic):
-        """init a 'topic' publisher.
+        """Init a 'topic' publisher.
         """
         exchange_name = rpc_amqp.get_control_exchange(conf)
-        super(NotifyPublisher, self).__init__(session,
-                                              "%s/%s" % (exchange_name, topic),
-                                              {"durable": True})
+        node_opts = {"durable": True}
+
+        if conf.qpid_topology_version == 1:
+            node_name = "%s/%s" % (exchange_name, topic)
+        elif conf.qpid_topology_version == 2:
+            node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
+        else:
+            raise_invalid_topology_version()
+
+        super(NotifyPublisher, self).__init__(conf, session, node_name,
+                                              node_opts)
 
 
 class Connection(object):
@@ -446,7 +524,7 @@ class Connection(object):
             consumers = self.consumers
             self.consumers = {}
 
-            for consumer in consumers.itervalues():
+            for consumer in six.itervalues(consumers):
                 consumer.reconnect(self.session)
                 self._register_consumer(consumer)
 
@@ -604,7 +682,7 @@ class Connection(object):
         it = self.iterconsume(limit=limit)
         while True:
             try:
-                it.next()
+                six.next(it)
             except StopIteration:
                 return
 
@@ -665,6 +743,7 @@ class Connection(object):
             callback=callback,
             connection_pool=rpc_amqp.get_connection_pool(self.conf,
                                                          Connection),
+            wait_for_consumers=not ack_on_error
         )
         self.proxy_callbacks.append(callback_wrapper)
 
diff --git a/keystone/openstack/common/rpc/impl_zmq.py b/keystone/openstack/common/rpc/impl_zmq.py
index 1aaf8575b8..394def1c06 100644
--- a/keystone/openstack/common/rpc/impl_zmq.py
+++ b/keystone/openstack/common/rpc/impl_zmq.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 #    Copyright 2011 Cloudscaling Group, Inc
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -25,6 +23,8 @@ import uuid
 import eventlet
 import greenlet
 from oslo.config import cfg
+import six
+from six import moves
 
 from keystone.openstack.common import excutils
 from keystone.openstack.common.gettextutils import _  # noqa
@@ -192,7 +192,7 @@ class ZmqSocket(object):
             # it would be much worse if some of the code calling this
             # were to fail. For now, lets log, and later evaluate
             # if we can safely raise here.
-            LOG.error("ZeroMQ socket could not be closed.")
+            LOG.error(_("ZeroMQ socket could not be closed."))
         self.sock = None
 
     def recv(self, **kwargs):
@@ -221,7 +221,7 @@ class ZmqClient(object):
             return
 
         rpc_envelope = rpc_common.serialize_msg(data[1], envelope)
-        zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items())
+        zmq_msg = moves.reduce(lambda x, y: x + y, rpc_envelope.items())
         self.outq.send(map(bytes,
                        (msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg))
 
@@ -383,6 +383,7 @@ class ZmqBaseReactor(ConsumerBase):
         LOG.info(_("In reactor registered"))
 
     def consume_in_thread(self):
+        @excutils.forever_retry_uncaught_exceptions
         def _consume(sock):
             LOG.info(_("Consuming socket"))
             while True:
@@ -522,8 +523,8 @@ def unflatten_envelope(packenv):
     h = {}
     try:
         while True:
-            k = i.next()
-            h[k] = i.next()
+            k = six.next(i)
+            h[k] = six.next(i)
     except StopIteration:
         return h
 
diff --git a/keystone/openstack/common/rpc/matchmaker.py b/keystone/openstack/common/rpc/matchmaker.py
index ff3fcbc73c..f7fa5e0cd3 100644
--- a/keystone/openstack/common/rpc/matchmaker.py
+++ b/keystone/openstack/common/rpc/matchmaker.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 #    Copyright 2011 Cloudscaling Group, Inc
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
diff --git a/keystone/openstack/common/rpc/matchmaker_redis.py b/keystone/openstack/common/rpc/matchmaker_redis.py
index 20006f6846..6ed074d38d 100644
--- a/keystone/openstack/common/rpc/matchmaker_redis.py
+++ b/keystone/openstack/common/rpc/matchmaker_redis.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 #    Copyright 2013 Cloudscaling Group, Inc
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -95,7 +93,7 @@ class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase):
         if not redis:
             raise ImportError("Failed to import module redis.")
 
-        self.redis = redis.StrictRedis(
+        self.redis = redis.Redis(
             host=CONF.matchmaker_redis.host,
             port=CONF.matchmaker_redis.port,
             password=CONF.matchmaker_redis.password)
diff --git a/keystone/openstack/common/rpc/matchmaker_ring.py b/keystone/openstack/common/rpc/matchmaker_ring.py
index 91417f0a83..7165252c40 100644
--- a/keystone/openstack/common/rpc/matchmaker_ring.py
+++ b/keystone/openstack/common/rpc/matchmaker_ring.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 #    Copyright 2011-2013 Cloudscaling Group, Inc
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
diff --git a/keystone/openstack/common/rpc/proxy.py b/keystone/openstack/common/rpc/proxy.py
index 9cc6192027..f8409a035e 100644
--- a/keystone/openstack/common/rpc/proxy.py
+++ b/keystone/openstack/common/rpc/proxy.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2012-2013 Red Hat, Inc.
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -21,7 +19,6 @@ For more information about rpc API version numbers, see:
     rpc/dispatcher.py
 """
 
-
 from keystone.openstack.common import rpc
 from keystone.openstack.common.rpc import common as rpc_common
 from keystone.openstack.common.rpc import serializer as rpc_serializer
@@ -36,7 +33,7 @@ class RpcProxy(object):
     rpc API.
     """
 
-    # The default namespace, which can be overriden in a subclass.
+    # The default namespace, which can be overridden in a subclass.
     RPC_API_NAMESPACE = None
 
     def __init__(self, topic, default_version, version_cap=None,
diff --git a/keystone/openstack/common/rpc/serializer.py b/keystone/openstack/common/rpc/serializer.py
index 76c6831033..9bc6e2a3a0 100644
--- a/keystone/openstack/common/rpc/serializer.py
+++ b/keystone/openstack/common/rpc/serializer.py
@@ -16,10 +16,12 @@
 
 import abc
 
+import six
 
+
+@six.add_metaclass(abc.ABCMeta)
 class Serializer(object):
     """Generic (de-)serialization definition base class."""
-    __metaclass__ = abc.ABCMeta
 
     @abc.abstractmethod
     def serialize_entity(self, context, entity):
diff --git a/keystone/openstack/common/rpc/service.py b/keystone/openstack/common/rpc/service.py
index 34eacb6027..f3e8bec8b7 100644
--- a/keystone/openstack/common/rpc/service.py
+++ b/keystone/openstack/common/rpc/service.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2010 United States Government as represented by the
 # Administrator of the National Aeronautics and Space Administration.
 # All Rights Reserved.
diff --git a/keystone/openstack/common/rpc/zmq_receiver.py b/keystone/openstack/common/rpc/zmq_receiver.py
old mode 100755
new mode 100644
index 2f095f10bb..70120bc137
--- a/keystone/openstack/common/rpc/zmq_receiver.py
+++ b/keystone/openstack/common/rpc/zmq_receiver.py
@@ -1,6 +1,3 @@
-#!/usr/bin/env python
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 #    Copyright 2011 OpenStack Foundation
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
diff --git a/keystone/openstack/common/service.py b/keystone/openstack/common/service.py
index 8418e2a8ba..09c17bce6b 100644
--- a/keystone/openstack/common/service.py
+++ b/keystone/openstack/common/service.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2010 United States Government as represented by the
 # Administrator of the National Aeronautics and Space Administration.
 # Copyright 2011 Justin Santa Barbara
@@ -20,6 +18,7 @@
 """Generic Node base class for all workers that run on hosts."""
 
 import errno
+import logging as std_logging
 import os
 import random
 import signal
@@ -28,7 +27,6 @@ import time
 
 import eventlet
 from eventlet import event
-import logging as std_logging
 from oslo.config import cfg
 
 from keystone.openstack.common import eventlet_backdoor
@@ -43,6 +41,29 @@ CONF = cfg.CONF
 LOG = logging.getLogger(__name__)
 
 
+def _sighup_supported():
+    return hasattr(signal, 'SIGHUP')
+
+
+def _is_sighup(signo):
+    return _sighup_supported() and signo == signal.SIGHUP
+
+
+def _signo_to_signame(signo):
+    signals = {signal.SIGTERM: 'SIGTERM',
+               signal.SIGINT: 'SIGINT'}
+    if _sighup_supported():
+        signals[signal.SIGHUP] = 'SIGHUP'
+    return signals[signo]
+
+
+def _set_signals_handler(handler):
+    signal.signal(signal.SIGTERM, handler)
+    signal.signal(signal.SIGINT, handler)
+    if _sighup_supported():
+        signal.signal(signal.SIGHUP, handler)
+
+
 class Launcher(object):
     """Launch one or more services and wait for them to complete."""
 
@@ -100,18 +121,13 @@ class SignalExit(SystemExit):
 class ServiceLauncher(Launcher):
     def _handle_signal(self, signo, frame):
         # Allow the process to be killed again and die from natural causes
-        signal.signal(signal.SIGTERM, signal.SIG_DFL)
-        signal.signal(signal.SIGINT, signal.SIG_DFL)
-        signal.signal(signal.SIGHUP, signal.SIG_DFL)
-
+        _set_signals_handler(signal.SIG_DFL)
         raise SignalExit(signo)
 
     def handle_signal(self):
-        signal.signal(signal.SIGTERM, self._handle_signal)
-        signal.signal(signal.SIGINT, self._handle_signal)
-        signal.signal(signal.SIGHUP, self._handle_signal)
+        _set_signals_handler(self._handle_signal)
 
-    def _wait_for_exit_or_signal(self):
+    def _wait_for_exit_or_signal(self, ready_callback=None):
         status = None
         signo = 0
 
@@ -119,11 +135,11 @@ class ServiceLauncher(Launcher):
         CONF.log_opt_values(LOG, std_logging.DEBUG)
 
         try:
+            if ready_callback:
+                ready_callback()
             super(ServiceLauncher, self).wait()
         except SignalExit as exc:
-            signame = {signal.SIGTERM: 'SIGTERM',
-                       signal.SIGINT: 'SIGINT',
-                       signal.SIGHUP: 'SIGHUP'}[exc.signo]
+            signame = _signo_to_signame(exc.signo)
             LOG.info(_('Caught %s, exiting'), signame)
             status = exc.code
             signo = exc.signo
@@ -140,11 +156,11 @@ class ServiceLauncher(Launcher):
 
         return status, signo
 
-    def wait(self):
+    def wait(self, ready_callback=None):
         while True:
             self.handle_signal()
-            status, signo = self._wait_for_exit_or_signal()
-            if signo != signal.SIGHUP:
+            status, signo = self._wait_for_exit_or_signal(ready_callback)
+            if not _is_sighup(signo):
                 return status
             self.restart()
 
@@ -167,18 +183,14 @@ class ProcessLauncher(object):
         self.handle_signal()
 
     def handle_signal(self):
-        signal.signal(signal.SIGTERM, self._handle_signal)
-        signal.signal(signal.SIGINT, self._handle_signal)
-        signal.signal(signal.SIGHUP, self._handle_signal)
+        _set_signals_handler(self._handle_signal)
 
     def _handle_signal(self, signo, frame):
         self.sigcaught = signo
         self.running = False
 
         # Allow the process to be killed again and die from natural causes
-        signal.signal(signal.SIGTERM, signal.SIG_DFL)
-        signal.signal(signal.SIGINT, signal.SIG_DFL)
-        signal.signal(signal.SIGHUP, signal.SIG_DFL)
+        _set_signals_handler(signal.SIG_DFL)
 
     def _pipe_watcher(self):
         # This will block until the write end is closed when the parent
@@ -200,20 +212,22 @@ class ProcessLauncher(object):
             raise SignalExit(signal.SIGHUP)
 
         signal.signal(signal.SIGTERM, _sigterm)
-        signal.signal(signal.SIGHUP, _sighup)
+        if _sighup_supported():
+            signal.signal(signal.SIGHUP, _sighup)
         # Block SIGINT and let the parent send us a SIGTERM
         signal.signal(signal.SIGINT, signal.SIG_IGN)
 
     def _child_wait_for_exit_or_signal(self, launcher):
-        status = None
+        status = 0
         signo = 0
 
+        # NOTE(johannes): All exceptions are caught to ensure this
+        # doesn't fallback into the loop spawning children. It would
+        # be bad for a child to spawn more children.
         try:
             launcher.wait()
         except SignalExit as exc:
-            signame = {signal.SIGTERM: 'SIGTERM',
-                       signal.SIGINT: 'SIGINT',
-                       signal.SIGHUP: 'SIGHUP'}[exc.signo]
+            signame = _signo_to_signame(exc.signo)
             LOG.info(_('Caught %s, exiting'), signame)
             status = exc.code
             signo = exc.signo
@@ -262,14 +276,11 @@ class ProcessLauncher(object):
 
         pid = os.fork()
         if pid == 0:
-            # NOTE(johannes): All exceptions are caught to ensure this
-            # doesn't fallback into the loop spawning children. It would
-            # be bad for a child to spawn more children.
             launcher = self._child_process(wrap.service)
             while True:
                 self._child_process_handle_signal()
                 status, signo = self._child_wait_for_exit_or_signal(launcher)
-                if signo != signal.SIGHUP:
+                if not _is_sighup(signo):
                     break
                 launcher.restart()
 
@@ -339,11 +350,9 @@ class ProcessLauncher(object):
             self.handle_signal()
             self._respawn_children()
             if self.sigcaught:
-                signame = {signal.SIGTERM: 'SIGTERM',
-                           signal.SIGINT: 'SIGINT',
-                           signal.SIGHUP: 'SIGHUP'}[self.sigcaught]
+                signame = _signo_to_signame(self.sigcaught)
                 LOG.info(_('Caught %s, stopping children'), signame)
-            if self.sigcaught != signal.SIGHUP:
+            if not _is_sighup(self.sigcaught):
                 break
 
             for pid in self.children:
diff --git a/keystone/openstack/common/sslutils.py b/keystone/openstack/common/sslutils.py
index 3aa975b8eb..567826960e 100644
--- a/keystone/openstack/common/sslutils.py
+++ b/keystone/openstack/common/sslutils.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2013 IBM Corp.
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
diff --git a/keystone/openstack/common/strutils.py b/keystone/openstack/common/strutils.py
index dcde033319..4610ff0f4a 100644
--- a/keystone/openstack/common/strutils.py
+++ b/keystone/openstack/common/strutils.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2011 OpenStack Foundation.
 # All Rights Reserved.
 #
diff --git a/keystone/openstack/common/test.py b/keystone/openstack/common/test.py
new file mode 100644
index 0000000000..98fe3c9d21
--- /dev/null
+++ b/keystone/openstack/common/test.py
@@ -0,0 +1,52 @@
+# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Common utilities used in testing"""
+
+import os
+
+import fixtures
+import testtools
+
+_TRUE_VALUES = ('True', 'true', '1', 'yes')
+
+
+class BaseTestCase(testtools.TestCase):
+
+    def setUp(self):
+        super(BaseTestCase, self).setUp()
+        self._set_timeout()
+        self._fake_output()
+        self.useFixture(fixtures.FakeLogger('keystone.openstack.common'))
+        self.useFixture(fixtures.NestedTempfile())
+        self.useFixture(fixtures.TempHomeDir())
+
+    def _set_timeout(self):
+        test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
+        try:
+            test_timeout = int(test_timeout)
+        except ValueError:
+            # If timeout value is invalid do not set a timeout.
+            test_timeout = 0
+        if test_timeout > 0:
+            self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
+
+    def _fake_output(self):
+        if os.environ.get('OS_STDOUT_CAPTURE') in _TRUE_VALUES:
+            stdout = self.useFixture(fixtures.StringStream('stdout')).stream
+            self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
+        if os.environ.get('OS_STDERR_CAPTURE') in _TRUE_VALUES:
+            stderr = self.useFixture(fixtures.StringStream('stderr')).stream
+            self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
diff --git a/keystone/openstack/common/threadgroup.py b/keystone/openstack/common/threadgroup.py
index cde9fc7f20..7e9a012949 100644
--- a/keystone/openstack/common/threadgroup.py
+++ b/keystone/openstack/common/threadgroup.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2012 Red Hat, Inc.
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -48,6 +46,9 @@ class Thread(object):
     def wait(self):
         return self.thread.wait()
 
+    def link(self, func, *args, **kwargs):
+        self.thread.link(func, *args, **kwargs)
+
 
 class ThreadGroup(object):
     """The point of the ThreadGroup classis to:
@@ -79,6 +80,7 @@ class ThreadGroup(object):
         gt = self.pool.spawn(callback, *args, **kwargs)
         th = Thread(gt, self)
         self.threads.append(th)
+        return th
 
     def thread_done(self, thread):
         self.threads.remove(thread)
diff --git a/keystone/openstack/common/timeutils.py b/keystone/openstack/common/timeutils.py
index aa9f708074..c8b0b15395 100644
--- a/keystone/openstack/common/timeutils.py
+++ b/keystone/openstack/common/timeutils.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2011 OpenStack Foundation.
 # All Rights Reserved.
 #
@@ -21,6 +19,7 @@ Time related utilities and helper functions.
 
 import calendar
 import datetime
+import time
 
 import iso8601
 import six
@@ -49,9 +48,9 @@ def parse_isotime(timestr):
     try:
         return iso8601.parse_date(timestr)
     except iso8601.ParseError as e:
-        raise ValueError(unicode(e))
+        raise ValueError(six.text_type(e))
     except TypeError as e:
-        raise ValueError(unicode(e))
+        raise ValueError(six.text_type(e))
 
 
 def strtime(at=None, fmt=PERFECT_TIME_FORMAT):
@@ -90,6 +89,11 @@ def is_newer_than(after, seconds):
 
 def utcnow_ts():
     """Timestamp version of our utcnow function."""
+    if utcnow.override_time is None:
+        # NOTE(kgriffs): This is several times faster
+        # than going through calendar.timegm(...)
+        return int(time.time())
+
     return calendar.timegm(utcnow().timetuple())
 
 
@@ -111,12 +115,15 @@ def iso8601_from_timestamp(timestamp):
 utcnow.override_time = None
 
 
-def set_time_override(override_time=datetime.datetime.utcnow()):
+def set_time_override(override_time=None):
     """Overrides utils.utcnow.
 
     Make it return a constant time or a list thereof, one at a time.
+
+    :param override_time: datetime instance or list thereof. If not
+                          given, defaults to the current UTC time.
     """
-    utcnow.override_time = override_time
+    utcnow.override_time = override_time or datetime.datetime.utcnow()
 
 
 def advance_time_delta(timedelta):
@@ -169,6 +176,15 @@ def delta_seconds(before, after):
     datetime objects (as a float, to microsecond resolution).
     """
     delta = after - before
+    return total_seconds(delta)
+
+
+def total_seconds(delta):
+    """Return the total seconds of datetime.timedelta object.
+
+    Compute total seconds of datetime.timedelta, datetime.timedelta
+    doesn't have method total_seconds in Python2.6, calculate it manually.
+    """
     try:
         return delta.total_seconds()
     except AttributeError:
diff --git a/keystone/openstack/common/versionutils.py b/keystone/openstack/common/versionutils.py
new file mode 100644
index 0000000000..f8dc13ea8f
--- /dev/null
+++ b/keystone/openstack/common/versionutils.py
@@ -0,0 +1,43 @@
+# Copyright (c) 2013 OpenStack Foundation
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""
+Helpers for comparing version strings.
+"""
+
+import pkg_resources
+
+
+def is_compatible(requested_version, current_version, same_major=True):
+    """Determine whether `requested_version` is satisfied by
+    `current_version`; in other words, `current_version` is >=
+    `requested_version`.
+
+    :param requested_version: version to check for compatibility
+    :param current_version: version to check against
+    :param same_major: if True, the major version must be identical between
+        `requested_version` and `current_version`. This is used when a
+        major-version difference indicates incompatibility between the two
+        versions. Since this is the common-case in practice, the default is
+        True.
+    :returns: True if compatible, False if not
+    """
+    requested_parts = pkg_resources.parse_version(requested_version)
+    current_parts = pkg_resources.parse_version(current_version)
+
+    if same_major and (requested_parts[0] != current_parts[0]):
+        return False
+
+    return current_parts >= requested_parts
diff --git a/tools/colorizer.py b/tools/colorizer.py
index 13364badb0..c9f7d630a1 100755
--- a/tools/colorizer.py
+++ b/tools/colorizer.py
@@ -1,5 +1,4 @@
 #!/usr/bin/env python
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
 
 # Copyright (c) 2013, Nebula, Inc.
 # Copyright 2010 United States Government as represented by the
@@ -47,6 +46,7 @@ import subunit
 import sys
 import unittest
 
+import six
 import testtools
 
 
@@ -274,7 +274,7 @@ class OpenStackTestResult(testtools.TestResult):
         self.stopTestRun()
 
     def stopTestRun(self):
-        for cls in list(self.results.iterkeys()):
+        for cls in list(six.iterkeys(self.results)):
             self.writeTestCase(cls)
         self.stream.writeln()
         self.writeSlowTests()
diff --git a/tools/install_venv_common.py b/tools/install_venv_common.py
index b5ec5092f0..46822e3293 100644
--- a/tools/install_venv_common.py
+++ b/tools/install_venv_common.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2013 OpenStack Foundation
 # Copyright 2013 IBM Corp.
 #
@@ -114,12 +112,12 @@ class InstallVenv(object):
         print('Installing dependencies with pip (this can take a while)...')
 
         # First things first, make sure our venv has the latest pip and
-        # setuptools.
-        self.pip_install('pip>=1.3')
+        # setuptools and pbr
+        self.pip_install('pip>=1.4')
         self.pip_install('setuptools')
+        self.pip_install('pbr')
 
-        self.pip_install('-r', self.requirements)
-        self.pip_install('-r', self.test_requirements)
+        self.pip_install('-r', self.requirements, '-r', self.test_requirements)
 
     def parse_args(self, argv):
         """Parses command-line arguments."""