Deprecate sqlite in favor of sqlalchemy

Since the default engine used by the new sqlalchemy driver is sqlite, it
doesn't make sense to keep the sqlite driver around. This patch replaces
all usages of `sqlite` with `sqlalchemy`.

Although sqlite is being deprecated, the patch keeps the entry point for
backwards compatibility so that environments currently using the sqlite
backend won't be suddenly broken.

Implement blueprint: sql-storage-driver

Change-Id: Ib7e32fa56ab0c6621dc9a888feb6b0e4981b4e91
This commit is contained in:
Flavio Percoco 2014-02-26 11:21:52 +01:00
parent bbcd593a74
commit e43e7e7101
33 changed files with 47 additions and 1186 deletions

View File

@ -1,24 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
In-memory reference Storage Driver for Marconi.
Useful for automated testing and for prototyping storage driver concepts.
"""
from marconi.queues.storage.sqlite import driver
# Hoist classes into package namespace
ControlDriver = driver.ControlDriver
DataDriver = driver.DataDriver

View File

@ -1,48 +0,0 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""sqlite storage controller for the queues catalogue.
Serves to construct an association between a project + queue -> shard
"""
from marconi.queues.storage import base
class CatalogueController(base.CatalogueBase):
def __init__(self, *args, **kwargs):
super(CatalogueController, self).__init__(*args, **kwargs)
def list(self, project):
pass
def get(self, project, queue):
pass
def exists(self, project, queue):
pass
def insert(self, project, queue, shard):
pass
def delete(self, project, queue):
pass
def update(self, project, queue, shards=None):
pass
def drop_all(self):
pass

View File

@ -1,160 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from marconi.queues import storage
from marconi.queues.storage import errors
from marconi.queues.storage.sqlite import utils
class ClaimController(storage.Claim):
def get(self, queue, claim_id, project):
if project is None:
project = ''
cid = utils.cid_decode(claim_id)
if cid is None:
raise errors.ClaimDoesNotExist(claim_id, queue, project)
with self.driver('deferred'):
try:
id, ttl, age = self.driver.get('''
select C.id, C.ttl, julianday() * 86400.0 - C.created
from Queues as Q join Claims as C
on Q.id = C.qid
where C.ttl > julianday() * 86400.0 - C.created
and C.id = ? and project = ? and name = ?
''', cid, project, queue)
return (
{
'id': claim_id,
'ttl': ttl,
'age': int(age),
},
self.__get(id)
)
except utils.NoResult:
raise errors.ClaimDoesNotExist(claim_id, queue, project)
def create(self, queue, metadata, project,
limit=storage.DEFAULT_MESSAGES_PER_CLAIM):
if project is None:
project = ''
with self.driver('immediate'):
try:
qid = utils.get_qid(self.driver, queue, project)
except errors.QueueDoesNotExist:
return None, iter([])
# Clean up all expired claims in this queue
self.driver.run('''
delete from Claims
where ttl <= julianday() * 86400.0 - created
and qid = ?''', qid)
self.driver.run('''
insert into Claims
values (null, ?, ?, julianday() * 86400.0)
''', qid, metadata['ttl'])
id = self.driver.lastrowid
self.driver.run('''
insert into Locked
select last_insert_rowid(), id
from Messages left join Locked
on id = msgid
where msgid is null
and ttl > julianday() * 86400.0 - created
and qid = ?
limit ?''', qid, limit)
messages_ttl = metadata['ttl'] + metadata['grace']
self.__update_claimed(id, messages_ttl)
return (utils.cid_encode(id), self.__get(id))
def __get(self, cid):
records = self.driver.run('''
select id, content, ttl, julianday() * 86400.0 - created
from Messages join Locked
on msgid = id
where ttl > julianday() * 86400.0 - created
and cid = ?''', cid)
for id, content, ttl, age in records:
yield {
'id': utils.msgid_encode(id),
'ttl': ttl,
'age': int(age),
'body': content,
}
def update(self, queue, claim_id, metadata, project):
if project is None:
project = ''
id = utils.cid_decode(claim_id)
if id is None:
raise errors.ClaimDoesNotExist(claim_id, queue, project)
with self.driver('deferred'):
# still delay the cleanup here
self.driver.run('''
update Claims
set created = julianday() * 86400.0,
ttl = ?
where ttl > julianday() * 86400.0 - created
and id = ?
and qid = (select id from Queues
where project = ? and name = ?)
''', metadata['ttl'], id, project, queue)
if not self.driver.affected:
raise errors.ClaimDoesNotExist(claim_id, queue, project)
self.__update_claimed(id, metadata['ttl'])
def __update_claimed(self, cid, ttl):
# Precondition: cid is not expired
self.driver.run('''
update Messages
set created = julianday() * 86400.0,
ttl = ?
where ttl < ?
and id in (select msgid from Locked
where cid = ?)
''', ttl, ttl, cid)
def delete(self, queue, claim_id, project):
if project is None:
project = ''
cid = utils.cid_decode(claim_id)
if cid is None:
return
self.driver.run('''
delete from Claims
where id = ?
and qid = (select id from Queues
where project = ? and name = ?)
''', cid, project, queue)

