From b4cebeacca3ec9586336c752adbe8fafa6e7760b Mon Sep 17 00:00:00 2001 From: David Shrewsbury Date: Wed, 14 Aug 2013 15:32:07 +0000 Subject: [PATCH] [WORKER] Active monitoring support Support active (HTTP) monitoring in addition to passive (CONNECT) tcp monitoring. If no monitoring for a load balancer is defined, we always use a simple tcp-only monitor. Change-Id: Ib2d4e87097069a4fec5c307bf2718ec1954ebf10 --- libra/tests/test_worker_controller.py | 173 +++++++++++++++++++++++++ libra/worker/controller.py | 30 +++++ libra/worker/drivers/base.py | 24 ++++ libra/worker/drivers/haproxy/driver.py | 81 +++++++++++- 4 files changed, 301 insertions(+), 7 deletions(-) diff --git a/libra/tests/test_worker_controller.py b/libra/tests/test_worker_controller.py index dfc4f24e..f79255e6 100644 --- a/libra/tests/test_worker_controller.py +++ b/libra/tests/test_worker_controller.py @@ -169,6 +169,179 @@ class TestWorkerController(testtools.TestCase): msg = response['badRequest']['validationErrors']['message'] self.assertEquals(msg, "Missing required 'protocol' value.") + def testUpdateGoodMonitor(self): + msg = { + c.ACTION_FIELD: 'UPDATE', + c.LBLIST_FIELD: [ + { + 'protocol': 'http', + 'nodes': [ + { + 'id': 1234, + 'address': '10.0.0.1', + 'port': 80 + } + ], + 'monitor': + { + 'type': 'CONNECT', + 'delay': 60, + 'timeout': 30, + 'attempts': 1, + 'path': '/healthcheck' + } + } + ] + } + controller = c(self.logger, self.driver, msg) + response = controller.run() + self.assertNotIn('badRequest', response) + self.assertEquals(response[c.RESPONSE_FIELD], c.RESPONSE_SUCCESS) + + def testUpdateMonitorMissingType(self): + msg = { + c.ACTION_FIELD: 'UPDATE', + c.LBLIST_FIELD: [ + { + 'protocol': 'http', + 'nodes': [ + { + 'id': 1234, + 'address': '10.0.0.1', + 'port': 80 + } + ], + 'monitor': + { + 'delay': 60, + 'timeout': 30, + 'attempts': 1, + 'path': '/healthcheck' + } + } + ] + } + controller = c(self.logger, self.driver, msg) + response = controller.run() + self.assertIn('badRequest', response) + msg = response['badRequest']['validationErrors']['message'] + self.assertEquals(msg, "Missing monitor value 'type'") + + def testUpdateMonitorMissingDelay(self): + msg = { + c.ACTION_FIELD: 'UPDATE', + c.LBLIST_FIELD: [ + { + 'protocol': 'http', + 'nodes': [ + { + 'id': 1234, + 'address': '10.0.0.1', + 'port': 80 + } + ], + 'monitor': + { + 'type': 'CONNECT', + 'timeout': 30, + 'attempts': 1, + 'path': '/healthcheck' + } + } + ] + } + controller = c(self.logger, self.driver, msg) + response = controller.run() + self.assertIn('badRequest', response) + msg = response['badRequest']['validationErrors']['message'] + self.assertEquals(msg, "Missing monitor value 'delay'") + + def testUpdateMonitorMissingTimeout(self): + msg = { + c.ACTION_FIELD: 'UPDATE', + c.LBLIST_FIELD: [ + { + 'protocol': 'http', + 'nodes': [ + { + 'id': 1234, + 'address': '10.0.0.1', + 'port': 80 + } + ], + 'monitor': + { + 'type': 'CONNECT', + 'delay': 60, + 'attempts': 1, + 'path': '/healthcheck' + } + } + ] + } + controller = c(self.logger, self.driver, msg) + response = controller.run() + self.assertIn('badRequest', response) + msg = response['badRequest']['validationErrors']['message'] + self.assertEquals(msg, "Missing monitor value 'timeout'") + + def testUpdateMonitorMissingAttempts(self): + msg = { + c.ACTION_FIELD: 'UPDATE', + c.LBLIST_FIELD: [ + { + 'protocol': 'http', + 'nodes': [ + { + 'id': 1234, + 'address': '10.0.0.1', + 'port': 80 + } + ], + 'monitor': + { + 'type': 'CONNECT', + 'delay': 60, + 'timeout': 30, + 'path': '/healthcheck' + } + } + ] + } + controller = c(self.logger, self.driver, msg) + response = controller.run() + self.assertIn('badRequest', response) + msg = response['badRequest']['validationErrors']['message'] + self.assertEquals(msg, "Missing monitor value 'attempts'") + + def testUpdateMonitorMissingPath(self): + msg = { + c.ACTION_FIELD: 'UPDATE', + c.LBLIST_FIELD: [ + { + 'protocol': 'http', + 'nodes': [ + { + 'id': 1234, + 'address': '10.0.0.1', + 'port': 80 + } + ], + 'monitor': + { + 'type': 'CONNECT', + 'delay': 60, + 'timeout': 30, + 'attempts': 1 + } + } + ] + } + controller = c(self.logger, self.driver, msg) + response = controller.run() + self.assertIn(c.RESPONSE_FIELD, response) + self.assertEquals(response[c.RESPONSE_FIELD], c.RESPONSE_SUCCESS) + def testBadAlgorithm(self): msg = { c.ACTION_FIELD: 'UPDATE', diff --git a/libra/worker/controller.py b/libra/worker/controller.py index d2c7078e..f7d154db 100644 --- a/libra/worker/controller.py +++ b/libra/worker/controller.py @@ -171,6 +171,36 @@ class LBaaSController(object): self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE return self.msg + if 'monitor' in current_lb: + monitor = current_lb['monitor'] + for opt in ['type', 'delay', 'timeout', 'attempts']: + if opt not in monitor: + return BadRequest("Missing monitor value '%s'" % + opt).to_json() + if 'path' not in monitor: + monitor['path'] = '/' + + try: + self.driver.add_monitor(current_lb['protocol'], + monitor['type'], + monitor['delay'], + monitor['timeout'], + monitor['attempts'], + monitor['path']) + except NotImplementedError: + self.logger.error( + "Selected driver does not support adding healthchecks." + ) + self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE + return self.msg + except Exception as e: + self.logger.error( + "Selected driver failed adding healthchecks: %s, %s" % + (e.__class__, e) + ) + self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE + return self.msg + for lb_node in current_lb['nodes']: port, address, node_id, weight = None, None, None, None diff --git a/libra/worker/drivers/base.py b/libra/worker/drivers/base.py index 154c086f..754ebbb2 100644 --- a/libra/worker/drivers/base.py +++ b/libra/worker/drivers/base.py @@ -56,6 +56,30 @@ class LoadBalancerDriver(object): """ Set the algorithm used by the load balancer for this protocol. """ raise NotImplementedError() + def add_monitor(self, protocol, mtype, delay, timeout, attempts, path): + """ + Add a health check monitor for this protocol. + + protocol + Protocol of the load balancer (HTTP, TCP) + mtype + Monitor type (CONNECT, HTTP) + delay + Minimum time in seconds between regular calls to a monitor. + timeout + Maximum number of seconds for a monitor to wait for a connection + to be established to the node before it times out. The value must + be less than the delay value. + attempts + Number of permissible monitor failures before removing a node from + rotation. + path + The HTTP path used in the HTTP request by the monitor. This must + be a string beginning with a / (forward slash). The monitor + expects a response from the node with an HTTP status code of 200. + """ + raise NotImplementedError() + def create(self): """ Create the load balancer. """ raise NotImplementedError() diff --git a/libra/worker/drivers/haproxy/driver.py b/libra/worker/drivers/haproxy/driver.py index 12024c02..84e4a93a 100644 --- a/libra/worker/drivers/haproxy/driver.py +++ b/libra/worker/drivers/haproxy/driver.py @@ -102,26 +102,51 @@ class HAProxyDriver(LoadBalancerDriver): #------------------------ # Backend configuration #------------------------ + output.append('backend %s-servers' % proto) output.append(' mode %s' % proto) output.append(' balance %s' % protocfg['algorithm']) + # default healthcheck if none specified + monitor = 'check inter 30s' + # HTTP specific options for the backend if proto == 'http': output.append(' cookie SERVERID insert indirect') output.append(' option httpclose') output.append(' option forwardfor') + if 'monitor' in self._config[proto]: + mon = self._config[proto]['monitor'] + if mon['type'] == 'http': + output.append(' option httpchk %s' % mon['path']) + # our timeout will be connect + read time + output.append(' timeout check %ds' % mon['timeout']) + # intentionally set rise/fall to the same value + monitor = "check inter %ds rise %d fall %d" % ( + mon['delay'], mon['attempts'], mon['attempts']) + for (node_id, addr, port, weight) in protocfg['servers']: - output.append(' server id-%s %s:%s check ' - 'inter 30000 cookie id-%s weight %d' % - (node_id, addr, port, node_id, weight)) - # HTTPS (TCP) specific options for the backend + output.append(' server id-%s %s:%s cookie id-%s ' + 'weight %d %s' % + (node_id, addr, port, node_id, + weight, monitor)) + + # TCP specific options for the backend else: + if 'monitor' in self._config[proto]: + mon = self._config[proto]['monitor'] + if mon['type'] == 'http': + output.append(' option httpchk %s' % mon['path']) + # our timeout will be connect + read time + output.append(' timeout check %ds' % mon['timeout']) + # intentionally set rise/fall to the same value + monitor = "check inter %ds rise %d fall %d" % ( + mon['delay'], mon['attempts'], mon['attempts']) + for (node_id, addr, port, weight) in protocfg['servers']: - output.append(' server id-%s %s:%s check ' - 'inter 30000 weight %d' % - (node_id, addr, port, weight)) + output.append(' server id-%s %s:%s weight %d %s' % + (node_id, addr, port, weight, monitor)) return '\n'.join(output) + '\n' @@ -259,6 +284,48 @@ class HAProxyDriver(LoadBalancerDriver): else: raise Exception('Invalid algorithm: %s' % protocol) + def add_monitor(self, protocol, mtype, delay, timeout, attempts, path): + proto = protocol.lower() + if mtype.lower() not in ['connect', 'http']: + raise Exception('Invalid monitor type: %s' % mtype) + + # default values + if delay is None: + delay = 30 + if attempts is None: + attempts = 2 + if timeout is None: + timeout = delay + if path is None: + path = '/' + + if path[0] != '/': + path = '/' + path + + try: + delay = int(delay) + except ValueError: + raise Exception("Non-integer 'delay' value: '%s'" % delay) + + try: + timeout = int(timeout) + except ValueError: + raise Exception("Non-integer 'timeout' value: '%s'" % timeout) + + try: + attempts = int(attempts) + except ValueError: + raise Exception("Non-integer 'attempts' value: '%s'" % attempts) + + if timeout > delay: + raise Exception("Timeout cannot be greater than delay") + + self._config[proto]['monitor'] = {'type': mtype.lower(), + 'delay': delay, + 'timeout': timeout, + 'attempts': attempts, + 'path': path} + def create(self): self.ossvc.write_config(self._config_to_string()) self.ossvc.service_stop()