Integrate Rally & Alembic

Alembic allows us to change in future DB schema,
which we are going to do soon.

This patch adds under rally/common/db/sqlalchemy/migrations
required files for alembic as well as a first init migrations
that was autogenerated from models that we have.

As well this patch adds CLI commands for DB management

Co-Authored-By: Illia Khudoshyn <>
@ -5,3 +5,4 @@ source = rally
[report] [report]
ignore_errors = True ignore_errors = True
precision = 3 precision = 3
omit = */migrations/versions/

@ -33,6 +33,7 @@ Contents
user_stories user_stories
plugins plugins
plugin/plugin_reference plugin/plugin_reference
contribute contribute
gates gates
feature_requests feature_requests

@ -28,11 +28,34 @@ class DBCommands(object):
"""Commands for DB management.""" """Commands for DB management."""
def recreate(self): def recreate(self):
"""Drop and create Rally database.""" """Drop and create Rally database.
db.db_create() This will delete all existing data.
envutils.clear_env() envutils.clear_env()
def create(self):
"""Create Rally database."""
def upgrade(self):
"""Upgrade Rally database to the latest state."""
help=("Downgrade to specified revision UUID. "
"Current revision of DB could be found by calling "
"'rally-manage db revision'"))
def downgrade(self, revision):
"""Downgrade Rally database."""
def revision(self):
"""Print current Rally database revision UUID."""
def main(): def main():
categories = {"db": DBCommands} categories = {"db": DBCommands}

@ -65,19 +65,39 @@ def get_impl():
return IMPL return IMPL
def db_cleanup(): def engine_reset():
"""Recreate engine.""" """Reset DB engine."""
get_impl().db_cleanup() get_impl().engine_reset()
def db_create(): def schema_cleanup():
"""Initialize DB. This method will drop existing database.""" """Drop DB schema. This method drops existing database."""
get_impl().db_create() get_impl().schema_cleanup()
def db_drop(): def schema_upgrade(revision=None):
"""Drop DB. This method drop existing database.""" """Migrate the database to `revision` or the most recent revision."""
get_impl().db_drop() return get_impl().schema_upgrade(revision)
def schema_create():
"""Create database schema from models description."""
return get_impl().schema_create()
def schema_downgrade(revision):
"""Downgrade DB schema to specified revision."""
return get_impl().schema_downgrade(revision)
def schema_revision():
"""Return the schema revision."""
return get_impl().schema_revision()
def schema_stamp(revision):
"""Stamps database with provided revision."""
return get_impl().schema_stamp(revision)
def task_get(uuid): def task_get(uuid):

@ -16,6 +16,11 @@
SQLAlchemy implementation for DB.API SQLAlchemy implementation for DB.API
""" """
import os
import alembic
from alembic import config as alembic_config
import alembic.migration as alembic_migration
from oslo_config import cfg from oslo_config import cfg
from oslo_db import exception as db_exc from oslo_db import exception as db_exc
from oslo_db.sqlalchemy import session as db_session from oslo_db.sqlalchemy import session as db_session
@ -33,6 +38,8 @@ CONF = cfg.CONF
_FACADE = None _FACADE = None
INITIAL_REVISION_UUID = "ca3626f62937"
def _create_facade_lazily(): def _create_facade_lazily():
global _FACADE global _FACADE
@ -58,19 +65,93 @@ def get_backend():
return Connection() return Connection()
def _alembic_config():
path = os.path.join(os.path.dirname(__file__), "alembic.ini")
config = alembic_config.Config(path)
return config
class Connection(object): class Connection(object):
def db_cleanup(self): def engine_reset(self):
global _FACADE global _FACADE
_FACADE = None _FACADE = None
def db_create(self): def schema_cleanup(self):
def db_drop(self):
models.drop_db() models.drop_db()
def schema_revision(self, config=None, engine=None):
"""Current database revision.
:param config: Instance of alembic config
:param engine: Instance of DB engine
:returns: Database revision
:rtype: string
engine = engine or get_engine()
with engine.connect() as conn:
context = alembic_migration.MigrationContext.configure(conn)
return context.get_current_revision()
def schema_upgrade(self, revision=None, config=None, engine=None):
"""Used for upgrading database.
:param revision: Desired database version
:type revision: string
:param config: Instance of alembic config
:param engine: Instance of DB engine
revision = revision or "head"
config = config or _alembic_config()
engine = engine or get_engine()
if self.schema_revision() is None:
self.schema_stamp(INITIAL_REVISION_UUID, config=config)
alembic.command.upgrade(config, revision or "head")
def schema_create(self, config=None, engine=None):
"""Create database schema from models description.
Can be used for initial installation instead of upgrade('head').
:param config: Instance of alembic config
:param engine: Instance of DB engine
engine = engine or get_engine()
# NOTE(viktors): If we will use metadata.create_all() for non empty db
# schema, it will only add the new tables, but leave
# existing as is. So we should avoid of this situation.
if self.schema_revision(engine=engine) is not None:
raise db_exc.DbMigrationError("DB schema is already under version"
" control. Use upgrade() instead")
self.schema_stamp("head", config=config)
def schema_downgrade(self, revision, config=None):
"""Used for downgrading database.
:param revision: Desired database revision
:type revision: string
:param config: Instance of alembic config
config = config or _alembic_config()
return alembic.command.downgrade(config, revision)
def schema_stamp(self, revision, config=None):
"""Stamps database with provided revision.
Don't run any migrations.
:param revision: Should match one from repository or head - to stamp
database with most recent revision
:type revision: string
:param config: Instance of alembic config
config = config or _alembic_config()
return alembic.command.stamp(config, revision=revision)
def model_query(self, model, session=None): def model_query(self, model, session=None):
"""The helper method to create query. """The helper method to create query.

