Merge "Ack message after processing (oslo.messaging)"
This commit is contained in:
commit
85b7b9c1fa
@ -75,7 +75,9 @@ def launch_executor(transport):
|
||||
|
||||
endpoints = [rpc.ExecutorServer(executor_v2)]
|
||||
|
||||
server = messaging.get_rpc_server(
|
||||
get_rpc_server = get_rpc_server_function()
|
||||
|
||||
server = get_rpc_server(
|
||||
transport,
|
||||
target,
|
||||
endpoints,
|
||||
@ -114,7 +116,9 @@ def launch_engine(transport):
|
||||
# Setup expiration policy
|
||||
expiration_policy.setup()
|
||||
|
||||
server = messaging.get_rpc_server(
|
||||
get_rpc_server = get_rpc_server_function()
|
||||
|
||||
server = get_rpc_server(
|
||||
transport,
|
||||
target,
|
||||
endpoints,
|
||||
@ -140,6 +144,13 @@ class ThreadingWSGIServer(socketserver.ThreadingMixIn, WSGIServer):
|
||||
pass
|
||||
|
||||
|
||||
def get_rpc_server_function():
|
||||
if CONF.use_mistral_rpc:
|
||||
return rpc.get_rpc_server
|
||||
else:
|
||||
return messaging.get_rpc_server
|
||||
|
||||
|
||||
def launch_api(transport):
|
||||
host = cfg.CONF.api.host
|
||||
port = cfg.CONF.api.port
|
||||
|
@ -93,6 +93,14 @@ executor_opts = [
|
||||
help='The version of the executor.')
|
||||
]
|
||||
|
||||
rpc_option = cfg.BoolOpt(
|
||||
'use_mistral_rpc',
|
||||
default=False,
|
||||
help='Specifies whether Mistral uses modified oslo.messaging (if True)'
|
||||
' or original oslo.messaging. Modified oslo.messaging is done for'
|
||||
' acknowledgement a message after processing.'
|
||||
)
|
||||
|
||||
execution_expiration_policy_opts = [
|
||||
cfg.IntOpt('evaluation_interval',
|
||||
help='How often will the executions be evaluated '
|
||||
@ -139,6 +147,7 @@ CONF.register_opts(executor_opts, group=EXECUTOR_GROUP)
|
||||
CONF.register_opts(execution_expiration_policy_opts,
|
||||
group=EXECUTION_EXPIRATION_POLICY_GROUP)
|
||||
CONF.register_opt(wf_trace_log_name_opt)
|
||||
CONF.register_opt(rpc_option)
|
||||
CONF.register_opts(coordination_opts, group=COORDINATION_GROUP)
|
||||
|
||||
CLI_OPTS = [
|
||||
@ -172,7 +181,10 @@ def list_opts():
|
||||
(EXECUTION_EXPIRATION_POLICY_GROUP, execution_expiration_policy_opts),
|
||||
(None, itertools.chain(
|
||||
CLI_OPTS,
|
||||
[wf_trace_log_name_opt]
|
||||
[
|
||||
wf_trace_log_name_opt,
|
||||
rpc_option
|
||||
]
|
||||
))
|
||||
]
|
||||
|
||||
|
@ -17,6 +17,7 @@ from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging as messaging
|
||||
from oslo_messaging.rpc import client
|
||||
from oslo_messaging.rpc import dispatcher
|
||||
|
||||
from mistral import context as auth_ctx
|
||||
from mistral.engine import base
|
||||
@ -32,6 +33,35 @@ _ENGINE_CLIENT = None
|
||||
_EXECUTOR_CLIENT = None
|
||||
|
||||
|
||||
class RPCDispatcherPostAck(dispatcher.RPCDispatcher):
|
||||
def __call__(self, incoming, executor_callback=None):
|
||||
return messaging.rpc.dispatcher.dispatcher.DispatcherExecutorContext(
|
||||
incoming,
|
||||
self._dispatch_and_reply,
|
||||
executor_callback=executor_callback
|
||||
)
|
||||
|
||||
def _dispatch_and_reply(self, incoming, executor_callback):
|
||||
incoming = incoming[0]
|
||||
|
||||
super(RPCDispatcherPostAck, self)._dispatch_and_reply(
|
||||
incoming,
|
||||
executor_callback
|
||||
)
|
||||
|
||||
incoming.acknowledge()
|
||||
|
||||
|
||||
def get_rpc_server(transport, target, endpoints, executor='blocking',
|
||||
serializer=None):
|
||||
dispatcher = RPCDispatcherPostAck(target, endpoints, serializer)
|
||||
return messaging.server.MessageHandlingServer(
|
||||
transport,
|
||||
dispatcher,
|
||||
executor
|
||||
)
|
||||
|
||||
|
||||
def cleanup():
|
||||
"""Intended to be used by tests to recreate all RPC related objects."""
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user