proxy: mirror structure of marconi queues + bootstrap

This patch adds smarter configuration to the proxy in two steps:

1. mirror the transport implementation used in marconi.queues in
   marconi.proxy
2. add a bootstrap file to take care of start up

Rationale: make configuration work, make deploying easy, make
alternate transport implementations feasible.

Another change: the unit tests are fixed by adding a few changes:
1. add drop functionality to the proxy storage interface
2. use drop/flush in test suite tearDown
3. rm tests.unit.test_config
4. delete queues at the end of the catalogue test (not yet robust)

The rationale for (3) was that test_config did not play nice with
other tests when they were registering their options, and failed as a
result. Furthermore, we should not need to test oslo.config.

Configuration changes: new fields in etc/marconi.conf
- drivers:proxy
- drivers:proxy:storage:{memory.mongodb}
- drivers:proxy:transport:wsgi
- oslo_cache

Also, fix: InternalServerError -> HTTPInternalServerError

Finally, redis was removed from requirements.txt.

Change-Id: If2365a1a738a3975fe6bde7bd07dfdee3460cecd
Implements: blueprint placement-service
This commit is contained in:
Alejandro Cabrera 2013-09-19 11:01:17 -04:00
parent df98666411
commit 08c639019c
33 changed files with 477 additions and 201 deletions

View File

@ -30,6 +30,7 @@ storage = mongodb
# transport and storage drivers for use with marconi-proxy
[drivers:proxy]
storage = mongodb
transport = wsgi
[drivers:transport:wsgi]
;bind = 0.0.0.0
@ -43,6 +44,10 @@ storage = mongodb
;[drivers:transport:zmq]
;port = 9999
;[drivers:proxy:transport:wsgi]
;bind = 0.0.0.0
;port = 8889
[drivers:storage:mongodb]
uri = mongodb://db1.example.net,db2.example.net:2500/?replicaSet=test&ssl=true&w=majority
database = marconi

View File

