Support md5 of message body

DocImpact
ApiImpact

Tempest plugin
Depends-on: Icb82042afb1759f129f09e55c2961f1802ae83b4

Implement blueprint support-md5-of-body

Change-Id: I671737f423248ddc79bde74e492fc6d4c172bcd0
This commit is contained in:
yangzhenyu 2018-01-16 11:15:34 +08:00 committed by wangxiyuan
parent 43c97aede5
commit a8215f72f9
10 changed files with 148 additions and 33 deletions

View File

@ -49,7 +49,10 @@ class ResponseSchema(api.Api):
"body": { "body": {
"type": "object" "type": "object"
} },
"checksum": {
"type": "string",
},
}, },
"required": ["href", "ttl", "age", "body", "id"], "required": ["href", "ttl", "age", "body", "id"],
"additionalProperties": False, "additionalProperties": False,
@ -328,7 +331,10 @@ class ResponseSchema(api.Api):
"age": age, "age": age,
"body": { "body": {
"type": "object" "type": "object"
} },
"checksum": {
"type": "string",
},
}, },
"required": ["href", "ttl", "age", "body", "id"], "required": ["href", "ttl", "age", "body", "id"],
"additionalProperties": False, "additionalProperties": False,

View File

@ -49,7 +49,11 @@ class ResponseSchema(api.Api):
"body": { "body": {
"type": "object" "type": "object"
} },
"checksum": {
"type": "string",
},
}, },
"required": ["href", "ttl", "age", "body", "id"], "required": ["href", "ttl", "age", "body", "id"],
"additionalProperties": False, "additionalProperties": False,
@ -328,7 +332,10 @@ class ResponseSchema(api.Api):
"age": age, "age": age,
"body": { "body": {
"type": "object" "type": "object"
} },
"checksum": {
"type": "string",
},
}, },
"required": ["href", "ttl", "age", "body", "id"], "required": ["href", "ttl", "age", "body", "id"],
"additionalProperties": False, "additionalProperties": False,

View File

@ -35,6 +35,9 @@ _GENERAL_OPTIONS = (
item_type=cfg.types.List(item_type=cfg.types.String( item_type=cfg.types.List(item_type=cfg.types.String(
choices=('1', '1.1'))), choices=('1', '1.1'))),
help='List of deprecated API versions to enable.'), help='List of deprecated API versions to enable.'),
cfg.BoolOpt('enable_checksum', default=False,
help='Enable a checksum for message body. The default value '
'is False.'),
) )
_DRIVER_OPTIONS = ( _DRIVER_OPTIONS = (

View File

@ -35,6 +35,7 @@ from zaqar.i18n import _
from zaqar import storage from zaqar import storage
from zaqar.storage import errors from zaqar.storage import errors
from zaqar.storage.mongodb import utils from zaqar.storage.mongodb import utils
from zaqar.storage import utils as s_utils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -137,6 +138,7 @@ class MessageController(storage.Message):
client uuid -> u client uuid -> u
transaction -> tx transaction -> tx
delay -> d delay -> d
checksum -> cs
""" """
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
@ -646,8 +648,9 @@ class MessageController(storage.Message):
project, project,
amount=msgs_n) - msgs_n amount=msgs_n) - msgs_n
prepared_messages = [ prepared_messages = []
{ for index, message in enumerate(messages):
msg = {
PROJ_QUEUE: utils.scope_queue_name(queue_name, project), PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
't': message['ttl'], 't': message['ttl'],
'e': now_dt + datetime.timedelta(seconds=message['ttl']), 'e': now_dt + datetime.timedelta(seconds=message['ttl']),
@ -656,11 +659,12 @@ class MessageController(storage.Message):
'd': now + message.get('delay', 0), 'd': now + message.get('delay', 0),
'b': message['body'] if 'body' in message else {}, 'b': message['body'] if 'body' in message else {},
'k': next_marker + index, 'k': next_marker + index,
'tx': None, 'tx': None
} }
if self.driver.conf.enable_checksum:
msg['cs'] = s_utils.get_checksum(message.get('body', None))
for index, message in enumerate(messages) prepared_messages.append(msg)
]
res = collection.insert_many(prepared_messages, res = collection.insert_many(prepared_messages,
bypass_document_validation=True) bypass_document_validation=True)
@ -825,8 +829,9 @@ class FIFOMessageController(MessageController):
# Unique transaction ID to facilitate atomic batch inserts # Unique transaction ID to facilitate atomic batch inserts
transaction = objectid.ObjectId() transaction = objectid.ObjectId()
prepared_messages = [ prepared_messages = []
{ for index, message in enumerate(messages):
msg = {
PROJ_QUEUE: utils.scope_queue_name(queue_name, project), PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
't': message['ttl'], 't': message['ttl'],
'e': now_dt + datetime.timedelta(seconds=message['ttl']), 'e': now_dt + datetime.timedelta(seconds=message['ttl']),
@ -835,11 +840,12 @@ class FIFOMessageController(MessageController):
'd': now + message.get('delay', 0), 'd': now + message.get('delay', 0),
'b': message['body'] if 'body' in message else {}, 'b': message['body'] if 'body' in message else {},
'k': next_marker + index, 'k': next_marker + index,
'tx': transaction, 'tx': None
} }
if self.driver.conf.enable_checksum:
msg['cs'] = s_utils.get_checksum(message.get('body', None))
for index, message in enumerate(messages) prepared_messages.append(msg)
]
# NOTE(kgriffs): Don't take the time to do a 2-phase insert # NOTE(kgriffs): Don't take the time to do a 2-phase insert
# if there is no way for it to partially succeed. # if there is no way for it to partially succeed.
@ -1002,8 +1008,7 @@ def _is_claimed(msg, now):
def _basic_message(msg, now): def _basic_message(msg, now):
oid = msg['_id'] oid = msg['_id']
age = now - utils.oid_ts(oid) age = now - utils.oid_ts(oid)
res = {
return {
'id': str(oid), 'id': str(oid),
'age': int(age), 'age': int(age),
'ttl': msg['t'], 'ttl': msg['t'],
@ -1011,6 +1016,10 @@ def _basic_message(msg, now):
'body': msg['b'], 'body': msg['b'],
'claim_id': str(msg['c']['id']) if msg['c']['id'] else None 'claim_id': str(msg['c']['id']) if msg['c']['id'] else None
} }
if msg.get('cs'):
res['checksum'] = msg.get('cs')
return res
class MessageQueueHandler(object): class MessageQueueHandler(object):

