Clean up is_public filtering in image_get_all

Prior to this patch, is_public filtering was applied in a
counterintuitive and potentially dangerous manner. In particular,
is_public=True would return images where is_public=False. Also, in a
variety of cases (that weren't exposed in the http api) users could list
private images that they do not own. This patch fixes those problems by
making is_public filtering work the same as any other filter and adding
separate visibility restrictions that always apply to list queries. To
preserve the existing behavior of the v1 api where admins by default do
not see all private images, admin contexts are deescalated to normal
contexts in certain situations.

With this change, pep8 ignores E712 because it is normal to use
"column == True" in sqlalchemy.

Fixes bug 1060481.
Fixes bug 1061331.

Change-Id: I086bd9273e337ebe184902b6f12bc8c9a7fc5867
This commit is contained in:
Mark Washenberger 2012-10-05 21:01:01 +00:00 committed by Brian Waldon
parent 8fdc05aa7d
commit 54607f764d
8 changed files with 198 additions and 67 deletions

View File

@ -113,10 +113,7 @@ class ImagesController(object):
sort_dir='desc', filters={}):
self._enforce(req, 'get_images')
filters['deleted'] = False
#NOTE(bcwaldon): is_public=True gets public images and those
# owned by the authenticated tenant
result = {}
filters.setdefault('is_public', True)
if limit is None:
limit = CONF.limit_param_default
limit = min(CONF.api_limit_max, limit)

View File

@ -119,9 +119,15 @@ def _filter_images(images, filters, context):
prop_filter = filters.pop('properties')
filters.update(prop_filter)
is_public_filter = filters.pop('is_public', None)
if 'is_public' in filters and filters['is_public'] is None:
filters.pop('is_public')
for i, image in enumerate(images):
has_ownership = context.owner and image['owner'] == context.owner
can_see = image['is_public'] or has_ownership or context.is_admin
if not can_see:
continue
add = True
for k, value in filters.iteritems():
key = k
@ -152,22 +158,6 @@ def _filter_images(images, filters, context):
if add:
filtered_images.append(image)
#NOTE(bcwaldon): This hacky code is only here to match what the
# sqlalchemy driver wants - refactor it out of the db layer when
# it seems appropriate
if is_public_filter is not None:
def _filter_ownership(image):
has_ownership = image['owner'] == context.owner
can_see = image['is_public'] or has_ownership or context.is_admin
if can_see:
return (has_ownership or
(can_see and image['is_public'] == is_public_filter))
else:
return False
filtered_images = filter(_filter_ownership, filtered_images)
return filtered_images

View File