.. _db_migrations:
Database upgrade/downgrade in Rally
Information for users
Rally supports DB schema versioning (schema versions are called *revisions*)
and migration (upgrade to later and downgrade to earlier revisions).
End user is provided with the following possibilities:
- Print current revision of DB.
.. code-block:: shell
rally-manage db revision
- Upgrade existing DB to the latest state.
This is needed when previously existing Rally installation is being
upgraded to a newer version. In this case user should issue command
.. code-block:: shell
rally-manage db upgrade
**AFTER** upgrading Rally package. DB schema
will get upgraded to the latest state and all existing data will be kept.
- Downgrade existing DB to a previous revision.
This command could be useful if user wants to return to an earlier version
of Rally. This could be done by issuing command
.. code-block:: shell
rally-manage db downgrade --revision <UUID>
Database schema downgrade **MUST** be done **BEFORE** Rally package is downgraded.
User must provide revision UUID to which the schema must be downgraded.
Information for developers
DB migration in Rally is implemented via package *alembic*.
It is highly recommended to get familiar with it's documnetation
available by the link_ before proceeding.
.. _link:
If developer is about to change existing DB schema they should
create new DB revision and migration script with the following command
.. code-block:: shell
alembic --config rally/common/db/sqlalchemy/alembic.ini revision -m <Message>
It will generate migration script -- a file named `<UUID>_<Message>.py`
located in `rally/common/db/sqlalchemy/migrations/versions`.
Generated script should then be checked, edited if it is needed to be
and added to Rally source tree.

from alembic import context
from rally.common.db.sqlalchemy import api
from rally.common.db.sqlalchemy import models
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
target_metadata = models.BASE.metadata
# other values from the config, defined by the needs of,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
engine = api.get_engine()
with engine.connect() as connection:
with context.begin_transaction():

