From 78f1c7b1286873e9c70d075a9f3b90304f54a929 Mon Sep 17 00:00:00 2001
From: Adam Harwell <flux.adam@gmail.com>
Date: Wed, 3 Jan 2018 12:27:13 -0800
Subject: [PATCH] Minimize the effect overloaded Health Manager processes

If a Health Manager is overloaded, it can begin to fall very far behind
in processing health updates. This causes huge delays in the whole
system and can cause two distinctly different issues:

1) If the HMs are all suddenly busy, delays can be long enough that no
messages get through within the failover timeout, and amps start to
fail, increasing load on the HMs and causing a cascade failure (I have
witnessed this happen once and take down over 50 LBs before manual
intervention could be taken)..

2) Even one overloaded HM can cause updates to queue for extremely long
periods, which makes the system unreliable. Amps can go down and still
have health updates register for some time as the HM processes the queue
(in some cases I have seen dead amps updated for 5-10 minutes).

If we short-circuit handling before we update the health table, we can
solve these problems in two ways:

1) The heavy processing generally happens after this, so
short-circuiting early will let some other threads finish faster and
have some chance of success.

2) Amphora health won't continue to be updated long after the messages
were received, so it won't be possible for zombie amphorae to eat as
many brains.

Change-Id: Iceeacfdcaebe1f9bb99bc08e318c9da73a66898d
(cherry picked from commit 61e0c14f48130d1d0519fa5527d2712ba6ce504f)
---
 .../amphorae/drivers/health/heartbeat_udp.py  |  2 +
 octavia/controller/healthmanager/update_db.py | 21 ++++-
 .../drivers/health/test_heartbeat_udp.py      |  1 +
 .../healthmanager/test_update_db.py           | 79 ++++++++++++++-----
 4 files changed, 80 insertions(+), 23 deletions(-)

diff --git a/octavia/amphorae/drivers/health/heartbeat_udp.py b/octavia/amphorae/drivers/health/heartbeat_udp.py
index d395024b13..54232ae1bd 100644
--- a/octavia/amphorae/drivers/health/heartbeat_udp.py
+++ b/octavia/amphorae/drivers/health/heartbeat_udp.py
@@ -13,6 +13,7 @@
 #    under the License.
 
 import socket
+import time
 
 from concurrent import futures
 from oslo_config import cfg
@@ -166,6 +167,7 @@ class UDPStatusGetter(object):
         (data, srcaddr) = self.sock.recvfrom(UDP_MAX_SIZE)
         LOG.debug('Received packet from %s', srcaddr)
         obj = status_message.unwrap_envelope(data, self.key)
+        obj['recv_time'] = time.time()
         return obj, srcaddr
 
     def check(self):
diff --git a/octavia/controller/healthmanager/update_db.py b/octavia/controller/healthmanager/update_db.py
index 16a858e94e..9de4d7e857 100644
--- a/octavia/controller/healthmanager/update_db.py
+++ b/octavia/controller/healthmanager/update_db.py
@@ -13,6 +13,7 @@
 # under the License.
 
 import datetime
+import time
 
 from oslo_config import cfg
 from oslo_log import log as logging
@@ -26,6 +27,7 @@ from octavia.controller.healthmanager import update_serializer
 from octavia.db import api as db_api
 from octavia.db import repositories as repo
 
+CONF = cfg.CONF
 LOG = logging.getLogger(__name__)
 
 
@@ -35,7 +37,7 @@ class UpdateHealthDb(object):
         # first setup repo for amphora, listener,member(nodes),pool repo
         self.event_streamer = stevedore_driver.DriverManager(
             namespace='octavia.controller.queues',
-            name=cfg.CONF.health_manager.event_streamer_driver,
+            name=CONF.health_manager.event_streamer_driver,
             invoke_on_load=True).driver
         self.amphora_repo = repo.AmphoraRepository()
         self.amphora_health_repo = repo.AmphoraHealthRepository()
@@ -43,7 +45,7 @@ class UpdateHealthDb(object):
         self.loadbalancer_repo = repo.LoadBalancerRepository()
         self.member_repo = repo.MemberRepository()
         self.pool_repo = repo.PoolRepository()
-        self.sync_prv_status = cfg.CONF.health_manager.sync_provisioning_status
+        self.sync_prv_status = CONF.health_manager.sync_provisioning_status
 
     def emit(self, info_type, info_id, info_obj):
         cnt = update_serializer.InfoContainer(info_type, info_id, info_obj)
@@ -121,6 +123,19 @@ class UpdateHealthDb(object):
         if len(listeners) == expected_listener_count:
 
             lock_session = db_api.get_session(autocommit=False)
