merge trunk

This commit is contained in:
Jason Koelker 2011-07-22 19:07:38 -05:00
commit e4eaef7bb8
17 changed files with 508 additions and 52 deletions

@ -13,6 +13,7 @@ Jinwoo 'Joseph' Suh <jsuh@isi.edu>
Josh Kearney <josh@jk0.org>
Justin Shepherd <jshepher@rackspace.com>
Ken Pepple <ken.pepple@gmail.com>
Kevin L. Mitchell <kevin.mitchell@rackspace.com>
Matt Dietz <matt.dietz@rackspace.com>
Monty Taylor <mordred@inaugust.com>
Rick Clark <rick@openstack.org>

@ -57,7 +57,7 @@ swift_store_create_container_on_put = False
delayed_delete = False
[pipeline:glance-api]
pipeline = versionnegotiation apiv1app
pipeline = versionnegotiation context apiv1app
[pipeline:versions]
pipeline = versionsapp
@ -70,3 +70,6 @@ paste.app_factory = glance.api.v1:app_factory
[filter:versionnegotiation]
paste.filter_factory = glance.api.middleware.version_negotiation:filter_factory
[filter:context]
paste.filter_factory = glance.common.context:filter_factory

@ -29,5 +29,11 @@ sql_connection = sqlite:///glance.sqlite
# before MySQL can drop the connection.
sql_idle_timeout = 3600
[app:glance-registry]
[pipeline:glance-registry]
pipeline = context registryapp
[app:registryapp]
paste.app_factory = glance.registry.server:app_factory
[filter:context]
paste.filter_factory = glance.common.context:filter_factory

