Add coordination util for service management
Tooz is a Python library that provides a coordination API. Its primary goal is to handle groups and membership of these groups in distributed systems. This patch adds coordination util to Mistral, which makes use of Tooz libary. Change-Id: Icbf3086d01649af813727f0972d6d5b0631d6afb Partially-Implements: blueprint mistral-service-api
This commit is contained in:
parent
30589cfa5f
commit
4960da9496
@ -96,18 +96,29 @@ wf_trace_log_name_opt = cfg.StrOpt(
|
||||
'workflow trace output.'
|
||||
)
|
||||
|
||||
coordination_opts = [
|
||||
cfg.StrOpt('backend_url',
|
||||
default=None,
|
||||
help='The backend URL to be used for coordination'),
|
||||
cfg.FloatOpt('heartbeat_interval',
|
||||
default=5.0,
|
||||
help='Number of seconds between heartbeats for coordination.')
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
API_GROUP = 'api'
|
||||
ENGINE_GROUP = 'engine'
|
||||
EXECUTOR_GROUP = 'executor'
|
||||
PECAN_GROUP = 'pecan'
|
||||
COORDINATION_GROUP = 'coordination'
|
||||
|
||||
CONF.register_opts(api_opts, group=API_GROUP)
|
||||
CONF.register_opts(engine_opts, group=ENGINE_GROUP)
|
||||
CONF.register_opts(pecan_opts, group=PECAN_GROUP)
|
||||
CONF.register_opts(executor_opts, group=EXECUTOR_GROUP)
|
||||
CONF.register_opt(wf_trace_log_name_opt)
|
||||
CONF.register_opts(coordination_opts, group=COORDINATION_GROUP)
|
||||
|
||||
CLI_OPTS = [
|
||||
use_debugger,
|
||||
@ -134,6 +145,7 @@ def list_opts():
|
||||
(ENGINE_GROUP, engine_opts),
|
||||
(EXECUTOR_GROUP, executor_opts),
|
||||
(PECAN_GROUP, pecan_opts),
|
||||
(COORDINATION_GROUP, coordination_opts),
|
||||
(None, itertools.chain(
|
||||
CLI_OPTS,
|
||||
[wf_trace_log_name_opt]
|
||||
|
@ -119,3 +119,7 @@ class SizeLimitExceededException(MistralException):
|
||||
super(SizeLimitExceededException, self).__init__(
|
||||
"Size of '%s' is %dKB which exceeds the limit of %dKB"
|
||||
% (field_name, size_kb, size_limit_kb))
|
||||
|
||||
|
||||
class CoordinationException(MistralException):
|
||||
http_code = 500
|
||||
|
118
mistral/tests/unit/utils/test_coordination.py
Normal file
118
mistral/tests/unit/utils/test_coordination.py
Normal file
@ -0,0 +1,118 @@
|
||||
# Copyright 2015 Huawei Technologies Co., Ltd.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from mistral.tests import base
|
||||
from mistral.utils import coordination
|
||||
|
||||
|
||||
class CoordinationTest(base.BaseTest):
|
||||
def setUp(self):
|
||||
super(CoordinationTest, self).setUp()
|
||||
|
||||
def test_start(self):
|
||||
cfg.CONF.set_default(
|
||||
'backend_url',
|
||||
'zake://',
|
||||
'coordination'
|
||||
)
|
||||
|
||||
coordinator = coordination.ServiceCoordinator('fake_id')
|
||||
coordinator.start()
|
||||
|
||||
self.assertTrue(coordinator.is_active())
|
||||
|
||||
def test_start_without_backend(self):
|
||||
cfg.CONF.set_default('backend_url', None, 'coordination')
|
||||
|
||||
coordinator = coordination.ServiceCoordinator()
|
||||
coordinator.start()
|
||||
|
||||
self.assertFalse(coordinator.is_active())
|
||||
|
||||
def test_stop_not_active(self):
|
||||
cfg.CONF.set_default('backend_url', None, 'coordination')
|
||||
|
||||
coordinator = coordination.ServiceCoordinator()
|
||||
coordinator.start()
|
||||
|
||||
coordinator.stop()
|
||||
|
||||
self.assertFalse(coordinator.is_active())
|
||||
|
||||
def test_stop(self):
|
||||
cfg.CONF.set_default(
|
||||
'backend_url',
|
||||
'zake://',
|
||||
'coordination'
|
||||
)
|
||||
|
||||
coordinator = coordination.ServiceCoordinator()
|
||||
coordinator.start()
|
||||
|
||||
coordinator.stop()
|
||||
|
||||
self.assertFalse(coordinator.is_active())
|
||||
|
||||
def test_join_group_not_active(self):
|
||||
cfg.CONF.set_default('backend_url', None, 'coordination')
|
||||
|
||||
coordinator = coordination.ServiceCoordinator()
|
||||
coordinator.start()
|
||||
|
||||
coordinator.join_group('fake_group')
|
||||
members = coordinator.get_members('fake_group')
|
||||
|
||||
self.assertFalse(coordinator.is_active())
|
||||
|
||||
self.assertEqual(0, len(members))
|
||||
|
||||
def test_join_group_and_get_members(self):
|
||||
cfg.CONF.set_default(
|
||||
'backend_url',
|
||||
'zake://',
|
||||
'coordination'
|
||||
)
|
||||
|
||||
coordinator = coordination.ServiceCoordinator(my_id='fake_id')
|
||||
coordinator.start()
|
||||
|
||||
coordinator.join_group('fake_group')
|
||||
members = coordinator.get_members('fake_group')
|
||||
|
||||
self.assertEqual(1, len(members))
|
||||
self.assertItemsEqual(('fake_id',), members)
|
||||
|
||||
def test_join_group_and_leave_group(self):
|
||||
cfg.CONF.set_default(
|
||||
'backend_url',
|
||||
'zake://',
|
||||
'coordination'
|
||||
)
|
||||
|
||||
coordinator = coordination.ServiceCoordinator(my_id='fake_id')
|
||||
coordinator.start()
|
||||
|
||||
coordinator.join_group('fake_group')
|
||||
members_before = coordinator.get_members('fake_group')
|
||||
|
||||
coordinator.leave_group('fake_group')
|
||||
members_after = coordinator.get_members('fake_group')
|
||||
|
||||
self.assertEqual(1, len(members_before))
|
||||
self.assertEqual(set(['fake_id']), members_before)
|
||||
|
||||
self.assertEqual(0, len(members_after))
|
||||
self.assertEqual(set([]), members_after)
|
@ -19,6 +19,7 @@ import json
|
||||
import logging
|
||||
import os
|
||||
from os import path
|
||||
import socket
|
||||
import threading
|
||||
import uuid
|
||||
|
||||
@ -241,3 +242,9 @@ def get_input_dict(inputs):
|
||||
input_dict[x] = NotDefined
|
||||
|
||||
return input_dict
|
||||
|
||||
|
||||
def get_process_identifier():
|
||||
"""Gets current running process identifier."""
|
||||
|
||||
return "%s_%s" % (socket.gethostname(), os.getpid())
|
||||
|
145
mistral/utils/coordination.py
Normal file
145
mistral/utils/coordination.py
Normal file
@ -0,0 +1,145 @@
|
||||
# Copyright 2015 Huawei Technologies Co., Ltd.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import six
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
from retrying import retry
|
||||
import tooz.coordination
|
||||
|
||||
from mistral import utils
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class ServiceCoordinator(object):
|
||||
"""Service coordinator.
|
||||
|
||||
This class uses the `tooz` library to manage group membership.
|
||||
|
||||
To ensure that the other agents know this agent is still alive,
|
||||
the `heartbeat` method should be called periodically.
|
||||
"""
|
||||
|
||||
def __init__(self, my_id=None):
|
||||
self._coordinator = None
|
||||
self._my_id = my_id or utils.get_process_identifier()
|
||||
self._started = False
|
||||
|
||||
def start(self):
|
||||
backend_url = cfg.CONF.coordination.backend_url
|
||||
|
||||
if backend_url:
|
||||
try:
|
||||
self._coordinator = tooz.coordination.get_coordinator(
|
||||
backend_url,
|
||||
self._my_id
|
||||
)
|
||||
|
||||
self._coordinator.start()
|
||||
self._started = True
|
||||
|
||||
LOG.info('Coordination backend started successfully.')
|
||||
except tooz.coordination.ToozError as e:
|
||||
self._started = False
|
||||
|
||||
LOG.exception('Error connecting to coordination backend. '
|
||||
'%s', six.text_type(e))
|
||||
|
||||
def stop(self):
|
||||
if not self.is_active():
|
||||
return
|
||||
|
||||
try:
|
||||
self._coordinator.stop()
|
||||
except tooz.coordination.ToozError:
|
||||
LOG.warning('Error connecting to coordination backend.')
|
||||
finally:
|
||||
self._coordinator = None
|
||||
self._started = False
|
||||
|
||||
def is_active(self):
|
||||
return self._coordinator and self._started
|
||||
|
||||
def heartbeat(self):
|
||||
if not self.is_active():
|
||||
# Re-connect.
|
||||
self.start()
|
||||
|
||||
if not self.is_active():
|
||||
LOG.debug("Coordination backend didn't start.")
|
||||
return
|
||||
|
||||
try:
|
||||
self._coordinator.heartbeat()
|
||||
except tooz.coordination.ToozError as e:
|
||||
LOG.exception('Error sending a heartbeat to coordination '
|
||||
'backend. %s', six.text_type(e))
|
||||
|
||||
@retry(stop_max_attempt_number=5)
|
||||
def join_group(self, group_id):
|
||||
if not self.is_active() or not group_id:
|
||||
return
|
||||
|
||||
try:
|
||||
join_req = self._coordinator.join_group(group_id)
|
||||
join_req.get()
|
||||
|
||||
LOG.info(
|
||||
'Joined service group:%s, member:%s',
|
||||
group_id,
|
||||
self._my_id
|
||||
)
|
||||
|
||||
return
|
||||
except tooz.coordination.MemberAlreadyExist:
|
||||
return
|
||||
except tooz.coordination.GroupNotCreated as e:
|
||||
create_grp_req = self._coordinator.create_group(group_id)
|
||||
|
||||
try:
|
||||
create_grp_req.get()
|
||||
except tooz.coordination.GroupAlreadyExist:
|
||||
pass
|
||||
|
||||
# Re-raise exception to join group again.
|
||||
raise e
|
||||
|
||||
def leave_group(self, group_id):
|
||||
if self.is_active():
|
||||
self._coordinator.leave_group(group_id)
|
||||
|
||||
LOG.info(
|
||||
'Left service group:%s, member:%s',
|
||||
group_id,
|
||||
self._my_id
|
||||
)
|
||||
|
||||
def get_members(self, group_id):
|
||||
if not self.is_active():
|
||||
return []
|
||||
|
||||
get_members_req = self._coordinator.get_members(group_id)
|
||||
try:
|
||||
members = get_members_req.get()
|
||||
|
||||
LOG.debug('Members of group %s: %s', group_id, members)
|
||||
|
||||
return members
|
||||
except tooz.coordination.GroupNotCreated:
|
||||
LOG.warning('Group %s does not exist.', group_id)
|
||||
|
||||
return []
|
@ -31,8 +31,10 @@ python-neutronclient<3,>=2.3.11
|
||||
python-novaclient>=2.22.0
|
||||
PyYAML>=3.1.0
|
||||
requests>=2.5.2
|
||||
retrying>=1.2.3,!=1.3.0 # Apache-2.0
|
||||
six>=1.9.0
|
||||
SQLAlchemy<1.1.0,>=0.9.7
|
||||
stevedore>=1.5.0 # Apache-2.0
|
||||
WSME>=0.7
|
||||
yaql>=0.2.7,!=0.3.0 # Apache 2.0 License
|
||||
tooz>=0.16.0 # Apache-2.0
|
||||
|
Loading…
Reference in New Issue
Block a user