
The etcd backend is deprecated for removal. This commit removes the etcd db backend related code. Change-Id: Id7849576a94f66b681d8d346a90d4e151548eaa7
112 lines
3.7 KiB
Python
112 lines
3.7 KiB
Python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
"""Zun DB test base class."""
|
|
|
|
import fixtures
|
|
|
|
import zun.conf
|
|
from zun.db import api as db_api
|
|
from zun.db.sqlalchemy import api as sqla_api
|
|
from zun.db.sqlalchemy import migration
|
|
from zun.db.sqlalchemy import models
|
|
from zun.tests import base
|
|
|
|
|
|
# Zun's configuration object; DbTestCase reads the database connection
# setting from it.
CONF = zun.conf.CONF

# Module-wide Database fixture. It starts out empty and is created on first
# use by DbTestCase.setUp, then reused by every later test.
_DB_CACHE = None
|
|
|
|
|
|
class Database(fixtures.Fixture):
    """Fixture that replays a pre-built database dump on every set-up.

    The schema is prepared once, when the fixture is constructed, and
    captured as a SQL script via the DB-API connection's ``iterdump()``.
    Each ``_setUp`` feeds that script back through ``executescript()``.
    """

    def __init__(self, db_api, db_migrate, sql_connection):
        """Prepare the schema and snapshot it as a SQL script.

        :param db_api: object exposing ``get_engine()``.
        :param db_migrate: object exposing ``version()`` and ``stamp()``.
        :param sql_connection: connection setting; only stored on the
                               fixture here.
        """
        self.sql_connection = sql_connection

        self.engine = db_api.get_engine()
        # Dispose first, then open the connection the dump is read from.
        # The connection must exist before the schema is created below.
        self.engine.dispose()
        dump_source = self.engine.connect()
        self.setup_sqlite(db_migrate)
        self.post_migrations()

        # Keep the whole dump as one string so it can be replayed later.
        self._DB = "".join(dump_source.connection.iterdump())
        self.engine.dispose()

    def setup_sqlite(self, db_migrate):
        """Create the model tables and stamp 'head' unless already versioned.

        :param db_migrate: object exposing ``version()`` and ``stamp()``.
        """
        if not db_migrate.version():
            models.Base.metadata.create_all(self.engine)
            db_migrate.stamp('head')

    def _setUp(self):
        """Replay the captured dump and register engine disposal as cleanup."""
        replay_target = self.engine.connect()
        replay_target.connection.executescript(self._DB)
        self.addCleanup(self.engine.dispose)

    def post_migrations(self):
        """Hook for any additional steps needed outside of the migrations."""
|
|
|
|
|
|
class DbTestCase(base.TestCase):
    """Base test case that provides ``self.dbapi`` and a shared DB fixture."""

    def setUp(self):
        """Obtain the DB driver and install the shared Database fixture.

        The Database fixture is built only once per process and cached in
        the module-level ``_DB_CACHE``; later tests reuse that instance.
        """
        global _DB_CACHE

        super(DbTestCase, self).setUp()

        self.dbapi = db_api._get_dbdriver_instance()

        if not _DB_CACHE:
            connection_setting = CONF.database.connection
            _DB_CACHE = Database(sqla_api, migration,
                                 sql_connection=connection_setting)
        self.useFixture(_DB_CACHE)
|
|
|
|
|
|
class ModelsObjectComparatorMixin(object):
    """Assertion helpers for comparing dict-like objects in tests.

    Meant to be mixed into a class that provides ``assertEqual`` and
    ``assertIn`` (for example a ``unittest.TestCase`` subclass).
    """

    def _dict_from_object(self, obj, ignored_keys):
        """Return the items of ``obj`` as a dict, minus ``ignored_keys``.

        :param obj: any object exposing ``items()``.
        :param ignored_keys: container of keys to leave out; ``None`` means
                             nothing is ignored.
        :returns: a new dict with the remaining key/value pairs.
        """
        if ignored_keys is None:
            ignored_keys = []

        return {k: v for k, v in obj.items()
                if k not in ignored_keys}

    def _assertEqualObjects(self, obj1, obj2, ignored_keys=None):
        """Assert that two objects hold the same keys and the same values.

        :param obj1: first object exposing ``items()``.
        :param obj2: second object exposing ``items()``.
        :param ignored_keys: keys excluded from the comparison.
        """
        obj1 = self._dict_from_object(obj1, ignored_keys)
        obj2 = self._dict_from_object(obj2, ignored_keys)

        # Compare the key sets themselves rather than only their sizes.
        # Two dicts with the same number of keys but different key names
        # must fail here with the "Keys mismatch" message instead of
        # raising KeyError in the value loop below.
        keys1 = set(obj1)
        keys2 = set(obj2)
        self.assertEqual(keys1,
                         keys2,
                         "Keys mismatch: %s" % str(keys1 ^ keys2))
        for key, value in obj1.items():
            self.assertEqual(value, obj2[key])

    def _assertEqualListsOfObjects(self, objs1, objs2, ignored_keys=None):
        """Assert two collections hold equal objects, regardless of order.

        Both collections are converted to dicts and sorted by their values
        (taken in sorted-key order) before being compared.

        :param objs1: first iterable of objects exposing ``items()``.
        :param objs2: second iterable of objects exposing ``items()``.
        :param ignored_keys: keys excluded from the comparison.
        """
        def values_in_key_order(dct):
            return [dct[k] for k in sorted(dct)]

        def normalized(objs):
            dicts = [self._dict_from_object(o, ignored_keys) for o in objs]
            return sorted(dicts, key=values_in_key_order)

        self.assertEqual(normalized(objs1), normalized(objs2))

    def _assertEqualOrderedListOfObjects(self, objs1, objs2,
                                         ignored_keys=None):
        """Assert two sequences hold equal objects in the same order.

        :param objs1: first iterable of objects exposing ``items()``.
        :param objs2: second iterable of objects exposing ``items()``.
        :param ignored_keys: keys excluded from the comparison.
        """
        def as_dicts(objs):
            return [self._dict_from_object(o, ignored_keys) for o in objs]

        self.assertEqual(as_dicts(objs1), as_dicts(objs2))

    def _assertEqualListsOfPrimitivesAsSets(self, primitives1, primitives2):
        """Assert equal length and mutual membership of two collections.

        Membership is checked with ``assertIn`` rather than by building
        sets, so the elements do not need to be hashable.
        """
        self.assertEqual(len(primitives1), len(primitives2))
        for primitive in primitives1:
            self.assertIn(primitive, primitives2)

        for primitive in primitives2:
            self.assertIn(primitive, primitives1)
|