Refactor health manager for future work

This patch is about a revision to the health manager (service). It adds
to the health manager a thread pool that can be used for cluster status
monitoring.

Change-Id: Iaaebe80a34ee4ae0f636342ebc27231c6f312db9
This commit is contained in:
tengqm 2016-02-24 10:26:18 -05:00
parent a868d690ee
commit fb7781d3ca
3 changed files with 79 additions and 51 deletions

View File

@ -10,90 +10,108 @@
# License for the specific language governing permissions and limitations
# under the License.
'''
"""
Health Manager class.
Health Manager is responsible for monitoring the health of the clusters and
take corresponding actions to recover the clusters based on the pre-defined
trigger corresponding actions to recover the clusters based on the pre-defined
health policies.
'''
"""
from oslo_config import cfg
from oslo_context import context as oslo_context
import oslo_messaging
from oslo_service import service
from oslo_service import threadgroup
from senlin.common import consts
from senlin.common import context
from senlin.common import messaging as rpc_messaging
health_mgr_opts = [
cfg.IntOpt('periodic_interval_max',
default=60,
help='Seconds between periodic tasks to be called'),
cfg.BoolOpt('periodic_enable',
default=True,
help='Enable periodic tasks'),
cfg.IntOpt('periodic_fuzzy_delay',
default=60,
help='Range of seconds to randomly delay when starting the'
' periodic task scheduler to reduce stampeding.'
' (Disable by setting to 0)'),
]
CONF = cfg.CONF
CONF.register_opts(health_mgr_opts)
class Health_Manager(service.Service):
class HealthManager(service.Service):
def __init__(self, engine_service, topic, version, thread_group_mgr):
super(Health_Manager, self).__init__()
self.threadgroup = thread_group_mgr
def __init__(self, engine_service, topic, version):
super(HealthManager, self).__init__()
self.TG = threadgroup.ThreadGroup()
self.engine_id = engine_service.engine_id
self.topic = topic
self.version = version
# params for periodic running task
self.periodic_interval_max = CONF.periodic_interval_max
self.periodic_enable = CONF.periodic_enable
self.periodic_fuzzy_delay = CONF.periodic_fuzzy_delay
def periodic_tasks(self, raise_on_error=False):
def _idle_task(self):
pass
def start_periodic_tasks(self):
"""Tasks to be run at a periodic interval."""
# TODO(anyone): iterate clusters and call their periodic_tasks
return self.periodic_interval_max
self.TG.add_timer(cfg.CONF.periodic_interval, self._idle_task)
# TODO(anyone): start timers to check clusters
# - get clusters that needs health management from DB
# - get their checking options
# * if it is about node status polling, add a timer to trigger its
# do_check logic
# * if it is about listening to message queue, start a thread to
# listen events targeted at that cluster
def start(self):
super(Health_Manager, self).start()
super(HealthManager, self).start()
self.target = oslo_messaging.Target(server=self.engine_id,
topic=self.topic,
version=self.version)
server = rpc_messaging.get_rpc_server(self.target, self)
server.start()
if self.periodic_enable:
# if self.periodic_fuzzy_delay:
# initial_delay = random.randint(0, self.periodic_fuzzy_delay)
# else:
# initial_delay = None
self.threadgroup.add_timer(self.periodic_interval_max,
self.periodic_tasks)
def listening(self, context):
'''Respond to confirm that the engine is still alive.'''
return True
self.start_periodic_tasks()
def stop(self):
super(Health_Manager, self).stop()
self.TG.stop_timers()
super(HealthManager, self).stop()
def listening(self, ctx):
"""Respond to confirm that the rpc service is still alive."""
return True
def register_cluster(self, cluster_id, check_type, interval=None,
**params):
"""Register cluster for health checking.
:param cluster_id: The ID of the cluster to be checked.
:param check_type: A string indicating the type of checks.
:param interval: An optional integer indicating the length of checking
periods in seconds.
:param \*\*params: Other parameters for the health check.
:return: None
"""
pass
def unregister_cluster(self, cluster_id):
"""Unregister a cluster from health checking.
:param cluster_id: The ID of the cluste to be unregistered.
:return: None
"""
pass
def notify(context, method, engine_id, *args, **kwargs):
'''Send notification to dispatcher
def notify(engine_id, method, *args, **kwargs):
"""Send notification to health manager service.
:param context: rpc request context
:param method: remote method to call
:param engine_id: dispatcher to notify; broadcast if value is None
'''
:param method: remote method to call
"""
timeout = cfg.CONF.engine_life_check_timeout
client = rpc_messaging.get_rpc_client(version=consts.RPC_API_VERSION)
@ -103,21 +121,33 @@ def notify(context, method, engine_id, *args, **kwargs):
call_context = client.prepare(
version=consts.RPC_API_VERSION,
timeout=timeout,
topic=consts.ENGINE_DISPATCHER_TOPIC,
topic=consts.ENGINE_HEALTH_MGR_TOPIC,
server=engine_id)
else:
# Broadcast to all disptachers
call_context = client.prepare(
version=consts.RPC_API_VERSION,
timeout=timeout,
topic=consts.ENGINE_DISPATCHER_TOPIC)
topic=consts.ENGINE_HEALTH_MGR_TOPIC)
ctx = oslo_context.get_current()
if ctx is None:
ctx = context.RequestContext(is_admin=True)
try:
call_context.call(context, method, *args, **kwargs)
call_context.call(ctx, method, *args, **kwargs)
return True
except oslo_messaging.MessagingTimeout:
return False
def register(cluster_id, engine_id=None, *args, **kwargs):
return notify(engine_id, 'register_cluster', cluster_id, *args, **kwargs)
def unregister(cluster_id, engine_id=None):
return notify(engine_id, 'unregister_cluster', cluster_id)
def list_opts():
yield None, health_mgr_opts

View File

@ -100,7 +100,7 @@ class EngineService(service.Service):
self.engine_id = str(uuid.uuid4())
self.init_tgm()
# create a dispatcher greenthread for this engine.
# create a dispatcher RPC service for this engine.
self.dispatcher = dispatcher.Dispatcher(self,
self.dispatcher_topic,
consts.RPC_API_VERSION,
@ -108,11 +108,10 @@ class EngineService(service.Service):
LOG.info(_LI("Starting dispatcher for engine %s"), self.engine_id)
self.dispatcher.start()
# create a health manager greenthread for this engine.
self.health_mgr = health_manager.Health_Manager(self,
self.health_mgr_topic,
consts.RPC_API_VERSION,
self.TG)
# create a health manager RPC service for this engine.
self.health_mgr = health_manager.HealthManager(
self, self.health_mgr_topic, consts.RPC_API_VERSION)
LOG.info(_LI("Starting health manager for engine %s"), self.engine_id)
self.health_mgr.start()

View File

@ -26,7 +26,7 @@ from senlin.tests.unit.common import base
@mock.patch('senlin.engine.dispatcher.Dispatcher')
@mock.patch('senlin.engine.health_manager.Health_Manager')
@mock.patch('senlin.engine.health_manager.HealthManager')
@mock.patch('oslo_messaging.Target')
class EngineBasicTest(base.SenlinTestCase):
@ -65,8 +65,7 @@ class EngineBasicTest(base.SenlinTestCase):
mock_hm_cls.assert_called_once_with(self.eng,
self.eng.health_mgr_topic,
consts.RPC_API_VERSION,
self.eng.TG)
consts.RPC_API_VERSION)
self.assertEqual(mock_hm, self.eng.health_mgr)
mock_hm.start.assert_called_once_with()