
344 lines
9.9 KiB

# Copyright (c) 2014 Prashanth Raghu.
# Copyright (c) 2015 Catalyst IT Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import functools
import uuid
import msgpack
from oslo_utils import encodeutils
from oslo_utils import uuidutils
MSGENV_FIELD_KEYS = (b'id', b't', b'cr', b'e', b'u', b'c', b'c.e',
b'c.c', b'd', b'cs')
SUBENV_FIELD_KEYS = (b'id', b's', b'u', b't', b'e', b'o', b'p', b'c')
# TODO(kgriffs): Make similar classes for claims and queues
class MessageEnvelope(object):
"""Encapsulates the message envelope (metadata only, no body).
:param id: Message ID in the form of a hexadecimal UUID. If not
given, one will be automatically generated.
:param ttl: Message TTL in seconds
:param created: Message creation time as a UNIX timestamp
:param client_uuid: UUID of the client that posted the message
:param claim_id: If claimed, the UUID of the claim. Set to None
for messages that have never been claimed.
:param claim_expires: Claim expiration as a UNIX timestamp
__slots__ = [
def __init__(self, **kwargs): = _validate_uuid4(kwargs.get('id', uuidutils.generate_uuid()))
self.ttl = kwargs['ttl']
self.created = kwargs['created']
self.expires = kwargs.get('expires', self.created + self.ttl)
self.client_uuid = _validate_uuid4(str(kwargs['client_uuid']))
self.claim_id = kwargs.get('claim_id')
if self.claim_id:
self.claim_expires = kwargs['claim_expires']
self.claim_count = kwargs.get('claim_count', 0)
self.delay_expires = kwargs.get('delay_expires', 0)
self.checksum = kwargs.get('checksum')
def from_hmap(hmap):
kwargs = _hmap_to_msgenv_kwargs(hmap)
return MessageEnvelope(**kwargs)
def from_redis(mid, client):
values = client.hmget(mid, MSGENV_FIELD_KEYS)
# NOTE(kgriffs): If the key does not exist, redis-py returns
# an array of None values.
if values[0] is None:
return None
return _hmap_kv_to_msgenv(MSGENV_FIELD_KEYS, values)
def from_redis_bulk(message_ids, client):
with client.pipeline() as pipe:
for mid in message_ids:
pipe.hmget(mid, MSGENV_FIELD_KEYS)
results = pipe.execute()
message_envs = []
for value_list in results:
if value_list is None:
env = None
env = _hmap_kv_to_msgenv(MSGENV_FIELD_KEYS, value_list)
return message_envs
def to_redis(self, pipe):
hmap = _msgenv_to_hmap(self)
pipe.hmset(, hmap)
pipe.expire(, self.ttl)
class SubscriptionEnvelope(object):
"""Encapsulates the subscription envelope."""
__slots__ = [
def __init__(self, **kwargs): = kwargs.get('id', uuidutils.generate_uuid())
self.source = kwargs['source']
self.subscriber = kwargs['subscriber']
self.ttl = kwargs['ttl']
self.expires = kwargs.get('expires', float('inf'))
self.options = kwargs['options']
self.confirmed = kwargs.get('confirmed', 1)
def from_redis(sid, client):
values = client.hmget(sid, SUBENV_FIELD_KEYS)
# NOTE(kgriffs): If the key does not exist, redis-py returns
# an array of None values.
if values[0] is None:
return None
return _hmap_kv_to_subenv(SUBENV_FIELD_KEYS, values)
def to_redis(self, pipe):
hmap = _subenv_to_hmap(self)
pipe.hmset(, hmap)
pipe.expire(, self.ttl)
def to_basic(self, now):
created = self.expires - self.ttl
is_confirmed = bool(self.confirmed)
basic_msg = {
'source': encodeutils.safe_decode(self.source),
'subscriber': encodeutils.safe_decode(self.subscriber),
'ttl': self.ttl,
'age': now - created,
'options': self.options,
'confirmed': is_confirmed,
return basic_msg
# NOTE(kgriffs): This could have implemented MessageEnvelope functionality
# by adding an "include_body" param to all the methods, but then you end
# up with tons of if statements that make the code rather ugly.
class Message(MessageEnvelope):
"""Represents an entire message, including envelope and body.
:param id: Message ID in the form of a hexadecimal UUID. If not
given, one will be automatically generated.
:param ttl: Message TTL in seconds
:param created: Message creation time as a UNIX timestamp
:param client_uuid: UUID of the client that posted the message
:param claim_id: If claimed, the UUID of the claim. Set to None
for messages that have never been claimed.
:param claim_expires: Claim expiration as a UNIX timestamp
:param body: Message payload. Must be serializable to mspack.
__slots__ = MessageEnvelope.__slots__ + ['body']
def __init__(self, **kwargs):
super(Message, self).__init__(**kwargs)
self.body = kwargs['body']
def from_hmap(hmap):
kwargs = _hmap_to_msgenv_kwargs(hmap)
kwargs['body'] = _unpack(hmap[b'b'])
return Message(**kwargs)
def from_redis(mid, client):
hmap = client.hgetall(mid)
return Message.from_hmap(hmap) if hmap else None
def from_redis_bulk(message_ids, client):
with client.pipeline() as pipe:
for mid in message_ids:
results = pipe.execute()
messages = [Message.from_hmap(hmap) if hmap else None
for hmap in results]
return messages
def to_redis(self, pipe, include_body=True):
if not include_body:
super(Message, self).to_redis(pipe)
hmap = _msgenv_to_hmap(self)
hmap['b'] = _pack(self.body)
pipe.hmset(, hmap)
pipe.expire(, self.ttl)
def to_basic(self, now, include_created=False):
basic_msg = {
'age': now - self.created,
'ttl': self.ttl,
'body': self.body,
'claim_id': self.claim_id,
'claim_count': self.claim_count,
if include_created:
created_iso = datetime.datetime.utcfromtimestamp(
basic_msg['created'] = created_iso
if self.checksum:
basic_msg['checksum'] = self.checksum
return basic_msg
# ==========================================================================
# Helpers
# ==========================================================================
_pack = msgpack.Packer(use_bin_type=True).pack
_unpack = functools.partial(msgpack.unpackb)
def _hmap_kv_to_msgenv(keys, values):
hmap = dict(zip(keys, values))
kwargs = _hmap_to_msgenv_kwargs(hmap)
return MessageEnvelope(**kwargs)
def _hmap_to_msgenv_kwargs(hmap):
claim_id = hmap[b'c']
if claim_id:
claim_id = encodeutils.safe_decode(claim_id)
claim_id = None
# NOTE(kgriffs): Under Py3K, redis-py converts all strings
# into binary. Woohoo!
res = {
'id': encodeutils.safe_decode(hmap[b'id']),
'ttl': int(hmap[b't']),
'created': int(hmap[b'cr']),
'expires': int(hmap[b'e']),
'client_uuid': encodeutils.safe_decode(hmap[b'u']),
'claim_id': claim_id,
'claim_expires': int(hmap[b'c.e']),
'claim_count': int(hmap[b'c.c']),
'delay_expires': int(hmap.get(b'd', 0))
checksum = hmap.get(b'cs')
if checksum:
res['checksum'] = encodeutils.safe_decode(hmap[b'cs'])
return res
def _msgenv_to_hmap(msg):
res = {
't': msg.ttl,
'cr': msg.created,
'e': msg.expires,
'u': msg.client_uuid,
'c': msg.claim_id or '',
'c.e': msg.claim_expires,
'c.c': msg.claim_count,
'd': msg.delay_expires
if msg.checksum:
res['cs'] = msg.checksum
return res
def _hmap_kv_to_subenv(keys, values):
hmap = dict(zip(keys, values))
kwargs = _hmap_to_subenv_kwargs(hmap)
return SubscriptionEnvelope(**kwargs)
def _hmap_to_subenv_kwargs(hmap):
# NOTE(kgriffs): Under Py3K, redis-py converts all strings
# into binary. Woohoo!
return {
'id': encodeutils.safe_decode(hmap[b'id']),
'source': encodeutils.safe_decode(hmap[b's']),
'subscriber': encodeutils.safe_decode(hmap[b'u']),
'ttl': int(hmap[b't']),
'expires': int(hmap[b'e']),
'options': _unpack(hmap[b'o']),
'confirmed': int(hmap[b'c'])
def _subenv_to_hmap(msg):
return {
's': msg.source,
'u': msg.subscriber,
't': msg.ttl,
'e': msg.expires,
'o': msg.options
def _validate_uuid4(_uuid):
uuid.UUID(str(_uuid), version=4)
return _uuid