Update amphora-agent to report UDP listener health

Currently the amphora-agent is not reporting UDP listener health
when the UDP listener does not have a pool and members.
This patch changes that behavior to report the listener as healthy
if the keepalived process is started and running in the amphora.

This patch also introduces message versioning for the health
heartbeat messages.

It also corrects a few assertEqual tests that had the reference and
actual values backwards.

Change-Id: Ifc28b4991852e59c0d27b4ab3d1afc4e9965e88b
Story: 2003592
Task: 24911
This commit is contained in:
Michael Johnson 2018-08-30 07:42:27 -07:00
parent aed0867de4
commit 2170cc6c45
5 changed files with 42 additions and 9 deletions

View File

@ -37,6 +37,14 @@ else:
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
SEQ = 0
# MSG_VER is an incrementing integer heartbeat message format version
# this allows for backward compatibility when the amphora-agent is older
# than the controller version and the message format has backwards
# incompatible changes.
#
# ver 1 - Adds UDP listener status when no pool or members are present
#
MSG_VER = 1
def list_sock_stat_files(hadir=None):
@ -115,7 +123,8 @@ def get_stats(stat_sock_file):
def build_stats_message():
global SEQ
msg = {'id': CONF.amphora_agent.amphora_id,
'seq': SEQ, "listeners": {}}
'seq': SEQ, "listeners": {},
'ver': MSG_VER}
SEQ += 1
stat_sock_files = list_sock_stat_files()
for listener_id, stat_sock_file in stat_sock_files.items():
@ -163,6 +172,7 @@ def build_stats_message():
'totconns': listener_stats['stats']['stot'],
'ereq': listener_stats['stats']['ereq']
}
udp_listener_dict['pools'] = {}
if pool_status:
udp_listener_dict['pools'] = {
pool_status['lvs']['uuid']: {

View File

@ -350,6 +350,7 @@ def get_udp_listeners_stats():
listener_id for listener_id in udp_listener_ids
if util.is_udp_listener_running(listener_id)]
ipport_mapping = dict()
listener_stats_res = dict()
for check_listener_id in need_check_listener_ids:
# resource_ipport_mapping = {'Listener': {'id': listener-id,
# 'ipport': ipport},
@ -361,6 +362,20 @@ def get_udp_listeners_stats():
# 'HealthMonitor': {'id': healthmonitor-id}}
(resource_ipport_mapping,
ns_name) = get_udp_listener_resource_ipports_nsname(check_listener_id)
# Since we found the keepalived running, acknowledge the listener
# in the heartbeat. If this listener has a pool and members,
# the stats will be updated later in the code flow.
listener_stats_res.update({
check_listener_id: {
'stats': {
'bout': 0,
'bin': 0,
'scur': 0,
'stot': 0,
'ereq': 0},
'status': constants.OPEN}})
# If we can not read the lvs configuration from file, that means
# the pool of this listener may own zero enabled member, but the
# keepalived process is running. So we need to skip it.
@ -371,7 +386,7 @@ def get_udp_listeners_stats():
# So here, if we can not get any ipport_mapping,
# we do nothing, just return
if not ipport_mapping:
return None
return listener_stats_res
# contains bout, bin, scur, stot, ereq, status
# bout(OutBytes), bin(InBytes), stot(Conns) from cmd ipvsadm -Ln --stats
@ -381,7 +396,6 @@ def get_udp_listeners_stats():
scur_res = get_ipvsadm_info(constants.AMPHORA_NAMESPACE)
stats_res = get_ipvsadm_info(constants.AMPHORA_NAMESPACE,
is_stats_cmd=True)
listener_stats_res = dict()
for listener_id, ipport in ipport_mapping.items():
listener_ipport = ipport['Listener']['ipport']
# This would be in Error, wait for the next loop to sync for the

View File

@ -128,7 +128,8 @@ SAMPLE_STATS_MSG = {
'status': 'OPEN'}
},
'id': None,
'seq': 0
'seq': 0,
'ver': health_daemon.MSG_VER
}
@ -152,7 +153,7 @@ class TestHealthDaemon(base.TestCase):
LISTENER_ID1 + '.sock',
LISTENER_ID2: BASE_PATH + '/' +
LISTENER_ID2 + '.sock'}
self.assertEqual(files, expected_files)
self.assertEqual(expected_files, files)
@mock.patch('os.kill')
@mock.patch('os.path.isfile')
@ -311,7 +312,7 @@ class TestHealthDaemon(base.TestCase):
msg = health_daemon.build_stats_message()
self.assertEqual(msg, SAMPLE_STATS_MSG)
self.assertEqual(SAMPLE_STATS_MSG, msg)
mock_get_stats.assert_any_call('TEST')
mock_get_stats.assert_any_call('TEST2')
@ -352,7 +353,7 @@ class TestHealthDaemon(base.TestCase):
msg = health_daemon.build_stats_message()
self.assertEqual(msg['listeners'][LISTENER_ID1]['pools'], {})
self.assertEqual({}, msg['listeners'][LISTENER_ID1]['pools'])
@mock.patch("octavia.amphorae.backends.utils.keepalivedlvs_query."
"get_udp_listener_pool_status")
@ -408,8 +409,10 @@ class TestHealthDaemon(base.TestCase):
'rx': 6387472, 'tx': 7490}},
udp_listener_id3: {
'status': constants.DOWN,
'pools': {},
'stats': {'conns': 0, 'totconns': 0, 'ereq': 0,
'rx': 0, 'tx': 0}}}, 'id': None, 'seq': mock.ANY}
'rx': 0, 'tx': 0}}}, 'id': None,
'seq': mock.ANY, 'ver': health_daemon.MSG_VER}
msg = health_daemon.build_stats_message()
self.assertEqual(expected, msg)

View File

@ -390,4 +390,4 @@ class LvsQueryTestCase(base.TestCase):
# Then this function will return nothing.
mock_is_running.return_value = False
res = lvs_query.get_udp_listeners_stats()
self.assertIsNone(res)
self.assertEqual({}, res)

View File

@ -0,0 +1,6 @@
---
upgrade:
- |
To enable UDP listener monitoring when no pool is attached, the amphora
image needs to be updated and load balancers with UDP listeners need
to be failed over to the new image.