@ -1,127 +0,0 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""marconi-proxy: maintains a mapping from inserted queues to partitions.
Supports the following operator API:
- [GET] /v1/partitions - lists registered partitions
- [PUT|GET|DELETE] /v1/partitions/{partition}
- [GET] /v1/catalogue
Running:
- configure marconi.conf appropriately
- gunicorn marconi.proxy.app:app
"""
import falcon
from oslo.config import cfg
from stevedore import driver
from marconi.common.cache import cache
from marconi.common import config
from marconi.common import exceptions
from marconi.proxy.resources import catalogue
from marconi.proxy.resources import forward
from marconi.proxy.resources import health
from marconi.proxy.resources import metadata
from marconi.proxy.resources import partitions
from marconi.proxy.resources import queues
from marconi.proxy.resources import v1
from marconi.proxy.utils import round_robin
# TODO(cpp-cabrera): wrap all this up in a nice bootstrap.py
# TODO(cpp-cabrera): mirror marconi.queues.transport with this
# for nicer deployments (and eventual
# proxy multi-transport support!)
PROJECT_CFG = config.project('marconi')
CFG = config.namespace('drivers:proxy').from_options(
transport='wsgi',
storage='memory')
# TODO(cpp-cabrera): need to wrap this in a bootstrap class to defer
# loading of config until it is run in a WSGI
# context, otherwise, it breaks the test suite.
if __name__ == '__main__':
PROJECT_CFG.load()
app = falcon.API()
try:
storage = driver.DriverManager('marconi.proxy.storage',
CFG.storage,
invoke_on_load=True)
except RuntimeError as exc:
raise exceptions.InvalidDriver(exc)
catalogue_driver = storage.driver.catalogue_controller
partitions_driver = storage.driver.partitions_controller
cache_driver = cache.get_cache(cfg.CONF)
selector = round_robin.Selector()
# TODO(cpp-cabrera): don't encode API version in routes -
# let's handle this elsewhere
# NOTE(cpp-cabrera): Proxy-specific routes
app.add_route('/v1/partitions',
partitions.Listing(partitions_driver))
app.add_route('/v1/partitions/{partition}',
partitions.Resource(partitions_driver))
app.add_route('/v1/catalogue',
catalogue.Listing(catalogue_driver))
app.add_route('/v1/catalogue/{queue}',
catalogue.Resource(catalogue_driver))
app.add_route('/v1/health',
health.Resource())
# NOTE(cpp-cabrera): queue handling routes
app.add_route('/v1/queues',
queues.Listing(catalogue_driver))
app.add_route('/v1/queues/{queue}',
queues.Resource(partitions_driver, catalogue_driver,
cache_driver, selector))
# NOTE(cpp-cabrera): Marconi forwarded routes
app.add_route('/v1',
v1.Resource(partitions_driver))
# NOTE(cpp-cabrera): Marconi forwarded routes involving a queue
app.add_route('/v1/queues/{queue}/claims',
forward.ClaimCreate(partitions_driver,
catalogue_driver,
cache_driver, selector))
app.add_route('/v1/queues/{queue}/claims/{cid}',
forward.Claim(partitions_driver,
catalogue_driver,
cache_driver, selector))
app.add_route('/v1/queues/{queue}/messages',
forward.MessageBulk(partitions_driver,
catalogue_driver,
cache_driver, selector))
app.add_route('/v1/queues/{queue}/messages/{mid}',
forward.Message(partitions_driver,
catalogue_driver, cache_driver, selector))
app.add_route('/v1/queues/{queue}/stats',
forward.Stats(partitions_driver,
catalogue_driver,
cache_driver, selector))
app.add_route('/v1/queues/{queue}/metadata',
metadata.Resource(partitions_driver,
catalogue_driver,
cache_driver, selector))

View File

@ -0,0 +1,83 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.config import cfg
from stevedore import driver
from marconi.common.cache import cache as oslo_cache
from marconi.common import config
from marconi.common import decorators
from marconi.common import exceptions
from marconi.openstack.common import log
from marconi.proxy import transport # NOQA
PROJECT_CFG = config.project('marconi')
CFG = config.namespace('drivers:proxy').from_options(
transport='wsgi',
storage='memory')
LOG = log.getLogger(__name__)
class Bootstrap(object):
"""Defines the Marconi proxy bootstrapper.
The bootstrap loads up drivers per a given configuration, and
manages their lifetimes.
"""
def __init__(self, config_file=None, cli_args=None):
PROJECT_CFG.load(filename=config_file, args=cli_args)
log.setup('marconi_proxy')
@decorators.lazy_property(write=False)
def storage(self):
LOG.debug(_(u'Loading Proxy Storage Driver'))
try:
mgr = driver.DriverManager('marconi.proxy.storage',
CFG.storage,
invoke_on_load=True)
return mgr.driver
except RuntimeError as exc:
LOG.exception(exc)
raise exceptions.InvalidDriver(exc)
@decorators.lazy_property(write=False)
def cache(self):
LOG.debug(_(u'Loading Proxy Cache Driver'))
try:
mgr = oslo_cache.get_cache(cfg.CONF)
return mgr
except RuntimeError as exc:
LOG.exception(exc)
raise exceptions.InvalidDriver(exc)
@decorators.lazy_property(write=False)
def transport(self):
LOG.debug(_(u'Loading Proxy Transport Driver'))
try:
mgr = driver.DriverManager('marconi.proxy.transport',
CFG.transport,
invoke_on_load=True,
invoke_args=[self.storage,
self.cache])
return mgr.driver
except RuntimeError as exc:
LOG.exception(exc)
raise exceptions.InvalidDriver(exc)
def run(self):
self.transport.listen()

View File

@ -91,6 +91,11 @@ class PartitionsBase(ControllerBase):
"""Removes a partition from storage."""
raise NotImplementedError
@abc.abstractmethod
def drop_all(self):
"""Drops all partitions from storage."""
raise NotImplementedError
@six.add_metaclass(abc.ABCMeta)
class CatalogueBase(ControllerBase):
@ -168,3 +173,8 @@ class CatalogueBase(ControllerBase):
:param metadata: A dictionary of metadata for this queue
"""
raise NotImplementedError
@abc.abstractmethod
def drop_all(self):
"""Drops all catalogue entries from storage."""
raise NotImplementedError

View File