+
+            # if we're running too far behind, warn and bail
+            proc_delay = time.time() - health['recv_time']
+            hb_interval = CONF.health_manager.heartbeat_interval
+            if proc_delay >= hb_interval:
+                LOG.warning('Amphora %(id)s health message was processed too '
+                            'slowly: %(delay)ss! The system may be overloaded '
+                            'or otherwise malfunctioning. This heartbeat has '
+                            'been ignored and no update was made to the '
+                            'amphora health entry. THIS IS NOT GOOD.',
+                            {'id': health['id'], 'delay': proc_delay})
+                return
+
             # if the input amphora is healthy, we update its db info
             try:
                 self.amphora_health_repo.replace(
@@ -303,7 +318,7 @@ class UpdateStatsDb(stats.StatsMixin):
         super(UpdateStatsDb, self).__init__()
         self.event_streamer = stevedore_driver.DriverManager(
             namespace='octavia.controller.queues',
-            name=cfg.CONF.health_manager.event_streamer_driver,
+            name=CONF.health_manager.event_streamer_driver,
             invoke_on_load=True).driver
         self.repo_listener = repo.ListenerRepository()
 
diff --git a/octavia/tests/unit/amphorae/drivers/health/test_heartbeat_udp.py b/octavia/tests/unit/amphorae/drivers/health/test_heartbeat_udp.py
index 3a849b3c52..d3fa581668 100644
--- a/octavia/tests/unit/amphorae/drivers/health/test_heartbeat_udp.py
+++ b/octavia/tests/unit/amphorae/drivers/health/test_heartbeat_udp.py
@@ -92,6 +92,7 @@ class TestHeartbeatUDP(base.TestCase):
         recvfrom.return_value = bin_msg, 2
         (obj, srcaddr) = getter.dorecv()
         self.assertEqual(2, srcaddr)
+        self.assertIsNotNone(obj.pop('recv_time'))
         self.assertEqual({"testkey": "TEST"}, obj)
 
     @mock.patch('socket.getaddrinfo')
diff --git a/octavia/tests/unit/controller/healthmanager/test_update_db.py b/octavia/tests/unit/controller/healthmanager/test_update_db.py
index 2985f326f7..c792a63783 100644
--- a/octavia/tests/unit/controller/healthmanager/test_update_db.py
+++ b/octavia/tests/unit/controller/healthmanager/test_update_db.py
@@ -13,6 +13,7 @@
 # under the License.
 
 import random
+import time
 
 import mock
 from oslo_config import cfg
@@ -115,7 +116,8 @@ class TestUpdateHealthDb(base.TestCase):
                                   }
                 }
                 }
-            }
+            },
+            "recv_time": time.time()
         }
 
         mock_lb, mock_listener1, mock_pool1, mock_member1 = (
@@ -140,7 +142,9 @@ class TestUpdateHealthDb(base.TestCase):
 
         health = {
             "id": self.FAKE_UUID_1,
-            "listeners": {}}
+            "listeners": {},
+            "recv_time": time.time()
+        }
 
         mock_lb, mock_listener1, mock_pool1, mock_members = (
             self._make_mock_lb_tree(listener=False, pool=False))
@@ -150,11 +154,29 @@ class TestUpdateHealthDb(base.TestCase):
         self.assertTrue(self.amphora_repo.get_all_lbs_on_amphora.called)
         self.assertTrue(self.loadbalancer_repo.update.called)
 
+    def test_update_health_recv_time_stale(self):
+        hb_interval = cfg.CONF.health_manager.heartbeat_interval
+        health = {
+            "id": self.FAKE_UUID_1,
+            "listeners": {},
+            "recv_time": time.time() - hb_interval - 1  # extra -1 for buffer
+        }
+
+        mock_lb, mock_listener1, mock_pool1, mock_members = (
+            self._make_mock_lb_tree(listener=False, pool=False))
+        self.hm.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb]
+
+        self.hm.update_health(health)
+        self.assertTrue(self.amphora_repo.get_all_lbs_on_amphora.called)
+        # Receive time is stale, so we shouldn't see this called
+        self.assertFalse(self.loadbalancer_repo.update.called)
+
     def test_update_health_replace_error(self):
 
         health = {
             "id": self.FAKE_UUID_1,
-            "listeners": {}
+            "listeners": {},
+            "recv_time": time.time()
         }
 
         self.session_mock.commit.side_effect = TestException('boom')
@@ -176,7 +198,8 @@ class TestUpdateHealthDb(base.TestCase):
                                   }
                 }
                 }
-            }
+            },
+            "recv_time": time.time()
         }
 
         mock_lb, mock_listener1, mock_pool1, mock_member1 = (
@@ -226,7 +249,8 @@ class TestUpdateHealthDb(base.TestCase):
                                   }
                 }
                 }
-            }
+            },
+            "recv_time": time.time()
         }
 
         mock_lb, mock_listener1, mock_pool1, mock_member1 = (
@@ -270,7 +294,8 @@ class TestUpdateHealthDb(base.TestCase):
                                   }
                 }
                 }
