# Copyright (c) 2015 Red Hat Inc.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""Daemon that tracks the HA state of a router and reports it to the agent.

The state ("primary" or "backup") is derived from whether the monitored CIDR
is configured on the monitored interface. Every state change is written to a
``state`` file in the configuration directory and sent to the agent through
a Unix domain socket.
"""

import os
import queue
import sys
import threading

import httplib2
from oslo_config import cfg
from oslo_log import log as logging

from neutron._i18n import _
from neutron.agent.l3 import ha
from neutron.agent.linux import daemon
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils as agent_utils
from neutron.common import config
from neutron.common import utils as common_utils
from neutron.conf.agent import common as agent_config
from neutron.conf.agent.l3 import ha as ha_conf
from neutron.conf.agent.l3 import keepalived
from neutron import privileged

LOG = logging.getLogger(__name__)
# Seconds "MonitorDaemon.run" waits for the initial state thread before
# falling back to the "backup" state.
INITIAL_STATE_READ_TIMEOUT = 10


class KeepalivedUnixDomainConnection(agent_utils.UnixDomainHTTPConnection):
    """Unix domain HTTP connection bound to the state change socket.

    The socket path is taken from
    ``L3AgentKeepalivedStateChangeServer`` using the global configuration.
    """

    def __init__(self, *args, **kwargs):
        # Old style super initialization is required!
        agent_utils.UnixDomainHTTPConnection.__init__(
            self, *args, **kwargs)
        self.socket_path = (
            ha.L3AgentKeepalivedStateChangeServer.
            get_keepalived_state_change_socket_path(cfg.CONF))


class MonitorDaemon(daemon.Daemon):
    """Daemon reporting "primary"/"backup" transitions of one router.

    :param pidfile: PID file path, forwarded to ``daemon.Daemon``.
    :param router_id: router identifier; also used as the daemon ``uuid``.
    :param user: user forwarded to ``daemon.Daemon``.
    :param group: group forwarded to ``daemon.Daemon``.
    :param namespace: network namespace in which addresses are inspected and
        ``conntrackd`` is executed.
    :param conf_dir: directory holding the ``state`` file and
        ``conntrackd.conf``.
    :param interface: name of the interface whose addresses are monitored.
    :param cidr: CIDR whose presence on ``interface`` means "primary".
    :param ha_conntrackd_enabled: when true, ``conntrackd`` is synchronized
        on every state change.
    """

    def __init__(self, pidfile, router_id, user, group, namespace, conf_dir,
                 interface, cidr, ha_conntrackd_enabled):
        self.router_id = router_id
        self.namespace = namespace
        self.conf_dir = conf_dir
        self.interface = interface
        self.cidr = cidr
        self.ha_conntrackd_enabled = ha_conntrackd_enabled
        self.monitor = None
        # "event_stop" ends the "read_queue" loop; "read_queue" does not
        # consume any event until "event_started" is set. Both events and the
        # queue are also handed to "ip_lib.ip_monitor".
        self.event_stop = threading.Event()
        self.event_started = threading.Event()
        self.queue = queue.Queue()
        self._initial_state = None
        super().__init__(pidfile, uuid=router_id, user=user, group=group)

    @property
    def initial_state(self):
        """Return the last state stored, or None if none was stored yet."""
        return self._initial_state

    @initial_state.setter
    def initial_state(self, state):
        # Only the first assignment is logged; the value is always updated.
        if not self._initial_state:
            LOG.debug('Initial status of router %s is %s', self.router_id,
                      state)
        self._initial_state = state

    def run(self):
        """Read the initial state, then process address events until stopped.

        Blocks until the queue reader thread finishes.
        """
        self._thread_initial_state = threading.Thread(
            target=self.handle_initial_state)
        self._thread_ip_monitor = threading.Thread(
            target=ip_lib.ip_monitor,
            args=(self.namespace, self.queue, self.event_stop,
                  self.event_started))
        self._thread_read_queue = threading.Thread(
            target=self.read_queue,
            args=(self.queue, self.event_stop, self.event_started))
        self._thread_initial_state.start()
        # NOTE(ralonsoh): if the initial status is not read in a defined
        # timeout, "backup" state is set.
        self._thread_initial_state.join(timeout=INITIAL_STATE_READ_TIMEOUT)
        if not self.initial_state:
            LOG.warning('Timeout reading the initial status of router %s, '
                        'state is set to "backup".', self.router_id)
            self.write_state_change('backup')
            if self.ha_conntrackd_enabled:
                self.sync_conntrack('backup')
            self.notify_agent('backup')

        # NOTE(gaudenz): Only start these threads after the initial status is
        # set because otherwise the initial status thread sometimes hangs.
        self._thread_ip_monitor.start()
        self._thread_read_queue.start()
        self._thread_read_queue.join()

    def read_queue(self, _queue, event_stop, event_started):
        """Consume address events and propagate the resulting state.

        :param _queue: queue of event dictionaries; the keys read here are
            ``name``, ``cidr`` and ``event``.
        :param event_stop: event that terminates the loop once set.
        :param event_started: event awaited before reading the queue.
        """
        event_started.wait()
        while not event_stop.is_set():
            try:
                # The timeout lets the loop re-check "event_stop" regularly.
                event = _queue.get(timeout=2)
            except queue.Empty:
                event = None
            if not event:
                continue

            if event['name'] == self.interface and event['cidr'] == self.cidr:
                if event['event'] == 'added':
                    new_state = 'primary'
                else:
                    new_state = 'backup'
                self.write_state_change(new_state)
                if self.ha_conntrackd_enabled:
                    self.sync_conntrack(new_state)
                self.notify_agent(new_state)

    def handle_initial_state(self):
        """Detect the current state from the addresses in the namespace.

        The state is "primary" when the monitored CIDR is found on the
        monitored interface, "backup" otherwise. It is only propagated if no
        state has been stored yet. Exceptions are logged, never raised.
        """
        try:
            state = 'backup'
            cidr = common_utils.ip_to_cidr(self.cidr)
            # NOTE(ralonsoh): "get_devices_with_ip" without passing an IP
            # address performs one single pyroute2 command. Because the number
            # of interfaces in the namespace is reduced, this is faster.
            for address in ip_lib.get_devices_with_ip(self.namespace):
                if (address['name'] == self.interface and
                        address['cidr'] == cidr):
                    state = 'primary'
                    break

            if not self.initial_state:
                self.write_state_change(state)
                if self.ha_conntrackd_enabled:
                    self.sync_conntrack(state)
                self.notify_agent(state)
        except Exception:
            if not self.initial_state:
                LOG.exception('Failed to get initial status of router %s',
                              self.router_id)
            else:
                # The state was already stored (by this method or by the
                # timeout fallback in "run") when the failure happened, e.g.
                # while syncing conntrackd or notifying the agent. Do not
                # swallow the error silently.
                LOG.exception('Failed to handle initial status of router %s '
                              'after the state was set to "%s"',
                              self.router_id, self.initial_state)

    def write_state_change(self, state):
        """Store ``state`` and write it to the ``state`` file in conf_dir."""
        self.initial_state = state
        with open(os.path.join(
                self.conf_dir, 'state'), 'w') as state_file:
            state_file.write(state)
        LOG.debug('Wrote router %s state %s', self.router_id, state)

    def notify_agent(self, state):
        """Send the router ID and ``state`` to the agent as HTTP headers.

        :raises Exception: if the response status is not 200.
        """
        # NOTE: the unused body must not be bound to "_", which is the i18n
        # translation function in this module.
        resp, _content = httplib2.Http().request(
            # Note that the message is sent via a Unix domain socket so that
            # the URL doesn't matter.
            'http://127.0.0.1/',
            headers={'X-Neutron-Router-Id': self.router_id,
                     'X-Neutron-State': state,
                     'Connection': 'close'},
            connection_type=KeepalivedUnixDomainConnection)

        if resp.status != 200:
            raise Exception(_('Unexpected response: %s') % resp)

        LOG.debug('Notified agent router %s, state %s', self.router_id, state)

    def conntrackd(self, option):
        """Run ``conntrackd`` with ``option`` inside the router namespace.

        The configuration file ``conntrackd.conf`` is read from conf_dir.
        """
        execute = ip_lib.IPWrapper(namespace=self.namespace).netns.execute
        cmd = ['conntrackd',
               '-C', f'{self.conf_dir}/conntrackd.conf',
               option]
        LOG.debug('Executing "%s" on router %s', ' '.join(cmd),
                  self.router_id)
        execute(
            cmd,
            run_as_root=True,
            # Errors are still logged, but should not crash the daemon
            check_exit_code=False,
        )

    def sync_conntrack(self, state):
        """Run the conntrackd synchronization that matches ``state``.

        An unknown state is logged as an error and otherwise ignored.
        """
        if state == 'primary':
            self.sync_conntrack_primary()
        elif state == 'backup':
            self.sync_conntrack_backup()
        else:
            LOG.error('Unknown state "%s".', state)

    def sync_conntrack_primary(self):
        """Run the conntrackd command sequence for the "primary" state."""
        # commit the external cache into the kernel table
        self.conntrackd('-c')
        # flush the internal and the external caches
        self.conntrackd('-f')
        # resynchronize my internal cache to the kernel table
        self.conntrackd('-R')
        # send a bulk update to backups
        self.conntrackd('-B')
        LOG.debug('Synced connection tracking state on primary for router %s',
                  self.router_id)

    def sync_conntrack_backup(self):
        """Run the conntrackd command sequence for the "backup" state."""
        # shorten kernel conntrack timers to remove the zombie entries.
        self.conntrackd('-t')
        # request resynchronization with primary firewall replica (if any)
        self.conntrackd('-n')
        LOG.debug('Synced connection tracking state on backup for router %s',
                  self.router_id)

    def handle_sigterm(self, signum, frame):
        """Stop the queue reader and delegate to the parent SIGTERM handler."""
        self.event_stop.set()
        # The reader thread is created in "run" and started only after the
        # initial state has been handled. Joining a thread that does not
        # exist or has not been started raises, which would prevent the
        # parent handler from running.
        reader = getattr(self, '_thread_read_queue', None)
        if reader is not None and reader.is_alive():
            reader.join(timeout=5)
        super().handle_sigterm(signum, frame)


def configure(conf):
    """Initialize configuration and logging for the monitor process.

    Command line arguments are parsed, ``log_dir`` is overridden with the
    configured ``conf_dir`` and ``use_syslog`` is forced to True before
    logging is set up.

    :param conf: configuration object on which the overrides are applied.
    """
    config.register_common_config_options()
    config.init(sys.argv[1:])
    conf.set_override('log_dir', cfg.CONF.conf_dir)
    conf.set_override('use_syslog', True)
    config.setup_logging()
    privileged.default.set_client_mode(False)


def main():
    """Register the required options and start the monitor daemon."""
    keepalived.register_cli_l3_agent_keepalived_opts()
    keepalived.register_l3_agent_keepalived_opts()
    ha_conf.register_l3_agent_ha_opts()
    agent_config.register_root_helper()
    configure(cfg.CONF)
    MonitorDaemon(cfg.CONF.pid_file,
                  cfg.CONF.router_id,
                  cfg.CONF.user,
                  cfg.CONF.group,
                  cfg.CONF.namespace,
                  cfg.CONF.conf_dir,
                  cfg.CONF.monitor_interface,
                  cfg.CONF.monitor_cidr,
                  cfg.CONF.enable_conntrackd).start()