View File

@ -1,30 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
"""Exports SQLite driver controllers."""
from marconi.queues.storage.sqlite import catalogue
from marconi.queues.storage.sqlite import claims
from marconi.queues.storage.sqlite import messages
from marconi.queues.storage.sqlite import queues
from marconi.queues.storage.sqlite import shards
CatalogueController = catalogue.CatalogueController
ClaimController = claims.ClaimController
MessageController = messages.MessageController
QueueController = queues.QueueController
ShardsController = shards.ShardsController

View File

@ -1,232 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import json
import sqlite3
import uuid
from marconi.common import decorators
from marconi.queues import storage
from marconi.queues.storage.sqlite import controllers
from marconi.queues.storage.sqlite import options
from marconi.queues.storage.sqlite import utils
class DataDriver(storage.DataDriverBase):
def __init__(self, conf, cache):
super(DataDriver, self).__init__(conf, cache)
self.conf.register_opts(options.SQLITE_OPTIONS,
group=options.SQLITE_GROUP)
self.sqlite_conf = self.conf[options.SQLITE_GROUP]
self.__path = self.sqlite_conf.database
@decorators.lazy_property(write=False)
def connection(self):
return sqlite3.connect(self.__path,
detect_types=sqlite3.PARSE_DECLTYPES)
@decorators.lazy_property(write=False)
def database(self):
db = self.connection.cursor()
db.execute('''PRAGMA foreign_keys = ON''')
self._ensure_tables(db)
return db
def _ensure_tables(self, db):
"""Creates tables if they don't already exist."""
# NOTE(kgriffs): Create tables all together rather
# than separately in each controller, since some queries
# in the individual controllers actually require the
# presence of more than one table.
# NOTE(flaper87): Consider moving tables definition
# outside this method.
db.execute('''
create table
if not exists
Messages (
id INTEGER,
qid INTEGER,
ttl INTEGER,
content DOCUMENT,
client UUID,
created DATETIME, -- seconds since the Julian day
PRIMARY KEY(id),
FOREIGN KEY(qid) references Queues(id) on delete cascade
)
''')
db.execute('''
create table
if not exists
Queues (
id INTEGER,
project TEXT,
name TEXT,
metadata DOCUMENT,
PRIMARY KEY(id),
UNIQUE(project, name)
)
''')
db.execute('''
create table
if not exists
Claims (
id INTEGER PRIMARY KEY AUTOINCREMENT,
qid INTEGER,
ttl INTEGER,
created DATETIME, -- seconds since the Julian day
FOREIGN KEY(qid) references Queues(id) on delete cascade
)
''')
db.execute('''
create table
if not exists
Locked (
cid INTEGER,
msgid INTEGER,
FOREIGN KEY(cid) references Claims(id) on delete cascade,
FOREIGN KEY(msgid) references Messages(id) on delete cascade
)
''')
@staticmethod
def pack(o):
"""Converts a Python variable to a custom SQlite `DOCUMENT`.
:param o: a Python str, unicode, int, long, float, bool, None
or a dict or list of %o
"""
return sqlite3.Binary(json.dumps(o))
sqlite3.register_converter('DOCUMENT', lambda s:
json.loads(s, encoding='utf-8'))
@staticmethod
def uuid(o):
"""Converts a UUID object to a custom SQlite `UUID`.
:param o: a UUID object
"""
return sqlite3.Binary(o.bytes)
sqlite3.register_converter('UUID', lambda s:
uuid.UUID(hex=s))
def run(self, sql, *args):
"""Performs a SQL query.
:param sql: a query string with the '?' placeholders
:param args: the arguments to substitute the placeholders
"""
return self.database.execute(sql, args)
def run_multiple(self, sql, it):
"""Iteratively perform multiple SQL queries.
:param sql: a query string with the '?' placeholders
:param it: an iterator which yields a sequence of arguments to
substitute the placeholders
"""
self.database.executemany(sql, it)
def get(self, sql, *args):
"""Runs %sql and returns the first entry in the results.
:param sql: a query string with the '?' placeholders
:param args: the arguments to substitute the placeholders
:raises: utils.NoResult if the result set is empty
"""
try:
return next(self.run(sql, *args))
except StopIteration:
raise utils.NoResult()
@property
def affected(self):
"""Checks whether a row is affected in the last operation."""
assert self.database.rowcount in (0, 1)
return self.database.rowcount == 1
@property
def lastrowid(self):
"""Returns the last inserted row id."""
return self.database.lastrowid
@contextlib.contextmanager
def __call__(self, isolation):
self.run('begin ' + isolation)
try:
yield
self.connection.commit()
except Exception:
self.connection.rollback()
raise
def is_alive(self):
return True
@decorators.lazy_property(write=False)
def queue_controller(self):
return controllers.QueueController(self)
@decorators.lazy_property(write=False)
def message_controller(self):
return controllers.MessageController(self)
@decorators.lazy_property(write=False)
def claim_controller(self):
return controllers.ClaimController(self)
class ControlDriver(storage.ControlDriverBase):
def __init__(self, conf, cache):
super(ControlDriver, self).__init__(conf, cache)
self.conf.register_opts(options.SQLITE_OPTIONS,
group=options.SQLITE_GROUP)
self.sqlite_conf = self.conf[options.SQLITE_GROUP]
self.__path = self.sqlite_conf.database
@decorators.lazy_property(write=False)
def connection(self):
return sqlite3.connect(self.__path,
detect_types=sqlite3.PARSE_DECLTYPES)
@decorators.lazy_property(write=False)
def database(self):
db = self.connection.cursor()
db.execute('''PRAGMA foreign_keys = ON''')
return db
@property
def catalogue_controller(self):
return controllers.CatalogueController(self)
@property
def shards_controller(self):
return controllers.ShardsController(self)