@ -446,19 +446,21 @@ def image_get_all(context, filters=None, marker=None, limit=None,
query = session.query(models.Image)\
.options(sa_orm.joinedload(models.Image.properties))
if 'is_public' in filters and filters['is_public'] is not None:
the_filter = [models.Image.is_public == filters['is_public']]
if filters['is_public'] and context.owner is not None:
the_filter.extend([
(models.Image.owner == context.owner),
models.Image.members.any(member=context.owner, deleted=False)
])
if len(the_filter) > 1:
query = query.filter(sa_sql.or_(*the_filter))
else:
query = query.filter(the_filter[0])
# NOTE(markwash) treat is_public=None as if it weren't filtered
if 'is_public' in filters and filters['is_public'] is None:
del filters['is_public']
if not context.is_admin:
visibility_filters = [models.Image.is_public == True]
if context.owner is not None:
visibility_filters.extend([
models.Image.owner == context.owner,
models.Image.members.any(member=context.owner, deleted=False),
])
query = query.filter(sa_sql.or_(*visibility_filters))
showing_deleted = False
if 'changes-since' in filters:
# normalize timestamp to UTC, as sqlalchemy doesn't appear to

View File

@ -56,12 +56,19 @@ class Controller(object):
self.db_api = glance.db.get_api()
self.db_api.configure_db()
def _get_images(self, context, **params):
def _get_images(self, context, filters, **params):
"""
Get images, wrapping in exception if necessary.
"""
# NOTE(markwash): for backwards compatibility, is_public=True for
# admins actually means "treat me as if I'm not an admin and show me
# all my images"
if context.is_admin and filters.get('is_public') is True:
context.is_admin = False
del filters['is_public']
try:
return self.db_api.image_get_all(context, **params)
return self.db_api.image_get_all(context, filters=filters,
**params)
except exception.NotFound, e:
msg = _("Invalid marker. Image could not be found.")
raise exc.HTTPBadRequest(explanation=msg)
@ -152,8 +159,6 @@ class Controller(object):
if req.context.is_admin:
# Only admin gets to look for non-public images
filters['is_public'] = self._get_is_public(req)
else:
filters['is_public'] = True
for param in req.params:
if param in SUPPORTED_FILTERS:

View File

@ -369,10 +369,7 @@ class TestDriver(base.IsolatedUnitTest):
'status': 'queued',
'owner': TENANT2})
# NOTE(bcwaldon): the is_public=True flag indicates that you want
# to get all images that are public AND those that are owned by the
# calling context
images = self.db_api.image_get_all(ctxt1, filters={'is_public': True})
images = self.db_api.image_get_all(ctxt1)
image_ids = [image['id'] for image in images]
expected = [UUIDX, UUID3, UUID2, UUID1]
self.assertEqual(sorted(expected), sorted(image_ids))
@ -528,3 +525,151 @@ class TestDriver(base.IsolatedUnitTest):
self.assertEqual(1, len(self.db_api.image_member_find(self.context)))
member = self.db_api.image_member_delete(self.context, member['id'])
self.assertEqual(0, len(self.db_api.image_member_find(self.context)))
class TestVisibility(base.IsolatedUnitTest):
def setUp(self):
super(TestVisibility, self).setUp()
self.db_api = db_tests.get_db(self.config)
db_tests.reset_db(self.db_api)
self.setup_tenants()
self.setup_contexts()
self.fixtures = self.build_image_fixtures()
self.create_images(self.fixtures)
def setup_tenants(self):
self.admin_tenant = utils.generate_uuid()
self.tenant1 = utils.generate_uuid()
self.tenant2 = utils.generate_uuid()
def setup_contexts(self):
self.admin_context = context.RequestContext(
is_admin=True, tenant=self.admin_tenant)
self.admin_none_context = context.RequestContext(
is_admin=True, tenant=None)
self.tenant1_context = context.RequestContext(tenant=self.tenant1)
self.tenant2_context = context.RequestContext(tenant=self.tenant2)
self.none_context = context.RequestContext(tenant=None)
def build_image_fixtures(self):
fixtures = []
owners = {
'Unowned': None,
'Admin Tenant': self.admin_tenant,
'Tenant 1': self.tenant1,
'Tenant 2': self.tenant2,
}
visibilities = {'public': True, 'private': False}
for owner_label, owner in owners.items():
for visibility, is_public in visibilities.items():
fixture = {
'name': '%s, %s' % (owner_label, visibility),
'owner': owner,
'is_public': is_public,
}
fixtures.append(fixture)
return [build_image_fixture(**fixture) for fixture in fixtures]
def create_images(self, images):
for fixture in images:
self.db_api.image_create(self.admin_context, fixture)
def test_unknown_admin_sees_all(self):
images = self.db_api.image_get_all(self.admin_none_context)
self.assertEquals(len(images), 8)
def test_unknown_admin_is_public_true(self):
images = self.db_api.image_get_all(self.admin_none_context,
filters={'is_public': True})
self.assertEquals(len(images), 4)
for i in images:
self.assertTrue(i['is_public'])
def test_unknown_admin_is_public_false(self):
images = self.db_api.image_get_all(self.admin_none_context,
filters={'is_public': False})
self.assertEquals(len(images), 4)
for i in images:
self.assertFalse(i['is_public'])
def test_unknown_admin_is_public_none(self):
images = self.db_api.image_get_all(self.admin_none_context,
filters={'is_public': None})
self.assertEquals(len(images), 8)
def test_known_admin_sees_all(self):
images = self.db_api.image_get_all(self.admin_context)
self.assertEquals(len(images), 8)
def test_known_admin_is_public_true(self):
images = self.db_api.image_get_all(self.admin_context,
filters={'is_public': True})
self.assertEquals(len(images), 4)
for i in images:
self.assertTrue(i['is_public'])
def test_known_admin_is_public_false(self):
images = self.db_api.image_get_all(self.admin_context,
filters={'is_public': False})
self.assertEquals(len(images), 4)
for i in images:
self.assertFalse(i['is_public'])
def test_known_admin_is_public_none(self):
images = self.db_api.image_get_all(self.admin_context,
filters={'is_public': None})
self.assertEquals(len(images), 8)
def test_what_unknown_user_sees(self):
images = self.db_api.image_get_all(self.none_context)
self.assertEquals(len(images), 4)
for i in images:
self.assertTrue(i['is_public'])
def test_unknown_user_is_public_true(self):
images = self.db_api.image_get_all(self.none_context,
filters={'is_public': True})
self.assertEquals(len(images), 4)
for i in images:
self.assertTrue(i['is_public'])
def test_unknown_user_is_public_false(self):
images = self.db_api.image_get_all(self.none_context,
filters={'is_public': False})
self.assertEquals(len(images), 0)
def test_unknown_user_is_public_none(self):
images = self.db_api.image_get_all(self.none_context,
filters={'is_public': None})
self.assertEquals(len(images), 4)
for i in images:
self.assertTrue(i['is_public'])
def test_what_tenant1_sees(self):
images = self.db_api.image_get_all(self.tenant1_context)
self.assertEquals(len(images), 5)
for i in images:
if not i['is_public']:
self.assertEquals(i['owner'], self.tenant1)
def test_tenant1_is_public_true(self):
images = self.db_api.image_get_all(self.tenant1_context,
filters={'is_public': True})
self.assertEquals(len(images), 4)
for i in images:
self.assertTrue(i['is_public'])
def test_tenant1_is_public_false(self):
images = self.db_api.image_get_all(self.tenant1_context,
filters={'is_public': False})
self.assertEquals(len(images), 1)
self.assertFalse(images[0]['is_public'])
self.assertEquals(images[0]['owner'], self.tenant1)
def test_tenant1_is_public_none(self):
images = self.db_api.image_get_all(self.tenant1_context,
filters={'is_public': None})
self.assertEquals(len(images), 5)
for i in images:
if not i['is_public']:
self.assertEquals(i['owner'], self.tenant1)

