feat: distributed task health check endpoint
REQUEST: GET http://127.0.0.1:8888/v1.0/health/distributed_task/taskflow RESPONSE: { "online": "true" } 200 OK Change-Id: I050a8bf04c30fa844e23654a626cc2ea3d77c0a2
This commit is contained in:
parent
28991d23c9
commit
50960c9242
|
@ -79,7 +79,21 @@ class TaskFlowDistributedTaskDriver(base.Driver):
|
|||
|
||||
def is_alive(self):
|
||||
"""Health check for TaskFlow worker."""
|
||||
return True
|
||||
is_alive = False
|
||||
try:
|
||||
with self.persistence() as persistence:
|
||||
with self.job_board(
|
||||
self.jobboard_backend_conf.copy(),
|
||||
persistence=persistence) as board:
|
||||
if board.connected:
|
||||
is_alive = True
|
||||
except Exception as e:
|
||||
LOG.error("{0} connection cannot be established to {1}".format(
|
||||
self.vendor_name,
|
||||
self.distributed_task_conf.jobboard_backend_type))
|
||||
LOG.exception(str(e))
|
||||
|
||||
return is_alive
|
||||
|
||||
def persistence(self):
|
||||
return persistence_backends.backend(
|
||||
|
|
|
@ -148,7 +148,7 @@ def create_log_delivery_container(project_id, auth_token):
|
|||
class ContextUpdateTask(task.Task):
|
||||
|
||||
def execute(self, context_dict):
|
||||
context = context_utils.RequestContext.from_dict(ctx=context_dict)
|
||||
context = context_utils.RequestContext.from_dict(context_dict)
|
||||
context.update_store()
|
||||
|
||||
|
||||
|
|
|
@ -30,6 +30,7 @@ class HealthControllerBase(controller.ManagerControllerBase):
|
|||
self._dns = self.driver.dns
|
||||
self._storage = self.driver.storage
|
||||
self._providers = self.driver.providers
|
||||
self._distributed_task = self.driver.distributed_task
|
||||
|
||||
@abc.abstractmethod
|
||||
def health(self):
|
||||
|
@ -48,6 +49,15 @@ class HealthControllerBase(controller.ManagerControllerBase):
|
|||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def is_distributed_task_alive(self, distributed_task_name):
|
||||
"""Returns the health of distributed_task
|
||||
|
||||
:param distributed_task_name
|
||||
:raises: NotImplementedError
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def is_dns_alive(self, dns_name):
|
||||
"""Returns the health of DNS Provider
|
||||
|
|
|
@ -32,6 +32,16 @@ class DefaultHealthController(health.HealthControllerBase):
|
|||
|
||||
dns_name = self._dns.dns_name.lower()
|
||||
dns_alive = self._dns.is_alive()
|
||||
distributed_task_name = self._distributed_task.vendor_name.lower()
|
||||
distributed_task_alive = self.is_distributed_task_alive(
|
||||
distributed_task_name)
|
||||
health_distributed_task = {
|
||||
'distributed_task_name': distributed_task_name,
|
||||
'is_alive': distributed_task_alive
|
||||
}
|
||||
health_map['distributed_task'] = health_distributed_task
|
||||
if not distributed_task_alive:
|
||||
is_alive = False
|
||||
health_dns = {'dns_name': dns_name,
|
||||
'is_alive': dns_alive}
|
||||
health_map['dns'] = health_dns
|
||||
|
@ -64,6 +74,14 @@ class DefaultHealthController(health.HealthControllerBase):
|
|||
|
||||
return self._providers[provider_name].obj.is_alive()
|
||||
|
||||
def is_distributed_task_alive(self, distributed_task_name):
|
||||
"""Returns the health of distributed_task."""
|
||||
|
||||
if distributed_task_name == self._distributed_task.vendor_name.lower():
|
||||
return self._distributed_task.is_alive()
|
||||
else:
|
||||
raise KeyError
|
||||
|
||||
def is_storage_alive(self, storage_name):
|
||||
"""Returns the health of storage."""
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@ from poppy.transport.pecan.controllers.v1 import ssl_certificates
|
|||
# Hoist into package namespace
|
||||
Admin = admin.AdminController
|
||||
DNSHealth = health.DNSHealthController
|
||||
Distributed_Task_Health = health.DistributedTaskHealthController
|
||||
Flavors = flavors.FlavorsController
|
||||
Health = health.HealthController
|
||||
Home = home.HomeController
|
||||
|
|
|
@ -71,6 +71,32 @@ class StorageHealthController(base.Controller, hooks.HookController):
|
|||
pecan.response.status = 404
|
||||
|
||||
|
||||
class DistributedTaskHealthController(base.Controller, hooks.HookController):
|
||||
|
||||
__hooks__ = [poppy_hooks.Context(), poppy_hooks.Error()]
|
||||
|
||||
"""Distributed Task Health Controller."""
|
||||
|
||||
@pecan.expose('json')
|
||||
def get(self, distributed_task_name):
|
||||
"""GET.
|
||||
|
||||
Returns the health of distributed task manager
|
||||
|
||||
:param distributed_task_name
|
||||
:returns JSON storage model or HTTP 404
|
||||
"""
|
||||
|
||||
health_controller = self._driver.manager.health_controller
|
||||
|
||||
try:
|
||||
is_alive = health_controller.is_distributed_task_alive(
|
||||
distributed_task_name)
|
||||
return health_response.DistributedTaskModel(is_alive)
|
||||
except KeyError:
|
||||
pecan.response.status = 404
|
||||
|
||||
|
||||
class ProviderHealthController(base.Controller, hooks.HookController):
|
||||
|
||||
__hooks__ = [poppy_hooks.Context(), poppy_hooks.Error()]
|
||||
|
|
|
@ -60,6 +60,8 @@ class PecanTransportDriver(transport.Driver):
|
|||
health_controller.add_controller('dns', v1.DNSHealth(self))
|
||||
health_controller.add_controller('storage', v1.StorageHealth(self))
|
||||
health_controller.add_controller('provider', v1.ProviderHealth(self))
|
||||
health_controller.add_controller('distributed_task',
|
||||
v1.Distributed_Task_Health(self))
|
||||
home_controller.add_controller('services', v1.Services(self))
|
||||
home_controller.add_controller('flavors', v1.Flavors(self))
|
||||
home_controller.add_controller('admin', v1.Admin(self))
|
||||
|
|
|
@ -41,6 +41,16 @@ class StorageModel(collections.OrderedDict):
|
|||
self['online'] = 'false'
|
||||
|
||||
|
||||
class DistributedTaskModel(collections.OrderedDict):
|
||||
def __init__(self, is_alive):
|
||||
super(DistributedTaskModel, self).__init__()
|
||||
|
||||
if is_alive:
|
||||
self['online'] = 'true'
|
||||
else:
|
||||
self['online'] = 'false'
|
||||
|
||||
|
||||
class ProviderModel(collections.OrderedDict):
|
||||
def __init__(self, is_alive):
|
||||
super(ProviderModel, self).__init__()
|
||||
|
@ -84,6 +94,22 @@ class HealthModel(collections.OrderedDict):
|
|||
self['storage'] = {
|
||||
health_map['storage']['storage_name']: health_storage}
|
||||
|
||||
health_distributed_task = collections.OrderedDict()
|
||||
if health_map['distributed_task']['is_alive']:
|
||||
health_distributed_task['online'] = 'true'
|
||||
else:
|
||||
health_distributed_task['online'] = 'false'
|
||||
|
||||
health_distributed_task['links'] = link.Model(
|
||||
u'{0}/health/distributed_task/{1}'.format(
|
||||
controller.base_url,
|
||||
health_map['distributed_task']['distributed_task_name']),
|
||||
'self')
|
||||
|
||||
self['distributed_task'] = {
|
||||
health_map['distributed_task']['distributed_task_name']:
|
||||
health_distributed_task}
|
||||
|
||||
self['providers'] = {}
|
||||
for provider in health_map['providers']:
|
||||
health_provider = collections.OrderedDict()
|
||||
|
|
|
@ -44,6 +44,7 @@ class BaseFunctionalTest(base.TestCase):
|
|||
b_obj.distributed_task.job_board = mock.Mock()
|
||||
b_obj.distributed_task.job_board.return_value = (
|
||||
mock_persistence.copy())
|
||||
b_obj.distributed_task.is_alive = mock.Mock(return_value=True)
|
||||
# Note(tonytan4ever):Need this hack to preserve mockdb storage
|
||||
# controller's service cache
|
||||
b_obj.manager.ssl_certificate_controller.storage_controller = (
|
||||
|
|
|
@ -81,3 +81,25 @@ class HealthControllerTest(base.FunctionalTest):
|
|||
headers={'X-Project-ID': self.project_id},
|
||||
expect_errors=True)
|
||||
self.assertEqual(404, response.status_code)
|
||||
|
||||
@mock.patch('requests.get')
|
||||
def test_health_distributed_task(self, mock_requests):
|
||||
response_object = util.dict2obj(
|
||||
{'content': '', 'status_code': 200})
|
||||
mock_requests.return_value = response_object
|
||||
|
||||
response = self.app.get('/v1.0/health',
|
||||
headers={'X-Project-ID': self.project_id})
|
||||
for name in response.json['distributed_task']:
|
||||
endpoint = '/v1.0/health/distributed_task/{0}'.format(
|
||||
name)
|
||||
response = self.app.get(endpoint,
|
||||
headers={'X-Project-ID': self.project_id})
|
||||
self.assertEqual(200, response.status_code)
|
||||
self.assertIn('true', str(response.body))
|
||||
|
||||
def test_get_unknown_distributed_task(self):
|
||||
response = self.app.get('/v1.0/health/distributed_task/unknown',
|
||||
headers={'X-Project-ID': self.project_id},
|
||||
expect_errors=True)
|
||||
self.assertEqual(404, response.status_code)
|
||||
|
|
|
@ -15,6 +15,8 @@
|
|||
|
||||
"""Unittests for TaskFlow distributed_task driver implementation."""
|
||||
|
||||
import mock
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from poppy.distributed_task.taskflow import driver
|
||||
|
@ -37,7 +39,10 @@ class TestDriver(base.TestCase):
|
|||
self.assertEqual('TaskFlow', self.distributed_task_driver.vendor_name)
|
||||
|
||||
def test_is_alive(self):
|
||||
self.assertEqual(True, self.distributed_task_driver.is_alive())
|
||||
with mock.patch.object(driver.TaskFlowDistributedTaskDriver,
|
||||
'is_alive') as mock_alive:
|
||||
mock_alive.return_value = True
|
||||
self.assertEqual(True, self.distributed_task_driver.is_alive())
|
||||
|
||||
def test_service_contoller(self):
|
||||
self.assertTrue(self.distributed_task_driver.services_controller
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
"healthy": {
|
||||
"dns": {"dns_name": "rackspace_cloud_dns", "is_alive": "True"},
|
||||
"storage": {"storage_name": "cassandra", "is_alive": "True"},
|
||||
"providers": [{"provider_name": "fastly", "is_alive": "True"}]
|
||||
"providers": [{"provider_name": "fastly", "is_alive": "True"}],
|
||||
"distributed_task": {"distributed_task_name": "taskflow", "is_alive": "True"}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,8 @@
|
|||
{
|
||||
"provider_not_available":{
|
||||
"dns": {"dns_name": "rackspace_cloud_dns", "is_alive": "True"},
|
||||
"storage": {"storage_name": "cassandra", "is_alive": "True"},
|
||||
"providers": [{"provider_name": "fastly", "is_alive": "True"}],
|
||||
"distributed_task": {"distributed_task_name": "taskflow", "is_alive": ""}
|
||||
}
|
||||
}
|
|
@ -2,6 +2,7 @@
|
|||
"provider_not_available":{
|
||||
"dns": {"dns_name": "rackspace_cloud_dns", "is_alive": ""},
|
||||
"storage": {"storage_name": "cassandra", "is_alive": "True"},
|
||||
"providers": [{"provider_name": "fastly", "is_alive": "True"}]
|
||||
"providers": [{"provider_name": "fastly", "is_alive": "True"}],
|
||||
"distributed_task": {"distributed_task_name": "taskflow", "is_alive": "False"}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
"provider_not_available":{
|
||||
"dns": {"dns_name": "rackspace_cloud_dns", "is_alive": "True"},
|
||||
"storage": {"storage_name": "cassandra", "is_alive": "True"},
|
||||
"providers": [{"provider_name": "fastly", "is_alive": ""}]
|
||||
"providers": [{"provider_name": "fastly", "is_alive": ""}],
|
||||
"distributed_task": {"distributed_task_name": "taskflow", "is_alive": "True"}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
"storage_not_available" : {
|
||||
"dns": {"dns_name": "rackspace_cloud_dns", "is_alive": 1},
|
||||
"storage": {"storage_name": "cassandra", "is_alive": 0},
|
||||
"providers": [{"provider_name": "fastly", "is_alive": 1}]
|
||||
"providers": [{"provider_name": "fastly", "is_alive": 1}],
|
||||
"distributed_task": {"distributed_task_name": "taskflow", "is_alive": 1}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -79,6 +79,11 @@ class TestHealthModel(base.TestCase):
|
|||
dns_name = health_map['dns']['dns_name']
|
||||
self.assertEqual('true',
|
||||
health_model['dns'][dns_name]['online'])
|
||||
distributed_task_name = \
|
||||
health_map['distributed_task']['distributed_task_name']
|
||||
status = \
|
||||
health_model['distributed_task'][distributed_task_name]['online']
|
||||
self.assertEqual('true', status)
|
||||
|
||||
@ddt.file_data('health_map_dns_not_available.json')
|
||||
def test_health_dns_not_available(self, health_map):
|
||||
|
@ -107,3 +112,11 @@ class TestHealthModel(base.TestCase):
|
|||
self.assertEqual('true', provider_model['online'])
|
||||
else:
|
||||
self.assertEqual('false', provider_model['online'])
|
||||
|
||||
@ddt.file_data('health_map_distributed_task_not_available.json')
|
||||
def test_health_distributed_task_not_available(self, health_map):
|
||||
health_model = health.HealthModel(self.mock_controller, health_map)
|
||||
distributed_task = health_map['distributed_task']
|
||||
distributed_task_name = distributed_task['distributed_task_name']
|
||||
status = health_model['distributed_task'][distributed_task_name]
|
||||
self.assertEqual('false', status['online'])
|
||||
|
|
Loading…
Reference in New Issue