Remove sqlalchemy from our data plane

This patch removes the sqlalchemy as a valid data driver. In order to
make this work properly, we also need the data plane to be properly
split into 2 completely independent pipelines.

This patch, unfortunately, disables some of the sqlalchemy's tests for
queues and pools. The reason is that, until these planes are correctly
separated and pooling is enabled by default, we won't be able to
properly run those tests. A follow-up patch, expected to land in kilo,
will do this.

DocImpact

Implements blueprint: disable-sqlalchemy-as-pool

Change-Id: I5ca73102241692b0e1b23b3eaaaf1938f0106dfa
This commit is contained in:
Flavio Percoco 2015-03-27 10:43:11 +01:00 committed by Victoria Martínez de la Cruz
parent bb06ad70e6
commit 35f2764204
48 changed files with 179 additions and 457 deletions

View File

@ -38,9 +38,6 @@ console_scripts =
zaqar-gc = zaqar.cmd.gc:run
zaqar.data.storage =
# NOTE(flaper87): sqlite points to sqla for backwards compatibility
sqlite = zaqar.storage.sqlalchemy.driver:DataDriver
sqlalchemy = zaqar.storage.sqlalchemy.driver:DataDriver
mongodb = zaqar.storage.mongodb.driver:DataDriver
mongodb.fifo = zaqar.storage.mongodb.driver:FIFODataDriver
redis = zaqar.storage.redis.driver:DataDriver

View File

@ -25,7 +25,7 @@ unreliable = True
# Transport driver module (e.g., wsgi, zmq)
transport = wsgi
# Storage driver module (e.g., mongodb, sqlalchemy)
storage = sqlalchemy
storage = mongodb
[drivers:transport:wsgi]
bind = 127.0.0.1

View File

@ -6,7 +6,7 @@ verbose = False
[drivers]
transport = wsgi
storage = sqlalchemy
storage = mongodb
[drivers:transport:wsgi]
bind = 0.0.0.0:8888

View File

@ -1,3 +1,3 @@
[drivers]
transport = wsgi
storage = sqlalchemy
storage = mongodb

View File

@ -1,6 +1,6 @@
[drivers]
transport = wsgi
storage = sqlalchemy
storage = mongodb
# Test support for deprecated options
[limits:transport]

View File

@ -75,7 +75,7 @@ EXPECTED_VERSIONS = [
class TestVersion(base.TestBase):
config_file = 'wsgi_sqlalchemy.conf'
config_file = 'wsgi_mongodb.conf'
def test_get(self):
response = self.simulate_get('/')

View File

@ -41,7 +41,7 @@ class TestHealth(base.V1_1FunctionalTestBase):
{
'name': "pool_1",
'weight': 10,
'uri': "sqlite:///:memory:"
'uri': "mongodb://localhost:27017"
}
)
def test_health_with_pool(self, params):
@ -49,7 +49,7 @@ class TestHealth(base.V1_1FunctionalTestBase):
# as pool node and the mongodb is working on gate successfully.
doc = helpers.create_pool_body(
weight=params.get('weight', 10),
uri=params.get('uri', "sqlite:///:memory:")
uri=params.get('uri', "mongodb://localhost:27017")
)
pool_name = params.get('name', "pool_1")

View File

@ -38,8 +38,8 @@ class TestUtils(testing.TestBase):
self.assertTrue(utils.can_connect('redis://localhost'))
self.assertTrue(utils.can_connect('redis://localhost:6379'))
def test_can_connect_suceeds_if_good_uri_sqlite(self):
self.assertTrue(utils.can_connect('sqlite://:memory:'))
# def test_can_connect_suceeds_if_good_uri_sqlite(self):
# self.assertTrue(utils.can_connect('sqlite://:memory:'))
def test_can_connect_fails_if_bad_uri_missing_schema(self):
self.assertFalse(utils.can_connect('localhost:27017'))

View File

@ -0,0 +1,70 @@
# Copyright (c) 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import six
from zaqar.storage import sqlalchemy
from zaqar.storage.sqlalchemy import controllers
from zaqar.storage.sqlalchemy import utils
from zaqar import tests as testing
from zaqar.tests.unit.storage import base
# NOTE(flaper87): We'll need this after splitting queues
# from the data driver
class SqlalchemyQueueTests(base.QueueControllerTest):
driver_class = sqlalchemy.ControlDriver
controller_class = controllers.QueueController
class SqlalchemyPoolsTest(base.PoolsControllerTest):
driver_class = sqlalchemy.ControlDriver
controller_class = controllers.PoolsController
def setUp(self):
super(SqlalchemyPoolsTest, self).setUp()
self.load_conf('wsgi_sqlalchemy.conf')
def tearDown(self):
super(SqlalchemyPoolsTest, self).tearDown()
class SqlalchemyCatalogueTest(base.CatalogueControllerTest):
driver_class = sqlalchemy.ControlDriver
controller_class = controllers.CatalogueController
def setUp(self):
super(SqlalchemyCatalogueTest, self).setUp()
self.load_conf('wsgi_sqlalchemy.conf')
def tearDown(self):
super(SqlalchemyCatalogueTest, self).tearDown()
class MsgidTests(testing.TestBase):
def test_encode(self):
if six.PY2:
ids = [3, long(1), 0]
elif six.PY3:
ids = [3, 1, 0]
msgids = ['5c693a50', '5c693a52', '5c693a53']
for msgid, id in zip(msgids, ids):
self.assertEqual(msgid, utils.msgid_encode(id))
def test_decode(self):
msgids = ['5c693a50', '5c693a52', '5c693a53', '']
ids = [3, 1, 0, None]
for msgid, id in zip(msgids, ids):
self.assertEqual(id, utils.msgid_decode(msgid))

View File

