diff --git a/octavia/amphorae/drivers/health/heartbeat_udp.py b/octavia/amphorae/drivers/health/heartbeat_udp.py index d395024b13..54232ae1bd 100644 --- a/octavia/amphorae/drivers/health/heartbeat_udp.py +++ b/octavia/amphorae/drivers/health/heartbeat_udp.py @@ -13,6 +13,7 @@ # under the License. import socket +import time from concurrent import futures from oslo_config import cfg @@ -166,6 +167,7 @@ class UDPStatusGetter(object): (data, srcaddr) = self.sock.recvfrom(UDP_MAX_SIZE) LOG.debug('Received packet from %s', srcaddr) obj = status_message.unwrap_envelope(data, self.key) + obj['recv_time'] = time.time() return obj, srcaddr def check(self): diff --git a/octavia/controller/healthmanager/update_db.py b/octavia/controller/healthmanager/update_db.py index 16a858e94e..9de4d7e857 100644 --- a/octavia/controller/healthmanager/update_db.py +++ b/octavia/controller/healthmanager/update_db.py @@ -13,6 +13,7 @@ # under the License. import datetime +import time from oslo_config import cfg from oslo_log import log as logging @@ -26,6 +27,7 @@ from octavia.controller.healthmanager import update_serializer from octavia.db import api as db_api from octavia.db import repositories as repo +CONF = cfg.CONF LOG = logging.getLogger(__name__) @@ -35,7 +37,7 @@ class UpdateHealthDb(object): # first setup repo for amphora, listener,member(nodes),pool repo self.event_streamer = stevedore_driver.DriverManager( namespace='octavia.controller.queues', - name=cfg.CONF.health_manager.event_streamer_driver, + name=CONF.health_manager.event_streamer_driver, invoke_on_load=True).driver self.amphora_repo = repo.AmphoraRepository() self.amphora_health_repo = repo.AmphoraHealthRepository() @@ -43,7 +45,7 @@ class UpdateHealthDb(object): self.loadbalancer_repo = repo.LoadBalancerRepository() self.member_repo = repo.MemberRepository() self.pool_repo = repo.PoolRepository() - self.sync_prv_status = cfg.CONF.health_manager.sync_provisioning_status + self.sync_prv_status = CONF.health_manager.sync_provisioning_status def emit(self, info_type, info_id, info_obj): cnt = update_serializer.InfoContainer(info_type, info_id, info_obj) @@ -121,6 +123,19 @@ class UpdateHealthDb(object): if len(listeners) == expected_listener_count: lock_session = db_api.get_session(autocommit=False) + + # if we're running too far behind, warn and bail + proc_delay = time.time() - health['recv_time'] + hb_interval = CONF.health_manager.heartbeat_interval + if proc_delay >= hb_interval: + LOG.warning('Amphora %(id)s health message was processed too ' + 'slowly: %(delay)ss! The system may be overloaded ' + 'or otherwise malfunctioning. This heartbeat has ' + 'been ignored and no update was made to the ' + 'amphora health entry. THIS IS NOT GOOD.', + {'id': health['id'], 'delay': proc_delay}) + return + # if the input amphora is healthy, we update its db info try: self.amphora_health_repo.replace( @@ -303,7 +318,7 @@ class UpdateStatsDb(stats.StatsMixin): super(UpdateStatsDb, self).__init__() self.event_streamer = stevedore_driver.DriverManager( namespace='octavia.controller.queues', - name=cfg.CONF.health_manager.event_streamer_driver, + name=CONF.health_manager.event_streamer_driver, invoke_on_load=True).driver self.repo_listener = repo.ListenerRepository() diff --git a/octavia/tests/unit/amphorae/drivers/health/test_heartbeat_udp.py b/octavia/tests/unit/amphorae/drivers/health/test_heartbeat_udp.py index 3a849b3c52..d3fa581668 100644 --- a/octavia/tests/unit/amphorae/drivers/health/test_heartbeat_udp.py +++ b/octavia/tests/unit/amphorae/drivers/health/test_heartbeat_udp.py @@ -92,6 +92,7 @@ class TestHeartbeatUDP(base.TestCase): recvfrom.return_value = bin_msg, 2 (obj, srcaddr) = getter.dorecv() self.assertEqual(2, srcaddr) + self.assertIsNotNone(obj.pop('recv_time')) self.assertEqual({"testkey": "TEST"}, obj) @mock.patch('socket.getaddrinfo') diff --git a/octavia/tests/unit/controller/healthmanager/test_update_db.py b/octavia/tests/unit/controller/healthmanager/test_update_db.py index 2985f326f7..c792a63783 100644 --- a/octavia/tests/unit/controller/healthmanager/test_update_db.py +++ b/octavia/tests/unit/controller/healthmanager/test_update_db.py @@ -13,6 +13,7 @@ # under the License. import random +import time import mock from oslo_config import cfg @@ -115,7 +116,8 @@ class TestUpdateHealthDb(base.TestCase): } } } - } + }, + "recv_time": time.time() } mock_lb, mock_listener1, mock_pool1, mock_member1 = ( @@ -140,7 +142,9 @@ class TestUpdateHealthDb(base.TestCase): health = { "id": self.FAKE_UUID_1, - "listeners": {}} + "listeners": {}, + "recv_time": time.time() + } mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree(listener=False, pool=False)) @@ -150,11 +154,29 @@ class TestUpdateHealthDb(base.TestCase): self.assertTrue(self.amphora_repo.get_all_lbs_on_amphora.called) self.assertTrue(self.loadbalancer_repo.update.called) + def test_update_health_recv_time_stale(self): + hb_interval = cfg.CONF.health_manager.heartbeat_interval + health = { + "id": self.FAKE_UUID_1, + "listeners": {}, + "recv_time": time.time() - hb_interval - 1 # extra -1 for buffer + } + + mock_lb, mock_listener1, mock_pool1, mock_members = ( + self._make_mock_lb_tree(listener=False, pool=False)) + self.hm.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] + + self.hm.update_health(health) + self.assertTrue(self.amphora_repo.get_all_lbs_on_amphora.called) + # Receive time is stale, so we shouldn't see this called + self.assertFalse(self.loadbalancer_repo.update.called) + def test_update_health_replace_error(self): health = { "id": self.FAKE_UUID_1, - "listeners": {} + "listeners": {}, + "recv_time": time.time() } self.session_mock.commit.side_effect = TestException('boom') @@ -176,7 +198,8 @@ class TestUpdateHealthDb(base.TestCase): } } } - } + }, + "recv_time": time.time() } mock_lb, mock_listener1, mock_pool1, mock_member1 = ( @@ -226,7 +249,8 @@ class TestUpdateHealthDb(base.TestCase): } } } - } + }, + "recv_time": time.time() } mock_lb, mock_listener1, mock_pool1, mock_member1 = ( @@ -270,7 +294,8 @@ class TestUpdateHealthDb(base.TestCase): } } } - } + }, + "recv_time": time.time() } mock_lb, mock_listener1, mock_pool1, mock_members = ( @@ -311,7 +336,8 @@ class TestUpdateHealthDb(base.TestCase): } } } - } + }, + "recv_time": time.time() } mock_lb, mock_listener1, mock_pool1, mock_member1 = ( @@ -342,7 +368,8 @@ class TestUpdateHealthDb(base.TestCase): "pools": { "pool-id-1": { "status": constants.UP, - "members": {"member-id-1": constants.DRAIN}}}}}} + "members": {"member-id-1": constants.DRAIN}}}}}, + "recv_time": time.time()} mock_lb, mock_listener1, mock_pool1, mock_member1 = ( self._make_mock_lb_tree()) @@ -381,7 +408,8 @@ class TestUpdateHealthDb(base.TestCase): "pools": { "pool-id-1": { "status": constants.UP, - "members": {"member-id-1": constants.MAINT}}}}}} + "members": {"member-id-1": constants.MAINT}}}}}, + "recv_time": time.time()} mock_lb, mock_listener1, mock_pool1, mock_member1 = ( self._make_mock_lb_tree()) @@ -420,7 +448,8 @@ class TestUpdateHealthDb(base.TestCase): "pools": { "pool-id-1": { "status": constants.UP, - "members": {"member-id-1": "blah"}}}}}} + "members": {"member-id-1": "blah"}}}}}, + "recv_time": time.time()} mock_lb, mock_listener1, mock_pool1, mock_member1 = ( self._make_mock_lb_tree()) @@ -454,7 +483,8 @@ class TestUpdateHealthDb(base.TestCase): } } } - } + }, + "recv_time": time.time() } mock_lb, mock_listener1, mock_pool1, mock_member1 = ( @@ -497,7 +527,8 @@ class TestUpdateHealthDb(base.TestCase): } } } - } + }, + "recv_time": time.time() } mock_lb, mock_listener1, mock_pool1, mock_member1 = ( @@ -538,7 +569,8 @@ class TestUpdateHealthDb(base.TestCase): "pool-id-1": { "status": constants.UP, "members": { - "member-id-1": constants.UP}}}}}} + "member-id-1": constants.UP}}}}}, + "recv_time": time.time()} mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree(members=2)) @@ -581,7 +613,8 @@ class TestUpdateHealthDb(base.TestCase): } } } - } + }, + "recv_time": time.time() } mock_lb, mock_listener1, mock_pool1, mock_member1 = ( @@ -631,7 +664,8 @@ class TestUpdateHealthDb(base.TestCase): } } } - } + }, + "recv_time": time.time() } mock_lb, mock_listener1, mock_pool1, mock_member1 = ( @@ -705,7 +739,8 @@ class TestUpdateHealthDb(base.TestCase): } } } - } + }, + "recv_time": time.time() } mock_lb, mock_listener1, mock_pool1, mock_members = ( @@ -762,7 +797,8 @@ class TestUpdateHealthDb(base.TestCase): } } } - } + }, + "recv_time": time.time() } self.hm.listener_repo.update.side_effect = ( @@ -815,7 +851,8 @@ class TestUpdateHealthDb(base.TestCase): } } } - } + }, + "recv_time": time.time() } mock_lb, mock_listener1, mock_pool1, mock_members = ( @@ -838,7 +875,8 @@ class TestUpdateHealthDb(base.TestCase): def test_update_health_lb_admin_down(self): health = { "id": self.FAKE_UUID_1, - "listeners": {}} + "listeners": {}, + "recv_time": time.time()} mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree(listener=False, pool=False)) @@ -855,7 +893,8 @@ class TestUpdateHealthDb(base.TestCase): def test_update_health_lb_admin_up(self): health = { "id": self.FAKE_UUID_1, - "listeners": {}} + "listeners": {}, + "recv_time": time.time()} mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree(listener=False, pool=False))