Merge "Add an image proxy to handle stored image data"

This commit is contained in:
Jenkins 2012-11-25 20:16:46 +00:00 committed by Gerrit Code Review
commit e6ae6e428c
2 changed files with 122 additions and 0 deletions

View File

@ -22,6 +22,7 @@ import time
from glance.common import exception
from glance.common import utils
import glance.context
import glance.domain
from glance.openstack.common import cfg
from glance.openstack.common import importutils
import glance.openstack.common.log as logging
@ -298,3 +299,42 @@ def set_acls(context, location_uri, public=False, read_tenants=[],
write_tenants=write_tenants)
except NotImplementedError:
LOG.debug(_("Skipping store.set_acls... not implemented."))
class ImageRepoProxy(glance.domain.ImageRepoProxy):
def __init__(self, context, store_api, image_repo):
self.context = context
self.store_api = store_api
self.image_repo = image_repo
super(ImageRepoProxy, self).__init__(image_repo)
def get(self, *args, **kwargs):
image = self.image_repo.get(*args, **kwargs)
return ImageProxy(image, self.context, self.store_api)
def list(self, *args, **kwargs):
images = self.image_repo.list(*args, **kwargs)
return [ImageProxy(i, self.context, self.store_api)
for i in images]
class ImageProxy(glance.domain.ImageProxy):
def __init__(self, image, context, store_api):
self.image = image
self.context = context
self.store_api = store_api
super(ImageProxy, self).__init__(image)
def delete(self):
self.image.delete()
if self.image.location:
if CONF.delayed_delete:
self.image.status = 'pending_delete'
self.store_api.schedule_delayed_delete_from_backend(
self.image.location, self.image.image_id)
else:
self.store_api.safe_delete_from_backend(self.image.location,
self.context,
self.image.image_id)

View File

@ -0,0 +1,82 @@
# Copyright 2012 OpenStack, LLC
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from glance.common import exception
import glance.store
from glance.tests.unit import utils as unit_test_utils
from glance.tests import utils
BASE_URI = 'swift+http://storeurl.com/container'
UUID1 = 'c80a1a6c-bd1f-41c5-90ee-81afedb1d58d'
class ImageRepoStub(object):
def get(self, *args, **kwargs):
return 'image_from_get'
def list(self, *args, **kwargs):
return ['image_from_list_0', 'image_from_list_1']
class ImageStub(object):
def __init__(self, image_id, status, location):
self.image_id = image_id
self.status = status
self.location = location
def delete(self):
self.status = 'deleted'
class TestStoreImage(utils.BaseTestCase):
def setUp(self):
location = '%s/%s' % (BASE_URI, UUID1)
self.image_stub = ImageStub(UUID1, 'active', location)
self.image_repo_stub = ImageRepoStub()
self.store_api = unit_test_utils.FakeStoreAPI()
super(TestStoreImage, self).setUp()
def test_image_delete(self):
image = glance.store.ImageProxy(self.image_stub, {}, self.store_api)
self.assertEquals(image.status, 'active')
self.store_api.get_from_backend({}, image.location) # no exception
image.delete()
self.assertEquals(image.status, 'deleted')
self.assertRaises(exception.NotFound,
self.store_api.get_from_backend, {}, image.location)
def test_image_delayed_delete(self):
self.config(delayed_delete=True)
image = glance.store.ImageProxy(self.image_stub, {}, self.store_api)
self.assertEquals(image.status, 'active')
image.delete()
self.assertEquals(image.status, 'pending_delete')
self.store_api.get_from_backend({}, image.location) # no exception
def test_image_repo_get(self):
image_repo = glance.store.ImageRepoProxy({}, self.store_api,
self.image_repo_stub)
image = image_repo.get(UUID1)
self.assertTrue(isinstance(image, glance.store.ImageProxy))
self.assertEqual(image.image, 'image_from_get')
def test_image_repo_list(self):
image_repo = glance.store.ImageRepoProxy({}, self.store_api,
self.image_repo_stub)
images = image_repo.list()
for i, image in enumerate(images):
self.assertTrue(isinstance(image, glance.store.ImageProxy))
self.assertEqual(image.image, 'image_from_list_%d' % i)