diff --git a/mistral/api/app.py b/mistral/api/app.py index c84814b1..a2807e85 100644 --- a/mistral/api/app.py +++ b/mistral/api/app.py @@ -21,9 +21,9 @@ import pecan from mistral.api import access_control from mistral import config as m_config from mistral import context as ctx -from mistral import coordination from mistral.db.v2 import api as db_api_v2 from mistral.engine.rpc_backend import rpc +from mistral.service import coordination from mistral.services import periodic @@ -68,6 +68,7 @@ def setup_app(config=None): # Set up access control. app = access_control.setup(app) + # TODO(rakhmerov): need to get rid of this call. # Set up RPC related flags in config rpc.get_transport() diff --git a/mistral/api/controllers/v2/service.py b/mistral/api/controllers/v2/service.py index d20450ee..9c47b2d0 100644 --- a/mistral/api/controllers/v2/service.py +++ b/mistral/api/controllers/v2/service.py @@ -21,10 +21,12 @@ import wsmeext.pecan as wsme_pecan from mistral.api import access_control as acl from mistral.api.controllers.v2 import resources +# TODO(rakhmerov): invalid dependency, a REST controller must not depend on +# a launch script. from mistral.cmd import launch from mistral import context -from mistral import coordination from mistral import exceptions as exc +from mistral.service import coordination from mistral.utils import rest_utils diff --git a/mistral/api/service.py b/mistral/api/service.py index 7242ebcb..ce32e675 100644 --- a/mistral/api/service.py +++ b/mistral/api/service.py @@ -26,8 +26,9 @@ class WSGIService(service.ServiceBase): def __init__(self, name): self.name = name self.app = app.setup_app() - self.workers = (cfg.CONF.api.api_workers or - processutils.get_worker_count()) + self.workers = ( + cfg.CONF.api.api_workers or processutils.get_worker_count() + ) self.server = wsgi.Server( cfg.CONF, @@ -49,7 +50,3 @@ class WSGIService(service.ServiceBase): def reset(self): self.server.reset() - - -def process_launcher(): - return service.ProcessLauncher(cfg.CONF) diff --git a/mistral/cmd/launch.py b/mistral/cmd/launch.py index 422f28cc..c965bd20 100644 --- a/mistral/cmd/launch.py +++ b/mistral/cmd/launch.py @@ -39,18 +39,14 @@ if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'mistral', '__init__.py')): from oslo_config import cfg from oslo_log import log as logging +from oslo_service import service -from mistral.api import service as mistral_service +from mistral.api import service as api_service from mistral import config -from mistral.db.v2 import api as db_api -from mistral.engine import default_engine as def_eng -from mistral.engine import default_executor as def_executor +from mistral.engine import engine_server +from mistral.engine import executor_server from mistral.engine.rpc_backend import rpc -from mistral.services import event_engine -from mistral.services import expiration_policy -from mistral.services import scheduler -from mistral.utils import profiler -from mistral.utils import rpc_utils +from mistral.event_engine import event_engine_server from mistral import version @@ -58,84 +54,48 @@ CONF = cfg.CONF def launch_executor(): - profiler.setup('mistral-executor', cfg.CONF.executor.host) - - executor_v2 = def_executor.DefaultExecutor(rpc.get_engine_client()) - executor_endpoint = rpc.ExecutorServer(executor_v2) - - executor_server = rpc.get_rpc_server_driver()( - rpc_utils.get_rpc_info_from_oslo(CONF.executor) - ) - executor_server.register_endpoint(executor_endpoint) - - executor_v2.register_membership() - try: - executor_server.run(executor='threading') - except (KeyboardInterrupt, SystemExit): - pass - finally: - print("Stopping executor service...") + launcher = service.ServiceLauncher(CONF) + + launcher.launch_service(executor_server.get_oslo_service()) + + launcher.wait() + except RuntimeError as e: + sys.stderr.write("ERROR: %s\n" % e) + sys.exit(1) def launch_engine(): - profiler.setup('mistral-engine', cfg.CONF.engine.host) - - engine_v2 = def_eng.DefaultEngine(rpc.get_engine_client()) - - engine_endpoint = rpc.EngineServer(engine_v2) - - # Setup scheduler in engine. - db_api.setup_db() - scheduler.setup() - - # Setup expiration policy - expiration_policy.setup() - - engine_server = rpc.get_rpc_server_driver()( - rpc_utils.get_rpc_info_from_oslo(CONF.engine) - ) - engine_server.register_endpoint(engine_endpoint) - - engine_v2.register_membership() - try: - # Note(ddeja): Engine needs to be run in default (blocking) mode - # since using another mode may lead to deadlock. - # See https://review.openstack.org/#/c/356343/ - # for more info. - engine_server.run() - except (KeyboardInterrupt, SystemExit): - pass - finally: - print("Stopping engine service...") + launcher = service.ServiceLauncher(CONF) + + launcher.launch_service(engine_server.get_oslo_service()) + + launcher.wait() + except RuntimeError as e: + sys.stderr.write("ERROR: %s\n" % e) + sys.exit(1) def launch_event_engine(): - profiler.setup('mistral-event-engine', cfg.CONF.event_engine.host) - - event_eng = event_engine.EventEngine(rpc.get_engine_client()) - endpoint = rpc.EventEngineServer(event_eng) - - event_engine_server = rpc.get_rpc_server_driver()( - rpc_utils.get_rpc_info_from_oslo(CONF.event_engine) - ) - event_engine_server.register_endpoint(endpoint) - - event_eng.register_membership() - try: - event_engine_server.run() - except (KeyboardInterrupt, SystemExit): - pass - finally: - print("Stopping event_engine service...") + launcher = service.ServiceLauncher(CONF) + + launcher.launch_service(event_engine_server.get_oslo_service()) + + launcher.wait() + except RuntimeError as e: + sys.stderr.write("ERROR: %s\n" % e) + sys.exit(1) def launch_api(): - launcher = mistral_service.process_launcher() - server = mistral_service.WSGIService('mistral_api') + launcher = service.ProcessLauncher(cfg.CONF) + + server = api_service.WSGIService('mistral_api') + launcher.launch_service(server, workers=server.workers) + launcher.wait() @@ -144,8 +104,6 @@ def launch_any(options): threads = [eventlet.spawn(LAUNCH_OPTIONS[option]) for option in options] - print('Server started.') - [thread.wait() for thread in threads] diff --git a/mistral/db/v2/sqlalchemy/api.py b/mistral/db/v2/sqlalchemy/api.py index 7f14584f..f2fcd61b 100644 --- a/mistral/db/v2/sqlalchemy/api.py +++ b/mistral/db/v2/sqlalchemy/api.py @@ -16,6 +16,7 @@ import contextlib import sys +import threading from oslo_config import cfg from oslo_db import exception as db_exc @@ -42,6 +43,10 @@ CONF = cfg.CONF LOG = logging.getLogger(__name__) +_SCHEMA_LOCK = threading.RLock() +_initialized = False + + def get_backend(): """Consumed by openstack common code. @@ -52,20 +57,33 @@ def get_backend(): def setup_db(): - try: - models.Workbook.metadata.create_all(b.get_engine()) - except sa.exc.OperationalError as e: - raise exc.DBError("Failed to setup database: %s" % e) + global _initialized + + with _SCHEMA_LOCK: + if _initialized: + return + + try: + models.Workbook.metadata.create_all(b.get_engine()) + + _initialized = True + except sa.exc.OperationalError as e: + raise exc.DBError("Failed to setup database: %s" % e) def drop_db(): - global _facade + global _initialized - try: - models.Workbook.metadata.drop_all(b.get_engine()) - _facade = None - except Exception as e: - raise exc.DBError("Failed to drop database: %s" % e) + with _SCHEMA_LOCK: + if not _initialized: + return + + try: + models.Workbook.metadata.drop_all(b.get_engine()) + + _initialized = False + except Exception as e: + raise exc.DBError("Failed to drop database: %s" % e) # Transaction management. diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py index b1600b5d..8a56dfb5 100644 --- a/mistral/engine/default_engine.py +++ b/mistral/engine/default_engine.py @@ -17,7 +17,6 @@ from oslo_log import log as logging from osprofiler import profiler -from mistral import coordination from mistral.db.v2 import api as db_api from mistral.db.v2.sqlalchemy import models as db_models from mistral.engine import action_handler @@ -36,17 +35,13 @@ LOG = logging.getLogger(__name__) # the submodules are referenced. -class DefaultEngine(base.Engine, coordination.Service): - def __init__(self, engine_client): - self._engine_client = engine_client - - coordination.Service.__init__(self, 'engine_group') - +class DefaultEngine(base.Engine): @action_queue.process @u.log_exec(LOG) @profiler.trace('engine-start-workflow') def start_workflow(self, wf_identifier, wf_input, description='', **params): + with db_api.transaction(): wf_ex = wf_handler.start_workflow( wf_identifier, diff --git a/mistral/engine/default_executor.py b/mistral/engine/default_executor.py index 210c965a..55c76cf3 100644 --- a/mistral/engine/default_executor.py +++ b/mistral/engine/default_executor.py @@ -17,8 +17,8 @@ from oslo_log import log as logging from osprofiler import profiler from mistral.actions import action_factory as a_f -from mistral import coordination from mistral.engine import base +from mistral.engine.rpc_backend import rpc from mistral.utils import inspect_utils as i_u from mistral.workflow import utils as wf_utils @@ -26,11 +26,9 @@ from mistral.workflow import utils as wf_utils LOG = logging.getLogger(__name__) -class DefaultExecutor(base.Executor, coordination.Service): - def __init__(self, engine_client): - self._engine_client = engine_client - - coordination.Service.__init__(self, 'executor_group') +class DefaultExecutor(base.Executor): + def __init__(self): + self._engine_client = rpc.get_engine_client() @profiler.trace('executor-run-action') def run_action(self, action_ex_id, action_class_str, attributes, diff --git a/mistral/engine/engine_server.py b/mistral/engine/engine_server.py new file mode 100644 index 00000000..7c3d5cdf --- /dev/null +++ b/mistral/engine/engine_server.py @@ -0,0 +1,250 @@ +# Copyright 2016 - Nokia Networks +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as logging + +from mistral import config as cfg +from mistral.db.v2 import api as db_api +from mistral.engine import default_engine +from mistral.engine.rpc_backend import rpc +from mistral.service import base as service_base +from mistral.services import expiration_policy +from mistral.services import scheduler +from mistral.utils import profiler as profiler_utils +from mistral.utils import rpc_utils +from mistral.workflow import utils as wf_utils + +LOG = logging.getLogger(__name__) + + +class EngineServer(service_base.MistralService): + """Engine server. + + This class manages engine life-cycle and gets registered as an RPC + endpoint to process engine specific calls. It also registers a + cluster member associated with this instance of engine. + """ + + def __init__(self, engine, setup_profiler=True): + super(EngineServer, self).__init__('engine_group', setup_profiler) + + self.engine = engine + self._rpc_server = None + self._scheduler = None + self._expiration_policy_tg = None + + def start(self): + super(EngineServer, self).start() + + db_api.setup_db() + + self._scheduler = scheduler.start() + self._expiration_policy_tg = expiration_policy.setup() + + if self._setup_profiler: + profiler_utils.setup('mistral-engine', cfg.CONF.engine.host) + + # Initialize and start RPC server. + + self._rpc_server = rpc.get_rpc_server_driver()( + rpc_utils.get_rpc_info_from_oslo(cfg.CONF.engine) + ) + self._rpc_server.register_endpoint(self) + + # Note(ddeja): Engine needs to be run in default (blocking) mode + # since using another mode may leads to deadlock. + # See https://review.openstack.org/#/c/356343 for more info. + self._rpc_server.run(executor='blocking') + + self._notify_started('Engine server started.') + + def stop(self, graceful=False): + super(EngineServer, self).stop(graceful) + + if self._scheduler: + scheduler.stop_scheduler(self._scheduler, graceful) + + if self._expiration_policy_tg: + self._expiration_policy_tg.stop(graceful) + + if self._rpc_server: + self._rpc_server.stop(graceful) + + def start_workflow(self, rpc_ctx, workflow_identifier, workflow_input, + description, params): + """Receives calls over RPC to start workflows on engine. + + :param rpc_ctx: RPC request context. + :param workflow_identifier: Workflow definition identifier. + :param workflow_input: Workflow input. + :param description: Workflow execution description. + :param params: Additional workflow type specific parameters. + :return: Workflow execution. + """ + + LOG.info( + "Received RPC request 'start_workflow'[rpc_ctx=%s," + " workflow_identifier=%s, workflow_input=%s, description=%s, " + "params=%s]" + % (rpc_ctx, workflow_identifier, workflow_input, description, + params) + ) + + return self.engine.start_workflow( + workflow_identifier, + workflow_input, + description, + **params + ) + + def start_action(self, rpc_ctx, action_name, + action_input, description, params): + """Receives calls over RPC to start actions on engine. + + :param rpc_ctx: RPC request context. + :param action_name: name of the Action. + :param action_input: input dictionary for Action. + :param description: description of new Action execution. + :param params: extra parameters to run Action. + :return: Action execution. + """ + LOG.info( + "Received RPC request 'start_action'[rpc_ctx=%s," + " name=%s, input=%s, description=%s, params=%s]" + % (rpc_ctx, action_name, action_input, description, params) + ) + + return self.engine.start_action( + action_name, + action_input, + description, + **params + ) + + def on_task_state_change(self, rpc_ctx, task_ex_id, state, + state_info=None): + return self.engine.on_task_state_change(task_ex_id, state, state_info) + + def on_action_complete(self, rpc_ctx, action_ex_id, result_data, + result_error, wf_action): + """Receives RPC calls to communicate action result to engine. + + :param rpc_ctx: RPC request context. + :param action_ex_id: Action execution id. + :param result_data: Action result data. + :param result_error: Action result error. + :param wf_action: True if given id points to a workflow execution. + :return: Action execution. + """ + + result = wf_utils.Result(result_data, result_error) + + LOG.info( + "Received RPC request 'on_action_complete'[rpc_ctx=%s," + " action_ex_id=%s, result=%s]" % (rpc_ctx, action_ex_id, result) + ) + + return self.engine.on_action_complete(action_ex_id, result, wf_action) + + def pause_workflow(self, rpc_ctx, execution_id): + """Receives calls over RPC to pause workflows on engine. + + :param rpc_ctx: Request context. + :param execution_id: Workflow execution id. + :return: Workflow execution. + """ + + LOG.info( + "Received RPC request 'pause_workflow'[rpc_ctx=%s," + " execution_id=%s]" % (rpc_ctx, execution_id) + ) + + return self.engine.pause_workflow(execution_id) + + def rerun_workflow(self, rpc_ctx, task_ex_id, reset=True, env=None): + """Receives calls over RPC to rerun workflows on engine. + + :param rpc_ctx: RPC request context. + :param task_ex_id: Task execution id. + :param reset: If true, then purge action execution for the task. + :param env: Environment variables to update. + :return: Workflow execution. + """ + + LOG.info( + "Received RPC request 'rerun_workflow'[rpc_ctx=%s, " + "task_ex_id=%s]" % (rpc_ctx, task_ex_id) + ) + + return self.engine.rerun_workflow(task_ex_id, reset, env) + + def resume_workflow(self, rpc_ctx, wf_ex_id, env=None): + """Receives calls over RPC to resume workflows on engine. + + :param rpc_ctx: RPC request context. + :param wf_ex_id: Workflow execution id. + :param env: Environment variables to update. + :return: Workflow execution. + """ + + LOG.info( + "Received RPC request 'resume_workflow'[rpc_ctx=%s, " + "wf_ex_id=%s]" % (rpc_ctx, wf_ex_id) + ) + + return self.engine.resume_workflow(wf_ex_id, env) + + def stop_workflow(self, rpc_ctx, execution_id, state, message=None): + """Receives calls over RPC to stop workflows on engine. + + Sets execution state to SUCCESS or ERROR. No more tasks will be + scheduled. Running tasks won't be killed, but their results + will be ignored. + + :param rpc_ctx: RPC request context. + :param execution_id: Workflow execution id. + :param state: State assigned to the workflow. Permitted states are + SUCCESS or ERROR. + :param message: Optional information string. + + :return: Workflow execution. + """ + + LOG.info( + "Received RPC request 'stop_workflow'[rpc_ctx=%s, execution_id=%s," + " state=%s, message=%s]" % (rpc_ctx, execution_id, state, message) + ) + + return self.engine.stop_workflow(execution_id, state, message) + + def rollback_workflow(self, rpc_ctx, execution_id): + """Receives calls over RPC to rollback workflows on engine. + + :param rpc_ctx: RPC request context. + :return: Workflow execution. + """ + + LOG.info( + "Received RPC request 'rollback_workflow'[rpc_ctx=%s," + " execution_id=%s]" % (rpc_ctx, execution_id) + ) + + return self.engine.rollback_workflow(execution_id) + + +def get_oslo_service(setup_profiler=True): + return EngineServer( + default_engine.DefaultEngine(), + setup_profiler=setup_profiler + ) diff --git a/mistral/engine/executor_server.py b/mistral/engine/executor_server.py new file mode 100644 index 00000000..b408ab77 --- /dev/null +++ b/mistral/engine/executor_server.py @@ -0,0 +1,99 @@ +# Copyright 2016 - Nokia Networks +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as logging + +from mistral import config as cfg +from mistral.engine import default_executor +from mistral.engine.rpc_backend import rpc +from mistral.service import base as service_base +from mistral.utils import profiler as profiler_utils +from mistral.utils import rpc_utils + +LOG = logging.getLogger(__name__) + + +class ExecutorServer(service_base.MistralService): + """Executor server. + + This class manages executor life-cycle and gets registered as an RPC + endpoint to process executor specific calls. It also registers a + cluster member associated with this instance of executor. + """ + + def __init__(self, executor, setup_profiler=True): + super(ExecutorServer, self).__init__('executor_group', setup_profiler) + + self.executor = executor + self._rpc_server = None + + def start(self): + super(ExecutorServer, self).start() + + if self._setup_profiler: + profiler_utils.setup('mistral-executor', cfg.CONF.executor.host) + + # Initialize and start RPC server. + + self._rpc_server = rpc.get_rpc_server_driver()( + rpc_utils.get_rpc_info_from_oslo(cfg.CONF.executor) + ) + self._rpc_server.register_endpoint(self) + + self._rpc_server.run(executor='threading') + + self._notify_started('Executor server started.') + + def stop(self, graceful=False): + super(ExecutorServer, self).stop(graceful) + + if self._rpc_server: + self._rpc_server.stop(graceful) + + def run_action(self, rpc_ctx, action_ex_id, action_class_str, + attributes, params, safe_rerun): + """Receives calls over RPC to run action on executor. + + :param rpc_ctx: RPC request context dictionary. + :param action_ex_id: Action execution id. + :param action_class_str: Action class name. + :param attributes: Action class attributes. + :param params: Action input parameters. + :param safe_rerun: Tells if given action can be safely rerun. + :return: Action result. + """ + + LOG.info( + "Received RPC request 'run_action'[rpc_ctx=%s," + " action_ex_id=%s, action_class=%s, attributes=%s, params=%s]" + % (rpc_ctx, action_ex_id, action_class_str, attributes, params) + ) + + redelivered = rpc_ctx.redelivered or False + + return self.executor.run_action( + action_ex_id, + action_class_str, + attributes, + params, + safe_rerun, + redelivered + ) + + +def get_oslo_service(setup_profiler=True): + return ExecutorServer( + default_executor.DefaultExecutor(), + setup_profiler=setup_profiler + ) diff --git a/mistral/engine/rpc_backend/base.py b/mistral/engine/rpc_backend/base.py index a065e967..e5df5ca0 100644 --- a/mistral/engine/rpc_backend/base.py +++ b/mistral/engine/rpc_backend/base.py @@ -73,5 +73,22 @@ class RPCServer(object): def run(self, executor='blocking'): """Runs the RPC server. + :param executor: Executor used to process incoming requests. Different + implementations may support different options. """ raise NotImplementedError + + def stop(self, graceful=False): + """Stop the RPC server. + + :param graceful: True if this method call should wait till all + internal threads are finished. + :return: + """ + # No-op by default. + pass + + def wait(self): + """Wait till all internal threads are finished.""" + # No-op by default. + pass diff --git a/mistral/engine/rpc_backend/kombu/kombu_server.py b/mistral/engine/rpc_backend/kombu/kombu_server.py index 4e43f686..90b364a2 100644 --- a/mistral/engine/rpc_backend/kombu/kombu_server.py +++ b/mistral/engine/rpc_backend/kombu/kombu_server.py @@ -47,6 +47,7 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base): self.channel = None self.conn = None self._running = threading.Event() + self._stopped = threading.Event() self.endpoints = [] @property @@ -63,6 +64,7 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base): self.password, self.virtual_host, ) + LOG.info("Connected to AMQP at %s:%s" % (self.host, self.port)) try: @@ -84,7 +86,10 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base): callbacks=[self._on_message_safe], ) as consumer: consumer.qos(prefetch_count=1) + self._running.set() + self._stopped.clear() + while self.is_running: try: conn.drain_events(timeout=1) @@ -92,16 +97,25 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base): pass except KeyboardInterrupt: self.stop() + LOG.info("Server with id='{0}' stopped.".format( self.server_id)) + return except socket.error as e: raise exc.MistralException("Broker connection failed: %s" % e) + finally: + self._stopped.set() - def stop(self): - """Stop the server.""" + def stop(self, graceful=False): self._running.clear() + if graceful: + self.wait() + + def wait(self): + self._stopped.wait() + def _get_rpc_method(self, method_name): for endpoint in self.endpoints: if hasattr(endpoint, method_name): diff --git a/mistral/engine/rpc_backend/oslo/oslo_server.py b/mistral/engine/rpc_backend/oslo/oslo_server.py index 70ee46cf..fec1f57d 100644 --- a/mistral/engine/rpc_backend/oslo/oslo_server.py +++ b/mistral/engine/rpc_backend/oslo/oslo_server.py @@ -34,6 +34,7 @@ class OsloRPCServer(rpc_base.RPCServer): self.channel = None self.connection = None self.endpoints = [] + self.oslo_server = None def register_endpoint(self, endpoint): self.endpoints.append(endpoint) @@ -44,7 +45,9 @@ class OsloRPCServer(rpc_base.RPCServer): server=self.server_id ) - server = messaging.get_rpc_server( + # TODO(rakhmerov): rpc.get_transport() should be in oslo.messaging + # related module. + self.oslo_server = messaging.get_rpc_server( rpc.get_transport(), target, self.endpoints, @@ -52,5 +55,13 @@ class OsloRPCServer(rpc_base.RPCServer): serializer=ctx.RpcContextSerializer(ctx.JsonPayloadSerializer()) ) - server.start() - server.wait() + self.oslo_server.start() + + def stop(self, graceful=False): + self.oslo_server.stop() + + if graceful: + self.oslo_server.wait() + + def wait(self): + self.oslo_server.wait() diff --git a/mistral/engine/rpc_backend/rpc.py b/mistral/engine/rpc_backend/rpc.py index 64dd09fb..4816ee17 100644 --- a/mistral/engine/rpc_backend/rpc.py +++ b/mistral/engine/rpc_backend/rpc.py @@ -52,6 +52,9 @@ def cleanup(): _EVENT_ENGINE_CLIENT = None +# TODO(rakhmerov): This method seems misplaced. Now we have different kind +# of transports (oslo, kombu) and this module should not have any oslo +# specific things anymore. def get_transport(): global _TRANSPORT @@ -120,174 +123,6 @@ def get_rpc_client_driver(): return _IMPL_CLIENT -class EngineServer(object): - """RPC Engine server.""" - - def __init__(self, engine): - self._engine = engine - - def start_workflow(self, rpc_ctx, workflow_identifier, workflow_input, - description, params): - """Receives calls over RPC to start workflows on engine. - - :param rpc_ctx: RPC request context. - :param workflow_identifier: Workflow definition identifier. - :param workflow_input: Workflow input. - :param description: Workflow execution description. - :param params: Additional workflow type specific parameters. - :return: Workflow execution. - """ - - LOG.info( - "Received RPC request 'start_workflow'[rpc_ctx=%s," - " workflow_identifier=%s, workflow_input=%s, description=%s, " - "params=%s]" - % (rpc_ctx, workflow_identifier, workflow_input, description, - params) - ) - - return self._engine.start_workflow( - workflow_identifier, - workflow_input, - description, - **params - ) - - def start_action(self, rpc_ctx, action_name, - action_input, description, params): - """Receives calls over RPC to start actions on engine. - - :param rpc_ctx: RPC request context. - :param action_name: name of the Action. - :param action_input: input dictionary for Action. - :param description: description of new Action execution. - :param params: extra parameters to run Action. - :return: Action execution. - """ - LOG.info( - "Received RPC request 'start_action'[rpc_ctx=%s," - " name=%s, input=%s, description=%s, params=%s]" - % (rpc_ctx, action_name, action_input, description, params) - ) - - return self._engine.start_action( - action_name, - action_input, - description, - **params - ) - - def on_task_state_change(self, rpc_ctx, task_ex_id, state, - state_info=None): - return self._engine.on_task_state_change(task_ex_id, state, state_info) - - def on_action_complete(self, rpc_ctx, action_ex_id, result_data, - result_error, wf_action): - """Receives RPC calls to communicate action result to engine. - - :param rpc_ctx: RPC request context. - :param action_ex_id: Action execution id. - :param result_data: Action result data. - :param result_error: Action result error. - :param wf_action: True if given id points to a workflow execution. - :return: Action execution. - """ - - result = wf_utils.Result(result_data, result_error) - - LOG.info( - "Received RPC request 'on_action_complete'[rpc_ctx=%s," - " action_ex_id=%s, result=%s]" % (rpc_ctx, action_ex_id, result) - ) - - return self._engine.on_action_complete(action_ex_id, result, wf_action) - - def pause_workflow(self, rpc_ctx, execution_id): - """Receives calls over RPC to pause workflows on engine. - - :param rpc_ctx: Request context. - :param execution_id: Workflow execution id. - :return: Workflow execution. - """ - - LOG.info( - "Received RPC request 'pause_workflow'[rpc_ctx=%s," - " execution_id=%s]" % (rpc_ctx, execution_id) - ) - - return self._engine.pause_workflow(execution_id) - - def rerun_workflow(self, rpc_ctx, task_ex_id, reset=True, env=None): - """Receives calls over RPC to rerun workflows on engine. - - :param rpc_ctx: RPC request context. - :param task_ex_id: Task execution id. - :param reset: If true, then purge action execution for the task. - :param env: Environment variables to update. - :return: Workflow execution. - """ - - LOG.info( - "Received RPC request 'rerun_workflow'[rpc_ctx=%s, " - "task_ex_id=%s]" % (rpc_ctx, task_ex_id) - ) - - return self._engine.rerun_workflow(task_ex_id, reset, env) - - def resume_workflow(self, rpc_ctx, wf_ex_id, env=None): - """Receives calls over RPC to resume workflows on engine. - - :param rpc_ctx: RPC request context. - :param wf_ex_id: Workflow execution id. - :param env: Environment variables to update. - :return: Workflow execution. - """ - - LOG.info( - "Received RPC request 'resume_workflow'[rpc_ctx=%s, " - "wf_ex_id=%s]" % (rpc_ctx, wf_ex_id) - ) - - return self._engine.resume_workflow(wf_ex_id, env) - - def stop_workflow(self, rpc_ctx, execution_id, state, message=None): - """Receives calls over RPC to stop workflows on engine. - - Sets execution state to SUCCESS or ERROR. No more tasks will be - scheduled. Running tasks won't be killed, but their results - will be ignored. - - :param rpc_ctx: RPC request context. - :param execution_id: Workflow execution id. - :param state: State assigned to the workflow. Permitted states are - SUCCESS or ERROR. - :param message: Optional information string. - - :return: Workflow execution. - """ - - LOG.info( - "Received RPC request 'stop_workflow'[rpc_ctx=%s, execution_id=%s," - " state=%s, message=%s]" % (rpc_ctx, execution_id, state, message) - ) - - return self._engine.stop_workflow(execution_id, state, message) - - def rollback_workflow(self, rpc_ctx, execution_id): - """Receives calls over RPC to rollback workflows on engine. - - :param rpc_ctx: RPC request context. - :return: Workflow execution. - """ - - LOG.info( - "Received RPC request 'rollback_workflow'[rpc_ctx=%s," - " execution_id=%s]" % (rpc_ctx, execution_id) - ) - - return self._engine.resume_workflow(execution_id) - - def _wrap_exception_and_reraise(exception): message = "%s: %s" % (exception.__class__.__name__, exception.args[0]) @@ -498,43 +333,6 @@ class EngineClient(base.Engine): ) -class ExecutorServer(object): - """RPC Executor server.""" - - def __init__(self, executor): - self._executor = executor - - def run_action(self, rpc_ctx, action_ex_id, action_class_str, - attributes, params, safe_rerun): - """Receives calls over RPC to run action on executor. - - :param rpc_ctx: RPC request context dictionary. - :param action_ex_id: Action execution id. - :param action_class_str: Action class name. - :param attributes: Action class attributes. - :param params: Action input parameters. - :param safe_rerun: Tells if given action can be safely rerun. - :return: Action result. - """ - - LOG.info( - "Received RPC request 'run_action'[rpc_ctx=%s," - " action_ex_id=%s, action_class=%s, attributes=%s, params=%s]" - % (rpc_ctx, action_ex_id, action_class_str, attributes, params) - ) - - redelivered = rpc_ctx.redelivered or False - - return self._executor.run_action( - action_ex_id, - action_class_str, - attributes, - params, - safe_rerun, - redelivered - ) - - class ExecutorClient(base.Executor): """RPC Executor client.""" @@ -588,37 +386,6 @@ class ExecutorClient(base.Executor): ) -class EventEngineServer(object): - """RPC EventEngine server.""" - - def __init__(self, event_engine): - self._event_engine = event_engine - - def create_event_trigger(self, rpc_ctx, trigger, events): - LOG.info( - "Received RPC request 'create_event_trigger'[rpc_ctx=%s," - " trigger=%s, events=%s", rpc_ctx, trigger, events - ) - - return self._event_engine.create_event_trigger(trigger, events) - - def delete_event_trigger(self, rpc_ctx, trigger, events): - LOG.info( - "Received RPC request 'delete_event_trigger'[rpc_ctx=%s," - " trigger=%s, events=%s", rpc_ctx, trigger, events - ) - - return self._event_engine.delete_event_trigger(trigger, events) - - def update_event_trigger(self, rpc_ctx, trigger): - LOG.info( - "Received RPC request 'update_event_trigger'[rpc_ctx=%s," - " trigger=%s", rpc_ctx, trigger - ) - - return self._event_engine.update_event_trigger(trigger) - - class EventEngineClient(base.EventEngine): """RPC EventEngine client.""" diff --git a/mistral/event_engine/__init__.py b/mistral/event_engine/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/mistral/services/event_engine.py b/mistral/event_engine/event_engine.py similarity index 97% rename from mistral/services/event_engine.py rename to mistral/event_engine/event_engine.py index aef425c4..c34a0696 100644 --- a/mistral/services/event_engine.py +++ b/mistral/event_engine/event_engine.py @@ -24,16 +24,18 @@ import six import yaml from mistral import context as auth_ctx -from mistral import coordination from mistral.db.v2 import api as db_api +from mistral.engine.rpc_backend import rpc from mistral import exceptions from mistral import expressions from mistral import messaging as mistral_messaging from mistral.services import security + LOG = logging.getLogger(__name__) CONF = cfg.CONF + DEFAULT_PROPERTIES = { 'service': '<% $.publisher %>', 'project_id': '<% $.context.project_id %>', @@ -124,16 +126,14 @@ class NotificationsConverter(object): return edef.convert(event) -class EventEngine(coordination.Service): +class EventEngine(object): """Event engine server. A separate service that is responsible for listening event notification - and trigger workflows defined by end user. + and triggering workflows defined by end user. """ - def __init__(self, engine_client): - coordination.Service.__init__(self, 'event_engine_group') - - self.engine_client = engine_client + def __init__(self): + self.engine_client = rpc.get_engine_client() self.event_queue = six.moves.queue.Queue() self.handler_tg = threadgroup.ThreadGroup() diff --git a/mistral/event_engine/event_engine_server.py b/mistral/event_engine/event_engine_server.py new file mode 100644 index 00000000..8e34c7c6 --- /dev/null +++ b/mistral/event_engine/event_engine_server.py @@ -0,0 +1,94 @@ +# Copyright 2016 - Nokia Networks +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as logging + +from mistral import config as cfg +from mistral.engine.rpc_backend import rpc +from mistral.event_engine import event_engine +from mistral.service import base as service_base +from mistral.utils import profiler as profiler_utils +from mistral.utils import rpc_utils + + +LOG = logging.getLogger(__name__) + + +class EventEngineServer(service_base.MistralService): + """RPC EventEngine server. + + This class manages event engine life-cycle and gets registered as + an RPC endpoint to process event engine specific calls. It also + registers a cluster member associated with this instance of event + engine. + """ + + def __init__(self, event_engine): + super(EventEngineServer, self).__init__('event_engine_group') + + self._event_engine = event_engine + self._rpc_server = None + + def start(self): + super(EventEngineServer, self).start() + + profiler_utils.setup( + 'mistral-event-engine', + cfg.CONF.event_engine.host + ) + + # Initialize and start RPC server. + + self._rpc_server = rpc.get_rpc_server_driver()( + rpc_utils.get_rpc_info_from_oslo(cfg.CONF.event_engine) + ) + self._rpc_server.register_endpoint(self) + + self._rpc_server.run() + + self._notify_started('Event engine server started.') + + def stop(self, graceful=False): + super(EventEngineServer, self).stop(graceful) + + if self._rpc_server: + self._rpc_server.stop(graceful) + + def create_event_trigger(self, rpc_ctx, trigger, events): + LOG.info( + "Received RPC request 'create_event_trigger'[rpc_ctx=%s," + " trigger=%s, events=%s", rpc_ctx, trigger, events + ) + + return self._event_engine.create_event_trigger(trigger, events) + + def delete_event_trigger(self, rpc_ctx, trigger, events): + LOG.info( + "Received RPC request 'delete_event_trigger'[rpc_ctx=%s," + " trigger=%s, events=%s", rpc_ctx, trigger, events + ) + + return self._event_engine.delete_event_trigger(trigger, events) + + def update_event_trigger(self, rpc_ctx, trigger): + LOG.info( + "Received RPC request 'update_event_trigger'[rpc_ctx=%s," + " trigger=%s", rpc_ctx, trigger + ) + + return self._event_engine.update_event_trigger(trigger) + + +def get_oslo_service(): + return EventEngineServer(event_engine.EventEngine()) diff --git a/mistral/service/__init__.py b/mistral/service/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/mistral/service/base.py b/mistral/service/base.py new file mode 100644 index 00000000..c1dd203c --- /dev/null +++ b/mistral/service/base.py @@ -0,0 +1,63 @@ +# Copyright 2016 - Nokia Networks +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from eventlet import event +from oslo_log import log +from oslo_service import service + +from mistral.service import coordination + + +LOG = log.getLogger(__name__) + + +class MistralService(service.Service): + """Base class for Mistral services. + + The term 'service' here means any Mistral component that can run as + an independent process and thus can be registered as a cluster member. + """ + def __init__(self, cluster_group, setup_profiler=True): + super(MistralService, self).__init__() + + self.cluster_member = coordination.Service(cluster_group) + self._setup_profiler = setup_profiler + self._started = event.Event() + + def wait_started(self): + """Wait until the service is fully started.""" + self._started.wait() + + def _notify_started(self, message): + print(message) + + self._started.send() + + def start(self): + super(MistralService, self).start() + + self.cluster_member.register_membership() + + def stop(self, graceful=False): + super(MistralService, self).stop(graceful) + + self._started = event.Event() + + # TODO(rakhmerov): Probably we could also take care of an RPC server + # if it exists for this particular service type. Take a look at + # executor and engine servers. + + # TODO(rakhmerov): This method is not implemented correctly now + # (not thread-safe). Uncomment this call once it's fixed. + # self.cluster_member.stop() diff --git a/mistral/coordination.py b/mistral/service/coordination.py similarity index 100% rename from mistral/coordination.py rename to mistral/service/coordination.py diff --git a/mistral/services/expiration_policy.py b/mistral/services/expiration_policy.py index 27d2e8f9..98b4e4aa 100644 --- a/mistral/services/expiration_policy.py +++ b/mistral/services/expiration_policy.py @@ -110,3 +110,5 @@ def setup(): periodic_interval_max=1, context=ctx ) + + return tg diff --git a/mistral/services/scheduler.py b/mistral/services/scheduler.py index efa7337f..1041a557 100644 --- a/mistral/services/scheduler.py +++ b/mistral/services/scheduler.py @@ -200,21 +200,33 @@ class CallScheduler(periodic_task.PeriodicTasks): ) -def setup(): +def start(): tg = threadgroup.ThreadGroup() - scheduler = CallScheduler(CONF) + sched = CallScheduler(CONF) tg.add_dynamic_timer( - scheduler.run_periodic_tasks, + sched.run_periodic_tasks, initial_delay=None, periodic_interval_max=1, context=None ) - _schedulers[scheduler] = tg + _schedulers[sched] = tg - return tg + return sched + + +def stop_scheduler(sched, graceful=False): + if sched: + tg = _schedulers[sched] + + tg.stop() + + del _schedulers[sched] + + if graceful: + tg.wait() def stop_all_schedulers(): diff --git a/mistral/tests/unit/api/v2/test_services.py b/mistral/tests/unit/api/v2/test_services.py index 1d7010fe..a86c5a00 100644 --- a/mistral/tests/unit/api/v2/test_services.py +++ b/mistral/tests/unit/api/v2/test_services.py @@ -17,7 +17,7 @@ from oslo_config import cfg import tooz.coordination from webtest import app as webtest_app -from mistral import coordination +from mistral.service import coordination from mistral.tests.unit.api import base @@ -54,7 +54,7 @@ class TestServicesController(base.APITest): self.assertIn('Service API is not supported', context.args[0]) - @mock.patch('mistral.coordination.ServiceCoordinator.get_members', + @mock.patch('mistral.service.coordination.ServiceCoordinator.get_members', side_effect=tooz.coordination.ToozError('error message')) def test_get_all_with_get_members_error(self, mock_get_members): cfg.CONF.set_default('backend_url', 'zake://', 'coordination') diff --git a/mistral/tests/unit/engine/base.py b/mistral/tests/unit/engine/base.py index f92af668..0c2ef249 100644 --- a/mistral/tests/unit/engine/base.py +++ b/mistral/tests/unit/engine/base.py @@ -18,13 +18,12 @@ import eventlet from oslo_config import cfg from oslo_log import log as logging import oslo_messaging as messaging +from oslo_service import service -from mistral import context as ctx from mistral.db.v2 import api as db_api -from mistral.engine import default_engine as def_eng -from mistral.engine import default_executor as def_exec +from mistral.engine import engine_server +from mistral.engine import executor_server from mistral.engine.rpc_backend import rpc -from mistral.services import scheduler from mistral.tests.unit import base from mistral.workflow import states @@ -36,54 +35,12 @@ DEFAULT_DELAY = 1 DEFAULT_TIMEOUT = 30 -def launch_engine_server(transport, engine): - target = messaging.Target( - topic=cfg.CONF.engine.topic, - server=cfg.CONF.engine.host - ) +def launch_service(s): + launcher = service.ServiceLauncher(cfg.CONF) - server = messaging.get_rpc_server( - transport, - target, - [rpc.EngineServer(engine)], - executor='blocking', - serializer=ctx.RpcContextSerializer(ctx.JsonPayloadSerializer()) - ) + launcher.launch_service(s) - try: - server.start() - while True: - eventlet.sleep(604800) - except (KeyboardInterrupt, SystemExit): - LOG.info("Stopping engine service...") - finally: - server.stop() - server.wait() - - -def launch_executor_server(transport, executor): - target = messaging.Target( - topic=cfg.CONF.executor.topic, - server=cfg.CONF.executor.host - ) - - server = messaging.get_rpc_server( - transport, - target, - [rpc.ExecutorServer(executor)], - executor='blocking', - serializer=ctx.RpcContextSerializer(ctx.JsonPayloadSerializer()) - ) - - try: - server.start() - while True: - eventlet.sleep(604800) - except (KeyboardInterrupt, SystemExit): - LOG.info("Stopping executor service...") - finally: - server.stop() - server.wait() + launcher.wait() class EngineTestCase(base.DbTestCase): @@ -100,28 +57,35 @@ class EngineTestCase(base.DbTestCase): # Drop all RPC objects (transport, clients). rpc.cleanup() - transport = rpc.get_transport() self.engine_client = rpc.get_engine_client() self.executor_client = rpc.get_executor_client() - self.engine = def_eng.DefaultEngine(self.engine_client) - self.executor = def_exec.DefaultExecutor(self.engine_client) - LOG.info("Starting engine and executor threads...") + engine_service = engine_server.get_oslo_service(setup_profiler=False) + executor_service = executor_server.get_oslo_service( + setup_profiler=False + ) + + self.engine = engine_service.engine + self.executor = executor_service.executor + self.threads = [ - eventlet.spawn(launch_engine_server, transport, self.engine), - eventlet.spawn(launch_executor_server, transport, self.executor), + eventlet.spawn(launch_service, executor_service), + eventlet.spawn(launch_service, engine_service) ] self.addOnException(self.print_executions) - # Start scheduler. - scheduler_thread_group = scheduler.setup() - + self.addCleanup(executor_service.stop, True) + self.addCleanup(engine_service.stop, True) self.addCleanup(self.kill_threads) - self.addCleanup(scheduler_thread_group.stop) + + # Make sure that both services fully started, otherwise + # the test may run too early. + executor_service.wait_started() + engine_service.wait_started() def kill_threads(self): LOG.info("Finishing engine and executor threads...") diff --git a/mistral/tests/unit/engine/test_default_engine.py b/mistral/tests/unit/engine/test_default_engine.py index d397b2b0..f734b13f 100644 --- a/mistral/tests/unit/engine/test_default_engine.py +++ b/mistral/tests/unit/engine/test_default_engine.py @@ -23,6 +23,7 @@ from oslo_utils import uuidutils from mistral.db.v2 import api as db_api from mistral.db.v2.sqlalchemy import models from mistral.engine import default_engine as d_eng +from mistral.engine.rpc_backend import rpc from mistral import exceptions as exc from mistral.services import workbooks as wb_service from mistral.tests.unit import base @@ -92,6 +93,7 @@ MOCK_ENVIRONMENT = mock.MagicMock(return_value=ENVIRONMENT_DB) MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError()) +@mock.patch.object(rpc, 'get_executor_client', mock.Mock()) class DefaultEngineTest(base.DbTestCase): def setUp(self): super(DefaultEngineTest, self).setUp() @@ -100,7 +102,7 @@ class DefaultEngineTest(base.DbTestCase): # Note: For purposes of this test we can easily use # simple magic mocks for engine and executor clients - self.engine = d_eng.DefaultEngine(mock.MagicMock()) + self.engine = d_eng.DefaultEngine() def test_start_workflow(self): wf_input = {'param1': 'Hey', 'param2': 'Hi'} diff --git a/mistral/tests/unit/engine/test_environment.py b/mistral/tests/unit/engine/test_environment.py index fd7ed2ca..b2fee4df 100644 --- a/mistral/tests/unit/engine/test_environment.py +++ b/mistral/tests/unit/engine/test_environment.py @@ -80,7 +80,7 @@ workflows: def _run_at_target(action_ex_id, action_class_str, attributes, action_params, target=None, async=True, safe_rerun=False): # We'll just call executor directly for testing purposes. - executor = default_executor.DefaultExecutor(rpc.get_engine_client()) + executor = default_executor.DefaultExecutor() executor.run_action( action_ex_id, diff --git a/mistral/tests/unit/engine/test_join.py b/mistral/tests/unit/engine/test_join.py index 1aa749b4..a5859a6f 100644 --- a/mistral/tests/unit/engine/test_join.py +++ b/mistral/tests/unit/engine/test_join.py @@ -221,13 +221,18 @@ class JoinEngineTest(base.EngineTestCase): task4 = self._assert_single_item(tasks, name='task4') # NOTE(xylan): We ensure task4 is successful here because of the - # uncertainty of its running parallelly with task3. + # uncertainty of its running in parallel with task3. self.await_task_success(task4.id) self.assertEqual(states.RUNNING, wf_ex.state) self.assertEqual(states.SUCCESS, task1.state) self.assertEqual(states.SUCCESS, task2.state) - self.assertEqual(states.WAITING, task3.state) + + # NOTE(rakhmerov): Task 3 must fail because task2->task3 transition + # will never trigger due to its condition. + self.await_task_error(task3.id) + + self.await_workflow_error(wf_ex.id) def test_partial_join(self): wf_text = """--- diff --git a/mistral/tests/unit/engine/test_safe_rerun.py b/mistral/tests/unit/engine/test_safe_rerun.py index 0c1c9ae8..9d9d38d9 100644 --- a/mistral/tests/unit/engine/test_safe_rerun.py +++ b/mistral/tests/unit/engine/test_safe_rerun.py @@ -27,7 +27,7 @@ from mistral.workflow import states def _run_at_target(action_ex_id, action_class_str, attributes, action_params, target=None, async=True, safe_rerun=False): # We'll just call executor directly for testing purposes. - executor = default_executor.DefaultExecutor(rpc.get_engine_client()) + executor = default_executor.DefaultExecutor() executor.run_action( action_ex_id, @@ -43,7 +43,6 @@ MOCK_RUN_AT_TARGET = mock.MagicMock(side_effect=_run_at_target) class TestSafeRerun(base.EngineTestCase): - @mock.patch.object(rpc.ExecutorClient, 'run_action', MOCK_RUN_AT_TARGET) def test_safe_rerun_true(self): wf_text = """--- diff --git a/mistral/tests/unit/engine/test_task_defaults.py b/mistral/tests/unit/engine/test_task_defaults.py index 845664a7..62e43597 100644 --- a/mistral/tests/unit/engine/test_task_defaults.py +++ b/mistral/tests/unit/engine/test_task_defaults.py @@ -18,7 +18,6 @@ from oslo_config import cfg import requests from mistral.db.v2 import api as db_api -from mistral.services import scheduler from mistral.services import workflows as wf_service from mistral.tests.unit.engine import base from mistral.workflow import states @@ -83,13 +82,6 @@ class TaskDefaultsDirectWorkflowEngineTest(base.EngineTestCase): class TaskDefaultsReverseWorkflowEngineTest(base.EngineTestCase): - def setUp(self): - super(TaskDefaultsReverseWorkflowEngineTest, self).setUp() - - thread_group = scheduler.setup() - - self.addCleanup(thread_group.stop) - def test_task_defaults_retry_policy(self): wf_text = """--- version: '2.0' diff --git a/mistral/tests/unit/services/test_event_engine.py b/mistral/tests/unit/services/test_event_engine.py index 3a3f6cac..195d96f0 100644 --- a/mistral/tests/unit/services/test_event_engine.py +++ b/mistral/tests/unit/services/test_event_engine.py @@ -18,7 +18,8 @@ import mock from oslo_config import cfg from mistral.db.v2.sqlalchemy import api as db_api -from mistral.services import event_engine +from mistral.engine.rpc_backend import rpc +from mistral.event_engine import event_engine from mistral.services import workflows from mistral.tests.unit import base @@ -55,10 +56,13 @@ class EventEngineTest(base.DbTestCase): super(EventEngineTest, self).setUp() self.wf = workflows.create_workflows(WORKFLOW_LIST)[0] + EVENT_TRIGGER['workflow_id'] = self.wf.id + @mock.patch.object(rpc, 'get_engine_client', mock.Mock()) def test_event_engine_start_with_no_triggers(self): - e_engine = event_engine.EventEngine(mock.Mock()) + e_engine = event_engine.EventEngine() + self.addCleanup(e_engine.handler_tg.stop) self.assertEqual(0, len(e_engine.event_triggers_map)) @@ -66,10 +70,12 @@ class EventEngineTest(base.DbTestCase): self.assertEqual(0, len(e_engine.exchange_topic_listener_map)) @mock.patch('mistral.messaging.start_listener') + @mock.patch.object(rpc, 'get_engine_client', mock.Mock()) def test_event_engine_start_with_triggers(self, mock_start): trigger = db_api.create_event_trigger(EVENT_TRIGGER) - e_engine = event_engine.EventEngine(mock.MagicMock()) + e_engine = event_engine.EventEngine() + self.addCleanup(e_engine.handler_tg.stop) self.assertEqual(1, len(e_engine.exchange_topic_events_map)) @@ -86,11 +92,12 @@ class EventEngineTest(base.DbTestCase): self.assertEqual(1, len(e_engine.exchange_topic_listener_map)) @mock.patch('mistral.messaging.start_listener') + @mock.patch.object(rpc, 'get_engine_client', mock.Mock()) def test_process_event_queue(self, mock_start): db_api.create_event_trigger(EVENT_TRIGGER) - client = mock.MagicMock() - e_engine = event_engine.EventEngine(client) + e_engine = event_engine.EventEngine() + self.addCleanup(e_engine.handler_tg.stop) event = { @@ -103,6 +110,7 @@ class EventEngineTest(base.DbTestCase): with mock.patch.object(e_engine, 'engine_client') as client_mock: e_engine.event_queue.put(event) + time.sleep(1) self.assertEqual(1, client_mock.start_workflow.call_count) diff --git a/mistral/tests/unit/services/test_scheduler.py b/mistral/tests/unit/services/test_scheduler.py index 15db8fb4..ac5197ed 100644 --- a/mistral/tests/unit/services/test_scheduler.py +++ b/mistral/tests/unit/services/test_scheduler.py @@ -81,9 +81,9 @@ class SchedulerServiceTest(base.DbTestCase): def setUp(self): super(SchedulerServiceTest, self).setUp() - self.thread_group = scheduler.setup() + sched = scheduler.start() - self.addCleanup(self.thread_group.stop) + self.addCleanup(scheduler.stop_scheduler, sched, True) @mock.patch(FACTORY_METHOD_PATH) def test_scheduler_with_factory(self, factory): @@ -242,11 +242,12 @@ class SchedulerServiceTest(base.DbTestCase): @mock.patch(TARGET_METHOD_PATH) def test_scheduler_multi_instance(self, method): - def stop_thread_groups(): - [tg.stop() for tg in self.tgs] + scheds = [scheduler.start(), scheduler.start()] - self.tgs = [scheduler.setup(), scheduler.setup()] - self.addCleanup(stop_thread_groups) + def stop_schedulers(): + [scheduler.stop_scheduler(s, True) for s in scheds] + + self.addCleanup(stop_schedulers) method_args = {'name': 'task', 'id': '321'} diff --git a/mistral/tests/unit/test_coordination.py b/mistral/tests/unit/test_coordination.py index 47a5065b..377a5d42 100644 --- a/mistral/tests/unit/test_coordination.py +++ b/mistral/tests/unit/test_coordination.py @@ -16,7 +16,7 @@ import mock from oslo_config import cfg import six -from mistral import coordination +from mistral.service import coordination from mistral.tests.unit import base