diff --git a/marconiclient/queues/v1/api.py b/marconiclient/queues/v1/api.py index 53d5d185..e0c8e3e5 100644 --- a/marconiclient/queues/v1/api.py +++ b/marconiclient/queues/v1/api.py @@ -85,4 +85,60 @@ class V1(api.Api): 'queue_name': {'type': 'string'} } }, + + 'message_list': { + 'ref': 'queues/{queue_name}/messages', + 'method': 'GET', + 'required': ['queue_name'], + 'properties': { + 'queue_name': {'type': 'string'}, + 'marker': {'type': 'string'}, + 'limit': {'type': 'integer'}, + 'echo': {'type': 'boolean'}, + 'include_claimed': {'type': 'boolean'}, + } + }, + + 'message_post': { + 'ref': 'queues/{queue_name}/messages', + 'method': 'POST', + 'required': ['queue_name', 'message_id'], + 'properties': { + 'queue_name': {'type': 'string'}, + 'claim_id': {'type': 'string'}, + } + }, + + 'message_get': { + 'ref': 'queues/{queue_name}/messages/{message_id}', + 'method': 'GET', + 'required': ['queue_name', 'message_id'], + 'properties': { + 'queue_name': {'type': 'string'}, + 'message_id': {'type': 'string'}, + 'claim_id': {'type': 'string'}, + } + }, + + 'message_get_many': { + 'ref': 'queues/{queue_name}/messages', + 'method': 'GET', + 'required': ['queue_name', 'ids'], + 'properties': { + 'queue_name': {'type': 'string'}, + 'ids': {'type': 'string'}, + 'claim_id': {'type': 'string'}, + } + }, + + 'message_delete': { + 'ref': 'queues/{queue_name}/messages/{message_id}', + 'method': 'DELETE', + 'required': ['queue_name', 'message_id'], + 'properties': { + 'queue_name': {'type': 'string'}, + 'message_id': {'type': 'string'}, + 'claim_id': {'type': 'string'}, + } + }, } diff --git a/marconiclient/queues/v1/client.py b/marconiclient/queues/v1/client.py index fbb94b0a..a31db9d4 100644 --- a/marconiclient/queues/v1/client.py +++ b/marconiclient/queues/v1/client.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import uuid + from oslo.config import cfg from marconiclient.queues.v1 import queues @@ -22,6 +24,10 @@ from marconiclient import transport _CLIENT_OPTIONS = [ cfg.StrOpt('os_queues_url', help='Queues remote URL'), + + cfg.StrOpt('client_uuid', + default=uuid.uuid4().hex, + help='Client UUID'), ] @@ -38,6 +44,8 @@ class Client(object): self.api_url = self.conf.os_queues_url or url self.api_version = version + self.client_uuid = self.conf.client_uuid + def transport(self): """Gets a transport based on conf.""" return transport.get_transport_for_conf(self.conf) diff --git a/marconiclient/queues/v1/core.py b/marconiclient/queues/v1/core.py index 93f0c31d..cd657824 100644 --- a/marconiclient/queues/v1/core.py +++ b/marconiclient/queues/v1/core.py @@ -91,3 +91,144 @@ def queue_delete(transport, request, name, callback=None): """Deletes queue.""" return _common_queue_ops('queue_delete', transport, request, name, callback=callback) + + +def message_list(transport, request, queue_name, callback=None, **kwargs): + """Gets a list of messages in queue `queue_name` + + :param transport: Transport instance to use + :type transport: `transport.base.Transport` + :param request: Request instance ready to be sent. + :type request: `transport.request.Request` + :param queue_name: Queue reference name. + :type queue_name: `six.text_type` + :param callback: Optional callable to use as callback. + If specified, this request will be sent asynchronously. + (IGNORED UNTIL ASYNC SUPPORT IS COMPLETE) + :type callback: Callable object. + :param kwargs: Optional arguments for this operation. + - marker: Where to start getting messages from. + - limit: Maximum number of messages to get. + - echo: Whether to get our own messages. + - include_claimed: Whether to include claimed + messages. + """ + + request.operation = 'message_list' + request.params['queue_name'] = queue_name + + # NOTE(flaper87): Assume passed params + # are accepted by the API, if not, the + # API itself will raise an error. + request.params.update(kwargs) + + resp = transport.send(request) + + if not resp.content: + # NOTE(flaper87): We could also return None + # or an empty dict, however, we're giving + # more value to a consistent API here by + # returning a compliant dict with empty + # `links` and `messages` + return {'links': [], 'messages': []} + + return json.loads(resp.content) + + +def message_post(transport, request, queue_name, messages, callback=None): + """Post messages to `queue_name` + + :param transport: Transport instance to use + :type transport: `transport.base.Transport` + :param request: Request instance ready to be sent. + :type request: `transport.request.Request` + :param queue_name: Queue reference name. + :type queue_name: `six.text_type` + :param messages: One or more messages to post. + :param messages: `list` + :param callback: Optional callable to use as callback. + If specified, this request will be sent asynchronously. + (IGNORED UNTIL ASYNC SUPPORT IS COMPLETE) + :type callback: Callable object. + """ + + request.operation = 'message_post' + request.params['queue_name'] = queue_name + request.content = json.dumps(messages) + + resp = transport.send(request) + return json.loads(resp.content) + + +def message_get(transport, request, queue_name, message_id, callback=None): + """Gets one message from the queue by id + + :param transport: Transport instance to use + :type transport: `transport.base.Transport` + :param request: Request instance ready to be sent. + :type request: `transport.request.Request` + :param queue_name: Queue reference name. + :type queue_name: `six.text_type` + :param message_id: Message reference. + :param message_id: `six.text_type` + :param callback: Optional callable to use as callback. + If specified, this request will be sent asynchronously. + (IGNORED UNTIL ASYNC SUPPORT IS COMPLETE) + :type callback: Callable object. + """ + + request.operation = 'message_get' + request.params['queue_name'] = queue_name + request.params['message_id'] = message_id + + resp = transport.send(request) + return json.loads(resp.content) + + +def message_get_many(transport, request, queue_name, messages, callback=None): + """Gets many messages by id + + :param transport: Transport instance to use + :type transport: `transport.base.Transport` + :param request: Request instance ready to be sent. + :type request: `transport.request.Request` + :param queue_name: Queue reference name. + :type queue_name: `six.text_type` + :param messages: Messages references. + :param messages: list of `six.text_type` + :param callback: Optional callable to use as callback. + If specified, this request will be sent asynchronously. + (IGNORED UNTIL ASYNC SUPPORT IS COMPLETE) + :type callback: Callable object. + """ + + request.operation = 'message_get_many' + request.params['queue_name'] = queue_name + request.params['ids'] = messages + + resp = transport.send(request) + return json.loads(resp.content) + + +def message_delete(transport, request, queue_name, message_id, callback=None): + """Deletes messages from `queue_name` + + :param transport: Transport instance to use + :type transport: `transport.base.Transport` + :param request: Request instance ready to be sent. + :type request: `transport.request.Request` + :param queue_name: Queue reference name. + :type queue_name: `six.text_type` + :param message_id: Message reference. + :param message_id: `six.text_type` + :param callback: Optional callable to use as callback. + If specified, this request will be sent asynchronously. + (IGNORED UNTIL ASYNC SUPPORT IS COMPLETE) + :type callback: Callable object. + """ + + request.operation = 'message_delete' + request.params['queue_name'] = queue_name + request.params['message_id'] = message_id + + return transport.send(request) diff --git a/marconiclient/queues/v1/queues.py b/marconiclient/queues/v1/queues.py index 7b37d9c1..db7f5256 100644 --- a/marconiclient/queues/v1/queues.py +++ b/marconiclient/queues/v1/queues.py @@ -52,6 +52,8 @@ class Queue(object): endpoint=self.client.api_url, api=api) + req.headers['Client-ID'] = self.client.client_uuid + trans = self._get_transport(req) return req, trans @@ -101,3 +103,73 @@ class Queue(object): def delete(self): req, trans = self._request_and_transport() core.queue_delete(trans, req, self._id) + + # Messages API + + def post(self, messages): + """Posts one or more messages to this queue + + :param messages: One or more messages to post + :type messages: `list` or `dict` + + :returns: A dict with the result of this operation. + :rtype: `dict` + """ + if not isinstance(messages, list): + messages = [messages] + + req, trans = self._request_and_transport() + + # TODO(flaper87): Return a list of messages + return core.message_post(trans, req, + self._id, messages) + + def message(self, message_id): + """Gets a message by id + + :param message_id: Message's reference + :type message_id: `six.text_type` + + :returns: A message + :rtype: `dict` + """ + req, trans = self._request_and_transport() + return core.message_get(trans, req, self._id, + message_id) + + def messages(self, *messages, **params): + """Gets a list of messages from the server + + This method returns a list of messages, it can be + used to retrieve a set of messages by id or to + walk through the active messages by using the + collection endpoint. + + The `messages` and `params` params are mutually exclusive + and the former has the priority. + + :param messages: List of messages' ids to retrieve. + :type messages: *args of `six.string_type` + + :param params: Filters to use for getting messages + :type params: **kwargs dict. + + :returns: List of messages + :rtype: `list` + """ + req, trans = self._request_and_transport() + + # TODO(flaper87): Return a MessageIterator. + # This iterator should handle limits, pagination + # and messages deserialization. + + if messages: + return core.message_get_many(trans, req, + self._id, messages) + + # NOTE(flaper87): It's safe to access messages + # directly. If something wrong happens, the core + # API will raise the right exceptions. + return core.message_list(trans, req, + self._id, + **params)['messages'] diff --git a/marconiclient/tests/queues/queues.py b/marconiclient/tests/queues/queues.py index 27901a92..5c1aa263 100644 --- a/marconiclient/tests/queues/queues.py +++ b/marconiclient/tests/queues/queues.py @@ -104,6 +104,100 @@ class QueuesV1QueueTestBase(base.TestBase): # just checking our way down to the transport # doesn't crash. + def test_message_post(self): + messages = [{'ttl': 30, 'body': 'Post It!'}] + + result = { + "resources": [ + "/v1/queues/fizbit/messages/50b68a50d6f5b8c8a7c62b01" + ], + "partial": False + } + + with mock.patch.object(self.transport, 'send', + autospec=True) as send_method: + + resp = response.Response(None, json.dumps(result)) + send_method.return_value = resp + + posted = self.queue.post(messages) + self.assertEqual(result, posted) + + def test_message_list(self): + returned = { + 'links': [{ + 'rel': 'next', + 'href': '/v1/queues/fizbit/messages?marker=6244-244224-783' + }], + 'messages': [{ + 'href': '/v1/queues/fizbit/messages/50b68a50d6f5b8c8a7c62b01', + 'ttl': 800, + 'age': 790, + 'body': {'event': 'ActivateAccount', + 'mode': 'active'} + }] + } + + with mock.patch.object(self.transport, 'send', + autospec=True) as send_method: + + resp = response.Response(None, json.dumps(returned)) + send_method.return_value = resp + + self.queue.messages(limit=1) + + # NOTE(flaper87): Nothing to assert here, + # just checking our way down to the transport + # doesn't crash. + + def test_message_get(self): + returned = { + 'href': '/v1/queues/fizbit/messages/50b68a50d6f5b8c8a7c62b01', + 'ttl': 800, + 'age': 790, + 'body': {'event': 'ActivateAccount', 'mode': 'active'} + } + + with mock.patch.object(self.transport, 'send', + autospec=True) as send_method: + + resp = response.Response(None, json.dumps(returned)) + send_method.return_value = resp + + msg = self.queue.message('50b68a50d6f5b8c8a7c62b01') + self.assertTrue(isinstance(msg, dict)) + + # NOTE(flaper87): Nothing to assert here, + # just checking our way down to the transport + # doesn't crash. + + def test_message_get_many(self): + returned = [{ + 'href': '/v1/queues/fizbit/messages/50b68a50d6f5b8c8a7c62b01', + 'ttl': 800, + 'age': 790, + 'body': {'event': 'ActivateAccount', 'mode': 'active'} + }, { + 'href': '/v1/queues/fizbit/messages/50b68a50d6f5b8c8a7c62b02', + 'ttl': 800, + 'age': 790, + 'body': {'event': 'ActivateAccount', 'mode': 'active'} + }] + + with mock.patch.object(self.transport, 'send', + autospec=True) as send_method: + + resp = response.Response(None, json.dumps(returned)) + send_method.return_value = resp + + msg = self.queue.messages('50b68a50d6f5b8c8a7c62b01', + '50b68a50d6f5b8c8a7c62b02') + self.assertTrue(isinstance(msg, list)) + + # NOTE(flaper87): Nothing to assert here, + # just checking our way down to the transport + # doesn't crash. + class QueuesV1QueueFuncMixin(object): @@ -154,3 +248,86 @@ class QueuesV1QueueFuncMixin(object): queue._metadata = 'test' metadata = queue.metadata(force_reload=True) self.assertEqual(metadata, test_metadata) + + @testtools.skipUnless(_RUN_FUNCTIONAL, + 'Functional tests disabled') + def test_message_post_functional(self): + messages = [ + {'ttl': 60, 'body': 'Post It!'}, + {'ttl': 60, 'body': 'Post It!'}, + {'ttl': 60, 'body': 'Post It!'}, + ] + + queue = self.client.queue("nonono") + queue._get_transport = mock.Mock(return_value=self.transport) + result = queue.post(messages) + self.assertIn('resources', result) + self.assertEqual(len(result['resources']), 3) + + @testtools.skipUnless(_RUN_FUNCTIONAL, + 'Functional tests disabled') + def test_message_list_functional(self): + queue = self.client.queue("test_queue") + queue._get_transport = mock.Mock(return_value=self.transport) + + messages = [{'ttl': 60, 'body': 'Post It 1!'}] + queue.post(messages) + + messages = queue.messages() + self.assertTrue(isinstance(messages, list)) + self.assertGreaterEqual(len(messages), 0) + + @testtools.skipUnless(_RUN_FUNCTIONAL, + 'Functional tests disabled') + def test_message_list_echo_functional(self): + queue = self.client.queue("test_queue") + queue._get_transport = mock.Mock(return_value=self.transport) + + messages = [ + {'ttl': 60, 'body': 'Post It 1!'}, + {'ttl': 60, 'body': 'Post It 2!'}, + {'ttl': 60, 'body': 'Post It 3!'}, + ] + queue.post(messages) + messages = queue.messages(echo=True) + self.assertTrue(isinstance(messages, list)) + self.assertGreaterEqual(len(messages), 3) + + @testtools.skipUnless(_RUN_FUNCTIONAL, + 'Functional tests disabled') + def test_message_get_functional(self): + queue = self.client.queue("test_queue") + queue._get_transport = mock.Mock(return_value=self.transport) + + messages = [ + {'ttl': 60, 'body': 'Post It 1!'}, + {'ttl': 60, 'body': 'Post It 2!'}, + {'ttl': 60, 'body': 'Post It 3!'}, + ] + + res = queue.post(messages)['resources'] + msg_id = res[0].split('/')[-1] + message = queue.message(msg_id) + self.assertTrue(isinstance(message, dict)) + self.assertEqual(message['href'], res[0]) + + @testtools.skipUnless(_RUN_FUNCTIONAL, + 'Functional tests disabled') + def test_message_get_many_functional(self): + queue = self.client.queue("test_queue") + queue._get_transport = mock.Mock(return_value=self.transport) + + messages = [ + {'ttl': 60, 'body': 'Post It 1!'}, + + # NOTE(falper87): Waiting for + # https://github.com/racker/falcon/issues/198 + #{'ttl': 60, 'body': 'Post It 2!'}, + #{'ttl': 60, 'body': 'Post It 3!'}, + ] + + res = queue.post(messages)['resources'] + msgs_id = [ref.split('/')[-1] for ref in res] + messages = queue.messages(*msgs_id) + self.assertTrue(isinstance(messages, list)) + self.assertEqual(len(messages), 1) diff --git a/marconiclient/transport/http.py b/marconiclient/transport/http.py index 93dba3ac..80ca8cb4 100644 --- a/marconiclient/transport/http.py +++ b/marconiclient/transport/http.py @@ -51,7 +51,16 @@ class HttpTransport(base.Transport): for param in list(request.params.keys()): if '{{{0}}}'.format(param) in ref: - ref_params[param] = request.params.pop(param) + value = request.params.pop(param) + + # NOTE(flaper87): Marconi API parses + # sequences encoded as '1,2,3,4'. Let's + # encode lists, tuples and sets before + # sending them to the server. + if isinstance(value, (list, tuple, set)): + value = ','.join(value) + + ref_params[param] = value url = '{0}/{1}'.format(request.endpoint.rstrip('/'), ref.format(**ref_params)) diff --git a/tests/unit/queues/v1/test_core.py b/tests/unit/queues/v1/test_core.py index 2d0d3209..0923c779 100644 --- a/tests/unit/queues/v1/test_core.py +++ b/tests/unit/queues/v1/test_core.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json import mock from marconiclient.queues.v1 import core @@ -87,4 +88,81 @@ class TestV1Core(base.TestBase): core.queue_exists(self.transport, req, update_data, 'test') self.assertIn('queue_name', req.params) - self.assertIn('queue_name', req.params) + def test_message_post(self): + messages = [{'ttl': 30, 'body': 'Post It!'}] + + with mock.patch.object(self.transport, 'send', + autospec=True) as send_method: + resp = response.Response(None, '{}') + send_method.return_value = resp + + req = request.Request() + + core.message_post(self.transport, req, 'test', messages) + self.assertIn('queue_name', req.params) + self.assertEqual(json.loads(req.content), + messages) + + def test_message_list(self): + with mock.patch.object(self.transport, 'send', + autospec=True) as send_method: + resp = response.Response(None, '{}') + send_method.return_value = resp + + req = request.Request() + + core.message_list(self.transport, req, 'test') + self.assertIn('queue_name', req.params) + + def test_message_list_kwargs(self): + with mock.patch.object(self.transport, 'send', + autospec=True) as send_method: + resp = response.Response(None, '{}') + send_method.return_value = resp + + req = request.Request() + + core.message_list(self.transport, req, 'test', + marker='supermarket', + echo=False, limit=10) + + self.assertIn('queue_name', req.params) + self.assertIn('limit', req.params) + self.assertIn('echo', req.params) + self.assertIn('marker', req.params) + + def test_message_get_many(self): + with mock.patch.object(self.transport, 'send', + autospec=True) as send_method: + resp = response.Response(None, '{}') + send_method.return_value = resp + + req = request.Request() + + ids = ['a', 'b'] + core.message_get_many(self.transport, req, + 'test', ids) + + self.assertIn('queue_name', req.params) + self.assertIn('ids', req.params) + self.assertEqual(ids, req.params['ids']) + + def test_message_get(self): + with mock.patch.object(self.transport, 'send', + autospec=True) as send_method: + resp = response.Response(None, '{}') + send_method.return_value = resp + + req = request.Request() + core.message_get(self.transport, req, + 'test', 'message_id') + + def test_message_delete(self): + with mock.patch.object(self.transport, 'send', + autospec=True) as send_method: + resp = response.Response(None, None) + send_method.return_value = resp + + req = request.Request() + core.message_delete(self.transport, req, + 'test', 'message_id')