Fixes agent call may hanged upon action call

When action called AgentListener automatically starts listening upon
first EP send to the agent. But Environment.deploy() were the only
place where AgentLister was stopped. So when action other than
Environment.deploy() was called there is no one to stop listener.
Thus on each action call new listener on the same RabbitMQ queue
was started causing listeners to steal messages from each other.
Agent.call() that never received response from agent caused
deployment/action hang.

Change-Id: Ia778c816a0e2f57d1f694fd1f128848f61b21a2d
Closes-Bug: #1425963
This commit is contained in:
Stan Lagun 2015-02-26 17:24:24 +03:00
parent 926f15a235
commit 83ba5fa731
6 changed files with 78 additions and 31 deletions

View File

@ -63,11 +63,7 @@ Methods:
resources: {}
- $.stack.updateTemplate($minimalStack)
- $.stack.push()
- Try:
- $.agentListener.start()
- $.applications.pselect($.deploy())
Finally:
- $.agentListener.stop()
- $.applications.pselect($.deploy())
destroy:
Body:

View File

@ -135,6 +135,8 @@ class TaskExecutor(object):
self._validate_model(obj, self.action, class_loader)
try:
LOG.info(_LI('Invoking pre-execution hooks'))
self.environment.start()
# Skip execution of action in case no action is provided.
# Model will be just loaded, cleaned-up and unloaded.
# Most of the time this is used for deletion of environments.
@ -151,6 +153,9 @@ class TaskExecutor(object):
reporter = status_reporter.StatusReporter()
reporter.initialize(obj)
reporter.report_error(obj, str(e))
finally:
LOG.info(_LI('Invoking post-execution hooks'))
self.environment.finish()
result = results_serializer.serialize(obj, exc)
result['SystemData'] = self._environment.system_attributes

View File

@ -13,6 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from murano.common.i18n import _LE
import murano.openstack.common.log as logging
LOG = logging.getLogger(__name__)
class Environment(object):
def __init__(self):
@ -21,3 +27,29 @@ class Environment(object):
self.trust_id = None
self.system_attributes = {}
self.clients = None
self._set_up_list = []
self._tear_down_list = []
def on_session_start(self, delegate):
self._set_up_list.append(delegate)
def on_session_finish(self, delegate):
self._tear_down_list.append(delegate)
def start(self):
for delegate in self._set_up_list:
try:
delegate()
except Exception:
LOG.exception(_LE('Unhandled exception on invocation of '
'pre-execution hook'))
self._set_up_list = []
def finish(self):
for delegate in self._tear_down_list:
try:
delegate()
except Exception:
LOG.exception(_LE('Unhandled exception on invocation of '
'post-execution hook'))
self._tear_down_list = []

View File

@ -79,14 +79,13 @@ class Agent(murano_object.MuranoObject):
'Use of murano-agent is disallowed '
'by the server configuration')
def _send(self, template, wait_results, timeout):
def _send(self, template, wait_results, timeout, _context):
"""Send a message over the MQ interface."""
msg_id = template.get('ID', uuid.uuid4().hex)
if wait_results:
event = eventlet.event.Event()
listener = self._environment.agentListener
listener.subscribe(msg_id, event)
listener.start()
listener.subscribe(msg_id, event, _context)
msg = messaging.Message()
msg.body = template
@ -118,36 +117,36 @@ class Agent(murano_object.MuranoObject):
else:
return None
def call(self, template, resources, timeout=600):
def call(self, template, resources, _context, timeout=600):
self._check_enabled()
plan = self.buildExecutionPlan(template, resources)
return self._send(plan, True, timeout)
return self._send(plan, True, timeout, _context)
def send(self, template, resources):
def send(self, template, resources, _context):
self._check_enabled()
plan = self.buildExecutionPlan(template, resources)
return self._send(plan, False)
return self._send(plan, False, 0, _context)
def callRaw(self, plan):
def callRaw(self, plan, _context, timeout=600):
self._check_enabled()
return self._send(plan, True)
return self._send(plan, True, timeout, _context)
def sendRaw(self, plan):
def sendRaw(self, plan, _context):
self._check_enabled()
return self._send(plan, False)
return self._send(plan, False, 0, _context)
def isReady(self, timeout=100):
def isReady(self, _context, timeout=100):
try:
self.waitReady(timeout)
self.waitReady(_context, timeout)
except exceptions.TimeoutException:
return False
else:
return True
def waitReady(self, timeout=100):
def waitReady(self, _context, timeout=100):
self._check_enabled()
template = {'Body': 'return', 'FormatVersion': '2.0.0', 'Scripts': {}}
self.call(template, False, timeout)
self.call(template, False, _context, timeout)
def _process_v1_result(self, result):
if result['IsException']:

View File

@ -14,9 +14,11 @@
# limitations under the License.
import eventlet
import greenlet
import murano.common.config as config
import murano.common.exceptions as exceptions
from murano.dsl import helpers
import murano.dsl.murano_class as murano_class
import murano.dsl.murano_object as murano_object
import murano.engine.system.common as common
@ -57,13 +59,15 @@ class AgentListener(murano_object.MuranoObject):
def queueName(self):
return self._results_queue
def start(self):
def start(self, _context):
if config.CONF.engine.disable_murano_agent:
# Noop
LOG.debug("murano-agent is disabled by the server")
return
if self._receive_thread is None:
helpers.get_environment(_context).on_session_finish(
lambda: self.stop())
self._receive_thread = eventlet.spawn(self._receive)
def stop(self):
@ -74,11 +78,17 @@ class AgentListener(murano_object.MuranoObject):
if self._receive_thread is not None:
self._receive_thread.kill()
self._receive_thread = None
try:
self._receive_thread.wait()
except greenlet.GreenletExit:
pass
finally:
self._receive_thread = None
def subscribe(self, message_id, event):
def subscribe(self, message_id, event, _context):
self._check_enabled()
self._subscriptions[message_id] = event
self.start(_context)
def unsubscribe(self, message_id):
self._check_enabled()

View File

@ -15,7 +15,10 @@
import mock
import yaql.context
from murano.common import exceptions as exc
from murano.engine import environment
from murano.engine.system import agent
from murano.engine.system import agent_listener
from murano.tests.unit.dsl.foundation import object_model as om
@ -31,12 +34,14 @@ class TestAgentListener(test_case.DslTestCase):
model = om.Object(
'AgentListenerTests')
self.runner = self.new_runner(model)
self.context = yaql.context.Context()
self.context.set_data(environment.Environment(), '?environment')
def test_listener_enabled(self):
self.override_config('disable_murano_agent', False, 'engine')
al = self.runner.testAgentListener()
self.assertTrue(al.enabled)
al.subscribe('msgid', 'event')
al.subscribe('msgid', 'event', self.context)
self.assertEqual({'msgid': 'event'}, al._subscriptions)
def test_listener_disabled(self):
@ -44,7 +49,7 @@ class TestAgentListener(test_case.DslTestCase):
al = self.runner.testAgentListener()
self.assertFalse(al.enabled)
self.assertRaises(exc.PolicyViolationException,
al.subscribe, 'msgid', 'event')
al.subscribe, 'msgid', 'event', None)
class TestAgent(test_case.DslTestCase):
@ -70,14 +75,14 @@ class TestAgent(test_case.DslTestCase):
with mock.patch(agent_cls + '._send') as s:
s.return_value = mock.MagicMock()
a.sendRaw({})
s.assert_called_with({}, False)
a.sendRaw({}, None)
s.assert_called_with({}, False, 0, None)
def test_agent_disabled(self):
self.override_config('disable_murano_agent', True, 'engine')
a = self.runner.testAgent()
self.assertFalse(a.enabled)
self.assertRaises(exc.PolicyViolationException, a.call, {}, None)
self.assertRaises(exc.PolicyViolationException, a.send, {}, None)
self.assertRaises(exc.PolicyViolationException, a.callRaw, {})
self.assertRaises(exc.PolicyViolationException, a.sendRaw, {})
self.assertRaises(exc.PolicyViolationException, a.call, {}, None, None)
self.assertRaises(exc.PolicyViolationException, a.send, {}, None, None)
self.assertRaises(exc.PolicyViolationException, a.callRaw, {}, None)
self.assertRaises(exc.PolicyViolationException, a.sendRaw, {}, None)