glance/glance/tests/functional/db/base.py

2630 lines
112 KiB
Python

# Copyright 2010-2012 OpenStack Foundation
# Copyright 2012 Justin Santa Barbara
# Copyright 2013 IBM Corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import datetime
import uuid
import mock
from oslo_db import exception as db_exception
from oslo_db.sqlalchemy import utils as sqlalchemyutils
# NOTE(jokke): simplified transition to py3, behaves like py2 xrange
from six.moves import range
from six.moves import reduce
from sqlalchemy.dialects import sqlite
from glance.common import exception
from glance.common import timeutils
from glance import context
from glance.db.sqlalchemy import api as db_api
from glance.tests import functional
import glance.tests.functional.db as db_tests
from glance.tests import utils as test_utils
# The default sort order of results is whatever sort key is specified,
# plus created_at and id for ties. When we're not specifying a sort_key,
# we get the default (created_at). Some tests below expect the fixtures to be
# returned in array-order, so if the created_at timestamps are the same,
# these tests rely on the UUID* values being in order
UUID1, UUID2, UUID3 = sorted([str(uuid.uuid4()) for x in range(3)])
def build_image_fixture(**kwargs):
default_datetime = timeutils.utcnow()
image = {
'id': str(uuid.uuid4()),
'name': 'fake image #2',
'status': 'active',
'disk_format': 'vhd',
'container_format': 'ovf',
'is_public': True,
'created_at': default_datetime,
'updated_at': default_datetime,
'deleted_at': None,
'deleted': False,
'checksum': None,
'min_disk': 5,
'min_ram': 256,
'size': 19,
'locations': [{'url': "file:///tmp/glance-tests/2",
'metadata': {}, 'status': 'active'}],
'properties': {},
}
if 'visibility' in kwargs:
image.pop('is_public')
image.update(kwargs)
return image
def build_task_fixture(**kwargs):
default_datetime = timeutils.utcnow()
task = {
'id': str(uuid.uuid4()),
'type': 'import',
'status': 'pending',
'input': {'ping': 'pong'},
'owner': str(uuid.uuid4()),
'message': None,
'expires_at': None,
'created_at': default_datetime,
'updated_at': default_datetime,
}
task.update(kwargs)
return task
class FunctionalInitWrapper(functional.FunctionalTest):
def setUp(self):
super(FunctionalInitWrapper, self).setUp()
self.config(policy_file=self.policy_file, group='oslo_policy')
class TestDriver(test_utils.BaseTestCase):
def setUp(self):
super(TestDriver, self).setUp()
context_cls = context.RequestContext
self.adm_context = context_cls(is_admin=True,
auth_token='user:user:admin')
self.context = context_cls(is_admin=False,
auth_token='user:user:user')
self.db_api = db_tests.get_db(self.config)
db_tests.reset_db(self.db_api)
self.fixtures = self.build_image_fixtures()
self.create_images(self.fixtures)
def build_image_fixtures(self):
dt1 = timeutils.utcnow()
dt2 = dt1 + datetime.timedelta(microseconds=5)
fixtures = [
{
'id': UUID1,
'created_at': dt1,
'updated_at': dt1,
'properties': {'foo': 'bar', 'far': 'boo'},
'protected': True,
'size': 13,
},
{
'id': UUID2,
'created_at': dt1,
'updated_at': dt2,
'size': 17,
},
{
'id': UUID3,
'created_at': dt2,
'updated_at': dt2,
},
]
return [build_image_fixture(**fixture) for fixture in fixtures]
def create_images(self, images):
for fixture in images:
self.db_api.image_create(self.adm_context, fixture)
self.delay_inaccurate_clock()
class DriverTests(object):
def test_image_create_requires_status(self):
fixture = {'name': 'mark', 'size': 12}
self.assertRaises(exception.Invalid,
self.db_api.image_create, self.context, fixture)
fixture = {'name': 'mark', 'size': 12, 'status': 'queued'}
self.db_api.image_create(self.context, fixture)
@mock.patch.object(timeutils, 'utcnow')
def test_image_create_defaults(self, mock_utcnow):
mock_utcnow.return_value = datetime.datetime.utcnow()
create_time = timeutils.utcnow()
values = {'status': 'queued',
'created_at': create_time,
'updated_at': create_time}
image = self.db_api.image_create(self.context, values)
self.assertIsNone(image['name'])
self.assertIsNone(image['container_format'])
self.assertEqual(0, image['min_ram'])
self.assertEqual(0, image['min_disk'])
self.assertIsNone(image['owner'])
self.assertEqual('shared', image['visibility'])
self.assertIsNone(image['size'])
self.assertIsNone(image['checksum'])
self.assertIsNone(image['disk_format'])
self.assertEqual([], image['locations'])
self.assertFalse(image['protected'])
self.assertFalse(image['deleted'])
self.assertIsNone(image['deleted_at'])
self.assertEqual([], image['properties'])
self.assertEqual(create_time, image['created_at'])
self.assertEqual(create_time, image['updated_at'])
# Image IDs aren't predictable, but they should be populated
self.assertTrue(uuid.UUID(image['id']))
# NOTE(bcwaldon): the tags attribute should not be returned as a part
# of a core image entity
self.assertNotIn('tags', image)
def test_image_create_duplicate_id(self):
self.assertRaises(exception.Duplicate,
self.db_api.image_create,
self.context, {'id': UUID1, 'status': 'queued'})
def test_image_create_with_locations(self):
locations = [{'url': 'a', 'metadata': {}, 'status': 'active'},
{'url': 'b', 'metadata': {}, 'status': 'active'}]
fixture = {'status': 'queued',
'locations': locations}
image = self.db_api.image_create(self.context, fixture)
actual = [{'url': l['url'], 'metadata': l['metadata'],
'status': l['status']}
for l in image['locations']]
self.assertEqual(locations, actual)
def test_image_create_without_locations(self):
locations = []
fixture = {'status': 'queued',
'locations': locations}
self.db_api.image_create(self.context, fixture)
def test_image_create_with_location_data(self):
location_data = [{'url': 'a', 'metadata': {'key': 'value'},
'status': 'active'},
{'url': 'b', 'metadata': {},
'status': 'active'}]
fixture = {'status': 'queued', 'locations': location_data}
image = self.db_api.image_create(self.context, fixture)
actual = [{'url': l['url'], 'metadata': l['metadata'],
'status': l['status']}
for l in image['locations']]
self.assertEqual(location_data, actual)
def test_image_create_properties(self):
fixture = {'status': 'queued', 'properties': {'ping': 'pong'}}
image = self.db_api.image_create(self.context, fixture)
expected = [{'name': 'ping', 'value': 'pong'}]
actual = [{'name': p['name'], 'value': p['value']}
for p in image['properties']]
self.assertEqual(expected, actual)
def test_image_create_unknown_attributes(self):
fixture = {'ping': 'pong'}
self.assertRaises(exception.Invalid,
self.db_api.image_create, self.context, fixture)
def test_image_create_bad_name(self):
bad_name = u'A name with forbidden symbol \U0001f62a'
fixture = {'name': bad_name, 'size': 12, 'status': 'queued'}
self.assertRaises(exception.Invalid, self.db_api.image_create,
self.context, fixture)
def test_image_create_bad_checksum(self):
# checksum should be no longer than 32 characters
bad_checksum = "42" * 42
fixture = {'checksum': bad_checksum}
self.assertRaises(exception.Invalid, self.db_api.image_create,
self.context, fixture)
# if checksum is not longer than 32 characters but non-ascii ->
# still raise 400
fixture = {'checksum': u'\u042f' * 32}
self.assertRaises(exception.Invalid, self.db_api.image_create,
self.context, fixture)
def test_image_create_bad_int_params(self):
int_too_long = 2 ** 31 + 42
for param in ['min_disk', 'min_ram']:
fixture = {param: int_too_long}
self.assertRaises(exception.Invalid, self.db_api.image_create,
self.context, fixture)
def test_image_create_bad_property(self):
# bad value
fixture = {'status': 'queued',
'properties': {'bad': u'Bad \U0001f62a'}}
self.assertRaises(exception.Invalid, self.db_api.image_create,
self.context, fixture)
# bad property names are also not allowed
fixture = {'status': 'queued', 'properties': {u'Bad \U0001f62a': 'ok'}}
self.assertRaises(exception.Invalid, self.db_api.image_create,
self.context, fixture)
def test_image_create_bad_location(self):
location_data = [{'url': 'a', 'metadata': {'key': 'value'},
'status': 'active'},
{'url': u'Bad \U0001f60a', 'metadata': {},
'status': 'active'}]
fixture = {'status': 'queued', 'locations': location_data}
self.assertRaises(exception.Invalid, self.db_api.image_create,
self.context, fixture)
def test_image_update_core_attribute(self):
fixture = {'status': 'queued'}
image = self.db_api.image_update(self.adm_context, UUID3, fixture)
self.assertEqual('queued', image['status'])
self.assertNotEqual(image['created_at'], image['updated_at'])
def test_image_update_with_locations(self):
locations = [{'url': 'a', 'metadata': {}, 'status': 'active'},
{'url': 'b', 'metadata': {}, 'status': 'active'}]
fixture = {'locations': locations}
image = self.db_api.image_update(self.adm_context, UUID3, fixture)
self.assertEqual(2, len(image['locations']))
self.assertIn('id', image['locations'][0])
self.assertIn('id', image['locations'][1])
image['locations'][0].pop('id')
image['locations'][1].pop('id')
self.assertEqual(locations, image['locations'])
def test_image_update_with_location_data(self):
location_data = [{'url': 'a', 'metadata': {'key': 'value'},
'status': 'active'},
{'url': 'b', 'metadata': {}, 'status': 'active'}]
fixture = {'locations': location_data}
image = self.db_api.image_update(self.adm_context, UUID3, fixture)
self.assertEqual(2, len(image['locations']))
self.assertIn('id', image['locations'][0])
self.assertIn('id', image['locations'][1])
image['locations'][0].pop('id')
image['locations'][1].pop('id')
self.assertEqual(location_data, image['locations'])
def test_image_update(self):
fixture = {'status': 'queued', 'properties': {'ping': 'pong'}}
image = self.db_api.image_update(self.adm_context, UUID3, fixture)
expected = [{'name': 'ping', 'value': 'pong'}]
actual = [{'name': p['name'], 'value': p['value']}
for p in image['properties']]
self.assertEqual(expected, actual)
self.assertEqual('queued', image['status'])
self.assertNotEqual(image['created_at'], image['updated_at'])
def test_image_update_properties(self):
fixture = {'properties': {'ping': 'pong'}}
self.delay_inaccurate_clock()
image = self.db_api.image_update(self.adm_context, UUID1, fixture)
expected = {'ping': 'pong', 'foo': 'bar', 'far': 'boo'}
actual = {p['name']: p['value'] for p in image['properties']}
self.assertEqual(expected, actual)
self.assertNotEqual(image['created_at'], image['updated_at'])
def test_image_update_purge_properties(self):
fixture = {'properties': {'ping': 'pong'}}
image = self.db_api.image_update(self.adm_context, UUID1,
fixture, purge_props=True)
properties = {p['name']: p for p in image['properties']}
# New properties are set
self.assertIn('ping', properties)
self.assertEqual('pong', properties['ping']['value'])
self.assertFalse(properties['ping']['deleted'])
# Original properties still show up, but with deleted=True
# TODO(markwash): db api should not return deleted properties
self.assertIn('foo', properties)
self.assertEqual('bar', properties['foo']['value'])
self.assertTrue(properties['foo']['deleted'])
def test_image_update_bad_name(self):
fixture = {'name': u'A new name with forbidden symbol \U0001f62a'}
self.assertRaises(exception.Invalid, self.db_api.image_update,
self.adm_context, UUID1, fixture)
def test_image_update_bad_property(self):
# bad value
fixture = {'status': 'queued',
'properties': {'bad': u'Bad \U0001f62a'}}
self.assertRaises(exception.Invalid, self.db_api.image_update,
self.adm_context, UUID1, fixture)
# bad property names are also not allowed
fixture = {'status': 'queued', 'properties': {u'Bad \U0001f62a': 'ok'}}
self.assertRaises(exception.Invalid, self.db_api.image_update,
self.adm_context, UUID1, fixture)
def test_image_update_bad_location(self):
location_data = [{'url': 'a', 'metadata': {'key': 'value'},
'status': 'active'},
{'url': u'Bad \U0001f60a', 'metadata': {},
'status': 'active'}]
fixture = {'status': 'queued', 'locations': location_data}
self.assertRaises(exception.Invalid, self.db_api.image_update,
self.adm_context, UUID1, fixture)
def test_update_locations_direct(self):
"""
For some reasons update_locations can be called directly
(not via image_update), so better check that everything is ok if passed
4 byte unicode characters
"""
# update locations correctly first to retrieve existing location id
location_data = [{'url': 'a', 'metadata': {'key': 'value'},
'status': 'active'}]
fixture = {'locations': location_data}
image = self.db_api.image_update(self.adm_context, UUID1, fixture)
self.assertEqual(1, len(image['locations']))
self.assertIn('id', image['locations'][0])
loc_id = image['locations'][0].pop('id')
bad_location = {'url': u'Bad \U0001f60a', 'metadata': {},
'status': 'active', 'id': loc_id}
self.assertRaises(exception.Invalid,
self.db_api.image_location_update,
self.adm_context, UUID1, bad_location)
def test_image_property_delete(self):
fixture = {'name': 'ping', 'value': 'pong', 'image_id': UUID1}
prop = self.db_api.image_property_create(self.context, fixture)
prop = self.db_api.image_property_delete(self.context,
prop['name'], UUID1)
self.assertIsNotNone(prop['deleted_at'])
self.assertTrue(prop['deleted'])
def test_image_get(self):
image = self.db_api.image_get(self.context, UUID1)
self.assertEqual(self.fixtures[0]['id'], image['id'])
def test_image_get_disallow_deleted(self):
self.db_api.image_destroy(self.adm_context, UUID1)
self.assertRaises(exception.NotFound, self.db_api.image_get,
self.context, UUID1)
def test_image_get_allow_deleted(self):
self.db_api.image_destroy(self.adm_context, UUID1)
image = self.db_api.image_get(self.adm_context, UUID1)
self.assertEqual(self.fixtures[0]['id'], image['id'])
self.assertTrue(image['deleted'])
def test_image_get_force_allow_deleted(self):
self.db_api.image_destroy(self.adm_context, UUID1)
image = self.db_api.image_get(self.context, UUID1,
force_show_deleted=True)
self.assertEqual(self.fixtures[0]['id'], image['id'])
def test_image_get_not_owned(self):
TENANT1 = str(uuid.uuid4())
TENANT2 = str(uuid.uuid4())
ctxt1 = context.RequestContext(is_admin=False, tenant=TENANT1,
auth_token='user:%s:user' % TENANT1)
ctxt2 = context.RequestContext(is_admin=False, tenant=TENANT2,
auth_token='user:%s:user' % TENANT2)
image = self.db_api.image_create(
ctxt1, {'status': 'queued', 'owner': TENANT1})
self.assertRaises(exception.Forbidden,
self.db_api.image_get, ctxt2, image['id'])
def test_image_get_not_found(self):
UUID = str(uuid.uuid4())
self.assertRaises(exception.NotFound,
self.db_api.image_get, self.context, UUID)
def test_image_get_all(self):
images = self.db_api.image_get_all(self.context)
self.assertEqual(3, len(images))
def test_image_get_all_with_filter(self):
images = self.db_api.image_get_all(self.context,
filters={
'id': self.fixtures[0]['id'],
})
self.assertEqual(1, len(images))
self.assertEqual(self.fixtures[0]['id'], images[0]['id'])
def test_image_get_all_with_filter_user_defined_property(self):
images = self.db_api.image_get_all(self.context,
filters={'foo': 'bar'})
self.assertEqual(1, len(images))
self.assertEqual(self.fixtures[0]['id'], images[0]['id'])
def test_image_get_all_with_filter_nonexistent_userdef_property(self):
images = self.db_api.image_get_all(self.context,
filters={'faz': 'boo'})
self.assertEqual(0, len(images))
def test_image_get_all_with_filter_userdef_prop_nonexistent_value(self):
images = self.db_api.image_get_all(self.context,
filters={'foo': 'baz'})
self.assertEqual(0, len(images))
def test_image_get_all_with_filter_multiple_user_defined_properties(self):
images = self.db_api.image_get_all(self.context,
filters={'foo': 'bar',
'far': 'boo'})
self.assertEqual(1, len(images))
self.assertEqual(images[0]['id'], self.fixtures[0]['id'])
def test_image_get_all_with_filter_nonexistent_user_defined_property(self):
images = self.db_api.image_get_all(self.context,
filters={'foo': 'bar',
'faz': 'boo'})
self.assertEqual(0, len(images))
def test_image_get_all_with_filter_user_deleted_property(self):
fixture = {'name': 'poo', 'value': 'bear', 'image_id': UUID1}
prop = self.db_api.image_property_create(self.context,
fixture)
images = self.db_api.image_get_all(self.context,
filters={
'properties': {'poo': 'bear'},
})
self.assertEqual(1, len(images))
self.db_api.image_property_delete(self.context,
prop['name'], images[0]['id'])
images = self.db_api.image_get_all(self.context,
filters={
'properties': {'poo': 'bear'},
})
self.assertEqual(0, len(images))
def test_image_get_all_with_filter_undefined_property(self):
images = self.db_api.image_get_all(self.context,
filters={'poo': 'bear'})
self.assertEqual(0, len(images))
def test_image_get_all_with_filter_protected(self):
images = self.db_api.image_get_all(self.context,
filters={'protected':
True})
self.assertEqual(1, len(images))
images = self.db_api.image_get_all(self.context,
filters={'protected':
False})
self.assertEqual(2, len(images))
def test_image_get_all_with_filter_comparative_created_at(self):
anchor = timeutils.isotime(self.fixtures[0]['created_at'])
time_expr = 'lt:' + anchor
images = self.db_api.image_get_all(self.context,
filters={'created_at': time_expr})
self.assertEqual(0, len(images))
def test_image_get_all_with_filter_comparative_updated_at(self):
anchor = timeutils.isotime(self.fixtures[0]['updated_at'])
time_expr = 'lt:' + anchor
images = self.db_api.image_get_all(self.context,
filters={'updated_at': time_expr})
self.assertEqual(0, len(images))
def test_filter_image_by_invalid_operator(self):
self.assertRaises(exception.InvalidFilterOperatorValue,
self.db_api.image_get_all,
self.context, filters={'status': 'lala:active'})
def test_image_get_all_with_filter_in_status(self):
images = self.db_api.image_get_all(self.context,
filters={'status': 'in:active'})
self.assertEqual(3, len(images))
def test_image_get_all_with_filter_in_name(self):
data = 'in:%s' % self.fixtures[0]['name']
images = self.db_api.image_get_all(self.context,
filters={'name': data})
self.assertEqual(3, len(images))
def test_image_get_all_with_filter_in_container_format(self):
images = self.db_api.image_get_all(self.context,
filters={'container_format':
'in:ami,bare,ovf'})
self.assertEqual(3, len(images))
def test_image_get_all_with_filter_in_disk_format(self):
images = self.db_api.image_get_all(self.context,
filters={'disk_format':
'in:vhd'})
self.assertEqual(3, len(images))
def test_image_get_all_with_filter_in_id(self):
data = 'in:%s,%s' % (UUID1, UUID2)
images = self.db_api.image_get_all(self.context,
filters={'id': data})
self.assertEqual(2, len(images))
def test_image_get_all_with_quotes(self):
fixture = {'name': 'fake\\\"name'}
self.db_api.image_update(self.adm_context, UUID3, fixture)
fixture = {'name': 'fake,name'}
self.db_api.image_update(self.adm_context, UUID2, fixture)
fixture = {'name': 'fakename'}
self.db_api.image_update(self.adm_context, UUID1, fixture)
data = 'in:\"fake\\\"name\",fakename,\"fake,name\"'
images = self.db_api.image_get_all(self.context,
filters={'name': data})
self.assertEqual(3, len(images))
def test_image_get_all_with_invalid_quotes(self):
invalid_expr = ['in:\"name', 'in:\"name\"name', 'in:name\"dd\"',
'in:na\"me', 'in:\"name\"\"name\"']
for expr in invalid_expr:
self.assertRaises(exception.InvalidParameterValue,
self.db_api.image_get_all,
self.context, filters={'name': expr})
def test_image_get_all_size_min_max(self):
images = self.db_api.image_get_all(self.context,
filters={
'size_min': 10,
'size_max': 15,
})
self.assertEqual(1, len(images))
self.assertEqual(self.fixtures[0]['id'], images[0]['id'])
def test_image_get_all_size_min(self):
images = self.db_api.image_get_all(self.context,
filters={'size_min': 15})
self.assertEqual(2, len(images))
self.assertEqual(self.fixtures[2]['id'], images[0]['id'])
self.assertEqual(self.fixtures[1]['id'], images[1]['id'])
def test_image_get_all_size_range(self):
images = self.db_api.image_get_all(self.context,
filters={'size_max': 15,
'size_min': 20})
self.assertEqual(0, len(images))
def test_image_get_all_size_max(self):
images = self.db_api.image_get_all(self.context,
filters={'size_max': 15})
self.assertEqual(1, len(images))
self.assertEqual(self.fixtures[0]['id'], images[0]['id'])
def test_image_get_all_with_filter_min_range_bad_value(self):
self.assertRaises(exception.InvalidFilterRangeValue,
self.db_api.image_get_all,
self.context, filters={'size_min': 'blah'})
def test_image_get_all_with_filter_max_range_bad_value(self):
self.assertRaises(exception.InvalidFilterRangeValue,
self.db_api.image_get_all,
self.context, filters={'size_max': 'blah'})
def test_image_get_all_marker(self):
images = self.db_api.image_get_all(self.context, marker=UUID3)
self.assertEqual(2, len(images))
def test_image_get_all_marker_with_size(self):
# Use sort_key=size to test BigInteger
images = self.db_api.image_get_all(self.context, sort_key=['size'],
marker=UUID3)
self.assertEqual(2, len(images))
self.assertEqual(17, images[0]['size'])
self.assertEqual(13, images[1]['size'])
def test_image_get_all_marker_deleted(self):
"""Cannot specify a deleted image as a marker."""
self.db_api.image_destroy(self.adm_context, UUID1)
filters = {'deleted': False}
self.assertRaises(exception.NotFound, self.db_api.image_get_all,
self.context, marker=UUID1, filters=filters)
def test_image_get_all_marker_deleted_showing_deleted_as_admin(self):
"""Specify a deleted image as a marker if showing deleted images."""
self.db_api.image_destroy(self.adm_context, UUID3)
images = self.db_api.image_get_all(self.adm_context, marker=UUID3)
# NOTE(bcwaldon): an admin should see all images (deleted or not)
self.assertEqual(2, len(images))
def test_image_get_all_marker_deleted_showing_deleted(self):
"""Specify a deleted image as a marker if showing deleted images.
A non-admin user has to explicitly ask for deleted
images, and should only see deleted images in the results
"""
self.db_api.image_destroy(self.adm_context, UUID3)
self.db_api.image_destroy(self.adm_context, UUID1)
filters = {'deleted': True}
images = self.db_api.image_get_all(self.context, marker=UUID3,
filters=filters)
self.assertEqual(1, len(images))
def test_image_get_all_marker_null_name_desc(self):
"""Check an image with name null is handled
Check an image with name null is handled
marker is specified and order is descending
"""
TENANT1 = str(uuid.uuid4())
ctxt1 = context.RequestContext(is_admin=False, tenant=TENANT1,
auth_token='user:%s:user' % TENANT1)
UUIDX = str(uuid.uuid4())
self.db_api.image_create(ctxt1, {'id': UUIDX,
'status': 'queued',
'name': None,
'owner': TENANT1})
images = self.db_api.image_get_all(ctxt1, marker=UUIDX,
sort_key=['name'],
sort_dir=['desc'])
image_ids = [image['id'] for image in images]
expected = []
self.assertEqual(sorted(expected), sorted(image_ids))
def test_image_get_all_marker_null_disk_format_desc(self):
"""Check an image with disk_format null is handled
Check an image with disk_format null is handled when
marker is specified and order is descending
"""
TENANT1 = str(uuid.uuid4())
ctxt1 = context.RequestContext(is_admin=False, tenant=TENANT1,
auth_token='user:%s:user' % TENANT1)
UUIDX = str(uuid.uuid4())
self.db_api.image_create(ctxt1, {'id': UUIDX,
'status': 'queued',
'disk_format': None,
'owner': TENANT1})
images = self.db_api.image_get_all(ctxt1, marker=UUIDX,
sort_key=['disk_format'],
sort_dir=['desc'])
image_ids = [image['id'] for image in images]
expected = []
self.assertEqual(sorted(expected), sorted(image_ids))
def test_image_get_all_marker_null_container_format_desc(self):
"""Check an image with container_format null is handled
Check an image with container_format null is handled when
marker is specified and order is descending
"""
TENANT1 = str(uuid.uuid4())
ctxt1 = context.RequestContext(is_admin=False, tenant=TENANT1,
auth_token='user:%s:user' % TENANT1)
UUIDX = str(uuid.uuid4())
self.db_api.image_create(ctxt1, {'id': UUIDX,
'status': 'queued',
'container_format': None,
'owner': TENANT1})
images = self.db_api.image_get_all(ctxt1, marker=UUIDX,
sort_key=['container_format'],
sort_dir=['desc'])
image_ids = [image['id'] for image in images]
expected = []
self.assertEqual(sorted(expected), sorted(image_ids))
def test_image_get_all_marker_null_name_asc(self):
"""Check an image with name null is handled
Check an image with name null is handled when
marker is specified and order is ascending
"""
TENANT1 = str(uuid.uuid4())
ctxt1 = context.RequestContext(is_admin=False, tenant=TENANT1,
auth_token='user:%s:user' % TENANT1)
UUIDX = str(uuid.uuid4())
self.db_api.image_create(ctxt1, {'id': UUIDX,
'status': 'queued',
'name': None,
'owner': TENANT1})
images = self.db_api.image_get_all(ctxt1, marker=UUIDX,
sort_key=['name'],
sort_dir=['asc'])
image_ids = [image['id'] for image in images]
expected = [UUID3, UUID2, UUID1]
self.assertEqual(sorted(expected), sorted(image_ids))
def test_image_get_all_marker_null_disk_format_asc(self):
"""Check an image with disk_format null is handled
Check an image with disk_format null is handled when
marker is specified and order is ascending
"""
TENANT1 = str(uuid.uuid4())
ctxt1 = context.RequestContext(is_admin=False, tenant=TENANT1,
auth_token='user:%s:user' % TENANT1)
UUIDX = str(uuid.uuid4())
self.db_api.image_create(ctxt1, {'id': UUIDX,
'status': 'queued',
'disk_format': None,
'owner': TENANT1})
images = self.db_api.image_get_all(ctxt1, marker=UUIDX,
sort_key=['disk_format'],
sort_dir=['asc'])
image_ids = [image['id'] for image in images]
expected = [UUID3, UUID2, UUID1]
self.assertEqual(sorted(expected), sorted(image_ids))
def test_image_get_all_marker_null_container_format_asc(self):
"""Check an image with container_format null is handled
Check an image with container_format null is handled when
marker is specified and order is ascending
"""
TENANT1 = str(uuid.uuid4())
ctxt1 = context.RequestContext(is_admin=False, tenant=TENANT1,
auth_token='user:%s:user' % TENANT1)
UUIDX = str(uuid.uuid4())
self.db_api.image_create(ctxt1, {'id': UUIDX,
'status': 'queued',
'container_format': None,
'owner': TENANT1})
images = self.db_api.image_get_all(ctxt1, marker=UUIDX,
sort_key=['container_format'],
sort_dir=['asc'])
image_ids = [image['id'] for image in images]
expected = [UUID3, UUID2, UUID1]
self.assertEqual(sorted(expected), sorted(image_ids))
def test_image_get_all_limit(self):
images = self.db_api.image_get_all(self.context, limit=2)
self.assertEqual(2, len(images))
# A limit of None should not equate to zero
images = self.db_api.image_get_all(self.context, limit=None)
self.assertEqual(3, len(images))
# A limit of zero should actually mean zero
images = self.db_api.image_get_all(self.context, limit=0)
self.assertEqual(0, len(images))
def test_image_get_all_owned(self):
TENANT1 = str(uuid.uuid4())
ctxt1 = context.RequestContext(is_admin=False,
tenant=TENANT1,
auth_token='user:%s:user' % TENANT1)
UUIDX = str(uuid.uuid4())
image_meta_data = {'id': UUIDX, 'status': 'queued', 'owner': TENANT1}
self.db_api.image_create(ctxt1, image_meta_data)
TENANT2 = str(uuid.uuid4())
ctxt2 = context.RequestContext(is_admin=False,
tenant=TENANT2,
auth_token='user:%s:user' % TENANT2)
UUIDY = str(uuid.uuid4())
image_meta_data = {'id': UUIDY, 'status': 'queued', 'owner': TENANT2}
self.db_api.image_create(ctxt2, image_meta_data)
images = self.db_api.image_get_all(ctxt1)
image_ids = [image['id'] for image in images]
expected = [UUIDX, UUID3, UUID2, UUID1]
self.assertEqual(sorted(expected), sorted(image_ids))
def test_image_get_all_owned_checksum(self):
TENANT1 = str(uuid.uuid4())
ctxt1 = context.RequestContext(is_admin=False,
tenant=TENANT1,
auth_token='user:%s:user' % TENANT1)
UUIDX = str(uuid.uuid4())
CHECKSUM1 = '91264c3edf5972c9f1cb309543d38a5c'
image_meta_data = {
'id': UUIDX,
'status': 'queued',
'checksum': CHECKSUM1,
'owner': TENANT1
}
self.db_api.image_create(ctxt1, image_meta_data)
image_member_data = {
'image_id': UUIDX,
'member': TENANT1,
'can_share': False,
"status": "accepted",
}
self.db_api.image_member_create(ctxt1, image_member_data)
TENANT2 = str(uuid.uuid4())
ctxt2 = context.RequestContext(is_admin=False,
tenant=TENANT2,
auth_token='user:%s:user' % TENANT2)
UUIDY = str(uuid.uuid4())
CHECKSUM2 = '92264c3edf5972c9f1cb309543d38a5c'
image_meta_data = {
'id': UUIDY,
'status': 'queued',
'checksum': CHECKSUM2,
'owner': TENANT2
}
self.db_api.image_create(ctxt2, image_meta_data)
image_member_data = {
'image_id': UUIDY,
'member': TENANT2,
'can_share': False,
"status": "accepted",
}
self.db_api.image_member_create(ctxt2, image_member_data)
filters = {'visibility': 'shared', 'checksum': CHECKSUM2}
images = self.db_api.image_get_all(ctxt2, filters)
self.assertEqual(1, len(images))
self.assertEqual(UUIDY, images[0]['id'])
def test_image_get_all_with_filter_tags(self):
self.db_api.image_tag_create(self.context, UUID1, 'x86')
self.db_api.image_tag_create(self.context, UUID1, '64bit')
self.db_api.image_tag_create(self.context, UUID2, 'power')
self.db_api.image_tag_create(self.context, UUID2, '64bit')
images = self.db_api.image_get_all(self.context,
filters={'tags': ['64bit']})
self.assertEqual(2, len(images))
image_ids = [image['id'] for image in images]
expected = [UUID1, UUID2]
self.assertEqual(sorted(expected), sorted(image_ids))
def test_image_get_all_with_filter_multi_tags(self):
self.db_api.image_tag_create(self.context, UUID1, 'x86')
self.db_api.image_tag_create(self.context, UUID1, '64bit')
self.db_api.image_tag_create(self.context, UUID2, 'power')
self.db_api.image_tag_create(self.context, UUID2, '64bit')
images = self.db_api.image_get_all(self.context,
filters={'tags': ['64bit', 'power']
})
self.assertEqual(1, len(images))
self.assertEqual(UUID2, images[0]['id'])
def test_image_get_all_with_filter_tags_and_nonexistent(self):
self.db_api.image_tag_create(self.context, UUID1, 'x86')
images = self.db_api.image_get_all(self.context,
filters={'tags': ['x86', 'fake']
})
self.assertEqual(0, len(images))
def test_image_get_all_with_filter_deleted_tags(self):
tag = self.db_api.image_tag_create(self.context, UUID1, 'AIX')
images = self.db_api.image_get_all(self.context,
filters={
'tags': [tag],
})
self.assertEqual(1, len(images))
self.db_api.image_tag_delete(self.context, UUID1, tag)
images = self.db_api.image_get_all(self.context,
filters={
'tags': [tag],
})
self.assertEqual(0, len(images))
def test_image_get_all_with_filter_undefined_tags(self):
images = self.db_api.image_get_all(self.context,
filters={'tags': ['fake']})
self.assertEqual(0, len(images))
def test_image_paginate(self):
"""Paginate through a list of images using limit and marker"""
now = timeutils.utcnow()
extra_uuids = [(str(uuid.uuid4()),
now + datetime.timedelta(seconds=i * 5))
for i in range(2)]
extra_images = [build_image_fixture(id=_id,
created_at=_dt,
updated_at=_dt)
for _id, _dt in extra_uuids]
self.create_images(extra_images)
# Reverse uuids to match default sort of created_at
extra_uuids.reverse()
page = self.db_api.image_get_all(self.context, limit=2)
self.assertEqual([i[0] for i in extra_uuids], [i['id'] for i in page])
last = page[-1]['id']
page = self.db_api.image_get_all(self.context, limit=2, marker=last)
self.assertEqual([UUID3, UUID2], [i['id'] for i in page])
page = self.db_api.image_get_all(self.context, limit=2, marker=UUID2)
self.assertEqual([UUID1], [i['id'] for i in page])
def test_image_get_all_invalid_sort_key(self):
self.assertRaises(exception.InvalidSortKey, self.db_api.image_get_all,
self.context, sort_key=['blah'])
def test_image_get_all_limit_marker(self):
images = self.db_api.image_get_all(self.context, limit=2)
self.assertEqual(2, len(images))
def test_image_get_all_with_tag_returning(self):
expected_tags = {UUID1: ['foo'], UUID2: ['bar'], UUID3: ['baz']}
self.db_api.image_tag_create(self.context, UUID1,
expected_tags[UUID1][0])
self.db_api.image_tag_create(self.context, UUID2,
expected_tags[UUID2][0])
self.db_api.image_tag_create(self.context, UUID3,
expected_tags[UUID3][0])
images = self.db_api.image_get_all(self.context, return_tag=True)
self.assertEqual(3, len(images))
for image in images:
self.assertIn('tags', image)
self.assertEqual(expected_tags[image['id']], image['tags'])
self.db_api.image_tag_delete(self.context, UUID1,
expected_tags[UUID1][0])
expected_tags[UUID1] = []
images = self.db_api.image_get_all(self.context, return_tag=True)
self.assertEqual(3, len(images))
for image in images:
self.assertIn('tags', image)
self.assertEqual(expected_tags[image['id']], image['tags'])
def test_image_destroy(self):
location_data = [{'url': 'a', 'metadata': {'key': 'value'},
'status': 'active'},
{'url': 'b', 'metadata': {},
'status': 'active'}]
fixture = {'status': 'queued', 'locations': location_data}
image = self.db_api.image_create(self.context, fixture)
IMG_ID = image['id']
fixture = {'name': 'ping', 'value': 'pong', 'image_id': IMG_ID}
prop = self.db_api.image_property_create(self.context, fixture)
TENANT2 = str(uuid.uuid4())
fixture = {'image_id': IMG_ID, 'member': TENANT2, 'can_share': False}
member = self.db_api.image_member_create(self.context, fixture)
self.db_api.image_tag_create(self.context, IMG_ID, 'snarf')
self.assertEqual(2, len(image['locations']))
self.assertIn('id', image['locations'][0])
self.assertIn('id', image['locations'][1])
image['locations'][0].pop('id')
image['locations'][1].pop('id')
self.assertEqual(location_data, image['locations'])
self.assertEqual(('ping', 'pong', IMG_ID, False),
(prop['name'], prop['value'],
prop['image_id'], prop['deleted']))
self.assertEqual((TENANT2, IMG_ID, False),
(member['member'], member['image_id'],
member['can_share']))
self.assertEqual(['snarf'],
self.db_api.image_tag_get_all(self.context, IMG_ID))
image = self.db_api.image_destroy(self.adm_context, IMG_ID)
self.assertTrue(image['deleted'])
self.assertTrue(image['deleted_at'])
self.assertRaises(exception.NotFound, self.db_api.image_get,
self.context, IMG_ID)
self.assertEqual([], image['locations'])
prop = image['properties'][0]
self.assertEqual(('ping', IMG_ID, True),
(prop['name'], prop['image_id'], prop['deleted']))
self.context.auth_token = 'user:%s:user' % TENANT2
members = self.db_api.image_member_find(self.context, IMG_ID)
self.assertEqual([], members)
tags = self.db_api.image_tag_get_all(self.context, IMG_ID)
self.assertEqual([], tags)
def test_image_destroy_with_delete_all(self):
"""Check the image child element's _image_delete_all methods.
checks if all the image_delete_all methods deletes only the child
elements of the image to be deleted.
"""
TENANT2 = str(uuid.uuid4())
location_data = [{'url': 'a', 'metadata': {'key': 'value'},
'status': 'active'},
{'url': 'b', 'metadata': {}, 'status': 'active'}]
def _create_image_with_child_entries():
fixture = {'status': 'queued', 'locations': location_data}
image_id = self.db_api.image_create(self.context, fixture)['id']
fixture = {'name': 'ping', 'value': 'pong', 'image_id': image_id}
self.db_api.image_property_create(self.context, fixture)
fixture = {'image_id': image_id, 'member': TENANT2,
'can_share': False}
self.db_api.image_member_create(self.context, fixture)
self.db_api.image_tag_create(self.context, image_id, 'snarf')
return image_id
ACTIVE_IMG_ID = _create_image_with_child_entries()
DEL_IMG_ID = _create_image_with_child_entries()
deleted_image = self.db_api.image_destroy(self.adm_context, DEL_IMG_ID)
self.assertTrue(deleted_image['deleted'])
self.assertTrue(deleted_image['deleted_at'])
self.assertRaises(exception.NotFound, self.db_api.image_get,
self.context, DEL_IMG_ID)
active_image = self.db_api.image_get(self.context, ACTIVE_IMG_ID)
self.assertFalse(active_image['deleted'])
self.assertFalse(active_image['deleted_at'])
self.assertEqual(2, len(active_image['locations']))
self.assertIn('id', active_image['locations'][0])
self.assertIn('id', active_image['locations'][1])
active_image['locations'][0].pop('id')
active_image['locations'][1].pop('id')
self.assertEqual(location_data, active_image['locations'])
self.assertEqual(1, len(active_image['properties']))
prop = active_image['properties'][0]
self.assertEqual(('ping', 'pong', ACTIVE_IMG_ID),
(prop['name'], prop['value'],
prop['image_id']))
self.assertEqual((False, None),
(prop['deleted'], prop['deleted_at']))
self.context.auth_token = 'user:%s:user' % TENANT2
members = self.db_api.image_member_find(self.context, ACTIVE_IMG_ID)
self.assertEqual(1, len(members))
member = members[0]
self.assertEqual((TENANT2, ACTIVE_IMG_ID, False),
(member['member'], member['image_id'],
member['can_share']))
tags = self.db_api.image_tag_get_all(self.context, ACTIVE_IMG_ID)
self.assertEqual(['snarf'], tags)
def test_image_get_multiple_members(self):
TENANT1 = str(uuid.uuid4())
TENANT2 = str(uuid.uuid4())
ctxt1 = context.RequestContext(is_admin=False, tenant=TENANT1,
auth_token='user:%s:user' % TENANT1,
owner_is_tenant=True)
ctxt2 = context.RequestContext(is_admin=False, user=TENANT2,
auth_token='user:%s:user' % TENANT2,
owner_is_tenant=False)
UUIDX = str(uuid.uuid4())
# We need a shared image and context.owner should not match image
# owner
self.db_api.image_create(ctxt1, {'id': UUIDX,
'status': 'queued',
'is_public': False,
'owner': TENANT1})
values = {'image_id': UUIDX, 'member': TENANT2, 'can_share': False}
self.db_api.image_member_create(ctxt1, values)
image = self.db_api.image_get(ctxt2, UUIDX)
self.assertEqual(UUIDX, image['id'])
# by default get_all displays only images with status 'accepted'
images = self.db_api.image_get_all(ctxt2)
self.assertEqual(3, len(images))
# filter by rejected
images = self.db_api.image_get_all(ctxt2, member_status='rejected')
self.assertEqual(3, len(images))
# filter by visibility
images = self.db_api.image_get_all(ctxt2,
filters={'visibility': 'shared'})
self.assertEqual(0, len(images))
# filter by visibility
images = self.db_api.image_get_all(ctxt2, member_status='pending',
filters={'visibility': 'shared'})
self.assertEqual(1, len(images))
# filter by visibility
images = self.db_api.image_get_all(ctxt2, member_status='all',
filters={'visibility': 'shared'})
self.assertEqual(1, len(images))
# filter by status pending
images = self.db_api.image_get_all(ctxt2, member_status='pending')
self.assertEqual(4, len(images))
# filter by status all
images = self.db_api.image_get_all(ctxt2, member_status='all')
self.assertEqual(4, len(images))
def test_is_image_visible(self):
TENANT1 = str(uuid.uuid4())
TENANT2 = str(uuid.uuid4())
ctxt1 = context.RequestContext(is_admin=False, tenant=TENANT1,
auth_token='user:%s:user' % TENANT1,
owner_is_tenant=True)
ctxt2 = context.RequestContext(is_admin=False, user=TENANT2,
auth_token='user:%s:user' % TENANT2,
owner_is_tenant=False)
UUIDX = str(uuid.uuid4())
# We need a shared image and context.owner should not match image
# owner
image = self.db_api.image_create(ctxt1, {'id': UUIDX,
'status': 'queued',
'is_public': False,
'owner': TENANT1})
values = {'image_id': UUIDX, 'member': TENANT2, 'can_share': False}
self.db_api.image_member_create(ctxt1, values)
result = self.db_api.is_image_visible(ctxt2, image)
self.assertTrue(result)
# image should not be visible for a deleted member
members = self.db_api.image_member_find(ctxt1, image_id=UUIDX)
self.db_api.image_member_delete(ctxt1, members[0]['id'])
result = self.db_api.is_image_visible(ctxt2, image)
self.assertFalse(result)
def test_is_community_image_visible(self):
TENANT1 = str(uuid.uuid4())
TENANT2 = str(uuid.uuid4())
owners_ctxt = context.RequestContext(is_admin=False, tenant=TENANT1,
auth_token='user:%s:user'
% TENANT1, owner_is_tenant=True)
viewing_ctxt = context.RequestContext(is_admin=False, user=TENANT2,
auth_token='user:%s:user'
% TENANT2, owner_is_tenant=False)
UUIDX = str(uuid.uuid4())
# We need a community image and context.owner should not match image
# owner
image = self.db_api.image_create(owners_ctxt,
{'id': UUIDX,
'status': 'queued',
'visibility': 'community',
'owner': TENANT1})
# image should be visible in every context
result = self.db_api.is_image_visible(owners_ctxt, image)
self.assertTrue(result)
result = self.db_api.is_image_visible(viewing_ctxt, image)
self.assertTrue(result)
def test_image_tag_create(self):
tag = self.db_api.image_tag_create(self.context, UUID1, 'snap')
self.assertEqual('snap', tag)
def test_image_tag_create_bad_value(self):
self.assertRaises(exception.Invalid,
self.db_api.image_tag_create, self.context,
UUID1, u'Bad \U0001f62a')
def test_image_tag_set_all(self):
tags = self.db_api.image_tag_get_all(self.context, UUID1)
self.assertEqual([], tags)
self.db_api.image_tag_set_all(self.context, UUID1, ['ping', 'pong'])
tags = self.db_api.image_tag_get_all(self.context, UUID1)
# NOTE(bcwaldon): tag ordering should match exactly what was provided
self.assertEqual(['ping', 'pong'], tags)
def test_image_tag_get_all(self):
self.db_api.image_tag_create(self.context, UUID1, 'snap')
self.db_api.image_tag_create(self.context, UUID1, 'snarf')
self.db_api.image_tag_create(self.context, UUID2, 'snarf')
# Check the tags for the first image
tags = self.db_api.image_tag_get_all(self.context, UUID1)
expected = ['snap', 'snarf']
self.assertEqual(expected, tags)
# Check the tags for the second image
tags = self.db_api.image_tag_get_all(self.context, UUID2)
expected = ['snarf']
self.assertEqual(expected, tags)
def test_image_tag_get_all_no_tags(self):
actual = self.db_api.image_tag_get_all(self.context, UUID1)
self.assertEqual([], actual)
def test_image_tag_get_all_non_existent_image(self):
bad_image_id = str(uuid.uuid4())
actual = self.db_api.image_tag_get_all(self.context, bad_image_id)
self.assertEqual([], actual)
def test_image_tag_delete(self):
self.db_api.image_tag_create(self.context, UUID1, 'snap')
self.db_api.image_tag_delete(self.context, UUID1, 'snap')
self.assertRaises(exception.NotFound, self.db_api.image_tag_delete,
self.context, UUID1, 'snap')
@mock.patch.object(timeutils, 'utcnow')
def test_image_member_create(self, mock_utcnow):
mock_utcnow.return_value = datetime.datetime.utcnow()
memberships = self.db_api.image_member_find(self.context)
self.assertEqual([], memberships)
TENANT1 = str(uuid.uuid4())
# NOTE(flaper87): Update auth token, otherwise
# non visible members won't be returned.
self.context.auth_token = 'user:%s:user' % TENANT1
self.db_api.image_member_create(self.context,
{'member': TENANT1, 'image_id': UUID1})
memberships = self.db_api.image_member_find(self.context)
self.assertEqual(1, len(memberships))
actual = memberships[0]
self.assertIsNotNone(actual['created_at'])
self.assertIsNotNone(actual['updated_at'])
actual.pop('id')
actual.pop('created_at')
actual.pop('updated_at')
expected = {
'member': TENANT1,
'image_id': UUID1,
'can_share': False,
'status': 'pending',
'deleted': False,
}
self.assertEqual(expected, actual)
def test_image_member_update(self):
TENANT1 = str(uuid.uuid4())
# NOTE(flaper87): Update auth token, otherwise
# non visible members won't be returned.
self.context.auth_token = 'user:%s:user' % TENANT1
member = self.db_api.image_member_create(self.context,
{'member': TENANT1,
'image_id': UUID1})
member_id = member.pop('id')
member.pop('created_at')
member.pop('updated_at')
expected = {'member': TENANT1,
'image_id': UUID1,
'status': 'pending',
'can_share': False,
'deleted': False}
self.assertEqual(expected, member)
self.delay_inaccurate_clock()
member = self.db_api.image_member_update(self.context,
member_id,
{'can_share': True})
self.assertNotEqual(member['created_at'], member['updated_at'])
member.pop('id')
member.pop('created_at')
member.pop('updated_at')
expected = {'member': TENANT1,
'image_id': UUID1,
'status': 'pending',
'can_share': True,
'deleted': False}
self.assertEqual(expected, member)
members = self.db_api.image_member_find(self.context,
member=TENANT1,
image_id=UUID1)
member = members[0]
member.pop('id')
member.pop('created_at')
member.pop('updated_at')
self.assertEqual(expected, member)
def test_image_member_update_status(self):
TENANT1 = str(uuid.uuid4())
# NOTE(flaper87): Update auth token, otherwise
# non visible members won't be returned.
self.context.auth_token = 'user:%s:user' % TENANT1
member = self.db_api.image_member_create(self.context,
{'member': TENANT1,
'image_id': UUID1})
member_id = member.pop('id')
member.pop('created_at')
member.pop('updated_at')
expected = {'member': TENANT1,
'image_id': UUID1,
'status': 'pending',
'can_share': False,
'deleted': False}
self.assertEqual(expected, member)
self.delay_inaccurate_clock()
member = self.db_api.image_member_update(self.context,
member_id,
{'status': 'accepted'})
self.assertNotEqual(member['created_at'], member['updated_at'])
member.pop('id')
member.pop('created_at')
member.pop('updated_at')
expected = {'member': TENANT1,
'image_id': UUID1,
'status': 'accepted',
'can_share': False,
'deleted': False}
self.assertEqual(expected, member)
members = self.db_api.image_member_find(self.context,
member=TENANT1,
image_id=UUID1)
member = members[0]
member.pop('id')
member.pop('created_at')
member.pop('updated_at')
self.assertEqual(expected, member)
def test_image_member_find(self):
TENANT1 = str(uuid.uuid4())
TENANT2 = str(uuid.uuid4())
fixtures = [
{'member': TENANT1, 'image_id': UUID1},
{'member': TENANT1, 'image_id': UUID2, 'status': 'rejected'},
{'member': TENANT2, 'image_id': UUID1, 'status': 'accepted'},
]
for f in fixtures:
self.db_api.image_member_create(self.context, copy.deepcopy(f))
def _simplify(output):
return
def _assertMemberListMatch(list1, list2):
def _simple(x):
return set([(o['member'], o['image_id']) for o in x])
self.assertEqual(_simple(list1), _simple(list2))
# NOTE(flaper87): Update auth token, otherwise
# non visible members won't be returned.
self.context.auth_token = 'user:%s:user' % TENANT1
output = self.db_api.image_member_find(self.context, member=TENANT1)
_assertMemberListMatch([fixtures[0], fixtures[1]], output)
output = self.db_api.image_member_find(self.adm_context,
image_id=UUID1)
_assertMemberListMatch([fixtures[0], fixtures[2]], output)
# NOTE(flaper87): Update auth token, otherwise
# non visible members won't be returned.
self.context.auth_token = 'user:%s:user' % TENANT2
output = self.db_api.image_member_find(self.context,
member=TENANT2,
image_id=UUID1)
_assertMemberListMatch([fixtures[2]], output)
output = self.db_api.image_member_find(self.context,
status='accepted')
_assertMemberListMatch([fixtures[2]], output)
# NOTE(flaper87): Update auth token, otherwise
# non visible members won't be returned.
self.context.auth_token = 'user:%s:user' % TENANT1
output = self.db_api.image_member_find(self.context,
status='rejected')
_assertMemberListMatch([fixtures[1]], output)
output = self.db_api.image_member_find(self.context,
status='pending')
_assertMemberListMatch([fixtures[0]], output)
output = self.db_api.image_member_find(self.context,
status='pending',
image_id=UUID2)
_assertMemberListMatch([], output)
image_id = str(uuid.uuid4())
output = self.db_api.image_member_find(self.context,
member=TENANT2,
image_id=image_id)
_assertMemberListMatch([], output)
def test_image_member_count(self):
TENANT1 = str(uuid.uuid4())
self.db_api.image_member_create(self.context,
{'member': TENANT1,
'image_id': UUID1})
actual = self.db_api.image_member_count(self.context, UUID1)
self.assertEqual(1, actual)
def test_image_member_count_invalid_image_id(self):
TENANT1 = str(uuid.uuid4())
self.db_api.image_member_create(self.context,
{'member': TENANT1,
'image_id': UUID1})
self.assertRaises(exception.Invalid, self.db_api.image_member_count,
self.context, None)
def test_image_member_count_empty_image_id(self):
TENANT1 = str(uuid.uuid4())
self.db_api.image_member_create(self.context,
{'member': TENANT1,
'image_id': UUID1})
self.assertRaises(exception.Invalid, self.db_api.image_member_count,
self.context, "")
def test_image_member_delete(self):
TENANT1 = str(uuid.uuid4())
# NOTE(flaper87): Update auth token, otherwise
# non visible members won't be returned.
self.context.auth_token = 'user:%s:user' % TENANT1
fixture = {'member': TENANT1, 'image_id': UUID1, 'can_share': True}
member = self.db_api.image_member_create(self.context, fixture)
self.assertEqual(1, len(self.db_api.image_member_find(self.context)))
member = self.db_api.image_member_delete(self.context, member['id'])
self.assertEqual(0, len(self.db_api.image_member_find(self.context)))
class DriverQuotaTests(test_utils.BaseTestCase):
def setUp(self):
super(DriverQuotaTests, self).setUp()
self.owner_id1 = str(uuid.uuid4())
self.context1 = context.RequestContext(
is_admin=False, user=self.owner_id1, tenant=self.owner_id1,
auth_token='%s:%s:user' % (self.owner_id1, self.owner_id1))
self.db_api = db_tests.get_db(self.config)
db_tests.reset_db(self.db_api)
dt1 = timeutils.utcnow()
dt2 = dt1 + datetime.timedelta(microseconds=5)
fixtures = [
{
'id': UUID1,
'created_at': dt1,
'updated_at': dt1,
'size': 13,
'owner': self.owner_id1,
},
{
'id': UUID2,
'created_at': dt1,
'updated_at': dt2,
'size': 17,
'owner': self.owner_id1,
},
{
'id': UUID3,
'created_at': dt2,
'updated_at': dt2,
'size': 7,
'owner': self.owner_id1,
},
]
self.owner1_fixtures = [
build_image_fixture(**fixture) for fixture in fixtures]
for fixture in self.owner1_fixtures:
self.db_api.image_create(self.context1, fixture)
def test_storage_quota(self):
total = reduce(lambda x, y: x + y,
[f['size'] for f in self.owner1_fixtures])
x = self.db_api.user_get_storage_usage(self.context1, self.owner_id1)
self.assertEqual(total, x)
def test_storage_quota_without_image_id(self):
total = reduce(lambda x, y: x + y,
[f['size'] for f in self.owner1_fixtures])
total = total - self.owner1_fixtures[0]['size']
x = self.db_api.user_get_storage_usage(
self.context1, self.owner_id1,
image_id=self.owner1_fixtures[0]['id'])
self.assertEqual(total, x)
def test_storage_quota_multiple_locations(self):
dt1 = timeutils.utcnow()
sz = 53
new_fixture_dict = {'id': str(uuid.uuid4()), 'created_at': dt1,
'updated_at': dt1, 'size': sz,
'owner': self.owner_id1}
new_fixture = build_image_fixture(**new_fixture_dict)
new_fixture['locations'].append({'url': 'file:///some/path/file',
'metadata': {},
'status': 'active'})
self.db_api.image_create(self.context1, new_fixture)
total = reduce(lambda x, y: x + y,
[f['size'] for f in self.owner1_fixtures]) + (sz * 2)
x = self.db_api.user_get_storage_usage(self.context1, self.owner_id1)
self.assertEqual(total, x)
def test_storage_quota_deleted_image(self):
# NOTE(flaper87): This needs to be tested for
# soft deleted images as well. Currently there's no
# good way to delete locations.
dt1 = timeutils.utcnow()
sz = 53
image_id = str(uuid.uuid4())
new_fixture_dict = {'id': image_id, 'created_at': dt1,
'updated_at': dt1, 'size': sz,
'owner': self.owner_id1}
new_fixture = build_image_fixture(**new_fixture_dict)
new_fixture['locations'].append({'url': 'file:///some/path/file',
'metadata': {},
'status': 'active'})
self.db_api.image_create(self.context1, new_fixture)
total = reduce(lambda x, y: x + y,
[f['size'] for f in self.owner1_fixtures])
x = self.db_api.user_get_storage_usage(self.context1, self.owner_id1)
self.assertEqual(total + (sz * 2), x)
self.db_api.image_destroy(self.context1, image_id)
x = self.db_api.user_get_storage_usage(self.context1, self.owner_id1)
self.assertEqual(total, x)
class TaskTests(test_utils.BaseTestCase):
def setUp(self):
super(TaskTests, self).setUp()
self.admin_id = 'admin'
self.owner_id = 'user'
self.adm_context = context.RequestContext(
is_admin=True, auth_token='user:admin:admin', tenant=self.admin_id)
self.context = context.RequestContext(
is_admin=False, auth_token='user:user:user', user=self.owner_id)
self.db_api = db_tests.get_db(self.config)
self.fixtures = self.build_task_fixtures()
db_tests.reset_db(self.db_api)
def build_task_fixtures(self):
self.context.project_id = str(uuid.uuid4())
fixtures = [
{
'owner': self.context.owner,
'type': 'import',
'input': {'import_from': 'file:///a.img',
'import_from_format': 'qcow2',
'image_properties': {
"name": "GreatStack 1.22",
"tags": ["lamp", "custom"]
}},
},
{
'owner': self.context.owner,
'type': 'import',
'input': {'import_from': 'file:///b.img',
'import_from_format': 'qcow2',
'image_properties': {
"name": "GreatStack 1.23",
"tags": ["lamp", "good"]
}},
},
{
'owner': self.context.owner,
"type": "export",
"input": {
"export_uuid": "deadbeef-dead-dead-dead-beefbeefbeef",
"export_to":
"swift://cloud.foo/myaccount/mycontainer/path",
"export_format": "qcow2"
}
},
]
return [build_task_fixture(**fixture) for fixture in fixtures]
def test_task_get_all_with_filter(self):
for fixture in self.fixtures:
self.db_api.task_create(self.adm_context,
build_task_fixture(**fixture))
import_tasks = self.db_api.task_get_all(self.adm_context,
filters={'type': 'import'})
self.assertTrue(import_tasks)
self.assertEqual(2, len(import_tasks))
for task in import_tasks:
self.assertEqual('import', task['type'])
self.assertEqual(self.context.owner, task['owner'])
def test_task_get_all_as_admin(self):
tasks = []
for fixture in self.fixtures:
task = self.db_api.task_create(self.adm_context,
build_task_fixture(**fixture))
tasks.append(task)
import_tasks = self.db_api.task_get_all(self.adm_context)
self.assertTrue(import_tasks)
self.assertEqual(3, len(import_tasks))
def test_task_get_all_marker(self):
for fixture in self.fixtures:
self.db_api.task_create(self.adm_context,
build_task_fixture(**fixture))
tasks = self.db_api.task_get_all(self.adm_context, sort_key='id')
task_ids = [t['id'] for t in tasks]
tasks = self.db_api.task_get_all(self.adm_context, sort_key='id',
marker=task_ids[0])
self.assertEqual(2, len(tasks))
def test_task_get_all_limit(self):
for fixture in self.fixtures:
self.db_api.task_create(self.adm_context,
build_task_fixture(**fixture))
tasks = self.db_api.task_get_all(self.adm_context, limit=2)
self.assertEqual(2, len(tasks))
# A limit of None should not equate to zero
tasks = self.db_api.task_get_all(self.adm_context, limit=None)
self.assertEqual(3, len(tasks))
# A limit of zero should actually mean zero
tasks = self.db_api.task_get_all(self.adm_context, limit=0)
self.assertEqual(0, len(tasks))
def test_task_get_all_owned(self):
then = timeutils.utcnow() + datetime.timedelta(days=365)
TENANT1 = str(uuid.uuid4())
ctxt1 = context.RequestContext(is_admin=False,
tenant=TENANT1,
auth_token='user:%s:user' % TENANT1)
task_values = {'type': 'import', 'status': 'pending',
'input': '{"loc": "fake"}', 'owner': TENANT1,
'expires_at': then}
self.db_api.task_create(ctxt1, task_values)
TENANT2 = str(uuid.uuid4())
ctxt2 = context.RequestContext(is_admin=False,
tenant=TENANT2,
auth_token='user:%s:user' % TENANT2)
task_values = {'type': 'export', 'status': 'pending',
'input': '{"loc": "fake"}', 'owner': TENANT2,
'expires_at': then}
self.db_api.task_create(ctxt2, task_values)
tasks = self.db_api.task_get_all(ctxt1)
task_owners = set([task['owner'] for task in tasks])
expected = set([TENANT1])
self.assertEqual(sorted(expected), sorted(task_owners))
def test_task_get(self):
expires_at = timeutils.utcnow()
image_id = str(uuid.uuid4())
fixture = {
'owner': self.context.owner,
'type': 'import',
'status': 'pending',
'input': '{"loc": "fake"}',
'result': "{'image_id': %s}" % image_id,
'message': 'blah',
'expires_at': expires_at
}
task = self.db_api.task_create(self.adm_context, fixture)
self.assertIsNotNone(task)
self.assertIsNotNone(task['id'])
task_id = task['id']
task = self.db_api.task_get(self.adm_context, task_id)
self.assertIsNotNone(task)
self.assertEqual(task_id, task['id'])
self.assertEqual(self.context.owner, task['owner'])
self.assertEqual('import', task['type'])
self.assertEqual('pending', task['status'])
self.assertEqual(fixture['input'], task['input'])
self.assertEqual(fixture['result'], task['result'])
self.assertEqual(fixture['message'], task['message'])
self.assertEqual(expires_at, task['expires_at'])
def test_task_get_all(self):
now = timeutils.utcnow()
then = now + datetime.timedelta(days=365)
image_id = str(uuid.uuid4())
fixture1 = {
'owner': self.context.owner,
'type': 'import',
'status': 'pending',
'input': '{"loc": "fake_1"}',
'result': "{'image_id': %s}" % image_id,
'message': 'blah_1',
'expires_at': then,
'created_at': now,
'updated_at': now
}
fixture2 = {
'owner': self.context.owner,
'type': 'import',
'status': 'pending',
'input': '{"loc": "fake_2"}',
'result': "{'image_id': %s}" % image_id,
'message': 'blah_2',
'expires_at': then,
'created_at': now,
'updated_at': now
}
task1 = self.db_api.task_create(self.adm_context, fixture1)
task2 = self.db_api.task_create(self.adm_context, fixture2)
self.assertIsNotNone(task1)
self.assertIsNotNone(task2)
task1_id = task1['id']
task2_id = task2['id']
task_fixtures = {task1_id: fixture1, task2_id: fixture2}
tasks = self.db_api.task_get_all(self.adm_context)
self.assertEqual(2, len(tasks))
self.assertEqual(set((tasks[0]['id'], tasks[1]['id'])),
set((task1_id, task2_id)))
for task in tasks:
fixture = task_fixtures[task['id']]
self.assertEqual(self.context.owner, task['owner'])
self.assertEqual(fixture['type'], task['type'])
self.assertEqual(fixture['status'], task['status'])
self.assertEqual(fixture['expires_at'], task['expires_at'])
self.assertFalse(task['deleted'])
self.assertIsNone(task['deleted_at'])
self.assertEqual(fixture['created_at'], task['created_at'])
self.assertEqual(fixture['updated_at'], task['updated_at'])
task_details_keys = ['input', 'message', 'result']
for key in task_details_keys:
self.assertNotIn(key, task)
def test_task_soft_delete(self):
now = timeutils.utcnow()
then = now + datetime.timedelta(days=365)
fixture1 = build_task_fixture(id='1', expires_at=now,
owner=self.adm_context.owner)
fixture2 = build_task_fixture(id='2', expires_at=now,
owner=self.adm_context.owner)
fixture3 = build_task_fixture(id='3', expires_at=then,
owner=self.adm_context.owner)
fixture4 = build_task_fixture(id='4', expires_at=then,
owner=self.adm_context.owner)
task1 = self.db_api.task_create(self.adm_context, fixture1)
task2 = self.db_api.task_create(self.adm_context, fixture2)
task3 = self.db_api.task_create(self.adm_context, fixture3)
task4 = self.db_api.task_create(self.adm_context, fixture4)
self.assertIsNotNone(task1)
self.assertIsNotNone(task2)
self.assertIsNotNone(task3)
self.assertIsNotNone(task4)
tasks = self.db_api.task_get_all(
self.adm_context, sort_key='id', sort_dir='asc')
self.assertEqual(4, len(tasks))
self.assertTrue(tasks[0]['deleted'])
self.assertTrue(tasks[1]['deleted'])
self.assertFalse(tasks[2]['deleted'])
self.assertFalse(tasks[3]['deleted'])
def test_task_create(self):
task_id = str(uuid.uuid4())
self.context.project_id = self.context.owner
values = {
'id': task_id,
'owner': self.context.owner,
'type': 'export',
'status': 'pending',
}
task_values = build_task_fixture(**values)
task = self.db_api.task_create(self.adm_context, task_values)
self.assertIsNotNone(task)
self.assertEqual(task_id, task['id'])
self.assertEqual(self.context.owner, task['owner'])
self.assertEqual('export', task['type'])
self.assertEqual('pending', task['status'])
self.assertEqual({'ping': 'pong'}, task['input'])
def test_task_create_with_all_task_info_null(self):
task_id = str(uuid.uuid4())
self.context.project_id = str(uuid.uuid4())
values = {
'id': task_id,
'owner': self.context.owner,
'type': 'export',
'status': 'pending',
'input': None,
'result': None,
'message': None,
}
task_values = build_task_fixture(**values)
task = self.db_api.task_create(self.adm_context, task_values)
self.assertIsNotNone(task)
self.assertEqual(task_id, task['id'])
self.assertEqual(self.context.owner, task['owner'])
self.assertEqual('export', task['type'])
self.assertEqual('pending', task['status'])
self.assertIsNone(task['input'])
self.assertIsNone(task['result'])
self.assertIsNone(task['message'])
def test_task_update(self):
self.context.project_id = str(uuid.uuid4())
result = {'foo': 'bar'}
task_values = build_task_fixture(owner=self.context.owner,
result=result)
task = self.db_api.task_create(self.adm_context, task_values)
task_id = task['id']
fixture = {
'status': 'processing',
'message': 'This is a error string',
}
self.delay_inaccurate_clock()
task = self.db_api.task_update(self.adm_context, task_id, fixture)
self.assertEqual(task_id, task['id'])
self.assertEqual(self.context.owner, task['owner'])
self.assertEqual('import', task['type'])
self.assertEqual('processing', task['status'])
self.assertEqual({'ping': 'pong'}, task['input'])
self.assertEqual(result, task['result'])
self.assertEqual('This is a error string', task['message'])
self.assertFalse(task['deleted'])
self.assertIsNone(task['deleted_at'])
self.assertIsNone(task['expires_at'])
self.assertEqual(task_values['created_at'], task['created_at'])
self.assertGreater(task['updated_at'], task['created_at'])
def test_task_update_with_all_task_info_null(self):
self.context.project_id = str(uuid.uuid4())
task_values = build_task_fixture(owner=self.context.owner,
input=None,
result=None,
message=None)
task = self.db_api.task_create(self.adm_context, task_values)
task_id = task['id']
fixture = {'status': 'processing'}
self.delay_inaccurate_clock()
task = self.db_api.task_update(self.adm_context, task_id, fixture)
self.assertEqual(task_id, task['id'])
self.assertEqual(self.context.owner, task['owner'])
self.assertEqual('import', task['type'])
self.assertEqual('processing', task['status'])
self.assertIsNone(task['input'])
self.assertIsNone(task['result'])
self.assertIsNone(task['message'])
self.assertFalse(task['deleted'])
self.assertIsNone(task['deleted_at'])
self.assertIsNone(task['expires_at'])
self.assertEqual(task_values['created_at'], task['created_at'])
self.assertGreater(task['updated_at'], task['created_at'])
def test_task_delete(self):
task_values = build_task_fixture(owner=self.context.owner)
task = self.db_api.task_create(self.adm_context, task_values)
self.assertIsNotNone(task)
self.assertFalse(task['deleted'])
self.assertIsNone(task['deleted_at'])
task_id = task['id']
self.db_api.task_delete(self.adm_context, task_id)
self.assertRaises(exception.TaskNotFound, self.db_api.task_get,
self.context, task_id)
def test_task_delete_as_admin(self):
task_values = build_task_fixture(owner=self.context.owner)
task = self.db_api.task_create(self.adm_context, task_values)
self.assertIsNotNone(task)
self.assertFalse(task['deleted'])
self.assertIsNone(task['deleted_at'])
task_id = task['id']
self.db_api.task_delete(self.adm_context, task_id)
del_task = self.db_api.task_get(self.adm_context,
task_id,
force_show_deleted=True)
self.assertIsNotNone(del_task)
self.assertEqual(task_id, del_task['id'])
self.assertTrue(del_task['deleted'])
self.assertIsNotNone(del_task['deleted_at'])
class DBPurgeTests(test_utils.BaseTestCase):
def setUp(self):
super(DBPurgeTests, self).setUp()
self.adm_context = context.get_admin_context(show_deleted=True)
self.db_api = db_tests.get_db(self.config)
db_tests.reset_db(self.db_api)
self.context = context.RequestContext(is_admin=True)
self.image_fixtures, self.task_fixtures = self.build_fixtures()
self.create_tasks(self.task_fixtures)
self.create_images(self.image_fixtures)
def build_fixtures(self):
dt1 = timeutils.utcnow() - datetime.timedelta(days=5)
dt2 = dt1 + datetime.timedelta(days=1)
dt3 = dt2 + datetime.timedelta(days=1)
fixtures = [
{
'created_at': dt1,
'updated_at': dt1,
'deleted_at': dt3,
'deleted': True,
},
{
'created_at': dt1,
'updated_at': dt2,
'deleted_at': timeutils.utcnow(),
'deleted': True,
},
{
'created_at': dt2,
'updated_at': dt2,
'deleted_at': None,
'deleted': False,
},
]
return (
[build_image_fixture(**fixture) for fixture in fixtures],
[build_task_fixture(**fixture) for fixture in fixtures],
)
def create_images(self, images):
for fixture in images:
self.db_api.image_create(self.adm_context, fixture)
def create_tasks(self, tasks):
for fixture in tasks:
self.db_api.task_create(self.adm_context, fixture)
def test_db_purge(self):
self.db_api.purge_deleted_rows(self.adm_context, 1, 5)
images = self.db_api.image_get_all(self.adm_context)
# Verify that no records from images have been deleted
# as images table will be purged using 'purge_images_table'
# command.
self.assertEqual(len(images), 3)
tasks = self.db_api.task_get_all(self.adm_context)
self.assertEqual(len(tasks), 2)
def test_db_purge_images_table(self):
# purge records from images_tags table
self.db_api.purge_deleted_rows(self.adm_context, 1, 5)
# purge records from images table
self.db_api.purge_deleted_rows_from_images(self.adm_context, 1, 5)
images = self.db_api.image_get_all(self.adm_context)
self.assertEqual(len(images), 2)
tasks = self.db_api.task_get_all(self.adm_context)
self.assertEqual(len(tasks), 2)
def test_purge_images_table_fk_constraint_failure(self):
"""Test foreign key constraint failure
Test whether foreign key constraint failure during purge
operation is raising DBReferenceError or not.
"""
session = db_api.get_session()
engine = db_api.get_engine()
connection = engine.connect()
dialect = engine.url.get_dialect()
if dialect == sqlite.dialect:
# We're seeing issues with foreign key support in SQLite 3.6.20
# SQLAlchemy doesn't support it at all with SQLite < 3.6.19
# It works fine in SQLite 3.7.
# So return early to skip this test if running SQLite < 3.7
if test_utils.is_sqlite_version_prior_to(3, 7):
self.skipTest(
'sqlite version too old for reliable SQLA foreign_keys')
# This is required for enforcing Foreign Key Constraint
# in SQLite 3.x
connection.execute("PRAGMA foreign_keys = ON")
images = sqlalchemyutils.get_table(
engine, "images")
image_tags = sqlalchemyutils.get_table(
engine, "image_tags")
# Add a 4th row in images table and set it deleted 15 days ago
uuidstr = uuid.uuid4().hex
created_time = timeutils.utcnow() - datetime.timedelta(days=20)
deleted_time = created_time + datetime.timedelta(days=5)
images_row_fixture = {
'id': uuidstr,
'status': 'status',
'created_at': created_time,
'deleted_at': deleted_time,
'deleted': 1,
'visibility': 'public',
'min_disk': 1,
'min_ram': 1,
'protected': 0
}
ins_stmt = images.insert().values(**images_row_fixture)
connection.execute(ins_stmt)
# Add a record in image_tags referencing the above images record
# but do not set it as deleted
image_tags_row_fixture = {
'image_id': uuidstr,
'value': 'tag_value',
'created_at': created_time,
'deleted': 0
}
ins_stmt = image_tags.insert().values(**image_tags_row_fixture)
connection.execute(ins_stmt)
# Purge all records deleted at least 10 days ago
self.assertRaises(db_exception.DBReferenceError,
db_api.purge_deleted_rows_from_images,
self.adm_context,
age_in_days=10,
max_rows=50)
# Verify that no records from images have been deleted
# due to DBReferenceError being raised
images_rows = session.query(images).count()
self.assertEqual(4, images_rows)
def test_purge_task_info_with_refs_to_soft_deleted_tasks(self):
session = db_api.get_session()
engine = db_api.get_engine()
# check initial task and task_info row number are 3
tasks = self.db_api.task_get_all(self.adm_context)
self.assertEqual(3, len(tasks))
task_info = sqlalchemyutils.get_table(engine, 'task_info')
task_info_rows = session.query(task_info).count()
self.assertEqual(3, task_info_rows)
# purge soft deleted rows older than yesterday
self.db_api.purge_deleted_rows(self.context, 1, 5)
# check 1 row of task table is purged
tasks = self.db_api.task_get_all(self.adm_context)
self.assertEqual(2, len(tasks))
# and no task_info was left behind, 1 row purged
task_info_rows = session.query(task_info).count()
self.assertEqual(2, task_info_rows)
class TestVisibility(test_utils.BaseTestCase):
def setUp(self):
super(TestVisibility, self).setUp()
self.db_api = db_tests.get_db(self.config)
db_tests.reset_db(self.db_api)
self.setup_tenants()
self.setup_contexts()
self.fixtures = self.build_image_fixtures()
self.create_images(self.fixtures)
def setup_tenants(self):
self.admin_tenant = str(uuid.uuid4())
self.tenant1 = str(uuid.uuid4())
self.tenant2 = str(uuid.uuid4())
def setup_contexts(self):
self.admin_context = context.RequestContext(
is_admin=True, tenant=self.admin_tenant)
self.admin_none_context = context.RequestContext(
is_admin=True, tenant=None)
self.tenant1_context = context.RequestContext(tenant=self.tenant1)
self.tenant2_context = context.RequestContext(tenant=self.tenant2)
self.none_context = context.RequestContext(tenant=None)
def build_image_fixtures(self):
fixtures = []
owners = {
'Unowned': None,
'Admin Tenant': self.admin_tenant,
'Tenant 1': self.tenant1,
'Tenant 2': self.tenant2,
}
visibilities = ['community', 'private', 'public', 'shared']
for owner_label, owner in owners.items():
for visibility in visibilities:
fixture = {
'name': '%s, %s' % (owner_label, visibility),
'owner': owner,
'visibility': visibility,
}
fixtures.append(fixture)
return [build_image_fixture(**f) for f in fixtures]
def create_images(self, images):
for fixture in images:
self.db_api.image_create(self.admin_context, fixture)
class VisibilityTests(object):
def test_unknown_admin_sees_all_but_community(self):
images = self.db_api.image_get_all(self.admin_none_context)
self.assertEqual(12, len(images))
def test_unknown_admin_is_public_true(self):
images = self.db_api.image_get_all(self.admin_none_context,
is_public=True)
self.assertEqual(4, len(images))
for i in images:
self.assertEqual('public', i['visibility'])
def test_unknown_admin_is_public_false(self):
images = self.db_api.image_get_all(self.admin_none_context,
is_public=False)
self.assertEqual(8, len(images))
for i in images:
self.assertTrue(i['visibility'] in ['shared', 'private'])
def test_unknown_admin_is_public_none(self):
images = self.db_api.image_get_all(self.admin_none_context)
self.assertEqual(12, len(images))
def test_unknown_admin_visibility_public(self):
images = self.db_api.image_get_all(self.admin_none_context,
filters={'visibility': 'public'})
self.assertEqual(4, len(images))
for i in images:
self.assertEqual('public', i['visibility'])
def test_unknown_admin_visibility_shared(self):
images = self.db_api.image_get_all(self.admin_none_context,
filters={'visibility': 'shared'})
self.assertEqual(4, len(images))
for i in images:
self.assertEqual('shared', i['visibility'])
def test_unknown_admin_visibility_private(self):
images = self.db_api.image_get_all(self.admin_none_context,
filters={'visibility': 'private'})
self.assertEqual(4, len(images))
for i in images:
self.assertEqual('private', i['visibility'])
def test_unknown_admin_visibility_community(self):
images = self.db_api.image_get_all(self.admin_none_context,
filters={'visibility': 'community'})
self.assertEqual(4, len(images))
for i in images:
self.assertEqual('community', i['visibility'])
def test_unknown_admin_visibility_all(self):
images = self.db_api.image_get_all(self.admin_none_context,
filters={'visibility': 'all'})
self.assertEqual(16, len(images))
def test_known_admin_sees_all_but_others_community_images(self):
images = self.db_api.image_get_all(self.admin_context)
self.assertEqual(13, len(images))
def test_known_admin_is_public_true(self):
images = self.db_api.image_get_all(self.admin_context, is_public=True)
self.assertEqual(4, len(images))
for i in images:
self.assertEqual('public', i['visibility'])
def test_known_admin_is_public_false(self):
images = self.db_api.image_get_all(self.admin_context,
is_public=False)
self.assertEqual(9, len(images))
for i in images:
self.assertTrue(i['visibility']
in ['shared', 'private', 'community'])
def test_known_admin_is_public_none(self):
images = self.db_api.image_get_all(self.admin_context)
self.assertEqual(13, len(images))
def test_admin_as_user_true(self):
images = self.db_api.image_get_all(self.admin_context,
admin_as_user=True)
self.assertEqual(7, len(images))
for i in images:
self.assertTrue(('public' == i['visibility'])
or i['owner'] == self.admin_tenant)
def test_known_admin_visibility_public(self):
images = self.db_api.image_get_all(self.admin_context,
filters={'visibility': 'public'})
self.assertEqual(4, len(images))
for i in images:
self.assertEqual('public', i['visibility'])
def test_known_admin_visibility_shared(self):
images = self.db_api.image_get_all(self.admin_context,
filters={'visibility': 'shared'})
self.assertEqual(4, len(images))
for i in images:
self.assertEqual('shared', i['visibility'])
def test_known_admin_visibility_private(self):
images = self.db_api.image_get_all(self.admin_context,
filters={'visibility': 'private'})
self.assertEqual(4, len(images))
for i in images:
self.assertEqual('private', i['visibility'])
def test_known_admin_visibility_community(self):
images = self.db_api.image_get_all(self.admin_context,
filters={'visibility': 'community'})
self.assertEqual(4, len(images))
for i in images:
self.assertEqual('community', i['visibility'])
def test_known_admin_visibility_all(self):
images = self.db_api.image_get_all(self.admin_context,
filters={'visibility': 'all'})
self.assertEqual(16, len(images))
def test_what_unknown_user_sees(self):
images = self.db_api.image_get_all(self.none_context)
self.assertEqual(4, len(images))
for i in images:
self.assertEqual('public', i['visibility'])
def test_unknown_user_is_public_true(self):
images = self.db_api.image_get_all(self.none_context, is_public=True)
self.assertEqual(4, len(images))
for i in images:
self.assertEqual('public', i['visibility'])
def test_unknown_user_is_public_false(self):
images = self.db_api.image_get_all(self.none_context, is_public=False)
self.assertEqual(0, len(images))
def test_unknown_user_is_public_none(self):
images = self.db_api.image_get_all(self.none_context)
self.assertEqual(4, len(images))
for i in images:
self.assertEqual('public', i['visibility'])
def test_unknown_user_visibility_public(self):
images = self.db_api.image_get_all(self.none_context,
filters={'visibility': 'public'})
self.assertEqual(4, len(images))
for i in images:
self.assertEqual('public', i['visibility'])
def test_unknown_user_visibility_shared(self):
images = self.db_api.image_get_all(self.none_context,
filters={'visibility': 'shared'})
self.assertEqual(0, len(images))
def test_unknown_user_visibility_private(self):
images = self.db_api.image_get_all(self.none_context,
filters={'visibility': 'private'})
self.assertEqual(0, len(images))
def test_unknown_user_visibility_community(self):
images = self.db_api.image_get_all(self.none_context,
filters={'visibility': 'community'})
self.assertEqual(4, len(images))
for i in images:
self.assertEqual('community', i['visibility'])
def test_unknown_user_visibility_all(self):
images = self.db_api.image_get_all(self.none_context,
filters={'visibility': 'all'})
self.assertEqual(8, len(images))
def test_what_tenant1_sees(self):
images = self.db_api.image_get_all(self.tenant1_context)
self.assertEqual(7, len(images))
for i in images:
if not ('public' == i['visibility']):
self.assertEqual(i['owner'], self.tenant1)
def test_tenant1_is_public_true(self):
images = self.db_api.image_get_all(self.tenant1_context,
is_public=True)
self.assertEqual(4, len(images))
for i in images:
self.assertEqual('public', i['visibility'])
def test_tenant1_is_public_false(self):
images = self.db_api.image_get_all(self.tenant1_context,
is_public=False)
self.assertEqual(3, len(images))
for i in images:
self.assertEqual(i['owner'], self.tenant1)
self.assertTrue(i['visibility']
in ['private', 'shared', 'community'])
def test_tenant1_is_public_none(self):
images = self.db_api.image_get_all(self.tenant1_context)
self.assertEqual(7, len(images))
for i in images:
if not ('public' == i['visibility']):
self.assertEqual(self.tenant1, i['owner'])
def test_tenant1_visibility_public(self):
images = self.db_api.image_get_all(self.tenant1_context,
filters={'visibility': 'public'})
self.assertEqual(4, len(images))
for i in images:
self.assertEqual('public', i['visibility'])
def test_tenant1_visibility_shared(self):
images = self.db_api.image_get_all(self.tenant1_context,
filters={'visibility': 'shared'})
self.assertEqual(1, len(images))
self.assertEqual('shared', images[0]['visibility'])
self.assertEqual(self.tenant1, images[0]['owner'])
def test_tenant1_visibility_private(self):
images = self.db_api.image_get_all(self.tenant1_context,
filters={'visibility': 'private'})
self.assertEqual(1, len(images))
self.assertEqual('private', images[0]['visibility'])
self.assertEqual(self.tenant1, images[0]['owner'])
def test_tenant1_visibility_community(self):
images = self.db_api.image_get_all(self.tenant1_context,
filters={'visibility': 'community'})
self.assertEqual(4, len(images))
for i in images:
self.assertEqual('community', i['visibility'])
def test_tenant1_visibility_all(self):
images = self.db_api.image_get_all(self.tenant1_context,
filters={'visibility': 'all'})
self.assertEqual(10, len(images))
def _setup_is_public_red_herring(self):
values = {
'name': 'Red Herring',
'owner': self.tenant1,
'visibility': 'shared',
'properties': {'is_public': 'silly'}
}
fixture = build_image_fixture(**values)
self.db_api.image_create(self.admin_context, fixture)
def test_is_public_is_a_normal_filter_for_admin(self):
self._setup_is_public_red_herring()
images = self.db_api.image_get_all(self.admin_context,
filters={'is_public': 'silly'})
self.assertEqual(1, len(images))
self.assertEqual('Red Herring', images[0]['name'])
def test_is_public_is_a_normal_filter_for_user(self):
self._setup_is_public_red_herring()
images = self.db_api.image_get_all(self.tenant1_context,
filters={'is_public': 'silly'})
self.assertEqual(1, len(images))
self.assertEqual('Red Herring', images[0]['name'])
# NOTE(markwash): the following tests are sanity checks to make sure
# visibility filtering and is_public=(True|False) do not interact in
# unexpected ways. However, using both of the filtering techniques
# simultaneously is not an anticipated use case.
def test_admin_is_public_true_and_visibility_public(self):
images = self.db_api.image_get_all(self.admin_context, is_public=True,
filters={'visibility': 'public'})
self.assertEqual(4, len(images))
def test_admin_is_public_false_and_visibility_public(self):
images = self.db_api.image_get_all(self.admin_context, is_public=False,
filters={'visibility': 'public'})
self.assertEqual(0, len(images))
def test_admin_is_public_true_and_visibility_shared(self):
images = self.db_api.image_get_all(self.admin_context, is_public=True,
filters={'visibility': 'shared'})
self.assertEqual(0, len(images))
def test_admin_is_public_false_and_visibility_shared(self):
images = self.db_api.image_get_all(self.admin_context, is_public=False,
filters={'visibility': 'shared'})
self.assertEqual(4, len(images))
def test_admin_is_public_true_and_visibility_private(self):
images = self.db_api.image_get_all(self.admin_context, is_public=True,
filters={'visibility': 'private'})
self.assertEqual(0, len(images))
def test_admin_is_public_false_and_visibility_private(self):
images = self.db_api.image_get_all(self.admin_context, is_public=False,
filters={'visibility': 'private'})
self.assertEqual(4, len(images))
def test_admin_is_public_true_and_visibility_community(self):
images = self.db_api.image_get_all(self.admin_context, is_public=True,
filters={'visibility': 'community'})
self.assertEqual(0, len(images))
def test_admin_is_public_false_and_visibility_community(self):
images = self.db_api.image_get_all(self.admin_context, is_public=False,
filters={'visibility': 'community'})
self.assertEqual(4, len(images))
def test_tenant1_is_public_true_and_visibility_public(self):
images = self.db_api.image_get_all(self.tenant1_context,
is_public=True,
filters={'visibility': 'public'})
self.assertEqual(4, len(images))
def test_tenant1_is_public_false_and_visibility_public(self):
images = self.db_api.image_get_all(self.tenant1_context,
is_public=False,
filters={'visibility': 'public'})
self.assertEqual(0, len(images))
def test_tenant1_is_public_true_and_visibility_shared(self):
images = self.db_api.image_get_all(self.tenant1_context,
is_public=True,
filters={'visibility': 'shared'})
self.assertEqual(0, len(images))
def test_tenant1_is_public_false_and_visibility_shared(self):
images = self.db_api.image_get_all(self.tenant1_context,
is_public=False,
filters={'visibility': 'shared'})
self.assertEqual(1, len(images))
def test_tenant1_is_public_true_and_visibility_private(self):
images = self.db_api.image_get_all(self.tenant1_context,
is_public=True,
filters={'visibility': 'private'})
self.assertEqual(0, len(images))
def test_tenant1_is_public_false_and_visibility_private(self):
images = self.db_api.image_get_all(self.tenant1_context,
is_public=False,
filters={'visibility': 'private'})
self.assertEqual(1, len(images))
def test_tenant1_is_public_true_and_visibility_community(self):
images = self.db_api.image_get_all(self.tenant1_context,
is_public=True,
filters={'visibility': 'community'})
self.assertEqual(0, len(images))
def test_tenant1_is_public_false_and_visibility_community(self):
images = self.db_api.image_get_all(self.tenant1_context,
is_public=False,
filters={'visibility': 'community'})
self.assertEqual(4, len(images))
class TestMembershipVisibility(test_utils.BaseTestCase):
def setUp(self):
super(TestMembershipVisibility, self).setUp()
self.db_api = db_tests.get_db(self.config)
db_tests.reset_db(self.db_api)
self._create_contexts()
self._create_images()
def _create_contexts(self):
self.owner1, self.owner1_ctx = self._user_fixture()
self.owner2, self.owner2_ctx = self._user_fixture()
self.tenant1, self.user1_ctx = self._user_fixture()
self.tenant2, self.user2_ctx = self._user_fixture()
self.tenant3, self.user3_ctx = self._user_fixture()
self.admin_tenant, self.admin_ctx = self._user_fixture(admin=True)
def _user_fixture(self, admin=False):
tenant_id = str(uuid.uuid4())
ctx = context.RequestContext(tenant=tenant_id, is_admin=admin)
return tenant_id, ctx
def _create_images(self):
self.image_ids = {}
for owner in [self.owner1, self.owner2]:
self._create_image('not_shared', owner)
self._create_image('shared-with-1', owner, members=[self.tenant1])
self._create_image('shared-with-2', owner, members=[self.tenant2])
self._create_image('shared-with-both', owner,
members=[self.tenant1, self.tenant2])
def _create_image(self, name, owner, members=None):
image = build_image_fixture(name=name, owner=owner,
visibility='shared')
self.image_ids[(owner, name)] = image['id']
self.db_api.image_create(self.admin_ctx, image)
for member in members or []:
member = {'image_id': image['id'], 'member': member}
self.db_api.image_member_create(self.admin_ctx, member)
class MembershipVisibilityTests(object):
def _check_by_member(self, ctx, member_id, expected):
members = self.db_api.image_member_find(ctx, member=member_id)
images = [self.db_api.image_get(self.admin_ctx, member['image_id'])
for member in members]
facets = [(image['owner'], image['name']) for image in images]
self.assertEqual(set(expected), set(facets))
def test_owner1_finding_user1_memberships(self):
"""Owner1 should see images it owns that are shared with User1."""
expected = [
(self.owner1, 'shared-with-1'),
(self.owner1, 'shared-with-both'),
]
self._check_by_member(self.owner1_ctx, self.tenant1, expected)
def test_user1_finding_user1_memberships(self):
"""User1 should see all images shared with User1 """
expected = [
(self.owner1, 'shared-with-1'),
(self.owner1, 'shared-with-both'),
(self.owner2, 'shared-with-1'),
(self.owner2, 'shared-with-both'),
]
self._check_by_member(self.user1_ctx, self.tenant1, expected)
def test_user2_finding_user1_memberships(self):
"""User2 should see no images shared with User1 """
expected = []
self._check_by_member(self.user2_ctx, self.tenant1, expected)
def test_admin_finding_user1_memberships(self):
"""Admin should see all images shared with User1 """
expected = [
(self.owner1, 'shared-with-1'),
(self.owner1, 'shared-with-both'),
(self.owner2, 'shared-with-1'),
(self.owner2, 'shared-with-both'),
]
self._check_by_member(self.admin_ctx, self.tenant1, expected)
def _check_by_image(self, context, image_id, expected):
members = self.db_api.image_member_find(context, image_id=image_id)
member_ids = [member['member'] for member in members]
self.assertEqual(set(expected), set(member_ids))
def test_owner1_finding_owner1s_image_members(self):
"""Owner1 should see all memberships of its image """
expected = [self.tenant1, self.tenant2]
image_id = self.image_ids[(self.owner1, 'shared-with-both')]
self._check_by_image(self.owner1_ctx, image_id, expected)
def test_admin_finding_owner1s_image_members(self):
"""Admin should see all memberships of owner1's image """
expected = [self.tenant1, self.tenant2]
image_id = self.image_ids[(self.owner1, 'shared-with-both')]
self._check_by_image(self.admin_ctx, image_id, expected)
def test_user1_finding_owner1s_image_members(self):
"""User1 should see its own membership of owner1's image """
expected = [self.tenant1]
image_id = self.image_ids[(self.owner1, 'shared-with-both')]
self._check_by_image(self.user1_ctx, image_id, expected)
def test_user2_finding_owner1s_image_members(self):
"""User2 should see its own membership of owner1's image """
expected = [self.tenant2]
image_id = self.image_ids[(self.owner1, 'shared-with-both')]
self._check_by_image(self.user2_ctx, image_id, expected)
def test_user3_finding_owner1s_image_members(self):
"""User3 should see no memberships of owner1's image """
expected = []
image_id = self.image_ids[(self.owner1, 'shared-with-both')]
self._check_by_image(self.user3_ctx, image_id, expected)