Haproxy Namespace Driver

This adds the haproxy namespace driver based on the lbaas
v2 object model.  It has been retrofitted to work with the
new object model and new device driver interface.  Everything
else has been shamelessly copied from the v1 haproxy namespace
driver.

Change-Id: Icf34a215e094d27270a5f64df2d1cb995f505370
Partially-implements: blueprint lbaas-refactor-haproxy-namespace-driver-to-new-driver-interface
Co-Authored-By: Brandon Logan <brandon.logan@rackspace.com>
This commit is contained in:
Phillip Toohill 2015-02-22 22:42:47 -06:00
parent 158f0990d8
commit ce05376126
5 changed files with 1158 additions and 2 deletions

View File

@ -54,7 +54,9 @@ service_provider=LOADBALANCER:Haproxy:neutron_lbaas.services.loadbalancer.driver
# service_provider=LOADBALANCER:NetScaler:neutron_lbaas.services.loadbalancer.drivers.netscaler.netscaler_driver.NetScalerPluginDriver
# service_provider=LOADBALANCER:Embrane:neutron_lbaas.services.loadbalancer.drivers.embrane.driver.EmbraneLbaas:default
# service_provider = LOADBALANCER:A10Networks:neutron_lbaas.services.loadbalancer.drivers.a10networks.driver_v1.ThunderDriver:default
# LBaaS v2 drivers
# service_provider = LOADBALANCERV2:LoggingNoop:neutron_lbaas.drivers.logging_noop.driver.LoggingNoopLoadBalancerDriver:default
# service_provider=LOADBALANCERV2:Haproxy:neutron_lbaas.drivers.haproxy.plugin_driver.HaproxyOnHostPluginDriver:default
[certificates]
# cert_manager_class = neutron_lbaas.common.cert_manager.local_cert_manager

View File

