Clean up expired messages for SQLAlchemy driver

Change-Id: I1d14258528bdea89399d491d45e5e1dfd1b06b50
Closes-Bug: #1331228
This commit is contained in:
pengfei wang
2014-07-30 13:22:00 +08:00
parent 8d889fc4e7
commit d8904dd495
2 changed files with 52 additions and 7 deletions

View File

@@ -13,8 +13,10 @@
# the License.
import datetime
import uuid
import sqlalchemy as sa
from sqlalchemy.sql import func as sfunc
from zaqar.queues.storage import pooling
from zaqar.queues.storage import sqlalchemy
@@ -64,6 +66,49 @@ class SqlalchemyMessageTests(base.MessageControllerTest):
driver_class = sqlalchemy.DataDriver
controller_class = controllers.MessageController
def test_expired_messages_be_deleted(self):
messages = [{'body': 3.14, 'ttl': 0}, {'body': 0.618, 'ttl': 600}]
client_uuid = uuid.uuid4()
[msgid_expired, msgid] = self.controller.post(self.queue_name,
messages,
project=self.project,
client_uuid=client_uuid)
mid = utils.msgid_decode(msgid_expired)
def _get(count=False):
j = sa.join(tables.Messages, tables.Queues,
tables.Messages.c.qid == tables.Queues.c.id)
sel = sa.sql.select([tables.Messages.c.body,
tables.Messages.c.ttl,
tables.Messages.c.created])
if count:
sel = sa.sql.select([sfunc.count(tables.Messages.c.id)])
sel = sel.select_from(j)
and_stmt = [tables.Messages.c.id == mid,
tables.Queues.c.name == self.queue_name,
tables.Queues.c.project == self.project]
sel = sel.where(sa.and_(*and_stmt))
return self.driver.get(sel)
[count] = _get(count=True)
self.assertEqual(count, 1)
# Expired messages will be removed from db until next Post
message = [{'body': 3.14, 'ttl': 300}]
self.controller.post(self.queue_name,
message,
project=self.project,
client_uuid=client_uuid)
with testing.expect(utils.NoResult):
_get()
class SqlalchemyClaimTests(base.ClaimControllerTest):
driver_class = sqlalchemy.DataDriver

View File

@@ -215,13 +215,13 @@ class MessageController(storage.Message):
with self.driver.trans() as trans:
qid = utils.get_qid(self.driver, queue, project)
# TODO(kgriffs): Need to port this to sqla! Bug #1331228
#
# cleanup all expired messages in this queue
# self.driver.run('''
# delete from Messages
# where ttl <= julianday() * 86400.0 - created
# and qid = ?''', qid)
# Delete the expired messages
and_stmt = sa.and_(tables.Messages.c.ttl <=
sfunc.now() - tables.Messages.c.created,
tables.Messages.c.qid == qid)
statement = tables.Messages.delete().where(and_stmt)
trans.execute(statement)
# executemany() sets lastrowid to None, so no matter we manually
# generate the IDs or not, we still need to query for it.