[WORKER] Support new GALERA protocol
Adds a new feature that allows us to setup a Galera load balancer that will allow users to treat the LB address as the single writeable database host, thus preventing deadlocks. This is discussed in more detail at this URL: http://www.severalnines.com/blog/avoiding-deadlocks-galera-set-haproxy-single-node-writes-and-multi-node-reads When the 'protocol' value is set to 'galera', we set up a TCP load balancer. All backend nodes, except for one, must be defined using the 'backup' node config option. The one NOT defined with 'backup' will act as the main database host. If it fails, it will then failover to one of the backups. We still allow the user to define their own 'algorithm', though this likely has little effect since none of the backups will have any connections, so LEAST_CONNECTIONS and ROUND_ROBIN could choose any other node. A health monitor can still be defined, although CONNECT is the only one that makes sense. This also fixes a bug where if adding a server failed, this was never reported back. Change-Id: I0a572b1930806c6e0c1b6ddb2f678fbe234ce993
This commit is contained in:
@@ -27,22 +27,50 @@ class TestHAProxyDriver(testtools.TestCase):
|
||||
self.assertEqual(self.driver._config[proto]['bind_address'], '0.0.0.0')
|
||||
self.assertEqual(self.driver._config[proto]['bind_port'], 443)
|
||||
|
||||
proto = 'galera'
|
||||
self.driver.add_protocol(proto, 3306)
|
||||
self.assertIn(proto, self.driver._config)
|
||||
self.assertEqual(self.driver._config[proto]['bind_address'], '0.0.0.0')
|
||||
self.assertEqual(self.driver._config[proto]['bind_port'], 3306)
|
||||
|
||||
proto = 'tnetennba'
|
||||
e = self.assertRaises(Exception, self.driver.add_protocol, proto, 99)
|
||||
self.assertEqual("Unsupported protocol: %s" % proto, e.message)
|
||||
|
||||
def testAddGaleraRequiresPort(self):
|
||||
e = self.assertRaises(Exception, self.driver.add_protocol, 'galera', None)
|
||||
self.assertEqual("Port is required for this protocol.", e.message)
|
||||
|
||||
def testAddTCPRequiresPort(self):
|
||||
e = self.assertRaises(Exception, self.driver.add_protocol, 'tcp', None)
|
||||
self.assertEqual("Port is required for TCP protocol.", e.message)
|
||||
self.assertEqual("Port is required for this protocol.", e.message)
|
||||
|
||||
def testAddServer(self):
|
||||
""" Test the HAProxy add_server() method """
|
||||
proto = 'http'
|
||||
self.driver.add_protocol(proto, None)
|
||||
self.driver.add_server(proto, 100, '1.2.3.4', 7777)
|
||||
self.driver.add_server(proto, 101, '5.6.7.8', 8888)
|
||||
self.driver.add_server(proto, 101, '5.6.7.8', 8888, 1, True)
|
||||
self.driver.add_server(proto, 102, '2.3.4.5', 9999,
|
||||
weight=2, backup=True)
|
||||
self.assertIn(proto, self.driver._config)
|
||||
self.assertIn('servers', self.driver._config[proto])
|
||||
servers = self.driver._config[proto]['servers']
|
||||
self.assertEqual(len(servers), 2)
|
||||
self.assertEqual(servers[0], (100, '1.2.3.4', 7777, 1))
|
||||
self.assertEqual(servers[1], (101, '5.6.7.8', 8888, 1))
|
||||
self.assertEqual(len(servers), 3)
|
||||
self.assertEqual(servers[0], (100, '1.2.3.4', 7777, 1, False))
|
||||
self.assertEqual(servers[1], (101, '5.6.7.8', 8888, 1, True))
|
||||
self.assertEqual(servers[2], (102, '2.3.4.5', 9999, 2, True))
|
||||
|
||||
def testAddServerMultipleGaleraPrimaries(self):
|
||||
proto = 'galera'
|
||||
self.driver.add_protocol(proto, 33306)
|
||||
self.driver.add_server(proto, 100, '1.2.3.4', 3306, backup=False)
|
||||
self.driver.add_server(proto, 101, '1.2.3.5', 3306, backup=True)
|
||||
e = self.assertRaises(Exception, self.driver.add_server,
|
||||
proto, 101, '1.2.3.6', 3306, backup=False)
|
||||
self.assertEqual(
|
||||
"Galera protocol does not accept more than one non-backup node",
|
||||
e.message)
|
||||
|
||||
def testSetAlgorithm(self):
|
||||
""" Test the HAProxy set_algorithm() method """
|
||||
@@ -64,7 +92,7 @@ class TestHAProxyDriver(testtools.TestCase):
|
||||
self.driver.add_server(proto, 100, '1.2.3.4', 7777, 10)
|
||||
servers = self.driver._config[proto]['servers']
|
||||
self.assertEqual(len(servers), 1)
|
||||
self.assertEqual(servers[0], (100, '1.2.3.4', 7777, 10))
|
||||
self.assertEqual(servers[0], (100, '1.2.3.4', 7777, 10, False))
|
||||
|
||||
def testServerWeightStr(self):
|
||||
""" Test setting string server weights """
|
||||
@@ -73,7 +101,7 @@ class TestHAProxyDriver(testtools.TestCase):
|
||||
self.driver.add_server(proto, 100, '1.2.3.4', 7777, "20")
|
||||
servers = self.driver._config[proto]['servers']
|
||||
self.assertEqual(len(servers), 1)
|
||||
self.assertEqual(servers[0], (100, '1.2.3.4', 7777, 20))
|
||||
self.assertEqual(servers[0], (100, '1.2.3.4', 7777, 20, False))
|
||||
|
||||
def testServerWeightInvalid(self):
|
||||
""" Test setting string server weights """
|
||||
|
||||
@@ -202,7 +202,11 @@ class LBaaSController(object):
|
||||
return self.msg
|
||||
|
||||
for lb_node in current_lb['nodes']:
|
||||
port, address, node_id, weight = None, None, None, None
|
||||
port = None
|
||||
address = None
|
||||
node_id = None
|
||||
weight = None
|
||||
backup = False
|
||||
|
||||
if 'port' in lb_node:
|
||||
port = lb_node['port']
|
||||
@@ -222,21 +226,30 @@ class LBaaSController(object):
|
||||
if 'weight' in lb_node:
|
||||
weight = lb_node['weight']
|
||||
|
||||
if 'backup' in lb_node and lb_node['backup'].lower() == 'true':
|
||||
backup = True
|
||||
|
||||
try:
|
||||
self.driver.add_server(current_lb['protocol'],
|
||||
node_id,
|
||||
address,
|
||||
port,
|
||||
weight)
|
||||
weight,
|
||||
backup)
|
||||
except NotImplementedError:
|
||||
self.logger.error(
|
||||
"Selected driver does not support adding a server."
|
||||
)
|
||||
lb_node['condition'] = self.NODE_ERR
|
||||
error = "Selected driver does not support adding a server"
|
||||
self.logger.error(error)
|
||||
self.msg[self.ERROR_FIELD] = error
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
return self.msg
|
||||
except Exception as e:
|
||||
self.logger.error("Failure trying adding server: %s, %s" %
|
||||
(e.__class__, e))
|
||||
lb_node['condition'] = self.NODE_ERR
|
||||
error = "Failure adding server %s: %s" % (node_id, e)
|
||||
self.logger.error(error)
|
||||
self.msg[self.ERROR_FIELD] = error
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
return self.msg
|
||||
else:
|
||||
self.logger.debug("Added server: %s:%s" % (address, port))
|
||||
lb_node['condition'] = self.NODE_OK
|
||||
|
||||
@@ -48,7 +48,7 @@ class LoadBalancerDriver(object):
|
||||
""" Add a supported protocol and listening port for the instance. """
|
||||
raise NotImplementedError()
|
||||
|
||||
def add_server(self, protocol, host, port, weight):
|
||||
def add_server(self, protocol, host, port, weight, backup):
|
||||
""" Add a server for the protocol for which we will proxy. """
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
@@ -82,36 +82,39 @@ class HAProxyDriver(LoadBalancerDriver):
|
||||
|
||||
for proto in self._config:
|
||||
protocfg = self._config[proto]
|
||||
real_proto = proto
|
||||
if proto == 'galera':
|
||||
real_proto = 'tcp'
|
||||
|
||||
#------------------------
|
||||
# Frontend configuration
|
||||
#------------------------
|
||||
output.append('frontend %s-in' % proto)
|
||||
output.append(' mode %s' % proto)
|
||||
output.append('frontend %s-in' % real_proto)
|
||||
output.append(' mode %s' % real_proto)
|
||||
output.append(' bind %s:%s' % (protocfg['bind_address'],
|
||||
protocfg['bind_port']))
|
||||
output.append(' default_backend %s-servers' % proto)
|
||||
output.append(' default_backend %s-servers' % real_proto)
|
||||
|
||||
# HTTP specific options for the frontend
|
||||
if proto == 'http':
|
||||
if real_proto == 'http':
|
||||
output.append(' option httplog')
|
||||
# TCP specific options for the frontend
|
||||
elif proto == 'tcp':
|
||||
elif real_proto == 'tcp':
|
||||
output.append(' option tcplog')
|
||||
|
||||
#------------------------
|
||||
# Backend configuration
|
||||
#------------------------
|
||||
|
||||
output.append('backend %s-servers' % proto)
|
||||
output.append(' mode %s' % proto)
|
||||
output.append('backend %s-servers' % real_proto)
|
||||
output.append(' mode %s' % real_proto)
|
||||
output.append(' balance %s' % protocfg['algorithm'])
|
||||
|
||||
# default healthcheck if none specified
|
||||
monitor = 'check inter 30s'
|
||||
|
||||
# HTTP specific options for the backend
|
||||
if proto == 'http':
|
||||
if real_proto == 'http':
|
||||
output.append(' cookie SERVERID insert indirect')
|
||||
output.append(' option httpclose')
|
||||
output.append(' option forwardfor')
|
||||
@@ -127,19 +130,40 @@ class HAProxyDriver(LoadBalancerDriver):
|
||||
monitor = "check inter %ds rise %d fall %d" % (
|
||||
mon['delay'], mon['attempts'], mon['attempts'])
|
||||
|
||||
for (node_id, addr, port, weight) in protocfg['servers']:
|
||||
output.append(' server id-%s %s:%s cookie id-%s '
|
||||
'weight %d %s' %
|
||||
(node_id, addr, port, node_id,
|
||||
weight, monitor))
|
||||
for (node_id, addr, port, wt, bkup) in protocfg['servers']:
|
||||
if bkup:
|
||||
output.append(
|
||||
' server id-%s %s:%s backup cookie id-%s'
|
||||
' weight %d %s' %
|
||||
(node_id, addr, port, node_id, wt, monitor)
|
||||
)
|
||||
else:
|
||||
output.append(
|
||||
' server id-%s %s:%s cookie id-%s'
|
||||
' weight %d %s' %
|
||||
(node_id, addr, port, node_id, wt, monitor)
|
||||
)
|
||||
|
||||
# TCP or Galera specific options for the backend
|
||||
#
|
||||
# The Galera protocol is a convenience option that lets us set
|
||||
# our TCP options specifically for load balancing between Galera
|
||||
# database nodes in a manner that helps avoid deadlocks. A main
|
||||
# node is chosen which will act as the 'write' node, sending all
|
||||
# updates to this one node.
|
||||
|
||||
# TCP specific options for the backend
|
||||
else:
|
||||
# Allow session stickiness for TCP connections. The 'size'
|
||||
# value affects memory usage (about 50 bytes per entry).
|
||||
output.append(' stick-table type ip size 200k expire 30m')
|
||||
output.append(' stick store-request src')
|
||||
output.append(' stick match src')
|
||||
|
||||
# No stick table for Galera protocol since we want to return to
|
||||
# the main backend node once it is available after being down.
|
||||
if proto == 'tcp':
|
||||
# Allow session stickiness for TCP connections. The 'size'
|
||||
# value affects memory usage (about 50 bytes per entry).
|
||||
output.append(
|
||||
' stick-table type ip size 200k expire 30m'
|
||||
)
|
||||
output.append(' stick store-request src')
|
||||
output.append(' stick match src')
|
||||
|
||||
if 'monitor' in self._config[proto]:
|
||||
mon = self._config[proto]['monitor']
|
||||
@@ -152,9 +176,17 @@ class HAProxyDriver(LoadBalancerDriver):
|
||||
monitor = "check inter %ds rise %d fall %d" % (
|
||||
mon['delay'], mon['attempts'], mon['attempts'])
|
||||
|
||||
for (node_id, addr, port, weight) in protocfg['servers']:
|
||||
output.append(' server id-%s %s:%s weight %d %s' %
|
||||
(node_id, addr, port, weight, monitor))
|
||||
for (node_id, addr, port, wt, bkup) in protocfg['servers']:
|
||||
if bkup:
|
||||
output.append(
|
||||
' server id-%s %s:%s backup weight %d %s' %
|
||||
(node_id, addr, port, wt, monitor)
|
||||
)
|
||||
else:
|
||||
output.append(
|
||||
' server id-%s %s:%s weight %d %s' %
|
||||
(node_id, addr, port, wt, monitor)
|
||||
)
|
||||
|
||||
return '\n'.join(output) + '\n'
|
||||
|
||||
@@ -251,7 +283,7 @@ class HAProxyDriver(LoadBalancerDriver):
|
||||
|
||||
def add_protocol(self, protocol, port=None):
|
||||
proto = protocol.lower()
|
||||
if proto not in ('tcp', 'http', 'health'):
|
||||
if proto not in ('tcp', 'http', 'galera'):
|
||||
raise Exception("Unsupported protocol: %s" % protocol)
|
||||
if proto in self._config:
|
||||
raise Exception("Protocol '%s' is already defined." % protocol)
|
||||
@@ -259,14 +291,15 @@ class HAProxyDriver(LoadBalancerDriver):
|
||||
self._config[proto] = dict()
|
||||
|
||||
if port is None:
|
||||
if proto == 'tcp':
|
||||
raise Exception('Port is required for TCP protocol.')
|
||||
if proto in ('tcp', 'galera'):
|
||||
raise Exception('Port is required for this protocol.')
|
||||
elif proto == 'http':
|
||||
self._bind(proto, '0.0.0.0', 80)
|
||||
else:
|
||||
self._bind(proto, '0.0.0.0', port)
|
||||
|
||||
def add_server(self, protocol, node_id, host, port, weight=1):
|
||||
def add_server(self, protocol, node_id, host, port,
|
||||
weight=1, backup=False):
|
||||
proto = protocol.lower()
|
||||
if weight is None:
|
||||
weight = 1
|
||||
@@ -281,7 +314,15 @@ class HAProxyDriver(LoadBalancerDriver):
|
||||
|
||||
if 'servers' not in self._config[proto]:
|
||||
self._config[proto]['servers'] = []
|
||||
self._config[proto]['servers'].append((node_id, host, port, weight))
|
||||
|
||||
if proto == 'galera':
|
||||
for (n, h, p, w, b) in self._config[proto]['servers']:
|
||||
if b is False and backup is False:
|
||||
raise Exception("Galera protocol does not accept more"
|
||||
" than one non-backup node")
|
||||
|
||||
self._config[proto]['servers'].append((node_id, host, port,
|
||||
weight, backup))
|
||||
|
||||
def set_algorithm(self, protocol, algo):
|
||||
proto = protocol.lower()
|
||||
|
||||
Reference in New Issue
Block a user