feat: add shard management resource
This patchset adds a new resource that exposes shard registry through the administrative API. Since there is no storage backend available that implements this functionality yet, the shards resource is not connected to the queues routes just yet. Preliminary support is added for unit testing this soon. Change-Id: I49793990327643fc2bc5091615fb0bebcef04bc0 Partially-implements: blueprint storage-sharding
This commit is contained in:
parent
13d7974cb9
commit
52b5dad078
@ -13,12 +13,34 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""utils: utilities for transport handling."""
|
||||
"""utils: a set of utilities to help with transport handling details."""
|
||||
|
||||
import jsonschema
|
||||
|
||||
from marconi.openstack.common import log
|
||||
from marconi.queues.transport import utils as json_utils
|
||||
from marconi.queues.transport.wsgi import exceptions as wsgi_errors
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
def load(req):
|
||||
"""Reads request body, raising an exception if it is not JSON.
|
||||
|
||||
:param req: The request object to read from
|
||||
:type req: falcon.Request
|
||||
:return: a dictionary decoded from the JSON stream
|
||||
:rtype: dict
|
||||
:raises: wsgi_errors.HTTPBadRequestBody
|
||||
"""
|
||||
try:
|
||||
return json_utils.read_json(req.stream, req.content_length)
|
||||
except (json_utils.MalformedJSON, json_utils.OverflowedJSONInteger) as ex:
|
||||
LOG.exception(ex)
|
||||
raise wsgi_errors.HTTPBadRequestBody(
|
||||
'JSON could not be parsed.'
|
||||
)
|
||||
|
||||
|
||||
# TODO(cpp-cabrera): generalize this
|
||||
def validate(validator, document):
|
@ -32,35 +32,17 @@ import falcon
|
||||
import jsonschema
|
||||
import six
|
||||
|
||||
from marconi.common.transport.wsgi import utils
|
||||
from marconi.openstack.common import log
|
||||
from marconi.proxy.storage import exceptions
|
||||
from marconi.proxy.transport import schema, utils
|
||||
from marconi.proxy.transport import schema
|
||||
from marconi.proxy.utils import lookup
|
||||
from marconi.queues.transport import utils as json_utils
|
||||
from marconi.queues.transport.wsgi import exceptions as wsgi_errors
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
def load(req):
|
||||
"""Reads request body, raising an exception if it is not JSON.
|
||||
|
||||
:param req: The request object to read from
|
||||
:type req: falcon.Request
|
||||
:return: a dictionary decoded from the JSON stream
|
||||
:rtype: dict
|
||||
:raises: wsgi_errors.HTTPBadRequestBody
|
||||
"""
|
||||
try:
|
||||
return json_utils.read_json(req.stream, req.content_length)
|
||||
except (json_utils.MalformedJSON, json_utils.OverflowedJSONInteger) as ex:
|
||||
LOG.exception(ex)
|
||||
raise wsgi_errors.HTTPBadRequestBody(
|
||||
'JSON could not be parsed.'
|
||||
)
|
||||
|
||||
|
||||
class Listing(object):
|
||||
"""A resource to list registered partition
|
||||
|
||||
@ -135,7 +117,7 @@ class Resource(object):
|
||||
response.status = falcon.HTTP_204
|
||||
return
|
||||
|
||||
data = load(request)
|
||||
data = utils.load(request)
|
||||
utils.validate(self._put_validator, data)
|
||||
self._ctrl.create(partition,
|
||||
weight=data['weight'],
|
||||
@ -165,7 +147,7 @@ class Resource(object):
|
||||
|
||||
"""
|
||||
LOG.debug('PATCH partition - name: {0}'.format(partition))
|
||||
data = load(request)
|
||||
data = utils.load(request)
|
||||
|
||||
if 'weight' not in data and 'hosts' not in data:
|
||||
LOG.debug('PATCH partition, bad params')
|
||||
|
@ -106,3 +106,10 @@ class MessageIsClaimedBy(NotPermitted):
|
||||
msg = (u'Message %(mid)s is not claimed by %(cid)s' %
|
||||
dict(cid=cid, mid=mid))
|
||||
super(MessageIsClaimedBy, self).__init__(msg)
|
||||
|
||||
|
||||
class ShardDoesNotExist(DoesNotExist):
|
||||
|
||||
def __init__(self, shard):
|
||||
msg = u'Shard {0} does not exists'.format(shard)
|
||||
super(ShardDoesNotExist, self).__init__(msg)
|
||||
|
178
marconi/queues/transport/wsgi/shards.py
Normal file
178
marconi/queues/transport/wsgi/shards.py
Normal file
@ -0,0 +1,178 @@
|
||||
# Copyright (c) 2013 Rackspace Hosting, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""shards: a resource to handle storage shard management
|
||||
|
||||
A shard is added by an operator by interacting with the
|
||||
sharding-related endpoints. When specifying a shard, the
|
||||
following fields are required:
|
||||
|
||||
{
|
||||
"name": string,
|
||||
"weight": integer,
|
||||
"location": string::uri
|
||||
}
|
||||
|
||||
Furthermore, depending on the underlying storage type of shard being
|
||||
registered, there is an optional field:
|
||||
|
||||
{
|
||||
"options": {...}
|
||||
}
|
||||
"""
|
||||
|
||||
import falcon
|
||||
import jsonschema
|
||||
|
||||
from marconi.common.schemas import shards as schema
|
||||
from marconi.common.transport.wsgi import utils
|
||||
from marconi.openstack.common import log
|
||||
from marconi.proxy.storage import exceptions
|
||||
from marconi.queues.transport import utils as transport_utils
|
||||
from marconi.queues.transport.wsgi import exceptions as wsgi_errors
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class Listing(object):
|
||||
"""A resource to list registered partition
|
||||
|
||||
:param partitions_controller: means to interact with storage
|
||||
"""
|
||||
def __init__(self, partitions_controller):
|
||||
self._ctrl = partitions_controller
|
||||
|
||||
def on_get(self, request, response):
|
||||
"""Returns a partition listing as a JSON object:
|
||||
|
||||
[
|
||||
{"name": "", "weight": 100, "location": ""},
|
||||
...
|
||||
]
|
||||
|
||||
:returns: HTTP | [200, 204]
|
||||
"""
|
||||
LOG.debug('LIST shards')
|
||||
resp = list(self._ctrl.list())
|
||||
|
||||
if not resp:
|
||||
response.status = falcon.HTTP_204
|
||||
return
|
||||
|
||||
response.body = transport_utils.to_json(resp)
|
||||
response.status = falcon.HTTP_200
|
||||
|
||||
|
||||
class Resource(object):
|
||||
"""A handler for individual partitions
|
||||
|
||||
:param partitions_controller: means to interact with storage
|
||||
"""
|
||||
def __init__(self, shards_controller):
|
||||
self._ctrl = shards_controller
|
||||
validator_type = jsonschema.Draft4Validator
|
||||
self._validators = {
|
||||
'weight': validator_type(schema.patch_weight),
|
||||
'location': validator_type(schema.patch_location),
|
||||
'options': validator_type(schema.patch_options),
|
||||
'create': validator_type(schema.create)
|
||||
}
|
||||
|
||||
def on_get(self, request, response, shard):
|
||||
"""Returns a JSON object for a single shard entry:
|
||||
|
||||
{"weight": 100, "location": "", options: {...}}
|
||||
|
||||
:returns: HTTP | [200, 404]
|
||||
"""
|
||||
LOG.debug('GET shard - name: {0}'.format(shard))
|
||||
data = None
|
||||
try:
|
||||
data = self._ctrl.get(shard)
|
||||
except exceptions.ShardDoesNotExist as ex:
|
||||
LOG.exception(ex)
|
||||
raise falcon.HTTPNotFound()
|
||||
|
||||
# remove the name entry - it isn't needed on GET
|
||||
del data['name']
|
||||
response.body = transport_utils.to_json(data)
|
||||
response.content_location = request.path
|
||||
|
||||
def on_put(self, request, response, shard):
|
||||
"""Registers a new shard. Expects the following input:
|
||||
|
||||
{"weight": 100, "location": ""}
|
||||
|
||||
An options object may also be provided.
|
||||
|
||||
:returns: HTTP | [201, 204]
|
||||
"""
|
||||
LOG.debug('PUT shard - name: {0}'.format(shard))
|
||||
if self._ctrl.exists(shard):
|
||||
LOG.debug('Shard {0} already exists'.format(shard))
|
||||
response.status = falcon.HTTP_204
|
||||
return
|
||||
|
||||
data = utils.load(request)
|
||||
utils.validate(self._validators['create'], data)
|
||||
self._ctrl.create(shard, weight=data['weight'],
|
||||
location=data['location'],
|
||||
options=data.get('options', {}))
|
||||
response.status = falcon.HTTP_201
|
||||
response.location = request.path
|
||||
|
||||
def on_delete(self, request, response, shard):
|
||||
"""Deregisters a shard.
|
||||
|
||||
:returns: HTTP | 204
|
||||
"""
|
||||
LOG.debug('DELETE shard - name: {0}'.format(shard))
|
||||
self._ctrl.delete(shard)
|
||||
response.status = falcon.HTTP_204
|
||||
|
||||
def on_patch(self, request, response, shard):
|
||||
"""Allows one to update a shard's weight, location, and/or options.
|
||||
|
||||
This method expects the user to submit a JSON object
|
||||
containing atleast one of: 'hosts', 'weight', 'options'. If
|
||||
none are found, the request is flagged as bad. There is also
|
||||
strict format checking through the use of
|
||||
jsonschema. Appropriate errors are returned in each case for
|
||||
badly formatted input.
|
||||
|
||||
:returns: HTTP | 200,400
|
||||
"""
|
||||
LOG.debug('PATCH shard - name: {0}'.format(shard))
|
||||
data = utils.load(request)
|
||||
|
||||
EXPECT = ('weight', 'location', 'options')
|
||||
if not any([(field in data) for field in EXPECT]):
|
||||
LOG.debug('PATCH shard, bad params')
|
||||
raise wsgi_errors.HTTPBadRequestBody(
|
||||
'One of `location`, `weight`, or `options` needs '
|
||||
'to be specified'
|
||||
)
|
||||
|
||||
for field in EXPECT:
|
||||
utils.validate(self._validators[field], data)
|
||||
|
||||
try:
|
||||
fields = dict((k, v) for k, v in data.items()
|
||||
if k in EXPECT and v is not None)
|
||||
|
||||
self._ctrl.update(shard, **fields)
|
||||
except exceptions.ShardDoesNotExist as ex:
|
||||
LOG.exception(ex)
|
||||
raise falcon.HTTPNotFound()
|
29
tests/unit/queues/transport/wsgi/test_shards.py
Normal file
29
tests/unit/queues/transport/wsgi/test_shards.py
Normal file
@ -0,0 +1,29 @@
|
||||
# Copyright (c) 2013 Rackspace, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
#
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import ddt
|
||||
|
||||
import base # noqa
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
class ShardsBaseTest(base.TestBase):
|
||||
|
||||
def setUp(self):
|
||||
super(ShardsBaseTest, self).setUp()
|
||||
|
||||
def tearDown(self):
|
||||
super(ShardsBaseTest, self).tearDown()
|
Loading…
Reference in New Issue
Block a user