@ -518,6 +518,9 @@ class PooledMessageTests(base.MessageControllerTest):
control_driver_class = mongodb.ControlDriver
controller_base_class = storage.Message
# NOTE(kgriffs): MongoDB's TTL scavenger only runs once a minute
gc_interval = 60
@testing.requires_mongodb
class PooledQueueTests(base.QueueControllerTest):

View File

@ -1,199 +0,0 @@
# Copyright (c) 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import datetime
import uuid
import six
import sqlalchemy as sa
from sqlalchemy.sql import func as sfunc
from zaqar import storage
from zaqar.storage import pooling
from zaqar.storage import sqlalchemy
from zaqar.storage.sqlalchemy import controllers
from zaqar.storage.sqlalchemy import tables
from zaqar.storage.sqlalchemy import utils
from zaqar import tests as testing
from zaqar.tests.unit.storage import base
class SqlalchemyTableTests(testing.TestBase):
def setUp(self):
super(SqlalchemyTableTests, self).setUp()
self.engine = sa.create_engine('sqlite:///:memory:')
tables.metadata.create_all(self.engine, checkfirst=True)
def test_table_queries(self):
self.engine.execute(tables.Queues.insert(), id=1, project='test',
name='zaqar', metadata=utils.json_encode('aaaa'))
self.engine.execute(tables.Messages.insert(), id=1, qid=1, ttl=10,
body=utils.json_encode('bbbb'), client='a',
created=datetime.datetime.now())
self.engine.execute(tables.Claims.insert(), id=1, qid=1, ttl=10,
created=datetime.datetime.now())
rs = self.engine.execute(tables.Claims.select())
row = rs.fetchone()
self.assertEqual(row.id, 1)
self.assertEqual(row.qid, 1)
self.assertEqual(row.ttl, 10)
self.engine.execute(tables.Claims.delete(tables.Claims.c.id == 1))
rs = self.engine.execute(tables.Claims.select())
row = rs.fetchone()
self.assertIsNone(row)
class SqlalchemyQueueTests(base.QueueControllerTest):
driver_class = sqlalchemy.DataDriver
controller_class = controllers.QueueController
class SqlalchemyMessageTests(base.MessageControllerTest):
driver_class = sqlalchemy.DataDriver
controller_class = controllers.MessageController
def test_expired_messages_be_deleted(self):
messages = [{'body': 3.14, 'ttl': 0}, {'body': 0.618, 'ttl': 600}]
client_uuid = uuid.uuid4()
[msgid_expired, msgid] = self.controller.post(self.queue_name,
messages,
project=self.project,
client_uuid=client_uuid)
mid = utils.msgid_decode(msgid_expired)
def _get(count=False):
j = sa.join(tables.Messages, tables.Queues,
tables.Messages.c.qid == tables.Queues.c.id)
sel = sa.sql.select([tables.Messages.c.body,
tables.Messages.c.ttl,
tables.Messages.c.created])
if count:
sel = sa.sql.select([sfunc.count(tables.Messages.c.id)])
sel = sel.select_from(j)
and_stmt = [tables.Messages.c.id == mid,
tables.Queues.c.name == self.queue_name,
tables.Queues.c.project == self.project]
sel = sel.where(sa.and_(*and_stmt))
return self.driver.get(sel)
[count] = _get(count=True)
self.assertEqual(count, 1)
# Expired messages will be removed from db until next Post
message = [{'body': 3.14, 'ttl': 300}]
self.controller.post(self.queue_name,
message,
project=self.project,
client_uuid=client_uuid)
with testing.expect(utils.NoResult):
_get()
class SqlalchemyClaimTests(base.ClaimControllerTest):
driver_class = sqlalchemy.DataDriver
controller_class = controllers.ClaimController
def test_delete_message_expired_claim(self):
# NOTE(flaper87): Several reasons to do this:
# The sqla driver is deprecated
# It's not optimized
# mocking utcnow mocks the driver too, which
# requires to put sleeps in the test
self.skip("Fix sqlalchemy driver")
class SqlalchemyPoolsTest(base.PoolsControllerTest):
driver_class = sqlalchemy.ControlDriver
controller_class = controllers.PoolsController
def setUp(self):
super(SqlalchemyPoolsTest, self).setUp()
self.load_conf('wsgi_sqlalchemy.conf')
def tearDown(self):
super(SqlalchemyPoolsTest, self).tearDown()
class SqlalchemyCatalogueTest(base.CatalogueControllerTest):
driver_class = sqlalchemy.ControlDriver
controller_class = controllers.CatalogueController
def setUp(self):
super(SqlalchemyCatalogueTest, self).setUp()
self.load_conf('wsgi_sqlalchemy.conf')
def tearDown(self):
super(SqlalchemyCatalogueTest, self).tearDown()
class PooledMessageTests(base.MessageControllerTest):
config_file = 'wsgi_sqlalchemy_pooled.conf'
controller_class = pooling.MessageController
driver_class = pooling.DataDriver
control_driver_class = sqlalchemy.ControlDriver
controller_base_class = storage.Message
class PooledClaimsTests(base.ClaimControllerTest):
config_file = 'wsgi_sqlalchemy_pooled.conf'
controller_class = pooling.ClaimController
driver_class = pooling.DataDriver
control_driver_class = sqlalchemy.ControlDriver
controller_base_class = storage.Claim
def test_delete_message_expired_claim(self):
# NOTE(flaper87): Several reasons to do this:
# The sqla driver is deprecated
# It's not optimized
# mocking utcnow mocks the driver too, which
# requires to put sleeps in the test
self.skip("Fix sqlalchemy driver")
class PooledQueueTests(base.QueueControllerTest):
config_file = 'wsgi_sqlalchemy_pooled.conf'
controller_class = pooling.QueueController
driver_class = pooling.DataDriver
control_driver_class = sqlalchemy.ControlDriver
controller_base_class = storage.Queue
class MsgidTests(testing.TestBase):
def test_encode(self):
if six.PY2:
ids = [3, long(1), 0]
elif six.PY3:
ids = [3, 1, 0]
msgids = ['5c693a50', '5c693a52', '5c693a53']
for msgid, id in zip(msgids, ids):
self.assertEqual(msgid, utils.msgid_encode(id))
def test_decode(self):
msgids = ['5c693a50', '5c693a52', '5c693a53', '']
ids = [3, 1, 0, None]
for msgid, id in zip(msgids, ids):
self.assertEqual(id, utils.msgid_decode(msgid))

