From 1d7f0b1279f88de0931bedaab5871c5b87cef590 Mon Sep 17 00:00:00 2001 From: Stan Lagun Date: Thu, 26 Feb 2015 01:02:58 +0300 Subject: [PATCH] Fixes agent call may hanged upon action call When action called AgentListener automatically starts listening upon first EP send to the agent. But Environment.deploy() were the only place where AgentLister was stopped. So when action other than Environment.deploy() was called there is no one to stop listener. Thus on each action call new listener on the same RabbitMQ queue was started causing listeners to steal messages from each other. Agent.call() that never received response from agent caused deployment/action hang Change-Id: I466bbf60c35e0d6a0bc6e831010e552aaa12eaab --- meta/io.murano/Classes/Environment.yaml | 14 +++++------- murano/common/engine.py | 3 +++ murano/engine/environment.py | 29 +++++++++++++++++++++++++ murano/engine/system/agent.py | 21 +++++++++--------- murano/engine/system/agent_listener.py | 18 +++++++++++---- murano/tests/unit/dsl/test_agent.py | 21 +++++++++++------- 6 files changed, 74 insertions(+), 32 deletions(-) diff --git a/meta/io.murano/Classes/Environment.yaml b/meta/io.murano/Classes/Environment.yaml index c8711eb56..db6dc472c 100644 --- a/meta/io.murano/Classes/Environment.yaml +++ b/meta/io.murano/Classes/Environment.yaml @@ -59,15 +59,11 @@ Methods: deploy: Usage: Action Body: - Try: - - $.agentListener.start() - - If: len($.applications) = 0 - Then: - - $.stack.delete() - Else: - - $.applications.pselect($.deploy()) - Finally: - - $.agentListener.stop() + - If: len($.applications) = 0 + Then: + - $.stack.delete() + Else: + - $.applications.pselect($.deploy()) destroy: Body: diff --git a/murano/common/engine.py b/murano/common/engine.py index b6bd7b584..84fdef610 100644 --- a/murano/common/engine.py +++ b/murano/common/engine.py @@ -114,6 +114,7 @@ class TaskExecutor(object): obj = exc.load(self.model) try: + self.environment.start() # Skip execution of action in case of no action is provided. # Model will be just loaded, cleaned-up and unloaded. # Most of the time this is used for deletion of environments. @@ -127,6 +128,8 @@ class TaskExecutor(object): reporter = status_reporter.StatusReporter() reporter.initialize(obj) reporter.report_error(obj, str(e)) + finally: + self.environment.finish() return results_serializer.serialize(obj, exc) diff --git a/murano/engine/environment.py b/murano/engine/environment.py index e696a4677..53638788c 100644 --- a/murano/engine/environment.py +++ b/murano/engine/environment.py @@ -13,8 +13,37 @@ # See the License for the specific language governing permissions and # limitations under the License. +import murano.openstack.common.log as logging + + +LOG = logging.getLogger(__name__) + class Environment(object): def __init__(self): self.token = None self.tenant_id = None + self._set_up_list = [] + self._tear_down_list = [] + + def on_session_start(self, delegate): + self._set_up_list.append(delegate) + + def on_session_finish(self, delegate): + self._tear_down_list.append(delegate) + + def start(self): + for delegate in self._set_up_list: + try: + delegate() + except Exception as e: + LOG.exception(e) + self._set_up_list = [] + + def finish(self): + for delegate in self._tear_down_list: + try: + delegate() + except Exception as e: + LOG.exception(e) + self._tear_down_list = [] diff --git a/murano/engine/system/agent.py b/murano/engine/system/agent.py index 3d5fd2669..796a454bb 100644 --- a/murano/engine/system/agent.py +++ b/murano/engine/system/agent.py @@ -77,14 +77,13 @@ class Agent(murano_object.MuranoObject): "Use of murano-agent is disallowed " "by the server configuration") - def _send(self, template, wait_results): + def _send(self, template, wait_results, _context): """Send a message over the MQ interface.""" msg_id = template.get('ID', uuid.uuid4().hex) if wait_results: event = eventlet.event.Event() listener = self._environment.agentListener - listener.subscribe(msg_id, event) - listener.start() + listener.subscribe(msg_id, event, _context) msg = messaging.Message() msg.body = template @@ -106,23 +105,23 @@ class Agent(murano_object.MuranoObject): else: return None - def call(self, template, resources): + def call(self, template, resources, _context): self._check_enabled() plan = self.buildExecutionPlan(template, resources) - return self._send(plan, True) + return self._send(plan, True, _context) - def send(self, template, resources): + def send(self, template, resources, _context): self._check_enabled() plan = self.buildExecutionPlan(template, resources) - return self._send(plan, False) + return self._send(plan, False, _context) - def callRaw(self, plan): + def callRaw(self, plan, _context): self._check_enabled() - return self._send(plan, True) + return self._send(plan, True, _context) - def sendRaw(self, plan): + def sendRaw(self, plan, _context): self._check_enabled() - return self._send(plan, False) + return self._send(plan, False, _context) def _process_v1_result(self, result): if result['IsException']: diff --git a/murano/engine/system/agent_listener.py b/murano/engine/system/agent_listener.py index ebc7b5ddd..630839d39 100644 --- a/murano/engine/system/agent_listener.py +++ b/murano/engine/system/agent_listener.py @@ -14,8 +14,10 @@ # limitations under the License. import eventlet +import greenlet import murano.common.config as config +from murano.dsl import helpers import murano.dsl.murano_class as murano_class import murano.dsl.murano_object as murano_object import murano.engine.system.common as common @@ -47,13 +49,15 @@ class AgentListener(murano_object.MuranoObject): def queueName(self): return self._results_queue - def start(self): + def start(self, _context): if config.CONF.engine.disable_murano_agent: # Noop LOG.debug("murano-agent is disabled by the server") return if self._receive_thread is None: + helpers.get_environment(_context).on_session_finish( + lambda: self.stop()) self._receive_thread = eventlet.spawn(self._receive) def stop(self): @@ -63,16 +67,22 @@ class AgentListener(murano_object.MuranoObject): return if self._receive_thread is not None: - self._receive_thread.kill() - self._receive_thread = None + try: + self._receive_thread.kill() + self._receive_thread.wait() + except greenlet.GreenletExit: + pass + finally: + self._receive_thread = None - def subscribe(self, message_id, event): + def subscribe(self, message_id, event, _context): if config.CONF.engine.disable_murano_agent: raise AgentListenerException( "Use of murano-agent is disallowed " "by the server configuration") self._subscriptions[message_id] = event + self.start(_context) def _receive(self): with common.create_rmq_client() as client: diff --git a/murano/tests/unit/dsl/test_agent.py b/murano/tests/unit/dsl/test_agent.py index 82a346baa..5a7b46440 100644 --- a/murano/tests/unit/dsl/test_agent.py +++ b/murano/tests/unit/dsl/test_agent.py @@ -15,6 +15,9 @@ import mock +import yaql.context + +from murano.engine import environment from murano.engine.system import agent from murano.engine.system import agent_listener from murano.tests.unit.dsl.foundation import object_model as om @@ -30,12 +33,14 @@ class TestAgentListener(test_case.DslTestCase): model = om.Object( 'AgentListenerTests') self.runner = self.new_runner(model) + self.context = yaql.context.Context() + self.context.set_data(environment.Environment(), '?environment') def test_listener_enabled(self): self.override_config('disable_murano_agent', False, 'engine') al = self.runner.testAgentListener() self.assertTrue(al.enabled) - al.subscribe('msgid', 'event') + al.subscribe('msgid', 'event', self.context) self.assertEqual({'msgid': 'event'}, al._subscriptions) def test_listener_disabled(self): @@ -43,7 +48,7 @@ class TestAgentListener(test_case.DslTestCase): al = self.runner.testAgentListener() self.assertFalse(al.enabled) self.assertRaises(agent_listener.AgentListenerException, - al.subscribe, 'msgid', 'event') + al.subscribe, 'msgid', 'event', None) class TestAgent(test_case.DslTestCase): @@ -69,14 +74,14 @@ class TestAgent(test_case.DslTestCase): with mock.patch(agent_cls + '._send') as s: s.return_value = mock.MagicMock() - a.sendRaw({}) - s.assert_called_with({}, False) + a.sendRaw({}, None) + s.assert_called_with({}, False, None) def test_agent_disabled(self): self.override_config('disable_murano_agent', True, 'engine') a = self.runner.testAgent() self.assertFalse(a.enabled) - self.assertRaises(agent.AgentException, a.call, {}, None) - self.assertRaises(agent.AgentException, a.send, {}, None) - self.assertRaises(agent.AgentException, a.callRaw, {}) - self.assertRaises(agent.AgentException, a.sendRaw, {}) + self.assertRaises(agent.AgentException, a.call, {}, None, None) + self.assertRaises(agent.AgentException, a.send, {}, None, None) + self.assertRaises(agent.AgentException, a.callRaw, {}, None) + self.assertRaises(agent.AgentException, a.sendRaw, {}, None)