@ -27,7 +27,8 @@ import sys
import webob
from webob.exc import (HTTPNotFound,
HTTPConflict,
HTTPBadRequest)
HTTPBadRequest,
HTTPForbidden)
from glance.common import exception
from glance.common import wsgi
@ -96,7 +97,8 @@ class Controller(object):
"""
params = self._get_query_params(req)
try:
images = registry.get_images_list(self.options, **params)
images = registry.get_images_list(self.options, req.context,
**params)
except exception.Invalid, e:
raise HTTPBadRequest(explanation=str(e))
@ -126,7 +128,8 @@ class Controller(object):
"""
params = self._get_query_params(req)
try:
images = registry.get_images_detail(self.options, **params)
images = registry.get_images_detail(self.options, req.context,
**params)
except exception.Invalid, e:
raise HTTPBadRequest(explanation=str(e))
return dict(images=images)
@ -226,6 +229,7 @@ class Controller(object):
try:
image_meta = registry.add_image_metadata(self.options,
req.context,
image_meta)
return image_meta
except exception.Duplicate:
@ -238,6 +242,11 @@ class Controller(object):
for line in msg.split('\n'):
logger.error(line)
raise HTTPBadRequest(msg, request=req, content_type="text/plain")
except exception.NotAuthorized:
msg = "Not authorized to reserve image."
logger.error(msg)
raise HTTPForbidden(msg, request=req,
content_type="text/plain")
def _upload(self, req, image_meta):
"""
@ -267,7 +276,7 @@ class Controller(object):
image_id = image_meta['id']
logger.debug("Setting image %s to status 'saving'" % image_id)
registry.update_image_metadata(self.options, image_id,
registry.update_image_metadata(self.options, req.context, image_id,
{'status': 'saving'})
try:
logger.debug("Uploading image data for image %(image_id)s "
@ -294,7 +303,8 @@ class Controller(object):
logger.debug("Updating image %(image_id)s data. "
"Checksum set to %(checksum)s, size set "
"to %(size)d" % locals())
registry.update_image_metadata(self.options, image_id,
registry.update_image_metadata(self.options, req.context,
image_id,
{'checksum': checksum,
'size': size})
@ -306,6 +316,13 @@ class Controller(object):
self._safe_kill(req, image_id)
raise HTTPConflict(msg, request=req)
except exception.NotAuthorized, e:
msg = ("Unauthorized upload attempt: %s") % str(e)
logger.error(msg)
self._safe_kill(req, image_id)
raise HTTPForbidden(msg, request=req,
content_type='text/plain')
except Exception, e:
msg = ("Error uploading image: %s") % str(e)
logger.error(msg)
@ -325,6 +342,7 @@ class Controller(object):
image_meta['location'] = location
image_meta['status'] = 'active'
return registry.update_image_metadata(self.options,
req.context,
image_id,
image_meta)
@ -336,6 +354,7 @@ class Controller(object):
:param image_id: Opaque image identifier
"""
registry.update_image_metadata(self.options,
req.context,
image_id,
{'status': 'killed'})
@ -404,6 +423,12 @@ class Controller(object):
and the request body is not application/octet-stream
image data.
"""
if req.context.read_only:
msg = "Read-only access"
logger.debug(msg)
raise HTTPForbidden(msg, request=req,
content_type="text/plain")
image_meta = self._reserve(req, image_meta)
image_id = image_meta['id']
@ -425,6 +450,12 @@ class Controller(object):
:retval Returns the updated image information as a mapping
"""
if req.context.read_only:
msg = "Read-only access"
logger.debug(msg)
raise HTTPForbidden(msg, request=req,
content_type="text/plain")
orig_image_meta = self.get_image_meta_or_404(req, id)
orig_status = orig_image_meta['status']
@ -432,8 +463,9 @@ class Controller(object):
raise HTTPConflict("Cannot upload to an unqueued image")
try:
image_meta = registry.update_image_metadata(self.options, id,
image_meta, True)
image_meta = registry.update_image_metadata(self.options,
req.context, id,
image_meta, True)
if image_data is not None:
image_meta = self._upload_and_activate(req, image_meta)
except exception.Invalid, e:
@ -457,6 +489,12 @@ class Controller(object):
:raises HttpNotAuthorized if image or any chunk is not
deleteable by the requesting user
"""
if req.context.read_only:
msg = "Read-only access"
logger.debug(msg)
raise HTTPForbidden(msg, request=req,
content_type="text/plain")
image = self.get_image_meta_or_404(req, id)
# The image's location field may be None in the case
@ -465,7 +503,7 @@ class Controller(object):
# See https://bugs.launchpad.net/glance/+bug/747799
if image['location']:
schedule_delete_from_backend(image['location'], self.options, id)
registry.delete_image_metadata(self.options, id)
registry.delete_image_metadata(self.options, req.context, id)
def get_image_meta_or_404(self, request, id):
"""
@ -478,12 +516,18 @@ class Controller(object):
:raises HTTPNotFound if image does not exist
"""
try:
return registry.get_image_metadata(self.options, id)
return registry.get_image_metadata(self.options,
request.context, id)
except exception.NotFound:
msg = "Image with identifier %s not found" % id
logger.debug(msg)
raise HTTPNotFound(msg, request=request,
content_type='text/plain')
except exception.NotAuthorized:
msg = "Unauthorized image access"
logger.debug(msg)
raise HTTPForbidden(msg, request=request,
content_type='text/plain')
def get_store_or_400(self, request, store_name):
"""

@ -35,7 +35,8 @@ class V1Client(base_client.BaseClient):
DEFAULT_PORT = 9292
def __init__(self, host, port=None, use_ssl=False, doc_root="/v1"):
def __init__(self, host, port=None, use_ssl=False, doc_root="/v1",
auth_tok=None):
"""
Creates a new client to a Glance API service.
@ -43,10 +44,11 @@ class V1Client(base_client.BaseClient):
:param port: The port where Glance resides (defaults to 9292)
:param use_ssl: Should we use HTTPS? (defaults to False)
:param doc_root: Prefix for all URLs we request from host
:param auth_tok: The auth token to pass to the server
"""
port = port or self.DEFAULT_PORT
self.doc_root = doc_root
super(Client, self).__init__(host, port, use_ssl)
super(Client, self).__init__(host, port, use_ssl, auth_tok)
def do_request(self, method, action, body=None, headers=None, params=None):
action = "%s/%s" % (self.doc_root, action.lstrip("/"))

