Merge "Improve unit test coverage per bug/934566."

This commit is contained in:
Jenkins
2012-02-22 21:43:31 +00:00
committed by Gerrit Code Review

View File

@@ -30,6 +30,7 @@ from nova import test
from nova import db from nova import db
from nova import flags from nova import flags
from nova import log from nova import log
from nova import utils
from nova.virt.libvirt import imagecache from nova.virt.libvirt import imagecache
from nova.virt.libvirt import utils as virtutils from nova.virt.libvirt import utils as virtutils
@@ -279,12 +280,21 @@ class ImageCacheManagerTestCase(test.TestCase):
(base_file2, True, False), (base_file2, True, False),
(base_file3, False, True)]) (base_file3, False, True)])
def _intercept_log_messages(self):
mylog = log.getLogger()
stream = cStringIO.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(log.LegacyNovaFormatter())
mylog.logger.addHandler(handler)
return mylog, handler, stream
def test_verify_checksum(self): def test_verify_checksum(self):
testdata = ('OpenStack Software delivers a massively scalable cloud ' testdata = ('OpenStack Software delivers a massively scalable cloud '
'operating system.') 'operating system.')
img = {'container_format': 'ami', 'id': '42'} img = {'container_format': 'ami', 'id': '42'}
self.flags(checksum_base_images=True) self.flags(checksum_base_images=True)
mylog, handler, stream = self._intercept_log_messages()
try: try:
dirname = tempfile.mkdtemp() dirname = tempfile.mkdtemp()
@@ -313,6 +323,8 @@ class ImageCacheManagerTestCase(test.TestCase):
image_cache_manager = imagecache.ImageCacheManager() image_cache_manager = imagecache.ImageCacheManager()
res = image_cache_manager._verify_checksum(img, fname) res = image_cache_manager._verify_checksum(img, fname)
self.assertFalse(res) self.assertFalse(res)
self.assertNotEqual(stream.getvalue().find('image verification '
'failed'), -1)
# Checksum file missing # Checksum file missing
os.remove('%s.sha1' % fname) os.remove('%s.sha1' % fname)
@@ -326,6 +338,7 @@ class ImageCacheManagerTestCase(test.TestCase):
finally: finally:
shutil.rmtree(dirname) shutil.rmtree(dirname)
mylog.logger.removeHandler(handler)
def _make_base_file(checksum=True): def _make_base_file(checksum=True):
"""Make a base file for testing.""" """Make a base file for testing."""
@@ -333,14 +346,17 @@ class ImageCacheManagerTestCase(test.TestCase):
dirname = tempfile.mkdtemp() dirname = tempfile.mkdtemp()
fname = os.path.join(dirname, 'aaa') fname = os.path.join(dirname, 'aaa')
f = open(fname, 'w') base_file = open(fname, 'w')
f.write('data') base_file.write('data')
f.close() base_file.close()
base_file = open(fname, 'r')
if checksum: if checksum:
f = open('%s.sha1' % fname, 'w') checksum_file = open('%s.sha1' % fname, 'w')
f.close() checksum_file.write(utils.hash_file(base_file))
checksum_file.close()
base_file.close()
return dirname, fname return dirname, fname
def test_remove_base_file(self): def test_remove_base_file(self):
@@ -405,15 +421,8 @@ class ImageCacheManagerTestCase(test.TestCase):
def test_remove_base_file_oserror(self): def test_remove_base_file_oserror(self):
dirname = tempfile.mkdtemp() dirname = tempfile.mkdtemp()
# Intercept log messages
mylog = log.getLogger()
stream = cStringIO.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(log.LegacyNovaFormatter())
mylog.logger.addHandler(handler)
fname = os.path.join(dirname, 'aaa') fname = os.path.join(dirname, 'aaa')
mylog, handler, stream = self._intercept_log_messages()
try: try:
os.mkdir(fname) os.mkdir(fname)
@@ -430,3 +439,134 @@ class ImageCacheManagerTestCase(test.TestCase):
finally: finally:
shutil.rmtree(dirname) shutil.rmtree(dirname)
mylog.logger.removeHandler(handler) mylog.logger.removeHandler(handler)
def test_handle_base_image_unused(self):
img = {'container_format': 'ami',
'id': '123',
'uuid': '1234-4567-2378'}
dirname, fname = self._make_base_file()
os.utime(fname, (-1, time.time() - 3601))
try:
image_cache_manager = imagecache.ImageCacheManager()
image_cache_manager.unexplained_images = [fname]
image_cache_manager._handle_base_image(img, fname)
self.assertEquals(image_cache_manager.unexplained_images, [])
self.assertEquals(image_cache_manager.removable_base_files,
[fname])
self.assertEquals(image_cache_manager.corrupt_base_files, [])
finally:
shutil.rmtree(dirname)
def test_handle_base_image_used(self):
img = {'container_format': 'ami',
'id': '123',
'uuid': '1234-4567-2378'}
dirname, fname = self._make_base_file()
os.utime(fname, (-1, time.time() - 3601))
try:
image_cache_manager = imagecache.ImageCacheManager()
image_cache_manager.unexplained_images = [fname]
image_cache_manager.used_images = {'123': (1, 0, ['banana-42'])}
image_cache_manager._handle_base_image(img, fname)
self.assertEquals(image_cache_manager.unexplained_images, [])
self.assertEquals(image_cache_manager.removable_base_files, [])
self.assertEquals(image_cache_manager.corrupt_base_files, [])
finally:
shutil.rmtree(dirname)
def test_handle_base_image_used_remotely(self):
img = {'container_format': 'ami',
'id': '123',
'uuid': '1234-4567-2378'}
dirname, fname = self._make_base_file()
os.utime(fname, (-1, time.time() - 3601))
try:
image_cache_manager = imagecache.ImageCacheManager()
image_cache_manager.used_images = {'123': (0, 1, ['banana-42'])}
image_cache_manager._handle_base_image(img, None)
self.assertEquals(image_cache_manager.unexplained_images, [])
self.assertEquals(image_cache_manager.removable_base_files, [])
self.assertEquals(image_cache_manager.corrupt_base_files, [])
finally:
shutil.rmtree(dirname)
def test_handle_base_image_absent(self):
"""Ensure we warn for use of a missing base image."""
img = {'container_format': 'ami',
'id': '123',
'uuid': '1234-4567-2378'}
mylog, handler, stream = self._intercept_log_messages()
try:
image_cache_manager = imagecache.ImageCacheManager()
image_cache_manager.used_images = {'123': (1, 0, ['banana-42'])}
image_cache_manager._handle_base_image(img, None)
self.assertEquals(image_cache_manager.unexplained_images, [])
self.assertEquals(image_cache_manager.removable_base_files, [])
self.assertEquals(image_cache_manager.corrupt_base_files, [])
self.assertNotEqual(stream.getvalue().find('an absent base file'),
-1)
finally:
mylog.logger.removeHandler(handler)
def test_handle_base_image_used_missing(self):
img = {'container_format': 'ami',
'id': '123',
'uuid': '1234-4567-2378'}
dirname = tempfile.mkdtemp()
fname = os.path.join(dirname, 'aaa')
try:
image_cache_manager = imagecache.ImageCacheManager()
image_cache_manager.unexplained_images = [fname]
image_cache_manager.used_images = {'123': (1, 0, ['banana-42'])}
image_cache_manager._handle_base_image(img, fname)
self.assertEquals(image_cache_manager.unexplained_images, [])
self.assertEquals(image_cache_manager.removable_base_files, [])
self.assertEquals(image_cache_manager.corrupt_base_files, [])
finally:
shutil.rmtree(dirname)
def test_handle_base_image_checksum_fails(self):
img = {'container_format': 'ami',
'id': '123',
'uuid': '1234-4567-2378'}
dirname, fname = self._make_base_file()
try:
f = open(fname, 'w')
f.write('banana')
f.close()
image_cache_manager = imagecache.ImageCacheManager()
image_cache_manager.unexplained_images = [fname]
image_cache_manager.used_images = {'123': (1, 0, ['banana-42'])}
image_cache_manager._handle_base_image(img, fname)
self.assertEquals(image_cache_manager.unexplained_images, [])
self.assertEquals(image_cache_manager.removable_base_files, [])
self.assertEquals(image_cache_manager.corrupt_base_files,
[fname])
finally:
shutil.rmtree(dirname)