Minimize the effect of overloaded Health Manager processes
If a Health Manager is overloaded, it can begin to fall very far behind in processing health updates. This causes huge delays in the whole system and can cause two distinctly different issues: 1) If the HMs are all suddenly busy, delays can be long enough that no messages get through within the failover timeout, and amps start to fail, increasing load on the HMs and causing a cascade failure (I have witnessed this happen once and take down over 50 LBs before manual intervention could be taken). 2) Even one overloaded HM can cause updates to queue for extremely long periods, which makes the system unreliable. Amps can go down and still have health updates register for some time as the HM processes the queue (in some cases I have seen dead amps updated for 5-10 minutes). If we short-circuit handling before we update the health table, we can solve these problems in two ways: 1) The heavy processing generally happens after this, so short-circuiting early will let some other threads finish faster and have some chance of success. 2) Amphora health won't continue to be updated long after the messages were received, so it won't be possible for zombie amphorae to eat as many brains. Change-Id: Iceeacfdcaebe1f9bb99bc08e318c9da73a66898d (cherry picked from commit 61e0c14f48130d1d0519fa5527d2712ba6ce504f)
This commit is contained in:
parent
ed77d01939
commit
78f1c7b128
octavia
amphorae/drivers/health
controller/healthmanager
tests/unit
@ -13,6 +13,7 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import socket
|
import socket
|
||||||
|
import time
|
||||||
|
|
||||||
from concurrent import futures
|
from concurrent import futures
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
@ -166,6 +167,7 @@ class UDPStatusGetter(object):
|
|||||||
(data, srcaddr) = self.sock.recvfrom(UDP_MAX_SIZE)
|
(data, srcaddr) = self.sock.recvfrom(UDP_MAX_SIZE)
|
||||||
LOG.debug('Received packet from %s', srcaddr)
|
LOG.debug('Received packet from %s', srcaddr)
|
||||||
obj = status_message.unwrap_envelope(data, self.key)
|
obj = status_message.unwrap_envelope(data, self.key)
|
||||||
|
obj['recv_time'] = time.time()
|
||||||
return obj, srcaddr
|
return obj, srcaddr
|
||||||
|
|
||||||
def check(self):
|
def check(self):
|
||||||
|
@ -13,6 +13,7 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import datetime
|
import datetime
|
||||||
|
import time
|
||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
@ -26,6 +27,7 @@ from octavia.controller.healthmanager import update_serializer
|
|||||||
from octavia.db import api as db_api
|
from octavia.db import api as db_api
|
||||||
from octavia.db import repositories as repo
|
from octavia.db import repositories as repo
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@ -35,7 +37,7 @@ class UpdateHealthDb(object):
|
|||||||
# first setup repo for amphora, listener,member(nodes),pool repo
|
# first setup repo for amphora, listener,member(nodes),pool repo
|
||||||
self.event_streamer = stevedore_driver.DriverManager(
|
self.event_streamer = stevedore_driver.DriverManager(
|
||||||
namespace='octavia.controller.queues',
|
namespace='octavia.controller.queues',
|
||||||
name=cfg.CONF.health_manager.event_streamer_driver,
|
name=CONF.health_manager.event_streamer_driver,
|
||||||
invoke_on_load=True).driver
|
invoke_on_load=True).driver
|
||||||
self.amphora_repo = repo.AmphoraRepository()
|
self.amphora_repo = repo.AmphoraRepository()
|
||||||
self.amphora_health_repo = repo.AmphoraHealthRepository()
|
self.amphora_health_repo = repo.AmphoraHealthRepository()
|
||||||
@ -43,7 +45,7 @@ class UpdateHealthDb(object):
|
|||||||
self.loadbalancer_repo = repo.LoadBalancerRepository()
|
self.loadbalancer_repo = repo.LoadBalancerRepository()
|
||||||
self.member_repo = repo.MemberRepository()
|
self.member_repo = repo.MemberRepository()
|
||||||
self.pool_repo = repo.PoolRepository()
|
self.pool_repo = repo.PoolRepository()
|
||||||
self.sync_prv_status = cfg.CONF.health_manager.sync_provisioning_status
|
self.sync_prv_status = CONF.health_manager.sync_provisioning_status
|
||||||
|
|
||||||
def emit(self, info_type, info_id, info_obj):
|
def emit(self, info_type, info_id, info_obj):
|
||||||
cnt = update_serializer.InfoContainer(info_type, info_id, info_obj)
|
cnt = update_serializer.InfoContainer(info_type, info_id, info_obj)
|
||||||
@ -121,6 +123,19 @@ class UpdateHealthDb(object):
|
|||||||
if len(listeners) == expected_listener_count:
|
if len(listeners) == expected_listener_count:
|
||||||
|
|
||||||
lock_session = db_api.get_session(autocommit=False)
|
lock_session = db_api.get_session(autocommit=False)
|
||||||
|
|
||||||
|
# if we're running too far behind, warn and bail
|
||||||
|
proc_delay = time.time() - health['recv_time']
|
||||||
|
hb_interval = CONF.health_manager.heartbeat_interval
|
||||||
|
if proc_delay >= hb_interval:
|
||||||
|
LOG.warning('Amphora %(id)s health message was processed too '
|
||||||
|
'slowly: %(delay)ss! The system may be overloaded '
|
||||||
|
'or otherwise malfunctioning. This heartbeat has '
|
||||||
|
'been ignored and no update was made to the '
|
||||||
|
'amphora health entry. THIS IS NOT GOOD.',
|
||||||
|
{'id': health['id'], 'delay': proc_delay})
|
||||||
|
return
|
||||||
|
|
||||||
# if the input amphora is healthy, we update its db info
|
# if the input amphora is healthy, we update its db info
|
||||||
try:
|
try:
|
||||||
self.amphora_health_repo.replace(
|
self.amphora_health_repo.replace(
|
||||||
@ -303,7 +318,7 @@ class UpdateStatsDb(stats.StatsMixin):
|
|||||||
super(UpdateStatsDb, self).__init__()
|
super(UpdateStatsDb, self).__init__()
|
||||||
self.event_streamer = stevedore_driver.DriverManager(
|
self.event_streamer = stevedore_driver.DriverManager(
|
||||||
namespace='octavia.controller.queues',
|
namespace='octavia.controller.queues',
|
||||||
name=cfg.CONF.health_manager.event_streamer_driver,
|
name=CONF.health_manager.event_streamer_driver,
|
||||||
invoke_on_load=True).driver
|
invoke_on_load=True).driver
|
||||||
self.repo_listener = repo.ListenerRepository()
|
self.repo_listener = repo.ListenerRepository()
|
||||||
|
|
||||||
|
@ -92,6 +92,7 @@ class TestHeartbeatUDP(base.TestCase):
|
|||||||
recvfrom.return_value = bin_msg, 2
|
recvfrom.return_value = bin_msg, 2
|
||||||
(obj, srcaddr) = getter.dorecv()
|
(obj, srcaddr) = getter.dorecv()
|
||||||
self.assertEqual(2, srcaddr)
|
self.assertEqual(2, srcaddr)
|
||||||
|
self.assertIsNotNone(obj.pop('recv_time'))
|
||||||
self.assertEqual({"testkey": "TEST"}, obj)
|
self.assertEqual({"testkey": "TEST"}, obj)
|
||||||
|
|
||||||
@mock.patch('socket.getaddrinfo')
|
@mock.patch('socket.getaddrinfo')
|
||||||
|
@ -13,6 +13,7 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import random
|
import random
|
||||||
|
import time
|
||||||
|
|
||||||
import mock
|
import mock
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
@ -115,7 +116,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
|
"recv_time": time.time()
|
||||||
}
|
}
|
||||||
|
|
||||||
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
||||||
@ -140,7 +142,9 @@ class TestUpdateHealthDb(base.TestCase):
|
|||||||
|
|
||||||
health = {
|
health = {
|
||||||
"id": self.FAKE_UUID_1,
|
"id": self.FAKE_UUID_1,
|
||||||
"listeners": {}}
|
"listeners": {},
|
||||||
|
"recv_time": time.time()
|
||||||
|
}
|
||||||
|
|
||||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||||
self._make_mock_lb_tree(listener=False, pool=False))
|
self._make_mock_lb_tree(listener=False, pool=False))
|
||||||
@ -150,11 +154,29 @@ class TestUpdateHealthDb(base.TestCase):
|
|||||||
self.assertTrue(self.amphora_repo.get_all_lbs_on_amphora.called)
|
self.assertTrue(self.amphora_repo.get_all_lbs_on_amphora.called)
|
||||||
self.assertTrue(self.loadbalancer_repo.update.called)
|
self.assertTrue(self.loadbalancer_repo.update.called)
|
||||||
|
|
||||||
|
def test_update_health_recv_time_stale(self):
|
||||||
|
hb_interval = cfg.CONF.health_manager.heartbeat_interval
|
||||||
|
health = {
|
||||||
|
"id": self.FAKE_UUID_1,
|
||||||
|
"listeners": {},
|
||||||
|
"recv_time": time.time() - hb_interval - 1 # extra -1 for buffer
|
||||||
|
}
|
||||||
|
|
||||||
|
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||||
|
self._make_mock_lb_tree(listener=False, pool=False))
|
||||||
|
self.hm.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb]
|
||||||
|
|
||||||
|
self.hm.update_health(health)
|
||||||
|
self.assertTrue(self.amphora_repo.get_all_lbs_on_amphora.called)
|
||||||
|
# Receive time is stale, so we shouldn't see this called
|
||||||
|
self.assertFalse(self.loadbalancer_repo.update.called)
|
||||||
|
|
||||||
def test_update_health_replace_error(self):
|
def test_update_health_replace_error(self):
|
||||||
|
|
||||||
health = {
|
health = {
|
||||||
"id": self.FAKE_UUID_1,
|
"id": self.FAKE_UUID_1,
|
||||||
"listeners": {}
|
"listeners": {},
|
||||||
|
"recv_time": time.time()
|
||||||
}
|
}
|
||||||
|
|
||||||
self.session_mock.commit.side_effect = TestException('boom')
|
self.session_mock.commit.side_effect = TestException('boom')
|
||||||
@ -176,7 +198,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
|
"recv_time": time.time()
|
||||||
}
|
}
|
||||||
|
|
||||||
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
||||||
@ -226,7 +249,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
|
"recv_time": time.time()
|
||||||
}
|
}
|
||||||
|
|
||||||
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
||||||
@ -270,7 +294,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
|
"recv_time": time.time()
|
||||||
}
|
}
|
||||||
|
|
||||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||||
@ -311,7 +336,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
|
"recv_time": time.time()
|
||||||
}
|
}
|
||||||
|
|
||||||
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
||||||
@ -342,7 +368,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||||||
"pools": {
|
"pools": {
|
||||||
"pool-id-1": {
|
"pool-id-1": {
|
||||||
"status": constants.UP,
|
"status": constants.UP,
|
||||||
"members": {"member-id-1": constants.DRAIN}}}}}}
|
"members": {"member-id-1": constants.DRAIN}}}}},
|
||||||
|
"recv_time": time.time()}
|
||||||
|
|
||||||
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
||||||
self._make_mock_lb_tree())
|
self._make_mock_lb_tree())
|
||||||
@ -381,7 +408,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||||||
"pools": {
|
"pools": {
|
||||||
"pool-id-1": {
|
"pool-id-1": {
|
||||||
"status": constants.UP,
|
"status": constants.UP,
|
||||||
"members": {"member-id-1": constants.MAINT}}}}}}
|
"members": {"member-id-1": constants.MAINT}}}}},
|
||||||
|
"recv_time": time.time()}
|
||||||
|
|
||||||
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
||||||
self._make_mock_lb_tree())
|
self._make_mock_lb_tree())
|
||||||
@ -420,7 +448,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||||||
"pools": {
|
"pools": {
|
||||||
"pool-id-1": {
|
"pool-id-1": {
|
||||||
"status": constants.UP,
|
"status": constants.UP,
|
||||||
"members": {"member-id-1": "blah"}}}}}}
|
"members": {"member-id-1": "blah"}}}}},
|
||||||
|
"recv_time": time.time()}
|
||||||
|
|
||||||
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
||||||
self._make_mock_lb_tree())
|
self._make_mock_lb_tree())
|
||||||
@ -454,7 +483,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
|
"recv_time": time.time()
|
||||||
}
|
}
|
||||||
|
|
||||||
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
||||||
@ -497,7 +527,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
|
"recv_time": time.time()
|
||||||
}
|
}
|
||||||
|
|
||||||
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
||||||
@ -538,7 +569,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||||||
"pool-id-1": {
|
"pool-id-1": {
|
||||||
"status": constants.UP,
|
"status": constants.UP,
|
||||||
"members": {
|
"members": {
|
||||||
"member-id-1": constants.UP}}}}}}
|
"member-id-1": constants.UP}}}}},
|
||||||
|
"recv_time": time.time()}
|
||||||
|
|
||||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||||
self._make_mock_lb_tree(members=2))
|
self._make_mock_lb_tree(members=2))
|
||||||
@ -581,7 +613,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
|
"recv_time": time.time()
|
||||||
}
|
}
|
||||||
|
|
||||||
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
||||||
@ -631,7 +664,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
|
"recv_time": time.time()
|
||||||
}
|
}
|
||||||
|
|
||||||
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
||||||
@ -705,7 +739,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
|
"recv_time": time.time()
|
||||||
}
|
}
|
||||||
|
|
||||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||||
@ -762,7 +797,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
|
"recv_time": time.time()
|
||||||
}
|
}
|
||||||
|
|
||||||
self.hm.listener_repo.update.side_effect = (
|
self.hm.listener_repo.update.side_effect = (
|
||||||
@ -815,7 +851,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
|
"recv_time": time.time()
|
||||||
}
|
}
|
||||||
|
|
||||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||||
@ -838,7 +875,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||||||
def test_update_health_lb_admin_down(self):
|
def test_update_health_lb_admin_down(self):
|
||||||
health = {
|
health = {
|
||||||
"id": self.FAKE_UUID_1,
|
"id": self.FAKE_UUID_1,
|
||||||
"listeners": {}}
|
"listeners": {},
|
||||||
|
"recv_time": time.time()}
|
||||||
|
|
||||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||||
self._make_mock_lb_tree(listener=False, pool=False))
|
self._make_mock_lb_tree(listener=False, pool=False))
|
||||||
@ -855,7 +893,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||||||
def test_update_health_lb_admin_up(self):
|
def test_update_health_lb_admin_up(self):
|
||||||
health = {
|
health = {
|
||||||
"id": self.FAKE_UUID_1,
|
"id": self.FAKE_UUID_1,
|
||||||
"listeners": {}}
|
"listeners": {},
|
||||||
|
"recv_time": time.time()}
|
||||||
|
|
||||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||||
self._make_mock_lb_tree(listener=False, pool=False))
|
self._make_mock_lb_tree(listener=False, pool=False))
|
||||||
|
Loading…
x
Reference in New Issue
Block a user