From 4fdbddac2972d563b01d5876407fb7e6ac273f6e Mon Sep 17 00:00:00 2001 From: Flavio Percoco Date: Fri, 20 Dec 2013 15:33:36 +0100 Subject: [PATCH] Add a Message resource and a MessageIterator This patch defines the Message resource and the MessageIterator. The former represents a message unit and the possible operations that can be executed from that resource. The later allows users to iterate over Messages' pages and consume all available messages. The patch also renames queue_id to queue._name which is more consistent with the API terminology. NOTE: Functional tests are missing and will be added in a separate patch. Change-Id: I8c871e326bd580964f15d4ffc16c6264f9825ba7 Partially-Implements: python-marconiclient-v1 --- examples/simple.py | 7 +- marconiclient/queues/v1/api.py | 2 + marconiclient/queues/v1/client.py | 11 +++ marconiclient/queues/v1/message.py | 124 ++++++++++++++++--------- marconiclient/queues/v1/queues.py | 40 ++++---- marconiclient/tests/queues/base.py | 5 + marconiclient/tests/queues/messages.py | 42 +++++++++ marconiclient/tests/queues/queues.py | 10 +- marconiclient/tests/transport/api.py | 2 + marconiclient/transport/api.py | 1 + marconiclient/transport/http.py | 14 ++- marconiclient/transport/request.py | 3 +- tests/unit/queues/v1/test_message.py | 102 ++++++++++---------- 13 files changed, 244 insertions(+), 119 deletions(-) create mode 100644 marconiclient/tests/queues/messages.py diff --git a/examples/simple.py b/examples/simple.py index e1216a4c..0b6a90b2 100644 --- a/examples/simple.py +++ b/examples/simple.py @@ -31,9 +31,14 @@ def create_post_delete(queue_name, messages): cli = client.Client(URL) queue = cli.queue(queue_name) queue.post(messages) + + for msg in queue.messages(echo=True): + print(msg.body) + msg.delete() + queue.delete() if __name__ == '__main__': - messages = [{'body': {'id': idx}, 'ttl': 60} + messages = [{'body': {'id': idx}, 'ttl': 360} for idx in range(20)] create_post_delete('my_queue', messages) diff --git a/marconiclient/queues/v1/api.py b/marconiclient/queues/v1/api.py index e0c8e3e5..62c4ea80 100644 --- a/marconiclient/queues/v1/api.py +++ b/marconiclient/queues/v1/api.py @@ -18,6 +18,8 @@ from marconiclient.transport import api class V1(api.Api): + label = 'v1' + schema = { 'queue_list': { 'ref': 'queues', diff --git a/marconiclient/queues/v1/client.py b/marconiclient/queues/v1/client.py index c0fcff51..b7cb6f8d 100644 --- a/marconiclient/queues/v1/client.py +++ b/marconiclient/queues/v1/client.py @@ -88,3 +88,14 @@ class Client(object): :rtype: `queues.Queue` """ return queues.Queue(self, ref, **kwargs) + + def follow(self, ref): + """Follows ref. + + :params ref: The reference path. + :type ref: `six.text_type` + """ + req, trans = self._request_and_transport() + req.ref = ref + + return trans.send(req).deserialized_content diff --git a/marconiclient/queues/v1/message.py b/marconiclient/queues/v1/message.py index f810b939..9d6ac3e6 100644 --- a/marconiclient/queues/v1/message.py +++ b/marconiclient/queues/v1/message.py @@ -1,4 +1,4 @@ -# Copyright (c) 2013 Rackspace, Inc. +# Copyright (c) 2013 Red Hat, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,63 +14,99 @@ # limitations under the License. """Implements a message controller that understands Marconi messages.""" - -def _args_from_dict(msg): - return { - 'href': msg['href'], - 'ttl': msg['ttl'], - 'age': msg['age'], - 'body': msg['body'] - } +from marconiclient.queues.v1 import core -def from_dict(msg, connection=None): - """from_dict(dict, Connection) => Message - :param msg: A dictionary created by decoding a Marconi message JSON reply - :param connection: A connection to a Marconi server. - :raises: KeyError If msg is missing fields - :raises: TypeError if msg is not a dict. +# NOTE(flaper87): Consider moving the +# iterator into a common package. +class _MessageIterator(object): + """Message iterator + + This iterator is not meant to be used outside + the scope of this package. The iterator gets + a dictionary as returned by the message listing + endpoint and iterates over the messages in the + `messages` key. + + If there are no messages left to return, the iterator + will try to load more by following the `next` rel link + type. + + The iterator raises a StopIteration exception if the server + doesn't return more messages after a `next-page` call. + + :param client: The client instance used by the queue + :type client: `v1.Client` + :param messages: Response returned by the messages listing call + :type messages: Dict """ - return Message( - connection=connection, - **_args_from_dict(msg) - ) + + def __init__(self, queue, messages): + self._queue = queue + + # NOTE(flaper87): Simple hack to + # re-use the iterator for get_many_messages + # and message listing. + self._links = [] + self._messages = messages + + if isinstance(messages, dict): + self._links = messages['links'] + self._messages = messages['messages'] + + def __iter__(self): + return self + + def _next_page(self): + for link in self._links: + if link['rel'] == 'next': + # NOTE(flaper87): We already have the + # ref for the next set of messages, lets + # just follow it. + messages = self._queue.client.follow(link['href']) + + # NOTE(flaper87): Since we're using + # `.follow`, the empty result will + # be None. Consider making the API + # return an empty dict for consistency. + if messages: + self._links = messages['links'] + self._messages = messages['messages'] + return + raise StopIteration + + def __next__(self): + try: + msg = self._messages.pop(0) + except IndexError: + self._next_page() + return self.next() + return Message(self._queue, **msg) + + # NOTE(flaper87): Py2K support + next = __next__ class Message(object): """A handler for Marconi server Message resources. Attributes are only downloaded once - at creation time. """ - def __init__(self, href, ttl, age, body, connection): + def __init__(self, queue, href, ttl, age, body): + self.queue = queue self.href = href self.ttl = ttl self.age = age self.body = body - self._connection = connection - self._deleted = False + + # NOTE(flaper87): Is this really + # necessary? Should this be returned + # by Marconi? + self._id = href.split('/')[-1] def __repr__(self): - return '' % (self.ttl,) - - def _assert_not_deleted(self): - assert not self._deleted, 'Already deleted' - - def reload(self): - """Queries the server and updates all local attributes - with new values. - """ - self._assert_not_deleted() - msg = self._connection.get(self.href).json() - - self.href = msg['href'] - self.ttl = msg['ttl'] - self.age = msg['age'] - self.body = msg['body'] + return ''.format(id=self._id, + ttl=self.ttl) def delete(self): - """Deletes this resource from the server, but leaves the local - object intact. - """ - self._assert_not_deleted() - self._connection.delete(self.href) - self._deleted = True + req, trans = self.queue.client._request_and_transport() + core.message_delete(trans, req, self.queue._name, self._id) diff --git a/marconiclient/queues/v1/queues.py b/marconiclient/queues/v1/queues.py index 376f6985..62fbef80 100644 --- a/marconiclient/queues/v1/queues.py +++ b/marconiclient/queues/v1/queues.py @@ -14,15 +14,16 @@ # limitations under the License. from marconiclient.queues.v1 import core +from marconiclient.queues.v1 import message class Queue(object): - def __init__(self, client, queue_id, auto_create=True): + def __init__(self, client, queue_name, auto_create=True): self.client = client # NOTE(flaper87) Queue Info - self._id = queue_id + self._name = queue_name self._metadata = None if auto_create: @@ -31,7 +32,7 @@ class Queue(object): def exists(self): """Checks if the queue exists.""" req, trans = self.client._request_and_transport() - return core.queue_exists(trans, req, self._id) + return core.queue_exists(trans, req, self._name) def ensure_exists(self): """Ensures a queue exists @@ -41,7 +42,7 @@ class Queue(object): right after it was called. """ req, trans = self.client._request_and_transport() - core.queue_create(trans, req, self._id) + core.queue_create(trans, req, self._name) def metadata(self, new_meta=None, force_reload=False): """Get metadata and return it @@ -61,19 +62,19 @@ class Queue(object): req, trans = self.client._request_and_transport() if new_meta: - core.queue_set_metadata(trans, req, self._id, new_meta) + core.queue_set_metadata(trans, req, self._name, new_meta) self._metadata = new_meta # TODO(flaper87): Cache with timeout if self._metadata and not force_reload: return self._metadata - self._metadata = core.queue_get_metadata(trans, req, self._id) + self._metadata = core.queue_get_metadata(trans, req, self._name) return self._metadata def delete(self): req, trans = self.client._request_and_transport() - core.queue_delete(trans, req, self._id) + core.queue_delete(trans, req, self._name) # Messages API @@ -93,7 +94,7 @@ class Queue(object): # TODO(flaper87): Return a list of messages return core.message_post(trans, req, - self._id, messages) + self._name, messages) def message(self, message_id): """Gets a message by id @@ -105,8 +106,9 @@ class Queue(object): :rtype: `dict` """ req, trans = self.client._request_and_transport() - return core.message_get(trans, req, self._id, - message_id) + msg = core.message_get(trans, req, self._name, + message_id) + return message.Message(self, **msg) def messages(self, *messages, **params): """Gets a list of messages from the server @@ -135,12 +137,14 @@ class Queue(object): # and messages deserialization. if messages: - return core.message_get_many(trans, req, - self._id, messages) + msgs = core.message_get_many(trans, req, + self._name, messages) + else: + # NOTE(flaper87): It's safe to access messages + # directly. If something wrong happens, the core + # API will raise the right exceptions. + msgs = core.message_list(trans, req, + self._name, + **params) - # NOTE(flaper87): It's safe to access messages - # directly. If something wrong happens, the core - # API will raise the right exceptions. - return core.message_list(trans, req, - self._id, - **params)['messages'] + return message._MessageIterator(self, msgs) diff --git a/marconiclient/tests/queues/base.py b/marconiclient/tests/queues/base.py index d529264d..f6abdbee 100644 --- a/marconiclient/tests/queues/base.py +++ b/marconiclient/tests/queues/base.py @@ -17,10 +17,15 @@ import mock from marconiclient.queues import client from marconiclient.tests import base +from marconiclient.tests.transport import dummy class QueuesTestBase(base.TestBase): + transport_cls = dummy.DummyTransport + url = 'http://127.0.0.1:8888/v1' + version = 1 + def setUp(self): super(QueuesTestBase, self).setUp() self.transport = self.transport_cls(self.conf) diff --git a/marconiclient/tests/queues/messages.py b/marconiclient/tests/queues/messages.py new file mode 100644 index 00000000..d2d4664c --- /dev/null +++ b/marconiclient/tests/queues/messages.py @@ -0,0 +1,42 @@ +# Copyright (c) 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import mock + +from marconiclient.tests.queues import base +from marconiclient.transport import response + + +class QueuesV1MessageUnitTest(base.QueuesTestBase): + + def test_message_delete(self): + returned = { + 'href': '/v1/queues/fizbit/messages/50b68a50d6f5b8c8a7c62b01', + 'ttl': 800, + 'age': 790, + 'body': {'event': 'ActivateAccount', 'mode': 'active'} + } + + with mock.patch.object(self.transport, 'send', + autospec=True) as send_method: + + resp = response.Response(None, json.dumps(returned)) + send_method.return_value = resp + + msg = self.queue.message('50b68a50d6f5b8c8a7c62b01') + + send_method.return_value = None + self.assertIsNone(msg.delete()) diff --git a/marconiclient/tests/queues/queues.py b/marconiclient/tests/queues/queues.py index f3c19baa..d820f991 100644 --- a/marconiclient/tests/queues/queues.py +++ b/marconiclient/tests/queues/queues.py @@ -16,6 +16,7 @@ import json import mock +from marconiclient.queues.v1 import message from marconiclient.tests.queues import base from marconiclient.transport import response @@ -113,7 +114,8 @@ class QueuesV1QueueUnitTest(base.QueuesTestBase): resp = response.Response(None, json.dumps(returned)) send_method.return_value = resp - self.queue.messages(limit=1) + msgs = self.queue.messages(limit=1) + self.assertIsInstance(msgs, message._MessageIterator) # NOTE(flaper87): Nothing to assert here, # just checking our way down to the transport @@ -133,8 +135,8 @@ class QueuesV1QueueUnitTest(base.QueuesTestBase): resp = response.Response(None, json.dumps(returned)) send_method.return_value = resp - msg = self.queue.message('50b68a50d6f5b8c8a7c62b01') - self.assertTrue(isinstance(msg, dict)) + msgs = self.queue.message('50b68a50d6f5b8c8a7c62b01') + self.assertIsInstance(msgs, message.Message) # NOTE(flaper87): Nothing to assert here, # just checking our way down to the transport @@ -161,7 +163,7 @@ class QueuesV1QueueUnitTest(base.QueuesTestBase): msg = self.queue.messages('50b68a50d6f5b8c8a7c62b01', '50b68a50d6f5b8c8a7c62b02') - self.assertTrue(isinstance(msg, list)) + self.assertIsInstance(msg, message._MessageIterator) # NOTE(flaper87): Nothing to assert here, # just checking our way down to the transport diff --git a/marconiclient/tests/transport/api.py b/marconiclient/tests/transport/api.py index 926ed4e0..66ee4234 100644 --- a/marconiclient/tests/transport/api.py +++ b/marconiclient/tests/transport/api.py @@ -17,6 +17,8 @@ from marconiclient.transport import api class FakeApi(api.Api): + label = 'v1' + schema = { 'test_operation': { 'ref': 'test/{name}', diff --git a/marconiclient/transport/api.py b/marconiclient/transport/api.py index 7cefc13b..4c6f9a19 100644 --- a/marconiclient/transport/api.py +++ b/marconiclient/transport/api.py @@ -22,6 +22,7 @@ from marconiclient import errors class Api(object): schema = {} + label = None validators = {} def get_schema(self, operation): diff --git a/marconiclient/transport/http.py b/marconiclient/transport/http.py index 835e858d..20525dc3 100644 --- a/marconiclient/transport/http.py +++ b/marconiclient/transport/http.py @@ -45,9 +45,19 @@ class HttpTransport(base.Transport): # happen before any other operation here. # request.validate() - schema = request.api.get_schema(request.operation) - ref = schema.get('ref', '') + schema = {} ref_params = {} + ref = request.ref + + if request.operation: + schema = request.api.get_schema(request.operation) + ref = ref or schema.get('ref', '') + + # FIXME(flaper87): We expect the endpoint + # to have the API version label already, + # however in a follow-your-nose implementation + # it should be the other way around. + ref = ref.lstrip('/' + request.api.label) for param in list(request.params.keys()): if '{{{0}}}'.format(param) in ref: diff --git a/marconiclient/transport/request.py b/marconiclient/transport/request.py index 4edb70e0..62a4d4d6 100644 --- a/marconiclient/transport/request.py +++ b/marconiclient/transport/request.py @@ -77,7 +77,7 @@ class Request(object): """ def __init__(self, endpoint='', operation='', - content=None, params=None, + ref='', content=None, params=None, headers=None, api=None): self._api = None @@ -85,6 +85,7 @@ class Request(object): self.endpoint = endpoint self.operation = operation + self.ref = ref self.content = content self.params = params or {} self.headers = headers or {} diff --git a/tests/unit/queues/v1/test_message.py b/tests/unit/queues/v1/test_message.py index ee67f38e..3fc2b365 100644 --- a/tests/unit/queues/v1/test_message.py +++ b/tests/unit/queues/v1/test_message.py @@ -12,61 +12,65 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. -import unittest +import json import mock from marconiclient.queues.v1 import message -from marconiclient.tests.mock import message as mock_message +from marconiclient.tests.queues import base +from marconiclient.tests.queues import messages as test_message +from marconiclient.transport import http +from marconiclient.transport import response -HREF = '/v1/queue/dgq/messages/my_msg_is_chocolate' -AGE = 100 -TTL = 120 +class TestMessageIterator(base.QueuesTestBase): + + def test_no_next_iteration(self): + messages = {'links': [], + 'messages': [{ + 'href': '/v1/queues/mine/messages/123123423', + 'ttl': 800, + 'age': 790, + 'body': {'event': 'ActivateAccount', + 'mode': 'active'} + }] + } + + iterator = message._MessageIterator(self.queue, messages) + iterated = [msg for msg in iterator] + self.assertEqual(len(iterated), 1) + + def test_next_page(self): + messages = {'links': [], + 'messages': [{ + 'href': '/v1/queues/mine/messages/123123423', + 'ttl': 800, + 'age': 790, + 'body': {'event': 'ActivateAccount', + 'mode': 'active'} + }] + } + + with mock.patch.object(self.transport, 'send', + autospec=True) as send_method: + + resp = response.Response(None, json.dumps(messages)) + send_method.return_value = resp + + # NOTE(flaper87): The first iteration will return 1 message + # and then call `_next_page` which will use the rel-next link + # to get a new set of messages. + link = {'rel': 'next', + 'href': "/v1/queues/mine/messages?marker=6244-244224-783"} + messages['links'].append(link) + + iterator = message._MessageIterator(self.queue, messages) + iterated = [msg for msg in iterator] + self.assertEqual(len(iterated), 2) -class TestSimpleMessage(unittest.TestCase): - def setUp(self): - msg_body = { - 'href': HREF, - 'ttl': TTL, - 'age': AGE, - 'body': {'name': 'chocolate'} - } - self.conn = mock.MagicMock() - self.msg = message.from_dict(msg_body, connection=self.conn) +class QueuesV1MessageHttpUnitTest(test_message.QueuesV1MessageUnitTest): - def _attr_check(self, xhref, xttl, xage, xbody): - self.assertEqual(self.msg.href, xhref) - self.assertEqual(self.msg.ttl, xttl) - self.assertEqual(self.msg.age, xage) - self.assertEqual(self.msg.body, xbody) - - def test_attributes_match_expected(self): - self._attr_check(xhref=HREF, xttl=TTL, xage=AGE, - xbody={'name': 'chocolate'}) - - def test_repr_matches_expected(self): - self.assertEqual(repr(self.msg), - '' % (self.msg.ttl,)) - - def test_delete_works(self): - self.msg.delete() - - def test_reload_works(self): - msg = mock_message.message( - href=HREF, ttl=TTL - 1, age=AGE + 1, - body={'name': 'vanilla'}) - self.conn.get.return_value = mock.MagicMock() - self.conn.get.return_value.json.return_value = msg - self.msg.reload() - self._attr_check(xhref=HREF, xttl=TTL - 1, xage=AGE + 1, - xbody={'name': 'vanilla'}) - - def test_reload_after_delete_throws(self): - self.msg.delete() - self.assertRaises(AssertionError, self.msg.reload) - - def test_delete_after_delete_throws(self): - self.msg.delete() - self.assertRaises(AssertionError, self.msg.delete) + transport_cls = http.HttpTransport + url = 'http://127.0.0.1:8888/v1' + version = 1