Use a queue instead of an event to send messages

This has a couple of effects. One is that we can send more than one message
to an action (since the action need not terminate immediately upon
receiving a message). The other is that we only ever receive the message
once, instead of every time we check for it once it has been sent.

Change-Id: Ib3b33534c0e8b03dfbbb1acde6f7b2d3dcfa82b6
This commit is contained in:
Zane Bitter 2016-06-22 20:40:46 +02:00
parent 796cea6cf4
commit b12ec30368
6 changed files with 73 additions and 68 deletions

View File

@ -87,7 +87,7 @@ class ThreadGroupManager(object):
def __init__(self):
super(ThreadGroupManager, self).__init__()
self.groups = {}
self.events = collections.defaultdict(list)
self.msg_queues = collections.defaultdict(list)
# Create dummy service task, because when there is nothing queued
# on self.tg the process exits
@ -205,13 +205,13 @@ class ThreadGroupManager(object):
self.groups[stack_id].add_timer(cfg.CONF.periodic_interval,
func, *args, **kwargs)
def add_event(self, stack_id, event):
self.events[stack_id].append(event)
def add_msg_queue(self, stack_id, msg_queue):
self.msg_queues[stack_id].append(msg_queue)
def remove_event(self, gt, stack_id, event):
for e in self.events.pop(stack_id, []):
if e is not event:
self.add_event(stack_id, e)
def remove_msg_queue(self, gt, stack_id, msg_queue):
for q in self.msg_queues.pop(stack_id, []):
if q is not msg_queue:
self.add_msg_queue(stack_id, q)
def stop_timers(self, stack_id):
if stack_id in self.groups:
@ -220,7 +220,7 @@ class ThreadGroupManager(object):
def stop(self, stack_id, graceful=False):
"""Stop any active threads on a stack."""
if stack_id in self.groups:
self.events.pop(stack_id, None)
self.msg_queues.pop(stack_id, None)
threadgroup = self.groups.pop(stack_id)
threads = threadgroup.threads[:]
@ -239,8 +239,8 @@ class ThreadGroupManager(object):
eventlet.sleep()
def send(self, stack_id, message):
for event in self.events.pop(stack_id, []):
event.send(message)
for msg_queue in self.msg_queues.get(stack_id, []):
msg_queue.put_nowait(message)
@profiler.trace_cls("rpc")
@ -985,15 +985,15 @@ class EngineService(service.Service):
current_stack.converge_stack(template=tmpl,
new_stack=updated_stack)
else:
event = eventlet.event.Event()
msg_queue = eventlet.queue.LightQueue()
th = self.thread_group_mgr.start_with_lock(cnxt, current_stack,
self.engine_id,
current_stack.update,
updated_stack,
event=event)
th.link(self.thread_group_mgr.remove_event,
current_stack.id, event)
self.thread_group_mgr.add_event(current_stack.id, event)
msg_queue=msg_queue)
th.link(self.thread_group_mgr.remove_msg_queue,
current_stack.id, msg_queue)
self.thread_group_mgr.add_msg_queue(current_stack.id, msg_queue)
return dict(current_stack.identifier())
@context.request_context

View File

