Fix Health Manager Problems
The problems in HealthManager are caused by incorrect context usage. This patch fixes them by revising the arguments so that the admin context and the request context are used cooperatively. Change-Id: I206111f4d2664f545b6e5235fc4656c135093542
This commit is contained in:
parent
d8d9676157
commit
9e2e9ed79a
|
@ -90,6 +90,12 @@ CLUSTER_SORT_KEYS = [
|
|||
CLUSTER_INIT_AT, CLUSTER_CREATED_AT, CLUSTER_UPDATED_AT,
|
||||
]
|
||||
|
||||
DETECTION_TYPES = (
|
||||
VM_LIFECYCLE_EVENTS, NODE_STATUS_POLLING, LB_STATUS_POLLING,
|
||||
) = (
|
||||
'VM_LIFECYCLE_EVENTS', 'NODE_STATUS_POLLING', 'LB_STATUS_POLLING',
|
||||
)
|
||||
|
||||
NODE_ATTRS = (
|
||||
NODE_INDEX, NODE_NAME, NODE_PROFILE_ID, NODE_CLUSTER_ID,
|
||||
NODE_INIT_AT, NODE_CREATED_AT, NODE_UPDATED_AT,
|
||||
|
|
|
@ -73,11 +73,11 @@ class HealthManager(service.Service):
|
|||
# listen events targeted at that cluster
|
||||
self.TG.add_timer(cfg.CONF.periodic_interval, self._idle_task)
|
||||
|
||||
# for registry in self.registries:
|
||||
# if registry.check_type == 'NODE_STATUS_POLLING':
|
||||
# interval = min(registry.interval, self.periodic_interval_max)
|
||||
# self.TG.add_timer(interval,
|
||||
# self._periodic_check(registry.cluster_id))
|
||||
for registry in self.registries:
|
||||
if registry.check_type == 'NODE_STATUS_POLLING':
|
||||
interval = min(registry.interval, self.periodic_interval_max)
|
||||
self.TG.add_timer(interval,
|
||||
self._periodic_check(registry.cluster_id))
|
||||
|
||||
def start(self):
|
||||
super(HealthManager, self).start()
|
||||
|
@ -86,7 +86,7 @@ class HealthManager(service.Service):
|
|||
version=self.version)
|
||||
server = rpc_messaging.get_rpc_server(self.target, self)
|
||||
server.start()
|
||||
# self._load_runtime_registry()
|
||||
self._load_runtime_registry()
|
||||
self.start_periodic_tasks()
|
||||
|
||||
def _load_runtime_registry(self):
|
||||
|
@ -105,10 +105,11 @@ class HealthManager(service.Service):
|
|||
"""Respond to confirm that the rpc service is still alive."""
|
||||
return True
|
||||
|
||||
def register_cluster(self, cluster_id, check_type, interval=None,
|
||||
**params):
|
||||
def register_cluster(self, ctx, cluster_id, check_type, interval=None,
|
||||
params=None):
|
||||
"""Register cluster for health checking.
|
||||
|
||||
:param ctx: The context of notify request.
|
||||
:param cluster_id: The ID of the cluster to be checked.
|
||||
:param check_type: A string indicating the type of checks.
|
||||
:param interval: An optional integer indicating the length of checking
|
||||
|
@ -116,23 +117,25 @@ class HealthManager(service.Service):
|
|||
:param \*\*params: Other parameters for the health check.
|
||||
:return: None
|
||||
"""
|
||||
registry = db_api.registry_create(self.ctx, cluster_id, check_type,
|
||||
params = params or {}
|
||||
registry = db_api.registry_create(ctx, cluster_id, check_type,
|
||||
interval, params, self.engine_id)
|
||||
self.rt['registries'].append(registry)
|
||||
|
||||
def unregister_cluster(self, cluster_id):
|
||||
def unregister_cluster(self, ctx, cluster_id):
|
||||
"""Unregister a cluster from health checking.
|
||||
|
||||
:param ctx: The context of notify request.
|
||||
:param cluster_id: The ID of the cluster to be unregistered.
|
||||
:return: None
|
||||
"""
|
||||
for registry in self.rt['registries']:
|
||||
if registry.cluster_id == cluster_id:
|
||||
self.rt['registries'].remove(registry)
|
||||
db_api.registry_delete(self.ctx, cluster_id=cluster_id)
|
||||
db_api.registry_delete(ctx, cluster_id)
|
||||
|
||||
|
||||
def notify(engine_id, method, *args, **kwargs):
|
||||
def notify(engine_id, method, **kwargs):
|
||||
"""Send notification to health manager service.
|
||||
|
||||
:param engine_id: dispatcher to notify; broadcast if value is None
|
||||
|
@ -159,18 +162,27 @@ def notify(engine_id, method, *args, **kwargs):
|
|||
ctx = context.get_admin_context()
|
||||
|
||||
try:
|
||||
call_context.call(ctx, method, *args, **kwargs)
|
||||
call_context.call(ctx, method, **kwargs)
|
||||
return True
|
||||
except oslo_messaging.MessagingTimeout:
|
||||
return False
|
||||
|
||||
|
||||
def register(cluster_id, engine_id=None, *args, **kwargs):
|
||||
return notify(engine_id, 'register_cluster', cluster_id, *args, **kwargs)
|
||||
def register(cluster_id, engine_id=None, **kwargs):
|
||||
params = kwargs.pop('params', {})
|
||||
interval = kwargs.pop('interval', cfg.CONF.periodic_interval)
|
||||
check_type = kwargs.pop('check_type', consts.NODE_STATUS_POLLING)
|
||||
return notify(engine_id, 'register_cluster',
|
||||
cluster_id=cluster_id,
|
||||
interval=interval,
|
||||
check_type=check_type,
|
||||
params=params)
|
||||
|
||||
|
||||
def unregister(cluster_id, engine_id=None):
|
||||
return notify(engine_id, 'unregister_cluster', cluster_id)
|
||||
return notify(engine_id,
|
||||
'unregister_cluster',
|
||||
cluster_id=cluster_id)
|
||||
|
||||
|
||||
def list_opts():
|
||||
|
|
|
@ -53,7 +53,6 @@ class TestHealthManager(base.SenlinTestCase):
|
|||
]
|
||||
|
||||
self.hm._load_runtime_registry()
|
||||
|
||||
self.assertEqual(2, len(self.hm.registries))
|
||||
|
||||
@mock.patch('senlin.rpc.client.EngineClient')
|
||||
|
@ -67,11 +66,14 @@ class TestHealthManager(base.SenlinTestCase):
|
|||
|
||||
@mock.patch.object(db_api, 'registry_create')
|
||||
def test_register_cluster(self, mock_reg_create):
|
||||
self.hm.register_cluster(cluster_id='CLUSTER_ID',
|
||||
|
||||
ctx = mock.Mock()
|
||||
self.hm.register_cluster(ctx,
|
||||
cluster_id='CLUSTER_ID',
|
||||
check_type='NODE_STATUS_POLLING',
|
||||
interval=50)
|
||||
matched_type = 'NODE_STATUS_POLLING'
|
||||
mock_reg_create.assert_called_once_with(self.hm.ctx,
|
||||
mock_reg_create.assert_called_once_with(ctx,
|
||||
cluster_id='CLUSTER_ID',
|
||||
check_type=matched_type,
|
||||
engine_id='ENGINE_ID',
|
||||
|
@ -80,12 +82,15 @@ class TestHealthManager(base.SenlinTestCase):
|
|||
|
||||
@mock.patch.object(db_api, 'registry_delete')
|
||||
def test_unregister_cluster(self, mock_reg_delete):
|
||||
self.hm.unregister_cluster(cluster_id='CLUSTER_ID')
|
||||
mock_reg_delete.assert_called_once_with(self.hm.ctx, 'CLUSTER_ID')
|
||||
|
||||
ctx = mock.Mock()
|
||||
self.hm.unregister_cluster(ctx, cluster_id='CLUSTER_ID')
|
||||
mock_reg_delete.assert_called_once_with(ctx, 'CLUSTER_ID')
|
||||
|
||||
@mock.patch.object(db_api, 'registry_claim')
|
||||
@mock.patch('oslo_messaging.Target')
|
||||
def test_start(self, mock_target, mock_reg_claim):
|
||||
|
||||
tg = mock.Mock()
|
||||
self.hm.TG = tg
|
||||
target = mock.Mock()
|
||||
|
@ -100,8 +105,8 @@ class TestHealthManager(base.SenlinTestCase):
|
|||
self.patchobject(self.hm, '_load_runtime_registry',
|
||||
return_value=mock.Mock)
|
||||
self.hm.rt['registries'] = [registry]
|
||||
# mock_add_tm = self.patchobject(self.hm.TG, 'add_timer',
|
||||
# return_value=mock.Mock())
|
||||
mock_add_tm = self.patchobject(self.hm.TG, 'add_timer',
|
||||
return_value=mock.Mock())
|
||||
pc = mock.Mock()
|
||||
self.patchobject(self.hm, '_periodic_check',
|
||||
return_value=pc)
|
||||
|
@ -111,4 +116,4 @@ class TestHealthManager(base.SenlinTestCase):
|
|||
version='1.0')
|
||||
mock_st.assert_called_once_with(target, self.hm)
|
||||
rpc_server.start.assert_called_once_with()
|
||||
# mock_add_tm.assert_called_with(50, pc)
|
||||
mock_add_tm.assert_called_with(50, pc)
|
||||
|
|
Loading…
Reference in New Issue