@ -41,17 +41,19 @@ class BaseClient(object):
CHUNKSIZE = 65536
def __init__(self, host, port, use_ssl):
def __init__(self, host, port, use_ssl, auth_tok):
"""
Creates a new client to some service.
:param host: The host where service resides
:param port: The port where service resides
:param use_ssl: Should we use HTTPS?
:param auth_tok: The auth token to pass to the server
"""
self.host = host
self.port = port
self.use_ssl = use_ssl
self.auth_tok = auth_tok
self.connection = None
def get_connection_type(self):
@ -99,6 +101,8 @@ class BaseClient(object):
try:
connection_type = self.get_connection_type()
headers = headers or {}
if 'x-auth-token' not in headers and self.auth_tok:
headers['x-auth-token'] = self.auth_tok
c = connection_type(self.host, self.port)
# Do a simple request or a chunked request, depending

96
glance/common/context.py Normal file

@ -0,0 +1,96 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from glance.common import utils
from glance.common import wsgi
class RequestContext(object):
"""
Stores information about the security context under which the user
accesses the system, as well as additional request information.
"""
def __init__(self, auth_tok=None, user=None, tenant=None, is_admin=False,
read_only=False):
self.auth_tok = auth_tok
self.user = user
self.tenant = tenant
self.is_admin = is_admin
self.read_only = read_only
def is_image_visible(self, image):
"""Return True if the image is visible in this context."""
# Is admin == image visible
if self.is_admin:
return True
# No owner == image visible
if image.owner is None:
return True
# Image is_public == image visible
if image.is_public:
return True
# Private image
return self.owner is not None and self.owner == image.owner
@property
def owner(self):
"""Return the owner to correlate with an image."""
return self.tenant
class ContextMiddleware(wsgi.Middleware):
def __init__(self, app, options):
self.options = options
super(ContextMiddleware, self).__init__(app)
def make_context(self, *args, **kwargs):
"""
Create a context with the given arguments.
"""
# Determine the context class to use
ctxcls = RequestContext
if 'context_class' in self.options:
ctxcls = utils.import_class(self.options['context_class'])
return ctxcls(*args, **kwargs)
def process_request(self, req):
"""
Extract any authentication information in the request and
construct an appropriate context from it.
"""
# Use the default empty context, with admin turned on for
# backwards compatibility
req.context = self.make_context(is_admin=True)
def filter_factory(global_conf, **local_conf):
"""
Factory method for paste.deploy
"""
conf = global_conf.copy()
conf.update(local_conf)
def filter(app):
return ContextMiddleware(app, conf)
return filter

@ -26,33 +26,33 @@ from glance.registry import client
logger = logging.getLogger('glance.registry')
def get_registry_client(options):
def get_registry_client(options, cxt):
host = options['registry_host']
port = int(options['registry_port'])
return client.RegistryClient(host, port)
return client.RegistryClient(host, port, auth_tok=cxt.auth_tok)
def get_images_list(options, **kwargs):
c = get_registry_client(options)
def get_images_list(options, context, **kwargs):
c = get_registry_client(options, context)
return c.get_images(**kwargs)
def get_images_detail(options, **kwargs):
c = get_registry_client(options)
def get_images_detail(options, context, **kwargs):
c = get_registry_client(options, context)
return c.get_images_detailed(**kwargs)
def get_image_metadata(options, image_id):
c = get_registry_client(options)
def get_image_metadata(options, context, image_id):
c = get_registry_client(options, context)
return c.get_image(image_id)
def add_image_metadata(options, image_meta):
def add_image_metadata(options, context, image_meta):
if options['debug']:
logger.debug("Adding image metadata...")
_debug_print_metadata(image_meta)
c = get_registry_client(options)
c = get_registry_client(options, context)
new_image_meta = c.add_image(image_meta)
if options['debug']:
@ -63,12 +63,13 @@ def add_image_metadata(options, image_meta):
return new_image_meta
def update_image_metadata(options, image_id, image_meta, purge_props=False):
def update_image_metadata(options, context, image_id, image_meta,
purge_props=False):
if options['debug']:
logger.debug("Updating image metadata for image %s...", image_id)
_debug_print_metadata(image_meta)
c = get_registry_client(options)
c = get_registry_client(options, context)
new_image_meta = c.update_image(image_id, image_meta, purge_props)
if options['debug']:
@ -79,9 +80,9 @@ def update_image_metadata(options, image_id, image_meta, purge_props=False):
return new_image_meta
def delete_image_metadata(options, image_id):
def delete_image_metadata(options, context, image_id):
logger.debug("Deleting image metadata for image %s...", image_id)
c = get_registry_client(options)
c = get_registry_client(options, context)
return c.delete_image(image_id)

