Merge "Support delayed queues for redis"
This commit is contained in:
commit
5e4cba1490
@ -96,6 +96,8 @@ class MessageController(storage.Message, scripting.Mixin):
|
|||||||
+---------------------+---------+
|
+---------------------+---------+
|
||||||
| created time | cr |
|
| created time | cr |
|
||||||
+---------------------+---------+
|
+---------------------+---------+
|
||||||
|
| delay expiry time | d |
|
||||||
|
+---------------------+---------+
|
||||||
|
|
||||||
4. Messages rank counter (Redis Hash):
|
4. Messages rank counter (Redis Hash):
|
||||||
|
|
||||||
@ -223,7 +225,7 @@ class MessageController(storage.Message, scripting.Mixin):
|
|||||||
def _list(self, queue, project=None, marker=None,
|
def _list(self, queue, project=None, marker=None,
|
||||||
limit=storage.DEFAULT_MESSAGES_PER_PAGE,
|
limit=storage.DEFAULT_MESSAGES_PER_PAGE,
|
||||||
echo=False, client_uuid=None,
|
echo=False, client_uuid=None,
|
||||||
include_claimed=False,
|
include_claimed=False, include_delayed=False,
|
||||||
to_basic=True):
|
to_basic=True):
|
||||||
|
|
||||||
if not self._queue_ctrl.exists(queue, project):
|
if not self._queue_ctrl.exists(queue, project):
|
||||||
@ -262,6 +264,10 @@ class MessageController(storage.Message, scripting.Mixin):
|
|||||||
filters.append(functools.partial(utils.msg_claimed_filter,
|
filters.append(functools.partial(utils.msg_claimed_filter,
|
||||||
now=now))
|
now=now))
|
||||||
|
|
||||||
|
if not include_delayed:
|
||||||
|
filters.append(functools.partial(utils.msg_delayed_filter,
|
||||||
|
now=now))
|
||||||
|
|
||||||
if not echo:
|
if not echo:
|
||||||
filters.append(functools.partial(utils.msg_echo_filter,
|
filters.append(functools.partial(utils.msg_echo_filter,
|
||||||
client_uuid=client_uuid))
|
client_uuid=client_uuid))
|
||||||
@ -344,10 +350,11 @@ class MessageController(storage.Message, scripting.Mixin):
|
|||||||
def list(self, queue, project=None, marker=None,
|
def list(self, queue, project=None, marker=None,
|
||||||
limit=storage.DEFAULT_MESSAGES_PER_PAGE,
|
limit=storage.DEFAULT_MESSAGES_PER_PAGE,
|
||||||
echo=False, client_uuid=None,
|
echo=False, client_uuid=None,
|
||||||
include_claimed=False):
|
include_claimed=False, include_delayed=False):
|
||||||
|
|
||||||
return self._list(queue, project, marker, limit, echo,
|
return self._list(queue, project, marker, limit, echo,
|
||||||
client_uuid, include_claimed)
|
client_uuid, include_claimed,
|
||||||
|
include_delayed)
|
||||||
|
|
||||||
@utils.raises_conn_error
|
@utils.raises_conn_error
|
||||||
@utils.retries_on_connection_error
|
@utils.retries_on_connection_error
|
||||||
@ -419,6 +426,7 @@ class MessageController(storage.Message, scripting.Mixin):
|
|||||||
claim_id=None,
|
claim_id=None,
|
||||||
claim_expires=now,
|
claim_expires=now,
|
||||||
claim_count=0,
|
claim_count=0,
|
||||||
|
delay_expires=now + msg.get('delay', 0),
|
||||||
body=msg.get('body', {}),
|
body=msg.get('body', {}),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -22,7 +22,8 @@ import msgpack
|
|||||||
from oslo_utils import encodeutils
|
from oslo_utils import encodeutils
|
||||||
from oslo_utils import uuidutils
|
from oslo_utils import uuidutils
|
||||||
|
|
||||||
MSGENV_FIELD_KEYS = (b'id', b't', b'cr', b'e', b'u', b'c', b'c.e', b'c.c')
|
MSGENV_FIELD_KEYS = (b'id', b't', b'cr', b'e', b'u', b'c', b'c.e',
|
||||||
|
b'c.c', b'd')
|
||||||
SUBENV_FIELD_KEYS = (b'id', b's', b'u', b't', b'e', b'o', b'p', b'c')
|
SUBENV_FIELD_KEYS = (b'id', b's', b'u', b't', b'e', b'o', b'p', b'c')
|
||||||
|
|
||||||
|
|
||||||
@ -49,6 +50,7 @@ class MessageEnvelope(object):
|
|||||||
'claim_id',
|
'claim_id',
|
||||||
'claim_expires',
|
'claim_expires',
|
||||||
'claim_count',
|
'claim_count',
|
||||||
|
'delay_expires',
|
||||||
]
|
]
|
||||||
|
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, **kwargs):
|
||||||
@ -64,6 +66,7 @@ class MessageEnvelope(object):
|
|||||||
_validate_uuid4(self.claim_id)
|
_validate_uuid4(self.claim_id)
|
||||||
self.claim_expires = kwargs['claim_expires']
|
self.claim_expires = kwargs['claim_expires']
|
||||||
self.claim_count = kwargs.get('claim_count', 0)
|
self.claim_count = kwargs.get('claim_count', 0)
|
||||||
|
self.delay_expires = kwargs.get('delay_expires', 0)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def from_hmap(hmap):
|
def from_hmap(hmap):
|
||||||
@ -274,6 +277,7 @@ def _hmap_to_msgenv_kwargs(hmap):
|
|||||||
'claim_id': claim_id,
|
'claim_id': claim_id,
|
||||||
'claim_expires': int(hmap[b'c.e']),
|
'claim_expires': int(hmap[b'c.e']),
|
||||||
'claim_count': int(hmap[b'c.c']),
|
'claim_count': int(hmap[b'c.c']),
|
||||||
|
'delay_expires': int(hmap.get(b'd', 0)),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -287,6 +291,7 @@ def _msgenv_to_hmap(msg):
|
|||||||
'c': msg.claim_id or '',
|
'c': msg.claim_id or '',
|
||||||
'c.e': msg.claim_expires,
|
'c.e': msg.claim_expires,
|
||||||
'c.c': msg.claim_count,
|
'c.c': msg.claim_count,
|
||||||
|
'd': msg.delay_expires,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -55,13 +55,16 @@ while (#claimed_msgs < limit) do
|
|||||||
-- unclaimed as well.
|
-- unclaimed as well.
|
||||||
|
|
||||||
if not found_unclaimed then
|
if not found_unclaimed then
|
||||||
local msg = redis.call('HMGET', mid, 'c', 'c.e')
|
local msg = redis.call('HMGET', mid, 'c', 'c.e', 'd')
|
||||||
if msg[1] == false and msg[2] == false then
|
if msg[1] == false and msg[2] == false then
|
||||||
-- NOTE(Eva-i): It means the message expired and does not
|
-- NOTE(Eva-i): It means the message expired and does not
|
||||||
-- actually exist anymore, we must later garbage collect it's
|
-- actually exist anymore, we must later garbage collect it's
|
||||||
-- ID from the set and move on.
|
-- ID from the set and move on.
|
||||||
msg_ids_to_cleanup[#msg_ids_to_cleanup + 1] = mid
|
msg_ids_to_cleanup[#msg_ids_to_cleanup + 1] = mid
|
||||||
elseif msg[1] == '' or tonumber(msg[2]) <= now then
|
elseif (msg[1] == '' or tonumber(msg[2]) <= now)
|
||||||
|
and tonumber(msg[3]) <= now then
|
||||||
|
-- NOTE(cdyangzhenyu): If the message's delay time has not
|
||||||
|
-- expired, the message can not be claimed.
|
||||||
found_unclaimed = true
|
found_unclaimed = true
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -219,6 +219,12 @@ def msg_claimed_filter(message, now):
|
|||||||
return message.claim_id and (now < message.claim_expires)
|
return message.claim_id and (now < message.claim_expires)
|
||||||
|
|
||||||
|
|
||||||
|
def msg_delayed_filter(message, now):
|
||||||
|
"""Return True IFF the message is currently delayed."""
|
||||||
|
|
||||||
|
return now < message.delay_expires
|
||||||
|
|
||||||
|
|
||||||
def msg_echo_filter(message, client_uuid):
|
def msg_echo_filter(message, client_uuid):
|
||||||
"""Return True IFF the specified client posted the message."""
|
"""Return True IFF the specified client posted the message."""
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user