Minimize the effect of overloaded Health Manager processes

If a Health Manager is overloaded, it can begin to fall very far behind
in processing health updates. This causes huge delays throughout the
whole system and can cause two distinct issues:

1) If the HMs are all suddenly busy, delays can grow long enough that
no messages get through within the failover timeout, and amps start to
fail, which increases load on the HMs further and causes a cascading
failure (I have witnessed this happen once and take down over 50 LBs
before manual intervention could be taken).

2) Even one overloaded HM can cause updates to queue for extremely long
periods, which makes the system unreliable. Amps can go down and still
have health updates registered for some time afterwards as the HM works
through its queue (in some cases I have seen dead amps updated for 5-10
minutes).

If we short-circuit handling before we update the health table, we can
solve these problems in two ways (a short sketch follows this list):

1) The heavy processing generally happens after this point, so
short-circuiting early lets other threads finish faster and gives them
some chance of success.

2) Amphora health won't continue to be updated long after the messages
were received, so it won't be possible for zombie amphorae to eat as
many brains.
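
A minimal standalone sketch of the staleness check this commit adds
(the helper name and the example values are illustrative, not part of
the patch; the real change stamps 'recv_time' in UDPStatusGetter.dorecv
and inlines the check in UpdateHealthDb.update_health, as the diff
below shows):

import time

# Illustrative helper (hypothetical name); the commit inlines this
# logic directly in UpdateHealthDb.update_health().
def heartbeat_is_stale(recv_time, heartbeat_interval):
    """Return True if the message waited one full heartbeat interval
    or more between receipt and processing."""
    proc_delay = time.time() - recv_time
    return proc_delay >= heartbeat_interval

# The receiver stamps each message as it arrives ...
health = {'id': 'amp-1', 'listeners': {}, 'recv_time': time.time()}

# ... and the updater bails out before touching the health table if
# processing fell too far behind. The real code reads the threshold
# from CONF.health_manager.heartbeat_interval; 10 is an assumed value.
if heartbeat_is_stale(health['recv_time'], heartbeat_interval=10):
    print('Stale heartbeat for %s; skipping health update' % health['id'])
else:
    print('Fresh heartbeat for %s; updating health entry' % health['id'])

Dropping a stale message is safe because a healthy amp will send a
fresh heartbeat within the next interval anyway; only a genuinely dead
amp stops producing new messages.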

Change-Id: Iceeacfdcaebe1f9bb99bc08e318c9da73a66898d
Adam Harwell 2018-01-03 12:27:13 -08:00
parent 5f5634117e
commit 61e0c14f48
4 changed files with 80 additions and 23 deletions

View File

@@ -14,6 +14,7 @@
from concurrent import futures
import socket
import time
from oslo_config import cfg
from oslo_log import log as logging
@@ -166,6 +167,7 @@ class UDPStatusGetter(object):
(data, srcaddr) = self.sock.recvfrom(UDP_MAX_SIZE)
LOG.debug('Received packet from %s', srcaddr)
obj = status_message.unwrap_envelope(data, self.key)
obj['recv_time'] = time.time()
return obj, srcaddr
def check(self):

View File

@@ -13,6 +13,7 @@
# under the License.
import datetime
import time
from oslo_config import cfg
from oslo_log import log as logging
@@ -26,6 +27,7 @@ from octavia.controller.healthmanager import update_serializer
from octavia.db import api as db_api
from octavia.db import repositories as repo
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
@@ -35,7 +37,7 @@ class UpdateHealthDb(object):
# first setup repo for amphora, listener,member(nodes),pool repo
self.event_streamer = stevedore_driver.DriverManager(
namespace='octavia.controller.queues',
name=cfg.CONF.health_manager.event_streamer_driver,
name=CONF.health_manager.event_streamer_driver,
invoke_on_load=True).driver
self.amphora_repo = repo.AmphoraRepository()
self.amphora_health_repo = repo.AmphoraHealthRepository()
@@ -43,7 +45,7 @@ class UpdateHealthDb(object):
self.loadbalancer_repo = repo.LoadBalancerRepository()
self.member_repo = repo.MemberRepository()
self.pool_repo = repo.PoolRepository()
self.sync_prv_status = cfg.CONF.health_manager.sync_provisioning_status
self.sync_prv_status = CONF.health_manager.sync_provisioning_status
def emit(self, info_type, info_id, info_obj):
cnt = update_serializer.InfoContainer(info_type, info_id, info_obj)
@@ -113,6 +115,19 @@ class UpdateHealthDb(object):
if len(listeners) == expected_listener_count:
lock_session = db_api.get_session(autocommit=False)
# if we're running too far behind, warn and bail
proc_delay = time.time() - health['recv_time']
hb_interval = CONF.health_manager.heartbeat_interval
if proc_delay >= hb_interval:
LOG.warning('Amphora %(id)s health message was processed too '
'slowly: %(delay)ss! The system may be overloaded '
'or otherwise malfunctioning. This heartbeat has '
'been ignored and no update was made to the '
'amphora health entry. THIS IS NOT GOOD.',
{'id': health['id'], 'delay': proc_delay})
return
# if the input amphora is healthy, we update its db info
try:
self.amphora_health_repo.replace(
@@ -295,7 +310,7 @@ class UpdateStatsDb(stats.StatsMixin):
super(UpdateStatsDb, self).__init__()
self.event_streamer = stevedore_driver.DriverManager(
namespace='octavia.controller.queues',
name=cfg.CONF.health_manager.event_streamer_driver,
name=CONF.health_manager.event_streamer_driver,
invoke_on_load=True).driver
self.repo_listener = repo.ListenerRepository()

View File

@@ -92,6 +92,7 @@ class TestHeartbeatUDP(base.TestCase):
recvfrom.return_value = bin_msg, 2
(obj, srcaddr) = getter.dorecv()
self.assertEqual(2, srcaddr)
self.assertIsNotNone(obj.pop('recv_time'))
self.assertEqual({"testkey": "TEST"}, obj)
@mock.patch('socket.getaddrinfo')

View File

@@ -13,6 +13,7 @@
# under the License.
import random
import time
import mock
from oslo_config import cfg
@@ -115,7 +116,8 @@ class TestUpdateHealthDb(base.TestCase):
}
}
}
}
},
"recv_time": time.time()
}
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
@@ -140,7 +142,9 @@ class TestUpdateHealthDb(base.TestCase):
health = {
"id": self.FAKE_UUID_1,
"listeners": {}}
"listeners": {},
"recv_time": time.time()
}
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree(listener=False, pool=False))
@@ -150,11 +154,29 @@ class TestUpdateHealthDb(base.TestCase):
self.assertTrue(self.amphora_repo.get_all_lbs_on_amphora.called)
self.assertTrue(self.loadbalancer_repo.update.called)
def test_update_health_recv_time_stale(self):
hb_interval = cfg.CONF.health_manager.heartbeat_interval
health = {
"id": self.FAKE_UUID_1,
"listeners": {},
"recv_time": time.time() - hb_interval - 1 # extra -1 for buffer
}
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree(listener=False, pool=False))
self.hm.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb]
self.hm.update_health(health)
self.assertTrue(self.amphora_repo.get_all_lbs_on_amphora.called)
# Receive time is stale, so we shouldn't see this called
self.assertFalse(self.loadbalancer_repo.update.called)
def test_update_health_replace_error(self):
health = {
"id": self.FAKE_UUID_1,
"listeners": {}
"listeners": {},
"recv_time": time.time()
}
self.session_mock.commit.side_effect = TestException('boom')
@@ -176,7 +198,8 @@ class TestUpdateHealthDb(base.TestCase):
}
}
}
}
},
"recv_time": time.time()
}
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
@@ -226,7 +249,8 @@ class TestUpdateHealthDb(base.TestCase):
}
}
}
}
},
"recv_time": time.time()
}
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
@@ -270,7 +294,8 @@ class TestUpdateHealthDb(base.TestCase):
}
}
}
}
},
"recv_time": time.time()
}
mock_lb, mock_listener1, mock_pool1, mock_members = (
@@ -311,7 +336,8 @@ class TestUpdateHealthDb(base.TestCase):
}
}
}
}
},
"recv_time": time.time()
}
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
@@ -342,7 +368,8 @@ class TestUpdateHealthDb(base.TestCase):
"pools": {
"pool-id-1": {
"status": constants.UP,
"members": {"member-id-1": constants.DRAIN}}}}}}
"members": {"member-id-1": constants.DRAIN}}}}},
"recv_time": time.time()}
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
self._make_mock_lb_tree())
@@ -381,7 +408,8 @@ class TestUpdateHealthDb(base.TestCase):
"pools": {
"pool-id-1": {
"status": constants.UP,
"members": {"member-id-1": constants.MAINT}}}}}}
"members": {"member-id-1": constants.MAINT}}}}},
"recv_time": time.time()}
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
self._make_mock_lb_tree())
@@ -420,7 +448,8 @@ class TestUpdateHealthDb(base.TestCase):
"pools": {
"pool-id-1": {
"status": constants.UP,
"members": {"member-id-1": "blah"}}}}}}
"members": {"member-id-1": "blah"}}}}},
"recv_time": time.time()}
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
self._make_mock_lb_tree())
@@ -454,7 +483,8 @@ class TestUpdateHealthDb(base.TestCase):
}
}
}
}
},
"recv_time": time.time()
}
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
@@ -497,7 +527,8 @@ class TestUpdateHealthDb(base.TestCase):
}
}
}
}
},
"recv_time": time.time()
}
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
@@ -538,7 +569,8 @@ class TestUpdateHealthDb(base.TestCase):
"pool-id-1": {
"status": constants.UP,
"members": {
"member-id-1": constants.UP}}}}}}
"member-id-1": constants.UP}}}}},
"recv_time": time.time()}
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree(members=2))
@@ -581,7 +613,8 @@ class TestUpdateHealthDb(base.TestCase):
}
}
}
}
},
"recv_time": time.time()
}
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
@@ -631,7 +664,8 @@ class TestUpdateHealthDb(base.TestCase):
}
}
}
}
},
"recv_time": time.time()
}
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
@@ -705,7 +739,8 @@ class TestUpdateHealthDb(base.TestCase):
}
}
}
}
},
"recv_time": time.time()
}
mock_lb, mock_listener1, mock_pool1, mock_members = (
@@ -762,7 +797,8 @@ class TestUpdateHealthDb(base.TestCase):
}
}
}
}
},
"recv_time": time.time()
}
self.hm.listener_repo.update.side_effect = (
@@ -815,7 +851,8 @@ class TestUpdateHealthDb(base.TestCase):
}
}
}
}
},
"recv_time": time.time()
}
mock_lb, mock_listener1, mock_pool1, mock_members = (
@@ -838,7 +875,8 @@ class TestUpdateHealthDb(base.TestCase):
def test_update_health_lb_admin_down(self):
health = {
"id": self.FAKE_UUID_1,
"listeners": {}}
"listeners": {},
"recv_time": time.time()}
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree(listener=False, pool=False))
@@ -855,7 +893,8 @@ class TestUpdateHealthDb(base.TestCase):
def test_update_health_lb_admin_up(self):
health = {
"id": self.FAKE_UUID_1,
"listeners": {}}
"listeners": {},
"recv_time": time.time()}
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree(listener=False, pool=False))