Merge "Support the new claim TTL semantics in SQlite."
This commit is contained in:
@@ -333,6 +333,8 @@ class Claim(base.ClaimBase):
|
|||||||
and qid = ?
|
and qid = ?
|
||||||
limit ?''', qid, limit)
|
limit ?''', qid, limit)
|
||||||
|
|
||||||
|
self.__update_claimed(id, metadata['ttl'])
|
||||||
|
|
||||||
return (_cid_encode(id), self.__get(id))
|
return (_cid_encode(id), self.__get(id))
|
||||||
|
|
||||||
def __get(self, cid):
|
def __get(self, cid):
|
||||||
@@ -353,22 +355,40 @@ class Claim(base.ClaimBase):
|
|||||||
|
|
||||||
def update(self, queue, claim_id, metadata, tenant):
|
def update(self, queue, claim_id, metadata, tenant):
|
||||||
try:
|
try:
|
||||||
# still delay the cleanup here
|
id = _cid_decode(claim_id)
|
||||||
self.driver.run('''
|
|
||||||
update Claims
|
|
||||||
set ttl = ?
|
|
||||||
where ttl > julianday() * 86400.0 - created
|
|
||||||
and id = ?
|
|
||||||
and qid = (select id from Queues
|
|
||||||
where tenant = ? and name = ?)
|
|
||||||
''', metadata['ttl'], _cid_decode(claim_id), tenant, queue)
|
|
||||||
|
|
||||||
if not self.driver.affected:
|
with self.driver('deferred'):
|
||||||
raise exceptions.ClaimDoesNotExist(claim_id, queue, tenant)
|
|
||||||
|
# still delay the cleanup here
|
||||||
|
self.driver.run('''
|
||||||
|
update Claims
|
||||||
|
set created = julianday() * 86400.0,
|
||||||
|
ttl = ?
|
||||||
|
where ttl > julianday() * 86400.0 - created
|
||||||
|
and id = ?
|
||||||
|
and qid = (select id from Queues
|
||||||
|
where tenant = ? and name = ?)
|
||||||
|
''', metadata['ttl'], id, tenant, queue)
|
||||||
|
|
||||||
|
if not self.driver.affected:
|
||||||
|
raise exceptions.ClaimDoesNotExist(claim_id, queue, tenant)
|
||||||
|
|
||||||
|
self.__update_claimed(id, metadata['ttl'])
|
||||||
|
|
||||||
except _BadID:
|
except _BadID:
|
||||||
raise exceptions.ClaimDoesNotExist(claim_id, queue, tenant)
|
raise exceptions.ClaimDoesNotExist(claim_id, queue, tenant)
|
||||||
|
|
||||||
|
def __update_claimed(self, cid, ttl):
|
||||||
|
# Precondition: cid is not expired
|
||||||
|
self.driver.run('''
|
||||||
|
update Messages
|
||||||
|
set created = julianday() * 86400.0,
|
||||||
|
ttl = ?
|
||||||
|
where ttl < ?
|
||||||
|
and id in (select msgid from Locked
|
||||||
|
where cid = ?)
|
||||||
|
''', ttl, ttl, cid)
|
||||||
|
|
||||||
def delete(self, queue, claim_id, tenant):
|
def delete(self, queue, claim_id, tenant):
|
||||||
try:
|
try:
|
||||||
self.driver.run('''
|
self.driver.run('''
|
||||||
|
|||||||
@@ -300,7 +300,9 @@ class ClaimControllerTest(ControllerBaseTest):
|
|||||||
|
|
||||||
messages2 = list(messages2)
|
messages2 = list(messages2)
|
||||||
self.assertEquals(len(messages2), 15)
|
self.assertEquals(len(messages2), 15)
|
||||||
self.assertEquals(messages, messages2)
|
#TODO(zyuan): Add some tests to ensure the ttl is extended/not-extended
|
||||||
|
for msg1, msg2 in zip(messages, messages2):
|
||||||
|
self.assertEquals(msg1['body'], msg2['body'])
|
||||||
self.assertEquals(claim["ttl"], 100)
|
self.assertEquals(claim["ttl"], 100)
|
||||||
self.assertEquals(claim["id"], claim_id)
|
self.assertEquals(claim["id"], claim_id)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user