Support server node weights.
Support setting a weighting value for each node that will determine the proportion of load balancing requests that it receives in relation to the other defined servers and their weights. Also adds a file describing the worker JSON message versioning. Fixes bug 1117349 Change-Id: Ie1460c165f03b0523ee28d864b74ea9e5858b256
This commit is contained in:
29
WORKER_MSG_FMT_VERSIONS
Normal file
29
WORKER_MSG_FMT_VERSIONS
Normal file
@@ -0,0 +1,29 @@
|
|||||||
|
# The worker versions the JSON messages that it comprehends. This version
|
||||||
|
# is used for the DISCOVER message that can be sent by the API server. E.g.,
|
||||||
|
#
|
||||||
|
# { 'hpcs_action': 'DISCOVER' }
|
||||||
|
#
|
||||||
|
# And an example response from the worker:
|
||||||
|
#
|
||||||
|
# {
|
||||||
|
# 'hpcs_action': 'DISCOVER',
|
||||||
|
# 'version': '1.0',
|
||||||
|
# 'hpcs_response': 'PASS'
|
||||||
|
# }
|
||||||
|
#
|
||||||
|
# The version number is set in the method:
|
||||||
|
#
|
||||||
|
# libra.worker.controller._action_discover()
|
||||||
|
#
|
||||||
|
# Version format is: <major>.<minor>
|
||||||
|
#
|
||||||
|
# A change to the <minor> value means that a change has been made, but
|
||||||
|
# we are still backwards compatible. A change to the <major> value means
|
||||||
|
# a significant change was made and we are no longer compatible with older
|
||||||
|
# versions.
|
||||||
|
|
||||||
|
[Version 1.1]
|
||||||
|
- Accept node 'weight' values.
|
||||||
|
|
||||||
|
[Version 1.0]
|
||||||
|
- Initial version.
|
||||||
@@ -75,7 +75,7 @@ class LBaaSController(object):
|
|||||||
version this worker supports.
|
version this worker supports.
|
||||||
"""
|
"""
|
||||||
# Version of the JSON message format that this worker understands.
|
# Version of the JSON message format that this worker understands.
|
||||||
msg_fmt_version = "1.0"
|
msg_fmt_version = "1.1"
|
||||||
self.msg['version'] = msg_fmt_version
|
self.msg['version'] = msg_fmt_version
|
||||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_SUCCESS
|
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_SUCCESS
|
||||||
return self.msg
|
return self.msg
|
||||||
|
|||||||
@@ -48,7 +48,7 @@ class LoadBalancerDriver(object):
|
|||||||
""" Add a supported protocol and listening port for the instance. """
|
""" Add a supported protocol and listening port for the instance. """
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
def add_server(self, protocol, host, port):
|
def add_server(self, protocol, host, port, weight):
|
||||||
""" Add a server for the protocol for which we will proxy. """
|
""" Add a server for the protocol for which we will proxy. """
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
|||||||
@@ -92,9 +92,9 @@ class HAProxyDriver(LoadBalancerDriver):
|
|||||||
if proto == 'http':
|
if proto == 'http':
|
||||||
output.append(' cookie SERVERID rewrite')
|
output.append(' cookie SERVERID rewrite')
|
||||||
|
|
||||||
for (addr, port) in protocfg['servers']:
|
for (addr, port, weight) in protocfg['servers']:
|
||||||
output.append(' server server%d %s:%s' %
|
output.append(' server server%d %s:%s weight %d' %
|
||||||
(serv_num, addr, port))
|
(serv_num, addr, port, weight))
|
||||||
serv_num += 1
|
serv_num += 1
|
||||||
|
|
||||||
return '\n'.join(output) + '\n'
|
return '\n'.join(output) + '\n'
|
||||||
@@ -123,11 +123,20 @@ class HAProxyDriver(LoadBalancerDriver):
|
|||||||
else:
|
else:
|
||||||
self._bind(proto, '0.0.0.0', port)
|
self._bind(proto, '0.0.0.0', port)
|
||||||
|
|
||||||
def add_server(self, protocol, host, port):
|
def add_server(self, protocol, host, port, weight=1):
|
||||||
proto = protocol.lower()
|
proto = protocol.lower()
|
||||||
|
|
||||||
|
try:
|
||||||
|
weight = int(weight)
|
||||||
|
except ValueError:
|
||||||
|
raise Exception("Non-integer 'weight' value: '%s'" % weight)
|
||||||
|
|
||||||
|
if weight > 256:
|
||||||
|
raise Exception("Server 'weight' %d exceeds max of 256" % weight)
|
||||||
|
|
||||||
if 'servers' not in self._config[proto]:
|
if 'servers' not in self._config[proto]:
|
||||||
self._config[proto]['servers'] = []
|
self._config[proto]['servers'] = []
|
||||||
self._config[proto]['servers'].append((host, port))
|
self._config[proto]['servers'].append((host, port, weight))
|
||||||
|
|
||||||
def set_algorithm(self, protocol, algo):
|
def set_algorithm(self, protocol, algo):
|
||||||
proto = protocol.lower()
|
proto = protocol.lower()
|
||||||
|
|||||||
@@ -41,8 +41,8 @@ class TestHAProxyDriver(testtools.TestCase):
|
|||||||
self.assertIn('servers', self.driver._config[proto])
|
self.assertIn('servers', self.driver._config[proto])
|
||||||
servers = self.driver._config[proto]['servers']
|
servers = self.driver._config[proto]['servers']
|
||||||
self.assertEqual(len(servers), 2)
|
self.assertEqual(len(servers), 2)
|
||||||
self.assertEqual(servers[0], ('1.2.3.4', 7777))
|
self.assertEqual(servers[0], ('1.2.3.4', 7777, 1))
|
||||||
self.assertEqual(servers[1], ('5.6.7.8', 8888))
|
self.assertEqual(servers[1], ('5.6.7.8', 8888, 1))
|
||||||
|
|
||||||
def testSetAlgorithm(self):
|
def testSetAlgorithm(self):
|
||||||
""" Test the HAProxy set_algorithm() method """
|
""" Test the HAProxy set_algorithm() method """
|
||||||
@@ -56,3 +56,35 @@ class TestHAProxyDriver(testtools.TestCase):
|
|||||||
self.assertEqual(self.driver._config[proto]['algorithm'], 'leastconn')
|
self.assertEqual(self.driver._config[proto]['algorithm'], 'leastconn')
|
||||||
e = self.assertRaises(Exception, self.driver.set_algorithm, proto, 99)
|
e = self.assertRaises(Exception, self.driver.set_algorithm, proto, 99)
|
||||||
self.assertEqual("Invalid algorithm: http", e.message)
|
self.assertEqual("Invalid algorithm: http", e.message)
|
||||||
|
|
||||||
|
def testServerWeightInt(self):
|
||||||
|
""" Test setting integer server weights """
|
||||||
|
proto = 'http'
|
||||||
|
self.driver.add_protocol(proto, None)
|
||||||
|
self.driver.add_server(proto, '1.2.3.4', 7777, 10)
|
||||||
|
servers = self.driver._config[proto]['servers']
|
||||||
|
self.assertEqual(len(servers), 1)
|
||||||
|
self.assertEqual(servers[0], ('1.2.3.4', 7777, 10))
|
||||||
|
|
||||||
|
def testServerWeightStr(self):
|
||||||
|
""" Test setting string server weights """
|
||||||
|
proto = 'http'
|
||||||
|
self.driver.add_protocol(proto, None)
|
||||||
|
self.driver.add_server(proto, '1.2.3.4', 7777, "20")
|
||||||
|
servers = self.driver._config[proto]['servers']
|
||||||
|
self.assertEqual(len(servers), 1)
|
||||||
|
self.assertEqual(servers[0], ('1.2.3.4', 7777, 20))
|
||||||
|
|
||||||
|
def testServerWeightInvalid(self):
|
||||||
|
""" Test setting string server weights """
|
||||||
|
proto = 'http'
|
||||||
|
self.driver.add_protocol(proto, None)
|
||||||
|
e = self.assertRaises(
|
||||||
|
Exception,
|
||||||
|
self.driver.add_server, proto, '1.2.3.4', 7777, 257)
|
||||||
|
self.assertEqual("Server 'weight' 257 exceeds max of 256", e.message)
|
||||||
|
|
||||||
|
e = self.assertRaises(
|
||||||
|
Exception,
|
||||||
|
self.driver.add_server, proto, '1.2.3.4', 7777, "abc")
|
||||||
|
self.assertEqual("Non-integer 'weight' value: 'abc'", e.message)
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
|
fixtures>=0.3.12
|
||||||
pep8
|
pep8
|
||||||
mock
|
mock
|
||||||
httplib2
|
httplib2
|
||||||
|
python-subunit
|
||||||
sphinx>=1.1.2
|
sphinx>=1.1.2
|
||||||
testrepository>=0.0.8
|
testrepository>=0.0.8
|
||||||
testtools>=0.9.22
|
testtools>=0.9.22
|
||||||
|
|||||||
Reference in New Issue
Block a user