diff --git a/zaqar/api/v1_1/response.py b/zaqar/api/v1_1/response.py index b57209bcf..90c45fb7d 100644 --- a/zaqar/api/v1_1/response.py +++ b/zaqar/api/v1_1/response.py @@ -49,7 +49,10 @@ class ResponseSchema(api.Api): "body": { "type": "object" - } + }, + "checksum": { + "type": "string", + }, }, "required": ["href", "ttl", "age", "body", "id"], "additionalProperties": False, @@ -328,7 +331,10 @@ class ResponseSchema(api.Api): "age": age, "body": { "type": "object" - } + }, + "checksum": { + "type": "string", + }, }, "required": ["href", "ttl", "age", "body", "id"], "additionalProperties": False, diff --git a/zaqar/api/v2/response.py b/zaqar/api/v2/response.py index 704d88c52..e3a1bfeb0 100644 --- a/zaqar/api/v2/response.py +++ b/zaqar/api/v2/response.py @@ -49,7 +49,11 @@ class ResponseSchema(api.Api): "body": { "type": "object" - } + }, + + "checksum": { + "type": "string", + }, }, "required": ["href", "ttl", "age", "body", "id"], "additionalProperties": False, @@ -328,7 +332,10 @@ class ResponseSchema(api.Api): "age": age, "body": { "type": "object" - } + }, + "checksum": { + "type": "string", + }, }, "required": ["href", "ttl", "age", "body", "id"], "additionalProperties": False, diff --git a/zaqar/storage/mongodb/messages.py b/zaqar/storage/mongodb/messages.py index 7db74e08e..da0845dce 100644 --- a/zaqar/storage/mongodb/messages.py +++ b/zaqar/storage/mongodb/messages.py @@ -35,6 +35,7 @@ from zaqar.i18n import _ from zaqar import storage from zaqar.storage import errors from zaqar.storage.mongodb import utils +from zaqar.storage import utils as s_utils LOG = logging.getLogger(__name__) @@ -137,6 +138,7 @@ class MessageController(storage.Message): client uuid -> u transaction -> tx delay -> d + checksum -> cs """ def __init__(self, *args, **kwargs): @@ -657,6 +659,7 @@ class MessageController(storage.Message): 'b': message['body'] if 'body' in message else {}, 'k': next_marker + index, 'tx': None, + 'cs': s_utils.get_checksum(message.get('body', None)) } for index, message in enumerate(messages) @@ -836,6 +839,7 @@ class FIFOMessageController(MessageController): 'b': message['body'] if 'body' in message else {}, 'k': next_marker + index, 'tx': transaction, + 'cs': s_utils.get_checksum(message.get('body', None)) } for index, message in enumerate(messages) @@ -1009,7 +1013,8 @@ def _basic_message(msg, now): 'ttl': msg['t'], 'claim_count': msg['c'].get('c', 0), 'body': msg['b'], - 'claim_id': str(msg['c']['id']) if msg['c']['id'] else None + 'claim_id': str(msg['c']['id']) if msg['c']['id'] else None, + 'checksum': msg.get('cs', '') } diff --git a/zaqar/storage/redis/messages.py b/zaqar/storage/redis/messages.py index 762c8da37..cbff92968 100644 --- a/zaqar/storage/redis/messages.py +++ b/zaqar/storage/redis/messages.py @@ -25,6 +25,7 @@ from zaqar.storage import errors from zaqar.storage.redis import models from zaqar.storage.redis import scripting from zaqar.storage.redis import utils +from zaqar.storage import utils as s_utils Message = models.Message MessageEnvelope = models.MessageEnvelope @@ -98,6 +99,8 @@ class MessageController(storage.Message, scripting.Mixin): +---------------------+---------+ | delay expiry time | d | +---------------------+---------+ + | body checksum | cs | + +---------------------+---------+ 4. Messages rank counter (Redis Hash): @@ -428,6 +431,7 @@ class MessageController(storage.Message, scripting.Mixin): claim_count=0, delay_expires=now + msg.get('delay', 0), body=msg.get('body', {}), + checksum=s_utils.get_checksum(msg.get('body', None)) ) prepared_msg.to_redis(pipe) diff --git a/zaqar/storage/redis/models.py b/zaqar/storage/redis/models.py index 6af3f5876..527fdb426 100644 --- a/zaqar/storage/redis/models.py +++ b/zaqar/storage/redis/models.py @@ -23,7 +23,7 @@ from oslo_utils import encodeutils from oslo_utils import uuidutils MSGENV_FIELD_KEYS = (b'id', b't', b'cr', b'e', b'u', b'c', b'c.e', - b'c.c', b'd') + b'c.c', b'd', b'cs') SUBENV_FIELD_KEYS = (b'id', b's', b'u', b't', b'e', b'o', b'p', b'c') @@ -51,6 +51,7 @@ class MessageEnvelope(object): 'claim_expires', 'claim_count', 'delay_expires', + 'checksum', ] def __init__(self, **kwargs): @@ -67,6 +68,7 @@ class MessageEnvelope(object): self.claim_expires = kwargs['claim_expires'] self.claim_count = kwargs.get('claim_count', 0) self.delay_expires = kwargs.get('delay_expires', 0) + self.checksum = kwargs.get('checksum', '') @staticmethod def from_hmap(hmap): @@ -278,6 +280,7 @@ def _hmap_to_msgenv_kwargs(hmap): 'claim_expires': int(hmap[b'c.e']), 'claim_count': int(hmap[b'c.c']), 'delay_expires': int(hmap.get(b'd', 0)), + 'checksum': encodeutils.safe_decode(hmap[b'cs']), } @@ -292,6 +295,7 @@ def _msgenv_to_hmap(msg): 'c.e': msg.claim_expires, 'c.c': msg.claim_count, 'd': msg.delay_expires, + 'cs': msg.checksum, } diff --git a/zaqar/storage/swift/messages.py b/zaqar/storage/swift/messages.py index f2b25a431..d8ace8233 100644 --- a/zaqar/storage/swift/messages.py +++ b/zaqar/storage/swift/messages.py @@ -24,6 +24,7 @@ from zaqar.common import decorators from zaqar import storage from zaqar.storage import errors from zaqar.storage.swift import utils +from zaqar.storage import utils as s_utils class MessageController(storage.Message): @@ -53,6 +54,8 @@ class MessageController(storage.Message): +--------------+-----------------------------------------+ | Expires | Object Delete-After header | +--------------------------------------------------------+ + | Checksum | Object content 'body' checksum | + +--------------------------------------------------------+ """ def __init__(self, *args, **kwargs): @@ -226,7 +229,8 @@ class MessageController(storage.Message): contents = jsonutils.dumps( {'body': msg.get('body', {}), 'claim_id': None, 'ttl': msg['ttl'], 'claim_count': 0, - 'delay_expires': now + msg.get('delay', 0)}) + 'delay_expires': now + msg.get('delay', 0), + 'checksum': s_utils.get_checksum(msg.get('body', None))}) utils._put_or_create_container( self._client, utils._message_container(queue, project), diff --git a/zaqar/storage/utils.py b/zaqar/storage/utils.py index 4a9a4a651..fbe0b5589 100644 --- a/zaqar/storage/utils.py +++ b/zaqar/storage/utils.py @@ -13,6 +13,8 @@ # the License. import copy +import hashlib +import json from oslo_config import cfg from oslo_log import log @@ -210,3 +212,30 @@ def can_connect(uri, conf=None): except Exception as exc: LOG.debug('Can\'t connect to: %s \n%s', (uri, exc)) return False + + +def get_checksum(body, algorithm='MD5'): + """According to the algorithm to get the message body checksum. + + :param body: The message body. + :type body: six.text_type + :param algorithm: The algorithm type, default is MD5. + :type algorithm: six.text_type + :returns: The message body checksum. + :rtype: six.text_type + """ + + checksum = '%s:' % algorithm + + if body is None: + return '' + else: + checksum_body = json.dumps(body).encode('utf-8') + # TODO(yangzhenyu): We may support other algorithms in future + # versions, including SHA1, SHA256, SHA512, and so on. + if algorithm == 'MD5': + md5 = hashlib.md5() + md5.update(checksum_body) + checksum += md5.hexdigest() + + return checksum diff --git a/zaqar/tests/unit/storage/base.py b/zaqar/tests/unit/storage/base.py index b6bafa27c..3037b6b07 100644 --- a/zaqar/tests/unit/storage/base.py +++ b/zaqar/tests/unit/storage/base.py @@ -18,6 +18,8 @@ import os import collections import datetime +import hashlib +import json import math import random import time @@ -448,7 +450,7 @@ class MessageControllerTest(ControllerBaseTest): message_out = self.controller.get(queue_name, message_id, project=self.project) self.assertEqual({'id', 'body', 'ttl', 'age', 'claim_count', - 'claim_id'}, set(message_out)) + 'claim_id', 'checksum'}, set(message_out)) self.assertEqual(message_id, message_out['id']) self.assertEqual(message['body'], message_out['body']) self.assertEqual(message['ttl'], message_out['ttl']) @@ -460,6 +462,39 @@ class MessageControllerTest(ControllerBaseTest): with testing.expect(errors.DoesNotExist): self.controller.get(queue_name, message_id, project=self.project) + def test_message_body_checksum(self): + queue_name = self.queue_name + + message = { + 'ttl': 60, + 'body': { + 'event': 'BackupStarted', + 'backupId': 'c378813c-3f0b-11e2-ad92-7823d2b0f3ce' + } + } + + # Test Message Creation + created = list(self.controller.post(queue_name, [message], + project=self.project, + client_uuid=uuid.uuid4())) + self.assertEqual(1, len(created)) + message_id = created[0] + + # Test Message Get + message_out = self.controller.get(queue_name, message_id, + project=self.project) + self.assertEqual({'id', 'body', 'ttl', 'age', 'claim_count', + 'claim_id', 'checksum'}, set(message_out)) + + algorithm, checksum = message_out['checksum'].split(':') + expected_checksum = '' + if algorithm == 'MD5': + md5 = hashlib.md5() + md5.update(json.dumps(message['body']).encode('utf-8')) + expected_checksum = md5.hexdigest() + + self.assertEqual(expected_checksum, checksum) + def test_get_multi(self): client_uuid = uuid.uuid4() @@ -504,7 +539,7 @@ class MessageControllerTest(ControllerBaseTest): for idx, message in enumerate(messages_out): self.assertEqual({'id', 'body', 'ttl', 'age', 'claim_count', - 'claim_id'}, set(message)) + 'claim_id', 'checksum'}, set(message)) self.assertEqual(idx, message['body']) self.controller.bulk_delete(self.queue_name, ids, diff --git a/zaqar/transport/wsgi/utils.py b/zaqar/transport/wsgi/utils.py index 481bd2487..e368397ce 100644 --- a/zaqar/transport/wsgi/utils.py +++ b/zaqar/transport/wsgi/utils.py @@ -241,4 +241,5 @@ def format_message_v1_1(message, base_path, claim_id=None): 'ttl': message['ttl'], 'age': message['age'], 'body': message['body'], + 'checksum': message.get('checksum', '') }