Merge "Add the config option for Oslo Messaging executor type"
This commit is contained in:
commit
910948abaa
@ -111,6 +111,14 @@ rpc_response_timeout_opt = cfg.IntOpt(
|
||||
help=_('Seconds to wait for a response from a call.')
|
||||
)
|
||||
|
||||
oslo_rpc_executor = cfg.StrOpt(
|
||||
'oslo_rpc_executor',
|
||||
default='eventlet',
|
||||
choices=['eventlet', 'blocking', 'threading'],
|
||||
help=_('Executor type used by Oslo Messaging framework. Defines how '
|
||||
'Oslo Messaging based RPC subsystem processes incoming calls.')
|
||||
)
|
||||
|
||||
expiration_token_duration = cfg.IntOpt(
|
||||
'expiration_token_duration',
|
||||
default=30,
|
||||
@ -559,6 +567,7 @@ CONF.register_opt(auth_type_opt)
|
||||
CONF.register_opt(js_impl_opt)
|
||||
CONF.register_opt(rpc_impl_opt)
|
||||
CONF.register_opt(rpc_response_timeout_opt)
|
||||
CONF.register_opt(oslo_rpc_executor)
|
||||
CONF.register_opt(expiration_token_duration)
|
||||
|
||||
CONF.register_opts(api_opts, group=API_GROUP)
|
||||
@ -596,6 +605,7 @@ default_group_opts = itertools.chain(
|
||||
js_impl_opt,
|
||||
rpc_impl_opt,
|
||||
rpc_response_timeout_opt,
|
||||
oslo_rpc_executor,
|
||||
expiration_token_duration
|
||||
]
|
||||
)
|
||||
|
@ -61,10 +61,7 @@ class EngineServer(service_base.MistralService):
|
||||
self._rpc_server = rpc.get_rpc_server_driver()(cfg.CONF.engine)
|
||||
self._rpc_server.register_endpoint(self)
|
||||
|
||||
# Note(ddeja): Engine needs to be run in default (blocking) mode
|
||||
# since using another mode may lead to a deadlock.
|
||||
# See https://review.openstack.org/#/c/356343 for more info.
|
||||
self._rpc_server.run(executor='blocking')
|
||||
self._rpc_server.run(executor=cfg.CONF.oslo_rpc_executor)
|
||||
|
||||
self._notify_started('Engine server started.')
|
||||
|
||||
|
@ -112,7 +112,7 @@ def _check_and_complete(wf_ex_id):
|
||||
else 4
|
||||
)
|
||||
|
||||
# Rescheduling this check may not happen if erros are
|
||||
# Rescheduling this check may not happen if errors are
|
||||
# raised in the business logic. If the error is DB related
|
||||
# and not considered fatal (e.g. disconnect, deadlock), the
|
||||
# retry annotation around the method will ensure that the
|
||||
|
@ -168,7 +168,7 @@ class RPCServer(object):
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def run(self, executor='blocking'):
|
||||
def run(self, executor='eventlet'):
|
||||
"""Runs the RPC server.
|
||||
|
||||
:param executor: Executor used to process incoming requests. Different
|
||||
|
@ -83,7 +83,7 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
|
||||
"""Return whether server is running."""
|
||||
return self._running.is_set()
|
||||
|
||||
def run(self, executor='blocking'):
|
||||
def run(self, executor='eventlet'):
|
||||
if self._thread is None:
|
||||
self._thread = threading.Thread(target=self._run, args=(executor,))
|
||||
self._thread.daemon = True
|
||||
|
@ -35,7 +35,7 @@ class OsloRPCServer(rpc.RPCServer):
|
||||
def register_endpoint(self, endpoint):
|
||||
self.endpoints.append(endpoint)
|
||||
|
||||
def run(self, executor='blocking'):
|
||||
def run(self, executor='eventlet'):
|
||||
target = messaging.Target(
|
||||
topic=self.topic,
|
||||
server=self.server_id
|
||||
|
Loading…
Reference in New Issue
Block a user