diff --git a/marconi/storage/mongodb/utils.py b/marconi/storage/mongodb/utils.py index 624ead20e..35a05d66c 100644 --- a/marconi/storage/mongodb/utils.py +++ b/marconi/storage/mongodb/utils.py @@ -123,7 +123,7 @@ def to_oid(obj): try: return objectid.ObjectId(obj) except (TypeError, berrors.InvalidId): - msg = _('Wrong id %s') % obj + msg = _('Invalid oid: %s') % obj raise storage_exceptions.MalformedID(msg) diff --git a/marconi/storage/sqlite/controllers.py b/marconi/storage/sqlite/controllers.py index 89df82e35..c7c473d21 100644 --- a/marconi/storage/sqlite/controllers.py +++ b/marconi/storage/sqlite/controllers.py @@ -36,6 +36,10 @@ class Queue(base.QueueBase): def list(self, project, marker=None, limit=10, detailed=False): + + if project is None: + project = '' + sql = ((''' select name from Queues''' if not detailed else ''' @@ -68,6 +72,9 @@ class Queue(base.QueueBase): yield marker_name['next'] def get(self, name, project): + if project is None: + project = '' + try: return self.driver.get(''' select metadata from Queues @@ -77,6 +84,9 @@ class Queue(base.QueueBase): raise exceptions.QueueDoesNotExist(name, project) def upsert(self, name, metadata, project): + if project is None: + project = '' + with self.driver('immediate'): previous_record = self.driver.run(''' select id from Queues @@ -91,11 +101,17 @@ class Queue(base.QueueBase): return previous_record is None def delete(self, name, project): + if project is None: + project = '' + self.driver.run(''' delete from Queues where project = ? and name = ?''', project, name) def stats(self, name, project): + if project is None: + project = '' + with self.driver('deferred'): qid = _get_qid(self.driver, name, project) claimed, free = self.driver.get(''' @@ -144,6 +160,9 @@ class Message(base.MessageBase): ''') def get(self, queue, message_ids, project): + if project is None: + project = '' + if not isinstance(message_ids, list): message_ids = [message_ids] @@ -170,6 +189,9 @@ class Message(base.MessageBase): def list(self, queue, project, marker=None, limit=10, echo=False, client_uuid=None): + if project is None: + project = '' + with self.driver('deferred'): sql = ''' select id, content, ttl, julianday() * 86400.0 - created @@ -209,6 +231,9 @@ class Message(base.MessageBase): yield _marker_encode(marker_id['next']) def post(self, queue, messages, client_uuid, project): + if project is None: + project = '' + with self.driver('immediate'): qid = _get_qid(self.driver, queue, project) @@ -239,6 +264,9 @@ class Message(base.MessageBase): return map(_msgid_encode, range(unused, my['newid'])) def delete(self, queue, message_id, project, claim=None): + if project is None: + project = '' + id = _msgid_decode(message_id) if not claim: @@ -306,6 +334,9 @@ class Claim(base.ClaimBase): ''') def get(self, queue, claim_id, project): + if project is None: + project = '' + with self.driver('deferred'): try: id, ttl, age = self.driver.get(''' @@ -329,10 +360,13 @@ class Claim(base.ClaimBase): raise exceptions.ClaimDoesNotExist(claim_id, queue, project) def create(self, queue, metadata, project, limit=10): + if project is None: + project = '' + with self.driver('immediate'): qid = _get_qid(self.driver, queue, project) - # cleanup all expired claims in this queue + # Clean up all expired claims in this queue self.driver.run(''' delete from Claims @@ -377,6 +411,9 @@ class Claim(base.ClaimBase): } def update(self, queue, claim_id, metadata, project): + if project is None: + project = '' + try: id = _cid_decode(claim_id) except exceptions.MalformedID: @@ -414,6 +451,9 @@ class Claim(base.ClaimBase): ''', ttl, ttl, cid) def delete(self, queue, claim_id, project): + if project is None: + project = '' + try: cid = _cid_decode(claim_id) except exceptions.MalformedID: diff --git a/marconi/tests/transport/wsgi/base.py b/marconi/tests/transport/wsgi/base.py index e0e11ffb6..c1a066e88 100644 --- a/marconi/tests/transport/wsgi/base.py +++ b/marconi/tests/transport/wsgi/base.py @@ -37,6 +37,47 @@ class TestBase(util.TestBase): self.app = boot.transport.app self.srmock = testing.StartResponseMock() + def simulate_request(self, path, project_id=None, **kwargs): + """Simulate a request. + + Simulates a WSGI request to the API for testing. + + :param path: Request path for the desired resource + :param project_id: Project ID to use for the X-Project-ID header, + or None to not set the header + :param kwargs: Same as falcon.testing.create_environ() + + :returns: standard WSGI iterable response + """ + + if project_id is not None: + headers = dict(kwargs['headers']) if 'headers' in kwargs else {} + headers['X-Project-ID'] = project_id + kwargs['headers'] = headers + + return self.app(testing.create_environ(path=path, **kwargs), + self.srmock) + + def simulate_get(self, *args, **kwargs): + kwargs['method'] = 'GET' + return self.simulate_request(*args, **kwargs) + + def simulate_put(self, *args, **kwargs): + kwargs['method'] = 'PUT' + return self.simulate_request(*args, **kwargs) + + def simulate_post(self, *args, **kwargs): + kwargs['method'] = 'POST' + return self.simulate_request(*args, **kwargs) + + def simulate_delete(self, *args, **kwargs): + kwargs['method'] = 'DELETE' + return self.simulate_request(*args, **kwargs) + + def simulate_patch(self, *args, **kwargs): + kwargs['method'] = 'PATCH' + return self.simulate_request(*args, **kwargs) + class TestBaseFaulty(TestBase): diff --git a/marconi/tests/transport/wsgi/test_claims.py b/marconi/tests/transport/wsgi/test_claims.py index 612329e59..7fc7383a2 100644 --- a/marconi/tests/transport/wsgi/test_claims.py +++ b/marconi/tests/transport/wsgi/test_claims.py @@ -19,7 +19,6 @@ import os import pymongo import falcon -from falcon import testing from marconi.common import config from marconi.tests.transport.wsgi import base @@ -30,173 +29,118 @@ class ClaimsBaseTest(base.TestBase): def setUp(self): super(ClaimsBaseTest, self).setUp() + self.project_id = '480924' + self.queue_path = '/v1/queues/fizbit' + self.claims_path = self.queue_path + '/claims' + doc = '{"_ttl": 60 }' - env = testing.create_environ('/v1/480924/queues/fizbit', - method='PUT', body=doc) - self.app(env, self.srmock) + + self.simulate_put(self.queue_path, self.project_id, body=doc) + self.assertEquals(self.srmock.status, falcon.HTTP_201) doc = json.dumps([{'body': 239, 'ttl': 30}] * 10) + self.simulate_post(self.queue_path + '/messages', self.project_id, + body=doc, headers={'Client-ID': '30387f00'}) + self.assertEquals(self.srmock.status, falcon.HTTP_201) - env = testing.create_environ('/v1/480924/queues/fizbit/messages', - method='POST', - body=doc, - headers={'Client-ID': '30387f00'}) - self.app(env, self.srmock) + def tearDown(self): + self.simulate_delete(self.queue_path, self.project_id) + + super(ClaimsBaseTest, self).tearDown() def test_bad_claim(self): - env = testing.create_environ('/v1/480924/queues/fizbit/claims', - method='POST') - - self.app(env, self.srmock) - self.assertEquals(self.srmock.status, falcon.HTTP_400) - - env = testing.create_environ('/v1/480924/queues/fizbit/claims', - method='POST', body='[') - - self.app(env, self.srmock) - self.assertEquals(self.srmock.status, falcon.HTTP_400) - - env = testing.create_environ('/v1/480924/queues/fizbit/claims', - method='POST', body='{}') - - self.app(env, self.srmock) - self.assertEquals(self.srmock.status, falcon.HTTP_400) + for doc in (None, '[', '[]', '{}', '.', '"fail"'): + self.simulate_post(self.claims_path, self.project_id, + body=doc) + self.assertEquals(self.srmock.status, falcon.HTTP_400) def test_bad_patch(self): - env = testing.create_environ('/v1/480924/queues/fizbit/claims', - method='POST', - body='{"ttl": 10}') - self.app(env, self.srmock) - target = self.srmock.headers_dict['Location'] + self.simulate_post(self.claims_path, self.project_id, + body='{"ttl": 10}') + href = self.srmock.headers_dict['Location'] - env = testing.create_environ(target, method='PATCH') - - self.app(env, self.srmock) - self.assertEquals(self.srmock.status, falcon.HTTP_400) - - env = testing.create_environ(target, method='PATCH', body='{') - - self.app(env, self.srmock) - self.assertEquals(self.srmock.status, falcon.HTTP_400) + for doc in (None, '[', '"crunchy"'): + self.simulate_patch(href, self.project_id, body=doc) + self.assertEquals(self.srmock.status, falcon.HTTP_400) def test_lifecycle(self): doc = '{"ttl": 10}' - # claim some messages - - env = testing.create_environ('/v1/480924/queues/fizbit/claims', - method='POST', - body=doc) - - body = self.app(env, self.srmock) + # First, claim some messages + body = self.simulate_post(self.claims_path, self.project_id, body=doc) self.assertEquals(self.srmock.status, falcon.HTTP_200) - st = json.loads(body[0]) - target = self.srmock.headers_dict['Location'] - [msg_target, params] = st[0]['href'].split('?') + claim = json.loads(body[0]) + claim_href = self.srmock.headers_dict['Location'] + message_href, params = claim[0]['href'].split('?') - # no more messages to claim - - env = testing.create_environ('/v1/480924/queues/fizbit/claims', - method='POST', - body=doc, - query_string='limit=3') - - self.app(env, self.srmock) + # No more messages to claim + self.simulate_post(self.claims_path, self.project_id, body=doc, + query_string='limit=3') self.assertEquals(self.srmock.status, falcon.HTTP_204) - # check its metadata - - env = testing.create_environ(target, method='GET') - - body = self.app(env, self.srmock) - st = json.loads(body[0]) + # Check the claim's metadata + body = self.simulate_get(claim_href, self.project_id) + claim = json.loads(body[0]) self.assertEquals(self.srmock.status, falcon.HTTP_200) self.assertEquals(self.srmock.headers_dict['Content-Location'], - env['PATH_INFO']) + claim_href) + self.assertEquals(claim['ttl'], 10) - self.assertEquals(st['ttl'], 10) - - # delete a message with its associated claim - - env = testing.create_environ(msg_target, query_string=params, - method='DELETE') - - self.app(env, self.srmock) + # Delete the message and its associated claim + self.simulate_delete(message_href, self.project_id, + query_string=params) self.assertEquals(self.srmock.status, falcon.HTTP_204) - env = testing.create_environ(msg_target, query_string=params) - - self.app(env, self.srmock) + # Try to get it from the wrong project + self.simulate_get(message_href, 'bogus_project', query_string=params) self.assertEquals(self.srmock.status, falcon.HTTP_404) - # update the claim + # Get the message + self.simulate_get(message_href, self.project_id, query_string=params) + self.assertEquals(self.srmock.status, falcon.HTTP_404) - env = testing.create_environ(target, - body='{"ttl": 60}', - method='PATCH') - - self.app(env, self.srmock) + # Update the claim + self.simulate_patch(claim_href, self.project_id, body='{"ttl": 60}') self.assertEquals(self.srmock.status, falcon.HTTP_204) - # get the claimed messages again + # Get the claimed messages (again) + body = self.simulate_get(claim_href, self.project_id) + claim = json.loads(body[0]) + message_href, params = claim['messages'][0]['href'].split('?') - env = testing.create_environ(target, method='GET') + self.assertEquals(claim['ttl'], 60) - body = self.app(env, self.srmock) - st = json.loads(body[0]) - [msg_target, params] = st['messages'][0]['href'].split('?') - - self.assertEquals(st['ttl'], 60) - - # delete the claim - - env = testing.create_environ(st['href'], method='DELETE') - - self.app(env, self.srmock) + # Delete the claim + self.simulate_delete(claim['href'], 'bad_id') self.assertEquals(self.srmock.status, falcon.HTTP_204) - # can not delete a message with a non-existing claim + self.simulate_delete(claim['href'], self.project_id) + self.assertEquals(self.srmock.status, falcon.HTTP_204) - env = testing.create_environ(msg_target, query_string=params, - method='DELETE') - - self.app(env, self.srmock) + # Try to delete a message with an invalid claim ID + self.simulate_delete(message_href, self.project_id, + query_string=params) self.assertEquals(self.srmock.status, falcon.HTTP_403) - env = testing.create_environ(msg_target, query_string=params) - - self.app(env, self.srmock) + # Make sure it wasn't deleted! + self.simulate_get(message_href, self.project_id, query_string=params) self.assertEquals(self.srmock.status, falcon.HTTP_200) - # get & update a non existing claim - - env = testing.create_environ(st['href'], method='GET') - - body = self.app(env, self.srmock) + # Try to get a claim that doesn't exist + self.simulate_get(claim['href']) self.assertEquals(self.srmock.status, falcon.HTTP_404) - env = testing.create_environ(st['href'], method='PATCH', body=doc) - - body = self.app(env, self.srmock) + # Try to update a claim that doesn't exist + self.simulate_patch(claim['href'], body=doc) self.assertEquals(self.srmock.status, falcon.HTTP_404) def test_nonexistent(self): - doc = '{"ttl": 10}' - env = testing.create_environ('/v1/480924/queues/nonexistent/claims', - method='POST', body=doc) - - self.app(env, self.srmock) + self.simulate_post('/v1/queues/nonexistent/claims', self.project_id, + body='{"ttl": 10}') self.assertEquals(self.srmock.status, falcon.HTTP_404) - def tearDown(self): - env = testing.create_environ('/v1/480924/queues/fizbit', - method='DELETE') - self.app(env, self.srmock) - - super(ClaimsBaseTest, self).tearDown() - class ClaimsMongoDBTests(ClaimsBaseTest): @@ -225,32 +169,18 @@ class ClaimsFaultyDriverTests(base.TestBaseFaulty): config_filename = 'wsgi_faulty.conf' def test_simple(self): + project_id = '480924' + claims_path = '/v1/queues/fizbit/claims' doc = '{"ttl": 100}' - env = testing.create_environ('/v1/480924/queues/fizbit/claims', - method='POST', - body=doc) - self.app(env, self.srmock) + self.simulate_post(claims_path, project_id, body=doc) self.assertEquals(self.srmock.status, falcon.HTTP_503) - env = testing.create_environ('/v1/480924/queues/fizbit/claims' - '/nonexistent', - method='GET') - - self.app(env, self.srmock) + self.simulate_get(claims_path + '/nichts', project_id) self.assertEquals(self.srmock.status, falcon.HTTP_503) - env = testing.create_environ('/v1/480924/queues/fizbit/claims' - '/nonexistent', - method='PATCH', - body=doc) + self.simulate_patch(claims_path, project_id, body=doc) + self.assertEquals(self.srmock.status, falcon.HTTP_405) - self.app(env, self.srmock) - self.assertEquals(self.srmock.status, falcon.HTTP_503) - - env = testing.create_environ('/v1/480924/queues/fizbit/claims' - '/nonexistent', - method='DELETE') - - self.app(env, self.srmock) + self.simulate_delete(claims_path + '/foo', project_id) self.assertEquals(self.srmock.status, falcon.HTTP_503) diff --git a/marconi/tests/transport/wsgi/test_messages.py b/marconi/tests/transport/wsgi/test_messages.py index e136233dd..3350a02a6 100644 --- a/marconi/tests/transport/wsgi/test_messages.py +++ b/marconi/tests/transport/wsgi/test_messages.py @@ -17,7 +17,6 @@ import json import os import falcon -from falcon import testing from marconi.tests.transport.wsgi import base @@ -27,20 +26,18 @@ class MessagesBaseTest(base.TestBase): def setUp(self): super(MessagesBaseTest, self).setUp() + self.project_id = '7e55e1a7e' + self.queue_path = '/v1/queues/fizbit' + doc = '{"_ttl": 60}' - env = testing.create_environ('/v1/480924/queues/fizbit', - method='PUT', body=doc) - self.app(env, self.srmock) + self.simulate_put(self.queue_path, self.project_id, body=doc) self.headers = { 'Client-ID': '30387f00', } def tearDown(self): - env = testing.create_environ('/v1/480924/queues/fizbit', - method='DELETE') - self.app(env, self.srmock) - + self.simulate_delete(self.queue_path, self.project_id) super(MessagesBaseTest, self).tearDown() def test_post(self): @@ -52,28 +49,22 @@ class MessagesBaseTest(base.TestBase): ] """ - queue_path = '/v1/480924/queues/fizbit' - messages_path = queue_path + '/messages' - env = testing.create_environ(messages_path, - method='POST', - body=doc, - headers=self.headers) - - body = self.app(env, self.srmock) + messages_path = self.queue_path + '/messages' + result = self.simulate_post(messages_path, self.project_id, + body=doc, headers=self.headers) self.assertEquals(self.srmock.status, falcon.HTTP_201) + result_doc = json.loads(result[0]) + msg_ids = self._get_msg_ids(self.srmock.headers_dict) - print msg_ids self.assertEquals(len(msg_ids), 3) - body = json.loads(body[0]) expected_resources = [unicode(messages_path + '/' + id) for id in msg_ids] - self.assertEquals(expected_resources, body['resources']) - self.assertFalse(body['partial']) + self.assertEquals(expected_resources, result_doc['resources']) + self.assertFalse(result_doc['partial']) sample_messages = json.loads(doc) - self.assertEquals(len(msg_ids), len(sample_messages)) lookup = dict([(m['ttl'], m['body']) for m in sample_messages]) @@ -81,99 +72,72 @@ class MessagesBaseTest(base.TestBase): # Test GET on the message resource directly for msg_id in msg_ids: message_uri = messages_path + '/' + msg_id - env = testing.create_environ(message_uri, method='GET') - body = self.app(env, self.srmock)[0] + # Wrong project ID + self.simulate_get(message_uri, '777777') + self.assertEquals(self.srmock.status, falcon.HTTP_404) + + # Correct project ID + result = self.simulate_get(message_uri, self.project_id) self.assertEquals(self.srmock.status, falcon.HTTP_200) self.assertEquals(self.srmock.headers_dict['Content-Location'], message_uri) - msg = json.loads(body) - self.assertEquals(msg['href'], message_uri) - self.assertEquals(msg['body'], lookup[msg['ttl']]) + message = json.loads(result[0]) + self.assertEquals(message['href'], message_uri) + self.assertEquals(message['body'], lookup[message['ttl']]) # Test bulk GET query_string = 'ids=' + ','.join(msg_ids) - env = testing.create_environ(queue_path, method='GET', - query_string=query_string) + result = self.simulate_get(self.queue_path, self.project_id, + query_string=query_string) + self.assertEquals(self.srmock.status, falcon.HTTP_200) - body = self.app(env, self.srmock)[0] - document = json.loads(body) + result_doc = json.loads(result[0]) expected_ttls = set(m['ttl'] for m in sample_messages) - actual_ttls = set(m['ttl'] for m in document) + actual_ttls = set(m['ttl'] for m in result_doc) self.assertFalse(expected_ttls - actual_ttls) def test_post_to_mia_queue(self): - self._post_messages('/v1/480924/queues/nonexistent/messages') + self._post_messages('/v1/queues/nonexistent/messages') self.assertEquals(self.srmock.status, falcon.HTTP_404) def test_post_bad_message(self): - env = testing.create_environ('/v1/480924/queues/fizbit/messages', - method='POST', - headers=self.headers) + for document in (None, '[', '[]', '{}', '.'): + self.simulate_post(self.queue_path + '/messages', + body=document, + headers=self.headers) - self.app(env, self.srmock) - self.assertEquals(self.srmock.status, falcon.HTTP_400) - - env = testing.create_environ('/v1/480924/queues/fizbit/messages', - method='POST', - body='[', - headers=self.headers) - - self.app(env, self.srmock) - self.assertEquals(self.srmock.status, falcon.HTTP_400) - - env = testing.create_environ('/v1/480924/queues/fizbit/messages', - method='POST', - body='[]', - headers=self.headers) - - self.app(env, self.srmock) - self.assertEquals(self.srmock.status, falcon.HTTP_400) - - env = testing.create_environ('/v1/480924/queues/fizbit/messages', - method='POST', - body='{}', - headers=self.headers) - - self.app(env, self.srmock) - self.assertEquals(self.srmock.status, falcon.HTTP_400) + self.assertEquals(self.srmock.status, falcon.HTTP_400) def test_delete(self): - self._post_messages('/v1/480924/queues/fizbit/messages') + path = self.queue_path + '/messages' + self._post_messages(path) # NOTE(kgriffs): This implictly tests that posting a single # message returns a message resource, not a queue resource. msg_id = self._get_msg_id(self.srmock.headers_dict) - env = testing.create_environ('/v1/480924/queues/fizbit/messages/' - + msg_id, method='GET') - - self.app(env, self.srmock) + self.simulate_get(path + '/' + msg_id, self.project_id) self.assertEquals(self.srmock.status, falcon.HTTP_200) - env = testing.create_environ('/v1/480924/queues/fizbit/messages/' - + msg_id, method='DELETE') - - self.app(env, self.srmock) + self.simulate_delete(path + '/' + msg_id, self.project_id) self.assertEquals(self.srmock.status, falcon.HTTP_204) - env = testing.create_environ('/v1/480924/queues/fizbit/messages/' - + msg_id, method='GET') - - self.app(env, self.srmock) + self.simulate_get(path + '/' + msg_id, self.project_id) self.assertEquals(self.srmock.status, falcon.HTTP_404) def test_list(self): - self._post_messages('/v1/480924/queues/fizbit/messages', repeat=10) + path = self.queue_path + '/messages' + self._post_messages(path, repeat=10) - env = testing.create_environ('/v1/480924/queues/fizbit/messages', - query_string='limit=3&echo=true', - headers=self.headers) + query_string = 'limit=3&echo=true' + body = self.simulate_get(path, self.project_id, + query_string=query_string, + headers=self.headers) - body = self.app(env, self.srmock) self.assertEquals(self.srmock.headers_dict['Content-Location'], - env['PATH_INFO'] + '?' + env['QUERY_STRING']) + path + '?' + query_string) cnt = 0 while self.srmock.status == falcon.HTTP_200: @@ -181,68 +145,55 @@ class MessagesBaseTest(base.TestBase): [target, params] = contents['links'][0]['href'].split('?') for msg in contents['messages']: - env = testing.create_environ(msg['href']) - self.app(env, self.srmock) + self.simulate_get(msg['href'], self.project_id) self.assertEquals(self.srmock.status, falcon.HTTP_200) - env = testing.create_environ(target, - query_string=params, - headers=self.headers) - body = self.app(env, self.srmock) + body = self.simulate_get(target, self.project_id, + query_string=params, + headers=self.headers) cnt += 1 self.assertEquals(cnt, 4) self.assertEquals(self.srmock.status, falcon.HTTP_204) # Stats - env = testing.create_environ('/v1/480924/queues/fizbit/stats') - - body = self.app(env, self.srmock) - countof = json.loads(body[0]) - + body = self.simulate_get(self.queue_path + '/stats', self.project_id) self.assertEquals(self.srmock.status, falcon.HTTP_200) + + countof = json.loads(body[0]) self.assertEquals(self.srmock.headers_dict['Content-Location'], - env['PATH_INFO']) + self.queue_path + '/stats') self.assertEquals(countof['messages']['free'], 10) - env = testing.create_environ('/v1/480924/queues/nonexistent/messages', - headers=self.headers) - - body = self.app(env, self.srmock) + self.simulate_get('/v1/queues/nonexistent/messages', self.project_id, + headers=self.headers) self.assertEquals(self.srmock.status, falcon.HTTP_404) def test_list_with_bad_marker(self): - self._post_messages('/v1/480924/queues/fizbit/messages', repeat=5) - query_string = 'limit=3&echo=true&marker=sfhlsfdjh2048' - env = testing.create_environ('/v1/480924/queues/fizbit/messages', - query_string=query_string, - headers=self.headers) + path = self.queue_path + '/messages' + self._post_messages(path, repeat=5) + + query_string = 'limit=3&echo=true&marker=sfhlsfdjh2048' + self.simulate_get(path, self.project_id, + query_string=query_string, + headers=self.headers) - self.app(env, self.srmock) self.assertEqual(self.srmock.status, falcon.HTTP_400) def test_no_uuid(self): - env = testing.create_environ('/v1/480924/queues/fizbit/messages', - method='POST', - body='[{"body": 0, "ttl": 0}]') + path = self.queue_path + '/messages' + + self.simulate_post(path, '7e7e7e', body='[{"body": 0, "ttl": 0}]') - self.app(env, self.srmock) self.assertEquals(self.srmock.status, falcon.HTTP_400) - env = testing.create_environ('/v1/480924/queues/fizbit/messages', - method='GET') - - self.app(env, self.srmock) + self.simulate_get(path, '7e7e7e') self.assertEquals(self.srmock.status, falcon.HTTP_400) def _post_messages(self, target, repeat=1): doc = json.dumps([{'body': 239, 'ttl': 30}] * repeat) - - env = testing.create_environ(target, - method='POST', - body=doc, - headers=self.headers) - self.app(env, self.srmock) + self.simulate_post(target, self.project_id, body=doc, + headers=self.headers) def _get_msg_id(self, headers): return headers['Location'].rsplit('/', 1)[-1] @@ -272,36 +223,24 @@ class MessagesFaultyDriverTests(base.TestBaseFaulty): config_filename = 'wsgi_faulty.conf' def test_simple(self): + project_id = 'xyz' + path = '/v1/queues/fizbit/messages' doc = '[{"body": 239, "ttl": 10}]' headers = { 'Client-ID': '30387f00', } - env = testing.create_environ('/v1/480924/queues/fizbit/messages', - method='POST', - body=doc, - headers=headers) - - self.app(env, self.srmock) + self.simulate_post(path, project_id, + body=doc, + headers=headers) self.assertEquals(self.srmock.status, falcon.HTTP_503) - env = testing.create_environ('/v1/480924/queues/fizbit/messages', - method='GET', - headers=headers) - - self.app(env, self.srmock) + self.simulate_get(path, project_id, + headers=headers) self.assertEquals(self.srmock.status, falcon.HTTP_503) - env = testing.create_environ('/v1/480924/queues/fizbit/messages' - '/nonexistent', - method='GET') - - self.app(env, self.srmock) + self.simulate_get(path + '/nonexistent', project_id) self.assertEquals(self.srmock.status, falcon.HTTP_503) - env = testing.create_environ('/v1/480924/queues/fizbit/messages' - '/nonexistent', - method='DELETE') - - self.app(env, self.srmock) + self.simulate_delete(path + '/nada', project_id) self.assertEquals(self.srmock.status, falcon.HTTP_503) diff --git a/marconi/tests/transport/wsgi/test_queue_lifecycle.py b/marconi/tests/transport/wsgi/test_queue_lifecycle.py index 6376e4351..1d0c948b0 100644 --- a/marconi/tests/transport/wsgi/test_queue_lifecycle.py +++ b/marconi/tests/transport/wsgi/test_queue_lifecycle.py @@ -18,7 +18,6 @@ import json import os import falcon -from falcon import testing import pymongo from marconi.common import config @@ -31,76 +30,54 @@ class QueueLifecycleBaseTest(base.TestBase): config_filename = None def test_simple(self): - doc = '{"messages": {"ttl": 600}}' + path = '/v1/queues/gumshoe' - # Create - env = testing.create_environ('/v1/480924/queues/gumshoe', - method='PUT', body=doc) + for project_id in ('480924', 'foo', '', None): + # Create + doc = '{"messages": {"ttl": 600}}' + self.simulate_put(path, project_id, body=doc) + self.assertEquals(self.srmock.status, falcon.HTTP_201) - self.app(env, self.srmock) - self.assertEquals(self.srmock.status, falcon.HTTP_201) + location = ('Location', '/v1/queues/gumshoe') + self.assertIn(location, self.srmock.headers) - location = ('Location', '/v1/480924/queues/gumshoe') - self.assertIn(location, self.srmock.headers) + result = self.simulate_get(path, project_id) + result_doc = json.loads(result[0]) + self.assertEquals(self.srmock.status, falcon.HTTP_200) + self.assertEquals(result_doc, json.loads(doc)) - env = testing.create_environ('/v1/480924/queues/gumshoe') - result = self.app(env, self.srmock) - result_doc = json.loads(result[0]) - self.assertEquals(self.srmock.status, falcon.HTTP_200) - self.assertEquals(result_doc, json.loads(doc)) + # Delete + self.simulate_delete(path, project_id) + self.assertEquals(self.srmock.status, falcon.HTTP_204) - # Delete - env = testing.create_environ('/v1/480924/queues/gumshoe', - method='DELETE') - - self.app(env, self.srmock) - self.assertEquals(self.srmock.status, falcon.HTTP_204) - - # Get non-existing - env = testing.create_environ('/v1/480924/queues/gumshoe') - - self.app(env, self.srmock) - self.assertEquals(self.srmock.status, falcon.HTTP_404) + # Get non-existing + self.simulate_get(path, project_id) + self.assertEquals(self.srmock.status, falcon.HTTP_404) def test_no_metadata(self): - env = testing.create_environ('/v1/480924/queues/fizbat', method='PUT') - - self.app(env, self.srmock) + self.simulate_put('/v1/queues/fizbat') self.assertEquals(self.srmock.status, falcon.HTTP_400) def test_bad_metadata(self): - env = testing.create_environ('/v1/480924/queues/fizbat', - body='{', - method='PUT') - - self.app(env, self.srmock) - self.assertEquals(self.srmock.status, falcon.HTTP_400) - - env = testing.create_environ('/v1/480924/queues/fizbat', - body='[]', - method='PUT') - - self.app(env, self.srmock) - self.assertEquals(self.srmock.status, falcon.HTTP_400) + for document in ('{', '[]', '.', ' ', ''): + self.simulate_put('/v1/queues/fizbat', '7e55e1a7e', + body=document) + self.assertEquals(self.srmock.status, falcon.HTTP_400) def test_too_much_metadata(self): doc = '{"messages": {"ttl": 600}, "padding": "%s"}' padding_len = transport.MAX_QUEUE_METADATA_SIZE - (len(doc) - 2) + 1 doc = doc % ('x' * padding_len) - env = testing.create_environ('/v1/480924/queues/fizbat', - method='PUT', body=doc) - self.app(env, self.srmock) + self.simulate_put('/v1/queues/fizbat', 'deadbeef', body=doc) self.assertEquals(self.srmock.status, falcon.HTTP_400) def test_way_too_much_metadata(self): doc = '{"messages": {"ttl": 600}, "padding": "%s"}' padding_len = transport.MAX_QUEUE_METADATA_SIZE * 100 doc = doc % ('x' * padding_len) - env = testing.create_environ('/v1/480924/queues/gumshoe', - method='PUT', body=doc) - self.app(env, self.srmock) + self.simulate_put('/v1/queues/fizbat', 'deadbeef', body=doc) self.assertEquals(self.srmock.status, falcon.HTTP_400) def test_custom_metadata(self): @@ -108,103 +85,84 @@ class QueueLifecycleBaseTest(base.TestBase): doc = '{"messages": {"ttl": 600}, "padding": "%s"}' padding_len = transport.MAX_QUEUE_METADATA_SIZE - (len(doc) - 2) doc = doc % ('x' * padding_len) - env = testing.create_environ('/v1/480924/queues/gumshoe', - method='PUT', body=doc) - - self.app(env, self.srmock) + self.simulate_put('/v1/queues/fizbat', '480924', body=doc) self.assertEquals(self.srmock.status, falcon.HTTP_201) # Get - env = testing.create_environ('/v1/480924/queues/gumshoe') - result = self.app(env, self.srmock) + result = self.simulate_get('/v1/queues/fizbat', '480924') result_doc = json.loads(result[0]) self.assertEquals(result_doc, json.loads(doc)) def test_update_metadata(self): + path = '/v1/queues/xyz' + project_id = '480924' + # Create doc1 = '{"messages": {"ttl": 600}}' - env = testing.create_environ('/v1/480924/queues/xyz', - method='PUT', body=doc1) - - self.app(env, self.srmock) + self.simulate_put(path, project_id, body=doc1) self.assertEquals(self.srmock.status, falcon.HTTP_201) # Update doc2 = '{"messages": {"ttl": 100}}' - env = testing.create_environ('/v1/480924/queues/xyz', - method='PUT', body=doc2) - - self.app(env, self.srmock) + self.simulate_put(path, project_id, body=doc2) self.assertEquals(self.srmock.status, falcon.HTTP_204) # Get - env = testing.create_environ('/v1/480924/queues/xyz') - result = self.app(env, self.srmock) + result = self.simulate_get(path, project_id) result_doc = json.loads(result[0]) self.assertEquals(result_doc, json.loads(doc2)) self.assertEquals(self.srmock.headers_dict['Content-Location'], - env['PATH_INFO']) + path) def test_list(self): - # List empty - env = testing.create_environ('/v1/480924/queues') + project_id = '644079696574693' + alt_project_id = '644079696574694' - self.app(env, self.srmock) + # List empty + self.simulate_get('/v1/queues', project_id) self.assertEquals(self.srmock.status, falcon.HTTP_204) # Create some - env = testing.create_environ('/v1/480924/queues/q1', - method='PUT', - body='{"_ttl": 30 }') - self.app(env, self.srmock) + self.simulate_put('/v1/queues/q1', project_id, body='{"_ttl": 30 }') + self.simulate_put('/v1/queues/q2', project_id, body='{}') + self.simulate_put('/v1/queues/q3', project_id, body='{"_ttl": 30 }') - env = testing.create_environ('/v1/480924/queues/q2', - method='PUT', - body='{}') - self.app(env, self.srmock) + # List (no metadata) + result = self.simulate_get('/v1/queues', project_id, + query_string='limit=2') - env = testing.create_environ('/v1/480924/queues/q3', - method='PUT', - body='{"_ttl": 30 }') - self.app(env, self.srmock) - - # List - env = testing.create_environ('/v1/480924/queues', - query_string='limit=2') - - result = self.app(env, self.srmock) result_doc = json.loads(result[0]) [target, params] = result_doc['links'][0]['href'].split('?') self.assertEquals(self.srmock.status, falcon.HTTP_200) self.assertEquals(self.srmock.headers_dict['Content-Location'], - env['PATH_INFO'] + '?' + env['QUERY_STRING']) + '/v1/queues?limit=2') for queue in result_doc['queues']: - env = testing.create_environ(queue['href']) - self.app(env, self.srmock) + self.simulate_get(queue['href'], project_id) self.assertEquals(self.srmock.status, falcon.HTTP_200) + + self.simulate_get(queue['href'], alt_project_id) + self.assertEquals(self.srmock.status, falcon.HTTP_404) + self.assertNotIn('metadata', queue) # List with metadata - env = testing.create_environ(target, - query_string=params + '&detailed=true') + result = self.simulate_get('/v1/queues', project_id, + query_string=params + '&detailed=true') - result = self.app(env, self.srmock) + self.assertEquals(self.srmock.status, falcon.HTTP_200) result_doc = json.loads(result[0]) [target, params] = result_doc['links'][0]['href'].split('?') [queue] = result_doc['queues'] - env = testing.create_environ(queue['href']) - result = self.app(env, self.srmock) + result = self.simulate_get(queue['href'], project_id) result_doc = json.loads(result[0]) self.assertEquals(result_doc, queue['metadata']) # List tail - env = testing.create_environ(target, query_string=params) - - self.app(env, self.srmock) + self.simulate_get(target, project_id, query_string=params) self.assertEquals(self.srmock.status, falcon.HTTP_204) @@ -235,36 +193,24 @@ class QueueFaultyDriverTests(base.TestBaseFaulty): config_filename = 'wsgi_faulty.conf' def test_simple(self): + path = '/v1/queues/gumshoe' doc = '{"messages": {"ttl": 600}}' - env = testing.create_environ('/v1/480924/queues/gumshoe', - method='PUT', body=doc) - - self.app(env, self.srmock) + self.simulate_put(path, '480924', body=doc) self.assertEquals(self.srmock.status, falcon.HTTP_503) - location = ('Location', '/v1/480924/queues/gumshoe') + location = ('Location', path) self.assertNotIn(location, self.srmock.headers) - env = testing.create_environ('/v1/480924/queues/gumshoe') - result = self.app(env, self.srmock) + result = self.simulate_get(path, '480924') result_doc = json.loads(result[0]) self.assertEquals(self.srmock.status, falcon.HTTP_503) self.assertNotEquals(result_doc, json.loads(doc)) - env = testing.create_environ('/v1/480924/queues/gumshoe/stats') - self.app(env, self.srmock) + self.simulate_get(path + '/stats', '480924') self.assertEquals(self.srmock.status, falcon.HTTP_503) - env = testing.create_environ('/v1/480924/queues') - self.app(env, self.srmock) + self.simulate_get('/v1/queues', '480924') self.assertEquals(self.srmock.status, falcon.HTTP_503) - env = testing.create_environ('/v1/480924/queues/gumshoe', - method='DELETE') - self.app(env, self.srmock) - self.assertEquals(self.srmock.status, falcon.HTTP_503) - - def test_bad_document(self): - env = testing.create_environ('/v1/480924/queues/bad-doc') - self.app(env, self.srmock) + self.simulate_delete(path, '480924') self.assertEquals(self.srmock.status, falcon.HTTP_503) diff --git a/marconi/transport/wsgi/driver.py b/marconi/transport/wsgi/driver.py index 2a87a8bab..13f412f17 100644 --- a/marconi/transport/wsgi/driver.py +++ b/marconi/transport/wsgi/driver.py @@ -37,12 +37,16 @@ WSGI_CFG = config.namespace('drivers:transport:wsgi').from_options(**OPTIONS) LOG = logging.getLogger(__name__) +def _extract_project_id(req, resp, params): + params['project_id'] = req.get_header('X-PROJECT-ID') + + class Driver(transport.DriverBase): def __init__(self, storage): super(Driver, self).__init__(storage) - self.app = falcon.API() + self.app = falcon.API(before=_extract_project_id) queue_controller = self.storage.queue_controller message_controller = self.storage.message_controller @@ -50,31 +54,31 @@ class Driver(transport.DriverBase): # Queues Endpoints queue_collection = queues.CollectionResource(queue_controller) - self.app.add_route('/v1/{project_id}/queues', queue_collection) + self.app.add_route('/v1/queues', queue_collection) queue_item = queues.ItemResource(queue_controller, message_controller) - self.app.add_route('/v1/{project_id}/queues/{queue_name}', queue_item) + self.app.add_route('/v1/queues/{queue_name}', queue_item) stats_endpoint = stats.Resource(queue_controller) - self.app.add_route('/v1/{project_id}/queues/{queue_name}' + self.app.add_route('/v1/queues/{queue_name}' '/stats', stats_endpoint) # Messages Endpoints msg_collection = messages.CollectionResource(message_controller) - self.app.add_route('/v1/{project_id}/queues/{queue_name}' + self.app.add_route('/v1/queues/{queue_name}' '/messages', msg_collection) msg_item = messages.ItemResource(message_controller) - self.app.add_route('/v1/{project_id}/queues/{queue_name}' + self.app.add_route('/v1/queues/{queue_name}' '/messages/{message_id}', msg_item) # Claims Endpoints claim_collection = claims.CollectionResource(claim_controller) - self.app.add_route('/v1/{project_id}/queues/{queue_name}' + self.app.add_route('/v1/queues/{queue_name}' '/claims', claim_collection) claim_item = claims.ItemResource(claim_controller) - self.app.add_route('/v1/{project_id}/queues/{queue_name}' + self.app.add_route('/v1/queues/{queue_name}' '/claims/{claim_id}', claim_item) # NOTE(flaper87): Install Auth diff --git a/marconi/transport/wsgi/messages.py b/marconi/transport/wsgi/messages.py index 976908e38..1694c3a11 100644 --- a/marconi/transport/wsgi/messages.py +++ b/marconi/transport/wsgi/messages.py @@ -173,13 +173,16 @@ class ItemResource(object): def on_get(self, req, resp, project_id, queue_name, message_id): try: - print message_id message = self.message_controller.get( queue_name, message_id, project=project_id).next() except StopIteration: + # Good project_id and queue, but no messages + raise falcon.HTTPNotFound() + except storage_exceptions.DoesNotExist: + # This can happen if the queue or project_id is invalid raise falcon.HTTPNotFound() except Exception as ex: LOG.exception(ex) diff --git a/marconi/transport/wsgi/queues.py b/marconi/transport/wsgi/queues.py index aa9e5dc95..0a58ce238 100644 --- a/marconi/transport/wsgi/queues.py +++ b/marconi/transport/wsgi/queues.py @@ -68,7 +68,6 @@ class ItemResource(object): base_path += '/' for each_message in messages: - print each_message each_message['href'] = base_path + each_message['id'] del each_message['id']