View File

@ -25,6 +25,7 @@ from zaqar.storage import errors
from zaqar.storage.redis import models from zaqar.storage.redis import models
from zaqar.storage.redis import scripting from zaqar.storage.redis import scripting
from zaqar.storage.redis import utils from zaqar.storage.redis import utils
from zaqar.storage import utils as s_utils
Message = models.Message Message = models.Message
MessageEnvelope = models.MessageEnvelope MessageEnvelope = models.MessageEnvelope
@ -98,6 +99,8 @@ class MessageController(storage.Message, scripting.Mixin):
+---------------------+---------+ +---------------------+---------+
| delay expiry time | d | | delay expiry time | d |
+---------------------+---------+ +---------------------+---------+
| body checksum | cs |
+---------------------+---------+
4. Messages rank counter (Redis Hash): 4. Messages rank counter (Redis Hash):
@ -416,7 +419,6 @@ class MessageController(storage.Message, scripting.Mixin):
message_ids = [] message_ids = []
now = timeutils.utcnow_ts() now = timeutils.utcnow_ts()
with self._client.pipeline() as pipe: with self._client.pipeline() as pipe:
for msg in messages: for msg in messages:
prepared_msg = Message( prepared_msg = Message(
@ -428,6 +430,8 @@ class MessageController(storage.Message, scripting.Mixin):
claim_count=0, claim_count=0,
delay_expires=now + msg.get('delay', 0), delay_expires=now + msg.get('delay', 0),
body=msg.get('body', {}), body=msg.get('body', {}),
checksum=s_utils.get_checksum(msg.get('body', None)) if
self.driver.conf.enable_checksum else None
) )
prepared_msg.to_redis(pipe) prepared_msg.to_redis(pipe)

View File