@ -33,16 +33,17 @@ class RegistryClient(BaseClient):
DEFAULT_PORT = 9191
def __init__(self, host, port=None, use_ssl=False):
def __init__(self, host, port=None, use_ssl=False, auth_tok=None):
"""
Creates a new client to a Glance Registry service.
:param host: The host where Glance resides
:param port: The port where Glance resides (defaults to 9191)
:param use_ssl: Should we use HTTPS? (defaults to False)
:param auth_tok: The auth token to pass to the server
"""
port = port or self.DEFAULT_PORT
super(RegistryClient, self).__init__(host, port, use_ssl)
super(RegistryClient, self).__init__(host, port, use_ssl, auth_tok)
def get_images(self, **kwargs):
"""

@ -46,7 +46,8 @@ BASE_MODEL_ATTRS = set(['id', 'created_at', 'updated_at', 'deleted_at',
IMAGE_ATTRS = BASE_MODEL_ATTRS | set(['name', 'status', 'size',
'disk_format', 'container_format',
'is_public', 'location', 'checksum'])
'is_public', 'location', 'checksum',
'owner'])
CONTAINER_FORMATS = ['ami', 'ari', 'aki', 'bare', 'ovf']
DISK_FORMATS = ['ami', 'ari', 'aki', 'vhd', 'vmdk', 'raw', 'qcow2', 'vdi',
@ -121,7 +122,7 @@ def image_get(context, image_id, session=None):
"""Get an image or raise if it does not exist."""
session = session or get_session()
try:
return session.query(models.Image).\
image = session.query(models.Image).\
options(joinedload(models.Image.properties)).\
filter_by(deleted=_deleted(context)).\
filter_by(id=image_id).\
@ -129,6 +130,12 @@ def image_get(context, image_id, session=None):
except exc.NoResultFound:
raise exception.NotFound("No image found with ID %s" % image_id)
# Make sure they can look at it
if not context.is_image_visible(image):
raise exception.NotAuthorized("Image not visible to you")
return image
def image_get_all_pending_delete(context, delete_time=None, limit=None):
"""Get all images that are pending deletion
@ -192,6 +199,13 @@ def image_get_all(context, filters=None, marker=None, limit=None,
query = query.filter(models.Image.size <= filters['size_max'])
del filters['size_max']
if 'is_public' in filters and filters['is_public'] is not None:
the_filter = models.Image.is_public == filters['is_public']
if filters['is_public'] and context.owner is not None:
the_filter = or_(the_filter, models.Image.owner == context.owner)
query = query.filter(the_filter)
del filters['is_public']
for (k, v) in filters.pop('properties', {}).items():
query = query.filter(models.Image.properties.any(name=k, value=v))

@ -0,0 +1,82 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from migrate.changeset import *
from sqlalchemy import *
from sqlalchemy.sql import and_, not_
from glance.registry.db.migrate_repo.schema import (
Boolean, DateTime, BigInteger, Integer, String,
Text, from_migration_import)
def get_images_table(meta):
"""
Returns the Table object for the images table that corresponds to
the images table definition of this version.
"""
images = Table('images', meta,
Column('id', Integer(), primary_key=True, nullable=False),
Column('name', String(255)),
Column('disk_format', String(20)),
Column('container_format', String(20)),
Column('size', BigInteger()),
Column('status', String(30), nullable=False),
Column('is_public', Boolean(), nullable=False, default=False,
index=True),
Column('location', Text()),
Column('created_at', DateTime(), nullable=False),
Column('updated_at', DateTime()),
Column('deleted_at', DateTime()),
Column('deleted', Boolean(), nullable=False, default=False,
index=True),
Column('checksum', String(32)),
Column('owner', String(255)),
mysql_engine='InnoDB',
useexisting=True)
return images
def get_image_properties_table(meta):
"""
No changes to the image properties table from 006...
"""
(get_image_properties_table,) = from_migration_import(
'006_key_to_name', ['get_image_properties_table'])
image_properties = get_image_properties_table(meta)
return image_properties
def upgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
images = get_images_table(meta)
owner = Column('owner', String(255))
owner.create(images)
def downgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
images = get_images_table(meta)
images.columns['owner'].drop()

@ -105,6 +105,7 @@ class Image(BASE, ModelBase):
is_public = Column(Boolean, nullable=False, default=False)
location = Column(Text)
checksum = Column(String(32))
owner = Column(String(255))
class ImageProperty(BASE, ModelBase):

@ -61,7 +61,7 @@ class Controller(object):
Get images, wrapping in exception if necessary.
"""
try:
return db_api.image_get_all(None, **params)
return db_api.image_get_all(context, **params)
except exception.NotFound, e:
msg = "Invalid marker. Image could not be found."
raise exc.HTTPBadRequest(explanation=msg)
@ -87,7 +87,7 @@ class Controller(object):
}
"""
params = self._get_query_params(req)
images = self._get_images(None, **params)
images = self._get_images(req.context, **params)
results = []
for image in images:
@ -111,7 +111,7 @@ class Controller(object):
"""
params = self._get_query_params(req)
images = self._get_images(None, **params)
images = self._get_images(req.context, **params)
image_dicts = [make_image_dict(i) for i in images]
return dict(images=image_dicts)
@ -146,7 +146,11 @@ class Controller(object):
filters = {}
properties = {}
filters['is_public'] = self._get_is_public(req)
if req.context.is_admin:
# Only admin gets to look for non-public images
filters['is_public'] = self._get_is_public(req)
else:
filters['is_public'] = True
for param in req.str_params:
if param in SUPPORTED_FILTERS:
filters[param] = req.str_params.get(param)
@ -223,9 +227,15 @@ class Controller(object):
def show(self, req, id):
"""Return data about the given image id."""
try:
image = db_api.image_get(None, id)
image = db_api.image_get(req.context, id)
except exception.NotFound:
raise exc.HTTPNotFound()
except exception.NotAuthorized:
# If it's private and doesn't belong to them, don't let on
# that it exists
logger.info("Access by %s to image %s denied" %
(req.context.user, id))
raise exc.HTTPNotFound()
return dict(image=make_image_dict(image))
@ -238,11 +248,19 @@ class Controller(object):
:retval Returns 200 if delete was successful, a fault if not.
"""
context = None
if req.context.read_only:
raise exc.HTTPForbidden()
try:
db_api.image_destroy(context, id)
db_api.image_destroy(req.context, id)
except exception.NotFound:
return exc.HTTPNotFound()
except exception.NotAuthorized:
# If it's private and doesn't belong to them, don't let on
# that it exists
logger.info("Access by %s to image %s denied" %
(req.context.user, id))
raise exc.HTTPNotFound()
def create(self, req, body):
"""
@ -255,14 +273,20 @@ class Controller(object):
which will include the newly-created image's internal id
in the 'id' field
"""
if req.context.read_only:
raise exc.HTTPForbidden()
image_data = body['image']
# Ensure the image has a status set
image_data.setdefault('status', 'active')
context = None
# Set up the image owner
if not req.context.is_admin or 'owner' not in image_data:
image_data['owner'] = req.context.owner
try:
image_data = db_api.image_create(context, image_data)
image_data = db_api.image_create(req.context, image_data)
return dict(image=make_image_dict(image_data))
except exception.Duplicate:
msg = ("Image with identifier %s already exists!" % id)
@ -283,18 +307,25 @@ class Controller(object):
:retval Returns the updated image information as a mapping,
"""
if req.context.read_only:
raise exc.HTTPForbidden()
image_data = body['image']
# Prohibit modification of 'owner'
if not req.context.is_admin and 'owner' in image_data:
del image_data['owner']
purge_props = req.headers.get("X-Glance-Registry-Purge-Props", "false")
context = None
try:
logger.debug("Updating image %(id)s with metadata: %(image_data)r"
% locals())
if purge_props == "true":
updated_image = db_api.image_update(context, id, image_data,
True)
updated_image = db_api.image_update(req.context, id,
image_data, True)
else:
updated_image = db_api.image_update(context, id, image_data)
updated_image = db_api.image_update(req.context, id,
image_data)
return dict(image=make_image_dict(updated_image))
except exception.Invalid, e:
msg = ("Failed to update image metadata. "
@ -305,6 +336,14 @@ class Controller(object):
raise exc.HTTPNotFound(body='Image not found',
request=req,
content_type='text/plain')
except exception.NotAuthorized:
# If it's private and doesn't belong to them, don't let on
# that it exists
logger.info("Access by %s to image %s denied" %
(req.context.user, id))
raise exc.HTTPNotFound(body='Image not found',
request=req,
content_type='text/plain')
def create_resource(options):

@ -151,7 +151,7 @@ log_file = %(log_file)s
delayed_delete = %(delayed_delete)s
[pipeline:glance-api]
pipeline = versionnegotiation apiv1app
pipeline = versionnegotiation context apiv1app
[pipeline:versions]
pipeline = versionsapp
@ -164,6 +164,9 @@ paste.app_factory = glance.api.v1:app_factory
[filter:versionnegotiation]
paste.filter_factory = glance.api.middleware.version_negotiation:filter_factory
[filter:context]
paste.filter_factory = glance.common.context:filter_factory
"""
@ -193,8 +196,14 @@ log_file = %(log_file)s
sql_connection = %(sql_connection)s
sql_idle_timeout = 3600
[app:glance-registry]
[pipeline:glance-registry]
pipeline = context registryapp
[app:registryapp]
paste.app_factory = glance.registry.server:app_factory
[filter:context]
paste.filter_factory = glance.common.context:filter_factory
"""

@ -29,6 +29,7 @@ import stubout
import webob
import glance.common.client
from glance.common import context
from glance.common import exception
from glance.registry import server as rserver
from glance.api import v1 as server
@ -172,7 +173,8 @@ def stub_out_registry_and_store_server(stubs):
"sqlite://")
options = {'sql_connection': sql_connection, 'verbose': VERBOSE,
'debug': DEBUG}
res = self.req.get_response(rserver.API(options))
api = context.ContextMiddleware(rserver.API(options), options)
res = self.req.get_response(api)
# httplib.Response has a read() method...fake it out
def fake_reader():
@ -223,7 +225,8 @@ def stub_out_registry_and_store_server(stubs):
'registry_port': '9191',
'default_store': 'file',
'filesystem_store_datadir': FAKE_FILESYSTEM_ROOTDIR}
res = self.req.get_response(server.API(options))
api = context.ContextMiddleware(server.API(options), options)
res = self.req.get_response(api)
# httplib.Response has a read() method...fake it out
def fake_reader():

@ -26,6 +26,7 @@ import stubout
import webob
from glance.api import v1 as server
from glance.common import context
from glance.registry import server as rserver
import glance.registry.db.api
from tests import stubs
@ -41,8 +42,9 @@ class TestRegistryAPI(unittest.TestCase):
stubs.stub_out_registry_and_store_server(self.stubs)
stubs.stub_out_registry_db_image_api(self.stubs)
stubs.stub_out_filesystem_backend()
self.api = rserver.API({'verbose': VERBOSE,
'debug': DEBUG})
options = {'verbose': VERBOSE,
'debug': DEBUG}
self.api = context.ContextMiddleware(rserver.API(options), options)
def tearDown(self):
"""Clear the test environment"""
@ -1458,7 +1460,7 @@ class TestGlanceAPI(unittest.TestCase):
'sql_connection': sql_connection,
'default_store': 'file',
'filesystem_store_datadir': stubs.FAKE_FILESYSTEM_ROOTDIR}
self.api = server.API(options)
self.api = context.ContextMiddleware(server.API(options), options)
def tearDown(self):
"""Clear the test environment"""

148
tests/unit/test_context.py Normal file

@ -0,0 +1,148 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010-2011 OpenStack, LLC
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import unittest
from glance.common import context
class FakeImage(object):
"""
Fake image for providing the image attributes needed for
TestContext.
"""
def __init__(self, owner, is_public):
self.owner = owner
self.is_public = is_public
class TestContext(unittest.TestCase):
def do_visible(self, exp_res, img_owner, img_public, **kwargs):
"""
Perform a context test. Creates a (fake) image with the
specified owner and is_public attributes, then creates a
context with the given keyword arguments and expects exp_res
as the result of an is_image_visible() call on the context.
"""
img = FakeImage(img_owner, img_public)
ctx = context.RequestContext(**kwargs)
self.assertEqual(ctx.is_image_visible(img), exp_res)
def test_empty_public(self):
"""
Tests that an empty context (with is_admin set to True) can
access an image with is_public set to True.
"""
self.do_visible(True, None, True, is_admin=True)
def test_empty_public_owned(self):
"""
Tests that an empty context (with is_admin set to True) can
access an owned image with is_public set to True.
"""
self.do_visible(True, 'pattieblack', True, is_admin=True)
def test_empty_private(self):
"""
Tests that an empty context (with is_admin set to True) can
access an image with is_public set to False.
"""
self.do_visible(True, None, False, is_admin=True)
def test_empty_private_owned(self):
"""
Tests that an empty context (with is_admin set to True) can
access an owned image with is_public set to False.
"""
self.do_visible(True, 'pattieblack', False, is_admin=True)
def test_anon_public(self):
"""
Tests that an anonymous context (with is_admin set to False)
can access an image with is_public set to True.
"""
self.do_visible(True, None, True)
def test_anon_public_owned(self):
"""
Tests that an anonymous context (with is_admin set to False)
can access an owned image with is_public set to True.
"""
self.do_visible(True, 'pattieblack', True)
def test_anon_private(self):
"""
Tests that an anonymous context (with is_admin set to False)
can access an unowned image with is_public set to False.
"""
self.do_visible(True, None, False)
def test_anon_private_owned(self):
"""
Tests that an anonymous context (with is_admin set to False)
cannot access an owned image with is_public set to False.
"""
self.do_visible(False, 'pattieblack', False)
def test_auth_public(self):
"""
Tests that an authenticated context (with is_admin set to
False) can access an image with is_public set to True.
"""
self.do_visible(True, None, True, tenant='froggy')
def test_auth_public_unowned(self):
"""
Tests that an authenticated context (with is_admin set to
False) can access an image (which it does not own) with
is_public set to True.
"""
self.do_visible(True, 'pattieblack', True, tenant='froggy')
def test_auth_public_owned(self):
"""
Tests that an authenticated context (with is_admin set to
False) can access an image (which it does own) with is_public
set to True.
"""
self.do_visible(True, 'pattieblack', True, tenant='pattieblack')
def test_auth_private(self):
"""
Tests that an authenticated context (with is_admin set to
False) can access an image with is_public set to False.
"""
self.do_visible(True, None, False, tenant='froggy')
def test_auth_private_unowned(self):
"""
Tests that an authenticated context (with is_admin set to
False) cannot access an image (which it does not own) with
is_public set to False.
"""
self.do_visible(False, 'pattieblack', False, tenant='froggy')
def test_auth_private_owned(self):
"""
Tests that an authenticated context (with is_admin set to
False) can access an image (which it does own) with is_public
set to False.
"""
self.do_visible(True, 'pattieblack', False, tenant='pattieblack')