Refactoring iterators
Change-Id: Ib70dbcab38c98b3bd81ab9fff27890df25508f0c
This commit is contained in:
@@ -14,6 +14,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
from marconiclient.queues.v1 import core
|
||||
from marconiclient.queues.v1 import iterator as iterate
|
||||
from marconiclient.queues.v1 import message
|
||||
|
||||
|
||||
@@ -44,8 +45,12 @@ class Claim(object):
|
||||
self._ttl = claim_res['ttl']
|
||||
self._grace = claim_res.get('grace')
|
||||
msgs = claim_res.get('messages', [])
|
||||
self._message_iter = message._MessageIterator(self._queue,
|
||||
msgs)
|
||||
self._message_iter = iterate._Iterator(self._queue.client,
|
||||
msgs,
|
||||
'messages',
|
||||
message.create_object(
|
||||
self._queue
|
||||
))
|
||||
|
||||
def _create(self):
|
||||
req, trans = self._queue.client._request_and_transport()
|
||||
@@ -57,7 +62,12 @@ class Claim(object):
|
||||
# extract the id from the first message
|
||||
if msgs is not None:
|
||||
self.id = msgs[0]['href'].split('=')[-1]
|
||||
self._message_iter = message._MessageIterator(self._queue, msgs or [])
|
||||
self._message_iter = iterate._Iterator(self._queue.client,
|
||||
msgs or [],
|
||||
'messages',
|
||||
message.create_object(
|
||||
self._queue
|
||||
))
|
||||
|
||||
def __iter__(self):
|
||||
if self._message_iter is None:
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
import uuid
|
||||
|
||||
from marconiclient.queues.v1 import core
|
||||
from marconiclient.queues.v1 import iterator
|
||||
from marconiclient.queues.v1 import queues
|
||||
from marconiclient.queues.v1 import shard
|
||||
from marconiclient import transport
|
||||
@@ -101,7 +102,10 @@ class Client(object):
|
||||
|
||||
queue_list = core.queue_list(trans, req, **params)
|
||||
|
||||
return queues._QueueIterator(self, queue_list)
|
||||
return iterator._Iterator(self,
|
||||
queue_list,
|
||||
'queues',
|
||||
queues.create_object(self))
|
||||
|
||||
def follow(self, ref):
|
||||
"""Follows ref.
|
||||
|
||||
88
marconiclient/queues/v1/iterator.py
Normal file
88
marconiclient/queues/v1/iterator.py
Normal file
@@ -0,0 +1,88 @@
|
||||
# Copyright (c) 2014 Rackspace
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
class _Iterator(object):
|
||||
"""Base Iterator
|
||||
|
||||
This iterator is not meant to be used outside
|
||||
the scope of this package. The iterator gets
|
||||
a dictionary as returned by a listing endpoint.
|
||||
|
||||
Subclasses of this base class determine the key
|
||||
to iterate over, as well as the means of creating
|
||||
the objects contained within.
|
||||
|
||||
If there are no objects left to return, the iterator
|
||||
will try to load more by following the `next` rel link
|
||||
type.
|
||||
|
||||
The iterator raises a StopIteration exception if the server
|
||||
doesn't return more objects after a `next-page` call.
|
||||
|
||||
:param client: The client instance used by the queue
|
||||
:type client: `v1.Client`
|
||||
:param listing_response: Response returned by the listing call
|
||||
:type listing_response: Dict
|
||||
"""
|
||||
def __init__(self, client, listing_response, iter_key, create_function):
|
||||
self._client = client
|
||||
self._iter_key = iter_key
|
||||
self._create_function = create_function
|
||||
|
||||
self._links = []
|
||||
self._listing_response = listing_response
|
||||
|
||||
# NOTE(flaper87): Simple hack to
|
||||
# re-use the iterator for get_many_messages
|
||||
# and message listing.
|
||||
if isinstance(listing_response, dict):
|
||||
self._links = listing_response['links']
|
||||
self._listing_response = listing_response[self._iter_key]
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
|
||||
def get_iterables(self, iterables):
|
||||
self._links = iterables['links']
|
||||
self._listing_response = iterables[self._iter_key]
|
||||
|
||||
def _next_page(self):
|
||||
for link in self._links:
|
||||
if link['rel'] == 'next':
|
||||
# NOTE(flaper87): We already have the
|
||||
# ref for the next set of messages, lets
|
||||
# just follow it.
|
||||
iterables = self._client.follow(link['href'])
|
||||
|
||||
# NOTE(flaper87): Since we're using
|
||||
# `.follow`, the empty result will
|
||||
# be None. Consider making the API
|
||||
# return an empty dict for consistency.
|
||||
if iterables:
|
||||
self.get_iterables(iterables)
|
||||
return
|
||||
raise StopIteration
|
||||
|
||||
def __next__(self):
|
||||
try:
|
||||
args = self._listing_response.pop(0)
|
||||
except IndexError:
|
||||
self._next_page()
|
||||
return self.next()
|
||||
return self._create_function(args)
|
||||
|
||||
# NOTE(flaper87): Py2K support
|
||||
next = __next__
|
||||
@@ -17,76 +17,6 @@
|
||||
from marconiclient.queues.v1 import core
|
||||
|
||||
|
||||
# NOTE(flaper87): Consider moving the
|
||||
# iterator into a common package.
|
||||
class _MessageIterator(object):
|
||||
"""Message iterator
|
||||
|
||||
This iterator is not meant to be used outside
|
||||
the scope of this package. The iterator gets
|
||||
a dictionary as returned by the message listing
|
||||
endpoint and iterates over the messages in the
|
||||
`messages` key.
|
||||
|
||||
If there are no messages left to return, the iterator
|
||||
will try to load more by following the `next` rel link
|
||||
type.
|
||||
|
||||
The iterator raises a StopIteration exception if the server
|
||||
doesn't return more messages after a `next-page` call.
|
||||
|
||||
:param client: The client instance used by the queue
|
||||
:type client: `v1.Client`
|
||||
:param messages: Response returned by the messages listing call
|
||||
:type messages: Dict
|
||||
"""
|
||||
|
||||
def __init__(self, queue, messages):
|
||||
self._queue = queue
|
||||
|
||||
# NOTE(flaper87): Simple hack to
|
||||
# re-use the iterator for get_many_messages
|
||||
# and message listing.
|
||||
self._links = []
|
||||
self._messages = messages
|
||||
|
||||
if isinstance(messages, dict):
|
||||
self._links = messages['links']
|
||||
self._messages = messages['messages']
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
|
||||
def _next_page(self):
|
||||
for link in self._links:
|
||||
if link['rel'] == 'next':
|
||||
# NOTE(flaper87): We already have the
|
||||
# ref for the next set of messages, lets
|
||||
# just follow it.
|
||||
messages = self._queue.client.follow(link['href'])
|
||||
|
||||
# NOTE(flaper87): Since we're using
|
||||
# `.follow`, the empty result will
|
||||
# be None. Consider making the API
|
||||
# return an empty dict for consistency.
|
||||
if messages:
|
||||
self._links = messages['links']
|
||||
self._messages = messages['messages']
|
||||
return
|
||||
raise StopIteration
|
||||
|
||||
def __next__(self):
|
||||
try:
|
||||
msg = self._messages.pop(0)
|
||||
except IndexError:
|
||||
self._next_page()
|
||||
return self.next()
|
||||
return Message(self._queue, **msg)
|
||||
|
||||
# NOTE(flaper87): Py2K support
|
||||
next = __next__
|
||||
|
||||
|
||||
class Message(object):
|
||||
"""A handler for Marconi server Message resources.
|
||||
Attributes are only downloaded once - at creation time.
|
||||
@@ -121,3 +51,7 @@ class Message(object):
|
||||
def delete(self):
|
||||
req, trans = self.queue.client._request_and_transport()
|
||||
core.message_delete(trans, req, self.queue._name, self._id)
|
||||
|
||||
|
||||
def create_object(parent):
|
||||
return lambda args: Message(parent, **args)
|
||||
|
||||
@@ -15,71 +15,17 @@
|
||||
|
||||
from marconiclient.queues.v1 import claim as claim_api
|
||||
from marconiclient.queues.v1 import core
|
||||
from marconiclient.queues.v1 import iterator
|
||||
from marconiclient.queues.v1 import message
|
||||
|
||||
|
||||
class _QueueIterator(object):
|
||||
"""Queue iterator
|
||||
|
||||
This iterator is not meant to be used outside
|
||||
the scope of this package. The iterator gets
|
||||
a dictionary as returned by the queue listing
|
||||
endpoint and iterates over the queues in the
|
||||
`queues` key.
|
||||
|
||||
If there are no queues left to return, the iterator
|
||||
will try to load more by following the `next` rel link type.
|
||||
|
||||
The iterator raises a StopIteration exception if the server
|
||||
doesn't return more messages after a `next-page` call.
|
||||
|
||||
:param client: The client instance used by the queue
|
||||
:type client: `v1.Client`
|
||||
:param queues: Response returned by the queues listing call
|
||||
:type queues: Dict
|
||||
"""
|
||||
|
||||
def __init__(self, client, queues):
|
||||
self._client = client
|
||||
|
||||
self._links = queues['links']
|
||||
self._queues = queues['queues']
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
|
||||
def _next_page(self):
|
||||
for link in self._links:
|
||||
if link['rel'] == 'next':
|
||||
|
||||
queues = self._client.follow(link['href'])
|
||||
|
||||
if queues:
|
||||
self._links = queues['links']
|
||||
self._queues = queues['queues']
|
||||
return
|
||||
|
||||
raise StopIteration
|
||||
|
||||
def __next__(self):
|
||||
try:
|
||||
q = self._queues.pop(0)
|
||||
except IndexError:
|
||||
self._next_page()
|
||||
return self.next()
|
||||
|
||||
return Queue(self._client, q["name"], False)
|
||||
|
||||
next = __next__
|
||||
|
||||
|
||||
class Queue(object):
|
||||
|
||||
def __init__(self, client, queue_name, auto_create=True):
|
||||
def __init__(self, client, name, auto_create=True):
|
||||
self.client = client
|
||||
|
||||
# NOTE(flaper87) Queue Info
|
||||
self._name = queue_name
|
||||
self._name = name
|
||||
self._metadata = None
|
||||
|
||||
if auto_create:
|
||||
@@ -208,8 +154,15 @@ class Queue(object):
|
||||
self._name,
|
||||
**params)
|
||||
|
||||
return message._MessageIterator(self, msgs)
|
||||
return iterator._Iterator(self.client,
|
||||
msgs,
|
||||
'messages',
|
||||
message.create_object(self))
|
||||
|
||||
def claim(self, id=None, ttl=None, grace=None,
|
||||
limit=None):
|
||||
return claim_api.Claim(self, id=id, ttl=ttl, grace=grace, limit=limit)
|
||||
|
||||
|
||||
def create_object(parent):
|
||||
return lambda args: Queue(parent, args["name"], auto_create=False)
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
import json
|
||||
import mock
|
||||
|
||||
from marconiclient.queues.v1 import iterator
|
||||
from marconiclient.queues.v1 import message
|
||||
from marconiclient.tests.queues import base
|
||||
from marconiclient.transport import response
|
||||
@@ -143,7 +144,7 @@ class QueuesV1QueueUnitTest(base.QueuesTestBase):
|
||||
send_method.return_value = resp
|
||||
|
||||
msgs = self.queue.messages(limit=1)
|
||||
self.assertIsInstance(msgs, message._MessageIterator)
|
||||
self.assertIsInstance(msgs, iterator._Iterator)
|
||||
|
||||
# NOTE(flaper87): Nothing to assert here,
|
||||
# just checking our way down to the transport
|
||||
@@ -191,7 +192,7 @@ class QueuesV1QueueUnitTest(base.QueuesTestBase):
|
||||
|
||||
msg = self.queue.messages('50b68a50d6f5b8c8a7c62b01',
|
||||
'50b68a50d6f5b8c8a7c62b02')
|
||||
self.assertIsInstance(msg, message._MessageIterator)
|
||||
self.assertIsInstance(msg, iterator._Iterator)
|
||||
|
||||
# NOTE(flaper87): Nothing to assert here,
|
||||
# just checking our way down to the transport
|
||||
@@ -272,7 +273,7 @@ class QueuesV1QueueFunctionalTest(base.QueuesTestBase):
|
||||
queue.post(messages)
|
||||
|
||||
messages = queue.messages()
|
||||
self.assertTrue(isinstance(messages, message._MessageIterator))
|
||||
self.assertTrue(isinstance(messages, iterator._Iterator))
|
||||
self.assertGreaterEqual(len(list(messages)), 0)
|
||||
|
||||
def test_message_list_echo_functional(self):
|
||||
@@ -286,7 +287,7 @@ class QueuesV1QueueFunctionalTest(base.QueuesTestBase):
|
||||
]
|
||||
queue.post(messages)
|
||||
messages = queue.messages(echo=True)
|
||||
self.assertTrue(isinstance(messages, message._MessageIterator))
|
||||
self.assertTrue(isinstance(messages, iterator._Iterator))
|
||||
self.assertGreaterEqual(len(list(messages)), 3)
|
||||
|
||||
def test_message_get_functional(self):
|
||||
@@ -321,5 +322,5 @@ class QueuesV1QueueFunctionalTest(base.QueuesTestBase):
|
||||
res = queue.post(messages)['resources']
|
||||
msgs_id = [ref.split('/')[-1] for ref in res]
|
||||
messages = queue.messages(*msgs_id)
|
||||
self.assertTrue(isinstance(messages, message._MessageIterator))
|
||||
self.assertTrue(isinstance(messages, iterator._Iterator))
|
||||
self.assertEqual(len(list(messages)), 1)
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
import json
|
||||
import mock
|
||||
|
||||
from marconiclient.queues.v1 import iterator as iterate
|
||||
from marconiclient.queues.v1 import message
|
||||
from marconiclient.tests.queues import base
|
||||
from marconiclient.tests.queues import messages as test_message
|
||||
@@ -36,7 +37,10 @@ class TestMessageIterator(base.QueuesTestBase):
|
||||
}]
|
||||
}
|
||||
|
||||
iterator = message._MessageIterator(self.queue, messages)
|
||||
iterator = iterate._Iterator(self.queue.client,
|
||||
messages,
|
||||
'messages',
|
||||
message.create_object(self.queue))
|
||||
iterated = [msg for msg in iterator]
|
||||
self.assertEqual(len(iterated), 1)
|
||||
|
||||
@@ -64,7 +68,10 @@ class TestMessageIterator(base.QueuesTestBase):
|
||||
'href': "/v1/queues/mine/messages?marker=6244-244224-783"}
|
||||
messages['links'].append(link)
|
||||
|
||||
iterator = message._MessageIterator(self.queue, messages)
|
||||
iterator = iterate._Iterator(self.queue.client,
|
||||
messages,
|
||||
'messages',
|
||||
message.create_object(self.queue))
|
||||
iterated = [msg for msg in iterator]
|
||||
self.assertEqual(len(iterated), 2)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user