feat(transport.wsgi): Create or update queue
Still needs some error handling, but the happy path works. Change-Id: I9da6cd0c7a54693389e996a58c8b2f7664ac7b5e Trello: 116
This commit is contained in:
43
marconi/main.py
Normal file
43
marconi/main.py
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
# Copyright (c) 2013 Rackspace, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from marconi.common import config
|
||||||
|
from marconi.storage import sqlite as storage
|
||||||
|
from marconi.transport.wsgi import driver as wsgi
|
||||||
|
|
||||||
|
|
||||||
|
cfg = config.project('marconi').from_options()
|
||||||
|
|
||||||
|
|
||||||
|
class Main(object):
|
||||||
|
"""
|
||||||
|
Defines the Marconi Kernel
|
||||||
|
|
||||||
|
The Kernel loads up drivers per a given configuration, and manages their
|
||||||
|
lifetimes.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, config_file=None):
|
||||||
|
#TODO(kgriffs): Error handling
|
||||||
|
cfg.load(config_file)
|
||||||
|
|
||||||
|
#TODO(kgriffs): Determine driver types from cfg
|
||||||
|
self.storage = storage.Driver()
|
||||||
|
self.transport = wsgi.Driver(self.storage.queue_controller,
|
||||||
|
self.storage.message_controller,
|
||||||
|
self.storage.claim_controller)
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
self.transport.listen()
|
||||||
@@ -4,3 +4,6 @@ storage = reference
|
|||||||
|
|
||||||
[drivers:transport:wsgi]
|
[drivers:transport:wsgi]
|
||||||
port = 8888
|
port = 8888
|
||||||
|
|
||||||
|
[drivers:storage:reference]
|
||||||
|
database = :memory:
|
||||||
|
|||||||
0
marconi/tests/transport/__init__.py
Normal file
0
marconi/tests/transport/__init__.py
Normal file
0
marconi/tests/transport/wsgi/__init__.py
Normal file
0
marconi/tests/transport/wsgi/__init__.py
Normal file
118
marconi/tests/transport/wsgi/test_queue_lifecycle.py
Normal file
118
marconi/tests/transport/wsgi/test_queue_lifecycle.py
Normal file
@@ -0,0 +1,118 @@
|
|||||||
|
# Copyright (c) 2013 Rackspace, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import json
|
||||||
|
|
||||||
|
import falcon
|
||||||
|
from falcon import testing
|
||||||
|
from testtools import matchers
|
||||||
|
|
||||||
|
import marconi
|
||||||
|
from marconi.tests import util
|
||||||
|
from marconi import transport
|
||||||
|
|
||||||
|
|
||||||
|
class TestCreateQueue(util.TestBase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestCreateQueue, self).setUp()
|
||||||
|
|
||||||
|
conf_file = self.conf_path('wsgi_reference.conf')
|
||||||
|
boot = marconi.Bootstrap(conf_file)
|
||||||
|
|
||||||
|
self.app = boot.transport.app
|
||||||
|
self.srmock = testing.StartResponseMock()
|
||||||
|
|
||||||
|
def test_simple(self):
|
||||||
|
doc = '{"messages": {"ttl": 600}}'
|
||||||
|
env = testing.create_environ('/v1/480924/queues/gumshoe',
|
||||||
|
method="PUT", body=doc)
|
||||||
|
|
||||||
|
self.app(env, self.srmock)
|
||||||
|
self.assertEquals(self.srmock.status, falcon.HTTP_201)
|
||||||
|
|
||||||
|
location = ('Location', '/v1/480924/queues/gumshoe')
|
||||||
|
self.assertThat(self.srmock.headers, matchers.Contains(location))
|
||||||
|
|
||||||
|
env = testing.create_environ('/v1/480924/queues/gumshoe')
|
||||||
|
result = self.app(env, self.srmock)
|
||||||
|
self.assertEquals(self.srmock.status, falcon.HTTP_200)
|
||||||
|
self.assertEquals(result, [doc])
|
||||||
|
|
||||||
|
def test_no_metadata(self):
|
||||||
|
env = testing.create_environ('/v1/480924/queues/fizbat', method="PUT")
|
||||||
|
|
||||||
|
self.app(env, self.srmock)
|
||||||
|
self.assertEquals(self.srmock.status, falcon.HTTP_400)
|
||||||
|
|
||||||
|
def test_too_much_metadata(self):
|
||||||
|
doc = '{"messages": {"ttl": 600}, "padding": "%s"}'
|
||||||
|
padding_len = transport.MAX_QUEUE_METADATA_SIZE - (len(doc) - 2) + 1
|
||||||
|
doc = doc % ('x' * padding_len)
|
||||||
|
env = testing.create_environ('/v1/480924/queues/fizbat',
|
||||||
|
method="PUT", body=doc)
|
||||||
|
|
||||||
|
self.app(env, self.srmock)
|
||||||
|
self.assertEquals(self.srmock.status, falcon.HTTP_400)
|
||||||
|
|
||||||
|
def test_way_too_much_metadata(self):
|
||||||
|
doc = '{"messages": {"ttl": 600}, "padding": "%s"}'
|
||||||
|
padding_len = transport.MAX_QUEUE_METADATA_SIZE * 100
|
||||||
|
doc = doc % ('x' * padding_len)
|
||||||
|
env = testing.create_environ('/v1/480924/queues/gumshoe',
|
||||||
|
method="PUT", body=doc)
|
||||||
|
|
||||||
|
self.app(env, self.srmock)
|
||||||
|
self.assertEquals(self.srmock.status, falcon.HTTP_400)
|
||||||
|
|
||||||
|
def test_custom_metadata(self):
|
||||||
|
# Set
|
||||||
|
doc = '{"messages": {"ttl": 600}, "padding": "%s"}'
|
||||||
|
padding_len = transport.MAX_QUEUE_METADATA_SIZE - (len(doc) - 2)
|
||||||
|
doc = doc % ('x' * padding_len)
|
||||||
|
env = testing.create_environ('/v1/480924/queues/gumshoe',
|
||||||
|
method="PUT", body=doc)
|
||||||
|
|
||||||
|
self.app(env, self.srmock)
|
||||||
|
self.assertEquals(self.srmock.status, falcon.HTTP_201)
|
||||||
|
|
||||||
|
# Get
|
||||||
|
env = testing.create_environ('/v1/480924/queues/gumshoe')
|
||||||
|
result = self.app(env, self.srmock)
|
||||||
|
result_doc = json.loads(result[0])
|
||||||
|
self.assertEquals(result_doc, json.loads(doc))
|
||||||
|
|
||||||
|
def test_update_metadata(self):
|
||||||
|
# Create
|
||||||
|
doc1 = '{"messages": {"ttl": 600}}'
|
||||||
|
env = testing.create_environ('/v1/480924/queues/xyz',
|
||||||
|
method="PUT", body=doc1)
|
||||||
|
|
||||||
|
self.app(env, self.srmock)
|
||||||
|
self.assertEquals(self.srmock.status, falcon.HTTP_201)
|
||||||
|
|
||||||
|
# Update
|
||||||
|
doc2 = '{"messages": {"ttl": 100}}'
|
||||||
|
env = testing.create_environ('/v1/480924/queues/xyz',
|
||||||
|
method="PUT", body=doc2)
|
||||||
|
|
||||||
|
self.app(env, self.srmock)
|
||||||
|
self.assertEquals(self.srmock.status, falcon.HTTP_204)
|
||||||
|
|
||||||
|
# Get
|
||||||
|
env = testing.create_environ('/v1/480924/queues/xyz')
|
||||||
|
result = self.app(env, self.srmock)
|
||||||
|
result_doc = json.loads(result[0])
|
||||||
|
self.assertEquals(result_doc, json.loads(doc2))
|
||||||
@@ -1,3 +1,7 @@
|
|||||||
"""Marconi Transport Drivers"""
|
"""Marconi Transport Drivers"""
|
||||||
|
|
||||||
|
MAX_QUEUE_METADATA_SIZE = 64 * 1024
|
||||||
|
"""Maximum metadata size per queue when serialized as JSON"""
|
||||||
|
|
||||||
|
|
||||||
from marconi.transport.base import DriverBase # NOQA
|
from marconi.transport.base import DriverBase # NOQA
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
"""WSGI Transport Driver"""
|
"""WSGI Transport Driver"""
|
||||||
|
|
||||||
from marconi.transport.wsgi.driver import Driver # NOQA
|
from marconi.transport.wsgi.driver import Driver # NOQA
|
||||||
|
from marconi.transport.wsgi.queues import QueuesResource # NOQA
|
||||||
|
|||||||
@@ -13,6 +13,8 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
import falcon
|
||||||
|
|
||||||
from marconi.common import config
|
from marconi.common import config
|
||||||
from marconi import transport
|
from marconi import transport
|
||||||
|
|
||||||
@@ -25,20 +27,10 @@ class Driver(transport.DriverBase):
|
|||||||
def __init__(self, queue_controller, message_controller,
|
def __init__(self, queue_controller, message_controller,
|
||||||
claim_controller):
|
claim_controller):
|
||||||
|
|
||||||
# E.g.:
|
queues = transport.wsgi.QueuesResource(queue_controller)
|
||||||
#
|
|
||||||
# self._queue_controller.create(tenant_id, queue_name)
|
|
||||||
# self._queue_controller.set_metadata(tenant_id, queue_name, metadata)
|
|
||||||
#
|
|
||||||
self._queue_controller = queue_controller
|
|
||||||
self._message_controller = message_controller
|
|
||||||
self._claim_controller = claim_controller
|
|
||||||
|
|
||||||
# self.app = api = falcon.API()
|
self.app = api = falcon.API()
|
||||||
|
api.add_route('/v1/{tenant_id}/queues/{queue_name}', queues)
|
||||||
|
|
||||||
def listen(self):
|
def listen(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def app(self, env, start_response, exc_info=None):
|
|
||||||
"""This will be replace by falcon.API()."""
|
|
||||||
pass
|
|
||||||
|
|||||||
50
marconi/transport/wsgi/queues.py
Normal file
50
marconi/transport/wsgi/queues.py
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
# Copyright (c) 2013 Rackspace, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import json
|
||||||
|
|
||||||
|
import falcon
|
||||||
|
|
||||||
|
from marconi import transport
|
||||||
|
|
||||||
|
|
||||||
|
class QueuesResource(object):
|
||||||
|
|
||||||
|
__slots__ = ('queue_ctrl')
|
||||||
|
|
||||||
|
def __init__(self, queue_controller):
|
||||||
|
self.queue_ctrl = queue_controller
|
||||||
|
|
||||||
|
def on_put(self, req, resp, tenant_id, queue_name):
|
||||||
|
if req.content_length > transport.MAX_QUEUE_METADATA_SIZE:
|
||||||
|
raise falcon.HTTPBadRequest(_('Bad request'),
|
||||||
|
_('Queue metadata size is too large.'))
|
||||||
|
|
||||||
|
if req.content_length is None or req.content_length == 0:
|
||||||
|
raise falcon.HTTPBadRequest(_('Bad request'),
|
||||||
|
_('Missing queue metadata.'))
|
||||||
|
|
||||||
|
#TODO(kgriffs): check for malformed JSON, must be a hash at top level
|
||||||
|
meta = json.load(req.stream)
|
||||||
|
|
||||||
|
#TODO(kgriffs): catch other kinds of exceptions
|
||||||
|
created = self.queue_ctrl.upsert(queue_name, meta, tenant=tenant_id)
|
||||||
|
|
||||||
|
resp.status = falcon.HTTP_201 if created else falcon.HTTP_204
|
||||||
|
resp.location = req.path
|
||||||
|
|
||||||
|
def on_get(self, req, resp, tenant_id, queue_name):
|
||||||
|
doc = self.queue_ctrl.get(queue_name, tenant=tenant_id)
|
||||||
|
resp.body = json.dumps(doc)
|
||||||
@@ -1,3 +1,4 @@
|
|||||||
cliff
|
cliff
|
||||||
|
falcon
|
||||||
oslo.config>=1.1.0
|
oslo.config>=1.1.0
|
||||||
pymongo
|
pymongo
|
||||||
|
|||||||
Reference in New Issue
Block a user