View File

@ -1,275 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from marconi.openstack.common import timeutils
from marconi.queues import storage
from marconi.queues.storage import errors
from marconi.queues.storage.sqlite import utils
class MessageController(storage.Message):
def get(self, queue, message_id, project):
if project is None:
project = ''
mid = utils.msgid_decode(message_id)
if mid is None:
raise errors.MessageDoesNotExist(message_id, queue, project)
try:
content, ttl, age = self.driver.get('''
select content, ttl, julianday() * 86400.0 - created
from Queues as Q join Messages as M
on qid = Q.id
where ttl > julianday() * 86400.0 - created
and M.id = ? and project = ? and name = ?
''', mid, project, queue)
except utils.NoResult:
raise errors.MessageDoesNotExist(message_id, queue, project)
return {
'id': message_id,
'ttl': ttl,
'age': int(age),
'body': content,
}
def bulk_get(self, queue, message_ids, project):
if project is None:
project = ''
message_ids = ','.join(
["'%s'" % id for id in
map(utils.msgid_decode, message_ids) if id is not None]
)
sql = '''
select M.id, content, ttl, julianday() * 86400.0 - created
from Queues as Q join Messages as M
on qid = Q.id
where ttl > julianday() * 86400.0 - created
and M.id in (%s) and project = ? and name = ?
''' % message_ids
records = self.driver.run(sql, project, queue)
for id, content, ttl, age in records:
yield {
'id': utils.msgid_encode(id),
'ttl': ttl,
'age': int(age),
'body': content,
}
def first(self, queue, project=None, sort=1):
if project is None:
project = ''
with self.driver('deferred'):
sql = '''
select id, content, ttl, created,
julianday() * 86400.0 - created
from Messages
where ttl > julianday() * 86400.0 - created
and qid = ?
order by id %s
limit 1'''
if sort not in (1, -1):
raise ValueError(u'sort must be either 1 (ascending) '
u'or -1 (descending)')
sql = sql % ('DESC' if sort == -1 else 'ASC')
args = [utils.get_qid(self.driver, queue, project)]
records = self.driver.run(sql, *args)
try:
id, content, ttl, created, age = next(records)
except StopIteration:
raise errors.QueueIsEmpty(queue, project)
created_unix = utils.julian_to_unix(created)
created_iso8601 = timeutils.iso8601_from_timestamp(created_unix)
return {
'id': utils.msgid_encode(id),
'ttl': ttl,
'created': created_iso8601,
'age': age,
'body': content,
}
def list(self, queue, project, marker=None,
limit=storage.DEFAULT_MESSAGES_PER_PAGE,
echo=False, client_uuid=None, include_claimed=False):
if project is None:
project = ''
with self.driver('deferred'):
sql = '''
select M.id, content, ttl, julianday() * 86400.0 - created
from Queues as Q join Messages as M
on M.qid = Q.id
where M.ttl > julianday() * 86400.0 - created
and Q.name = ? and Q.project = ?'''
args = [queue, project]
if not echo:
sql += '''
and M.client != ?'''
args += [self.driver.uuid(client_uuid)]
if marker:
sql += '''
and M.id > ?'''
args += [utils.marker_decode(marker)]
if not include_claimed:
sql += '''
and M.id not in (select msgid
from Claims join Locked
on id = cid)'''
sql += '''
limit ?'''
args += [limit]
records = self.driver.run(sql, *args)
marker_id = {}
def it():
for id, content, ttl, age in records:
marker_id['next'] = id
yield {
'id': utils.msgid_encode(id),
'ttl': ttl,
'age': int(age),
'body': content,
}
yield it()
yield utils.marker_encode(marker_id['next'])
def post(self, queue, messages, client_uuid, project):
if project is None:
project = ''
with self.driver('immediate'):
qid = utils.get_qid(self.driver, queue, project)
# cleanup all expired messages in this queue
self.driver.run('''
delete from Messages
where ttl <= julianday() * 86400.0 - created
and qid = ?''', qid)
# executemany() sets lastrowid to None, so no matter we manually
# generate the IDs or not, we still need to query for it.
unused = self.driver.get('''
select max(id) + 1 from Messages''')[0] or 1001
my = dict(newid=unused)
def it():
for m in messages:
yield (my['newid'], qid, m['ttl'],
self.driver.pack(m['body']),
self.driver.uuid(client_uuid))
my['newid'] += 1
self.driver.run_multiple('''
insert into Messages
values (?, ?, ?, ?, ?, julianday() * 86400.0)''', it())
return map(utils.msgid_encode, range(unused, my['newid']))
def delete(self, queue, message_id, project, claim=None):
if project is None:
project = ''
id = utils.msgid_decode(message_id)
if id is None:
return
with self.driver('immediate'):
message_exists, = self.driver.get('''
select count(M.id)
from Queues as Q join Messages as M
on qid = Q.id
where ttl > julianday() * 86400.0 - created
and M.id = ? and project = ? and name = ?
''', id, project, queue)
if not message_exists:
return
if claim is None:
self.__delete_unclaimed(id)
else:
self.__delete_claimed(id, claim)
def __delete_unclaimed(self, id):
self.driver.run('''
delete from Messages
where id = ?
and not exists (select *
from Claims join Locked
on id = cid
where ttl > julianday() * 86400.0 - created)
''', id)
if not self.driver.affected:
raise errors.MessageIsClaimed(id)
def __delete_claimed(self, id, claim):
# Precondition: id exists in a specific queue
cid = utils.cid_decode(claim)
if cid is None:
return
self.driver.run('''
delete from Messages
where id = ?
and id in (select msgid
from Claims join Locked
on id = cid
where ttl > julianday() * 86400.0 - created
and id = ?)
''', id, cid)
if not self.driver.affected:
raise errors.MessageIsClaimedBy(id, claim)
def bulk_delete(self, queue, message_ids, project):
if project is None:
project = ''
message_ids = ','.join(
["'%s'" % id for id in
map(utils.msgid_decode, message_ids) if id]
)
self.driver.run('''
delete from Messages
where id in (%s)
and qid = (select id from Queues
where project = ? and name = ?)
''' % message_ids, project, queue)