@ -509,7 +509,6 @@ class LoadBalancerPluginDbv2(base_db.CommonDbMixin,
with context.session.begin(subtransactions=True):
lb_db = self._get_resource(context, models.LoadBalancer,
loadbalancer_id)
self.assert_modification_allowed(lb_db)
lb_db.stats = self._create_loadbalancer_stats(context,
loadbalancer_id,
data=stats_data)

View File

@ -0,0 +1,467 @@
# Copyright 2013 New Dream Network, LLC (DreamHost)
# Copyright 2015 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import shutil
import socket
import netaddr
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils as linux_utils
from neutron.common import exceptions
from neutron.common import utils as n_utils
from neutron.i18n import _LI, _LE, _LW
from neutron.plugins.common import constants
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
from oslo_utils import importutils
from neutron_lbaas.agent import agent_device_driver
from neutron_lbaas.services.loadbalancer import constants as lb_const
from neutron_lbaas.services.loadbalancer import data_models
from neutron_lbaas.services.loadbalancer.drivers.haproxy import jinja_cfg
from neutron_lbaas.services.loadbalancer.drivers.haproxy \
import namespace_driver
LOG = logging.getLogger(__name__)

# Network namespace names are the loadbalancer id prefixed with this marker.
NS_PREFIX = 'qlbaas-'
# haproxy "show stat" filtering: the *_REQUEST values are numeric bit flags
# OR'ed together and sent in the stats request (see _get_stats_from_socket),
# while the *_RESPONSE values are the string contents of the "type" column
# in the CSV response (see _get_backend_stats / _get_servers_stats).
STATS_TYPE_BACKEND_REQUEST = 2
STATS_TYPE_BACKEND_RESPONSE = '1'
STATS_TYPE_SERVER_REQUEST = 4
STATS_TYPE_SERVER_RESPONSE = '2'
DRIVER_NAME = 'haproxy_ns'
# Appended to the configured state path so v2 state never collides with the
# v1 namespace driver's state for the same agent.
STATE_PATH_V2_APPEND = 'v2'

# Reuse the config options declared by the v1 namespace driver.
cfg.CONF.register_opts(namespace_driver.OPTS, 'haproxy')
def get_ns_name(namespace_id):
    """Return the network namespace name used for the given id."""
    return '%s%s' % (NS_PREFIX, namespace_id)
class HaproxyNSDriver(agent_device_driver.AgentDeviceDriver):
    """Device driver that runs one haproxy process per loadbalancer.

    Each deployed loadbalancer gets its own network namespace
    (``qlbaas-<loadbalancer_id>``) and a state directory under
    ``<loadbalancer_state_path>/v2/<loadbalancer_id>/`` holding the haproxy
    config, pid file and stats socket.
    """

    def __init__(self, conf, plugin_rpc):
        super(HaproxyNSDriver, self).__init__(conf, plugin_rpc)
        # Keep v2 state separate from the v1 driver's state directory.
        # (The previous code first assigned the bare state path and then
        # immediately overwrote it; the dead assignment is removed.)
        self.state_path = os.path.join(
            self.conf.haproxy.loadbalancer_state_path, STATE_PATH_V2_APPEND)
        try:
            vif_driver = importutils.import_object(conf.interface_driver, conf)
        except ImportError:
            with excutils.save_and_reraise_exception():
                msg = (_('Error importing interface driver: %s')
                       % conf.interface_driver)
                LOG.error(msg)

        self.vif_driver = vif_driver
        # Maps loadbalancer id -> the data model last deployed for it; used
        # by undeploy_instance to find the vip port to unplug.
        self.deployed_loadbalancers = {}
        self._loadbalancer = LoadBalancerManager(self)
        self._listener = ListenerManager(self)
        self._pool = PoolManager(self)
        self._member = MemberManager(self)
        self._healthmonitor = HealthMonitorManager(self)

    @property
    def loadbalancer(self):
        return self._loadbalancer

    @property
    def listener(self):
        return self._listener

    @property
    def pool(self):
        return self._pool

    @property
    def member(self):
        return self._member

    @property
    def healthmonitor(self):
        return self._healthmonitor

    def get_name(self):
        """Return the device driver name the agent dispatches on."""
        return DRIVER_NAME

    @n_utils.synchronized('haproxy-driver')
    def undeploy_instance(self, loadbalancer_id, **kwargs):
        """Kill haproxy and tear down local state for a loadbalancer.

        :param loadbalancer_id: id of the loadbalancer to undeploy
        :keyword cleanup_namespace: unplug every device found in the
            namespace; used for orphans whose vip port is unknown.
        :keyword delete_namespace: garbage collect the namespace itself.
        """
        cleanup_namespace = kwargs.get('cleanup_namespace', False)
        delete_namespace = kwargs.get('delete_namespace', False)
        namespace = get_ns_name(loadbalancer_id)
        pid_path = self._get_state_file_path(loadbalancer_id, 'pid')

        # kill the process
        kill_pids_in_file(pid_path)

        # unplug the ports
        if loadbalancer_id in self.deployed_loadbalancers:
            self._unplug(namespace,
                         self.deployed_loadbalancers[loadbalancer_id].vip_port)

        # delete all devices from namespace;
        # used when deleting orphans and the port is not known
        if cleanup_namespace:
            ns = ip_lib.IPWrapper(namespace=namespace)
            for device in ns.get_devices(exclude_loopback=True):
                self.vif_driver.unplug(device.name, namespace=namespace)

        # remove the configuration directory
        conf_dir = os.path.dirname(
            self._get_state_file_path(loadbalancer_id, ''))
        if os.path.isdir(conf_dir):
            shutil.rmtree(conf_dir)

        if delete_namespace:
            ns = ip_lib.IPWrapper(namespace=namespace)
            ns.garbage_collect_namespace()

    def remove_orphans(self, known_loadbalancer_ids):
        """Undeploy loadbalancers present on disk but unknown to the plugin."""
        if not os.path.exists(self.state_path):
            return

        orphans = (lb_id for lb_id in os.listdir(self.state_path)
                   if lb_id not in known_loadbalancer_ids)
        for lb_id in orphans:
            if self.exists(lb_id):
                self.undeploy_instance(lb_id, cleanup_namespace=True)

    def get_stats(self, loadbalancer_id):
        """Return haproxy stats for the loadbalancer and its members.

        :returns: dict of backend stats with a 'members' sub-dict, or an
            empty dict if the stats socket does not exist.
        """
        socket_path = self._get_state_file_path(loadbalancer_id,
                                                'haproxy_stats.sock', False)
        if os.path.exists(socket_path):
            parsed_stats = self._get_stats_from_socket(
                socket_path,
                entity_type=(STATS_TYPE_BACKEND_REQUEST |
                             STATS_TYPE_SERVER_REQUEST))
            lb_stats = self._get_backend_stats(parsed_stats)
            lb_stats['members'] = self._get_servers_stats(parsed_stats)
            return lb_stats
        else:
            LOG.warn(_LW('Stats socket not found for loadbalancer %s') %
                     loadbalancer_id)
            return {}

    @n_utils.synchronized('haproxy-driver')
    def deploy_instance(self, loadbalancer):
        """Deploys loadbalancer if necessary

        :return: True if loadbalancer was deployed, False otherwise
        """
        if not self.deployable(loadbalancer):
            LOG.info(_LI("Loadbalancer %s is not deployable.") %
                     loadbalancer.id)
            return False

        if self.exists(loadbalancer.id):
            self.update(loadbalancer)
        else:
            self.create(loadbalancer)
        return True

    def update(self, loadbalancer):
        """Reload haproxy with a new config, soft-stopping the old pids."""
        pid_path = self._get_state_file_path(loadbalancer.id, 'haproxy.pid')
        extra_args = ['-sf']
        extra_args.extend(p.strip() for p in open(pid_path, 'r'))
        self._spawn(loadbalancer, extra_args)

    def exists(self, loadbalancer_id):
        """Return True if the namespace and a live stats socket both exist."""
        namespace = get_ns_name(loadbalancer_id)
        root_ns = ip_lib.IPWrapper()

        socket_path = self._get_state_file_path(
            loadbalancer_id, 'haproxy_stats.sock', False)
        if root_ns.netns.exists(namespace) and os.path.exists(socket_path):
            try:
                s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
                s.connect(socket_path)
                # Close the probe socket; the original leaked the fd.
                s.close()
                return True
            except socket.error:
                pass
        return False

    def create(self, loadbalancer):
        """Plug the vip port into a fresh namespace and start haproxy."""
        namespace = get_ns_name(loadbalancer.id)
        self._plug(namespace, loadbalancer.vip_port)
        self._spawn(loadbalancer)

    def deployable(self, loadbalancer):
        """Returns True if loadbalancer is active and has active listeners."""
        if not loadbalancer:
            return False
        acceptable_listeners = [
            listener for listener in loadbalancer.listeners
            if (listener.provisioning_status != constants.PENDING_DELETE and
                listener.admin_state_up)]
        return (bool(acceptable_listeners) and loadbalancer.admin_state_up and
                loadbalancer.provisioning_status != constants.PENDING_DELETE)

    def _get_stats_from_socket(self, socket_path, entity_type):
        """Query the haproxy stats socket and return parsed rows.

        :param entity_type: bitmask of STATS_TYPE_*_REQUEST flags selecting
            which object types haproxy should report.
        """
        try:
            s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            s.connect(socket_path)
            s.send('show stat -1 %s -1\n' % entity_type)
            raw_stats = ''
            chunk_size = 1024
            while True:
                chunk = s.recv(chunk_size)
                raw_stats += chunk
                # A short read marks the end of haproxy's response.
                if len(chunk) < chunk_size:
                    break
            # Close before parsing; the original leaked the fd.
            s.close()
            return self._parse_stats(raw_stats)
        except socket.error as e:
            LOG.warn(_LW('Error while connecting to stats socket: %s'), e)
            return {}

    def _parse_stats(self, raw_stats):
        """Parse haproxy's CSV stats output into a list of row dicts."""
        stat_lines = raw_stats.splitlines()
        if len(stat_lines) < 2:
            return []
        # First line is the header, e.g. "# pxname,svname,...".
        stat_names = [name.strip('# ') for name in stat_lines[0].split(',')]
        res_stats = []
        for raw_values in stat_lines[1:]:
            if not raw_values:
                continue
            stat_values = [value.strip() for value in raw_values.split(',')]
            res_stats.append(dict(zip(stat_names, stat_values)))
        return res_stats

    def _get_backend_stats(self, parsed_stats):
        """Return the BACKEND row translated through jinja_cfg.STATS_MAP."""
        for stats in parsed_stats:
            if stats.get('type') == STATS_TYPE_BACKEND_RESPONSE:
                unified_stats = dict((k, stats.get(v, ''))
                                     for k, v in jinja_cfg.STATS_MAP.items())
                return unified_stats
        return {}

    def _get_servers_stats(self, parsed_stats):
        """Return per-member status/health keyed by server name."""
        res = {}
        for stats in parsed_stats:
            if stats.get('type') == STATS_TYPE_SERVER_RESPONSE:
                res[stats['svname']] = {
                    lb_const.STATS_STATUS: (constants.INACTIVE
                                            if stats['status'] == 'DOWN'
                                            else constants.ACTIVE),
                    lb_const.STATS_HEALTH: stats['check_status'],
                    lb_const.STATS_FAILED_CHECKS: stats['chkfail']
                }
        return res

    def _get_state_file_path(self, loadbalancer_id, kind,
                             ensure_state_dir=True):
        """Returns the file name for a given kind of config file."""
        confs_dir = os.path.abspath(os.path.normpath(self.state_path))
        conf_dir = os.path.join(confs_dir, loadbalancer_id)
        if ensure_state_dir:
            linux_utils.ensure_dir(conf_dir)
        return os.path.join(conf_dir, kind)

    def _plug(self, namespace, port, reuse_existing=True):
        """Plug the vip port into the namespace and configure L3/routes."""
        self.plugin_rpc.plug_vip_port(port.id)

        interface_name = self.vif_driver.get_device_name(port)

        if ip_lib.device_exists(interface_name,
                                namespace=namespace):
            if not reuse_existing:
                raise exceptions.PreexistingDeviceFailure(
                    dev_name=interface_name
                )
        else:
            self.vif_driver.plug(
                port.network_id,
                port.id,
                interface_name,
                port.mac_address,
                namespace=namespace
            )

        cidrs = [
            '%s/%s' % (ip.ip_address,
                       netaddr.IPNetwork(ip.subnet.cidr).prefixlen)
            for ip in port.fixed_ips
        ]
        self.vif_driver.init_l3(interface_name, cidrs, namespace=namespace)

        # Prefer the subnet gateway; fall back to a default host route.
        gw_ip = port.fixed_ips[0].subnet.gateway_ip

        if not gw_ip:
            host_routes = port.fixed_ips[0].subnet.host_routes
            for host_route in host_routes:
                if host_route.destination == "0.0.0.0/0":
                    gw_ip = host_route.nexthop
                    break

        if gw_ip:
            cmd = ['route', 'add', 'default', 'gw', gw_ip]
            ip_wrapper = ip_lib.IPWrapper(namespace=namespace)
            ip_wrapper.netns.execute(cmd, check_exit_code=False)
            # When delete and re-add the same vip, we need to
            # send gratuitous ARP to flush the ARP cache in the Router.
            gratuitous_arp = self.conf.haproxy.send_gratuitous_arp
            if gratuitous_arp > 0:
                for ip in port.fixed_ips:
                    cmd_arping = ['arping', '-U',
                                  '-I', interface_name,
                                  '-c', gratuitous_arp,
                                  ip.ip_address]
                    ip_wrapper.netns.execute(cmd_arping, check_exit_code=False)

    def _unplug(self, namespace, port):
        """Unplug the vip port from the namespace."""
        self.plugin_rpc.unplug_vip_port(port.id)
        interface_name = self.vif_driver.get_device_name(port)
        self.vif_driver.unplug(interface_name, namespace=namespace)

    def _spawn(self, loadbalancer, extra_cmd_args=()):
        """Render the haproxy config and exec haproxy in the namespace."""
        namespace = get_ns_name(loadbalancer.id)
        conf_path = self._get_state_file_path(loadbalancer.id, 'haproxy.conf')
        pid_path = self._get_state_file_path(loadbalancer.id,
                                             'haproxy.pid')
        sock_path = self._get_state_file_path(loadbalancer.id,
                                              'haproxy_stats.sock')
        user_group = self.conf.haproxy.user_group

        jinja_cfg.save_config(conf_path, loadbalancer, sock_path, user_group)
        cmd = ['haproxy', '-f', conf_path, '-p', pid_path]
        cmd.extend(extra_cmd_args)

        ns = ip_lib.IPWrapper(namespace=namespace)
        ns.netns.execute(cmd)

        # remember deployed loadbalancer id
        self.deployed_loadbalancers[loadbalancer.id] = loadbalancer
class LoadBalancerManager(agent_device_driver.BaseLoadBalancerManager):
    """Loadbalancer CRUD delegated to the namespace device driver."""

    def refresh(self, loadbalancer):
        """Fetch the latest model from the plugin and (re)deploy it."""
        lb_dict = self.driver.plugin_rpc.get_loadbalancer(loadbalancer.id)
        fresh_lb = data_models.LoadBalancer.from_dict(lb_dict)
        deployed = self.driver.deploy_instance(fresh_lb)
        if not deployed and self.driver.exists(fresh_lb.id):
            # No longer deployable but still running: tear it down.
            self.driver.undeploy_instance(fresh_lb.id)

    def delete(self, loadbalancer):
        """Undeploy the loadbalancer if it is currently running."""
        if self.driver.exists(loadbalancer.id):
            self.driver.undeploy_instance(loadbalancer.id)

    def create(self, loadbalancer):
        """Deploy a new loadbalancer.

        With no listeners there is nothing to do: haproxy will not start
        without a tcp port, and that is considered a success.
        """
        if loadbalancer.listeners:
            self.refresh(loadbalancer)

    def get_stats(self, loadbalancer_id):
        """Proxy stats retrieval straight to the device driver."""
        return self.driver.get_stats(loadbalancer_id)

    def update(self, old_loadbalancer, loadbalancer):
        """Apply an update by refreshing the new model."""
        self.refresh(loadbalancer)
class ListenerManager(agent_device_driver.BaseListenerManager):
    """Listener CRUD; every change triggers a loadbalancer refresh."""

    def _remove_listener(self, loadbalancer, listener_id):
        # Drop the listener with a matching id from the in-memory model.
        doomed_index = None
        for position, candidate in enumerate(loadbalancer.listeners):
            if candidate.id == listener_id:
                doomed_index = position
        loadbalancer.listeners.pop(doomed_index)

    def update(self, old_listener, new_listener):
        self.driver.loadbalancer.refresh(new_listener.loadbalancer)

    def create(self, listener):
        self.driver.loadbalancer.refresh(listener.loadbalancer)

    def delete(self, listener):
        lb = listener.loadbalancer
        self._remove_listener(lb, listener.id)
        if not lb.listeners:
            # undeploy instance because haproxy will throw error if port is
            # missing in frontend
            self.driver.undeploy_instance(lb.id)
        else:
            self.driver.loadbalancer.refresh(lb)
class PoolManager(agent_device_driver.BasePoolManager):
    """Pool CRUD; every change triggers a loadbalancer refresh."""

    def update(self, old_pool, new_pool):
        self.driver.loadbalancer.refresh(new_pool.listener.loadbalancer)

    def create(self, pool):
        self.driver.loadbalancer.refresh(pool.listener.loadbalancer)

    def delete(self, pool):
        listener = pool.listener
        listener.default_pool = None
        # just refresh because haproxy is fine if only the frontend is listed
        self.driver.loadbalancer.refresh(listener.loadbalancer)
class MemberManager(agent_device_driver.BaseMemberManager):
    """Member CRUD; every change triggers a loadbalancer refresh."""

    def _remove_member(self, pool, member_id):
        # Drop the member with a matching id from the in-memory model.
        doomed_index = None
        for position, candidate in enumerate(pool.members):
            if candidate.id == member_id:
                doomed_index = position
        pool.members.pop(doomed_index)

    def update(self, old_member, new_member):
        self.driver.loadbalancer.refresh(new_member.pool.listener.loadbalancer)

    def create(self, member):
        self.driver.loadbalancer.refresh(member.pool.listener.loadbalancer)

    def delete(self, member):
        self._remove_member(member.pool, member.id)
        self.driver.loadbalancer.refresh(member.pool.listener.loadbalancer)
class HealthMonitorManager(agent_device_driver.BaseHealthMonitorManager):
    """Health monitor CRUD; every change triggers a loadbalancer refresh."""

    def update(self, old_hm, new_hm):
        self.driver.loadbalancer.refresh(new_hm.pool.listener.loadbalancer)

    def create(self, hm):
        self.driver.loadbalancer.refresh(hm.pool.listener.loadbalancer)

    def delete(self, hm):
        pool = hm.pool
        pool.healthmonitor = None
        self.driver.loadbalancer.refresh(pool.listener.loadbalancer)
def kill_pids_in_file(pid_path):
    """Send SIGKILL to every pid listed in pid_path, if the file exists."""
    if not os.path.exists(pid_path):
        return
    with open(pid_path, 'r') as pid_file:
        for raw_pid in pid_file:
            target = raw_pid.strip()
            try:
                linux_utils.execute(['kill', '-9', target])
            except RuntimeError:
                # Best effort: log and keep killing the remaining pids.
                LOG.exception(
                    _LE('Unable to kill haproxy process: %s'),
                    target
                )

View File

@ -1,3 +1,4 @@
# Copyright (c) 2013 OpenStack Foundation.
# Copyright (c) 2015 Rackspace.
# All Rights Reserved.
#
@ -14,7 +15,8 @@
# under the License.
from neutron_lbaas.drivers.common import agent_driver_base
from neutron_lbaas.drivers.haproxy import namespace_driver
class HaproxyOnHostPluginDriver(agent_driver_base.AgentDriverBase):
    """Plugin driver that schedules loadbalancers to on-host haproxy agents.

    The agent-side counterpart is selected by name via ``device_driver``.
    """

    # Dead placeholder assignment ('test') removed; only the real driver
    # name is kept.
    device_driver = namespace_driver.DRIVER_NAME

View File

@ -0,0 +1,686 @@
# Copyright 2013 New Dream Network, LLC (DreamHost)
# Copyright 2015 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import contextlib
import socket
import mock
from neutron.common import exceptions
from neutron.plugins.common import constants
from neutron_lbaas.drivers.haproxy import namespace_driver
from neutron_lbaas.services.loadbalancer import data_models
from neutron_lbaas.tests import base
class TestHaproxyNSDriver(base.BaseTestCase):
    """Unit tests for HaproxyNSDriver; all external interactions are mocked."""

    def setUp(self):
        super(TestHaproxyNSDriver, self).setUp()
        # Fake agent config consumed by the driver under test.
        conf = mock.Mock()
        conf.haproxy.loadbalancer_state_path = '/the/path'
        conf.interface_driver = 'intdriver'
        conf.haproxy.user_group = 'test_group'
        conf.haproxy.send_gratuitous_arp = 3
        self.conf = conf
        # Patch importutils so importing the fake 'intdriver' succeeds.
        self.mock_importer = mock.patch.object(namespace_driver,
                                               'importutils').start()

        self.rpc_mock = mock.Mock()
        self.driver = namespace_driver.HaproxyNSDriver(
            conf,
            self.rpc_mock
        )
        self.vif_driver = mock.Mock()
        self.driver.vif_driver = self.vif_driver

        self._build_mock_data_models()

    def _build_mock_data_models(self):
        # Minimal model graph: lb -> vip_port -> fixed_ip -> subnet/host_route.
        host_route = data_models.HostRoute(destination='0.0.0.0/0',
                                           nexthop='192.0.0.1')
        subnet = data_models.Subnet(cidr='10.0.0.1/24',
                                    gateway_ip='10.0.0.2',
                                    host_routes=[host_route])
        fixed_ip = data_models.IPAllocation(ip_address='10.0.0.1')
        setattr(fixed_ip, 'subnet', subnet)
        port = data_models.Port(id='port1', network_id='network1',
                                mac_address='12-34-56-78-9A-BC',
                                fixed_ips=[fixed_ip])
        self.lb = data_models.LoadBalancer(id='lb1', listeners=[],
                                           vip_port=port)

    def test_get_name(self):
        self.assertEqual(namespace_driver.DRIVER_NAME, self.driver.get_name())

    @mock.patch('neutron.agent.linux.ip_lib.IPWrapper')
    @mock.patch('os.path.dirname')
    @mock.patch('os.path.isdir')
    @mock.patch('shutil.rmtree')
    def test_undeploy_instance(self, mock_shutil, mock_isdir, mock_dirname,
                               mock_ip_wrap):
        self.driver._get_state_file_path = mock.Mock(return_value='/path')
        # NOTE(review): this replaces the module attribute without restoring
        # it afterwards; mock.patch.object would be safer.
        namespace_driver.kill_pids_in_file = mock.Mock()
        self.driver._unplug = mock.Mock()
        mock_dirname.return_value = '/path/' + self.lb.id
        mock_isdir.return_value = False

        # First pass: lb not deployed, no dirs -> nothing torn down.
        self.driver.undeploy_instance(self.lb.id)
        namespace_driver.kill_pids_in_file.assert_called_once_with('/path')
        calls = [mock.call(self.lb.id, 'pid'), mock.call(self.lb.id, '')]
        self.driver._get_state_file_path.has_calls(calls)
        self.assertFalse(self.driver._unplug.called)
        self.assertFalse(mock_ip_wrap.called)
        mock_isdir.assert_called_once_with('/path/' + self.lb.id)
        self.assertFalse(mock_shutil.called)

        # Second pass: deployed lb with cleanup/delete flags set.
        self.driver.deployed_loadbalancers[self.lb.id] = self.lb
        mock_isdir.return_value = True
        namespace_driver.kill_pids_in_file.reset_mock()
        mock_isdir.reset_mock()
        mock_ns = mock_ip_wrap.return_value
        mock_ns.get_devices.return_value = [collections.namedtuple(
            'Device', ['name'])(name='test_device')]
        self.driver.undeploy_instance(self.lb.id, cleanup_namespace=True,
                                      delete_namespace=True)
        ns = namespace_driver.get_ns_name(self.lb.id)
        namespace_driver.kill_pids_in_file.assert_called_once_with('/path')
        calls = [mock.call(self.lb.id, 'pid'), mock.call(self.lb.id, '')]
        self.driver._get_state_file_path.has_calls(calls)
        self.driver._unplug.assert_called_once_with(ns, self.lb.vip_port)
        ip_wrap_calls = [mock.call(namespace=ns), mock.call(namespace=ns)]
        mock_ip_wrap.has_calls(ip_wrap_calls)
        mock_ns.get_devices.assert_called_once_with(exclude_loopback=True)
        self.vif_driver.unplug.assert_called_once_with('test_device',
                                                       namespace=ns)
        mock_shutil.assert_called_once_with('/path/' + self.lb.id)
        mock_ns.garbage_collect_namespace.assert_called_once_with()

    @mock.patch('os.path.exists')
    @mock.patch('os.listdir')
    def test_remove_orphans(self, list_dir, exists):
        lb_ids = [self.lb.id]
        # No state path -> early return, listdir never consulted.
        exists.return_value = False
        self.driver.remove_orphans(lb_ids)
        exists.assert_called_once_with(self.driver.state_path)
        self.assertFalse(list_dir.called)

        # 'lb2' is on disk but unknown -> undeployed as an orphan.
        exists.reset_mock()
        exists.return_value = True
        list_dir.return_value = [self.lb.id, 'lb2']
        self.driver.exists = mock.Mock()
        self.driver.undeploy_instance = mock.Mock()
        self.driver.remove_orphans(lb_ids)
        exists.assert_called_once_with(self.driver.state_path)
        list_dir.assert_called_once_with(self.driver.state_path)
        self.driver.exists.assert_called_once_with('lb2')
        self.driver.undeploy_instance.assert_called_once_with(
            'lb2', cleanup_namespace=True)

    def test_get_stats(self):
        # Shamelessly stolen from v1 namespace driver tests.
        # Realistic haproxy CSV output: one BACKEND row and two server rows.
        raw_stats = ('# pxname,svname,qcur,qmax,scur,smax,slim,stot,bin,bout,'
                     'dreq,dresp,ereq,econ,eresp,wretr,wredis,status,weight,'
                     'act,bck,chkfail,chkdown,lastchg,downtime,qlimit,pid,iid,'
                     'sid,throttle,lbtot,tracked,type,rate,rate_lim,rate_max,'
                     'check_status,check_code,check_duration,hrsp_1xx,'
                     'hrsp_2xx,hrsp_3xx,hrsp_4xx,hrsp_5xx,hrsp_other,hanafail,'
                     'req_rate,req_rate_max,req_tot,cli_abrt,srv_abrt,\n'
                     '8e271901-69ed-403e-a59b-f53cf77ef208,BACKEND,1,2,3,4,0,'
                     '10,7764,2365,0,0,,0,0,0,0,UP,1,1,0,,0,103780,0,,1,2,0,,0'
                     ',,1,0,,0,,,,0,0,0,0,0,0,,,,,0,0,\n\n'
                     'a557019b-dc07-4688-9af4-f5cf02bb6d4b,'
                     '32a6c2a3-420a-44c3-955d-86bd2fc6871e,0,0,0,1,,7,1120,'
                     '224,,0,,0,0,0,0,UP,1,1,0,0,1,2623,303,,1,2,1,,7,,2,0,,'
                     '1,L7OK,200,98,0,7,0,0,0,0,0,,,,0,0,\n'
                     'a557019b-dc07-4688-9af4-f5cf02bb6d4b,'
                     'd9aea044-8867-4e80-9875-16fb808fa0f9,0,0,0,2,,12,0,0,,'
                     '0,,0,0,8,4,DOWN,1,1,0,9,2,308,675,,1,2,2,,4,,2,0,,2,'
                     'L4CON,,2999,0,0,0,0,0,0,0,,,,0,0,\n')
        raw_stats_empty = ('# pxname,svname,qcur,qmax,scur,smax,slim,stot,bin,'
                           'bout,dreq,dresp,ereq,econ,eresp,wretr,wredis,'
                           'status,weight,act,bck,chkfail,chkdown,lastchg,'
                           'downtime,qlimit,pid,iid,sid,throttle,lbtot,'
                           'tracked,type,rate,rate_lim,rate_max,check_status,'
                           'check_code,check_duration,hrsp_1xx,hrsp_2xx,'
                           'hrsp_3xx,hrsp_4xx,hrsp_5xx,hrsp_other,hanafail,'
                           'req_rate,req_rate_max,req_tot,cli_abrt,srv_abrt,'
                           '\n')
        # NOTE(review): contextlib.nested is Python 2 only; on Python 3 this
        # would need a multi-manager `with` statement or ExitStack.
        with contextlib.nested(
                mock.patch.object(self.driver, '_get_state_file_path'),
                mock.patch('socket.socket'),
                mock.patch('os.path.exists'),
        ) as (gsp, mocket, path_exists):
            gsp.side_effect = lambda x, y, z: '/pool/' + y
            path_exists.return_value = True
            # Make the socket class return itself so recv can be stubbed.
            mocket.return_value = mocket
            mocket.recv.return_value = raw_stats

            exp_stats = {'connection_errors': '0',
                         'active_connections': '3',
                         'current_sessions': '3',
                         'bytes_in': '7764',
                         'max_connections': '4',
                         'max_sessions': '4',
                         'bytes_out': '2365',
                         'response_errors': '0',
                         'total_sessions': '10',
                         'total_connections': '10',
                         'members': {
                             '32a6c2a3-420a-44c3-955d-86bd2fc6871e': {
                                 'status': 'ACTIVE',
                                 'health': 'L7OK',
                                 'failed_checks': '0'
                             },
                             'd9aea044-8867-4e80-9875-16fb808fa0f9': {
                                 'status': 'INACTIVE',
                                 'health': 'L4CON',
                                 'failed_checks': '9'
                             }
                         }
                         }
            stats = self.driver.get_stats(self.lb.id)
            self.assertEqual(exp_stats, stats)

            # Header-only output yields no backend stats and no members.
            mocket.recv.return_value = raw_stats_empty
            self.assertEqual({'members': {}},
                             self.driver.get_stats(self.lb.id))

            # Missing socket file -> empty dict, socket never opened.
            path_exists.return_value = False
            mocket.reset_mock()
            self.assertEqual({}, self.driver.get_stats(self.lb.id))
            self.assertFalse(mocket.called)

    def test_deploy_instance(self):
        self.driver.deployable = mock.Mock(return_value=False)
        self.driver.exists = mock.Mock(return_value=True)
        self.driver.update = mock.Mock()
        self.driver.create = mock.Mock()

        def reset():
            self.driver.deployable.reset_mock()
            self.driver.exists.reset_mock()
            self.driver.update.reset_mock()
            self.driver.create.reset_mock()

        # Not deployable -> nothing happens.
        deployed = self.driver.deploy_instance(self.lb)
        self.assertFalse(deployed)
        self.assertFalse(self.driver.exists.called)
        self.assertFalse(self.driver.create.called)
        self.assertFalse(self.driver.update.called)

        # Deployable and already exists -> update path.
        reset()
        self.driver.deployable.return_value = True
        deployed = self.driver.deploy_instance(self.lb)
        self.assertTrue(deployed)
        self.driver.exists.assert_called_once_with(self.lb.id)
        self.driver.update.assert_called_once_with(self.lb)
        self.assertFalse(self.driver.create.called)

        # Deployable and does not exist -> create path.
        reset()
        self.driver.exists.return_value = False
        deployed = self.driver.deploy_instance(self.lb)
        self.assertTrue(deployed)
        self.driver.exists.assert_called_once_with(self.lb.id)
        self.driver.create.assert_called_once_with(self.lb)
        self.assertFalse(self.driver.update.called)

    def test_update(self):
        self.driver._get_state_file_path = mock.Mock(return_value='/path')
        self.driver._spawn = mock.Mock()
        # NOTE(review): '__builtin__.open' is the Python 2 builtin module
        # name; Python 3 would patch 'builtins.open'.
        with mock.patch('__builtin__.open') as m_open:
            file_mock = mock.MagicMock()
            m_open.return_value = file_mock
            file_mock.__enter__.return_value = file_mock
            file_mock.__iter__.return_value = iter(['123'])

            self.driver.update(self.lb)
            # Existing pid 123 is soft-stopped via haproxy's -sf flag.
            self.driver._spawn.assert_called_once_with(self.lb,
                                                       ['-sf', '123'])

    @mock.patch('socket.socket')
    @mock.patch('os.path.exists')
    @mock.patch('neutron.agent.linux.ip_lib.IPWrapper')
    def test_exists(self, ip_wrap, exists, mocket):
        socket_path = '/path/haproxy_stats.sock'
        mock_ns = ip_wrap.return_value
        mock_socket = mocket.return_value
        self.driver._get_state_file_path = mock.Mock(return_value=socket_path)
        mock_ns.netns.exists.return_value = False
        exists.return_value = False

        def reset():
            ip_wrap.reset_mock()
            self.driver._get_state_file_path.reset_mock()
            mock_ns.reset_mock()
            exists.reset_mock()
            mocket.reset_mock()
            mock_socket.reset_mock()

        # Namespace missing -> False, socket never probed.
        ret_exists = self.driver.exists(self.lb.id)
        ip_wrap.assert_called_once_with()
        self.driver._get_state_file_path.assert_called_once_with(
            self.lb.id, 'haproxy_stats.sock', False)
        mock_ns.netns.exists.assert_called_once_with(
            namespace_driver.get_ns_name(self.lb.id))
        self.assertFalse(exists.called)
        self.assertFalse(mocket.called)
        self.assertFalse(mock_socket.connect.called)
        self.assertFalse(ret_exists)

        # Namespace present but socket file missing -> False.
        reset()
        mock_ns.netns.exists.return_value = True
        exists.return_value = False
        ret_exists = self.driver.exists(self.lb.id)
        ip_wrap.assert_called_once_with()
        self.driver._get_state_file_path.assert_called_once_with(
            self.lb.id, 'haproxy_stats.sock', False)
        mock_ns.netns.exists.assert_called_once_with(
            namespace_driver.get_ns_name(self.lb.id))
        exists.assert_called_once_with(socket_path)
        self.assertFalse(mocket.called)
        self.assertFalse(mock_socket.connect.called)
        self.assertFalse(ret_exists)

        # Namespace and socket both present -> connect succeeds -> True.
        reset()
        mock_ns.netns.exists.return_value = True
        exists.return_value = True
        ret_exists = self.driver.exists(self.lb.id)
        ip_wrap.assert_called_once_with()
        self.driver._get_state_file_path.assert_called_once_with(
            self.lb.id, 'haproxy_stats.sock', False)
        mock_ns.netns.exists.assert_called_once_with(
            namespace_driver.get_ns_name(self.lb.id))
        exists.assert_called_once_with(socket_path)
        mocket.assert_called_once_with(socket.AF_UNIX, socket.SOCK_STREAM)
        mock_socket.connect.assert_called_once_with(socket_path)
        self.assertTrue(ret_exists)

    def test_create(self):
        self.driver._plug = mock.Mock()
        self.driver._spawn = mock.Mock()
        self.driver.create(self.lb)
        self.driver._plug.assert_called_once_with(
            namespace_driver.get_ns_name(self.lb.id), self.lb.vip_port)
        self.driver._spawn.assert_called_once_with(self.lb)

    def test_deployable(self):
        # test None
        ret_val = self.driver.deployable(None)
        self.assertFalse(ret_val)

        # test no listeners
        ret_val = self.driver.deployable(self.lb)
        self.assertFalse(ret_val)

        # test no acceptable listeners
        listener = data_models.Listener(
            provisioning_status=constants.PENDING_DELETE,
            admin_state_up=True)
        self.lb.listeners.append(listener)
        ret_val = self.driver.deployable(self.lb)
        self.assertFalse(ret_val)
        listener.provisioning_status = constants.PENDING_CREATE
        listener.admin_state_up = False
        ret_val = self.driver.deployable(self.lb)
        self.assertFalse(ret_val)

        # test bad lb status
        listener.admin_state_up = True
        self.lb.provisioning_status = constants.PENDING_DELETE
        self.lb.admin_state_up = True
        ret_val = self.driver.deployable(self.lb)
        self.assertFalse(ret_val)
        self.lb.provisioning_status = constants.PENDING_UPDATE
        self.lb.admin_state_up = False
        ret_val = self.driver.deployable(self.lb)
        self.assertFalse(ret_val)

        # test everything good
        self.lb.admin_state_up = True
        ret_val = self.driver.deployable(self.lb)
        self.assertTrue(ret_val)

    @mock.patch('neutron.agent.linux.utils.ensure_dir')
    def test_get_state_file_path(self, ensure_dir):
        # State files live under <state_path>/v2/<lb_id>/<kind>.
        path = self.driver._get_state_file_path(self.lb.id, 'conf',
                                                ensure_state_dir=False)
        self.assertEqual('/the/path/v2/lb1/conf', path)
        self.assertFalse(ensure_dir.called)
        path = self.driver._get_state_file_path(self.lb.id, 'conf')
        self.assertEqual('/the/path/v2/lb1/conf', path)
        self.assertTrue(ensure_dir.called)

    @mock.patch('neutron.agent.linux.ip_lib.device_exists')
    @mock.patch('neutron.agent.linux.ip_lib.IPWrapper')
    def test_plug(self, ip_wrap, device_exists):
        device_exists.return_value = True
        interface_name = 'tap-d4nc3'
        self.vif_driver.get_device_name.return_value = interface_name

        # Pre-existing device with reuse disallowed -> raises.
        self.assertRaises(exceptions.PreexistingDeviceFailure,
                          self.driver._plug,
                          'ns1', self.lb.vip_port, reuse_existing=False)

        device_exists.assert_called_once_with(interface_name,
                                              namespace='ns1')
        self.rpc_mock.plug_vip_port.assert_called_once_with(
            self.lb.vip_port.id)

        # Default reuse path: L3 configured, default route + gratuitous arp.
        device_exists.reset_mock()
        self.rpc_mock.plug_vip_port.reset_mock()
        mock_ns = ip_wrap.return_value
        self.driver._plug('ns1', self.lb.vip_port)
        self.rpc_mock.plug_vip_port.assert_called_once_with(
            self.lb.vip_port.id)
        device_exists.assert_called_once_with(interface_name,
                                              namespace='ns1')
        self.assertFalse(self.vif_driver.plug.called)
        expected_cidrs = ['10.0.0.1/24']
        self.vif_driver.init_l3.assert_called_once_with(
            interface_name, expected_cidrs, namespace='ns1')
        calls = [mock.call(['route', 'add', 'default', 'gw', '192.0.0.1'],
                           check_exit_code=False),
                 mock.call(['arping', '-U', '-I', interface_name,
                            '-c', 3, '10.0.0.1'],
                           check_exit_code=False)]
        mock_ns.netns.execute.has_calls(calls)
        self.assertEqual(2, mock_ns.netns.execute.call_count)

    def test_unplug(self):
        interface_name = 'tap-d4nc3'
        self.vif_driver.get_device_name.return_value = interface_name
        self.driver._unplug('ns1', self.lb.vip_port)
        self.rpc_mock.unplug_vip_port.assert_called_once_with(
            self.lb.vip_port.id)
        self.vif_driver.get_device_name.assert_called_once_with(
            self.lb.vip_port)
        self.vif_driver.unplug.assert_called_once_with(interface_name,
                                                       namespace='ns1')

    @mock.patch('neutron.agent.linux.utils.ensure_dir')
    @mock.patch('neutron_lbaas.services.loadbalancer.drivers.haproxy.'
                'jinja_cfg.save_config')
    @mock.patch('neutron.agent.linux.ip_lib.IPWrapper')
    def test_spawn(self, ip_wrap, jinja_save, ensure_dir):
        mock_ns = ip_wrap.return_value
        self.driver._spawn(self.lb)

        conf_dir = self.driver.state_path + '/' + self.lb.id + '/%s'
        jinja_save.assert_called_once_with(conf_dir % 'haproxy.conf',
                                           self.lb,
                                           conf_dir % 'haproxy_stats.sock',
                                           'test_group')
        ip_wrap.assert_called_once_with(
            namespace=namespace_driver.get_ns_name(self.lb.id))
        mock_ns.netns.execute.assert_called_once_with(
            ['haproxy', '-f', conf_dir % 'haproxy.conf', '-p',
             conf_dir % 'haproxy.pid'])
        self.assertIn(self.lb.id, self.driver.deployed_loadbalancers)
        self.assertEqual(self.lb,
                         self.driver.deployed_loadbalancers[self.lb.id])
class BaseTestManager(base.BaseTestCase):
    """Common fixture: a mocked device driver and one of each manager."""

    def setUp(self):
        super(BaseTestManager, self).setUp()
        mocked_driver = mock.Mock()
        self.driver = mocked_driver
        self.lb_manager = namespace_driver.LoadBalancerManager(mocked_driver)
        self.listener_manager = namespace_driver.ListenerManager(
            mocked_driver)
        self.pool_manager = namespace_driver.PoolManager(mocked_driver)
        self.member_manager = namespace_driver.MemberManager(mocked_driver)
        self.hm_manager = namespace_driver.HealthMonitorManager(mocked_driver)
        # Shorthand for the refresh mock that most tests assert against.
        self.refresh = self.driver.loadbalancer.refresh
class BaseTestLoadBalancerManager(BaseTestManager):
    """Extends the manager fixture with an empty input loadbalancer model."""

    def setUp(self):
        super(BaseTestLoadBalancerManager, self).setUp()
        self.in_lb = data_models.LoadBalancer(listeners=[], id='lb1')
class TestLoadBalancerManager(BaseTestLoadBalancerManager):
    """Tests for LoadBalancerManager against a mocked device driver."""

    @mock.patch.object(data_models.LoadBalancer, 'from_dict')
    def test_refresh(self, lb_from_dict):
        """refresh() pulls fresh LB state over RPC and redeploys it.

        Three phases, with mocks reset between them:
        1. deploy succeeds: no existence check, nothing undeployed
        2. deploy fails and the LB is not deployed: nothing to undeploy
        3. deploy fails but a stale deployment exists: it is undeployed
        """
        rpc_return = {'id': self.in_lb.id}
        self.driver.plugin_rpc.get_loadbalancer.return_value = rpc_return
        from_dict_return = data_models.LoadBalancer(id=self.in_lb.id)
        lb_from_dict.return_value = from_dict_return
        # Phase 1: deployment succeeds.
        self.driver.deploy_instance.return_value = True
        self.driver.exists.return_value = True
        self.lb_manager.refresh(self.in_lb)
        self.driver.plugin_rpc.get_loadbalancer.assert_called_once_with(
            self.in_lb.id)
        lb_from_dict.assert_called_once_with(rpc_return)
        self.driver.deploy_instance.assert_called_once_with(from_dict_return)
        self.assertFalse(self.driver.exists.called)
        self.assertFalse(self.driver.undeploy_instance.called)
        # Phase 2: deployment fails and there is no stale instance.
        # NOTE: reset_mock() clears call records but keeps return_values.
        self.driver.reset_mock()
        lb_from_dict.reset_mock()
        self.driver.deploy_instance.return_value = False
        self.driver.exists.return_value = False
        self.lb_manager.refresh(self.in_lb)
        self.driver.plugin_rpc.get_loadbalancer.assert_called_once_with(
            self.in_lb.id)
        lb_from_dict.assert_called_once_with(rpc_return)
        self.driver.deploy_instance.assert_called_once_with(from_dict_return)
        self.driver.exists.assert_called_once_with(self.in_lb.id)
        self.assertFalse(self.driver.undeploy_instance.called)
        # Phase 3: deployment fails but a stale instance is still deployed.
        self.driver.reset_mock()
        lb_from_dict.reset_mock()
        self.driver.deploy_instance.return_value = False
        self.driver.exists.return_value = True
        self.lb_manager.refresh(self.in_lb)
        self.driver.plugin_rpc.get_loadbalancer.assert_called_once_with(
            self.in_lb.id)
        lb_from_dict.assert_called_once_with(rpc_return)
        self.driver.deploy_instance.assert_called_once_with(from_dict_return)
        self.driver.exists.assert_called_once_with(from_dict_return.id)
        self.driver.undeploy_instance.assert_called_once_with(self.in_lb.id)

    def test_delete(self):
        """delete() only undeploys an instance that actually exists."""
        self.driver.exists.return_value = False
        self.lb_manager.delete(self.in_lb)
        self.driver.exists.assert_called_once_with(self.in_lb.id)
        self.assertFalse(self.driver.undeploy_instance.called)
        self.driver.reset_mock()
        self.driver.exists.return_value = True
        self.lb_manager.delete(self.in_lb)
        self.driver.exists.assert_called_once_with(self.in_lb.id)
        self.driver.undeploy_instance.assert_called_once_with(self.in_lb.id)

    def test_create(self):
        """create() refreshes only when the LB has at least one listener."""
        self.lb_manager.refresh = mock.Mock()
        self.lb_manager.create(self.in_lb)
        self.assertFalse(self.lb_manager.refresh.called)
        self.lb_manager.refresh.reset_mock()
        self.in_lb.listeners.append(data_models.Listener(id='listener1'))
        self.lb_manager.create(self.in_lb)
        self.lb_manager.refresh.assert_called_once_with(self.in_lb)

    def test_get_stats(self):
        """get_stats() delegates straight to the device driver."""
        self.lb_manager.get_stats(self.in_lb.id)
        self.driver.get_stats.assert_called_once_with(self.in_lb.id)

    def test_update(self):
        """update() refreshes using the new (updated) loadbalancer."""
        old_lb = data_models.LoadBalancer(id='lb0')
        self.lb_manager.refresh = mock.Mock()
        self.lb_manager.update(old_lb, self.in_lb)
        self.lb_manager.refresh.assert_called_once_with(self.in_lb)
class BaseTestListenerManager(BaseTestLoadBalancerManager):
    """Adds two listeners bound to the shared loadbalancer."""

    def setUp(self):
        super(BaseTestListenerManager, self).setUp()
        self.in_listener = data_models.Listener(id='listener1')
        self.listener2 = data_models.Listener(id='listener2')
        # Point both listeners back at the loadbalancer fixture.
        for listener in (self.in_listener, self.listener2):
            listener.loadbalancer = self.in_lb
        self.refresh = self.driver.loadbalancer.refresh
class TestListenerManager(BaseTestListenerManager):
    """Tests for ListenerManager against a mocked device driver."""

    def setUp(self):
        super(TestListenerManager, self).setUp()
        # Rebuild the listeners so in_lb.listeners holds exactly these two
        # (the base class does not attach its listeners to the LB).
        self.in_listener = data_models.Listener(id='listener1')
        self.listener2 = data_models.Listener(id='listener2')
        self.in_lb.listeners = [self.in_listener, self.listener2]
        self.in_listener.loadbalancer = self.in_lb
        self.listener2.loadbalancer = self.in_lb

    def test_remove_listener(self):
        """_remove_listener drops the matching listener from the LB."""
        self.listener_manager._remove_listener(self.in_lb, self.in_listener.id)
        self.assertEqual(1, len(self.in_lb.listeners))
        self.assertEqual(self.listener2.id, self.in_lb.listeners[0].id)

    def test_update(self):
        """Updating a listener refreshes its loadbalancer."""
        old_listener = data_models.Listener(id='listener1', name='bleh')
        self.listener_manager.update(old_listener, self.in_listener)
        self.refresh.assert_called_once_with(self.in_lb)

    def test_create(self):
        """Creating a listener refreshes its loadbalancer."""
        self.listener_manager.create(self.in_listener)
        self.refresh.assert_called_once_with(self.in_lb)

    def test_delete(self):
        """Deleting a non-last listener refreshes; deleting the last
        listener undeploys the whole haproxy instance instead.
        """
        self.listener_manager.delete(self.in_listener)
        self.refresh.assert_called_once_with(self.in_lb)
        self.assertFalse(self.driver.undeploy_instance.called)
        self.refresh.reset_mock()
        self.driver.reset_mock()
        # listener2 is now the LB's only remaining listener.
        self.listener_manager.delete(self.listener2)
        self.assertFalse(self.refresh.called)
        self.driver.undeploy_instance.assert_called_once_with(self.in_lb.id)
class BaseTestPoolManager(BaseTestListenerManager):
    """Adds a pool acting as the first listener's default pool."""

    def setUp(self):
        super(BaseTestPoolManager, self).setUp()
        self.in_pool = data_models.Pool(id='pool1')
        # Wire pool and listener to each other, as the plugin would.
        self.in_listener.default_pool = self.in_pool
        self.in_pool.listener = self.in_listener
class TestPoolManager(BaseTestPoolManager):
    """Pool CRUD operations always trigger a loadbalancer refresh."""

    def test_update(self):
        previous = data_models.Pool(id=self.in_pool.id, name='bleh')
        self.pool_manager.update(previous, self.in_pool)
        self.refresh.assert_called_once_with(self.in_lb)

    def test_create(self):
        self.pool_manager.create(self.in_pool)
        self.refresh.assert_called_once_with(self.in_lb)

    def test_delete(self):
        self.pool_manager.delete(self.in_pool)
        # Deletion detaches the pool from its listener before the refresh.
        self.assertIsNone(self.in_listener.default_pool)
        self.refresh.assert_called_once_with(self.in_lb)
class BaseTestMemberManager(BaseTestPoolManager):
    """Adds two members to the shared pool."""

    def setUp(self):
        super(BaseTestMemberManager, self).setUp()
        self.in_member = data_models.Member(id='member1')
        self.member2 = data_models.Member(id='member2')
        self.in_pool.members = [self.in_member, self.member2]
        # Point each member back at the pool, as the plugin would.
        for member in self.in_pool.members:
            member.pool = self.in_pool
class TestMemberManager(BaseTestMemberManager):
    """Member CRUD operations always trigger a loadbalancer refresh."""

    def test_remove_member(self):
        self.member_manager._remove_member(self.in_pool, self.in_member.id)
        # Only the second member should remain in the pool.
        self.assertEqual(1, len(self.in_pool.members))
        self.assertEqual(self.member2.id, self.in_pool.members[0].id)

    def test_update(self):
        previous = data_models.Member(id=self.in_member.id,
                                      address='0.0.0.0')
        self.member_manager.update(previous, self.in_member)
        self.refresh.assert_called_once_with(self.in_lb)

    def test_create(self):
        self.member_manager.create(self.in_member)
        self.refresh.assert_called_once_with(self.in_lb)

    def test_delete(self):
        self.member_manager.delete(self.in_member)
        self.refresh.assert_called_once_with(self.in_lb)
class BaseTestHealthMonitorManager(BaseTestPoolManager):
    """Adds a health monitor attached to the shared pool."""

    def setUp(self):
        super(BaseTestHealthMonitorManager, self).setUp()
        self.in_hm = data_models.HealthMonitor(id='hm1')
        # Wire monitor and pool to each other, as the plugin would.
        self.in_pool.healthmonitor = self.in_hm
        self.in_hm.pool = self.in_pool
class TestHealthMonitorManager(BaseTestHealthMonitorManager):
    """Health monitor CRUD operations always trigger a LB refresh."""

    def test_update(self):
        previous = data_models.HealthMonitor(id=self.in_hm.id, timeout=2)
        self.hm_manager.update(previous, self.in_hm)
        self.refresh.assert_called_once_with(self.in_lb)

    def test_create(self):
        self.hm_manager.create(self.in_hm)
        self.refresh.assert_called_once_with(self.in_lb)

    def test_delete(self):
        self.hm_manager.delete(self.in_hm)
        # Deletion detaches the monitor from its pool before the refresh.
        self.assertIsNone(self.in_pool.healthmonitor)
        self.refresh.assert_called_once_with(self.in_lb)
class TestNamespaceDriverModule(base.BaseTestCase):
    """Tests for the module-level helpers of the namespace driver."""

    @mock.patch('os.path.exists')
    @mock.patch('neutron.agent.linux.utils.execute')
    def test_kill_pids_in_file(self, execute, exists):
        """kill_pids_in_file() is a no-op for a missing pid file and kills
        (ignoring execute errors) each pid listed in an existing one.
        """
        pid_path = '/var/lib/data'
        # NOTE(review): '__builtin__' is Python 2 only; switch the patch
        # target to six.moves.builtins when porting to Python 3.
        with mock.patch('__builtin__.open') as m_open:
            exists.return_value = False
            file_mock = mock.MagicMock()
            m_open.return_value = file_mock
            file_mock.__enter__.return_value = file_mock
            file_mock.__iter__.return_value = iter(['123'])
            namespace_driver.kill_pids_in_file(pid_path)
            # os.path.exists is patched globally, so unrelated callers may
            # hit the mock too; the old assert_called_once_with here was
            # flaky ("sometimes fails"). Assert on our call instead.
            exists.assert_any_call(pid_path)
            self.assertFalse(m_open.called)
            self.assertFalse(execute.called)
            exists.return_value = True
            execute.side_effect = RuntimeError
            namespace_driver.kill_pids_in_file(pid_path)
            # The RuntimeError raised by execute must be swallowed, and the
            # pid read from the file must have been kill -9'ed.  Membership
            # in call_args_list is robust to extraneous calls, unlike the
            # previously commented-out assert_called_once_with.
            self.assertIn(mock.call(['kill', '-9', '123']),
                          execute.call_args_list)

    def test_get_ns_name(self):
        """Namespace names are NS_PREFIX plus the loadbalancer id."""
        ns_name = namespace_driver.get_ns_name('woohoo')
        self.assertEqual(namespace_driver.NS_PREFIX + 'woohoo', ns_name)