Merge "Claim messages and delete messages"

This commit is contained in:
Jenkins
2015-06-04 17:28:05 +00:00
committed by Gerrit Code Review
6 changed files with 259 additions and 46 deletions

View File

@@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from openstack.message.v1 import claim
from openstack.message.v1 import message from openstack.message.v1 import message
from openstack.message.v1 import queue from openstack.message.v1 import queue
from openstack import proxy from openstack import proxy
@@ -44,29 +45,36 @@ class Proxy(proxy.BaseProxy):
""" """
return self._delete(queue.Queue, value, ignore_missing=ignore_missing) return self._delete(queue.Queue, value, ignore_missing=ignore_missing)
def create_messages(self, client, value, messages): def create_messages(self, values):
"""Create new messages """Create new messages
:param uuid client: A UUID for each client instance. The UUID must :param list values: The list of
be submitted in its canonical form (for
example, 3381af92-2b9e-11e3-b191-71861300734c).
The client generates this UUID once.
The client UUID persists between restarts of the
client so the client should reuse that same
UUID. All message-related operations
require the use of the client UUID in the headers
to ensure that messages are not echoed back
to the client that posted them, unless the
client explicitly requests this.
:param value: The value can be either the name of a queue or a
:class:`~openstack.message.v1.queue.Queue` instance.
:param list messages: The list of
:class:`~openstack.message.v1.message.Message`s to create. :class:`~openstack.message.v1.message.Message`s to create.
:returns: The results of message creation :returns: The results of message creation
:rtype: list ids: A list of ids that correspond to the messages :rtype: list messages: The list of
created, in order. :class:`~openstack.message.v1.message.Message`s created.
""" """
queue_name = queue.Queue.get_id(value) return message.Message.create_messages(self.session, values)
return message.Message.create_from_messages(self.session, client,
queue_name, messages) def claim_messages(self, value):
"""Claims a set of messages.
:param value: The value must be a
:class:`~openstack.message.v1.claim.Claim` instance.
:returns: The results of a claim
:rtype: list messages: The list of
:class:`~openstack.message.v1.message.Message`s claimed.
"""
return claim.Claim.claim_messages(self.session, value)
def delete_message(self, value):
"""Delete a message
:param value: The value must be a
:class:`~openstack.message.v1.message.Message` instance.
:returns: ``None``
"""
message.Message.delete_by_id(self.session, value)

View File

@@ -0,0 +1,71 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
from openstack.message import message_service
from openstack.message.v1 import message
from openstack import resource
class Claim(resource.Resource):
resources_key = 'claims'
base_path = "/queues/%(queue_name)s/claims"
service = message_service.MessageService()
# capabilities
allow_create = True
allow_list = False
allow_retrieve = False
allow_delete = False
#: A UUID for each client instance. The UUID must be submitted in its
#: canonical form (for example, 3381af92-2b9e-11e3-b191-71861300734c).
#: The client generates this UUID once. The client UUID persists between
#: restarts of the client so the client should reuse that same UUID.
#: All message-related operations require the use of the client UUID in
#: the headers to ensure that messages are not echoed back to the client
#: that posted them, unless the client explicitly requests this.
client = None
#: The queue this Claim belongs to.
queue = None
#: Specifies the number of Messages to return.
limit = None
#: Specifies how long the server waits before releasing the claim,
#: in seconds.
ttl = resource.prop("ttl")
#: Specifies the message grace period, in seconds.
grace = resource.prop("grace")
@classmethod
def claim_messages(cls, session, claim):
"""Create a remote resource from this instance."""
url = cls._get_url({'queue_name': claim.queue})
headers = {'Client-ID': claim.client}
params = {'limit': claim.limit} if claim.limit else None
resp = session.post(url, service=cls.service, headers=headers,
data=json.dumps(claim, cls=ClaimEncoder),
params=params)
for message_attrs in resp.body:
yield message.Message.new(
client=claim.client, queue=claim.queue, **message_attrs)
class ClaimEncoder(json.JSONEncoder):
def default(self, claim):
return {'ttl': claim.ttl, 'grace': claim.grace}

View File

