Merge "Performance improvement for non-udp health checks"

This commit is contained in:
Zuul 2019-05-15 01:05:26 +00:00 committed by Gerrit Code Review
commit 0573d2c609
4 changed files with 76 additions and 23 deletions

View File

@ -75,18 +75,17 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
# need to adjust the expected listener count
# This is for backward compatibility with Rocky pre-versioning
# heartbeat amphora.
def _update_listener_count_for_UDP(self, session, lb_id,
def _update_listener_count_for_UDP(self, session, db_lb,
expected_listener_count):
lb_db_obj = self.loadbalancer_repo.get(session, id=lb_id)
# For udp listener, the udp health won't send out by amp agent.
# Once the default_pool of udp listener have the first enabled
# member, then the health will be sent out. So during this
# period, need to figure out the udp listener and ignore them
# by changing expected_listener_count.
for listener in lb_db_obj.listeners:
for list_id, list_db in db_lb.get('listeners', {}).items():
need_remove = False
if listener.protocol == constants.PROTOCOL_UDP:
if list_db['protocol'] == constants.PROTOCOL_UDP:
listener = self.listener_repo.get(session, id=list_id)
enabled_members = ([member
for member in
listener.default_pool.members
@ -139,13 +138,16 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
if 'PENDING' in db_lb['provisioning_status']:
ignore_listener_count = True
else:
# If this is a heartbeat older than versioning, handle
# UDP special for backward compatibility.
if 'ver' not in health:
expected_listener_count = (
self._update_listener_count_for_UDP(
session, db_lb['id'], expected_listener_count))
udp_listeners = [
l for k, l in db_lb.get('listeners', {}).items()
if l['protocol'] == constants.PROTOCOL_UDP]
if udp_listeners:
expected_listener_count = (
self._update_listener_count_for_UDP(
session, db_lb, expected_listener_count))
else:
# If this is not a spare amp, log and skip it.
amp = self.amphora_repo.get(session, id=health['id'])

View File

@ -1252,6 +1252,7 @@ class AmphoraRepository(BaseRepository):
"load_balancer.operating_status AS lb_op_status, "
"listener.id AS list_id, "
"listener.operating_status AS list_op_status, "
"listener.protocol AS list_protocol, "
"pool.id AS pool_id, "
"pool.operating_status AS pool_op_status, "
"member.id AS member_id, "
@ -1275,7 +1276,8 @@ class AmphoraRepository(BaseRepository):
lb['provisioning_status'] = row['lb_prov_status']
lb['operating_status'] = row['lb_op_status']
if row['list_id'] and row['list_id'] not in listeners:
listener = {'operating_status': row['list_op_status']}
listener = {'operating_status': row['list_op_status'],
'protocol': row['list_protocol']}
listeners[row['list_id']] = listener
if row['pool_id']:
if row['pool_id'] in pools and row['member_id']:

View File

@ -3273,7 +3273,8 @@ class AmphoraRepositoryTest(BaseRepositoryTest):
load_balancer_id=self.lb.id, provisioning_status=constants.ACTIVE,
enabled=True, peer_port=1025, default_pool_id=pool.id)
listener_ref = {listener.id: {'operating_status': constants.ONLINE}}
listener_ref = {listener.id: {'operating_status': constants.ONLINE,
'protocol': constants.PROTOCOL_HTTP}}
lb_ref['listeners'] = listener_ref
# Test with an LB, pool, and listener (no members)

View File

@ -110,7 +110,8 @@ class TestUpdateHealthDb(base.TestCase):
def _make_fake_lb_health_dict(self, listener=True, pool=True,
health_monitor=True, members=1,
lb_prov_status=constants.ACTIVE):
lb_prov_status=constants.ACTIVE,
listener_protocol=constants.PROTOCOL_TCP):
lb_ref = {'enabled': True, 'id': self.FAKE_UUID_1,
constants.OPERATING_STATUS: 'bogus',
@ -134,7 +135,8 @@ class TestUpdateHealthDb(base.TestCase):
if listener:
listener_ref = {'listener-id-1': {
constants.OPERATING_STATUS: 'bogus'}}
constants.OPERATING_STATUS: 'bogus',
'protocol': listener_protocol}}
lb_ref['listeners'] = listener_ref
return lb_ref
@ -330,7 +332,8 @@ class TestUpdateHealthDb(base.TestCase):
'members': {'member-id-2': {constants.OPERATING_STATUS: 'bogus'}}}
lb_ref['listeners']['listener-id-2'] = {
constants.OPERATING_STATUS: 'bogus'}
constants.OPERATING_STATUS: 'bogus',
'protocol': constants.PROTOCOL_TCP}
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
self.hm.update_health(health, '192.0.2.1')
@ -891,7 +894,8 @@ class TestUpdateHealthDb(base.TestCase):
for i in [1, 2, 3, 4, 5]:
lb_ref['listeners']['listener-id-%s' % i] = {
constants.OPERATING_STATUS: 'bogus'}
constants.OPERATING_STATUS: 'bogus',
'protocol': constants.PROTOCOL_TCP}
if i == 3:
members_dict = {'member-id-3': {
@ -1110,11 +1114,14 @@ class TestUpdateHealthDb(base.TestCase):
self.loadbalancer_repo.get.return_value = mock_lb
lb_ref = self._make_fake_lb_health_dict()
lb_ref = self._make_fake_lb_health_dict(
listener_protocol=constants.PROTOCOL_UDP)
lb_ref['listeners']['listener-id-2'] = {
constants.OPERATING_STATUS: 'bogus'}
constants.OPERATING_STATUS: 'bogus',
'protocol': constants.PROTOCOL_UDP}
lb_ref['listeners']['listener-id-3'] = {
constants.OPERATING_STATUS: 'bogus'}
constants.OPERATING_STATUS: 'bogus',
'protocol': constants.PROTOCOL_UDP}
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
self.hm.update_health(health, '192.0.2.1')
@ -1149,30 +1156,71 @@ class TestUpdateHealthDb(base.TestCase):
self.assertTrue(self.amphora_repo.get.called)
self.assertTrue(self.amphora_health_repo.replace.called)
def test_update_listener_count_for_UDP(self):
def test_update_health_with_without_udp_listeners(self):
health = {
"id": self.FAKE_UUID_1,
"listeners": {
"listener-id-1": {"status": constants.OPEN, "pools": {
"pool-id-1": {"status": constants.UP,
"members": {"member-id-1": constants.DOWN}
}
}}},
"recv_time": time.time()
}
# Test with a TCP listener
lb_ref = self._make_fake_lb_health_dict(
listener_protocol=constants.PROTOCOL_TCP)
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
self.hm.update_health(health, '192.0.2.1')
# We should have no calls to listener_repo.get, because we skip
# running the extra UDP function
self.assertFalse(self.listener_repo.get.called)
# Reset the mocks to try again
self.listener_repo.reset_mock()
# Test with a UDP listener
lb_ref = self._make_fake_lb_health_dict(
listener_protocol=constants.PROTOCOL_UDP)
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
self.hm.update_health(health, '192.0.2.1')
# This time we should have a call to listener_repo.get because the
# UDP helper function is triggered
self.assertTrue(self.listener_repo.get.called)
def test_update_listener_count_for_UDP(self):
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree())
mock_listener1.protocol = constants.PROTOCOL_TCP
self.hm.loadbalancer_repo.get.return_value = mock_lb
self.hm.listener_repo.get.return_value = mock_listener1
# Test only TCP listeners
result = self.hm._update_listener_count_for_UDP('bogus_session', 1, 0)
lb_ref = self._make_fake_lb_health_dict(
listener_protocol=constants.PROTOCOL_TCP)
result = self.hm._update_listener_count_for_UDP(
'bogus_session', lb_ref, 0)
self.assertEqual(0, result)
# Test with a valid member
lb_ref = self._make_fake_lb_health_dict(
listener_protocol=constants.PROTOCOL_UDP)
mock_listener1.protocol = constants.PROTOCOL_UDP
result = self.hm._update_listener_count_for_UDP('bogus_session', 1, 1)
result = self.hm._update_listener_count_for_UDP(
'bogus_session', lb_ref, 1)
self.assertEqual(1, result)
# Test with a disabled member
mock_listener1.protocol = constants.PROTOCOL_UDP
mock_members[0].enabled = False
result = self.hm._update_listener_count_for_UDP('bogus_session', 1, 1)
result = self.hm._update_listener_count_for_UDP(
'bogus_session', lb_ref, 1)
self.assertEqual(0, result)
def test_update_status(self):