View File

@ -1,29 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.config import cfg
from marconi.common import utils
SQLITE_OPTIONS = [
cfg.StrOpt('database', default=':memory:',
help='Sqlite database to use.')
]
SQLITE_GROUP = 'drivers:storage:sqlite'
def _config_options():
return utils.options_iter(SQLITE_OPTIONS, SQLITE_GROUP)

View File

@ -1,153 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
from marconi.queues import storage
from marconi.queues.storage import errors
from marconi.queues.storage.sqlite import utils
class QueueController(storage.Queue):
def list(self, project, marker=None,
limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False):
if project is None:
project = ''
sql = (('''
select name from Queues''' if not detailed
else '''
select name, metadata from Queues''') +
'''
where project = ?''')
args = [project]
if marker:
sql += '''
and name > ?'''
args += [marker]
sql += '''
order by name
limit ?'''
args += [limit]
records = self.driver.run(sql, *args)
marker_name = {}
def it():
for rec in records:
marker_name['next'] = rec[0]
yield ({'name': rec[0]} if not detailed
else
{'name': rec[0], 'metadata': rec[1]})
yield it()
yield marker_name['next']
def get_metadata(self, name, project):
if project is None:
project = ''
try:
return self.driver.get('''
select metadata from Queues
where project = ? and name = ?''', project, name)[0]
except utils.NoResult:
raise errors.QueueDoesNotExist(name, project)
def create(self, name, project):
if project is None:
project = ''
self.driver.run('''
insert or ignore into Queues
values (null, ?, ?, "{}")
''', project, name)
return self.driver.affected
def exists(self, name, project):
if project is None:
project = ''
return self.driver.run('''
select id from Queues
where project = ? and name = ?
''', project, name).fetchone() is not None
def set_metadata(self, name, metadata, project):
if project is None:
project = ''
self.driver.run('''
update Queues
set metadata = ?
where project = ? and name = ?
''', self.driver.pack(metadata), project, name)
if not self.driver.affected:
raise errors.QueueDoesNotExist(name, project)
def delete(self, name, project):
if project is None:
project = ''
self.driver.run('''
delete from Queues
where project = ? and name = ?''', project, name)
def stats(self, name, project):
if project is None:
project = ''
with self.driver('deferred'):
qid = utils.get_qid(self.driver, name, project)
claimed, free = self.driver.get('''
select * from
(select count(msgid)
from Claims join Locked
on id = cid
where ttl > julianday() * 86400.0 - created
and qid = ?),
(select count(id)
from Messages left join Locked
on id = msgid
where msgid is null
and ttl > julianday() * 86400.0 - created
and qid = ?)
''', qid, qid)
total = free + claimed
message_stats = {
'claimed': claimed,
'free': free,
'total': total,
}
try:
message_controller = self.driver.message_controller
oldest = message_controller.first(name, project, sort=1)
newest = message_controller.first(name, project, sort=-1)
except errors.QueueIsEmpty:
pass
else:
message_stats['oldest'] = utils.stat_message(oldest)
message_stats['newest'] = utils.stat_message(newest)
return {'messages': message_stats}

