Refactor pagination db functional tests

The pagination-related functional tests are overly complex and aren't
adding a lot of value. This rewrites them in a more straightforward way
by incorporating them into the main test class.

Related to bp refactor-db-layer

Change-Id: Ic08914cfa8135d73a4aae6724a6561bc337b91fb
This commit is contained in:
Brian Waldon 2012-06-26 15:35:42 -07:00
parent c56cfffc78
commit faa5ec7bff
4 changed files with 94 additions and 227 deletions

View File

@ -132,7 +132,7 @@ def _do_pagination(context, images, marker, limit, show_deleted):
else:
raise exception.NotFound()
end = start + limit if limit else None
end = start + limit if limit is not None else None
return images[start:end]

View File

@ -18,12 +18,10 @@
import copy
import datetime
import random
from glance.common import context
from glance.common import exception
from glance.common import utils
from glance.openstack.common import timeutils
# The default sort order of results is whatever sort key is specified,
@ -34,64 +32,28 @@ from glance.openstack.common import timeutils
UUID1, UUID2, UUID3 = sorted([utils.generate_uuid() for x in range(3)])
def build_fixtures():
dt = datetime.datetime.now()
return [
{
'id': UUID1,
'name': 'fake image #1',
'status': 'active',
'disk_format': 'ami',
'container_format': 'ami',
'is_public': False,
'created_at': dt,
'updated_at': dt,
'deleted_at': None,
'deleted': False,
'checksum': None,
'min_disk': 0,
'min_ram': 0,
'size': 13,
'location': "swift://user:passwd@acct/container/obj.tar.0",
'properties': {'type': 'kernel', 'foo': 'bar'},
},
{
'id': UUID2,
'name': 'fake image #2',
'status': 'active',
'disk_format': 'vhd',
'container_format': 'ovf',
'is_public': True,
'created_at': dt,
'updated_at': dt + datetime.timedelta(microseconds=5),
'deleted_at': None,
'deleted': False,
'checksum': None,
'min_disk': 5,
'min_ram': 256,
'size': 19,
'location': "file:///tmp/glance-tests/2",
'properties': {}
},
{
'id': UUID3,
'name': 'fake image #2',
'status': 'active',
'disk_format': 'vhd',
'container_format': 'ovf',
'is_public': True,
'created_at': dt + datetime.timedelta(microseconds=5),
'updated_at': dt + datetime.timedelta(microseconds=5),
'deleted_at': None,
'deleted': False,
'checksum': None,
'min_disk': 5,
'min_ram': 256,
'size': 19,
'location': "file:///tmp/glance-tests/2",
'properties': {},
},
]
def build_image_fixture(**kwargs):
default_datetime = datetime.datetime.now()
image = {
'id': utils.generate_uuid(),
'name': 'fake image #2',
'status': 'active',
'disk_format': 'vhd',
'container_format': 'ovf',
'is_public': True,
'created_at': default_datetime,
'updated_at': default_datetime,
'deleted_at': None,
'deleted': False,
'checksum': None,
'min_disk': 5,
'min_ram': 256,
'size': 19,
'location': "file:///tmp/glance-tests/2",
'properties': {},
}
image.update(kwargs)
return image
class BaseTestCase(object):
@ -100,11 +62,36 @@ class BaseTestCase(object):
self.context = context.RequestContext(is_admin=False)
self.configure()
self.reset()
self.fixtures = build_fixtures()
self.create_fixtures(self.fixtures)
self.fixtures = self.build_image_fixtures()
self.create_images(self.fixtures)
def create_fixtures(self, fixtures):
for fixture in fixtures:
def build_image_fixtures(self):
dt1 = datetime.datetime.now()
dt2 = dt1 + datetime.timedelta(microseconds=5)
fixtures = [
{
'id': UUID1,
'created_at': dt1,
'updated_at': dt1,
'properties': {'foo': 'bar'},
'size': 13,
},
{
'id': UUID2,
'created_at': dt1,
'updated_at': dt2,
'size': 17,
},
{
'id': UUID3,
'created_at': dt2,
'updated_at': dt2,
},
]
return [build_image_fixture(**fixture) for fixture in fixtures]
def create_images(self, images):
for fixture in images:
self.db_api.image_create(self.adm_context, fixture)
def reset(self):
@ -203,21 +190,61 @@ class BaseTestCase(object):
"""Specify a deleted image as a marker if showing deleted images."""
self.db_api.image_destroy(self.adm_context, UUID3)
images = self.db_api.image_get_all(self.adm_context, marker=UUID3)
#NOTE(bcwaldon): an admin should see all images (deleted or not)
self.assertEquals(2, len(images))
def test_image_get_all_marker_deleted_showing_deleted(self):
"""Specify a deleted image as a marker if showing deleted images."""
"""Specify a deleted image as a marker if showing deleted images.
A non-admin user has to explicitly ask for deleted
images, and should only see deleted images in the results
"""
self.db_api.image_destroy(self.adm_context, UUID3)
self.db_api.image_destroy(self.adm_context, UUID1)
filters = {'deleted': True}
images = self.db_api.image_get_all(self.context, marker=UUID3,
filters=filters)
filters=filters)
self.assertEquals(1, len(images))
def test_image_get_all_limit(self):
images = self.db_api.image_get_all(self.context, limit=2)
self.assertEquals(2, len(images))
# A limit of None should not equate to zero
images = self.db_api.image_get_all(self.context, limit=None)
self.assertEquals(3, len(images))
# A limit of zero should actually mean zero
images = self.db_api.image_get_all(self.context, limit=0)
self.assertEquals(0, len(images))
def test_image_paginate(self):
"""Paginate through a list of images using limit and marker"""
extra_uuids = [utils.generate_uuid() for i in range(2)]
extra_images = [build_image_fixture(id=_id) for _id in extra_uuids]
self.create_images(extra_images)
# Reverse uuids to match default sort of created_at
extra_uuids.reverse()
page = self.db_api.image_get_all(self.context, limit=2)
self.assertEquals(extra_uuids, [i['id'] for i in page])
last = page[-1]['id']
page = self.db_api.image_get_all(self.context, limit=2, marker=last)
self.assertEquals([UUID3, UUID2], [i['id'] for i in page])
page = self.db_api.image_get_all(self.context, limit=2, marker=UUID2)
self.assertEquals([UUID1], [i['id'] for i in page])
def test_image_get_all_invalid_sort_key(self):
self.assertRaises(exception.InvalidSortKey, self.db_api.image_get_all,
self.context, sort_key='blah')
def test_image_get_all_limit_marker(self):
images = self.db_api.image_get_all(self.context, limit=2)
self.assertEquals(2, len(images))
def test_image_tag_create(self):
tag = self.db_api.image_tag_create(self.context, UUID1, 'snap')
self.assertEqual('snap', tag)
@ -280,132 +307,3 @@ class BaseTestCase(object):
member=TENANT2,
image_id=utils.generate_uuid())
_assertMemberListMatch([], output)
class BaseTestCasePaging(object):
""" Checks the paging order, by paging through random images.
It generates images with random min_disk, created_at and image id.
Image id is a UUID and unique, min_disk and created_at are drawn from
a small range so are expected to have duplicates. Then we try paging
through, and check that we get back the images in the order we expect.
"""
ITEM_COUNT = 20
PAGE_SIZE = 5
TIME_VALUES = 10
MINDISK_VALUES = 10
def setUp(self):
self.adm_context = context.RequestContext(is_admin=True)
self.context = context.RequestContext(is_admin=False)
self.configure()
self.reset()
self.fixtures = self.build_fixtures()
self.create_fixtures(self.fixtures)
def configure(self):
pass
def create_fixtures(self, fixtures):
for fixture in fixtures:
self.db_api.image_create(self.adm_context, fixture)
def reset(self):
pass
def _build_random_image(self, t, min_disk):
image_id = utils.generate_uuid()
return {'id': image_id,
'name': 'fake image #' + image_id,
'status': 'active',
'disk_format': 'ami',
'container_format': 'ami',
'is_public': True,
'created_at': t,
'updated_at': t,
'deleted_at': None,
'deleted': False,
'checksum': None,
'min_disk': min_disk,
'min_ram': 0,
'size': 0,
'location': "file:///tmp/glance-tests/" + image_id,
'properties': {}}
def build_fixtures(self):
self.images = []
t0 = timeutils.utcnow()
for _ in xrange(0, self.ITEM_COUNT):
tdelta = random.uniform(0, self.TIME_VALUES)
min_disk = random.uniform(0, self.MINDISK_VALUES)
t = t0 + datetime.timedelta(microseconds=tdelta)
image = self._build_random_image(t, min_disk)
self.images.append(image)
return self.images
def _sort_results(self, sort_dir, sort_key):
results = self.images
results = sorted(results, key=lambda i: (i[sort_key],
i['created_at'],
i['id']))
if sort_dir == 'desc':
results.reverse()
return results
def _do_test(self, sort_dir, sort_key, **kwargs):
try:
limit = kwargs['limit']
except KeyError:
limit = self.PAGE_SIZE
marker = None
output = []
expected = []
for image in self._sort_results(sort_dir, sort_key):
expected.append(image)
while True:
results = self.db_api.image_get_all(self.context,
marker=marker,
limit=limit,
sort_key=sort_key,
sort_dir=sort_dir)
if not results:
break
else:
output.extend(results)
# Prevent this running infinitely in error cases
self.assertTrue(len(output) < (500 + len(expected)))
marker = results[-1]['id']
self.assertEquals(len(output), len(expected))
self.assertEquals([image['id'] for image in expected],
[image['id'] for image in output])
def test_sort_by_disk_asc(self):
self._do_test('asc', 'min_disk')
def test_sort_by_disk_desc(self):
self._do_test('desc', 'min_disk')
def test_sort_by_created_at_asc(self):
self._do_test('asc', 'created_at')
def test_sort_by_created_at_desc(self):
self._do_test('desc', 'created_at')
def test_sort_by_id_asc(self):
self._do_test('asc', 'id')
def test_sort_by_id_desc(self):
self._do_test('desc', 'id')
#NOTE(bcwaldon): make sure this works!
def test_limit_None(self):
self._do_test('desc', 'id', limit=None)

