Claim messages in SQlite.

Implements: blueprint storage-reference

Change-Id: Ib10f80e05b169f62885cd1ae88307b9aa3b37c0c
This commit is contained in:
Zhihao Yuan
2013-03-24 04:03:35 -04:00
committed by Gerrit Code Review
parent 90dd619d78
commit 9cde7d8c95
4 changed files with 236 additions and 9 deletions

View File

@@ -16,3 +16,7 @@
class DoesNotExist(Exception):
pass
class NotPermitted(Exception):
pass

View File

@@ -152,9 +152,9 @@ class Message(base.MessageBase):
limit ?'''
args += [limit]
iter = self.driver.run(sql, *args)
records = self.driver.run(sql, *args)
for id, content, ttl, age in iter:
for id, content, ttl, age in records:
yield {
'id': _msgid_encode(id),
'ttl': ttl,
@@ -198,16 +198,168 @@ class Message(base.MessageBase):
def delete(self, queue, message_id, tenant, claim=None):
try:
self.driver.run('''
sql = '''
delete from Messages
where id = ?
and qid = (select id from Queues
where tenant = ? and name = ?)'''
args = [_msgid_decode(message_id), tenant, queue]
if claim:
sql += '''
and id in (select msgid
from Claims join Locked
on id = cid
where ttl > julianday() * 86400.0 - created
and id = ?)'''
args += [_cid_decode(claim)]
self.driver.run(sql, *args)
if not self.driver.affected:
raise _BadID
except _BadID:
#TODO(zyuan): use exception itself to format this
if claim:
msg = (_("Attempt to delete message %(id)s "
"with a wrong claim")
% dict(id=message_id))
raise exceptions.NotPermitted(msg)
class Claim(base.ClaimBase):
def __init__(self, driver):
self.driver = driver
self.driver.run('''
create table
if not exists
Claims (
id INTEGER PRIMARY KEY AUTOINCREMENT,
qid INTEGER,
ttl INTEGER,
created DATETIME, -- seconds since the Julian day
FOREIGN KEY(qid) references Queues(id) on delete cascade
)
''')
self.driver.run('''
create table
if not exists
Locked (
cid INTEGER,
msgid INTEGER,
FOREIGN KEY(cid) references Claims(id) on delete cascade,
FOREIGN KEY(msgid) references Messages(id) on delete cascade
)
''')
def get(self, queue, claim_id, tenant):
with self.driver('deferred'):
try:
id, ttl, age = self.driver.get('''
select C.id, C.ttl, julianday() * 86400.0 - C.created
from Queues as Q join Claims as C
on Q.id = C.qid
where C.ttl > julianday() * 86400.0 - C.created
and C.id = ? and tenant = ? and name = ?
''', _cid_decode(claim_id), tenant, queue)
return (
{
'id': claim_id,
'ttl': ttl,
'age': int(age),
},
self.__get(id)
)
except (_NoResult, _BadID):
_claim_doesnotexist(claim_id)
def create(self, queue, metadata, tenant, limit=10):
with self.driver('immediate'):
qid = _get_qid(self.driver, queue, tenant)
# cleanup all expired claims in this queue
self.driver.run('''
delete from Claims
where ttl <= julianday() * 86400.0 - created
and qid = ?''', qid)
self.driver.run('''
insert into Claims
values (null, ?, ?, julianday() * 86400.0)
''', qid, metadata['ttl'])
id = self.driver.lastrowid
self.driver.run('''
insert into Locked
select last_insert_rowid(), id
from Messages left join Locked
on id = msgid
where msgid is null
and qid = ?
limit ?''', qid, limit)
return (
{
'id': _cid_encode(id),
'ttl': metadata['ttl'],
'age': 0,
},
self.__get(id)
)
def __get(self, cid):
records = self.driver.run('''
select id, content, ttl, julianday() * 86400.0 - created
from Messages join Locked
on msgid = id
where ttl > julianday() * 86400.0 - created
and cid = ?''', cid)
for id, content, ttl, age in records:
yield {
'id': _msgid_encode(id),
'ttl': ttl,
'age': int(age),
'body': content,
}
def update(self, queue, claim_id, metadata, tenant):
try:
self.driver.run('''
update Claims
set ttl = ?
where id = ?
and qid = (select id from Queues
where tenant = ? and name = ?)
''', _msgid_decode(message_id), tenant, queue)
''', metadata['ttl'], _cid_decode(claim_id), tenant, queue)
if not self.driver.affected:
_claim_doesnotexist(claim_id)
except _BadID:
_claim_doesnotexist(claim_id)
def delete(self, queue, claim_id, tenant):
try:
self.driver.run('''
delete from Claims
where id = ?
and qid = (select id from Queues
where tenant = ? and name = ?)
''', _cid_decode(claim_id), tenant, queue)
except _BadID:
pass
def stats(self, queue, claim_id, tenant=None):
raise NotImplementedError
class _NoResult(Exception):
pass
@@ -231,6 +383,13 @@ def _msg_doesnotexist(id):
raise exceptions.DoesNotExist(msg)
def _claim_doesnotexist(id):
msg = (_("Claim %(id)s does not exist")
% dict(id=id))
raise exceptions.DoesNotExist(msg)
def _get_qid(driver, queue, tenant):
try:
return driver.get('''
@@ -271,3 +430,15 @@ def _marker_decode(id):
except ValueError:
raise _BadID
def _cid_encode(id):
return hex(id ^ 0x63c9a59c)[2:]
def _cid_decode(id):
try:
return int(id, 16) ^ 0x63c9a59c
except ValueError:
raise _BadID

