Reuse identical API v2 code for v1

There are a couple modules that are virtually identical between
API v1 and v2. This cleans up some of the v1 API code to just
use the v2 API code.

More clean up can be done for other API modules, but there are
subtle differences (returning 200 vs 202) between the two. The
code can still be reused, but some v1 specific handling will
need to be put in place to make that optimization.

Change-Id: Ice3b2819b65c55cb189a0c16c0d7ef2795bd20dd
Partial-bug: #1627921
This commit is contained in:
Sean McGinnis
2017-02-26 04:04:50 +00:00
parent 467b9a4d3f
commit c50e4b2e6a
5 changed files with 2 additions and 2062 deletions
-438
View File
@@ -1,438 +0,0 @@
# Copyright 2011 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Module dedicated functions/classes dealing with rate limiting requests.
"""
import collections
import copy
import math
import re
import time
from oslo_serialization import jsonutils
from oslo_utils import importutils
from six.moves import http_client
import webob.dec
import webob.exc
from cinder.api.openstack import wsgi
from cinder.api.views import limits as limits_views
from cinder.i18n import _
from cinder import quota
from cinder.wsgi import common as base_wsgi
QUOTAS = quota.QUOTAS
LIMITS_PREFIX = "limits."
# Convenience constants for the limits dictionary passed to Limiter().
PER_SECOND = 1
PER_MINUTE = 60
PER_HOUR = 60 * 60
PER_DAY = 60 * 60 * 24
class LimitsController(wsgi.Controller):
"""Controller for accessing limits in the OpenStack API."""
def index(self, req):
"""Return all global and rate limit information."""
context = req.environ['cinder.context']
quotas = QUOTAS.get_project_quotas(context, context.project_id,
usages=False)
abs_limits = {k: v['limit'] for k, v in quotas.items()}
rate_limits = req.environ.get("cinder.limits", [])
builder = self._get_view_builder(req)
return builder.build(rate_limits, abs_limits)
def _get_view_builder(self, req):
return limits_views.ViewBuilder()
def create_resource():
return wsgi.Resource(LimitsController())
class Limit(object):
"""Stores information about a limit for HTTP requests."""
UNITS = {
1: "SECOND",
60: "MINUTE",
60 * 60: "HOUR",
60 * 60 * 24: "DAY",
}
UNIT_MAP = {v: k for k, v in UNITS.items()}
def __init__(self, verb, uri, regex, value, unit):
"""Initialize a new `Limit`.
@param verb: HTTP verb (POST, PUT, etc.)
@param uri: Human-readable URI
@param regex: Regular expression format for this limit
@param value: Integer number of requests which can be made
@param unit: Unit of measure for the value parameter
"""
self.verb = verb
self.uri = uri
self.regex = regex
self.value = int(value)
self.unit = unit
self.unit_string = self.display_unit().lower()
self.remaining = int(value)
if value <= 0:
raise ValueError("Limit value must be > 0")
self.last_request = None
self.next_request = None
self.water_level = 0
self.capacity = self.unit
self.request_value = float(self.capacity) / float(self.value)
msg = (_("Only %(value)s %(verb)s request(s) can be "
"made to %(uri)s every %(unit_string)s.") %
{'value': self.value, 'verb': self.verb,
'uri': self.uri, 'unit_string': self.unit_string})
self.error_message = msg
def __call__(self, verb, url):
"""Represent a call to this limit from a relevant request.
@param verb: string http verb (POST, GET, etc.)
@param url: string URL
"""
if self.verb != verb or not re.match(self.regex, url):
return
now = self._get_time()
if self.last_request is None:
self.last_request = now
leak_value = now - self.last_request
self.water_level -= leak_value
self.water_level = max(self.water_level, 0)
self.water_level += self.request_value
difference = self.water_level - self.capacity
self.last_request = now
if difference > 0:
self.water_level -= self.request_value
self.next_request = now + difference
return difference
cap = self.capacity
water = self.water_level
val = self.value
self.remaining = math.floor(((cap - water) / cap) * val)
self.next_request = now
def _get_time(self):
"""Retrieve the current time. Broken out for testability."""
return time.time()
def display_unit(self):
"""Display the string name of the unit."""
return self.UNITS.get(self.unit, "UNKNOWN")
def display(self):
"""Return a useful representation of this class."""
return {
"verb": self.verb,
"URI": self.uri,
"regex": self.regex,
"value": self.value,
"remaining": int(self.remaining),
"unit": self.display_unit(),
"resetTime": int(self.next_request or self._get_time()),
}
# "Limit" format is a dictionary with the HTTP verb, human-readable URI,
# a regular-expression to match, value and unit of measure (PER_DAY, etc.)
DEFAULT_LIMITS = [
Limit("POST", "*", ".*", 10, PER_MINUTE),
Limit("POST", "*/servers", "^/servers", 50, PER_DAY),
Limit("PUT", "*", ".*", 10, PER_MINUTE),
Limit("GET", "*changes-since*", ".*changes-since.*", 3, PER_MINUTE),
Limit("DELETE", "*", ".*", 100, PER_MINUTE),
]
class RateLimitingMiddleware(base_wsgi.Middleware):
"""Rate-limits requests passing through this middleware.
All limit information is stored in memory for this implementation.
"""
def __init__(self, application, limits=None, limiter=None, **kwargs):
"""Initialize new `RateLimitingMiddleware`
This wraps the given WSGI application and sets up the given limits.
@param application: WSGI application to wrap
@param limits: String describing limits
@param limiter: String identifying class for representing limits
Other parameters are passed to the constructor for the limiter.
"""
base_wsgi.Middleware.__init__(self, application)
# Select the limiter class
if limiter is None:
limiter = Limiter
else:
limiter = importutils.import_class(limiter)
# Parse the limits, if any are provided
if limits is not None:
limits = limiter.parse_limits(limits)
self._limiter = limiter(limits or DEFAULT_LIMITS, **kwargs)
@webob.dec.wsgify(RequestClass=wsgi.Request)
def __call__(self, req):
"""Represent a single call through this middleware.
We should record the request if we have a limit relevant to it.
If no limit is relevant to the request, ignore it.
If the request should be rate limited, return a fault telling the user
they are over the limit and need to retry later.
"""
verb = req.method
url = req.url
context = req.environ.get("cinder.context")
if context:
username = context.user_id
else:
username = None
delay, error = self._limiter.check_for_delay(verb, url, username)
if delay:
msg = _("This request was rate-limited.")
retry = time.time() + delay
return wsgi.OverLimitFault(msg, error, retry)
req.environ["cinder.limits"] = self._limiter.get_limits(username)
return self.application
class Limiter(object):
"""Rate-limit checking class which handles limits in memory."""
def __init__(self, limits, **kwargs):
"""Initialize the new `Limiter`.
@param limits: List of `Limit` objects
"""
self.limits = copy.deepcopy(limits)
self.levels = collections.defaultdict(lambda: copy.deepcopy(limits))
# Pick up any per-user limit information
for key, value in kwargs.items():
if key.startswith(LIMITS_PREFIX):
username = key[len(LIMITS_PREFIX):]
self.levels[username] = self.parse_limits(value)
def get_limits(self, username=None):
"""Return the limits for a given user."""
return [limit.display() for limit in self.levels[username]]
def check_for_delay(self, verb, url, username=None):
"""Check the given verb/user/user triplet for limit.
@return: Tuple of delay (in seconds) and error message (or None, None)
"""
delays = []
for limit in self.levels[username]:
delay = limit(verb, url)
if delay:
delays.append((delay, limit.error_message))
if delays:
delays.sort()
return delays[0]
return None, None
# Note: This method gets called before the class is instantiated,
# so this must be either a static method or a class method. It is
# used to develop a list of limits to feed to the constructor. We
# put this in the class so that subclasses can override the
# default limit parsing.
@staticmethod
def parse_limits(limits):
"""Convert a string into a list of Limit instances.
This implementation expects a semicolon-separated sequence of
parenthesized groups, where each group contains a
comma-separated sequence consisting of HTTP method,
user-readable URI, a URI reg-exp, an integer number of
requests which can be made, and a unit of measure. Valid
values for the latter are "SECOND", "MINUTE", "HOUR", and
"DAY".
@return: List of Limit instances.
"""
# Handle empty limit strings
limits = limits.strip()
if not limits:
return []
# Split up the limits by semicolon
result = []
for group in limits.split(';'):
group = group.strip()
if group[:1] != '(' or group[-1:] != ')':
raise ValueError("Limit rules must be surrounded by "
"parentheses")
group = group[1:-1]
# Extract the Limit arguments
args = [a.strip() for a in group.split(',')]
if len(args) != 5:
raise ValueError("Limit rules must contain the following "
"arguments: verb, uri, regex, value, unit")
# Pull out the arguments
verb, uri, regex, value, unit = args
# Upper-case the verb
verb = verb.upper()
# Convert value--raises ValueError if it's not integer
value = int(value)
# Convert unit
unit = unit.upper()
if unit not in Limit.UNIT_MAP:
raise ValueError("Invalid units specified")
unit = Limit.UNIT_MAP[unit]
# Build a limit
result.append(Limit(verb, uri, regex, value, unit))
return result
class WsgiLimiter(object):
"""Rate-limit checking from a WSGI application.
Uses an in-memory `Limiter`.
To use, POST ``/<username>`` with JSON data such as::
{
"verb" : GET,
"path" : "/servers"
}
and receive a 204 No Content, or a 403 Forbidden with an X-Wait-Seconds
header containing the number of seconds to wait before the action would
succeed.
"""
def __init__(self, limits=None):
"""Initialize the new `WsgiLimiter`.
@param limits: List of `Limit` objects
"""
self._limiter = Limiter(limits or DEFAULT_LIMITS)
@webob.dec.wsgify(RequestClass=wsgi.Request)
def __call__(self, request):
"""Handles a call to this application.
Returns 204 if the request is acceptable to the limiter, else a 403
is returned with a relevant header indicating when the request
*will* succeed.
"""
if request.method != "POST":
raise webob.exc.HTTPMethodNotAllowed()
try:
info = dict(jsonutils.loads(request.body))
except ValueError:
raise webob.exc.HTTPBadRequest()
username = request.path_info_pop()
verb = info.get("verb")
path = info.get("path")
delay, error = self._limiter.check_for_delay(verb, path, username)
if delay:
headers = {"X-Wait-Seconds": "%.2f" % delay}
return webob.exc.HTTPForbidden(headers=headers, explanation=error)
else:
return webob.exc.HTTPNoContent()
class WsgiLimiterProxy(object):
"""Rate-limit requests based on answers from a remote source."""
def __init__(self, limiter_address):
"""Initialize the new `WsgiLimiterProxy`.
@param limiter_address: IP/port combination of where to request limit
"""
self.limiter_address = limiter_address
def check_for_delay(self, verb, path, username=None):
body = jsonutils.dump_as_bytes({"verb": verb, "path": path})
headers = {"Content-Type": "application/json"}
conn = http_client.HTTPConnection(self.limiter_address)
if username:
conn.request("POST", "/%s" % (username), body, headers)
else:
conn.request("POST", "/", body, headers)
resp = conn.getresponse()
if 200 >= resp.status < 300:
return None, None
return resp.getheader("X-Wait-Seconds"), resp.read() or None
# Note: This method gets called before the class is instantiated,
# so this must be either a static method or a class method. It is
# used to develop a list of limits to feed to the constructor.
# This implementation returns an empty list, since all limit
# decisions are made by a remote server.
@staticmethod
def parse_limits(limits):
"""Ignore a limits string--simply doesn't apply for the limit proxy.
@return: Empty list.
"""
return []
+2 -2
View File
@@ -21,12 +21,12 @@ WSGI middleware for OpenStack Volume API.
from cinder.api import extensions
import cinder.api.openstack
from cinder.api.v1 import limits
from cinder.api.v1 import snapshot_metadata
from cinder.api.v1 import snapshots
from cinder.api.v1 import types
from cinder.api.v1 import volume_metadata
from cinder.api.v1 import volumes
from cinder.api.v2 import limits
from cinder.api.v2 import snapshot_metadata
from cinder.api import versions
-145
View File
@@ -1,145 +0,0 @@
# Copyright 2011 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import webob
from webob import exc
from cinder.api.openstack import wsgi
from cinder import exception
from cinder.i18n import _
from cinder import volume
class Controller(wsgi.Controller):
"""The snapshot metadata API controller for the OpenStack API."""
def __init__(self):
self.volume_api = volume.API()
super(Controller, self).__init__()
def _get_metadata(self, context, snapshot_id):
# Not found exception will be handled at the wsgi level
snapshot = self.volume_api.get_snapshot(context, snapshot_id)
meta = self.volume_api.get_snapshot_metadata(context, snapshot)
return meta
def index(self, req, snapshot_id):
"""Returns the list of metadata for a given snapshot."""
context = req.environ['cinder.context']
return {'metadata': self._get_metadata(context, snapshot_id)}
def create(self, req, snapshot_id, body):
try:
metadata = body['metadata']
except (KeyError, TypeError):
msg = _("Malformed request body")
raise exc.HTTPBadRequest(explanation=msg)
context = req.environ['cinder.context']
new_metadata = self._update_snapshot_metadata(context,
snapshot_id,
metadata,
delete=False)
return {'metadata': new_metadata}
def update(self, req, snapshot_id, id, body):
try:
meta_item = body['meta']
except (TypeError, KeyError):
expl = _('Malformed request body')
raise exc.HTTPBadRequest(explanation=expl)
if id not in meta_item:
expl = _('Request body and URI mismatch')
raise exc.HTTPBadRequest(explanation=expl)
if len(meta_item) > 1:
expl = _('Request body contains too many items')
raise exc.HTTPBadRequest(explanation=expl)
context = req.environ['cinder.context']
self._update_snapshot_metadata(context,
snapshot_id,
meta_item,
delete=False)
return {'meta': meta_item}
def update_all(self, req, snapshot_id, body):
try:
metadata = body['metadata']
except (TypeError, KeyError):
expl = _('Malformed request body')
raise exc.HTTPBadRequest(explanation=expl)
context = req.environ['cinder.context']
new_metadata = self._update_snapshot_metadata(context,
snapshot_id,
metadata,
delete=True)
return {'metadata': new_metadata}
def _update_snapshot_metadata(self, context,
snapshot_id, metadata,
delete=False):
try:
snapshot = self.volume_api.get_snapshot(context, snapshot_id)
return self.volume_api.update_snapshot_metadata(context,
snapshot,
metadata,
delete)
# Not found exception will be handled at the wsgi level
except (ValueError, AttributeError):
msg = _("Malformed request body")
raise exc.HTTPBadRequest(explanation=msg)
except exception.InvalidVolumeMetadata as error:
raise exc.HTTPBadRequest(explanation=error.msg)
except exception.InvalidVolumeMetadataSize as error:
raise exc.HTTPRequestEntityTooLarge(explanation=error.msg)
def show(self, req, snapshot_id, id):
"""Return a single metadata item."""
context = req.environ['cinder.context']
data = self._get_metadata(context, snapshot_id)
try:
return {'meta': {id: data[id]}}
except KeyError:
raise exception.SnapshotMetadataNotFound(snapshot_id=snapshot_id,
metadata_key=id)
def delete(self, req, snapshot_id, id):
"""Deletes an existing metadata."""
context = req.environ['cinder.context']
metadata = self._get_metadata(context, snapshot_id)
if id not in metadata:
raise exception.SnapshotMetadataNotFound(snapshot_id=snapshot_id,
metadata_key=id)
# Not found exception will be handled at the wsgi level
snapshot = self.volume_api.get_snapshot(context, snapshot_id)
self.volume_api.delete_snapshot_metadata(context, snapshot, id)
return webob.Response(status_int=200)
def create_resource():
return wsgi.Resource(Controller())
-801
View File
@@ -1,801 +0,0 @@
# Copyright 2011 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests dealing with HTTP rate-limiting.
"""
from oslo_serialization import jsonutils
import six
from six.moves import http_client
from six.moves import range
import webob
from cinder.api.v1 import limits
from cinder.api import views
import cinder.context
from cinder import test
from cinder.tests.unit import fake_constants as fake
TEST_LIMITS = [
limits.Limit("GET", "/delayed", "^/delayed", 1, limits.PER_MINUTE),
limits.Limit("POST", "*", ".*", 7, limits.PER_MINUTE),
limits.Limit("POST", "/volumes", "^/volumes", 3, limits.PER_MINUTE),
limits.Limit("PUT", "*", "", 10, limits.PER_MINUTE),
limits.Limit("PUT", "/volumes", "^/volumes", 5, limits.PER_MINUTE),
]
NS = {
'atom': 'http://www.w3.org/2005/Atom',
'ns': 'http://docs.openstack.org/common/api/v1.0'
}
class BaseLimitTestSuite(test.TestCase):
"""Base test suite which provides relevant stubs and time abstraction."""
def setUp(self):
super(BaseLimitTestSuite, self).setUp()
self.time = 0.0
self.stubs.Set(limits.Limit, "_get_time", self._get_time)
self.absolute_limits = {}
def stub_get_project_quotas(context, project_id, usages=True):
return {k: dict(limit=v) for k, v in self.absolute_limits.items()}
self.stubs.Set(cinder.quota.QUOTAS, "get_project_quotas",
stub_get_project_quotas)
def _get_time(self):
"""Return the "time" according to this test suite."""
return self.time
class LimitsControllerTest(BaseLimitTestSuite):
"""Tests for `limits.LimitsController` class."""
def setUp(self):
"""Run before each test."""
super(LimitsControllerTest, self).setUp()
self.controller = limits.create_resource()
def _get_index_request(self, accept_header="application/json"):
"""Helper to set routing arguments."""
request = webob.Request.blank("/")
request.accept = accept_header
request.environ["wsgiorg.routing_args"] = (None, {
"action": "index",
"controller": "",
})
context = cinder.context.RequestContext(fake.USER_ID, fake.PROJECT_ID)
request.environ["cinder.context"] = context
return request
def _populate_limits(self, request):
"""Put limit info into a request."""
_limits = [
limits.Limit("GET", "*", ".*", 10, 60).display(),
limits.Limit("POST", "*", ".*", 5, 60 * 60).display(),
limits.Limit("GET", "changes-since*", "changes-since",
5, 60).display(),
]
request.environ["cinder.limits"] = _limits
return request
def test_empty_index_json(self):
"""Test getting empty limit details in JSON."""
request = self._get_index_request()
response = request.get_response(self.controller)
expected = {
"limits": {
"rate": [],
"absolute": {},
},
}
body = jsonutils.loads(response.body)
self.assertEqual(expected, body)
def test_index_json(self):
"""Test getting limit details in JSON."""
request = self._get_index_request()
request = self._populate_limits(request)
self.absolute_limits = {
'gigabytes': 512,
'volumes': 5,
}
response = request.get_response(self.controller)
expected = {
"limits": {
"rate": [
{
"regex": ".*",
"uri": "*",
"limit": [
{
"verb": "GET",
"next-available": "1970-01-01T00:00:00",
"unit": "MINUTE",
"value": 10,
"remaining": 10,
},
{
"verb": "POST",
"next-available": "1970-01-01T00:00:00",
"unit": "HOUR",
"value": 5,
"remaining": 5,
},
],
},
{
"regex": "changes-since",
"uri": "changes-since*",
"limit": [
{
"verb": "GET",
"next-available": "1970-01-01T00:00:00",
"unit": "MINUTE",
"value": 5,
"remaining": 5,
},
],
},
],
"absolute": {"maxTotalVolumeGigabytes": 512,
"maxTotalVolumes": 5, },
},
}
body = jsonutils.loads(response.body)
self.assertEqual(expected, body)
def _populate_limits_diff_regex(self, request):
"""Put limit info into a request."""
_limits = [
limits.Limit("GET", "*", ".*", 10, 60).display(),
limits.Limit("GET", "*", "*.*", 10, 60).display(),
]
request.environ["cinder.limits"] = _limits
return request
def test_index_diff_regex(self):
"""Test getting limit details in JSON."""
request = self._get_index_request()
request = self._populate_limits_diff_regex(request)
response = request.get_response(self.controller)
expected = {
"limits": {
"rate": [
{
"regex": ".*",
"uri": "*",
"limit": [
{
"verb": "GET",
"next-available": "1970-01-01T00:00:00",
"unit": "MINUTE",
"value": 10,
"remaining": 10,
},
],
},
{
"regex": "*.*",
"uri": "*",
"limit": [
{
"verb": "GET",
"next-available": "1970-01-01T00:00:00",
"unit": "MINUTE",
"value": 10,
"remaining": 10,
},
],
},
],
"absolute": {},
},
}
body = jsonutils.loads(response.body)
self.assertEqual(expected, body)
def _test_index_absolute_limits_json(self, expected):
request = self._get_index_request()
response = request.get_response(self.controller)
body = jsonutils.loads(response.body)
self.assertEqual(expected, body['limits']['absolute'])
def test_index_ignores_extra_absolute_limits_json(self):
self.absolute_limits = {'unknown_limit': 9001}
self._test_index_absolute_limits_json({})
class TestLimiter(limits.Limiter):
pass
class LimitMiddlewareTest(BaseLimitTestSuite):
"""Tests for the `limits.RateLimitingMiddleware` class."""
@webob.dec.wsgify
def _empty_app(self, request):
"""Do-nothing WSGI app."""
pass
def setUp(self):
"""Prepare middleware for use through fake WSGI app."""
super(LimitMiddlewareTest, self).setUp()
_limits = '(GET, *, .*, 1, MINUTE)'
self.app = limits.RateLimitingMiddleware(self._empty_app, _limits,
"%s.TestLimiter" %
self.__class__.__module__)
def test_limit_class(self):
"""Test that middleware selected correct limiter class."""
self.assertIsInstance(self.app._limiter, TestLimiter)
def test_good_request(self):
"""Test successful GET request through middleware."""
request = webob.Request.blank("/")
response = request.get_response(self.app)
self.assertEqual(200, response.status_int)
def test_limited_request_json(self):
"""Test a rate-limited (413) GET request through middleware."""
request = webob.Request.blank("/")
response = request.get_response(self.app)
self.assertEqual(200, response.status_int)
request = webob.Request.blank("/")
response = request.get_response(self.app)
self.assertEqual(413, response.status_int)
self.assertIn('Retry-After', response.headers)
retry_after = int(response.headers['Retry-After'])
self.assertAlmostEqual(retry_after, 60, 1)
body = jsonutils.loads(response.body)
expected = "Only 1 GET request(s) can be made to * every minute."
value = body["overLimitFault"]["details"].strip()
self.assertEqual(expected, value)
class LimitTest(BaseLimitTestSuite):
"""Tests for the `limits.Limit` class."""
def test_GET_no_delay(self):
"""Test a limit handles 1 GET per second."""
limit = limits.Limit("GET", "*", ".*", 1, 1)
delay = limit("GET", "/anything")
self.assertIsNone(delay)
self.assertEqual(0, limit.next_request)
self.assertEqual(0, limit.last_request)
def test_GET_delay(self):
"""Test two calls to 1 GET per second limit."""
limit = limits.Limit("GET", "*", ".*", 1, 1)
delay = limit("GET", "/anything")
self.assertIsNone(delay)
delay = limit("GET", "/anything")
self.assertEqual(1, delay)
self.assertEqual(1, limit.next_request)
self.assertEqual(0, limit.last_request)
self.time += 4
delay = limit("GET", "/anything")
self.assertIsNone(delay)
self.assertEqual(4, limit.next_request)
self.assertEqual(4, limit.last_request)
class ParseLimitsTest(BaseLimitTestSuite):
"""Tests for the default limits parser in the `limits.Limiter` class."""
def test_invalid(self):
"""Test that parse_limits() handles invalid input correctly."""
self.assertRaises(ValueError, limits.Limiter.parse_limits,
';;;;;')
def test_bad_rule(self):
"""Test that parse_limits() handles bad rules correctly."""
self.assertRaises(ValueError, limits.Limiter.parse_limits,
'GET, *, .*, 20, minute')
def test_missing_arg(self):
"""Test that parse_limits() handles missing args correctly."""
self.assertRaises(ValueError, limits.Limiter.parse_limits,
'(GET, *, .*, 20)')
def test_bad_value(self):
"""Test that parse_limits() handles bad values correctly."""
self.assertRaises(ValueError, limits.Limiter.parse_limits,
'(GET, *, .*, foo, minute)')
def test_bad_unit(self):
"""Test that parse_limits() handles bad units correctly."""
self.assertRaises(ValueError, limits.Limiter.parse_limits,
'(GET, *, .*, 20, lightyears)')
def test_multiple_rules(self):
"""Test that parse_limits() handles multiple rules correctly."""
try:
l = limits.Limiter.parse_limits('(get, *, .*, 20, minute);'
'(PUT, /foo*, /foo.*, 10, hour);'
'(POST, /bar*, /bar.*, 5, second);'
'(Say, /derp*, /derp.*, 1, day)')
except ValueError as e:
self.fail(msg=e)
# Make sure the number of returned limits are correct
self.assertEqual(4, len(l))
# Check all the verbs...
expected = ['GET', 'PUT', 'POST', 'SAY']
self.assertEqual(expected, [t.verb for t in l])
# ...the URIs...
expected = ['*', '/foo*', '/bar*', '/derp*']
self.assertEqual(expected, [t.uri for t in l])
# ...the regexes...
expected = ['.*', '/foo.*', '/bar.*', '/derp.*']
self.assertEqual(expected, [t.regex for t in l])
# ...the values...
expected = [20, 10, 5, 1]
self.assertEqual(expected, [t.value for t in l])
# ...and the units...
expected = [limits.PER_MINUTE, limits.PER_HOUR,
limits.PER_SECOND, limits.PER_DAY]
self.assertEqual(expected, [t.unit for t in l])
class LimiterTest(BaseLimitTestSuite):
"""Tests for the in-memory `limits.Limiter` class."""
def setUp(self):
"""Run before each test."""
super(LimiterTest, self).setUp()
userlimits = {'limits.user3': '',
'limits.user0': '(get, *, .*, 4, minute);'
'(put, *, .*, 2, minute)'}
self.limiter = limits.Limiter(TEST_LIMITS, **userlimits)
def _check(self, num, verb, url, username=None):
"""Check and yield results from checks."""
for x in range(num):
yield self.limiter.check_for_delay(verb, url, username)[0]
def _check_sum(self, num, verb, url, username=None):
"""Check and sum results from checks."""
results = self._check(num, verb, url, username)
return sum(item for item in results if item)
def test_no_delay_GET(self):
"""no delay on a single call for a limit verb we didn"t set."""
delay = self.limiter.check_for_delay("GET", "/anything")
self.assertEqual((None, None), delay)
def test_no_delay_PUT(self):
"""no delay on a single call for a known limit."""
delay = self.limiter.check_for_delay("PUT", "/anything")
self.assertEqual((None, None), delay)
def test_delay_PUT(self):
"""test delay on 11th put request.
the 11th PUT will result in a delay of 6.0 seconds until
the next request will be granted.
"""
expected = [None] * 10 + [6.0]
results = list(self._check(11, "PUT", "/anything"))
self.assertEqual(expected, results)
def test_delay_POST(self):
"""test delay of 8th post request.
Ensure that the 8th POST will result in a delay of 6.0 seconds
until the next request will be granted.
"""
expected = [None] * 7
results = list(self._check(7, "POST", "/anything"))
self.assertEqual(expected, results)
expected = 60.0 / 7.0
results = self._check_sum(1, "POST", "/anything")
self.assertAlmostEqual(expected, results, 8)
def test_delay_GET(self):
"""Ensure the 11th GET will result in NO delay."""
expected = [None] * 11
results = list(self._check(11, "GET", "/anything"))
self.assertEqual(expected, results)
expected = [None] * 4 + [15.0]
results = list(self._check(5, "GET", "/foo", "user0"))
self.assertEqual(expected, results)
def test_delay_PUT_volumes(self):
"""Test limit of PUT on /volumes.
Ensure PUT on /volumes limits at 5 requests, and PUT elsewhere is
still OK after 5 requests...
but then after 11 total requests, PUT limiting kicks in.
"""
# First 6 requests on PUT /volumes
expected = [None] * 5 + [12.0]
results = list(self._check(6, "PUT", "/volumes"))
self.assertEqual(expected, results)
# Next 5 request on PUT /anything
expected = [None] * 4 + [6.0]
results = list(self._check(5, "PUT", "/anything"))
self.assertEqual(expected, results)
def test_delay_PUT_wait(self):
"""Test limit on PUT is lifted.
Ensure after hitting the limit and then waiting for the correct
amount of time, the limit will be lifted.
"""
expected = [None] * 10 + [6.0]
results = list(self._check(11, "PUT", "/anything"))
self.assertEqual(expected, results)
# Advance time
self.time += 6.0
expected = [None, 6.0]
results = list(self._check(2, "PUT", "/anything"))
self.assertEqual(expected, results)
def test_multiple_delays(self):
"""Ensure multiple requests still get a delay."""
expected = [None] * 10 + [6.0] * 10
results = list(self._check(20, "PUT", "/anything"))
self.assertEqual(expected, results)
self.time += 1.0
expected = [5.0] * 10
results = list(self._check(10, "PUT", "/anything"))
self.assertEqual(expected, results)
expected = [None] * 2 + [30.0] * 8
results = list(self._check(10, "PUT", "/anything", "user0"))
self.assertEqual(expected, results)
def test_user_limit(self):
"""Test user-specific limits."""
self.assertEqual([], self.limiter.levels['user3'])
self.assertEqual(2, len(self.limiter.levels['user0']))
def test_multiple_users(self):
"""Tests involving multiple users."""
# User0
expected = [None] * 2 + [30.0] * 8
results = list(self._check(10, "PUT", "/anything", "user0"))
self.assertEqual(expected, results)
# User1
expected = [None] * 10 + [6.0] * 10
results = list(self._check(20, "PUT", "/anything", "user1"))
self.assertEqual(expected, results)
# User2
expected = [None] * 10 + [6.0] * 5
results = list(self._check(15, "PUT", "/anything", "user2"))
self.assertEqual(expected, results)
# User3
expected = [None] * 20
results = list(self._check(20, "PUT", "/anything", "user3"))
self.assertEqual(expected, results)
self.time += 1.0
# User1 again
expected = [5.0] * 10
results = list(self._check(10, "PUT", "/anything", "user1"))
self.assertEqual(expected, results)
self.time += 1.0
# User1 again
expected = [4.0] * 5
results = list(self._check(5, "PUT", "/anything", "user2"))
self.assertEqual(expected, results)
# User0 again
expected = [28.0]
results = list(self._check(1, "PUT", "/anything", "user0"))
self.assertEqual(expected, results)
self.time += 28.0
expected = [None, 30.0]
results = list(self._check(2, "PUT", "/anything", "user0"))
self.assertEqual(expected, results)
class WsgiLimiterTest(BaseLimitTestSuite):
"""Tests for `limits.WsgiLimiter` class."""
def setUp(self):
"""Run before each test."""
super(WsgiLimiterTest, self).setUp()
self.app = limits.WsgiLimiter(TEST_LIMITS)
def _request_data(self, verb, path):
"""Get data describing a limit request verb/path."""
return jsonutils.dump_as_bytes({"verb": verb, "path": path})
def _request(self, verb, url, username=None):
"""Assert that POSTing to given url triggers given action.
Ensure POSTing to the given url causes the given username
to perform the given action.
Make the internal rate limiter return delay and make sure that the
WSGI app returns the correct response.
"""
if username:
request = webob.Request.blank("/%s" % username)
else:
request = webob.Request.blank("/")
request.method = "POST"
request.body = self._request_data(verb, url)
response = request.get_response(self.app)
if "X-Wait-Seconds" in response.headers:
self.assertEqual(403, response.status_int)
return response.headers["X-Wait-Seconds"]
self.assertEqual(204, response.status_int)
def test_invalid_methods(self):
"""Only POSTs should work."""
for method in ["GET", "PUT", "DELETE", "HEAD", "OPTIONS"]:
request = webob.Request.blank("/", method=method)
response = request.get_response(self.app)
self.assertEqual(405, response.status_int)
def test_good_url(self):
delay = self._request("GET", "/something")
self.assertIsNone(delay)
def test_escaping(self):
delay = self._request("GET", "/something/jump%20up")
self.assertIsNone(delay)
def test_response_to_delays(self):
delay = self._request("GET", "/delayed")
self.assertIsNone(delay)
delay = self._request("GET", "/delayed")
self.assertEqual('60.00', delay)
def test_response_to_delays_usernames(self):
delay = self._request("GET", "/delayed", "user1")
self.assertIsNone(delay)
delay = self._request("GET", "/delayed", "user2")
self.assertIsNone(delay)
delay = self._request("GET", "/delayed", "user1")
self.assertEqual('60.00', delay)
delay = self._request("GET", "/delayed", "user2")
self.assertEqual('60.00', delay)
class FakeHttplibSocket(object):
"""Fake `http_client.HTTPResponse` replacement."""
def __init__(self, response_string):
"""Initialize new `FakeHttplibSocket`."""
if isinstance(response_string, six.text_type):
response_string = response_string.encode('utf-8')
self._buffer = six.BytesIO(response_string)
def makefile(self, mode, *args):
"""Returns the socket's internal buffer."""
return self._buffer
class FakeHttplibConnection(object):
"""Fake `http_client.HTTPConnection`."""
def __init__(self, app, host):
"""Initialize `FakeHttplibConnection`."""
self.app = app
self.host = host
def request(self, method, path, body="", headers=None):
"""Fake method for request.
Requests made via this connection actually get translated and
routed into our WSGI app, we then wait for the response and turn
it back into an `http_client.HTTPResponse`.
"""
if not headers:
headers = {}
req = webob.Request.blank(path)
req.method = method
req.headers = headers
req.host = self.host
req.body = body
resp = str(req.get_response(self.app))
resp = "HTTP/1.0 %s" % resp
sock = FakeHttplibSocket(resp)
self.http_response = http_client.HTTPResponse(sock)
self.http_response.begin()
def getresponse(self):
"""Return our generated response from the request."""
return self.http_response
def wire_HTTPConnection_to_WSGI(host, app):
"""Monkeypatches HTTPConnection.
Monkeypatches HTTPConnection so that if you try to connect to host, you
are instead routed straight to the given WSGI app.
After calling this method, when any code calls
http_client.HTTPConnection(host)
the connection object will be a fake. Its requests will be sent directly
to the given WSGI app rather than through a socket.
Code connecting to hosts other than host will not be affected.
This method may be called multiple times to map different hosts to
different apps.
This method returns the original HTTPConnection object, so that the caller
can restore the default HTTPConnection interface (for all hosts).
"""
class HTTPConnectionDecorator(object):
"""Decorator to mock the HTTPConnection class.
Wraps the real HTTPConnection class so that when you instantiate
the class you might instead get a fake instance.
"""
def __init__(self, wrapped):
self.wrapped = wrapped
def __call__(self, connection_host, *args, **kwargs):
if connection_host == host:
return FakeHttplibConnection(app, host)
else:
return self.wrapped(connection_host, *args, **kwargs)
oldHTTPConnection = http_client.HTTPConnection
new_http_connection = HTTPConnectionDecorator(http_client.HTTPConnection)
http_client.HTTPConnection = new_http_connection
return oldHTTPConnection
class WsgiLimiterProxyTest(BaseLimitTestSuite):
"""Tests for the `limits.WsgiLimiterProxy` class."""
def setUp(self):
"""setUp for test suite.
Do some nifty HTTP/WSGI magic which allows for WSGI to be called
directly by something like the `http_client` library.
"""
super(WsgiLimiterProxyTest, self).setUp()
self.app = limits.WsgiLimiter(TEST_LIMITS)
self.oldHTTPConnection = (
wire_HTTPConnection_to_WSGI("169.254.0.1:80", self.app))
self.proxy = limits.WsgiLimiterProxy("169.254.0.1:80")
self.addCleanup(self._restore, self.oldHTTPConnection)
def _restore(self, oldHTTPConnection):
# restore original HTTPConnection object
http_client.HTTPConnection = oldHTTPConnection
def test_200(self):
"""Successful request test."""
delay = self.proxy.check_for_delay("GET", "/anything")
self.assertEqual((None, None), delay)
def test_403(self):
"""Forbidden request test."""
delay = self.proxy.check_for_delay("GET", "/delayed")
self.assertEqual((None, None), delay)
delay, error = self.proxy.check_for_delay("GET", "/delayed")
error = error.strip()
expected = ("60.00",
b"403 Forbidden\n\nOnly 1 GET request(s) can be "
b"made to /delayed every minute.")
self.assertEqual(expected, (delay, error))
class LimitsViewBuilderTest(test.TestCase):
def setUp(self):
super(LimitsViewBuilderTest, self).setUp()
self.view_builder = views.limits.ViewBuilder()
self.rate_limits = [{"URI": "*",
"regex": ".*",
"value": 10,
"verb": "POST",
"remaining": 2,
"unit": "MINUTE",
"resetTime": 1311272226},
{"URI": "*/volumes",
"regex": "^/volumes",
"value": 50,
"verb": "POST",
"remaining": 10,
"unit": "DAY",
"resetTime": 1311272226}]
self.absolute_limits = {"gigabytes": 1,
"backup_gigabytes": 2,
"volumes": 3,
"snapshots": 4,
"backups": 5}
def test_build_limits(self):
tdate = "2011-07-21T18:17:06"
expected_limits = \
{"limits": {"rate": [{"uri": "*",
"regex": ".*",
"limit": [{"value": 10,
"verb": "POST",
"remaining": 2,
"unit": "MINUTE",
"next-available": tdate}]},
{"uri": "*/volumes",
"regex": "^/volumes",
"limit": [{"value": 50,
"verb": "POST",
"remaining": 10,
"unit": "DAY",
"next-available": tdate}]}],
"absolute": {"maxTotalVolumeGigabytes": 1,
"maxTotalBackupGigabytes": 2,
"maxTotalVolumes": 3,
"maxTotalSnapshots": 4,
"maxTotalBackups": 5}}}
output = self.view_builder.build(self.rate_limits,
self.absolute_limits)
self.assertDictEqual(expected_limits, output)
def test_build_limits_empty_limits(self):
expected_limits = {"limits": {"rate": [],
"absolute": {}}}
abs_limits = {}
rate_limits = []
output = self.view_builder.build(rate_limits, abs_limits)
self.assertDictEqual(expected_limits, output)
@@ -1,676 +0,0 @@
# Copyright 2011 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_serialization import jsonutils
import webob
from cinder.api import extensions
from cinder.api.v1 import snapshot_metadata
from cinder.api.v1 import snapshots
from cinder import context
import cinder.db
from cinder import exception as exc
from cinder.objects import fields
from cinder import test
from cinder.tests.unit.api import fakes
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import fake_snapshot
from cinder.tests.unit import fake_volume
from cinder import volume
def return_create_snapshot_metadata(context, snapshot_id, metadata, delete):
return stub_snapshot_metadata(snapshot_id)
def return_create_snapshot_metadata_insensitive(context, snapshot_id,
metadata, delete):
return stub_snapshot_metadata_insensitive(snapshot_id)
def return_new_snapshot_metadata(context, snapshot_id, metadata, delete):
return stub_new_snapshot_metadata(snapshot_id)
def return_empty_container_metadata(context, snapshot_id, metadata, delete):
if snapshot_id == fake.WILL_NOT_BE_FOUND_ID:
raise exc.SnapshotNotFound(snapshot_id)
return {}
def stub_snapshot_metadata(snapshot_id):
if snapshot_id == fake.WILL_NOT_BE_FOUND_ID:
raise exc.SnapshotNotFound(snapshot_id)
metadata = {
"key1": "value1",
"key2": "value2",
"key3": "value3",
}
return metadata
def stub_snapshot_metadata_insensitive(snapshot_id):
if snapshot_id == fake.WILL_NOT_BE_FOUND_ID:
raise exc.SnapshotNotFound(snapshot_id)
metadata = {
"key1": "value1",
"key2": "value2",
"key3": "value3",
"KEY4": "value4",
}
return metadata
def stub_new_snapshot_metadata(snapshot_id):
if snapshot_id == fake.WILL_NOT_BE_FOUND_ID:
raise exc.SnapshotNotFound(snapshot_id)
metadata = {
'key10': 'value10',
'key99': 'value99',
'KEY20': 'value20',
}
return metadata
def return_snapshot(context, snapshot_id):
if snapshot_id == fake.WILL_NOT_BE_FOUND_ID:
raise exc.SnapshotNotFound(snapshot_id)
return {'id': '0cc3346e-9fef-4445-abe6-5d2b2690ec64',
'name': 'fake',
'status': 'available',
'metadata': {}}
def stub_get(self, context, volume_id, *args, **kwargs):
if volume_id == fake.WILL_NOT_BE_FOUND_ID:
raise exc.VolumeNotFound(volume_id)
vol = {'id': volume_id,
'size': 100,
'name': 'fake',
'host': 'fake-host',
'status': 'available',
'encryption_key_id': None,
'volume_type_id': None,
'migration_status': None,
'availability_zone': 'zone1:host1',
'attach_status': fields.VolumeAttachStatus.DETACHED}
return fake_volume.fake_volume_obj(context, **vol)
def fake_update_snapshot_metadata(self, context, snapshot, diff):
pass
class SnapshotMetaDataTest(test.TestCase):
def setUp(self):
super(SnapshotMetaDataTest, self).setUp()
self.volume_api = cinder.volume.api.API()
self.stubs.Set(volume.api.API, 'get', stub_get)
self.stubs.Set(cinder.db, 'snapshot_get', return_snapshot)
self.stubs.Set(self.volume_api, 'update_snapshot_metadata',
fake_update_snapshot_metadata)
self.ext_mgr = extensions.ExtensionManager()
self.ext_mgr.extensions = {}
self.snapshot_controller = snapshots.SnapshotsController(self.ext_mgr)
self.controller = snapshot_metadata.Controller()
self.url = '/v1/%s/snapshots/%s/metadata' % (
fake.PROJECT_ID, fake.SNAPSHOT_ID)
snap = {"volume_size": 100,
"volume_id": fake.VOLUME_ID,
"display_name": "Snapshot Test Name",
"display_description": "Snapshot Test Desc",
"availability_zone": "zone1:host1",
"host": "fake-host",
"metadata": {}}
body = {"snapshot": snap}
req = fakes.HTTPRequest.blank('/v1/snapshots')
self.snapshot_controller.create(req, body)
@mock.patch('cinder.objects.Snapshot.get_by_id')
def test_index(self, snapshot_get_by_id):
snapshot = {
'id': fake.SNAPSHOT_ID,
'expected_attrs': ['metadata']
}
ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True)
snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
snapshot_obj['metadata'] = {'key1': 'value1',
'key2': 'value2',
'key3': 'value3'}
snapshot_get_by_id.return_value = snapshot_obj
req = fakes.HTTPRequest.blank(self.url)
res_dict = self.controller.index(req, fake.SNAPSHOT_ID)
expected = {
'metadata': {
'key1': 'value1',
'key2': 'value2',
'key3': 'value3',
},
}
self.assertEqual(expected, res_dict)
@mock.patch('cinder.objects.Snapshot.get_by_id')
def test_index_nonexistent_snapshot(self, snapshot_get_by_id):
snapshot_get_by_id.side_effect = \
exc.SnapshotNotFound(snapshot_id=fake.WILL_NOT_BE_FOUND_ID)
req = fakes.HTTPRequest.blank(self.url)
self.assertRaises(exc.SnapshotNotFound,
self.controller.index, req, self.url)
@mock.patch('cinder.objects.Snapshot.get_by_id')
def test_index_no_data(self, snapshot_get_by_id):
snapshot = {
'id': fake.SNAPSHOT_ID,
'expected_attrs': ['metadata']
}
ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True)
snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
snapshot_get_by_id.return_value = snapshot_obj
req = fakes.HTTPRequest.blank(self.url)
res_dict = self.controller.index(req, fake.SNAPSHOT_ID)
expected = {'metadata': {}}
self.assertEqual(expected, res_dict)
@mock.patch('cinder.objects.Snapshot.get_by_id')
def test_show(self, snapshot_get_by_id):
snapshot = {
'id': fake.SNAPSHOT_ID,
'expected_attrs': ['metadata']
}
ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True)
snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
snapshot_obj['metadata'] = {'key2': 'value2'}
snapshot_get_by_id.return_value = snapshot_obj
req = fakes.HTTPRequest.blank(self.url + '/key2')
res_dict = self.controller.show(req, fake.SNAPSHOT_ID, 'key2')
expected = {'meta': {'key2': 'value2'}}
self.assertEqual(expected, res_dict)
@mock.patch('cinder.objects.Snapshot.get_by_id')
def test_show_nonexistent_snapshot(self, snapshot_get_by_id):
snapshot_get_by_id.side_effect = \
exc.SnapshotNotFound(snapshot_id=fake.WILL_NOT_BE_FOUND_ID)
req = fakes.HTTPRequest.blank(self.url + '/key2')
self.assertRaises(exc.SnapshotNotFound,
self.controller.show, req, fake.SNAPSHOT_ID, 'key2')
@mock.patch('cinder.objects.Snapshot.get_by_id')
def test_show_meta_not_found(self, snapshot_get_by_id):
snapshot = {
'id': fake.SNAPSHOT_ID,
'expected_attrs': ['metadata']
}
ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True)
snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
snapshot_get_by_id.return_value = snapshot_obj
req = fakes.HTTPRequest.blank(self.url + '/key6')
self.assertRaises(exc.SnapshotMetadataNotFound,
self.controller.show, req, fake.SNAPSHOT_ID, 'key6')
@mock.patch('cinder.db.snapshot_metadata_delete')
@mock.patch('cinder.objects.Snapshot.get_by_id')
def test_delete(self, snapshot_get_by_id, snapshot_metadata_delete):
snapshot = {
'id': fake.SNAPSHOT_ID,
'expected_attrs': ['metadata']
}
ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True)
snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
snapshot_obj['metadata'] = {'key2': 'value2'}
snapshot_get_by_id.return_value = snapshot_obj
req = fakes.HTTPRequest.blank(self.url + '/key2')
req.method = 'DELETE'
res = self.controller.delete(req, fake.SNAPSHOT_ID, 'key2')
self.assertEqual(200, res.status_int)
def test_delete_nonexistent_snapshot(self):
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'DELETE'
self.assertRaises(exc.SnapshotNotFound,
self.controller.delete, req,
fake.WILL_NOT_BE_FOUND_ID, 'key1')
@mock.patch('cinder.objects.Snapshot.get_by_id')
def test_delete_meta_not_found(self, snapshot_get_by_id):
snapshot = {
'id': fake.SNAPSHOT_ID,
'expected_attrs': ['metadata']
}
ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True)
snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
snapshot_get_by_id.return_value = snapshot_obj
req = fakes.HTTPRequest.blank(self.url + '/key6')
req.method = 'DELETE'
self.assertRaises(exc.SnapshotMetadataNotFound,
self.controller.delete, req,
fake.SNAPSHOT_ID, 'key6')
@mock.patch('cinder.db.snapshot_update')
@mock.patch('cinder.objects.Volume.get_by_id')
@mock.patch('cinder.objects.Snapshot.get_by_id')
def test_create(self, snapshot_get_by_id, volume_get_by_id,
snapshot_update):
snapshot = {
'id': fake.SNAPSHOT_ID,
'expected_attrs': ['metadata']
}
ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True)
snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
fake_volume_obj = fake_volume.fake_volume_obj(ctx)
snapshot_get_by_id.return_value = snapshot_obj
volume_get_by_id.return_value = fake_volume_obj
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank('/v1/snapshot_metadata')
req.method = 'POST'
req.content_type = "application/json"
body = {"metadata": {"key1": "value1",
"key2": "value2",
"key3": "value3"}}
req.body = jsonutils.dump_as_bytes(body)
res_dict = self.controller.create(req, fake.SNAPSHOT_ID, body)
self.assertEqual(body, res_dict)
@mock.patch('cinder.db.snapshot_update')
@mock.patch('cinder.objects.Snapshot.get_by_id')
def test_create_with_keys_in_uppercase_and_lowercase(
self, snapshot_get_by_id, snapshot_update):
snapshot = {
'id': fake.SNAPSHOT_ID,
'expected_attrs': ['metadata']
}
ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True)
snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
snapshot_get_by_id.return_value = snapshot_obj
# if the keys in uppercase_and_lowercase, should return the one
# which server added
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata_insensitive)
req = fakes.HTTPRequest.blank('/v1/snapshot_metadata')
req.method = 'POST'
req.content_type = "application/json"
body = {"metadata": {"key1": "value1",
"KEY1": "value1",
"key2": "value2",
"KEY2": "value2",
"key3": "value3",
"KEY4": "value4"}}
expected = {"metadata": {"key1": "value1",
"key2": "value2",
"key3": "value3",
"KEY4": "value4"}}
req.body = jsonutils.dump_as_bytes(body)
res_dict = self.controller.create(req, fake.SNAPSHOT_ID, body)
self.assertEqual(expected, res_dict)
def test_create_empty_body(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url)
req.method = 'POST'
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.create, req, fake.SNAPSHOT_ID, None)
def test_create_item_empty_key(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {"": "value1"}}
req.body = jsonutils.dump_as_bytes(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.create, req, fake.SNAPSHOT_ID, body)
def test_create_item_key_too_long(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {("a" * 260): "value1"}}
req.body = jsonutils.dump_as_bytes(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.create,
req, fake.SNAPSHOT_ID, body)
def test_create_nonexistent_snapshot(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank('/v1/snapshot_metadata')
req.method = 'POST'
req.content_type = "application/json"
body = {"metadata": {"key9": "value9"}}
req.body = jsonutils.dump_as_bytes(body)
self.assertRaises(exc.SnapshotNotFound,
self.controller.create, req,
fake.WILL_NOT_BE_FOUND_ID, body)
@mock.patch('cinder.db.snapshot_update')
@mock.patch('cinder.objects.Snapshot.get_by_id')
def test_update_all(self, snapshot_get_by_id, snapshot_update):
snapshot = {
'id': fake.SNAPSHOT_ID,
'expected_attrs': []
}
ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True)
snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
snapshot_get_by_id.return_value = snapshot_obj
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_new_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url)
req.method = 'PUT'
req.content_type = "application/json"
expected = {
'metadata': {
'key10': 'value10',
'key99': 'value99',
'KEY20': 'value20',
},
}
req.body = jsonutils.dump_as_bytes(expected)
res_dict = self.controller.update_all(req, fake.SNAPSHOT_ID, expected)
self.assertEqual(expected, res_dict)
@mock.patch('cinder.db.snapshot_update',
return_value={'key10': 'value10',
'key99': 'value99',
'KEY20': 'value20'})
@mock.patch('cinder.objects.Snapshot.get_by_id')
def test_update_all_with_keys_in_uppercase_and_lowercase(
self, snapshot_get_by_id, snapshot_update):
snapshot = {
'id': fake.SNAPSHOT_ID,
'expected_attrs': ['metadata']
}
ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True)
snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
snapshot_get_by_id.return_value = snapshot_obj
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_new_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url)
req.method = 'PUT'
req.content_type = "application/json"
body = {
'metadata': {
'key10': 'value10',
'KEY10': 'value10',
'key99': 'value99',
'KEY20': 'value20',
},
}
expected = {
'metadata': {
'key10': 'value10',
'key99': 'value99',
'KEY20': 'value20',
},
}
req.body = jsonutils.dump_as_bytes(expected)
res_dict = self.controller.update_all(req, fake.SNAPSHOT_ID, body)
self.assertEqual(expected, res_dict)
@mock.patch('cinder.db.snapshot_update')
@mock.patch('cinder.objects.Snapshot.get_by_id')
def test_update_all_empty_container(self, snapshot_get_by_id,
snapshot_update):
snapshot = {
'id': fake.SNAPSHOT_ID,
'expected_attrs': []
}
ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True)
snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
snapshot_get_by_id.return_value = snapshot_obj
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_empty_container_metadata)
req = fakes.HTTPRequest.blank(self.url)
req.method = 'PUT'
req.content_type = "application/json"
expected = {'metadata': {}}
req.body = jsonutils.dump_as_bytes(expected)
res_dict = self.controller.update_all(req, fake.SNAPSHOT_ID, expected)
self.assertEqual(expected, res_dict)
def test_update_all_malformed_container(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url)
req.method = 'PUT'
req.content_type = "application/json"
expected = {'meta': {}}
req.body = jsonutils.dump_as_bytes(expected)
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.update_all, req, fake.SNAPSHOT_ID,
expected)
@mock.patch('cinder.db.sqlalchemy.api._snapshot_get')
@mock.patch('cinder.db.snapshot_metadata_update', autospec=True)
def test_update_all_malformed_data(self, metadata_update, snapshot_get):
snapshot_get.return_value = stub_get
req = fakes.HTTPRequest.blank(self.url)
req.method = 'PUT'
req.content_type = "application/json"
expected = {'metadata': ['asdf']}
req.body = jsonutils.dump_as_bytes(expected)
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.update_all, req, fake.SNAPSHOT_ID,
expected)
def test_update_all_nonexistent_snapshot(self):
req = fakes.HTTPRequest.blank(self.url)
req.method = 'PUT'
req.content_type = "application/json"
body = {'metadata': {'key10': 'value10'}}
req.body = jsonutils.dump_as_bytes(body)
self.assertRaises(exc.SnapshotNotFound,
self.controller.update_all, req,
fake.WILL_NOT_BE_FOUND_ID, body)
@mock.patch('cinder.db.snapshot_metadata_update', return_value=dict())
@mock.patch('cinder.db.snapshot_update')
@mock.patch('cinder.objects.Snapshot.get_by_id')
def test_update_item(self, snapshot_get_by_id,
snapshot_update, snapshot_metadata_update):
snapshot = {
'id': fake.SNAPSHOT_ID,
'expected_attrs': ['metadata']
}
ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True)
snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
snapshot_get_by_id.return_value = snapshot_obj
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {"key1": "value1"}}
req.body = jsonutils.dump_as_bytes(body)
req.headers["content-type"] = "application/json"
res_dict = self.controller.update(req, fake.SNAPSHOT_ID, 'key1', body)
expected = {'meta': {'key1': 'value1'}}
self.assertEqual(expected, res_dict)
def test_update_item_nonexistent_snapshot(self):
req = fakes.HTTPRequest.blank(
'/v1.1/fake/snapshots/asdf/metadata/key1')
req.method = 'PUT'
body = {"meta": {"key1": "value1"}}
req.body = jsonutils.dump_as_bytes(body)
req.headers["content-type"] = "application/json"
self.assertRaises(exc.SnapshotNotFound,
self.controller.update, req,
fake.WILL_NOT_BE_FOUND_ID, 'key1',
body)
def test_update_item_empty_body(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.update, req,
fake.SNAPSHOT_ID, 'key1',
None)
@mock.patch('cinder.db.sqlalchemy.api._snapshot_get')
@mock.patch('cinder.db.snapshot_metadata_update', autospec=True)
def test_update_item_empty_key(self, metadata_update, snapshot_get):
snapshot_get.return_value = stub_get
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {"": "value1"}}
req.body = jsonutils.dump_as_bytes(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.update, req,
fake.SNAPSHOT_ID, '', body)
@mock.patch('cinder.objects.Snapshot.get_by_id')
def test_update_item_key_too_long(self, snapshot_get_by_id):
snapshot = {
'id': fake.SNAPSHOT_ID,
'expected_attrs': ['metadata']
}
ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True)
snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
snapshot_get_by_id.return_value = snapshot_obj
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {("a" * 260): "value1"}}
req.body = jsonutils.dump_as_bytes(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
self.controller.update,
req, fake.SNAPSHOT_ID, ("a" * 260), body)
@mock.patch('cinder.objects.Snapshot.get_by_id')
def test_update_item_value_too_long(self, snapshot_get_by_id):
snapshot = {
'id': fake.SNAPSHOT_ID,
'expected_attrs': ['metadata']
}
ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True)
snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
snapshot_get_by_id.return_value = snapshot_obj
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {"key1": ("a" * 260)}}
req.body = jsonutils.dump_as_bytes(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
self.controller.update,
req, fake.SNAPSHOT_ID, "key1", body)
def test_update_item_too_many_keys(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {"key1": "value1", "key2": "value2"}}
req.body = jsonutils.dump_as_bytes(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.update, req,
fake.SNAPSHOT_ID, 'key1',
body)
def test_update_item_body_uri_mismatch(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/bad')
req.method = 'PUT'
body = {"meta": {"key1": "value1"}}
req.body = jsonutils.dump_as_bytes(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.update, req, fake.SNAPSHOT_ID, 'bad',
body)
@mock.patch('cinder.objects.Snapshot.get_by_id')
def test_invalid_metadata_items_on_create(self, snapshot_get_by_id):
snapshot = {
'id': fake.SNAPSHOT_ID,
'expected_attrs': ['metadata']
}
ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True)
snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
snapshot_get_by_id.return_value = snapshot_obj
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url)
req.method = 'POST'
req.headers["content-type"] = "application/json"
# test for long key
data = {"metadata": {"a" * 260: "value1"}}
req.body = jsonutils.dump_as_bytes(data)
self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
self.controller.create, req, fake.SNAPSHOT_ID, data)
# test for long value
data = {"metadata": {"key": "v" * 260}}
req.body = jsonutils.dump_as_bytes(data)
self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
self.controller.create, req, fake.SNAPSHOT_ID, data)
# test for empty key.
data = {"metadata": {"": "value1"}}
req.body = jsonutils.dump_as_bytes(data)
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.create, req, fake.SNAPSHOT_ID, data)