-            }
+            },
+            "recv_time": time.time()
         }
 
         mock_lb, mock_listener1, mock_pool1, mock_members = (
@@ -311,7 +336,8 @@ class TestUpdateHealthDb(base.TestCase):
                                   }
                 }
                 }
-            }
+            },
+            "recv_time": time.time()
         }
 
         mock_lb, mock_listener1, mock_pool1, mock_member1 = (
@@ -342,7 +368,8 @@ class TestUpdateHealthDb(base.TestCase):
                     "pools": {
                         "pool-id-1": {
                             "status": constants.UP,
-                            "members": {"member-id-1": constants.DRAIN}}}}}}
+                            "members": {"member-id-1": constants.DRAIN}}}}},
+            "recv_time": time.time()}
 
         mock_lb, mock_listener1, mock_pool1, mock_member1 = (
             self._make_mock_lb_tree())
@@ -381,7 +408,8 @@ class TestUpdateHealthDb(base.TestCase):
                     "pools": {
                         "pool-id-1": {
                             "status": constants.UP,
-                            "members": {"member-id-1": constants.MAINT}}}}}}
+                            "members": {"member-id-1": constants.MAINT}}}}},
+            "recv_time": time.time()}
 
         mock_lb, mock_listener1, mock_pool1, mock_member1 = (
             self._make_mock_lb_tree())
@@ -420,7 +448,8 @@ class TestUpdateHealthDb(base.TestCase):
                     "pools": {
                         "pool-id-1": {
                             "status": constants.UP,
-                            "members": {"member-id-1": "blah"}}}}}}
+                            "members": {"member-id-1": "blah"}}}}},
+            "recv_time": time.time()}
 
         mock_lb, mock_listener1, mock_pool1, mock_member1 = (
             self._make_mock_lb_tree())
@@ -454,7 +483,8 @@ class TestUpdateHealthDb(base.TestCase):
                                   }
                 }
                 }
-            }
+            },
+            "recv_time": time.time()
         }
 
         mock_lb, mock_listener1, mock_pool1, mock_member1 = (
@@ -497,7 +527,8 @@ class TestUpdateHealthDb(base.TestCase):
                                   }
                 }
                 }
-            }
+            },
+            "recv_time": time.time()
         }
 
         mock_lb, mock_listener1, mock_pool1, mock_member1 = (
@@ -538,7 +569,8 @@ class TestUpdateHealthDb(base.TestCase):
                         "pool-id-1": {
                             "status": constants.UP,
                             "members": {
-                                "member-id-1": constants.UP}}}}}}
+                                "member-id-1": constants.UP}}}}},
+            "recv_time": time.time()}
 
         mock_lb, mock_listener1, mock_pool1, mock_members = (
             self._make_mock_lb_tree(members=2))
@@ -581,7 +613,8 @@ class TestUpdateHealthDb(base.TestCase):
                                   }
                 }
                 }
-            }
+            },
+            "recv_time": time.time()
         }
 
         mock_lb, mock_listener1, mock_pool1, mock_member1 = (
@@ -631,7 +664,8 @@ class TestUpdateHealthDb(base.TestCase):
                                   }
                 }
                 }
-            }
+            },
+            "recv_time": time.time()
         }
 
         mock_lb, mock_listener1, mock_pool1, mock_member1 = (
@@ -705,7 +739,8 @@ class TestUpdateHealthDb(base.TestCase):
                         }
                     }
                 }
-            }
+            },
+            "recv_time": time.time()
         }
 
         mock_lb, mock_listener1, mock_pool1, mock_members = (
@@ -762,7 +797,8 @@ class TestUpdateHealthDb(base.TestCase):
                                   }
                 }
                 }
-            }
+            },
+            "recv_time": time.time()
         }
 
         self.hm.listener_repo.update.side_effect = (
@@ -815,7 +851,8 @@ class TestUpdateHealthDb(base.TestCase):
                         }
                     }
                 }
-            }
+            },
+            "recv_time": time.time()
         }
 
         mock_lb, mock_listener1, mock_pool1, mock_members = (
@@ -838,7 +875,8 @@ class TestUpdateHealthDb(base.TestCase):
     def test_update_health_lb_admin_down(self):
         health = {
             "id": self.FAKE_UUID_1,
-            "listeners": {}}
+            "listeners": {},
+            "recv_time": time.time()}
 
         mock_lb, mock_listener1, mock_pool1, mock_members = (
             self._make_mock_lb_tree(listener=False, pool=False))
@@ -855,7 +893,8 @@ class TestUpdateHealthDb(base.TestCase):
     def test_update_health_lb_admin_up(self):
         health = {
             "id": self.FAKE_UUID_1,
-            "listeners": {}}
+            "listeners": {},
+            "recv_time": time.time()}
 
         mock_lb, mock_listener1, mock_pool1, mock_members = (
             self._make_mock_lb_tree(listener=False, pool=False))