Fix message's get and deletion
The message get was not returning the expected result. Also, bulk_message deletion was not deleting messages due to a wrong serialization of messages ids. Implements blueprint: sql-storage-driver Change-Id: I9d04dc9f615b4b1d408a710cc447cfc2ed098286
This commit is contained in:
@@ -28,7 +28,7 @@ from marconi.queues.storage.sqlalchemy import utils
|
||||
|
||||
class MessageController(storage.Message):
|
||||
|
||||
def get(self, queue, message_id, project, count=False):
|
||||
def _get(self, queue, message_id, project, count=False):
|
||||
|
||||
if project is None:
|
||||
project = ''
|
||||
@@ -56,10 +56,29 @@ class MessageController(storage.Message):
|
||||
tables.Messages.c.ttl >
|
||||
sfunc.now() - tables.Messages.c.created))
|
||||
|
||||
return self.driver.get(sel)[0]
|
||||
return self.driver.get(sel)
|
||||
except utils.NoResult:
|
||||
raise errors.MessageDoesNotExist(message_id, queue, project)
|
||||
|
||||
def _exists(self, queue, message_id, project):
|
||||
try:
|
||||
# NOTE(flaper87): Use count to avoid returning
|
||||
# unnecessary data from the database.
|
||||
self._get(queue, message_id, project, count=True)
|
||||
return True
|
||||
except errors.MessageDoesNotExist:
|
||||
return False
|
||||
|
||||
def get(self, queue, message_id, project):
|
||||
body, ttl, created = self._get(queue, message_id, project)
|
||||
now = timeutils.utcnow_ts()
|
||||
return {
|
||||
'id': message_id,
|
||||
'ttl': ttl,
|
||||
'age': now - calendar.timegm(created.timetuple()),
|
||||
'body': json.loads(body),
|
||||
}
|
||||
|
||||
def bulk_get(self, queue, message_ids, project):
|
||||
if project is None:
|
||||
project = ''
|
||||
@@ -159,6 +178,13 @@ class MessageController(storage.Message):
|
||||
mark = utils.marker_decode(marker)
|
||||
if mark:
|
||||
and_clause.append(tables.Messages.c.id > mark)
|
||||
else:
|
||||
# NOTE(flaper87): Awful hack.
|
||||
# If the marker is invalid, we don't want to
|
||||
# return *any* record. Since rows PKs start
|
||||
# from 0, it won't match anything and the query
|
||||
# will still be fast.
|
||||
and_clause.append(tables.Messages.c.id < -1)
|
||||
|
||||
if not include_claimed:
|
||||
and_clause.append(tables.Messages.c.cid == (None))
|
||||
@@ -224,9 +250,7 @@ class MessageController(storage.Message):
|
||||
return
|
||||
|
||||
with self.driver.trans() as trans:
|
||||
try:
|
||||
self.get(queue, message_id, project, count=True)
|
||||
except errors.MessageDoesNotExist:
|
||||
if not self._exists(queue, message_id, project):
|
||||
return
|
||||
|
||||
statement = tables.Messages.delete()
|
||||
@@ -254,13 +278,14 @@ class MessageController(storage.Message):
|
||||
if project is None:
|
||||
project = ''
|
||||
|
||||
message_ids = ','.join(
|
||||
["'%s'" % id for id in
|
||||
map(utils.msgid_decode, message_ids) if id]
|
||||
)
|
||||
message_ids = [id for id in
|
||||
map(utils.msgid_decode, message_ids) if id]
|
||||
|
||||
with self.driver.trans() as trans:
|
||||
qid = utils.get_qid(self.driver, queue, project)
|
||||
try:
|
||||
qid = utils.get_qid(self.driver, queue, project)
|
||||
except errors.QueueDoesNotExist:
|
||||
return
|
||||
|
||||
statement = tables.Messages.delete()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user