View File

@@ -84,6 +84,22 @@ class Driver(storage.DriverBase):
except StopIteration:
raise controllers._NoResult
@property
def affected(self):
"""
Check whether a row is affected in
the last operation.
"""
assert self.__db.rowcount in (0, 1)
return self.__db.rowcount == 1
@property
def lastrowid(self):
"""
Get last inserted row id.
"""
return self.__db.lastrowid
@contextlib.contextmanager
def __call__(self, isolation):
self.run('begin ' + isolation)
@@ -104,4 +120,4 @@ class Driver(storage.DriverBase):
@property
def claim_controller(self):
return None
return controllers.Claim(self)

View File

@@ -30,6 +30,7 @@ class TestSqlite(testing.TestBase):
self.queue_ctrl = storage.queue_controller
self.queue_ctrl.upsert('fizbit', {'_message_ttl': 40}, '480924')
self.msg_ctrl = storage.message_controller
self.claim_ctrl = storage.claim_controller
def test_some_messages(self):
doc = [
@@ -83,14 +84,45 @@ class TestSqlite(testing.TestBase):
self.assertEquals(cnt, 4)
self.assertIn(
'body', self.msg_ctrl.get('fizbit', msgid, '480924'))
# can not delete a message with a wrong claim
meta, msgs = self.claim_ctrl.create('fizbit', {'ttl': 10}, '480924')
self.msg_ctrl.delete('fizbit', msgid, '480924')
with testtools.ExpectedException(exceptions.NotPermitted):
self.msg_ctrl.delete('fizbit', msgid, '480924', meta['id'])
self.msg_ctrl.get('fizbit', msgid, '480924')
# create a claim
meta, msgs = self.claim_ctrl.create('fizbit', {'ttl': 10}, '480924')
self.assertEquals(meta['ttl'], 10)
self.assertEquals(len(list(msgs)), 1)
# delete a message under a claim
self.msg_ctrl.delete('fizbit', msgid, '480924', meta['id'])
with testtools.ExpectedException(exceptions.DoesNotExist):
self.msg_ctrl.get('fizbit', msgid, '480924')
meta, msgs = self.claim_ctrl.get('fizbit', meta['id'], '480924')
self.assertEquals(len(list(msgs)), 0)
# it's just fine to delete a non-existing message
self.msg_ctrl.delete('fizbit', msgid, '480924')
# claim expires
self.claim_ctrl.update('fizbit', meta['id'], {'ttl': 0}, '480924')
with testtools.ExpectedException(exceptions.DoesNotExist):
self.claim_ctrl.get('fizbit', meta['id'], '480924')
# delete the claim
self.claim_ctrl.delete('fizbit', meta['id'], '480924')
with testtools.ExpectedException(exceptions.DoesNotExist):
self.claim_ctrl.update('fizbit', meta['id'], {'ttl': 40}, '480924')
def test_expired_messages(self):
doc = [
{'body': {}, 'ttl': 0},
@@ -123,7 +155,11 @@ class TestSqlite(testing.TestBase):
with testtools.ExpectedException(exceptions.DoesNotExist):
self.msg_ctrl.get('nonexistent', 'illformed', '480924')
self.msg_ctrl.delete('nonexistent', 'illformed', '480924')
self.claim_ctrl.delete('nonexistent', 'illformed', '480924')
with testtools.ExpectedException(exceptions.DoesNotExist):
self.claim_ctrl.update('nonexistent', 'illformed',
{'ttl': 40}, '480924')
def tearDown(self):
self.queue_ctrl.delete('fizbit', '480924')