From 52b5dad078884f33b3ed23e271ca7de5839ee895 Mon Sep 17 00:00:00 2001 From: Alejandro Cabrera Date: Wed, 9 Oct 2013 12:14:09 -0400 Subject: [PATCH] feat: add shard management resource This patchset adds a new resource that exposes shard registry through the administrative API. Since there is no storage backend available that implements this functionality yet, the shards resource is not connected to the queues routes just yet. Preliminary support is added for unit testing this soon. Change-Id: I49793990327643fc2bc5091615fb0bebcef04bc0 Partially-implements: blueprint storage-sharding --- .../transport/wsgi}/utils.py | 24 ++- marconi/proxy/transport/wsgi/partitions.py | 26 +-- marconi/queues/storage/exceptions.py | 7 + marconi/queues/transport/wsgi/shards.py | 178 ++++++++++++++++++ .../unit/queues/transport/wsgi/test_shards.py | 29 +++ 5 files changed, 241 insertions(+), 23 deletions(-) rename marconi/{proxy/transport => common/transport/wsgi}/utils.py (61%) create mode 100644 marconi/queues/transport/wsgi/shards.py create mode 100644 tests/unit/queues/transport/wsgi/test_shards.py diff --git a/marconi/proxy/transport/utils.py b/marconi/common/transport/wsgi/utils.py similarity index 61% rename from marconi/proxy/transport/utils.py rename to marconi/common/transport/wsgi/utils.py index 9da786216..76a98859b 100644 --- a/marconi/proxy/transport/utils.py +++ b/marconi/common/transport/wsgi/utils.py @@ -13,12 +13,34 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""utils: utilities for transport handling.""" +"""utils: a set of utilities to help with transport handling details.""" import jsonschema +from marconi.openstack.common import log +from marconi.queues.transport import utils as json_utils from marconi.queues.transport.wsgi import exceptions as wsgi_errors +LOG = log.getLogger(__name__) + + +def load(req): + """Reads request body, raising an exception if it is not JSON. + + :param req: The request object to read from + :type req: falcon.Request + :return: a dictionary decoded from the JSON stream + :rtype: dict + :raises: wsgi_errors.HTTPBadRequestBody + """ + try: + return json_utils.read_json(req.stream, req.content_length) + except (json_utils.MalformedJSON, json_utils.OverflowedJSONInteger) as ex: + LOG.exception(ex) + raise wsgi_errors.HTTPBadRequestBody( + 'JSON could not be parsed.' + ) + # TODO(cpp-cabrera): generalize this def validate(validator, document): diff --git a/marconi/proxy/transport/wsgi/partitions.py b/marconi/proxy/transport/wsgi/partitions.py index da5dddc4a..aa30aa498 100644 --- a/marconi/proxy/transport/wsgi/partitions.py +++ b/marconi/proxy/transport/wsgi/partitions.py @@ -32,35 +32,17 @@ import falcon import jsonschema import six +from marconi.common.transport.wsgi import utils from marconi.openstack.common import log from marconi.proxy.storage import exceptions -from marconi.proxy.transport import schema, utils +from marconi.proxy.transport import schema from marconi.proxy.utils import lookup -from marconi.queues.transport import utils as json_utils from marconi.queues.transport.wsgi import exceptions as wsgi_errors LOG = log.getLogger(__name__) -def load(req): - """Reads request body, raising an exception if it is not JSON. - - :param req: The request object to read from - :type req: falcon.Request - :return: a dictionary decoded from the JSON stream - :rtype: dict - :raises: wsgi_errors.HTTPBadRequestBody - """ - try: - return json_utils.read_json(req.stream, req.content_length) - except (json_utils.MalformedJSON, json_utils.OverflowedJSONInteger) as ex: - LOG.exception(ex) - raise wsgi_errors.HTTPBadRequestBody( - 'JSON could not be parsed.' - ) - - class Listing(object): """A resource to list registered partition @@ -135,7 +117,7 @@ class Resource(object): response.status = falcon.HTTP_204 return - data = load(request) + data = utils.load(request) utils.validate(self._put_validator, data) self._ctrl.create(partition, weight=data['weight'], @@ -165,7 +147,7 @@ class Resource(object): """ LOG.debug('PATCH partition - name: {0}'.format(partition)) - data = load(request) + data = utils.load(request) if 'weight' not in data and 'hosts' not in data: LOG.debug('PATCH partition, bad params') diff --git a/marconi/queues/storage/exceptions.py b/marconi/queues/storage/exceptions.py index cf18e2591..37d370c39 100644 --- a/marconi/queues/storage/exceptions.py +++ b/marconi/queues/storage/exceptions.py @@ -106,3 +106,10 @@ class MessageIsClaimedBy(NotPermitted): msg = (u'Message %(mid)s is not claimed by %(cid)s' % dict(cid=cid, mid=mid)) super(MessageIsClaimedBy, self).__init__(msg) + + +class ShardDoesNotExist(DoesNotExist): + + def __init__(self, shard): + msg = u'Shard {0} does not exists'.format(shard) + super(ShardDoesNotExist, self).__init__(msg) diff --git a/marconi/queues/transport/wsgi/shards.py b/marconi/queues/transport/wsgi/shards.py new file mode 100644 index 000000000..015091729 --- /dev/null +++ b/marconi/queues/transport/wsgi/shards.py @@ -0,0 +1,178 @@ +# Copyright (c) 2013 Rackspace Hosting, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""shards: a resource to handle storage shard management + +A shard is added by an operator by interacting with the +sharding-related endpoints. When specifying a shard, the +following fields are required: + +{ + "name": string, + "weight": integer, + "location": string::uri +} + +Furthermore, depending on the underlying storage type of shard being +registered, there is an optional field: + +{ + "options": {...} +} +""" + +import falcon +import jsonschema + +from marconi.common.schemas import shards as schema +from marconi.common.transport.wsgi import utils +from marconi.openstack.common import log +from marconi.proxy.storage import exceptions +from marconi.queues.transport import utils as transport_utils +from marconi.queues.transport.wsgi import exceptions as wsgi_errors + +LOG = log.getLogger(__name__) + + +class Listing(object): + """A resource to list registered partition + + :param partitions_controller: means to interact with storage + """ + def __init__(self, partitions_controller): + self._ctrl = partitions_controller + + def on_get(self, request, response): + """Returns a partition listing as a JSON object: + + [ + {"name": "", "weight": 100, "location": ""}, + ... + ] + + :returns: HTTP | [200, 204] + """ + LOG.debug('LIST shards') + resp = list(self._ctrl.list()) + + if not resp: + response.status = falcon.HTTP_204 + return + + response.body = transport_utils.to_json(resp) + response.status = falcon.HTTP_200 + + +class Resource(object): + """A handler for individual partitions + + :param partitions_controller: means to interact with storage + """ + def __init__(self, shards_controller): + self._ctrl = shards_controller + validator_type = jsonschema.Draft4Validator + self._validators = { + 'weight': validator_type(schema.patch_weight), + 'location': validator_type(schema.patch_location), + 'options': validator_type(schema.patch_options), + 'create': validator_type(schema.create) + } + + def on_get(self, request, response, shard): + """Returns a JSON object for a single shard entry: + + {"weight": 100, "location": "", options: {...}} + + :returns: HTTP | [200, 404] + """ + LOG.debug('GET shard - name: {0}'.format(shard)) + data = None + try: + data = self._ctrl.get(shard) + except exceptions.ShardDoesNotExist as ex: + LOG.exception(ex) + raise falcon.HTTPNotFound() + + # remove the name entry - it isn't needed on GET + del data['name'] + response.body = transport_utils.to_json(data) + response.content_location = request.path + + def on_put(self, request, response, shard): + """Registers a new shard. Expects the following input: + + {"weight": 100, "location": ""} + + An options object may also be provided. + + :returns: HTTP | [201, 204] + """ + LOG.debug('PUT shard - name: {0}'.format(shard)) + if self._ctrl.exists(shard): + LOG.debug('Shard {0} already exists'.format(shard)) + response.status = falcon.HTTP_204 + return + + data = utils.load(request) + utils.validate(self._validators['create'], data) + self._ctrl.create(shard, weight=data['weight'], + location=data['location'], + options=data.get('options', {})) + response.status = falcon.HTTP_201 + response.location = request.path + + def on_delete(self, request, response, shard): + """Deregisters a shard. + + :returns: HTTP | 204 + """ + LOG.debug('DELETE shard - name: {0}'.format(shard)) + self._ctrl.delete(shard) + response.status = falcon.HTTP_204 + + def on_patch(self, request, response, shard): + """Allows one to update a shard's weight, location, and/or options. + + This method expects the user to submit a JSON object + containing atleast one of: 'hosts', 'weight', 'options'. If + none are found, the request is flagged as bad. There is also + strict format checking through the use of + jsonschema. Appropriate errors are returned in each case for + badly formatted input. + + :returns: HTTP | 200,400 + """ + LOG.debug('PATCH shard - name: {0}'.format(shard)) + data = utils.load(request) + + EXPECT = ('weight', 'location', 'options') + if not any([(field in data) for field in EXPECT]): + LOG.debug('PATCH shard, bad params') + raise wsgi_errors.HTTPBadRequestBody( + 'One of `location`, `weight`, or `options` needs ' + 'to be specified' + ) + + for field in EXPECT: + utils.validate(self._validators[field], data) + + try: + fields = dict((k, v) for k, v in data.items() + if k in EXPECT and v is not None) + + self._ctrl.update(shard, **fields) + except exceptions.ShardDoesNotExist as ex: + LOG.exception(ex) + raise falcon.HTTPNotFound() diff --git a/tests/unit/queues/transport/wsgi/test_shards.py b/tests/unit/queues/transport/wsgi/test_shards.py new file mode 100644 index 000000000..15edbd519 --- /dev/null +++ b/tests/unit/queues/transport/wsgi/test_shards.py @@ -0,0 +1,29 @@ +# Copyright (c) 2013 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. + +import ddt + +import base # noqa + + +@ddt.ddt +class ShardsBaseTest(base.TestBase): + + def setUp(self): + super(ShardsBaseTest, self).setUp() + + def tearDown(self): + super(ShardsBaseTest, self).tearDown()