Refacotr DB layer (part 1)

- Remove usless abstraction of db.api
- Split API methods to schema management and just record related ops
- Remove part of oslo.db and oslo.utils dependency (need one more patch)
- Improve performance & refactor mulitple requests

Change-Id: I56ca311ea354409e9b19511dca77cda27be5d52b
This commit is contained in:
Boris Pavlovic 2018-02-19 04:00:58 -08:00
parent f6b3532ccb
commit b345294991
51 changed files with 1162 additions and 1608 deletions

View File

@ -1 +1 @@
../../../rally/common/db/sqlalchemy/migrations/README.rst
../../../rally/common/db/migrations/README.rst

View File

@ -33,9 +33,9 @@ class DBCommands(object):
"""
print("Recreating database: ", end="")
self.show(api, True)
db.schema_cleanup()
db.schema.schema_cleanup()
print("Database deleted successfully")
db.schema_create()
db.schema.schema_create()
print("Database created successfully")
envutils.clear_env()
@ -43,7 +43,7 @@ class DBCommands(object):
"""Create Rally database."""
print("Creating database: ", end="")
self.show(api, True)
db.schema_create()
db.schema.schema_create()
print("Database created successfully")
def ensure(self, api):
@ -51,8 +51,8 @@ class DBCommands(object):
print("Ensuring database exists: ", end="")
self.show(api, True)
if not db.schema_revision():
db.schema_create()
if not db.schema.schema_revision():
db.schema.schema_create()
print("Database created successfully")
else:
print("Database already exists, nothing to do")
@ -62,9 +62,9 @@ class DBCommands(object):
print("Upgrading database: ", end="")
self.show(api, True)
start_revision = db.schema_revision()
db.schema_upgrade()
current_revision = db.schema_revision()
start_revision = db.schema.schema_revision()
db.schema.schema_upgrade()
current_revision = db.schema.schema_revision()
if start_revision != current_revision:
print("Database schema upgraded successfully "
"from {start} to {end} revision."
@ -74,7 +74,7 @@ class DBCommands(object):
def revision(self, api):
"""Print current Rally database revision UUID."""
print(db.schema_revision())
print(db.schema.schema_revision())
@cliutils.args("--creds", action="store_true", dest="show_creds",
help="Do not hide credentials from connection string")

View File

@ -13,4 +13,5 @@
# License for the specific language governing permissions and limitations
# under the License.
from rally.common.db.api import * # noqa
from rally.common.db.api import * # noqa
from rally.common.db import schema # noqa

View File

@ -2,7 +2,7 @@
[alembic]
# path to migration scripts
script_location = rally.common.db.sqlalchemy:migrations
script_location = rally.common.db:migrations
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s

File diff suppressed because it is too large Load Diff

View File

@ -60,13 +60,13 @@ create a new DB revision and a migration script with the following command.
.. code-block:: shell
alembic --config rally/common/db/sqlalchemy/alembic.ini revision -m <Message>
alembic --config rally/common/db/alembic.ini revision -m <Message>
or
.. code-block:: shell
alembic --config rally/common/db/sqlalchemy/alembic.ini revision --autogenerate -m <Message>
alembic --config rally/common/db/alembic.ini revision --autogenerate -m <Message>
It will generate migration script -- a file named `<UUID>_<Message>.py`
located in `rally/common/db/sqlalchemy/migrations/versions`.

View File

@ -15,8 +15,8 @@
from alembic import context
from rally.common.db.sqlalchemy import api
from rally.common.db.sqlalchemy import models
from rally.common.db import api
from rally.common.db import models
# add your model's MetaData object here
# for 'autogenerate' support

View File

@ -20,13 +20,6 @@ Create Date: 2016-09-12 15:47:11.279610
"""
# revision identifiers, used by Alembic.
revision = "08e1515a576c"
down_revision = "54e844ebfbc3"
branch_labels = None
depends_on = None
import json
from alembic import op
@ -36,6 +29,13 @@ from rally import consts
from rally import exceptions
# revision identifiers, used by Alembic.
revision = "08e1515a576c"
down_revision = "54e844ebfbc3"
branch_labels = None
depends_on = None
task_helper = sa.Table(
"tasks",
sa.MetaData(),

View File

@ -21,6 +21,12 @@ Create Date: 2016-03-01 16:01:38.747048
"""
from alembic import op
import sqlalchemy as sa
from rally import exceptions
# revision identifiers, used by Alembic.
revision = "3177d36ea270"
down_revision = "ca3626f62937"
@ -28,12 +34,6 @@ branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
from rally import exceptions
deployments_helper = sa.Table(
"deployments",
sa.MetaData(),

View File

@ -20,18 +20,18 @@ Create Date: 2016-08-29 08:32:30.818019
"""
from alembic import op
import sqlalchemy as sa
from rally.common.db import sa_types
from rally import exceptions
# revision identifiers, used by Alembic.
revision = "32fada9b2fde"
down_revision = "6ad4f426f005"
branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
from rally.common.db.sqlalchemy import types as sa_types
from rally import exceptions
deployments_helper = sa.Table(
"deployments",

View File

@ -26,7 +26,7 @@ Create Date: 2017-06-07 19:50:03.572493
from alembic import op
import sqlalchemy as sa
from rally.common.db.sqlalchemy import types as sa_types
from rally.common.db import sa_types
from rally import exceptions
# revision identifiers, used by Alembic.

View File

@ -20,6 +20,12 @@ Create Date: 2016-12-29 19:54:23.804525
"""
from alembic import op
import sqlalchemy as sa
from rally.common.db import sa_types
from rally import exceptions
# revision identifiers, used by Alembic.
revision = "37fdbb373e8d"
down_revision = "484cd9413e66"
@ -27,13 +33,6 @@ branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
from rally.common.db.sqlalchemy import types as sa_types
from rally import exceptions
verifications_helper = sa.Table(
"verifications",
sa.MetaData(),

View File

@ -23,7 +23,7 @@ Create Date: 2017-10-15 22:45:04.963524
from alembic import op
import sqlalchemy as sa
from rally.common.db.sqlalchemy import types as sa_types
from rally.common.db import sa_types
from rally import exceptions
from rally.task.processing import charts

View File

@ -19,6 +19,13 @@ Revises: e654a0648db0
Create Date: 2016-11-04 17:04:24.614075
"""
from alembic import op
from oslo_utils import timeutils
import sqlalchemy as sa
from rally.common.db import models
from rally.common.db import sa_types
from rally import exceptions
# revision identifiers, used by Alembic.
revision = "484cd9413e66"
@ -26,14 +33,6 @@ down_revision = "e654a0648db0"
branch_labels = None
depends_on = None
from alembic import op
from oslo_utils import timeutils
import sqlalchemy as sa
from rally.common.db.sqlalchemy import models
from rally.common.db.sqlalchemy import types as sa_types
from rally import exceptions
TASK_STATUSES = ["aborted", "aborting", "cleaning up", "failed", "finished",
"init", "paused", "running", "setting up", "soft_aborting",

View File

@ -20,19 +20,19 @@ Create Date: 2016-04-22 21:28:50.745316
"""
from alembic import op
import sqlalchemy as sa
from rally.common.db import sa_types
from rally import consts
from rally import exceptions
# revision identifiers, used by Alembic.
revision = "4ef544102ba7"
down_revision = "f33f4610dcda"
branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
from rally.common.db.sqlalchemy import types as sa_types
from rally import consts
from rally import exceptions
OLD_STATUS = [
"aborted", "aborting", "cleaning up", "failed", "finished",

View File

@ -22,18 +22,18 @@ Create Date: 2016-07-24 14:53:39.323105
"""
from alembic import op # noqa
import sqlalchemy as sa # noqa
from rally.common.db import sa_types
from rally import exceptions
# revision identifiers, used by Alembic.
revision = "54e844ebfbc3"
down_revision = "3177d36ea270"
branch_labels = None
depends_on = None
from alembic import op # noqa
import sqlalchemy as sa # noqa
from rally.common.db.sqlalchemy import types as sa_types
from rally import exceptions
deployments_helper = sa.Table(
"deployments",

View File

@ -21,6 +21,11 @@ Revises: 08e1515a576c
Create Date: 2016-09-13 18:11:47.703023
"""
from alembic import op # noqa
import sqlalchemy as sa # noqa
from rally.common.db import sa_types
from rally import exceptions
# revision identifiers, used by Alembic.
revision = "6ad4f426f005"
@ -28,12 +33,6 @@ down_revision = "08e1515a576c"
branch_labels = None
depends_on = None
from alembic import op # noqa
import sqlalchemy as sa # noqa
from rally.common.db.sqlalchemy import types as sa_types
from rally import exceptions
task_results_helper = sa.Table(
"task_results",

View File

@ -27,7 +27,7 @@ from alembic import op
from oslo_utils import timeutils
import sqlalchemy as sa
from rally.common.db.sqlalchemy import types as sa_types
from rally.common.db import sa_types
from rally import exceptions
# revision identifiers, used by Alembic.

View File

@ -19,6 +19,11 @@ Revises: 4ef544102ba7
Create Date: 2017-02-01 12:52:43.499663
"""
from alembic import op
import sqlalchemy as sa
from rally.common.db import sa_types
from rally import exceptions
# revision identifiers, used by Alembic.
revision = "92aaaa2a6bb3"
@ -26,12 +31,6 @@ down_revision = "4ef544102ba7"
branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
from rally.common.db.sqlalchemy import types as sa_types
from rally import exceptions
deployments_helper = sa.Table(
"deployments",

View File

@ -23,7 +23,7 @@ Create Date: 2018-02-04 13:48:35.779255
from alembic import op
import sqlalchemy as sa
from rally.common.db.sqlalchemy import types as sa_types
from rally.common.db import sa_types
from rally import exceptions

View File

@ -23,7 +23,7 @@ Create Date: 2017-12-27 13:37:10.144970
from alembic import op
import sqlalchemy as sa
from rally.common.db.sqlalchemy import types as sa_types
from rally.common.db import sa_types
from rally import exceptions

View File

@ -19,6 +19,10 @@ Revises: 37fdbb373e8d
Create Date: 2017-01-17 18:47:10.700459
"""
from alembic import op
import sqlalchemy as sa
from rally import exceptions
# revision identifiers, used by Alembic.
revision = "a6f364988fc2"
@ -27,12 +31,6 @@ branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
from rally import exceptions
TAG_TYPES = ["task", "subtask"]
tag_helper = sa.Table(

View File

@ -21,6 +21,13 @@ Revises:
Create Date: 2016-01-07 00:27:39.687814
"""
from alembic import op
import sqlalchemy as sa
from rally.common.db import api
from rally.common.db import sa_types
from rally import exceptions
# revision identifiers, used by Alembic.
revision = "ca3626f62937"
@ -29,14 +36,6 @@ branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
import rally
from rally.common.db.sqlalchemy import api
from rally import exceptions
def upgrade():
dialect = api.get_engine().dialect
@ -49,10 +48,7 @@ def upgrade():
sa.Column("name", sa.String(length=255), nullable=True),
sa.Column("started_at", sa.DateTime(), nullable=True),
sa.Column("completed_at", sa.DateTime(), nullable=True),
sa.Column(
"config",
rally.common.db.sqlalchemy.types.MutableJSONEncodedDict(),
nullable=False),
sa.Column("config", sa_types.MutableJSONEncodedDict(), nullable=False),
sa.Column("admin", sa.PickleType(), nullable=True),
sa.Column("users", sa.PickleType(), nullable=False),
sa.Column("enum_deployments_status", sa.Enum(
@ -100,10 +96,7 @@ def upgrade():
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("provider_name", sa.String(length=255), nullable=True),
sa.Column("type", sa.String(length=255), nullable=True),
sa.Column(
"info",
rally.common.db.sqlalchemy.types.MutableJSONEncodedDict(),
nullable=False),
sa.Column("info", sa_types.MutableJSONEncodedDict(), nullable=False),
sa.Column("deployment_uuid", sa.String(length=36), nullable=False),
sa.ForeignKeyConstraint(["deployment_uuid"], [u"deployments.uuid"]),
sa.PrimaryKeyConstraint("id")
@ -173,14 +166,8 @@ def upgrade():
sa.Column("created_at", sa.DateTime(), nullable=True),
sa.Column("updated_at", sa.DateTime(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"key",
rally.common.db.sqlalchemy.types.MutableJSONEncodedDict(),
nullable=False),
sa.Column(
"data",
rally.common.db.sqlalchemy.types.MutableJSONEncodedDict(),
nullable=False),
sa.Column("key", sa_types.MutableJSONEncodedDict(), nullable=False),
sa.Column("data", sa_types.MutableJSONEncodedDict(), nullable=False),
sa.Column("task_uuid", sa.String(length=36), nullable=True),
sa.ForeignKeyConstraint(["task_uuid"], ["tasks.uuid"], ),
sa.PrimaryKeyConstraint("id")
@ -192,10 +179,7 @@ def upgrade():
sa.Column("updated_at", sa.DateTime(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("verification_uuid", sa.String(length=36), nullable=True),
sa.Column(
"data",
rally.common.db.sqlalchemy.types.MutableJSONEncodedDict(),
nullable=False),
sa.Column("data", sa_types.MutableJSONEncodedDict(), nullable=False),
sa.ForeignKeyConstraint(["verification_uuid"], ["verifications.uuid"]),
sa.PrimaryKeyConstraint("id")
)

View File

@ -23,7 +23,7 @@ Create Date: 2017-10-24 15:50:17.493354
from alembic import op
import sqlalchemy as sa
from rally.common.db.sqlalchemy import types as sa_types
from rally.common.db import sa_types
from rally import exceptions
from rally import plugins
from rally.task import context

View File

@ -19,6 +19,10 @@ Revises: 7948b83229f6
Create Date: 2017-09-05 16:34:47.434748
"""
from alembic import op
import sqlalchemy as sa
from rally import exceptions
# revision identifiers, used by Alembic.
revision = "e0a5df2c5153"
@ -27,12 +31,6 @@ branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
from rally import exceptions
def upgrade():
with op.batch_alter_table("tasks") as batch_op:
batch_op.alter_column(

View File

@ -20,13 +20,6 @@ Revises: 3177d36ea270
Create Date: 2016-04-01 14:36:56.373349
"""
# revision identifiers, used by Alembic.
revision = "e654a0648db0"
down_revision = "32fada9b2fde"
branch_labels = None
depends_on = None
import datetime as dt
import json
import uuid
@ -34,9 +27,16 @@ import uuid
from alembic import op
import sqlalchemy as sa
from rally.common.db.sqlalchemy import types as sa_types
from rally.common.db import sa_types
from rally import exceptions
# revision identifiers, used by Alembic.
revision = "e654a0648db0"
down_revision = "32fada9b2fde"
branch_labels = None
depends_on = None
taskhelper = sa.Table(
"tasks",
sa.MetaData(),

View File

@ -20,17 +20,17 @@ Create Date: 2017-01-23 13:56:30.999593
"""
from alembic import op
import sqlalchemy as sa
from rally import exceptions
# revision identifiers, used by Alembic.
revision = "f33f4610dcda"
down_revision = "a6f364988fc2"
branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
from rally import exceptions
verifications_helper = sa.Table(
"verifications",

View File

@ -15,45 +15,59 @@
"""
SQLAlchemy models for rally data.
"""
import datetime as dt
import uuid
from oslo_db.sqlalchemy.compat import utils as compat_utils
from oslo_db.sqlalchemy import models
from oslo_utils import timeutils
import six
import sqlalchemy as sa
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import deferred
from sqlalchemy import schema
import sqlalchemy.ext.declarative
import sqlalchemy.orm # noqa (used as sa.orm)
from rally.common.db.sqlalchemy import types as sa_types
from rally.common.db import sa_types
from rally import consts
BASE = declarative_base()
BASE = sa.ext.declarative.declarative_base()
def UUID():
return str(uuid.uuid4())
class RallyBase(models.ModelBase):
class RallyBase(six.Iterator):
"""Base class for models."""
__table_initialized__ = False
metadata = None
created_at = sa.Column(sa.DateTime, default=lambda: timeutils.utcnow())
updated_at = sa.Column(sa.DateTime, default=lambda: timeutils.utcnow(),
onupdate=lambda: timeutils.utcnow())
def save(self, session=None):
# NOTE(LimingWu): We can't direct import the api module. That will
# result in the cyclic reference import since the api has imported
# this module.
from rally.common.db.sqlalchemy import api as sa_api
created_at = sa.Column(sa.DateTime, default=dt.datetime.utcnow)
updated_at = sa.Column(sa.DateTime, default=dt.datetime.utcnow,
onupdate=dt.datetime.utcnow)
if session is None:
session = sa_api.get_session()
def save(self, session):
"""Save this object."""
super(RallyBase, self).save(session=session)
# NOTE(boris-42): This part of code should be look like:
# session.add(self)
# session.flush()
# But there is a bug in sqlalchemy and eventlet that
# raises NoneType exception if there is no running
# transaction and rollback is called. As long as
# sqlalchemy has this bug we have to create transaction
# explicitly.
with session.begin(subtransactions=True):
session.add(self)
session.flush()
def __getitem__(self, key):
return getattr(self, key)
def get(self, key, default=None):
return getattr(self, key, default)
def update(self, values):
"""Make the model object behave like a dict."""
for k, v in values.items():
setattr(self, k, v)
class Env(BASE, RallyBase):
@ -112,7 +126,7 @@ class Task(BASE, RallyBase):
env_uuid = sa.Column(sa.String(36), nullable=False)
# we do not save the whole input task
input_task = deferred(sa.Column(sa.Text, default=""))
input_task = sa.orm.deferred(sa.Column(sa.Text, default=""))
title = sa.Column(sa.String(128), default="")
description = sa.Column(sa.Text, default="")
@ -121,7 +135,7 @@ class Task(BASE, RallyBase):
sa_types.MutableJSONEncodedDict, default={}, nullable=False)
# we do not calculate the duration of a validation step yet
validation_duration = deferred(sa.Column(sa.Float))
validation_duration = sa.orm.deferred(sa.Column(sa.Float))
task_duration = sa.Column(sa.Float, default=0.0)
pass_sla = sa.Column(sa.Boolean, default=True)
@ -156,17 +170,17 @@ class Subtask(BASE, RallyBase):
# we do not support subtask contexts feature yet, see
# https://review.openstack.org/#/c/404168/
contexts = deferred(sa.Column(
contexts = sa.orm.deferred(sa.Column(
sa_types.JSONEncodedDict, default={}, nullable=False))
contexts_results = deferred(sa.Column(
contexts_results = sa.orm.deferred(sa.Column(
sa_types.MutableJSONEncodedList, default=[], nullable=False))
sla = sa.Column(
sa_types.JSONEncodedDict, default={}, nullable=False)
# It is always False for now
run_in_parallel = deferred(
run_in_parallel = sa.orm.deferred(
sa.Column(sa.Boolean, default=False, nullable=False))
duration = sa.Column(sa.Float, default=0.0)
@ -242,7 +256,7 @@ class Workload(BASE, RallyBase):
sa_types.MutableJSONEncodedDict, default={}, nullable=False)
pass_sla = sa.Column(sa.Boolean, default=True)
_profiling_data = deferred(sa.Column(sa.Text, default=""))
_profiling_data = sa.orm.deferred(sa.Column(sa.Text, default=""))
class WorkloadData(BASE, RallyBase):
@ -277,14 +291,17 @@ class WorkloadData(BASE, RallyBase):
chunk_data = sa.Column(
sa_types.MutableJSONEncodedDict, default={}, nullable=False)
# all these fields are not used
iteration_count = deferred(sa.Column(sa.Integer, nullable=False))
failed_iteration_count = deferred(sa.Column(sa.Integer, nullable=False))
chunk_size = deferred(sa.Column(sa.Integer, nullable=False))
compressed_chunk_size = deferred(sa.Column(sa.Integer, nullable=False))
started_at = deferred(sa.Column(
sa.DateTime, default=lambda: timeutils.utcnow(), nullable=False))
finished_at = deferred(sa.Column(
sa.DateTime, default=lambda: timeutils.utcnow(), nullable=False))
iteration_count = sa.orm.deferred(sa.Column(sa.Integer, nullable=False))
failed_iteration_count = sa.orm.deferred(sa.Column(
sa.Integer, nullable=False))
chunk_size = sa.orm.deferred(sa.Column(
sa.Integer, nullable=False))
compressed_chunk_size = sa.orm.deferred(sa.Column(
sa.Integer, nullable=False))
started_at = sa.orm.deferred(sa.Column(
sa.DateTime, default=dt.datetime.utcnow, nullable=False))
finished_at = sa.orm.deferred(sa.Column(
sa.DateTime, default=dt.datetime.utcnow, nullable=False))
class Tag(BASE, RallyBase):
@ -295,9 +312,7 @@ class Tag(BASE, RallyBase):
id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
uuid = sa.Column(sa.String(36), default=UUID, nullable=False)
type = sa.Column(sa.String(36), nullable=False)
tag = sa.Column(sa.String(255), nullable=False)
@ -358,52 +373,3 @@ class Verification(BASE, RallyBase):
tests_duration = sa.Column(sa.Float, default=0.0)
tests = sa.Column(sa_types.MutableJSONEncodedDict, default={})
# TODO(boris-42): Remove it after oslo.db > 1.4.1 will be released.
def drop_all_objects(engine):
"""Drop all database objects.
Drops all database objects remaining on the default schema of the given
engine. Per-db implementations will also need to drop items specific to
those systems, such as sequences, custom types (e.g. pg ENUM), etc.
"""
with engine.begin() as conn:
inspector = sa.inspect(engine)
metadata = schema.MetaData()
tbs = []
all_fks = []
for table_name in inspector.get_table_names():
fks = []
for fk in inspector.get_foreign_keys(table_name):
if not fk["name"]:
continue
fks.append(
schema.ForeignKeyConstraint((), (), name=fk["name"]))
table = schema.Table(table_name, metadata, *fks)
tbs.append(table)
all_fks.extend(fks)
if engine.name != "sqlite":
for fkc in all_fks:
conn.execute(schema.DropConstraint(fkc))
for table in tbs:
conn.execute(schema.DropTable(table))
if engine.name == "postgresql":
if compat_utils.sqla_100:
enums = [e["name"] for e in sa.inspect(conn).get_enums()]
else:
enums = conn.dialect._load_enums(conn).keys()
for e in enums:
conn.execute("DROP TYPE %s" % e)
def drop_db():
# NOTE(LimingWu): We can't direct import the api module. That will
# result in the cyclic reference import since the api has imported
# this module.
from rally.common.db.sqlalchemy import api as sa_api
drop_all_objects(sa_api.get_engine())

152
rally/common/db/schema.py Normal file
View File

@ -0,0 +1,152 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import alembic
import alembic.config
import alembic.migration
import alembic.script
import sqlalchemy as sa
import sqlalchemy.schema # noqa
from rally.common.db import api
from rally.common.db import models
from rally import exceptions
INITIAL_REVISION_UUID = "ca3626f62937"
def _alembic_config():
path = os.path.join(os.path.dirname(__file__), "alembic.ini")
config = alembic.config.Config(path)
return config
def schema_cleanup():
"""Drop all database objects.
Drops all database objects remaining on the default schema of the given
engine. Per-db implementations will also need to drop items specific to
those systems, such as sequences, custom types (e.g. pg ENUM), etc.
"""
engine = api.get_engine()
with engine.begin() as conn:
inspector = sa.inspect(engine)
metadata = sa.schema.MetaData()
tbs = []
all_fks = []
for table_name in inspector.get_table_names():
fks = []
for fk in inspector.get_foreign_keys(table_name):
if not fk["name"]:
continue
fks.append(
sa.schema.ForeignKeyConstraint((), (), name=fk["name"]))
table = sa.schema.Table(table_name, metadata, *fks)
tbs.append(table)
all_fks.extend(fks)
if engine.name != "sqlite":
for fkc in all_fks:
conn.execute(sa.schema.DropConstraint(fkc))
for table in tbs:
conn.execute(sa.schema.DropTable(table))
if engine.name == "postgresql":
sqla_100 = int(sa.__version__.split(".")[0]) >= 1
if sqla_100:
enums = [e["name"] for e in sa.inspect(conn).get_enums()]
else:
enums = conn.dialect._load_enums(conn).keys()
for e in enums:
conn.execute("DROP TYPE %s" % e)
def schema_revision(config=None, engine=None, detailed=False):
"""Current database revision.
:param config: Instance of alembic config
:param engine: Instance of DB engine
:param detailed: whether to return a dict with detailed data
:rtype detailed: bool
:returns: Database revision
:rtype: string
:rtype: dict
"""
engine = engine or api.get_engine()
with engine.connect() as conn:
context = alembic.migration.MigrationContext.configure(conn)
revision = context.get_current_revision()
if detailed:
config = config or _alembic_config()
sc_dir = alembic.script.ScriptDirectory.from_config(config)
return {"revision": revision,
"current_head": sc_dir.get_current_head()}
return revision
def schema_upgrade(revision=None, config=None, engine=None):
"""Used for upgrading database.
:param revision: Desired database version
:type revision: string
:param config: Instance of alembic config
:param engine: Instance of DB engine
"""
revision = revision or "head"
config = config or _alembic_config()
engine = engine or api.get_engine()
if schema_revision() is None:
schema_stamp(INITIAL_REVISION_UUID, config=config)
alembic.command.upgrade(config, revision or "head")
def schema_create(config=None, engine=None):
"""Create database schema from models description.
Can be used for initial installation instead of upgrade('head').
:param config: Instance of alembic config
:param engine: Instance of DB engine
"""
engine = engine or api.get_engine()
# NOTE(viktors): If we will use metadata.create_all() for non empty db
# schema, it will only add the new tables, but leave
# existing as is. So we should avoid of this situation.
if schema_revision(engine=engine) is not None:
raise exceptions.DBMigrationError("DB schema is already under version"
" control. Use upgrade() instead")
models.BASE.metadata.create_all(engine)
schema_stamp("head", config=config)
def schema_stamp(revision, config=None):
"""Stamps database with provided revision.
Don't run any migrations.
:param revision: Should match one from repository or head - to stamp
database with most recent revision
:type revision: string
:param config: Instance of alembic config
"""
config = config or _alembic_config()
return alembic.command.stamp(config, revision=revision)

View File

@ -1,854 +0,0 @@
# Copyright 2013: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
SQLAlchemy implementation for DB.API
"""
import collections
import datetime as dt
import os
import time
import alembic
from alembic import config as alembic_config
import alembic.migration as alembic_migration
from alembic import script as alembic_script
from oslo_db import exception as db_exc
from oslo_db.sqlalchemy import session as db_session
import six
import sqlalchemy as sa
import sqlalchemy.orm # noqa
from rally.common import cfg
from rally.common.db.sqlalchemy import models
from rally import consts
from rally import exceptions
from rally.task.processing import charts
CONF = cfg.CONF
_FACADE = None
INITIAL_REVISION_UUID = "ca3626f62937"
def serialize_data(data):
if data is None:
return None
if isinstance(data, (six.integer_types,
six.string_types,
six.text_type,
dt.date,
dt.time,
float,
)):
return data
if isinstance(data, dict):
return collections.OrderedDict((k, serialize_data(v))
for k, v in data.items())
if isinstance(data, (list, tuple)):
return [serialize_data(i) for i in data]
if hasattr(data, "_as_dict"):
# NOTE(andreykurilin): it is an instance of the Model. It support a
# method `_as_dict`, which should transform an object into dict
# (quite logical as from the method name), BUT it does some extra
# work - tries to load properties which were marked to not be loaded
# in particular request and fails since the session object is not
# present. That is why the code bellow makes a custom transformation.
result = {}
for key in data.__dict__:
if not key.startswith("_"):
result[key] = serialize_data(getattr(data, key))
return result
raise exceptions.DBException("Can not serialize %s" % data)
def serialize(fn):
def wrapper(*args, **kwargs):
result = fn(*args, **kwargs)
return serialize_data(result)
return wrapper
def _create_facade_lazily():
global _FACADE
if _FACADE is None:
_FACADE = db_session.EngineFacade.from_config(CONF)
return _FACADE
def get_engine():
facade = _create_facade_lazily()
return facade.get_engine()
def get_session(**kwargs):
facade = _create_facade_lazily()
return facade.get_session(**kwargs)
def get_backend():
"""The backend is this module itself."""
return Connection()
def _alembic_config():
path = os.path.join(os.path.dirname(__file__), "alembic.ini")
config = alembic_config.Config(path)
return config
class Connection(object):
def engine_reset(self):
global _FACADE
_FACADE = None
def schema_cleanup(self):
models.drop_db()
def schema_revision(self, config=None, engine=None, detailed=False):
"""Current database revision.
:param config: Instance of alembic config
:param engine: Instance of DB engine
:param detailed: whether to return a dict with detailed data
:rtype detailed: bool
:returns: Database revision
:rtype: string
:rtype: dict
"""
engine = engine or get_engine()
with engine.connect() as conn:
context = alembic_migration.MigrationContext.configure(conn)
revision = context.get_current_revision()
if detailed:
config = config or _alembic_config()
sc_dir = alembic_script.ScriptDirectory.from_config(config)
return {"revision": revision,
"current_head": sc_dir.get_current_head()}
return revision
def schema_upgrade(self, revision=None, config=None, engine=None):
"""Used for upgrading database.
:param revision: Desired database version
:type revision: string
:param config: Instance of alembic config
:param engine: Instance of DB engine
"""
revision = revision or "head"
config = config or _alembic_config()
engine = engine or get_engine()
if self.schema_revision() is None:
self.schema_stamp(INITIAL_REVISION_UUID, config=config)
alembic.command.upgrade(config, revision or "head")
def schema_create(self, config=None, engine=None):
"""Create database schema from models description.
Can be used for initial installation instead of upgrade('head').
:param config: Instance of alembic config
:param engine: Instance of DB engine
"""
engine = engine or get_engine()
# NOTE(viktors): If we will use metadata.create_all() for non empty db
# schema, it will only add the new tables, but leave
# existing as is. So we should avoid of this situation.
if self.schema_revision(engine=engine) is not None:
raise db_exc.DBMigrationError("DB schema is already under version"
" control. Use upgrade() instead")
models.BASE.metadata.create_all(engine)
self.schema_stamp("head", config=config)
def schema_stamp(self, revision, config=None):
"""Stamps database with provided revision.
Don't run any migrations.
:param revision: Should match one from repository or head - to stamp
database with most recent revision
:type revision: string
:param config: Instance of alembic config
"""
config = config or _alembic_config()
return alembic.command.stamp(config, revision=revision)
def model_query(self, model, session=None):
"""The helper method to create query.
:param model: The instance of
:class:`rally.common.db.sqlalchemy.models.RallyBase` to
request it.
:param session: Reuse the session object or get new one if it is
None.
:returns: The query object.
:raises Exception: when the model is not a sublcass of
:class:`rally.common.db.sqlalchemy.models.RallyBase`.
"""
def issubclassof_rally_base(obj):
return isinstance(obj, type) and issubclass(obj, models.RallyBase)
if not issubclassof_rally_base(model):
raise exceptions.DBException(
"The model %s should be a subclass of RallyBase" % model)
session = session or get_session()
return session.query(model)
def _tags_get(self, uuid, tag_type, session=None):
tags = (self.model_query(models.Tag, session=session).
filter_by(uuid=uuid, type=tag_type).all())
return list(set(t.tag for t in tags))
def _uuids_by_tags_get(self, tag_type, tags):
tags = (self.model_query(models.Tag).
filter(models.Tag.type == tag_type,
models.Tag.tag.in_(tags)).all())
return list(set(tag.uuid for tag in tags))
def _task_get(self, uuid, load_only=None, session=None):
pre_query = self.model_query(models.Task, session=session)
if load_only:
pre_query = pre_query.options(sa.orm.load_only(load_only))
task = pre_query.filter_by(uuid=uuid).first()
if not task:
raise exceptions.DBRecordNotFound(
criteria="uuid: %s" % uuid, table="tasks")
task.tags = sorted(self._tags_get(uuid, consts.TagType.TASK, session))
return task
def _task_workload_data_get_all(self, workload_uuid):
session = get_session()
with session.begin():
results = (self.model_query(models.WorkloadData, session=session).
filter_by(workload_uuid=workload_uuid).
order_by(models.WorkloadData.chunk_order.asc()))
return sorted([raw for workload_data in results
for raw in workload_data.chunk_data["raw"]],
key=lambda x: x["timestamp"])
@serialize
def task_get(self, uuid=None, detailed=False):
session = get_session()
task = serialize_data(self._task_get(uuid, session=session))
if detailed:
task["subtasks"] = self._subtasks_get_all_by_task_uuid(
uuid, session=session)
return task
@serialize
def task_get_status(self, uuid):
return self._task_get(uuid, load_only="status").status
@serialize
def task_create(self, values):
tags = values.pop("tags", None)
# TODO(ikhudoshyn): currently 'input_task'
# does not come in 'values'
# After completely switching to the new
# DB schema in API we should reconstruct
# input_task's from associated workloads
# the same is true for 'pass_sla',
# 'task_duration', 'validation_result'
# and 'validation_duration'
task = models.Task()
task.update(values)
task.save()
if tags:
for t in set(tags):
tag = models.Tag()
tag.update({"uuid": task.uuid,
"type": consts.TagType.TASK,
"tag": t})
tag.save()
task.tags = sorted(self._tags_get(task.uuid, consts.TagType.TASK))
return task
@serialize
def task_update(self, uuid, values):
session = get_session()
values.pop("uuid", None)
tags = values.pop("tags", None)
with session.begin():
task = self._task_get(uuid, session=session)
task.update(values)
if tags:
for t in set(tags):
tag = models.Tag()
tag.update({"uuid": task.uuid,
"type": consts.TagType.TASK,
"tag": t})
tag.save()
# take an updated instance of task
task = self._task_get(uuid, session=session)
return task
def task_update_status(self, uuid, statuses, status_value):
session = get_session()
result = (
session.query(models.Task).filter(
models.Task.uuid == uuid, models.Task.status.in_(
statuses)).
update({"status": status_value}, synchronize_session=False)
)
if not result:
raise exceptions.DBRecordNotFound(
criteria="uuid=%(uuid)s and status in [%(statuses)s]"
% {"uuid": uuid, "statuses": ", ".join(statuses)},
table="tasks")
return result
@serialize
def task_list(self, status=None, env=None, tags=None):
session = get_session()
tasks = []
with session.begin():
query = self.model_query(models.Task)
filters = {}
if status is not None:
filters["status"] = status
if env is not None:
filters["env_uuid"] = self.env_get(env)["uuid"]
if filters:
query = query.filter_by(**filters)
if tags:
uuids = self._uuids_by_tags_get(
consts.TagType.TASK, tags)
if not uuids:
return []
query = query.filter(models.Task.uuid.in_(uuids))
for task in query.all():
task.tags = sorted(
self._tags_get(task.uuid, consts.TagType.TASK, session))
tasks.append(task)
return tasks
def task_delete(self, uuid, status=None):
session = get_session()
with session.begin():
query = base_query = (self.model_query(models.Task).
filter_by(uuid=uuid))
if status is not None:
query = base_query.filter_by(status=status)
(self.model_query(models.WorkloadData).filter_by(task_uuid=uuid).
delete(synchronize_session=False))
(self.model_query(models.Workload).filter_by(task_uuid=uuid).
delete(synchronize_session=False))
(self.model_query(models.Subtask).filter_by(task_uuid=uuid).
delete(synchronize_session=False))
(self.model_query(models.Tag).filter_by(
uuid=uuid, type=consts.TagType.TASK).
delete(synchronize_session=False))
count = query.delete(synchronize_session=False)
if not count:
if status is not None:
task = base_query.first()
if task:
raise exceptions.DBConflict(
"Task `%(uuid)s` in `%(actual)s` status but "
"`%(require)s` is required."
% {"uuid": uuid,
"require": status, "actual": task.status})
raise exceptions.DBRecordNotFound(
criteria="uuid: %s" % uuid, table="tasks")
def _subtasks_get_all_by_task_uuid(self, task_uuid, session=None):
result = (self.model_query(models.Subtask, session=session).filter_by(
task_uuid=task_uuid).all())
subtasks = []
for subtask in result:
subtask = serialize_data(subtask)
subtask["workloads"] = []
workloads = (self.model_query(models.Workload, session=session).
filter_by(subtask_uuid=subtask["uuid"]).all())
for workload in workloads:
workload.data = self._task_workload_data_get_all(
workload.uuid)
subtask["workloads"].append(serialize_data(workload))
subtasks.append(subtask)
return subtasks
@serialize
def subtask_create(self, task_uuid, title, description=None,
contexts=None):
subtask = models.Subtask(task_uuid=task_uuid)
subtask.update({
"title": title,
"description": description or "",
"contexts": contexts or {},
})
subtask.save()
return subtask
@serialize
def subtask_update(self, subtask_uuid, values):
subtask = self.model_query(models.Subtask).filter_by(
uuid=subtask_uuid).first()
subtask.update(values)
subtask.save()
return subtask
@serialize
def workload_get(self, workload_uuid):
return self.model_query(models.Workload).filter_by(
uuid=workload_uuid).first()
@serialize
def workload_create(self, task_uuid, subtask_uuid, name, description,
position, runner, runner_type, hooks, contexts, sla,
args):
workload = models.Workload(task_uuid=task_uuid,
subtask_uuid=subtask_uuid,
name=name,
description=description,
position=position,
runner=runner,
runner_type=runner_type,
hooks=hooks,
contexts=contexts or {},
sla=sla,
args=args)
workload.save()
return workload
@serialize
def workload_data_create(self, task_uuid, workload_uuid, chunk_order,
data):
workload_data = models.WorkloadData(task_uuid=task_uuid,
workload_uuid=workload_uuid)
raw_data = data.get("raw", [])
iter_count = len(raw_data)
failed_iter_count = 0
started_at = float("inf")
finished_at = 0
for d in raw_data:
if d.get("error"):
failed_iter_count += 1
timestamp = d["timestamp"]
duration = d["duration"]
finished = timestamp + duration
if timestamp < started_at:
started_at = timestamp
if finished > finished_at:
finished_at = finished
now = time.time()
if started_at == float("inf"):
started_at = now
if finished_at == 0:
finished_at = now
workload_data.update({
"task_uuid": task_uuid,
"workload_uuid": workload_uuid,
"chunk_order": chunk_order,
"iteration_count": iter_count,
"failed_iteration_count": failed_iter_count,
"chunk_data": {"raw": raw_data},
# TODO(ikhudoshyn)
"chunk_size": 0,
"compressed_chunk_size": 0,
"started_at": dt.datetime.fromtimestamp(started_at),
"finished_at": dt.datetime.fromtimestamp(finished_at)
})
workload_data.save()
return workload_data
@serialize
def workload_set_results(self, workload_uuid, subtask_uuid, task_uuid,
load_duration, full_duration, start_time,
sla_results, hooks_results, contexts_results):
session = get_session()
with session.begin():
workload_results = self._task_workload_data_get_all(workload_uuid)
iter_count = len(workload_results)
failed_iter_count = 0
max_duration = None
min_duration = None
for d in workload_results:
if d.get("error"):
failed_iter_count += 1
duration = d.get("duration", 0)
if max_duration is None or duration > max_duration:
max_duration = duration
if min_duration is None or min_duration > duration:
min_duration = duration
durations_stat = charts.MainStatsTable(
{"total_iteration_count": iter_count})
for itr in workload_results:
durations_stat.add_iteration(itr)
sla = sla_results or []
# NOTE(ikhudoshyn): we call it 'pass_sla'
# for the sake of consistency with other models
# so if no SLAs were specified, then we assume pass_sla == True
success = all([s.get("success") for s in sla])
session.query(models.Workload).filter_by(
uuid=workload_uuid).update(
{
"sla_results": {"sla": sla},
"contexts_results": contexts_results,
"hooks": hooks_results or [],
"load_duration": load_duration,
"full_duration": full_duration,
"min_duration": min_duration,
"max_duration": max_duration,
"total_iteration_count": iter_count,
"failed_iteration_count": failed_iter_count,
"start_time": start_time,
"statistics": {"durations": durations_stat.to_dict()},
"pass_sla": success}
)
task_values = {
"task_duration": models.Task.task_duration + load_duration}
if not success:
task_values["pass_sla"] = False
subtask_values = {
"duration": models.Subtask.duration + load_duration}
if not success:
subtask_values["pass_sla"] = False
session.query(models.Task).filter_by(uuid=task_uuid).update(
task_values)
session.query(models.Subtask).filter_by(uuid=subtask_uuid).update(
subtask_values)
@serialize
def env_get(self, uuid_or_name):
env = (self.model_query(models.Env)
.filter(sa.or_(models.Env.uuid == uuid_or_name,
models.Env.name == uuid_or_name))
.first())
if not env:
raise exceptions.DBRecordNotFound(
criteria="uuid or name is %s" % uuid_or_name, table="envs")
return env
@serialize
def env_get_status(self, uuid):
resp = (self.model_query(models.Env)
.filter_by(uuid=uuid)
.options(sa.orm.load_only("status"))
.first())
if not resp:
raise exceptions.DBRecordNotFound(
criteria="uuid: %s" % uuid, table="envs")
return resp["status"]
@serialize
def env_list(self, status=None):
query = self.model_query(models.Env)
if status:
query = query.filter_by(status=status)
return query.all()
@serialize
def env_create(self, name, status, description, extras, config,
spec, platforms):
try:
env_uuid = models.UUID()
for p in platforms:
p["env_uuid"] = env_uuid
env = models.Env(
name=name, uuid=env_uuid,
status=status, description=description,
extras=extras, config=config, spec=spec
)
get_session().bulk_save_objects([env] + [
models.Platform(**p) for p in platforms
])
except db_exc.DBDuplicateEntry:
raise exceptions.DBRecordExists(
field="name", value=name, table="envs")
return self.env_get(env_uuid)
def env_rename(self, uuid, old_name, new_name):
try:
return bool(self.model_query(models.Env)
.filter_by(uuid=uuid, name=old_name)
.update({"name": new_name}))
except db_exc.DBDuplicateEntry:
raise exceptions.DBRecordExists(
field="name", value=new_name, table="envs")
def env_update(self, uuid, description=None, extras=None, config=None):
values = {}
if description is not None:
values["description"] = description
if extras is not None:
values["extras"] = extras
if config is not None:
values["config"] = config
if not values:
return True
return bool(self.model_query(models.Env)
.filter_by(uuid=uuid)
.update(values))
def env_set_status(self, uuid, old_status, new_status):
count = (self.model_query(models.Env)
.filter_by(uuid=uuid, status=old_status)
.update({"status": new_status}))
if count:
return True
raise exceptions.DBConflict(
"Env %s should be in status %s actual %s"
% (uuid, old_status, self.env_get_status(uuid)))
def env_delete_cascade(self, uuid):
session = get_session()
with session.begin():
(self.model_query(models.Task, session=session)
.filter_by(env_uuid=uuid)
.delete())
(self.model_query(models.Verification, session=session)
.filter_by(env_uuid=uuid)
.delete())
(self.model_query(models.Platform, session=session)
.filter_by(env_uuid=uuid)
.delete())
(self.model_query(models.Env, session=session)
.filter_by(uuid=uuid)
.delete())
@serialize
def platforms_list(self, env_uuid):
return (self.model_query(models.Platform)
.filter_by(env_uuid=env_uuid)
.all())
@serialize
def platform_get(self, uuid):
p = self.model_query(models.Platform).filter_by(uuid=uuid).first()
if not p:
raise exceptions.DBRecordNotFound(
criteria="uuid = %s" % uuid, table="platforms")
return p
def platform_set_status(self, uuid, old_status, new_status):
count = (self.model_query(models.Platform)
.filter_by(uuid=uuid, status=old_status)
.update({"status": new_status}))
if count:
return True
platform = self.platform_get(uuid)
raise exceptions.DBConflict(
"Platform %s should be in status %s actual %s"
% (uuid, old_status, platform["status"]))
def platform_set_data(self, uuid, platform_data=None, plugin_data=None):
values = {}
if platform_data is not None:
values["platform_data"] = platform_data
if plugin_data is not None:
values["plugin_data"] = plugin_data
if not values:
return True
return bool(self.model_query(models.Platform)
.filter_by(uuid=uuid)
.update(values))
@serialize
def verifier_create(self, name, vtype, platform, source, version,
system_wide, extra_settings=None):
verifier = models.Verifier()
properties = {"name": name, "type": vtype, "platform": platform,
"source": source, "extra_settings": extra_settings,
"version": version, "system_wide": system_wide}
verifier.update(properties)
verifier.save()
return verifier
@serialize
def verifier_get(self, verifier_id):
return self._verifier_get(verifier_id)
def _verifier_get(self, verifier_id, session=None):
verifier = self.model_query(
models.Verifier, session=session).filter(
sa.or_(models.Verifier.name == verifier_id,
models.Verifier.uuid == verifier_id)).first()
if not verifier:
raise exceptions.DBRecordNotFound(
criteria="name or uuid is %s" % verifier_id, table="verifiers")
return verifier
@serialize
def verifier_list(self, status=None):
query = self.model_query(models.Verifier)
if status:
query = query.filter_by(status=status)
return query.all()
def verifier_delete(self, verifier_id):
session = get_session()
with session.begin():
query = self.model_query(
models.Verifier, session=session).filter(
sa.or_(models.Verifier.name == verifier_id,
models.Verifier.uuid == verifier_id))
count = query.delete(synchronize_session=False)
if not count:
raise exceptions.DBRecordNotFound(
criteria="name or uuid is %s" % verifier_id,
table="verifiers")
@serialize
def verifier_update(self, verifier_id, properties):
session = get_session()
with session.begin():
verifier = self._verifier_get(verifier_id)
verifier.update(properties)
verifier.save()
return verifier
@serialize
def verification_create(self, verifier_id, env, tags=None,
run_args=None):
verifier = self._verifier_get(verifier_id)
env = self.env_get(env)
verification = models.Verification()
verification.update({"verifier_uuid": verifier.uuid,
"env_uuid": env["uuid"],
"run_args": run_args})
verification.save()
if tags:
for t in set(tags):
tag = models.Tag()
tag.update({"uuid": verification.uuid,
"type": consts.TagType.VERIFICATION,
"tag": t})
tag.save()
return verification
@serialize
def verification_get(self, verification_uuid):
verification = self._verification_get(verification_uuid)
verification.tags = sorted(self._tags_get(verification.uuid,
consts.TagType.VERIFICATION))
return verification
def _verification_get(self, verification_uuid, session=None):
verification = self.model_query(
models.Verification, session=session).filter_by(
uuid=verification_uuid).first()
if not verification:
raise exceptions.DBRecordNotFound(
criteria="uuid: %s" % verification_uuid, table="verifications")
return verification
@serialize
def verification_list(self, verifier_id=None, env=None,
tags=None, status=None):
session = get_session()
with session.begin():
filter_by = {}
if verifier_id:
verifier = self._verifier_get(verifier_id, session=session)
filter_by["verifier_uuid"] = verifier.uuid
if env:
env = self.env_get(env)
filter_by["env_uuid"] = env["uuid"]
if status:
filter_by["status"] = status
query = self.model_query(models.Verification, session=session)
if filter_by:
query = query.filter_by(**filter_by)
def add_tags_to_verifications(verifications):
for verification in verifications:
verification.tags = sorted(self._tags_get(
verification.uuid, consts.TagType.VERIFICATION))
return verifications
if tags:
uuids = self._uuids_by_tags_get(
consts.TagType.VERIFICATION, tags)
query = query.filter(models.Verification.uuid.in_(uuids))
return add_tags_to_verifications(query.all())
def verification_delete(self, verification_uuid):
session = get_session()
with session.begin():
count = self.model_query(
models.Verification, session=session).filter_by(
uuid=verification_uuid).delete(synchronize_session=False)
if not count:
raise exceptions.DBRecordNotFound(
criteria="uuid: %s" % verification_uuid, table="verifications")
@serialize
def verification_update(self, verification_uuid, properties):
session = get_session()
with session.begin():
verification = self._verification_get(verification_uuid)
verification.update(properties)
verification.save()
return verification

View File

@ -15,7 +15,7 @@
from pbr import version as pbr_version
from rally.common.db import api
from rally.common.db import schema
RALLY_VENDOR = "OpenStack Foundation"
RALLY_PRODUCT = "OpenStack Rally"
@ -30,4 +30,4 @@ def version_string():
def database_revision():
return api.schema_revision(detailed=True)
return schema.schema_revision(detailed=True)

View File

@ -70,6 +70,10 @@ class DBException(RallyException):
msg_fmt = "DB Exception: '%(message)s'"
class DBMigrationError(DBException):
msg_fmt = "DB Migration Error: '%(message)s'"
class DBConflict(RallyException):
error_code = 409
msg_fmt = "DB Conflict. %(message)s"

View File

@ -41,6 +41,7 @@ baseline_missing=$(awk 'END { print $3 }' $baseline_report)
# Checkout back and save coverage report
git checkout -
git clean -f -d
current_report=$(mktemp -t rally_coverageXXXXXXX)
py.test --cov=rally tests/unit/ --cov-report=html -n auto

View File

@ -29,47 +29,47 @@ class DBCommandsTestCase(test.TestCase):
self.fake_api = fakes.FakeAPI()
@mock.patch("rally.cli.commands.db.envutils")
@mock.patch("rally.cli.commands.db.db")
def test_recreate(self, mock_db, mock_envutils):
@mock.patch("rally.cli.commands.db.db.schema")
def test_recreate(self, mock_db_schema, mock_envutils):
self.db_commands.recreate(self.fake_api)
db_calls = [mock.call.schema_cleanup(),
mock.call.schema_create()]
self.assertEqual(db_calls, mock_db.mock_calls)
self.assertEqual(db_calls, mock_db_schema.mock_calls)
envutils_calls = [mock.call.clear_env()]
self.assertEqual(envutils_calls, mock_envutils.mock_calls)
@mock.patch("rally.cli.commands.db.db")
def test_create(self, mock_db):
@mock.patch("rally.cli.commands.db.db.schema")
def test_create(self, mock_db_schema):
self.db_commands.create(self.fake_api)
calls = [mock.call.schema_create()]
self.assertEqual(calls, mock_db.mock_calls)
self.assertEqual(calls, mock_db_schema.mock_calls)
@mock.patch("rally.cli.commands.db.db")
def test_ensure_create(self, mock_db):
mock_db.schema_revision.return_value = None
@mock.patch("rally.cli.commands.db.db.schema")
def test_ensure_create(self, mock_db_schema):
mock_db_schema.schema_revision.return_value = None
self.db_commands.ensure(self.fake_api)
calls = [mock.call.schema_revision(),
mock.call.schema_create()]
self.assertEqual(calls, mock_db.mock_calls)
self.assertEqual(calls, mock_db_schema.mock_calls)
@mock.patch("rally.cli.commands.db.db")
def test_ensure_exists(self, mock_db):
mock_db.schema_revision.return_value = "revision"
@mock.patch("rally.cli.commands.db.db.schema")
def test_ensure_exists(self, mock_db_schema):
mock_db_schema.schema_revision.return_value = "revision"
self.db_commands.ensure(self.fake_api)
calls = [mock.call.schema_revision()]
self.assertEqual(calls, mock_db.mock_calls)
self.assertEqual(calls, mock_db_schema.mock_calls)
@mock.patch("rally.cli.commands.db.db")
def test_upgrade(self, mock_db):
@mock.patch("rally.cli.commands.db.db.schema")
def test_upgrade(self, mock_db_schema):
self.db_commands.upgrade(self.fake_api)
calls = [mock.call.schema_upgrade()]
mock_db.assert_has_calls(calls)
mock_db_schema.assert_has_calls(calls)
@mock.patch("rally.cli.commands.db.db")
def test_revision(self, mock_db):
@mock.patch("rally.cli.commands.db.db.schema")
def test_revision(self, mock_db_schema):
self.db_commands.revision(self.fake_api)
calls = [mock.call.schema_revision()]
mock_db.assert_has_calls(calls)
mock_db_schema.assert_has_calls(calls)
@mock.patch("rally.cli.commands.db.print")
@mock.patch("rally.cli.commands.db.cfg.CONF.database")

View File

@ -15,8 +15,10 @@
"""Tests for db.api layer."""
import collections
import datetime as dt
import ddt
import mock
from six import moves
@ -28,10 +30,70 @@ from tests.unit import test
NOW = dt.datetime.now()
class FakeSerializable(db.models.RallyBase):
def __init__(self, **kwargs):
for k, v in kwargs.items():
setattr(self, k, v)
@ddt.ddt
class SerializeTestCase(test.DBTestCase):
@ddt.data(
{"data": 1, "serialized": 1},
{"data": 1.1, "serialized": 1.1},
{"data": "a string", "serialized": "a string"},
{"data": NOW, "serialized": NOW},
{"data": {"k1": 1, "k2": 2}, "serialized": {"k1": 1, "k2": 2}},
{"data": [1, "foo"], "serialized": [1, "foo"]},
{"data": ["foo", 1, {"a": "b"}], "serialized": ["foo", 1, {"a": "b"}]},
{"data": FakeSerializable(a=1), "serialized": {"a": 1}},
{"data": [FakeSerializable(a=1),
FakeSerializable(b=FakeSerializable(c=1))],
"serialized": [{"a": 1}, {"b": {"c": 1}}]},
)
@ddt.unpack
def test_serialize(self, data, serialized):
@db.serialize
def fake_method():
return data
results = fake_method()
self.assertEqual(serialized, results)
def test_serialize_ordered_dict(self):
data = collections.OrderedDict([(1, 2), ("foo", "bar"), (2, 3)])
serialized = db.serialize_data(data)
self.assertIsInstance(serialized, collections.OrderedDict)
self.assertEqual([1, "foo", 2], list(serialized.keys()))
self.assertEqual([2, "bar", 3], list(serialized.values()))
def test_serialize_value_error(self):
@db.serialize
def fake_method():
class Fake(object):
pass
return Fake()
self.assertRaises(exceptions.DBException, fake_method)
class ModelQueryTestCase(test.DBTestCase):
def test_model_query_wrong_model(self):
class Foo(object):
pass
self.assertRaises(exceptions.DBException, db.model_query, Foo)
class ConnectionTestCase(test.DBTestCase):
def test_schema_revision(self):
rev = db.schema_revision()
drev = db.schema_revision(detailed=True)
rev = db.schema.schema_revision()
drev = db.schema.schema_revision(detailed=True)
self.assertEqual(drev["revision"], rev)
self.assertEqual(drev["revision"], drev["current_head"])

View File

@ -28,14 +28,12 @@ import jsonschema
import mock
from oslo_db.sqlalchemy import test_migrations
from oslo_db.sqlalchemy import utils as db_utils
from oslo_utils import timeutils
import six
import sqlalchemy as sa
import rally
from rally.common import db
from rally.common.db.sqlalchemy import api
from rally.common.db.sqlalchemy import models
from rally.common.db import models
from rally import consts
from tests.unit.common.db import test_migrations_base
from tests.unit import test as rtest
@ -110,18 +108,18 @@ class MigrationTestCase(rtest.DBTestCase,
def setUp(self):
# we change DB metadata in tests so we reload
# models to refresh the metadata to it's original state
six.moves.reload_module(rally.common.db.sqlalchemy.models)
six.moves.reload_module(rally.common.db.models)
super(MigrationTestCase, self).setUp()
self.alembic_config = api._alembic_config()
self.engine = api.get_engine()
self.alembic_config = db.schema._alembic_config()
self.engine = db.get_engine()
# remove everything from DB and stamp it as 'base'
# so that migration (i.e. upgrade up to 'head')
# will actually take place
db.schema_cleanup()
db.schema_stamp("base")
db.schema.schema_cleanup()
db.schema.schema_stamp("base")
def db_sync(self, engine):
db.schema_upgrade()
db.schema.schema_upgrade()
def get_engine(self):
return self.engine
@ -160,10 +158,10 @@ class MigrationTestCase(rtest.DBTestCase,
return diff
@mock.patch("rally.common.db.sqlalchemy.api.Connection.schema_stamp")
def test_models_sync(self, mock_connection_schema_stamp):
@mock.patch("rally.common.db.schema.schema_stamp")
def test_models_sync(self, mock_schema_stamp):
# drop all tables after a test run
self.addCleanup(db.schema_cleanup)
self.addCleanup(db.schema.schema_cleanup)
# run migration scripts
self.db_sync(self.get_engine())
@ -174,11 +172,11 @@ class MigrationTestCase(rtest.DBTestCase,
self.fail(
"Models and migration scripts aren't in sync:\n%s" % msg)
@mock.patch("rally.common.db.sqlalchemy.api.Connection.schema_stamp")
@mock.patch("rally.common.db.schema.schema_stamp")
def test_models_sync_negative__missing_table_in_script(
self, mock_connection_schema_stamp):
self, mock_schema_stamp):
# drop all tables after a test run
self.addCleanup(db.schema_cleanup)
self.addCleanup(db.schema.schema_cleanup)
self._create_fake_model("fake_model")
@ -193,11 +191,11 @@ class MigrationTestCase(rtest.DBTestCase,
self.assertIsInstance(object, sa.Table)
self.assertEqual("fake_model", object.name)
@mock.patch("rally.common.db.sqlalchemy.api.Connection.schema_stamp")
@mock.patch("rally.common.db.schema.schema_stamp")
def test_models_sync_negative__missing_model_in_metadata(
self, mock_connection_schema_stamp):
self, mock_schema_stamp):
# drop all tables after a test run
self.addCleanup(db.schema_cleanup)
self.addCleanup(db.schema.schema_cleanup)
table = self.get_metadata().tables["tags"]
self.get_metadata().remove(table)
@ -220,7 +218,7 @@ class MigrationWalkTestCase(rtest.DBTestCase,
def setUp(self):
super(MigrationWalkTestCase, self).setUp()
self.engine = api.get_engine()
self.engine = db.get_engine()
def assertColumnExists(self, engine, table, column):
t = db_utils.get_table(engine, table)
@ -265,7 +263,7 @@ class MigrationWalkTestCase(rtest.DBTestCase,
def _check_3177d36ea270(self, engine, data):
self.assertEqual(
"3177d36ea270", api.get_backend().schema_revision(engine=engine))
"3177d36ea270", db.schema.schema_revision(engine=engine))
self.assertColumnExists(engine, "deployments", "credentials")
self.assertColumnNotExists(engine, "deployments", "admin")
self.assertColumnNotExists(engine, "deployments", "users")
@ -413,7 +411,7 @@ class MigrationWalkTestCase(rtest.DBTestCase,
def _check_54e844ebfbc3(self, engine, data):
self.assertEqual("54e844ebfbc3",
api.get_backend().schema_revision(engine=engine))
db.schema.schema_revision(engine=engine))
original_deployments = self._54e844ebfbc3_deployments
@ -503,7 +501,7 @@ class MigrationWalkTestCase(rtest.DBTestCase,
def _check_08e1515a576c(self, engine, data):
self.assertEqual("08e1515a576c",
api.get_backend().schema_revision(engine=engine))
db.schema.schema_revision(engine=engine))
tasks = self._08e1515a576c_logs
@ -552,8 +550,8 @@ class MigrationWalkTestCase(rtest.DBTestCase,
task_table.insert(),
[{
"uuid": self._e654a0648db0_task_uuid,
"created_at": timeutils.utcnow(),
"updated_at": timeutils.utcnow(),
"created_at": dt.datetime.utcnow(),
"updated_at": dt.datetime.utcnow(),
"status": consts.TaskStatus.FINISHED,
"verification_log": json.dumps({}),
"tag": "test_tag",
@ -565,8 +563,8 @@ class MigrationWalkTestCase(rtest.DBTestCase,
taskresult_table.insert(), [
{
"task_uuid": self._e654a0648db0_task_uuid,
"created_at": timeutils.utcnow(),
"updated_at": timeutils.utcnow(),
"created_at": dt.datetime.utcnow(),
"updated_at": dt.datetime.utcnow(),
"key": json.dumps({
"name": "test_scenario",
"pos": 0,
@ -593,7 +591,7 @@ class MigrationWalkTestCase(rtest.DBTestCase,
def _check_e654a0648db0(self, engine, data):
self.assertEqual(
"e654a0648db0", api.get_backend().schema_revision(engine=engine))
"e654a0648db0", db.schema.schema_revision(engine=engine))
task_table = db_utils.get_table(engine, "tasks")
subtask_table = db_utils.get_table(engine, "subtasks")
@ -817,7 +815,7 @@ class MigrationWalkTestCase(rtest.DBTestCase,
def _check_6ad4f426f005(self, engine, data):
self.assertEqual("6ad4f426f005",
api.get_backend().schema_revision(engine=engine))
db.schema.schema_revision(engine=engine))
deployment_table = db_utils.get_table(engine, "deployments")
task_table = db_utils.get_table(engine, "tasks")
@ -899,7 +897,7 @@ class MigrationWalkTestCase(rtest.DBTestCase,
def _check_32fada9b2fde(self, engine, data):
self.assertEqual("32fada9b2fde",
api.get_backend().schema_revision(engine=engine))
db.schema.schema_revision(engine=engine))
original_deployments = self._32fada9b2fde_deployments
@ -1018,7 +1016,7 @@ class MigrationWalkTestCase(rtest.DBTestCase,
def _check_484cd9413e66(self, engine, data):
self.assertEqual("484cd9413e66",
api.get_backend().schema_revision(engine=engine))
db.schema.schema_revision(engine=engine))
verifications_table = db_utils.get_table(engine, "verifications")
@ -1152,7 +1150,7 @@ class MigrationWalkTestCase(rtest.DBTestCase,
def _check_37fdbb373e8d(self, engine, data):
self.assertEqual("37fdbb373e8d",
api.get_backend().schema_revision(engine=engine))
db.schema.schema_revision(engine=engine))
verifications_table = db_utils.get_table(engine, "verifications")
with engine.connect() as conn:
@ -1216,7 +1214,7 @@ class MigrationWalkTestCase(rtest.DBTestCase,
def _check_a6f364988fc2(self, engine, data):
self.assertEqual("a6f364988fc2",
api.get_backend().schema_revision(engine=engine))
db.schema.schema_revision(engine=engine))
tags_table = db_utils.get_table(engine, "tags")
with engine.connect() as conn:
@ -1286,7 +1284,7 @@ class MigrationWalkTestCase(rtest.DBTestCase,
def _check_f33f4610dcda(self, engine, data):
self.assertEqual("f33f4610dcda",
api.get_backend().schema_revision(engine=engine))
db.schema.schema_revision(engine=engine))
verifications_table = db_utils.get_table(engine, "verifications")
with engine.connect() as conn:
@ -1380,7 +1378,7 @@ class MigrationWalkTestCase(rtest.DBTestCase,
def _check_4ef544102ba7(self, engine, data):
self.assertEqual("4ef544102ba7",
api.get_backend().schema_revision(engine=engine))
db.schema.schema_revision(engine=engine))
org_tasks = self.tasks
@ -1521,8 +1519,8 @@ class MigrationWalkTestCase(rtest.DBTestCase,
task_table.insert(),
[{
"uuid": self._35fe16d4ab1c_task_uuid,
"created_at": timeutils.utcnow(),
"updated_at": timeutils.utcnow(),
"created_at": dt.datetime.utcnow(),
"updated_at": dt.datetime.utcnow(),
"status": consts.TaskStatus.FINISHED,
"validation_result": six.b(json.dumps({})),
"deployment_uuid": deployment_uuid
@ -1534,8 +1532,8 @@ class MigrationWalkTestCase(rtest.DBTestCase,
subtask_table.insert(),
[{
"uuid": subtask_id,
"created_at": timeutils.utcnow(),
"updated_at": timeutils.utcnow(),
"created_at": dt.datetime.utcnow(),
"updated_at": dt.datetime.utcnow(),
"task_uuid": self._35fe16d4ab1c_task_uuid,
"context": six.b(json.dumps([])),
"sla": six.b(json.dumps([])),
@ -1550,8 +1548,8 @@ class MigrationWalkTestCase(rtest.DBTestCase,
"name": "foo",
"task_uuid": self._35fe16d4ab1c_task_uuid,
"subtask_uuid": subtask_id,
"created_at": timeutils.utcnow(),
"updated_at": timeutils.utcnow(),
"created_at": dt.datetime.utcnow(),
"updated_at": dt.datetime.utcnow(),
"position": 0,
"runner": "",
"runner_type": "",
@ -1642,8 +1640,8 @@ class MigrationWalkTestCase(rtest.DBTestCase,
task_table.insert(),
[{
"uuid": self._7948b83229f6_task_uuid,
"created_at": timeutils.utcnow(),
"updated_at": timeutils.utcnow(),
"created_at": dt.datetime.utcnow(),
"updated_at": dt.datetime.utcnow(),
"status": consts.TaskStatus.FINISHED,
"validation_result": six.b(json.dumps({})),
"deployment_uuid": self._7948b83229f6_deployment_uuid
@ -1654,8 +1652,8 @@ class MigrationWalkTestCase(rtest.DBTestCase,
subtask_table.insert(),
[{
"uuid": subtask_uuid,
"created_at": timeutils.utcnow(),
"updated_at": timeutils.utcnow(),
"created_at": dt.datetime.utcnow(),
"updated_at": dt.datetime.utcnow(),
"task_uuid": self._7948b83229f6_task_uuid,
"context": six.b(json.dumps([])),
"sla": six.b(json.dumps([])),
@ -1671,8 +1669,8 @@ class MigrationWalkTestCase(rtest.DBTestCase,
"name": "foo",
"task_uuid": self._7948b83229f6_task_uuid,
"subtask_uuid": subtask_uuid,
"created_at": timeutils.utcnow(),
"updated_at": timeutils.utcnow(),
"created_at": dt.datetime.utcnow(),
"updated_at": dt.datetime.utcnow(),
"position": 0,
"runner": "",
"runner_type": "",
@ -1694,10 +1692,10 @@ class MigrationWalkTestCase(rtest.DBTestCase,
wdata_table.insert(),
[{
"uuid": str(uuid.uuid4()),
"created_at": timeutils.utcnow(),
"updated_at": timeutils.utcnow(),
"started_at": timeutils.utcnow(),
"finished_at": timeutils.utcnow(),
"created_at": dt.datetime.utcnow(),
"updated_at": dt.datetime.utcnow(),
"started_at": dt.datetime.utcnow(),
"finished_at": dt.datetime.utcnow(),
"task_uuid": self._7948b83229f6_task_uuid,
"workload_uuid": w_uuid,
"chunk_order": 0,
@ -1796,8 +1794,8 @@ class MigrationWalkTestCase(rtest.DBTestCase,
task_table.insert(),
[{
"uuid": self._046a38742e89_task_uuid,
"created_at": timeutils.utcnow(),
"updated_at": timeutils.utcnow(),
"created_at": dt.datetime.utcnow(),
"updated_at": dt.datetime.utcnow(),
"status": consts.TaskStatus.FINISHED,
"validation_result": six.b(json.dumps({})),
"deployment_uuid": self._046a38742e89_deployment_uuid
@ -1808,8 +1806,8 @@ class MigrationWalkTestCase(rtest.DBTestCase,
subtask_table.insert(),
[{
"uuid": subtask_uuid,
"created_at": timeutils.utcnow(),
"updated_at": timeutils.utcnow(),
"created_at": dt.datetime.utcnow(),
"updated_at": dt.datetime.utcnow(),
"task_uuid": self._046a38742e89_task_uuid,
"context": six.b(json.dumps([])),
"sla": six.b(json.dumps([])),
@ -1825,8 +1823,8 @@ class MigrationWalkTestCase(rtest.DBTestCase,
"name": "foo",
"task_uuid": self._046a38742e89_task_uuid,
"subtask_uuid": subtask_uuid,
"created_at": timeutils.utcnow(),
"updated_at": timeutils.utcnow(),
"created_at": dt.datetime.utcnow(),
"updated_at": dt.datetime.utcnow(),
"position": 0,
"runner": json.dumps(workload["runner"]),
"runner_type": "",
@ -2051,8 +2049,8 @@ class MigrationWalkTestCase(rtest.DBTestCase,
task_table.insert(),
[{
"uuid": task_uuid,
"created_at": timeutils.utcnow(),
"updated_at": timeutils.utcnow(),
"created_at": dt.datetime.utcnow(),
"updated_at": dt.datetime.utcnow(),
"status": consts.TaskStatus.FINISHED,
"validation_result": six.b(json.dumps({})),
"deployment_uuid": self._4394bdc32cfd_deployment_uuid
@ -2063,8 +2061,8 @@ class MigrationWalkTestCase(rtest.DBTestCase,
subtask_table.insert(),
[{
"uuid": self._4394bdc32cfd_subtask,
"created_at": timeutils.utcnow(),
"updated_at": timeutils.utcnow(),
"created_at": dt.datetime.utcnow(),
"updated_at": dt.datetime.utcnow(),
"task_uuid": task_uuid,
"context": six.b(json.dumps([])),
"sla": six.b(json.dumps([])),
@ -2080,8 +2078,8 @@ class MigrationWalkTestCase(rtest.DBTestCase,
"name": "foo",
"task_uuid": task_uuid,
"subtask_uuid": self._4394bdc32cfd_subtask,
"created_at": timeutils.utcnow(),
"updated_at": timeutils.utcnow(),
"created_at": dt.datetime.utcnow(),
"updated_at": dt.datetime.utcnow(),
"position": 0,
"runner": "",
"runner_type": "",
@ -2100,10 +2098,10 @@ class MigrationWalkTestCase(rtest.DBTestCase,
wdata_table.insert(),
[{
"uuid": str(uuid.uuid4()),
"created_at": timeutils.utcnow(),
"updated_at": timeutils.utcnow(),
"started_at": timeutils.utcnow(),
"finished_at": timeutils.utcnow(),
"created_at": dt.datetime.utcnow(),
"updated_at": dt.datetime.utcnow(),
"started_at": dt.datetime.utcnow(),
"finished_at": dt.datetime.utcnow(),
"task_uuid": task_uuid,
"workload_uuid": workload["uuid"],
"chunk_order": 0,
@ -2185,7 +2183,7 @@ class MigrationWalkTestCase(rtest.DBTestCase,
self._dc46687661df_workloads = {
str(uuid.uuid4()): {
"start_time": None,
"created_at": timeutils.utcnow(),
"created_at": dt.datetime.utcnow(),
"context": {"users": {"tenants": 3}},
"full_duration": 5,
"load_duration": 3
@ -2225,8 +2223,8 @@ class MigrationWalkTestCase(rtest.DBTestCase,
task_table.insert(),
[{
"uuid": self._dc46687661df_task_uuid,
"created_at": timeutils.utcnow(),
"updated_at": timeutils.utcnow(),
"created_at": dt.datetime.utcnow(),
"updated_at": dt.datetime.utcnow(),
"status": consts.TaskStatus.FINISHED,
"validation_result": six.b(json.dumps({})),
"deployment_uuid": self._046a38742e89_deployment_uuid
@ -2237,8 +2235,8 @@ class MigrationWalkTestCase(rtest.DBTestCase,
subtask_table.insert(),
[{
"uuid": subtask_uuid,
"created_at": timeutils.utcnow(),
"updated_at": timeutils.utcnow(),
"created_at": dt.datetime.utcnow(),
"updated_at": dt.datetime.utcnow(),
"task_uuid": self._dc46687661df_task_uuid,
"context": six.b(json.dumps([])),
"sla": six.b(json.dumps([])),
@ -2253,7 +2251,7 @@ class MigrationWalkTestCase(rtest.DBTestCase,
"task_uuid": self._dc46687661df_task_uuid,
"subtask_uuid": subtask_uuid,
"created_at": w["created_at"],
"updated_at": timeutils.utcnow(),
"updated_at": dt.datetime.utcnow(),
"position": 0,
"runner": six.b(json.dumps([])),
"runner_type": "",
@ -2403,8 +2401,8 @@ class MigrationWalkTestCase(rtest.DBTestCase,
task_table.insert(),
[{
"uuid": self._7287df262dbc_task_uuid,
"created_at": timeutils.utcnow(),
"updated_at": timeutils.utcnow(),
"created_at": dt.datetime.utcnow(),
"updated_at": dt.datetime.utcnow(),
"status": consts.TaskStatus.FINISHED,
"validation_result": six.b(json.dumps({})),
"deployment_uuid": self._7287df262dbc_deployments[0][0]
@ -2417,8 +2415,8 @@ class MigrationWalkTestCase(rtest.DBTestCase,
"uuid": self._7287df262dbc_verifier_uuid,
"name": str(uuid.uuid4()),
"type": str(uuid.uuid4()),
"created_at": timeutils.utcnow(),
"updated_at": timeutils.utcnow(),
"created_at": dt.datetime.utcnow(),
"updated_at": dt.datetime.utcnow(),
"status": consts.VerifierStatus.INIT
}]
)
@ -2430,8 +2428,8 @@ class MigrationWalkTestCase(rtest.DBTestCase,
"deployment_uuid": self._7287df262dbc_deployments[0][0],
"verifier_uuid": self._7287df262dbc_verifier_uuid,
"status": consts.VerificationStatus.INIT,
"created_at": timeutils.utcnow(),
"updated_at": timeutils.utcnow(),
"created_at": dt.datetime.utcnow(),
"updated_at": dt.datetime.utcnow(),
}]
)

View File

@ -30,7 +30,7 @@ from alembic import migration
from alembic import script as alembic_script
from rally.common import cfg
import rally.common.db.sqlalchemy.api as s_api
from rally.common import db
from rally.common import logging
LOG = logging.getLogger(__name__)
@ -40,7 +40,7 @@ CONF = cfg.CONF
class BaseWalkMigrationMixin(object):
ALEMBIC_CONFIG = alembic_config.Config(
os.path.join(os.path.dirname(s_api.__file__), "alembic.ini")
os.path.join(os.path.dirname(db.schema.__file__), "alembic.ini")
)
ALEMBIC_CONFIG.rally_config = CONF
@ -82,7 +82,7 @@ class BaseWalkMigrationMixin(object):
env = alembic_script.ScriptDirectory.from_config(self.ALEMBIC_CONFIG)
versions = []
for rev in env.walk_revisions():
if rev.revision == s_api.INITIAL_REVISION_UUID:
if rev.revision == db.schema.INITIAL_REVISION_UUID:
# NOTE(rpromyshlennikov): we skip initial migration here
continue
versions.append((rev.revision, rev.down_revision or "-1"))
@ -103,7 +103,7 @@ class BaseWalkMigrationMixin(object):
# NOTE(ikhudoshyn): Now DB contains certain schema
# so we can not execute all migrations starting from
# init. So we cleanup the DB.
s_api.get_backend().schema_cleanup()
db.schema.schema_cleanup()
up_and_down_versions = self._up_and_down_versions()
for ver_up, ver_down in up_and_down_versions:
self._migrate_up(engine, ver_up, with_data=True)

View File

@ -1,91 +0,0 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# NOTE(andreykurilin): most tests for sqlalchemy api is merged with db_api
# tests. Hope, it will be fixed someday.
import collections
import datetime as dt
import ddt
from rally.common.db.sqlalchemy import api as db_api
from rally import exceptions
from tests.unit import test
NOW = dt.datetime.now()
class FakeSerializable(object):
def __init__(self, **kwargs):
for k, v in kwargs.items():
setattr(self, k, v)
def _as_dict(self):
return self.__dict__
@ddt.ddt
class SerializeTestCase(test.DBTestCase):
@ddt.data(
{"data": 1, "serialized": 1},
{"data": 1.1, "serialized": 1.1},
{"data": "a string", "serialized": "a string"},
{"data": NOW, "serialized": NOW},
{"data": {"k1": 1, "k2": 2}, "serialized": {"k1": 1, "k2": 2}},
{"data": [1, "foo"], "serialized": [1, "foo"]},
{"data": ["foo", 1, {"a": "b"}], "serialized": ["foo", 1, {"a": "b"}]},
{"data": FakeSerializable(a=1), "serialized": {"a": 1}},
{"data": [FakeSerializable(a=1),
FakeSerializable(b=FakeSerializable(c=1))],
"serialized": [{"a": 1}, {"b": {"c": 1}}]},
)
@ddt.unpack
def test_serialize(self, data, serialized):
@db_api.serialize
def fake_method():
return data
results = fake_method()
self.assertEqual(serialized, results)
def test_serialize_ordered_dict(self):
data = collections.OrderedDict([(1, 2), ("foo", "bar"), (2, 3)])
serialized = db_api.serialize_data(data)
self.assertIsInstance(serialized, collections.OrderedDict)
self.assertEqual([1, "foo", 2], list(serialized.keys()))
self.assertEqual([2, "bar", 3], list(serialized.values()))
def test_serialize_value_error(self):
@db_api.serialize
def fake_method():
class Fake(object):
pass
return Fake()
self.assertRaises(exceptions.DBException, fake_method)
class ModelQueryTestCase(test.DBTestCase):
def test_model_query_wrong_model(self):
class Foo(object):
pass
self.assertRaises(exceptions.DBException,
db_api.Connection().model_query, Foo)

View File

@ -17,29 +17,29 @@
import mock
import sqlalchemy as sa
from rally.common.db.sqlalchemy import types
from rally.common.db import sa_types
from tests.unit import test
class JsonEncodedTest(test.TestCase):
def test_impl(self):
self.assertEqual(sa.Text, types.JSONEncodedDict.impl)
self.assertEqual(sa.Text, types.JSONEncodedList.impl)
self.assertEqual(sa.Text, types.MutableJSONEncodedDict.impl)
self.assertEqual(sa.Text, types.MutableJSONEncodedList.impl)
self.assertEqual(sa.Text, sa_types.JSONEncodedDict.impl)
self.assertEqual(sa.Text, sa_types.JSONEncodedList.impl)
self.assertEqual(sa.Text, sa_types.MutableJSONEncodedDict.impl)
self.assertEqual(sa.Text, sa_types.MutableJSONEncodedList.impl)
def test_process_bind_param(self):
t = types.JSONEncodedDict()
t = sa_types.JSONEncodedDict()
self.assertEqual("{\"a\": 1}", t.process_bind_param({"a": 1}, None))
def test_process_bind_param_none(self):
t = types.JSONEncodedDict()
t = sa_types.JSONEncodedDict()
self.assertIsNone(t.process_bind_param(None, None))
def test_process_result_value(self):
t = types.JSONEncodedDict()
t = sa_types.JSONEncodedDict()
self.assertEqual({"a": 1}, t.process_result_value("{\"a\": 1}", None))
t = types.JSONEncodedList()
t = sa_types.JSONEncodedList()
self.assertEqual([[2, 1], [1, 2]], t.process_result_value(
"[[2, 1], [1, 2]]", None))
with mock.patch("json.loads") as mock_json_loads:
@ -47,46 +47,46 @@ class JsonEncodedTest(test.TestCase):
mock_json_loads.asser_called_once_with([(2, 1), (1, 2)])
def test_process_result_value_none(self):
t = types.JSONEncodedDict()
t = sa_types.JSONEncodedDict()
self.assertIsNone(t.process_result_value(None, None))
t = types.JSONEncodedList()
t = sa_types.JSONEncodedList()
self.assertIsNone(t.process_result_value(None, None))
class MutableDictTest(test.TestCase):
def test_creation(self):
sample = {"a": 1, "b": 2}
d = types.MutableDict(sample)
d = sa_types.MutableDict(sample)
self.assertEqual(sample, d)
def test_coerce_dict(self):
sample = {"a": 1, "b": 2}
md = types.MutableDict.coerce("test", sample)
md = sa_types.MutableDict.coerce("test", sample)
self.assertEqual(sample, md)
self.assertIsInstance(md, types.MutableDict)
self.assertIsInstance(md, sa_types.MutableDict)
def test_coerce_mutable_dict(self):
sample = {"a": 1, "b": 2}
sample_md = types.MutableDict(sample)
md = types.MutableDict.coerce("test", sample_md)
sample_md = sa_types.MutableDict(sample)
md = sa_types.MutableDict.coerce("test", sample_md)
self.assertEqual(sample, md)
self.assertIs(sample_md, md)
def test_coerce_unsupported(self):
self.assertRaises(ValueError, types.MutableDict.coerce, "test", [])
self.assertRaises(ValueError, sa_types.MutableDict.coerce, "test", [])
@mock.patch.object(types.MutableDict, "changed")
@mock.patch.object(sa_types.MutableDict, "changed")
def test_changed_on_setitem(self, mock_mutable_dict_changed):
sample = {"a": 1, "b": 2}
d = types.MutableDict(sample)
d = sa_types.MutableDict(sample)
d["b"] = 3
self.assertEqual({"a": 1, "b": 3}, d)
self.assertEqual(1, mock_mutable_dict_changed.call_count)
@mock.patch.object(types.MutableDict, "changed")
@mock.patch.object(sa_types.MutableDict, "changed")
def test_changed_on_delitem(self, mock_mutable_dict_changed):
sample = {"a": 1, "b": 2}
d = types.MutableDict(sample)
d = sa_types.MutableDict(sample)
del d["b"]
self.assertEqual({"a": 1}, d)
self.assertEqual(1, mock_mutable_dict_changed.call_count)
@ -95,45 +95,45 @@ class MutableDictTest(test.TestCase):
class MutableListTest(test.TestCase):
def test_creation(self):
sample = [1, 2, 3]
d = types.MutableList(sample)
d = sa_types.MutableList(sample)
self.assertEqual(sample, d)
def test_coerce_list(self):
sample = [1, 2, 3]
md = types.MutableList.coerce("test", sample)
md = sa_types.MutableList.coerce("test", sample)
self.assertEqual(sample, md)
self.assertIsInstance(md, types.MutableList)
self.assertIsInstance(md, sa_types.MutableList)
def test_coerce_mutable_list(self):
sample = [1, 2, 3]
sample_md = types.MutableList(sample)
md = types.MutableList.coerce("test", sample_md)
sample_md = sa_types.MutableList(sample)
md = sa_types.MutableList.coerce("test", sample_md)
self.assertEqual(sample, md)
self.assertIs(sample_md, md)
def test_coerce_unsupported(self):
self.assertRaises(ValueError, types.MutableList.coerce, "test", {})
self.assertRaises(ValueError, sa_types.MutableList.coerce, "test", {})
@mock.patch.object(types.MutableList, "changed")
@mock.patch.object(sa_types.MutableList, "changed")
def test_changed_on_append(self, mock_mutable_list_changed):
sample = [1, 2, 3]
lst = types.MutableList(sample)
lst = sa_types.MutableList(sample)
lst.append(4)
self.assertEqual([1, 2, 3, 4], lst)
self.assertEqual(1, mock_mutable_list_changed.call_count)
@mock.patch.object(types.MutableList, "changed")
@mock.patch.object(sa_types.MutableList, "changed")
def test_changed_on_setitem(self, mock_mutable_list_changed):
sample = [1, 2, 3]
lst = types.MutableList(sample)
lst = sa_types.MutableList(sample)
lst[2] = 4
self.assertEqual([1, 2, 4], lst)
self.assertEqual(1, mock_mutable_list_changed.call_count)
@mock.patch.object(types.MutableList, "changed")
@mock.patch.object(sa_types.MutableList, "changed")
def test_changed_on_delitem(self, mock_mutable_list_changed):
sample = [1, 2, 3]
lst = types.MutableList(sample)
lst = sa_types.MutableList(sample)
del lst[2]
self.assertEqual([1, 2], lst)
self.assertEqual(1, mock_mutable_list_changed.call_count)
@ -141,19 +141,19 @@ class MutableListTest(test.TestCase):
class TimeStampTestCase(test.TestCase):
def test_process_bind_param(self):
self.assertIsNone(types.TimeStamp().process_bind_param(
self.assertIsNone(sa_types.TimeStamp().process_bind_param(
None, dialect=None))
self.assertEqual(
1498561749348996,
types.TimeStamp().process_bind_param(1498561749.348996,
dialect=None))
sa_types.TimeStamp().process_bind_param(1498561749.348996,
dialect=None))
def test_process_result_value(self):
self.assertIsNone(types.TimeStamp().process_result_value(
self.assertIsNone(sa_types.TimeStamp().process_result_value(
None, dialect=None))
self.assertEqual(
1498561749.348996,
types.TimeStamp().process_result_value(1498561749348996,
dialect=None))
sa_types.TimeStamp().process_result_value(1498561749348996,
dialect=None))

View File

@ -32,7 +32,7 @@ class ModuleTestCase(test.TestCase):
mock_version_info.semantic_version.return_value = mock_sv
self.assertEqual("foo_version", version.version_string())
@mock.patch("rally.common.db.api.schema_revision", return_value="foo")
@mock.patch("rally.common.db.schema.schema_revision", return_value="foo")
def test_database_revision(self, mock_schema_revision):
self.assertEqual("foo", version.database_revision())
mock_schema_revision.assert_called_once_with(detailed=True)

View File

@ -33,8 +33,8 @@ class DatabaseFixture(cfg.fixture.Config):
db_url = os.environ.get("RALLY_UNITTEST_DB_URL", "sqlite://")
db.engine_reset()
self.conf.set_default("connection", db_url, group="database")
db.schema_cleanup()
db.schema_create()
db.schema.schema_cleanup()
db.schema.schema_create()
class TestCase(base.BaseTestCase):