View File

@ -16,8 +16,8 @@ import uuid
from zaqar.openstack.common.cache import cache as oslo_cache
from zaqar.storage import errors
from zaqar.storage import mongodb
from zaqar.storage import pooling
from zaqar.storage import sqlalchemy
from zaqar.storage import utils
from zaqar import tests as testing
@ -49,9 +49,9 @@ class PoolCatalogTest(testing.TestBase):
self.flavor = str(uuid.uuid1())
self.project = str(uuid.uuid1())
self.pools_ctrl.create(self.pool, 100, 'sqlite://:memory:')
self.pools_ctrl.create(self.pool, 100, 'mongodb://localhost:27017')
self.pools_ctrl.create(self.pool2, 100,
'sqlite://:memory:',
'mongodb://localhost:27017',
group=self.pool_group)
self.catalogue_ctrl.insert(self.project, self.queue, self.pool)
self.catalog = pooling.Catalog(self.conf, cache, control)
@ -65,7 +65,7 @@ class PoolCatalogTest(testing.TestBase):
def test_lookup_loads_correct_driver(self):
storage = self.catalog.lookup(self.queue, self.project)
self.assertIsInstance(storage, sqlalchemy.DataDriver)
self.assertIsInstance(storage, mongodb.DataDriver)
def test_lookup_returns_none_if_queue_not_mapped(self):
self.assertIsNone(self.catalog.lookup('not', 'mapped'))
@ -77,14 +77,14 @@ class PoolCatalogTest(testing.TestBase):
def test_register_leads_to_successful_lookup(self):
self.catalog.register('not_yet', 'mapped')
storage = self.catalog.lookup('not_yet', 'mapped')
self.assertIsInstance(storage, sqlalchemy.DataDriver)
self.assertIsInstance(storage, mongodb.DataDriver)
def test_register_with_flavor(self):
queue = 'test'
self.catalog.register(queue, project=self.project,
flavor=self.flavor)
storage = self.catalog.lookup(queue, self.project)
self.assertIsInstance(storage, sqlalchemy.DataDriver)
self.assertIsInstance(storage, mongodb.DataDriver)
def test_register_with_fake_flavor(self):
self.assertRaises(errors.FlavorDoesNotExist,

View File

@ -39,9 +39,11 @@ class PoolQueuesTest(testing.TestBase):
self.controller = self.driver.queue_controller
# fake two pools
for _ in six.moves.xrange(2):
for i in six.moves.xrange(2):
options = {'database': "zaqar_test_pools_" + str(i)}
self.pools_ctrl.create(str(uuid.uuid1()), 100,
'sqlite://:memory:')
'mongodb://localhost:27017',
options=options)
def tearDown(self):
self.pools_ctrl.drop_all()

View File

@ -15,9 +15,7 @@
from zaqar import bootstrap
from zaqar.common import errors
from zaqar.storage import pipeline
from zaqar.storage import pooling
from zaqar.storage import sqlalchemy
from zaqar.tests import base
from zaqar.transport import wsgi
@ -33,15 +31,9 @@ class TestBootstrap(base.TestBase):
self.assertRaises(errors.InvalidDriver,
lambda: bootstrap.storage)
def test_storage_sqlalchemy(self):
bootstrap = self._bootstrap('wsgi_sqlalchemy.conf')
self.assertIsInstance(bootstrap.storage, pipeline.DataDriver)
self.assertIsInstance(bootstrap.storage._storage,
sqlalchemy.DataDriver)
def test_storage_sqlalchemy_pooled(self):
def test_storage_mongodb_pooled(self):
"""Makes sure we can load the pool driver."""
bootstrap = self._bootstrap('wsgi_sqlalchemy_pooled.conf')
bootstrap = self._bootstrap('wsgi_mongodb_pooled.conf')
self.assertIsInstance(bootstrap.storage._storage, pooling.DataDriver)
def test_transport_invalid(self):
@ -50,5 +42,5 @@ class TestBootstrap(base.TestBase):
lambda: bootstrap.transport)
def test_transport_wsgi(self):
bootstrap = self._bootstrap('wsgi_sqlalchemy.conf')
bootstrap = self._bootstrap('wsgi_mongodb.conf')
self.assertIsInstance(bootstrap.transport, wsgi.Driver)

View File

@ -37,10 +37,6 @@ class TestClaimsMongoDB(v1.TestClaimsMongoDB):
url_prefix = URL_PREFIX
class TestClaimsSqlalchemy(v1.TestClaimsSqlalchemy):
url_prefix = URL_PREFIX
class TestDefaultLimits(v1.TestDefaultLimits):
url_prefix = URL_PREFIX
@ -65,10 +61,6 @@ class TestMessagesMongoDBPooled(v1.TestMessagesMongoDBPooled):
url_prefix = URL_PREFIX
class TestMessagesSqlalchemy(v1.TestMessagesSqlalchemy):
url_prefix = URL_PREFIX
class TestQueueFaultyDriver(v1.TestQueueFaultyDriver):
url_prefix = URL_PREFIX
@ -77,16 +69,17 @@ class TestQueueLifecycleMongoDB(v1.TestQueueLifecycleMongoDB):
url_prefix = URL_PREFIX
class TestQueueLifecycleSqlalchemy(v1.TestQueueLifecycleSqlalchemy):
url_prefix = URL_PREFIX
# NOTE(flaper87): We'll need this later on
# class TestQueueLifecycleSqlalchemy(v1.TestQueueLifecycleSqlalchemy):
# url_prefix = URL_PREFIX
class TestPoolsMongoDB(v1.TestPoolsMongoDB):
url_prefix = URL_PREFIX
class TestPoolsSqlalchemy(v1.TestPoolsSqlalchemy):
url_prefix = URL_PREFIX
# class TestPoolsSqlalchemy(v1.TestPoolsSqlalchemy):
# url_prefix = URL_PREFIX
class TestValidation(v1.TestValidation):
@ -99,7 +92,7 @@ class TestValidation(v1.TestValidation):
class TestHealth(base.V1Base):
config_file = 'wsgi_sqlalchemy.conf'
config_file = 'wsgi_mongodb.conf'
def test_get(self):
response = self.simulate_get('/v1/health')

