Fix launch process of Mistral components

* Fixed the bug #1622534
* Introduced MistralService class extending oslo.service.Service
  that all Mistral components running standalone should extend
* Refactored engine, executor and event engine with MistralService
* Moved most of the startup logic from launch.cmd to individual
  corresponding components
* Cluster membership is now under control of MistralService
* Fixed test_join.test_full_join_with_conditions(), which previously
  had a bug and passed only by luck because the scheduler used to
  behave differently
* Fixed a number of test cases
* Other minor changes

TODO:
* We now use many launchers (from oslo.service) to launch
  services whereas we could use just one which is recommended by
  oslo. However, we can't do that because of the api service
  that uses many workers. We'll need to look at how to refactor it
  moving forward.
* Write tests for MistralService and its derived classes
* Address a number of TODO comments.

Closes-Bug: 1622534
Change-Id: I34ba6a8b8caf8bea17109e0f259085b373eb6d45
This commit is contained in:
Renat Akhmerov 2016-11-22 11:24:41 +07:00
parent 196ee5a7af
commit a5d284aa4f
31 changed files with 717 additions and 448 deletions

View File

@ -21,9 +21,9 @@ import pecan
from mistral.api import access_control
from mistral import config as m_config
from mistral import context as ctx
from mistral import coordination
from mistral.db.v2 import api as db_api_v2
from mistral.engine.rpc_backend import rpc
from mistral.service import coordination
from mistral.services import periodic
@ -68,6 +68,7 @@ def setup_app(config=None):
# Set up access control.
app = access_control.setup(app)
# TODO(rakhmerov): need to get rid of this call.
# Set up RPC related flags in config
rpc.get_transport()

View File

@ -21,10 +21,12 @@ import wsmeext.pecan as wsme_pecan
from mistral.api import access_control as acl
from mistral.api.controllers.v2 import resources
# TODO(rakhmerov): invalid dependency, a REST controller must not depend on
# a launch script.
from mistral.cmd import launch
from mistral import context
from mistral import coordination
from mistral import exceptions as exc
from mistral.service import coordination
from mistral.utils import rest_utils

View File

@ -26,8 +26,9 @@ class WSGIService(service.ServiceBase):
def __init__(self, name):
self.name = name
self.app = app.setup_app()
self.workers = (cfg.CONF.api.api_workers or
processutils.get_worker_count())
self.workers = (
cfg.CONF.api.api_workers or processutils.get_worker_count()
)
self.server = wsgi.Server(
cfg.CONF,
@ -49,7 +50,3 @@ class WSGIService(service.ServiceBase):
def reset(self):
self.server.reset()
def process_launcher():
return service.ProcessLauncher(cfg.CONF)

View File

@ -39,18 +39,14 @@ if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'mistral', '__init__.py')):
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import service
from mistral.api import service as mistral_service
from mistral.api import service as api_service
from mistral import config
from mistral.db.v2 import api as db_api
from mistral.engine import default_engine as def_eng
from mistral.engine import default_executor as def_executor
from mistral.engine import engine_server
from mistral.engine import executor_server
from mistral.engine.rpc_backend import rpc
from mistral.services import event_engine
from mistral.services import expiration_policy
from mistral.services import scheduler
from mistral.utils import profiler
from mistral.utils import rpc_utils
from mistral.event_engine import event_engine_server
from mistral import version
@ -58,84 +54,48 @@ CONF = cfg.CONF
def launch_executor():
profiler.setup('mistral-executor', cfg.CONF.executor.host)
executor_v2 = def_executor.DefaultExecutor(rpc.get_engine_client())
executor_endpoint = rpc.ExecutorServer(executor_v2)
executor_server = rpc.get_rpc_server_driver()(
rpc_utils.get_rpc_info_from_oslo(CONF.executor)
)
executor_server.register_endpoint(executor_endpoint)
executor_v2.register_membership()
try:
executor_server.run(executor='threading')
except (KeyboardInterrupt, SystemExit):
pass
finally:
print("Stopping executor service...")
launcher = service.ServiceLauncher(CONF)
launcher.launch_service(executor_server.get_oslo_service())
launcher.wait()
except RuntimeError as e:
sys.stderr.write("ERROR: %s\n" % e)
sys.exit(1)
def launch_engine():
profiler.setup('mistral-engine', cfg.CONF.engine.host)
engine_v2 = def_eng.DefaultEngine(rpc.get_engine_client())
engine_endpoint = rpc.EngineServer(engine_v2)
# Setup scheduler in engine.
db_api.setup_db()
scheduler.setup()
# Setup expiration policy
expiration_policy.setup()
engine_server = rpc.get_rpc_server_driver()(
rpc_utils.get_rpc_info_from_oslo(CONF.engine)
)
engine_server.register_endpoint(engine_endpoint)
engine_v2.register_membership()
try:
# Note(ddeja): Engine needs to be run in default (blocking) mode
# since using another mode may lead to deadlock.
# See https://review.openstack.org/#/c/356343/
# for more info.
engine_server.run()
except (KeyboardInterrupt, SystemExit):
pass
finally:
print("Stopping engine service...")
launcher = service.ServiceLauncher(CONF)
launcher.launch_service(engine_server.get_oslo_service())
launcher.wait()
except RuntimeError as e:
sys.stderr.write("ERROR: %s\n" % e)
sys.exit(1)
def launch_event_engine():
profiler.setup('mistral-event-engine', cfg.CONF.event_engine.host)
event_eng = event_engine.EventEngine(rpc.get_engine_client())
endpoint = rpc.EventEngineServer(event_eng)
event_engine_server = rpc.get_rpc_server_driver()(
rpc_utils.get_rpc_info_from_oslo(CONF.event_engine)
)
event_engine_server.register_endpoint(endpoint)
event_eng.register_membership()
try:
event_engine_server.run()
except (KeyboardInterrupt, SystemExit):
pass
finally:
print("Stopping event_engine service...")
launcher = service.ServiceLauncher(CONF)
launcher.launch_service(event_engine_server.get_oslo_service())
launcher.wait()
except RuntimeError as e:
sys.stderr.write("ERROR: %s\n" % e)
sys.exit(1)
def launch_api():
launcher = mistral_service.process_launcher()
server = mistral_service.WSGIService('mistral_api')
launcher = service.ProcessLauncher(cfg.CONF)
server = api_service.WSGIService('mistral_api')
launcher.launch_service(server, workers=server.workers)
launcher.wait()
@ -144,8 +104,6 @@ def launch_any(options):
threads = [eventlet.spawn(LAUNCH_OPTIONS[option])
for option in options]
print('Server started.')
[thread.wait() for thread in threads]

