diff --git a/mistral/api/app.py b/mistral/api/app.py index 181f0b6d..052f5b07 100644 --- a/mistral/api/app.py +++ b/mistral/api/app.py @@ -19,6 +19,7 @@ import pecan from mistral.api import access_control from mistral import context as ctx +from mistral import coordination from mistral.db.v2 import api as db_api_v2 from mistral.services import periodic @@ -49,6 +50,8 @@ def setup_app(config=None): periodic.setup() + coordination.Service('api_group').register_membership() + app = pecan.make_app( app_conf.pop('root'), hooks=lambda: [ctx.ContextHook(), ctx.AuthHook()], diff --git a/mistral/cmd/launch.py b/mistral/cmd/launch.py index 0d3e10bd..35861071 100755 --- a/mistral/cmd/launch.py +++ b/mistral/cmd/launch.py @@ -52,6 +52,7 @@ from mistral import version CONF = cfg.CONF + LOG = logging.getLogger(__name__) @@ -73,6 +74,8 @@ def launch_executor(transport): serializer=ctx.RpcContextSerializer(ctx.JsonPayloadSerializer()) ) + executor_v2.register_membership() + server.start() server.wait() @@ -102,6 +105,8 @@ def launch_engine(transport): serializer=ctx.RpcContextSerializer(ctx.JsonPayloadSerializer()) ) + engine_v2.register_membership() + server.start() server.wait() diff --git a/mistral/utils/coordination.py b/mistral/coordination.py similarity index 73% rename from mistral/utils/coordination.py rename to mistral/coordination.py index c2ade800..c2076eed 100644 --- a/mistral/utils/coordination.py +++ b/mistral/coordination.py @@ -14,8 +14,10 @@ import six +from oslo_concurrency import lockutils from oslo_config import cfg from oslo_log import log +from oslo_service import threadgroup from retrying import retry import tooz.coordination @@ -24,6 +26,8 @@ from mistral import utils LOG = log.getLogger(__name__) +_SERVICE_COORDINATOR = None + class ServiceCoordinator(object): """Service coordinator. @@ -143,3 +147,56 @@ class ServiceCoordinator(object): LOG.warning('Group %s does not exist.', group_id) return [] + + +def cleanup_service_coordinator(): + """Intends to be used by tests to recreate service coordinator.""" + + global _SERVICE_COORDINATOR + + _SERVICE_COORDINATOR = None + + +def get_service_coordinator(my_id=None): + global _SERVICE_COORDINATOR + + if not _SERVICE_COORDINATOR: + _SERVICE_COORDINATOR = ServiceCoordinator(my_id=my_id) + _SERVICE_COORDINATOR.start() + + return _SERVICE_COORDINATOR + + +class Service(object): + def __init__(self, group_type): + self.group_type = group_type + self._tg = None + + @lockutils.synchronized('service_coordinator') + def register_membership(self): + """Registers group membership. + + Because this method will be invoked on each service startup almost at + the same time, so it must be synchronized, in case all the services + are started within same process. + """ + + service_coordinator = get_service_coordinator() + + if service_coordinator.is_active(): + service_coordinator.join_group(self.group_type) + + self._tg = threadgroup.ThreadGroup() + + self._tg.add_timer( + cfg.CONF.coordination.heartbeat_interval, + service_coordinator.heartbeat + ) + + def stop(self): + service_coordinator = get_service_coordinator() + + if service_coordinator.is_active(): + self._tg.stop() + + service_coordinator.stop() diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py index 0800fe18..64732a0f 100644 --- a/mistral/engine/default_engine.py +++ b/mistral/engine/default_engine.py @@ -18,6 +18,7 @@ import traceback from oslo_log import log as logging +from mistral import coordination from mistral.db.v2 import api as db_api from mistral.db.v2.sqlalchemy import models as db_models from mistral.engine import action_handler @@ -43,10 +44,12 @@ LOG = logging.getLogger(__name__) # the submodules are referenced. -class DefaultEngine(base.Engine): +class DefaultEngine(base.Engine, coordination.Service): def __init__(self, engine_client): self._engine_client = engine_client + coordination.Service.__init__(self, 'engine_group') + @u.log_exec(LOG) def start_workflow(self, wf_name, wf_input, description='', **params): wf_exec_id = None diff --git a/mistral/engine/default_executor.py b/mistral/engine/default_executor.py index 7f080c61..9ff4a33c 100644 --- a/mistral/engine/default_executor.py +++ b/mistral/engine/default_executor.py @@ -13,9 +13,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + from oslo_log import log as logging from mistral.actions import action_factory as a_f +from mistral import coordination from mistral.engine import base from mistral import exceptions as exc from mistral.utils import inspect_utils as i_u @@ -25,10 +27,12 @@ from mistral.workflow import utils as wf_utils LOG = logging.getLogger(__name__) -class DefaultExecutor(base.Executor): +class DefaultExecutor(base.Executor, coordination.Service): def __init__(self, engine_client): self._engine_client = engine_client + coordination.Service.__init__(self, 'executor_group') + def run_action(self, action_ex_id, action_class_str, attributes, action_params): """Runs action. diff --git a/mistral/tests/unit/utils/test_coordination.py b/mistral/tests/unit/test_coordination.py similarity index 75% rename from mistral/tests/unit/utils/test_coordination.py rename to mistral/tests/unit/test_coordination.py index e11edcc7..93937e07 100644 --- a/mistral/tests/unit/utils/test_coordination.py +++ b/mistral/tests/unit/test_coordination.py @@ -12,15 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +import mock from oslo_config import cfg +from mistral import coordination from mistral.tests import base -from mistral.utils import coordination -class CoordinationTest(base.BaseTest): +class ServiceCoordinatorTest(base.BaseTest): def setUp(self): - super(CoordinationTest, self).setUp() + super(ServiceCoordinatorTest, self).setUp() def test_start(self): cfg.CONF.set_default( @@ -116,3 +117,31 @@ class CoordinationTest(base.BaseTest): self.assertEqual(0, len(members_after)) self.assertEqual(set([]), members_after) + + +class ServiceTest(base.BaseTest): + def setUp(self): + super(ServiceTest, self).setUp() + + # Re-intialize the global service coordinator object, in order to use + # new coordination configuration. + coordination.cleanup_service_coordinator() + + @mock.patch('mistral.utils.get_process_identifier', return_value='fake_id') + def test_register_membership(self, mock_get_identifier): + cfg.CONF.set_default('backend_url', 'zake://', 'coordination') + + srv = coordination.Service('fake_group') + srv.register_membership() + + self.addCleanup(srv.stop) + + srv_coordinator = coordination.get_service_coordinator() + + self.assertIsNotNone(srv_coordinator) + self.assertTrue(srv_coordinator.is_active()) + + members = srv_coordinator.get_members('fake_group') + + mock_get_identifier.assert_called_once_with() + self.assertEqual(set(['fake_id']), members) diff --git a/requirements.txt b/requirements.txt index a5bcfe87..a58cfa16 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,6 +12,7 @@ keystonemiddleware>=1.5.0 kombu>=3.0.7 mock>=1.0 networkx>=1.8 +oslo.concurrency>=2.3.0 # Apache-2.0 oslo.config>=1.11.0 # Apache-2.0 oslo.db>=1.10.0 # Apache-2.0 oslo.messaging!=1.12.0,>=1.8.0 # Apache-2.0