Merge "Add coordination util for service management"

This commit is contained in:
Jenkins 2015-08-11 12:07:02 +00:00 committed by Gerrit Code Review
commit 223dee29e4
6 changed files with 288 additions and 0 deletions

View File

@ -96,18 +96,29 @@ wf_trace_log_name_opt = cfg.StrOpt(
'workflow trace output.'
)
coordination_opts = [
cfg.StrOpt('backend_url',
default=None,
help='The backend URL to be used for coordination'),
cfg.FloatOpt('heartbeat_interval',
default=5.0,
help='Number of seconds between heartbeats for coordination.')
]
CONF = cfg.CONF
API_GROUP = 'api'
ENGINE_GROUP = 'engine'
EXECUTOR_GROUP = 'executor'
PECAN_GROUP = 'pecan'
COORDINATION_GROUP = 'coordination'
CONF.register_opts(api_opts, group=API_GROUP)
CONF.register_opts(engine_opts, group=ENGINE_GROUP)
CONF.register_opts(pecan_opts, group=PECAN_GROUP)
CONF.register_opts(executor_opts, group=EXECUTOR_GROUP)
CONF.register_opt(wf_trace_log_name_opt)
CONF.register_opts(coordination_opts, group=COORDINATION_GROUP)
CLI_OPTS = [
use_debugger,
@ -134,6 +145,7 @@ def list_opts():
(ENGINE_GROUP, engine_opts),
(EXECUTOR_GROUP, executor_opts),
(PECAN_GROUP, pecan_opts),
(COORDINATION_GROUP, coordination_opts),
(None, itertools.chain(
CLI_OPTS,
[wf_trace_log_name_opt]

View File

@ -119,3 +119,7 @@ class SizeLimitExceededException(MistralException):
super(SizeLimitExceededException, self).__init__(
"Size of '%s' is %dKB which exceeds the limit of %dKB"
% (field_name, size_kb, size_limit_kb))
class CoordinationException(MistralException):
http_code = 500

View File

@ -0,0 +1,118 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from mistral.tests import base
from mistral.utils import coordination
class CoordinationTest(base.BaseTest):
def setUp(self):
super(CoordinationTest, self).setUp()
def test_start(self):
cfg.CONF.set_default(
'backend_url',
'zake://',
'coordination'
)
coordinator = coordination.ServiceCoordinator('fake_id')
coordinator.start()
self.assertTrue(coordinator.is_active())
def test_start_without_backend(self):
cfg.CONF.set_default('backend_url', None, 'coordination')
coordinator = coordination.ServiceCoordinator()
coordinator.start()
self.assertFalse(coordinator.is_active())
def test_stop_not_active(self):
cfg.CONF.set_default('backend_url', None, 'coordination')
coordinator = coordination.ServiceCoordinator()
coordinator.start()
coordinator.stop()
self.assertFalse(coordinator.is_active())
def test_stop(self):
cfg.CONF.set_default(
'backend_url',
'zake://',
'coordination'
)
coordinator = coordination.ServiceCoordinator()
coordinator.start()
coordinator.stop()
self.assertFalse(coordinator.is_active())
def test_join_group_not_active(self):
cfg.CONF.set_default('backend_url', None, 'coordination')
coordinator = coordination.ServiceCoordinator()
coordinator.start()
coordinator.join_group('fake_group')
members = coordinator.get_members('fake_group')
self.assertFalse(coordinator.is_active())
self.assertEqual(0, len(members))
def test_join_group_and_get_members(self):
cfg.CONF.set_default(
'backend_url',
'zake://',
'coordination'
)
coordinator = coordination.ServiceCoordinator(my_id='fake_id')
coordinator.start()
coordinator.join_group('fake_group')
members = coordinator.get_members('fake_group')
self.assertEqual(1, len(members))
self.assertItemsEqual(('fake_id',), members)
def test_join_group_and_leave_group(self):
cfg.CONF.set_default(
'backend_url',
'zake://',
'coordination'
)
coordinator = coordination.ServiceCoordinator(my_id='fake_id')
coordinator.start()
coordinator.join_group('fake_group')
members_before = coordinator.get_members('fake_group')
coordinator.leave_group('fake_group')
members_after = coordinator.get_members('fake_group')
self.assertEqual(1, len(members_before))
self.assertEqual(set(['fake_id']), members_before)
self.assertEqual(0, len(members_after))
self.assertEqual(set([]), members_after)

View File

@ -19,6 +19,7 @@ import json
import logging
import os
from os import path
import socket
import threading
import uuid
@ -241,3 +242,9 @@ def get_input_dict(inputs):
input_dict[x] = NotDefined
return input_dict
def get_process_identifier():
"""Gets current running process identifier."""
return "%s_%s" % (socket.gethostname(), os.getpid())

View File

@ -0,0 +1,145 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import six
from oslo_config import cfg
from oslo_log import log
from retrying import retry
import tooz.coordination
from mistral import utils
LOG = log.getLogger(__name__)
class ServiceCoordinator(object):
"""Service coordinator.
This class uses the `tooz` library to manage group membership.
To ensure that the other agents know this agent is still alive,
the `heartbeat` method should be called periodically.
"""
def __init__(self, my_id=None):
self._coordinator = None
self._my_id = my_id or utils.get_process_identifier()
self._started = False
def start(self):
backend_url = cfg.CONF.coordination.backend_url
if backend_url:
try:
self._coordinator = tooz.coordination.get_coordinator(
backend_url,
self._my_id
)
self._coordinator.start()
self._started = True
LOG.info('Coordination backend started successfully.')
except tooz.coordination.ToozError as e:
self._started = False
LOG.exception('Error connecting to coordination backend. '
'%s', six.text_type(e))
def stop(self):
if not self.is_active():
return
try:
self._coordinator.stop()
except tooz.coordination.ToozError:
LOG.warning('Error connecting to coordination backend.')
finally:
self._coordinator = None
self._started = False
def is_active(self):
return self._coordinator and self._started
def heartbeat(self):
if not self.is_active():
# Re-connect.
self.start()
if not self.is_active():
LOG.debug("Coordination backend didn't start.")
return
try:
self._coordinator.heartbeat()
except tooz.coordination.ToozError as e:
LOG.exception('Error sending a heartbeat to coordination '
'backend. %s', six.text_type(e))
@retry(stop_max_attempt_number=5)
def join_group(self, group_id):
if not self.is_active() or not group_id:
return
try:
join_req = self._coordinator.join_group(group_id)
join_req.get()
LOG.info(
'Joined service group:%s, member:%s',
group_id,
self._my_id
)
return
except tooz.coordination.MemberAlreadyExist:
return
except tooz.coordination.GroupNotCreated as e:
create_grp_req = self._coordinator.create_group(group_id)
try:
create_grp_req.get()
except tooz.coordination.GroupAlreadyExist:
pass
# Re-raise exception to join group again.
raise e
def leave_group(self, group_id):
if self.is_active():
self._coordinator.leave_group(group_id)
LOG.info(
'Left service group:%s, member:%s',
group_id,
self._my_id
)
def get_members(self, group_id):
if not self.is_active():
return []
get_members_req = self._coordinator.get_members(group_id)
try:
members = get_members_req.get()
LOG.debug('Members of group %s: %s', group_id, members)
return members
except tooz.coordination.GroupNotCreated:
LOG.warning('Group %s does not exist.', group_id)
return []

View File

@ -31,8 +31,10 @@ python-neutronclient<3,>=2.3.11
python-novaclient>=2.22.0
PyYAML>=3.1.0
requests>=2.5.2
retrying>=1.2.3,!=1.3.0 # Apache-2.0
six>=1.9.0
SQLAlchemy<1.1.0,>=0.9.7
stevedore>=1.5.0 # Apache-2.0
WSME>=0.7
yaql>=0.2.7,!=0.3.0 # Apache 2.0 License
tooz>=0.16.0 # Apache-2.0