View File

@ -490,18 +490,18 @@ class TestImages(functional.FunctionalTest):
self.assertTrue(image['visibility'] == 'public'
or 'tenant1' in image['name'])
# 2. Known user, visibility=public, sees public and their images
# 2. Known user, visibility=public, sees all public images
images = list_images('tenant1', visibility='public')
self.assertEquals(len(images), 5)
for image in images:
self.assertTrue(image['visibility'] == 'public'
or 'tenant1' in image['name'])
# 3. Known user, visibility=private, sees all private images ?
images = list_images('tenant1', visibility='private')
self.assertEquals(len(images), 4)
for image in images:
self.assertEquals(image['visibility'], 'private')
self.assertEquals(image['visibility'], 'public')
# 3. Known user, visibility=private, sees only their private image
images = list_images('tenant1', visibility='private')
self.assertEquals(len(images), 1)
image = images[0]
self.assertEquals(image['visibility'], 'private')
self.assertTrue('tenant1' in image['name'])
# 4. Unknown user sees only public images
images = list_images('none')
@ -515,17 +515,13 @@ class TestImages(functional.FunctionalTest):
for image in images:
self.assertEquals(image['visibility'], 'public')
# 6. Unknown user, visibility=private, sees all private images ?
# 6. Unknown user, visibility=private, sees no images
images = list_images('none', visibility='private')
self.assertEquals(len(images), 4)
for image in images:
self.assertEquals(image['visibility'], 'private')
self.assertEquals(len(images), 0)
# 7. Unknown admin sees only public images
# 7. Unknown admin sees all images
images = list_images('none', role='admin')
self.assertEquals(len(images), 4)
for image in images:
self.assertEquals(image['visibility'], 'public')
self.assertEquals(len(images), 8)
# 8. Unknown admin, visibility=public, shows only public images
images = list_images('none', role='admin', visibility='public')
@ -539,19 +535,15 @@ class TestImages(functional.FunctionalTest):
for image in images:
self.assertEquals(image['visibility'], 'private')
# 10. Known admin sees public and their own images
# 10. Known admin sees all images
images = list_images('admin', role='admin')
self.assertEquals(len(images), 5)
for image in images:
self.assertTrue(image['visibility'] == 'public'
or 'admin' in image['name'])
self.assertEquals(len(images), 8)
# 11. Known admin, visibility=public, sees all public and their images
# 11. Known admin, visibility=public, sees all public images
images = list_images('admin', role='admin', visibility='public')
self.assertEquals(len(images), 5)
self.assertEquals(len(images), 4)
for image in images:
self.assertTrue(image['visibility'] == 'public'
or 'admin' in image['name'])
self.assertEqual(image['visibility'], 'public')
# 12. Known admin, visibility=private, sees all private images
images = list_images('admin', role='admin', visibility='private')

View File

@ -200,7 +200,7 @@ class TestImagesController(test_utils.BaseTestCase):
image = _fixture(utils.generate_uuid(), is_public=False, owner=TENANT3)
self.db.image_create(None, image)
path = '/images?visibility=private'
request = unit_test_utils.get_fake_request(path)
request = unit_test_utils.get_fake_request(path, is_admin=True)
output = self.controller.index(request, filters={'is_public': False})
self.assertEqual(2, len(output['images']))

View File

@ -18,7 +18,7 @@ downloadcache = ~/cache/pip
[testenv:pep8]
deps = pep8==1.3.3
commands = pep8 --ignore=E125,E126,E711 --repeat --show-source --exclude=.venv,.tox,dist,doc,openstack . bin/glance-control
commands = pep8 --ignore=E125,E126,E711,E712 --repeat --show-source --exclude=.venv,.tox,dist,doc,openstack . bin/glance-control
[testenv:cover]
setenv = NOSE_WITH_COVERAGE=1