Add dynamic result message routing

This patch adds a dynamic result routing for static Agent.
This feature is designed to support Murano Agent which is
installed on external out of OpenStack system.
It adds _ReplyTo entry in the message body as well as supports
message.reply_to property.

Implements blueprint static-agent

Co-Authored-by: Alexander Tivelkov <ativelkov@mirantis.com>

Change-Id: Ief4c25a7fb63fc092f68f08b9665bde03fbe7752
This commit is contained in:
Georgy Okrokvertskhov 2015-04-15 18:04:25 -07:00 committed by Alexander Tivelkov
parent 06cc634e5c
commit 107c2077be
4 changed files with 24 additions and 1 deletions

View File

@ -101,8 +101,11 @@ class MuranoAgent(service.Service):
msg = messaging.Message()
msg.body = result
msg.id = result.get('SourceID')
routing_key = CONF.rabbitmq.result_routing_key
if ('ReplyTo' in result) and CONF.enable_dynamic_result_queue:
routing_key = result.pop('ReplyTo')
mq.send(message=msg,
key=CONF.rabbitmq.result_routing_key,
key=routing_key,
exchange=CONF.rabbitmq.result_exchange)
return True
@ -145,6 +148,8 @@ class MuranoAgent(service.Service):
def _handle_message(self, msg):
if 'ID' not in msg.body and msg.id:
msg.body['ID'] = msg.id
if 'ReplyTo' not in msg.body and msg.reply_to:
msg.body['ReplyTo'] = msg.reply_to
try:
self._verify_plan(msg.body)
self._queue.put_execution_plan(msg.body)
@ -152,6 +157,9 @@ class MuranoAgent(service.Service):
try:
execution_result = ex_result.ExecutionResult.from_error(
err, bunch.Bunch(msg.body))
if ('ReplyTo' in msg.body) and \
CONF.enable_dynamic_result_queue:
execution_result['ReplyTo'] = msg.body.get('ReplyTo')
self._send_result(execution_result)
except ValueError:

View File

@ -30,6 +30,12 @@ storage_opt = [
help='Directory to store execution plans')
]
message_routing_opt = [
cfg.BoolOpt('enable_dynamic_result_queue', help='Enable taking dynamic '
'result queue from task field reply_to',
default=False)
]
rabbit_opts = [
cfg.StrOpt('host',
help='The RabbitMQ broker address which used for communication '
@ -64,6 +70,7 @@ rabbit_opts = [
]
CONF.register_cli_opts(storage_opt)
CONF.register_cli_opts(message_routing_opt)
CONF.register_opts(rabbit_opts, group='rabbitmq')
logging.register_options(CONF)

View File

@ -26,8 +26,10 @@ class Message(object):
self._message_handle = message_handle
if message_handle:
self.id = message_handle.properties.get('message_id')
self._reply_to = message_handle.properties.get('reply_to')
else:
self.id = None
self._reply_to = None
try:
if message_handle:
@ -54,5 +56,9 @@ class Message(object):
def id(self, value):
self._id = value or ''
@property
def reply_to(self):
return self._reply_to
def ack(self):
self._message_handle.ack()

View File

@ -73,6 +73,8 @@ class ExecutionPlanQueue(object):
def put_execution_result(self, result, execution_plan):
timestamp = execution_plan['_timestamp']
if 'ReplyTo' in execution_plan:
result['ReplyTo'] = execution_plan.get('ReplyTo')
path = os.path.join(
self._plans_folder, timestamp,
ExecutionPlanQueue.result_filename)