View File

@ -1,41 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
from marconi.queues.storage import base
class ShardsController(base.ShardsBase):
def list(self, marker=None, limit=10, detailed=False):
pass
def get(self, name, detailed=False):
pass
def create(self, name, weight, uri, options=None):
pass
def exists(self, name):
pass
def update(self, name, **kwargs):
pass
def delete(self, name):
pass
def drop_all(self):
pass

View File

@ -1,92 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
from marconi.queues.storage import errors
UNIX_EPOCH_AS_JULIAN_SEC = 2440587.5 * 86400.0
class NoResult(Exception):
pass
def get_qid(driver, queue, project):
try:
return driver.get('''
select id from Queues
where project = ? and name = ?''', project, queue)[0]
except NoResult:
raise errors.QueueDoesNotExist(queue, project)
# The utilities below make the database IDs opaque to the users
# of Marconi API. The only purpose is to advise the users NOT to
# make assumptions on the implementation of and/or relationship
# between the message IDs, the markers, and claim IDs.
#
# The magic numbers are arbitrarily picked; the numbers themselves
# come with no special functionalities.
def msgid_encode(id):
return hex(id ^ 0x5c693a53)[2:]
def msgid_decode(id):
try:
return int(id, 16) ^ 0x5c693a53
except ValueError:
return None
def marker_encode(id):
return oct(id ^ 0x3c96a355)[1:]
def marker_decode(id):
try:
return int(id, 8) ^ 0x3c96a355
except ValueError:
return None
def cid_encode(id):
return hex(id ^ 0x63c9a59c)[2:]
def cid_decode(id):
try:
return int(id, 16) ^ 0x63c9a59c
except ValueError:
return None
def julian_to_unix(julian_sec):
"""Converts Julian timestamp, in seconds, to a UNIX timestamp."""
return int(round(julian_sec - UNIX_EPOCH_AS_JULIAN_SEC))
def stat_message(message):
"""Creates a stat document based on a message."""
return {
'id': message['id'],
'age': message['age'],
'created': message['created'],
}

View File

@ -245,7 +245,7 @@ class TestClaimsMongoDB(ClaimsBaseTest):
class TestClaimsSQLite(ClaimsBaseTest):
config_file = 'wsgi_sqlite.conf'
config_file = 'wsgi_sqlalchemy.conf'
class TestClaimsFaultyDriver(base.TestBaseFaulty):

View File

