Restrict users from downloading protected image

Added new rule in policy.json and applied that rule to
'download_image' policy.

For example,
"restricted": "not ('test_key':(test_key)s and role:_member_)"
"download_image": "role:admin or rule:restricted"

So if 'download_image' policy is enforced then in above case only admin or
user who satisfies rule 'restricted' will able to download image. Other users
will not be able to download the image and will get 403 Forbidden response.

In addition, delete property access should be restricted for other users
so that they will not be able to delete the property of the image.

[test_key]
create = admin,member
read = admin,member,_member_
update = admin,member
delete = admin,member

Added new method to create dictionary-like mashup of image core and custom
properties.
Modified v1 and v2 api to add download restriction.
Modified logic of caching to restrict download for v1 and v2 api.

DocImpact:
Need to add new rule in policy.json
"restricted": "not ('test_key':%(test_key)s and role:_member_)"

blueprint: restrict-downloading-images-protected-properties

Change-Id: I05bad0441952150bd15b831ac1b1a0bb9ae79c74
This commit is contained in:
Abhishek Kekane 2014-06-04 13:55:06 +00:00
parent 516dbc346c
commit 0656386e99
10 changed files with 1028 additions and 93 deletions

View File

@ -67,7 +67,7 @@ class CacheFilter(wsgi.Middleware):
"""
# NOTE: admins can see image metadata in the v1 API, but shouldn't
# be able to download the actual image data.
if image_meta['deleted']:
if image_meta['status'] == 'deleted' and image_meta['deleted']:
raise exception.NotFound()
if not image_meta['size']:
@ -95,13 +95,45 @@ class CacheFilter(wsgi.Middleware):
else:
return (version, method, image_id)
def _enforce(self, req, action):
def _enforce(self, req, action, target=None):
"""Authorize an action against our policies"""
if target is None:
target = {}
try:
self.policy.enforce(req.context, action, {})
self.policy.enforce(req.context, action, target)
except exception.Forbidden as e:
raise webob.exc.HTTPForbidden(explanation=e.msg, request=req)
def _get_v1_image_metadata(self, request, image_id):
"""
Retrieves image metadata using registry for v1 api and creates
dictionary-like mash-up of image core and custom properties.
"""
try:
image_metadata = registry.get_image_metadata(request.context,
image_id)
return utils.create_mashup_dict(image_metadata)
except exception.NotFound as e:
LOG.debug("No metadata found for image '%s'" % image_id)
raise webob.exc.HTTPNotFound(explanation=e.msg, request=request)
def _get_v2_image_metadata(self, request, image_id):
"""
Retrieves image and for v2 api and creates adapter like object
to access image core or custom properties on request.
"""
db_api = glance.db.get_api()
image_repo = glance.db.ImageRepo(request.context, db_api)
try:
image = image_repo.get(image_id)
# Storing image object in request as it is required in
# _process_v2_request call.
request.environ['api.cache.image'] = image
return policy.ImageTarget(image)
except exception.NotFound as e:
raise webob.exc.HTTPNotFound(explanation=e.msg, request=request)
def process_request(self, request):
"""
For requests for an image file, we check the local image
@ -116,14 +148,15 @@ class CacheFilter(wsgi.Middleware):
# Trying to unpack None raises this exception
return None
self._stash_request_info(request, image_id, method)
self._stash_request_info(request, image_id, method, version)
if request.method != 'GET' or not self.cache.is_cached(image_id):
return None
method = getattr(self, '_get_%s_image_metadata' % version)
image_metadata = method(request, image_id)
try:
self._enforce(request, 'download_image')
except webob.exc.HTTPForbidden:
self._enforce(request, 'download_image', target=image_metadata)
except exception.Forbidden:
return None
LOG.debug("Cache hit for image '%s'", image_id)
@ -131,7 +164,7 @@ class CacheFilter(wsgi.Middleware):
method = getattr(self, '_process_%s_request' % version)
try:
return method(request, image_id, image_iterator)
return method(request, image_id, image_iterator, image_metadata)
except exception.NotFound:
msg = _LE("Image cache contained image file for image '%s', "
"however the registry did not contain metadata for "
@ -140,29 +173,31 @@ class CacheFilter(wsgi.Middleware):
self.cache.delete_cached_image(image_id)
@staticmethod
def _stash_request_info(request, image_id, method):
def _stash_request_info(request, image_id, method, version):
"""
Preserve the image id and request method for later retrieval
Preserve the image id, version and request method for later retrieval
"""
request.environ['api.cache.image_id'] = image_id
request.environ['api.cache.method'] = method
request.environ['api.cache.version'] = version
@staticmethod
def _fetch_request_info(request):
"""
Preserve the cached image id for consumption by the
Preserve the cached image id, version for consumption by the
process_response method of this middleware
"""
try:
image_id = request.environ['api.cache.image_id']
method = request.environ['api.cache.method']
version = request.environ['api.cache.version']
except KeyError:
return None
else:
return (image_id, method)
return (image_id, method, version)
def _process_v1_request(self, request, image_id, image_iterator):
image_meta = registry.get_image_metadata(request.context, image_id)
def _process_v1_request(self, request, image_id, image_iterator,
image_meta):
# Don't display location
if 'location' in image_meta:
del image_meta['location']
@ -176,16 +211,14 @@ class CacheFilter(wsgi.Middleware):
}
return self.serializer.show(response, raw_response)
def _process_v2_request(self, request, image_id, image_iterator):
def _process_v2_request(self, request, image_id, image_iterator,
image_meta):
# We do some contortions to get the image_metadata so
# that we can provide it to 'size_checked_iter' which
# will generate a notification.
# TODO(mclaren): Make notification happen more
# naturally once caching is part of the domain model.
db_api = glance.db.get_api()
image_repo = glance.db.ImageRepo(request.context, db_api)
image = image_repo.get(image_id)
image_meta = glance.notifier.format_image_notification(image)
image = request.environ['api.cache.image']
self._verify_metadata(image_meta)
response = webob.Response(request=request)
response.app_iter = size_checked_iter(response, image_meta,
@ -217,7 +250,8 @@ class CacheFilter(wsgi.Middleware):
return resp
try:
(image_id, method) = self._fetch_request_info(resp.request)
(image_id, method, version) = self._fetch_request_info(
resp.request)
except TypeError:
return resp
@ -235,17 +269,16 @@ class CacheFilter(wsgi.Middleware):
# Nothing to do here, move along
return resp
else:
return process_response_method(resp, image_id)
return process_response_method(resp, image_id, version=version)
def _process_DELETE_response(self, resp, image_id):
def _process_DELETE_response(self, resp, image_id, version=None):
if self.cache.is_cached(image_id):
LOG.debug("Removing image %s from cache", image_id)
self.cache.delete_cached_image(image_id)
return resp
def _process_GET_response(self, resp, image_id):
def _process_GET_response(self, resp, image_id, version=None):
image_checksum = resp.headers.get('Content-MD5')
if not image_checksum:
# API V1 stores the checksum in a different header:
image_checksum = resp.headers.get('x-image-meta-checksum')
@ -253,12 +286,17 @@ class CacheFilter(wsgi.Middleware):
if not image_checksum:
LOG.error(_LE("Checksum header is missing."))
# fetch image_meta on the basis of version
image_metadata = None
if version:
method = getattr(self, '_get_%s_image_metadata' % version)
image_metadata = method(resp.request, image_id)
# NOTE(zhiyan): image_cache return a generator object and set to
# response.app_iter, it will be called by eventlet.wsgi later.
# So we need enforce policy firstly but do it by application
# since eventlet.wsgi could not catch webob.exc.HTTPForbidden and
# return 403 error to client then.
self._enforce(resp.request, 'download_image')
self._enforce(resp.request, 'download_image', target=image_metadata)
resp.app_iter = self.cache.get_caching_iter(image_id, image_checksum,
resp.app_iter)

View File

@ -230,7 +230,9 @@ class ImageProxy(glance.domain.proxy.Image):
return self.image.delete()
def get_data(self, *args, **kwargs):
self.policy.enforce(self.context, 'download_image', {})
target = ImageTarget(self.image)
self.policy.enforce(self.context, 'download_image',
target=target)
return self.image.get_data(*args, **kwargs)
def set_data(self, *args, **kwargs):
@ -416,3 +418,31 @@ class TaskFactoryProxy(glance.domain.proxy.TaskFactory):
task_factory,
task_proxy_class=TaskProxy,
task_proxy_kwargs=proxy_kwargs)
class ImageTarget(object):
def __init__(self, image):
"""
Initialize the object
:param image: Image object
"""
self.image = image
def __getitem__(self, key):
"""
Returns the value of 'key' from the image if image has that attribute
else tries to retrieve value from the extra_properties of image.
:param key: value to retrieve
"""
# Need to change the key 'id' to 'image_id' as Image object has
# attribute as 'image_id' in case of V2.
if key == 'id':
key = 'image_id'
if hasattr(self.image, key):
return getattr(self.image, key)
else:
return self.image.extra_properties[key]