View File

@ -16,6 +16,7 @@
import contextlib
import sys
import threading
from oslo_config import cfg
from oslo_db import exception as db_exc
@ -42,6 +43,10 @@ CONF = cfg.CONF
LOG = logging.getLogger(__name__)
_SCHEMA_LOCK = threading.RLock()
_initialized = False
def get_backend():
"""Consumed by openstack common code.
@ -52,20 +57,33 @@ def get_backend():
def setup_db():
try:
models.Workbook.metadata.create_all(b.get_engine())
except sa.exc.OperationalError as e:
raise exc.DBError("Failed to setup database: %s" % e)
global _initialized
with _SCHEMA_LOCK:
if _initialized:
return
try:
models.Workbook.metadata.create_all(b.get_engine())
_initialized = True
except sa.exc.OperationalError as e:
raise exc.DBError("Failed to setup database: %s" % e)
def drop_db():
global _facade
global _initialized
try:
models.Workbook.metadata.drop_all(b.get_engine())
_facade = None
except Exception as e:
raise exc.DBError("Failed to drop database: %s" % e)
with _SCHEMA_LOCK:
if not _initialized:
return
try:
models.Workbook.metadata.drop_all(b.get_engine())
_initialized = False
except Exception as e:
raise exc.DBError("Failed to drop database: %s" % e)
# Transaction management.

View File

@ -17,7 +17,6 @@
from oslo_log import log as logging
from osprofiler import profiler
from mistral import coordination
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models as db_models
from mistral.engine import action_handler
@ -36,17 +35,13 @@ LOG = logging.getLogger(__name__)
# the submodules are referenced.
class DefaultEngine(base.Engine, coordination.Service):
def __init__(self, engine_client):
self._engine_client = engine_client
coordination.Service.__init__(self, 'engine_group')
class DefaultEngine(base.Engine):
@action_queue.process
@u.log_exec(LOG)
@profiler.trace('engine-start-workflow')
def start_workflow(self, wf_identifier, wf_input, description='',
**params):
with db_api.transaction():
wf_ex = wf_handler.start_workflow(
wf_identifier,

View File

@ -17,8 +17,8 @@ from oslo_log import log as logging
from osprofiler import profiler
from mistral.actions import action_factory as a_f
from mistral import coordination
from mistral.engine import base
from mistral.engine.rpc_backend import rpc
from mistral.utils import inspect_utils as i_u
from mistral.workflow import utils as wf_utils
@ -26,11 +26,9 @@ from mistral.workflow import utils as wf_utils
LOG = logging.getLogger(__name__)
class DefaultExecutor(base.Executor, coordination.Service):
def __init__(self, engine_client):
self._engine_client = engine_client
coordination.Service.__init__(self, 'executor_group')
class DefaultExecutor(base.Executor):
def __init__(self):
self._engine_client = rpc.get_engine_client()
@profiler.trace('executor-run-action')
def run_action(self, action_ex_id, action_class_str, attributes,

View File

@ -0,0 +1,250 @@
# Copyright 2016 - Nokia Networks
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
from mistral import config as cfg
from mistral.db.v2 import api as db_api
from mistral.engine import default_engine
from mistral.engine.rpc_backend import rpc
from mistral.service import base as service_base
from mistral.services import expiration_policy
from mistral.services import scheduler
from mistral.utils import profiler as profiler_utils
from mistral.utils import rpc_utils
from mistral.workflow import utils as wf_utils
LOG = logging.getLogger(__name__)
class EngineServer(service_base.MistralService):
    """Engine server.

    This class manages engine life-cycle and gets registered as an RPC
    endpoint to process engine specific calls. It also registers a
    cluster member associated with this instance of engine.
    """

    def __init__(self, engine, setup_profiler=True):
        """Create the engine server.

        :param engine: Engine object that all RPC calls are delegated to.
        :param setup_profiler: If true, the profiler is set up when the
            service starts.
        """
        super(EngineServer, self).__init__('engine_group', setup_profiler)

        self.engine = engine

        # All of these are created in start() and released in stop().
        self._rpc_server = None
        self._scheduler = None
        self._expiration_policy_tg = None

    def start(self):
        """Start everything the engine needs and begin serving RPC calls.

        Sets up the DB, starts the scheduler and the expiration policy,
        optionally sets up the profiler and then runs the RPC server with
        this object registered as its endpoint.
        """
        super(EngineServer, self).start()

        db_api.setup_db()

        self._scheduler = scheduler.start()
        self._expiration_policy_tg = expiration_policy.setup()

        if self._setup_profiler:
            profiler_utils.setup('mistral-engine', cfg.CONF.engine.host)

        # Initialize and start RPC server.
        self._rpc_server = rpc.get_rpc_server_driver()(
            rpc_utils.get_rpc_info_from_oslo(cfg.CONF.engine)
        )
        self._rpc_server.register_endpoint(self)

        # Note(ddeja): Engine needs to be run in default (blocking) mode
        # since using another mode may lead to deadlock.
        # See https://review.openstack.org/#/c/356343 for more info.
        self._rpc_server.run(executor='blocking')

        self._notify_started('Engine server started.')

    def stop(self, graceful=False):
        """Stop whatever start() managed to create.

        Each resource is checked before being stopped because start() may
        not have been called or may have failed part way through.

        :param graceful: Passed through to every component being stopped.
        """
        super(EngineServer, self).stop(graceful)

        if self._scheduler:
            scheduler.stop_scheduler(self._scheduler, graceful)

        if self._expiration_policy_tg:
            self._expiration_policy_tg.stop(graceful)

        if self._rpc_server:
            self._rpc_server.stop(graceful)

    def start_workflow(self, rpc_ctx, workflow_identifier, workflow_input,
                       description, params):
        """Receives calls over RPC to start workflows on engine.

        :param rpc_ctx: RPC request context.
        :param workflow_identifier: Workflow definition identifier.
        :param workflow_input: Workflow input.
        :param description: Workflow execution description.
        :param params: Additional workflow type specific parameters.
        :return: Workflow execution.
        """
        LOG.info(
            "Received RPC request 'start_workflow'[rpc_ctx=%s,"
            " workflow_identifier=%s, workflow_input=%s, description=%s, "
            "params=%s]",
            rpc_ctx,
            workflow_identifier,
            workflow_input,
            description,
            params
        )

        return self.engine.start_workflow(
            workflow_identifier,
            workflow_input,
            description,
            **params
        )

    def start_action(self, rpc_ctx, action_name,
                     action_input, description, params):
        """Receives calls over RPC to start actions on engine.

        :param rpc_ctx: RPC request context.
        :param action_name: name of the Action.
        :param action_input: input dictionary for Action.
        :param description: description of new Action execution.
        :param params: extra parameters to run Action.
        :return: Action execution.
        """
        LOG.info(
            "Received RPC request 'start_action'[rpc_ctx=%s,"
            " name=%s, input=%s, description=%s, params=%s]",
            rpc_ctx,
            action_name,
            action_input,
            description,
            params
        )

        return self.engine.start_action(
            action_name,
            action_input,
            description,
            **params
        )

    def on_task_state_change(self, rpc_ctx, task_ex_id, state,
                             state_info=None):
        """Receives RPC calls to communicate a task state change to engine.

        :param rpc_ctx: RPC request context.
        :param task_ex_id: Task execution id.
        :param state: New task state.
        :param state_info: Optional additional information about the state.
        :return: Whatever the engine returns for this call.
        """
        return self.engine.on_task_state_change(task_ex_id, state, state_info)

    def on_action_complete(self, rpc_ctx, action_ex_id, result_data,
                           result_error, wf_action):
        """Receives RPC calls to communicate action result to engine.

        :param rpc_ctx: RPC request context.
        :param action_ex_id: Action execution id.
        :param result_data: Action result data.
        :param result_error: Action result error.
        :param wf_action: True if given id points to a workflow execution.
        :return: Action execution.
        """
        # Data and error travel separately over RPC; the engine expects
        # them combined into a single Result object.
        result = wf_utils.Result(result_data, result_error)

        LOG.info(
            "Received RPC request 'on_action_complete'[rpc_ctx=%s,"
            " action_ex_id=%s, result=%s]",
            rpc_ctx,
            action_ex_id,
            result
        )

        return self.engine.on_action_complete(action_ex_id, result, wf_action)

    def pause_workflow(self, rpc_ctx, execution_id):
        """Receives calls over RPC to pause workflows on engine.

        :param rpc_ctx: Request context.
        :param execution_id: Workflow execution id.
        :return: Workflow execution.
        """
        LOG.info(
            "Received RPC request 'pause_workflow'[rpc_ctx=%s,"
            " execution_id=%s]",
            rpc_ctx,
            execution_id
        )

        return self.engine.pause_workflow(execution_id)

    def rerun_workflow(self, rpc_ctx, task_ex_id, reset=True, env=None):
        """Receives calls over RPC to rerun workflows on engine.

        :param rpc_ctx: RPC request context.
        :param task_ex_id: Task execution id.
        :param reset: If true, then purge action execution for the task.
        :param env: Environment variables to update.
        :return: Workflow execution.
        """
        LOG.info(
            "Received RPC request 'rerun_workflow'[rpc_ctx=%s, "
            "task_ex_id=%s]",
            rpc_ctx,
            task_ex_id
        )

        return self.engine.rerun_workflow(task_ex_id, reset, env)

    def resume_workflow(self, rpc_ctx, wf_ex_id, env=None):
        """Receives calls over RPC to resume workflows on engine.

        :param rpc_ctx: RPC request context.
        :param wf_ex_id: Workflow execution id.
        :param env: Environment variables to update.
        :return: Workflow execution.
        """
        LOG.info(
            "Received RPC request 'resume_workflow'[rpc_ctx=%s, "
            "wf_ex_id=%s]",
            rpc_ctx,
            wf_ex_id
        )

        return self.engine.resume_workflow(wf_ex_id, env)

    def stop_workflow(self, rpc_ctx, execution_id, state, message=None):
        """Receives calls over RPC to stop workflows on engine.

        Sets execution state to SUCCESS or ERROR. No more tasks will be
        scheduled. Running tasks won't be killed, but their results
        will be ignored.

        :param rpc_ctx: RPC request context.
        :param execution_id: Workflow execution id.
        :param state: State assigned to the workflow. Permitted states are
            SUCCESS or ERROR.
        :param message: Optional information string.
        :return: Workflow execution.
        """
        LOG.info(
            "Received RPC request 'stop_workflow'[rpc_ctx=%s, execution_id=%s,"
            " state=%s, message=%s]",
            rpc_ctx,
            execution_id,
            state,
            message
        )

        return self.engine.stop_workflow(execution_id, state, message)

    def rollback_workflow(self, rpc_ctx, execution_id):
        """Receives calls over RPC to rollback workflows on engine.

        :param rpc_ctx: RPC request context.
        :param execution_id: Workflow execution id.
        :return: Workflow execution.
        """
        LOG.info(
            "Received RPC request 'rollback_workflow'[rpc_ctx=%s,"
            " execution_id=%s]",
            rpc_ctx,
            execution_id
        )

        return self.engine.rollback_workflow(execution_id)
def get_oslo_service(setup_profiler=True):
    """Build the engine service object to hand to an oslo.service launcher.

    :param setup_profiler: Forwarded to EngineServer; if true, the
        profiler is set up when the service starts.
    :return: EngineServer wrapping a newly created DefaultEngine.
    """
    engine = default_engine.DefaultEngine()

    return EngineServer(engine, setup_profiler=setup_profiler)

View File

@ -0,0 +1,99 @@
# Copyright 2016 - Nokia Networks
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
from mistral import config as cfg
from mistral.engine import default_executor
from mistral.engine.rpc_backend import rpc
from mistral.service import base as service_base
from mistral.utils import profiler as profiler_utils
from mistral.utils import rpc_utils
LOG = logging.getLogger(__name__)
class ExecutorServer(service_base.MistralService):
    """Executor server.

    This class manages executor life-cycle and gets registered as an RPC
    endpoint to process executor specific calls. It also registers a
    cluster member associated with this instance of executor.
    """

    def __init__(self, executor, setup_profiler=True):
        """Create the executor server.

        :param executor: Executor object that RPC calls are delegated to.
        :param setup_profiler: If true, the profiler is set up when the
            service starts.
        """
        super(ExecutorServer, self).__init__('executor_group', setup_profiler)

        self.executor = executor

        # Created in start() and released in stop().
        self._rpc_server = None

    def start(self):
        """Optionally set up the profiler and begin serving RPC calls."""
        super(ExecutorServer, self).start()

        if self._setup_profiler:
            profiler_utils.setup('mistral-executor', cfg.CONF.executor.host)

        # Initialize and start RPC server.
        self._rpc_server = rpc.get_rpc_server_driver()(
            rpc_utils.get_rpc_info_from_oslo(cfg.CONF.executor)
        )
        self._rpc_server.register_endpoint(self)

        self._rpc_server.run(executor='threading')

        self._notify_started('Executor server started.')

    def stop(self, graceful=False):
        """Stop the service and its RPC server, if one was created.

        :param graceful: Passed through to the base class and RPC server.
        """
        super(ExecutorServer, self).stop(graceful)

        if self._rpc_server:
            self._rpc_server.stop(graceful)

    def run_action(self, rpc_ctx, action_ex_id, action_class_str,
                   attributes, params, safe_rerun):
        """Receives calls over RPC to run action on executor.

        :param rpc_ctx: RPC request context dictionary.
        :param action_ex_id: Action execution id.
        :param action_class_str: Action class name.
        :param attributes: Action class attributes.
        :param params: Action input parameters.
        :param safe_rerun: Tells if given action can be safely rerun.
        :return: Action result.
        """
        LOG.info(
            "Received RPC request 'run_action'[rpc_ctx=%s,"
            " action_ex_id=%s, action_class=%s, attributes=%s, params=%s]",
            rpc_ctx,
            action_ex_id,
            action_class_str,
            attributes,
            params
        )

        # Normalize a falsy value (e.g. None) to a plain False.
        redelivered = rpc_ctx.redelivered or False

        return self.executor.run_action(
            action_ex_id,
            action_class_str,
            attributes,
            params,
            safe_rerun,
            redelivered
        )
def get_oslo_service(setup_profiler=True):
    """Build the executor service object to hand to an oslo.service launcher.

    :param setup_profiler: Forwarded to ExecutorServer; if true, the
        profiler is set up when the service starts.
    :return: ExecutorServer wrapping a newly created DefaultExecutor.
    """
    executor = default_executor.DefaultExecutor()

    return ExecutorServer(executor, setup_profiler=setup_profiler)

View File

@ -73,5 +73,22 @@ class RPCServer(object):
def run(self, executor='blocking'):
    """Start serving RPC requests.

    This base implementation always raises NotImplementedError.

    :param executor: Executor used to process incoming requests. Which
        values are accepted depends on the concrete implementation.
    """
    raise NotImplementedError()
def stop(self, graceful=False):
    """Stop the RPC server.

    The base implementation does nothing.

    :param graceful: True if this method call should wait till all
        internal threads are finished.
    """
    return None
def wait(self):
    """Wait till all internal threads are finished.

    The base implementation does nothing and returns immediately.
    """
    return None

View File

@ -47,6 +47,7 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
self.channel = None
self.conn = None
self._running = threading.Event()
self._stopped = threading.Event()
self.endpoints = []
@property
@ -63,6 +64,7 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
self.password,
self.virtual_host,
)
LOG.info("Connected to AMQP at %s:%s" % (self.host, self.port))
try:
@ -84,7 +86,10 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
callbacks=[self._on_message_safe],
) as consumer:
consumer.qos(prefetch_count=1)
self._running.set()
self._stopped.clear()
while self.is_running:
try:
conn.drain_events(timeout=1)
@ -92,16 +97,25 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
pass
except KeyboardInterrupt:
self.stop()
LOG.info("Server with id='{0}' stopped.".format(
self.server_id))
return
except socket.error as e:
raise exc.MistralException("Broker connection failed: %s" % e)
finally:
self._stopped.set()
def stop(self):
"""Stop the server."""
def stop(self, graceful=False):
    """Ask the server to stop, optionally waiting until it has stopped.

    Clears the ``_running`` event and, for a graceful stop, delegates to
    wait().

    :param graceful: True if this call should block in wait() before
        returning.
    """
    self._running.clear()

    if not graceful:
        return

    self.wait()
def wait(self):
    """Block until the ``_stopped`` event is set."""
    stopped_event = self._stopped

    stopped_event.wait()
def _get_rpc_method(self, method_name):
for endpoint in self.endpoints:
if hasattr(endpoint, method_name):

View File

@ -34,6 +34,7 @@ class OsloRPCServer(rpc_base.RPCServer):
self.channel = None
self.connection = None
self.endpoints = []
self.oslo_server = None
def register_endpoint(self, endpoint):
self.endpoints.append(endpoint)
@ -44,7 +45,9 @@ class OsloRPCServer(rpc_base.RPCServer):
server=self.server_id
)
server = messaging.get_rpc_server(
# TODO(rakhmerov): rpc.get_transport() should be in oslo.messaging
# related module.
self.oslo_server = messaging.get_rpc_server(
rpc.get_transport(),
target,
self.endpoints,
@ -52,5 +55,13 @@ class OsloRPCServer(rpc_base.RPCServer):
serializer=ctx.RpcContextSerializer(ctx.JsonPayloadSerializer())
)
server.start()
server.wait()
self.oslo_server.start()
def stop(self, graceful=False):
    """Stop the underlying oslo.messaging RPC server.

    Does nothing if the server was never created, i.e. run() has not
    been called or failed before the server object was assigned.

    :param graceful: True if this call should also wait for the server
        to finish after asking it to stop.
    """
    server = self.oslo_server

    # oslo_server stays None until run() creates it, so stopping a server
    # that never ran must not fail with an AttributeError.
    if server is None:
        return

    server.stop()

    if graceful:
        server.wait()
def wait(self):
    """Wait for the underlying oslo.messaging RPC server to finish.

    Returns immediately if the server was never created, i.e. run() has
    not been called or failed before the server object was assigned.
    """
    server = self.oslo_server

    # oslo_server stays None until run() creates it; there is nothing
    # to wait for in that case.
    if server is None:
        return

    server.wait()

View File

@ -52,6 +52,9 @@ def cleanup():
_EVENT_ENGINE_CLIENT = None
# TODO(rakhmerov): This method seems misplaced. Now we have different kind
# of transports (oslo, kombu) and this module should not have any oslo
# specific things anymore.
def get_transport():
global _TRANSPORT
@ -120,174 +123,6 @@ def get_rpc_client_driver():
return _IMPL_CLIENT
class EngineServer(object):
"""RPC Engine server."""
def __init__(self, engine):
self._engine = engine
def start_workflow(self, rpc_ctx, workflow_identifier, workflow_input,
description, params):
"""Receives calls over RPC to start workflows on engine.
:param rpc_ctx: RPC request context.
:param workflow_identifier: Workflow definition identifier.
:param workflow_input: Workflow input.
:param description: Workflow execution description.
:param params: Additional workflow type specific parameters.
:return: Workflow execution.
"""
LOG.info(
"Received RPC request 'start_workflow'[rpc_ctx=%s,"
" workflow_identifier=%s, workflow_input=%s, description=%s, "
"params=%s]"
% (rpc_ctx, workflow_identifier, workflow_input, description,
params)
)
return self._engine.start_workflow(
workflow_identifier,
workflow_input,
description,
**params
)
def start_action(self, rpc_ctx, action_name,
action_input, description, params):
"""Receives calls over RPC to start actions on engine.
:param rpc_ctx: RPC request context.
:param action_name: name of the Action.
:param action_input: input dictionary for Action.
:param description: description of new Action execution.
:param params: extra parameters to run Action.
:return: Action execution.
"""
LOG.info(
"Received RPC request 'start_action'[rpc_ctx=%s,"
" name=%s, input=%s, description=%s, params=%s]"
% (rpc_ctx, action_name, action_input, description, params)
)
return self._engine.start_action(
action_name,
action_input,
description,
**params
)
def on_task_state_change(self, rpc_ctx, task_ex_id, state,
state_info=None):
return self._engine.on_task_state_change(task_ex_id, state, state_info)
def on_action_complete(self, rpc_ctx, action_ex_id, result_data,
result_error, wf_action):
"""Receives RPC calls to communicate action result to engine.
:param rpc_ctx: RPC request context.
:param action_ex_id: Action execution id.
:param result_data: Action result data.
:param result_error: Action result error.
:param wf_action: True if given id points to a workflow execution.
:return: Action execution.
"""
result = wf_utils.Result(result_data, result_error)
LOG.info(
"Received RPC request 'on_action_complete'[rpc_ctx=%s,"
" action_ex_id=%s, result=%s]" % (rpc_ctx, action_ex_id, result)
)
return self._engine.on_action_complete(action_ex_id, result, wf_action)
def pause_workflow(self, rpc_ctx, execution_id):
"""Receives calls over RPC to pause workflows on engine.
:param rpc_ctx: Request context.
:param execution_id: Workflow execution id.
:return: Workflow execution.
"""
LOG.info(
"Received RPC request 'pause_workflow'[rpc_ctx=%s,"
" execution_id=%s]" % (rpc_ctx, execution_id)
)
return self._engine.pause_workflow(execution_id)
def rerun_workflow(self, rpc_ctx, task_ex_id, reset=True, env=None):
"""Receives calls over RPC to rerun workflows on engine.
:param rpc_ctx: RPC request context.
:param task_ex_id: Task execution id.
:param reset: If true, then purge action execution for the task.
:param env: Environment variables to update.
:return: Workflow execution.
"""
LOG.info(
"Received RPC request 'rerun_workflow'[rpc_ctx=%s, "
"task_ex_id=%s]" % (rpc_ctx, task_ex_id)
)
return self._engine.rerun_workflow(task_ex_id, reset, env)
def resume_workflow(self, rpc_ctx, wf_ex_id, env=None):
"""Receives calls over RPC to resume workflows on engine.
:param rpc_ctx: RPC request context.
:param wf_ex_id: Workflow execution id.
:param env: Environment variables to update.
:return: Workflow execution.
"""
LOG.info(
"Received RPC request 'resume_workflow'[rpc_ctx=%s, "
"wf_ex_id=%s]" % (rpc_ctx, wf_ex_id)
)
return self._engine.resume_workflow(wf_ex_id, env)
def stop_workflow(self, rpc_ctx, execution_id, state, message=None):
"""Receives calls over RPC to stop workflows on engine.
Sets execution state to SUCCESS or ERROR. No more tasks will be
scheduled. Running tasks won't be killed, but their results
will be ignored.
:param rpc_ctx: RPC request context.
:param execution_id: Workflow execution id.
:param state: State assigned to the workflow. Permitted states are
SUCCESS or ERROR.
:param message: Optional information string.
:return: Workflow execution.
"""
LOG.info(
"Received RPC request 'stop_workflow'[rpc_ctx=%s, execution_id=%s,"
" state=%s, message=%s]" % (rpc_ctx, execution_id, state, message)
)
return self._engine.stop_workflow(execution_id, state, message)
def rollback_workflow(self, rpc_ctx, execution_id):
"""Receives calls over RPC to rollback workflows on engine.
:param rpc_ctx: RPC request context.
:return: Workflow execution.
"""
LOG.info(
"Received RPC request 'rollback_workflow'[rpc_ctx=%s,"
" execution_id=%s]" % (rpc_ctx, execution_id)
)
return self._engine.resume_workflow(execution_id)
def _wrap_exception_and_reraise(exception):
message = "%s: %s" % (exception.__class__.__name__, exception.args[0])
@ -498,43 +333,6 @@ class EngineClient(base.Engine):
)
class ExecutorServer(object):
"""RPC Executor server."""
def __init__(self, executor):
self._executor = executor
def run_action(self, rpc_ctx, action_ex_id, action_class_str,
attributes, params, safe_rerun):
"""Receives calls over RPC to run action on executor.
:param rpc_ctx: RPC request context dictionary.
:param action_ex_id: Action execution id.
:param action_class_str: Action class name.
:param attributes: Action class attributes.
:param params: Action input parameters.
:param safe_rerun: Tells if given action can be safely rerun.
:return: Action result.
"""
LOG.info(
"Received RPC request 'run_action'[rpc_ctx=%s,"
" action_ex_id=%s, action_class=%s, attributes=%s, params=%s]"
% (rpc_ctx, action_ex_id, action_class_str, attributes, params)
)
redelivered = rpc_ctx.redelivered or False
return self._executor.run_action(
action_ex_id,
action_class_str,
attributes,
params,
safe_rerun,
redelivered
)
class ExecutorClient(base.Executor):
"""RPC Executor client."""
@ -588,37 +386,6 @@ class ExecutorClient(base.Executor):
)
class EventEngineServer(object):
"""RPC EventEngine server."""
def __init__(self, event_engine):
self._event_engine = event_engine
def create_event_trigger(self, rpc_ctx, trigger, events):
LOG.info(
"Received RPC request 'create_event_trigger'[rpc_ctx=%s,"
" trigger=%s, events=%s", rpc_ctx, trigger, events
)
return self._event_engine.create_event_trigger(trigger, events)
def delete_event_trigger(self, rpc_ctx, trigger, events):
LOG.info(
"Received RPC request 'delete_event_trigger'[rpc_ctx=%s,"
" trigger=%s, events=%s", rpc_ctx, trigger, events
)
return self._event_engine.delete_event_trigger(trigger, events)
def update_event_trigger(self, rpc_ctx, trigger):
LOG.info(
"Received RPC request 'update_event_trigger'[rpc_ctx=%s,"
" trigger=%s", rpc_ctx, trigger
)
return self._event_engine.update_event_trigger(trigger)
class EventEngineClient(base.EventEngine):
"""RPC EventEngine client."""

View File

View File

@ -24,16 +24,18 @@ import six
import yaml
from mistral import context as auth_ctx
from mistral import coordination
from mistral.db.v2 import api as db_api
from mistral.engine.rpc_backend import rpc
from mistral import exceptions
from mistral import expressions
from mistral import messaging as mistral_messaging
from mistral.services import security
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
DEFAULT_PROPERTIES = {
'service': '<% $.publisher %>',
'project_id': '<% $.context.project_id %>',
@ -124,16 +126,14 @@ class NotificationsConverter(object):
return edef.convert(event)
class EventEngine(coordination.Service):
class EventEngine(object):
"""Event engine server.
A separate service that is responsible for listening event notification
and trigger workflows defined by end user.
and triggering workflows defined by end user.
"""
def __init__(self, engine_client):
coordination.Service.__init__(self, 'event_engine_group')
self.engine_client = engine_client
def __init__(self):
self.engine_client = rpc.get_engine_client()
self.event_queue = six.moves.queue.Queue()
self.handler_tg = threadgroup.ThreadGroup()

View File

@ -0,0 +1,94 @@
# Copyright 2016 - Nokia Networks
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
from mistral import config as cfg
from mistral.engine.rpc_backend import rpc
from mistral.event_engine import event_engine
from mistral.service import base as service_base
from mistral.utils import profiler as profiler_utils
from mistral.utils import rpc_utils
LOG = logging.getLogger(__name__)
class EventEngineServer(service_base.MistralService):
    """RPC EventEngine server.

    This class manages event engine life-cycle and gets registered as
    an RPC endpoint to process event engine specific calls. It also
    registers a cluster member associated with this instance of event
    engine.
    """

    def __init__(self, event_engine, setup_profiler=True):
        """Create the event engine server.

        :param event_engine: Event engine object that RPC calls are
            delegated to.
        :param setup_profiler: If true, the profiler is set up when the
            service starts. Defaults to True, which keeps the previous
            behavior of always setting it up.
        """
        super(EventEngineServer, self).__init__(
            'event_engine_group',
            setup_profiler
        )

        self._event_engine = event_engine

        # Created in start() and released in stop().
        self._rpc_server = None

    def start(self):
        """Optionally set up the profiler and begin serving RPC calls."""
        super(EventEngineServer, self).start()

        if self._setup_profiler:
            profiler_utils.setup(
                'mistral-event-engine',
                cfg.CONF.event_engine.host
            )

        # Initialize and start RPC server.
        self._rpc_server = rpc.get_rpc_server_driver()(
            rpc_utils.get_rpc_info_from_oslo(cfg.CONF.event_engine)
        )
        self._rpc_server.register_endpoint(self)

        self._rpc_server.run()

        self._notify_started('Event engine server started.')

    def stop(self, graceful=False):
        """Stop the service and its RPC server, if one was created.

        :param graceful: Passed through to the base class and RPC server.
        """
        super(EventEngineServer, self).stop(graceful)

        if self._rpc_server:
            self._rpc_server.stop(graceful)

    def create_event_trigger(self, rpc_ctx, trigger, events):
        """Receives calls over RPC to create an event trigger.

        :param rpc_ctx: RPC request context.
        :param trigger: Event trigger to create.
        :param events: Events associated with the trigger.
        :return: Whatever the event engine returns for this call.
        """
        LOG.info(
            "Received RPC request 'create_event_trigger'[rpc_ctx=%s,"
            " trigger=%s, events=%s]", rpc_ctx, trigger, events
        )

        return self._event_engine.create_event_trigger(trigger, events)

    def delete_event_trigger(self, rpc_ctx, trigger, events):
        """Receives calls over RPC to delete an event trigger.

        :param rpc_ctx: RPC request context.
        :param trigger: Event trigger to delete.
        :param events: Events associated with the trigger.
        :return: Whatever the event engine returns for this call.
        """
        LOG.info(
            "Received RPC request 'delete_event_trigger'[rpc_ctx=%s,"
            " trigger=%s, events=%s]", rpc_ctx, trigger, events
        )

        return self._event_engine.delete_event_trigger(trigger, events)

    def update_event_trigger(self, rpc_ctx, trigger):
        """Receives calls over RPC to update an event trigger.

        :param rpc_ctx: RPC request context.
        :param trigger: Event trigger to update.
        :return: Whatever the event engine returns for this call.
        """
        LOG.info(
            "Received RPC request 'update_event_trigger'[rpc_ctx=%s,"
            " trigger=%s]", rpc_ctx, trigger
        )

        return self._event_engine.update_event_trigger(trigger)
def get_oslo_service():
    """Return an EventEngineServer wrapping a newly created EventEngine."""
    engine = event_engine.EventEngine()

    return EventEngineServer(engine)

View File

63
mistral/service/base.py Normal file
View File

@ -0,0 +1,63 @@
# Copyright 2016 - Nokia Networks
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from eventlet import event
from oslo_log import log
from oslo_service import service
from mistral.service import coordination
LOG = log.getLogger(__name__)
class MistralService(service.Service):
    """Base class for Mistral services.

    The term 'service' here means any Mistral component that can run as
    an independent process and thus can be registered as a cluster member.
    """

    def __init__(self, cluster_group, setup_profiler=True):
        """Initialize the service.

        :param cluster_group: Value passed to ``coordination.Service``
            to create the cluster member object for this service.
        :param setup_profiler: Flag stored on the instance as
            ``_setup_profiler``; this class itself does not read it.
        """
        super(MistralService, self).__init__()

        self.cluster_member = coordination.Service(cluster_group)

        self._setup_profiler = setup_profiler

        # Fired by _notify_started(); wait_started() blocks on it.
        self._started = event.Event()

    def wait_started(self):
        """Wait until the service is fully started."""
        self._started.wait()

    def _notify_started(self, message):
        """Print the given message and fire the 'started' event.

        :param message: Text printed to standard output.
        """
        print(message)

        self._started.send()

    def start(self):
        """Start the service and register its cluster membership."""
        super(MistralService, self).start()

        self.cluster_member.register_membership()

    def stop(self, graceful=False):
        """Stop the service and replace the 'started' event.

        :param graceful: Passed through to the parent class ``stop``.
        """
        super(MistralService, self).stop(graceful)

        # Use a fresh event object from now on; the previous one is
        # simply dropped.
        self._started = event.Event()

        # TODO(rakhmerov): Probably we could also take care of an RPC server
        # if it exists for this particular service type. Take a look at
        # executor and engine servers.

        # TODO(rakhmerov): This method is not implemented correctly now
        # (not thread-safe). Uncomment this call once it's fixed.
        # self.cluster_member.stop()

View File

@ -110,3 +110,5 @@ def setup():
periodic_interval_max=1,
context=ctx
)
return tg

View File

@ -200,21 +200,33 @@ class CallScheduler(periodic_task.PeriodicTasks):
)
def setup():
def start():
tg = threadgroup.ThreadGroup()
scheduler = CallScheduler(CONF)
sched = CallScheduler(CONF)
tg.add_dynamic_timer(
scheduler.run_periodic_tasks,
sched.run_periodic_tasks,
initial_delay=None,
periodic_interval_max=1,
context=None
)
_schedulers[scheduler] = tg
_schedulers[sched] = tg
return tg
return sched
def stop_scheduler(sched, graceful=False):
    """Stop the thread group registered for the given scheduler.

    The scheduler's thread group is looked up in ``_schedulers``, stopped
    and then removed from that registry.

    :param sched: Scheduler used as a key in ``_schedulers``. A falsy
        value (e.g. None) makes the call a no-op, regardless of
        ``graceful``.
    :param graceful: If True, wait on the thread group after stopping it.
    :raises KeyError: If ``sched`` is truthy but not in ``_schedulers``.
    """
    # Guard clause: nothing to look up, stop or wait for. Returning here
    # guarantees 'tg' is never referenced while unbound.
    if not sched:
        return

    tg = _schedulers[sched]

    tg.stop()

    del _schedulers[sched]

    if graceful:
        tg.wait()
def stop_all_schedulers():

View File

@ -17,7 +17,7 @@ from oslo_config import cfg
import tooz.coordination
from webtest import app as webtest_app
from mistral import coordination
from mistral.service import coordination
from mistral.tests.unit.api import base
@ -54,7 +54,7 @@ class TestServicesController(base.APITest):
self.assertIn('Service API is not supported', context.args[0])
@mock.patch('mistral.coordination.ServiceCoordinator.get_members',
@mock.patch('mistral.service.coordination.ServiceCoordinator.get_members',
side_effect=tooz.coordination.ToozError('error message'))
def test_get_all_with_get_members_error(self, mock_get_members):
cfg.CONF.set_default('backend_url', 'zake://', 'coordination')

View File

@ -18,13 +18,12 @@ import eventlet
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_service import service
from mistral import context as ctx
from mistral.db.v2 import api as db_api
from mistral.engine import default_engine as def_eng
from mistral.engine import default_executor as def_exec
from mistral.engine import engine_server
from mistral.engine import executor_server
from mistral.engine.rpc_backend import rpc
from mistral.services import scheduler
from mistral.tests.unit import base
from mistral.workflow import states
@ -36,54 +35,12 @@ DEFAULT_DELAY = 1
DEFAULT_TIMEOUT = 30
def launch_engine_server(transport, engine):
target = messaging.Target(
topic=cfg.CONF.engine.topic,
server=cfg.CONF.engine.host
)
def launch_service(s):
launcher = service.ServiceLauncher(cfg.CONF)
server = messaging.get_rpc_server(
transport,
target,
[rpc.EngineServer(engine)],
executor='blocking',
serializer=ctx.RpcContextSerializer(ctx.JsonPayloadSerializer())
)
launcher.launch_service(s)
try:
server.start()
while True:
eventlet.sleep(604800)
except (KeyboardInterrupt, SystemExit):
LOG.info("Stopping engine service...")
finally:
server.stop()
server.wait()
def launch_executor_server(transport, executor):
target = messaging.Target(
topic=cfg.CONF.executor.topic,
server=cfg.CONF.executor.host
)
server = messaging.get_rpc_server(
transport,
target,
[rpc.ExecutorServer(executor)],
executor='blocking',
serializer=ctx.RpcContextSerializer(ctx.JsonPayloadSerializer())
)
try:
server.start()
while True:
eventlet.sleep(604800)
except (KeyboardInterrupt, SystemExit):
LOG.info("Stopping executor service...")
finally:
server.stop()
server.wait()
launcher.wait()
class EngineTestCase(base.DbTestCase):
@ -100,28 +57,35 @@ class EngineTestCase(base.DbTestCase):
# Drop all RPC objects (transport, clients).
rpc.cleanup()
transport = rpc.get_transport()
self.engine_client = rpc.get_engine_client()
self.executor_client = rpc.get_executor_client()
self.engine = def_eng.DefaultEngine(self.engine_client)
self.executor = def_exec.DefaultExecutor(self.engine_client)
LOG.info("Starting engine and executor threads...")
engine_service = engine_server.get_oslo_service(setup_profiler=False)
executor_service = executor_server.get_oslo_service(
setup_profiler=False
)
self.engine = engine_service.engine
self.executor = executor_service.executor
self.threads = [
eventlet.spawn(launch_engine_server, transport, self.engine),
eventlet.spawn(launch_executor_server, transport, self.executor),
eventlet.spawn(launch_service, executor_service),
eventlet.spawn(launch_service, engine_service)
]
self.addOnException(self.print_executions)
# Start scheduler.
scheduler_thread_group = scheduler.setup()
self.addCleanup(executor_service.stop, True)
self.addCleanup(engine_service.stop, True)
self.addCleanup(self.kill_threads)
self.addCleanup(scheduler_thread_group.stop)
# Make sure that both services have fully started, otherwise
# the test may run too early.
executor_service.wait_started()
engine_service.wait_started()
def kill_threads(self):
LOG.info("Finishing engine and executor threads...")

View File

@ -23,6 +23,7 @@ from oslo_utils import uuidutils
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models
from mistral.engine import default_engine as d_eng
from mistral.engine.rpc_backend import rpc
from mistral import exceptions as exc
from mistral.services import workbooks as wb_service
from mistral.tests.unit import base
@ -92,6 +93,7 @@ MOCK_ENVIRONMENT = mock.MagicMock(return_value=ENVIRONMENT_DB)
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError())
@mock.patch.object(rpc, 'get_executor_client', mock.Mock())
class DefaultEngineTest(base.DbTestCase):
def setUp(self):
super(DefaultEngineTest, self).setUp()
@ -100,7 +102,7 @@ class DefaultEngineTest(base.DbTestCase):
# Note: For purposes of this test we can easily use
# simple magic mocks for engine and executor clients
self.engine = d_eng.DefaultEngine(mock.MagicMock())
self.engine = d_eng.DefaultEngine()
def test_start_workflow(self):
wf_input = {'param1': 'Hey', 'param2': 'Hi'}

View File

@ -80,7 +80,7 @@ workflows:
def _run_at_target(action_ex_id, action_class_str, attributes,
action_params, target=None, async=True, safe_rerun=False):
# We'll just call executor directly for testing purposes.
executor = default_executor.DefaultExecutor(rpc.get_engine_client())
executor = default_executor.DefaultExecutor()
executor.run_action(
action_ex_id,

View File

@ -221,13 +221,18 @@ class JoinEngineTest(base.EngineTestCase):
task4 = self._assert_single_item(tasks, name='task4')
# NOTE(xylan): We ensure task4 is successful here because of the
# uncertainty of its running parallelly with task3.
# uncertainty of its running in parallel with task3.
self.await_task_success(task4.id)
self.assertEqual(states.RUNNING, wf_ex.state)
self.assertEqual(states.SUCCESS, task1.state)
self.assertEqual(states.SUCCESS, task2.state)
self.assertEqual(states.WAITING, task3.state)
# NOTE(rakhmerov): Task 3 must fail because task2->task3 transition
# will never trigger due to its condition.
self.await_task_error(task3.id)
self.await_workflow_error(wf_ex.id)
def test_partial_join(self):
wf_text = """---