@ -26,7 +26,7 @@ from marconi.queues import storage
class TestDefaultLimits(base.TestBase):
config_file = 'wsgi_sqlite_default_limits.conf'
config_file = 'wsgi_sqlalchemy_default_limits.conf'
def setUp(self):
super(TestDefaultLimits, self).setUp()

View File

@ -24,7 +24,7 @@ from . import base # noqa
class TestHomeDocument(base.TestBase):
config_file = 'wsgi_sqlite.conf'
config_file = 'wsgi_sqlalchemy.conf'
def test_json_response(self):
body = self.simulate_get(self.url_prefix)

View File

@ -23,7 +23,7 @@ from . import base # noqa
class TestMediaType(base.TestBase):
config_file = 'wsgi_sqlite.conf'
config_file = 'wsgi_sqlalchemy.conf'
def test_json_only_endpoints(self):
endpoints = (

View File

@ -456,11 +456,11 @@ class MessagesBaseTest(base.TestBase):
class TestMessagesSQLite(MessagesBaseTest):
config_file = 'wsgi_sqlite.conf'
config_file = 'wsgi_sqlalchemy.conf'
# TODO(cpp-cabrera): restore sqlite sharded test suite once shards and
# catalogue get an sqlite implementation.
# TODO(cpp-cabrera): restore sqlalchemy sharded test suite once shards and
# catalogue get an sqlalchemy implementation.
class TestMessagesMongoDB(MessagesBaseTest):

View File

@ -354,7 +354,7 @@ class TestQueueLifecycleMongoDB(QueueLifecycleBaseTest):
class TestQueueLifecycleSQLite(QueueLifecycleBaseTest):
config_file = 'wsgi_sqlite.conf'
config_file = 'wsgi_sqlalchemy.conf'
class TestQueueLifecycleFaultyDriver(base.TestBaseFaulty):

View File

@ -84,7 +84,7 @@ class ShardsBaseTest(base.TestBase):
def setUp(self):
super(ShardsBaseTest, self).setUp()
self.doc = {'weight': 100, 'uri': 'sqlite://memory'}
self.doc = {'weight': 100, 'uri': 'sqlite://:memory:'}
self.shard = self.url_prefix + '/shards/' + str(uuid.uuid1())
self.simulate_put(self.shard, body=json.dumps(self.doc))
self.assertEqual(self.srmock.status, falcon.HTTP_201)
@ -105,7 +105,7 @@ class ShardsBaseTest(base.TestBase):
self.simulate_put(path, body=json.dumps({'weight': 100}))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
self.simulate_put(path, body=json.dumps({'uri': 'sqlite://memory'}))
self.simulate_put(path, body=json.dumps({'uri': 'sqlite://:memory:'}))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
@ddt.data(-1, 2**32+1, 'big')
@ -132,7 +132,7 @@ class ShardsBaseTest(base.TestBase):
def test_put_existing_overwrites(self):
# NOTE(cabrera): setUp creates default shard
expect = {'weight': 20, 'uri': 'sqlite://other'}
expect = {'weight': 20, 'uri': 'sqlalchemy://other'}
self.simulate_put(self.shard,
body=json.dumps(expect))
self.assertEqual(self.srmock.status, falcon.HTTP_201)
@ -198,11 +198,11 @@ class ShardsBaseTest(base.TestBase):
self.assertEqual(shard['options'], doc['options'])
def test_patch_works(self):
doc = {'weight': 101, 'uri': 'sqlite://memory', 'options': {'a': 1}}
doc = {'weight': 101, 'uri': 'sqlite://:memory:', 'options': {'a': 1}}
self._patch_test(doc)
def test_patch_works_with_extra_fields(self):
doc = {'weight': 101, 'uri': 'sqlite://memory', 'options': {'a': 1},
doc = {'weight': 101, 'uri': 'sqlite://:memory:', 'options': {'a': 1},
'location': 100, 'partition': 'taco'}
self._patch_test(doc)

View File

@ -30,13 +30,14 @@ console_scripts =
marconi-server = marconi.cmd.server:run
marconi.queues.data.storage =
sqlite = marconi.queues.storage.sqlite.driver:DataDriver
# NOTE(flaper87): sqlite points to sqla for backwards compatibility
sqlite = marconi.queues.storage.sqlalchemy.driver:DataDriver
sqlalchemy = marconi.queues.storage.sqlalchemy.driver:DataDriver
mongodb = marconi.queues.storage.mongodb.driver:DataDriver
faulty = marconi.tests.faulty_storage:DataDriver
marconi.queues.control.storage =
sqlite = marconi.queues.storage.sqlite.driver:ControlDriver
sqlite = marconi.queues.storage.sqlalchemy.driver:ControlDriver
sqlalchemy = marconi.queues.storage.sqlalchemy.driver:ControlDriver
mongodb = marconi.queues.storage.mongodb.driver:ControlDriver
faulty = marconi.tests.faulty_storage:ControlDriver
@ -53,7 +54,9 @@ oslo.config.opts =
marconi.storage.pipeline = marconi.queues.storage.pipeline._config_options
marconi.storage.sharding = marconi.queues.storage.sharding._config_options
marconi.storage.mongodb = marconi.queues.storage.mongodb.options._config_options
marconi.storage.sqlite = marconi.queues.storage.sqlite.options._config_options
# NOTE(flaper87): sqlite points to sqla for backwards compatibility
marconi.storage.sqlite = marconi.queues.storage.sqlalchemy.options._config_options
marconi.storage.sqlalchemy = marconi.queues.storage.sqlalchemy.options._config_options
marconi.transport.wsgi = marconi.queues.transport.wsgi.v1_0.driver._config_options
marconi.transport.base = marconi.queues.transport.base._config_options

View File

@ -4,7 +4,7 @@ verbose = False
[drivers]
transport = invalid
storage = sqlite
storage = sqlalchemy
[drivers:transport:wsgi]
port = 8888

View File

@ -23,8 +23,8 @@ debug = True
[drivers]
# Transport driver module (e.g., wsgi, zmq)
transport = wsgi
# Storage driver module (e.g., mongodb, sqlite)
storage = sqlite
# Storage driver module (e.g., mongodb, sqlalchemy)
storage = sqlalchemy
[drivers:transport:wsgi]
bind = 127.0.0.1

View File

@ -6,7 +6,7 @@ verbose = False
[drivers]
transport = wsgi
storage = sqlite
storage = sqlalchemy
[drivers:transport:wsgi]
bind = 0.0.0.0:8888

View File

@ -1,13 +1,13 @@
[DEFAULT]
debug = False
verbose = False
admin_mode = False
[drivers]
transport = wsgi
storage = sqlalchemy
[drivers:transport:wsgi]
bind = 0.0.0.0
port = 8888
[drivers:storage:sqlalchemy]
uri = sqlite:///:memory:
workers = 20

View File

@ -1,3 +1,3 @@
[drivers]
transport = wsgi
storage = sqlite
storage = sqlalchemy

View File

@ -3,4 +3,4 @@ sharding = True
[drivers]
transport = wsgi
storage = sqlite
storage = sqlalchemy

View File

@ -1,6 +1,6 @@
[drivers]
transport = wsgi
storage = sqlite
storage = sqlalchemy
# Test support for deprecated options
[limits:transport]

View File

@ -1,13 +0,0 @@
[DEFAULT]
debug = False
verbose = False
admin_mode = False
[drivers]
transport = wsgi
storage = sqlite
[drivers:transport:wsgi]
bind = 0.0.0.0
port = 8888
workers = 20

View File

@ -1,47 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from marconi.queues import storage
from marconi.queues.storage import sqlite
from marconi.queues.storage.sqlite import controllers
from marconi.tests.queues.storage import base
class SQliteQueueTests(base.QueueControllerTest):
driver_class = sqlite.DataDriver
controller_class = controllers.QueueController
class SQliteMessageTests(base.MessageControllerTest):
driver_class = sqlite.DataDriver
controller_class = controllers.MessageController
def test_empty_queue_exception(self):
queue_name = 'empty-queue-test'
self.queue_controller.create(queue_name, None)
self.assertRaises(storage.errors.QueueIsEmpty,
self.controller.first,
queue_name, None, sort=1)
def test_invalid_sort_option(self):
self.assertRaises(ValueError,
self.controller.first,
'foo', None, sort='dosomething()')
class SQliteClaimTests(base.ClaimControllerTest):
driver_class = sqlite.DataDriver
controller_class = controllers.ClaimController

View File

@ -20,7 +20,7 @@ from oslo.config import cfg
from marconi.openstack.common.cache import cache as oslo_cache
from marconi.queues.storage import sharding
from marconi.queues.storage import sqlite
from marconi.queues.storage import sqlalchemy
from marconi.queues.storage import utils
from marconi import tests as testing
@ -49,7 +49,7 @@ class ShardCatalogTest(testing.TestBase):
self.shard = str(uuid.uuid1())
self.queue = str(uuid.uuid1())
self.project = str(uuid.uuid1())
self.shards_ctrl.create(self.shard, 100, 'sqlite://memory')
self.shards_ctrl.create(self.shard, 100, 'sqlite://:memory:')
self.catalogue_ctrl.insert(self.project, self.queue, self.shard)
self.catalog = sharding.Catalog(self.conf, cache, control)
@ -60,7 +60,7 @@ class ShardCatalogTest(testing.TestBase):
def test_lookup_loads_correct_driver(self):
storage = self.catalog.lookup(self.queue, self.project)
self.assertIsInstance(storage, sqlite.DataDriver)
self.assertIsInstance(storage, sqlalchemy.DataDriver)
def test_lookup_returns_none_if_queue_not_mapped(self):
self.assertIsNone(self.catalog.lookup('not', 'mapped'))
@ -72,4 +72,4 @@ class ShardCatalogTest(testing.TestBase):
def test_register_leads_to_successful_lookup(self):
self.catalog.register('not_yet', 'mapped')
storage = self.catalog.lookup('not_yet', 'mapped')
self.assertIsInstance(storage, sqlite.DataDriver)
self.assertIsInstance(storage, sqlalchemy.DataDriver)

View File

@ -43,7 +43,8 @@ class ShardQueuesTest(testing.TestBase):
# fake two shards
for _ in xrange(2):
self.shards_ctrl.create(str(uuid.uuid1()), 100, 'sqlite://memory')
self.shards_ctrl.create(str(uuid.uuid1()), 100,
'sqlite://:memory:')
def tearDown(self):
self.shards_ctrl.drop_all()

View File

@ -92,7 +92,7 @@ class TestShardsMongoDB(wsgi.TestShardsMongoDB):
class TestHealth(wsgi.TestBase):
config_file = 'wsgi_sqlite.conf'
config_file = 'wsgi_sqlalchemy.conf'
def test_get(self):
response = self.simulate_get('/v1/health')

View File

@ -94,7 +94,7 @@ class TestShardsMongoDB(wsgi.TestShardsMongoDB):
class TestPing(wsgi.TestBase):
config_file = 'wsgi_sqlite.conf'
config_file = 'wsgi_sqlalchemy.conf'
def test_get(self):
# TODO(kgriffs): Make use of setUp for setting the URL prefix
@ -114,7 +114,7 @@ class TestPing(wsgi.TestBase):
class TestHealth(wsgi.TestBase):
config_file = 'wsgi_sqlite.conf'
config_file = 'wsgi_sqlalchemy.conf'
def test_get(self):
response = self.simulate_get('/v1.1/health')

View File

@ -23,7 +23,7 @@ from marconi.tests.queues.transport.wsgi import base
class ValidationTest(base.TestBase):
config_file = 'wsgi_sqlite_validation.conf'
config_file = 'wsgi_sqlalchemy_validation.conf'
def setUp(self):
super(ValidationTest, self).setUp()

View File

@ -17,7 +17,7 @@ from marconi.common import errors
from marconi.queues import bootstrap
from marconi.queues.storage import pipeline
from marconi.queues.storage import sharding
from marconi.queues.storage import sqlite
from marconi.queues.storage import sqlalchemy
from marconi.queues.transport import wsgi
from marconi.tests import base
@ -33,14 +33,15 @@ class TestBootstrap(base.TestBase):
self.assertRaises(errors.InvalidDriver,
lambda: bootstrap.storage)
def test_storage_sqlite(self):
bootstrap = self._bootstrap('wsgi_sqlite.conf')
def test_storage_sqlalchemy(self):
bootstrap = self._bootstrap('wsgi_sqlalchemy.conf')
self.assertIsInstance(bootstrap.storage, pipeline.DataDriver)
self.assertIsInstance(bootstrap.storage._storage, sqlite.DataDriver)
self.assertIsInstance(bootstrap.storage._storage,
sqlalchemy.DataDriver)
def test_storage_sqlite_sharded(self):
def test_storage_sqlalchemy_sharded(self):
"""Makes sure we can load the shard driver."""
bootstrap = self._bootstrap('wsgi_sqlite_sharded.conf')
bootstrap = self._bootstrap('wsgi_sqlalchemy_sharded.conf')
self.assertIsInstance(bootstrap.storage._storage, sharding.DataDriver)
def test_transport_invalid(self):
@ -49,5 +50,5 @@ class TestBootstrap(base.TestBase):
lambda: bootstrap.transport)
def test_transport_wsgi(self):
bootstrap = self._bootstrap('wsgi_sqlite.conf')
bootstrap = self._bootstrap('wsgi_sqlalchemy.conf')
self.assertIsInstance(bootstrap.transport, wsgi.Driver)