"""Init migration
Revision ID: ca3626f62937
Create Date: 2016-01-07 00:27:39.687814
# revision identifiers, used by Alembic.
revision = "ca3626f62937"
down_revision = None
branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
import rally
from rally.common.db.sqlalchemy import api
def upgrade():
dialect = api.get_engine().dialect
deployments_columns = [
sa.Column("created_at", sa.DateTime(), nullable=True),
sa.Column("updated_at", sa.DateTime(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("uuid", sa.String(length=36), nullable=False),
sa.Column("parent_uuid", sa.String(length=36), nullable=True),
sa.Column("name", sa.String(length=255), nullable=True),
sa.Column("started_at", sa.DateTime(), nullable=True),
sa.Column("completed_at", sa.DateTime(), nullable=True),
sa.Column("admin", sa.PickleType(), nullable=True),
sa.Column("users", sa.PickleType(), nullable=False),
sa.Column("enum_deployments_status", sa.Enum(
"cleanup->failed", "cleanup->finished", "cleanup->started",
"deploy->failed", "deploy->finished", "deploy->inconsistent",
"deploy->init", "deploy->started", "deploy->subdeploy",
name="enum_deploy_status"), nullable=False),
["parent_uuid"], [u"deployments.uuid"],
name="fk_parent_uuid", use_alter=True)
# commands auto generated by Alembic - please adjust!
op.create_table("deployments", *deployments_columns)
op.create_index("deployment_parent_uuid", "deployments",
["parent_uuid"], unique=False)
op.create_index("deployment_uuid", "deployments", ["uuid"], unique=True)
if not"sqlite"):
op.create_foreign_key("fk_parent_uuid", "deployments", "deployments",
["parent_uuid"], ["uuid"])
sa.Column("created_at", sa.DateTime(), nullable=True),
sa.Column("updated_at", sa.DateTime(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("hostname", sa.String(length=255), nullable=True),
sa.UniqueConstraint("hostname", name="uniq_worker@hostname")
sa.Column("created_at", sa.DateTime(), nullable=True),
sa.Column("updated_at", sa.DateTime(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("provider_name", sa.String(length=255), nullable=True),
sa.Column("type", sa.String(length=255), nullable=True),
sa.Column("deployment_uuid", sa.String(length=36), nullable=False),
sa.ForeignKeyConstraint(["deployment_uuid"], [u"deployments.uuid"]),
op.create_index("resource_deployment_uuid", "resources",
["deployment_uuid"], unique=False)
op.create_index("resource_provider_name", "resources",
["deployment_uuid", "provider_name"], unique=False)
op.create_index("resource_provider_name_and_type", "resources",
["deployment_uuid", "provider_name", "type"],
op.create_index("resource_type", "resources",
["deployment_uuid", "type"], unique=False)
sa.Column("created_at", sa.DateTime(), nullable=True),
sa.Column("updated_at", sa.DateTime(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("uuid", sa.String(length=36), nullable=False),
sa.Column("status", sa.Enum(
"aborted", "aborting", "cleaning up", "failed", "finished",
"init", "paused", "running", "setting up", "soft_aborting",
"verifying", name="enum_tasks_status"), nullable=False),
sa.Column("verification_log", sa.Text(), nullable=True),
sa.Column("tag", sa.String(length=64), nullable=True),
sa.Column("deployment_uuid", sa.String(length=36), nullable=False),
sa.ForeignKeyConstraint(["deployment_uuid"], [u"deployments.uuid"], ),
op.create_index("task_deployment", "tasks", ["deployment_uuid"],
op.create_index("task_status", "tasks", ["status"], unique=False)
op.create_index("task_uuid", "tasks", ["uuid"], unique=True)
sa.Column("created_at", sa.DateTime(), nullable=True),
sa.Column("updated_at", sa.DateTime(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("uuid", sa.String(length=36), nullable=False),
sa.Column("deployment_uuid", sa.String(length=36), nullable=False),
sa.Column("status", sa.Enum(
"aborted", "aborting", "cleaning up", "failed", "finished",
"init", "paused", "running", "setting up", "soft_aborting",
"verifying", name="enum_tasks_status"), nullable=False),
sa.Column("set_name", sa.String(length=20), nullable=True),
sa.Column("tests", sa.Integer(), nullable=True),
sa.Column("errors", sa.Integer(), nullable=True),
sa.Column("failures", sa.Integer(), nullable=True),
sa.Column("time", sa.Float(), nullable=True),
sa.ForeignKeyConstraint(["deployment_uuid"], [u"deployments.uuid"], ),
op.create_index("verification_uuid", "verifications", ["uuid"],
sa.Column("created_at", sa.DateTime(), nullable=True),
sa.Column("updated_at", sa.DateTime(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("task_uuid", sa.String(length=36), nullable=True),
sa.ForeignKeyConstraint(["task_uuid"], ["tasks.uuid"], ),
sa.Column("created_at", sa.DateTime(), nullable=True),
sa.Column("updated_at", sa.DateTime(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("verification_uuid", sa.String(length=36), nullable=True),
sa.ForeignKeyConstraint(["verification_uuid"], ["verifications.uuid"]),
# end Alembic commands
def downgrade():
# commands auto generated by Alembic - please adjust!
op.drop_index("verification_uuid", table_name="verifications")
op.drop_index("task_uuid", table_name="tasks")
op.drop_index("task_status", table_name="tasks")
op.drop_index("task_deployment", table_name="tasks")
op.drop_index("resource_type", table_name="resources")
op.drop_index("resource_provider_name_and_type", table_name="resources")
op.drop_index("resource_provider_name", table_name="resources")
op.drop_index("resource_deployment_uuid", table_name="resources")
op.drop_index("deployment_uuid", table_name="deployments")
op.drop_index("deployment_parent_uuid", table_name="deployments")
# end Alembic commands

hostname = sa.Column(sa.String(255)) hostname = sa.Column(sa.String(255))
def create_db():
from rally.common.db.sqlalchemy import api as sa_api
# TODO(boris-42): Remove it after oslo.db > 1.4.1 will be released. # TODO(boris-42): Remove it after oslo.db > 1.4.1 will be released.
def drop_all_objects(engine): def drop_all_objects(engine):
"""Drop all database objects. """Drop all database objects.

@ -1,6 +1,7 @@
# The order of packages is significant, because pip processes them in the order # The order of packages is significant, because pip processes them in the order
# of appearance. Changing the order has an impact on the overall integration # of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later. # process, which may cause wedges in the gate later.
alembic>=0.8.0 # MIT
Babel>=1.3 # BSD Babel>=1.3 # BSD
boto>=2.32.1 # MIT boto>=2.32.1 # MIT
decorator>=3.4.0 # BSD decorator>=3.4.0 # BSD

@ -36,8 +36,37 @@ class DBCommandsTestCase(test.TestCase):
super(DBCommandsTestCase, self).setUp() super(DBCommandsTestCase, self).setUp()
self.db_commands = manage.DBCommands() self.db_commands = manage.DBCommands()
@mock.patch("rally.cli.manage.db") @mock.patch("rally.cli.manage.db")
def test_recreate(self, mock_db): def test_recreate(self, mock_db, mock_envutils):
self.db_commands.recreate() self.db_commands.recreate()
calls = [,] db_calls = [,]
self.assertEqual(db_calls, mock_db.mock_calls)
envutils_calls = []
self.assertEqual(envutils_calls, mock_envutils.mock_calls)
def test_create(self, mock_db):
calls = []
self.assertEqual(calls, mock_db.mock_calls) self.assertEqual(calls, mock_db.mock_calls)
def test_upgrade(self, mock_db):
calls = []
self.assertEqual(calls, mock_db.mock_calls)
def test_downgrade(self, mock_db):
revision = mock.MagicMock()
calls = []
self.assertEqual(calls, mock_db.mock_calls)
def test_revision(self, mock_db):
calls = []

@ -478,4 +478,4 @@ class WorkerTestCase(test.DBTestCase):
self.assertNotEqual(self.worker["updated_at"], worker["updated_at"]) self.assertNotEqual(self.worker["updated_at"], worker["updated_at"])
def test_update_worker_not_found(self): def test_update_worker_not_found(self):
self.assertRaises(exceptions.WorkerNotFound, db.update_worker, "fake") self.assertRaises(exceptions.WorkerNotFound, db.update_worker, "fake")

"""Tests for DB migration."""
import pprint
import alembic
import mock
from oslo_db.sqlalchemy import test_migrations
import six
import sqlalchemy as sa
import rally
from rally.common import db
from rally.common.db.sqlalchemy import api
from rally.common.db.sqlalchemy import models
from tests.unit import test as rtest
class MigrationTestCase(rtest.DBTestCase,
"""Test for checking of equality models state and migrations.
For the opportunistic testing you need to set up a db named
'openstack_citest' with user 'openstack_citest' and password
'openstack_citest' on localhost.
The test will then use that db and user/password combo to run the tests.
For PostgreSQL on Ubuntu this can be done with the following commands::
sudo -u postgres psql
postgres=# create user openstack_citest with createdb login password
postgres=# create database openstack_citest with owner
For MySQL on Ubuntu this can be done with the following commands::
mysql -u root
>create database openstack_citest;
>grant all privileges on openstack_citest.* to
openstack_citest@localhost identified by 'openstack_citest';
Output is a list that contains information about differences between db and
models. Output example::
Table('bat', MetaData(bind=None),
Column('info', String(), table=<bat>), schema=None)),
Table(u'bar', MetaData(bind=None),
Column(u'data', VARCHAR(), table=<bar>), schema=None)),
Column('data', Integer(), table=<foo>)),
Column(u'old_data', VARCHAR(), table=None)),
{'existing_server_default': None,
'existing_type': INTEGER()},
* ``remove_*`` means that there is extra table/column/constraint in db;
* ``add_*`` means that it is missing in db;
* ``modify_*`` means that on column in db is set wrong
type/nullable/server_default. Element contains information:
- what should be modified,
- schema,
- table,
- column,
- existing correct column parameters,
- right value,
- wrong value.
def setUp(self):
# we change DB metadata in tests so we reload
# models to refresh the metadata to it's original state
super(MigrationTestCase, self).setUp()
self.alembic_config = api._alembic_config()
self.engine = api.get_engine()
# remove everything from DB and stamp it as 'base'
# so that migration (i.e. upgrade up to 'head')
# will actually take place
def db_sync(self, engine):
def get_engine(self):
return self.engine
def get_metadata(self):
return models.BASE.metadata
def include_object(self, object_, name, type_, reflected, compare_to):
if type_ == "table" and name == "alembic_version":
return False
return super(MigrationTestCase, self).include_object(
object_, name, type_, reflected, compare_to)
def _create_fake_model(self, table_name):
(models.BASE, models.RallyBase),
{"__tablename__": table_name,
"id": sa.Column(sa.Integer, primary_key=True,
def _get_metadata_diff(self):
with self.get_engine().connect() as conn:
opts = {
"include_object": self.include_object,
"compare_type": self.compare_type,
"compare_server_default": self.compare_server_default,
mc = alembic.migration.MigrationContext.configure(conn, opts=opts)
# compare schemas and fail with diff, if it"s not empty
diff = self.filter_metadata_diff(
alembic.autogenerate.compare_metadata(mc, self.get_metadata()))
return diff
def test_models_sync(self, mock_connection_schema_stamp):
# drop all tables after a test run
# run migration scripts
diff = self._get_metadata_diff()
if diff:
msg = pprint.pformat(diff, indent=2, width=20)
"Models and migration scripts aren't in sync:\n%s" % msg)
def test_models_sync_negative__missing_table_in_script(
self, mock_connection_schema_stamp):
# drop all tables after a test run
# run migration scripts
diff = self._get_metadata_diff()
self.assertEqual(1, len(diff))
action, object = diff[0]
self.assertEqual("add_table", action)
self.assertIsInstance(object, sa.Table)
def test_models_sync_negative__missing_model_in_metadata(
self, mock_connection_schema_stamp):
# drop all tables after a test run
table = self.get_metadata().tables["workers"]
# run migration scripts
diff = self._get_metadata_diff()
self.assertEqual(1, len(diff))
action, object = diff[0]
self.assertEqual("remove_table", action)
self.assertIsInstance(object, sa.Table)

@ -31,10 +31,10 @@ class DatabaseFixture(fixture.Config):
def setUp(self): def setUp(self):
super(DatabaseFixture, self).setUp() super(DatabaseFixture, self).setUp()
db_url = os.environ.get("RALLY_UNITTEST_DB_URL", "sqlite://") db_url = os.environ.get("RALLY_UNITTEST_DB_URL", "sqlite://")
db.db_cleanup() db.engine_reset()
self.conf.set_default("connection", db_url, group="database") self.conf.set_default("connection", db_url, group="database")
db.db_drop() db.schema_cleanup()
db.db_create() db.schema_create()
class TestCase(base.BaseTestCase): class TestCase(base.BaseTestCase):