Merge "[WORKER] Support new GALERA protocol"
@@ -27,22 +27,50 @@ class TestHAProxyDriver(testtools.TestCase):
         self.assertEqual(self.driver._config[proto]['bind_address'], '0.0.0.0')
         self.assertEqual(self.driver._config[proto]['bind_port'], 443)
 
+        proto = 'galera'
+        self.driver.add_protocol(proto, 3306)
+        self.assertIn(proto, self.driver._config)
+        self.assertEqual(self.driver._config[proto]['bind_address'], '0.0.0.0')
+        self.assertEqual(self.driver._config[proto]['bind_port'], 3306)
+
         proto = 'tnetennba'
         e = self.assertRaises(Exception, self.driver.add_protocol, proto, 99)
         self.assertEqual("Unsupported protocol: %s" % proto, e.message)
 
+    def testAddGaleraRequiresPort(self):
+        e = self.assertRaises(Exception, self.driver.add_protocol, 'galera', None)
+        self.assertEqual("Port is required for this protocol.", e.message)
+
     def testAddTCPRequiresPort(self):
         e = self.assertRaises(Exception, self.driver.add_protocol, 'tcp', None)
-        self.assertEqual("Port is required for TCP protocol.", e.message)
+        self.assertEqual("Port is required for this protocol.", e.message)
 
     def testAddServer(self):
         """ Test the HAProxy add_server() method """
         proto = 'http'
         self.driver.add_protocol(proto, None)
         self.driver.add_server(proto, 100, '1.2.3.4', 7777)
-        self.driver.add_server(proto, 101, '5.6.7.8', 8888)
+        self.driver.add_server(proto, 101, '5.6.7.8', 8888, 1, True)
+        self.driver.add_server(proto, 102, '2.3.4.5', 9999,
+                               weight=2, backup=True)
         self.assertIn(proto, self.driver._config)
         self.assertIn('servers', self.driver._config[proto])
         servers = self.driver._config[proto]['servers']
-        self.assertEqual(len(servers), 2)
-        self.assertEqual(servers[0], (100, '1.2.3.4', 7777, 1))
-        self.assertEqual(servers[1], (101, '5.6.7.8', 8888, 1))
+        self.assertEqual(len(servers), 3)
+        self.assertEqual(servers[0], (100, '1.2.3.4', 7777, 1, False))
+        self.assertEqual(servers[1], (101, '5.6.7.8', 8888, 1, True))
+        self.assertEqual(servers[2], (102, '2.3.4.5', 9999, 2, True))
 
+    def testAddServerMultipleGaleraPrimaries(self):
+        proto = 'galera'
+        self.driver.add_protocol(proto, 33306)
+        self.driver.add_server(proto, 100, '1.2.3.4', 3306, backup=False)
+        self.driver.add_server(proto, 101, '1.2.3.5', 3306, backup=True)
+        e = self.assertRaises(Exception, self.driver.add_server,
+                              proto, 101, '1.2.3.6', 3306, backup=False)
+        self.assertEqual(
+            "Galera protocol does not accept more than one non-backup node",
+            e.message)
+
     def testSetAlgorithm(self):
         """ Test the HAProxy set_algorithm() method """
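The assertions above also capture the shape change in the driver's internal server entries: each entry grows from a 4-tuple to a 5-tuple carrying the backup flag. A minimal sketch of that difference, using illustrative values rather than the driver itself:

# Illustrative only: entry layout before and after this change.
old_entry = (100, '1.2.3.4', 7777, 1)           # (node_id, address, port, weight)
new_entry = (100, '1.2.3.4', 7777, 1, False)    # (node_id, address, port, weight, backup)

# Any code that unpacks these entries has to change with them, which is why
# both config-rendering loops later in this diff switch to five names:
node_id, addr, port, wt, bkup = new_entry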
@@ -64,7 +92,7 @@ class TestHAProxyDriver(testtools.TestCase):
         self.driver.add_server(proto, 100, '1.2.3.4', 7777, 10)
         servers = self.driver._config[proto]['servers']
         self.assertEqual(len(servers), 1)
-        self.assertEqual(servers[0], (100, '1.2.3.4', 7777, 10))
+        self.assertEqual(servers[0], (100, '1.2.3.4', 7777, 10, False))
 
     def testServerWeightStr(self):
         """ Test setting string server weights """
@@ -73,7 +101,7 @@ class TestHAProxyDriver(testtools.TestCase):
         self.driver.add_server(proto, 100, '1.2.3.4', 7777, "20")
         servers = self.driver._config[proto]['servers']
         self.assertEqual(len(servers), 1)
-        self.assertEqual(servers[0], (100, '1.2.3.4', 7777, 20))
+        self.assertEqual(servers[0], (100, '1.2.3.4', 7777, 20, False))
 
     def testServerWeightInvalid(self):
         """ Test setting string server weights """
@@ -202,7 +202,11 @@ class LBaaSController(object):
             return self.msg
 
         for lb_node in current_lb['nodes']:
-            port, address, node_id, weight = None, None, None, None
+            port = None
+            address = None
+            node_id = None
+            weight = None
+            backup = False
 
             if 'port' in lb_node:
                 port = lb_node['port']
@@ -222,21 +226,30 @@ class LBaaSController(object):
             if 'weight' in lb_node:
                 weight = lb_node['weight']
 
+            if 'backup' in lb_node and lb_node['backup'].lower() == 'true':
+                backup = True
+
             try:
                 self.driver.add_server(current_lb['protocol'],
                                        node_id,
                                        address,
                                        port,
-                                       weight)
+                                       weight,
+                                       backup)
             except NotImplementedError:
-                self.logger.error(
-                    "Selected driver does not support adding a server."
-                )
                 lb_node['condition'] = self.NODE_ERR
+                error = "Selected driver does not support adding a server"
+                self.logger.error(error)
+                self.msg[self.ERROR_FIELD] = error
+                self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
+                return self.msg
             except Exception as e:
-                self.logger.error("Failure trying adding server: %s, %s" %
-                                  (e.__class__, e))
                 lb_node['condition'] = self.NODE_ERR
+                error = "Failure adding server %s: %s" % (node_id, e)
+                self.logger.error(error)
+                self.msg[self.ERROR_FIELD] = error
+                self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
+                return self.msg
             else:
                 self.logger.debug("Added server: %s:%s" % (address, port))
                 lb_node['condition'] = self.NODE_OK
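Note that the controller reads the backup flag off the node message as a string, which is why the comparison is lb_node['backup'].lower() == 'true'. A small sketch of that parsing step; the 'port', 'weight', and 'backup' keys follow the hunks above, while the rest of the message format is assumed rather than shown here:

# Hypothetical node entry as the worker might receive it; note that the
# 'backup' value arrives as a string, not a boolean.
lb_node = {'port': 3306, 'weight': 1, 'backup': 'True'}

backup = False
if 'backup' in lb_node and lb_node['backup'].lower() == 'true':
    backup = True    # case-insensitive match; anything else leaves it False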
@@ -48,7 +48,7 @@ class LoadBalancerDriver(object):
         """ Add a supported protocol and listening port for the instance. """
         raise NotImplementedError()
 
-    def add_server(self, protocol, host, port, weight):
+    def add_server(self, protocol, host, port, weight, backup):
         """ Add a server for the protocol for which we will proxy. """
         raise NotImplementedError()
 
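Because the base class now declares backup in add_server(), any other driver implementation has to accept the extra argument even if it has no use for it. A minimal sketch of such a subclass; the class name here is hypothetical:

class NullDriver(LoadBalancerDriver):
    """ Hypothetical driver that accepts the new argument but ignores it. """

    def add_server(self, protocol, host, port, weight, backup):
        # A driver with no notion of backup nodes can simply drop the flag.
        pass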
@@ -82,36 +82,39 @@ class HAProxyDriver(LoadBalancerDriver):
 
         for proto in self._config:
             protocfg = self._config[proto]
+            real_proto = proto
+            if proto == 'galera':
+                real_proto = 'tcp'
 
             #------------------------
             # Frontend configuration
             #------------------------
-            output.append('frontend %s-in' % proto)
-            output.append(' mode %s' % proto)
+            output.append('frontend %s-in' % real_proto)
+            output.append(' mode %s' % real_proto)
             output.append(' bind %s:%s' % (protocfg['bind_address'],
                                            protocfg['bind_port']))
-            output.append(' default_backend %s-servers' % proto)
+            output.append(' default_backend %s-servers' % real_proto)
 
             # HTTP specific options for the frontend
-            if proto == 'http':
+            if real_proto == 'http':
                 output.append(' option httplog')
             # TCP specific options for the frontend
-            elif proto == 'tcp':
+            elif real_proto == 'tcp':
                 output.append(' option tcplog')
 
             #------------------------
             # Backend configuration
             #------------------------
 
-            output.append('backend %s-servers' % proto)
-            output.append(' mode %s' % proto)
+            output.append('backend %s-servers' % real_proto)
+            output.append(' mode %s' % real_proto)
             output.append(' balance %s' % protocfg['algorithm'])
 
             # default healthcheck if none specified
             monitor = 'check inter 30s'
 
             # HTTP specific options for the backend
-            if proto == 'http':
+            if real_proto == 'http':
                 output.append(' cookie SERVERID insert indirect')
                 output.append(' option httpclose')
                 output.append(' option forwardfor')
@@ -127,19 +130,40 @@ class HAProxyDriver(LoadBalancerDriver):
                     monitor = "check inter %ds rise %d fall %d" % (
                         mon['delay'], mon['attempts'], mon['attempts'])
 
-                for (node_id, addr, port, weight) in protocfg['servers']:
-                    output.append(' server id-%s %s:%s cookie id-%s '
-                                  'weight %d %s' %
-                                  (node_id, addr, port, node_id,
-                                   weight, monitor))
+                for (node_id, addr, port, wt, bkup) in protocfg['servers']:
+                    if bkup:
+                        output.append(
+                            ' server id-%s %s:%s backup cookie id-%s'
+                            ' weight %d %s' %
+                            (node_id, addr, port, node_id, wt, monitor)
+                        )
+                    else:
+                        output.append(
+                            ' server id-%s %s:%s cookie id-%s'
+                            ' weight %d %s' %
+                            (node_id, addr, port, node_id, wt, monitor)
+                        )
 
+            # TCP or Galera specific options for the backend
+            #
+            # The Galera protocol is a convenience option that lets us set
+            # our TCP options specifically for load balancing between Galera
+            # database nodes in a manner that helps avoid deadlocks. A main
+            # node is chosen which will act as the 'write' node, sending all
+            # updates to this one node.
+
             # TCP specific options for the backend
             else:
-                # Allow session stickiness for TCP connections. The 'size'
-                # value affects memory usage (about 50 bytes per entry).
-                output.append(' stick-table type ip size 200k expire 30m')
-                output.append(' stick store-request src')
-                output.append(' stick match src')
+                # No stick table for Galera protocol since we want to return to
+                # the main backend node once it is available after being down.
+                if proto == 'tcp':
+                    # Allow session stickiness for TCP connections. The 'size'
+                    # value affects memory usage (about 50 bytes per entry).
+                    output.append(
+                        ' stick-table type ip size 200k expire 30m'
+                    )
+                    output.append(' stick store-request src')
+                    output.append(' stick match src')
 
                 if 'monitor' in self._config[proto]:
                     mon = self._config[proto]['monitor']
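Taken together, the galera branch renders an ordinary tcp frontend and backend, marks every node flagged as backup with HAProxy's backup keyword, and skips the stick-table lines so traffic returns to the primary once it recovers. A rough sketch of the backend lines the loops above would append for one primary and one backup node; the balance algorithm, the exact spacing inside the strings, and the surrounding configuration are assumptions here, not part of this diff:

# Illustrative only: roughly what gets appended for a galera protocol with
# node 100 as the primary and node 101 as a backup.
output = []
output.append('backend tcp-servers')
output.append(' mode tcp')
output.append(' balance roundrobin')   # actually protocfg['algorithm']
output.append(' server id-100 1.2.3.4:3306 weight 1 check inter 30s')
output.append(' server id-101 1.2.3.5:3306 backup weight 1 check inter 30s')
# Unlike the plain tcp protocol, no 'stick-table' / 'stick' lines are emitted.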
@@ -152,9 +176,17 @@ class HAProxyDriver(LoadBalancerDriver):
                     monitor = "check inter %ds rise %d fall %d" % (
                         mon['delay'], mon['attempts'], mon['attempts'])
 
-                for (node_id, addr, port, weight) in protocfg['servers']:
-                    output.append(' server id-%s %s:%s weight %d %s' %
-                                  (node_id, addr, port, weight, monitor))
+                for (node_id, addr, port, wt, bkup) in protocfg['servers']:
+                    if bkup:
+                        output.append(
+                            ' server id-%s %s:%s backup weight %d %s' %
+                            (node_id, addr, port, wt, monitor)
+                        )
+                    else:
+                        output.append(
+                            ' server id-%s %s:%s weight %d %s' %
+                            (node_id, addr, port, wt, monitor)
+                        )
 
         return '\n'.join(output) + '\n'
 
@@ -251,7 +283,7 @@ class HAProxyDriver(LoadBalancerDriver):
 
     def add_protocol(self, protocol, port=None):
         proto = protocol.lower()
-        if proto not in ('tcp', 'http', 'health'):
+        if proto not in ('tcp', 'http', 'galera'):
            raise Exception("Unsupported protocol: %s" % protocol)
        if proto in self._config:
            raise Exception("Protocol '%s' is already defined." % protocol)
@@ -259,14 +291,15 @@ class HAProxyDriver(LoadBalancerDriver):
         self._config[proto] = dict()
 
         if port is None:
-            if proto == 'tcp':
-                raise Exception('Port is required for TCP protocol.')
+            if proto in ('tcp', 'galera'):
+                raise Exception('Port is required for this protocol.')
             elif proto == 'http':
                 self._bind(proto, '0.0.0.0', 80)
         else:
             self._bind(proto, '0.0.0.0', port)
 
-    def add_server(self, protocol, node_id, host, port, weight=1):
+    def add_server(self, protocol, node_id, host, port,
+                   weight=1, backup=False):
         proto = protocol.lower()
         if weight is None:
             weight = 1
@@ -281,7 +314,15 @@ class HAProxyDriver(LoadBalancerDriver):
 
         if 'servers' not in self._config[proto]:
             self._config[proto]['servers'] = []
-        self._config[proto]['servers'].append((node_id, host, port, weight))
+
+        if proto == 'galera':
+            for (n, h, p, w, b) in self._config[proto]['servers']:
+                if b is False and backup is False:
+                    raise Exception("Galera protocol does not accept more"
+                                    " than one non-backup node")
+
+        self._config[proto]['servers'].append((node_id, host, port,
+                                               weight, backup))
 
     def set_algorithm(self, protocol, algo):
         proto = protocol.lower()
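The check above makes one non-backup node per galera protocol a hard invariant of the driver state, not just something the worker happens to send. A short usage sketch against an already constructed driver instance; how the driver is instantiated is not part of this diff:

def add_galera_pool(driver):
    # One primary (non-backup) node plus any number of backups is accepted.
    driver.add_protocol('galera', 3306)
    driver.add_server('galera', 100, '10.0.0.10', 3306, backup=False)
    driver.add_server('galera', 101, '10.0.0.11', 3306, backup=True)
    driver.add_server('galera', 102, '10.0.0.12', 3306, backup=True)

    # A second non-backup node raises:
    #   Exception("Galera protocol does not accept more than one non-backup node")
    # driver.add_server('galera', 103, '10.0.0.13', 3306, backup=False)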