feat: split proxy API into admin and public apps
Rationale: when deploying the proxy, operators should be the only ones that have access to the administrative functionality - primarily, the management of partitions. This split makes it possible to deploy the forwarding portion of the proxy independently of the administrative portion. The appropriate changes in tests, setup.cfg, and configuration files (see: I7cf25e47ecff47934b50c21000b31308e1a4c8a9) were made. Certain helpers that are reused for wsgi transport implementations were extracted to a common location to aid reuse. Unit tests were also simplified for the proxy, with the intent to make them more thorough in coming patches. New test requirement: httpretty - used to perform request mocking during proxy transport unit tests Change-Id: Ia26981a78c477a896370c48768e71f45c364c769 Implements: blueprint placement-service
This commit is contained in:
parent
910451514d
commit
53c386af68
21
marconi/common/access.py
Normal file
21
marconi/common/access.py
Normal file
@ -0,0 +1,21 @@
|
||||
# Copyright (c) 2013 Rackspace, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
# TODO(cpp-cabrera): port to enum34 when that becomes available
|
||||
class Access(object):
|
||||
"""An enumeration to represent access levels for APIs."""
|
||||
public = 1
|
||||
admin = 2
|
0
marconi/common/transport/__init__.py
Normal file
0
marconi/common/transport/__init__.py
Normal file
0
marconi/common/transport/wsgi/__init__.py
Normal file
0
marconi/common/transport/wsgi/__init__.py
Normal file
60
marconi/common/transport/wsgi/helpers.py
Normal file
60
marconi/common/transport/wsgi/helpers.py
Normal file
@ -0,0 +1,60 @@
|
||||
# Copyright (c) 2013 Rackspace Hosting, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""wsgi transport helpers."""
|
||||
import falcon
|
||||
|
||||
|
||||
def extract_project_id(req, resp, params):
|
||||
"""Adds `project_id` to the list of params for all responders
|
||||
|
||||
Meant to be used as a `before` hook.
|
||||
|
||||
:param req: request sent
|
||||
:type req: falcon.request.Request
|
||||
:param resp: response object to return
|
||||
:type resp: falcon.response.Response
|
||||
:param params: additional parameters passed to responders
|
||||
:type params: dict
|
||||
:rtype: None
|
||||
"""
|
||||
params['project_id'] = req.get_header('X-PROJECT-ID')
|
||||
if params['project_id'] == "":
|
||||
raise falcon.HTTPBadRequest('Empty project header not allowed',
|
||||
_(u'''
|
||||
X-PROJECT-ID cannot be an empty string. Specify the right header X-PROJECT-ID
|
||||
and retry.'''))
|
||||
|
||||
|
||||
def require_accepts_json(req, resp, params):
|
||||
"""Raises an exception if the request does not accept JSON
|
||||
|
||||
Meant to be used as a `before` hook.
|
||||
|
||||
:param req: request sent
|
||||
:type req: falcon.request.Request
|
||||
:param resp: response object to return
|
||||
:type resp: falcon.response.Response
|
||||
:param params: additional parameters passed to responders
|
||||
:type params: dict
|
||||
:rtype: None
|
||||
:raises: falcon.HTTPNotAcceptable
|
||||
"""
|
||||
if not req.client_accepts('application/json'):
|
||||
raise falcon.HTTPNotAcceptable(
|
||||
u'''
|
||||
Endpoint only serves `application/json`; specify client-side
|
||||
media type support with the "Accept" header.''',
|
||||
href=u'http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html',
|
||||
href_text=u'14.1 Accept, Hypertext Transfer Protocol -- HTTP/1.1')
|
0
marconi/proxy/admin/__init__.py
Normal file
0
marconi/proxy/admin/__init__.py
Normal file
24
marconi/proxy/admin/bootstrap.py
Normal file
24
marconi/proxy/admin/bootstrap.py
Normal file
@ -0,0 +1,24 @@
|
||||
# Copyright (c) 2013 Rackspace, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from marconi.common import access
|
||||
from marconi.proxy import base
|
||||
|
||||
|
||||
class Bootstrap(base.Bootstrap):
|
||||
def __init__(self, config_file=None, cli_args=None):
|
||||
super(Bootstrap, self).__init__(access.Access.admin,
|
||||
config_file,
|
||||
cli_args)
|
@ -16,6 +16,7 @@
|
||||
from oslo.config import cfg
|
||||
from stevedore import driver
|
||||
|
||||
from marconi.common import access
|
||||
from marconi.common.cache import cache as oslo_cache
|
||||
from marconi.common import config
|
||||
from marconi.common import decorators
|
||||
@ -39,9 +40,13 @@ class Bootstrap(object):
|
||||
manages their lifetimes.
|
||||
"""
|
||||
|
||||
def __init__(self, config_file=None, cli_args=None):
|
||||
def __init__(self, access_mode, config_file=None, cli_args=None):
|
||||
PROJECT_CFG.load(filename=config_file, args=cli_args)
|
||||
log.setup('marconi_proxy')
|
||||
form = 'marconi.proxy.{0}.transport'
|
||||
lookup = {access.Access.public: 'public',
|
||||
access.Access.admin: 'admin'}
|
||||
self._transport_type = form.format(lookup[access_mode])
|
||||
|
||||
@decorators.lazy_property(write=False)
|
||||
def storage(self):
|
||||
@ -69,7 +74,7 @@ class Bootstrap(object):
|
||||
def transport(self):
|
||||
LOG.debug(_(u'Loading Proxy Transport Driver'))
|
||||
try:
|
||||
mgr = driver.DriverManager('marconi.proxy.transport',
|
||||
mgr = driver.DriverManager(self._transport_type,
|
||||
CFG.transport,
|
||||
invoke_on_load=True,
|
||||
invoke_args=[self.storage,
|
0
marconi/proxy/public/__init__.py
Normal file
0
marconi/proxy/public/__init__.py
Normal file
24
marconi/proxy/public/bootstrap.py
Normal file
24
marconi/proxy/public/bootstrap.py
Normal file
@ -0,0 +1,24 @@
|
||||
# Copyright (c) 2013 Rackspace, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from marconi.common import access
|
||||
from marconi.proxy import base
|
||||
|
||||
|
||||
class Bootstrap(base.Bootstrap):
|
||||
def __init__(self, config_file=None, cli_args=None):
|
||||
super(Bootstrap, self).__init__(access.Access.public,
|
||||
config_file,
|
||||
cli_args)
|
@ -145,12 +145,13 @@ class CatalogueBase(ControllerBase):
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def insert(self, project, queue, partition, metadata={}):
|
||||
def insert(self, project, queue, partition, host, metadata={}):
|
||||
"""Creates a new catalogue entry.
|
||||
|
||||
:param project: str - Namespace to insert the given queue into
|
||||
:param queue: str - The name of the queue to insert
|
||||
:param partition: str - Partition name where this queue is stored
|
||||
:param host: text - URL to representative host
|
||||
:param metadata: A dictionary of metadata for this queue
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
@ -26,8 +26,6 @@ class CatalogueController(base.CatalogueBase):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(CatalogueController, self).__init__(*args, **kwargs)
|
||||
|
||||
self.driver.db['catalogue'] = {}
|
||||
self._col = self.driver.db['catalogue']
|
||||
|
||||
def list(self, project):
|
||||
@ -47,10 +45,12 @@ class CatalogueController(base.CatalogueBase):
|
||||
return _idx(project, queue) in self._col
|
||||
|
||||
def insert(self, project, queue, partition, host, metadata={}):
|
||||
self._col[_idx(project, queue)] = {
|
||||
'p': project, 'q': queue,
|
||||
'n': partition, 'h': host, 'm': metadata
|
||||
}
|
||||
key = _idx(project, queue)
|
||||
if key not in self._col:
|
||||
self._col[key] = {
|
||||
'p': project, 'q': queue,
|
||||
'n': partition, 'h': host, 'm': metadata
|
||||
}
|
||||
|
||||
def delete(self, project, queue):
|
||||
try:
|
||||
|
@ -20,6 +20,8 @@ class Driver(base.DriverBase):
|
||||
|
||||
def __init__(self):
|
||||
self._db = {}
|
||||
self._db['partitions'] = {}
|
||||
self._db['catalogue'] = {}
|
||||
|
||||
@property
|
||||
def db(self):
|
||||
|
@ -22,8 +22,6 @@ from marconi.proxy.storage import exceptions
|
||||
class PartitionsController(base.PartitionsBase):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(PartitionsController, self).__init__(*args, **kwargs)
|
||||
|
||||
self.driver.db['partitions'] = {}
|
||||
self._col = self.driver.db['partitions']
|
||||
|
||||
def list(self):
|
||||
|
@ -25,6 +25,7 @@ Schema:
|
||||
'm': Metadata :: dict
|
||||
}
|
||||
"""
|
||||
from pymongo import errors
|
||||
|
||||
import marconi.openstack.common.log as logging
|
||||
from marconi.proxy.storage import base
|
||||
@ -78,8 +79,11 @@ class CatalogueController(base.CatalogueBase):
|
||||
|
||||
@utils.raises_conn_error
|
||||
def insert(self, project, queue, partition, host, metadata={}):
|
||||
self._col.insert({'p': project, 'q': queue,
|
||||
'n': partition, 'h': host, 'm': metadata})
|
||||
try:
|
||||
self._col.insert({'p': project, 'q': queue,
|
||||
'n': partition, 'h': host, 'm': metadata})
|
||||
except errors.DuplicateKeyError:
|
||||
pass # duplicate insertions are not a problem
|
||||
|
||||
@utils.raises_conn_error
|
||||
def delete(self, project, queue):
|
||||
|
@ -1,6 +1,6 @@
|
||||
"""WSGI Proxy Transport Driver"""
|
||||
|
||||
from marconi.queues.transport.wsgi import driver
|
||||
from marconi.proxy.transport.wsgi import driver
|
||||
|
||||
# Hoist into package namespace
|
||||
Driver = driver.Driver
|
||||
Driver = driver.DriverBase
|
||||
|
0
marconi/proxy/transport/wsgi/admin/__init__.py
Normal file
0
marconi/proxy/transport/wsgi/admin/__init__.py
Normal file
@ -18,7 +18,7 @@
|
||||
This app should be used by external WSGI
|
||||
containers. For example:
|
||||
|
||||
$ gunicorn marconi.proxy.transport.wsgi.app:app
|
||||
$ gunicorn marconi.proxy.transport.wsgi.admin.app:app
|
||||
|
||||
NOTE: As for external containers, it is necessary
|
||||
to put config files in the standard paths. There's
|
||||
@ -26,6 +26,6 @@ no common way to specify / pass configuration files
|
||||
to the WSGI app when it is called from other apps.
|
||||
"""
|
||||
|
||||
from marconi.proxy import bootstrap
|
||||
from marconi.proxy.admin import bootstrap
|
||||
|
||||
app = bootstrap.Bootstrap().transport.app
|
39
marconi/proxy/transport/wsgi/admin/driver.py
Normal file
39
marconi/proxy/transport/wsgi/admin/driver.py
Normal file
@ -0,0 +1,39 @@
|
||||
# Copyright (c) 2013 Rackspace Hosting, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""marconi-proxy (admin): interface for managing partitions."""
|
||||
|
||||
from marconi.proxy.transport.wsgi import (
|
||||
catalogue, driver, health, partitions,
|
||||
)
|
||||
|
||||
|
||||
class Driver(driver.DriverBase):
|
||||
def __init__(self, storage, cache):
|
||||
super(Driver, self).__init__(storage, cache)
|
||||
|
||||
@property
|
||||
def bridge(self):
|
||||
return [
|
||||
('/partitions',
|
||||
partitions.Listing(self.partitions)),
|
||||
('/partitions/{partition}',
|
||||
partitions.Resource(self.partitions)),
|
||||
('/catalogue',
|
||||
catalogue.Listing(self.catalogue)),
|
||||
('/catalogue/{queue}',
|
||||
catalogue.Resource(self.catalogue)),
|
||||
('/health',
|
||||
health.Resource())
|
||||
]
|
@ -12,34 +12,22 @@
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""marconi-proxy: maintains a mapping from inserted queues to partitions
|
||||
|
||||
Supports the following operator API:
|
||||
- [GET] /v1/partitions - lists registered partitions
|
||||
- [PUT|GET|DELETE] /v1/partitions/{partition}
|
||||
- [GET] /v1/catalogue
|
||||
|
||||
Running:
|
||||
- configure marconi.conf appropriately
|
||||
- gunicorn marconi.proxy.transport.wsgi.app:app
|
||||
"""
|
||||
"""marconi-proxy (base): Interface for driver implementations."""
|
||||
import abc
|
||||
from wsgiref import simple_server
|
||||
|
||||
import falcon
|
||||
import six
|
||||
|
||||
from marconi.common import config
|
||||
from marconi.common.transport.wsgi import helpers
|
||||
import marconi.openstack.common.log as logging
|
||||
from marconi.proxy import transport
|
||||
from marconi.proxy.transport.wsgi import (
|
||||
catalogue, forward, health, metadata,
|
||||
partitions, queues, v1, version
|
||||
)
|
||||
from marconi.proxy.transport.wsgi import version
|
||||
from marconi.proxy.utils import round_robin
|
||||
from marconi.queues.transport import auth
|
||||
|
||||
|
||||
_VER = version.path()
|
||||
|
||||
OPTIONS = {
|
||||
'bind': '0.0.0.0',
|
||||
'port': 8889
|
||||
@ -54,92 +42,30 @@ WSGI_CFG = config.namespace('proxy:drivers:transport:wsgi').from_options(
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# TODO(cpp-cabrera): refactor to avoid duplication with queues..wsgi
|
||||
def _check_media_type(req, resp, params):
|
||||
if not req.client_accepts('application/json'):
|
||||
raise falcon.HTTPNotAcceptable(
|
||||
u'''
|
||||
Endpoint only serves `application/json`; specify client-side
|
||||
media type support with the "Accept" header.''',
|
||||
href=u'http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html',
|
||||
href_text=u'14.1 Accept, Hypertext Transfer Protocol -- HTTP/1.1')
|
||||
|
||||
|
||||
class Driver(transport.DriverBase):
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class DriverBase(transport.DriverBase):
|
||||
"""Entry point to the proxy
|
||||
|
||||
:param storage: storage driver to use
|
||||
:type storage: marconi.proxy.storage.base.DriverBase
|
||||
:param cache: cache driver to use
|
||||
:type cache: marconi.common.cache.backends.BaseCache
|
||||
"""
|
||||
def __init__(self, storage, cache):
|
||||
super(Driver, self).__init__(storage, cache)
|
||||
super(DriverBase, self).__init__(storage, cache)
|
||||
self.app = None
|
||||
self._catalogue = self.storage.catalogue_controller
|
||||
self._partitions = self.storage.partitions_controller
|
||||
self._selector = round_robin.Selector()
|
||||
self.catalogue = self.storage.catalogue_controller
|
||||
self.partitions = self.storage.partitions_controller
|
||||
self.selector = round_robin.Selector()
|
||||
|
||||
self._init_routes()
|
||||
self._init_middleware()
|
||||
|
||||
def _init_routes(self):
|
||||
self.app = falcon.API(before=[_check_media_type])
|
||||
|
||||
# NOTE(cpp-cabrera): proxy-specififc routes
|
||||
self.app.add_route(_VER + '/partitions',
|
||||
partitions.Listing(self._partitions))
|
||||
self.app.add_route(_VER + '/partitions/{partition}',
|
||||
partitions.Resource(self._partitions))
|
||||
self.app.add_route(_VER + '/catalogue',
|
||||
catalogue.Listing(self._catalogue))
|
||||
self.app.add_route(_VER + '/catalogue/{queue}',
|
||||
catalogue.Resource(self._catalogue))
|
||||
self.app.add_route(_VER + '/health',
|
||||
health.Resource())
|
||||
|
||||
# NOTE(cpp-cabrera): queue handling routes
|
||||
self.app.add_route(_VER + '/queues',
|
||||
queues.Listing(self._catalogue))
|
||||
self.app.add_route(_VER + '/queues/{queue}',
|
||||
queues.Resource(self._partitions,
|
||||
self._catalogue,
|
||||
self.cache, self._selector))
|
||||
|
||||
# NOTE(cpp-cabrera): Marconi forwarded routes
|
||||
self.app.add_route(_VER,
|
||||
v1.Resource(self._partitions))
|
||||
|
||||
# NOTE(cpp-cabrera): Marconi forwarded routes involving a queue
|
||||
self.app.add_route(_VER + '/queues/{queue}/claims',
|
||||
forward.ClaimCreate(self._partitions,
|
||||
self._catalogue,
|
||||
self.cache,
|
||||
self._selector))
|
||||
|
||||
self.app.add_route(_VER + '/queues/{queue}/claims/{cid}',
|
||||
forward.Claim(self._partitions,
|
||||
self._catalogue,
|
||||
self.cache, self._selector))
|
||||
|
||||
self.app.add_route(_VER + '/queues/{queue}/messages',
|
||||
forward.MessageBulk(self._partitions,
|
||||
self._catalogue,
|
||||
self.cache,
|
||||
self._selector))
|
||||
|
||||
self.app.add_route(_VER + '/queues/{queue}/messages/{mid}',
|
||||
forward.Message(self._partitions,
|
||||
self._catalogue, self.cache,
|
||||
self._selector))
|
||||
|
||||
self.app.add_route(_VER + '/queues/{queue}/stats',
|
||||
forward.Stats(self._partitions,
|
||||
self._catalogue,
|
||||
self.cache, self._selector))
|
||||
|
||||
self.app.add_route(_VER + '/queues/{queue}/metadata',
|
||||
metadata.Resource(self._partitions,
|
||||
self._catalogue,
|
||||
self.cache, self._selector))
|
||||
version_path = version.path()
|
||||
self.app = falcon.API(before=[helpers.require_accepts_json])
|
||||
for route, resource in self.bridge:
|
||||
self.app.add_route(version_path + route, resource)
|
||||
|
||||
# TODO(cpp-cabrera): refactor to avoid duplication with queues..wsgi
|
||||
def _init_middleware(self):
|
||||
@ -150,6 +76,17 @@ class Driver(transport.DriverBase):
|
||||
strategy = auth.strategy(GLOBAL_CFG.auth_strategy)
|
||||
self.app = strategy.install(self.app, PROJECT_CFG.conf)
|
||||
|
||||
@abc.abstractproperty
|
||||
def bridge(self):
|
||||
"""Constructs a list of route/responder pairs that can be used to
|
||||
establish the functionality of this driver.
|
||||
|
||||
Note: the routes should be unversioned.
|
||||
|
||||
:rtype: [(str, falcon-compatible responser)]
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def listen(self):
|
||||
"""Listens on the 'bind:port' as per the config."""
|
||||
|
||||
|
0
marconi/proxy/transport/wsgi/public/__init__.py
Normal file
0
marconi/proxy/transport/wsgi/public/__init__.py
Normal file
31
marconi/proxy/transport/wsgi/public/app.py
Normal file
31
marconi/proxy/transport/wsgi/public/app.py
Normal file
@ -0,0 +1,31 @@
|
||||
# Copyright (c) 2013 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""WSGI App for WSGI Containers
|
||||
|
||||
This app should be used by external WSGI
|
||||
containers. For example:
|
||||
|
||||
$ gunicorn marconi.proxy.transport.wsgi.public.app:app
|
||||
|
||||
NOTE: As for external containers, it is necessary
|
||||
to put config files in the standard paths. There's
|
||||
no common way to specify / pass configuration files
|
||||
to the WSGI app when it is called from other apps.
|
||||
"""
|
||||
|
||||
from marconi.proxy.public import bootstrap
|
||||
|
||||
app = bootstrap.Bootstrap().transport.app
|
58
marconi/proxy/transport/wsgi/public/driver.py
Normal file
58
marconi/proxy/transport/wsgi/public/driver.py
Normal file
@ -0,0 +1,58 @@
|
||||
# Copyright (c) 2013 Rackspace Hosting, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""marconi-proxy (public): maps project/queue to partitions.
|
||||
|
||||
Forwards requests to the appropriate marconi queues server.
|
||||
"""
|
||||
|
||||
from marconi.proxy.transport.wsgi import (
|
||||
driver, forward, health, metadata,
|
||||
queues, v1
|
||||
)
|
||||
|
||||
|
||||
class Driver(driver.DriverBase):
|
||||
|
||||
def __init(self, storage, cache):
|
||||
super(Driver, self).__init__(storage, cache)
|
||||
|
||||
@property
|
||||
def bridge(self):
|
||||
forwarder_args = (self.partitions, self.catalogue,
|
||||
self.cache, self.selector)
|
||||
return [
|
||||
('/health', health.Resource()),
|
||||
|
||||
# NOTE(cpp-cabrera): queue handling routes
|
||||
('/queues',
|
||||
queues.Listing(self.catalogue)),
|
||||
('/queues/{queue}',
|
||||
queues.Resource(*forwarder_args)),
|
||||
|
||||
# NOTE(cpp-cabrera): Marconi forwarded routes
|
||||
('/queues/{queue}/claims',
|
||||
forward.ClaimCreate(*forwarder_args)),
|
||||
('/queues/{queue}/claims/{cid}',
|
||||
forward.Claim(*forwarder_args)),
|
||||
('/queues/{queue}/messages',
|
||||
forward.MessageBulk(*forwarder_args)),
|
||||
('/queues/{queue}/messages/{mid}',
|
||||
forward.Message(*forwarder_args)),
|
||||
('/queues/{queue}/metadata',
|
||||
metadata.Resource(*forwarder_args)),
|
||||
('/queues/{queue}/stats',
|
||||
forward.Stats(*forwarder_args)),
|
||||
('', v1.Resource(self.partitions))
|
||||
]
|
@ -17,6 +17,7 @@ import falcon
|
||||
from wsgiref import simple_server
|
||||
|
||||
from marconi.common import config
|
||||
from marconi.common.transport.wsgi import helpers
|
||||
import marconi.openstack.common.log as logging
|
||||
from marconi.queues import transport
|
||||
from marconi.queues.transport import auth
|
||||
@ -42,25 +43,6 @@ WSGI_CFG = config.namespace('queues:drivers:transport:wsgi').from_options(
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _check_media_type(req, resp, params):
|
||||
if not req.client_accepts('application/json'):
|
||||
raise falcon.HTTPNotAcceptable(
|
||||
u'''
|
||||
Endpoint only serves `application/json`; specify client-side
|
||||
media type support with the "Accept" header.''',
|
||||
href=u'http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html',
|
||||
href_text=u'14.1 Accept, Hypertext Transfer Protocol -- HTTP/1.1')
|
||||
|
||||
|
||||
def _extract_project_id(req, resp, params):
|
||||
params['project_id'] = req.get_header('X-PROJECT-ID')
|
||||
if params['project_id'] == "":
|
||||
raise falcon.HTTPBadRequest('Empty project header not allowed',
|
||||
_(u'''
|
||||
X-PROJECT-ID cannot be an empty string. Specify the right header X-PROJECT-ID
|
||||
and retry.'''))
|
||||
|
||||
|
||||
class Driver(transport.DriverBase):
|
||||
|
||||
def __init__(self, storage):
|
||||
@ -71,7 +53,10 @@ class Driver(transport.DriverBase):
|
||||
|
||||
def _init_routes(self):
|
||||
"""Initialize URI routes to resources."""
|
||||
self.app = falcon.API(before=[_check_media_type, _extract_project_id])
|
||||
self.app = falcon.API(before=[
|
||||
helpers.require_accepts_json,
|
||||
helpers.extract_project_id
|
||||
])
|
||||
|
||||
queue_controller = self.storage.queue_controller
|
||||
message_controller = self.storage.message_controller
|
||||
|
@ -41,8 +41,11 @@ marconi.proxy.storage =
|
||||
memory = marconi.proxy.storage.memory.driver:Driver
|
||||
mongodb = marconi.proxy.storage.mongodb.driver:Driver
|
||||
|
||||
marconi.proxy.transport =
|
||||
wsgi = marconi.proxy.transport.wsgi.driver:Driver
|
||||
marconi.proxy.public.transport =
|
||||
wsgi = marconi.proxy.transport.wsgi.public.driver:Driver
|
||||
|
||||
marconi.proxy.admin.transport =
|
||||
wsgi = marconi.proxy.transport.wsgi.admin.driver:Driver
|
||||
|
||||
[nosetests]
|
||||
where=tests
|
||||
|
@ -8,6 +8,7 @@ mock>=1.0
|
||||
ddt>=0.4.0
|
||||
discover
|
||||
fixtures>=0.3.14
|
||||
httpretty>=0.6.3
|
||||
python-subunit
|
||||
testrepository>=0.0.17
|
||||
testtools>=0.9.32
|
||||
|
10
tests/etc/wsgi_proxy_memory.conf
Normal file
10
tests/etc/wsgi_proxy_memory.conf
Normal file
@ -0,0 +1,10 @@
|
||||
[DEFAULT]
|
||||
debug = False
|
||||
verbose = False
|
||||
|
||||
[proxy:drivers]
|
||||
transport = wsgi
|
||||
storage = memory
|
||||
|
||||
[proxy:drivers:transport:wsgi]
|
||||
port = 8888
|
@ -14,30 +14,44 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import multiprocessing
|
||||
from wsgiref import simple_server
|
||||
|
||||
from falcon import testing as ftest
|
||||
|
||||
from marconi.proxy import bootstrap
|
||||
from marconi.proxy.admin import bootstrap as admin
|
||||
from marconi.proxy.transport.wsgi import (
|
||||
queues, version
|
||||
)
|
||||
from marconi.proxy.utils import round_robin
|
||||
from tests.unit.queues.transport.wsgi import base
|
||||
|
||||
|
||||
class TestBase(base.TestBase):
|
||||
|
||||
def setUp(self):
|
||||
super(base.TestBase, self).setUp()
|
||||
config_filename = "wsgi_proxy_memory.conf"
|
||||
|
||||
self.proxy = bootstrap.Bootstrap()
|
||||
self.app = self.proxy.transport.app
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super(TestBase, cls).setUpClass()
|
||||
TestBase._proxy = admin.Bootstrap()
|
||||
|
||||
TestBase._app = TestBase._proxy.transport.app
|
||||
partitions_controller = TestBase._proxy.storage.partitions_controller
|
||||
catalogue_controller = TestBase._proxy.storage.catalogue_controller
|
||||
cache = TestBase._proxy.cache
|
||||
selector = round_robin.Selector()
|
||||
|
||||
# NOTE(cpp-cabrera): allow for queue creation: needed for
|
||||
# catalogue tests
|
||||
TestBase._app.add_route(version.path() + '/queues/{queue}',
|
||||
queues.Resource(partitions_controller,
|
||||
catalogue_controller,
|
||||
cache, selector))
|
||||
|
||||
def setUp(self):
|
||||
super(TestBase, self).setUp()
|
||||
self.app = TestBase._app
|
||||
self.proxy = TestBase._proxy
|
||||
self.srmock = ftest.StartResponseMock()
|
||||
|
||||
|
||||
def make_app_daemon(host, port, app):
|
||||
httpd = simple_server.make_server(host, port, app)
|
||||
process = multiprocessing.Process(target=httpd.serve_forever,
|
||||
name='marconi_' + str(port))
|
||||
process.daemon = True
|
||||
process.start()
|
||||
|
||||
return process
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
super(TestBase, cls).tearDownClass()
|
||||
|
@ -1,99 +0,0 @@
|
||||
# Copyright (c) 2013 Rackspace, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
#
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import json
|
||||
import random
|
||||
|
||||
import falcon
|
||||
|
||||
from marconi.proxy import bootstrap as proxy_bootstrap
|
||||
from marconi.queues import bootstrap
|
||||
|
||||
import base # noqa
|
||||
|
||||
|
||||
class CatalogTest(base.TestBase):
|
||||
|
||||
servers = []
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
ports = range(8900, 8903)
|
||||
cls.proxy = proxy_bootstrap.Bootstrap()
|
||||
app = bootstrap.Bootstrap().transport.app
|
||||
cls.servers = [base.make_app_daemon('localhost', pt, app)
|
||||
for pt in ports]
|
||||
# TODO(cpp-cabrera): allow trailing slash
|
||||
cls.urls = ['http://127.0.0.1:%d' % pt for pt in ports]
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
for p in cls.servers:
|
||||
p.terminate()
|
||||
|
||||
def tearDown(self):
|
||||
CatalogTest.proxy.cache.flush()
|
||||
CatalogTest.proxy.storage.catalogue_controller.drop_all()
|
||||
super(CatalogTest, self).tearDown()
|
||||
|
||||
def __add_partitions(self):
|
||||
for name, url, weight in zip(
|
||||
[server.name for server in self.servers],
|
||||
self.urls,
|
||||
random.sample(xrange(100), len(self.urls))):
|
||||
doc = {'hosts': [url], 'weight': weight}
|
||||
self.simulate_put('/v1/partitions/' + name,
|
||||
body=json.dumps(doc))
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_201)
|
||||
|
||||
def test_simple(self):
|
||||
path = '/v1/catalogue'
|
||||
|
||||
# TODO(cpp-cabrera): use queue creating/deleting cmgrs
|
||||
queue_names = ['arakawa', 'bridge']
|
||||
|
||||
# No catalog created yet
|
||||
self.simulate_get(path)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_204)
|
||||
|
||||
# TODO(cpp-cabrera): what if there is no partition?
|
||||
|
||||
self.__add_partitions()
|
||||
|
||||
# Queue is not touched
|
||||
result = self.simulate_get('/v1/catalogue/' + queue_names[0])
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_404)
|
||||
|
||||
# Create queues (and implicitly, catalog)
|
||||
for name in queue_names:
|
||||
self.simulate_put('/v1/queues/' + name)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_201)
|
||||
|
||||
result = self.simulate_get(path)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_200)
|
||||
|
||||
doc = json.loads(result[0])
|
||||
|
||||
for name in queue_names:
|
||||
self.assertIn(name, doc)
|
||||
self.assertIn(doc[name]['host'], self.urls)
|
||||
|
||||
result = self.simulate_get('/v1/catalogue/' + name)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_200)
|
||||
|
||||
each_doc = json.loads(result[0])
|
||||
self.assertEquals(each_doc, doc[name])
|
||||
|
||||
self.simulate_delete('/v1/queues/' + name)
|
89
tests/unit/proxy/test_catalogue.py
Normal file
89
tests/unit/proxy/test_catalogue.py
Normal file
@ -0,0 +1,89 @@
|
||||
# Copyright (c) 2013 Rackspace, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
#
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import json
|
||||
import uuid
|
||||
|
||||
import falcon
|
||||
import httpretty
|
||||
|
||||
import base # noqa
|
||||
|
||||
|
||||
class CatalogueTest(base.TestBase):
|
||||
|
||||
servers = []
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super(CatalogueTest, cls).setUpClass()
|
||||
|
||||
def setUp(self):
|
||||
super(CatalogueTest, self).setUp()
|
||||
self.host = 'http://localhost:8000'
|
||||
self.partition_name = str(uuid.uuid1())
|
||||
self.partition = '/v1/partitions/' + self.partition_name
|
||||
|
||||
# create a partition
|
||||
doc = {'weight': 100, 'hosts': [self.host]}
|
||||
self.simulate_put(self.partition, body=json.dumps(doc))
|
||||
|
||||
def tearDown(self):
|
||||
self.simulate_delete(self.partition)
|
||||
super(CatalogueTest, self).tearDown()
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
super(CatalogueTest, cls).tearDownClass()
|
||||
|
||||
def test_list_empty(self):
|
||||
self.simulate_get('/v1/catalogue')
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_204)
|
||||
|
||||
@httpretty.activate
|
||||
def test_simple(self):
|
||||
queues = ['arakawa', 'bridge']
|
||||
|
||||
self.simulate_get('/v1/catalogue/' + queues[0])
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_404)
|
||||
|
||||
# Create queues
|
||||
for name in queues:
|
||||
uri = '{0}/v1/queues/{1}'.format(self.host, name)
|
||||
httpretty.register_uri(httpretty.PUT, uri, status=201)
|
||||
|
||||
self.simulate_put('/v1/queues/' + name)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_201)
|
||||
|
||||
for name in queues:
|
||||
# mock out forwarding
|
||||
uri = '{0}/v1/queues/{1}'.format(self.host, name)
|
||||
httpretty.register_uri(httpretty.DELETE, uri, status=204)
|
||||
|
||||
# fetch from the catalogue
|
||||
result = self.simulate_get('/v1/catalogue/' + name)
|
||||
data = json.loads(result[0])
|
||||
self.assertEqual(data['name'], name)
|
||||
self.assertEqual(data['partition'], self.partition_name)
|
||||
self.assertEqual(data['host'], self.host)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_200)
|
||||
|
||||
# delete queues, implicitly removing from catalogue
|
||||
self.simulate_delete('/v1/queues/' + name)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_204)
|
||||
|
||||
# ensure entries were removed from catalogue
|
||||
self.simulate_get('/v1/catalogue/' + name)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_404)
|
@ -24,6 +24,10 @@ import base # noqa
|
||||
|
||||
class PartitionTest(base.TestBase):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super(PartitionTest, cls).setUpClass()
|
||||
|
||||
def setUp(self):
|
||||
super(PartitionTest, self).setUp()
|
||||
self.path = '/v1/partitions'
|
||||
@ -35,6 +39,10 @@ class PartitionTest(base.TestBase):
|
||||
self.proxy.storage.partitions_controller.drop_all()
|
||||
super(PartitionTest, self).tearDown()
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
super(PartitionTest, cls).tearDownClass()
|
||||
|
||||
def test_simple(self):
|
||||
# No partition
|
||||
self.simulate_get(self.partition)
|
||||
|
Loading…
Reference in New Issue
Block a user