@@ -10,13 +10,14 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import copy
import json import json
from six.moves.urllib import parse
from openstack.message import message_service from openstack.message import message_service
from openstack import resource from openstack import resource
from six.moves.urllib import parse
class Message(resource.Resource): class Message(resource.Resource):
resources_key = 'messages' resources_key = 'messages'
@@ -29,6 +30,21 @@ class Message(resource.Resource):
allow_retrieve = False allow_retrieve = False
allow_delete = False allow_delete = False
#: A UUID for each client instance. The UUID must be submitted in its
#: canonical form (for example, 3381af92-2b9e-11e3-b191-71861300734c).
#: The client generates this UUID once. The client UUID persists between
#: restarts of the client so the client should reuse that same UUID.
#: All message-related operations require the use of the client UUID in
#: the headers to ensure that messages are not echoed back to the client
#: that posted them, unless the client explicitly requests this.
client = None
#: The queue this Message belongs to.
queue = None
#: A relative href that references this Message.
href = resource.prop("href")
#: An arbitrary JSON document that constitutes the body of the message #: An arbitrary JSON document that constitutes the body of the message
#: being sent. #: being sent.
body = resource.prop("body") body = resource.prop("body")
@@ -40,28 +56,49 @@ class Message(resource.Resource):
#: Specifies how long the message has been in the queue, in seconds. #: Specifies how long the message has been in the queue, in seconds.
age = resource.prop("age") age = resource.prop("age")
@staticmethod
def get_message_id(href):
"""Get the ID of a message, which is the last component in an href."""
path = parse.urlparse(href).path
return path[path.rfind('/')+1:]
@classmethod @classmethod
def create_from_messages(cls, session, client_id=None, queue_name=None, def create_messages(cls, session, messages):
messages=None): if len(messages) == 0:
"""Create a remote resource from this instance.""" raise ValueError('messages cannot be empty')
url = cls._get_url({'queue_name': queue_name})
headers = {'Client-ID': client_id} for i, message in enumerate(messages, -1):
if message.queue != messages[i].queue:
raise ValueError('All queues in messages must be equal')
if message.client != messages[i].client:
raise ValueError('All clients in messages must be equal')
url = cls._get_url({'queue_name': messages[0].queue})
headers = {'Client-ID': messages[0].client}
resp = session.post(url, service=cls.service, headers=headers, resp = session.post(url, service=cls.service, headers=headers,
data=json.dumps(messages, cls=MessageEncoder)) data=json.dumps(messages, cls=MessageEncoder))
messages_deepcopy = copy.deepcopy(messages)
hrefs = resp.body['resources'] hrefs = resp.body['resources']
ids = [cls.get_message_id(href) for href in hrefs]
return ids for i, href in enumerate(hrefs):
messages_deepcopy[i].href = href
return messages_deepcopy
@classmethod
def _strip_version(cls, href):
path = parse.urlparse(href).path
if path.startswith('/v'):
return href[href.find('/', 1):]
else:
return href
@classmethod
def delete_by_id(cls, session, message, path_args=None):
url = cls._strip_version(message.href)
headers = {'Client-ID': message.client}
session.delete(url, service=cls.service,
headers=headers, accept=None)
class MessageEncoder(json.JSONEncoder): class MessageEncoder(json.JSONEncoder):
def default(self, obj): def default(self, message):
return obj._attrs return {'body': message.body, 'ttl': message.ttl}

View File

@@ -0,0 +1,62 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import mock
import testtools
from openstack.message.v1 import claim
CLIENT = '3381af92-2b9e-11e3-b191-71861300734c'
QUEUE = 'test_queue'
LIMIT = 2
FAKE = {
'ttl': 300,
'grace': 60
}
class TestClaim(testtools.TestCase):
def test_basic(self):
sot = claim.Claim()
self.assertEqual('claims', sot.resources_key)
self.assertEqual('/queues/%(queue_name)s/claims', sot.base_path)
self.assertEqual('messaging', sot.service.service_type)
self.assertTrue(sot.allow_create)
self.assertFalse(sot.allow_retrieve)
self.assertFalse(sot.allow_update)
self.assertFalse(sot.allow_delete)
self.assertFalse(sot.allow_list)
def test_make_it(self):
sot = claim.Claim.new(client=CLIENT, queue=QUEUE, limit=LIMIT, **FAKE)
self.assertEqual(CLIENT, sot.client)
self.assertEqual(QUEUE, sot.queue)
self.assertEqual(LIMIT, sot.limit)
self.assertEqual(FAKE['ttl'], sot.ttl)
self.assertEqual(FAKE['grace'], sot.grace)
def test_create(self):
sess = mock.Mock()
sess.post = mock.Mock()
sess.post.return_value = mock.MagicMock()
sot = claim.Claim()
list(sot.claim_messages(
sess, claim.Claim.new(client=CLIENT, queue=QUEUE, **FAKE)))
url = '/queues/%s/claims' % QUEUE
sess.post.assert_called_with(
url, service=sot.service,
headers={'Client-ID': CLIENT}, params=None,
data=json.dumps(FAKE, cls=claim.ClaimEncoder))

