From a5d284aa4f1c18b69452500b1760f133611d1599 Mon Sep 17 00:00:00 2001 From: Renat Akhmerov Date: Tue, 22 Nov 2016 11:24:41 +0700 Subject: [PATCH] Fix launch process of Mistral components * Fixed the bug #1622534 * Introduced MistralService class extending oslo.service.Service that all Mistral components running standalone should extend * Refactored engine, executor and event engine with MistralService * Moved most of the startup logic from launch.cmd to individual corresponding components * Cluster membership is now under control of MistralService * Fixed test_join.test_full_join_with_conditions() which previously had a bug and we were just 'lucky' that it passed due to a different work of scheduler * Fixed a number of test cases * Other minor changes TODO: * We now use many launchers (from oslo.service) to launch services whereas we could use just one which is recommended by oslo. However, we can't do that because of the api service that uses many workers. We'll need to look at how to refactor it moving forward. * Write tests for MistralService and its derrived classes * Address a number of TODO comments. Closes-Bug: 1622534 Change-Id: I34ba6a8b8caf8bea17109e0f259085b373eb6d45 --- mistral/api/app.py | 3 +- mistral/api/controllers/v2/service.py | 4 +- mistral/api/service.py | 9 +- mistral/cmd/launch.py | 110 +++----- mistral/db/v2/sqlalchemy/api.py | 38 ++- mistral/engine/default_engine.py | 9 +- mistral/engine/default_executor.py | 10 +- mistral/engine/engine_server.py | 250 ++++++++++++++++++ mistral/engine/executor_server.py | 99 +++++++ mistral/engine/rpc_backend/base.py | 17 ++ .../engine/rpc_backend/kombu/kombu_server.py | 18 +- .../engine/rpc_backend/oslo/oslo_server.py | 17 +- mistral/engine/rpc_backend/rpc.py | 239 +---------------- mistral/event_engine/__init__.py | 0 .../event_engine.py | 14 +- mistral/event_engine/event_engine_server.py | 94 +++++++ mistral/service/__init__.py | 0 mistral/service/base.py | 63 +++++ mistral/{ => service}/coordination.py | 0 mistral/services/expiration_policy.py | 2 + mistral/services/scheduler.py | 22 +- mistral/tests/unit/api/v2/test_services.py | 4 +- mistral/tests/unit/engine/base.py | 84 ++---- .../tests/unit/engine/test_default_engine.py | 4 +- mistral/tests/unit/engine/test_environment.py | 2 +- mistral/tests/unit/engine/test_join.py | 9 +- mistral/tests/unit/engine/test_safe_rerun.py | 3 +- .../tests/unit/engine/test_task_defaults.py | 8 - .../tests/unit/services/test_event_engine.py | 18 +- mistral/tests/unit/services/test_scheduler.py | 13 +- mistral/tests/unit/test_coordination.py | 2 +- 31 files changed, 717 insertions(+), 448 deletions(-) create mode 100644 mistral/engine/engine_server.py create mode 100644 mistral/engine/executor_server.py create mode 100644 mistral/event_engine/__init__.py rename mistral/{services => event_engine}/event_engine.py (97%) create mode 100644 mistral/event_engine/event_engine_server.py create mode 100644 mistral/service/__init__.py create mode 100644 mistral/service/base.py rename mistral/{ => service}/coordination.py (100%) diff --git a/mistral/api/app.py b/mistral/api/app.py index c84814b1..a2807e85 100644 --- a/mistral/api/app.py +++ b/mistral/api/app.py @@ -21,9 +21,9 @@ import pecan from mistral.api import access_control from mistral import config as m_config from mistral import context as ctx -from mistral import coordination from mistral.db.v2 import api as db_api_v2 from mistral.engine.rpc_backend import rpc +from mistral.service import coordination from mistral.services import periodic @@ -68,6 +68,7 @@ def setup_app(config=None): # Set up access control. app = access_control.setup(app) + # TODO(rakhmerov): need to get rid of this call. # Set up RPC related flags in config rpc.get_transport() diff --git a/mistral/api/controllers/v2/service.py b/mistral/api/controllers/v2/service.py index d20450ee..9c47b2d0 100644 --- a/mistral/api/controllers/v2/service.py +++ b/mistral/api/controllers/v2/service.py @@ -21,10 +21,12 @@ import wsmeext.pecan as wsme_pecan from mistral.api import access_control as acl from mistral.api.controllers.v2 import resources +# TODO(rakhmerov): invalid dependency, a REST controller must not depend on +# a launch script. from mistral.cmd import launch from mistral import context -from mistral import coordination from mistral import exceptions as exc +from mistral.service import coordination from mistral.utils import rest_utils diff --git a/mistral/api/service.py b/mistral/api/service.py index 7242ebcb..ce32e675 100644 --- a/mistral/api/service.py +++ b/mistral/api/service.py @@ -26,8 +26,9 @@ class WSGIService(service.ServiceBase): def __init__(self, name): self.name = name self.app = app.setup_app() - self.workers = (cfg.CONF.api.api_workers or - processutils.get_worker_count()) + self.workers = ( + cfg.CONF.api.api_workers or processutils.get_worker_count() + ) self.server = wsgi.Server( cfg.CONF, @@ -49,7 +50,3 @@ class WSGIService(service.ServiceBase): def reset(self): self.server.reset() - - -def process_launcher(): - return service.ProcessLauncher(cfg.CONF) diff --git a/mistral/cmd/launch.py b/mistral/cmd/launch.py index 422f28cc..c965bd20 100644 --- a/mistral/cmd/launch.py +++ b/mistral/cmd/launch.py @@ -39,18 +39,14 @@ if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'mistral', '__init__.py')): from oslo_config import cfg from oslo_log import log as logging +from oslo_service import service -from mistral.api import service as mistral_service +from mistral.api import service as api_service from mistral import config -from mistral.db.v2 import api as db_api -from mistral.engine import default_engine as def_eng -from mistral.engine import default_executor as def_executor +from mistral.engine import engine_server +from mistral.engine import executor_server from mistral.engine.rpc_backend import rpc -from mistral.services import event_engine -from mistral.services import expiration_policy -from mistral.services import scheduler -from mistral.utils import profiler -from mistral.utils import rpc_utils +from mistral.event_engine import event_engine_server from mistral import version @@ -58,84 +54,48 @@ CONF = cfg.CONF def launch_executor(): - profiler.setup('mistral-executor', cfg.CONF.executor.host) - - executor_v2 = def_executor.DefaultExecutor(rpc.get_engine_client()) - executor_endpoint = rpc.ExecutorServer(executor_v2) - - executor_server = rpc.get_rpc_server_driver()( - rpc_utils.get_rpc_info_from_oslo(CONF.executor) - ) - executor_server.register_endpoint(executor_endpoint) - - executor_v2.register_membership() - try: - executor_server.run(executor='threading') - except (KeyboardInterrupt, SystemExit): - pass - finally: - print("Stopping executor service...") + launcher = service.ServiceLauncher(CONF) + + launcher.launch_service(executor_server.get_oslo_service()) + + launcher.wait() + except RuntimeError as e: + sys.stderr.write("ERROR: %s\n" % e) + sys.exit(1) def launch_engine(): - profiler.setup('mistral-engine', cfg.CONF.engine.host) - - engine_v2 = def_eng.DefaultEngine(rpc.get_engine_client()) - - engine_endpoint = rpc.EngineServer(engine_v2) - - # Setup scheduler in engine. - db_api.setup_db() - scheduler.setup() - - # Setup expiration policy - expiration_policy.setup() - - engine_server = rpc.get_rpc_server_driver()( - rpc_utils.get_rpc_info_from_oslo(CONF.engine) - ) - engine_server.register_endpoint(engine_endpoint) - - engine_v2.register_membership() - try: - # Note(ddeja): Engine needs to be run in default (blocking) mode - # since using another mode may lead to deadlock. - # See https://review.openstack.org/#/c/356343/ - # for more info. - engine_server.run() - except (KeyboardInterrupt, SystemExit): - pass - finally: - print("Stopping engine service...") + launcher = service.ServiceLauncher(CONF) + + launcher.launch_service(engine_server.get_oslo_service()) + + launcher.wait() + except RuntimeError as e: + sys.stderr.write("ERROR: %s\n" % e) + sys.exit(1) def launch_event_engine(): - profiler.setup('mistral-event-engine', cfg.CONF.event_engine.host) - - event_eng = event_engine.EventEngine(rpc.get_engine_client()) - endpoint = rpc.EventEngineServer(event_eng) - - event_engine_server = rpc.get_rpc_server_driver()( - rpc_utils.get_rpc_info_from_oslo(CONF.event_engine) - ) - event_engine_server.register_endpoint(endpoint) - - event_eng.register_membership() - try: - event_engine_server.run() - except (KeyboardInterrupt, SystemExit): - pass - finally: - print("Stopping event_engine service...") + launcher = service.ServiceLauncher(CONF) + + launcher.launch_service(event_engine_server.get_oslo_service()) + + launcher.wait() + except RuntimeError as e: + sys.stderr.write("ERROR: %s\n" % e) + sys.exit(1) def launch_api(): - launcher = mistral_service.process_launcher() - server = mistral_service.WSGIService('mistral_api') + launcher = service.ProcessLauncher(cfg.CONF) + + server = api_service.WSGIService('mistral_api') + launcher.launch_service(server, workers=server.workers) + launcher.wait() @@ -144,8 +104,6 @@ def launch_any(options): threads = [eventlet.spawn(LAUNCH_OPTIONS[option]) for option in options] - print('Server started.') - [thread.wait() for thread in threads] diff --git a/mistral/db/v2/sqlalchemy/api.py b/mistral/db/v2/sqlalchemy/api.py index 7f14584f..f2fcd61b 100644 --- a/mistral/db/v2/sqlalchemy/api.py +++ b/mistral/db/v2/sqlalchemy/api.py @@ -16,6 +16,7 @@ import contextlib import sys +import threading from oslo_config import cfg from oslo_db import exception as db_exc @@ -42,6 +43,10 @@ CONF = cfg.CONF LOG = logging.getLogger(__name__) +_SCHEMA_LOCK = threading.RLock() +_initialized = False + + def get_backend(): """Consumed by openstack common code. @@ -52,20 +57,33 @@ def get_backend(): def setup_db(): - try: - models.Workbook.metadata.create_all(b.get_engine()) - except sa.exc.OperationalError as e: - raise exc.DBError("Failed to setup database: %s" % e) + global _initialized + + with _SCHEMA_LOCK: + if _initialized: + return + + try: + models.Workbook.metadata.create_all(b.get_engine()) + + _initialized = True + except sa.exc.OperationalError as e: + raise exc.DBError("Failed to setup database: %s" % e) def drop_db(): - global _facade + global _initialized - try: - models.Workbook.metadata.drop_all(b.get_engine()) - _facade = None - except Exception as e: - raise exc.DBError("Failed to drop database: %s" % e) + with _SCHEMA_LOCK: + if not _initialized: + return + + try: + models.Workbook.metadata.drop_all(b.get_engine()) + + _initialized = False + except Exception as e: + raise exc.DBError("Failed to drop database: %s" % e) # Transaction management. diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py index b1600b5d..8a56dfb5 100644 --- a/mistral/engine/default_engine.py +++ b/mistral/engine/default_engine.py @@ -17,7 +17,6 @@ from oslo_log import log as logging from osprofiler import profiler -from mistral import coordination from mistral.db.v2 import api as db_api from mistral.db.v2.sqlalchemy import models as db_models from mistral.engine import action_handler @@ -36,17 +35,13 @@ LOG = logging.getLogger(__name__) # the submodules are referenced. -class DefaultEngine(base.Engine, coordination.Service): - def __init__(self, engine_client): - self._engine_client = engine_client - - coordination.Service.__init__(self, 'engine_group') - +class DefaultEngine(base.Engine): @action_queue.process @u.log_exec(LOG) @profiler.trace('engine-start-workflow') def start_workflow(self, wf_identifier, wf_input, description='', **params): + with db_api.transaction(): wf_ex = wf_handler.start_workflow( wf_identifier, diff --git a/mistral/engine/default_executor.py b/mistral/engine/default_executor.py index 210c965a..55c76cf3 100644 --- a/mistral/engine/default_executor.py +++ b/mistral/engine/default_executor.py @@ -17,8 +17,8 @@ from oslo_log import log as logging from osprofiler import profiler from mistral.actions import action_factory as a_f -from mistral import coordination from mistral.engine import base +from mistral.engine.rpc_backend import rpc from mistral.utils import inspect_utils as i_u from mistral.workflow import utils as wf_utils @@ -26,11 +26,9 @@ from mistral.workflow import utils as wf_utils LOG = logging.getLogger(__name__) -class DefaultExecutor(base.Executor, coordination.Service): - def __init__(self, engine_client): - self._engine_client = engine_client - - coordination.Service.__init__(self, 'executor_group') +class DefaultExecutor(base.Executor): + def __init__(self): + self._engine_client = rpc.get_engine_client() @profiler.trace('executor-run-action') def run_action(self, action_ex_id, action_class_str, attributes, diff --git a/mistral/engine/engine_server.py b/mistral/engine/engine_server.py new file mode 100644 index 00000000..7c3d5cdf --- /dev/null +++ b/mistral/engine/engine_server.py @@ -0,0 +1,250 @@ +# Copyright 2016 - Nokia Networks +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as logging + +from mistral import config as cfg +from mistral.db.v2 import api as db_api +from mistral.engine import default_engine +from mistral.engine.rpc_backend import rpc +from mistral.service import base as service_base +from mistral.services import expiration_policy +from mistral.services import scheduler +from mistral.utils import profiler as profiler_utils +from mistral.utils import rpc_utils +from mistral.workflow import utils as wf_utils + +LOG = logging.getLogger(__name__) + + +class EngineServer(service_base.MistralService): + """Engine server. + + This class manages engine life-cycle and gets registered as an RPC + endpoint to process engine specific calls. It also registers a + cluster member associated with this instance of engine. + """ + + def __init__(self, engine, setup_profiler=True): + super(EngineServer, self).__init__('engine_group', setup_profiler) + + self.engine = engine + self._rpc_server = None + self._scheduler = None + self._expiration_policy_tg = None + + def start(self): + super(EngineServer, self).start() + + db_api.setup_db() + + self._scheduler = scheduler.start() + self._expiration_policy_tg = expiration_policy.setup() + + if self._setup_profiler: + profiler_utils.setup('mistral-engine', cfg.CONF.engine.host) + + # Initialize and start RPC server. + + self._rpc_server = rpc.get_rpc_server_driver()( + rpc_utils.get_rpc_info_from_oslo(cfg.CONF.engine) + ) + self._rpc_server.register_endpoint(self) + + # Note(ddeja): Engine needs to be run in default (blocking) mode + # since using another mode may leads to deadlock. + # See https://review.openstack.org/#/c/356343 for more info. + self._rpc_server.run(executor='blocking') + + self._notify_started('Engine server started.') + + def stop(self, graceful=False): + super(EngineServer, self).stop(graceful) + + if self._scheduler: + scheduler.stop_scheduler(self._scheduler, graceful) + + if self._expiration_policy_tg: + self._expiration_policy_tg.stop(graceful) + + if self._rpc_server: + self._rpc_server.stop(graceful) + + def start_workflow(self, rpc_ctx, workflow_identifier, workflow_input, + description, params): + """Receives calls over RPC to start workflows on engine. + + :param rpc_ctx: RPC request context. + :param workflow_identifier: Workflow definition identifier. + :param workflow_input: Workflow input. + :param description: Workflow execution description. + :param params: Additional workflow type specific parameters. + :return: Workflow execution. + """ + + LOG.info( + "Received RPC request 'start_workflow'[rpc_ctx=%s," + " workflow_identifier=%s, workflow_input=%s, description=%s, " + "params=%s]" + % (rpc_ctx, workflow_identifier, workflow_input, description, + params) + ) + + return self.engine.start_workflow( + workflow_identifier, + workflow_input, + description, + **params + ) + + def start_action(self, rpc_ctx, action_name, + action_input, description, params): + """Receives calls over RPC to start actions on engine. + + :param rpc_ctx: RPC request context. + :param action_name: name of the Action. + :param action_input: input dictionary for Action. + :param description: description of new Action execution. + :param params: extra parameters to run Action. + :return: Action execution. + """ + LOG.info( + "Received RPC request 'start_action'[rpc_ctx=%s," + " name=%s, input=%s, description=%s, params=%s]" + % (rpc_ctx, action_name, action_input, description, params) + ) + + return self.engine.start_action( + action_name, + action_input, + description, + **params + ) + + def on_task_state_change(self, rpc_ctx, task_ex_id, state, + state_info=None): + return self.engine.on_task_state_change(task_ex_id, state, state_info) + + def on_action_complete(self, rpc_ctx, action_ex_id, result_data, + result_error, wf_action): + """Receives RPC calls to communicate action result to engine. + + :param rpc_ctx: RPC request context. + :param action_ex_id: Action execution id. + :param result_data: Action result data. + :param result_error: Action result error. + :param wf_action: True if given id points to a workflow execution. + :return: Action execution. + """ + + result = wf_utils.Result(result_data, result_error) + + LOG.info( + "Received RPC request 'on_action_complete'[rpc_ctx=%s," + " action_ex_id=%s, result=%s]" % (rpc_ctx, action_ex_id, result) + ) + + return self.engine.on_action_complete(action_ex_id, result, wf_action) + + def pause_workflow(self, rpc_ctx, execution_id): + """Receives calls over RPC to pause workflows on engine. + + :param rpc_ctx: Request context. + :param execution_id: Workflow execution id. + :return: Workflow execution. + """ + + LOG.info( + "Received RPC request 'pause_workflow'[rpc_ctx=%s," + " execution_id=%s]" % (rpc_ctx, execution_id) + ) + + return self.engine.pause_workflow(execution_id) + + def rerun_workflow(self, rpc_ctx, task_ex_id, reset=True, env=None): + """Receives calls over RPC to rerun workflows on engine. + + :param rpc_ctx: RPC request context. + :param task_ex_id: Task execution id. + :param reset: If true, then purge action execution for the task. + :param env: Environment variables to update. + :return: Workflow execution. + """ + + LOG.info( + "Received RPC request 'rerun_workflow'[rpc_ctx=%s, " + "task_ex_id=%s]" % (rpc_ctx, task_ex_id) + ) + + return self.engine.rerun_workflow(task_ex_id, reset, env) + + def resume_workflow(self, rpc_ctx, wf_ex_id, env=None): + """Receives calls over RPC to resume workflows on engine. + + :param rpc_ctx: RPC request context. + :param wf_ex_id: Workflow execution id. + :param env: Environment variables to update. + :return: Workflow execution. + """ + + LOG.info( + "Received RPC request 'resume_workflow'[rpc_ctx=%s, " + "wf_ex_id=%s]" % (rpc_ctx, wf_ex_id) + ) + + return self.engine.resume_workflow(wf_ex_id, env) + + def stop_workflow(self, rpc_ctx, execution_id, state, message=None): + """Receives calls over RPC to stop workflows on engine. + + Sets execution state to SUCCESS or ERROR. No more tasks will be + scheduled. Running tasks won't be killed, but their results + will be ignored. + + :param rpc_ctx: RPC request context. + :param execution_id: Workflow execution id. + :param state: State assigned to the workflow. Permitted states are + SUCCESS or ERROR. + :param message: Optional information string. + + :return: Workflow execution. + """ + + LOG.info( + "Received RPC request 'stop_workflow'[rpc_ctx=%s, execution_id=%s," + " state=%s, message=%s]" % (rpc_ctx, execution_id, state, message) + ) + + return self.engine.stop_workflow(execution_id, state, message) + + def rollback_workflow(self, rpc_ctx, execution_id): + """Receives calls over RPC to rollback workflows on engine. + + :param rpc_ctx: RPC request context. + :return: Workflow execution. + """ + + LOG.info( + "Received RPC request 'rollback_workflow'[rpc_ctx=%s," + " execution_id=%s]" % (rpc_ctx, execution_id) + ) + + return self.engine.rollback_workflow(execution_id) + + +def get_oslo_service(setup_profiler=True): + return EngineServer( + default_engine.DefaultEngine(), + setup_profiler=setup_profiler + ) diff --git a/mistral/engine/executor_server.py b/mistral/engine/executor_server.py new file mode 100644 index 00000000..b408ab77 --- /dev/null +++ b/mistral/engine/executor_server.py @@ -0,0 +1,99 @@ +# Copyright 2016 - Nokia Networks +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as logging + +from mistral import config as cfg +from mistral.engine import default_executor +from mistral.engine.rpc_backend import rpc +from mistral.service import base as service_base +from mistral.utils import profiler as profiler_utils +from mistral.utils import rpc_utils + +LOG = logging.getLogger(__name__) + + +class ExecutorServer(service_base.MistralService): + """Executor server. + + This class manages executor life-cycle and gets registered as an RPC + endpoint to process executor specific calls. It also registers a + cluster member associated with this instance of executor. + """ + + def __init__(self, executor, setup_profiler=True): + super(ExecutorServer, self).__init__('executor_group', setup_profiler) + + self.executor = executor + self._rpc_server = None + + def start(self): + super(ExecutorServer, self).start() + + if self._setup_profiler: + profiler_utils.setup('mistral-executor', cfg.CONF.executor.host) + + # Initialize and start RPC server. + + self._rpc_server = rpc.get_rpc_server_driver()( + rpc_utils.get_rpc_info_from_oslo(cfg.CONF.executor) + ) + self._rpc_server.register_endpoint(self) + + self._rpc_server.run(executor='threading') + + self._notify_started('Executor server started.') + + def stop(self, graceful=False): + super(ExecutorServer, self).stop(graceful) + + if self._rpc_server: + self._rpc_server.stop(graceful) + + def run_action(self, rpc_ctx, action_ex_id, action_class_str, + attributes, params, safe_rerun): + """Receives calls over RPC to run action on executor. + + :param rpc_ctx: RPC request context dictionary. + :param action_ex_id: Action execution id. + :param action_class_str: Action class name. + :param attributes: Action class attributes. + :param params: Action input parameters. + :param safe_rerun: Tells if given action can be safely rerun. + :return: Action result. + """ + + LOG.info( + "Received RPC request 'run_action'[rpc_ctx=%s," + " action_ex_id=%s, action_class=%s, attributes=%s, params=%s]" + % (rpc_ctx, action_ex_id, action_class_str, attributes, params) + ) + + redelivered = rpc_ctx.redelivered or False + + return self.executor.run_action( + action_ex_id, + action_class_str, + attributes, + params, + safe_rerun, + redelivered + ) + + +def get_oslo_service(setup_profiler=True): + return ExecutorServer( + default_executor.DefaultExecutor(), + setup_profiler=setup_profiler + ) diff --git a/mistral/engine/rpc_backend/base.py b/mistral/engine/rpc_backend/base.py index a065e967..e5df5ca0 100644 --- a/mistral/engine/rpc_backend/base.py +++ b/mistral/engine/rpc_backend/base.py @@ -73,5 +73,22 @@ class RPCServer(object): def run(self, executor='blocking'): """Runs the RPC server. + :param executor: Executor used to process incoming requests. Different + implementations may support different options. """ raise NotImplementedError + + def stop(self, graceful=False): + """Stop the RPC server. + + :param graceful: True if this method call should wait till all + internal threads are finished. + :return: + """ + # No-op by default. + pass + + def wait(self): + """Wait till all internal threads are finished.""" + # No-op by default. + pass diff --git a/mistral/engine/rpc_backend/kombu/kombu_server.py b/mistral/engine/rpc_backend/kombu/kombu_server.py index 4e43f686..90b364a2 100644 --- a/mistral/engine/rpc_backend/kombu/kombu_server.py +++ b/mistral/engine/rpc_backend/kombu/kombu_server.py @@ -47,6 +47,7 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base): self.channel = None self.conn = None self._running = threading.Event() + self._stopped = threading.Event() self.endpoints = [] @property @@ -63,6 +64,7 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base): self.password, self.virtual_host, ) + LOG.info("Connected to AMQP at %s:%s" % (self.host, self.port)) try: @@ -84,7 +86,10 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base): callbacks=[self._on_message_safe], ) as consumer: consumer.qos(prefetch_count=1) + self._running.set() + self._stopped.clear() + while self.is_running: try: conn.drain_events(timeout=1) @@ -92,16 +97,25 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base): pass except KeyboardInterrupt: self.stop() + LOG.info("Server with id='{0}' stopped.".format( self.server_id)) + return except socket.error as e: raise exc.MistralException("Broker connection failed: %s" % e) + finally: + self._stopped.set() - def stop(self): - """Stop the server.""" + def stop(self, graceful=False): self._running.clear() + if graceful: + self.wait() + + def wait(self): + self._stopped.wait() + def _get_rpc_method(self, method_name): for endpoint in self.endpoints: if hasattr(endpoint, method_name): diff --git a/mistral/engine/rpc_backend/oslo/oslo_server.py b/mistral/engine/rpc_backend/oslo/oslo_server.py index 70ee46cf..fec1f57d 100644 --- a/mistral/engine/rpc_backend/oslo/oslo_server.py +++ b/mistral/engine/rpc_backend/oslo/oslo_server.py @@ -34,6 +34,7 @@ class OsloRPCServer(rpc_base.RPCServer): self.channel = None self.connection = None self.endpoints = [] + self.oslo_server = None def register_endpoint(self, endpoint): self.endpoints.append(endpoint) @@ -44,7 +45,9 @@ class OsloRPCServer(rpc_base.RPCServer): server=self.server_id ) - server = messaging.get_rpc_server( + # TODO(rakhmerov): rpc.get_transport() should be in oslo.messaging + # related module. + self.oslo_server = messaging.get_rpc_server( rpc.get_transport(), target, self.endpoints, @@ -52,5 +55,13 @@ class OsloRPCServer(rpc_base.RPCServer): serializer=ctx.RpcContextSerializer(ctx.JsonPayloadSerializer()) ) - server.start() - server.wait() + self.oslo_server.start() + + def stop(self, graceful=False): + self.oslo_server.stop() + + if graceful: + self.oslo_server.wait() + + def wait(self): + self.oslo_server.wait() diff --git a/mistral/engine/rpc_backend/rpc.py b/mistral/engine/rpc_backend/rpc.py index 64dd09fb..4816ee17 100644 --- a/mistral/engine/rpc_backend/rpc.py +++ b/mistral/engine/rpc_backend/rpc.py @@ -52,6 +52,9 @@ def cleanup(): _EVENT_ENGINE_CLIENT = None +# TODO(rakhmerov): This method seems misplaced. Now we have different kind +# of transports (oslo, kombu) and this module should not have any oslo +# specific things anymore. def get_transport(): global _TRANSPORT @@ -120,174 +123,6 @@ def get_rpc_client_driver(): return _IMPL_CLIENT -class EngineServer(object): - """RPC Engine server.""" - - def __init__(self, engine): - self._engine = engine - - def start_workflow(self, rpc_ctx, workflow_identifier, workflow_input, - description, params): - """Receives calls over RPC to start workflows on engine. - - :param rpc_ctx: RPC request context. - :param workflow_identifier: Workflow definition identifier. - :param workflow_input: Workflow input. - :param description: Workflow execution description. - :param params: Additional workflow type specific parameters. - :return: Workflow execution. - """ - - LOG.info( - "Received RPC request 'start_workflow'[rpc_ctx=%s," - " workflow_identifier=%s, workflow_input=%s, description=%s, " - "params=%s]" - % (rpc_ctx, workflow_identifier, workflow_input, description, - params) - ) - - return self._engine.start_workflow( - workflow_identifier, - workflow_input, - description, - **params - ) - - def start_action(self, rpc_ctx, action_name, - action_input, description, params): - """Receives calls over RPC to start actions on engine. - - :param rpc_ctx: RPC request context. - :param action_name: name of the Action. - :param action_input: input dictionary for Action. - :param description: description of new Action execution. - :param params: extra parameters to run Action. - :return: Action execution. - """ - LOG.info( - "Received RPC request 'start_action'[rpc_ctx=%s," - " name=%s, input=%s, description=%s, params=%s]" - % (rpc_ctx, action_name, action_input, description, params) - ) - - return self._engine.start_action( - action_name, - action_input, - description, - **params - ) - - def on_task_state_change(self, rpc_ctx, task_ex_id, state, - state_info=None): - return self._engine.on_task_state_change(task_ex_id, state, state_info) - - def on_action_complete(self, rpc_ctx, action_ex_id, result_data, - result_error, wf_action): - """Receives RPC calls to communicate action result to engine. - - :param rpc_ctx: RPC request context. - :param action_ex_id: Action execution id. - :param result_data: Action result data. - :param result_error: Action result error. - :param wf_action: True if given id points to a workflow execution. - :return: Action execution. - """ - - result = wf_utils.Result(result_data, result_error) - - LOG.info( - "Received RPC request 'on_action_complete'[rpc_ctx=%s," - " action_ex_id=%s, result=%s]" % (rpc_ctx, action_ex_id, result) - ) - - return self._engine.on_action_complete(action_ex_id, result, wf_action) - - def pause_workflow(self, rpc_ctx, execution_id): - """Receives calls over RPC to pause workflows on engine. - - :param rpc_ctx: Request context. - :param execution_id: Workflow execution id. - :return: Workflow execution. - """ - - LOG.info( - "Received RPC request 'pause_workflow'[rpc_ctx=%s," - " execution_id=%s]" % (rpc_ctx, execution_id) - ) - - return self._engine.pause_workflow(execution_id) - - def rerun_workflow(self, rpc_ctx, task_ex_id, reset=True, env=None): - """Receives calls over RPC to rerun workflows on engine. - - :param rpc_ctx: RPC request context. - :param task_ex_id: Task execution id. - :param reset: If true, then purge action execution for the task. - :param env: Environment variables to update. - :return: Workflow execution. - """ - - LOG.info( - "Received RPC request 'rerun_workflow'[rpc_ctx=%s, " - "task_ex_id=%s]" % (rpc_ctx, task_ex_id) - ) - - return self._engine.rerun_workflow(task_ex_id, reset, env) - - def resume_workflow(self, rpc_ctx, wf_ex_id, env=None): - """Receives calls over RPC to resume workflows on engine. - - :param rpc_ctx: RPC request context. - :param wf_ex_id: Workflow execution id. - :param env: Environment variables to update. - :return: Workflow execution. - """ - - LOG.info( - "Received RPC request 'resume_workflow'[rpc_ctx=%s, " - "wf_ex_id=%s]" % (rpc_ctx, wf_ex_id) - ) - - return self._engine.resume_workflow(wf_ex_id, env) - - def stop_workflow(self, rpc_ctx, execution_id, state, message=None): - """Receives calls over RPC to stop workflows on engine. - - Sets execution state to SUCCESS or ERROR. No more tasks will be - scheduled. Running tasks won't be killed, but their results - will be ignored. - - :param rpc_ctx: RPC request context. - :param execution_id: Workflow execution id. - :param state: State assigned to the workflow. Permitted states are - SUCCESS or ERROR. - :param message: Optional information string. - - :return: Workflow execution. - """ - - LOG.info( - "Received RPC request 'stop_workflow'[rpc_ctx=%s, execution_id=%s," - " state=%s, message=%s]" % (rpc_ctx, execution_id, state, message) - ) - - return self._engine.stop_workflow(execution_id, state, message) - - def rollback_workflow(self, rpc_ctx, execution_id): - """Receives calls over RPC to rollback workflows on engine. - - :param rpc_ctx: RPC request context. - :return: Workflow execution. - """ - - LOG.info( - "Received RPC request 'rollback_workflow'[rpc_ctx=%s," - " execution_id=%s]" % (rpc_ctx, execution_id) - ) - - return self._engine.resume_workflow(execution_id) - - def _wrap_exception_and_reraise(exception): message = "%s: %s" % (exception.__class__.__name__, exception.args[0]) @@ -498,43 +333,6 @@ class EngineClient(base.Engine): ) -class ExecutorServer(object): - """RPC Executor server.""" - - def __init__(self, executor): - self._executor = executor - - def run_action(self, rpc_ctx, action_ex_id, action_class_str, - attributes, params, safe_rerun): - """Receives calls over RPC to run action on executor. - - :param rpc_ctx: RPC request context dictionary. - :param action_ex_id: Action execution id. - :param action_class_str: Action class name. - :param attributes: Action class attributes. - :param params: Action input parameters. - :param safe_rerun: Tells if given action can be safely rerun. - :return: Action result. - """ - - LOG.info( - "Received RPC request 'run_action'[rpc_ctx=%s," - " action_ex_id=%s, action_class=%s, attributes=%s, params=%s]" - % (rpc_ctx, action_ex_id, action_class_str, attributes, params) - ) - - redelivered = rpc_ctx.redelivered or False - - return self._executor.run_action( - action_ex_id, - action_class_str, - attributes, - params, - safe_rerun, - redelivered - ) - - class ExecutorClient(base.Executor): """RPC Executor client.""" @@ -588,37 +386,6 @@ class ExecutorClient(base.Executor): ) -class EventEngineServer(object): - """RPC EventEngine server.""" - - def __init__(self, event_engine): - self._event_engine = event_engine - - def create_event_trigger(self, rpc_ctx, trigger, events): - LOG.info( - "Received RPC request 'create_event_trigger'[rpc_ctx=%s," - " trigger=%s, events=%s", rpc_ctx, trigger, events - ) - - return self._event_engine.create_event_trigger(trigger, events) - - def delete_event_trigger(self, rpc_ctx, trigger, events): - LOG.info( - "Received RPC request 'delete_event_trigger'[rpc_ctx=%s," - " trigger=%s, events=%s", rpc_ctx, trigger, events - ) - - return self._event_engine.delete_event_trigger(trigger, events) - - def update_event_trigger(self, rpc_ctx, trigger): - LOG.info( - "Received RPC request 'update_event_trigger'[rpc_ctx=%s," - " trigger=%s", rpc_ctx, trigger - ) - - return self._event_engine.update_event_trigger(trigger) - - class EventEngineClient(base.EventEngine): """RPC EventEngine client.""" diff --git a/mistral/event_engine/__init__.py b/mistral/event_engine/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/mistral/services/event_engine.py b/mistral/event_engine/event_engine.py similarity index 97% rename from mistral/services/event_engine.py rename to mistral/event_engine/event_engine.py index aef425c4..c34a0696 100644 --- a/mistral/services/event_engine.py +++ b/mistral/event_engine/event_engine.py @@ -24,16 +24,18 @@ import six import yaml from mistral import context as auth_ctx -from mistral import coordination from mistral.db.v2 import api as db_api +from mistral.engine.rpc_backend import rpc from mistral import exceptions from mistral import expressions from mistral import messaging as mistral_messaging from mistral.services import security + LOG = logging.getLogger(__name__) CONF = cfg.CONF + DEFAULT_PROPERTIES = { 'service': '<% $.publisher %>', 'project_id': '<% $.context.project_id %>', @@ -124,16 +126,14 @@ class NotificationsConverter(object): return edef.convert(event) -class EventEngine(coordination.Service): +class EventEngine(object): """Event engine server. A separate service that is responsible for listening event notification - and trigger workflows defined by end user. + and triggering workflows defined by end user. """ - def __init__(self, engine_client): - coordination.Service.__init__(self, 'event_engine_group') - - self.engine_client = engine_client + def __init__(self): + self.engine_client = rpc.get_engine_client() self.event_queue = six.moves.queue.Queue() self.handler_tg = threadgroup.ThreadGroup() diff --git a/mistral/event_engine/event_engine_server.py b/mistral/event_engine/event_engine_server.py new file mode 100644 index 00000000..8e34c7c6 --- /dev/null +++ b/mistral/event_engine/event_engine_server.py @@ -0,0 +1,94 @@ +# Copyright 2016 - Nokia Networks +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as logging + +from mistral import config as cfg +from mistral.engine.rpc_backend import rpc +from mistral.event_engine import event_engine +from mistral.service import base as service_base +from mistral.utils import profiler as profiler_utils +from mistral.utils import rpc_utils + + +LOG = logging.getLogger(__name__) + + +class EventEngineServer(service_base.MistralService): + """RPC EventEngine server. + + This class manages event engine life-cycle and gets registered as + an RPC endpoint to process event engine specific calls. It also + registers a cluster member associated with this instance of event + engine. + """ + + def __init__(self, event_engine): + super(EventEngineServer, self).__init__('event_engine_group') + + self._event_engine = event_engine + self._rpc_server = None + + def start(self): + super(EventEngineServer, self).start() + + profiler_utils.setup( + 'mistral-event-engine', + cfg.CONF.event_engine.host + ) + + # Initialize and start RPC server. + + self._rpc_server = rpc.get_rpc_server_driver()( + rpc_utils.get_rpc_info_from_oslo(cfg.CONF.event_engine) + ) + self._rpc_server.register_endpoint(self) + + self._rpc_server.run() + + self._notify_started('Event engine server started.') + + def stop(self, graceful=False): + super(EventEngineServer, self).stop(graceful) + + if self._rpc_server: + self._rpc_server.stop(graceful) + + def create_event_trigger(self, rpc_ctx, trigger, events): + LOG.info( + "Received RPC request 'create_event_trigger'[rpc_ctx=%s," + " trigger=%s, events=%s", rpc_ctx, trigger, events + ) + + return self._event_engine.create_event_trigger(trigger, events) + + def delete_event_trigger(self, rpc_ctx, trigger, events): + LOG.info( + "Received RPC request 'delete_event_trigger'[rpc_ctx=%s," + " trigger=%s, events=%s", rpc_ctx, trigger, events + ) + + return self._event_engine.delete_event_trigger(trigger, events) + + def update_event_trigger(self, rpc_ctx, trigger): + LOG.info( + "Received RPC request 'update_event_trigger'[rpc_ctx=%s," + " trigger=%s", rpc_ctx, trigger + ) + + return self._event_engine.update_event_trigger(trigger) + + +def get_oslo_service(): + return EventEngineServer(event_engine.EventEngine()) diff --git a/mistral/service/__init__.py b/mistral/service/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/mistral/service/base.py b/mistral/service/base.py new file mode 100644 index 00000000..c1dd203c --- /dev/null +++ b/mistral/service/base.py @@ -0,0 +1,63 @@ +# Copyright 2016 - Nokia Networks +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from eventlet import event +from oslo_log import log +from oslo_service import service + +from mistral.service import coordination + + +LOG = log.getLogger(__name__) + + +class MistralService(service.Service): + """Base class for Mistral services. + + The term 'service' here means any Mistral component that can run as + an independent process and thus can be registered as a cluster member. + """ + def __init__(self, cluster_group, setup_profiler=True): + super(MistralService, self).__init__() + + self.cluster_member = coordination.Service(cluster_group) + self._setup_profiler = setup_profiler + self._started = event.Event() + + def wait_started(self): + """Wait until the service is fully started.""" + self._started.wait() + + def _notify_started(self, message): + print(message) + + self._started.send() + + def start(self): + super(MistralService, self).start() + + self.cluster_member.register_membership() + + def stop(self, graceful=False): + super(MistralService, self).stop(graceful) + + self._started = event.Event() + + # TODO(rakhmerov): Probably we could also take care of an RPC server + # if it exists for this particular service type. Take a look at + # executor and engine servers. + + # TODO(rakhmerov): This method is not implemented correctly now + # (not thread-safe). Uncomment this call once it's fixed. + # self.cluster_member.stop() diff --git a/mistral/coordination.py b/mistral/service/coordination.py similarity index 100% rename from mistral/coordination.py rename to mistral/service/coordination.py diff --git a/mistral/services/expiration_policy.py b/mistral/services/expiration_policy.py index 27d2e8f9..98b4e4aa 100644 --- a/mistral/services/expiration_policy.py +++ b/mistral/services/expiration_policy.py @@ -110,3 +110,5 @@ def setup(): periodic_interval_max=1, context=ctx ) + + return tg diff --git a/mistral/services/scheduler.py b/mistral/services/scheduler.py index efa7337f..1041a557 100644 --- a/mistral/services/scheduler.py +++ b/mistral/services/scheduler.py @@ -200,21 +200,33 @@ class CallScheduler(periodic_task.PeriodicTasks): ) -def setup(): +def start(): tg = threadgroup.ThreadGroup() - scheduler = CallScheduler(CONF) + sched = CallScheduler(CONF) tg.add_dynamic_timer( - scheduler.run_periodic_tasks, + sched.run_periodic_tasks, initial_delay=None, periodic_interval_max=1, context=None ) - _schedulers[scheduler] = tg + _schedulers[sched] = tg - return tg + return sched + + +def stop_scheduler(sched, graceful=False): + if sched: + tg = _schedulers[sched] + + tg.stop() + + del _schedulers[sched] + + if graceful: + tg.wait() def stop_all_schedulers(): diff --git a/mistral/tests/unit/api/v2/test_services.py b/mistral/tests/unit/api/v2/test_services.py index 1d7010fe..a86c5a00 100644 --- a/mistral/tests/unit/api/v2/test_services.py +++ b/mistral/tests/unit/api/v2/test_services.py @@ -17,7 +17,7 @@ from oslo_config import cfg import tooz.coordination from webtest import app as webtest_app -from mistral import coordination +from mistral.service import coordination from mistral.tests.unit.api import base @@ -54,7 +54,7 @@ class TestServicesController(base.APITest): self.assertIn('Service API is not supported', context.args[0]) - @mock.patch('mistral.coordination.ServiceCoordinator.get_members', + @mock.patch('mistral.service.coordination.ServiceCoordinator.get_members', side_effect=tooz.coordination.ToozError('error message')) def test_get_all_with_get_members_error(self, mock_get_members): cfg.CONF.set_default('backend_url', 'zake://', 'coordination') diff --git a/mistral/tests/unit/engine/base.py b/mistral/tests/unit/engine/base.py index f92af668..0c2ef249 100644 --- a/mistral/tests/unit/engine/base.py +++ b/mistral/tests/unit/engine/base.py @@ -18,13 +18,12 @@ import eventlet from oslo_config import cfg from oslo_log import log as logging import oslo_messaging as messaging +from oslo_service import service -from mistral import context as ctx from mistral.db.v2 import api as db_api -from mistral.engine import default_engine as def_eng -from mistral.engine import default_executor as def_exec +from mistral.engine import engine_server +from mistral.engine import executor_server from mistral.engine.rpc_backend import rpc -from mistral.services import scheduler from mistral.tests.unit import base from mistral.workflow import states @@ -36,54 +35,12 @@ DEFAULT_DELAY = 1 DEFAULT_TIMEOUT = 30 -def launch_engine_server(transport, engine): - target = messaging.Target( - topic=cfg.CONF.engine.topic, - server=cfg.CONF.engine.host - ) +def launch_service(s): + launcher = service.ServiceLauncher(cfg.CONF) - server = messaging.get_rpc_server( - transport, - target, - [rpc.EngineServer(engine)], - executor='blocking', - serializer=ctx.RpcContextSerializer(ctx.JsonPayloadSerializer()) - ) + launcher.launch_service(s) - try: - server.start() - while True: - eventlet.sleep(604800) - except (KeyboardInterrupt, SystemExit): - LOG.info("Stopping engine service...") - finally: - server.stop() - server.wait() - - -def launch_executor_server(transport, executor): - target = messaging.Target( - topic=cfg.CONF.executor.topic, - server=cfg.CONF.executor.host - ) - - server = messaging.get_rpc_server( - transport, - target, - [rpc.ExecutorServer(executor)], - executor='blocking', - serializer=ctx.RpcContextSerializer(ctx.JsonPayloadSerializer()) - ) - - try: - server.start() - while True: - eventlet.sleep(604800) - except (KeyboardInterrupt, SystemExit): - LOG.info("Stopping executor service...") - finally: - server.stop() - server.wait() + launcher.wait() class EngineTestCase(base.DbTestCase): @@ -100,28 +57,35 @@ class EngineTestCase(base.DbTestCase): # Drop all RPC objects (transport, clients). rpc.cleanup() - transport = rpc.get_transport() self.engine_client = rpc.get_engine_client() self.executor_client = rpc.get_executor_client() - self.engine = def_eng.DefaultEngine(self.engine_client) - self.executor = def_exec.DefaultExecutor(self.engine_client) - LOG.info("Starting engine and executor threads...") + engine_service = engine_server.get_oslo_service(setup_profiler=False) + executor_service = executor_server.get_oslo_service( + setup_profiler=False + ) + + self.engine = engine_service.engine + self.executor = executor_service.executor + self.threads = [ - eventlet.spawn(launch_engine_server, transport, self.engine), - eventlet.spawn(launch_executor_server, transport, self.executor), + eventlet.spawn(launch_service, executor_service), + eventlet.spawn(launch_service, engine_service) ] self.addOnException(self.print_executions) - # Start scheduler. - scheduler_thread_group = scheduler.setup() - + self.addCleanup(executor_service.stop, True) + self.addCleanup(engine_service.stop, True) self.addCleanup(self.kill_threads) - self.addCleanup(scheduler_thread_group.stop) + + # Make sure that both services fully started, otherwise + # the test may run too early. + executor_service.wait_started() + engine_service.wait_started() def kill_threads(self): LOG.info("Finishing engine and executor threads...") diff --git a/mistral/tests/unit/engine/test_default_engine.py b/mistral/tests/unit/engine/test_default_engine.py index d397b2b0..f734b13f 100644 --- a/mistral/tests/unit/engine/test_default_engine.py +++ b/mistral/tests/unit/engine/test_default_engine.py @@ -23,6 +23,7 @@ from oslo_utils import uuidutils from mistral.db.v2 import api as db_api from mistral.db.v2.sqlalchemy import models from mistral.engine import default_engine as d_eng +from mistral.engine.rpc_backend import rpc from mistral import exceptions as exc from mistral.services import workbooks as wb_service from mistral.tests.unit import base @@ -92,6 +93,7 @@ MOCK_ENVIRONMENT = mock.MagicMock(return_value=ENVIRONMENT_DB) MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError()) +@mock.patch.object(rpc, 'get_executor_client', mock.Mock()) class DefaultEngineTest(base.DbTestCase): def setUp(self): super(DefaultEngineTest, self).setUp() @@ -100,7 +102,7 @@ class DefaultEngineTest(base.DbTestCase): # Note: For purposes of this test we can easily use # simple magic mocks for engine and executor clients - self.engine = d_eng.DefaultEngine(mock.MagicMock()) + self.engine = d_eng.DefaultEngine() def test_start_workflow(self): wf_input = {'param1': 'Hey', 'param2': 'Hi'} diff --git a/mistral/tests/unit/engine/test_environment.py b/mistral/tests/unit/engine/test_environment.py index fd7ed2ca..b2fee4df 100644 --- a/mistral/tests/unit/engine/test_environment.py +++ b/mistral/tests/unit/engine/test_environment.py @@ -80,7 +80,7 @@ workflows: def _run_at_target(action_ex_id, action_class_str, attributes, action_params, target=None, async=True, safe_rerun=False): # We'll just call executor directly for testing purposes. - executor = default_executor.DefaultExecutor(rpc.get_engine_client()) + executor = default_executor.DefaultExecutor() executor.run_action( action_ex_id, diff --git a/mistral/tests/unit/engine/test_join.py b/mistral/tests/unit/engine/test_join.py index 1aa749b4..a5859a6f 100644 --- a/mistral/tests/unit/engine/test_join.py +++ b/mistral/tests/unit/engine/test_join.py @@ -221,13 +221,18 @@ class JoinEngineTest(base.EngineTestCase): task4 = self._assert_single_item(tasks, name='task4') # NOTE(xylan): We ensure task4 is successful here because of the - # uncertainty of its running parallelly with task3. + # uncertainty of its running in parallel with task3. self.await_task_success(task4.id) self.assertEqual(states.RUNNING, wf_ex.state) self.assertEqual(states.SUCCESS, task1.state) self.assertEqual(states.SUCCESS, task2.state) - self.assertEqual(states.WAITING, task3.state) + + # NOTE(rakhmerov): Task 3 must fail because task2->task3 transition + # will never trigger due to its condition. + self.await_task_error(task3.id) + + self.await_workflow_error(wf_ex.id) def test_partial_join(self): wf_text = """--- diff --git a/mistral/tests/unit/engine/test_safe_rerun.py b/mistral/tests/unit/engine/test_safe_rerun.py index 0c1c9ae8..9d9d38d9 100644 --- a/mistral/tests/unit/engine/test_safe_rerun.py +++ b/mistral/tests/unit/engine/test_safe_rerun.py @@ -27,7 +27,7 @@ from mistral.workflow import states def _run_at_target(action_ex_id, action_class_str, attributes, action_params, target=None, async=True, safe_rerun=False): # We'll just call executor directly for testing purposes. - executor = default_executor.DefaultExecutor(rpc.get_engine_client()) + executor = default_executor.DefaultExecutor() executor.run_action( action_ex_id, @@ -43,7 +43,6 @@ MOCK_RUN_AT_TARGET = mock.MagicMock(side_effect=_run_at_target) class TestSafeRerun(base.EngineTestCase): - @mock.patch.object(rpc.ExecutorClient, 'run_action', MOCK_RUN_AT_TARGET) def test_safe_rerun_true(self): wf_text = """--- diff --git a/mistral/tests/unit/engine/test_task_defaults.py b/mistral/tests/unit/engine/test_task_defaults.py index 845664a7..62e43597 100644 --- a/mistral/tests/unit/engine/test_task_defaults.py +++ b/mistral/tests/unit/engine/test_task_defaults.py @@ -18,7 +18,6 @@ from oslo_config import cfg import requests from mistral.db.v2 import api as db_api -from mistral.services import scheduler from mistral.services import workflows as wf_service from mistral.tests.unit.engine import base from mistral.workflow import states @@ -83,13 +82,6 @@ class TaskDefaultsDirectWorkflowEngineTest(base.EngineTestCase): class TaskDefaultsReverseWorkflowEngineTest(base.EngineTestCase): - def setUp(self): - super(TaskDefaultsReverseWorkflowEngineTest, self).setUp() - - thread_group = scheduler.setup() - - self.addCleanup(thread_group.stop) - def test_task_defaults_retry_policy(self): wf_text = """--- version: '2.0' diff --git a/mistral/tests/unit/services/test_event_engine.py b/mistral/tests/unit/services/test_event_engine.py index 3a3f6cac..195d96f0 100644 --- a/mistral/tests/unit/services/test_event_engine.py +++ b/mistral/tests/unit/services/test_event_engine.py @@ -18,7 +18,8 @@ import mock from oslo_config import cfg from mistral.db.v2.sqlalchemy import api as db_api -from mistral.services import event_engine +from mistral.engine.rpc_backend import rpc +from mistral.event_engine import event_engine from mistral.services import workflows from mistral.tests.unit import base @@ -55,10 +56,13 @@ class EventEngineTest(base.DbTestCase): super(EventEngineTest, self).setUp() self.wf = workflows.create_workflows(WORKFLOW_LIST)[0] + EVENT_TRIGGER['workflow_id'] = self.wf.id + @mock.patch.object(rpc, 'get_engine_client', mock.Mock()) def test_event_engine_start_with_no_triggers(self): - e_engine = event_engine.EventEngine(mock.Mock()) + e_engine = event_engine.EventEngine() + self.addCleanup(e_engine.handler_tg.stop) self.assertEqual(0, len(e_engine.event_triggers_map)) @@ -66,10 +70,12 @@ class EventEngineTest(base.DbTestCase): self.assertEqual(0, len(e_engine.exchange_topic_listener_map)) @mock.patch('mistral.messaging.start_listener') + @mock.patch.object(rpc, 'get_engine_client', mock.Mock()) def test_event_engine_start_with_triggers(self, mock_start): trigger = db_api.create_event_trigger(EVENT_TRIGGER) - e_engine = event_engine.EventEngine(mock.MagicMock()) + e_engine = event_engine.EventEngine() + self.addCleanup(e_engine.handler_tg.stop) self.assertEqual(1, len(e_engine.exchange_topic_events_map)) @@ -86,11 +92,12 @@ class EventEngineTest(base.DbTestCase): self.assertEqual(1, len(e_engine.exchange_topic_listener_map)) @mock.patch('mistral.messaging.start_listener') + @mock.patch.object(rpc, 'get_engine_client', mock.Mock()) def test_process_event_queue(self, mock_start): db_api.create_event_trigger(EVENT_TRIGGER) - client = mock.MagicMock() - e_engine = event_engine.EventEngine(client) + e_engine = event_engine.EventEngine() + self.addCleanup(e_engine.handler_tg.stop) event = { @@ -103,6 +110,7 @@ class EventEngineTest(base.DbTestCase): with mock.patch.object(e_engine, 'engine_client') as client_mock: e_engine.event_queue.put(event) + time.sleep(1) self.assertEqual(1, client_mock.start_workflow.call_count) diff --git a/mistral/tests/unit/services/test_scheduler.py b/mistral/tests/unit/services/test_scheduler.py index 15db8fb4..ac5197ed 100644 --- a/mistral/tests/unit/services/test_scheduler.py +++ b/mistral/tests/unit/services/test_scheduler.py @@ -81,9 +81,9 @@ class SchedulerServiceTest(base.DbTestCase): def setUp(self): super(SchedulerServiceTest, self).setUp() - self.thread_group = scheduler.setup() + sched = scheduler.start() - self.addCleanup(self.thread_group.stop) + self.addCleanup(scheduler.stop_scheduler, sched, True) @mock.patch(FACTORY_METHOD_PATH) def test_scheduler_with_factory(self, factory): @@ -242,11 +242,12 @@ class SchedulerServiceTest(base.DbTestCase): @mock.patch(TARGET_METHOD_PATH) def test_scheduler_multi_instance(self, method): - def stop_thread_groups(): - [tg.stop() for tg in self.tgs] + scheds = [scheduler.start(), scheduler.start()] - self.tgs = [scheduler.setup(), scheduler.setup()] - self.addCleanup(stop_thread_groups) + def stop_schedulers(): + [scheduler.stop_scheduler(s, True) for s in scheds] + + self.addCleanup(stop_schedulers) method_args = {'name': 'task', 'id': '321'} diff --git a/mistral/tests/unit/test_coordination.py b/mistral/tests/unit/test_coordination.py index 47a5065b..377a5d42 100644 --- a/mistral/tests/unit/test_coordination.py +++ b/mistral/tests/unit/test_coordination.py @@ -16,7 +16,7 @@ import mock from oslo_config import cfg import six -from mistral import coordination +from mistral.service import coordination from mistral.tests.unit import base