View File

@ -148,10 +148,12 @@ class Controller(controller.BaseController):
else:
self.prop_enforcer = None
def _enforce(self, req, action):
def _enforce(self, req, action, target=None):
"""Authorize an action against our policies"""
if target is None:
target = {}
try:
self.policy.enforce(req.context, action, {})
self.policy.enforce(req.context, action, target)
except exception.Forbidden:
raise HTTPForbidden()
@ -466,9 +468,20 @@ class Controller(controller.BaseController):
:raises HTTPNotFound if image is not available to user
"""
self._enforce(req, 'get_image')
self._enforce(req, 'download_image')
image_meta = self.get_active_image_meta_or_404(req, id)
try:
image_meta = self.get_active_image_meta_or_404(req, id)
except HTTPNotFound:
# provision for backward-compatibility breaking issue
# catch the 404 exception and raise it after enforcing
# the policy
with excutils.save_and_reraise_exception():
self._enforce(req, 'download_image')
else:
target = utils.create_mashup_dict(image_meta)
self._enforce(req, 'download_image', target=target)
self._enforce_read_protected_props(image_meta, req)

View File

@ -270,6 +270,27 @@ def get_image_meta_from_headers(response):
return result
def create_mashup_dict(image_meta):
"""
Returns a dictionary-like mashup of the image core properties
and the image custom properties from given image metadata.
:param image_meta: metadata of image with core and custom properties
"""
def get_items():
for key, value in six.iteritems(image_meta):
if isinstance(value, dict):
for subkey, subvalue in six.iteritems(
create_mashup_dict(value)):
if subkey not in image_meta:
yield subkey, subvalue
else:
yield key, value
return dict(get_items())
def safe_mkdirs(path):
try:
os.makedirs(path)

View File

@ -557,3 +557,159 @@ class TestApi(functional.FunctionalTest):
self.assertEqual('GET', response.get('allow'))
self.stop_servers()
def test_download_non_exists_image_raises_http_forbidden(self):
"""
We test the following sequential series of actions:
0. POST /images with public image named Image1
and no custom properties
- Verify 201 returned
1. HEAD image
- Verify HTTP headers have correct information we just added
2. GET image
- Verify all information on image we just added is correct
3. DELETE image1
- Delete the newly added image
4. GET image
- Verify that 403 HTTPForbidden exception is raised prior to
404 HTTPNotFound
"""
self.cleanup()
self.start_servers(**self.__dict__.copy())
image_data = "*" * FIVE_KB
headers = minimal_headers('Image1')
path = "http://%s:%d/v1/images" % ("127.0.0.1", self.api_port)
http = httplib2.Http()
response, content = http.request(path, 'POST', headers=headers,
body=image_data)
self.assertEqual(response.status, 201)
data = jsonutils.loads(content)
image_id = data['image']['id']
self.assertEqual(data['image']['checksum'],
hashlib.md5(image_data).hexdigest())
self.assertEqual(data['image']['size'], FIVE_KB)
self.assertEqual(data['image']['name'], "Image1")
self.assertEqual(data['image']['is_public'], True)
# 1. HEAD image
# Verify image found now
path = "http://%s:%d/v1/images/%s" % ("127.0.0.1", self.api_port,
image_id)
http = httplib2.Http()
response, content = http.request(path, 'HEAD')
self.assertEqual(response.status, 200)
self.assertEqual(response['x-image-meta-name'], "Image1")
# 2. GET /images
# Verify one public image
path = "http://%s:%d/v1/images" % ("127.0.0.1", self.api_port)
http = httplib2.Http()
response, content = http.request(path, 'GET')
self.assertEqual(response.status, 200)
expected_result = {"images": [
{"container_format": "ovf",
"disk_format": "raw",
"id": image_id,
"name": "Image1",
"checksum": "c2e5db72bd7fd153f53ede5da5a06de3",
"size": 5120}]}
self.assertEqual(jsonutils.loads(content), expected_result)
# 3. DELETE image1
path = "http://%s:%d/v1/images/%s" % ("127.0.0.1", self.api_port,
image_id)
http = httplib2.Http()
response, content = http.request(path, 'DELETE')
self.assertEqual(response.status, 200)
# 4. GET image
# Verify that 403 HTTPForbidden exception is raised prior to
# 404 HTTPNotFound
rules = {"download_image": '!'}
self.set_policy_rules(rules)
path = "http://%s:%d/v1/images/%s" % ("127.0.0.1", self.api_port,
image_id)
http = httplib2.Http()
response, content = http.request(path, 'GET')
self.assertEqual(response.status, 403)
self.stop_servers()
def test_download_non_exists_image_raises_http_not_found(self):
"""
We test the following sequential series of actions:
0. POST /images with public image named Image1
and no custom properties
- Verify 201 returned
1. HEAD image
- Verify HTTP headers have correct information we just added
2. GET image
- Verify all information on image we just added is correct
3. DELETE image1
- Delete the newly added image
4. GET image
- Verify that 404 HTTPNotFound exception is raised
"""
self.cleanup()
self.start_servers(**self.__dict__.copy())
image_data = "*" * FIVE_KB
headers = minimal_headers('Image1')
path = "http://%s:%d/v1/images" % ("127.0.0.1", self.api_port)
http = httplib2.Http()
response, content = http.request(path, 'POST', headers=headers,
body=image_data)
self.assertEqual(response.status, 201)
data = jsonutils.loads(content)
image_id = data['image']['id']
self.assertEqual(data['image']['checksum'],
hashlib.md5(image_data).hexdigest())
self.assertEqual(data['image']['size'], FIVE_KB)
self.assertEqual(data['image']['name'], "Image1")
self.assertEqual(data['image']['is_public'], True)
# 1. HEAD image
# Verify image found now
path = "http://%s:%d/v1/images/%s" % ("127.0.0.1", self.api_port,
image_id)
http = httplib2.Http()
response, content = http.request(path, 'HEAD')
self.assertEqual(response.status, 200)
self.assertEqual(response['x-image-meta-name'], "Image1")
# 2. GET /images
# Verify one public image
path = "http://%s:%d/v1/images" % ("127.0.0.1", self.api_port)
http = httplib2.Http()
response, content = http.request(path, 'GET')
self.assertEqual(response.status, 200)
expected_result = {"images": [
{"container_format": "ovf",
"disk_format": "raw",
"id": image_id,
"name": "Image1",
"checksum": "c2e5db72bd7fd153f53ede5da5a06de3",
"size": 5120}]}
self.assertEqual(jsonutils.loads(content), expected_result)
# 3. DELETE image1
path = "http://%s:%d/v1/images/%s" % ("127.0.0.1", self.api_port,
image_id)
http = httplib2.Http()
response, content = http.request(path, 'DELETE')
self.assertEqual(response.status, 200)
# 4. GET image
# Verify that 404 HTTPNotFound exception is raised
path = "http://%s:%d/v1/images/%s" % ("127.0.0.1", self.api_port,
image_id)
http = httplib2.Http()
response, content = http.request(path, 'GET')
self.assertEqual(response.status, 404)
self.stop_servers()

View File

@ -520,6 +520,138 @@ class TestImages(functional.FunctionalTest):
self.stop_servers()
def test_download_image_not_allowed_using_restricted_policy(self):
rules = {
"context_is_admin": "role:admin",
"default": "",
"restricted":
"not ('aki':%(container_format)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
self.start_servers(**self.__dict__.copy())
# Create an image
path = self._url('/v2/images')
headers = self._headers({'content-type': 'application/json',
'X-Roles': 'member'})
data = jsonutils.dumps({'name': 'image-1', 'disk_format': 'aki',
'container_format': 'aki'})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(201, response.status_code)
# Returned image entity
image = jsonutils.loads(response.text)
image_id = image['id']
expected_image = {
'status': 'queued',
'name': 'image-1',
'tags': [],
'visibility': 'private',
'self': '/v2/images/%s' % image_id,
'protected': False,
'file': '/v2/images/%s/file' % image_id,
'min_disk': 0,
'min_ram': 0,
'schema': '/v2/schemas/image',
}
for key, value in six.iteritems(expected_image):
self.assertEqual(image[key], value, key)
# Upload data to image
path = self._url('/v2/images/%s/file' % image_id)
headers = self._headers({'Content-Type': 'application/octet-stream'})
response = requests.put(path, headers=headers, data='ZZZZZ')
self.assertEqual(204, response.status_code)
# Get an image should fail
path = self._url('/v2/images/%s/file' % image_id)
headers = self._headers({'Content-Type': 'application/octet-stream',
'X-Roles': '_member_'})
response = requests.get(path, headers=headers)
self.assertEqual(403, response.status_code)
# Image Deletion should work
path = self._url('/v2/images/%s' % image_id)
response = requests.delete(path, headers=self._headers())
self.assertEqual(204, response.status_code)
# This image should be no longer be directly accessible
path = self._url('/v2/images/%s' % image_id)
response = requests.get(path, headers=self._headers())
self.assertEqual(404, response.status_code)
self.stop_servers()
def test_download_image_allowed_using_restricted_policy(self):
rules = {
"context_is_admin": "role:admin",
"default": "",
"restricted":
"not ('aki':%(container_format)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
self.start_servers(**self.__dict__.copy())
# Create an image
path = self._url('/v2/images')
headers = self._headers({'content-type': 'application/json',
'X-Roles': 'member'})
data = jsonutils.dumps({'name': 'image-1', 'disk_format': 'aki',
'container_format': 'aki'})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(201, response.status_code)
# Returned image entity
image = jsonutils.loads(response.text)
image_id = image['id']
expected_image = {
'status': 'queued',
'name': 'image-1',
'tags': [],
'visibility': 'private',
'self': '/v2/images/%s' % image_id,
'protected': False,
'file': '/v2/images/%s/file' % image_id,
'min_disk': 0,
'min_ram': 0,
'schema': '/v2/schemas/image',
}
for key, value in six.iteritems(expected_image):
self.assertEqual(image[key], value, key)
# Upload data to image
path = self._url('/v2/images/%s/file' % image_id)
headers = self._headers({'Content-Type': 'application/octet-stream'})
response = requests.put(path, headers=headers, data='ZZZZZ')
self.assertEqual(204, response.status_code)
# Get an image should be allowed
path = self._url('/v2/images/%s/file' % image_id)
headers = self._headers({'Content-Type': 'application/octet-stream',
'X-Roles': 'member'})
response = requests.get(path, headers=headers)
self.assertEqual(200, response.status_code)
# Image Deletion should work
path = self._url('/v2/images/%s' % image_id)
response = requests.delete(path, headers=self._headers())
self.assertEqual(204, response.status_code)
# This image should be no longer be directly accessible
path = self._url('/v2/images/%s' % image_id)
response = requests.get(path, headers=self._headers())
self.assertEqual(404, response.status_code)
self.stop_servers()
def test_image_size_cap(self):
self.api_server.image_size_cap = 128
self.start_servers(**self.__dict__.copy())

View File

@ -145,6 +145,61 @@ class TestUtils(test_utils.BaseTestCase):
self.assertEqual({'x-image-meta-property-test': u'test'},
actual_test2)
def test_create_mashup_dict_with_different_core_custom_properties(self):
image_meta = {
'id': 'test-123',
'name': 'fake_image',
'status': 'active',
'created_at': '',
'min_disk': '10G',
'min_ram': '1024M',
'protected': False,
'locations': '',
'checksum': 'c1234',
'owner': '',
'disk_format': 'raw',
'container_format': 'bare',
'size': '123456789',
'virtual_size': '123456789',
'is_public': 'public',
'deleted': True,
'updated_at': '',
'properties': {'test_key': 'test_1234'},
}
mashup_dict = utils.create_mashup_dict(image_meta)
self.assertFalse('properties' in mashup_dict)
self.assertEqual(image_meta['properties']['test_key'],
mashup_dict['test_key'])
def test_create_mashup_dict_with_same_core_custom_properties(self):
image_meta = {
'id': 'test-123',
'name': 'fake_image',
'status': 'active',
'created_at': '',
'min_disk': '10G',
'min_ram': '1024M',
'protected': False,
'locations': '',
'checksum': 'c1234',
'owner': '',
'disk_format': 'raw',
'container_format': 'bare',
'size': '123456789',
'virtual_size': '123456789',
'is_public': 'public',
'deleted': True,
'updated_at': '',
'properties': {'min_ram': '2048M'},
}
mashup_dict = utils.create_mashup_dict(image_meta)
self.assertFalse('properties' in mashup_dict)
self.assertNotEqual(image_meta['properties']['min_ram'],
mashup_dict['min_ram'])
self.assertEqual(image_meta['min_ram'], mashup_dict['min_ram'])
def test_create_pretty_table(self):
class MyPrettyTable(utils.PrettyTable):
def __init__(self):

View File

@ -19,12 +19,21 @@ import webob
import glance.api.middleware.cache
from glance.common import exception
from glance import context
import glance.db.sqlalchemy.api as db
import glance.registry.client.v1.api as registry
from glance.tests.unit import base
from glance.tests.unit import utils as unit_test_utils
class ImageStub(object):
def __init__(self, image_id, extra_properties={}, visibility='private'):
self.image_id = image_id
self.visibility = visibility
self.status = 'active'
self.extra_properties = extra_properties
self.checksum = 'c1234'
self.size = 123456789
class TestCacheMiddlewareURLMatching(testtools.TestCase):
def test_v1_no_match_detail(self):
req = webob.Request.blank('/v1/images/detail')
@ -64,16 +73,20 @@ class TestCacheMiddlewareRequestStashCacheInfo(testtools.TestCase):
self.middleware = glance.api.middleware.cache.CacheFilter
def test_stash_cache_request_info(self):
self.middleware._stash_request_info(self.request, 'asdf', 'GET')
self.middleware._stash_request_info(self.request, 'asdf', 'GET', 'v2')
self.assertEqual(self.request.environ['api.cache.image_id'], 'asdf')
self.assertEqual(self.request.environ['api.cache.method'], 'GET')
self.assertEqual(self.request.environ['api.cache.version'], 'v2')
def test_fetch_cache_request_info(self):
self.request.environ['api.cache.image_id'] = 'asdf'
self.request.environ['api.cache.method'] = 'GET'
(image_id, method) = self.middleware._fetch_request_info(self.request)
self.request.environ['api.cache.version'] = 'v2'
(image_id, method, version) = \
self.middleware._fetch_request_info(self.request)
self.assertEqual('asdf', image_id)
self.assertEqual('GET', method)
self.assertEqual('v2', version)
def test_fetch_cache_request_info_unset(self):
out = self.middleware._fetch_request_info(self.request)
@ -159,35 +172,56 @@ class TestCacheMiddlewareProcessRequest(base.IsolatedUnitTest):
Test for determining that when an admin tries to download a deleted
image it returns 404 Not Found error.
"""
def fake_get_image_metadata(context, image_id):
return {'deleted': True}
def dummy_img_iterator():
for i in range(3):
yield i
image_id = 'test1'
image_meta = {
'id': image_id,
'name': 'fake_image',
'status': 'deleted',
'created_at': '',
'min_disk': '10G',
'min_ram': '1024M',
'protected': False,
'locations': '',
'checksum': 'c1234',
'owner': '',
'disk_format': 'raw',
'container_format': 'bare',
'size': '123456789',
'virtual_size': '123456789',
'is_public': 'public',
'deleted': True,
'updated_at': '',
'properties': {},
}
request = webob.Request.blank('/v1/images/%s' % image_id)
request.context = context.RequestContext()
cache_filter = ProcessRequestTestCacheFilter()
self.stubs.Set(registry, 'get_image_metadata',
fake_get_image_metadata)
self.assertRaises(exception.NotFound, cache_filter._process_v1_request,
request, image_id, dummy_img_iterator)
request, image_id, dummy_img_iterator, image_meta)
def test_process_v1_request_for_deleted_but_cached_image(self):
"""
Test for determining image is deleted from cache when it is not found
in Glance Registry.
"""
def fake_process_v1_request(request, image_id, image_iterator):
def fake_process_v1_request(request, image_id, image_iterator,
image_meta):
raise exception.NotFound()
def fake_get_v1_image_metadata(request, image_id):
return {'properties': {}}
image_id = 'test1'
request = webob.Request.blank('/v1/images/%s' % image_id)
request.context = context.RequestContext()
cache_filter = ProcessRequestTestCacheFilter()
self.stubs.Set(cache_filter, '_get_v1_image_metadata',
fake_get_v1_image_metadata)
self.stubs.Set(cache_filter, '_process_v1_request',
fake_process_v1_request)
cache_filter.process_request(request)
@ -195,21 +229,36 @@ class TestCacheMiddlewareProcessRequest(base.IsolatedUnitTest):
def test_v1_process_request_image_fetch(self):
def fake_get_image_metadata(context, image_id):
return {'is_public': True, 'deleted': False, 'size': '20'}
def dummy_img_iterator():
for i in range(3):
yield i
image_id = 'test1'
image_meta = {
'id': image_id,
'name': 'fake_image',
'status': 'active',
'created_at': '',
'min_disk': '10G',
'min_ram': '1024M',
'protected': False,
'locations': '',
'checksum': 'c1234',
'owner': '',
'disk_format': 'raw',
'container_format': 'bare',
'size': '123456789',
'virtual_size': '123456789',
'is_public': 'public',
'deleted': False,
'updated_at': '',
'properties': {},
}
request = webob.Request.blank('/v1/images/%s' % image_id)
request.context = context.RequestContext()
cache_filter = ProcessRequestTestCacheFilter()
self.stubs.Set(registry, 'get_image_metadata',
fake_get_image_metadata)
actual = cache_filter._process_v1_request(
request, image_id, dummy_img_iterator)
request, image_id, dummy_img_iterator, image_meta)
self.assertTrue(actual)
def test_v1_remove_location_image_fetch(self):
@ -218,31 +267,44 @@ class TestCacheMiddlewareProcessRequest(base.IsolatedUnitTest):
def show(self, response, raw_response):
return 'location_data' in raw_response['image_meta']
def fake_get_image_metadata(context, image_id):
return {'location_data': {'url': "file:///some/path",
'metadata': {}},
'is_public': True, 'deleted': False, 'size': '20'}
def dummy_img_iterator():
for i in range(3):
yield i
image_id = 'test1'
image_meta = {
'id': image_id,
'name': 'fake_image',
'status': 'active',
'created_at': '',
'min_disk': '10G',
'min_ram': '1024M',
'protected': False,
'locations': '',
'checksum': 'c1234',
'owner': '',
'disk_format': 'raw',
'container_format': 'bare',
'size': '123456789',
'virtual_size': '123456789',
'is_public': 'public',
'deleted': False,
'updated_at': '',
'properties': {},
}
request = webob.Request.blank('/v1/images/%s' % image_id)
request.context = context.RequestContext()
cache_filter = ProcessRequestTestCacheFilter()
cache_filter.serializer = CheckNoLocationDataSerializer()
self.stubs.Set(registry, 'get_image_metadata',
fake_get_image_metadata)
actual = cache_filter._process_v1_request(
request, image_id, dummy_img_iterator)
request, image_id, dummy_img_iterator, image_meta)
self.assertFalse(actual)
def test_verify_metadata_deleted_image(self):
"""
Test verify_metadata raises exception.NotFound for a deleted image
"""
image_meta = {'is_public': True, 'deleted': True}
image_meta = {'status': 'deleted', 'is_public': True, 'deleted': True}
cache_filter = ProcessRequestTestCacheFilter()
self.assertRaises(exception.NotFound,
cache_filter._verify_metadata, image_meta)
@ -258,7 +320,8 @@ class TestCacheMiddlewareProcessRequest(base.IsolatedUnitTest):
return image_size
image_id = 'test1'
image_meta = {'size': 0, 'deleted': False, 'id': image_id}
image_meta = {'size': 0, 'deleted': False, 'id': image_id,
'status': 'active'}
cache_filter = ProcessRequestTestCacheFilter()
self.stubs.Set(cache_filter.cache, 'get_image_size',
fake_get_image_size)
@ -270,45 +333,39 @@ class TestCacheMiddlewareProcessRequest(base.IsolatedUnitTest):
for i in range(3):
yield i
def fake_image_get(self, image_id):
return {
'id': 'test1',
'name': 'fake_image',
'status': 'active',
'created_at': '',
'min_disk': '10G',
'min_ram': '1024M',
'protected': False,
'locations': '',
'checksum': 'c352f4e7121c6eae958bc1570324f17e',
'owner': '',
'disk_format': 'raw',
'container_format': 'bare',
'size': '123456789',
'virtual_size': '123456789',
'is_public': 'public',
'deleted': False,
'updated_at': '',
'properties': {},
}
def fake_image_tag_get_all(context, image_id, session=None):
return None
image_id = 'test1'
request = webob.Request.blank('/v2/images/test1/file')
request.context = context.RequestContext()
request.environ['api.cache.image'] = ImageStub(image_id)
self.stubs.Set(db, 'image_get', fake_image_get)
self.stubs.Set(db, 'image_tag_get_all', fake_image_tag_get_all)
image_meta = {
'id': image_id,
'name': 'fake_image',
'status': 'active',
'created_at': '',
'min_disk': '10G',
'min_ram': '1024M',
'protected': False,
'locations': '',
'checksum': 'c1234',
'owner': '',
'disk_format': 'raw',
'container_format': 'bare',
'size': '123456789',
'virtual_size': '123456789',
'is_public': 'public',
'deleted': False,
'updated_at': '',
'properties': {},
}
cache_filter = ProcessRequestTestCacheFilter()
response = cache_filter._process_v2_request(
request, image_id, dummy_img_iterator)
request, image_id, dummy_img_iterator, image_meta)
self.assertEqual(response.headers['Content-Type'],
'application/octet-stream')
self.assertEqual(response.headers['Content-MD5'],
'c352f4e7121c6eae958bc1570324f17e')
'c1234')
self.assertEqual(response.headers['Content-Length'],
'123456789')
@ -317,17 +374,196 @@ class TestCacheMiddlewareProcessRequest(base.IsolatedUnitTest):
Test for cache middleware skip processing when request
context has not 'download_image' role.
"""
def fake_get_v1_image_metadata(*args, **kwargs):
return {'properties': {}}
image_id = 'test1'
request = webob.Request.blank('/v1/images/%s' % image_id)
request.context = context.RequestContext()
cache_filter = ProcessRequestTestCacheFilter()
cache_filter._get_v1_image_metadata = fake_get_v1_image_metadata
rules = {'download_image': '!'}
self.set_policy_rules(rules)
cache_filter.policy = glance.api.policy.Enforcer()
self.assertRaises(webob.exc.HTTPForbidden,
cache_filter.process_request, request)
self.assertIsNone(cache_filter.process_request(request))
def test_v1_process_request_download_restricted(self):
"""
Test process_request for v1 api where _member_ role not able to
download the image with custom property.
"""
image_id = 'test1'
def fake_get_v1_image_metadata(*args, **kwargs):
return {
'id': image_id,
'name': 'fake_image',
'status': 'active',
'created_at': '',
'min_disk': '10G',
'min_ram': '1024M',
'protected': False,
'locations': '',
'checksum': 'c1234',
'owner': '',
'disk_format': 'raw',
'container_format': 'bare',
'size': '123456789',
'virtual_size': '123456789',
'is_public': 'public',
'deleted': False,
'updated_at': '',
'x_test_key': 'test_1234'
}
request = webob.Request.blank('/v1/images/%s' % image_id)
request.context = context.RequestContext(roles=['_member_'])
cache_filter = ProcessRequestTestCacheFilter()
cache_filter._get_v1_image_metadata = fake_get_v1_image_metadata
rules = {
"restricted":
"not ('test_1234':%(x_test_key)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
cache_filter.policy = glance.api.policy.Enforcer()
self.assertRaises(webob.exc.HTTPForbidden,
cache_filter.process_request, request)
def test_v1_process_request_download_permitted(self):
"""
Test process_request for v1 api where member role able to
download the image with custom property.
"""
image_id = 'test1'
def fake_get_v1_image_metadata(*args, **kwargs):
return {
'id': image_id,
'name': 'fake_image',
'status': 'active',
'created_at': '',
'min_disk': '10G',
'min_ram': '1024M',
'protected': False,
'locations': '',
'checksum': 'c1234',
'owner': '',
'disk_format': 'raw',
'container_format': 'bare',
'size': '123456789',
'virtual_size': '123456789',
'is_public': 'public',
'deleted': False,
'updated_at': '',
'x_test_key': 'test_1234'
}
request = webob.Request.blank('/v1/images/%s' % image_id)
request.context = context.RequestContext(roles=['member'])
cache_filter = ProcessRequestTestCacheFilter()
cache_filter._get_v1_image_metadata = fake_get_v1_image_metadata
rules = {
"restricted":
"not ('test_1234':%(x_test_key)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
cache_filter.policy = glance.api.policy.Enforcer()
actual = cache_filter.process_request(request)
self.assertTrue(actual)
def test_v1_process_request_image_meta_not_found(self):
"""
Test process_request for v1 api where registry raises NotFound
exception as image metadata not found.
"""
image_id = 'test1'
def fake_get_v1_image_metadata(*args, **kwargs):
raise exception.NotFound()
request = webob.Request.blank('/v1/images/%s' % image_id)
request.context = context.RequestContext(roles=['_member_'])
cache_filter = ProcessRequestTestCacheFilter()
self.stubs.Set(registry, 'get_image_metadata',
fake_get_v1_image_metadata)
rules = {
"restricted":
"not ('test_1234':%(x_test_key)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
cache_filter.policy = glance.api.policy.Enforcer()
self.assertRaises(webob.exc.HTTPNotFound,
cache_filter.process_request, request)
def test_v2_process_request_download_restricted(self):
"""
Test process_request for v2 api where _member_ role not able to
download the image with custom property.
"""
image_id = 'test1'
extra_properties = {
'x_test_key': 'test_1234'
}
def fake_get_v2_image_metadata(*args, **kwargs):
image = ImageStub(image_id, extra_properties=extra_properties)
request.environ['api.cache.image'] = image
return glance.api.policy.ImageTarget(image)
request = webob.Request.blank('/v2/images/test1/file')
request.context = context.RequestContext(roles=['_member_'])
cache_filter = ProcessRequestTestCacheFilter()
cache_filter._get_v2_image_metadata = fake_get_v2_image_metadata
rules = {
"restricted":
"not ('test_1234':%(x_test_key)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
cache_filter.policy = glance.api.policy.Enforcer()
self.assertRaises(webob.exc.HTTPForbidden,
cache_filter.process_request, request)
def test_v2_process_request_download_permitted(self):
"""
Test process_request for v2 api where member role able to
download the image with custom property.
"""
image_id = 'test1'
extra_properties = {
'x_test_key': 'test_1234'
}
def fake_get_v2_image_metadata(*args, **kwargs):
image = ImageStub(image_id, extra_properties=extra_properties)
request.environ['api.cache.image'] = image
return glance.api.policy.ImageTarget(image)
request = webob.Request.blank('/v2/images/test1/file')
request.context = context.RequestContext(roles=['member'])
cache_filter = ProcessRequestTestCacheFilter()
cache_filter._get_v2_image_metadata = fake_get_v2_image_metadata
rules = {
"restricted":
"not ('test_1234':%(x_test_key)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
cache_filter.policy = glance.api.policy.Enforcer()
actual = cache_filter.process_request(request)
self.assertTrue(actual)
class TestCacheMiddlewareProcessResponse(base.IsolatedUnitTest):
@ -350,10 +586,14 @@ class TestCacheMiddlewareProcessResponse(base.IsolatedUnitTest):
def test_process_response(self):
def fake_fetch_request_info(*args, **kwargs):
return ('test1', 'GET')
return ('test1', 'GET', 'v1')
def fake_get_v1_image_metadata(*args, **kwargs):
return {'properties': {}}
cache_filter = ProcessRequestTestCacheFilter()
cache_filter._fetch_request_info = fake_fetch_request_info
cache_filter._get_v1_image_metadata = fake_get_v1_image_metadata
image_id = 'test1'
request = webob.Request.blank('/v1/images/%s' % image_id)
request.context = context.RequestContext()
@ -368,10 +608,14 @@ class TestCacheMiddlewareProcessResponse(base.IsolatedUnitTest):
when request context has not 'download_image' role.
"""
def fake_fetch_request_info(*args, **kwargs):
return ('test1', 'GET')
return ('test1', 'GET', 'v1')
def fake_get_v1_image_metadata(*args, **kwargs):
return {'properties': {}}
cache_filter = ProcessRequestTestCacheFilter()
cache_filter._fetch_request_info = fake_fetch_request_info
cache_filter._get_v1_image_metadata = fake_get_v1_image_metadata
rules = {'download_image': '!'}
self.set_policy_rules(rules)
cache_filter.policy = glance.api.policy.Enforcer()
@ -383,3 +627,206 @@ class TestCacheMiddlewareProcessResponse(base.IsolatedUnitTest):
self.assertRaises(webob.exc.HTTPForbidden,
cache_filter.process_response, resp)
self.assertEqual([''], resp.app_iter)
def test_v1_process_response_download_restricted(self):
"""
Test process_response for v1 api where _member_ role not able to
download the image with custom property.
"""
image_id = 'test1'
def fake_fetch_request_info(*args, **kwargs):
return ('test1', 'GET', 'v1')
def fake_get_v1_image_metadata(*args, **kwargs):
return {
'id': image_id,
'name': 'fake_image',
'status': 'active',
'created_at': '',
'min_disk': '10G',
'min_ram': '1024M',
'protected': False,
'locations': '',
'checksum': 'c1234',
'owner': '',
'disk_format': 'raw',
'container_format': 'bare',
'size': '123456789',
'virtual_size': '123456789',
'is_public': 'public',
'deleted': False,
'updated_at': '',
'x_test_key': 'test_1234'
}
cache_filter = ProcessRequestTestCacheFilter()
cache_filter._fetch_request_info = fake_fetch_request_info
cache_filter._get_v1_image_metadata = fake_get_v1_image_metadata
rules = {
"restricted":
"not ('test_1234':%(x_test_key)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
cache_filter.policy = glance.api.policy.Enforcer()
request = webob.Request.blank('/v1/images/%s' % image_id)
request.context = context.RequestContext(roles=['_member_'])
resp = webob.Response(request=request)
self.assertRaises(webob.exc.HTTPForbidden,
cache_filter.process_response, resp)
def test_v1_process_response_download_permitted(self):
"""
Test process_response for v1 api where member role able to
download the image with custom property.
"""
image_id = 'test1'
def fake_fetch_request_info(*args, **kwargs):
return ('test1', 'GET', 'v1')
def fake_get_v1_image_metadata(*args, **kwargs):
return {
'id': image_id,
'name': 'fake_image',
'status': 'active',
'created_at': '',
'min_disk': '10G',
'min_ram': '1024M',
'protected': False,
'locations': '',
'checksum': 'c1234',
'owner': '',
'disk_format': 'raw',
'container_format': 'bare',
'size': '123456789',
'virtual_size': '123456789',
'is_public': 'public',
'deleted': False,
'updated_at': '',
'x_test_key': 'test_1234'
}
cache_filter = ProcessRequestTestCacheFilter()
cache_filter._fetch_request_info = fake_fetch_request_info
cache_filter._get_v1_image_metadata = fake_get_v1_image_metadata
rules = {
"restricted":
"not ('test_1234':%(x_test_key)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
cache_filter.policy = glance.api.policy.Enforcer()
request = webob.Request.blank('/v1/images/%s' % image_id)
request.context = context.RequestContext(roles=['member'])
resp = webob.Response(request=request)
actual = cache_filter.process_response(resp)
self.assertEqual(actual, resp)
def test_v1_process_response_image_meta_not_found(self):
"""
Test process_response for v1 api where registry raises NotFound
exception as image metadata not found.
"""
image_id = 'test1'
def fake_fetch_request_info(*args, **kwargs):
return ('test1', 'GET', 'v1')
def fake_get_v1_image_metadata(*args, **kwargs):
raise exception.NotFound()
cache_filter = ProcessRequestTestCacheFilter()
cache_filter._fetch_request_info = fake_fetch_request_info
self.stubs.Set(registry, 'get_image_metadata',
fake_get_v1_image_metadata)
rules = {
"restricted":
"not ('test_1234':%(x_test_key)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
cache_filter.policy = glance.api.policy.Enforcer()
request = webob.Request.blank('/v1/images/%s' % image_id)
request.context = context.RequestContext(roles=['_member_'])
resp = webob.Response(request=request)
self.assertRaises(webob.exc.HTTPNotFound,
cache_filter.process_response, resp)
def test_v2_process_response_download_restricted(self):
"""
Test process_response for v2 api where _member_ role not able to
download the image with custom property.
"""
image_id = 'test1'
extra_properties = {
'x_test_key': 'test_1234'
}
def fake_fetch_request_info(*args, **kwargs):
return ('test1', 'GET', 'v2')
def fake_get_v2_image_metadata(*args, **kwargs):
image = ImageStub(image_id, extra_properties=extra_properties)
request.environ['api.cache.image'] = image
return glance.api.policy.ImageTarget(image)
cache_filter = ProcessRequestTestCacheFilter()
cache_filter._fetch_request_info = fake_fetch_request_info
cache_filter._get_v2_image_metadata = fake_get_v2_image_metadata
rules = {
"restricted":
"not ('test_1234':%(x_test_key)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
cache_filter.policy = glance.api.policy.Enforcer()
request = webob.Request.blank('/v2/images/test1/file')
request.context = context.RequestContext(roles=['_member_'])
resp = webob.Response(request=request)
self.assertRaises(webob.exc.HTTPForbidden,
cache_filter.process_response, resp)
def test_v2_process_response_download_permitted(self):
"""
Test process_response for v2 api where member role able to
download the image with custom property.
"""
image_id = 'test1'
extra_properties = {
'x_test_key': 'test_1234'
}
def fake_fetch_request_info(*args, **kwargs):
return ('test1', 'GET', 'v2')
def fake_get_v2_image_metadata(*args, **kwargs):
image = ImageStub(image_id, extra_properties=extra_properties)
request.environ['api.cache.image'] = image
return glance.api.policy.ImageTarget(image)
cache_filter = ProcessRequestTestCacheFilter()
cache_filter._fetch_request_info = fake_fetch_request_info
cache_filter._get_v2_image_metadata = fake_get_v2_image_metadata
rules = {
"restricted":
"not ('test_1234':%(x_test_key)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
cache_filter.policy = glance.api.policy.Enforcer()
request = webob.Request.blank('/v2/images/test1/file')
request.context = context.RequestContext(roles=['member'])
resp = webob.Response(request=request)
actual = cache_filter.process_response(resp)
self.assertEqual(actual, resp)

View File

@ -44,10 +44,19 @@ class ImageRepoStub(object):
class ImageStub(object):
def __init__(self, image_id, visibility='private'):
def __init__(self, image_id=None, visibility='private',
container_format='bear', disk_format='raw',
status='active', extra_properties=None):
if extra_properties is None:
extra_properties = {}
self.image_id = image_id
self.visibility = visibility
self.status = 'active'
self.container_format = container_format
self.disk_format = disk_format
self.status = status
self.extra_properties = extra_properties
def delete(self):
self.status = 'deleted'
@ -294,11 +303,19 @@ class TestImagePolicy(test_utils.BaseTestCase):
image_factory.new_image(visibility='public')
self.policy.enforce.assert_called_once_with({}, "publicize_image", {})
def test_image_get_data(self):
def test_image_get_data_policy_enforced_with_target(self):
extra_properties = {
'test_key': 'test_4321'
}
image_stub = ImageStub(UUID1, extra_properties=extra_properties)
image = glance.api.policy.ImageProxy(image_stub, {}, self.policy)
self.policy.enforce.side_effect = exception.Forbidden
image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
glance.api.policy.ImageTarget = mock.Mock()
target = glance.api.policy.ImageTarget(image)
self.assertRaises(exception.Forbidden, image.get_data)
self.policy.enforce.assert_called_once_with({}, "download_image", {})
self.policy.enforce.assert_called_once_with({}, "download_image",
target=target)
def test_image_set_data(self):
self.policy.enforce.side_effect = exception.Forbidden

View File

@ -2440,6 +2440,32 @@ class TestGlanceAPI(base.IsolatedUnitTest):
res = req.get_response(self.api)
self.assertEqual(res.status_int, 403)
def test_show_image_restricted_download_for_core_property(self):
rules = {
"restricted":
"not ('1024M':%(min_ram)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
req = webob.Request.blank("/images/%s" % UUID2)
req.headers['X-Auth-Token'] = 'user:tenant:_member_'
req.headers['min_ram'] = '1024M'
res = req.get_response(self.api)
self.assertEqual(res.status_int, 403)
def test_show_image_restricted_download_for_custom_property(self):
rules = {
"restricted":
"not ('test_1234'==%(x_test_key)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
req = webob.Request.blank("/images/%s" % UUID2)
req.headers['X-Auth-Token'] = 'user:tenant:_member_'
req.headers['x_test_key'] = 'test_1234'
res = req.get_response(self.api)
self.assertEqual(res.status_int, 403)
def test_delete_image(self):
req = webob.Request.blank("/images/%s" % UUID2)
req.method = 'DELETE'