Merge "Add timeouts to murano-agent calls"
This commit is contained in:
commit
2bfe4e3e69
21
murano/common/exceptions.py
Normal file
21
murano/common/exceptions.py
Normal file
@ -0,0 +1,21 @@
|
||||
# Copyright (c) 2015 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
class TimeoutException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class PolicyViolationException(Exception):
|
||||
pass
|
@ -23,13 +23,13 @@ import uuid
|
||||
import eventlet.event
|
||||
|
||||
import murano.common.config as config
|
||||
import murano.common.exceptions as exceptions
|
||||
import murano.common.messaging as messaging
|
||||
import murano.dsl.murano_class as murano_class
|
||||
import murano.dsl.murano_object as murano_object
|
||||
import murano.dsl.yaql_expression as yaql_expression
|
||||
import murano.engine.system.common as common
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@ -42,7 +42,8 @@ class Agent(murano_object.MuranoObject):
|
||||
def initialize(self, _context, host):
|
||||
self._enabled = False
|
||||
if config.CONF.engine.disable_murano_agent:
|
||||
LOG.debug("murano-agent is disabled by the server")
|
||||
LOG.debug('Use of murano-agent is disallowed '
|
||||
'by the server configuration')
|
||||
return
|
||||
|
||||
self._environment = self._get_environment(_context)
|
||||
@ -62,7 +63,8 @@ class Agent(murano_object.MuranoObject):
|
||||
def prepare(self):
|
||||
# (sjmc7) - turn this into a no-op if agents are disabled
|
||||
if config.CONF.engine.disable_murano_agent:
|
||||
LOG.debug("murano-agent is disabled by the server")
|
||||
LOG.debug('Use of murano-agent is disallowed '
|
||||
'by the server configuration')
|
||||
return
|
||||
|
||||
with common.create_rmq_client() as client:
|
||||
@ -73,11 +75,11 @@ class Agent(murano_object.MuranoObject):
|
||||
|
||||
def _check_enabled(self):
|
||||
if config.CONF.engine.disable_murano_agent:
|
||||
raise AgentException(
|
||||
"Use of murano-agent is disallowed "
|
||||
"by the server configuration")
|
||||
raise exceptions.PolicyViolationException(
|
||||
'Use of murano-agent is disallowed '
|
||||
'by the server configuration')
|
||||
|
||||
def _send(self, template, wait_results):
|
||||
def _send(self, template, wait_results, timeout):
|
||||
"""Send a message over the MQ interface."""
|
||||
msg_id = template.get('ID', uuid.uuid4().hex)
|
||||
if wait_results:
|
||||
@ -94,22 +96,32 @@ class Agent(murano_object.MuranoObject):
|
||||
client.send(message=msg, key=self._queue)
|
||||
|
||||
if wait_results:
|
||||
result = event.wait()
|
||||
try:
|
||||
with eventlet.Timeout(timeout):
|
||||
result = event.wait()
|
||||
|
||||
except eventlet.Timeout:
|
||||
listener.unsubscribe(msg_id)
|
||||
raise exceptions.TimeoutException(
|
||||
'The Agent does not respond'
|
||||
'within {0} seconds'.format(timeout))
|
||||
|
||||
if not result:
|
||||
return None
|
||||
|
||||
if result.get('FormatVersion', '1.0.0').startswith('1.'):
|
||||
return self._process_v1_result(result)
|
||||
|
||||
else:
|
||||
return self._process_v2_result(result)
|
||||
|
||||
else:
|
||||
return None
|
||||
|
||||
def call(self, template, resources):
|
||||
def call(self, template, resources, timeout=600):
|
||||
self._check_enabled()
|
||||
plan = self.buildExecutionPlan(template, resources)
|
||||
return self._send(plan, True)
|
||||
return self._send(plan, True, timeout)
|
||||
|
||||
def send(self, template, resources):
|
||||
self._check_enabled()
|
||||
@ -124,6 +136,19 @@ class Agent(murano_object.MuranoObject):
|
||||
self._check_enabled()
|
||||
return self._send(plan, False)
|
||||
|
||||
def isReady(self, timeout=100):
|
||||
try:
|
||||
self.waitReady(timeout)
|
||||
except exceptions.TimeoutException:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
def waitReady(self, timeout=100):
|
||||
self._check_enabled()
|
||||
template = {'Body': 'return', 'FormatVersion': '2.0.0', 'Scripts': {}}
|
||||
self.call(template, False, timeout)
|
||||
|
||||
def _process_v1_result(self, result):
|
||||
if result['IsException']:
|
||||
raise AgentException(dict(self._get_exception_info(
|
||||
|
@ -16,6 +16,7 @@
|
||||
import eventlet
|
||||
|
||||
import murano.common.config as config
|
||||
import murano.common.exceptions as exceptions
|
||||
import murano.dsl.murano_class as murano_class
|
||||
import murano.dsl.murano_object as murano_object
|
||||
import murano.engine.system.common as common
|
||||
@ -39,6 +40,16 @@ class AgentListener(murano_object.MuranoObject):
|
||||
self._subscriptions = {}
|
||||
self._receive_thread = None
|
||||
|
||||
def _check_enabled(self):
|
||||
if config.CONF.engine.disable_murano_agent:
|
||||
LOG.debug(
|
||||
'Use of murano-agent is disallowed '
|
||||
'by the server configuration')
|
||||
|
||||
raise exceptions.PolicyViolationException(
|
||||
'Use of murano-agent is disallowed '
|
||||
'by the server configuration')
|
||||
|
||||
@property
|
||||
def enabled(self):
|
||||
return self._enabled
|
||||
@ -66,13 +77,13 @@ class AgentListener(murano_object.MuranoObject):
|
||||
self._receive_thread = None
|
||||
|
||||
def subscribe(self, message_id, event):
|
||||
if config.CONF.engine.disable_murano_agent:
|
||||
raise AgentListenerException(
|
||||
"Use of murano-agent is disallowed "
|
||||
"by the server configuration")
|
||||
|
||||
self._check_enabled()
|
||||
self._subscriptions[message_id] = event
|
||||
|
||||
def unsubscribe(self, message_id):
|
||||
self._check_enabled()
|
||||
self._subscriptions.pop(message_id)
|
||||
|
||||
def _receive(self):
|
||||
with common.create_rmq_client() as client:
|
||||
client.declare(self._results_queue, enable_ha=True, ttl=86400000)
|
||||
|
@ -15,6 +15,7 @@
|
||||
|
||||
import mock
|
||||
|
||||
from murano.common import exceptions as exc
|
||||
from murano.engine.system import agent
|
||||
from murano.engine.system import agent_listener
|
||||
from murano.tests.unit.dsl.foundation import object_model as om
|
||||
@ -42,7 +43,7 @@ class TestAgentListener(test_case.DslTestCase):
|
||||
self.override_config('disable_murano_agent', True, 'engine')
|
||||
al = self.runner.testAgentListener()
|
||||
self.assertFalse(al.enabled)
|
||||
self.assertRaises(agent_listener.AgentListenerException,
|
||||
self.assertRaises(exc.PolicyViolationException,
|
||||
al.subscribe, 'msgid', 'event')
|
||||
|
||||
|
||||
@ -76,7 +77,7 @@ class TestAgent(test_case.DslTestCase):
|
||||
self.override_config('disable_murano_agent', True, 'engine')
|
||||
a = self.runner.testAgent()
|
||||
self.assertFalse(a.enabled)
|
||||
self.assertRaises(agent.AgentException, a.call, {}, None)
|
||||
self.assertRaises(agent.AgentException, a.send, {}, None)
|
||||
self.assertRaises(agent.AgentException, a.callRaw, {})
|
||||
self.assertRaises(agent.AgentException, a.sendRaw, {})
|
||||
self.assertRaises(exc.PolicyViolationException, a.call, {}, None)
|
||||
self.assertRaises(exc.PolicyViolationException, a.send, {}, None)
|
||||
self.assertRaises(exc.PolicyViolationException, a.callRaw, {})
|
||||
self.assertRaises(exc.PolicyViolationException, a.sendRaw, {})
|
||||
|
Loading…
Reference in New Issue
Block a user