@ -23,7 +23,7 @@ from oslo_utils import encodeutils
from oslo_utils import uuidutils from oslo_utils import uuidutils
MSGENV_FIELD_KEYS = (b'id', b't', b'cr', b'e', b'u', b'c', b'c.e', MSGENV_FIELD_KEYS = (b'id', b't', b'cr', b'e', b'u', b'c', b'c.e',
b'c.c', b'd') b'c.c', b'd', b'cs')
SUBENV_FIELD_KEYS = (b'id', b's', b'u', b't', b'e', b'o', b'p', b'c') SUBENV_FIELD_KEYS = (b'id', b's', b'u', b't', b'e', b'o', b'p', b'c')
@ -51,6 +51,7 @@ class MessageEnvelope(object):
'claim_expires', 'claim_expires',
'claim_count', 'claim_count',
'delay_expires', 'delay_expires',
'checksum',
] ]
def __init__(self, **kwargs): def __init__(self, **kwargs):
@ -67,6 +68,7 @@ class MessageEnvelope(object):
self.claim_expires = kwargs['claim_expires'] self.claim_expires = kwargs['claim_expires']
self.claim_count = kwargs.get('claim_count', 0) self.claim_count = kwargs.get('claim_count', 0)
self.delay_expires = kwargs.get('delay_expires', 0) self.delay_expires = kwargs.get('delay_expires', 0)
self.checksum = kwargs.get('checksum')
@staticmethod @staticmethod
def from_hmap(hmap): def from_hmap(hmap):
@ -238,7 +240,8 @@ class Message(MessageEnvelope):
created_iso = datetime.datetime.utcfromtimestamp( created_iso = datetime.datetime.utcfromtimestamp(
self.created).strftime('%Y-%m-%dT%H:%M:%SZ') self.created).strftime('%Y-%m-%dT%H:%M:%SZ')
basic_msg['created'] = created_iso basic_msg['created'] = created_iso
if self.checksum:
basic_msg['checksum'] = self.checksum
return basic_msg return basic_msg
@ -266,7 +269,7 @@ def _hmap_to_msgenv_kwargs(hmap):
# NOTE(kgriffs): Under Py3K, redis-py converts all strings # NOTE(kgriffs): Under Py3K, redis-py converts all strings
# into binary. Woohoo! # into binary. Woohoo!
return { res = {
'id': encodeutils.safe_decode(hmap[b'id']), 'id': encodeutils.safe_decode(hmap[b'id']),
'ttl': int(hmap[b't']), 'ttl': int(hmap[b't']),
'created': int(hmap[b'cr']), 'created': int(hmap[b'cr']),
@ -277,12 +280,18 @@ def _hmap_to_msgenv_kwargs(hmap):
'claim_id': claim_id, 'claim_id': claim_id,
'claim_expires': int(hmap[b'c.e']), 'claim_expires': int(hmap[b'c.e']),
'claim_count': int(hmap[b'c.c']), 'claim_count': int(hmap[b'c.c']),
'delay_expires': int(hmap.get(b'd', 0)), 'delay_expires': int(hmap.get(b'd', 0))
} }
checksum = hmap.get(b'cs')
if checksum:
res['checksum'] = encodeutils.safe_decode(hmap[b'cs'])
return res
def _msgenv_to_hmap(msg): def _msgenv_to_hmap(msg):
return { res = {
'id': msg.id, 'id': msg.id,
't': msg.ttl, 't': msg.ttl,
'cr': msg.created, 'cr': msg.created,
@ -291,8 +300,11 @@ def _msgenv_to_hmap(msg):
'c': msg.claim_id or '', 'c': msg.claim_id or '',
'c.e': msg.claim_expires, 'c.e': msg.claim_expires,
'c.c': msg.claim_count, 'c.c': msg.claim_count,
'd': msg.delay_expires, 'd': msg.delay_expires
} }
if msg.checksum:
res['cs'] = msg.checksum
return res
def _hmap_kv_to_subenv(keys, values): def _hmap_kv_to_subenv(keys, values):

View File

