Implements ProcessMonitor in the haproxy driver

The ProcessMonitor class will monitor spawned external processes.

This patch enhances the HaproxyNSDriver class (v2) to utilize the
external_process module in order to monitor and respawn the haproxy processes
if and when needed.

With this patch the LBaaS agent (v2) will load external_process related options
in order to take a configured action when haproxy process dies unexpectedly.

Closes-Bug: #1565801

Change-Id: I420ca20b2620487909885e0e9f08dae60ebec2bf
This commit is contained in:
Nir Magnezi 2016-06-09 23:36:19 +03:00
parent 5a0082b512
commit 56795d7309
5 changed files with 62 additions and 73 deletions

View File

@ -16,6 +16,7 @@
import sys
from neutron.agent.common import config
from neutron.agent.linux import external_process
from neutron.agent.linux import interface
from neutron.common import config as common_config
from neutron.common import rpc as n_rpc
@ -51,6 +52,7 @@ def main():
cfg.CONF.register_opts(manager.OPTS)
# import interface options just in case the driver uses namespaces
cfg.CONF.register_opts(interface.OPTS)
cfg.CONF.register_opts(external_process.OPTS)
config.register_interface_driver_opts_helper(cfg.CONF)
config.register_agent_state_opts_helper(cfg.CONF)
config.register_root_helper(cfg.CONF)

View File