View File

@ -27,7 +27,7 @@ from mistral.workflow import states
def _run_at_target(action_ex_id, action_class_str, attributes,
action_params, target=None, async=True, safe_rerun=False):
# We'll just call executor directly for testing purposes.
executor = default_executor.DefaultExecutor(rpc.get_engine_client())
executor = default_executor.DefaultExecutor()
executor.run_action(
action_ex_id,
@ -43,7 +43,6 @@ MOCK_RUN_AT_TARGET = mock.MagicMock(side_effect=_run_at_target)
class TestSafeRerun(base.EngineTestCase):
@mock.patch.object(rpc.ExecutorClient, 'run_action', MOCK_RUN_AT_TARGET)
def test_safe_rerun_true(self):
wf_text = """---

View File

@ -18,7 +18,6 @@ from oslo_config import cfg
import requests
from mistral.db.v2 import api as db_api
from mistral.services import scheduler
from mistral.services import workflows as wf_service
from mistral.tests.unit.engine import base
from mistral.workflow import states
@ -83,13 +82,6 @@ class TaskDefaultsDirectWorkflowEngineTest(base.EngineTestCase):
class TaskDefaultsReverseWorkflowEngineTest(base.EngineTestCase):
def setUp(self):
super(TaskDefaultsReverseWorkflowEngineTest, self).setUp()
thread_group = scheduler.setup()
self.addCleanup(thread_group.stop)
def test_task_defaults_retry_policy(self):
wf_text = """---
version: '2.0'

View File

@ -18,7 +18,8 @@ import mock
from oslo_config import cfg
from mistral.db.v2.sqlalchemy import api as db_api
from mistral.services import event_engine
from mistral.engine.rpc_backend import rpc
from mistral.event_engine import event_engine
from mistral.services import workflows
from mistral.tests.unit import base
@ -55,10 +56,13 @@ class EventEngineTest(base.DbTestCase):
super(EventEngineTest, self).setUp()
self.wf = workflows.create_workflows(WORKFLOW_LIST)[0]
EVENT_TRIGGER['workflow_id'] = self.wf.id
@mock.patch.object(rpc, 'get_engine_client', mock.Mock())
def test_event_engine_start_with_no_triggers(self):
e_engine = event_engine.EventEngine(mock.Mock())
e_engine = event_engine.EventEngine()
self.addCleanup(e_engine.handler_tg.stop)
self.assertEqual(0, len(e_engine.event_triggers_map))
@ -66,10 +70,12 @@ class EventEngineTest(base.DbTestCase):
self.assertEqual(0, len(e_engine.exchange_topic_listener_map))
@mock.patch('mistral.messaging.start_listener')
@mock.patch.object(rpc, 'get_engine_client', mock.Mock())
def test_event_engine_start_with_triggers(self, mock_start):
trigger = db_api.create_event_trigger(EVENT_TRIGGER)
e_engine = event_engine.EventEngine(mock.MagicMock())
e_engine = event_engine.EventEngine()
self.addCleanup(e_engine.handler_tg.stop)
self.assertEqual(1, len(e_engine.exchange_topic_events_map))
@ -86,11 +92,12 @@ class EventEngineTest(base.DbTestCase):
self.assertEqual(1, len(e_engine.exchange_topic_listener_map))
@mock.patch('mistral.messaging.start_listener')
@mock.patch.object(rpc, 'get_engine_client', mock.Mock())
def test_process_event_queue(self, mock_start):
db_api.create_event_trigger(EVENT_TRIGGER)
client = mock.MagicMock()
e_engine = event_engine.EventEngine(client)
e_engine = event_engine.EventEngine()
self.addCleanup(e_engine.handler_tg.stop)
event = {
@ -103,6 +110,7 @@ class EventEngineTest(base.DbTestCase):
with mock.patch.object(e_engine, 'engine_client') as client_mock:
e_engine.event_queue.put(event)
time.sleep(1)
self.assertEqual(1, client_mock.start_workflow.call_count)

View File

@ -81,9 +81,9 @@ class SchedulerServiceTest(base.DbTestCase):
def setUp(self):
super(SchedulerServiceTest, self).setUp()
self.thread_group = scheduler.setup()
sched = scheduler.start()
self.addCleanup(self.thread_group.stop)
self.addCleanup(scheduler.stop_scheduler, sched, True)
@mock.patch(FACTORY_METHOD_PATH)
def test_scheduler_with_factory(self, factory):
@ -242,11 +242,12 @@ class SchedulerServiceTest(base.DbTestCase):
@mock.patch(TARGET_METHOD_PATH)
def test_scheduler_multi_instance(self, method):
def stop_thread_groups():
[tg.stop() for tg in self.tgs]
scheds = [scheduler.start(), scheduler.start()]
self.tgs = [scheduler.setup(), scheduler.setup()]
self.addCleanup(stop_thread_groups)
def stop_schedulers():
[scheduler.stop_scheduler(s, True) for s in scheds]
self.addCleanup(stop_schedulers)
method_args = {'name': 'task', 'id': '321'}

View File

@ -16,7 +16,7 @@ import mock
from oslo_config import cfg
import six
from mistral import coordination
from mistral.service import coordination
from mistral.tests.unit import base