Merge "The basic function modules of pagination, sort and filter"
This commit is contained in:
commit
2ae80a66a5
331
smaug/api/common.py
Normal file
331
smaug/api/common.py
Normal file
@ -0,0 +1,331 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
import os
|
||||
import re
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from six.moves import urllib
|
||||
import webob
|
||||
|
||||
from smaug.i18n import _
|
||||
|
||||
|
||||
api_common_opts = [
|
||||
cfg.IntOpt('osapi_max_limit',
|
||||
default=1000,
|
||||
help='The maximum number of items that a collection '
|
||||
'resource returns in a single response'),
|
||||
cfg.StrOpt('osapi_plan_base_URL',
|
||||
help='Base URL that will be presented to users in links '
|
||||
'to the OpenStack Plan API',
|
||||
deprecated_name='osapi_compute_link_prefix'),
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(api_common_opts)
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_pagination_params(params, max_limit=None):
|
||||
"""Return marker, limit, offset tuple from request.
|
||||
|
||||
:param params: `wsgi.Request`'s GET dictionary, possibly containing
|
||||
'marker', 'limit', and 'offset' variables. 'marker' is the
|
||||
id of the last element the client has seen, 'limit' is the
|
||||
maximum number of items to return and 'offset' is the number
|
||||
of items to skip from the marker or from the first element.
|
||||
If 'limit' is not specified, or > max_limit, we default to
|
||||
max_limit. Negative values for either offset or limit will
|
||||
cause exc.HTTPBadRequest() exceptions to be raised. If no
|
||||
offset is present we'll default to 0 and if no marker is
|
||||
present we'll default to None.
|
||||
:max_limit: Max value 'limit' return value can take
|
||||
:returns: Tuple (marker, limit, offset)
|
||||
"""
|
||||
max_limit = max_limit or CONF.osapi_max_limit
|
||||
limit = _get_limit_param(params, max_limit)
|
||||
marker = _get_marker_param(params)
|
||||
offset = _get_offset_param(params)
|
||||
return marker, limit, offset
|
||||
|
||||
|
||||
def _get_limit_param(params, max_limit=None):
|
||||
"""Extract integer limit from request's dictionary or fail.
|
||||
|
||||
Defaults to max_limit if not present and returns max_limit if present
|
||||
'limit' is greater than max_limit.
|
||||
"""
|
||||
max_limit = max_limit or CONF.osapi_max_limit
|
||||
try:
|
||||
limit = int(params.pop('limit', max_limit))
|
||||
except ValueError:
|
||||
msg = _('limit param must be an integer')
|
||||
raise webob.exc.HTTPBadRequest(explanation=msg)
|
||||
if limit < 0:
|
||||
msg = _('limit param must be positive')
|
||||
raise webob.exc.HTTPBadRequest(explanation=msg)
|
||||
limit = min(limit, max_limit)
|
||||
return limit
|
||||
|
||||
|
||||
def _get_marker_param(params):
|
||||
"""Extract marker id from request's dictionary (defaults to None)."""
|
||||
return params.pop('marker', None)
|
||||
|
||||
|
||||
def _get_offset_param(params):
|
||||
"""Extract offset id from request's dictionary (defaults to 0) or fail."""
|
||||
try:
|
||||
offset = int(params.pop('offset', 0))
|
||||
except ValueError:
|
||||
msg = _('offset param must be an integer')
|
||||
raise webob.exc.HTTPBadRequest(explanation=msg)
|
||||
|
||||
if offset < 0:
|
||||
msg = _('offset param must be positive')
|
||||
raise webob.exc.HTTPBadRequest(explanation=msg)
|
||||
|
||||
return offset
|
||||
|
||||
|
||||
def limited(items, request, max_limit=None):
|
||||
"""Return a slice of items according to requested offset and limit.
|
||||
|
||||
:param items: A sliceable entity
|
||||
:param request: ``wsgi.Request`` possibly containing 'offset' and 'limit'
|
||||
GET variables. 'offset' is where to start in the list,
|
||||
and 'limit' is the maximum number of items to return. If
|
||||
'limit' is not specified, 0, or > max_limit, we default
|
||||
to max_limit. Negative values for either offset or limit
|
||||
will cause exc.HTTPBadRequest() exceptions to be raised.
|
||||
:kwarg max_limit: The maximum number of items to return from 'items'
|
||||
"""
|
||||
max_limit = max_limit or CONF.osapi_max_limit
|
||||
marker, limit, offset = get_pagination_params(request.GET.copy(),
|
||||
max_limit)
|
||||
range_end = offset + (limit or max_limit)
|
||||
return items[offset:range_end]
|
||||
|
||||
|
||||
def limited_by_marker(items, request, max_limit=None):
|
||||
"""Return a slice of items according to the requested marker and limit."""
|
||||
max_limit = max_limit or CONF.osapi_max_limit
|
||||
marker, limit, __ = get_pagination_params(request.GET.copy(), max_limit)
|
||||
|
||||
start_index = 0
|
||||
if marker:
|
||||
start_index = -1
|
||||
for i, item in enumerate(items):
|
||||
if 'flavorid' in item:
|
||||
if item['flavorid'] == marker:
|
||||
start_index = i + 1
|
||||
break
|
||||
elif item['id'] == marker or item.get('uuid') == marker:
|
||||
start_index = i + 1
|
||||
break
|
||||
if start_index < 0:
|
||||
msg = _('marker [%s] not found') % marker
|
||||
raise webob.exc.HTTPBadRequest(explanation=msg)
|
||||
range_end = start_index + limit
|
||||
return items[start_index:range_end]
|
||||
|
||||
|
||||
def get_sort_params(params, default_key='created_at', default_dir='desc'):
|
||||
"""Retrieves sort keys/directions parameters.
|
||||
|
||||
Processes the parameters to create a list of sort keys and sort directions
|
||||
that correspond to either the 'sort' parameter or the 'sort_key' and
|
||||
'sort_dir' parameter values. The value of the 'sort' parameter is a comma-
|
||||
separated list of sort keys, each key is optionally appended with
|
||||
':<sort_direction>'.
|
||||
|
||||
Note that the 'sort_key' and 'sort_dir' parameters are deprecated in kilo
|
||||
and an exception is raised if they are supplied with the 'sort' parameter.
|
||||
|
||||
The sort parameters are removed from the request parameters by this
|
||||
function.
|
||||
|
||||
:param params: webob.multidict of request parameters (from
|
||||
smaug.api.openstack.wsgi.Request.params)
|
||||
:param default_key: default sort key value, added to the list if no
|
||||
sort keys are supplied
|
||||
:param default_dir: default sort dir value, added to the list if the
|
||||
corresponding key does not have a direction
|
||||
specified
|
||||
:returns: list of sort keys, list of sort dirs
|
||||
:raise webob.exc.HTTPBadRequest: If both 'sort' and either 'sort_key' or
|
||||
'sort_dir' are supplied parameters
|
||||
"""
|
||||
if 'sort' in params and ('sort_key' in params or 'sort_dir' in params):
|
||||
msg = _("The 'sort_key' and 'sort_dir' parameters are deprecated and "
|
||||
"cannot be used with the 'sort' parameter.")
|
||||
raise webob.exc.HTTPBadRequest(explanation=msg)
|
||||
sort_keys = []
|
||||
sort_dirs = []
|
||||
if 'sort' in params:
|
||||
for sort in params.pop('sort').strip().split(','):
|
||||
sort_key, _sep, sort_dir = sort.partition(':')
|
||||
if not sort_dir:
|
||||
sort_dir = default_dir
|
||||
sort_keys.append(sort_key.strip())
|
||||
sort_dirs.append(sort_dir.strip())
|
||||
else:
|
||||
sort_key = params.pop('sort_key', default_key)
|
||||
sort_dir = params.pop('sort_dir', default_dir)
|
||||
sort_keys.append(sort_key.strip())
|
||||
sort_dirs.append(sort_dir.strip())
|
||||
return sort_keys, sort_dirs
|
||||
|
||||
|
||||
def get_request_url(request):
|
||||
url = request.application_url
|
||||
headers = request.headers
|
||||
forwarded = headers.get('X-Forwarded-Host')
|
||||
if forwarded:
|
||||
url_parts = list(urllib.parse.urlsplit(url))
|
||||
url_parts[1] = re.split(',\s?', forwarded)[-1]
|
||||
url = urllib.parse.urlunsplit(url_parts).rstrip('/')
|
||||
return url
|
||||
|
||||
|
||||
def remove_version_from_href(href):
|
||||
"""Removes the first api version from the href.
|
||||
|
||||
Given: 'http://www.smaug.com/v1.1/123'
|
||||
Returns: 'http://www.smaug.com/123'
|
||||
|
||||
Given: 'http://www.smaug.com/v1.1'
|
||||
Returns: 'http://www.smaug.com'
|
||||
|
||||
"""
|
||||
parsed_url = urllib.parse.urlsplit(href)
|
||||
url_parts = parsed_url.path.split('/', 2)
|
||||
|
||||
# NOTE: this should match vX.X or vX
|
||||
expression = re.compile(r'^v([0-9]+|[0-9]+\.[0-9]+)(/.*|$)')
|
||||
if expression.match(url_parts[1]):
|
||||
del url_parts[1]
|
||||
|
||||
new_path = '/'.join(url_parts)
|
||||
|
||||
if new_path == parsed_url.path:
|
||||
msg = 'href %s does not contain version' % href
|
||||
LOG.debug(msg)
|
||||
raise ValueError(msg)
|
||||
|
||||
parsed_url = list(parsed_url)
|
||||
parsed_url[2] = new_path
|
||||
return urllib.parse.urlunsplit(parsed_url)
|
||||
|
||||
|
||||
class ViewBuilder(object):
|
||||
"""Model API responses as dictionaries."""
|
||||
|
||||
_collection_name = None
|
||||
|
||||
def _get_links(self, request, identifier):
|
||||
return [{"rel": "self",
|
||||
"href": self._get_href_link(request, identifier), },
|
||||
{"rel": "bookmark",
|
||||
"href": self._get_bookmark_link(request, identifier), }]
|
||||
|
||||
def _get_next_link(self, request, identifier, collection_name):
|
||||
"""Return href string with proper limit and marker params."""
|
||||
params = request.params.copy()
|
||||
params["marker"] = identifier
|
||||
prefix = self._update_link_prefix(get_request_url(request),
|
||||
CONF.osapi_plan_base_URL)
|
||||
url = os.path.join(prefix,
|
||||
request.environ["smaug.context"].project_id,
|
||||
collection_name)
|
||||
return "%s?%s" % (url, urllib.parse.urlencode(params))
|
||||
|
||||
def _get_href_link(self, request, identifier):
|
||||
"""Return an href string pointing to this object."""
|
||||
prefix = self._update_link_prefix(get_request_url(request),
|
||||
CONF.osapi_plan_base_URL)
|
||||
return os.path.join(prefix,
|
||||
request.environ["smaug.context"].project_id,
|
||||
self._collection_name,
|
||||
str(identifier))
|
||||
|
||||
def _get_bookmark_link(self, request, identifier):
|
||||
"""Create a URL that refers to a specific resource."""
|
||||
base_url = remove_version_from_href(get_request_url(request))
|
||||
base_url = self._update_link_prefix(base_url,
|
||||
CONF.osapi_plan_base_URL)
|
||||
return os.path.join(base_url,
|
||||
request.environ["smaug.context"].project_id,
|
||||
self._collection_name,
|
||||
str(identifier))
|
||||
|
||||
def _get_collection_links(self, request, items, collection_name,
|
||||
item_count=None, id_key="uuid"):
|
||||
"""Retrieve 'next' link, if applicable.
|
||||
|
||||
The next link is included if we are returning as many items as we can,
|
||||
given the restrictions of limit optional request parameter and
|
||||
osapi_max_limit configuration parameter as long as we are returning
|
||||
some elements.
|
||||
|
||||
So we return next link if:
|
||||
|
||||
1) 'limit' param is specified and equal to the number of items.
|
||||
2) 'limit' param is NOT specified and the number of items is
|
||||
equal to CONF.osapi_max_limit.
|
||||
|
||||
:param request: API request
|
||||
:param items: List of collection items
|
||||
:param collection_name: Name of collection, used to generate the
|
||||
next link for a pagination query
|
||||
:param item_count: Length of the list of the original collection
|
||||
items
|
||||
:param id_key: Attribute key used to retrieve the unique ID, used
|
||||
to generate the next link marker for a pagination query
|
||||
:returns: links
|
||||
"""
|
||||
item_count = item_count or len(items)
|
||||
limit = _get_limit_param(request.GET.copy())
|
||||
if len(items) and limit <= item_count:
|
||||
return self._generate_next_link(items, id_key, request,
|
||||
collection_name)
|
||||
|
||||
return []
|
||||
|
||||
def _generate_next_link(self, items, id_key, request,
|
||||
collection_name):
|
||||
links = []
|
||||
last_item = items[-1]
|
||||
if id_key in last_item:
|
||||
last_item_id = last_item[id_key]
|
||||
else:
|
||||
last_item_id = last_item["id"]
|
||||
links.append({
|
||||
"rel": "next",
|
||||
"href": self._get_next_link(request, last_item_id,
|
||||
collection_name),
|
||||
})
|
||||
return links
|
||||
|
||||
def _update_link_prefix(self, orig_url, prefix):
|
||||
if not prefix:
|
||||
return orig_url
|
||||
url_parts = list(urllib.parse.urlsplit(orig_url))
|
||||
prefix_parts = list(urllib.parse.urlsplit(prefix))
|
||||
url_parts[0:2] = prefix_parts[0:2]
|
||||
url_parts[2] = prefix_parts[2] + url_parts[2]
|
||||
|
||||
return urllib.parse.urlunsplit(url_parts).rstrip('/')
|
438
smaug/tests/unit/api/test_common.py
Normal file
438
smaug/tests/unit/api/test_common.py
Normal file
@ -0,0 +1,438 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
Test suites for 'common' code used throughout the OpenStack HTTP API.
|
||||
"""
|
||||
|
||||
import mock
|
||||
from testtools import matchers
|
||||
import webob
|
||||
import webob.exc
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from smaug.api import common
|
||||
from smaug.tests import base
|
||||
|
||||
|
||||
NS = "{http://docs.openstack.org/compute/api/v1.1}"
|
||||
ATOMNS = "{http://www.w3.org/2005/Atom}"
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class PaginationParamsTest(base.TestCase):
|
||||
"""Unit tests for `smaug.api.common.get_pagination_params` method.
|
||||
|
||||
This method takes in a request object and returns 'marker' and 'limit'
|
||||
GET params.
|
||||
"""
|
||||
|
||||
def test_nonnumerical_limit(self):
|
||||
"""Test nonnumerical limit param."""
|
||||
req = webob.Request.blank('/?limit=hello')
|
||||
self.assertRaises(
|
||||
webob.exc.HTTPBadRequest, common.get_pagination_params,
|
||||
req.GET.copy())
|
||||
|
||||
@mock.patch.object(common, 'CONF')
|
||||
def test_no_params(self, mock_cfg):
|
||||
"""Test no params."""
|
||||
mock_cfg.osapi_max_limit = 100
|
||||
req = webob.Request.blank('/')
|
||||
expected = (None, 100, 0)
|
||||
self.assertEqual(expected,
|
||||
common.get_pagination_params(req.GET.copy()))
|
||||
|
||||
def test_valid_marker(self):
|
||||
"""Test valid marker param."""
|
||||
marker = '263abb28-1de6-412f-b00b-f0ee0c4333c2'
|
||||
req = webob.Request.blank('/?marker=' + marker)
|
||||
expected = (marker, CONF.osapi_max_limit, 0)
|
||||
self.assertEqual(expected,
|
||||
common.get_pagination_params(req.GET.copy()))
|
||||
|
||||
def test_valid_limit(self):
|
||||
"""Test valid limit param."""
|
||||
req = webob.Request.blank('/?limit=10')
|
||||
expected = (None, 10, 0)
|
||||
self.assertEqual(expected,
|
||||
common.get_pagination_params(req.GET.copy()))
|
||||
|
||||
def test_invalid_limit(self):
|
||||
"""Test invalid limit param."""
|
||||
req = webob.Request.blank('/?limit=-2')
|
||||
self.assertRaises(
|
||||
webob.exc.HTTPBadRequest, common.get_pagination_params,
|
||||
req.GET.copy())
|
||||
|
||||
def test_valid_limit_and_marker(self):
|
||||
"""Test valid limit and marker parameters."""
|
||||
marker = '263abb28-1de6-412f-b00b-f0ee0c4333c2'
|
||||
req = webob.Request.blank('/?limit=20&marker=%s' % marker)
|
||||
expected = (marker, 20, 0)
|
||||
self.assertEqual(expected,
|
||||
common.get_pagination_params(req.GET.copy()))
|
||||
|
||||
|
||||
class SortParamUtilsTest(base.TestCase):
|
||||
|
||||
def test_get_sort_params_defaults(self):
|
||||
"""Verifies the default sort key and direction."""
|
||||
sort_keys, sort_dirs = common.get_sort_params({})
|
||||
self.assertEqual(['created_at'], sort_keys)
|
||||
self.assertEqual(['desc'], sort_dirs)
|
||||
|
||||
def test_get_sort_params_override_defaults(self):
|
||||
"""Verifies that the defaults can be overriden."""
|
||||
sort_keys, sort_dirs = common.get_sort_params({}, default_key='key1',
|
||||
default_dir='dir1')
|
||||
self.assertEqual(['key1'], sort_keys)
|
||||
self.assertEqual(['dir1'], sort_dirs)
|
||||
|
||||
def test_get_sort_params_single_value_sort_param(self):
|
||||
"""Verifies a single sort key and direction."""
|
||||
params = {'sort': 'key1:dir1'}
|
||||
sort_keys, sort_dirs = common.get_sort_params(params)
|
||||
self.assertEqual(['key1'], sort_keys)
|
||||
self.assertEqual(['dir1'], sort_dirs)
|
||||
|
||||
def test_get_sort_params_single_value_old_params(self):
|
||||
"""Verifies a single sort key and direction."""
|
||||
params = {'sort_key': 'key1', 'sort_dir': 'dir1'}
|
||||
sort_keys, sort_dirs = common.get_sort_params(params)
|
||||
self.assertEqual(['key1'], sort_keys)
|
||||
self.assertEqual(['dir1'], sort_dirs)
|
||||
|
||||
def test_get_sort_params_single_with_default_sort_param(self):
|
||||
"""Verifies a single sort value with a default direction."""
|
||||
params = {'sort': 'key1'}
|
||||
sort_keys, sort_dirs = common.get_sort_params(params)
|
||||
self.assertEqual(['key1'], sort_keys)
|
||||
# Direction should be defaulted
|
||||
self.assertEqual(['desc'], sort_dirs)
|
||||
|
||||
def test_get_sort_params_single_with_default_old_params(self):
|
||||
"""Verifies a single sort value with a default direction."""
|
||||
params = {'sort_key': 'key1'}
|
||||
sort_keys, sort_dirs = common.get_sort_params(params)
|
||||
self.assertEqual(['key1'], sort_keys)
|
||||
# Direction should be defaulted
|
||||
self.assertEqual(['desc'], sort_dirs)
|
||||
|
||||
def test_get_sort_params_multiple_values(self):
|
||||
"""Verifies multiple sort parameter values."""
|
||||
params = {'sort': 'key1:dir1,key2:dir2,key3:dir3'}
|
||||
sort_keys, sort_dirs = common.get_sort_params(params)
|
||||
self.assertEqual(['key1', 'key2', 'key3'], sort_keys)
|
||||
self.assertEqual(['dir1', 'dir2', 'dir3'], sort_dirs)
|
||||
|
||||
def test_get_sort_params_multiple_not_all_dirs(self):
|
||||
"""Verifies multiple sort keys without all directions."""
|
||||
params = {'sort': 'key1:dir1,key2,key3:dir3'}
|
||||
sort_keys, sort_dirs = common.get_sort_params(params)
|
||||
self.assertEqual(['key1', 'key2', 'key3'], sort_keys)
|
||||
# Second key is missing the direction, should be defaulted
|
||||
self.assertEqual(['dir1', 'desc', 'dir3'], sort_dirs)
|
||||
|
||||
def test_get_sort_params_multiple_override_default_dir(self):
|
||||
"""Verifies multiple sort keys and overriding default direction."""
|
||||
params = {'sort': 'key1:dir1,key2,key3'}
|
||||
sort_keys, sort_dirs = common.get_sort_params(params,
|
||||
default_dir='foo')
|
||||
self.assertEqual(['key1', 'key2', 'key3'], sort_keys)
|
||||
self.assertEqual(['dir1', 'foo', 'foo'], sort_dirs)
|
||||
|
||||
def test_get_sort_params_params_modified(self):
|
||||
"""Verifies that the input sort parameter are modified."""
|
||||
params = {'sort': 'key1:dir1,key2:dir2,key3:dir3'}
|
||||
common.get_sort_params(params)
|
||||
self.assertEqual({}, params)
|
||||
|
||||
params = {'sort_key': 'key1', 'sort_dir': 'dir1'}
|
||||
common.get_sort_params(params)
|
||||
self.assertEqual({}, params)
|
||||
|
||||
def test_get_sort_params_random_spaces(self):
|
||||
"""Verifies that leading and trailing spaces are removed."""
|
||||
params = {'sort': ' key1 : dir1,key2: dir2 , key3 '}
|
||||
sort_keys, sort_dirs = common.get_sort_params(params)
|
||||
self.assertEqual(['key1', 'key2', 'key3'], sort_keys)
|
||||
self.assertEqual(['dir1', 'dir2', 'desc'], sort_dirs)
|
||||
|
||||
def test_get_params_mix_sort_and_old_params(self):
|
||||
"""An exception is raised if both types of sorting params are given."""
|
||||
for params in ({'sort': 'k1', 'sort_key': 'k1'},
|
||||
{'sort': 'k1', 'sort_dir': 'd1'},
|
||||
{'sort': 'k1', 'sort_key': 'k1', 'sort_dir': 'd2'}):
|
||||
self.assertRaises(webob.exc.HTTPBadRequest,
|
||||
common.get_sort_params,
|
||||
params)
|
||||
|
||||
|
||||
class MiscFunctionsTest(base.TestCase):
|
||||
|
||||
def test_remove_major_version_from_href(self):
|
||||
fixture = 'http://www.testsite.com/v1/images'
|
||||
expected = 'http://www.testsite.com/images'
|
||||
actual = common.remove_version_from_href(fixture)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def test_remove_version_from_href(self):
|
||||
fixture = 'http://www.testsite.com/v1.1/images'
|
||||
expected = 'http://www.testsite.com/images'
|
||||
actual = common.remove_version_from_href(fixture)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def test_remove_version_from_href_2(self):
|
||||
fixture = 'http://www.testsite.com/v1.1/'
|
||||
expected = 'http://www.testsite.com/'
|
||||
actual = common.remove_version_from_href(fixture)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def test_remove_version_from_href_3(self):
|
||||
fixture = 'http://www.testsite.com/v10.10'
|
||||
expected = 'http://www.testsite.com'
|
||||
actual = common.remove_version_from_href(fixture)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def test_remove_version_from_href_4(self):
|
||||
fixture = 'http://www.testsite.com/v1.1/images/v10.5'
|
||||
expected = 'http://www.testsite.com/images/v10.5'
|
||||
actual = common.remove_version_from_href(fixture)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def test_remove_version_from_href_bad_request(self):
|
||||
fixture = 'http://www.testsite.com/1.1/images'
|
||||
self.assertRaises(ValueError,
|
||||
common.remove_version_from_href,
|
||||
fixture)
|
||||
|
||||
def test_remove_version_from_href_bad_request_2(self):
|
||||
fixture = 'http://www.testsite.com/v/images'
|
||||
self.assertRaises(ValueError,
|
||||
common.remove_version_from_href,
|
||||
fixture)
|
||||
|
||||
def test_remove_version_from_href_bad_request_3(self):
|
||||
fixture = 'http://www.testsite.com/v1.1images'
|
||||
self.assertRaises(ValueError,
|
||||
common.remove_version_from_href,
|
||||
fixture)
|
||||
|
||||
|
||||
class TestCollectionLinks(base.TestCase):
|
||||
"""Tests the _get_collection_links method."""
|
||||
|
||||
def _validate_next_link(self, item_count, osapi_max_limit, limit,
|
||||
should_link_exist):
|
||||
req = webob.Request.blank('/?limit=%s' % limit if limit else '/')
|
||||
link_return = [{"rel": "next", "href": "fake_link"}]
|
||||
self.flags(osapi_max_limit=osapi_max_limit)
|
||||
if limit is None:
|
||||
limited_list_size = min(item_count, osapi_max_limit)
|
||||
else:
|
||||
limited_list_size = min(item_count, osapi_max_limit, limit)
|
||||
limited_list = [{"uuid": str(i)} for i in range(limited_list_size)]
|
||||
builder = common.ViewBuilder()
|
||||
|
||||
def get_pagination_params(params, max_limit=CONF.osapi_max_limit,
|
||||
original_call=common.get_pagination_params):
|
||||
return original_call(params, max_limit)
|
||||
|
||||
def _get_limit_param(params, max_limit=CONF.osapi_max_limit,
|
||||
original_call=common._get_limit_param):
|
||||
return original_call(params, max_limit)
|
||||
|
||||
with mock.patch.object(common, 'get_pagination_params',
|
||||
get_pagination_params), \
|
||||
mock.patch.object(common, '_get_limit_param',
|
||||
_get_limit_param), \
|
||||
mock.patch.object(common.ViewBuilder, '_generate_next_link',
|
||||
return_value=link_return) as href_link_mock:
|
||||
results = builder._get_collection_links(req, limited_list,
|
||||
mock.sentinel.coll_key,
|
||||
item_count, "uuid")
|
||||
if should_link_exist:
|
||||
href_link_mock.assert_called_once_with(limited_list, "uuid",
|
||||
req,
|
||||
mock.sentinel.coll_key)
|
||||
self.assertThat(results, matchers.HasLength(1))
|
||||
else:
|
||||
self.assertFalse(href_link_mock.called)
|
||||
self.assertThat(results, matchers.HasLength(0))
|
||||
|
||||
def test_items_equals_osapi_max_no_limit(self):
|
||||
item_count = 5
|
||||
osapi_max_limit = 5
|
||||
limit = None
|
||||
should_link_exist = True
|
||||
self._validate_next_link(item_count, osapi_max_limit, limit,
|
||||
should_link_exist)
|
||||
|
||||
def test_items_equals_osapi_max_greater_than_limit(self):
|
||||
item_count = 5
|
||||
osapi_max_limit = 5
|
||||
limit = 4
|
||||
should_link_exist = True
|
||||
self._validate_next_link(item_count, osapi_max_limit, limit,
|
||||
should_link_exist)
|
||||
|
||||
def test_items_equals_osapi_max_equals_limit(self):
|
||||
item_count = 5
|
||||
osapi_max_limit = 5
|
||||
limit = 5
|
||||
should_link_exist = True
|
||||
self._validate_next_link(item_count, osapi_max_limit, limit,
|
||||
should_link_exist)
|
||||
|
||||
def test_items_equals_osapi_max_less_than_limit(self):
|
||||
item_count = 5
|
||||
osapi_max_limit = 5
|
||||
limit = 6
|
||||
should_link_exist = True
|
||||
self._validate_next_link(item_count, osapi_max_limit, limit,
|
||||
should_link_exist)
|
||||
|
||||
def test_items_less_than_osapi_max_no_limit(self):
|
||||
item_count = 5
|
||||
osapi_max_limit = 7
|
||||
limit = None
|
||||
should_link_exist = False
|
||||
self._validate_next_link(item_count, osapi_max_limit, limit,
|
||||
should_link_exist)
|
||||
|
||||
def test_limit_less_than_items_less_than_osapi_max(self):
|
||||
item_count = 5
|
||||
osapi_max_limit = 7
|
||||
limit = 4
|
||||
should_link_exist = True
|
||||
self._validate_next_link(item_count, osapi_max_limit, limit,
|
||||
should_link_exist)
|
||||
|
||||
def test_limit_equals_items_less_than_osapi_max(self):
|
||||
item_count = 5
|
||||
osapi_max_limit = 7
|
||||
limit = 5
|
||||
should_link_exist = True
|
||||
self._validate_next_link(item_count, osapi_max_limit, limit,
|
||||
should_link_exist)
|
||||
|
||||
def test_items_less_than_limit_less_than_osapi_max(self):
|
||||
item_count = 5
|
||||
osapi_max_limit = 7
|
||||
limit = 6
|
||||
should_link_exist = False
|
||||
self._validate_next_link(item_count, osapi_max_limit, limit,
|
||||
should_link_exist)
|
||||
|
||||
def test_items_less_than_osapi_max_equals_limit(self):
|
||||
item_count = 5
|
||||
osapi_max_limit = 7
|
||||
limit = 7
|
||||
should_link_exist = False
|
||||
self._validate_next_link(item_count, osapi_max_limit, limit,
|
||||
should_link_exist)
|
||||
|
||||
def test_items_less_than_osapi_max_less_than_limit(self):
|
||||
item_count = 5
|
||||
osapi_max_limit = 7
|
||||
limit = 8
|
||||
should_link_exist = False
|
||||
self._validate_next_link(item_count, osapi_max_limit, limit,
|
||||
should_link_exist)
|
||||
|
||||
def test_items_greater_than_osapi_max_no_limit(self):
|
||||
item_count = 5
|
||||
osapi_max_limit = 3
|
||||
limit = None
|
||||
should_link_exist = True
|
||||
self._validate_next_link(item_count, osapi_max_limit, limit,
|
||||
should_link_exist)
|
||||
|
||||
def test_limit_less_than_items_greater_than_osapi_max(self):
|
||||
item_count = 5
|
||||
osapi_max_limit = 3
|
||||
limit = 2
|
||||
should_link_exist = True
|
||||
self._validate_next_link(item_count, osapi_max_limit, limit,
|
||||
should_link_exist)
|
||||
|
||||
def test_items_greater_than_osapi_max_equals_limit(self):
|
||||
item_count = 5
|
||||
osapi_max_limit = 3
|
||||
limit = 3
|
||||
should_link_exist = True
|
||||
self._validate_next_link(item_count, osapi_max_limit, limit,
|
||||
should_link_exist)
|
||||
|
||||
def test_items_greater_than_limit_greater_than_osapi_max(self):
|
||||
item_count = 5
|
||||
osapi_max_limit = 3
|
||||
limit = 4
|
||||
should_link_exist = True
|
||||
self._validate_next_link(item_count, osapi_max_limit, limit,
|
||||
should_link_exist)
|
||||
|
||||
def test_items_equals_limit_greater_than_osapi_max(self):
|
||||
item_count = 5
|
||||
osapi_max_limit = 3
|
||||
limit = 5
|
||||
should_link_exist = True
|
||||
self._validate_next_link(item_count, osapi_max_limit, limit,
|
||||
should_link_exist)
|
||||
|
||||
def test_limit_greater_than_items_greater_than_osapi_max(self):
|
||||
item_count = 5
|
||||
osapi_max_limit = 3
|
||||
limit = 6
|
||||
should_link_exist = True
|
||||
self._validate_next_link(item_count, osapi_max_limit, limit,
|
||||
should_link_exist)
|
||||
|
||||
|
||||
class LinkPrefixTest(base.TestCase):
|
||||
def test_update_link_prefix(self):
|
||||
vb = common.ViewBuilder()
|
||||
result = vb._update_link_prefix("http://192.168.0.243:24/",
|
||||
"http://127.0.0.1/volume")
|
||||
self.assertEqual("http://127.0.0.1/volume", result)
|
||||
|
||||
result = vb._update_link_prefix("http://foo.x.com/v1",
|
||||
"http://new.prefix.com")
|
||||
self.assertEqual("http://new.prefix.com/v1", result)
|
||||
|
||||
result = vb._update_link_prefix(
|
||||
"http://foo.x.com/v1",
|
||||
"http://new.prefix.com:20455/new_extra_prefix")
|
||||
self.assertEqual("http://new.prefix.com:20455/new_extra_prefix/v1",
|
||||
result)
|
||||
|
||||
|
||||
class RequestUrlTest(base.TestCase):
|
||||
def test_get_request_url_no_forward(self):
|
||||
app_url = 'http://127.0.0.1/v2;param?key=value#frag'
|
||||
request = type('', (), {
|
||||
'application_url': app_url,
|
||||
'headers': {}
|
||||
})
|
||||
result = common.get_request_url(request)
|
||||
self.assertEqual(app_url, result)
|
||||
|
||||
def test_get_request_url_forward(self):
|
||||
request = type('', (), {
|
||||
'application_url': 'http://127.0.0.1/v2;param?key=value#frag',
|
||||
'headers': {'X-Forwarded-Host': '192.168.0.243:24'}
|
||||
})
|
||||
result = common.get_request_url(request)
|
||||
self.assertEqual('http://192.168.0.243:24/v2;param?key=value#frag',
|
||||
result)
|
Loading…
x
Reference in New Issue
Block a user