diff --git a/mistral/config.py b/mistral/config.py index b18904fe6..aa69b950e 100644 --- a/mistral/config.py +++ b/mistral/config.py @@ -96,18 +96,29 @@ wf_trace_log_name_opt = cfg.StrOpt( 'workflow trace output.' ) +coordination_opts = [ + cfg.StrOpt('backend_url', + default=None, + help='The backend URL to be used for coordination'), + cfg.FloatOpt('heartbeat_interval', + default=5.0, + help='Number of seconds between heartbeats for coordination.') +] + CONF = cfg.CONF API_GROUP = 'api' ENGINE_GROUP = 'engine' EXECUTOR_GROUP = 'executor' PECAN_GROUP = 'pecan' +COORDINATION_GROUP = 'coordination' CONF.register_opts(api_opts, group=API_GROUP) CONF.register_opts(engine_opts, group=ENGINE_GROUP) CONF.register_opts(pecan_opts, group=PECAN_GROUP) CONF.register_opts(executor_opts, group=EXECUTOR_GROUP) CONF.register_opt(wf_trace_log_name_opt) +CONF.register_opts(coordination_opts, group=COORDINATION_GROUP) CLI_OPTS = [ use_debugger, @@ -134,6 +145,7 @@ def list_opts(): (ENGINE_GROUP, engine_opts), (EXECUTOR_GROUP, executor_opts), (PECAN_GROUP, pecan_opts), + (COORDINATION_GROUP, coordination_opts), (None, itertools.chain( CLI_OPTS, [wf_trace_log_name_opt] diff --git a/mistral/exceptions.py b/mistral/exceptions.py index fdbcab172..f2c583b49 100644 --- a/mistral/exceptions.py +++ b/mistral/exceptions.py @@ -119,3 +119,7 @@ class SizeLimitExceededException(MistralException): super(SizeLimitExceededException, self).__init__( "Size of '%s' is %dKB which exceeds the limit of %dKB" % (field_name, size_kb, size_limit_kb)) + + +class CoordinationException(MistralException): + http_code = 500 diff --git a/mistral/tests/unit/utils/test_coordination.py b/mistral/tests/unit/utils/test_coordination.py new file mode 100644 index 000000000..e11edcc76 --- /dev/null +++ b/mistral/tests/unit/utils/test_coordination.py @@ -0,0 +1,118 @@ +# Copyright 2015 Huawei Technologies Co., Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_config import cfg + +from mistral.tests import base +from mistral.utils import coordination + + +class CoordinationTest(base.BaseTest): + def setUp(self): + super(CoordinationTest, self).setUp() + + def test_start(self): + cfg.CONF.set_default( + 'backend_url', + 'zake://', + 'coordination' + ) + + coordinator = coordination.ServiceCoordinator('fake_id') + coordinator.start() + + self.assertTrue(coordinator.is_active()) + + def test_start_without_backend(self): + cfg.CONF.set_default('backend_url', None, 'coordination') + + coordinator = coordination.ServiceCoordinator() + coordinator.start() + + self.assertFalse(coordinator.is_active()) + + def test_stop_not_active(self): + cfg.CONF.set_default('backend_url', None, 'coordination') + + coordinator = coordination.ServiceCoordinator() + coordinator.start() + + coordinator.stop() + + self.assertFalse(coordinator.is_active()) + + def test_stop(self): + cfg.CONF.set_default( + 'backend_url', + 'zake://', + 'coordination' + ) + + coordinator = coordination.ServiceCoordinator() + coordinator.start() + + coordinator.stop() + + self.assertFalse(coordinator.is_active()) + + def test_join_group_not_active(self): + cfg.CONF.set_default('backend_url', None, 'coordination') + + coordinator = coordination.ServiceCoordinator() + coordinator.start() + + coordinator.join_group('fake_group') + members = coordinator.get_members('fake_group') + + self.assertFalse(coordinator.is_active()) + + self.assertEqual(0, len(members)) + + def test_join_group_and_get_members(self): + cfg.CONF.set_default( + 'backend_url', + 'zake://', + 'coordination' + ) + + coordinator = coordination.ServiceCoordinator(my_id='fake_id') + coordinator.start() + + coordinator.join_group('fake_group') + members = coordinator.get_members('fake_group') + + self.assertEqual(1, len(members)) + self.assertItemsEqual(('fake_id',), members) + + def test_join_group_and_leave_group(self): + cfg.CONF.set_default( + 'backend_url', + 'zake://', + 'coordination' + ) + + coordinator = coordination.ServiceCoordinator(my_id='fake_id') + coordinator.start() + + coordinator.join_group('fake_group') + members_before = coordinator.get_members('fake_group') + + coordinator.leave_group('fake_group') + members_after = coordinator.get_members('fake_group') + + self.assertEqual(1, len(members_before)) + self.assertEqual(set(['fake_id']), members_before) + + self.assertEqual(0, len(members_after)) + self.assertEqual(set([]), members_after) diff --git a/mistral/utils/__init__.py b/mistral/utils/__init__.py index 454a816b3..fb2f3a14a 100644 --- a/mistral/utils/__init__.py +++ b/mistral/utils/__init__.py @@ -19,6 +19,7 @@ import json import logging import os from os import path +import socket import threading import uuid @@ -241,3 +242,9 @@ def get_input_dict(inputs): input_dict[x] = NotDefined return input_dict + + +def get_process_identifier(): + """Gets current running process identifier.""" + + return "%s_%s" % (socket.gethostname(), os.getpid()) diff --git a/mistral/utils/coordination.py b/mistral/utils/coordination.py new file mode 100644 index 000000000..c2ade8007 --- /dev/null +++ b/mistral/utils/coordination.py @@ -0,0 +1,145 @@ +# Copyright 2015 Huawei Technologies Co., Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import six + +from oslo_config import cfg +from oslo_log import log +from retrying import retry +import tooz.coordination + +from mistral import utils + + +LOG = log.getLogger(__name__) + + +class ServiceCoordinator(object): + """Service coordinator. + + This class uses the `tooz` library to manage group membership. + + To ensure that the other agents know this agent is still alive, + the `heartbeat` method should be called periodically. + """ + + def __init__(self, my_id=None): + self._coordinator = None + self._my_id = my_id or utils.get_process_identifier() + self._started = False + + def start(self): + backend_url = cfg.CONF.coordination.backend_url + + if backend_url: + try: + self._coordinator = tooz.coordination.get_coordinator( + backend_url, + self._my_id + ) + + self._coordinator.start() + self._started = True + + LOG.info('Coordination backend started successfully.') + except tooz.coordination.ToozError as e: + self._started = False + + LOG.exception('Error connecting to coordination backend. ' + '%s', six.text_type(e)) + + def stop(self): + if not self.is_active(): + return + + try: + self._coordinator.stop() + except tooz.coordination.ToozError: + LOG.warning('Error connecting to coordination backend.') + finally: + self._coordinator = None + self._started = False + + def is_active(self): + return self._coordinator and self._started + + def heartbeat(self): + if not self.is_active(): + # Re-connect. + self.start() + + if not self.is_active(): + LOG.debug("Coordination backend didn't start.") + return + + try: + self._coordinator.heartbeat() + except tooz.coordination.ToozError as e: + LOG.exception('Error sending a heartbeat to coordination ' + 'backend. %s', six.text_type(e)) + + @retry(stop_max_attempt_number=5) + def join_group(self, group_id): + if not self.is_active() or not group_id: + return + + try: + join_req = self._coordinator.join_group(group_id) + join_req.get() + + LOG.info( + 'Joined service group:%s, member:%s', + group_id, + self._my_id + ) + + return + except tooz.coordination.MemberAlreadyExist: + return + except tooz.coordination.GroupNotCreated as e: + create_grp_req = self._coordinator.create_group(group_id) + + try: + create_grp_req.get() + except tooz.coordination.GroupAlreadyExist: + pass + + # Re-raise exception to join group again. + raise e + + def leave_group(self, group_id): + if self.is_active(): + self._coordinator.leave_group(group_id) + + LOG.info( + 'Left service group:%s, member:%s', + group_id, + self._my_id + ) + + def get_members(self, group_id): + if not self.is_active(): + return [] + + get_members_req = self._coordinator.get_members(group_id) + try: + members = get_members_req.get() + + LOG.debug('Members of group %s: %s', group_id, members) + + return members + except tooz.coordination.GroupNotCreated: + LOG.warning('Group %s does not exist.', group_id) + + return [] diff --git a/requirements.txt b/requirements.txt index 1ed335203..a5bcfe877 100644 --- a/requirements.txt +++ b/requirements.txt @@ -31,8 +31,10 @@ python-neutronclient<3,>=2.3.11 python-novaclient>=2.22.0 PyYAML>=3.1.0 requests>=2.5.2 +retrying>=1.2.3,!=1.3.0 # Apache-2.0 six>=1.9.0 SQLAlchemy<1.1.0,>=0.9.7 stevedore>=1.5.0 # Apache-2.0 WSME>=0.7 yaql>=0.2.7,!=0.3.0 # Apache 2.0 License +tooz>=0.16.0 # Apache-2.0