Fix invalid DOWN status when updating a UDP pool
When adding a new UDP member or a UDP-CONNECT health-monitor to a UDP
pool, there can be a race condition in the first heartbeat message
sent to the health-manager service.
This message might contain a DOWN status for a working member that
hasn't been checked yet.
This commit introduces a new member status between the amphora-agent and
the health-manager: it indicates that the UDP pool has been updated and
that the status of a member is a transitional state, preventing an
incorrect ERROR status.
Story: 2007792
Task: 40042
Change-Id: Id9e19375ebca6a720e6a85006f5e8948d3aed760
(cherry picked from commit 9fb58eb9f4
)
This commit is contained in:
parent
8fe7cff8dc
commit
ea47a0efad
|
@ -11,6 +11,7 @@
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import ipaddress
|
import ipaddress
|
||||||
|
import os
|
||||||
import re
|
import re
|
||||||
import subprocess
|
import subprocess
|
||||||
|
|
||||||
|
@ -236,6 +237,20 @@ def get_udp_listener_pool_status(listener_id):
|
||||||
'members': {}
|
'members': {}
|
||||||
}}
|
}}
|
||||||
|
|
||||||
|
config_path = util.keepalived_lvs_cfg_path(listener_id)
|
||||||
|
pids_pathes = util.keepalived_lvs_pids_path(listener_id)
|
||||||
|
|
||||||
|
config_stat = os.stat(config_path)
|
||||||
|
check_pid_stat = os.stat(pids_pathes[2])
|
||||||
|
|
||||||
|
# Indicates that keepalived configuration has been updated but the service
|
||||||
|
# has yet to be restarted.
|
||||||
|
# NOTE: It only works if we are doing a RESTART on configuration change,
|
||||||
|
# Iaa34db6cb1dfed98e96a585c5d105e263c7efa65 forces a RESTART instead of a
|
||||||
|
# RELOAD, we need to be careful if we want to switch back to RELOAD after
|
||||||
|
# updating to a recent keepalived release.
|
||||||
|
restarting = config_stat.st_mtime > check_pid_stat.st_mtime
|
||||||
|
|
||||||
with open(util.keepalived_lvs_cfg_path(listener_id), 'r') as f:
|
with open(util.keepalived_lvs_cfg_path(listener_id), 'r') as f:
|
||||||
cfg = f.read()
|
cfg = f.read()
|
||||||
hm_enabled = len(CHECKER_REGEX.findall(cfg)) > 0
|
hm_enabled = len(CHECKER_REGEX.findall(cfg)) > 0
|
||||||
|
@ -259,7 +274,8 @@ def get_udp_listener_pool_status(listener_id):
|
||||||
if member_ip_port is None:
|
if member_ip_port is None:
|
||||||
status = constants.MAINT
|
status = constants.MAINT
|
||||||
elif member_ip_port in down_member_ip_port_set:
|
elif member_ip_port in down_member_ip_port_set:
|
||||||
status = constants.DOWN
|
status = (
|
||||||
|
constants.RESTARTING if restarting else constants.DOWN)
|
||||||
elif int(realserver_result[member_ip_port]['Weight']) == 0:
|
elif int(realserver_result[member_ip_port]['Weight']) == 0:
|
||||||
status = constants.DRAIN
|
status = constants.DRAIN
|
||||||
else:
|
else:
|
||||||
|
@ -275,7 +291,8 @@ def get_udp_listener_pool_status(listener_id):
|
||||||
if member['ipport'] is None:
|
if member['ipport'] is None:
|
||||||
member_results[member['id']] = constants.MAINT
|
member_results[member['id']] = constants.MAINT
|
||||||
elif hm_enabled:
|
elif hm_enabled:
|
||||||
member_results[member['id']] = constants.DOWN
|
member_results[member['id']] = (
|
||||||
|
constants.RESTARTING if restarting else constants.DOWN)
|
||||||
else:
|
else:
|
||||||
member_results[member['id']] = constants.NO_CHECK
|
member_results[member['id']] = constants.NO_CHECK
|
||||||
|
|
||||||
|
|
|
@ -641,6 +641,8 @@ HAPROXY_MEMBER_STATUSES = (UP, DOWN, DRAIN, MAINT, NO_CHECK)
|
||||||
# the sysctl fs.file-max fs.nr_open settings in the image
|
# the sysctl fs.file-max fs.nr_open settings in the image
|
||||||
HAPROXY_MAX_MAXCONN = 1000000
|
HAPROXY_MAX_MAXCONN = 1000000
|
||||||
|
|
||||||
|
RESTARTING = 'RESTARTING'
|
||||||
|
|
||||||
# Quota Constants
|
# Quota Constants
|
||||||
QUOTA_UNLIMITED = -1
|
QUOTA_UNLIMITED = -1
|
||||||
MIN_QUOTA = QUOTA_UNLIMITED
|
MIN_QUOTA = QUOTA_UNLIMITED
|
||||||
|
|
|
@ -431,6 +431,14 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
|
||||||
member_status = constants.OFFLINE
|
member_status = constants.OFFLINE
|
||||||
elif status == constants.NO_CHECK:
|
elif status == constants.NO_CHECK:
|
||||||
member_status = constants.NO_MONITOR
|
member_status = constants.NO_MONITOR
|
||||||
|
elif status == constants.RESTARTING:
|
||||||
|
# RESTARTING means that keepalived is restarting and a down
|
||||||
|
# member has been detected, the real status of the member
|
||||||
|
# is not clear, it might mean that the checker hasn't run
|
||||||
|
# yet.
|
||||||
|
# In this case, keep previous member_status, and wait for a
|
||||||
|
# non-transitional status.
|
||||||
|
pass
|
||||||
else:
|
else:
|
||||||
LOG.warning('Member %(mem)s reported '
|
LOG.warning('Member %(mem)s reported '
|
||||||
'status of %(status)s',
|
'status of %(status)s',
|
||||||
|
|
|
@ -298,8 +298,15 @@ class LvsQueryTestCase(base.TestCase):
|
||||||
self.disabled_listener_id)
|
self.disabled_listener_id)
|
||||||
self.assertEqual((None, constants.AMPHORA_NAMESPACE), res)
|
self.assertEqual((None, constants.AMPHORA_NAMESPACE), res)
|
||||||
|
|
||||||
|
@mock.patch('os.stat')
|
||||||
@mock.patch('subprocess.check_output')
|
@mock.patch('subprocess.check_output')
|
||||||
def test_get_udp_listener_pool_status(self, mock_check_output):
|
def test_get_udp_listener_pool_status(self, mock_check_output,
|
||||||
|
mock_os_stat):
|
||||||
|
mock_os_stat.side_effect = (
|
||||||
|
mock.Mock(st_mtime=1234),
|
||||||
|
mock.Mock(st_mtime=1234),
|
||||||
|
)
|
||||||
|
|
||||||
# test with ipv4 and ipv6
|
# test with ipv4 and ipv6
|
||||||
mock_check_output.return_value = KERNAL_FILE_SAMPLE_V4
|
mock_check_output.return_value = KERNAL_FILE_SAMPLE_V4
|
||||||
res = lvs_query.get_udp_listener_pool_status(self.listener_id_v4)
|
res = lvs_query.get_udp_listener_pool_status(self.listener_id_v4)
|
||||||
|
@ -313,6 +320,11 @@ class LvsQueryTestCase(base.TestCase):
|
||||||
self.member_id4_v4: constants.MAINT}}}
|
self.member_id4_v4: constants.MAINT}}}
|
||||||
self.assertEqual(expected, res)
|
self.assertEqual(expected, res)
|
||||||
|
|
||||||
|
mock_os_stat.side_effect = (
|
||||||
|
mock.Mock(st_mtime=1234),
|
||||||
|
mock.Mock(st_mtime=1234),
|
||||||
|
)
|
||||||
|
|
||||||
mock_check_output.return_value = KERNAL_FILE_SAMPLE_V6
|
mock_check_output.return_value = KERNAL_FILE_SAMPLE_V6
|
||||||
res = lvs_query.get_udp_listener_pool_status(self.listener_id_v6)
|
res = lvs_query.get_udp_listener_pool_status(self.listener_id_v6)
|
||||||
expected = {
|
expected = {
|
||||||
|
@ -325,6 +337,28 @@ class LvsQueryTestCase(base.TestCase):
|
||||||
self.member_id4_v6: constants.MAINT}}}
|
self.member_id4_v6: constants.MAINT}}}
|
||||||
self.assertEqual(expected, res)
|
self.assertEqual(expected, res)
|
||||||
|
|
||||||
|
@mock.patch('os.stat')
|
||||||
|
@mock.patch('subprocess.check_output')
|
||||||
|
def test_get_udp_listener_pool_status_restarting(self, mock_check_output,
|
||||||
|
mock_os_stat):
|
||||||
|
mock_os_stat.side_effect = (
|
||||||
|
mock.Mock(st_mtime=1234), # config file
|
||||||
|
mock.Mock(st_mtime=1220), # pid file
|
||||||
|
)
|
||||||
|
|
||||||
|
# test with ipv4 and ipv6
|
||||||
|
mock_check_output.return_value = KERNAL_FILE_SAMPLE_V4
|
||||||
|
res = lvs_query.get_udp_listener_pool_status(self.listener_id_v4)
|
||||||
|
expected = {
|
||||||
|
'lvs':
|
||||||
|
{'uuid': self.pool_id_v4,
|
||||||
|
'status': constants.UP,
|
||||||
|
'members': {self.member_id1_v4: constants.UP,
|
||||||
|
self.member_id2_v4: constants.UP,
|
||||||
|
self.member_id3_v4: constants.RESTARTING,
|
||||||
|
self.member_id4_v4: constants.MAINT}}}
|
||||||
|
self.assertEqual(expected, res)
|
||||||
|
|
||||||
@mock.patch('octavia.amphorae.backends.utils.keepalivedlvs_query.'
|
@mock.patch('octavia.amphorae.backends.utils.keepalivedlvs_query.'
|
||||||
'get_udp_listener_resource_ipports_nsname')
|
'get_udp_listener_resource_ipports_nsname')
|
||||||
def test_get_udp_listener_pool_status_when_no_pool(
|
def test_get_udp_listener_pool_status_when_no_pool(
|
||||||
|
@ -366,14 +400,19 @@ class LvsQueryTestCase(base.TestCase):
|
||||||
}}
|
}}
|
||||||
self.assertEqual(expected, res)
|
self.assertEqual(expected, res)
|
||||||
|
|
||||||
|
@mock.patch('os.stat')
|
||||||
@mock.patch('octavia.amphorae.backends.utils.keepalivedlvs_query.'
|
@mock.patch('octavia.amphorae.backends.utils.keepalivedlvs_query.'
|
||||||
'get_listener_realserver_mapping')
|
'get_listener_realserver_mapping')
|
||||||
def test_get_udp_listener_pool_status_when_not_get_realserver_result(
|
def test_get_udp_listener_pool_status_when_not_get_realserver_result(
|
||||||
self, mock_get_mapping):
|
self, mock_get_mapping, mock_os_stat):
|
||||||
# This will hit if the kernel lvs file (/proc/net/ip_vs)
|
# This will hit if the kernel lvs file (/proc/net/ip_vs)
|
||||||
# lose its content. So at this moment, eventhough we configure the
|
# lose its content. So at this moment, eventhough we configure the
|
||||||
# pool and member into udp keepalived config file, we have to set
|
# pool and member into udp keepalived config file, we have to set
|
||||||
# ths status of pool and its members to DOWN.
|
# ths status of pool and its members to DOWN.
|
||||||
|
mock_os_stat.side_effect = (
|
||||||
|
mock.Mock(st_mtime=1234),
|
||||||
|
mock.Mock(st_mtime=1234),
|
||||||
|
)
|
||||||
mock_get_mapping.return_value = (False, {})
|
mock_get_mapping.return_value = (False, {})
|
||||||
res = lvs_query.get_udp_listener_pool_status(self.listener_id_v4)
|
res = lvs_query.get_udp_listener_pool_status(self.listener_id_v4)
|
||||||
expected = {
|
expected = {
|
||||||
|
|
|
@ -0,0 +1,7 @@
|
||||||
|
---
|
||||||
|
fixes:
|
||||||
|
- |
|
||||||
|
Fix a potential invalid DOWN operating status for members of a UDP pool.
|
||||||
|
A race condition could have occured when building the first heartbeat
|
||||||
|
message after adding a new member in a pool, this recently added member
|
||||||
|
could have been seen as DOWN.
|
Loading…
Reference in New Issue