Merge "Restrict users from downloading protected image"

This commit is contained in:
Jenkins 2014-08-30 12:38:00 +00:00 committed by Gerrit Code Review
commit b9107a7cce
10 changed files with 1028 additions and 93 deletions

View File

@ -67,7 +67,7 @@ class CacheFilter(wsgi.Middleware):
"""
# NOTE: admins can see image metadata in the v1 API, but shouldn't
# be able to download the actual image data.
if image_meta['deleted']:
if image_meta['status'] == 'deleted' and image_meta['deleted']:
raise exception.NotFound()
if not image_meta['size']:
@ -95,13 +95,45 @@ class CacheFilter(wsgi.Middleware):
else:
return (version, method, image_id)
def _enforce(self, req, action):
def _enforce(self, req, action, target=None):
"""Authorize an action against our policies"""
if target is None:
target = {}
try:
self.policy.enforce(req.context, action, {})
self.policy.enforce(req.context, action, target)
except exception.Forbidden as e:
raise webob.exc.HTTPForbidden(explanation=e.msg, request=req)
def _get_v1_image_metadata(self, request, image_id):
"""
Retrieves image metadata using registry for v1 api and creates
dictionary-like mash-up of image core and custom properties.
"""
try:
image_metadata = registry.get_image_metadata(request.context,
image_id)
return utils.create_mashup_dict(image_metadata)
except exception.NotFound as e:
LOG.debug("No metadata found for image '%s'" % image_id)
raise webob.exc.HTTPNotFound(explanation=e.msg, request=request)
def _get_v2_image_metadata(self, request, image_id):
"""
Retrieves image and for v2 api and creates adapter like object
to access image core or custom properties on request.
"""
db_api = glance.db.get_api()
image_repo = glance.db.ImageRepo(request.context, db_api)
try:
image = image_repo.get(image_id)
# Storing image object in request as it is required in
# _process_v2_request call.
request.environ['api.cache.image'] = image
return policy.ImageTarget(image)
except exception.NotFound as e:
raise webob.exc.HTTPNotFound(explanation=e.msg, request=request)
def process_request(self, request):
"""
For requests for an image file, we check the local image
@ -116,14 +148,15 @@ class CacheFilter(wsgi.Middleware):
# Trying to unpack None raises this exception
return None
self._stash_request_info(request, image_id, method)
self._stash_request_info(request, image_id, method, version)
if request.method != 'GET' or not self.cache.is_cached(image_id):
return None
method = getattr(self, '_get_%s_image_metadata' % version)
image_metadata = method(request, image_id)
try:
self._enforce(request, 'download_image')
except webob.exc.HTTPForbidden:
self._enforce(request, 'download_image', target=image_metadata)
except exception.Forbidden:
return None
LOG.debug("Cache hit for image '%s'", image_id)
@ -131,7 +164,7 @@ class CacheFilter(wsgi.Middleware):
method = getattr(self, '_process_%s_request' % version)
try:
return method(request, image_id, image_iterator)
return method(request, image_id, image_iterator, image_metadata)
except exception.NotFound:
msg = _LE("Image cache contained image file for image '%s', "
"however the registry did not contain metadata for "
@ -140,29 +173,31 @@ class CacheFilter(wsgi.Middleware):
self.cache.delete_cached_image(image_id)
@staticmethod
def _stash_request_info(request, image_id, method):
def _stash_request_info(request, image_id, method, version):
"""
Preserve the image id and request method for later retrieval
Preserve the image id, version and request method for later retrieval
"""
request.environ['api.cache.image_id'] = image_id
request.environ['api.cache.method'] = method
request.environ['api.cache.version'] = version
@staticmethod
def _fetch_request_info(request):
"""
Preserve the cached image id for consumption by the
Preserve the cached image id, version for consumption by the
process_response method of this middleware
"""
try:
image_id = request.environ['api.cache.image_id']
method = request.environ['api.cache.method']
version = request.environ['api.cache.version']
except KeyError:
return None
else:
return (image_id, method)
return (image_id, method, version)
def _process_v1_request(self, request, image_id, image_iterator):
image_meta = registry.get_image_metadata(request.context, image_id)
def _process_v1_request(self, request, image_id, image_iterator,
image_meta):
# Don't display location
if 'location' in image_meta:
del image_meta['location']
@ -176,16 +211,14 @@ class CacheFilter(wsgi.Middleware):
}
return self.serializer.show(response, raw_response)
def _process_v2_request(self, request, image_id, image_iterator):
def _process_v2_request(self, request, image_id, image_iterator,
image_meta):
# We do some contortions to get the image_metadata so
# that we can provide it to 'size_checked_iter' which
# will generate a notification.
# TODO(mclaren): Make notification happen more
# naturally once caching is part of the domain model.
db_api = glance.db.get_api()
image_repo = glance.db.ImageRepo(request.context, db_api)
image = image_repo.get(image_id)
image_meta = glance.notifier.format_image_notification(image)
image = request.environ['api.cache.image']
self._verify_metadata(image_meta)
response = webob.Response(request=request)
response.app_iter = size_checked_iter(response, image_meta,
@ -217,7 +250,8 @@ class CacheFilter(wsgi.Middleware):
return resp
try:
(image_id, method) = self._fetch_request_info(resp.request)
(image_id, method, version) = self._fetch_request_info(
resp.request)
except TypeError:
return resp
@ -235,17 +269,16 @@ class CacheFilter(wsgi.Middleware):
# Nothing to do here, move along
return resp
else:
return process_response_method(resp, image_id)
return process_response_method(resp, image_id, version=version)
def _process_DELETE_response(self, resp, image_id):
def _process_DELETE_response(self, resp, image_id, version=None):
if self.cache.is_cached(image_id):
LOG.debug("Removing image %s from cache", image_id)
self.cache.delete_cached_image(image_id)
return resp
def _process_GET_response(self, resp, image_id):
def _process_GET_response(self, resp, image_id, version=None):
image_checksum = resp.headers.get('Content-MD5')
if not image_checksum:
# API V1 stores the checksum in a different header:
image_checksum = resp.headers.get('x-image-meta-checksum')
@ -253,12 +286,17 @@ class CacheFilter(wsgi.Middleware):
if not image_checksum:
LOG.error(_LE("Checksum header is missing."))
# fetch image_meta on the basis of version
image_metadata = None
if version:
method = getattr(self, '_get_%s_image_metadata' % version)
image_metadata = method(resp.request, image_id)
# NOTE(zhiyan): image_cache return a generator object and set to
# response.app_iter, it will be called by eventlet.wsgi later.
# So we need enforce policy firstly but do it by application
# since eventlet.wsgi could not catch webob.exc.HTTPForbidden and
# return 403 error to client then.
self._enforce(resp.request, 'download_image')
self._enforce(resp.request, 'download_image', target=image_metadata)
resp.app_iter = self.cache.get_caching_iter(image_id, image_checksum,
resp.app_iter)

View File

@ -230,7 +230,9 @@ class ImageProxy(glance.domain.proxy.Image):
return self.image.delete()
def get_data(self, *args, **kwargs):
self.policy.enforce(self.context, 'download_image', {})
target = ImageTarget(self.image)
self.policy.enforce(self.context, 'download_image',
target=target)
return self.image.get_data(*args, **kwargs)
def set_data(self, *args, **kwargs):
@ -416,3 +418,31 @@ class TaskFactoryProxy(glance.domain.proxy.TaskFactory):
task_factory,
task_proxy_class=TaskProxy,
task_proxy_kwargs=proxy_kwargs)
class ImageTarget(object):
def __init__(self, image):
"""
Initialize the object
:param image: Image object
"""
self.image = image
def __getitem__(self, key):
"""
Returns the value of 'key' from the image if image has that attribute
else tries to retrieve value from the extra_properties of image.
:param key: value to retrieve
"""
# Need to change the key 'id' to 'image_id' as Image object has
# attribute as 'image_id' in case of V2.
if key == 'id':
key = 'image_id'
if hasattr(self.image, key):
return getattr(self.image, key)
else:
return self.image.extra_properties[key]

