diff --git a/marconiclient/queues/v1/iterator.py b/marconiclient/queues/v1/iterator.py index d9ef5f82..f0c6bba2 100644 --- a/marconiclient/queues/v1/iterator.py +++ b/marconiclient/queues/v1/iterator.py @@ -43,6 +43,7 @@ class _Iterator(object): self._create_function = create_function self._links = [] + self._stream = False self._listing_response = listing_response # NOTE(flaper87): Simple hack to @@ -59,6 +60,22 @@ class _Iterator(object): self._links = iterables['links'] self._listing_response = iterables[self._iter_key] + def stream(self, enabled=True): + """Make this `_Iterator` a stream iterator. + + Since `_Iterator`'s default is to *not* stream, + this method's default value is to *stream* data + from the server. That is, unless explicitly specified + this method will enable make this iterator a stream + iterator. + + :param enabled: Whether streaming should be + enabled or not. + :type enabled: bool + """ + self._stream = enabled + return self + def _next_page(self): for link in self._links: if link['rel'] == 'next': @@ -80,8 +97,12 @@ class _Iterator(object): try: args = self._listing_response.pop(0) except IndexError: + if not self._stream: + raise StopIteration + self._next_page() return self.next() + return self._create_function(args) # NOTE(flaper87): Py2K support diff --git a/tests/unit/queues/v1/test_message.py b/tests/unit/queues/v1/test_message.py index 3d9700a1..caee0e5f 100644 --- a/tests/unit/queues/v1/test_message.py +++ b/tests/unit/queues/v1/test_message.py @@ -44,7 +44,7 @@ class TestMessageIterator(base.QueuesTestBase): iterated = [msg for msg in iterator] self.assertEqual(len(iterated), 1) - def test_next_page(self): + def test_stream(self): messages = {'links': [], 'messages': [{ 'href': '/v1/queues/mine/messages/123123423', @@ -72,9 +72,37 @@ class TestMessageIterator(base.QueuesTestBase): messages, 'messages', message.create_object(self.queue)) - iterated = [msg for msg in iterator] + iterated = [msg for msg in iterator.stream()] self.assertEqual(len(iterated), 2) + def test_iterator_respect_paging(self): + messages = {'links': [], + 'messages': [{ + 'href': '/v1/queues/mine/messages/123123423', + 'ttl': 800, + 'age': 790, + 'body': {'event': 'ActivateAccount', + 'mode': 'active'} + }] + } + + with mock.patch.object(self.transport, 'send', + autospec=True) as send_method: + + resp = response.Response(None, json.dumps(messages)) + send_method.return_value = resp + + link = {'rel': 'next', + 'href': "/v1/queues/mine/messages?marker=6244-244224-783"} + messages['links'].append(link) + + iterator = iterate._Iterator(self.queue.client, + messages, + 'messages', + message.create_object(self.queue)) + iterated = [msg for msg in iterator] + self.assertEqual(len(iterated), 1) + class QueuesV1MessageHttpUnitTest(test_message.QueuesV1MessageUnitTest):