@ -19,7 +19,7 @@ from marconi.proxy.storage import exceptions
def _idx(project, queue):
return project + '.' + queue
return project + '/' + queue
class CatalogueController(base.CatalogueBase):
@ -44,7 +44,7 @@ class CatalogueController(base.CatalogueBase):
return _normalize(entry)
def exists(self, project, queue):
return self._col.get(_idx(project, queue)) is not None
return _idx(project, queue) in self._col
def insert(self, project, queue, partition, host, metadata={}):
self._col[_idx(project, queue)] = {
@ -64,6 +64,9 @@ class CatalogueController(base.CatalogueBase):
except KeyError:
pass
def drop_all(self):
self._col = {}
def _normalize(entry):
return {

View File

@ -12,6 +12,7 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import six
from marconi.proxy.storage import base
@ -39,7 +40,7 @@ class PartitionsController(base.PartitionsBase):
return _normalize(entry)
def exists(self, name):
return self._col.get(name) is not None
return name in self._col
def create(self, name, weight, hosts):
self._col[name] = {'n': name,
@ -52,6 +53,9 @@ class PartitionsController(base.PartitionsBase):
except KeyError:
pass
def drop_all(self):
self._col = {}
def _normalize(entry):
return {

View File

@ -1,3 +1,17 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""MongoDB Proxy Storage Driver for Marconi"""
from marconi.proxy.storage.mongodb import driver

View File

@ -93,6 +93,11 @@ class CatalogueController(base.CatalogueBase):
{'$set': {'m': metadata}},
multi=False, manipulate=False)
@utils.raises_conn_error
def drop_all(self):
self._col.drop()
self._col.ensure_index(CATALOGUE_INDEX, unique=True)
def _normalize(entry):
return {

View File

@ -79,6 +79,11 @@ class PartitionsController(base.PartitionsBase):
def delete(self, name):
self._col.remove({'n': name}, w=0)
@utils.raises_conn_error
def drop_all(self):
self._col.drop()
self._col.ensure_index(PARTITIONS_INDEX, unique=True)
def _normalize(entry):
return {

View File

@ -0,0 +1,9 @@
"""Marconi Proxy Transport Drivers"""
from marconi.common import config
from marconi.proxy.transport import base
CFG = config.project('marconi').from_options()
# Hoist into package namespace
DriverBase = base.DriverBase

View File

@ -0,0 +1,37 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class DriverBase(object):
"""Base class for Proxy Transport Drivers to document the expected
interface.
:param storage: The storage driver
:param cache: The cache driver
"""
def __init__(self, storage, cache):
self.storage = storage
self.cache = cache
@abc.abstractmethod
def listen():
"""Start listening for client requests (self-hosting mode)."""
raise NotImplementedError

View File

@ -0,0 +1,6 @@
"""WSGI Proxy Transport Driver"""
from marconi.queues.transport.wsgi import driver
# Hoist into package namespace
Driver = driver.Driver

View File

@ -0,0 +1,31 @@
# Copyright (c) 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""WSGI App for WSGI Containers
This app should be used by external WSGI
containers. For example:
$ gunicorn marconi.proxy.transport.wsgi.app:app
NOTE: As for external containers, it is necessary
to put config files in the standard paths. There's
no common way to specify / pass configuration files
to the WSGI app when it is called from other apps.
"""
from marconi.proxy import bootstrap
app = bootstrap.Bootstrap().transport.app

View File

@ -23,7 +23,10 @@ from marconi.proxy.utils import helpers
class Listing(object):
"""A listing of all entries in the catalogue."""
"""A listing of all entries in the catalogue
:param catalogue_controller: handles storage details
"""
def __init__(self, catalogue_controller):
self._catalogue = catalogue_controller
@ -43,7 +46,10 @@ class Listing(object):
class Resource(object):
"""A single catalogue entry."""
"""A single catalogue entry
:param catalogue_controller: handles storage details
"""
def __init__(self, catalogue_controller):
self._catalogue = catalogue_controller
@ -55,6 +61,5 @@ class Resource(object):
except exceptions.EntryNotFound:
raise falcon.HTTPNotFound()
resp = entry
response.status = falcon.HTTP_200
response.body = json.dumps(resp, ensure_ascii=False)
response.body = json.dumps(entry, ensure_ascii=False)

View File

@ -0,0 +1,163 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""marconi-proxy: maintains a mapping from inserted queues to partitions
Supports the following operator API:
- [GET] /v1/partitions - lists registered partitions
- [PUT|GET|DELETE] /v1/partitions/{partition}
- [GET] /v1/catalogue
Running:
- configure marconi.conf appropriately
- gunicorn marconi.proxy.transport.wsgi.app:app
"""
from wsgiref import simple_server
import falcon
from marconi.common import config
import marconi.openstack.common.log as logging
from marconi.proxy import transport
from marconi.proxy.transport.wsgi import (
catalogue, forward, health, metadata,
partitions, queues, v1, version
)
from marconi.proxy.utils import round_robin
from marconi.queues.transport import auth
_VER = version.path()
OPTIONS = {
'bind': '0.0.0.0',
'port': 8889
}
PROJECT_CFG = config.project('marconi')
GLOBAL_CFG = PROJECT_CFG.from_options()
WSGI_CFG = config.namespace(
'drivers:proxy:transport:wsgi'
).from_options(**OPTIONS)
LOG = logging.getLogger(__name__)
# TODO(cpp-cabrera): refactor to avoid duplication with queues..wsgi
def _check_media_type(req, resp, params):
if not req.client_accepts('application/json'):
raise falcon.HTTPNotAcceptable(
u'''
Endpoint only serves `application/json`; specify client-side
media type support with the "Accept" header.''',
href=u'http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html',
href_text=u'14.1 Accept, Hypertext Transfer Protocol -- HTTP/1.1')
class Driver(transport.DriverBase):
"""Entry point to the proxy
:param storage: storage driver to use
:param cache: cache driver to use
"""
def __init__(self, storage, cache):
super(Driver, self).__init__(storage, cache)
self.app = None
self._catalogue = self.storage.catalogue_controller
self._partitions = self.storage.partitions_controller
self._selector = round_robin.Selector()
self._init_routes()
self._init_middleware()
def _init_routes(self):
self.app = falcon.API(before=[_check_media_type])
# NOTE(cpp-cabrera): proxy-specififc routes
self.app.add_route(_VER + '/partitions',
partitions.Listing(self._partitions))
self.app.add_route(_VER + '/partitions/{partition}',
partitions.Resource(self._partitions))
self.app.add_route(_VER + '/catalogue',
catalogue.Listing(self._catalogue))
self.app.add_route(_VER + '/catalogue/{queue}',
catalogue.Resource(self._catalogue))
self.app.add_route(_VER + '/health',
health.Resource())
# NOTE(cpp-cabrera): queue handling routes
self.app.add_route(_VER + '/queues',
queues.Listing(self._catalogue))
self.app.add_route(_VER + '/queues/{queue}',
queues.Resource(self._partitions,
self._catalogue,
self.cache, self._selector))
# NOTE(cpp-cabrera): Marconi forwarded routes
self.app.add_route(_VER,
v1.Resource(self._partitions))
# NOTE(cpp-cabrera): Marconi forwarded routes involving a queue
self.app.add_route(_VER + '/queues/{queue}/claims',
forward.ClaimCreate(self._partitions,
self._catalogue,
self.cache,
self._selector))
self.app.add_route(_VER + '/queues/{queue}/claims/{cid}',
forward.Claim(self._partitions,
self._catalogue,
self.cache, self._selector))
self.app.add_route(_VER + '/queues/{queue}/messages',
forward.MessageBulk(self._partitions,
self._catalogue,
self.cache,
self._selector))
self.app.add_route(_VER + '/queues/{queue}/messages/{mid}',
forward.Message(self._partitions,
self._catalogue, self.cache,
self._selector))
self.app.add_route(_VER + '/queues/{queue}/stats',
forward.Stats(self._partitions,
self._catalogue,
self.cache, self._selector))
self.app.add_route(_VER + '/queues/{queue}/metadata',
metadata.Resource(self._partitions,
self._catalogue,
self.cache, self._selector))
# TODO(cpp-cabrera): refactor to avoid duplication with queues..wsgi
def _init_middleware(self):
"""Initialize WSGI middlewarez."""
# NOTE(flaper87): Install Auth
if GLOBAL_CFG.auth_strategy:
strategy = auth.strategy(GLOBAL_CFG.auth_strategy)
self.app = strategy.install(self.app, PROJECT_CFG.conf)
def listen(self):
"""Listens on the 'bind:port' as per the config."""
msg = _(u'Serving on host {bind}:{port}').format(
bind=WSGI_CFG.bind, port=WSGI_CFG.port
)
LOG.info(msg)
httpd = simple_server.make_server(WSGI_CFG.bind, WSGI_CFG.port,
self.app)
httpd.serve_forever()

View File

@ -32,7 +32,10 @@ from marconi.proxy.storage import exceptions
class Listing(object):
"""A listing of all partitions registered."""
"""A resource to list registered partition
:param partitions_controller: means to interact with storage
"""
def __init__(self, partitions_controller):
self._ctrl = partitions_controller
@ -60,7 +63,10 @@ class Listing(object):
class Resource(object):
"""A means to interact with individual partitions."""
"""A handler for individual partitions
:param partitions_controller: means to interact with storage
"""
def __init__(self, partitions_controller):
self._ctrl = partitions_controller

View File

@ -40,7 +40,9 @@ from marconi.proxy.utils import partition
class Listing(object):
"""Responsible for constructing a valid marconi queue listing
from the content stored in the catalogue.
from the content stored in the catalogue
:param catalogue_controller: storage driver to use
"""
def __init__(self, catalogue_controller):
self._catalogue = catalogue_controller
@ -86,6 +88,15 @@ class Listing(object):
class Resource(forward.ForwardMixin):
"""Forwards queue requests to marconi queues API and updates the
catalogue
:param partitions_controller: partitions storage driver
:param catalogue_conroller: catalogue storage driver
:param cache: caching driver
:param selector: algorithm to use to select next node
:param methods: HTTP methods to automatically derive from mixin
"""
def __init__(self, partitions_controller, catalogue_controller,
cache, selector):
self._partitions = partitions_controller
@ -103,9 +114,7 @@ class Resource(forward.ForwardMixin):
catalogue. This should also be the only time
partition.weighted_select is ever called.
:raises: HTTPInternalServerError - if no partitions are
registered
:raises: HTTPInternalServerError - if no partitions are registered
"""
target = partition.weighted_select(self._partitions.list())
if target is None:

View File

@ -20,6 +20,10 @@ from marconi.proxy.utils import http
class Resource(object):
"""Forwards homedoc requests to marconi queues API
:param partitions_controller: controller for handling partitions
"""
def __init__(self, partitions_controller):
self._partitions = partitions_controller

View File

@ -0,0 +1,32 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""version: version information for the proxy transport API."""
def info():
"""Returns the API version as a tuple.
:rtype: (int, int)
"""
return (1, 0)
def path():
"""Returns the API version as /v{version}.
:returns: /v{version}
:rtype: text
"""
return '/v{0}'.format(info()[0])

View File

@ -21,18 +21,17 @@ from marconi.proxy.utils import lookup
class ForwardMixin(object):
"""Implements falcon-compatible forwarding for resources."""
"""Implements falcon-compatible forwarding for resources
:param partitions_controller: talks to partitions storage
:param catalogue_controller: talks to catalogue storage
:param cache: localized, fast lookups
:param selector: @see utils.round_robin - host selection order
:param methods: [text] - allowed methods, e.g., ['get', 'post']
"""
def __init__(self, partitions_controller, catalogue_controller,
cache, selector, methods):
"""Initializes a forwarding resource.
:param partitions_controller: talks to partitions storage
:param catalogue_controller: talks to catalogue storage
:param cache: localized, fast lookups
:param selector: @see utils.round_robin - host selection order
:param methods: [text] - allowed methods, e.g., ['get', 'post']
"""
self._catalogue = catalogue_controller
self._partitions = partitions_controller
self._cache = cache

View File

@ -19,6 +19,7 @@ import requests
def get_project(request):
"""Retrieves the Project-Id header from a request.
:param request: falcon.Request
:returns: The Project-Id value or '_' if not provided
"""
return request.get_header('x_project_id') or '_'
@ -27,6 +28,8 @@ def get_project(request):
def forward(host, request):
"""Forwards a request.
:param host: str - URL to host to use
:param request: falcon.Request
:returns: a python-requests response object
"""
url = host + request.path

View File

@ -30,6 +30,7 @@ _code_map = dict((int(v.split()[0]), v)
def status(code):
"""Maps an integer HTTP status code to a friendly HTTP status message
:param code: int - HTTP status code
:raises: KeyError for an unknown HTTP status code
"""
return _code_map[code]

View File

@ -23,7 +23,7 @@ class Selector(object):
def next(self, name, hosts):
"""Round robin selection of hosts
:param name: str - name to associate this list with
:param name: text - name to associate this list with
:param hosts: [a] - list of things to round robin. In the context
of Marconi, this is a list of URLs.
"""

View File

@ -58,7 +58,8 @@ def partitions(controller, count):
"""context_manager: Creates `count` partitions in storage,
and deletes them once this goes out of scope.
:param partitions_controller:
:param controller:
:param count: int - number of partitions to create
:returns: [(str, int, [str])] - names, weights, hosts
"""
spec = [(six.text_type(uuid.uuid1()), i,
@ -98,6 +99,7 @@ def entry(controller, project, queue, partition, host, metadata={}):
:param queue: str - name of queue
:param partition: str - associated partition
:param host: str - representative host
:param metadata: dict - metadata representation for this entry
:returns: (str, str, str, str, dict) - (project, queue, part, host, meta)
"""
controller.insert(project, queue, partition, host, metadata)
@ -111,6 +113,7 @@ def entries(controller, count):
deletes them once the context manager goes out of scope.
:param controller: storage handler
:param count: int - number of entries to create
:returns: [(str, str, str, str)] - [(project, queue, partition, host)]
"""
spec = [(u'_', six.text_type(uuid.uuid1()), six.text_type(i),

View File

@ -41,6 +41,9 @@ marconi.proxy.storage =
memory = marconi.proxy.storage.memory.driver:Driver
mongodb = marconi.proxy.storage.mongodb.driver:Driver
marconi.proxy.transport =
wsgi = marconi.proxy.transport.wsgi.driver:Driver
[nosetests]
where=tests
verbosity=2

View File

@ -15,24 +15,21 @@
# limitations under the License.
import multiprocessing
import os
from wsgiref import simple_server
from falcon import testing as ftest
from marconi.proxy import app
from marconi.proxy import bootstrap
from tests.unit.queues.transport.wsgi import base
class TestBase(base.TestBase):
def setUp(self):
if not os.environ.get('REDIS_TEST_LIVE'):
self.skipTest('No Redis instance running')
super(base.TestBase, self).setUp()
self.app = app.app
self.proxy = bootstrap.Bootstrap()
self.app = self.proxy.transport.app
self.srmock = ftest.StartResponseMock()

View File

@ -13,12 +13,12 @@
#
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import random
import falcon
from marconi.proxy import bootstrap as proxy_bootstrap
from marconi.queues import bootstrap
import base # noqa
@ -31,6 +31,7 @@ class CatalogTest(base.TestBase):
@classmethod
def setUpClass(cls):
ports = range(8900, 8903)
cls.proxy = proxy_bootstrap.Bootstrap()
app = bootstrap.Bootstrap().transport.app
cls.servers = [base.make_app_daemon('localhost', pt, app)
for pt in ports]
@ -43,10 +44,8 @@ class CatalogTest(base.TestBase):
p.terminate()
def tearDown(self):
for server in self.servers:
self.simulate_delete('/v1/partitions/' + server.name)
# TODO(zyuan): use a storage API call to cleanup the catalogs
CatalogTest.proxy.cache.flush()
CatalogTest.proxy.storage.catalogue_controller.drop_all()
super(CatalogTest, self).tearDown()
def __add_partitions(self):
@ -61,6 +60,8 @@ class CatalogTest(base.TestBase):
def test_simple(self):
path = '/v1/catalogue'
# TODO(cpp-cabrera): use queue creating/deleting cmgrs
queue_names = ['arakawa', 'bridge']
# No catalog created yet
@ -94,3 +95,5 @@ class CatalogTest(base.TestBase):
each_doc = json.loads(result[0])
self.assertEquals(each_doc, doc[name])
self.simulate_delete('/v1/queues/' + name)

View File

@ -31,6 +31,8 @@ class PartitionTest(base.TestBase):
def tearDown(self):
self.simulate_delete(self.partition)
self.proxy.cache.flush()
self.proxy.storage.partitions_controller.drop_all()
super(PartitionTest, self).tearDown()
def test_simple(self):

View File

@ -1,39 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from marconi.common import config
from marconi import tests as testing
PROJECT_CONFIG = config.project()
CFG = PROJECT_CONFIG.from_options(
without_help=3,
with_help=(None, 'nonsense'))
class TestConfig(testing.TestBase):
def test_cli(self):
args = ['--with_help', 'sense']
PROJECT_CONFIG.load(self.conf_path('wsgi_sqlite.conf'), args)
self.assertEquals(CFG.with_help, 'sense')
PROJECT_CONFIG.load(args=[])
self.assertEquals(CFG.with_help, None)
def test_wrong_type(self):
ns = config.namespace('local')
with testing.expect(config.cfg.Error):
ns.from_options(opt={})