View File

@ -30,16 +30,3 @@ class TestSimpleDriver(base.IsolatedUnitTest, tests.BaseTestCase):
def reset(self):
self.db_api.reset()
class TestSimpleDriverPaging(base.IsolatedUnitTest,
tests.BaseTestCasePaging):
def setUp(self):
base.IsolatedUnitTest.setUp(self)
tests.BaseTestCasePaging.setUp(self)
def configure(self):
self.db_api = glance.db.simple.api
def reset(self):
self.db_api.reset()

View File

@ -36,21 +36,3 @@ class TestSqlalchemyDriver(base.IsolatedUnitTest, tests.BaseTestCase):
def reset(self):
db_models.unregister_models(self.db_api._ENGINE)
db_models.register_models(self.db_api._ENGINE)
class TestSqlalchemyDriverPaging(base.IsolatedUnitTest,
tests.BaseTestCasePaging):
def setUp(self):
base.IsolatedUnitTest.setUp(self)
tests.BaseTestCasePaging.setUp(self)
def configure(self):
self.config(sql_connection='sqlite://',
verbose=False,
debug=False)
self.db_api = glance.db.sqlalchemy.api
self.db_api.configure_db()
def reset(self):
db_models.unregister_models(self.db_api._ENGINE)
db_models.register_models(self.db_api._ENGINE)