diff --git a/marconi/common/access.py b/marconi/common/access.py new file mode 100644 index 000000000..0fb895147 --- /dev/null +++ b/marconi/common/access.py @@ -0,0 +1,21 @@ +# Copyright (c) 2013 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# TODO(cpp-cabrera): port to enum34 when that becomes available +class Access(object): + """An enumeration to represent access levels for APIs.""" + public = 1 + admin = 2 diff --git a/marconi/common/transport/__init__.py b/marconi/common/transport/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/marconi/common/transport/wsgi/__init__.py b/marconi/common/transport/wsgi/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/marconi/common/transport/wsgi/helpers.py b/marconi/common/transport/wsgi/helpers.py new file mode 100644 index 000000000..3b7427738 --- /dev/null +++ b/marconi/common/transport/wsgi/helpers.py @@ -0,0 +1,60 @@ +# Copyright (c) 2013 Rackspace Hosting, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""wsgi transport helpers.""" +import falcon + + +def extract_project_id(req, resp, params): + """Adds `project_id` to the list of params for all responders + + Meant to be used as a `before` hook. + + :param req: request sent + :type req: falcon.request.Request + :param resp: response object to return + :type resp: falcon.response.Response + :param params: additional parameters passed to responders + :type params: dict + :rtype: None + """ + params['project_id'] = req.get_header('X-PROJECT-ID') + if params['project_id'] == "": + raise falcon.HTTPBadRequest('Empty project header not allowed', + _(u''' +X-PROJECT-ID cannot be an empty string. Specify the right header X-PROJECT-ID +and retry.''')) + + +def require_accepts_json(req, resp, params): + """Raises an exception if the request does not accept JSON + + Meant to be used as a `before` hook. + + :param req: request sent + :type req: falcon.request.Request + :param resp: response object to return + :type resp: falcon.response.Response + :param params: additional parameters passed to responders + :type params: dict + :rtype: None + :raises: falcon.HTTPNotAcceptable + """ + if not req.client_accepts('application/json'): + raise falcon.HTTPNotAcceptable( + u''' +Endpoint only serves `application/json`; specify client-side +media type support with the "Accept" header.''', + href=u'http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html', + href_text=u'14.1 Accept, Hypertext Transfer Protocol -- HTTP/1.1') diff --git a/marconi/proxy/admin/__init__.py b/marconi/proxy/admin/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/marconi/proxy/admin/bootstrap.py b/marconi/proxy/admin/bootstrap.py new file mode 100644 index 000000000..fb269425a --- /dev/null +++ b/marconi/proxy/admin/bootstrap.py @@ -0,0 +1,24 @@ +# Copyright (c) 2013 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from marconi.common import access +from marconi.proxy import base + + +class Bootstrap(base.Bootstrap): + def __init__(self, config_file=None, cli_args=None): + super(Bootstrap, self).__init__(access.Access.admin, + config_file, + cli_args) diff --git a/marconi/proxy/bootstrap.py b/marconi/proxy/base.py similarity index 87% rename from marconi/proxy/bootstrap.py rename to marconi/proxy/base.py index 5f34b11cb..e5ca81d72 100644 --- a/marconi/proxy/bootstrap.py +++ b/marconi/proxy/base.py @@ -16,6 +16,7 @@ from oslo.config import cfg from stevedore import driver +from marconi.common import access from marconi.common.cache import cache as oslo_cache from marconi.common import config from marconi.common import decorators @@ -39,9 +40,13 @@ class Bootstrap(object): manages their lifetimes. """ - def __init__(self, config_file=None, cli_args=None): + def __init__(self, access_mode, config_file=None, cli_args=None): PROJECT_CFG.load(filename=config_file, args=cli_args) log.setup('marconi_proxy') + form = 'marconi.proxy.{0}.transport' + lookup = {access.Access.public: 'public', + access.Access.admin: 'admin'} + self._transport_type = form.format(lookup[access_mode]) @decorators.lazy_property(write=False) def storage(self): @@ -69,7 +74,7 @@ class Bootstrap(object): def transport(self): LOG.debug(_(u'Loading Proxy Transport Driver')) try: - mgr = driver.DriverManager('marconi.proxy.transport', + mgr = driver.DriverManager(self._transport_type, CFG.transport, invoke_on_load=True, invoke_args=[self.storage, diff --git a/marconi/proxy/public/__init__.py b/marconi/proxy/public/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/marconi/proxy/public/bootstrap.py b/marconi/proxy/public/bootstrap.py new file mode 100644 index 000000000..a22879a7d --- /dev/null +++ b/marconi/proxy/public/bootstrap.py @@ -0,0 +1,24 @@ +# Copyright (c) 2013 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from marconi.common import access +from marconi.proxy import base + + +class Bootstrap(base.Bootstrap): + def __init__(self, config_file=None, cli_args=None): + super(Bootstrap, self).__init__(access.Access.public, + config_file, + cli_args) diff --git a/marconi/proxy/storage/base.py b/marconi/proxy/storage/base.py index f3fd1a293..3ff521726 100644 --- a/marconi/proxy/storage/base.py +++ b/marconi/proxy/storage/base.py @@ -145,12 +145,13 @@ class CatalogueBase(ControllerBase): """ @abc.abstractmethod - def insert(self, project, queue, partition, metadata={}): + def insert(self, project, queue, partition, host, metadata={}): """Creates a new catalogue entry. :param project: str - Namespace to insert the given queue into :param queue: str - The name of the queue to insert :param partition: str - Partition name where this queue is stored + :param host: text - URL to representative host :param metadata: A dictionary of metadata for this queue """ raise NotImplementedError diff --git a/marconi/proxy/storage/memory/catalogue.py b/marconi/proxy/storage/memory/catalogue.py index cc9819124..0ff3038eb 100644 --- a/marconi/proxy/storage/memory/catalogue.py +++ b/marconi/proxy/storage/memory/catalogue.py @@ -26,8 +26,6 @@ class CatalogueController(base.CatalogueBase): def __init__(self, *args, **kwargs): super(CatalogueController, self).__init__(*args, **kwargs) - - self.driver.db['catalogue'] = {} self._col = self.driver.db['catalogue'] def list(self, project): @@ -47,10 +45,12 @@ class CatalogueController(base.CatalogueBase): return _idx(project, queue) in self._col def insert(self, project, queue, partition, host, metadata={}): - self._col[_idx(project, queue)] = { - 'p': project, 'q': queue, - 'n': partition, 'h': host, 'm': metadata - } + key = _idx(project, queue) + if key not in self._col: + self._col[key] = { + 'p': project, 'q': queue, + 'n': partition, 'h': host, 'm': metadata + } def delete(self, project, queue): try: diff --git a/marconi/proxy/storage/memory/driver.py b/marconi/proxy/storage/memory/driver.py index 2ff0ea6ce..554094fdb 100644 --- a/marconi/proxy/storage/memory/driver.py +++ b/marconi/proxy/storage/memory/driver.py @@ -20,6 +20,8 @@ class Driver(base.DriverBase): def __init__(self): self._db = {} + self._db['partitions'] = {} + self._db['catalogue'] = {} @property def db(self): diff --git a/marconi/proxy/storage/memory/partitions.py b/marconi/proxy/storage/memory/partitions.py index e8f9a8d62..c9ee44d8d 100644 --- a/marconi/proxy/storage/memory/partitions.py +++ b/marconi/proxy/storage/memory/partitions.py @@ -22,8 +22,6 @@ from marconi.proxy.storage import exceptions class PartitionsController(base.PartitionsBase): def __init__(self, *args, **kwargs): super(PartitionsController, self).__init__(*args, **kwargs) - - self.driver.db['partitions'] = {} self._col = self.driver.db['partitions'] def list(self): diff --git a/marconi/proxy/storage/mongodb/catalogue.py b/marconi/proxy/storage/mongodb/catalogue.py index e1aaa543b..f6a73e52b 100644 --- a/marconi/proxy/storage/mongodb/catalogue.py +++ b/marconi/proxy/storage/mongodb/catalogue.py @@ -25,6 +25,7 @@ Schema: 'm': Metadata :: dict } """ +from pymongo import errors import marconi.openstack.common.log as logging from marconi.proxy.storage import base @@ -78,8 +79,11 @@ class CatalogueController(base.CatalogueBase): @utils.raises_conn_error def insert(self, project, queue, partition, host, metadata={}): - self._col.insert({'p': project, 'q': queue, - 'n': partition, 'h': host, 'm': metadata}) + try: + self._col.insert({'p': project, 'q': queue, + 'n': partition, 'h': host, 'm': metadata}) + except errors.DuplicateKeyError: + pass # duplicate insertions are not a problem @utils.raises_conn_error def delete(self, project, queue): diff --git a/marconi/proxy/transport/wsgi/__init__.py b/marconi/proxy/transport/wsgi/__init__.py index 7055f2604..3b7ea0247 100644 --- a/marconi/proxy/transport/wsgi/__init__.py +++ b/marconi/proxy/transport/wsgi/__init__.py @@ -1,6 +1,6 @@ """WSGI Proxy Transport Driver""" -from marconi.queues.transport.wsgi import driver +from marconi.proxy.transport.wsgi import driver # Hoist into package namespace -Driver = driver.Driver +Driver = driver.DriverBase diff --git a/marconi/proxy/transport/wsgi/admin/__init__.py b/marconi/proxy/transport/wsgi/admin/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/marconi/proxy/transport/wsgi/app.py b/marconi/proxy/transport/wsgi/admin/app.py similarity index 90% rename from marconi/proxy/transport/wsgi/app.py rename to marconi/proxy/transport/wsgi/admin/app.py index 8562c0fa0..340c02c61 100644 --- a/marconi/proxy/transport/wsgi/app.py +++ b/marconi/proxy/transport/wsgi/admin/app.py @@ -18,7 +18,7 @@ This app should be used by external WSGI containers. For example: - $ gunicorn marconi.proxy.transport.wsgi.app:app + $ gunicorn marconi.proxy.transport.wsgi.admin.app:app NOTE: As for external containers, it is necessary to put config files in the standard paths. There's @@ -26,6 +26,6 @@ no common way to specify / pass configuration files to the WSGI app when it is called from other apps. """ -from marconi.proxy import bootstrap +from marconi.proxy.admin import bootstrap app = bootstrap.Bootstrap().transport.app diff --git a/marconi/proxy/transport/wsgi/admin/driver.py b/marconi/proxy/transport/wsgi/admin/driver.py new file mode 100644 index 000000000..e4d24967e --- /dev/null +++ b/marconi/proxy/transport/wsgi/admin/driver.py @@ -0,0 +1,39 @@ +# Copyright (c) 2013 Rackspace Hosting, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""marconi-proxy (admin): interface for managing partitions.""" + +from marconi.proxy.transport.wsgi import ( + catalogue, driver, health, partitions, +) + + +class Driver(driver.DriverBase): + def __init__(self, storage, cache): + super(Driver, self).__init__(storage, cache) + + @property + def bridge(self): + return [ + ('/partitions', + partitions.Listing(self.partitions)), + ('/partitions/{partition}', + partitions.Resource(self.partitions)), + ('/catalogue', + catalogue.Listing(self.catalogue)), + ('/catalogue/{queue}', + catalogue.Resource(self.catalogue)), + ('/health', + health.Resource()) + ] diff --git a/marconi/proxy/transport/wsgi/driver.py b/marconi/proxy/transport/wsgi/driver.py index d533b9d87..e7c0548cf 100644 --- a/marconi/proxy/transport/wsgi/driver.py +++ b/marconi/proxy/transport/wsgi/driver.py @@ -12,34 +12,22 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. -"""marconi-proxy: maintains a mapping from inserted queues to partitions - -Supports the following operator API: -- [GET] /v1/partitions - lists registered partitions -- [PUT|GET|DELETE] /v1/partitions/{partition} -- [GET] /v1/catalogue - -Running: -- configure marconi.conf appropriately -- gunicorn marconi.proxy.transport.wsgi.app:app -""" +"""marconi-proxy (base): Interface for driver implementations.""" +import abc from wsgiref import simple_server import falcon +import six from marconi.common import config +from marconi.common.transport.wsgi import helpers import marconi.openstack.common.log as logging from marconi.proxy import transport -from marconi.proxy.transport.wsgi import ( - catalogue, forward, health, metadata, - partitions, queues, v1, version -) +from marconi.proxy.transport.wsgi import version from marconi.proxy.utils import round_robin from marconi.queues.transport import auth -_VER = version.path() - OPTIONS = { 'bind': '0.0.0.0', 'port': 8889 @@ -54,92 +42,30 @@ WSGI_CFG = config.namespace('proxy:drivers:transport:wsgi').from_options( LOG = logging.getLogger(__name__) -# TODO(cpp-cabrera): refactor to avoid duplication with queues..wsgi -def _check_media_type(req, resp, params): - if not req.client_accepts('application/json'): - raise falcon.HTTPNotAcceptable( - u''' -Endpoint only serves `application/json`; specify client-side -media type support with the "Accept" header.''', - href=u'http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html', - href_text=u'14.1 Accept, Hypertext Transfer Protocol -- HTTP/1.1') - - -class Driver(transport.DriverBase): +@six.add_metaclass(abc.ABCMeta) +class DriverBase(transport.DriverBase): """Entry point to the proxy :param storage: storage driver to use + :type storage: marconi.proxy.storage.base.DriverBase :param cache: cache driver to use + :type cache: marconi.common.cache.backends.BaseCache """ def __init__(self, storage, cache): - super(Driver, self).__init__(storage, cache) + super(DriverBase, self).__init__(storage, cache) self.app = None - self._catalogue = self.storage.catalogue_controller - self._partitions = self.storage.partitions_controller - self._selector = round_robin.Selector() + self.catalogue = self.storage.catalogue_controller + self.partitions = self.storage.partitions_controller + self.selector = round_robin.Selector() self._init_routes() self._init_middleware() def _init_routes(self): - self.app = falcon.API(before=[_check_media_type]) - - # NOTE(cpp-cabrera): proxy-specififc routes - self.app.add_route(_VER + '/partitions', - partitions.Listing(self._partitions)) - self.app.add_route(_VER + '/partitions/{partition}', - partitions.Resource(self._partitions)) - self.app.add_route(_VER + '/catalogue', - catalogue.Listing(self._catalogue)) - self.app.add_route(_VER + '/catalogue/{queue}', - catalogue.Resource(self._catalogue)) - self.app.add_route(_VER + '/health', - health.Resource()) - - # NOTE(cpp-cabrera): queue handling routes - self.app.add_route(_VER + '/queues', - queues.Listing(self._catalogue)) - self.app.add_route(_VER + '/queues/{queue}', - queues.Resource(self._partitions, - self._catalogue, - self.cache, self._selector)) - - # NOTE(cpp-cabrera): Marconi forwarded routes - self.app.add_route(_VER, - v1.Resource(self._partitions)) - - # NOTE(cpp-cabrera): Marconi forwarded routes involving a queue - self.app.add_route(_VER + '/queues/{queue}/claims', - forward.ClaimCreate(self._partitions, - self._catalogue, - self.cache, - self._selector)) - - self.app.add_route(_VER + '/queues/{queue}/claims/{cid}', - forward.Claim(self._partitions, - self._catalogue, - self.cache, self._selector)) - - self.app.add_route(_VER + '/queues/{queue}/messages', - forward.MessageBulk(self._partitions, - self._catalogue, - self.cache, - self._selector)) - - self.app.add_route(_VER + '/queues/{queue}/messages/{mid}', - forward.Message(self._partitions, - self._catalogue, self.cache, - self._selector)) - - self.app.add_route(_VER + '/queues/{queue}/stats', - forward.Stats(self._partitions, - self._catalogue, - self.cache, self._selector)) - - self.app.add_route(_VER + '/queues/{queue}/metadata', - metadata.Resource(self._partitions, - self._catalogue, - self.cache, self._selector)) + version_path = version.path() + self.app = falcon.API(before=[helpers.require_accepts_json]) + for route, resource in self.bridge: + self.app.add_route(version_path + route, resource) # TODO(cpp-cabrera): refactor to avoid duplication with queues..wsgi def _init_middleware(self): @@ -150,6 +76,17 @@ class Driver(transport.DriverBase): strategy = auth.strategy(GLOBAL_CFG.auth_strategy) self.app = strategy.install(self.app, PROJECT_CFG.conf) + @abc.abstractproperty + def bridge(self): + """Constructs a list of route/responder pairs that can be used to + establish the functionality of this driver. + + Note: the routes should be unversioned. + + :rtype: [(str, falcon-compatible responser)] + """ + raise NotImplementedError + def listen(self): """Listens on the 'bind:port' as per the config.""" diff --git a/marconi/proxy/transport/wsgi/public/__init__.py b/marconi/proxy/transport/wsgi/public/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/marconi/proxy/transport/wsgi/public/app.py b/marconi/proxy/transport/wsgi/public/app.py new file mode 100644 index 000000000..0420a8db3 --- /dev/null +++ b/marconi/proxy/transport/wsgi/public/app.py @@ -0,0 +1,31 @@ +# Copyright (c) 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""WSGI App for WSGI Containers + +This app should be used by external WSGI +containers. For example: + + $ gunicorn marconi.proxy.transport.wsgi.public.app:app + +NOTE: As for external containers, it is necessary +to put config files in the standard paths. There's +no common way to specify / pass configuration files +to the WSGI app when it is called from other apps. +""" + +from marconi.proxy.public import bootstrap + +app = bootstrap.Bootstrap().transport.app diff --git a/marconi/proxy/transport/wsgi/public/driver.py b/marconi/proxy/transport/wsgi/public/driver.py new file mode 100644 index 000000000..eb9d93af7 --- /dev/null +++ b/marconi/proxy/transport/wsgi/public/driver.py @@ -0,0 +1,58 @@ +# Copyright (c) 2013 Rackspace Hosting, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""marconi-proxy (public): maps project/queue to partitions. + +Forwards requests to the appropriate marconi queues server. +""" + +from marconi.proxy.transport.wsgi import ( + driver, forward, health, metadata, + queues, v1 +) + + +class Driver(driver.DriverBase): + + def __init(self, storage, cache): + super(Driver, self).__init__(storage, cache) + + @property + def bridge(self): + forwarder_args = (self.partitions, self.catalogue, + self.cache, self.selector) + return [ + ('/health', health.Resource()), + + # NOTE(cpp-cabrera): queue handling routes + ('/queues', + queues.Listing(self.catalogue)), + ('/queues/{queue}', + queues.Resource(*forwarder_args)), + + # NOTE(cpp-cabrera): Marconi forwarded routes + ('/queues/{queue}/claims', + forward.ClaimCreate(*forwarder_args)), + ('/queues/{queue}/claims/{cid}', + forward.Claim(*forwarder_args)), + ('/queues/{queue}/messages', + forward.MessageBulk(*forwarder_args)), + ('/queues/{queue}/messages/{mid}', + forward.Message(*forwarder_args)), + ('/queues/{queue}/metadata', + metadata.Resource(*forwarder_args)), + ('/queues/{queue}/stats', + forward.Stats(*forwarder_args)), + ('', v1.Resource(self.partitions)) + ] diff --git a/marconi/queues/transport/wsgi/driver.py b/marconi/queues/transport/wsgi/driver.py index 2e4814cd9..569e8b322 100644 --- a/marconi/queues/transport/wsgi/driver.py +++ b/marconi/queues/transport/wsgi/driver.py @@ -17,6 +17,7 @@ import falcon from wsgiref import simple_server from marconi.common import config +from marconi.common.transport.wsgi import helpers import marconi.openstack.common.log as logging from marconi.queues import transport from marconi.queues.transport import auth @@ -42,25 +43,6 @@ WSGI_CFG = config.namespace('queues:drivers:transport:wsgi').from_options( LOG = logging.getLogger(__name__) -def _check_media_type(req, resp, params): - if not req.client_accepts('application/json'): - raise falcon.HTTPNotAcceptable( - u''' -Endpoint only serves `application/json`; specify client-side -media type support with the "Accept" header.''', - href=u'http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html', - href_text=u'14.1 Accept, Hypertext Transfer Protocol -- HTTP/1.1') - - -def _extract_project_id(req, resp, params): - params['project_id'] = req.get_header('X-PROJECT-ID') - if params['project_id'] == "": - raise falcon.HTTPBadRequest('Empty project header not allowed', - _(u''' -X-PROJECT-ID cannot be an empty string. Specify the right header X-PROJECT-ID -and retry.''')) - - class Driver(transport.DriverBase): def __init__(self, storage): @@ -71,7 +53,10 @@ class Driver(transport.DriverBase): def _init_routes(self): """Initialize URI routes to resources.""" - self.app = falcon.API(before=[_check_media_type, _extract_project_id]) + self.app = falcon.API(before=[ + helpers.require_accepts_json, + helpers.extract_project_id + ]) queue_controller = self.storage.queue_controller message_controller = self.storage.message_controller diff --git a/setup.cfg b/setup.cfg index b9e737504..ffa67efb6 100644 --- a/setup.cfg +++ b/setup.cfg @@ -41,8 +41,11 @@ marconi.proxy.storage = memory = marconi.proxy.storage.memory.driver:Driver mongodb = marconi.proxy.storage.mongodb.driver:Driver -marconi.proxy.transport = - wsgi = marconi.proxy.transport.wsgi.driver:Driver +marconi.proxy.public.transport = + wsgi = marconi.proxy.transport.wsgi.public.driver:Driver + +marconi.proxy.admin.transport = + wsgi = marconi.proxy.transport.wsgi.admin.driver:Driver [nosetests] where=tests diff --git a/test-requirements.txt b/test-requirements.txt index d6d45735b..c1e68d949 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -8,6 +8,7 @@ mock>=1.0 ddt>=0.4.0 discover fixtures>=0.3.14 +httpretty>=0.6.3 python-subunit testrepository>=0.0.17 testtools>=0.9.32 diff --git a/tests/etc/wsgi_proxy_memory.conf b/tests/etc/wsgi_proxy_memory.conf new file mode 100644 index 000000000..2c179f315 --- /dev/null +++ b/tests/etc/wsgi_proxy_memory.conf @@ -0,0 +1,10 @@ +[DEFAULT] +debug = False +verbose = False + +[proxy:drivers] +transport = wsgi +storage = memory + +[proxy:drivers:transport:wsgi] +port = 8888 diff --git a/tests/unit/proxy/base.py b/tests/unit/proxy/base.py index 4e4037d6b..85dedeae4 100644 --- a/tests/unit/proxy/base.py +++ b/tests/unit/proxy/base.py @@ -14,30 +14,44 @@ # See the License for the specific language governing permissions and # limitations under the License. -import multiprocessing -from wsgiref import simple_server - from falcon import testing as ftest -from marconi.proxy import bootstrap +from marconi.proxy.admin import bootstrap as admin +from marconi.proxy.transport.wsgi import ( + queues, version +) +from marconi.proxy.utils import round_robin from tests.unit.queues.transport.wsgi import base class TestBase(base.TestBase): - def setUp(self): - super(base.TestBase, self).setUp() + config_filename = "wsgi_proxy_memory.conf" - self.proxy = bootstrap.Bootstrap() - self.app = self.proxy.transport.app + @classmethod + def setUpClass(cls): + super(TestBase, cls).setUpClass() + TestBase._proxy = admin.Bootstrap() + + TestBase._app = TestBase._proxy.transport.app + partitions_controller = TestBase._proxy.storage.partitions_controller + catalogue_controller = TestBase._proxy.storage.catalogue_controller + cache = TestBase._proxy.cache + selector = round_robin.Selector() + + # NOTE(cpp-cabrera): allow for queue creation: needed for + # catalogue tests + TestBase._app.add_route(version.path() + '/queues/{queue}', + queues.Resource(partitions_controller, + catalogue_controller, + cache, selector)) + + def setUp(self): + super(TestBase, self).setUp() + self.app = TestBase._app + self.proxy = TestBase._proxy self.srmock = ftest.StartResponseMock() - -def make_app_daemon(host, port, app): - httpd = simple_server.make_server(host, port, app) - process = multiprocessing.Process(target=httpd.serve_forever, - name='marconi_' + str(port)) - process.daemon = True - process.start() - - return process + @classmethod + def tearDownClass(cls): + super(TestBase, cls).tearDownClass() diff --git a/tests/unit/proxy/test_catalog.py b/tests/unit/proxy/test_catalog.py deleted file mode 100644 index 19bb06752..000000000 --- a/tests/unit/proxy/test_catalog.py +++ /dev/null @@ -1,99 +0,0 @@ -# Copyright (c) 2013 Rackspace, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# -# See the License for the specific language governing permissions and -# limitations under the License. -import json -import random - -import falcon - -from marconi.proxy import bootstrap as proxy_bootstrap -from marconi.queues import bootstrap - -import base # noqa - - -class CatalogTest(base.TestBase): - - servers = [] - - @classmethod - def setUpClass(cls): - ports = range(8900, 8903) - cls.proxy = proxy_bootstrap.Bootstrap() - app = bootstrap.Bootstrap().transport.app - cls.servers = [base.make_app_daemon('localhost', pt, app) - for pt in ports] - # TODO(cpp-cabrera): allow trailing slash - cls.urls = ['http://127.0.0.1:%d' % pt for pt in ports] - - @classmethod - def tearDownClass(cls): - for p in cls.servers: - p.terminate() - - def tearDown(self): - CatalogTest.proxy.cache.flush() - CatalogTest.proxy.storage.catalogue_controller.drop_all() - super(CatalogTest, self).tearDown() - - def __add_partitions(self): - for name, url, weight in zip( - [server.name for server in self.servers], - self.urls, - random.sample(xrange(100), len(self.urls))): - doc = {'hosts': [url], 'weight': weight} - self.simulate_put('/v1/partitions/' + name, - body=json.dumps(doc)) - self.assertEquals(self.srmock.status, falcon.HTTP_201) - - def test_simple(self): - path = '/v1/catalogue' - - # TODO(cpp-cabrera): use queue creating/deleting cmgrs - queue_names = ['arakawa', 'bridge'] - - # No catalog created yet - self.simulate_get(path) - self.assertEquals(self.srmock.status, falcon.HTTP_204) - - # TODO(cpp-cabrera): what if there is no partition? - - self.__add_partitions() - - # Queue is not touched - result = self.simulate_get('/v1/catalogue/' + queue_names[0]) - self.assertEquals(self.srmock.status, falcon.HTTP_404) - - # Create queues (and implicitly, catalog) - for name in queue_names: - self.simulate_put('/v1/queues/' + name) - self.assertEquals(self.srmock.status, falcon.HTTP_201) - - result = self.simulate_get(path) - self.assertEquals(self.srmock.status, falcon.HTTP_200) - - doc = json.loads(result[0]) - - for name in queue_names: - self.assertIn(name, doc) - self.assertIn(doc[name]['host'], self.urls) - - result = self.simulate_get('/v1/catalogue/' + name) - self.assertEquals(self.srmock.status, falcon.HTTP_200) - - each_doc = json.loads(result[0]) - self.assertEquals(each_doc, doc[name]) - - self.simulate_delete('/v1/queues/' + name) diff --git a/tests/unit/proxy/test_catalogue.py b/tests/unit/proxy/test_catalogue.py new file mode 100644 index 000000000..564e416f9 --- /dev/null +++ b/tests/unit/proxy/test_catalogue.py @@ -0,0 +1,89 @@ +# Copyright (c) 2013 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. +import json +import uuid + +import falcon +import httpretty + +import base # noqa + + +class CatalogueTest(base.TestBase): + + servers = [] + + @classmethod + def setUpClass(cls): + super(CatalogueTest, cls).setUpClass() + + def setUp(self): + super(CatalogueTest, self).setUp() + self.host = 'http://localhost:8000' + self.partition_name = str(uuid.uuid1()) + self.partition = '/v1/partitions/' + self.partition_name + + # create a partition + doc = {'weight': 100, 'hosts': [self.host]} + self.simulate_put(self.partition, body=json.dumps(doc)) + + def tearDown(self): + self.simulate_delete(self.partition) + super(CatalogueTest, self).tearDown() + + @classmethod + def tearDownClass(cls): + super(CatalogueTest, cls).tearDownClass() + + def test_list_empty(self): + self.simulate_get('/v1/catalogue') + self.assertEquals(self.srmock.status, falcon.HTTP_204) + + @httpretty.activate + def test_simple(self): + queues = ['arakawa', 'bridge'] + + self.simulate_get('/v1/catalogue/' + queues[0]) + self.assertEquals(self.srmock.status, falcon.HTTP_404) + + # Create queues + for name in queues: + uri = '{0}/v1/queues/{1}'.format(self.host, name) + httpretty.register_uri(httpretty.PUT, uri, status=201) + + self.simulate_put('/v1/queues/' + name) + self.assertEqual(self.srmock.status, falcon.HTTP_201) + + for name in queues: + # mock out forwarding + uri = '{0}/v1/queues/{1}'.format(self.host, name) + httpretty.register_uri(httpretty.DELETE, uri, status=204) + + # fetch from the catalogue + result = self.simulate_get('/v1/catalogue/' + name) + data = json.loads(result[0]) + self.assertEqual(data['name'], name) + self.assertEqual(data['partition'], self.partition_name) + self.assertEqual(data['host'], self.host) + self.assertEquals(self.srmock.status, falcon.HTTP_200) + + # delete queues, implicitly removing from catalogue + self.simulate_delete('/v1/queues/' + name) + self.assertEqual(self.srmock.status, falcon.HTTP_204) + + # ensure entries were removed from catalogue + self.simulate_get('/v1/catalogue/' + name) + self.assertEqual(self.srmock.status, falcon.HTTP_404) diff --git a/tests/unit/proxy/test_partitions.py b/tests/unit/proxy/test_partitions.py index 1101a87a1..199e21e06 100644 --- a/tests/unit/proxy/test_partitions.py +++ b/tests/unit/proxy/test_partitions.py @@ -24,6 +24,10 @@ import base # noqa class PartitionTest(base.TestBase): + @classmethod + def setUpClass(cls): + super(PartitionTest, cls).setUpClass() + def setUp(self): super(PartitionTest, self).setUp() self.path = '/v1/partitions' @@ -35,6 +39,10 @@ class PartitionTest(base.TestBase): self.proxy.storage.partitions_controller.drop_all() super(PartitionTest, self).tearDown() + @classmethod + def tearDownClass(cls): + super(PartitionTest, cls).tearDownClass() + def test_simple(self): # No partition self.simulate_get(self.partition)