View File

@@ -16,12 +16,17 @@ import testtools
from openstack.message.v1 import message from openstack.message.v1 import message
CLIENT_ID = '3381af92-2b9e-11e3-b191-71861300734c' CLIENT = '3381af92-2b9e-11e3-b191-71861300734c'
QUEUE_NAME = 'test_queue' QUEUE = 'test_queue'
FAKE = { FAKE = {
'ttl': 300, 'ttl': 300,
'body': {'key': 'value'} 'body': {'key': 'value'}
} }
FAKE_HREF = {
'href': '/v1/queues/test_queue/messages/1234',
'ttl': 300,
'body': {'key': 'value'}
}
class TestMessage(testtools.TestCase): class TestMessage(testtools.TestCase):
@@ -48,11 +53,25 @@ class TestMessage(testtools.TestCase):
sess.post.return_value = mock.MagicMock() sess.post.return_value = mock.MagicMock()
sot = message.Message() sot = message.Message()
sot.create_from_messages( sot.create_messages(
sess, CLIENT_ID, QUEUE_NAME, [message.Message.new(**FAKE)]) sess, [message.Message.new(client=CLIENT, queue=QUEUE, **FAKE)])
url = '/queues/%s/messages' % QUEUE_NAME url = '/queues/%s/messages' % QUEUE
sess.post.assert_called_with( sess.post.assert_called_with(
url, service=sot.service, url, service=sot.service,
headers={'Client-ID': CLIENT_ID}, headers={'Client-ID': CLIENT},
data=json.dumps([FAKE], cls=message.MessageEncoder)) data=json.dumps([FAKE], cls=message.MessageEncoder))
def test_delete(self):
sess = mock.Mock()
sess.delete = mock.Mock()
sess.delete.return_value = mock.Mock()
sot = message.Message()
sot.delete_by_id(
sess, message.Message.new(client=CLIENT, queue=QUEUE, **FAKE_HREF))
url = '/queues/%s/messages/1234' % QUEUE
sess.delete.assert_called_with(
url, service=sot.service, accept=None,
headers={'Client-ID': CLIENT})

View File

@@ -11,6 +11,8 @@
# under the License. # under the License.
from openstack.message.v1 import _proxy from openstack.message.v1 import _proxy
from openstack.message.v1 import claim
from openstack.message.v1 import message
from openstack.message.v1 import queue from openstack.message.v1 import queue
from openstack.tests.unit import test_proxy_base from openstack.tests.unit import test_proxy_base
@@ -41,7 +43,21 @@ class TestMessageProxy(test_proxy_base.TestProxyBase):
def test_messages_create(self): def test_messages_create(self):
self.verify_create2( self.verify_create2(
'openstack.message.v1.message.Message.create_from_messages', 'openstack.message.v1.message.Message.create_messages',
self.proxy.create_messages, self.proxy.create_messages,
method_args=[CLIENT_ID, QUEUE_NAME, []], method_args=[[]],
expected_args=[self.session, CLIENT_ID, QUEUE_NAME, []]) expected_args=[self.session, []])
def test_messages_claim(self):
self.verify_create2(
'openstack.message.v1.claim.Claim.claim_messages',
self.proxy.claim_messages,
method_args=[claim.Claim],
expected_args=[self.session, claim.Claim])
def test_message_delete(self):
self.verify_delete(
'openstack.message.v1.message.Message.delete_by_id',
self.proxy.delete_message,
method_args=[message.Message],
expected_args=[message.Message])