diff --git a/api-ref/source/parameters.yaml b/api-ref/source/parameters.yaml index b2c788ea8..c2fd14719 100644 --- a/api-ref/source/parameters.yaml +++ b/api-ref/source/parameters.yaml @@ -239,6 +239,15 @@ _default_message_ttl: one of the ``reserved attributes`` of Zaqar queues. The value will be reverted to the default value after deleting it explicitly. +_enable_encrypt_messages: + type: boolean + in: body + required: False + description: | + The switch of encrypting messages for a queue, which will effect for + any messages posted to the queue. By default, the value is False. It is + one of the ``reserved attributes`` of Zaqar queues. + _flavor: type: string in: body diff --git a/api-ref/source/queues.inc b/api-ref/source/queues.inc index 49cedcac1..fc74de90c 100644 --- a/api-ref/source/queues.inc +++ b/api-ref/source/queues.inc @@ -85,8 +85,8 @@ exceed 64 bytes in length, and it is limited to US-ASCII letters, digits, underscores, and hyphens. When create queue, user can specify metadata for the queue. Currently, Zaqar -supports below metadata: _flavor, _max_claim_count, _dead_letter_queue and -_dead_letter_queue_messages_ttl. +supports below metadata: _flavor, _max_claim_count, _dead_letter_queue, +_dead_letter_queue_messages_ttl and _enable_encrypt_messages. In order to support the delayed queues, now add a metadata ``_default_message_delay``. @@ -119,6 +119,7 @@ Request Parameters - _flavor: _flavor - _max_claim_count: _max_claim_count - _max_messages_post_size: _max_messages_post_size + - _enable_encrypt_messages: _enable_encrypt_messages Request Example --------------- @@ -225,6 +226,7 @@ Response Parameters - _max_claim_count: _max_claim_count_response - _dead_letter_queue: _dead_letter_queue_response - _dead_letter_queue_messages_ttl: _dead_letter_queue_messages_ttl_response + - _enable_encrypt_messages: _enable_encrypt_messages Response Example ---------------- diff --git a/api-ref/source/samples/queue-create-request.json b/api-ref/source/samples/queue-create-request.json index 1f2cd71e9..2b0e0209e 100644 --- a/api-ref/source/samples/queue-create-request.json +++ b/api-ref/source/samples/queue-create-request.json @@ -5,5 +5,6 @@ "_dead_letter_queue": "dead_letter", "_dead_letter_queue_messages_ttl": 3600, "_max_claim_count": 10, + "_enable_encrypt_messages": true, "description": "Queue for international traffic billing." } \ No newline at end of file diff --git a/api-ref/source/samples/queue-show-response.json b/api-ref/source/samples/queue-show-response.json index 2303f2a5a..30045df93 100644 --- a/api-ref/source/samples/queue-show-response.json +++ b/api-ref/source/samples/queue-show-response.json @@ -4,5 +4,6 @@ "description": "Queue used for billing.", "_max_claim_count": 10, "_dead_letter_queue": "dead_letter", - "_dead_letter_queue_messages_ttl": 3600 + "_dead_letter_queue_messages_ttl": 3600, + "_enable_encrypt_messages": true } \ No newline at end of file diff --git a/lower-constraints.txt b/lower-constraints.txt index 33c3ca66c..69740375f 100644 --- a/lower-constraints.txt +++ b/lower-constraints.txt @@ -2,6 +2,7 @@ alembic==0.8.10 autobahn==0.17.1 Babel==2.3.4 coverage==4.0 +cryptography==2.1 ddt==1.0.1 doc8==0.6.0 dogpile.cache==0.6.2 diff --git a/releasenotes/notes/encrypted-messages-in-queue-d7438d4f185be444.yaml b/releasenotes/notes/encrypted-messages-in-queue-d7438d4f185be444.yaml new file mode 100644 index 000000000..28b9daa94 --- /dev/null +++ b/releasenotes/notes/encrypted-messages-in-queue-d7438d4f185be444.yaml @@ -0,0 +1,9 @@ +--- +features: + - | + To enhance the security of messaging service, the queue in Zaqar + supports to encrypt messages before storing them into storage backends, + also could support to decrypt messages when those are claimed by consumer. + To enable this feature, user just need to take "_enable_encrypt_messages=True" + when creating queue. AES-256 is used as the default of encryption algorithm and + encryption key is configurable in the zaqar.conf. diff --git a/requirements.txt b/requirements.txt index 008da0801..4a7a297ca 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,6 +5,7 @@ pbr!=2.1.0,>=2.0.0 # Apache-2.0 alembic>=0.8.10 # MIT Babel!=2.4.0,>=2.3.4 # BSD +cryptography>=2.1 # BSD/Apache-2.0 falcon>=1.1.0 # Apache-2.0 jsonschema>=2.6.0 # MIT iso8601>=0.1.11 # MIT diff --git a/test-requirements.txt b/test-requirements.txt index ab39b5d5d..fba056680 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -16,6 +16,7 @@ PyMySQL>=0.7.6 # MIT License # Unit testing coverage!=4.4,>=4.0 # Apache-2.0 +cryptography>=2.1 # BSD/Apache-2.0 ddt>=1.0.1 # MIT doc8>=0.6.0 # Apache-2.0 Pygments>=2.2.0 # BSD license diff --git a/zaqar/conf/transport.py b/zaqar/conf/transport.py index fe6de9b67..25f81fd13 100644 --- a/zaqar/conf/transport.py +++ b/zaqar/conf/transport.py @@ -149,6 +149,15 @@ message_delete_with_claim_id = cfg.BoolOpt( 'improve the security of the message avoiding delete messages before' ' they are claimed and handled.') +message_encryption_algorithms = cfg.StrOpt( + 'message_encryption_algorithms', default='AES256', choices=['AES256'], + help='Defines the encryption algorithms of messages, the value could be ' + '"AES256" for now.') + +message_encryption_key = cfg.StrOpt( + 'message_encryption_key', default='AES256', + help='Defines the encryption key of algorithms.') + GROUP_NAME = 'transport' ALL_OPTS = [ @@ -173,7 +182,9 @@ ALL_OPTS = [ client_id_uuid_safe, min_length_client_id, max_length_client_id, - message_delete_with_claim_id + message_delete_with_claim_id, + message_encryption_algorithms, + message_encryption_key ] diff --git a/zaqar/tests/unit/transport/wsgi/v2_0/test_messages.py b/zaqar/tests/unit/transport/wsgi/v2_0/test_messages.py index 8ae9c3d8e..298ee106f 100644 --- a/zaqar/tests/unit/transport/wsgi/v2_0/test_messages.py +++ b/zaqar/tests/unit/transport/wsgi/v2_0/test_messages.py @@ -61,13 +61,19 @@ class TestMessagesMongoDB(base.V2Base): # so that we don't have to concatenate against self.url_prefix # all over the place. self.queue_path = self.url_prefix + '/queues/fizbit' + self.encrypted_queue_path = self.url_prefix + '/queues/secretbit' self.messages_path = self.queue_path + '/messages' + self.encrypted_messages_path = self.encrypted_queue_path + '/messages' doc = '{"_ttl": 60}' self.simulate_put(self.queue_path, body=doc, headers=self.headers) + doc = '{"_ttl": 60, "_enable_encrypt_messages": true}' + self.simulate_put(self.encrypted_queue_path, body=doc, + headers=self.headers) def tearDown(self): self.simulate_delete(self.queue_path, headers=self.headers) + self.simulate_delete(self.encrypted_queue_path, headers=self.headers) if self.conf.pooling: for i in range(4): self.simulate_delete(self.url_prefix + '/pools/' + str(i), @@ -94,10 +100,15 @@ class TestMessagesMongoDB(base.V2Base): body=sample_doc, headers=self.headers) self.assertEqual(falcon.HTTP_400, self.srmock.status) - def _test_post(self, sample_messages): + def _test_post(self, sample_messages, is_encrypted=False): sample_doc = jsonutils.dumps({'messages': sample_messages}) + messages_path = None + if is_encrypted: + messages_path = self.encrypted_messages_path + else: + messages_path = self.messages_path - result = self.simulate_post(self.messages_path, + result = self.simulate_post(messages_path, body=sample_doc, headers=self.headers) self.assertEqual(falcon.HTTP_201, self.srmock.status) @@ -125,7 +136,7 @@ class TestMessagesMongoDB(base.V2Base): with mock.patch(timeutils_utcnow) as mock_utcnow: mock_utcnow.return_value = now for msg_id in msg_ids: - message_uri = self.messages_path + '/' + msg_id + message_uri = messages_path + '/' + msg_id headers = self.headers.copy() headers['X-Project-ID'] = '777777' @@ -150,7 +161,7 @@ class TestMessagesMongoDB(base.V2Base): # Test bulk GET query_string = 'ids=' + ','.join(msg_ids) - result = self.simulate_get(self.messages_path, + result = self.simulate_get(messages_path, query_string=query_string, headers=self.headers) @@ -204,6 +215,22 @@ class TestMessagesMongoDB(base.V2Base): self._test_post(sample_messages) + def test_post_single_encrypted(self): + sample_messages = [ + {'body': {'key': 'value'}, 'ttl': 200}, + ] + + self._test_post(sample_messages) + + def test_post_multiple_encrypted(self): + sample_messages = [ + {'body': 239, 'ttl': 100}, + {'body': {'key': 'value'}, 'ttl': 200}, + {'body': [1, 3], 'ttl': 300}, + ] + + self._test_post(sample_messages) + def test_post_optional_ttl(self): sample_messages = { 'messages': [ @@ -304,6 +331,24 @@ class TestMessagesMongoDB(base.V2Base): body=sample_doc, headers=self.headers) self.assertEqual(falcon.HTTP_400, self.srmock.status) + def test_post_using_queue_max_messages_post_size_with_encrypted(self): + queue_path = self.url_prefix + '/queues/test_queue2' + messages_path = queue_path + '/messages' + doc = ('{"_max_messages_post_size": 1023, ' + '"_enable_encrypt_messages": true}') + self.simulate_put(queue_path, body=doc, headers=self.headers) + self.addCleanup(self.simulate_delete, queue_path, headers=self.headers) + sample_messages = { + 'messages': [ + {'body': {'key': 'a' * 1204}}, + ], + } + + sample_doc = jsonutils.dumps(sample_messages) + self.simulate_post(messages_path, + body=sample_doc, headers=self.headers) + self.assertEqual(falcon.HTTP_400, self.srmock.status) + def test_get_from_missing_queue(self): body = self.simulate_get(self.url_prefix + '/queues/nonexistent/messages', @@ -384,6 +429,24 @@ class TestMessagesMongoDB(base.V2Base): self.simulate_delete(target, headers=self.headers) self.assertEqual(falcon.HTTP_204, self.srmock.status) + def test_delete_with_encrypted(self): + self._post_messages(self.encrypted_messages_path) + msg_id = self._get_msg_id(self.srmock.headers_dict) + target = self.encrypted_messages_path + '/' + msg_id + + self.simulate_get(target, headers=self.headers) + self.assertEqual(falcon.HTTP_200, self.srmock.status) + + self.simulate_delete(target, headers=self.headers) + self.assertEqual(falcon.HTTP_204, self.srmock.status) + + self.simulate_get(target, headers=self.headers) + self.assertEqual(falcon.HTTP_404, self.srmock.status) + + # Safe to delete non-existing ones + self.simulate_delete(target, headers=self.headers) + self.assertEqual(falcon.HTTP_204, self.srmock.status) + def test_bulk_delete(self): path = self.queue_path + '/messages' self._post_messages(path, repeat=5) @@ -410,6 +473,32 @@ class TestMessagesMongoDB(base.V2Base): self.simulate_delete(target, query_string=params, headers=self.headers) self.assertEqual(falcon.HTTP_204, self.srmock.status) + def test_bulk_delete_with_encrpted(self): + path = self.encrypted_queue_path + '/messages' + self._post_messages(path, repeat=5) + [target, params] = self.srmock.headers_dict['location'].split('?') + + # Deleting the whole collection is denied + self.simulate_delete(path, headers=self.headers) + self.assertEqual(falcon.HTTP_400, self.srmock.status) + + self.simulate_delete(target, query_string=params, headers=self.headers) + self.assertEqual(falcon.HTTP_204, self.srmock.status) + + self.simulate_get(target, query_string=params, headers=self.headers) + self.assertEqual(falcon.HTTP_404, self.srmock.status) + + # Safe to delete non-existing ones + self.simulate_delete(target, query_string=params, headers=self.headers) + self.assertEqual(falcon.HTTP_204, self.srmock.status) + + # Even after the queue is gone + self.simulate_delete(self.queue_path, headers=self.headers) + self.assertEqual(falcon.HTTP_204, self.srmock.status) + + self.simulate_delete(target, query_string=params, headers=self.headers) + self.assertEqual(falcon.HTTP_204, self.srmock.status) + def test_bulk_delete_with_claim_ids(self): self.conf.set_override('message_delete_with_claim_id', True, 'transport') @@ -490,6 +579,35 @@ class TestMessagesMongoDB(base.V2Base): self.assertEqual(falcon.HTTP_200, self.srmock.status) self._empty_message_list(body) + def test_list_with_encrpyted(self): + path = self.encrypted_queue_path + '/messages' + self._post_messages(path, repeat=10) + + query_string = 'limit=3&echo=true' + body = self.simulate_get(path, + query_string=query_string, + headers=self.headers) + + self.assertEqual(falcon.HTTP_200, self.srmock.status) + + cnt = 0 + while jsonutils.loads(body[0])['messages'] != []: + contents = jsonutils.loads(body[0]) + [target, params] = contents['links'][0]['href'].split('?') + + for msg in contents['messages']: + self.simulate_get(msg['href'], headers=self.headers) + self.assertEqual(falcon.HTTP_200, self.srmock.status) + + body = self.simulate_get(target, + query_string=params, + headers=self.headers) + cnt += 1 + + self.assertEqual(4, cnt) + self.assertEqual(falcon.HTTP_200, self.srmock.status) + self._empty_message_list(body) + def test_list_with_bad_marker(self): path = self.queue_path + '/messages' self._post_messages(path, repeat=5) diff --git a/zaqar/tests/unit/transport/wsgi/v2_0/test_queue_lifecycle.py b/zaqar/tests/unit/transport/wsgi/v2_0/test_queue_lifecycle.py index 633895645..4648e863c 100644 --- a/zaqar/tests/unit/transport/wsgi/v2_0/test_queue_lifecycle.py +++ b/zaqar/tests/unit/transport/wsgi/v2_0/test_queue_lifecycle.py @@ -113,6 +113,7 @@ class TestQueueLifecycleMongoDB(base.V2Base): ref_doc['_dead_letter_queue'] = None ref_doc['_dead_letter_queue_messages_ttl'] = None ref_doc['_max_claim_count'] = None + ref_doc['_enable_encrypt_messages'] = False self.assertEqual(ref_doc, result_doc) # Stats empty queue @@ -161,6 +162,7 @@ class TestQueueLifecycleMongoDB(base.V2Base): ref_doc['_dead_letter_queue'] = None ref_doc['_dead_letter_queue_messages_ttl'] = None ref_doc['_max_claim_count'] = None + ref_doc['_enable_encrypt_messages'] = False self.assertEqual(ref_doc, result_doc) # Stats empty queue @@ -301,6 +303,7 @@ class TestQueueLifecycleMongoDB(base.V2Base): ref_doc['_dead_letter_queue'] = None ref_doc['_dead_letter_queue_messages_ttl'] = None ref_doc['_max_claim_count'] = None + ref_doc['_enable_encrypt_messages'] = False self.assertEqual(ref_doc, result_doc) self.assertEqual(falcon.HTTP_200, self.srmock.status) @@ -358,7 +361,8 @@ class TestQueueLifecycleMongoDB(base.V2Base): '_default_message_delay': 0, '_dead_letter_queue': None, '_dead_letter_queue_messages_ttl': None, - '_max_claim_count': None}, result_doc) + '_max_claim_count': None, + '_enable_encrypt_messages': False}, result_doc) # remove metadata doc3 = '[{"op":"remove", "path": "/metadata/key1"}]' @@ -383,7 +387,8 @@ class TestQueueLifecycleMongoDB(base.V2Base): '_default_message_delay': 0, '_dead_letter_queue': None, '_dead_letter_queue_messages_ttl': None, - '_max_claim_count': None}, result_doc) + '_max_claim_count': None, + '_enable_encrypt_messages': False}, result_doc) # replace non-existent metadata doc4 = '[{"op":"replace", "path": "/metadata/key3", "value":2}]' @@ -501,7 +506,8 @@ class TestQueueLifecycleMongoDB(base.V2Base): '_default_message_delay': 0, '_dead_letter_queue': None, '_dead_letter_queue_messages_ttl': None, - '_max_claim_count': None}, result_doc) + '_max_claim_count': None, + '_enable_encrypt_messages': False}, result_doc) # queue filter result = self.simulate_get(self.queue_path, headers=header, diff --git a/zaqar/transport/encryptor.py b/zaqar/transport/encryptor.py new file mode 100644 index 000000000..0074d0afc --- /dev/null +++ b/zaqar/transport/encryptor.py @@ -0,0 +1,216 @@ +# Copyright (c) 2020 Fiberhome Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Encryption has a dependency on the pycrypto. If pycrypto is not available, +CryptoUnavailableError will be raised. + +""" + +import base64 +import functools +import hashlib +import os +import pickle + +try: + from cryptography.hazmat import backends as crypto_backends + from cryptography.hazmat.primitives import ciphers + from cryptography.hazmat.primitives.ciphers import algorithms + from cryptography.hazmat.primitives.ciphers import modes + from cryptography.hazmat.primitives import padding +except ImportError: + ciphers = None + +from zaqar.conf import transport +from zaqar.i18n import _ + + +class EncryptionFailed(ValueError): + """Encryption failed when encrypting messages.""" + + def __init__(self, msg, *args, **kwargs): + msg = msg.format(*args, **kwargs) + super(EncryptionFailed, self).__init__(msg) + + +class DecryptError(Exception): + """raise when unable to decrypt encrypted data.""" + pass + + +class CryptoUnavailableError(Exception): + """raise when Python Crypto module is not available.""" + pass + + +def assert_crypto_availability(f): + """Ensure cryptography module is available.""" + @functools.wraps(f) + def wrapper(*args, **kwds): + if ciphers is None: + raise CryptoUnavailableError() + return f(*args, **kwds) + return wrapper + + +class EncryptionFactory(object): + + def __init__(self, conf): + self._conf = conf + self._conf.register_opts(transport.ALL_OPTS, + group=transport.GROUP_NAME) + self._limits_conf = self._conf[transport.GROUP_NAME] + self._algorithm = self._limits_conf.message_encryption_algorithms + self._encryption_key = None + if self._limits_conf.message_encryption_key: + hash_function = hashlib.sha256() + key = bytes(self._limits_conf.message_encryption_key, 'utf-8') + hash_function.update(key) + self._encryption_key = hash_function.digest() + + def getEncryptor(self): + if self._algorithm == 'AES256' and self._encryption_key: + return AES256Encryptor(self._encryption_key) + + +class Encryptor(object): + + def __init__(self, encryption_key): + self._encryption_key = encryption_key + + def message_encrypted(self, messages): + """Encrypting a list of messages. + + :param messages: A list of messages + """ + pass + + def message_decrypted(self, messages): + """decrypting a list of messages. + + :param messages: A list of messages + """ + pass + + def get_cipher(self): + pass + + def get_encryption_key(self): + return self._encryption_key + + +class AES256Encryptor(Encryptor): + + def get_cipher(self): + iv = os.urandom(16) + cipher = ciphers.Cipher( + algorithms.AES(self.get_encryption_key()), + modes.CBC(iv), backend=crypto_backends.default_backend()) + # AES algorithm uses block size of 16 bytes = 128 bits, defined in + # algorithms.AES.block_size. Using ``cryptography``, we will + # analogously use hazmat.primitives.padding to pad it to + # the 128-bit block size. + padder = padding.PKCS7(algorithms.AES.block_size).padder() + return iv, cipher, padder + + def _encrypt_string_message(self, message): + """Encrypt the message type of string""" + message = message.encode('utf-8') + iv, cipher, padder = self.get_cipher() + encryptor = cipher.encryptor() + padded_data = padder.update(message) + padder.finalize() + data = iv + encryptor.update(padded_data) + encryptor.finalize() + return base64.b64encode(data) + + def _encrypt_other_types_message(self, message): + """Encrypt the message type of other types""" + iv, cipher, padder = self.get_cipher() + encryptor = cipher.encryptor() + padded_data = padder.update(message) + padder.finalize() + data = iv + encryptor.update(padded_data) + encryptor.finalize() + return base64.b64encode(data) + + def _encrypt_message(self, message): + """Encrypt the message data with the given secret key. + + Padding is n bytes of the value n, where 1 <= n <= blocksize. + """ + if isinstance(message['body'], str): + message['body'] = self._encrypt_string_message(message['body']) + else: + # For other types like dict or list, we need to serialize them + # first. + try: + s_message = pickle.dumps(message['body']) + except pickle.PickleError: + return + message['body'] = self._encrypt_other_types_message(s_message) + + def _decrypt_message(self, message): + try: + encrypted_message = base64.b64decode(message['body']) + except (ValueError, TypeError): + return + iv = encrypted_message[:16] + cipher = ciphers.Cipher( + algorithms.AES(self._encryption_key), + modes.CBC(iv), + backend=crypto_backends.default_backend()) + try: + decryptor = cipher.decryptor() + data = (decryptor.update(encrypted_message[16:]) + + decryptor.finalize()) + except Exception: + raise DecryptError(_('Encrypted data appears to be corrupted.')) + + # Strip the last n padding bytes where n is the last value in + # the plaintext + unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder() + data = unpadder.update(data) + unpadder.finalize() + try: + message['body'] = pickle.loads(data) + except pickle.UnpicklingError: + # If the data is a string which didn't be serialized, there will + # raise an exception. We just try to return the string itself. + message['body'] = str(data, encoding="utf-8") + + @assert_crypto_availability + def message_encrypted(self, messages): + """Encrypting a list of messages. + + :param messages: A list of messages + """ + if self.get_encryption_key(): + for msg in messages: + self._encrypt_message(msg) + else: + msg = _(u'Now Zaqar only support AES-256 and need to specify the' + u'key.') + raise EncryptionFailed(msg) + + @assert_crypto_availability + def message_decrypted(self, messages): + """decrypting a list of messages. + + :param messages: A list of messages + """ + if self.get_encryption_key(): + for msg in messages: + self._decrypt_message(msg) + else: + msg = _(u'Now Zaqar only support AES-256 and need to specify the' + u'key.') + raise EncryptionFailed(msg) diff --git a/zaqar/transport/validation.py b/zaqar/transport/validation.py index bcb271c2d..c9fca5b84 100644 --- a/zaqar/transport/validation.py +++ b/zaqar/transport/validation.py @@ -341,6 +341,11 @@ class Validator(object): msg, self._limits_conf.max_message_delay, MIN_DELAY_TTL) + encrypted_queue = queue_metadata.get('_enable_encrypt_messages', False) + if encrypted_queue and not isinstance(encrypted_queue, bool): + msg = _(u'_enable_encrypt_messages must be boolean.') + raise ValidationFailed(msg) + self._validate_retry_policy(queue_metadata) def queue_purging(self, document): diff --git a/zaqar/transport/wsgi/driver.py b/zaqar/transport/wsgi/driver.py index 3e1dfc46b..ad3c9a19f 100644 --- a/zaqar/transport/wsgi/driver.py +++ b/zaqar/transport/wsgi/driver.py @@ -28,6 +28,7 @@ from zaqar.conf import drivers_transport_wsgi from zaqar.i18n import _ from zaqar import transport from zaqar.transport import acl +from zaqar.transport import encryptor from zaqar.transport.middleware import auth from zaqar.transport.middleware import cors from zaqar.transport.middleware import profile @@ -59,6 +60,7 @@ class Driver(transport.DriverBase): group=drivers_transport_wsgi.GROUP_NAME) self._wsgi_conf = self._conf[drivers_transport_wsgi.GROUP_NAME] self._validate = validation.Validator(self._conf) + self._encryptor_factory = encryptor.EncryptionFactory(self._conf) self.app = None self._init_routes() diff --git a/zaqar/transport/wsgi/v2_0/__init__.py b/zaqar/transport/wsgi/v2_0/__init__.py index 266e27fea..184d29db5 100644 --- a/zaqar/transport/wsgi/v2_0/__init__.py +++ b/zaqar/transport/wsgi/v2_0/__init__.py @@ -82,9 +82,11 @@ def public_endpoints(driver, conf): driver._validate, message_controller, queue_controller, - defaults.message_ttl)), + defaults.message_ttl, + driver._encryptor_factory)), ('/queues/{queue_name}/messages/{message_id}', - messages.ItemResource(message_controller)), + messages.ItemResource(message_controller, queue_controller, + driver._encryptor_factory)), # Claims Endpoints ('/queues/{queue_name}/claims', @@ -140,9 +142,11 @@ def public_endpoints(driver, conf): driver._validate, message_controller, topic_controller, - defaults.message_ttl)), + defaults.message_ttl, + driver._encryptor_factory)), ('/topics/{topic_name}/messages/{message_id}', - messages.ItemResource(message_controller)), + messages.ItemResource(message_controller, queue_controller, + driver._encryptor_factory)), # Topic Subscription Endpoints ('/topics/{topic_name}/subscriptions', subscriptions.CollectionResource(driver._validate, diff --git a/zaqar/transport/wsgi/v2_0/messages.py b/zaqar/transport/wsgi/v2_0/messages.py index 57d5fc466..9a71e619d 100644 --- a/zaqar/transport/wsgi/v2_0/messages.py +++ b/zaqar/transport/wsgi/v2_0/messages.py @@ -37,18 +37,20 @@ class CollectionResource(object): '_queue_controller', '_wsgi_conf', '_validate', - '_default_message_ttl' + '_default_message_ttl', + '_encryptor' ) def __init__(self, wsgi_conf, validate, message_controller, queue_controller, - default_message_ttl): + default_message_ttl, encryptor_factory): self._wsgi_conf = wsgi_conf self._validate = validate self._message_controller = message_controller self._queue_controller = queue_controller self._default_message_ttl = default_message_ttl + self._encryptor = encryptor_factory.getEncryptor() # ---------------------------------------------------------------------- # Helpers @@ -63,10 +65,14 @@ class CollectionResource(object): message_ids=ids, project=project_id) + queue_meta = self._queue_controller.get_metadata(queue_name, + project_id) except validation.ValidationFailed as ex: LOG.debug(ex) raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) - + except storage_errors.QueueDoesNotExist: + LOG.exception('Queue name "%s" does not exist', queue_name) + queue_meta = None except Exception: description = _(u'Message could not be retrieved.') LOG.exception(description) @@ -77,6 +83,10 @@ class CollectionResource(object): if not messages: return None + # Decrypt messages + if queue_meta and queue_meta.get('_enable_encrypt_messages', False): + self._encryptor.message_decrypted(messages) + messages = [wsgi_utils.format_message_v1_1(m, base_path, m['claim_id']) for m in messages] @@ -123,6 +133,10 @@ class CollectionResource(object): cursor = next(results) messages = list(cursor) + # Decrypt messages + if queue_meta.get('_enable_encrypt_messages', False): + self._encryptor.message_decrypted(messages) + except validation.ValidationFailed as ex: LOG.debug(ex) raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) @@ -187,6 +201,7 @@ class CollectionResource(object): queue_max_msg_size = queue_meta.get('_max_messages_post_size') queue_default_ttl = queue_meta.get('_default_message_ttl') queue_delay = queue_meta.get('_default_message_delay') + queue_encrypted = queue_meta.get('_enable_encrypt_messages', False) if queue_default_ttl: message_post_spec = (('ttl', int, queue_default_ttl), @@ -217,6 +232,9 @@ class CollectionResource(object): try: self._validate.message_posting(messages) + if queue_encrypted: + self._encryptor.message_encrypted(messages) + message_ids = self._message_controller.post( queue_name, messages=messages, @@ -343,10 +361,17 @@ class CollectionResource(object): class ItemResource(object): - __slots__ = '_message_controller' + __slots__ = ( + '_message_controller', + '_queue_controller', + '_encryptor' + ) - def __init__(self, message_controller): + def __init__(self, message_controller, queue_controller, + encryptor_factory): self._message_controller = message_controller + self._queue_controller = queue_controller + self._encryptor = encryptor_factory.getEncryptor() @decorators.TransportLog("Messages item") @acl.enforce("messages:get") @@ -357,6 +382,12 @@ class ItemResource(object): message_id, project=project_id) + queue_meta = self._queue_controller.get_metadata(queue_name, + project_id) + # Decrypt messages + if queue_meta.get('_enable_encrypt_messages', False): + self._encryptor.message_decrypted([message]) + except storage_errors.DoesNotExist as ex: LOG.debug(ex) raise wsgi_errors.HTTPNotFound(six.text_type(ex)) diff --git a/zaqar/transport/wsgi/v2_0/queues.py b/zaqar/transport/wsgi/v2_0/queues.py index 014bbeb13..61c0b10be 100644 --- a/zaqar/transport/wsgi/v2_0/queues.py +++ b/zaqar/transport/wsgi/v2_0/queues.py @@ -42,6 +42,7 @@ def _get_reserved_metadata(validate): for metadata in ['_dead_letter_queue', '_dead_letter_queue_messages_ttl', '_max_claim_count']: reserved_metadata.update({metadata: None}) + reserved_metadata.update({'_enable_encrypt_messages': False}) return reserved_metadata