Merge "Minimize the effect overloaded Health Manager processes"
This commit is contained in:
commit
0fc009555c
|
@ -14,6 +14,7 @@
|
|||
|
||||
from concurrent import futures
|
||||
import socket
|
||||
import time
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
@ -166,6 +167,7 @@ class UDPStatusGetter(object):
|
|||
(data, srcaddr) = self.sock.recvfrom(UDP_MAX_SIZE)
|
||||
LOG.debug('Received packet from %s', srcaddr)
|
||||
obj = status_message.unwrap_envelope(data, self.key)
|
||||
obj['recv_time'] = time.time()
|
||||
return obj, srcaddr
|
||||
|
||||
def check(self):
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
# under the License.
|
||||
|
||||
import datetime
|
||||
import time
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
@ -26,6 +27,7 @@ from octavia.controller.healthmanager import update_serializer
|
|||
from octavia.db import api as db_api
|
||||
from octavia.db import repositories as repo
|
||||
|
||||
CONF = cfg.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -35,7 +37,7 @@ class UpdateHealthDb(object):
|
|||
# first setup repo for amphora, listener,member(nodes),pool repo
|
||||
self.event_streamer = stevedore_driver.DriverManager(
|
||||
namespace='octavia.controller.queues',
|
||||
name=cfg.CONF.health_manager.event_streamer_driver,
|
||||
name=CONF.health_manager.event_streamer_driver,
|
||||
invoke_on_load=True).driver
|
||||
self.amphora_repo = repo.AmphoraRepository()
|
||||
self.amphora_health_repo = repo.AmphoraHealthRepository()
|
||||
|
@ -43,7 +45,7 @@ class UpdateHealthDb(object):
|
|||
self.loadbalancer_repo = repo.LoadBalancerRepository()
|
||||
self.member_repo = repo.MemberRepository()
|
||||
self.pool_repo = repo.PoolRepository()
|
||||
self.sync_prv_status = cfg.CONF.health_manager.sync_provisioning_status
|
||||
self.sync_prv_status = CONF.health_manager.sync_provisioning_status
|
||||
|
||||
def emit(self, info_type, info_id, info_obj):
|
||||
cnt = update_serializer.InfoContainer(info_type, info_id, info_obj)
|
||||
|
@ -113,6 +115,19 @@ class UpdateHealthDb(object):
|
|||
if len(listeners) == expected_listener_count:
|
||||
|
||||
lock_session = db_api.get_session(autocommit=False)
|
||||
|
||||
# if we're running too far behind, warn and bail
|
||||
proc_delay = time.time() - health['recv_time']
|
||||
hb_interval = CONF.health_manager.heartbeat_interval
|
||||
if proc_delay >= hb_interval:
|
||||
LOG.warning('Amphora %(id)s health message was processed too '
|
||||
'slowly: %(delay)ss! The system may be overloaded '
|
||||
'or otherwise malfunctioning. This heartbeat has '
|
||||
'been ignored and no update was made to the '
|
||||
'amphora health entry. THIS IS NOT GOOD.',
|
||||
{'id': health['id'], 'delay': proc_delay})
|
||||
return
|
||||
|
||||
# if the input amphora is healthy, we update its db info
|
||||
try:
|
||||
self.amphora_health_repo.replace(
|
||||
|
@ -295,7 +310,7 @@ class UpdateStatsDb(stats.StatsMixin):
|
|||
super(UpdateStatsDb, self).__init__()
|
||||
self.event_streamer = stevedore_driver.DriverManager(
|
||||
namespace='octavia.controller.queues',
|
||||
name=cfg.CONF.health_manager.event_streamer_driver,
|
||||
name=CONF.health_manager.event_streamer_driver,
|
||||
invoke_on_load=True).driver
|
||||
self.repo_listener = repo.ListenerRepository()
|
||||
|
||||
|
|
|
@ -92,6 +92,7 @@ class TestHeartbeatUDP(base.TestCase):
|
|||
recvfrom.return_value = bin_msg, 2
|
||||
(obj, srcaddr) = getter.dorecv()
|
||||
self.assertEqual(2, srcaddr)
|
||||
self.assertIsNotNone(obj.pop('recv_time'))
|
||||
self.assertEqual({"testkey": "TEST"}, obj)
|
||||
|
||||
@mock.patch('socket.getaddrinfo')
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
# under the License.
|
||||
|
||||
import random
|
||||
import time
|
||||
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
|
@ -115,7 +116,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
||||
|
@ -140,7 +142,9 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
|
||||
health = {
|
||||
"id": self.FAKE_UUID_1,
|
||||
"listeners": {}}
|
||||
"listeners": {},
|
||||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree(listener=False, pool=False))
|
||||
|
@ -150,11 +154,29 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
self.assertTrue(self.amphora_repo.get_all_lbs_on_amphora.called)
|
||||
self.assertTrue(self.loadbalancer_repo.update.called)
|
||||
|
||||
def test_update_health_recv_time_stale(self):
|
||||
hb_interval = cfg.CONF.health_manager.heartbeat_interval
|
||||
health = {
|
||||
"id": self.FAKE_UUID_1,
|
||||
"listeners": {},
|
||||
"recv_time": time.time() - hb_interval - 1 # extra -1 for buffer
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree(listener=False, pool=False))
|
||||
self.hm.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb]
|
||||
|
||||
self.hm.update_health(health)
|
||||
self.assertTrue(self.amphora_repo.get_all_lbs_on_amphora.called)
|
||||
# Receive time is stale, so we shouldn't see this called
|
||||
self.assertFalse(self.loadbalancer_repo.update.called)
|
||||
|
||||
def test_update_health_replace_error(self):
|
||||
|
||||
health = {
|
||||
"id": self.FAKE_UUID_1,
|
||||
"listeners": {}
|
||||
"listeners": {},
|
||||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
self.session_mock.commit.side_effect = TestException('boom')
|
||||
|
@ -176,7 +198,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
||||
|
@ -226,7 +249,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
||||
|
@ -270,7 +294,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
|
@ -311,7 +336,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
||||
|
@ -342,7 +368,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"pools": {
|
||||
"pool-id-1": {
|
||||
"status": constants.UP,
|
||||
"members": {"member-id-1": constants.DRAIN}}}}}}
|
||||
"members": {"member-id-1": constants.DRAIN}}}}},
|
||||
"recv_time": time.time()}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
||||
self._make_mock_lb_tree())
|
||||
|
@ -381,7 +408,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"pools": {
|
||||
"pool-id-1": {
|
||||
"status": constants.UP,
|
||||
"members": {"member-id-1": constants.MAINT}}}}}}
|
||||
"members": {"member-id-1": constants.MAINT}}}}},
|
||||
"recv_time": time.time()}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
||||
self._make_mock_lb_tree())
|
||||
|
@ -420,7 +448,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"pools": {
|
||||
"pool-id-1": {
|
||||
"status": constants.UP,
|
||||
"members": {"member-id-1": "blah"}}}}}}
|
||||
"members": {"member-id-1": "blah"}}}}},
|
||||
"recv_time": time.time()}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
||||
self._make_mock_lb_tree())
|
||||
|
@ -454,7 +483,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
||||
|
@ -497,7 +527,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
||||
|
@ -538,7 +569,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"pool-id-1": {
|
||||
"status": constants.UP,
|
||||
"members": {
|
||||
"member-id-1": constants.UP}}}}}}
|
||||
"member-id-1": constants.UP}}}}},
|
||||
"recv_time": time.time()}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree(members=2))
|
||||
|
@ -581,7 +613,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
||||
|
@ -631,7 +664,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
||||
|
@ -705,7 +739,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
|
@ -762,7 +797,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
self.hm.listener_repo.update.side_effect = (
|
||||
|
@ -815,7 +851,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
|
@ -838,7 +875,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
def test_update_health_lb_admin_down(self):
|
||||
health = {
|
||||
"id": self.FAKE_UUID_1,
|
||||
"listeners": {}}
|
||||
"listeners": {},
|
||||
"recv_time": time.time()}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree(listener=False, pool=False))
|
||||
|
@ -855,7 +893,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
def test_update_health_lb_admin_up(self):
|
||||
health = {
|
||||
"id": self.FAKE_UUID_1,
|
||||
"listeners": {}}
|
||||
"listeners": {},
|
||||
"recv_time": time.time()}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree(listener=False, pool=False))
|
||||
|
|
Loading…
Reference in New Issue