diff --git a/zaqar/storage/base.py b/zaqar/storage/base.py index 1971f6813..74f270bad 100644 --- a/zaqar/storage/base.py +++ b/zaqar/storage/base.py @@ -699,6 +699,37 @@ class Subscription(ControllerBase): """ raise NotImplementedError + @abc.abstractmethod + def get_with_subscriber(self, queue, subscriber, project=None): + """Base method for get a subscription with the subscriber. + + :param queue: Name of the queue subscription belongs to. + :type queue: six.text_type + :param subscriber: link of the subscription to be notified. + :type subscriber: six.text_type + :param project: Project id + :type project: six.text_type + :returns: Dictionary containing subscription data + :rtype: dict + """ + raise NotImplementedError + + @abc.abstractmethod + def confirm(self, queue, subscription_id, project=None, confirmed=True): + """Base method for confirming a subscription. + + :param queue: Name of the queue subscription belongs to. + :type queue: six.text_type + :param subscription_id: ID of the subscription to be deleted. + :type subscription_id: six.text_type + :param project: Project id + :type project: six.text_type + :param confirmed: Confirm a subscription or cancel the confirmation of + a subscription. + :type confirmed: boolean + """ + raise NotImplementedError + @six.add_metaclass(abc.ABCMeta) class PoolsBase(ControllerBase): diff --git a/zaqar/storage/mongodb/subscriptions.py b/zaqar/storage/mongodb/subscriptions.py index d9f872e00..474d9c93a 100644 --- a/zaqar/storage/mongodb/subscriptions.py +++ b/zaqar/storage/mongodb/subscriptions.py @@ -164,10 +164,11 @@ class SubscriptionController(base.Subscription): return _basic_subscription(res, now) @utils.raises_conn_error - def confirm(self, queue, subscription_id, project=None, confirm=True): + def confirm(self, queue, subscription_id, project=None, confirmed=True): res = self._collection.update({'_id': utils.to_oid(subscription_id), - 'p': project}, {'$set': {'c': confirm}}, + 'p': project}, + {'$set': {'c': confirmed}}, upsert=False) if not res['updatedExisting']: raise errors.SubscriptionDoesNotExist(subscription_id) diff --git a/zaqar/storage/pooling.py b/zaqar/storage/pooling.py index bf771c6ed..77a4fceef 100644 --- a/zaqar/storage/pooling.py +++ b/zaqar/storage/pooling.py @@ -409,11 +409,11 @@ class SubscriptionController(storage.Subscription): return control.exists(queue, subscription_id, project=project) - def confirm(self, queue, subscription_id, project=None, confirm=None): + def confirm(self, queue, subscription_id, project=None, confirmed=None): control = self._get_controller(queue, project) if control: return control.confirm(queue, subscription_id, - project=project, confirm=confirm) + project=project, confirmed=confirmed) def get_with_subscriber(self, queue, subscriber, project=None): control = self._get_controller(queue, project) diff --git a/zaqar/storage/redis/models.py b/zaqar/storage/redis/models.py index 07cecd2df..825b4ba33 100644 --- a/zaqar/storage/redis/models.py +++ b/zaqar/storage/redis/models.py @@ -22,7 +22,7 @@ from oslo_utils import encodeutils from oslo_utils import timeutils MSGENV_FIELD_KEYS = (b'id', b't', b'cr', b'e', b'u', b'c', b'c.e') -SUBENV_FIELD_KEYS = (b'id', b's', b'u', b't', b'e', b'o', b'p') +SUBENV_FIELD_KEYS = (b'id', b's', b'u', b't', b'e', b'o', b'p', b'c') # TODO(kgriffs): Make similar classes for claims and queues @@ -115,6 +115,7 @@ class SubscriptionEnvelope(object): 'expires', 'options', 'project', + 'confirmed', ] def __init__(self, **kwargs): @@ -124,6 +125,7 @@ class SubscriptionEnvelope(object): self.ttl = kwargs['ttl'] self.expires = kwargs.get('expires', float('inf')) self.options = kwargs['options'] + self.confirmed = kwargs.get('confirmed', 'True') @staticmethod def from_redis(sid, client): @@ -144,6 +146,7 @@ class SubscriptionEnvelope(object): def to_basic(self, now): created = self.expires - self.ttl + is_confirmed = self.confirmed == str(True) basic_msg = { 'id': self.id, 'source': self.source, @@ -151,6 +154,7 @@ class SubscriptionEnvelope(object): 'ttl': self.ttl, 'age': now - created, 'options': self.options, + 'confirmed': is_confirmed, } return basic_msg @@ -294,7 +298,8 @@ def _hmap_to_subenv_kwargs(hmap): 'subscriber': hmap[b'u'], 'ttl': int(hmap[b't']), 'expires': int(hmap[b'e']), - 'options': _unpack(hmap[b'o']) + 'options': _unpack(hmap[b'o']), + 'confirmed': hmap[b'c'] } diff --git a/zaqar/storage/redis/subscriptions.py b/zaqar/storage/redis/subscriptions.py index 700ed6e24..fcb2ff0c5 100644 --- a/zaqar/storage/redis/subscriptions.py +++ b/zaqar/storage/redis/subscriptions.py @@ -71,6 +71,9 @@ class SubscriptionController(base.Subscription): ttl = int(record[2]) expires = int(record[3]) created = expires - ttl + is_confirmed = True + if len(record) == 6: + is_confirmed = record[5] == str(True) ret = { 'id': sid, 'source': record[0], @@ -78,6 +81,7 @@ class SubscriptionController(base.Subscription): 'ttl': ttl, 'age': now - created, 'options': self._unpacker(record[4]), + 'confirmed': is_confirmed, } marker_next['next'] = sid @@ -108,6 +112,7 @@ class SubscriptionController(base.Subscription): source = queue now = timeutils.utcnow_ts() expires = now + ttl + confirmed = False subscription = {'id': subscription_id, 's': source, @@ -115,7 +120,8 @@ class SubscriptionController(base.Subscription): 't': ttl, 'e': expires, 'o': self._packer(options), - 'p': project} + 'p': project, + 'c': confirmed} try: # Pipeline ensures atomic inserts. @@ -150,8 +156,9 @@ class SubscriptionController(base.Subscription): try: sub_ids = (q for q in self._client.zrange(subset_key, 0, -1)) for s_id in sub_ids: - subscription = self._client.hmget(s_id, ['s', 'u', 't', 'o']) - if subscription == [None, None, None, None]: + subscription = self._client.hmget(s_id, + ['s', 'u', 't', 'o', 'c']) + if subscription == [None, None, None, None, None]: # NOTE(flwang): Under this check, that means the # subscription has been expired. So redis can't get # the subscription but the id is still there. So let's @@ -228,3 +235,31 @@ class SubscriptionController(base.Subscription): pipe.zrem(subset_key, subscription_id) pipe.delete(subscription_id) pipe.execute() + + @utils.raises_conn_error + @utils.retries_on_connection_error + def get_with_subscriber(self, queue, subscriber, project=None): + subset_key = utils.scope_subscription_ids_set(queue, + project, + SUBSCRIPTION_IDS_SUFFIX) + sub_ids = (q for q in self._client.zrange(subset_key, 0, -1)) + for s_id in sub_ids: + subscription = self._client.hmget(s_id, + ['s', 'u', 't', 'o', 'c']) + if subscription[1] == subscriber: + subscription = SubscriptionEnvelope.from_redis(s_id, + self._client) + now = timeutils.utcnow_ts() + return subscription.to_basic(now) + + @utils.raises_conn_error + @utils.retries_on_connection_error + def confirm(self, queue, subscription_id, project=None, confirmed=True): + # Let's get our subscription by ID. If it does not exist, + # SubscriptionDoesNotExist error will be raised internally. + self.get(queue, subscription_id, project=project) + + fields = {'c': confirmed} + with self._client.pipeline() as pipe: + pipe.hmset(subscription_id, fields) + pipe.execute() diff --git a/zaqar/storage/redis/utils.py b/zaqar/storage/redis/utils.py index c621d4637..b028db5eb 100644 --- a/zaqar/storage/redis/utils.py +++ b/zaqar/storage/redis/utils.py @@ -266,7 +266,7 @@ class SubscriptionListCursor(object): @raises_conn_error def next(self): curr = next(self.subscription_iter) - subscription = self.client.hmget(curr, ['s', 'u', 't', 'e', 'o']) + subscription = self.client.hmget(curr, ['s', 'u', 't', 'e', 'o', 'c']) # NOTE(flwang): The expired subscription will be removed # automatically, but the key can't be deleted automatically as well. # Though we clean up those expired ids when create new subscription, diff --git a/zaqar/tests/tempest_plugin/tests/v2/test_subscriptions.py b/zaqar/tests/tempest_plugin/tests/v2/test_subscriptions.py index ea4232617..08334951d 100644 --- a/zaqar/tests/tempest_plugin/tests/v2/test_subscriptions.py +++ b/zaqar/tests/tempest_plugin/tests/v2/test_subscriptions.py @@ -94,6 +94,28 @@ class TestSubscriptions(base.BaseV2MessagingTest): subscription_id = result[1]["subscription_id"] self.delete_subscription(self.queue_name, subscription_id) + @test.idempotent_id('fe0d8ec1-1a64-4490-8869-e821b2252e74') + def test_create_subscriptions_with_duplicate_subscriber(self): + # Adding subscriptions to the queue + results = self._create_subscriptions() + s_id1 = results[0][1]['subscription_id'] + + # Adding a subscription with duplicate subscriber, it will reconfirm + # the subscription and run well. + rbody = {'subscriber': 'http://fake:8080', + 'options': {'MessagingKeyMsg': 'MessagingValueMsg'}, + 'ttl': 293305} + resp, body = self.create_subscription(self.queue_name, rbody) + s_id2 = body['subscription_id'] + + self.assertEqual('201', resp['status']) + self.assertEqual(s_id2, s_id1) + + # Delete the subscriptions created + for result in results: + subscription_id = result[1]["subscription_id"] + self.delete_subscription(self.queue_name, subscription_id) + @decorators.idempotent_id('ff4344b4-ba78-44c5-9ffc-44e53e484f76') def test_trust_subscription(self): sub_queue = data_utils.rand_name('Queues-Test') diff --git a/zaqar/tests/tempest_plugin/tests/v2/test_subscriptions_negative.py b/zaqar/tests/tempest_plugin/tests/v2/test_subscriptions_negative.py index 1d6b09780..75e20320d 100644 --- a/zaqar/tests/tempest_plugin/tests/v2/test_subscriptions_negative.py +++ b/zaqar/tests/tempest_plugin/tests/v2/test_subscriptions_negative.py @@ -17,7 +17,6 @@ import uuid from tempest import config from tempest.lib.common.utils import data_utils -from tempest.lib import decorators from tempest.lib import exceptions as lib_exc from tempest import test @@ -44,28 +43,6 @@ class TestSubscriptionsNegative(base.BaseV2MessagingTest): results.append((resp, body)) return results - # Create Subscriptions - - # TODO(wangxiyuan): Now the subscription confirmation feature only support - # mongoDB backend. Skip this test until the feature support the redis - # backend. Then rewrite it. - @decorators.skip_because(bug='1609596') - @test.attr(type=['negative']) - @test.idempotent_id('fe0d8ec1-1a64-4490-8869-e821b2252e74') - def test_create_subscriptions_with_duplicate_subscriber(self): - # Adding a subscription to the queue - results = self._create_subscriptions() - # Adding a duplicate subscriber - rbody = {'subscriber': 'http://fake:8080', - 'options': {'MessagingKeyMsg': 'MessagingValueMsg'}, - 'ttl': 293305} - self.assertRaises(lib_exc.Conflict, - self.create_subscription, self.queue_name, rbody) - # Delete the subscriptions created - for result in results: - subscription_id = result[1]["subscription_id"] - self.delete_subscription(self.queue_name, subscription_id) - @test.attr(type=['negative']) @test.idempotent_id('0bda2907-a783-4614-af16-23d7a7d53b72') def test_create_subscriptions_with_invalid_body(self): diff --git a/zaqar/tests/unit/storage/base.py b/zaqar/tests/unit/storage/base.py index 45d8775f4..ef613dfd7 100644 --- a/zaqar/tests/unit/storage/base.py +++ b/zaqar/tests/unit/storage/base.py @@ -1236,6 +1236,35 @@ class SubscriptionControllerTest(ControllerBaseTest): project=self.project, **update_fields) + def test_confirm(self): + s_id = self.subscription_controller.create(self.source, + self.subscriber, + self.ttl, + self.options, + project=self.project) + self.addCleanup(self.subscription_controller.delete, self.source, + s_id, self.project) + subscription = self.subscription_controller.get(self.source, s_id, + project=self.project) + + self.assertEqual(False, subscription['confirmed']) + + self.subscription_controller.confirm(self.source, s_id, + project=self.project, + confirmed=True) + subscription = self.subscription_controller.get(self.source, s_id, + project=self.project) + + self.assertEqual(True, subscription['confirmed']) + + def test_confirm_with_nonexist_subscription(self): + s_id = 'fake-id' + self.assertRaises(errors.SubscriptionDoesNotExist, + self.subscription_controller.confirm, + self.source, s_id, project=self.project, + confirmed=True + ) + class PoolsControllerTest(ControllerBaseTest): """Pools Controller base tests. diff --git a/zaqar/tests/unit/storage/test_impl_mongodb.py b/zaqar/tests/unit/storage/test_impl_mongodb.py index 6e902cbec..cb6ef5ebf 100644 --- a/zaqar/tests/unit/storage/test_impl_mongodb.py +++ b/zaqar/tests/unit/storage/test_impl_mongodb.py @@ -493,34 +493,6 @@ class MongodbSubscriptionTests(MongodbSetupMixin, controller_class = controllers.SubscriptionController control_driver_class = mongodb.ControlDriver - def test_confirm(self): - s_id = self.subscription_controller.create(self.source, - self.subscriber, - self.ttl, - self.options, - project=self.project) - self.addCleanup(self.subscription_controller.delete, self.source, - s_id, self.project) - subscription = self.subscription_controller.get(self.source, s_id, - project=self.project) - - self.assertEqual(False, subscription['confirmed']) - - self.subscription_controller.confirm(self.source, s_id, - project=self.project, - confirm=True) - subscription = self.subscription_controller.get(self.source, s_id, - project=self.project) - - self.assertEqual(True, subscription['confirmed']) - - def test_confirm_with_nonexist_subscription(self): - s_id = 'fake-id' - self.assertRaises(errors.SubscriptionDoesNotExist, - self.subscription_controller.confirm, - self.source, s_id, project=self.project, confirm=True - ) - # # TODO(kgriffs): Do these need database purges as well as those above? diff --git a/zaqar/transport/validation.py b/zaqar/transport/validation.py index 75d1bb76e..5ef1f190d 100644 --- a/zaqar/transport/validation.py +++ b/zaqar/transport/validation.py @@ -540,9 +540,9 @@ class Validator(object): except OverflowError: raise ValidationFailed(msg, datetime.datetime.max) - def subscription_confirming(self, confirm): - confirm = confirm.get('confirmed', None) - if not isinstance(confirm, bool): + def subscription_confirming(self, confirmed): + confirmed = confirmed.get('confirmed', None) + if not isinstance(confirmed, bool): msg = _(u"The 'confirmed' should be boolean.") raise ValidationFailed(msg) diff --git a/zaqar/transport/wsgi/v2_0/subscriptions.py b/zaqar/transport/wsgi/v2_0/subscriptions.py index 7690a47cc..303908149 100644 --- a/zaqar/transport/wsgi/v2_0/subscriptions.py +++ b/zaqar/transport/wsgi/v2_0/subscriptions.py @@ -264,10 +264,10 @@ class ConfirmResource(object): try: self._validate.subscription_confirming(document) - confirm = document.get('confirmed', None) + confirmed = document.get('confirmed', None) self._subscription_controller.confirm(queue_name, subscription_id, project=project_id, - confirm=confirm) + confirmed=confirmed) resp.status = falcon.HTTP_204 resp.location = req.path except storage_errors.SubscriptionDoesNotExist as ex: