From 53c386af6897e460d61eb2fe5799e2fddda67571 Mon Sep 17 00:00:00 2001 From: Alejandro Cabrera Date: Thu, 26 Sep 2013 09:03:38 -0400 Subject: [PATCH] feat: split proxy API into admin and public apps Rationale: when deploying the proxy, operators should be the only ones that have access to the administrative functionality - primarily, the management of partitions. This split makes it possible to deploy the forwarding portion of the proxy independently of the administrative portion. The appropriate changes in tests, setup.cfg, and configuration files (see: I7cf25e47ecff47934b50c21000b31308e1a4c8a9) were made. Certain helpers that are reused for wsgi transport implementations were extracted to a common location to aid reuse. Unit tests were also simplified for the proxy, with the intent to make them more thorough in coming patches. New test requirement: httpretty - used to perform request mocking during proxy transport unit tests Change-Id: Ia26981a78c477a896370c48768e71f45c364c769 Implements: blueprint placement-service --- marconi/common/access.py | 21 ++++ marconi/common/transport/__init__.py | 0 marconi/common/transport/wsgi/__init__.py | 0 marconi/common/transport/wsgi/helpers.py | 60 +++++++++ marconi/proxy/admin/__init__.py | 0 marconi/proxy/admin/bootstrap.py | 24 ++++ marconi/proxy/{bootstrap.py => base.py} | 9 +- marconi/proxy/public/__init__.py | 0 marconi/proxy/public/bootstrap.py | 24 ++++ marconi/proxy/storage/base.py | 3 +- marconi/proxy/storage/memory/catalogue.py | 12 +- marconi/proxy/storage/memory/driver.py | 2 + marconi/proxy/storage/memory/partitions.py | 2 - marconi/proxy/storage/mongodb/catalogue.py | 8 +- marconi/proxy/transport/wsgi/__init__.py | 4 +- .../proxy/transport/wsgi/admin/__init__.py | 0 .../proxy/transport/wsgi/{ => admin}/app.py | 4 +- marconi/proxy/transport/wsgi/admin/driver.py | 39 ++++++ marconi/proxy/transport/wsgi/driver.py | 119 +++++------------- .../proxy/transport/wsgi/public/__init__.py | 0 marconi/proxy/transport/wsgi/public/app.py | 31 +++++ marconi/proxy/transport/wsgi/public/driver.py | 58 +++++++++ marconi/queues/transport/wsgi/driver.py | 25 +--- setup.cfg | 7 +- test-requirements.txt | 1 + tests/etc/wsgi_proxy_memory.conf | 10 ++ tests/unit/proxy/base.py | 48 ++++--- tests/unit/proxy/test_catalog.py | 99 --------------- tests/unit/proxy/test_catalogue.py | 89 +++++++++++++ tests/unit/proxy/test_partitions.py | 8 ++ 30 files changed, 461 insertions(+), 246 deletions(-) create mode 100644 marconi/common/access.py create mode 100644 marconi/common/transport/__init__.py create mode 100644 marconi/common/transport/wsgi/__init__.py create mode 100644 marconi/common/transport/wsgi/helpers.py create mode 100644 marconi/proxy/admin/__init__.py create mode 100644 marconi/proxy/admin/bootstrap.py rename marconi/proxy/{bootstrap.py => base.py} (87%) create mode 100644 marconi/proxy/public/__init__.py create mode 100644 marconi/proxy/public/bootstrap.py create mode 100644 marconi/proxy/transport/wsgi/admin/__init__.py rename marconi/proxy/transport/wsgi/{ => admin}/app.py (90%) create mode 100644 marconi/proxy/transport/wsgi/admin/driver.py create mode 100644 marconi/proxy/transport/wsgi/public/__init__.py create mode 100644 marconi/proxy/transport/wsgi/public/app.py create mode 100644 marconi/proxy/transport/wsgi/public/driver.py create mode 100644 tests/etc/wsgi_proxy_memory.conf delete mode 100644 tests/unit/proxy/test_catalog.py create mode 100644 tests/unit/proxy/test_catalogue.py diff --git a/marconi/common/access.py b/marconi/common/access.py new file mode 100644 index 000000000..0fb895147 --- /dev/null +++ b/marconi/common/access.py @@ -0,0 +1,21 @@ +# Copyright (c) 2013 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# TODO(cpp-cabrera): port to enum34 when that becomes available +class Access(object): + """An enumeration to represent access levels for APIs.""" + public = 1 + admin = 2 diff --git a/marconi/common/transport/__init__.py b/marconi/common/transport/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/marconi/common/transport/wsgi/__init__.py b/marconi/common/transport/wsgi/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/marconi/common/transport/wsgi/helpers.py b/marconi/common/transport/wsgi/helpers.py new file mode 100644 index 000000000..3b7427738 --- /dev/null +++ b/marconi/common/transport/wsgi/helpers.py @@ -0,0 +1,60 @@ +# Copyright (c) 2013 Rackspace Hosting, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""wsgi transport helpers.""" +import falcon + + +def extract_project_id(req, resp, params): + """Adds `project_id` to the list of params for all responders + + Meant to be used as a `before` hook. + + :param req: request sent + :type req: falcon.request.Request + :param resp: response object to return + :type resp: falcon.response.Response + :param params: additional parameters passed to responders + :type params: dict + :rtype: None + """ + params['project_id'] = req.get_header('X-PROJECT-ID') + if params['project_id'] == "": + raise falcon.HTTPBadRequest('Empty project header not allowed', + _(u''' +X-PROJECT-ID cannot be an empty string. Specify the right header X-PROJECT-ID +and retry.''')) + + +def require_accepts_json(req, resp, params): + """Raises an exception if the request does not accept JSON + + Meant to be used as a `before` hook. + + :param req: request sent + :type req: falcon.request.Request + :param resp: response object to return + :type resp: falcon.response.Response + :param params: additional parameters passed to responders + :type params: dict + :rtype: None + :raises: falcon.HTTPNotAcceptable + """ + if not req.client_accepts('application/json'): + raise falcon.HTTPNotAcceptable( + u''' +Endpoint only serves `application/json`; specify client-side +media type support with the "Accept" header.''', + href=u'http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html', + href_text=u'14.1 Accept, Hypertext Transfer Protocol -- HTTP/1.1') diff --git a/marconi/proxy/admin/__init__.py b/marconi/proxy/admin/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/marconi/proxy/admin/bootstrap.py b/marconi/proxy/admin/bootstrap.py new file mode 100644 index 000000000..fb269425a --- /dev/null +++ b/marconi/proxy/admin/bootstrap.py @@ -0,0 +1,24 @@ +# Copyright (c) 2013 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from marconi.common import access +from marconi.proxy import base + + +class Bootstrap(base.Bootstrap): + def __init__(self, config_file=None, cli_args=None): + super(Bootstrap, self).__init__(access.Access.admin, + config_file, + cli_args) diff --git a/marconi/proxy/bootstrap.py b/marconi/proxy/base.py similarity index 87% rename from marconi/proxy/bootstrap.py rename to marconi/proxy/base.py index 5f34b11cb..e5ca81d72 100644 --- a/marconi/proxy/bootstrap.py +++ b/marconi/proxy/base.py @@ -16,6 +16,7 @@ from oslo.config import cfg from stevedore import driver +from marconi.common import access from marconi.common.cache import cache as oslo_cache from marconi.common import config from marconi.common import decorators @@ -39,9 +40,13 @@ class Bootstrap(object): manages their lifetimes. """ - def __init__(self, config_file=None, cli_args=None): + def __init__(self, access_mode, config_file=None, cli_args=None): PROJECT_CFG.load(filename=config_file, args=cli_args) log.setup('marconi_proxy') + form = 'marconi.proxy.{0}.transport' + lookup = {access.Access.public: 'public', + access.Access.admin: 'admin'} + self._transport_type = form.format(lookup[access_mode]) @decorators.lazy_property(write=False) def storage(self): @@ -69,7 +74,7 @@ class Bootstrap(object): def transport(self): LOG.debug(_(u'Loading Proxy Transport Driver')) try: - mgr = driver.DriverManager('marconi.proxy.transport', + mgr = driver.DriverManager(self._transport_type, CFG.transport, invoke_on_load=True, invoke_args=[self.storage, diff --git a/marconi/proxy/public/__init__.py b/marconi/proxy/public/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/marconi/proxy/public/bootstrap.py b/marconi/proxy/public/bootstrap.py new file mode 100644 index 000000000..a22879a7d --- /dev/null +++ b/marconi/proxy/public/bootstrap.py @@ -0,0 +1,24 @@ +# Copyright (c) 2013 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from marconi.common import access +from marconi.proxy import base + + +class Bootstrap(base.Bootstrap): + def __init__(self, config_file=None, cli_args=None): + super(Bootstrap, self).__init__(access.Access.public, + config_file, + cli_args) diff --git a/marconi/proxy/storage/base.py b/marconi/proxy/storage/base.py index f3fd1a293..3ff521726 100644 --- a/marconi/proxy/storage/base.py +++ b/marconi/proxy/storage/base.py @@ -145,12 +145,13 @@ class CatalogueBase(ControllerBase): """ @abc.abstractmethod - def insert(self, project, queue, partition, metadata={}): + def insert(self, project, queue, partition, host, metadata={}): """Creates a new catalogue entry. :param project: str - Namespace to insert the given queue into :param queue: str - The name of the queue to insert :param partition: str - Partition name where this queue is stored + :param host: text - URL to representative host :param metadata: A dictionary of metadata for this queue """ raise NotImplementedError diff --git a/marconi/proxy/storage/memory/catalogue.py b/marconi/proxy/storage/memory/catalogue.py index cc9819124..0ff3038eb 100644 --- a/marconi/proxy/storage/memory/catalogue.py +++ b/marconi/proxy/storage/memory/catalogue.py @@ -26,8 +26,6 @@ class CatalogueController(base.CatalogueBase): def __init__(self, *args, **kwargs): super(CatalogueController, self).__init__(*args, **kwargs) - - self.driver.db['catalogue'] = {} self._col = self.driver.db['catalogue'] def list(self, project): @@ -47,10 +45,12 @@ class CatalogueController(base.CatalogueBase): return _idx(project, queue) in self._col def insert(self, project, queue, partition, host, metadata={}): - self._col[_idx(project, queue)] = { - 'p': project, 'q': queue, - 'n': partition, 'h': host, 'm': metadata - } + key = _idx(project, queue) + if key not in self._col: + self._col[key] = { + 'p': project, 'q': queue, + 'n': partition, 'h': host, 'm': metadata + } def delete(self, project, queue): try: diff --git a/marconi/proxy/storage/memory/driver.py b/marconi/proxy/storage/memory/driver.py index 2ff0ea6ce..554094fdb 100644 --- a/marconi/proxy/storage/memory/driver.py +++ b/marconi/proxy/storage/memory/driver.py @@ -20,6 +20,8 @@ class Driver(base.DriverBase): def __init__(self): self._db = {} + self._db['partitions'] = {} + self._db['catalogue'] = {} @property def db(self): diff --git a/marconi/proxy/storage/memory/partitions.py b/marconi/proxy/storage/memory/partitions.py index e8f9a8d62..c9ee44d8d 100644 --- a/marconi/proxy/storage/memory/partitions.py +++ b/marconi/proxy/storage/memory/partitions.py @@ -22,8 +22,6 @@ from marconi.proxy.storage import exceptions class PartitionsController(base.PartitionsBase): def __init__(self, *args, **kwargs): super(PartitionsController, self).__init__(*args, **kwargs) - - self.driver.db['partitions'] = {} self._col = self.driver.db['partitions'] def list(self): diff --git a/marconi/proxy/storage/mongodb/catalogue.py b/marconi/proxy/storage/mongodb/catalogue.py index e1aaa543b..f6a73e52b 100644 --- a/marconi/proxy/storage/mongodb/catalogue.py +++ b/marconi/proxy/storage/mongodb/catalogue.py @@ -25,6 +25,7 @@ Schema: 'm': Metadata :: dict } """ +from pymongo import errors import marconi.openstack.common.log as logging from marconi.proxy.storage import base @@ -78,8 +79,11 @@ class CatalogueController(base.CatalogueBase): @utils.raises_conn_error def insert(self, project, queue, partition, host, metadata={}): - self._col.insert({'p': project, 'q': queue, - 'n': partition, 'h': host, 'm': metadata}) + try: + self._col.insert({'p': project, 'q': queue, + 'n': partition, 'h': host, 'm': metadata}) + except errors.DuplicateKeyError: + pass # duplicate insertions are not a problem @utils.raises_conn_error def delete(self, project, queue): diff --git a/marconi/proxy/transport/wsgi/__init__.py b/marconi/proxy/transport/wsgi/__init__.py index 7055f2604..3b7ea0247 100644 --- a/marconi/proxy/transport/wsgi/__init__.py +++ b/marconi/proxy/transport/wsgi/__init__.py @@ -1,6 +1,6 @@ """WSGI Proxy Transport Driver""" -from marconi.queues.transport.wsgi import driver +from marconi.proxy.transport.wsgi import driver # Hoist into package namespace -Driver = driver.Driver +Driver = driver.DriverBase diff --git a/marconi/proxy/transport/wsgi/admin/__init__.py b/marconi/proxy/transport/wsgi/admin/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/marconi/proxy/transport/wsgi/app.py b/marconi/proxy/transport/wsgi/admin/app.py similarity index 90% rename from marconi/proxy/transport/wsgi/app.py rename to marconi/proxy/transport/wsgi/admin/app.py index 8562c0fa0..340c02c61 100644 --- a/marconi/proxy/transport/wsgi/app.py +++ b/marconi/proxy/transport/wsgi/admin/app.py @@ -18,7 +18,7 @@ This app should be used by external WSGI containers. For example: - $ gunicorn marconi.proxy.transport.wsgi.app:app + $ gunicorn marconi.proxy.transport.wsgi.admin.app:app NOTE: As for external containers, it is necessary to put config files in the standard paths. There's @@ -26,6 +26,6 @@ no common way to specify / pass configuration files to the WSGI app when it is called from other apps. """ -from marconi.proxy import bootstrap +from marconi.proxy.admin import bootstrap app = bootstrap.Bootstrap().transport.app diff --git a/marconi/proxy/transport/wsgi/admin/driver.py b/marconi/proxy/transport/wsgi/admin/driver.py new file mode 100644 index 000000000..e4d24967e --- /dev/null +++ b/marconi/proxy/transport/wsgi/admin/driver.py @@ -0,0 +1,39 @@ +# Copyright (c) 2013 Rackspace Hosting, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""marconi-proxy (admin): interface for managing partitions.""" + +from marconi.proxy.transport.wsgi import ( + catalogue, driver, health, partitions, +) + + +class Driver(driver.DriverBase): + def __init__(self, storage, cache): + super(Driver, self).__init__(storage, cache) + + @property + def bridge(self): + return [ + ('/partitions', + partitions.Listing(self.partitions)), + ('/partitions/{partition}', + partitions.Resource(self.partitions)), + ('/catalogue', + catalogue.Listing(self.catalogue)), + ('/catalogue/{queue}', + catalogue.Resource(self.catalogue)), + ('/health', + health.Resource()) + ] diff --git a/marconi/proxy/transport/wsgi/driver.py b/marconi/proxy/transport/wsgi/driver.py index d533b9d87..e7c0548cf 100644 --- a/marconi/proxy/transport/wsgi/driver.py +++ b/marconi/proxy/transport/wsgi/driver.py @@ -12,34 +12,22 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. -"""marconi-proxy: maintains a mapping from inserted queues to partitions - -Supports the following operator API: -- [GET] /v1/partitions - lists registered partitions -- [PUT|GET|DELETE] /v1/partitions/{partition} -- [GET] /v1/catalogue - -Running: -- configure marconi.conf appropriately -- gunicorn marconi.proxy.transport.wsgi.app:app -""" +"""marconi-proxy (base): Interface for driver implementations.""" +import abc from wsgiref import simple_server import falcon +import six from marconi.common import config +from marconi.common.transport.wsgi import helpers import marconi.openstack.common.log as logging from marconi.proxy import transport -from marconi.proxy.transport.wsgi import ( - catalogue, forward, health, metadata, - partitions, queues, v1, version -) +from marconi.proxy.transport.wsgi import version from marconi.proxy.utils import round_robin from marconi.queues.transport import auth -_VER = version.path() - OPTIONS = { 'bind': '0.0.0.0', 'port': 8889 @@ -54,92 +42,30 @@ WSGI_CFG = config.namespace('proxy:drivers:transport:wsgi').from_options( LOG = logging.getLogger(__name__) -# TODO(cpp-cabrera): refactor to avoid duplication with queues..wsgi -def _check_media_type(req, resp, params): - if not req.client_accepts('application/json'): - raise falcon.HTTPNotAcceptable( - u''' -Endpoint only serves `application/json`; specify client-side -media type support with the "Accept" header.''', - href=u'http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html', - href_text=u'14.1 Accept, Hypertext Transfer Protocol -- HTTP/1.1') - - -class Driver(transport.DriverBase): +@six.add_metaclass(abc.ABCMeta) +class DriverBase(transport.DriverBase): """Entry point to the proxy :param storage: storage driver to use + :type storage: marconi.proxy.storage.base.DriverBase :param cache: cache driver to use + :type cache: marconi.common.cache.backends.BaseCache """ def __init__(self, storage, cache): - super(Driver, self).__init__(storage, cache) + super(DriverBase, self).__init__(storage, cache) self.app = None - self._catalogue = self.storage.catalogue_controller - self._partitions = self.storage.partitions_controller - self._selector = round_robin.Selector() + self.catalogue = self.storage.catalogue_controller + self.partitions = self.storage.partitions_controller + self.selector = round_robin.Selector() self._init_routes() self._init_middleware() def _init_routes(self): - self.app = falcon.API(before=[_check_media_type]) - - # NOTE(cpp-cabrera): proxy-specififc routes - self.app.add_route(_VER + '/partitions', - partitions.Listing(self._partitions)) - self.app.add_route(_VER + '/partitions/{partition}', - partitions.Resource(self._partitions)) - self.app.add_route(_VER + '/catalogue', - catalogue.Listing(self._catalogue)) - self.app.add_route(_VER + '/catalogue/{queue}', - catalogue.Resource(self._catalogue)) - self.app.add_route(_VER + '/health', - health.Resource()) - - # NOTE(cpp-cabrera): queue handling routes - self.app.add_route(_VER + '/queues', - queues.Listing(self._catalogue)) - self.app.add_route(_VER + '/queues/{queue}', - queues.Resource(self._partitions, - self._catalogue, - self.cache, self._selector)) - - # NOTE(cpp-cabrera): Marconi forwarded routes - self.app.add_route(_VER, - v1.Resource(self._partitions)) - - # NOTE(cpp-cabrera): Marconi forwarded routes involving a queue - self.app.add_route(_VER + '/queues/{queue}/claims', - forward.ClaimCreate(self._partitions, - self._catalogue, - self.cache, - self._selector)) - - self.app.add_route(_VER + '/queues/{queue}/claims/{cid}', - forward.Claim(self._partitions, - self._catalogue, - self.cache, self._selector)) - - self.app.add_route(_VER + '/queues/{queue}/messages', - forward.MessageBulk(self._partitions, - self._catalogue, - self.cache, - self._selector)) - - self.app.add_route(_VER + '/queues/{queue}/messages/{mid}', - forward.Message(self._partitions, - self._catalogue, self.cache, - self._selector)) - - self.app.add_route(_VER + '/queues/{queue}/stats', - forward.Stats(self._partitions, - self._catalogue, - self.cache, self._selector)) - - self.app.add_route(_VER + '/queues/{queue}/metadata', - metadata.Resource(self._partitions, - self._catalogue, - self.cache, self._selector)) + version_path = version.path() + self.app = falcon.API(before=[helpers.require_accepts_json]) + for route, resource in self.bridge: + self.app.add_route(version_path + route, resource) # TODO(cpp-cabrera): refactor to avoid duplication with queues..wsgi def _init_middleware(self): @@ -150,6 +76,17 @@ class Driver(transport.DriverBase): strategy = auth.strategy(GLOBAL_CFG.auth_strategy) self.app = strategy.install(self.app, PROJECT_CFG.conf) + @abc.abstractproperty + def bridge(self): + """Constructs a list of route/responder pairs that can be used to + establish the functionality of this driver. + + Note: the routes should be unversioned. + + :rtype: [(str, falcon-compatible responser)] + """ + raise NotImplementedError + def listen(self): """Listens on the 'bind:port' as per the config.""" diff --git a/marconi/proxy/transport/wsgi/public/__init__.py b/marconi/proxy/transport/wsgi/public/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/marconi/proxy/transport/wsgi/public/app.py b/marconi/proxy/transport/wsgi/public/app.py new file mode 100644 index 000000000..0420a8db3 --- /dev/null +++ b/marconi/proxy/transport/wsgi/public/app.py @@ -0,0 +1,31 @@ +# Copyright (c) 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""WSGI App for WSGI Containers + +This app should be used by external WSGI +containers. For example: + + $ gunicorn marconi.proxy.transport.wsgi.public.app:app + +NOTE: As for external containers, it is necessary +to put config files in the standard paths. There's +no common way to specify / pass configuration files +to the WSGI app when it is called from other apps. +""" + +from marconi.proxy.public import bootstrap + +app = bootstrap.Bootstrap().transport.app diff --git a/marconi/proxy/transport/wsgi/public/driver.py b/marconi/proxy/transport/wsgi/public/driver.py new file mode 100644 index 000000000..eb9d93af7 --- /dev/null +++ b/marconi/proxy/transport/wsgi/public/driver.py @@ -0,0 +1,58 @@ +# Copyright (c) 2013 Rackspace Hosting, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""marconi-proxy (public): maps project/queue to partitions. + +Forwards requests to the appropriate marconi queues server. +""" + +from marconi.proxy.transport.wsgi import ( + driver, forward, health, metadata, + queues, v1 +) + + +class Driver(driver.DriverBase): + + def __init(self, storage, cache): + super(Driver, self).__init__(storage, cache) + + @property + def bridge(self): + forwarder_args = (self.partitions, self.catalogue, + self.cache, self.selector) + return [ + ('/health', health.Resource()), + + # NOTE(cpp-cabrera): queue handling routes + ('/queues', + queues.Listing(self.catalogue)), + ('/queues/{queue}', + queues.Resource(*forwarder_args)), + + # NOTE(cpp-cabrera): Marconi forwarded routes + ('/queues/{queue}/claims', + forward.ClaimCreate(*forwarder_args)), + ('/queues/{queue}/claims/{cid}', + forward.Claim(*forwarder_args)), + ('/queues/{queue}/messages', + forward.MessageBulk(*forwarder_args)), + ('/queues/{queue}/messages/{mid}', + forward.Message(*forwarder_args)), + ('/queues/{queue}/metadata', + metadata.Resource(*forwarder_args)), + ('/queues/{queue}/stats', + forward.Stats(*forwarder_args)), + ('', v1.Resource(self.partitions)) + ] diff --git a/marconi/queues/transport/wsgi/driver.py b/marconi/queues/transport/wsgi/driver.py index 2e4814cd9..569e8b322 100644 --- a/marconi/queues/transport/wsgi/driver.py +++ b/marconi/queues/transport/wsgi/driver.py @@ -17,6 +17,7 @@ import falcon from wsgiref import simple_server from marconi.common import config +from marconi.common.transport.wsgi import helpers import marconi.openstack.common.log as logging from marconi.queues import transport from marconi.queues.transport import auth @@ -42,25 +43,6 @@ WSGI_CFG = config.namespace('queues:drivers:transport:wsgi').from_options( LOG = logging.getLogger(__name__) -def _check_media_type(req, resp, params): - if not req.client_accepts('application/json'): - raise falcon.HTTPNotAcceptable( - u''' -Endpoint only serves `application/json`; specify client-side -media type support with the "Accept" header.''', - href=u'http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html', - href_text=u'14.1 Accept, Hypertext Transfer Protocol -- HTTP/1.1') - - -def _extract_project_id(req, resp, params): - params['project_id'] = req.get_header('X-PROJECT-ID') - if params['project_id'] == "": - raise falcon.HTTPBadRequest('Empty project header not allowed', - _(u''' -X-PROJECT-ID cannot be an empty string. Specify the right header X-PROJECT-ID -and retry.''')) - - class Driver(transport.DriverBase): def __init__(self, storage): @@ -71,7 +53,10 @@ class Driver(transport.DriverBase): def _init_routes(self): """Initialize URI routes to resources.""" - self.app = falcon.API(before=[_check_media_type, _extract_project_id]) + self.app = falcon.API(before=[ + helpers.require_accepts_json, + helpers.extract_project_id + ]) queue_controller = self.storage.queue_controller message_controller = self.storage.message_controller diff --git a/setup.cfg b/setup.cfg index b9e737504..ffa67efb6 100644 --- a/setup.cfg +++ b/setup.cfg @@ -41,8 +41,11 @@ marconi.proxy.storage = memory = marconi.proxy.storage.memory.driver:Driver mongodb = marconi.proxy.storage.mongodb.driver:Driver -marconi.proxy.transport = - wsgi = marconi.proxy.transport.wsgi.driver:Driver +marconi.proxy.public.transport = + wsgi = marconi.proxy.transport.wsgi.public.driver:Driver + +marconi.proxy.admin.transport = + wsgi = marconi.proxy.transport.wsgi.admin.driver:Driver [nosetests] where=tests diff --git a/test-requirements.txt b/test-requirements.txt index d6d45735b..c1e68d949 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -8,6 +8,7 @@ mock>=1.0 ddt>=0.4.0 discover fixtures>=0.3.14 +httpretty>=0.6.3 python-subunit testrepository>=0.0.17 testtools>=0.9.32 diff --git a/tests/etc/wsgi_proxy_memory.conf b/tests/etc/wsgi_proxy_memory.conf new file mode 100644 index 000000000..2c179f315 --- /dev/null +++ b/tests/etc/wsgi_proxy_memory.conf @@ -0,0 +1,10 @@ +[DEFAULT] +debug = False +verbose = False + +[proxy:drivers] +transport = wsgi +storage = memory + +[proxy:drivers:transport:wsgi] +port = 8888 diff --git a/tests/unit/proxy/base.py b/tests/unit/proxy/base.py index 4e4037d6b..85dedeae4 100644 --- a/tests/unit/proxy/base.py +++ b/tests/unit/proxy/base.py @@ -14,30 +14,44 @@ # See the License for the specific language governing permissions and # limitations under the License. -import multiprocessing -from wsgiref import simple_server - from falcon import testing as ftest -from marconi.proxy import bootstrap +from marconi.proxy.admin import bootstrap as admin +from marconi.proxy.transport.wsgi import ( + queues, version +) +from marconi.proxy.utils import round_robin from tests.unit.queues.transport.wsgi import base class TestBase(base.TestBase): - def setUp(self): - super(base.TestBase, self).setUp() + config_filename = "wsgi_proxy_memory.conf" - self.proxy = bootstrap.Bootstrap() - self.app = self.proxy.transport.app + @classmethod + def setUpClass(cls): + super(TestBase, cls).setUpClass() + TestBase._proxy = admin.Bootstrap() + + TestBase._app = TestBase._proxy.transport.app + partitions_controller = TestBase._proxy.storage.partitions_controller + catalogue_controller = TestBase._proxy.storage.catalogue_controller + cache = TestBase._proxy.cache + selector = round_robin.Selector() + + # NOTE(cpp-cabrera): allow for queue creation: needed for + # catalogue tests + TestBase._app.add_route(version.path() + '/queues/{queue}', + queues.Resource(partitions_controller, + catalogue_controller, + cache, selector)) + + def setUp(self): + super(TestBase, self).setUp() + self.app = TestBase._app + self.proxy = TestBase._proxy self.srmock = ftest.StartResponseMock() - -def make_app_daemon(host, port, app): - httpd = simple_server.make_server(host, port, app) - process = multiprocessing.Process(target=httpd.serve_forever, - name='marconi_' + str(port)) - process.daemon = True - process.start() - - return process + @classmethod + def tearDownClass(cls): + super(TestBase, cls).tearDownClass() diff --git a/tests/unit/proxy/test_catalog.py b/tests/unit/proxy/test_catalog.py deleted file mode 100644 index 19bb06752..000000000 --- a/tests/unit/proxy/test_catalog.py +++ /dev/null @@ -1,99 +0,0 @@ -# Copyright (c) 2013 Rackspace, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# -# See the License for the specific language governing permissions and -# limitations under the License. -import json -import random - -import falcon - -from marconi.proxy import bootstrap as proxy_bootstrap -from marconi.queues import bootstrap - -import base # noqa - - -class CatalogTest(base.TestBase): - - servers = [] - - @classmethod - def setUpClass(cls): - ports = range(8900, 8903) - cls.proxy = proxy_bootstrap.Bootstrap() - app = bootstrap.Bootstrap().transport.app - cls.servers = [base.make_app_daemon('localhost', pt, app) - for pt in ports] - # TODO(cpp-cabrera): allow trailing slash - cls.urls = ['http://127.0.0.1:%d' % pt for pt in ports] - - @classmethod - def tearDownClass(cls): - for p in cls.servers: - p.terminate() - - def tearDown(self): - CatalogTest.proxy.cache.flush() - CatalogTest.proxy.storage.catalogue_controller.drop_all() - super(CatalogTest, self).tearDown() - - def __add_partitions(self): - for name, url, weight in zip( - [server.name for server in self.servers], - self.urls, - random.sample(xrange(100), len(self.urls))): - doc = {'hosts': [url], 'weight': weight} - self.simulate_put('/v1/partitions/' + name, - body=json.dumps(doc)) - self.assertEquals(self.srmock.status, falcon.HTTP_201) - - def test_simple(self): - path = '/v1/catalogue' - - # TODO(cpp-cabrera): use queue creating/deleting cmgrs - queue_names = ['arakawa', 'bridge'] - - # No catalog created yet - self.simulate_get(path) - self.assertEquals(self.srmock.status, falcon.HTTP_204) - - # TODO(cpp-cabrera): what if there is no partition? - - self.__add_partitions() - - # Queue is not touched - result = self.simulate_get('/v1/catalogue/' + queue_names[0]) - self.assertEquals(self.srmock.status, falcon.HTTP_404) - - # Create queues (and implicitly, catalog) - for name in queue_names: - self.simulate_put('/v1/queues/' + name) - self.assertEquals(self.srmock.status, falcon.HTTP_201) - - result = self.simulate_get(path) - self.assertEquals(self.srmock.status, falcon.HTTP_200) - - doc = json.loads(result[0]) - - for name in queue_names: - self.assertIn(name, doc) - self.assertIn(doc[name]['host'], self.urls) - - result = self.simulate_get('/v1/catalogue/' + name) - self.assertEquals(self.srmock.status, falcon.HTTP_200) - - each_doc = json.loads(result[0]) - self.assertEquals(each_doc, doc[name]) - - self.simulate_delete('/v1/queues/' + name) diff --git a/tests/unit/proxy/test_catalogue.py b/tests/unit/proxy/test_catalogue.py new file mode 100644 index 000000000..564e416f9 --- /dev/null +++ b/tests/unit/proxy/test_catalogue.py @@ -0,0 +1,89 @@ +# Copyright (c) 2013 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. +import json +import uuid + +import falcon +import httpretty + +import base # noqa + + +class CatalogueTest(base.TestBase): + + servers = [] + + @classmethod + def setUpClass(cls): + super(CatalogueTest, cls).setUpClass() + + def setUp(self): + super(CatalogueTest, self).setUp() + self.host = 'http://localhost:8000' + self.partition_name = str(uuid.uuid1()) + self.partition = '/v1/partitions/' + self.partition_name + + # create a partition + doc = {'weight': 100, 'hosts': [self.host]} + self.simulate_put(self.partition, body=json.dumps(doc)) + + def tearDown(self): + self.simulate_delete(self.partition) + super(CatalogueTest, self).tearDown() + + @classmethod + def tearDownClass(cls): + super(CatalogueTest, cls).tearDownClass() + + def test_list_empty(self): + self.simulate_get('/v1/catalogue') + self.assertEquals(self.srmock.status, falcon.HTTP_204) + + @httpretty.activate + def test_simple(self): + queues = ['arakawa', 'bridge'] + + self.simulate_get('/v1/catalogue/' + queues[0]) + self.assertEquals(self.srmock.status, falcon.HTTP_404) + + # Create queues + for name in queues: + uri = '{0}/v1/queues/{1}'.format(self.host, name) + httpretty.register_uri(httpretty.PUT, uri, status=201) + + self.simulate_put('/v1/queues/' + name) + self.assertEqual(self.srmock.status, falcon.HTTP_201) + + for name in queues: + # mock out forwarding + uri = '{0}/v1/queues/{1}'.format(self.host, name) + httpretty.register_uri(httpretty.DELETE, uri, status=204) + + # fetch from the catalogue + result = self.simulate_get('/v1/catalogue/' + name) + data = json.loads(result[0]) + self.assertEqual(data['name'], name) + self.assertEqual(data['partition'], self.partition_name) + self.assertEqual(data['host'], self.host) + self.assertEquals(self.srmock.status, falcon.HTTP_200) + + # delete queues, implicitly removing from catalogue + self.simulate_delete('/v1/queues/' + name) + self.assertEqual(self.srmock.status, falcon.HTTP_204) + + # ensure entries were removed from catalogue + self.simulate_get('/v1/catalogue/' + name) + self.assertEqual(self.srmock.status, falcon.HTTP_404) diff --git a/tests/unit/proxy/test_partitions.py b/tests/unit/proxy/test_partitions.py index 1101a87a1..199e21e06 100644 --- a/tests/unit/proxy/test_partitions.py +++ b/tests/unit/proxy/test_partitions.py @@ -24,6 +24,10 @@ import base # noqa class PartitionTest(base.TestBase): + @classmethod + def setUpClass(cls): + super(PartitionTest, cls).setUpClass() + def setUp(self): super(PartitionTest, self).setUp() self.path = '/v1/partitions' @@ -35,6 +39,10 @@ class PartitionTest(base.TestBase): self.proxy.storage.partitions_controller.drop_all() super(PartitionTest, self).tearDown() + @classmethod + def tearDownClass(cls): + super(PartitionTest, cls).tearDownClass() + def test_simple(self): # No partition self.simulate_get(self.partition)