Merge "Read keepalived initial state in parallel to interface monitoring"
This commit is contained in:
commit
1ad9ca56b0
|
@ -27,11 +27,13 @@ from neutron.agent.linux import daemon
|
||||||
from neutron.agent.linux import ip_lib
|
from neutron.agent.linux import ip_lib
|
||||||
from neutron.agent.linux import utils as agent_utils
|
from neutron.agent.linux import utils as agent_utils
|
||||||
from neutron.common import config
|
from neutron.common import config
|
||||||
|
from neutron.common import utils as common_utils
|
||||||
from neutron.conf.agent.l3 import keepalived
|
from neutron.conf.agent.l3 import keepalived
|
||||||
from neutron import privileged
|
from neutron import privileged
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
INITIAL_STATE_READ_TIMEOUT = 10
|
||||||
|
|
||||||
|
|
||||||
class KeepalivedUnixDomainConnection(agent_utils.UnixDomainHTTPConnection):
|
class KeepalivedUnixDomainConnection(agent_utils.UnixDomainHTTPConnection):
|
||||||
|
@ -56,10 +58,24 @@ class MonitorDaemon(daemon.Daemon):
|
||||||
self.event_stop = threading.Event()
|
self.event_stop = threading.Event()
|
||||||
self.event_started = threading.Event()
|
self.event_started = threading.Event()
|
||||||
self.queue = queue.Queue()
|
self.queue = queue.Queue()
|
||||||
|
self._initial_state = None
|
||||||
super(MonitorDaemon, self).__init__(pidfile, uuid=router_id,
|
super(MonitorDaemon, self).__init__(pidfile, uuid=router_id,
|
||||||
user=user, group=group)
|
user=user, group=group)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def initial_state(self):
|
||||||
|
return self._initial_state
|
||||||
|
|
||||||
|
@initial_state.setter
|
||||||
|
def initial_state(self, state):
|
||||||
|
if not self._initial_state:
|
||||||
|
LOG.debug('Initial status of router %s is %s', self.router_id,
|
||||||
|
state)
|
||||||
|
self._initial_state = state
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
|
self._thread_initial_state = threading.Thread(
|
||||||
|
target=self.handle_initial_state)
|
||||||
self._thread_ip_monitor = threading.Thread(
|
self._thread_ip_monitor = threading.Thread(
|
||||||
target=ip_lib.ip_monitor,
|
target=ip_lib.ip_monitor,
|
||||||
args=(self.namespace, self.queue, self.event_stop,
|
args=(self.namespace, self.queue, self.event_stop,
|
||||||
|
@ -67,9 +83,19 @@ class MonitorDaemon(daemon.Daemon):
|
||||||
self._thread_read_queue = threading.Thread(
|
self._thread_read_queue = threading.Thread(
|
||||||
target=self.read_queue,
|
target=self.read_queue,
|
||||||
args=(self.queue, self.event_stop, self.event_started))
|
args=(self.queue, self.event_stop, self.event_started))
|
||||||
|
self._thread_initial_state.start()
|
||||||
self._thread_ip_monitor.start()
|
self._thread_ip_monitor.start()
|
||||||
self._thread_read_queue.start()
|
self._thread_read_queue.start()
|
||||||
self.handle_initial_state()
|
|
||||||
|
# NOTE(ralonsoh): if the initial status is not read within the defined
|
||||||
|
# timeout, the "backup" state is set.
|
||||||
|
self._thread_initial_state.join(timeout=INITIAL_STATE_READ_TIMEOUT)
|
||||||
|
if not self.initial_state:
|
||||||
|
LOG.warning('Timeout reading the initial status of router %s, '
|
||||||
|
'state is set to "backup".', self.router_id)
|
||||||
|
self.write_state_change('backup')
|
||||||
|
self.notify_agent('backup')
|
||||||
|
|
||||||
self._thread_read_queue.join()
|
self._thread_read_queue.join()
|
||||||
|
|
||||||
def read_queue(self, _queue, event_stop, event_started):
|
def read_queue(self, _queue, event_stop, event_started):
|
||||||
|
@ -93,21 +119,26 @@ class MonitorDaemon(daemon.Daemon):
|
||||||
def handle_initial_state(self):
|
def handle_initial_state(self):
|
||||||
try:
|
try:
|
||||||
state = 'backup'
|
state = 'backup'
|
||||||
ip = ip_lib.IPDevice(self.interface, self.namespace)
|
cidr = common_utils.ip_to_cidr(self.cidr)
|
||||||
for address in ip.addr.list():
|
# NOTE(ralonsoh): "get_devices_with_ip" without passing an IP
|
||||||
if address.get('cidr') == self.cidr:
|
# address performs a single pyroute2 command. Because the number
|
||||||
|
# of interfaces in the namespace is small, this is faster.
|
||||||
|
for address in ip_lib.get_devices_with_ip(self.namespace):
|
||||||
|
if (address['name'] == self.interface and
|
||||||
|
address['cidr'] == cidr):
|
||||||
state = 'primary'
|
state = 'primary'
|
||||||
break
|
break
|
||||||
|
|
||||||
LOG.debug('Initial status of router %s is %s',
|
if not self.initial_state:
|
||||||
self.router_id, state)
|
self.write_state_change(state)
|
||||||
self.write_state_change(state)
|
self.notify_agent(state)
|
||||||
self.notify_agent(state)
|
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.exception('Failed to get initial status of router %s',
|
if not self.initial_state:
|
||||||
self.router_id)
|
LOG.exception('Failed to get initial status of router %s',
|
||||||
|
self.router_id)
|
||||||
|
|
||||||
def write_state_change(self, state):
|
def write_state_change(self, state):
|
||||||
|
self.initial_state = state
|
||||||
with open(os.path.join(
|
with open(os.path.join(
|
||||||
self.conf_dir, 'state'), 'w') as state_file:
|
self.conf_dir, 'state'), 'w') as state_file:
|
||||||
state_file.write(state)
|
state_file.write(state)
|
||||||
|
|
|
@ -127,3 +127,22 @@ class TestMonitorDaemon(base.BaseLoggingTestCase):
|
||||||
self._run_monitor()
|
self._run_monitor()
|
||||||
msg = 'Initial status of router %s is %s' % (self.router_id, 'primary')
|
msg = 'Initial status of router %s is %s' % (self.router_id, 'primary')
|
||||||
self._search_in_file(self.log_file, msg)
|
self._search_in_file(self.log_file, msg)
|
||||||
|
|
||||||
|
def test_handle_initial_state_backup_error_reading_initial_status(self):
|
||||||
|
# By passing this wrong IP address, the thread "_thread_initial_state"
|
||||||
|
# will fail, raising an exception (caught inside the called method).
|
||||||
|
# The main thread will time out waiting for an initial state and
|
||||||
|
# "backup" will be set.
|
||||||
|
self.router.port.addr.add(self.cidr)
|
||||||
|
self._generate_cmd_opts(cidr='failed_IP_address')
|
||||||
|
self.ext_process = external_process.ProcessManager(
|
||||||
|
conf=None, uuid=self.router_id, namespace=self.router.namespace,
|
||||||
|
service='test_ip_mon', pids_path=self.conf_dir,
|
||||||
|
default_cmd_callback=self._callback, run_as_root=True,
|
||||||
|
pid_file=self.pid_file)
|
||||||
|
self._run_monitor()
|
||||||
|
msg = ('Timeout reading the initial status of router %s' %
|
||||||
|
self.router_id)
|
||||||
|
self._search_in_file(self.log_file, msg)
|
||||||
|
msg = 'Initial status of router %s is %s' % (self.router_id, 'backup')
|
||||||
|
self._search_in_file(self.log_file, msg)
|
||||||
|
|
Loading…
Reference in New Issue