@ -24,6 +24,7 @@ from zaqar.common import decorators
from zaqar import storage from zaqar import storage
from zaqar.storage import errors from zaqar.storage import errors
from zaqar.storage.swift import utils from zaqar.storage.swift import utils
from zaqar.storage import utils as s_utils
class MessageController(storage.Message): class MessageController(storage.Message):
@ -53,6 +54,8 @@ class MessageController(storage.Message):
+--------------+-----------------------------------------+ +--------------+-----------------------------------------+
| Expires | Object Delete-After header | | Expires | Object Delete-After header |
+--------------------------------------------------------+ +--------------------------------------------------------+
| Checksum | Object content 'body' checksum |
+--------------------------------------------------------+
""" """
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
@ -223,10 +226,14 @@ class MessageController(storage.Message):
def _create_msg(self, queue, msg, client_uuid, project): def _create_msg(self, queue, msg, client_uuid, project):
slug = str(uuid.uuid1()) slug = str(uuid.uuid1())
now = timeutils.utcnow_ts() now = timeutils.utcnow_ts()
contents = jsonutils.dumps( message = {'body': msg.get('body', {}), 'claim_id': None,
{'body': msg.get('body', {}), 'claim_id': None,
'ttl': msg['ttl'], 'claim_count': 0, 'ttl': msg['ttl'], 'claim_count': 0,
'delay_expires': now + msg.get('delay', 0)}) 'delay_expires': now + msg.get('delay', 0)}
if self.driver.conf.enable_checksum:
message['checksum'] = s_utils.get_checksum(msg.get('body', None))
contents = jsonutils.dumps(message)
utils._put_or_create_container( utils._put_or_create_container(
self._client, self._client,
utils._message_container(queue, project), utils._message_container(queue, project),

View File

@ -13,6 +13,8 @@
# the License. # the License.
import copy import copy
import hashlib
import json
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log from oslo_log import log
@ -210,3 +212,30 @@ def can_connect(uri, conf=None):
except Exception as exc: except Exception as exc:
LOG.debug('Can\'t connect to: %s \n%s', (uri, exc)) LOG.debug('Can\'t connect to: %s \n%s', (uri, exc))
return False return False
def get_checksum(body, algorithm='MD5'):
"""According to the algorithm to get the message body checksum.
:param body: The message body.
:type body: six.text_type
:param algorithm: The algorithm type, default is MD5.
:type algorithm: six.text_type
:returns: The message body checksum.
:rtype: six.text_type
"""
checksum = '%s:' % algorithm
if body is None:
return ''
else:
checksum_body = json.dumps(body).encode('utf-8')
# TODO(yangzhenyu): We may support other algorithms in future
# versions, including SHA1, SHA256, SHA512, and so on.
if algorithm == 'MD5':
md5 = hashlib.md5()
md5.update(checksum_body)
checksum += md5.hexdigest()
return checksum

View File

@ -18,6 +18,8 @@ import os
import collections import collections
import datetime import datetime
import hashlib
import json
import math import math
import random import random
import time import time
@ -460,6 +462,39 @@ class MessageControllerTest(ControllerBaseTest):
with testing.expect(errors.DoesNotExist): with testing.expect(errors.DoesNotExist):
self.controller.get(queue_name, message_id, project=self.project) self.controller.get(queue_name, message_id, project=self.project)
def test_message_body_checksum(self):
self.conf.enable_checksum = True
queue_name = self.queue_name
message = {
'ttl': 60,
'body': {
'event': 'BackupStarted',
'backupId': 'c378813c-3f0b-11e2-ad92-7823d2b0f3ce'
}
}
# Test Message Creation
created = list(self.controller.post(queue_name, [message],
project=self.project,
client_uuid=uuid.uuid4()))
self.assertEqual(1, len(created))
message_id = created[0]
# Test Message Get
message_out = self.controller.get(queue_name, message_id,
project=self.project)
self.assertEqual({'id', 'body', 'ttl', 'age', 'claim_count',
'claim_id', 'checksum'}, set(message_out))
algorithm, checksum = message_out['checksum'].split(':')
expected_checksum = ''
if algorithm == 'MD5':
md5 = hashlib.md5()
md5.update(json.dumps(message['body']).encode('utf-8'))
expected_checksum = md5.hexdigest()
self.assertEqual(expected_checksum, checksum)
def test_get_multi(self): def test_get_multi(self):
client_uuid = uuid.uuid4() client_uuid = uuid.uuid4()

View File

@ -235,10 +235,13 @@ def format_message_v1(message, base_path, claim_id=None):
def format_message_v1_1(message, base_path, claim_id=None): def format_message_v1_1(message, base_path, claim_id=None):
url = message_url(message, base_path, claim_id) url = message_url(message, base_path, claim_id)
return { res = {
'id': message['id'], 'id': message['id'],
'href': url, 'href': url,
'ttl': message['ttl'], 'ttl': message['ttl'],
'age': message['age'], 'age': message['age'],
'body': message['body'], 'body': message['body']
} }
if message.get('checksum'):
res['checksum'] = message.get('checksum')
return res