View File

@ -148,10 +148,12 @@ class Controller(controller.BaseController):
else:
self.prop_enforcer = None
def _enforce(self, req, action):
def _enforce(self, req, action, target=None):
"""Authorize an action against our policies"""
if target is None:
target = {}
try:
self.policy.enforce(req.context, action, {})
self.policy.enforce(req.context, action, target)
except exception.Forbidden:
raise HTTPForbidden()
@ -466,9 +468,20 @@ class Controller(controller.BaseController):
:raises HTTPNotFound if image is not available to user
"""
self._enforce(req, 'get_image')
self._enforce(req, 'download_image')
image_meta = self.get_active_image_meta_or_404(req, id)
try:
image_meta = self.get_active_image_meta_or_404(req, id)
except HTTPNotFound:
# provision for backward-compatibility breaking issue
# catch the 404 exception and raise it after enforcing
# the policy
with excutils.save_and_reraise_exception():
self._enforce(req, 'download_image')
else:
target = utils.create_mashup_dict(image_meta)
self._enforce(req, 'download_image', target=target)
self._enforce_read_protected_props(image_meta, req)

View File

@ -270,6 +270,27 @@ def get_image_meta_from_headers(response):
return result
def create_mashup_dict(image_meta):
"""
Returns a dictionary-like mashup of the image core properties
and the image custom properties from given image metadata.
:param image_meta: metadata of image with core and custom properties
"""
def get_items():
for key, value in six.iteritems(image_meta):
if isinstance(value, dict):
for subkey, subvalue in six.iteritems(
create_mashup_dict(value)):
if subkey not in image_meta:
yield subkey, subvalue
else:
yield key, value
return dict(get_items())
def safe_mkdirs(path):
try:
os.makedirs(path)

View File

@ -557,3 +557,159 @@ class TestApi(functional.FunctionalTest):
self.assertEqual('GET', response.get('allow'))
self.stop_servers()
def test_download_non_exists_image_raises_http_forbidden(self):
"""
We test the following sequential series of actions:
0. POST /images with public image named Image1
and no custom properties
- Verify 201 returned
1. HEAD image
- Verify HTTP headers have correct information we just added
2. GET image
- Verify all information on image we just added is correct
3. DELETE image1
- Delete the newly added image
4. GET image
- Verify that 403 HTTPForbidden exception is raised prior to
404 HTTPNotFound
"""
self.cleanup()
self.start_servers(**self.__dict__.copy())
image_data = "*" * FIVE_KB
headers = minimal_headers('Image1')
path = "http://%s:%d/v1/images" % ("127.0.0.1", self.api_port)
http = httplib2.Http()
response, content = http.request(path, 'POST', headers=headers,
body=image_data)
self.assertEqual(response.status, 201)
data = jsonutils.loads(content)
image_id = data['image']['id']
self.assertEqual(data['image']['checksum'],
hashlib.md5(image_data).hexdigest())
self.assertEqual(data['image']['size'], FIVE_KB)
self.assertEqual(data['image']['name'], "Image1")
self.assertEqual(data['image']['is_public'], True)
# 1. HEAD image
# Verify image found now
path = "http://%s:%d/v1/images/%s" % ("127.0.0.1", self.api_port,
image_id)
http = httplib2.Http()
response, content = http.request(path, 'HEAD')
self.assertEqual(response.status, 200)
self.assertEqual(response['x-image-meta-name'], "Image1")
# 2. GET /images
# Verify one public image
path = "http://%s:%d/v1/images" % ("127.0.0.1", self.api_port)
http = httplib2.Http()
response, content = http.request(path, 'GET')
self.assertEqual(response.status, 200)
expected_result = {"images": [
{"container_format": "ovf",
"disk_format": "raw",
"id": image_id,
"name": "Image1",
"checksum": "c2e5db72bd7fd153f53ede5da5a06de3",
"size": 5120}]}
self.assertEqual(jsonutils.loads(content), expected_result)
# 3. DELETE image1
path = "http://%s:%d/v1/images/%s" % ("127.0.0.1", self.api_port,
image_id)
http = httplib2.Http()
response, content = http.request(path, 'DELETE')
self.assertEqual(response.status, 200)
# 4. GET image
# Verify that 403 HTTPForbidden exception is raised prior to
# 404 HTTPNotFound
rules = {"download_image": '!'}
self.set_policy_rules(rules)
path = "http://%s:%d/v1/images/%s" % ("127.0.0.1", self.api_port,
image_id)
http = httplib2.Http()
response, content = http.request(path, 'GET')
self.assertEqual(response.status, 403)
self.stop_servers()
def test_download_non_exists_image_raises_http_not_found(self):
"""
We test the following sequential series of actions:
0. POST /images with public image named Image1
and no custom properties
- Verify 201 returned
1. HEAD image
- Verify HTTP headers have correct information we just added
2. GET image
- Verify all information on image we just added is correct
3. DELETE image1
- Delete the newly added image
4. GET image
- Verify that 404 HTTPNotFound exception is raised
"""
self.cleanup()
self.start_servers(**self.__dict__.copy())
image_data = "*" * FIVE_KB
headers = minimal_headers('Image1')
path = "http://%s:%d/v1/images" % ("127.0.0.1", self.api_port)
http = httplib2.Http()
response, content = http.request(path, 'POST', headers=headers,
body=image_data)
self.assertEqual(response.status, 201)
data = jsonutils.loads(content)
image_id = data['image']['id']
self.assertEqual(data['image']['checksum'],
hashlib.md5(image_data).hexdigest())
self.assertEqual(data['image']['size'], FIVE_KB)
self.assertEqual(data['image']['name'], "Image1")
self.assertEqual(data['image']['is_public'], True)
# 1. HEAD image
# Verify image found now
path = "http://%s:%d/v1/images/%s" % ("127.0.0.1", self.api_port,
image_id)
http = httplib2.Http()
response, content = http.request(path, 'HEAD')
self.assertEqual(response.status, 200)
self.assertEqual(response['x-image-meta-name'], "Image1")
# 2. GET /images
# Verify one public image
path = "http://%s:%d/v1/images" % ("127.0.0.1", self.api_port)
http = httplib2.Http()
response, content = http.request(path, 'GET')
self.assertEqual(response.status, 200)
expected_result = {"images": [
{"container_format": "ovf",
"disk_format": "raw",
"id": image_id,
"name": "Image1",
"checksum": "c2e5db72bd7fd153f53ede5da5a06de3",
"size": 5120}]}
self.assertEqual(jsonutils.loads(content), expected_result)
# 3. DELETE image1
path = "http://%s:%d/v1/images/%s" % ("127.0.0.1", self.api_port,
image_id)
http = httplib2.Http()
response, content = http.request(path, 'DELETE')
self.assertEqual(response.status, 200)
# 4. GET image
# Verify that 404 HTTPNotFound exception is raised
path = "http://%s:%d/v1/images/%s" % ("127.0.0.1", self.api_port,
image_id)
http = httplib2.Http()
response, content = http.request(path, 'GET')
self.assertEqual(response.status, 404)
self.stop_servers()

View File

@ -520,6 +520,138 @@ class TestImages(functional.FunctionalTest):
self.stop_servers()
def test_download_image_not_allowed_using_restricted_policy(self):
rules = {
"context_is_admin": "role:admin",
"default": "",
"restricted":
"not ('aki':%(container_format)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
self.start_servers(**self.__dict__.copy())
# Create an image
path = self._url('/v2/images')
headers = self._headers({'content-type': 'application/json',
'X-Roles': 'member'})
data = jsonutils.dumps({'name': 'image-1', 'disk_format': 'aki',
'container_format': 'aki'})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(201, response.status_code)
# Returned image entity
image = jsonutils.loads(response.text)
image_id = image['id']
expected_image = {
'status': 'queued',
'name': 'image-1',
'tags': [],
'visibility': 'private',
'self': '/v2/images/%s' % image_id,
'protected': False,
'file': '/v2/images/%s/file' % image_id,
'min_disk': 0,
'min_ram': 0,
'schema': '/v2/schemas/image',
}
for key, value in six.iteritems(expected_image):
self.assertEqual(image[key], value, key)
# Upload data to image
path = self._url('/v2/images/%s/file' % image_id)
headers = self._headers({'Content-Type': 'application/octet-stream'})
response = requests.put(path, headers=headers, data='ZZZZZ')
self.assertEqual(204, response.status_code)
# Get an image should fail
path = self._url('/v2/images/%s/file' % image_id)
headers = self._headers({'Content-Type': 'application/octet-stream',
'X-Roles': '_member_'})
response = requests.get(path, headers=headers)
self.assertEqual(403, response.status_code)
# Image Deletion should work
path = self._url('/v2/images/%s' % image_id)
response = requests.delete(path, headers=self._headers())
self.assertEqual(204, response.status_code)
# This image should be no longer be directly accessible
path = self._url('/v2/images/%s' % image_id)
response = requests.get(path, headers=self._headers())
self.assertEqual(404, response.status_code)
self.stop_servers()
def test_download_image_allowed_using_restricted_policy(self):
rules = {
"context_is_admin": "role:admin",
"default": "",
"restricted":
"not ('aki':%(container_format)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
self.start_servers(**self.__dict__.copy())
# Create an image
path = self._url('/v2/images')
headers = self._headers({'content-type': 'application/json',
'X-Roles': 'member'})
data = jsonutils.dumps({'name': 'image-1', 'disk_format': 'aki',
'container_format': 'aki'})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(201, response.status_code)
# Returned image entity
image = jsonutils.loads(response.text)
image_id = image['id']
expected_image = {
'status': 'queued',
'name': 'image-1',
'tags': [],
'visibility': 'private',
'self': '/v2/images/%s' % image_id,
'protected': False,
'file': '/v2/images/%s/file' % image_id,
'min_disk': 0,
'min_ram': 0,
'schema': '/v2/schemas/image',
}
for key, value in six.iteritems(expected_image):
self.assertEqual(image[key], value, key)
# Upload data to image
path = self._url('/v2/images/%s/file' % image_id)
headers = self._headers({'Content-Type': 'application/octet-stream'})
response = requests.put(path, headers=headers, data='ZZZZZ')
self.assertEqual(204, response.status_code)
# Get an image should be allowed
path = self._url('/v2/images/%s/file' % image_id)
headers = self._headers({'Content-Type': 'application/octet-stream',
'X-Roles': 'member'})
response = requests.get(path, headers=headers)
self.assertEqual(200, response.status_code)
# Image Deletion should work
path = self._url('/v2/images/%s' % image_id)
response = requests.delete(path, headers=self._headers())
self.assertEqual(204, response.status_code)
# This image should be no longer be directly accessible
path = self._url('/v2/images/%s' % image_id)
response = requests.get(path, headers=self._headers())
self.assertEqual(404, response.status_code)
self.stop_servers()
def test_image_size_cap(self):
self.api_server.image_size_cap = 128
self.start_servers(**self.__dict__.copy())

View File

@ -145,6 +145,61 @@ class TestUtils(test_utils.BaseTestCase):
self.assertEqual({'x-image-meta-property-test': u'test'},
actual_test2)
def test_create_mashup_dict_with_different_core_custom_properties(self):
image_meta = {
'id': 'test-123',
'name': 'fake_image',
'status': 'active',
'created_at': '',
'min_disk': '10G',
'min_ram': '1024M',
'protected': False,
'locations': '',
'checksum': 'c1234',
'owner': '',
'disk_format': 'raw',
'container_format': 'bare',
'size': '123456789',
'virtual_size': '123456789',
'is_public': 'public',
'deleted': True,
'updated_at': '',
'properties': {'test_key': 'test_1234'},
}
mashup_dict = utils.create_mashup_dict(image_meta)
self.assertFalse('properties' in mashup_dict)
self.assertEqual(image_meta['properties']['test_key'],
mashup_dict['test_key'])
def test_create_mashup_dict_with_same_core_custom_properties(self):
image_meta = {
'id': 'test-123',
'name': 'fake_image',
'status': 'active',
'created_at': '',
'min_disk': '10G',
'min_ram': '1024M',
'protected': False,
'locations': '',
'checksum': 'c1234',
'owner': '',
'disk_format': 'raw',
'container_format': 'bare',
'size': '123456789',
'virtual_size': '123456789',
'is_public': 'public',
'deleted': True,
'updated_at': '',
'properties': {'min_ram': '2048M'},
}
mashup_dict = utils.create_mashup_dict(image_meta)
self.assertFalse('properties' in mashup_dict)
self.assertNotEqual(image_meta['properties']['min_ram'],
mashup_dict['min_ram'])
self.assertEqual(image_meta['min_ram'], mashup_dict['min_ram'])
def test_create_pretty_table(self):
class MyPrettyTable(utils.PrettyTable):
def __init__(self):

View File

@ -19,12 +19,21 @@ import webob
import glance.api.middleware.cache
from glance.common import exception
from glance import context
import glance.db.sqlalchemy.api as db
import glance.registry.client.v1.api as registry
from glance.tests.unit import base
from glance.tests.unit import utils as unit_test_utils
class ImageStub(object):
def __init__(self, image_id, extra_properties={}, visibility='private'):
self.image_id = image_id
self.visibility = visibility
self.status = 'active'
self.extra_properties = extra_properties
self.checksum = 'c1234'
self.size = 123456789
class TestCacheMiddlewareURLMatching(testtools.TestCase):
def test_v1_no_match_detail(self):
req = webob.Request.blank('/v1/images/detail')
@ -64,16 +73,20 @@ class TestCacheMiddlewareRequestStashCacheInfo(testtools.TestCase):
self.middleware = glance.api.middleware.cache.CacheFilter
def test_stash_cache_request_info(self):
self.middleware._stash_request_info(self.request, 'asdf', 'GET')
self.middleware._stash_request_info(self.request, 'asdf', 'GET', 'v2')
self.assertEqual(self.request.environ['api.cache.image_id'], 'asdf')
self.assertEqual(self.request.environ['api.cache.method'], 'GET')
self.assertEqual(self.request.environ['api.cache.version'], 'v2')
def test_fetch_cache_request_info(self):
self.request.environ['api.cache.image_id'] = 'asdf'
self.request.environ['api.cache.method'] = 'GET'
(image_id, method) = self.middleware._fetch_request_info(self.request)
self.request.environ['api.cache.version'] = 'v2'
(image_id, method, version) = \
self.middleware._fetch_request_info(self.request)
self.assertEqual('asdf', image_id)
self.assertEqual('GET', method)
self.assertEqual('v2', version)
def test_fetch_cache_request_info_unset(self):
out = self.middleware._fetch_request_info(self.request)
@ -159,35 +172,56 @@ class TestCacheMiddlewareProcessRequest(base.IsolatedUnitTest):
Test for determining that when an admin tries to download a deleted
image it returns 404 Not Found error.
"""
def fake_get_image_metadata(context, image_id):
return {'deleted': True}
def dummy_img_iterator():
for i in range(3):
yield i
image_id = 'test1'
image_meta = {
'id': image_id,
'name': 'fake_image',
'status': 'deleted',
'created_at': '',
'min_disk': '10G',
'min_ram': '1024M',
'protected': False,
'locations': '',
'checksum': 'c1234',
'owner': '',
'disk_format': 'raw',
'container_format': 'bare',
'size': '123456789',
'virtual_size': '123456789',
'is_public': 'public',
'deleted': True,
'updated_at': '',
'properties': {},
}
request = webob.Request.blank('/v1/images/%s' % image_id)
request.context = context.RequestContext()
cache_filter = ProcessRequestTestCacheFilter()
self.stubs.Set(registry, 'get_image_metadata',
fake_get_image_metadata)
self.assertRaises(exception.NotFound, cache_filter._process_v1_request,
request, image_id, dummy_img_iterator)
request, image_id, dummy_img_iterator, image_meta)
def test_process_v1_request_for_deleted_but_cached_image(self):
"""
Test for determining image is deleted from cache when it is not found
in Glance Registry.
"""
def fake_process_v1_request(request, image_id, image_iterator):
def fake_process_v1_request(request, image_id, image_iterator,
image_meta):
raise exception.NotFound()
def fake_get_v1_image_metadata(request, image_id):
return {'properties': {}}
image_id = 'test1'
request = webob.Request.blank('/v1/images/%s' % image_id)
request.context = context.RequestContext()
cache_filter = ProcessRequestTestCacheFilter()
self.stubs.Set(cache_filter, '_get_v1_image_metadata',
fake_get_v1_image_metadata)
self.stubs.Set(cache_filter, '_process_v1_request',
fake_process_v1_request)
cache_filter.process_request(request)
@ -195,21 +229,36 @@ class TestCacheMiddlewareProcessRequest(base.IsolatedUnitTest):
def test_v1_process_request_image_fetch(self):
def fake_get_image_metadata(context, image_id):
return {'is_public': True, 'deleted': False, 'size': '20'}
def dummy_img_iterator():
for i in range(3):
yield i
image_id = 'test1'
image_meta = {
'id': image_id,
'name': 'fake_image',
'status': 'active',
'created_at': '',
'min_disk': '10G',
'min_ram': '1024M',
'protected': False,
'locations': '',
'checksum': 'c1234',
'owner': '',
'disk_format': 'raw',
'container_format': 'bare',
'size': '123456789',
'virtual_size': '123456789',
'is_public': 'public',
'deleted': False,
'updated_at': '',
'properties': {},
}
request = webob.Request.blank('/v1/images/%s' % image_id)
request.context = context.RequestContext()
cache_filter = ProcessRequestTestCacheFilter()
self.stubs.Set(registry, 'get_image_metadata',
fake_get_image_metadata)
actual = cache_filter._process_v1_request(
request, image_id, dummy_img_iterator)
request, image_id, dummy_img_iterator, image_meta)
self.assertTrue(actual)
def test_v1_remove_location_image_fetch(self):
@ -218,31 +267,44 @@ class TestCacheMiddlewareProcessRequest(base.IsolatedUnitTest):
def show(self, response, raw_response):
return 'location_data' in raw_response['image_meta']
def fake_get_image_metadata(context, image_id):
return {'location_data': {'url': "file:///some/path",
'metadata': {}},
'is_public': True, 'deleted': False, 'size': '20'}
def dummy_img_iterator():
for i in range(3):
yield i
image_id = 'test1'
image_meta = {
'id': image_id,
'name': 'fake_image',
'status': 'active',
'created_at': '',
'min_disk': '10G',
'min_ram': '1024M',
'protected': False,
'locations': '',
'checksum': 'c1234',
'owner': '',
'disk_format': 'raw',
'container_format': 'bare',
'size': '123456789',
'virtual_size': '123456789',
'is_public': 'public',
'deleted': False,
'updated_at': '',
'properties': {},
}
request = webob.Request.blank('/v1/images/%s' % image_id)
request.context = context.RequestContext()
cache_filter = ProcessRequestTestCacheFilter()
cache_filter.serializer = CheckNoLocationDataSerializer()
self.stubs.Set(registry, 'get_image_metadata',
fake_get_image_metadata)
actual = cache_filter._process_v1_request(
request, image_id, dummy_img_iterator)
request, image_id, dummy_img_iterator, image_meta)
self.assertFalse(actual)
def test_verify_metadata_deleted_image(self):
"""
Test verify_metadata raises exception.NotFound for a deleted image
"""
image_meta = {'is_public': True, 'deleted': True}
image_meta = {'status': 'deleted', 'is_public': True, 'deleted': True}
cache_filter = ProcessRequestTestCacheFilter()
self.assertRaises(exception.NotFound,
cache_filter._verify_metadata, image_meta)
@ -258,7 +320,8 @@ class TestCacheMiddlewareProcessRequest(base.IsolatedUnitTest):
return image_size
image_id = 'test1'
image_meta = {'size': 0, 'deleted': False, 'id': image_id}
image_meta = {'size': 0, 'deleted': False, 'id': image_id,
'status': 'active'}
cache_filter = ProcessRequestTestCacheFilter()
self.stubs.Set(cache_filter.cache, 'get_image_size',
fake_get_image_size)
@ -270,45 +333,39 @@ class TestCacheMiddlewareProcessRequest(base.IsolatedUnitTest):
for i in range(3):
yield i
def fake_image_get(self, image_id):
return {
'id': 'test1',
'name': 'fake_image',
'status': 'active',
'created_at': '',
'min_disk': '10G',
'min_ram': '1024M',
'protected': False,
'locations': '',
'checksum': 'c352f4e7121c6eae958bc1570324f17e',
'owner': '',
'disk_format': 'raw',
'container_format': 'bare',
'size': '123456789',
'virtual_size': '123456789',
'is_public': 'public',
'deleted': False,
'updated_at': '',
'properties': {},
}
def fake_image_tag_get_all(context, image_id, session=None):
return None
image_id = 'test1'
request = webob.Request.blank('/v2/images/test1/file')
request.context = context.RequestContext()
request.environ['api.cache.image'] = ImageStub(image_id)
self.stubs.Set(db, 'image_get', fake_image_get)
self.stubs.Set(db, 'image_tag_get_all', fake_image_tag_get_all)
image_meta = {
'id': image_id,
'name': 'fake_image',
'status': 'active',
'created_at': '',
'min_disk': '10G',
'min_ram': '1024M',
'protected': False,
'locations': '',
'checksum': 'c1234',
'owner': '',
'disk_format': 'raw',
'container_format': 'bare',
'size': '123456789',
'virtual_size': '123456789',
'is_public': 'public',
'deleted': False,
'updated_at': '',
'properties': {},
}
cache_filter = ProcessRequestTestCacheFilter()
response = cache_filter._process_v2_request(
request, image_id, dummy_img_iterator)
request, image_id, dummy_img_iterator, image_meta)
self.assertEqual(response.headers['Content-Type'],
'application/octet-stream')
self.assertEqual(response.headers['Content-MD5'],
'c352f4e7121c6eae958bc1570324f17e')
'c1234')
self.assertEqual(response.headers['Content-Length'],
'123456789')
@ -317,17 +374,196 @@ class TestCacheMiddlewareProcessRequest(base.IsolatedUnitTest):
Test for cache middleware skip processing when request
context has not 'download_image' role.
"""
def fake_get_v1_image_metadata(*args, **kwargs):
return {'properties': {}}
image_id = 'test1'
request = webob.Request.blank('/v1/images/%s' % image_id)
request.context = context.RequestContext()
cache_filter = ProcessRequestTestCacheFilter()
cache_filter._get_v1_image_metadata = fake_get_v1_image_metadata
rules = {'download_image': '!'}
self.set_policy_rules(rules)
cache_filter.policy = glance.api.policy.Enforcer()
self.assertRaises(webob.exc.HTTPForbidden,
cache_filter.process_request, request)
self.assertIsNone(cache_filter.process_request(request))
def test_v1_process_request_download_restricted(self):
"""
Test process_request for v1 api where _member_ role not able to
download the image with custom property.
"""
image_id = 'test1'
def fake_get_v1_image_metadata(*args, **kwargs):
return {
'id': image_id,
'name': 'fake_image',
'status': 'active',
'created_at': '',
'min_disk': '10G',
'min_ram': '1024M',
'protected': False,
'locations': '',
'checksum': 'c1234',
'owner': '',
'disk_format': 'raw',
'container_format': 'bare',
'size': '123456789',
'virtual_size': '123456789',
'is_public': 'public',
'deleted': False,
'updated_at': '',
'x_test_key': 'test_1234'
}
request = webob.Request.blank('/v1/images/%s' % image_id)
request.context = context.RequestContext(roles=['_member_'])
cache_filter = ProcessRequestTestCacheFilter()
cache_filter._get_v1_image_metadata = fake_get_v1_image_metadata
rules = {
"restricted":
"not ('test_1234':%(x_test_key)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
cache_filter.policy = glance.api.policy.Enforcer()
self.assertRaises(webob.exc.HTTPForbidden,
cache_filter.process_request, request)
def test_v1_process_request_download_permitted(self):
"""
Test process_request for v1 api where member role able to
download the image with custom property.
"""
image_id = 'test1'
def fake_get_v1_image_metadata(*args, **kwargs):
return {
'id': image_id,
'name': 'fake_image',
'status': 'active',
'created_at': '',
'min_disk': '10G',
'min_ram': '1024M',
'protected': False,
'locations': '',
'checksum': 'c1234',
'owner': '',
'disk_format': 'raw',
'container_format': 'bare',
'size': '123456789',
'virtual_size': '123456789',
'is_public': 'public',
'deleted': False,
'updated_at': '',
'x_test_key': 'test_1234'
}
request = webob.Request.blank('/v1/images/%s' % image_id)
request.context = context.RequestContext(roles=['member'])
cache_filter = ProcessRequestTestCacheFilter()
cache_filter._get_v1_image_metadata = fake_get_v1_image_metadata
rules = {
"restricted":
"not ('test_1234':%(x_test_key)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
cache_filter.policy = glance.api.policy.Enforcer()
actual = cache_filter.process_request(request)
self.assertTrue(actual)
def test_v1_process_request_image_meta_not_found(self):
"""
Test process_request for v1 api where registry raises NotFound
exception as image metadata not found.
"""
image_id = 'test1'
def fake_get_v1_image_metadata(*args, **kwargs):
raise exception.NotFound()
request = webob.Request.blank('/v1/images/%s' % image_id)
request.context = context.RequestContext(roles=['_member_'])
cache_filter = ProcessRequestTestCacheFilter()
self.stubs.Set(registry, 'get_image_metadata',
fake_get_v1_image_metadata)
rules = {
"restricted":
"not ('test_1234':%(x_test_key)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
cache_filter.policy = glance.api.policy.Enforcer()
self.assertRaises(webob.exc.HTTPNotFound,
cache_filter.process_request, request)
def test_v2_process_request_download_restricted(self):
"""
Test process_request for v2 api where _member_ role not able to
download the image with custom property.
"""
image_id = 'test1'
extra_properties = {
'x_test_key': 'test_1234'
}
def fake_get_v2_image_metadata(*args, **kwargs):
image = ImageStub(image_id, extra_properties=extra_properties)
request.environ['api.cache.image'] = image
return glance.api.policy.ImageTarget(image)
request = webob.Request.blank('/v2/images/test1/file')
request.context = context.RequestContext(roles=['_member_'])
cache_filter = ProcessRequestTestCacheFilter()
cache_filter._get_v2_image_metadata = fake_get_v2_image_metadata
rules = {
"restricted":
"not ('test_1234':%(x_test_key)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
cache_filter.policy = glance.api.policy.Enforcer()
self.assertRaises(webob.exc.HTTPForbidden,
cache_filter.process_request, request)
def test_v2_process_request_download_permitted(self):
"""
Test process_request for v2 api where member role able to
download the image with custom property.
"""
image_id = 'test1'
extra_properties = {
'x_test_key': 'test_1234'
}
def fake_get_v2_image_metadata(*args, **kwargs):
image = ImageStub(image_id, extra_properties=extra_properties)
request.environ['api.cache.image'] = image
return glance.api.policy.ImageTarget(image)
request = webob.Request.blank('/v2/images/test1/file')
request.context = context.RequestContext(roles=['member'])
cache_filter = ProcessRequestTestCacheFilter()
cache_filter._get_v2_image_metadata = fake_get_v2_image_metadata
rules = {
"restricted":
"not ('test_1234':%(x_test_key)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
cache_filter.policy = glance.api.policy.Enforcer()
actual = cache_filter.process_request(request)
self.assertTrue(actual)
class TestCacheMiddlewareProcessResponse(base.IsolatedUnitTest):
@ -350,10 +586,14 @@ class TestCacheMiddlewareProcessResponse(base.IsolatedUnitTest):
def test_process_response(self):
def fake_fetch_request_info(*args, **kwargs):
return ('test1', 'GET')
return ('test1', 'GET', 'v1')
def fake_get_v1_image_metadata(*args, **kwargs):
return {'properties': {}}
cache_filter = ProcessRequestTestCacheFilter()
cache_filter._fetch_request_info = fake_fetch_request_info
cache_filter._get_v1_image_metadata = fake_get_v1_image_metadata
image_id = 'test1'
request = webob.Request.blank('/v1/images/%s' % image_id)
request.context = context.RequestContext()
@ -368,10 +608,14 @@ class TestCacheMiddlewareProcessResponse(base.IsolatedUnitTest):
when request context has not 'download_image' role.
"""
def fake_fetch_request_info(*args, **kwargs):
return ('test1', 'GET')
return ('test1', 'GET', 'v1')
def fake_get_v1_image_metadata(*args, **kwargs):
return {'properties': {}}
cache_filter = ProcessRequestTestCacheFilter()
cache_filter._fetch_request_info = fake_fetch_request_info
cache_filter._get_v1_image_metadata = fake_get_v1_image_metadata
rules = {'download_image': '!'}
self.set_policy_rules(rules)
cache_filter.policy = glance.api.policy.Enforcer()
@ -383,3 +627,206 @@ class TestCacheMiddlewareProcessResponse(base.IsolatedUnitTest):
self.assertRaises(webob.exc.HTTPForbidden,
cache_filter.process_response, resp)
self.assertEqual([''], resp.app_iter)
def test_v1_process_response_download_restricted(self):
"""
Test process_response for v1 api where _member_ role not able to
download the image with custom property.
"""
image_id = 'test1'
def fake_fetch_request_info(*args, **kwargs):
return ('test1', 'GET', 'v1')
def fake_get_v1_image_metadata(*args, **kwargs):
return {
'id': image_id,
'name': 'fake_image',
'status': 'active',
'created_at': '',
'min_disk': '10G',
'min_ram': '1024M',
'protected': False,
'locations': '',
'checksum': 'c1234',
'owner': '',
'disk_format': 'raw',
'container_format': 'bare',
'size': '123456789',
'virtual_size': '123456789',
'is_public': 'public',
'deleted': False,
'updated_at': '',
'x_test_key': 'test_1234'
}
cache_filter = ProcessRequestTestCacheFilter()
cache_filter._fetch_request_info = fake_fetch_request_info
cache_filter._get_v1_image_metadata = fake_get_v1_image_metadata
rules = {
"restricted":
"not ('test_1234':%(x_test_key)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
cache_filter.policy = glance.api.policy.Enforcer()
request = webob.Request.blank('/v1/images/%s' % image_id)
request.context = context.RequestContext(roles=['_member_'])
resp = webob.Response(request=request)
self.assertRaises(webob.exc.HTTPForbidden,
cache_filter.process_response, resp)
def test_v1_process_response_download_permitted(self):
"""
Test process_response for v1 api where member role able to
download the image with custom property.
"""
image_id = 'test1'
def fake_fetch_request_info(*args, **kwargs):
return ('test1', 'GET', 'v1')
def fake_get_v1_image_metadata(*args, **kwargs):
return {
'id': image_id,
'name': 'fake_image',
'status': 'active',
'created_at': '',
'min_disk': '10G',
'min_ram': '1024M',
'protected': False,
'locations': '',
'checksum': 'c1234',
'owner': '',
'disk_format': 'raw',
'container_format': 'bare',
'size': '123456789',
'virtual_size': '123456789',
'is_public': 'public',
'deleted': False,
'updated_at': '',
'x_test_key': 'test_1234'
}
cache_filter = ProcessRequestTestCacheFilter()
cache_filter._fetch_request_info = fake_fetch_request_info
cache_filter._get_v1_image_metadata = fake_get_v1_image_metadata
rules = {
"restricted":
"not ('test_1234':%(x_test_key)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
cache_filter.policy = glance.api.policy.Enforcer()
request = webob.Request.blank('/v1/images/%s' % image_id)
request.context = context.RequestContext(roles=['member'])
resp = webob.Response(request=request)
actual = cache_filter.process_response(resp)
self.assertEqual(actual, resp)
def test_v1_process_response_image_meta_not_found(self):
"""
Test process_response for v1 api where registry raises NotFound
exception as image metadata not found.
"""
image_id = 'test1'
def fake_fetch_request_info(*args, **kwargs):
return ('test1', 'GET', 'v1')
def fake_get_v1_image_metadata(*args, **kwargs):
raise exception.NotFound()
cache_filter = ProcessRequestTestCacheFilter()
cache_filter._fetch_request_info = fake_fetch_request_info
self.stubs.Set(registry, 'get_image_metadata',
fake_get_v1_image_metadata)
rules = {
"restricted":
"not ('test_1234':%(x_test_key)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
cache_filter.policy = glance.api.policy.Enforcer()
request = webob.Request.blank('/v1/images/%s' % image_id)
request.context = context.RequestContext(roles=['_member_'])
resp = webob.Response(request=request)
self.assertRaises(webob.exc.HTTPNotFound,
cache_filter.process_response, resp)
def test_v2_process_response_download_restricted(self):
"""
Test process_response for v2 api where _member_ role not able to
download the image with custom property.
"""
image_id = 'test1'
extra_properties = {
'x_test_key': 'test_1234'
}
def fake_fetch_request_info(*args, **kwargs):
return ('test1', 'GET', 'v2')
def fake_get_v2_image_metadata(*args, **kwargs):
image = ImageStub(image_id, extra_properties=extra_properties)
request.environ['api.cache.image'] = image
return glance.api.policy.ImageTarget(image)
cache_filter = ProcessRequestTestCacheFilter()
cache_filter._fetch_request_info = fake_fetch_request_info
cache_filter._get_v2_image_metadata = fake_get_v2_image_metadata
rules = {
"restricted":
"not ('test_1234':%(x_test_key)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
cache_filter.policy = glance.api.policy.Enforcer()
request = webob.Request.blank('/v2/images/test1/file')
request.context = context.RequestContext(roles=['_member_'])
resp = webob.Response(request=request)
self.assertRaises(webob.exc.HTTPForbidden,
cache_filter.process_response, resp)
def test_v2_process_response_download_permitted(self):
"""
Test process_response for v2 api where member role able to
download the image with custom property.
"""
image_id = 'test1'
extra_properties = {
'x_test_key': 'test_1234'
}
def fake_fetch_request_info(*args, **kwargs):
return ('test1', 'GET', 'v2')
def fake_get_v2_image_metadata(*args, **kwargs):
image = ImageStub(image_id, extra_properties=extra_properties)
request.environ['api.cache.image'] = image
return glance.api.policy.ImageTarget(image)
cache_filter = ProcessRequestTestCacheFilter()
cache_filter._fetch_request_info = fake_fetch_request_info
cache_filter._get_v2_image_metadata = fake_get_v2_image_metadata
rules = {
"restricted":
"not ('test_1234':%(x_test_key)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
cache_filter.policy = glance.api.policy.Enforcer()
request = webob.Request.blank('/v2/images/test1/file')
request.context = context.RequestContext(roles=['member'])
resp = webob.Response(request=request)
actual = cache_filter.process_response(resp)
self.assertEqual(actual, resp)

View File

@ -44,10 +44,19 @@ class ImageRepoStub(object):
class ImageStub(object):
def __init__(self, image_id, visibility='private'):
def __init__(self, image_id=None, visibility='private',
container_format='bear', disk_format='raw',
status='active', extra_properties=None):
if extra_properties is None:
extra_properties = {}
self.image_id = image_id
self.visibility = visibility
self.status = 'active'
self.container_format = container_format
self.disk_format = disk_format
self.status = status
self.extra_properties = extra_properties
def delete(self):
self.status = 'deleted'
@ -294,11 +303,19 @@ class TestImagePolicy(test_utils.BaseTestCase):
image_factory.new_image(visibility='public')
self.policy.enforce.assert_called_once_with({}, "publicize_image", {})
def test_image_get_data(self):
def test_image_get_data_policy_enforced_with_target(self):
extra_properties = {
'test_key': 'test_4321'
}
image_stub = ImageStub(UUID1, extra_properties=extra_properties)
image = glance.api.policy.ImageProxy(image_stub, {}, self.policy)
self.policy.enforce.side_effect = exception.Forbidden
image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
glance.api.policy.ImageTarget = mock.Mock()
target = glance.api.policy.ImageTarget(image)
self.assertRaises(exception.Forbidden, image.get_data)
self.policy.enforce.assert_called_once_with({}, "download_image", {})
self.policy.enforce.assert_called_once_with({}, "download_image",
target=target)
def test_image_set_data(self):
self.policy.enforce.side_effect = exception.Forbidden

View File

@ -2440,6 +2440,32 @@ class TestGlanceAPI(base.IsolatedUnitTest):
res = req.get_response(self.api)
self.assertEqual(res.status_int, 403)
def test_show_image_restricted_download_for_core_property(self):
rules = {
"restricted":
"not ('1024M':%(min_ram)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
req = webob.Request.blank("/images/%s" % UUID2)
req.headers['X-Auth-Token'] = 'user:tenant:_member_'
req.headers['min_ram'] = '1024M'
res = req.get_response(self.api)
self.assertEqual(res.status_int, 403)
def test_show_image_restricted_download_for_custom_property(self):
rules = {
"restricted":
"not ('test_1234'==%(x_test_key)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
req = webob.Request.blank("/images/%s" % UUID2)
req.headers['X-Auth-Token'] = 'user:tenant:_member_'
req.headers['x_test_key'] = 'test_1234'
res = req.get_response(self.api)
self.assertEqual(res.status_int, 403)
def test_delete_image(self):
req = webob.Request.blank("/images/%s" % UUID2)
req.method = 'DELETE'