Merge "Replace keepalived notifier bash script with Python ip monitor"
This commit is contained in:
commit
0daf9b3e73
@ -50,3 +50,6 @@ kill_keepalived: KillFilter, root, /usr/sbin/keepalived, -HUP, -15, -9
|
||||
|
||||
# l3 agent to delete floatingip's conntrack state
|
||||
conntrack: CommandFilter, conntrack, root
|
||||
|
||||
# keepalived state change monitor
|
||||
keepalived_state_change: CommandFilter, neutron-keepalived-state-change, root
|
||||
|
@ -15,17 +15,20 @@
|
||||
|
||||
import os
|
||||
|
||||
import eventlet
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import webob
|
||||
|
||||
from neutron.agent.linux import keepalived
|
||||
from neutron.agent.linux import utils
|
||||
from neutron.agent.linux import utils as agent_utils
|
||||
from neutron.common import constants as l3_constants
|
||||
from neutron.i18n import _LE
|
||||
from neutron.i18n import _LE, _LI
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
HA_DEV_PREFIX = 'ha-'
|
||||
KEEPALIVED_STATE_CHANGE_SERVER_BACKLOG = 4096
|
||||
|
||||
OPTS = [
|
||||
cfg.StrOpt('ha_confs_path',
|
||||
@ -45,14 +48,83 @@ OPTS = [
|
||||
]
|
||||
|
||||
|
||||
class KeepalivedStateChangeHandler(object):
|
||||
def __init__(self, agent):
|
||||
self.agent = agent
|
||||
|
||||
@webob.dec.wsgify(RequestClass=webob.Request)
|
||||
def __call__(self, req):
|
||||
router_id = req.headers['X-Neutron-Router-Id']
|
||||
state = req.headers['X-Neutron-State']
|
||||
self.enqueue(router_id, state)
|
||||
|
||||
def enqueue(self, router_id, state):
|
||||
LOG.debug('Handling notification for router '
|
||||
'%(router_id)s, state %(state)s', {'router_id': router_id,
|
||||
'state': state})
|
||||
self.agent.enqueue_state_change(router_id, state)
|
||||
|
||||
|
||||
class L3AgentKeepalivedStateChangeServer(object):
|
||||
def __init__(self, agent, conf):
|
||||
self.agent = agent
|
||||
self.conf = conf
|
||||
|
||||
agent_utils.ensure_directory_exists_without_file(
|
||||
self.get_keepalived_state_change_socket_path(self.conf))
|
||||
|
||||
@classmethod
|
||||
def get_keepalived_state_change_socket_path(cls, conf):
|
||||
return os.path.join(conf.state_path, 'keepalived-state-change')
|
||||
|
||||
def run(self):
|
||||
server = agent_utils.UnixDomainWSGIServer(
|
||||
'neutron-keepalived-state-change')
|
||||
server.start(KeepalivedStateChangeHandler(self.agent),
|
||||
self.get_keepalived_state_change_socket_path(self.conf),
|
||||
workers=0,
|
||||
backlog=KEEPALIVED_STATE_CHANGE_SERVER_BACKLOG)
|
||||
server.wait()
|
||||
|
||||
|
||||
class AgentMixin(object):
|
||||
def __init__(self, host):
|
||||
self._init_ha_conf_path()
|
||||
super(AgentMixin, self).__init__(host)
|
||||
eventlet.spawn(self._start_keepalived_notifications_server)
|
||||
|
||||
def _start_keepalived_notifications_server(self):
|
||||
state_change_server = (
|
||||
L3AgentKeepalivedStateChangeServer(self, self.conf))
|
||||
state_change_server.run()
|
||||
|
||||
def enqueue_state_change(self, router_id, state):
|
||||
LOG.info(_LI('Router %(router_id)s transitioned to %(state)s'),
|
||||
{'router_id': router_id,
|
||||
'state': state})
|
||||
self._update_metadata_proxy(router_id, state)
|
||||
|
||||
def _update_metadata_proxy(self, router_id, state):
|
||||
try:
|
||||
ri = self.router_info[router_id]
|
||||
except AttributeError:
|
||||
LOG.info(_LI('Router %s is not managed by this agent. It was '
|
||||
'possibly deleted concurrently.'), router_id)
|
||||
return
|
||||
|
||||
if state == 'master':
|
||||
LOG.debug('Spawning metadata proxy for router %s', router_id)
|
||||
self.metadata_driver.spawn_monitored_metadata_proxy(
|
||||
self.process_monitor, ri.ns_name, self.conf.metadata_port,
|
||||
self.conf, router_id=ri.router_id)
|
||||
else:
|
||||
LOG.debug('Closing metadata proxy for router %s', router_id)
|
||||
self.metadata_driver.destroy_monitored_metadata_proxy(
|
||||
self.process_monitor, ri.router_id, ri.ns_name, self.conf)
|
||||
|
||||
def _init_ha_conf_path(self):
|
||||
ha_full_path = os.path.dirname("/%s/" % self.conf.ha_confs_path)
|
||||
utils.ensure_dir(ha_full_path)
|
||||
agent_utils.ensure_dir(ha_full_path)
|
||||
|
||||
def process_ha_router_added(self, ri):
|
||||
ha_port = ri.router.get(l3_constants.HA_INTERFACE_KEY)
|
||||
@ -69,7 +141,9 @@ class AgentMixin(object):
|
||||
ha_port['ip_cidr'],
|
||||
ha_port['mac_address'])
|
||||
|
||||
ri._add_keepalived_notifiers()
|
||||
ri.update_initial_state(self.enqueue_state_change)
|
||||
ri.spawn_state_change_monitor(self.process_monitor)
|
||||
|
||||
def process_ha_router_removed(self, ri):
|
||||
ri.destroy_state_change_monitor(self.process_monitor)
|
||||
ri.ha_network_removed()
|
||||
|
@ -12,21 +12,23 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import signal
|
||||
|
||||
import netaddr
|
||||
from oslo_log import log as logging
|
||||
|
||||
from neutron.agent.l3 import router_info as router
|
||||
from neutron.agent.linux import external_process
|
||||
from neutron.agent.linux import ip_lib
|
||||
from neutron.agent.linux import keepalived
|
||||
from neutron.agent.metadata import driver as metadata_driver
|
||||
from neutron.common import constants as n_consts
|
||||
from neutron.common import utils as common_utils
|
||||
from neutron.i18n import _LE
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
HA_DEV_PREFIX = 'ha-'
|
||||
IP_MONITOR_PROCESS_SERVICE = 'ip_monitor'
|
||||
|
||||
|
||||
class HaRouter(router.RouterInfo):
|
||||
@ -69,6 +71,18 @@ class HaRouter(router.RouterInfo):
|
||||
LOG.debug('Error while reading HA state for %s', self.router_id)
|
||||
return None
|
||||
|
||||
@ha_state.setter
|
||||
def ha_state(self, new_state):
|
||||
self._verify_ha()
|
||||
ha_state_path = self.keepalived_manager._get_full_config_file_path(
|
||||
'state')
|
||||
try:
|
||||
with open(ha_state_path, 'w') as f:
|
||||
f.write(new_state)
|
||||
except (OSError, IOError):
|
||||
LOG.error(_LE('Error while writing HA state for %s'),
|
||||
self.router_id)
|
||||
|
||||
def _init_keepalived_manager(self, process_monitor):
|
||||
self.keepalived_manager = keepalived.KeepalivedManager(
|
||||
self.router['id'],
|
||||
@ -107,27 +121,6 @@ class HaRouter(router.RouterInfo):
|
||||
conf_dir = self.keepalived_manager.get_conf_dir()
|
||||
shutil.rmtree(conf_dir)
|
||||
|
||||
def _add_keepalived_notifiers(self):
|
||||
callback = (
|
||||
metadata_driver.MetadataDriver._get_metadata_proxy_callback(
|
||||
self.agent_conf.metadata_port, self.agent_conf,
|
||||
router_id=self.router_id))
|
||||
# TODO(mangelajo): use the process monitor in keepalived when
|
||||
# keepalived stops killing/starting metadata
|
||||
# proxy on its own
|
||||
pm = (
|
||||
metadata_driver.MetadataDriver.
|
||||
_get_metadata_proxy_process_manager(self.router_id,
|
||||
self.ns_name,
|
||||
self.agent_conf))
|
||||
pid = pm.get_pid_file_name()
|
||||
self.keepalived_manager.add_notifier(
|
||||
callback(pid), 'master', self.ha_vr_id)
|
||||
for state in ('backup', 'fault'):
|
||||
self.keepalived_manager.add_notifier(
|
||||
['kill', '-%s' % signal.SIGKILL,
|
||||
'$(cat ' + pid + ')'], state, self.ha_vr_id)
|
||||
|
||||
def _get_keepalived_instance(self):
|
||||
return self.keepalived_manager.config.get_instance(self.ha_vr_id)
|
||||
|
||||
@ -276,3 +269,53 @@ class HaRouter(router.RouterInfo):
|
||||
|
||||
interface_name = self.get_internal_device_name(port['id'])
|
||||
self._clear_vips(interface_name)
|
||||
|
||||
def _get_state_change_monitor_process_manager(self):
|
||||
return external_process.ProcessManager(
|
||||
self.agent_conf,
|
||||
'%s.monitor' % self.router_id,
|
||||
self.ns_name,
|
||||
default_cmd_callback=self._get_state_change_monitor_callback())
|
||||
|
||||
def _get_state_change_monitor_callback(self):
|
||||
ha_device = self.get_ha_device_name(self.ha_port['id'])
|
||||
ha_cidr = self._get_primary_vip()
|
||||
|
||||
def callback(pid_file):
|
||||
cmd = [
|
||||
'neutron-keepalived-state-change',
|
||||
'--router_id=%s' % self.router_id,
|
||||
'--namespace=%s' % self.ns_name,
|
||||
'--conf_dir=%s' % self.keepalived_manager.get_conf_dir(),
|
||||
'--monitor_interface=%s' % ha_device,
|
||||
'--monitor_cidr=%s' % ha_cidr,
|
||||
'--pid_file=%s' % pid_file,
|
||||
'--state_path=%s' % self.agent_conf.state_path,
|
||||
'--user=%s' % os.geteuid(),
|
||||
'--group=%s' % os.getegid()]
|
||||
return cmd
|
||||
|
||||
return callback
|
||||
|
||||
def spawn_state_change_monitor(self, process_monitor):
|
||||
pm = self._get_state_change_monitor_process_manager()
|
||||
pm.enable()
|
||||
process_monitor.register(
|
||||
self.router_id, IP_MONITOR_PROCESS_SERVICE, pm)
|
||||
|
||||
def destroy_state_change_monitor(self, process_monitor):
|
||||
pm = self._get_state_change_monitor_process_manager()
|
||||
process_monitor.unregister(
|
||||
self.router_id, IP_MONITOR_PROCESS_SERVICE)
|
||||
pm.disable()
|
||||
|
||||
def update_initial_state(self, callback):
|
||||
ha_device = ip_lib.IPDevice(
|
||||
self.get_ha_device_name(self.ha_port['id']),
|
||||
self.ns_name)
|
||||
addresses = ha_device.addr.list()
|
||||
cidrs = (address['cidr'] for address in addresses)
|
||||
ha_cidr = self._get_primary_vip()
|
||||
state = 'master' if ha_cidr in cidrs else 'backup'
|
||||
self.ha_state = state
|
||||
callback(self.router_id, state)
|
||||
|
144
neutron/agent/l3/keepalived_state_change.py
Normal file
144
neutron/agent/l3/keepalived_state_change.py
Normal file
@ -0,0 +1,144 @@
|
||||
# Copyright (c) 2015 Red Hat Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
import httplib2
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import requests
|
||||
|
||||
from neutron.agent.l3 import ha
|
||||
from neutron.agent.linux import daemon
|
||||
from neutron.agent.linux import ip_monitor
|
||||
from neutron.agent.linux import utils as agent_utils
|
||||
from neutron.common import config
|
||||
from neutron.i18n import _LE
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class KeepalivedUnixDomainConnection(agent_utils.UnixDomainHTTPConnection):
|
||||
def __init__(self, *args, **kwargs):
|
||||
# Old style super initialization is required!
|
||||
agent_utils.UnixDomainHTTPConnection.__init__(
|
||||
self, *args, **kwargs)
|
||||
self.socket_path = (
|
||||
ha.L3AgentKeepalivedStateChangeServer.
|
||||
get_keepalived_state_change_socket_path(cfg.CONF))
|
||||
|
||||
|
||||
class MonitorDaemon(daemon.Daemon):
|
||||
def __init__(self, pidfile, router_id, user, group, namespace, conf_dir,
|
||||
interface, cidr):
|
||||
self.router_id = router_id
|
||||
self.namespace = namespace
|
||||
self.conf_dir = conf_dir
|
||||
self.interface = interface
|
||||
self.cidr = cidr
|
||||
super(MonitorDaemon, self).__init__(pidfile, uuid=router_id,
|
||||
user=user, group=group)
|
||||
|
||||
def run(self, run_as_root=False):
|
||||
monitor = ip_monitor.IPMonitor(namespace=self.namespace,
|
||||
run_as_root=run_as_root)
|
||||
monitor.start()
|
||||
# Only drop privileges if the process is currently running as root
|
||||
# (The run_as_root variable name here is unfortunate - It means to
|
||||
# use a root helper when the running process is NOT already running
|
||||
# as root
|
||||
if not run_as_root:
|
||||
super(MonitorDaemon, self).run()
|
||||
for iterable in monitor:
|
||||
self.parse_and_handle_event(iterable)
|
||||
|
||||
def parse_and_handle_event(self, iterable):
|
||||
try:
|
||||
event = ip_monitor.IPMonitorEvent.from_text(iterable)
|
||||
if event.interface == self.interface and event.cidr == self.cidr:
|
||||
new_state = 'master' if event.added else 'backup'
|
||||
self.write_state_change(new_state)
|
||||
self.notify_agent(new_state)
|
||||
except Exception:
|
||||
LOG.exception(_LE(
|
||||
'Failed to process or handle event for line %s'), iterable)
|
||||
|
||||
def write_state_change(self, state):
|
||||
with open(os.path.join(
|
||||
self.conf_dir, 'state'), 'w') as state_file:
|
||||
state_file.write(state)
|
||||
LOG.debug('Wrote router %s state %s', self.router_id, state)
|
||||
|
||||
def notify_agent(self, state):
|
||||
resp, content = httplib2.Http().request(
|
||||
# Note that the message is sent via a Unix domain socket so that
|
||||
# the URL doesn't matter.
|
||||
'http://127.0.0.1/',
|
||||
headers={'X-Neutron-Router-Id': self.router_id,
|
||||
'X-Neutron-State': state},
|
||||
connection_type=KeepalivedUnixDomainConnection)
|
||||
|
||||
if resp.status != requests.codes.ok:
|
||||
raise Exception(_('Unexpected response: %s') % resp)
|
||||
|
||||
LOG.debug('Notified agent router %s, state %s', self.router_id, state)
|
||||
|
||||
|
||||
def register_opts(conf):
|
||||
conf.register_cli_opt(
|
||||
cfg.StrOpt('router_id', help=_('ID of the router')))
|
||||
conf.register_cli_opt(
|
||||
cfg.StrOpt('namespace', help=_('Namespace of the router')))
|
||||
conf.register_cli_opt(
|
||||
cfg.StrOpt('conf_dir', help=_('Path to the router directory')))
|
||||
conf.register_cli_opt(
|
||||
cfg.StrOpt('monitor_interface', help=_('Interface to monitor')))
|
||||
conf.register_cli_opt(
|
||||
cfg.StrOpt('monitor_cidr', help=_('CIDR to monitor')))
|
||||
conf.register_cli_opt(
|
||||
cfg.StrOpt('pid_file', help=_('Path to PID file for this process')))
|
||||
conf.register_cli_opt(
|
||||
cfg.StrOpt('user', help=_('User (uid or name) running this process '
|
||||
'after its initialization')))
|
||||
conf.register_cli_opt(
|
||||
cfg.StrOpt('group', help=_('Group (gid or name) running this process '
|
||||
'after its initialization')))
|
||||
conf.register_opt(
|
||||
cfg.StrOpt('metadata_proxy_socket',
|
||||
default='$state_path/metadata_proxy',
|
||||
help=_('Location of Metadata Proxy UNIX domain '
|
||||
'socket')))
|
||||
|
||||
|
||||
def configure(conf):
|
||||
config.init(sys.argv[1:])
|
||||
conf.set_override('log_dir', cfg.CONF.conf_dir)
|
||||
conf.set_override('debug', True)
|
||||
conf.set_override('verbose', True)
|
||||
config.setup_logging()
|
||||
|
||||
|
||||
def main():
|
||||
register_opts(cfg.CONF)
|
||||
configure(cfg.CONF)
|
||||
MonitorDaemon(cfg.CONF.pid_file,
|
||||
cfg.CONF.router_id,
|
||||
cfg.CONF.user,
|
||||
cfg.CONF.group,
|
||||
cfg.CONF.namespace,
|
||||
cfg.CONF.conf_dir,
|
||||
cfg.CONF.monitor_interface,
|
||||
cfg.CONF.monitor_cidr).start()
|
@ -60,7 +60,7 @@ class IPMonitor(async_process.AsyncProcess):
|
||||
"""Wrapper over `ip monitor address`.
|
||||
|
||||
To monitor and react indefinitely:
|
||||
m = IPMonitor(namespace='tmp')
|
||||
m = IPMonitor(namespace='tmp', root_as_root=True)
|
||||
m.start()
|
||||
for iterable in m:
|
||||
event = IPMonitorEvent.from_text(iterable)
|
||||
@ -69,9 +69,10 @@ class IPMonitor(async_process.AsyncProcess):
|
||||
|
||||
def __init__(self,
|
||||
namespace=None,
|
||||
run_as_root=True,
|
||||
respawn_interval=None):
|
||||
super(IPMonitor, self).__init__(['ip', '-o', 'monitor', 'address'],
|
||||
run_as_root=True,
|
||||
run_as_root=run_as_root,
|
||||
respawn_interval=respawn_interval,
|
||||
namespace=namespace)
|
||||
|
||||
|
@ -15,7 +15,6 @@
|
||||
import errno
|
||||
import itertools
|
||||
import os
|
||||
import stat
|
||||
|
||||
import netaddr
|
||||
from oslo_config import cfg
|
||||
@ -26,7 +25,6 @@ from neutron.agent.linux import utils
|
||||
from neutron.common import exceptions
|
||||
|
||||
VALID_STATES = ['MASTER', 'BACKUP']
|
||||
VALID_NOTIFY_STATES = ['master', 'backup', 'fault']
|
||||
VALID_AUTH_TYPES = ['AH', 'PASS']
|
||||
HA_DEFAULT_PRIORITY = 50
|
||||
PRIMARY_VIP_RANGE_SIZE = 24
|
||||
@ -69,16 +67,6 @@ class InvalidInstanceStateException(exceptions.NeutronException):
|
||||
super(InvalidInstanceStateException, self).__init__(**kwargs)
|
||||
|
||||
|
||||
class InvalidNotifyStateException(exceptions.NeutronException):
|
||||
message = _('Invalid notify state: %(state)s, valid states are: '
|
||||
'%(valid_notify_states)s')
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
if 'valid_notify_states' not in kwargs:
|
||||
kwargs['valid_notify_states'] = ', '.join(VALID_NOTIFY_STATES)
|
||||
super(InvalidNotifyStateException, self).__init__(**kwargs)
|
||||
|
||||
|
||||
class InvalidAuthenticationTypeException(exceptions.NeutronException):
|
||||
message = _('Invalid authentication type: %(auth_type)s, '
|
||||
'valid types are: %(valid_auth_types)s')
|
||||
@ -141,7 +129,6 @@ class KeepalivedInstance(object):
|
||||
self.vips = []
|
||||
self.virtual_routes = []
|
||||
self.authentication = None
|
||||
self.notifiers = []
|
||||
metadata_cidr = '169.254.169.254/32'
|
||||
self.primary_vip_range = get_free_range(
|
||||
parent_range='169.254.0.0/16',
|
||||
@ -174,11 +161,6 @@ class KeepalivedInstance(object):
|
||||
return [vip.ip_address for vip in self.vips
|
||||
if vip.interface_name == interface_name]
|
||||
|
||||
def set_notify(self, state, path):
|
||||
if state not in VALID_NOTIFY_STATES:
|
||||
raise InvalidNotifyStateException(state=state)
|
||||
self.notifiers.append((state, path))
|
||||
|
||||
def _build_track_interface_config(self):
|
||||
return itertools.chain(
|
||||
[' track_interface {'],
|
||||
@ -234,10 +216,6 @@ class KeepalivedInstance(object):
|
||||
for route in self.virtual_routes),
|
||||
[' }'])
|
||||
|
||||
def _build_notify_scripts(self):
|
||||
return itertools.chain((' notify_%s "%s"' % (state, path)
|
||||
for state, path in self.notifiers))
|
||||
|
||||
def build_config(self):
|
||||
config = ['vrrp_instance %s {' % self.name,
|
||||
' state %s' % self.state,
|
||||
@ -270,9 +248,6 @@ class KeepalivedInstance(object):
|
||||
if self.virtual_routes:
|
||||
config.extend(self._build_virtual_routes_config())
|
||||
|
||||
if self.notifiers:
|
||||
config.extend(self._build_notify_scripts())
|
||||
|
||||
config.append('}')
|
||||
|
||||
return config
|
||||
@ -309,53 +284,7 @@ class KeepalivedConf(object):
|
||||
return '\n'.join(self.build_config())
|
||||
|
||||
|
||||
class KeepalivedNotifierMixin(object):
|
||||
def _get_notifier_path(self, state):
|
||||
return self._get_full_config_file_path('notify_%s.sh' % state)
|
||||
|
||||
def _write_notify_script(self, state, script):
|
||||
name = self._get_notifier_path(state)
|
||||
utils.replace_file(name, script)
|
||||
st = os.stat(name)
|
||||
os.chmod(name, st.st_mode | stat.S_IEXEC)
|
||||
|
||||
return name
|
||||
|
||||
def _prepend_shebang(self, script):
|
||||
return '#!/bin/sh\n%s' % script
|
||||
|
||||
def _append_state(self, script, state):
|
||||
state_path = self._get_full_config_file_path('state')
|
||||
return '%s\necho -n %s > %s' % (script, state, state_path)
|
||||
|
||||
def add_notifier(self, script, state, vrouter_id):
|
||||
"""Add a master, backup or fault notifier.
|
||||
|
||||
These notifiers are executed when keepalived invokes a state
|
||||
transition. Write a notifier to disk and add it to the
|
||||
configuration.
|
||||
"""
|
||||
|
||||
script_with_prefix = self._prepend_shebang(' '.join(script))
|
||||
full_script = self._append_state(script_with_prefix, state)
|
||||
self._write_notify_script(state, full_script)
|
||||
|
||||
vr_instance = self.config.get_instance(vrouter_id)
|
||||
vr_instance.set_notify(state, self._get_notifier_path(state))
|
||||
|
||||
def get_conf_dir(self):
|
||||
confs_dir = os.path.abspath(os.path.normpath(self.conf_path))
|
||||
conf_dir = os.path.join(confs_dir, self.resource_id)
|
||||
return conf_dir
|
||||
|
||||
def _get_full_config_file_path(self, filename, ensure_conf_dir=True):
|
||||
conf_dir = self.get_conf_dir()
|
||||
if ensure_conf_dir:
|
||||
utils.ensure_dir(conf_dir)
|
||||
return os.path.join(conf_dir, filename)
|
||||
|
||||
|
||||
class KeepalivedManager(KeepalivedNotifierMixin):
|
||||
class KeepalivedManager(object):
|
||||
"""Wrapper for keepalived.
|
||||
|
||||
This wrapper permits to write keepalived config files, to start/restart
|
||||
@ -372,6 +301,17 @@ class KeepalivedManager(KeepalivedNotifierMixin):
|
||||
self.conf_path = conf_path
|
||||
self.process = None
|
||||
|
||||
def get_conf_dir(self):
|
||||
confs_dir = os.path.abspath(os.path.normpath(self.conf_path))
|
||||
conf_dir = os.path.join(confs_dir, self.resource_id)
|
||||
return conf_dir
|
||||
|
||||
def _get_full_config_file_path(self, filename, ensure_conf_dir=True):
|
||||
conf_dir = self.get_conf_dir()
|
||||
if ensure_conf_dir:
|
||||
utils.ensure_dir(conf_dir)
|
||||
return os.path.join(conf_dir, filename)
|
||||
|
||||
def _output_config_file(self):
|
||||
config_str = self.config.get_config_str()
|
||||
config_path = self._get_full_config_file_path('keepalived.conf')
|
||||
|
19
neutron/cmd/keepalived_state_change.py
Normal file
19
neutron/cmd/keepalived_state_change.py
Normal file
@ -0,0 +1,19 @@
|
||||
# Copyright (c) 2015 Red Hat Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from neutron.agent.l3 import keepalived_state_change
|
||||
|
||||
|
||||
def main():
|
||||
keepalived_state_change.main()
|
0
neutron/tests/functional/agent/l3/__init__.py
Normal file
0
neutron/tests/functional/agent/l3/__init__.py
Normal file
@ -0,0 +1,73 @@
|
||||
# Copyright (c) 2015 Red Hat Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
|
||||
from neutron.agent.l3 import keepalived_state_change
|
||||
from neutron.openstack.common import uuidutils
|
||||
from neutron.tests.functional import base
|
||||
|
||||
|
||||
class TestKeepalivedStateChange(base.BaseSudoTestCase):
|
||||
def setUp(self):
|
||||
super(TestKeepalivedStateChange, self).setUp()
|
||||
cfg.CONF.register_opt(
|
||||
cfg.StrOpt('metadata_proxy_socket',
|
||||
default='$state_path/metadata_proxy',
|
||||
help=_('Location of Metadata Proxy UNIX domain '
|
||||
'socket')))
|
||||
|
||||
self.router_id = uuidutils.generate_uuid()
|
||||
self.conf_dir = self.get_default_temp_dir().path
|
||||
self.cidr = '169.254.128.1/24'
|
||||
self.interface_name = 'interface'
|
||||
self.monitor = keepalived_state_change.MonitorDaemon(
|
||||
self.get_temp_file_path('monitor.pid'),
|
||||
self.router_id,
|
||||
1,
|
||||
2,
|
||||
'namespace',
|
||||
self.conf_dir,
|
||||
self.interface_name,
|
||||
self.cidr)
|
||||
mock.patch.object(self.monitor, 'notify_agent').start()
|
||||
self.line = '1: %s inet %s' % (self.interface_name, self.cidr)
|
||||
|
||||
def test_parse_and_handle_event_wrong_device_completes_without_error(self):
|
||||
self.monitor.parse_and_handle_event(
|
||||
'1: wrong_device inet wrong_cidr')
|
||||
|
||||
def _get_state(self):
|
||||
with open(os.path.join(self.monitor.conf_dir, 'state')) as state_file:
|
||||
return state_file.read()
|
||||
|
||||
def test_parse_and_handle_event_writes_to_file(self):
|
||||
self.monitor.parse_and_handle_event('Deleted %s' % self.line)
|
||||
self.assertEqual('backup', self._get_state())
|
||||
|
||||
self.monitor.parse_and_handle_event(self.line)
|
||||
self.assertEqual('master', self._get_state())
|
||||
|
||||
def test_parse_and_handle_event_fails_writing_state(self):
|
||||
with mock.patch.object(
|
||||
self.monitor, 'write_state_change', side_effect=OSError):
|
||||
self.monitor.parse_and_handle_event(self.line)
|
||||
|
||||
def test_parse_and_handle_event_fails_notifying_agent(self):
|
||||
with mock.patch.object(
|
||||
self.monitor, 'notify_agent', side_effect=Exception):
|
||||
self.monitor.parse_and_handle_event(self.line)
|
@ -147,7 +147,6 @@ class L3AgentTestFramework(base.BaseOVSLinuxTestCase):
|
||||
expected_device['mac_address'], namespace)
|
||||
|
||||
def get_expected_keepalive_configuration(self, router):
|
||||
ha_confs_path = self.agent.conf.ha_confs_path
|
||||
router_id = router.router_id
|
||||
ha_device_name = router.get_ha_device_name(router.ha_port['id'])
|
||||
ha_device_cidr = router.ha_port['ip_cidr']
|
||||
@ -191,11 +190,7 @@ class L3AgentTestFramework(base.BaseOVSLinuxTestCase):
|
||||
0.0.0.0/0 via %(default_gateway_ip)s dev %(external_device_name)s
|
||||
8.8.8.0/24 via 19.4.4.4
|
||||
}
|
||||
notify_master "%(ha_confs_path)s/%(router_id)s/notify_master.sh"
|
||||
notify_backup "%(ha_confs_path)s/%(router_id)s/notify_backup.sh"
|
||||
notify_fault "%(ha_confs_path)s/%(router_id)s/notify_fault.sh"
|
||||
}""" % {
|
||||
'ha_confs_path': ha_confs_path,
|
||||
'router_id': router_id,
|
||||
'ha_device_name': ha_device_name,
|
||||
'ha_device_cidr': ha_device_cidr,
|
||||
@ -219,7 +214,8 @@ class L3AgentTestFramework(base.BaseOVSLinuxTestCase):
|
||||
# then the devices and iptable rules have also been deleted,
|
||||
# so there's no need to check that explicitly.
|
||||
self.assertFalse(self._namespace_exists(router.ns_name))
|
||||
self.assertFalse(self._metadata_proxy_exists(self.agent.conf, router))
|
||||
utils.wait_until_true(
|
||||
lambda: not self._metadata_proxy_exists(self.agent.conf, router))
|
||||
|
||||
def _assert_snat_chains(self, router):
|
||||
self.assertFalse(router.iptables_manager.is_chain_empty(
|
||||
@ -283,6 +279,25 @@ class L3AgentTestCase(L3AgentTestFramework):
|
||||
def test_observer_notifications_ha_router(self):
|
||||
self._test_observer_notifications(enable_ha=True)
|
||||
|
||||
def test_keepalived_state_change_notification(self):
|
||||
enqueue_mock = mock.patch.object(
|
||||
self.agent, 'enqueue_state_change').start()
|
||||
router_info = self.generate_router_info(enable_ha=True)
|
||||
router = self.manage_router(self.agent, router_info)
|
||||
utils.wait_until_true(lambda: router.ha_state == 'master')
|
||||
|
||||
device_name = router.get_ha_device_name(
|
||||
router.router[l3_constants.HA_INTERFACE_KEY]['id'])
|
||||
ha_device = ip_lib.IPDevice(device_name, router.ns_name)
|
||||
ha_device.link.set_down()
|
||||
utils.wait_until_true(lambda: router.ha_state == 'backup')
|
||||
|
||||
utils.wait_until_true(lambda: enqueue_mock.call_count == 3)
|
||||
calls = [args[0] for args in enqueue_mock.call_args_list]
|
||||
self.assertEqual((router.router_id, 'backup'), calls[0])
|
||||
self.assertEqual((router.router_id, 'master'), calls[1])
|
||||
self.assertEqual((router.router_id, 'backup'), calls[2])
|
||||
|
||||
def _test_observer_notifications(self, enable_ha):
|
||||
"""Test create, update, delete of router and notifications."""
|
||||
with mock.patch.object(
|
||||
@ -419,7 +434,8 @@ class L3AgentTestCase(L3AgentTestFramework):
|
||||
utils.wait_until_true(device_exists)
|
||||
|
||||
self.assertTrue(self._namespace_exists(router.ns_name))
|
||||
self.assertTrue(self._metadata_proxy_exists(self.agent.conf, router))
|
||||
utils.wait_until_true(
|
||||
lambda: self._metadata_proxy_exists(self.agent.conf, router))
|
||||
self._assert_internal_devices(router)
|
||||
self._assert_external_device(router)
|
||||
if ip_version == 4:
|
||||
@ -538,7 +554,7 @@ class L3HATestFramework(L3AgentTestFramework):
|
||||
ha_device.link.set_down()
|
||||
|
||||
utils.wait_until_true(lambda: router2.ha_state == 'master')
|
||||
utils.wait_until_true(lambda: router1.ha_state == 'fault')
|
||||
utils.wait_until_true(lambda: router1.ha_state == 'backup')
|
||||
|
||||
|
||||
class MetadataFakeProxyHandler(object):
|
||||
|
@ -66,7 +66,6 @@ class KeepalivedConfBaseMixin(object):
|
||||
advert_int=5)
|
||||
instance1.set_authentication('AH', 'pass123')
|
||||
instance1.track_interfaces.append("eth0")
|
||||
instance1.set_notify('master', '/tmp/script.sh')
|
||||
|
||||
vip_address1 = keepalived.KeepalivedVipAddress('192.168.1.0/24',
|
||||
'eth1')
|
||||
@ -136,7 +135,6 @@ class KeepalivedConfTestCase(base.BaseTestCase,
|
||||
virtual_routes {
|
||||
0.0.0.0/0 via 192.168.1.1 dev eth1
|
||||
}
|
||||
notify_master "/tmp/script.sh"
|
||||
}
|
||||
vrrp_instance VR_2 {
|
||||
state MASTER
|
||||
@ -177,20 +175,12 @@ vrrp_instance VR_2 {
|
||||
|
||||
class KeepalivedStateExceptionTestCase(base.BaseTestCase):
|
||||
def test_state_exception(self):
|
||||
instance = keepalived.KeepalivedInstance('MASTER', 'eth0', 1,
|
||||
'169.254.192.0/18')
|
||||
|
||||
invalid_notify_state = 'a seal walks'
|
||||
self.assertRaises(keepalived.InvalidNotifyStateException,
|
||||
instance.set_notify,
|
||||
invalid_notify_state, '/tmp/script.sh')
|
||||
|
||||
invalid_vrrp_state = 'into a club'
|
||||
invalid_vrrp_state = 'a seal walks'
|
||||
self.assertRaises(keepalived.InvalidInstanceStateException,
|
||||
keepalived.KeepalivedInstance,
|
||||
invalid_vrrp_state, 'eth0', 33, '169.254.192.0/18')
|
||||
|
||||
invalid_auth_type = '[hip, hip]'
|
||||
invalid_auth_type = 'into a club'
|
||||
instance = keepalived.KeepalivedInstance('MASTER', 'eth0', 1,
|
||||
'169.254.192.0/18')
|
||||
self.assertRaises(keepalived.InvalidAuthenticationTypeException,
|
||||
@ -233,7 +223,6 @@ class KeepalivedInstanceTestCase(base.BaseTestCase,
|
||||
virtual_routes {
|
||||
0.0.0.0/0 via 192.168.1.1 dev eth1
|
||||
}
|
||||
notify_master "/tmp/script.sh"
|
||||
}
|
||||
vrrp_instance VR_2 {
|
||||
state MASTER
|
||||
|
@ -63,6 +63,7 @@ class TestMetadataDriverProcess(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestMetadataDriverProcess, self).setUp()
|
||||
mock.patch('eventlet.spawn').start()
|
||||
agent_config.register_interface_driver_opts_helper(cfg.CONF)
|
||||
cfg.CONF.set_override('interface_driver',
|
||||
'neutron.agent.linux.interface.NullDriver')
|
||||
|
@ -177,6 +177,7 @@ class BasicRouterOperationsFramework(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(BasicRouterOperationsFramework, self).setUp()
|
||||
mock.patch('eventlet.spawn').start()
|
||||
self.conf = agent_config.setup_conf()
|
||||
self.conf.register_opts(base_config.core_opts)
|
||||
log.register_options(self.conf)
|
||||
@ -200,7 +201,7 @@ class BasicRouterOperationsFramework(base.BaseTestCase):
|
||||
self.ensure_dir = mock.patch('neutron.agent.linux.utils'
|
||||
'.ensure_dir').start()
|
||||
|
||||
mock.patch('neutron.agent.linux.keepalived.KeepalivedNotifierMixin'
|
||||
mock.patch('neutron.agent.linux.keepalived.KeepalivedManager'
|
||||
'._get_full_config_file_path').start()
|
||||
|
||||
self.utils_exec_p = mock.patch(
|
||||
|
@ -94,6 +94,7 @@ console_scripts =
|
||||
neutron-debug = neutron.debug.shell:main
|
||||
neutron-dhcp-agent = neutron.cmd.eventlet.agents.dhcp:main
|
||||
neutron-hyperv-agent = neutron.plugins.hyperv.agent.hyperv_neutron_agent:main
|
||||
neutron-keepalived-state-change = neutron.cmd.keepalived_state_change:main
|
||||
neutron-ibm-agent = neutron.plugins.ibm.agent.sdnve_neutron_agent:main
|
||||
neutron-l3-agent = neutron.cmd.eventlet.agents.l3:main
|
||||
neutron-linuxbridge-agent = neutron.plugins.linuxbridge.agent.linuxbridge_neutron_agent:main
|
||||
|
Loading…
Reference in New Issue
Block a user