View File

@ -39,10 +39,6 @@ class TestClaimsMongoDB(v1_1.TestClaimsMongoDB):
url_prefix = URL_PREFIX
class TestClaimsSqlalchemy(v1_1.TestClaimsSqlalchemy):
url_prefix = URL_PREFIX
class TestDefaultLimits(v1_1.TestDefaultLimits):
url_prefix = URL_PREFIX
@ -67,10 +63,6 @@ class TestMessagesMongoDBPooled(v1_1.TestMessagesMongoDBPooled):
url_prefix = URL_PREFIX
class TestMessagesSqlalchemy(v1_1.TestMessagesSqlalchemy):
url_prefix = URL_PREFIX
class TestQueueFaultyDriver(v1_1.TestQueueFaultyDriver):
url_prefix = URL_PREFIX
@ -80,17 +72,17 @@ class TestQueueFaultyDriver(v1_1.TestQueueFaultyDriver):
class TestQueueLifecycleMongoDB(v1_1.TestQueueLifecycleMongoDB):
url_prefix = URL_PREFIX
class TestQueueLifecycleSqlalchemy(v1_1.TestQueueLifecycleSqlalchemy):
url_prefix = URL_PREFIX
# NOTE(flaper87): We'll need this once we split data/control plane
# class TestQueueLifecycleSqlalchemy(v1_1.TestQueueLifecycleSqlalchemy):
# url_prefix = URL_PREFIX
class TestPoolsMongoDB(v1_1.TestPoolsMongoDB):
url_prefix = URL_PREFIX
class TestPoolsSqlalchemy(v1_1.TestPoolsSqlalchemy):
url_prefix = URL_PREFIX
# class TestPoolsSqlalchemy(v1_1.TestPoolsSqlalchemy):
# url_prefix = URL_PREFIX
class TestValidation(v1_1.TestValidation):
@ -107,7 +99,7 @@ class TestFlavorsMongoDB(v1_1.TestFlavorsMongoDB):
class TestPing(base.V1_1Base):
config_file = 'wsgi_sqlalchemy.conf'
config_file = 'wsgi_mongodb.conf'
def test_get(self):
# TODO(kgriffs): Make use of setUp for setting the URL prefix
@ -136,7 +128,7 @@ class TestHealthFaultyDriver(v1_1.TestHealthFaultyDriver):
@ddt.ddt
class TestMessages(base.V1_1Base):
config_file = 'wsgi_sqlalchemy.conf'
config_file = 'wsgi_mongodb.conf'
def setUp(self):
super(TestMessages, self).setUp()

View File

@ -39,10 +39,6 @@ class TestClaimsMongoDB(v2_0.TestClaimsMongoDB):
url_prefix = URL_PREFIX
class TestClaimsSqlalchemy(v2_0.TestClaimsSqlalchemy):
url_prefix = URL_PREFIX
class TestDefaultLimits(v2_0.TestDefaultLimits):
url_prefix = URL_PREFIX
@ -67,10 +63,6 @@ class TestMessagesMongoDBPooled(v2_0.TestMessagesMongoDBPooled):
url_prefix = URL_PREFIX
class TestMessagesSqlalchemy(v2_0.TestMessagesSqlalchemy):
url_prefix = URL_PREFIX
class TestQueueFaultyDriver(v2_0.TestQueueFaultyDriver):
url_prefix = URL_PREFIX
@ -81,16 +73,17 @@ class TestQueueLifecycleMongoDB(v2_0.TestQueueLifecycleMongoDB):
url_prefix = URL_PREFIX
class TestQueueLifecycleSqlalchemy(v2_0.TestQueueLifecycleSqlalchemy):
url_prefix = URL_PREFIX
# NOTE(flaper87): We'll need this
# class TestQueueLifecycleSqlalchemy(v2_0.TestQueueLifecycleSqlalchemy):
# url_prefix = URL_PREFIX
class TestPoolsMongoDB(v2_0.TestPoolsMongoDB):
url_prefix = URL_PREFIX
class TestPoolsSqlalchemy(v2_0.TestPoolsSqlalchemy):
url_prefix = URL_PREFIX
# class TestPoolsSqlalchemy(v2_0.TestPoolsSqlalchemy):
# url_prefix = URL_PREFIX
class TestValidation(v2_0.TestValidation):
@ -107,7 +100,7 @@ class TestFlavorsMongoDB(v2_0.TestFlavorsMongoDB):
class TestPing(base.V2Base):
config_file = 'wsgi_sqlalchemy.conf'
config_file = 'wsgi_mongodb.conf'
def test_get(self):
# TODO(kgriffs): Make use of setUp for setting the URL prefix
@ -136,7 +129,7 @@ class TestHealthFaultyDriver(v2_0.TestHealthFaultyDriver):
@ddt.ddt
class TestMessages(base.V2Base):
config_file = 'wsgi_sqlalchemy.conf'
config_file = 'wsgi_mongodb.conf'
def setUp(self):
super(TestMessages, self).setUp()

View File

