[WORKER] Active monitoring support
Support active (HTTP) monitoring in addition to passive (CONNECT) tcp monitoring. If no monitoring for a load balancer is defined, we always use a simple tcp-only monitor. Change-Id: Ib2d4e87097069a4fec5c307bf2718ec1954ebf10
This commit is contained in:
@@ -169,6 +169,179 @@ class TestWorkerController(testtools.TestCase):
|
||||
msg = response['badRequest']['validationErrors']['message']
|
||||
self.assertEquals(msg, "Missing required 'protocol' value.")
|
||||
|
||||
def testUpdateGoodMonitor(self):
|
||||
msg = {
|
||||
c.ACTION_FIELD: 'UPDATE',
|
||||
c.LBLIST_FIELD: [
|
||||
{
|
||||
'protocol': 'http',
|
||||
'nodes': [
|
||||
{
|
||||
'id': 1234,
|
||||
'address': '10.0.0.1',
|
||||
'port': 80
|
||||
}
|
||||
],
|
||||
'monitor':
|
||||
{
|
||||
'type': 'CONNECT',
|
||||
'delay': 60,
|
||||
'timeout': 30,
|
||||
'attempts': 1,
|
||||
'path': '/healthcheck'
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
controller = c(self.logger, self.driver, msg)
|
||||
response = controller.run()
|
||||
self.assertNotIn('badRequest', response)
|
||||
self.assertEquals(response[c.RESPONSE_FIELD], c.RESPONSE_SUCCESS)
|
||||
|
||||
def testUpdateMonitorMissingType(self):
|
||||
msg = {
|
||||
c.ACTION_FIELD: 'UPDATE',
|
||||
c.LBLIST_FIELD: [
|
||||
{
|
||||
'protocol': 'http',
|
||||
'nodes': [
|
||||
{
|
||||
'id': 1234,
|
||||
'address': '10.0.0.1',
|
||||
'port': 80
|
||||
}
|
||||
],
|
||||
'monitor':
|
||||
{
|
||||
'delay': 60,
|
||||
'timeout': 30,
|
||||
'attempts': 1,
|
||||
'path': '/healthcheck'
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
controller = c(self.logger, self.driver, msg)
|
||||
response = controller.run()
|
||||
self.assertIn('badRequest', response)
|
||||
msg = response['badRequest']['validationErrors']['message']
|
||||
self.assertEquals(msg, "Missing monitor value 'type'")
|
||||
|
||||
def testUpdateMonitorMissingDelay(self):
|
||||
msg = {
|
||||
c.ACTION_FIELD: 'UPDATE',
|
||||
c.LBLIST_FIELD: [
|
||||
{
|
||||
'protocol': 'http',
|
||||
'nodes': [
|
||||
{
|
||||
'id': 1234,
|
||||
'address': '10.0.0.1',
|
||||
'port': 80
|
||||
}
|
||||
],
|
||||
'monitor':
|
||||
{
|
||||
'type': 'CONNECT',
|
||||
'timeout': 30,
|
||||
'attempts': 1,
|
||||
'path': '/healthcheck'
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
controller = c(self.logger, self.driver, msg)
|
||||
response = controller.run()
|
||||
self.assertIn('badRequest', response)
|
||||
msg = response['badRequest']['validationErrors']['message']
|
||||
self.assertEquals(msg, "Missing monitor value 'delay'")
|
||||
|
||||
def testUpdateMonitorMissingTimeout(self):
|
||||
msg = {
|
||||
c.ACTION_FIELD: 'UPDATE',
|
||||
c.LBLIST_FIELD: [
|
||||
{
|
||||
'protocol': 'http',
|
||||
'nodes': [
|
||||
{
|
||||
'id': 1234,
|
||||
'address': '10.0.0.1',
|
||||
'port': 80
|
||||
}
|
||||
],
|
||||
'monitor':
|
||||
{
|
||||
'type': 'CONNECT',
|
||||
'delay': 60,
|
||||
'attempts': 1,
|
||||
'path': '/healthcheck'
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
controller = c(self.logger, self.driver, msg)
|
||||
response = controller.run()
|
||||
self.assertIn('badRequest', response)
|
||||
msg = response['badRequest']['validationErrors']['message']
|
||||
self.assertEquals(msg, "Missing monitor value 'timeout'")
|
||||
|
||||
def testUpdateMonitorMissingAttempts(self):
|
||||
msg = {
|
||||
c.ACTION_FIELD: 'UPDATE',
|
||||
c.LBLIST_FIELD: [
|
||||
{
|
||||
'protocol': 'http',
|
||||
'nodes': [
|
||||
{
|
||||
'id': 1234,
|
||||
'address': '10.0.0.1',
|
||||
'port': 80
|
||||
}
|
||||
],
|
||||
'monitor':
|
||||
{
|
||||
'type': 'CONNECT',
|
||||
'delay': 60,
|
||||
'timeout': 30,
|
||||
'path': '/healthcheck'
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
controller = c(self.logger, self.driver, msg)
|
||||
response = controller.run()
|
||||
self.assertIn('badRequest', response)
|
||||
msg = response['badRequest']['validationErrors']['message']
|
||||
self.assertEquals(msg, "Missing monitor value 'attempts'")
|
||||
|
||||
def testUpdateMonitorMissingPath(self):
|
||||
msg = {
|
||||
c.ACTION_FIELD: 'UPDATE',
|
||||
c.LBLIST_FIELD: [
|
||||
{
|
||||
'protocol': 'http',
|
||||
'nodes': [
|
||||
{
|
||||
'id': 1234,
|
||||
'address': '10.0.0.1',
|
||||
'port': 80
|
||||
}
|
||||
],
|
||||
'monitor':
|
||||
{
|
||||
'type': 'CONNECT',
|
||||
'delay': 60,
|
||||
'timeout': 30,
|
||||
'attempts': 1
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
controller = c(self.logger, self.driver, msg)
|
||||
response = controller.run()
|
||||
self.assertIn(c.RESPONSE_FIELD, response)
|
||||
self.assertEquals(response[c.RESPONSE_FIELD], c.RESPONSE_SUCCESS)
|
||||
|
||||
def testBadAlgorithm(self):
|
||||
msg = {
|
||||
c.ACTION_FIELD: 'UPDATE',
|
||||
|
||||
@@ -171,6 +171,36 @@ class LBaaSController(object):
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
return self.msg
|
||||
|
||||
if 'monitor' in current_lb:
|
||||
monitor = current_lb['monitor']
|
||||
for opt in ['type', 'delay', 'timeout', 'attempts']:
|
||||
if opt not in monitor:
|
||||
return BadRequest("Missing monitor value '%s'" %
|
||||
opt).to_json()
|
||||
if 'path' not in monitor:
|
||||
monitor['path'] = '/'
|
||||
|
||||
try:
|
||||
self.driver.add_monitor(current_lb['protocol'],
|
||||
monitor['type'],
|
||||
monitor['delay'],
|
||||
monitor['timeout'],
|
||||
monitor['attempts'],
|
||||
monitor['path'])
|
||||
except NotImplementedError:
|
||||
self.logger.error(
|
||||
"Selected driver does not support adding healthchecks."
|
||||
)
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
return self.msg
|
||||
except Exception as e:
|
||||
self.logger.error(
|
||||
"Selected driver failed adding healthchecks: %s, %s" %
|
||||
(e.__class__, e)
|
||||
)
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
return self.msg
|
||||
|
||||
for lb_node in current_lb['nodes']:
|
||||
port, address, node_id, weight = None, None, None, None
|
||||
|
||||
|
||||
@@ -56,6 +56,30 @@ class LoadBalancerDriver(object):
|
||||
""" Set the algorithm used by the load balancer for this protocol. """
|
||||
raise NotImplementedError()
|
||||
|
||||
def add_monitor(self, protocol, mtype, delay, timeout, attempts, path):
|
||||
"""
|
||||
Add a health check monitor for this protocol.
|
||||
|
||||
protocol
|
||||
Protocol of the load balancer (HTTP, TCP)
|
||||
mtype
|
||||
Monitor type (CONNECT, HTTP)
|
||||
delay
|
||||
Minimum time in seconds between regular calls to a monitor.
|
||||
timeout
|
||||
Maximum number of seconds for a monitor to wait for a connection
|
||||
to be established to the node before it times out. The value must
|
||||
be less than the delay value.
|
||||
attempts
|
||||
Number of permissible monitor failures before removing a node from
|
||||
rotation.
|
||||
path
|
||||
The HTTP path used in the HTTP request by the monitor. This must
|
||||
be a string beginning with a / (forward slash). The monitor
|
||||
expects a response from the node with an HTTP status code of 200.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def create(self):
|
||||
""" Create the load balancer. """
|
||||
raise NotImplementedError()
|
||||
|
||||
@@ -102,26 +102,51 @@ class HAProxyDriver(LoadBalancerDriver):
|
||||
#------------------------
|
||||
# Backend configuration
|
||||
#------------------------
|
||||
|
||||
output.append('backend %s-servers' % proto)
|
||||
output.append(' mode %s' % proto)
|
||||
output.append(' balance %s' % protocfg['algorithm'])
|
||||
|
||||
# default healthcheck if none specified
|
||||
monitor = 'check inter 30s'
|
||||
|
||||
# HTTP specific options for the backend
|
||||
if proto == 'http':
|
||||
output.append(' cookie SERVERID insert indirect')
|
||||
output.append(' option httpclose')
|
||||
output.append(' option forwardfor')
|
||||
|
||||
if 'monitor' in self._config[proto]:
|
||||
mon = self._config[proto]['monitor']
|
||||
if mon['type'] == 'http':
|
||||
output.append(' option httpchk %s' % mon['path'])
|
||||
# our timeout will be connect + read time
|
||||
output.append(' timeout check %ds' % mon['timeout'])
|
||||
# intentionally set rise/fall to the same value
|
||||
monitor = "check inter %ds rise %d fall %d" % (
|
||||
mon['delay'], mon['attempts'], mon['attempts'])
|
||||
|
||||
for (node_id, addr, port, weight) in protocfg['servers']:
|
||||
output.append(' server id-%s %s:%s check '
|
||||
'inter 30000 cookie id-%s weight %d' %
|
||||
(node_id, addr, port, node_id, weight))
|
||||
# HTTPS (TCP) specific options for the backend
|
||||
output.append(' server id-%s %s:%s cookie id-%s '
|
||||
'weight %d %s' %
|
||||
(node_id, addr, port, node_id,
|
||||
weight, monitor))
|
||||
|
||||
# TCP specific options for the backend
|
||||
else:
|
||||
if 'monitor' in self._config[proto]:
|
||||
mon = self._config[proto]['monitor']
|
||||
if mon['type'] == 'http':
|
||||
output.append(' option httpchk %s' % mon['path'])
|
||||
# our timeout will be connect + read time
|
||||
output.append(' timeout check %ds' % mon['timeout'])
|
||||
# intentionally set rise/fall to the same value
|
||||
monitor = "check inter %ds rise %d fall %d" % (
|
||||
mon['delay'], mon['attempts'], mon['attempts'])
|
||||
|
||||
for (node_id, addr, port, weight) in protocfg['servers']:
|
||||
output.append(' server id-%s %s:%s check '
|
||||
'inter 30000 weight %d' %
|
||||
(node_id, addr, port, weight))
|
||||
output.append(' server id-%s %s:%s weight %d %s' %
|
||||
(node_id, addr, port, weight, monitor))
|
||||
|
||||
return '\n'.join(output) + '\n'
|
||||
|
||||
@@ -259,6 +284,48 @@ class HAProxyDriver(LoadBalancerDriver):
|
||||
else:
|
||||
raise Exception('Invalid algorithm: %s' % protocol)
|
||||
|
||||
def add_monitor(self, protocol, mtype, delay, timeout, attempts, path):
|
||||
proto = protocol.lower()
|
||||
if mtype.lower() not in ['connect', 'http']:
|
||||
raise Exception('Invalid monitor type: %s' % mtype)
|
||||
|
||||
# default values
|
||||
if delay is None:
|
||||
delay = 30
|
||||
if attempts is None:
|
||||
attempts = 2
|
||||
if timeout is None:
|
||||
timeout = delay
|
||||
if path is None:
|
||||
path = '/'
|
||||
|
||||
if path[0] != '/':
|
||||
path = '/' + path
|
||||
|
||||
try:
|
||||
delay = int(delay)
|
||||
except ValueError:
|
||||
raise Exception("Non-integer 'delay' value: '%s'" % delay)
|
||||
|
||||
try:
|
||||
timeout = int(timeout)
|
||||
except ValueError:
|
||||
raise Exception("Non-integer 'timeout' value: '%s'" % timeout)
|
||||
|
||||
try:
|
||||
attempts = int(attempts)
|
||||
except ValueError:
|
||||
raise Exception("Non-integer 'attempts' value: '%s'" % attempts)
|
||||
|
||||
if timeout > delay:
|
||||
raise Exception("Timeout cannot be greater than delay")
|
||||
|
||||
self._config[proto]['monitor'] = {'type': mtype.lower(),
|
||||
'delay': delay,
|
||||
'timeout': timeout,
|
||||
'attempts': attempts,
|
||||
'path': path}
|
||||
|
||||
def create(self):
|
||||
self.ossvc.write_config(self._config_to_string())
|
||||
self.ossvc.service_stop()
|
||||
|
||||
Reference in New Issue
Block a user