Fix race condition deleting in-progress stack

If we call stack_delete on a stack with an operation in progress, we kill
any existing delete thread that is running. However, we don't wait for that
thread to die before starting a new thread to delete the stack again. If
any part of the cleanup operation in the old thread (i.e. handling of the
GreenthreadExit exception) causes a context switch (which is likely), other
threads can start working while the cleanup is still in progress. This
could create race conditions like the one in bug 1328983.

Avoid this problem by making sure we wait for all threads in a thread group
to die before continuing. (Note that this means the user's API call is
blocking on the cleanup of the old thread. This is sadly unavoidable for
now, but should probably be fixed in the future by stopping the old thread
from the new delete thread.)

This was suggested earlier, but removed without explanation between
patchsets 11 and 12 of I188e43ad88b98da7d1a08269189aaefa57c36df2, which
implemented deletion of in-progress stacks with locks:
https://review.openstack.org/#/c/63002/11..12/heat/engine/service.py

Also remove the call to stack_lock_release(), which was a hack around the
fact that wait() does not wait for link()ed functions - eventlet sends the
exit event (that wait() is waiting on) before resolving links. Instead, add
another link to the end of the list to indicate that links have all been
run. This should eliminate "Lock was already released" messages in the
logs.

Change-Id: I2e4561cbe29ab10554da67859df8c2db0854dd38
This commit is contained in:
Zane Bitter 2014-06-20 12:37:01 -04:00
parent 59be7efed1
commit ae2b47d8fd
2 changed files with 42 additions and 20 deletions

@ -11,6 +11,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
import functools
import json
import os
@ -152,8 +153,22 @@ class ThreadGroupManager(object):
def stop(self, stack_id, graceful=False):
'''Stop any active threads on a stack.'''
if stack_id in self.groups:
self.groups[stack_id].stop(graceful)
del self.groups[stack_id]
threadgroup = self.groups.pop(stack_id)
threads = threadgroup.threads[:]
threadgroup.stop(graceful)
threadgroup.wait()
# Wait for link()ed functions (i.e. lock release)
links_done = dict((th, False) for th in threads)
def mark_done(gt, th):
links_done[th] = True
for th in threads:
th.link(mark_done, th)
while not all(links_done.values()):
eventlet.sleep()
class StackWatch(object):
@ -717,14 +732,6 @@ class EngineService(service.Service):
raise exception.StopActionFailed(stack_name=stack.name,
engine_id=acquire_result)
# If the lock isn't released here, then the call to
# start_with_lock below will raise an ActionInProgress
# exception. Ideally, we wouldn't be calling another
# release() here, since it should be called as soon as the
# ThreadGroup is stopped. But apparently there's a race
# between release() the next call to lock.acquire().
db_api.stack_lock_release(stack.id, acquire_result)
# There may be additional resources that we don't know about
# if an update was in-progress when the stack was stopped, so
# reload the stack from the database.

@ -12,6 +12,7 @@
# under the License.
import eventlet
import functools
import json
import sys
@ -2968,16 +2969,6 @@ class ThreadGroupManagerTest(HeatTestCase):
**self.fkwargs)
self.assertEqual(self.tg_mock.add_thread(), ret)
def test_tgm_stop(self):
stack_id = 'test'
thm = service.ThreadGroupManager()
thm.start(stack_id, self.f, *self.fargs, **self.fkwargs)
thm.stop(stack_id, True)
self.tg_mock.stop.assert_called_with(True)
self.assertNotIn(stack_id, thm.groups)
def test_tgm_add_timer(self):
stack_id = 'test'
@ -2988,3 +2979,27 @@ class ThreadGroupManagerTest(HeatTestCase):
self.tg_mock.add_timer.assert_called_with(
self.cfg_mock.CONF.periodic_interval,
self.f, *self.fargs, **self.fkwargs)
class ThreadGroupManagerStopTest(HeatTestCase):
def test_tgm_stop(self):
stack_id = 'test'
done = []
def function():
while True:
eventlet.sleep()
def linked(gt, thread):
for i in range(10):
eventlet.sleep()
done.append(thread)
thm = service.ThreadGroupManager()
thread = thm.start(stack_id, function)
thread.link(linked, thread)
thm.stop(stack_id)
self.assertIn(thread, done)
self.assertNotIn(stack_id, thm.groups)