@ -14,6 +14,7 @@
import collections
import copy
import datetime
import eventlet
import functools
import itertools
import re
@ -1127,7 +1128,7 @@ class Stack(collections.Mapping):
@profiler.trace('Stack.update', hide_args=False)
@reset_state_on_error
def update(self, newstack, event=None):
def update(self, newstack, msg_queue=None):
"""Update the stack.
Compare the current stack with newstack,
@ -1142,7 +1143,7 @@ class Stack(collections.Mapping):
"""
self.updated_time = oslo_timeutils.utcnow()
updater = scheduler.TaskRunner(self.update_task, newstack,
event=event)
msg_queue=msg_queue)
updater()
@profiler.trace('Stack.converge_stack', hide_args=False)
@ -1344,7 +1345,7 @@ class Stack(collections.Mapping):
return self._convg_deps
@scheduler.wrappertask
def update_task(self, newstack, action=UPDATE, event=None):
def update_task(self, newstack, action=UPDATE, msg_queue=None):
if action not in (self.UPDATE, self.ROLLBACK, self.RESTORE):
LOG.error(_LE("Unexpected action %s passed to update!"), action)
self.state_set(self.UPDATE, self.FAILED,
@ -1417,11 +1418,8 @@ class Stack(collections.Mapping):
updater.start(timeout=self.timeout_secs())
yield
while not updater.step():
if event is None or not event.ready():
yield
else:
message = event.wait()
self._message_parser(message)
self._check_for_message(msg_queue)
yield
finally:
self.reset_dependencies()
@ -1510,12 +1508,22 @@ class Stack(collections.Mapping):
if not cfg.CONF.encrypt_parameters_and_properties:
self.t.env.encrypted_param_names = []
def _message_parser(self, message):
@staticmethod
def _check_for_message(msg_queue):
if msg_queue is None:
return
try:
message = msg_queue.get_nowait()
except eventlet.queue.Empty:
return
if message == rpc_api.THREAD_CANCEL:
raise ForcedCancel(with_rollback=False)
elif message == rpc_api.THREAD_CANCEL_WITH_ROLLBACK:
raise ForcedCancel(with_rollback=True)
LOG.error(_LE('Unknown message "%s" received'), message)
def _delete_backup_stack(self, stack):
# Delete resources in the backup stack referred to by 'stack'

View File

@ -12,7 +12,7 @@
import uuid
from eventlet import event as grevent
import eventlet.queue
import mock
from oslo_config import cfg
from oslo_messaging.rpc import dispatcher
@ -66,8 +66,8 @@ class ServiceStackUpdateTest(common.HeatTestCase):
return_value=stk.env)
mock_validate = self.patchobject(stk, 'validate', return_value=None)
event_mock = mock.Mock()
self.patchobject(grevent, 'Event', return_value=event_mock)
msgq_mock = mock.Mock()
self.patchobject(eventlet.queue, 'LightQueue', return_value=msgq_mock)
# do update
api_args = {'timeout_mins': 60}
@ -78,7 +78,7 @@ class ServiceStackUpdateTest(common.HeatTestCase):
self.assertEqual(old_stack.identifier(), result)
self.assertIsInstance(result, dict)
self.assertTrue(result['stack_id'])
self.assertEqual([event_mock], self.man.thread_group_mgr.events)
self.assertEqual([msgq_mock], self.man.thread_group_mgr.msg_queues)
mock_tmpl.assert_called_once_with(template, files=None, env=stk.env)
mock_env.assert_called_once_with(params)
mock_stack.assert_called_once_with(
@ -118,7 +118,8 @@ class ServiceStackUpdateTest(common.HeatTestCase):
self.patchobject(templatem, 'Template', return_value=stk.t)
self.patchobject(environment, 'Environment', return_value=stk.env)
self.patchobject(stk, 'validate', return_value=None)
self.patchobject(grevent, 'Event', return_value=mock.Mock())
self.patchobject(eventlet.queue, 'LightQueue',
return_value=mock.Mock())
mock_merge = self.patchobject(self.man, '_merge_environments')
@ -149,8 +150,8 @@ class ServiceStackUpdateTest(common.HeatTestCase):
return_value=stk.t)
mock_validate = self.patchobject(stk, 'validate', return_value=None)
event_mock = mock.Mock()
self.patchobject(grevent, 'Event', return_value=event_mock)
msgq_mock = mock.Mock()
self.patchobject(eventlet.queue, 'LightQueue', return_value=msgq_mock)
# do update
api_args = {'timeout_mins': 60}
@ -162,7 +163,7 @@ class ServiceStackUpdateTest(common.HeatTestCase):
self.assertEqual(old_stack.identifier(), result)
self.assertIsInstance(result, dict)
self.assertTrue(result['stack_id'])
self.assertEqual([event_mock], self.man.thread_group_mgr.events)
self.assertEqual([msgq_mock], self.man.thread_group_mgr.msg_queues)
mock_tmpl.assert_called_once_with(self.ctx, tmpl_id)
mock_stack.assert_called_once_with(
self.ctx, stk.name, stk.t,

View File

@ -78,31 +78,31 @@ class ThreadGroupManagerTest(common.HeatTestCase):
self.cfg_mock.CONF.periodic_interval,
self.f, *self.fargs, **self.fkwargs)
def test_tgm_add_event(self):
stack_id = 'add_events_test'
def test_tgm_add_msg_queue(self):
stack_id = 'add_msg_queues_test'
e1, e2 = mock.Mock(), mock.Mock()
thm = service.ThreadGroupManager()
thm.add_event(stack_id, e1)
thm.add_event(stack_id, e2)
self.assertEqual([e1, e2], thm.events[stack_id])
thm.add_msg_queue(stack_id, e1)
thm.add_msg_queue(stack_id, e2)
self.assertEqual([e1, e2], thm.msg_queues[stack_id])
def test_tgm_remove_event(self):
stack_id = 'add_events_test'
def test_tgm_remove_msg_queue(self):
stack_id = 'add_msg_queues_test'
e1, e2 = mock.Mock(), mock.Mock()
thm = service.ThreadGroupManager()
thm.add_event(stack_id, e1)
thm.add_event(stack_id, e2)
thm.remove_event(None, stack_id, e2)
self.assertEqual([e1], thm.events[stack_id])
thm.remove_event(None, stack_id, e1)
self.assertNotIn(stack_id, thm.events)
thm.add_msg_queue(stack_id, e1)
thm.add_msg_queue(stack_id, e2)
thm.remove_msg_queue(None, stack_id, e2)
self.assertEqual([e1], thm.msg_queues[stack_id])
thm.remove_msg_queue(None, stack_id, e1)
self.assertNotIn(stack_id, thm.msg_queues)
def test_tgm_send(self):
stack_id = 'send_test'
e1, e2 = mock.MagicMock(), mock.Mock()
thm = service.ThreadGroupManager()
thm.add_event(stack_id, e1)
thm.add_event(stack_id, e2)
thm.add_msg_queue(stack_id, e1)
thm.add_msg_queue(stack_id, e2)
thm.send(stack_id, 'test_message')
@ -122,7 +122,7 @@ class ThreadGroupManagerStopTest(common.HeatTestCase):
done.append(thread)
thm = service.ThreadGroupManager()
thm.add_event(stack_id, mock.Mock())
thm.add_msg_queue(stack_id, mock.Mock())
thread = thm.start(stack_id, function)
thread.link(linked, thread)
@ -130,4 +130,4 @@ class ThreadGroupManagerStopTest(common.HeatTestCase):
self.assertIn(thread, done)
self.assertNotIn(stack_id, thm.groups)
self.assertNotIn(stack_id, thm.events)
self.assertNotIn(stack_id, thm.msg_queues)

