Read keepalived initial state in parallel to interface monitoring

The method that reads the initial router state now runs in its own
thread, in parallel with the "ip monitor" thread. If, for any reason,
this thread does not read the interface IP addresses in time, the
state of the router is set to "backup".

Related-Bug: #1917793

Conflicts:
    neutron/agent/l3/keepalived_state_change.py

Change-Id: If76c6ee1734f544abdd4196431351d4328ad26fd
(cherry picked from commit c1ade52fda)
This commit is contained in:
Rodolfo Alonso Hernandez 2021-03-05 17:30:14 +00:00 committed by Slawek Kaplonski
parent 1c02b9ecf6
commit a23accea9b
2 changed files with 60 additions and 10 deletions

View File

@ -28,11 +28,13 @@ from neutron.agent.linux import daemon
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils as agent_utils
from neutron.common import config
from neutron.common import utils as common_utils
from neutron.conf.agent.l3 import keepalived
from neutron import privileged
LOG = logging.getLogger(__name__)
INITIAL_STATE_READ_TIMEOUT = 10
class KeepalivedUnixDomainConnection(agent_utils.UnixDomainHTTPConnection):
@ -57,10 +59,24 @@ class MonitorDaemon(daemon.Daemon):
self.event_stop = threading.Event()
self.event_started = threading.Event()
self.queue = queue.Queue()
self._initial_state = None
super(MonitorDaemon, self).__init__(pidfile, uuid=router_id,
user=user, group=group)
@property
def initial_state(self):
# Return the cached initial router state. The backing attribute is
# initialised to None in the constructor, so None means "not read yet".
return self._initial_state
@initial_state.setter
def initial_state(self, state):
# Log the initial status at debug level only while no initial state has
# been stored yet, so the message is not repeated on later state changes.
# NOTE(review): indentation is lost in this rendering, so it cannot be
# told from here whether the assignment below is also guarded by the
# "if" (first write wins) or runs unconditionally -- confirm against the
# real file.
if not self._initial_state:
LOG.debug('Initial status of router %s is %s', self.router_id,
state)
self._initial_state = state
def run(self):
self._thread_initial_state = threading.Thread(
target=self.handle_initial_state)
self._thread_ip_monitor = threading.Thread(
target=ip_lib.ip_monitor,
args=(self.namespace, self.queue, self.event_stop,
@ -68,9 +84,19 @@ class MonitorDaemon(daemon.Daemon):
self._thread_read_queue = threading.Thread(
target=self.read_queue,
args=(self.queue, self.event_stop, self.event_started))
self._thread_initial_state.start()
self._thread_ip_monitor.start()
self._thread_read_queue.start()
self.handle_initial_state()
# NOTE(ralonsoh): if the initial status is not read in a defined
# timeout, "backup" state is set.
self._thread_initial_state.join(timeout=INITIAL_STATE_READ_TIMEOUT)
if not self.initial_state:
LOG.warning('Timeout reading the initial status of router %s, '
'state is set to "backup".', self.router_id)
self.write_state_change('backup')
self.notify_agent('backup')
self._thread_read_queue.join()
def read_queue(self, _queue, event_stop, event_started):
@ -100,21 +126,26 @@ class MonitorDaemon(daemon.Daemon):
def handle_initial_state(self):
try:
state = 'backup'
ip = ip_lib.IPDevice(self.interface, self.namespace)
for address in ip.addr.list():
if address.get('cidr') == self.cidr:
cidr = common_utils.ip_to_cidr(self.cidr)
# NOTE(ralonsoh): "get_devices_with_ip" without passing an IP
# address performs one single pyroute2 command. Because the number
# of interfaces in the namespace is reduced, this is faster.
for address in ip_lib.get_devices_with_ip(self.namespace):
if (address['name'] == self.interface and
address['cidr'] == cidr):
state = 'master'
break
LOG.debug('Initial status of router %s is %s',
self.router_id, state)
self.write_state_change(state)
self.notify_agent(state)
if not self.initial_state:
self.write_state_change(state)
self.notify_agent(state)
except Exception:
LOG.exception('Failed to get initial status of router %s',
self.router_id)
if not self.initial_state:
LOG.exception('Failed to get initial status of router %s',
self.router_id)
def write_state_change(self, state):
self.initial_state = state
with open(os.path.join(
self.conf_dir, 'state'), 'w') as state_file:
state_file.write(state)

View File

@ -189,3 +189,22 @@ class TestMonitorDaemon(base.BaseLoggingTestCase):
self._run_monitor()
msg = 'Initial status of router %s is %s' % (self.router_id, 'master')
self._search_in_file(self.log_file, msg)
def test_handle_initial_state_backup_error_reading_initial_status(self):
    # The monitor is started with an invalid CIDR, so the
    # "_thread_initial_state" thread raises while reading the initial
    # state; that exception is caught inside the called method. The main
    # thread is then left without an initial state once its wait on that
    # thread ends, and falls back to "backup".
    self.router.port.addr.add(self.cidr)
    self._generate_cmd_opts(cidr='failed_IP_address')
    self.ext_process = external_process.ProcessManager(
        conf=None,
        uuid=self.router_id,
        namespace=self.router.namespace,
        service='test_ip_mon',
        pids_path=self.conf_dir,
        default_cmd_callback=self._callback,
        run_as_root=True,
        pid_file=self.pid_file)
    self._run_monitor()
    # Both the fallback warning and the resulting "backup" status must
    # show up in the monitor log, in this order of verification.
    expected_log_lines = (
        'Timeout reading the initial status of router %s' % self.router_id,
        'Initial status of router %s is %s' % (self.router_id, 'backup'),
    )
    for expected in expected_log_lines:
        self._search_in_file(self.log_file, expected)