@ -59,7 +59,7 @@ _GENERAL_OPTIONS = (
_DRIVER_OPTIONS = (
cfg.StrOpt('transport', default='wsgi',
help='Transport driver to use.'),
cfg.StrOpt('storage', default='sqlalchemy',
cfg.StrOpt('storage', default='mongodb',
help='Storage driver to use.'),
)

View File

@ -17,4 +17,3 @@ from zaqar.storage.sqlalchemy import driver
# Hoist classes into package namespace
ControlDriver = driver.ControlDriver
DataDriver = driver.DataDriver

View File

@ -13,43 +13,27 @@
# License for the specific language governing permissions and limitations under
# the License.
import contextlib
import logging
import sqlalchemy as sa
from zaqar.common import decorators
from zaqar.i18n import _
from zaqar import storage
from zaqar.storage.sqlalchemy import controllers
from zaqar.storage.sqlalchemy import options
from zaqar.storage.sqlalchemy import tables
from zaqar.storage.sqlalchemy import utils
LOG = logging.getLogger(__name__)
class DataDriver(storage.DataDriverBase):
BASE_CAPABILITIES = tuple(storage.Capabilities)
_DRIVER_OPTIONS = options._config_options()
class ControlDriver(storage.ControlDriverBase):
def __init__(self, conf, cache):
super(DataDriver, self).__init__(conf, cache)
self.sqlalchemy_conf = self.conf[options.MESSAGE_SQLALCHEMY_GROUP]
LOG.warn(_('sqlalchemy\'s data plane driver will be removed during '
'the next release. Please, consider moving your data to '
'one of the other supported drivers.'))
# FIXME(flaper87): Make this dynamic
self._capabilities = self.BASE_CAPABILITIES
@property
def capabilities(self):
return self._capabilities
super(ControlDriver, self).__init__(conf, cache)
self.conf.register_opts(options.MANAGEMENT_SQLALCHEMY_OPTIONS,
group=options.MANAGEMENT_SQLALCHEMY_GROUP)
self.sqlalchemy_conf = self.conf[options.MANAGEMENT_SQLALCHEMY_GROUP]
def _sqlite_on_connect(self, conn, record):
# NOTE(flaper87): This is necessary in order
@ -90,98 +74,6 @@ class DataDriver(storage.DataDriverBase):
def close_connection(self):
self.connection.close()
@contextlib.contextmanager
def trans(self):
with self.engine.begin() as connection:
yield connection
def run(self, statement):
"""Performs a SQL query.
:param sql: a query string with the '?' placeholders
:param args: the arguments to substitute the placeholders
"""
return self.connection.execute(statement)
def get(self, statement):
"""Runs sql and returns the first entry in the results.
:raises: utils.NoResult if the result set is empty
"""
res = self.run(statement)
r = res.fetchone()
if r is None:
raise utils.NoResult()
else:
res.close()
return r
@decorators.lazy_property(write=False)
def queue_controller(self):
return controllers.QueueController(self)
@decorators.lazy_property(write=False)
def message_controller(self):
return controllers.MessageController(self)
@decorators.lazy_property(write=False)
def claim_controller(self):
return controllers.ClaimController(self)
@decorators.lazy_property(write=False)
def subscription_controller(self):
pass
def is_alive(self):
return True
def _health(self):
KPI = {}
# Leverage the is_alive to indicate if the backend storage is
# reachable or not
KPI['storage_reachable'] = self.is_alive()
KPI['operation_status'] = self._get_operation_status()
message_volume = {'free': 0, 'claimed': 0, 'total': 0}
# NOTE(flwang): Using SQL directly to get better performance than
# sqlalchemy.
msg_count_claimed = self.get('SELECT COUNT(*) FROM MESSAGES'
' WHERE CID IS NOT NULL')
message_volume['claimed'] = int(msg_count_claimed[0])
msg_count_total = self.get('SELECT COUNT(*) FROM MESSAGES')
message_volume['total'] = int(msg_count_total[0])
message_volume['free'] = (message_volume['total'] -
message_volume['claimed'])
KPI['message_volume'] = message_volume
return KPI
class ControlDriver(storage.ControlDriverBase):
def __init__(self, conf, cache):
super(ControlDriver, self).__init__(conf, cache)
self.conf.register_opts(options.MANAGEMENT_SQLALCHEMY_OPTIONS,
group=options.MANAGEMENT_SQLALCHEMY_GROUP)
self.sqlalchemy_conf = self.conf[options.MANAGEMENT_SQLALCHEMY_GROUP]
@decorators.lazy_property(write=False)
def engine(self, *args, **kwargs):
engine = sa.create_engine(self.sqlalchemy_conf.uri, **kwargs)
tables.metadata.create_all(engine, checkfirst=True)
return engine
# TODO(cpp-cabrera): expose connect/close as a context manager
# that acquires the connection to the DB for the desired scope and
# closes it once the operations are completed
@decorators.lazy_property(write=False)
def connection(self):
return self.engine.connect()
def close_connection(self):
self.connection.close()
@property
def pools_controller(self):
return controllers.PoolsController(self)

View File

@ -42,7 +42,8 @@ def dynamic_conf(uri, options, conf=None):
# NOTE(cpp-cabrera): parse storage-specific opts:
# 'drivers:storage:{type}'
storage_opts = utils.dict_to_conf({'uri': uri, 'options': options})
options['uri'] = uri
storage_opts = utils.dict_to_conf(options)
storage_group = u'drivers:message_store:%s' % storage_type
# NOTE(cpp-cabrera): register those options!
@ -61,7 +62,10 @@ def dynamic_conf(uri, options, conf=None):
conf.register_opts(driver_opts, group=u'drivers')
conf.set_override('storage', storage_type, 'drivers')
conf.set_override('uri', uri, group=storage_group)
for opt in options:
if opt in conf[storage_group]:
conf.set_override(opt, options[opt], group=storage_group)
return conf

View File

@ -62,10 +62,14 @@ class ControllerBaseTest(testing.TestBase):
self.driver = self.driver_class(self.conf, cache)
else:
control = self.control_driver_class(self.conf, cache)
uri = "sqlite:///:memory:"
uri = "mongodb://localhost:27017"
for i in range(4):
control.pools_controller.create(six.text_type(i), 100, uri)
options = {'database': "zaqar_test_pools_" + str(i)}
control.pools_controller.create(six.text_type(i),
100, uri, options=options)
self.driver = self.driver_class(self.conf, cache, control)
self.addCleanup(control.pools_controller.drop_all)
self.addCleanup(control.catalogue_controller.drop_all)
self._prepare_conf()
@ -131,6 +135,7 @@ class QueueControllerTest(ControllerBaseTest):
self.assertEqual(all(map(lambda queue:
'name' in queue and
'metadata' not in queue, queues)), True)
self.assertEqual(len(queues), 5)
def test_queue_lifecycle(self):

View File

@ -25,14 +25,12 @@ from zaqar.tests.unit.transport.wsgi.v1 import test_validation
TestAuth = test_auth.TestAuth
TestClaimsFaultyDriver = test_claims.TestClaimsFaultyDriver
TestClaimsMongoDB = test_claims.TestClaimsMongoDB
TestClaimsSqlalchemy = test_claims.TestClaimsSqlalchemy
TestDefaultLimits = test_default_limits.TestDefaultLimits
TestHomeDocument = test_home.TestHomeDocument
TestMediaType = test_media_type.TestMediaType
TestMessagesFaultyDriver = test_messages.TestMessagesFaultyDriver
TestMessagesMongoDB = test_messages.TestMessagesMongoDB
TestMessagesMongoDBPooled = test_messages.TestMessagesMongoDBPooled
TestMessagesSqlalchemy = test_messages.TestMessagesSqlalchemy
TestQueueFaultyDriver = l.TestQueueLifecycleFaultyDriver
TestQueueLifecycleMongoDB = l.TestQueueLifecycleMongoDB
TestQueueLifecycleSqlalchemy = l.TestQueueLifecycleSqlalchemy

View File

@ -243,11 +243,6 @@ class TestClaimsMongoDB(ClaimsBaseTest):
super(TestClaimsMongoDB, self).tearDown()
class TestClaimsSqlalchemy(ClaimsBaseTest):
config_file = 'wsgi_sqlalchemy.conf'
class TestClaimsFaultyDriver(base.V1BaseFaulty):
config_file = 'wsgi_faulty.conf'

View File

@ -25,13 +25,13 @@ from zaqar.tests.unit.transport.wsgi import base
class TestDefaultLimits(base.V1Base):
config_file = 'wsgi_sqlalchemy_default_limits.conf'
config_file = 'wsgi_mongodb_default_limits.conf'
def setUp(self):
super(TestDefaultLimits, self).setUp()
self.queue_path = self.url_prefix + '/queues'
self.q1_queue_path = self.queue_path + '/q1'
self.q1_queue_path = self.queue_path + '/' + str(uuid.uuid4())
self.messages_path = self.q1_queue_path + '/messages'
self.claims_path = self.q1_queue_path + '/claims'

View File

@ -21,7 +21,7 @@ from zaqar.tests.unit.transport.wsgi import base
class TestHomeDocument(base.V1Base):
config_file = 'wsgi_sqlalchemy.conf'
config_file = 'wsgi_mongodb.conf'
def test_json_response(self):
body = self.simulate_get(self.url_prefix)

View File

@ -24,7 +24,7 @@ from zaqar.tests.unit.transport.wsgi import base
class TestMediaType(base.V1Base):
config_file = 'wsgi_sqlalchemy.conf'
config_file = 'wsgi_mongodb.conf'
def test_json_only_endpoints(self):
endpoints = (

View File

@ -469,11 +469,6 @@ class MessagesBaseTest(base.V1Base):
return headers['location'].rsplit('=', 1)[-1].split(',')
class TestMessagesSqlalchemy(MessagesBaseTest):
config_file = 'wsgi_sqlalchemy.conf'
class TestMessagesMongoDB(MessagesBaseTest):
config_file = 'wsgi_mongodb.conf'

View File

@ -82,7 +82,7 @@ class PoolsBaseTest(base.V1Base):
def setUp(self):
super(PoolsBaseTest, self).setUp()
self.doc = {'weight': 100, 'uri': 'sqlite://:memory:'}
self.doc = {'weight': 100, 'uri': 'mongodb://localhost:27017'}
self.pool = self.url_prefix + '/pools/' + str(uuid.uuid1())
self.simulate_put(self.pool, body=jsonutils.dumps(self.doc))
self.assertEqual(self.srmock.status, falcon.HTTP_201)
@ -105,7 +105,7 @@ class PoolsBaseTest(base.V1Base):
self.simulate_put(path,
body=jsonutils.dumps(
{'uri': 'sqlite://:memory:'}))
{'uri': 'mongodb://localhost:27017'}))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
@ddt.data(-1, 2**32+1, 'big')
@ -198,11 +198,15 @@ class PoolsBaseTest(base.V1Base):
self.assertEqual(pool['options'], doc['options'])
def test_patch_works(self):
doc = {'weight': 101, 'uri': 'sqlite://:memory:', 'options': {'a': 1}}
doc = {'weight': 101,
'uri': 'mongodb://localhost:27017',
'options': {'a': 1}}
self._patch_test(doc)
def test_patch_works_with_extra_fields(self):
doc = {'weight': 101, 'uri': 'sqlite://:memory:', 'options': {'a': 1},
doc = {'weight': 101,
'uri': 'mongodb://localhost:27017',
'options': {'a': 1},
'location': 100, 'partition': 'taco'}
self._patch_test(doc)

View File

@ -23,7 +23,7 @@ from zaqar.tests.unit.transport.wsgi import base
class TestValidation(base.V1Base):
config_file = 'wsgi_sqlalchemy_validation.conf'
config_file = 'wsgi_mongodb_validation.conf'
def setUp(self):
super(TestValidation, self).setUp()

View File

@ -27,7 +27,6 @@ from zaqar.tests.unit.transport.wsgi.v1_1 import test_validation
TestAuth = test_auth.TestAuth
TestClaimsFaultyDriver = test_claims.TestClaimsFaultyDriver
TestClaimsMongoDB = test_claims.TestClaimsMongoDB
TestClaimsSqlalchemy = test_claims.TestClaimsSqlalchemy
TestDefaultLimits = test_default_limits.TestDefaultLimits
TestHealthMongoDB = test_health.TestHealthMongoDB
TestHealthFaultyDriver = test_health.TestHealthFaultyDriver
@ -36,7 +35,6 @@ TestMediaType = test_media_type.TestMediaType
TestMessagesFaultyDriver = test_messages.TestMessagesFaultyDriver
TestMessagesMongoDB = test_messages.TestMessagesMongoDB
TestMessagesMongoDBPooled = test_messages.TestMessagesMongoDBPooled
TestMessagesSqlalchemy = test_messages.TestMessagesSqlalchemy
TestQueueFaultyDriver = l.TestQueueLifecycleFaultyDriver
TestQueueLifecycleMongoDB = l.TestQueueLifecycleMongoDB
TestQueueLifecycleSqlalchemy = l.TestQueueLifecycleSqlalchemy

View File

@ -296,11 +296,6 @@ class TestClaimsMongoDB(ClaimsBaseTest):
super(TestClaimsMongoDB, self).tearDown()
class TestClaimsSqlalchemy(ClaimsBaseTest):
config_file = 'wsgi_sqlalchemy.conf'
class TestClaimsFaultyDriver(base.V1_1BaseFaulty):
config_file = 'wsgi_faulty.conf'

View File

@ -25,17 +25,17 @@ from zaqar.tests.unit.transport.wsgi import base
class TestDefaultLimits(base.V1_1Base):
config_file = 'wsgi_sqlalchemy_default_limits.conf'
config_file = 'wsgi_mongodb_default_limits.conf'
def setUp(self):
super(TestDefaultLimits, self).setUp()
self.headers = {
'Client-ID': str(uuid.uuid4()),
'X-Project-ID': '838383abc_'
'X-Project-ID': '%s_' % str(uuid.uuid4())
}
self.queue_path = self.url_prefix + '/queues'
self.q1_queue_path = self.queue_path + '/q1'
self.q1_queue_path = self.queue_path + '/' + str(uuid.uuid4())
self.messages_path = self.q1_queue_path + '/messages'
self.claims_path = self.q1_queue_path + '/claims'

View File

@ -92,7 +92,7 @@ class FlavorsBaseTest(base.V1_1Base):
self.pool_path = self.url_prefix + '/pools/' + self.pool
self.pool_doc = {'weight': 100,
'group': self.pool_group,
'uri': 'sqlite://:memory:'}
'uri': 'mongodb://localhost:27017'}
self.simulate_put(self.pool_path, body=jsonutils.dumps(self.pool_doc))
self.flavor = 'test-flavor'

View File

@ -23,7 +23,7 @@ from zaqar.tests.unit.transport.wsgi import base
class TestHomeDocument(base.V1_1Base):
config_file = 'wsgi_sqlalchemy.conf'
config_file = 'wsgi_mongodb.conf'
def test_json_response(self):
self.headers = {

View File

@ -23,7 +23,7 @@ from zaqar.tests.unit.transport.wsgi import base
class TestMediaType(base.V1_1Base):
config_file = 'wsgi_sqlalchemy.conf'
config_file = 'wsgi_mongodb.conf'
def test_json_only_endpoints(self):
endpoints = (

View File

@ -514,10 +514,6 @@ class MessagesBaseTest(base.V1_1Base):
return headers['location'].rsplit('=', 1)[-1].split(',')
class TestMessagesSqlalchemy(MessagesBaseTest):
config_file = 'wsgi_sqlalchemy.conf'
class TestMessagesMongoDB(MessagesBaseTest):
config_file = 'wsgi_mongodb.conf'

View File

@ -86,7 +86,7 @@ class PoolsBaseTest(base.V1_1Base):
super(PoolsBaseTest, self).setUp()
self.doc = {'weight': 100,
'group': 'mygroup',
'uri': 'sqlite://:memory:'}
'uri': 'mongodb://localhost:27017'}
self.pool = self.url_prefix + '/pools/' + str(uuid.uuid1())
self.simulate_put(self.pool, body=jsonutils.dumps(self.doc))
self.assertEqual(self.srmock.status, falcon.HTTP_201)
@ -109,7 +109,7 @@ class PoolsBaseTest(base.V1_1Base):
self.simulate_put(path,
body=jsonutils.dumps(
{'uri': 'sqlite://:memory:'}))
{'uri': 'mongodb://localhost:27017'}))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
@ddt.data(-1, 2**32+1, 'big')
@ -202,11 +202,15 @@ class PoolsBaseTest(base.V1_1Base):
self.assertEqual(pool['options'], doc['options'])
def test_patch_works(self):
doc = {'weight': 101, 'uri': 'sqlite://:memory:', 'options': {'a': 1}}
doc = {'weight': 101,
'uri': 'mongodb://localhost:27017',
'options': {'a': 1}}
self._patch_test(doc)
def test_patch_works_with_extra_fields(self):
doc = {'weight': 101, 'uri': 'sqlite://:memory:', 'options': {'a': 1},
doc = {'weight': 101,
'uri': 'mongodb://localhost:27017',
'options': {'a': 1},
'location': 100, 'partition': 'taco'}
self._patch_test(doc)

View File

@ -23,7 +23,7 @@ from zaqar.tests.unit.transport.wsgi import base
class TestValidation(base.V1_1Base):
config_file = 'wsgi_sqlalchemy_validation.conf'
config_file = 'wsgi_mongodb_validation.conf'
def setUp(self):
super(TestValidation, self).setUp()

View File

@ -27,7 +27,6 @@ from zaqar.tests.unit.transport.wsgi.v2_0 import test_validation
TestAuth = test_auth.TestAuth
TestClaimsFaultyDriver = test_claims.TestClaimsFaultyDriver
TestClaimsMongoDB = test_claims.TestClaimsMongoDB
TestClaimsSqlalchemy = test_claims.TestClaimsSqlalchemy
TestDefaultLimits = test_default_limits.TestDefaultLimits
TestHealthMongoDB = test_health.TestHealthMongoDB
TestHealthFaultyDriver = test_health.TestHealthFaultyDriver
@ -36,7 +35,6 @@ TestMediaType = test_media_type.TestMediaType
TestMessagesFaultyDriver = test_messages.TestMessagesFaultyDriver
TestMessagesMongoDB = test_messages.TestMessagesMongoDB
TestMessagesMongoDBPooled = test_messages.TestMessagesMongoDBPooled
TestMessagesSqlalchemy = test_messages.TestMessagesSqlalchemy
TestQueueFaultyDriver = l.TestQueueLifecycleFaultyDriver
TestQueueLifecycleMongoDB = l.TestQueueLifecycleMongoDB
TestQueueLifecycleSqlalchemy = l.TestQueueLifecycleSqlalchemy

View File

@ -296,11 +296,6 @@ class TestClaimsMongoDB(ClaimsBaseTest):
super(TestClaimsMongoDB, self).tearDown()
class TestClaimsSqlalchemy(ClaimsBaseTest):
config_file = 'wsgi_sqlalchemy.conf'
class TestClaimsFaultyDriver(base.V2BaseFaulty):
config_file = 'wsgi_faulty.conf'

View File

@ -25,17 +25,17 @@ from zaqar.tests.unit.transport.wsgi import base
class TestDefaultLimits(base.V2Base):
config_file = 'wsgi_sqlalchemy_default_limits.conf'
config_file = 'wsgi_mongodb_default_limits.conf'
def setUp(self):
super(TestDefaultLimits, self).setUp()
self.headers = {
'Client-ID': str(uuid.uuid4()),
'X-Project-ID': '838383abc_'
'X-Project-ID': '%s_' % str(uuid.uuid4())
}
self.queue_path = self.url_prefix + '/queues'
self.q1_queue_path = self.queue_path + '/q1'
self.q1_queue_path = self.queue_path + '/' + str(uuid.uuid4())
self.messages_path = self.q1_queue_path + '/messages'
self.claims_path = self.q1_queue_path + '/claims'

View File

@ -91,7 +91,7 @@ class FlavorsBaseTest(base.V2Base):
self.pool_path = self.url_prefix + '/pools/' + self.pool
self.pool_doc = {'weight': 100,
'group': self.pool_group,
'uri': 'sqlite://:memory:'}
'uri': 'mongodb://localhost:27017'}
self.simulate_put(self.pool_path, body=jsonutils.dumps(self.pool_doc))
self.flavor = 'test-flavor'

