Merge "Functional test enhancement for lock busting"
This commit is contained in:
commit
8da47cb5ca
@ -118,6 +118,15 @@ class ImportActionWrapper(object):
|
||||
# NOTE(danms): Do not save the image if we raised in context
|
||||
return
|
||||
|
||||
# NOTE(danms): If we were in the middle of a long-running
|
||||
# set_data() where someone else stole our lock, we may race
|
||||
# with them to update image locations and erase one that
|
||||
# someone else is working on. Checking the task lock here
|
||||
# again is not perfect exclusion, but in lieu of actual
|
||||
# thread-safe location updating, this at least reduces the
|
||||
# likelihood of that happening.
|
||||
self.assert_task_lock()
|
||||
|
||||
if self._image_previous_status != self._image.status:
|
||||
LOG.debug('Image %(image_id)s status changing from '
|
||||
'%(old_status)s to %(new_status)s',
|
||||
|
@ -1551,11 +1551,13 @@ class SynchronousAPIBase(test_utils.BaseTestCase):
|
||||
def setup_stores(self):
|
||||
"""Configures multiple backend stores.
|
||||
|
||||
This configures the API with two file-backed stores (store1
|
||||
and store2) as well as a os_glance_staging_store for imports.
|
||||
This configures the API with three file-backed stores (store1,
|
||||
store2, and store3) as well as a os_glance_staging_store for
|
||||
imports.
|
||||
|
||||
"""
|
||||
self.config(enabled_backends={'store1': 'file', 'store2': 'file'})
|
||||
self.config(enabled_backends={'store1': 'file', 'store2': 'file',
|
||||
'store3': 'file'})
|
||||
glance_store.register_store_opts(CONF,
|
||||
reserved_stores=wsgi.RESERVED_STORES)
|
||||
self.config(default_backend='store1',
|
||||
@ -1564,6 +1566,8 @@ class SynchronousAPIBase(test_utils.BaseTestCase):
|
||||
group='store1')
|
||||
self.config(filesystem_store_datadir=self._store_dir('store2'),
|
||||
group='store2')
|
||||
self.config(filesystem_store_datadir=self._store_dir('store3'),
|
||||
group='store3')
|
||||
self.config(filesystem_store_datadir=self._store_dir('staging'),
|
||||
group='os_glance_staging_store')
|
||||
|
||||
|
@ -17,7 +17,9 @@ import datetime
|
||||
from testtools import content as ttc
|
||||
import time
|
||||
from unittest import mock
|
||||
import uuid
|
||||
|
||||
from oslo_log import log as logging
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import fixture as time_fixture
|
||||
from oslo_utils import units
|
||||
@ -26,6 +28,9 @@ from glance.tests import functional
|
||||
from glance.tests import utils as test_utils
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TestImageImportLocking(functional.SynchronousAPIBase):
|
||||
def _import_copy(self, image_id, stores):
|
||||
"""Do an import of image_id to the given stores."""
|
||||
@ -91,9 +96,16 @@ class TestImageImportLocking(functional.SynchronousAPIBase):
|
||||
|
||||
return image_id
|
||||
|
||||
def _get_image_import_task(self, image_id, task_id=None):
|
||||
if task_id is None:
|
||||
image = self.api_get('/v2/images/%s' % image_id).json
|
||||
task_id = image['os_glance_import_task']
|
||||
|
||||
return self.api_get('/v2/tasks/%s' % task_id).json
|
||||
|
||||
def _test_import_copy(self, warp_time=False):
|
||||
self.start_server()
|
||||
state = {}
|
||||
state = {'want_run': True}
|
||||
|
||||
# Create and import an image with no pipeline stall
|
||||
image_id = self._create_and_import(stores=['store1'])
|
||||
@ -101,9 +113,12 @@ class TestImageImportLocking(functional.SynchronousAPIBase):
|
||||
# Set up a fake data pipeline that will stall until we are ready
|
||||
# to unblock it
|
||||
def slow_fake_set_data(data_iter, backend=None, set_active=True):
|
||||
while True:
|
||||
me = str(uuid.uuid4())
|
||||
while state['want_run'] == True:
|
||||
LOG.info('fake_set_data running %s' % me)
|
||||
state['running'] = True
|
||||
time.sleep(0.1)
|
||||
LOG.info('fake_set_data ended %s' % me)
|
||||
|
||||
# Constrain oslo timeutils time so we can manipulate it
|
||||
tf = time_fixture.TimeFixture()
|
||||
@ -121,7 +136,7 @@ class TestImageImportLocking(functional.SynchronousAPIBase):
|
||||
|
||||
# Wait to make sure the data stream gets started
|
||||
for i in range(0, 10):
|
||||
if state:
|
||||
if 'running' in state:
|
||||
break
|
||||
time.sleep(0.1)
|
||||
|
||||
@ -130,6 +145,10 @@ class TestImageImportLocking(functional.SynchronousAPIBase):
|
||||
self.assertTrue(state.get('running', False),
|
||||
'slow_fake_set_data() never ran')
|
||||
|
||||
# Make sure the task is available and in the right state
|
||||
first_import_task = self._get_image_import_task(image_id)
|
||||
self.assertEqual('processing', first_import_task['status'])
|
||||
|
||||
# If we're warping time, then advance the clock by two hours
|
||||
if warp_time:
|
||||
tf.advance_time_delta(datetime.timedelta(hours=2))
|
||||
@ -137,7 +156,8 @@ class TestImageImportLocking(functional.SynchronousAPIBase):
|
||||
# Try a second copy-image import. If we are warping time,
|
||||
# expect the lock to be busted. If not, then we should get
|
||||
# a 409 Conflict.
|
||||
resp = self._import_copy(image_id, ['store2'])
|
||||
resp = self._import_copy(image_id, ['store3'])
|
||||
time.sleep(0.1)
|
||||
|
||||
self.addDetail('Second import response',
|
||||
ttc.text_content(str(resp)))
|
||||
@ -146,11 +166,75 @@ class TestImageImportLocking(functional.SynchronousAPIBase):
|
||||
else:
|
||||
self.assertEqual(409, resp.status_code)
|
||||
|
||||
self.addDetail('First task', ttc.text_content(str(first_import_task)))
|
||||
|
||||
# Grab the current import task for our image, and also
|
||||
# refresh our first task object
|
||||
second_import_task = self._get_image_import_task(image_id)
|
||||
first_import_task = self._get_image_import_task(
|
||||
image_id, first_import_task['id'])
|
||||
|
||||
if warp_time:
|
||||
# If we warped time and busted the lock, then we expect the
|
||||
# current task to be different than the original task
|
||||
self.assertNotEqual(first_import_task['id'],
|
||||
second_import_task['id'])
|
||||
# The original task should be failed with the expected message
|
||||
self.assertEqual('failure', first_import_task['status'])
|
||||
self.assertEqual('Expired lock preempted',
|
||||
first_import_task['message'])
|
||||
# The new task should be off and running
|
||||
self.assertEqual('processing', second_import_task['status'])
|
||||
else:
|
||||
# We didn't bust the lock, so we didn't start another
|
||||
# task, so confirm it hasn't changed
|
||||
self.assertEqual(first_import_task['id'],
|
||||
second_import_task['id'])
|
||||
|
||||
return image_id, state
|
||||
|
||||
def test_import_copy_locked(self):
|
||||
self._test_import_copy(warp_time=False)
|
||||
|
||||
def test_import_copy_bust_lock(self):
|
||||
self._test_import_copy(warp_time=True)
|
||||
image_id, state = self._test_import_copy(warp_time=True)
|
||||
|
||||
# After the import has busted the lock, wait for our
|
||||
# new import to start. We used a different store than
|
||||
# the stalled task so we can tell the difference.
|
||||
for i in range(0, 10):
|
||||
image = self.api_get('/v2/images/%s' % image_id).json
|
||||
if image['stores'] == 'store1,store3':
|
||||
break
|
||||
time.sleep(0.1)
|
||||
|
||||
# After completion, we expect store1 (original) and store3 (new)
|
||||
# and that the other task is still stuck importing
|
||||
# FIXME(danms): The stuck importing state needs fixing
|
||||
image = self.api_get('/v2/images/%s' % image_id).json
|
||||
self.assertEqual('store1,store3', image['stores'])
|
||||
self.assertEqual('store2', image['os_glance_importing_to_stores'])
|
||||
self.assertEqual('', image['os_glance_failed_import'])
|
||||
|
||||
# Free up the stalled task and give eventlet time to let it
|
||||
# play out the rest of the task
|
||||
state['want_run'] = False
|
||||
for i in range(0, 10):
|
||||
image = self.api_get('/v2/images/%s' % image_id).json
|
||||
time.sleep(0.1)
|
||||
|
||||
# After that, we expect everything to be cleaned up and in the
|
||||
# terminal state that we expect.
|
||||
image = self.api_get('/v2/images/%s' % image_id).json
|
||||
self.assertEqual('', image.get('os_glance_import_task', ''))
|
||||
# FIXME(danms): With the strict import lock checking in
|
||||
# ImportActionWrapper, we lose the ability to update
|
||||
# importing_to_stores after our lock has been stolen. We
|
||||
# should probably do something about that in the lock-busting
|
||||
# code. We would expect this in that case:
|
||||
# self.assertEqual('', image['os_glance_importing_to_stores'])
|
||||
self.assertEqual('', image['os_glance_failed_import'])
|
||||
self.assertEqual('store1,store3', image['stores'])
|
||||
|
||||
@mock.patch('oslo_utils.timeutils.StopWatch.expired', new=lambda x: True)
|
||||
def test_import_task_status(self):
|
||||
|
@ -612,7 +612,8 @@ class TestImportActionWrapper(test_utils.BaseTestCase):
|
||||
TASK_ID1)
|
||||
with wrapper as action:
|
||||
self.assertIsInstance(action, import_flow._ImportActions)
|
||||
mock_repo.get.assert_called_once_with(IMAGE_ID1)
|
||||
mock_repo.get.assert_has_calls([mock.call(IMAGE_ID1),
|
||||
mock.call(IMAGE_ID1)])
|
||||
mock_repo.save.assert_called_once_with(
|
||||
mock_repo.get.return_value,
|
||||
mock_repo.get.return_value.status)
|
||||
|
Loading…
Reference in New Issue
Block a user