View File

@ -319,7 +319,7 @@ class DummyThreadGroup(object):
class DummyThreadGroupManager(object):
def __init__(self):
self.events = []
self.msg_queues = []
self.messages = []
def start_with_lock(self, cnxt, stack, engine_id, func, *args, **kwargs):
@ -335,13 +335,13 @@ class DummyThreadGroupManager(object):
def send(self, stack_id, message):
self.messages.append(message)
def add_event(self, stack_id, event):
self.events.append(event)
def add_msg_queue(self, stack_id, msg_queue):
self.msg_queues.append(msg_queue)
def remove_event(self, gt, stack_id, event):
for e in self.events.pop(stack_id, []):
if e is not event:
self.add_event(stack_id, e)
def remove_msg_queue(self, gt, stack_id, msg_queue):
for q in self.msg_queues.pop(stack_id, []):
if q is not msg_queue:
self.add_event(stack_id, q)
class DummyThreadGroupMgrLogStart(DummyThreadGroupManager):

View File

@ -859,15 +859,13 @@ class StackUpdateTest(common.HeatTestCase):
template.Template(tmpl2),
disable_rollback=disable_rollback)
evt_mock = mock.MagicMock()
evt_mock.ready.return_value = True
evt_mock.wait.return_value = cancel_message
msgq_mock = mock.MagicMock()
msgq_mock.get_nowait.return_value = cancel_message
self.stack.update(updated_stack, event=evt_mock)
self.stack.update(updated_stack, msg_queue=msgq_mock)
self.assertEqual(state, self.stack.state)
evt_mock.ready.assert_called_once_with()
evt_mock.wait.assert_called_once_with()
msgq_mock.get_nowait.assert_called_once_with()
def test_update_force_cancel_no_rollback(self):
self._update_force_cancel(
@ -1064,16 +1062,14 @@ class StackUpdateTest(common.HeatTestCase):
updated_stack = stack.Stack(self.ctx, 'updated_stack',
template.Template(tmpl2),
disable_rollback=False)
evt_mock = mock.MagicMock()
evt_mock.ready.return_value = True
evt_mock.wait.return_value = 'cancel_with_rollback'
msgq_mock = mock.MagicMock()
msgq_mock.get_nowait.return_value = 'cancel_with_rollback'
self.stack.update(updated_stack, event=evt_mock)
self.stack.update(updated_stack, msg_queue=msgq_mock)
self.assertEqual((stack.Stack.ROLLBACK, stack.Stack.COMPLETE),
self.stack.state)
self.assertEqual('abc', self.stack['AResource'].properties['Foo'])
evt_mock.ready.assert_called_once_with()
evt_mock.wait.assert_called_once_with()
msgq_mock.get_nowait.assert_called_once_with()
def test_update_rollback_fail(self):
tmpl = {'HeatTemplateFormatVersion': '2012-12-12',