View File

@ -23,7 +23,7 @@ from zaqar.tests.unit.transport.wsgi import base
class TestHomeDocument(base.V2Base):
config_file = 'wsgi_sqlalchemy.conf'
config_file = 'wsgi_mongodb.conf'
def test_json_response(self):
self.headers = {

View File

@ -23,7 +23,7 @@ from zaqar.tests.unit.transport.wsgi import base
class TestMediaType(base.V2Base):
config_file = 'wsgi_sqlalchemy.conf'
config_file = 'wsgi_mongodb.conf'
def test_json_only_endpoints(self):
endpoints = (

View File

@ -514,10 +514,6 @@ class MessagesBaseTest(base.V2Base):
return headers['location'].rsplit('=', 1)[-1].split(',')
class TestMessagesSqlalchemy(MessagesBaseTest):
config_file = 'wsgi_sqlalchemy.conf'
class TestMessagesMongoDB(MessagesBaseTest):
config_file = 'wsgi_mongodb.conf'

View File

@ -86,7 +86,7 @@ class PoolsBaseTest(base.V2Base):
super(PoolsBaseTest, self).setUp()
self.doc = {'weight': 100,
'group': 'mygroup',
'uri': 'sqlite://:memory:'}
'uri': 'mongodb://127.0.0.1:27017'}
self.pool = self.url_prefix + '/pools/' + str(uuid.uuid1())
self.simulate_put(self.pool, body=jsonutils.dumps(self.doc))
self.assertEqual(self.srmock.status, falcon.HTTP_201)
@ -109,7 +109,7 @@ class PoolsBaseTest(base.V2Base):
self.simulate_put(path,
body=jsonutils.dumps(
{'uri': 'sqlite://:memory:'}))
{'uri': 'mongodb://127.0.0.1:27017'}))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
@ddt.data(-1, 2**32+1, 'big')
@ -202,12 +202,17 @@ class PoolsBaseTest(base.V2Base):
self.assertEqual(pool['options'], doc['options'])
def test_patch_works(self):
doc = {'weight': 101, 'uri': 'sqlite://:memory:', 'options': {'a': 1}}
doc = {'weight': 101,
'uri': 'mongodb://localhost:27017',
'options': {'a': 1}}
self._patch_test(doc)
def test_patch_works_with_extra_fields(self):
doc = {'weight': 101, 'uri': 'sqlite://:memory:', 'options': {'a': 1},
'location': 100, 'partition': 'taco'}
doc = {'weight': 101,
'uri': 'mongodb://localhost:27017',
'options': {'a': 1},
'location': 100,
'partition': 'taco'}
self._patch_test(doc)
@ddt.data(-1, 2**32+1, 'big')

View File

@ -23,7 +23,7 @@ from zaqar.tests.unit.transport.wsgi import base
class TestValidation(base.V2Base):
config_file = 'wsgi_sqlalchemy_validation.conf'
config_file = 'wsgi_mongodb_validation.conf'
def setUp(self):
super(TestValidation, self).setUp()