# zun/zun/tests/unit/db/base.py
#
# 113 lines
# 3.8 KiB
# Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Zun DB test base class."""
import fixtures
import zun.conf
from zun.db import api as db_api
from zun.db.sqlalchemy import api as sqla_api
from zun.db.sqlalchemy import migration
from zun.db.sqlalchemy import models
from zun.tests import base
# Zun's shared configuration object; DbTestCase.setUp reads
# CONF.database.connection from it when building the Database fixture.
CONF = zun.conf.CONF
# Module-level cache for the Database fixture. It starts out empty, is
# populated by the first DbTestCase.setUp() call and is then reused by every
# later test, so the fixture's constructor work is not repeated per test.
_DB_CACHE = None
class Database(fixtures.Fixture):
    """Fixture that restores a pre-built database snapshot for each test.

    The constructor prepares the schema once and captures the whole database
    as a SQL script (``self._DB``). Every ``_setUp`` then replays that script
    on a fresh connection instead of rebuilding the schema.
    """

    def __init__(self, db_api, db_migrate, sql_connection):
        """Build the schema and capture it as a SQL script.

        :param db_api: object exposing ``get_engine()``; the returned engine
            is stored as ``self.engine``.
        :param db_migrate: migration helper exposing ``version()`` and
            ``stamp()``; forwarded to :meth:`setup_sqlite`.
        :param sql_connection: connection string; only stored on the
            instance as ``self.sql_connection``.
        """
        self.sql_connection = sql_connection
        self.engine = db_api.get_engine()

        # Start from a clean engine before opening the connection that the
        # snapshot is taken from.
        self.engine.dispose()
        snapshot_conn = self.engine.connect()

        self.setup_sqlite(db_migrate)
        self.post_migrations()

        # iterdump() yields the database as SQL statements; keep them as a
        # single script so _setUp can replay it with executescript().
        self._DB = "".join(snapshot_conn.connection.iterdump())
        self.engine.dispose()

    def setup_sqlite(self, db_migrate):
        """Create all model tables and stamp the schema as ``head``.

        Does nothing when ``db_migrate.version()`` already reports a
        (truthy) version.

        :param db_migrate: migration helper exposing ``version()`` and
            ``stamp()``.
        """
        if not db_migrate.version():
            models.Base.metadata.create_all(self.engine)
            db_migrate.stamp('head')

    def _setUp(self):
        """Replay the captured SQL script and register engine cleanup."""
        fresh_conn = self.engine.connect()
        fresh_conn.connection.executescript(self._DB)
        # Dispose of the engine when the fixture is torn down.
        self.addCleanup(self.engine.dispose)

    def post_migrations(self):
        """Any additional steps that are needed outside of the migrations."""
class DbTestCase(base.TestCase):
    """Base class for DB tests; installs the shared Database fixture."""

    def setUp(self):
        """Prepare the DB driver and activate the cached Database fixture.

        Sets ``self.dbapi`` to the object returned by
        ``db_api._get_dbdriver_instance()``. The Database fixture is built
        only on the first call and stored in the module-level ``_DB_CACHE``;
        later calls reuse that instance and just re-apply it via
        ``useFixture``.
        """
        global _DB_CACHE

        super(DbTestCase, self).setUp()

        self.dbapi = db_api._get_dbdriver_instance()

        # The cache is initialised to None, so test for that sentinel
        # explicitly rather than relying on the fixture's truthiness.
        if _DB_CACHE is None:
            _DB_CACHE = Database(sqla_api, migration,
                                 sql_connection=CONF.database.connection)
        self.useFixture(_DB_CACHE)
class ModelsObjectComparatorMixin(object):
    """Assertion helpers for comparing dict-like model objects in tests.

    Meant to be mixed into a ``unittest``-style test case: the helpers call
    ``self.assertEqual`` and ``self.assertIn``, which the host class must
    provide.
    """

    def _dict_from_object(self, obj, ignored_keys):
        """Return ``obj`` as a plain dict without the ignored keys.

        :param obj: object exposing ``items()`` that yields key/value pairs.
        :param ignored_keys: collection of keys to leave out; ``None`` keeps
            every key.
        :returns: a new dict holding the remaining key/value pairs.
        """
        if ignored_keys is None:
            ignored_keys = []
        return {key: value for key, value in obj.items()
                if key not in ignored_keys}

    def _assertEqualObjects(self, obj1, obj2, ignored_keys=None):
        """Assert that two objects hold the same keys and values.

        :param obj1: first dict-like object.
        :param obj2: second dict-like object.
        :param ignored_keys: keys excluded from the comparison.
        """
        obj1 = self._dict_from_object(obj1, ignored_keys)
        obj2 = self._dict_from_object(obj2, ignored_keys)

        keys1 = set(obj1)
        keys2 = set(obj2)
        # Compare the key sets themselves, not just their sizes: two objects
        # with the same number of keys but different names must fail here
        # with a readable message instead of a KeyError in the loop below.
        self.assertEqual(keys1,
                         keys2,
                         "Keys mismatch: %s" % str(keys1 ^ keys2))
        for key, value in obj1.items():
            self.assertEqual(value, obj2[key])

    def _assertEqualListsOfObjects(self, objs1, objs2, ignored_keys=None):
        """Assert that two collections hold equal objects, in any order.

        Each object is converted with :meth:`_dict_from_object` and both
        collections are sorted before being compared. The sort key is the
        list of values taken in sorted-key order, so the values have to be
        mutually orderable.

        :param objs1: first iterable of dict-like objects.
        :param objs2: second iterable of dict-like objects.
        :param ignored_keys: keys excluded from the comparison.
        """
        def values_in_key_order(dct):
            return [dct[key] for key in sorted(dct)]

        def as_sorted_dicts(objs):
            dicts = [self._dict_from_object(obj, ignored_keys)
                     for obj in objs]
            return sorted(dicts, key=values_in_key_order)

        self.assertEqual(as_sorted_dicts(objs1), as_sorted_dicts(objs2))

    def _assertEqualOrderedListOfObjects(self, objs1, objs2,
                                         ignored_keys=None):
        """Assert that two collections hold equal objects in the same order.

        :param objs1: first iterable of dict-like objects.
        :param objs2: second iterable of dict-like objects.
        :param ignored_keys: keys excluded from the comparison.
        """
        def as_dicts(objs):
            return [self._dict_from_object(obj, ignored_keys)
                    for obj in objs]

        self.assertEqual(as_dicts(objs1), as_dicts(objs2))

    def _assertEqualListsOfPrimitivesAsSets(self, primitives1, primitives2):
        """Assert equal length and mutual membership of two collections.

        NOTE: duplicates are not counted individually, so ``[1, 1, 2]`` and
        ``[1, 2, 2]`` are treated as equal.

        :param primitives1: first sized collection of primitives.
        :param primitives2: second sized collection of primitives.
        """
        self.assertEqual(len(primitives1), len(primitives2))
        for primitive in primitives1:
            self.assertIn(primitive, primitives2)
        for primitive in primitives2:
            self.assertIn(primitive, primitives1)