Improve unit test coverage per bug/934566.

bug/934566 identified an error which should have been caught by
more complete unit test coverage. This review extends unit test
coverage to include handle_base_image and is one of a series I
will be sending.

Change-Id: I287fc50ea6a92239f11a107f65da84d3ff0c8b3b
This commit is contained in:
Michael Still
2012-02-18 14:22:02 +11:00
parent cc747f1814
commit e1cf4a71af

View File

@@ -30,6 +30,7 @@ from nova import test
from nova import db from nova import db
from nova import flags from nova import flags
from nova import log from nova import log
from nova import utils
from nova.virt.libvirt import imagecache from nova.virt.libvirt import imagecache
from nova.virt.libvirt import utils as virtutils from nova.virt.libvirt import utils as virtutils
@@ -279,12 +280,21 @@ class ImageCacheManagerTestCase(test.TestCase):
(base_file2, True, False), (base_file2, True, False),
(base_file3, False, True)]) (base_file3, False, True)])
def _intercept_log_messages(self):
mylog = log.getLogger()
stream = cStringIO.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(log.LegacyNovaFormatter())
mylog.logger.addHandler(handler)
return mylog, handler, stream
def test_verify_checksum(self): def test_verify_checksum(self):
testdata = ('OpenStack Software delivers a massively scalable cloud ' testdata = ('OpenStack Software delivers a massively scalable cloud '
'operating system.') 'operating system.')
img = {'container_format': 'ami', 'id': '42'} img = {'container_format': 'ami', 'id': '42'}
self.flags(checksum_base_images=True) self.flags(checksum_base_images=True)
mylog, handler, stream = self._intercept_log_messages()
try: try:
dirname = tempfile.mkdtemp() dirname = tempfile.mkdtemp()
@@ -313,6 +323,8 @@ class ImageCacheManagerTestCase(test.TestCase):
image_cache_manager = imagecache.ImageCacheManager() image_cache_manager = imagecache.ImageCacheManager()
res = image_cache_manager._verify_checksum(img, fname) res = image_cache_manager._verify_checksum(img, fname)
self.assertFalse(res) self.assertFalse(res)
self.assertNotEqual(stream.getvalue().find('image verification '
'failed'), -1)
# Checksum file missing # Checksum file missing
os.remove('%s.sha1' % fname) os.remove('%s.sha1' % fname)
@@ -326,6 +338,7 @@ class ImageCacheManagerTestCase(test.TestCase):
finally: finally:
shutil.rmtree(dirname) shutil.rmtree(dirname)
mylog.logger.removeHandler(handler)
def _make_base_file(checksum=True): def _make_base_file(checksum=True):
"""Make a base file for testing.""" """Make a base file for testing."""
@@ -333,14 +346,17 @@ class ImageCacheManagerTestCase(test.TestCase):
dirname = tempfile.mkdtemp() dirname = tempfile.mkdtemp()
fname = os.path.join(dirname, 'aaa') fname = os.path.join(dirname, 'aaa')
f = open(fname, 'w') base_file = open(fname, 'w')
f.write('data') base_file.write('data')
f.close() base_file.close()
base_file = open(fname, 'r')
if checksum: if checksum:
f = open('%s.sha1' % fname, 'w') checksum_file = open('%s.sha1' % fname, 'w')
f.close() checksum_file.write(utils.hash_file(base_file))
checksum_file.close()
base_file.close()
return dirname, fname return dirname, fname
def test_remove_base_file(self): def test_remove_base_file(self):
@@ -405,15 +421,8 @@ class ImageCacheManagerTestCase(test.TestCase):
def test_remove_base_file_oserror(self): def test_remove_base_file_oserror(self):
dirname = tempfile.mkdtemp() dirname = tempfile.mkdtemp()
# Intercept log messages
mylog = log.getLogger()
stream = cStringIO.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(log.LegacyNovaFormatter())
mylog.logger.addHandler(handler)
fname = os.path.join(dirname, 'aaa') fname = os.path.join(dirname, 'aaa')
mylog, handler, stream = self._intercept_log_messages()
try: try:
os.mkdir(fname) os.mkdir(fname)
@@ -430,3 +439,134 @@ class ImageCacheManagerTestCase(test.TestCase):
finally: finally:
shutil.rmtree(dirname) shutil.rmtree(dirname)
mylog.logger.removeHandler(handler) mylog.logger.removeHandler(handler)
def test_handle_base_image_unused(self):
img = {'container_format': 'ami',
'id': '123',
'uuid': '1234-4567-2378'}
dirname, fname = self._make_base_file()
os.utime(fname, (-1, time.time() - 3601))
try:
image_cache_manager = imagecache.ImageCacheManager()
image_cache_manager.unexplained_images = [fname]
image_cache_manager._handle_base_image(img, fname)
self.assertEquals(image_cache_manager.unexplained_images, [])
self.assertEquals(image_cache_manager.removable_base_files,
[fname])
self.assertEquals(image_cache_manager.corrupt_base_files, [])
finally:
shutil.rmtree(dirname)
def test_handle_base_image_used(self):
img = {'container_format': 'ami',
'id': '123',
'uuid': '1234-4567-2378'}
dirname, fname = self._make_base_file()
os.utime(fname, (-1, time.time() - 3601))
try:
image_cache_manager = imagecache.ImageCacheManager()
image_cache_manager.unexplained_images = [fname]
image_cache_manager.used_images = {'123': (1, 0, ['banana-42'])}
image_cache_manager._handle_base_image(img, fname)
self.assertEquals(image_cache_manager.unexplained_images, [])
self.assertEquals(image_cache_manager.removable_base_files, [])
self.assertEquals(image_cache_manager.corrupt_base_files, [])
finally:
shutil.rmtree(dirname)
def test_handle_base_image_used_remotely(self):
img = {'container_format': 'ami',
'id': '123',
'uuid': '1234-4567-2378'}
dirname, fname = self._make_base_file()
os.utime(fname, (-1, time.time() - 3601))
try:
image_cache_manager = imagecache.ImageCacheManager()
image_cache_manager.used_images = {'123': (0, 1, ['banana-42'])}
image_cache_manager._handle_base_image(img, None)
self.assertEquals(image_cache_manager.unexplained_images, [])
self.assertEquals(image_cache_manager.removable_base_files, [])
self.assertEquals(image_cache_manager.corrupt_base_files, [])
finally:
shutil.rmtree(dirname)
def test_handle_base_image_absent(self):
"""Ensure we warn for use of a missing base image."""
img = {'container_format': 'ami',
'id': '123',
'uuid': '1234-4567-2378'}
mylog, handler, stream = self._intercept_log_messages()
try:
image_cache_manager = imagecache.ImageCacheManager()
image_cache_manager.used_images = {'123': (1, 0, ['banana-42'])}
image_cache_manager._handle_base_image(img, None)
self.assertEquals(image_cache_manager.unexplained_images, [])
self.assertEquals(image_cache_manager.removable_base_files, [])
self.assertEquals(image_cache_manager.corrupt_base_files, [])
self.assertNotEqual(stream.getvalue().find('an absent base file'),
-1)
finally:
mylog.logger.removeHandler(handler)
def test_handle_base_image_used_missing(self):
img = {'container_format': 'ami',
'id': '123',
'uuid': '1234-4567-2378'}
dirname = tempfile.mkdtemp()
fname = os.path.join(dirname, 'aaa')
try:
image_cache_manager = imagecache.ImageCacheManager()
image_cache_manager.unexplained_images = [fname]
image_cache_manager.used_images = {'123': (1, 0, ['banana-42'])}
image_cache_manager._handle_base_image(img, fname)
self.assertEquals(image_cache_manager.unexplained_images, [])
self.assertEquals(image_cache_manager.removable_base_files, [])
self.assertEquals(image_cache_manager.corrupt_base_files, [])
finally:
shutil.rmtree(dirname)
def test_handle_base_image_checksum_fails(self):
img = {'container_format': 'ami',
'id': '123',
'uuid': '1234-4567-2378'}
dirname, fname = self._make_base_file()
try:
f = open(fname, 'w')
f.write('banana')
f.close()
image_cache_manager = imagecache.ImageCacheManager()
image_cache_manager.unexplained_images = [fname]
image_cache_manager.used_images = {'123': (1, 0, ['banana-42'])}
image_cache_manager._handle_base_image(img, fname)
self.assertEquals(image_cache_manager.unexplained_images, [])
self.assertEquals(image_cache_manager.removable_base_files, [])
self.assertEquals(image_cache_manager.corrupt_base_files,
[fname])
finally:
shutil.rmtree(dirname)