diff --git a/etc/neutron_lbaas.conf b/etc/neutron_lbaas.conf index 54891fa60..f6413a99d 100644 --- a/etc/neutron_lbaas.conf +++ b/etc/neutron_lbaas.conf @@ -54,7 +54,9 @@ service_provider=LOADBALANCER:Haproxy:neutron_lbaas.services.loadbalancer.driver # service_provider=LOADBALANCER:NetScaler:neutron_lbaas.services.loadbalancer.drivers.netscaler.netscaler_driver.NetScalerPluginDriver # service_provider=LOADBALANCER:Embrane:neutron_lbaas.services.loadbalancer.drivers.embrane.driver.EmbraneLbaas:default # service_provider = LOADBALANCER:A10Networks:neutron_lbaas.services.loadbalancer.drivers.a10networks.driver_v1.ThunderDriver:default +# LBaaS v2 drivers # service_provider = LOADBALANCERV2:LoggingNoop:neutron_lbaas.drivers.logging_noop.driver.LoggingNoopLoadBalancerDriver:default +# service_provider=LOADBALANCERV2:Haproxy:neutron_lbaas.drivers.haproxy.plugin_driver.HaproxyOnHostPluginDriver:default [certificates] # cert_manager_class = neutron_lbaas.common.cert_manager.local_cert_manager diff --git a/neutron_lbaas/db/loadbalancer/loadbalancer_dbv2.py b/neutron_lbaas/db/loadbalancer/loadbalancer_dbv2.py index c29331257..b642e2cd1 100644 --- a/neutron_lbaas/db/loadbalancer/loadbalancer_dbv2.py +++ b/neutron_lbaas/db/loadbalancer/loadbalancer_dbv2.py @@ -509,7 +509,6 @@ class LoadBalancerPluginDbv2(base_db.CommonDbMixin, with context.session.begin(subtransactions=True): lb_db = self._get_resource(context, models.LoadBalancer, loadbalancer_id) - self.assert_modification_allowed(lb_db) lb_db.stats = self._create_loadbalancer_stats(context, loadbalancer_id, data=stats_data) diff --git a/neutron_lbaas/drivers/haproxy/namespace_driver.py b/neutron_lbaas/drivers/haproxy/namespace_driver.py new file mode 100644 index 000000000..897cb06ff --- /dev/null +++ b/neutron_lbaas/drivers/haproxy/namespace_driver.py @@ -0,0 +1,467 @@ +# Copyright 2013 New Dream Network, LLC (DreamHost) +# Copyright 2015 Rackspace +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import shutil +import socket + +import netaddr +from neutron.agent.linux import ip_lib +from neutron.agent.linux import utils as linux_utils +from neutron.common import exceptions +from neutron.common import utils as n_utils +from neutron.i18n import _LI, _LE, _LW +from neutron.plugins.common import constants +from oslo_config import cfg +from oslo_log import log as logging +from oslo_utils import excutils +from oslo_utils import importutils + +from neutron_lbaas.agent import agent_device_driver +from neutron_lbaas.services.loadbalancer import constants as lb_const +from neutron_lbaas.services.loadbalancer import data_models +from neutron_lbaas.services.loadbalancer.drivers.haproxy import jinja_cfg +from neutron_lbaas.services.loadbalancer.drivers.haproxy \ + import namespace_driver + +LOG = logging.getLogger(__name__) +NS_PREFIX = 'qlbaas-' +STATS_TYPE_BACKEND_REQUEST = 2 +STATS_TYPE_BACKEND_RESPONSE = '1' +STATS_TYPE_SERVER_REQUEST = 4 +STATS_TYPE_SERVER_RESPONSE = '2' +DRIVER_NAME = 'haproxy_ns' + +STATE_PATH_V2_APPEND = 'v2' + +cfg.CONF.register_opts(namespace_driver.OPTS, 'haproxy') + + +def get_ns_name(namespace_id): + return NS_PREFIX + namespace_id + + +class HaproxyNSDriver(agent_device_driver.AgentDeviceDriver): + + def __init__(self, conf, plugin_rpc): + super(HaproxyNSDriver, self).__init__(conf, plugin_rpc) + self.state_path = conf.haproxy.loadbalancer_state_path + self.state_path = os.path.join( + self.conf.haproxy.loadbalancer_state_path, STATE_PATH_V2_APPEND) + try: + vif_driver = importutils.import_object(conf.interface_driver, conf) + except ImportError: + with excutils.save_and_reraise_exception(): + msg = (_('Error importing interface driver: %s') + % conf.interface_driver) + LOG.error(msg) + + self.vif_driver = vif_driver + self.deployed_loadbalancers = {} + self._loadbalancer = LoadBalancerManager(self) + self._listener = ListenerManager(self) + self._pool = PoolManager(self) + self._member = MemberManager(self) + self._healthmonitor = HealthMonitorManager(self) + + @property + def loadbalancer(self): + return self._loadbalancer + + @property + def listener(self): + return self._listener + + @property + def pool(self): + return self._pool + + @property + def member(self): + return self._member + + @property + def healthmonitor(self): + return self._healthmonitor + + def get_name(self): + return DRIVER_NAME + + @n_utils.synchronized('haproxy-driver') + def undeploy_instance(self, loadbalancer_id, **kwargs): + cleanup_namespace = kwargs.get('cleanup_namespace', False) + delete_namespace = kwargs.get('delete_namespace', False) + namespace = get_ns_name(loadbalancer_id) + pid_path = self._get_state_file_path(loadbalancer_id, 'pid') + + # kill the process + kill_pids_in_file(pid_path) + + # unplug the ports + if loadbalancer_id in self.deployed_loadbalancers: + self._unplug(namespace, + self.deployed_loadbalancers[loadbalancer_id].vip_port) + + # delete all devices from namespace + # used when deleting orphans and port is not known for a loadbalancer + if cleanup_namespace: + ns = ip_lib.IPWrapper(namespace=namespace) + for device in ns.get_devices(exclude_loopback=True): + self.vif_driver.unplug(device.name, namespace=namespace) + + # remove the configuration directory + conf_dir = os.path.dirname( + self._get_state_file_path(loadbalancer_id, '')) + if os.path.isdir(conf_dir): + shutil.rmtree(conf_dir) + + if delete_namespace: + ns = ip_lib.IPWrapper(namespace=namespace) + ns.garbage_collect_namespace() + + def remove_orphans(self, known_loadbalancer_ids): + if not os.path.exists(self.state_path): + return + + orphans = (lb_id for lb_id in os.listdir(self.state_path) + if lb_id not in known_loadbalancer_ids) + for lb_id in orphans: + if self.exists(lb_id): + self.undeploy_instance(lb_id, cleanup_namespace=True) + + def get_stats(self, loadbalancer_id): + socket_path = self._get_state_file_path(loadbalancer_id, + 'haproxy_stats.sock', False) + if os.path.exists(socket_path): + parsed_stats = self._get_stats_from_socket( + socket_path, + entity_type=(STATS_TYPE_BACKEND_REQUEST | + STATS_TYPE_SERVER_REQUEST)) + lb_stats = self._get_backend_stats(parsed_stats) + lb_stats['members'] = self._get_servers_stats(parsed_stats) + return lb_stats + else: + LOG.warn(_LW('Stats socket not found for loadbalancer %s') % + loadbalancer_id) + return {} + + @n_utils.synchronized('haproxy-driver') + def deploy_instance(self, loadbalancer): + """Deploys loadbalancer if necessary + + :return: True if loadbalancer was deployed, False otherwise + """ + if not self.deployable(loadbalancer): + LOG.info(_LI("Loadbalancer %s is not deployable.") % + loadbalancer.id) + return False + + if self.exists(loadbalancer.id): + self.update(loadbalancer) + else: + self.create(loadbalancer) + return True + + def update(self, loadbalancer): + pid_path = self._get_state_file_path(loadbalancer.id, 'haproxy.pid') + extra_args = ['-sf'] + extra_args.extend(p.strip() for p in open(pid_path, 'r')) + self._spawn(loadbalancer, extra_args) + + def exists(self, loadbalancer_id): + namespace = get_ns_name(loadbalancer_id) + root_ns = ip_lib.IPWrapper() + + socket_path = self._get_state_file_path( + loadbalancer_id, 'haproxy_stats.sock', False) + if root_ns.netns.exists(namespace) and os.path.exists(socket_path): + try: + s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + s.connect(socket_path) + return True + except socket.error: + pass + return False + + def create(self, loadbalancer): + namespace = get_ns_name(loadbalancer.id) + + self._plug(namespace, loadbalancer.vip_port) + self._spawn(loadbalancer) + + def deployable(self, loadbalancer): + """Returns True if loadbalancer is active and has active listeners.""" + if not loadbalancer: + return False + acceptable_listeners = [ + listener for listener in loadbalancer.listeners + if (listener.provisioning_status != constants.PENDING_DELETE and + listener.admin_state_up)] + return (bool(acceptable_listeners) and loadbalancer.admin_state_up and + loadbalancer.provisioning_status != constants.PENDING_DELETE) + + def _get_stats_from_socket(self, socket_path, entity_type): + try: + s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + s.connect(socket_path) + s.send('show stat -1 %s -1\n' % entity_type) + raw_stats = '' + chunk_size = 1024 + while True: + chunk = s.recv(chunk_size) + raw_stats += chunk + if len(chunk) < chunk_size: + break + + return self._parse_stats(raw_stats) + except socket.error as e: + LOG.warn(_LW('Error while connecting to stats socket: %s'), e) + return {} + + def _parse_stats(self, raw_stats): + stat_lines = raw_stats.splitlines() + if len(stat_lines) < 2: + return [] + stat_names = [name.strip('# ') for name in stat_lines[0].split(',')] + res_stats = [] + for raw_values in stat_lines[1:]: + if not raw_values: + continue + stat_values = [value.strip() for value in raw_values.split(',')] + res_stats.append(dict(zip(stat_names, stat_values))) + + return res_stats + + def _get_backend_stats(self, parsed_stats): + for stats in parsed_stats: + if stats.get('type') == STATS_TYPE_BACKEND_RESPONSE: + unified_stats = dict((k, stats.get(v, '')) + for k, v in jinja_cfg.STATS_MAP.items()) + return unified_stats + + return {} + + def _get_servers_stats(self, parsed_stats): + res = {} + for stats in parsed_stats: + if stats.get('type') == STATS_TYPE_SERVER_RESPONSE: + res[stats['svname']] = { + lb_const.STATS_STATUS: (constants.INACTIVE + if stats['status'] == 'DOWN' + else constants.ACTIVE), + lb_const.STATS_HEALTH: stats['check_status'], + lb_const.STATS_FAILED_CHECKS: stats['chkfail'] + } + return res + + def _get_state_file_path(self, loadbalancer_id, kind, + ensure_state_dir=True): + """Returns the file name for a given kind of config file.""" + confs_dir = os.path.abspath(os.path.normpath(self.state_path)) + conf_dir = os.path.join(confs_dir, loadbalancer_id) + if ensure_state_dir: + linux_utils.ensure_dir(conf_dir) + return os.path.join(conf_dir, kind) + + def _plug(self, namespace, port, reuse_existing=True): + self.plugin_rpc.plug_vip_port(port.id) + + interface_name = self.vif_driver.get_device_name(port) + + if ip_lib.device_exists(interface_name, + namespace=namespace): + if not reuse_existing: + raise exceptions.PreexistingDeviceFailure( + dev_name=interface_name + ) + else: + self.vif_driver.plug( + port.network_id, + port.id, + interface_name, + port.mac_address, + namespace=namespace + ) + + cidrs = [ + '%s/%s' % (ip.ip_address, + netaddr.IPNetwork(ip.subnet.cidr).prefixlen) + for ip in port.fixed_ips + ] + self.vif_driver.init_l3(interface_name, cidrs, namespace=namespace) + + gw_ip = port.fixed_ips[0].subnet.gateway_ip + + if not gw_ip: + host_routes = port.fixed_ips[0].subnet.host_routes + for host_route in host_routes: + if host_route.destination == "0.0.0.0/0": + gw_ip = host_route.nexthop + break + + if gw_ip: + cmd = ['route', 'add', 'default', 'gw', gw_ip] + ip_wrapper = ip_lib.IPWrapper(namespace=namespace) + ip_wrapper.netns.execute(cmd, check_exit_code=False) + # When delete and re-add the same vip, we need to + # send gratuitous ARP to flush the ARP cache in the Router. + gratuitous_arp = self.conf.haproxy.send_gratuitous_arp + if gratuitous_arp > 0: + for ip in port.fixed_ips: + cmd_arping = ['arping', '-U', + '-I', interface_name, + '-c', gratuitous_arp, + ip.ip_address] + ip_wrapper.netns.execute(cmd_arping, check_exit_code=False) + + def _unplug(self, namespace, port): + self.plugin_rpc.unplug_vip_port(port.id) + interface_name = self.vif_driver.get_device_name(port) + self.vif_driver.unplug(interface_name, namespace=namespace) + + def _spawn(self, loadbalancer, extra_cmd_args=()): + namespace = get_ns_name(loadbalancer.id) + conf_path = self._get_state_file_path(loadbalancer.id, 'haproxy.conf') + pid_path = self._get_state_file_path(loadbalancer.id, + 'haproxy.pid') + sock_path = self._get_state_file_path(loadbalancer.id, + 'haproxy_stats.sock') + user_group = self.conf.haproxy.user_group + + jinja_cfg.save_config(conf_path, loadbalancer, sock_path, user_group) + cmd = ['haproxy', '-f', conf_path, '-p', pid_path] + cmd.extend(extra_cmd_args) + + ns = ip_lib.IPWrapper(namespace=namespace) + ns.netns.execute(cmd) + + # remember deployed loadbalancer id + self.deployed_loadbalancers[loadbalancer.id] = loadbalancer + + +class LoadBalancerManager(agent_device_driver.BaseLoadBalancerManager): + + def refresh(self, loadbalancer): + loadbalancer_dict = self.driver.plugin_rpc.get_loadbalancer( + loadbalancer.id) + loadbalancer = data_models.LoadBalancer.from_dict(loadbalancer_dict) + if (not self.driver.deploy_instance(loadbalancer) and + self.driver.exists(loadbalancer.id)): + self.driver.undeploy_instance(loadbalancer.id) + + def delete(self, loadbalancer): + if self.driver.exists(loadbalancer.id): + self.driver.undeploy_instance(loadbalancer.id) + + def create(self, loadbalancer): + # loadbalancer has no listeners then do nothing because haproxy will + # not start without a tcp port. Consider this successful anyway. + if not loadbalancer.listeners: + return + self.refresh(loadbalancer) + + def get_stats(self, loadbalancer_id): + return self.driver.get_stats(loadbalancer_id) + + def update(self, old_loadbalancer, loadbalancer): + self.refresh(loadbalancer) + + +class ListenerManager(agent_device_driver.BaseListenerManager): + + def _remove_listener(self, loadbalancer, listener_id): + index_to_remove = None + for index, listener in enumerate(loadbalancer.listeners): + if listener.id == listener_id: + index_to_remove = index + loadbalancer.listeners.pop(index_to_remove) + + def update(self, old_listener, new_listener): + self.driver.loadbalancer.refresh(new_listener.loadbalancer) + + def create(self, listener): + self.driver.loadbalancer.refresh(listener.loadbalancer) + + def delete(self, listener): + loadbalancer = listener.loadbalancer + self._remove_listener(loadbalancer, listener.id) + if len(loadbalancer.listeners) > 0: + self.driver.loadbalancer.refresh(loadbalancer) + else: + # undeploy instance because haproxy will throw error if port is + # missing in frontend + self.driver.undeploy_instance(loadbalancer.id) + + +class PoolManager(agent_device_driver.BasePoolManager): + + def update(self, old_pool, new_pool): + self.driver.loadbalancer.refresh(new_pool.listener.loadbalancer) + + def create(self, pool): + self.driver.loadbalancer.refresh(pool.listener.loadbalancer) + + def delete(self, pool): + loadbalancer = pool.listener.loadbalancer + pool.listener.default_pool = None + # just refresh because haproxy is fine if only frontend is listed + self.driver.loadbalancer.refresh(loadbalancer) + + +class MemberManager(agent_device_driver.BaseMemberManager): + + def _remove_member(self, pool, member_id): + index_to_remove = None + for index, member in enumerate(pool.members): + if member.id == member_id: + index_to_remove = index + pool.members.pop(index_to_remove) + + def update(self, old_member, new_member): + self.driver.loadbalancer.refresh(new_member.pool.listener.loadbalancer) + + def create(self, member): + self.driver.loadbalancer.refresh(member.pool.listener.loadbalancer) + + def delete(self, member): + self._remove_member(member.pool, member.id) + self.driver.loadbalancer.refresh(member.pool.listener.loadbalancer) + + +class HealthMonitorManager(agent_device_driver.BaseHealthMonitorManager): + + def update(self, old_hm, new_hm): + self.driver.loadbalancer.refresh(new_hm.pool.listener.loadbalancer) + + def create(self, hm): + self.driver.loadbalancer.refresh(hm.pool.listener.loadbalancer) + + def delete(self, hm): + hm.pool.healthmonitor = None + self.driver.loadbalancer.refresh(hm.pool.listener.loadbalancer) + + +def kill_pids_in_file(pid_path): + if os.path.exists(pid_path): + with open(pid_path, 'r') as pids: + for pid in pids: + pid = pid.strip() + try: + linux_utils.execute(['kill', '-9', pid]) + except RuntimeError: + LOG.exception( + _LE('Unable to kill haproxy process: %s'), + pid + ) diff --git a/neutron_lbaas/drivers/haproxy/plugin_driver.py b/neutron_lbaas/drivers/haproxy/plugin_driver.py index efce06739..2bc9470b8 100644 --- a/neutron_lbaas/drivers/haproxy/plugin_driver.py +++ b/neutron_lbaas/drivers/haproxy/plugin_driver.py @@ -1,3 +1,4 @@ +# Copyright (c) 2013 OpenStack Foundation. # Copyright (c) 2015 Rackspace. # All Rights Reserved. # @@ -14,7 +15,8 @@ # under the License. from neutron_lbaas.drivers.common import agent_driver_base +from neutron_lbaas.drivers.haproxy import namespace_driver class HaproxyOnHostPluginDriver(agent_driver_base.AgentDriverBase): - device_driver = 'test' + device_driver = namespace_driver.DRIVER_NAME diff --git a/neutron_lbaas/tests/unit/drivers/haproxy/test_namespace_driver.py b/neutron_lbaas/tests/unit/drivers/haproxy/test_namespace_driver.py new file mode 100644 index 000000000..270cbff3a --- /dev/null +++ b/neutron_lbaas/tests/unit/drivers/haproxy/test_namespace_driver.py @@ -0,0 +1,686 @@ +# Copyright 2013 New Dream Network, LLC (DreamHost) +# Copyright 2015 Rackspace +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections +import contextlib +import socket + +import mock +from neutron.common import exceptions +from neutron.plugins.common import constants + +from neutron_lbaas.drivers.haproxy import namespace_driver +from neutron_lbaas.services.loadbalancer import data_models +from neutron_lbaas.tests import base + + +class TestHaproxyNSDriver(base.BaseTestCase): + + def setUp(self): + super(TestHaproxyNSDriver, self).setUp() + + conf = mock.Mock() + conf.haproxy.loadbalancer_state_path = '/the/path' + conf.interface_driver = 'intdriver' + conf.haproxy.user_group = 'test_group' + conf.haproxy.send_gratuitous_arp = 3 + self.conf = conf + self.mock_importer = mock.patch.object(namespace_driver, + 'importutils').start() + + self.rpc_mock = mock.Mock() + self.driver = namespace_driver.HaproxyNSDriver( + conf, + self.rpc_mock + ) + self.vif_driver = mock.Mock() + self.driver.vif_driver = self.vif_driver + self._build_mock_data_models() + + def _build_mock_data_models(self): + host_route = data_models.HostRoute(destination='0.0.0.0/0', + nexthop='192.0.0.1') + subnet = data_models.Subnet(cidr='10.0.0.1/24', + gateway_ip='10.0.0.2', + host_routes=[host_route]) + fixed_ip = data_models.IPAllocation(ip_address='10.0.0.1') + setattr(fixed_ip, 'subnet', subnet) + port = data_models.Port(id='port1', network_id='network1', + mac_address='12-34-56-78-9A-BC', + fixed_ips=[fixed_ip]) + self.lb = data_models.LoadBalancer(id='lb1', listeners=[], + vip_port=port) + + def test_get_name(self): + self.assertEqual(namespace_driver.DRIVER_NAME, self.driver.get_name()) + + @mock.patch('neutron.agent.linux.ip_lib.IPWrapper') + @mock.patch('os.path.dirname') + @mock.patch('os.path.isdir') + @mock.patch('shutil.rmtree') + def test_undeploy_instance(self, mock_shutil, mock_isdir, mock_dirname, + mock_ip_wrap): + self.driver._get_state_file_path = mock.Mock(return_value='/path') + namespace_driver.kill_pids_in_file = mock.Mock() + self.driver._unplug = mock.Mock() + mock_dirname.return_value = '/path/' + self.lb.id + mock_isdir.return_value = False + + self.driver.undeploy_instance(self.lb.id) + namespace_driver.kill_pids_in_file.assert_called_once_with('/path') + calls = [mock.call(self.lb.id, 'pid'), mock.call(self.lb.id, '')] + self.driver._get_state_file_path.has_calls(calls) + self.assertFalse(self.driver._unplug.called) + self.assertFalse(mock_ip_wrap.called) + mock_isdir.assert_called_once_with('/path/' + self.lb.id) + self.assertFalse(mock_shutil.called) + + self.driver.deployed_loadbalancers[self.lb.id] = self.lb + mock_isdir.return_value = True + namespace_driver.kill_pids_in_file.reset_mock() + mock_isdir.reset_mock() + mock_ns = mock_ip_wrap.return_value + mock_ns.get_devices.return_value = [collections.namedtuple( + 'Device', ['name'])(name='test_device')] + self.driver.undeploy_instance(self.lb.id, cleanup_namespace=True, + delete_namespace=True) + ns = namespace_driver.get_ns_name(self.lb.id) + namespace_driver.kill_pids_in_file.assert_called_once_with('/path') + calls = [mock.call(self.lb.id, 'pid'), mock.call(self.lb.id, '')] + self.driver._get_state_file_path.has_calls(calls) + self.driver._unplug.assert_called_once_with(ns, self.lb.vip_port) + ip_wrap_calls = [mock.call(namespace=ns), mock.call(namespace=ns)] + mock_ip_wrap.has_calls(ip_wrap_calls) + mock_ns.get_devices.assert_called_once_with(exclude_loopback=True) + self.vif_driver.unplug.assert_called_once_with('test_device', + namespace=ns) + mock_shutil.assert_called_once_with('/path/' + self.lb.id) + mock_ns.garbage_collect_namespace.assert_called_once_with() + + @mock.patch('os.path.exists') + @mock.patch('os.listdir') + def test_remove_orphans(self, list_dir, exists): + lb_ids = [self.lb.id] + exists.return_value = False + self.driver.remove_orphans(lb_ids) + exists.assert_called_once_with(self.driver.state_path) + self.assertFalse(list_dir.called) + + exists.reset_mock() + exists.return_value = True + list_dir.return_value = [self.lb.id, 'lb2'] + self.driver.exists = mock.Mock() + self.driver.undeploy_instance = mock.Mock() + self.driver.remove_orphans(lb_ids) + exists.assert_called_once_with(self.driver.state_path) + list_dir.assert_called_once_with(self.driver.state_path) + self.driver.exists.assert_called_once_with('lb2') + self.driver.undeploy_instance.assert_called_once_with( + 'lb2', cleanup_namespace=True) + + def test_get_stats(self): + # Shamelessly stolen from v1 namespace driver tests. + raw_stats = ('# pxname,svname,qcur,qmax,scur,smax,slim,stot,bin,bout,' + 'dreq,dresp,ereq,econ,eresp,wretr,wredis,status,weight,' + 'act,bck,chkfail,chkdown,lastchg,downtime,qlimit,pid,iid,' + 'sid,throttle,lbtot,tracked,type,rate,rate_lim,rate_max,' + 'check_status,check_code,check_duration,hrsp_1xx,' + 'hrsp_2xx,hrsp_3xx,hrsp_4xx,hrsp_5xx,hrsp_other,hanafail,' + 'req_rate,req_rate_max,req_tot,cli_abrt,srv_abrt,\n' + '8e271901-69ed-403e-a59b-f53cf77ef208,BACKEND,1,2,3,4,0,' + '10,7764,2365,0,0,,0,0,0,0,UP,1,1,0,,0,103780,0,,1,2,0,,0' + ',,1,0,,0,,,,0,0,0,0,0,0,,,,,0,0,\n\n' + 'a557019b-dc07-4688-9af4-f5cf02bb6d4b,' + '32a6c2a3-420a-44c3-955d-86bd2fc6871e,0,0,0,1,,7,1120,' + '224,,0,,0,0,0,0,UP,1,1,0,0,1,2623,303,,1,2,1,,7,,2,0,,' + '1,L7OK,200,98,0,7,0,0,0,0,0,,,,0,0,\n' + 'a557019b-dc07-4688-9af4-f5cf02bb6d4b,' + 'd9aea044-8867-4e80-9875-16fb808fa0f9,0,0,0,2,,12,0,0,,' + '0,,0,0,8,4,DOWN,1,1,0,9,2,308,675,,1,2,2,,4,,2,0,,2,' + 'L4CON,,2999,0,0,0,0,0,0,0,,,,0,0,\n') + raw_stats_empty = ('# pxname,svname,qcur,qmax,scur,smax,slim,stot,bin,' + 'bout,dreq,dresp,ereq,econ,eresp,wretr,wredis,' + 'status,weight,act,bck,chkfail,chkdown,lastchg,' + 'downtime,qlimit,pid,iid,sid,throttle,lbtot,' + 'tracked,type,rate,rate_lim,rate_max,check_status,' + 'check_code,check_duration,hrsp_1xx,hrsp_2xx,' + 'hrsp_3xx,hrsp_4xx,hrsp_5xx,hrsp_other,hanafail,' + 'req_rate,req_rate_max,req_tot,cli_abrt,srv_abrt,' + '\n') + with contextlib.nested( + mock.patch.object(self.driver, '_get_state_file_path'), + mock.patch('socket.socket'), + mock.patch('os.path.exists'), + ) as (gsp, mocket, path_exists): + gsp.side_effect = lambda x, y, z: '/pool/' + y + path_exists.return_value = True + mocket.return_value = mocket + mocket.recv.return_value = raw_stats + + exp_stats = {'connection_errors': '0', + 'active_connections': '3', + 'current_sessions': '3', + 'bytes_in': '7764', + 'max_connections': '4', + 'max_sessions': '4', + 'bytes_out': '2365', + 'response_errors': '0', + 'total_sessions': '10', + 'total_connections': '10', + 'members': { + '32a6c2a3-420a-44c3-955d-86bd2fc6871e': { + 'status': 'ACTIVE', + 'health': 'L7OK', + 'failed_checks': '0' + }, + 'd9aea044-8867-4e80-9875-16fb808fa0f9': { + 'status': 'INACTIVE', + 'health': 'L4CON', + 'failed_checks': '9' + } + } + } + stats = self.driver.get_stats(self.lb.id) + self.assertEqual(exp_stats, stats) + + mocket.recv.return_value = raw_stats_empty + self.assertEqual({'members': {}}, + self.driver.get_stats(self.lb.id)) + + path_exists.return_value = False + mocket.reset_mock() + self.assertEqual({}, self.driver.get_stats(self.lb.id)) + self.assertFalse(mocket.called) + + def test_deploy_instance(self): + self.driver.deployable = mock.Mock(return_value=False) + self.driver.exists = mock.Mock(return_value=True) + self.driver.update = mock.Mock() + self.driver.create = mock.Mock() + + def reset(): + self.driver.deployable.reset_mock() + self.driver.exists.reset_mock() + self.driver.update.reset_mock() + self.driver.create.reset_mock() + + deployed = self.driver.deploy_instance(self.lb) + self.assertFalse(deployed) + self.assertFalse(self.driver.exists.called) + self.assertFalse(self.driver.create.called) + self.assertFalse(self.driver.update.called) + + reset() + self.driver.deployable.return_value = True + deployed = self.driver.deploy_instance(self.lb) + self.assertTrue(deployed) + self.driver.exists.assert_called_once_with(self.lb.id) + self.driver.update.assert_called_once_with(self.lb) + self.assertFalse(self.driver.create.called) + + reset() + self.driver.exists.return_value = False + deployed = self.driver.deploy_instance(self.lb) + self.assertTrue(deployed) + self.driver.exists.assert_called_once_with(self.lb.id) + self.driver.create.assert_called_once_with(self.lb) + self.assertFalse(self.driver.update.called) + + def test_update(self): + self.driver._get_state_file_path = mock.Mock(return_value='/path') + self.driver._spawn = mock.Mock() + with mock.patch('__builtin__.open') as m_open: + file_mock = mock.MagicMock() + m_open.return_value = file_mock + file_mock.__enter__.return_value = file_mock + file_mock.__iter__.return_value = iter(['123']) + self.driver.update(self.lb) + self.driver._spawn.assert_called_once_with(self.lb, + ['-sf', '123']) + + @mock.patch('socket.socket') + @mock.patch('os.path.exists') + @mock.patch('neutron.agent.linux.ip_lib.IPWrapper') + def test_exists(self, ip_wrap, exists, mocket): + socket_path = '/path/haproxy_stats.sock' + mock_ns = ip_wrap.return_value + mock_socket = mocket.return_value + self.driver._get_state_file_path = mock.Mock(return_value=socket_path) + mock_ns.netns.exists.return_value = False + exists.return_value = False + + def reset(): + ip_wrap.reset_mock() + self.driver._get_state_file_path.reset_mock() + mock_ns.reset_mock() + exists.reset_mock() + mocket.reset_mock() + mock_socket.reset_mock() + + ret_exists = self.driver.exists(self.lb.id) + ip_wrap.assert_called_once_with() + self.driver._get_state_file_path.assert_called_once_with( + self.lb.id, 'haproxy_stats.sock', False) + mock_ns.netns.exists.assert_called_once_with( + namespace_driver.get_ns_name(self.lb.id)) + self.assertFalse(exists.called) + self.assertFalse(mocket.called) + self.assertFalse(mock_socket.connect.called) + self.assertFalse(ret_exists) + + reset() + mock_ns.netns.exists.return_value = True + exists.return_value = False + ret_exists = self.driver.exists(self.lb.id) + ip_wrap.assert_called_once_with() + self.driver._get_state_file_path.assert_called_once_with( + self.lb.id, 'haproxy_stats.sock', False) + mock_ns.netns.exists.assert_called_once_with( + namespace_driver.get_ns_name(self.lb.id)) + exists.assert_called_once_with(socket_path) + self.assertFalse(mocket.called) + self.assertFalse(mock_socket.connect.called) + self.assertFalse(ret_exists) + + reset() + mock_ns.netns.exists.return_value = True + exists.return_value = True + ret_exists = self.driver.exists(self.lb.id) + ip_wrap.assert_called_once_with() + self.driver._get_state_file_path.assert_called_once_with( + self.lb.id, 'haproxy_stats.sock', False) + mock_ns.netns.exists.assert_called_once_with( + namespace_driver.get_ns_name(self.lb.id)) + exists.assert_called_once_with(socket_path) + mocket.assert_called_once_with(socket.AF_UNIX, socket.SOCK_STREAM) + mock_socket.connect.assert_called_once_with(socket_path) + self.assertTrue(ret_exists) + + def test_create(self): + self.driver._plug = mock.Mock() + self.driver._spawn = mock.Mock() + self.driver.create(self.lb) + self.driver._plug.assert_called_once_with( + namespace_driver.get_ns_name(self.lb.id), self.lb.vip_port) + self.driver._spawn.assert_called_once_with(self.lb) + + def test_deployable(self): + # test None + ret_val = self.driver.deployable(None) + self.assertFalse(ret_val) + + # test no listeners + ret_val = self.driver.deployable(self.lb) + self.assertFalse(ret_val) + + # test no acceptable listeners + listener = data_models.Listener( + provisioning_status=constants.PENDING_DELETE, + admin_state_up=True) + self.lb.listeners.append(listener) + ret_val = self.driver.deployable(self.lb) + self.assertFalse(ret_val) + listener.provisioning_status = constants.PENDING_CREATE + listener.admin_state_up = False + ret_val = self.driver.deployable(self.lb) + self.assertFalse(ret_val) + + # test bad lb status + listener.admin_state_up = True + self.lb.provisioning_status = constants.PENDING_DELETE + self.lb.admin_state_up = True + ret_val = self.driver.deployable(self.lb) + self.assertFalse(ret_val) + self.lb.provisioning_status = constants.PENDING_UPDATE + self.lb.admin_state_up = False + ret_val = self.driver.deployable(self.lb) + self.assertFalse(ret_val) + + # test everything good + self.lb.admin_state_up = True + ret_val = self.driver.deployable(self.lb) + self.assertTrue(ret_val) + + @mock.patch('neutron.agent.linux.utils.ensure_dir') + def test_get_state_file_path(self, ensure_dir): + path = self.driver._get_state_file_path(self.lb.id, 'conf', + ensure_state_dir=False) + self.assertEqual('/the/path/v2/lb1/conf', path) + self.assertFalse(ensure_dir.called) + path = self.driver._get_state_file_path(self.lb.id, 'conf') + self.assertEqual('/the/path/v2/lb1/conf', path) + self.assertTrue(ensure_dir.called) + + @mock.patch('neutron.agent.linux.ip_lib.device_exists') + @mock.patch('neutron.agent.linux.ip_lib.IPWrapper') + def test_plug(self, ip_wrap, device_exists): + device_exists.return_value = True + interface_name = 'tap-d4nc3' + self.vif_driver.get_device_name.return_value = interface_name + self.assertRaises(exceptions.PreexistingDeviceFailure, + self.driver._plug, + 'ns1', self.lb.vip_port, reuse_existing=False) + device_exists.assert_called_once_with(interface_name, + namespace='ns1') + self.rpc_mock.plug_vip_port.assert_called_once_with( + self.lb.vip_port.id) + + device_exists.reset_mock() + self.rpc_mock.plug_vip_port.reset_mock() + mock_ns = ip_wrap.return_value + self.driver._plug('ns1', self.lb.vip_port) + self.rpc_mock.plug_vip_port.assert_called_once_with( + self.lb.vip_port.id) + device_exists.assert_called_once_with(interface_name, + namespace='ns1') + self.assertFalse(self.vif_driver.plug.called) + expected_cidrs = ['10.0.0.1/24'] + self.vif_driver.init_l3.assert_called_once_with( + interface_name, expected_cidrs, namespace='ns1') + calls = [mock.call(['route', 'add', 'default', 'gw', '192.0.0.1'], + check_exit_code=False), + mock.call(['arping', '-U', '-I', interface_name, + '-c', 3, '10.0.0.1'], + check_exit_code=False)] + mock_ns.netns.execute.has_calls(calls) + self.assertEqual(2, mock_ns.netns.execute.call_count) + + def test_unplug(self): + interface_name = 'tap-d4nc3' + self.vif_driver.get_device_name.return_value = interface_name + self.driver._unplug('ns1', self.lb.vip_port) + self.rpc_mock.unplug_vip_port.assert_called_once_with( + self.lb.vip_port.id) + self.vif_driver.get_device_name.assert_called_once_with( + self.lb.vip_port) + self.vif_driver.unplug.assert_called_once_with(interface_name, + namespace='ns1') + + @mock.patch('neutron.agent.linux.utils.ensure_dir') + @mock.patch('neutron_lbaas.services.loadbalancer.drivers.haproxy.' + 'jinja_cfg.save_config') + @mock.patch('neutron.agent.linux.ip_lib.IPWrapper') + def test_spawn(self, ip_wrap, jinja_save, ensure_dir): + mock_ns = ip_wrap.return_value + self.driver._spawn(self.lb) + conf_dir = self.driver.state_path + '/' + self.lb.id + '/%s' + jinja_save.assert_called_once_with(conf_dir % 'haproxy.conf', + self.lb, + conf_dir % 'haproxy_stats.sock', + 'test_group') + ip_wrap.assert_called_once_with( + namespace=namespace_driver.get_ns_name(self.lb.id)) + mock_ns.netns.execute.assert_called_once_with( + ['haproxy', '-f', conf_dir % 'haproxy.conf', '-p', + conf_dir % 'haproxy.pid']) + self.assertIn(self.lb.id, self.driver.deployed_loadbalancers) + self.assertEqual(self.lb, + self.driver.deployed_loadbalancers[self.lb.id]) + + +class BaseTestManager(base.BaseTestCase): + + def setUp(self): + super(BaseTestManager, self).setUp() + self.driver = mock.Mock() + self.lb_manager = namespace_driver.LoadBalancerManager(self.driver) + self.listener_manager = namespace_driver.ListenerManager(self.driver) + self.pool_manager = namespace_driver.PoolManager(self.driver) + self.member_manager = namespace_driver.MemberManager(self.driver) + self.hm_manager = namespace_driver.HealthMonitorManager(self.driver) + self.refresh = self.driver.loadbalancer.refresh + + +class BaseTestLoadBalancerManager(BaseTestManager): + + def setUp(self): + super(BaseTestLoadBalancerManager, self).setUp() + self.in_lb = data_models.LoadBalancer(id='lb1', listeners=[]) + + +class TestLoadBalancerManager(BaseTestLoadBalancerManager): + + @mock.patch.object(data_models.LoadBalancer, 'from_dict') + def test_refresh(self, lb_from_dict): + rpc_return = {'id': self.in_lb.id} + self.driver.plugin_rpc.get_loadbalancer.return_value = rpc_return + from_dict_return = data_models.LoadBalancer(id=self.in_lb.id) + lb_from_dict.return_value = from_dict_return + self.driver.deploy_instance.return_value = True + self.driver.exists.return_value = True + self.lb_manager.refresh(self.in_lb) + self.driver.plugin_rpc.get_loadbalancer.assert_called_once_with( + self.in_lb.id) + lb_from_dict.assert_called_once_with(rpc_return) + self.driver.deploy_instance.assert_called_once_with(from_dict_return) + self.assertFalse(self.driver.exists.called) + self.assertFalse(self.driver.undeploy_instance.called) + + self.driver.reset_mock() + lb_from_dict.reset_mock() + self.driver.deploy_instance.return_value = False + self.driver.exists.return_value = False + self.lb_manager.refresh(self.in_lb) + self.driver.plugin_rpc.get_loadbalancer.assert_called_once_with( + self.in_lb.id) + lb_from_dict.assert_called_once_with(rpc_return) + self.driver.deploy_instance.assert_called_once_with(from_dict_return) + self.driver.exists.assert_called_once_with(self.in_lb.id) + self.assertFalse(self.driver.undeploy_instance.called) + + self.driver.reset_mock() + lb_from_dict.reset_mock() + self.driver.deploy_instance.return_value = False + self.driver.exists.return_value = True + self.lb_manager.refresh(self.in_lb) + self.driver.plugin_rpc.get_loadbalancer.assert_called_once_with( + self.in_lb.id) + lb_from_dict.assert_called_once_with(rpc_return) + self.driver.deploy_instance.assert_called_once_with(from_dict_return) + self.driver.exists.assert_called_once_with(from_dict_return.id) + self.driver.undeploy_instance.assert_called_once_with(self.in_lb.id) + + def test_delete(self): + self.driver.exists.return_value = False + self.lb_manager.delete(self.in_lb) + self.driver.exists.assert_called_once_with(self.in_lb.id) + self.assertFalse(self.driver.undeploy_instance.called) + + self.driver.reset_mock() + self.driver.exists.return_value = True + self.lb_manager.delete(self.in_lb) + self.driver.exists.assert_called_once_with(self.in_lb.id) + self.driver.undeploy_instance.assert_called_once_with(self.in_lb.id) + + def test_create(self): + self.lb_manager.refresh = mock.Mock() + self.lb_manager.create(self.in_lb) + self.assertFalse(self.lb_manager.refresh.called) + + self.lb_manager.refresh.reset_mock() + self.in_lb.listeners.append(data_models.Listener(id='listener1')) + self.lb_manager.create(self.in_lb) + self.lb_manager.refresh.assert_called_once_with(self.in_lb) + + def test_get_stats(self): + self.lb_manager.get_stats(self.in_lb.id) + self.driver.get_stats.assert_called_once_with(self.in_lb.id) + + def test_update(self): + old_lb = data_models.LoadBalancer(id='lb0') + self.lb_manager.refresh = mock.Mock() + self.lb_manager.update(old_lb, self.in_lb) + self.lb_manager.refresh.assert_called_once_with(self.in_lb) + + +class BaseTestListenerManager(BaseTestLoadBalancerManager): + + def setUp(self): + super(BaseTestListenerManager, self).setUp() + self.in_listener = data_models.Listener(id='listener1') + self.listener2 = data_models.Listener(id='listener2') + self.in_listener.loadbalancer = self.in_lb + self.listener2.loadbalancer = self.in_lb + self.refresh = self.driver.loadbalancer.refresh + + +class TestListenerManager(BaseTestListenerManager): + + def setUp(self): + super(TestListenerManager, self).setUp() + self.in_listener = data_models.Listener(id='listener1') + self.listener2 = data_models.Listener(id='listener2') + self.in_lb.listeners = [self.in_listener, self.listener2] + self.in_listener.loadbalancer = self.in_lb + self.listener2.loadbalancer = self.in_lb + + def test_remove_listener(self): + self.listener_manager._remove_listener(self.in_lb, self.in_listener.id) + self.assertEqual(1, len(self.in_lb.listeners)) + self.assertEqual(self.listener2.id, self.in_lb.listeners[0].id) + + def test_update(self): + old_listener = data_models.Listener(id='listener1', name='bleh') + self.listener_manager.update(old_listener, self.in_listener) + self.refresh.assert_called_once_with(self.in_lb) + + def test_create(self): + self.listener_manager.create(self.in_listener) + self.refresh.assert_called_once_with(self.in_lb) + + def test_delete(self): + self.listener_manager.delete(self.in_listener) + self.refresh.assert_called_once_with(self.in_lb) + self.assertFalse(self.driver.undeploy_instance.called) + + self.refresh.reset_mock() + self.driver.reset_mock() + self.listener_manager.delete(self.listener2) + self.assertFalse(self.refresh.called) + self.driver.undeploy_instance.assert_called_once_with(self.in_lb.id) + + +class BaseTestPoolManager(BaseTestListenerManager): + + def setUp(self): + super(BaseTestPoolManager, self).setUp() + self.in_pool = data_models.Pool(id='pool1') + self.in_listener.default_pool = self.in_pool + self.in_pool.listener = self.in_listener + + +class TestPoolManager(BaseTestPoolManager): + + def test_update(self): + old_pool = data_models.Pool(id=self.in_pool.id, name='bleh') + self.pool_manager.update(old_pool, self.in_pool) + self.refresh.assert_called_once_with(self.in_lb) + + def test_create(self): + self.pool_manager.create(self.in_pool) + self.refresh.assert_called_once_with(self.in_lb) + + def test_delete(self): + self.pool_manager.delete(self.in_pool) + self.assertIsNone(self.in_listener.default_pool) + self.refresh.assert_called_once_with(self.in_lb) + + +class BaseTestMemberManager(BaseTestPoolManager): + + def setUp(self): + super(BaseTestMemberManager, self).setUp() + self.in_member = data_models.Member(id='member1') + self.member2 = data_models.Member(id='member2') + self.in_pool.members = [self.in_member, self.member2] + self.in_member.pool = self.in_pool + self.member2.pool = self.in_pool + + +class TestMemberManager(BaseTestMemberManager): + + def test_remove_member(self): + self.member_manager._remove_member(self.in_pool, self.in_member.id) + self.assertEqual(1, len(self.in_pool.members)) + self.assertEqual(self.member2.id, self.in_pool.members[0].id) + + def test_update(self): + old_member = data_models.Member(id=self.in_member.id, + address='0.0.0.0') + self.member_manager.update(old_member, self.in_member) + self.refresh.assert_called_once_with(self.in_lb) + + def test_create(self): + self.member_manager.create(self.in_member) + self.refresh.assert_called_once_with(self.in_lb) + + def test_delete(self): + self.member_manager.delete(self.in_member) + self.refresh.assert_called_once_with(self.in_lb) + + +class BaseTestHealthMonitorManager(BaseTestPoolManager): + + def setUp(self): + super(BaseTestHealthMonitorManager, self).setUp() + self.in_hm = data_models.HealthMonitor(id='hm1') + self.in_pool.healthmonitor = self.in_hm + self.in_hm.pool = self.in_pool + + +class TestHealthMonitorManager(BaseTestHealthMonitorManager): + + def test_update(self): + old_hm = data_models.HealthMonitor(id=self.in_hm.id, timeout=2) + self.hm_manager.update(old_hm, self.in_hm) + self.refresh.assert_called_once_with(self.in_lb) + + def test_create(self): + self.hm_manager.create(self.in_hm) + self.refresh.assert_called_once_with(self.in_lb) + + def test_delete(self): + self.hm_manager.delete(self.in_hm) + self.assertIsNone(self.in_pool.healthmonitor) + self.refresh.assert_called_once_with(self.in_lb) + + +class TestNamespaceDriverModule(base.BaseTestCase): + + @mock.patch('os.path.exists') + @mock.patch('neutron.agent.linux.utils.execute') + def test_kill_pids_in_file(self, execute, exists): + pid_path = '/var/lib/data' + with mock.patch('__builtin__.open') as m_open: + exists.return_value = False + file_mock = mock.MagicMock() + m_open.return_value = file_mock + file_mock.__enter__.return_value = file_mock + file_mock.__iter__.return_value = iter(['123']) + namespace_driver.kill_pids_in_file(pid_path) + # sometimes fails + # exists.assert_called_once_with(pid_path) + self.assertFalse(m_open.called) + self.assertFalse(execute.called) + + exists.return_value = True + execute.side_effect = RuntimeError + namespace_driver.kill_pids_in_file(pid_path) + # sometimes fails + # execute.assert_called_once_with(['kill', '-9', '123']) + + def test_get_ns_name(self): + ns_name = namespace_driver.get_ns_name('woohoo') + self.assertEqual(namespace_driver.NS_PREFIX + 'woohoo', ns_name)