@ -22,9 +22,10 @@ import six
class AgentDeviceDriver(object):
"""Abstract device driver that defines the API required by LBaaS agent."""
def __init__(self, conf, plugin_rpc):
def __init__(self, conf, plugin_rpc, process_monitor=None):
self.conf = conf
self.plugin_rpc = plugin_rpc
self.process_monitor = process_monitor
@abc.abstractproperty
def loadbalancer(self):

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron.agent.linux import external_process
from neutron.agent import rpc as agent_rpc
from neutron import context as ncontext
from neutron.plugins.common import constants
@ -65,6 +66,8 @@ class LbaasAgentManager(periodic_task.PeriodicTasks):
self.context,
self.conf.host
)
self._process_monitor = external_process.ProcessMonitor(
config=self.conf, resource_type='loadbalancer')
self._load_drivers()
self.agent_state = {
@ -90,7 +93,8 @@ class LbaasAgentManager(periodic_task.PeriodicTasks):
driver_inst = importutils.import_object(
driver,
self.conf,
self.plugin_rpc
self.plugin_rpc,
self._process_monitor
)
except ImportError:
msg = _('Error importing loadbalancer device driver: %s')

View File

@ -18,8 +18,8 @@ import shutil
import socket
import netaddr
from neutron.agent.linux import external_process
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils as linux_utils
from neutron.common import utils as n_utils
from neutron.plugins.common import constants
from neutron_lib import exceptions
@ -27,7 +27,7 @@ from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
from neutron_lbaas._i18n import _, _LI, _LE, _LW
from neutron_lbaas._i18n import _, _LI, _LW
from neutron_lbaas.agent import agent_device_driver
from neutron_lbaas.services.loadbalancer import constants as lb_const
from neutron_lbaas.services.loadbalancer import data_models
@ -42,6 +42,7 @@ STATS_TYPE_BACKEND_RESPONSE = '1'
STATS_TYPE_SERVER_REQUEST = 4
STATS_TYPE_SERVER_RESPONSE = '2'
DRIVER_NAME = 'haproxy_ns'
HAPROXY_SERVICE_NAME = 'haproxy'
STATE_PATH_V2_APPEND = 'v2'
@ -54,8 +55,9 @@ def get_ns_name(namespace_id):
class HaproxyNSDriver(agent_device_driver.AgentDeviceDriver):
def __init__(self, conf, plugin_rpc):
super(HaproxyNSDriver, self).__init__(conf, plugin_rpc)
def __init__(self, conf, plugin_rpc, process_monitor):
super(HaproxyNSDriver, self).__init__(conf, plugin_rpc,
process_monitor)
self.state_path = conf.haproxy.loadbalancer_state_path
self.state_path = os.path.join(
self.conf.haproxy.loadbalancer_state_path, STATE_PATH_V2_APPEND)
@ -105,11 +107,17 @@ class HaproxyNSDriver(agent_device_driver.AgentDeviceDriver):
cleanup_namespace = kwargs.get('cleanup_namespace', False)
delete_namespace = kwargs.get('delete_namespace', False)
namespace = get_ns_name(loadbalancer_id)
pid_path = self._get_state_file_path(loadbalancer_id, 'haproxy.pid')
# kill the process
kill_pids_in_file(pid_path)
pid_data = self._get_state_file_path(loadbalancer_id, 'haproxy.pid')
pid_path = os.path.split(pid_data)[0]
self.process_monitor.unregister(uuid=loadbalancer_id,
service_name=HAPROXY_SERVICE_NAME)
pm = external_process.ProcessManager(uuid=loadbalancer_id,
namespace=namespace,
service=HAPROXY_SERVICE_NAME,
conf=self.conf,
pids_path=pid_path,
pid_file=pid_data)
pm.disable()
# unplug the ports
if loadbalancer_id in self.deployed_loadbalancers:
self._unplug(namespace,
@ -338,25 +346,37 @@ class HaproxyNSDriver(agent_device_driver.AgentDeviceDriver):
self.vif_driver.unplug(interface_name, namespace=namespace)
def _spawn(self, loadbalancer, extra_cmd_args=()):
def callback(pid_path):
conf_path = self._get_state_file_path(loadbalancer.id,
'haproxy.conf')
sock_path = self._get_state_file_path(loadbalancer.id,
'haproxy_stats.sock')
user_group = self.conf.haproxy.user_group
haproxy_base_dir = self._get_state_file_path(loadbalancer.id, '')
jinja_cfg.save_config(conf_path,
loadbalancer,
sock_path,
user_group,
haproxy_base_dir)
cmd = ['haproxy', '-f', conf_path, '-p', pid_path]
cmd.extend(extra_cmd_args)
return cmd
pid_data = self._get_state_file_path(loadbalancer.id, 'haproxy.pid')
pid_path = os.path.split(pid_data)[0]
namespace = get_ns_name(loadbalancer.id)
conf_path = self._get_state_file_path(loadbalancer.id, 'haproxy.conf')
pid_path = self._get_state_file_path(loadbalancer.id,
'haproxy.pid')
sock_path = self._get_state_file_path(loadbalancer.id,
'haproxy_stats.sock')
user_group = self.conf.haproxy.user_group
haproxy_base_dir = self._get_state_file_path(loadbalancer.id, '')
jinja_cfg.save_config(conf_path,
loadbalancer,
sock_path,
user_group,
haproxy_base_dir)
cmd = ['haproxy', '-f', conf_path, '-p', pid_path]
cmd.extend(extra_cmd_args)
ns = ip_lib.IPWrapper(namespace=namespace)
ns.netns.execute(cmd)
pm = external_process.ProcessManager(
uuid=loadbalancer.id,
default_cmd_callback=callback,
namespace=namespace,
service=HAPROXY_SERVICE_NAME,
conf=self.conf,
pids_path=pid_path,
pid_file=pid_data)
pm.enable(reload_cfg=False)
self.process_monitor.register(uuid=loadbalancer.id,
service_name=HAPROXY_SERVICE_NAME,
monitored_process=pm)
# remember deployed loadbalancer id
self.deployed_loadbalancers[loadbalancer.id] = loadbalancer
@ -465,17 +485,3 @@ class HealthMonitorManager(agent_device_driver.BaseHealthMonitorManager):
def delete(self, hm):
hm.pool.healthmonitor = None
self.driver.loadbalancer.refresh(hm.pool.loadbalancer)
def kill_pids_in_file(pid_path):
if os.path.exists(pid_path):
with open(pid_path, 'r') as pids:
for pid in pids:
pid = pid.strip()
try:
linux_utils.execute(['kill', '-9', pid], run_as_root=True)
except RuntimeError:
LOG.exception(
_LE('Unable to kill haproxy process: %s'),
pid
)

View File

@ -38,11 +38,13 @@ class TestHaproxyNSDriver(base.BaseTestCase):
conf.haproxy.send_gratuitous_arp = 3
self.conf = conf
self.rpc_mock = mock.Mock()
self._process_monitor = mock.Mock()
with mock.patch(
'neutron.common.utils.load_class_by_alias_or_classname'):
self.driver = namespace_driver.HaproxyNSDriver(
conf,
self.rpc_mock
self.rpc_mock,
self._process_monitor
)
self.vif_driver = mock.Mock()
self.driver.vif_driver = self.vif_driver
@ -67,19 +69,17 @@ class TestHaproxyNSDriver(base.BaseTestCase):
self.assertEqual(namespace_driver.DRIVER_NAME, self.driver.get_name())
@mock.patch('neutron.agent.linux.ip_lib.IPWrapper')
@mock.patch('os.makedirs')
@mock.patch('os.path.dirname')
@mock.patch('os.path.isdir')
@mock.patch('shutil.rmtree')
def test_undeploy_instance(self, mock_shutil, mock_isdir, mock_dirname,
mock_ip_wrap):
mock_makedirs, mock_ip_wrap):
self.driver._get_state_file_path = mock.Mock(return_value='/path')
namespace_driver.kill_pids_in_file = mock.Mock()
self.driver._unplug = mock.Mock()
mock_dirname.return_value = '/path/' + self.lb.id
mock_isdir.return_value = False
self.driver.undeploy_instance(self.lb.id)
namespace_driver.kill_pids_in_file.assert_called_once_with('/path')
calls = [mock.call(self.lb.id, 'pid'), mock.call(self.lb.id, '')]
self.driver._get_state_file_path.has_calls(calls)
self.assertFalse(self.driver._unplug.called)
@ -89,7 +89,6 @@ class TestHaproxyNSDriver(base.BaseTestCase):
self.driver.deployed_loadbalancers[self.lb.id] = self.lb
mock_isdir.return_value = True
namespace_driver.kill_pids_in_file.reset_mock()
mock_isdir.reset_mock()
mock_ns = mock_ip_wrap.return_value
mock_ns.get_devices.return_value = [collections.namedtuple(
@ -97,7 +96,6 @@ class TestHaproxyNSDriver(base.BaseTestCase):
self.driver.undeploy_instance(self.lb.id, cleanup_namespace=True,
delete_namespace=True)
ns = namespace_driver.get_ns_name(self.lb.id)
namespace_driver.kill_pids_in_file.assert_called_once_with('/path')
calls = [mock.call(self.lb.id, 'pid'), mock.call(self.lb.id, '')]
self.driver._get_state_file_path.has_calls(calls)
self.driver._unplug.assert_called_once_with(ns, self.lb.vip_port)
@ -427,7 +425,7 @@ class TestHaproxyNSDriver(base.BaseTestCase):
namespace=namespace_driver.get_ns_name(self.lb.id))
mock_ns.netns.execute.assert_called_once_with(
['haproxy', '-f', conf_dir % 'haproxy.conf', '-p',
conf_dir % 'haproxy.pid'])
conf_dir % 'haproxy.pid'], addl_env=None, run_as_root=False)
self.assertIn(self.lb.id, self.driver.deployed_loadbalancers)
self.assertEqual(self.lb,
self.driver.deployed_loadbalancers[self.lb.id])
@ -666,28 +664,6 @@ class TestHealthMonitorManager(BaseTestHealthMonitorManager):
class TestNamespaceDriverModule(base.BaseTestCase):
@mock.patch('os.path.exists')
@mock.patch('neutron.agent.linux.utils.execute')
def test_kill_pids_in_file(self, execute, exists):
pid_path = '/var/lib/data'
with mock.patch('six.moves.builtins.open') as m_open:
exists.return_value = False
file_mock = mock.MagicMock()
m_open.return_value = file_mock
file_mock.__enter__.return_value = file_mock
file_mock.__iter__.return_value = iter(['123'])
namespace_driver.kill_pids_in_file(pid_path)
# sometimes fails
# exists.assert_called_once_with(pid_path)
self.assertFalse(m_open.called)
self.assertFalse(execute.called)
exists.return_value = True
execute.side_effect = RuntimeError
namespace_driver.kill_pids_in_file(pid_path)
# sometimes fails
# execute.assert_called_once_with(['kill', '-9', '123'])
def test_get_ns_name(self):
ns_name = namespace_driver.get_ns_name('woohoo')
self.assertEqual(namespace_driver.NS_PREFIX + 'woohoo', ns_name)