Decouple amp driver from network using new models

The amphora driver's post_vip_plug method required network specific information
about the amphora to complete implementations.  It was not accepting anything
other than a loadbalancer object.  Now it takes a dictionary of
AmphoraNetworkConfig objects that is keyed of amphora id.  Each instance
contains network specific information required to set up the correct routing
on the amphora.

This patch sets up the routing correctly to solve the vip blackholing issue but
only for the ssh_driver.  The argument has only been added to the post_vip_plug
method of the rest_api_driver but will need to be updated to handle this new
information and to also fix the vip blackholing problems.

Change-Id: I17ce89b6c050a2a36e0a802920e2dedb063f615b
Closes-Bug: #1453951
This commit is contained in:
Brandon Logan 2015-07-22 21:31:23 -05:00
parent 60b35f64ff
commit c2bdb48419
14 changed files with 228 additions and 65 deletions

View File

@ -19,6 +19,7 @@ import six
@six.add_metaclass(abc.ABCMeta)
class AmphoraLoadBalancerDriver(object):
@abc.abstractmethod
def update(self, listener, vip):
"""Update the amphora with a new configuration
@ -134,12 +135,16 @@ class AmphoraLoadBalancerDriver(object):
"""
pass
def post_vip_plug(self, load_balancer):
def post_vip_plug(self, load_balancer, amphorae_network_config):
"""Called after network driver has allocated and plugged the VIP
:param load_balancer: A load balancer that just had its vip allocated
and plugged in the network driver.
:type load_balancer: octavia.common.data_models.LoadBalancer
:param amphorae_network_config: A data model containing information
about the subnets and ports that an
amphorae owns.
:type vip_network: octavia.network.data_models.AmphoraNetworkConfig
:returns: None
"""
pass

View File

@ -99,7 +99,7 @@ class HaproxyAmphoraLoadBalancerDriver(driver_base.AmphoraLoadBalancerDriver):
def finalize_amphora(self, amphora):
pass
def post_vip_plug(self, load_balancer):
def post_vip_plug(self, load_balancer, amphorae_network_config):
for amp in load_balancer.amphorae:
self.client.plug_vip(amp, load_balancer.vip.ip_address)

View File

@ -29,6 +29,22 @@ from octavia.common.tls_utils import cert_parser
from octavia.i18n import _LW
LOG = logging.getLogger(__name__)
NEUTRON_VERSION = '2.0'
VIP_ROUTE_TABLE = 'vip'
# ip and route commands
CMD_DHCLIENT = "dhclient {0}"
CMD_ADD_IP_ADDR = "ip addr add {0}/24 dev {1}"
CMD_SHOW_IP_ADDR = "ip addr show {0}"
CMD_GREP_DOWN_LINKS = "ip link | grep DOWN -m 1 | awk '{print $2}'"
CMD_CREATE_VIP_ROUTE_TABLE = (
"su -c 'echo \"1 {0}\" >> /etc/iproute2/rt_tables'"
)
CMD_ADD_ROUTE_TO_TABLE = "ip route add {0} dev {1} table {2}"
CMD_ADD_DEFAULT_ROUTE_TO_TABLE = ("ip route add default via {0} "
"dev {1} table {2}")
CMD_ADD_RULE_FROM_NET_TO_TABLE = "ip rule add from {0} table {1}"
CMD_ADD_RULE_TO_NET_TO_TABLE = "ip rule add to {0} table {1}"
class HaproxyManager(driver_base.AmphoraLoadBalancerDriver):
@ -128,7 +144,36 @@ class HaproxyManager(driver_base.AmphoraLoadBalancerDriver):
self.__class__.__name__, amphora.id)
self.amphoraconfig[amphora.id] = (amphora.id, 'finalize amphora')
def post_vip_plug(self, load_balancer):
def _configure_amp_routes(self, vip_iface, amp_net_config):
subnet = amp_net_config.vip_subnet
command = CMD_CREATE_VIP_ROUTE_TABLE.format(VIP_ROUTE_TABLE)
self._execute_command(command, run_as_root=True)
command = CMD_ADD_ROUTE_TO_TABLE.format(
subnet.cidr, vip_iface, VIP_ROUTE_TABLE)
self._execute_command(command, run_as_root=True)
command = CMD_ADD_DEFAULT_ROUTE_TO_TABLE.format(
subnet.gateway_ip, vip_iface, VIP_ROUTE_TABLE)
self._execute_command(command, run_as_root=True)
command = CMD_ADD_RULE_FROM_NET_TO_TABLE.format(
subnet.cidr, VIP_ROUTE_TABLE)
self._execute_command(command, run_as_root=True)
command = CMD_ADD_RULE_TO_NET_TO_TABLE.format(
subnet.cidr, VIP_ROUTE_TABLE)
self._execute_command(command, run_as_root=True)
def _configure_amp_interface(self, iface, secondary_ip=None):
# just grab the ip from dhcp
command = CMD_DHCLIENT.format(iface)
self._execute_command(command, run_as_root=True)
if secondary_ip:
# add secondary_ip
command = CMD_ADD_IP_ADDR.format(secondary_ip, iface)
self._execute_command(command, run_as_root=True)
# log interface details
command = CMD_SHOW_IP_ADDR.format(iface)
self._execute_command(command)
def post_vip_plug(self, load_balancer, amphorae_network_config):
LOG.debug("Add vip to interface for all amphora on %s",
load_balancer.id)
@ -136,55 +181,25 @@ class HaproxyManager(driver_base.AmphoraLoadBalancerDriver):
# Connect to amphora
self._connect(hostname=amp.lb_network_ip)
stdout, _ = self._execute_command(
"ip link | grep DOWN -m 1 | awk '{print $2}'")
stdout, _ = self._execute_command(CMD_GREP_DOWN_LINKS)
iface = stdout[:-2]
if not iface:
self.client.close()
continue
vip = load_balancer.vip.ip_address
sections = vip.split('.')[:3]
sections.append('255')
broadcast = '.'.join(sections)
command = ("sh -c 'echo \"\nauto {0} {0}:0\n"
"iface {0} inet dhcp\n\niface {0}:0 inet static\n"
"address {1}\nbroadcast {2}\nnetmask {3}\" "
">> /etc/network/interfaces'".format(
iface, vip, broadcast, '255.255.255.0'))
self._execute_command(command, run_as_root=True)
# sanity ifdown for interface
command = "ifdown {0}".format(iface)
self._execute_command(command, run_as_root=True)
# sanity ifdown for static ip
command = "ifdown {0}:0".format(iface)
self._execute_command(command, run_as_root=True)
# ifup for interface
command = "ifup {0}".format(iface)
self._execute_command(command, run_as_root=True)
# ifup for static ip
command = "ifup {0}:0".format(iface)
self._execute_command(command, run_as_root=True)
self._configure_amp_interface(
iface, secondary_ip=load_balancer.vip.ip_address)
self._configure_amp_routes(
iface, amphorae_network_config.get(amp.id))
self.client.close()
def post_network_plug(self, amphora):
self._connect(hostname=amphora.lb_network_ip)
stdout, _ = self._execute_command(
"ip link | grep DOWN -m 1 | awk '{print $2}'")
stdout, _ = self._execute_command(CMD_GREP_DOWN_LINKS)
iface = stdout[:-2]
if not iface:
self.client.close()
return
# make interface come up on boot
command = ("sh -c 'echo \"\nauto {0}\niface {0} inet dhcp\" "
">> /etc/network/interfaces'".format(iface))
self._execute_command(command, run_as_root=True)
# ifdown for sanity
command = "ifdown {0}".format(iface)
self._execute_command(command, run_as_root=True)
# ifup to bring it up
command = "ifup {0}".format(iface)
self._execute_command(command, run_as_root=True)
self._configure_amp_interface(iface)
self.client.close()
def _execute_command(self, command, run_as_root=False):

View File

@ -85,6 +85,12 @@ class NoopManager(object):
self.__class__.__name__, amphora.id)
self.amphoraconfig[amphora.id] = (amphora.id, 'post_network_plug')
def post_vip_plug(self, load_balancer, amphorae_network_config):
LOG.debug("Amphora %s no-op, post vip plug load balancer %s",
self.__class__.__name__, load_balancer.id)
self.amphoraconfig[(load_balancer.id, id(amphorae_network_config))] = (
load_balancer.id, amphorae_network_config, 'post_vip_plug')
class NoopAmphoraLoadBalancerDriver(driver_base.AmphoraLoadBalancerDriver):
def __init__(self):
@ -122,3 +128,7 @@ class NoopAmphoraLoadBalancerDriver(driver_base.AmphoraLoadBalancerDriver):
def post_network_plug(self, amphora):
self.driver.post_network_plug(amphora)
def post_vip_plug(self, load_balancer, amphorae_network_config):
self.driver.post_vip_plug(load_balancer, amphorae_network_config)

View File

@ -95,6 +95,8 @@ POOL = 'pool'
POOL_ID = 'pool_id'
OBJECT = 'object'
SERVER_PEM = 'server_pem'
VIP_NETWORK = 'vip_network'
AMPHORAE_NETWORK_CONFIG = 'amphorae_network_config'
CREATE_AMPHORA_FLOW = 'octavia-create-amphora-flow'
CREATE_AMPHORA_FOR_LB_FLOW = 'octavia-create-amp-for-lb-flow'

View File

@ -103,8 +103,12 @@ class LoadBalancerFlows(object):
name=constants.RELOAD_LB_AFTER_PLUG_VIP,
requires=constants.LOADBALANCER_ID,
provides=constants.LOADBALANCER))
new_LB_net_subflow.add(network_tasks.GetAmphoraeNetworkConfigs(
requires=constants.LOADBALANCER,
provides=constants.AMPHORAE_NETWORK_CONFIG))
new_LB_net_subflow.add(amphora_driver_tasks.AmphoraPostVIPPlug(
requires=constants.LOADBALANCER))
requires=(constants.LOADBALANCER,
constants.AMPHORAE_NETWORK_CONFIG)))
return new_LB_net_subflow

View File

@ -40,7 +40,6 @@ class BaseAmphoraTask(task.Task):
name=CONF.controller_worker.amphora_driver,
invoke_on_load=True
).driver
self.amphora_repo = repo.AmphoraRepository()
self.listener_repo = repo.ListenerRepository()
self.loadbalancer_repo = repo.LoadBalancerRepository()
@ -185,9 +184,10 @@ class AmphoraePostNetworkPlug(BaseAmphoraTask):
class AmphoraPostVIPPlug(BaseAmphoraTask):
"""Task to notify the amphora post VIP plug."""
def execute(self, loadbalancer):
def execute(self, loadbalancer, amphorae_network_config):
"""Execute post_vip_routine."""
self.amphora_driver.post_vip_plug(loadbalancer)
self.amphora_driver.post_vip_plug(
loadbalancer, amphorae_network_config)
LOG.debug("Notfied amphora of vip plug")
def revert(self, result, loadbalancer, *args, **kwargs):

View File

@ -109,7 +109,7 @@ class HaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
pass
def test_post_vip_plug(self):
self.driver.post_vip_plug(self.lb)
self.driver.post_vip_plug(self.lb, mock.Mock())
self.driver.client.plug_vip.assert_called_once_with(
self.amp, self.lb.vip.ip_address)

View File

@ -23,6 +23,7 @@ from octavia.certificates.manager import barbican
from octavia.common import data_models
from octavia.common.tls_utils import cert_parser
from octavia.db import models as models
from octavia.network import data_models as network_models
from octavia.tests.unit import base
from octavia.tests.unit.common.sample_configs import sample_configs
@ -33,6 +34,14 @@ else:
LOG = log.getLogger(__name__)
MOCK_NETWORK_ID = '1'
MOCK_SUBNET_ID = '2'
MOCK_PORT_ID = '3'
MOCK_COMPUTE_ID = '4'
MOCK_AMP_ID = '5'
MOCK_IP_ADDRESS = '10.0.0.1'
MOCK_CIDR = '10.0.0.0/24'
class TestSshDriver(base.TestCase):
FAKE_UUID_1 = uuidutils.generate_uuid()
@ -232,3 +241,92 @@ class TestSshDriver(base.TestCase):
certificate='imacert', private_key='imakey',
intermediates=['imainter', 'imainter2'])
self.assertEqual(expected, self.driver._build_pem(tls_tupe))
@mock.patch.object(ssh_driver.HaproxyManager, '_execute_command')
def test_post_vip_plug_no_down_links(self, exec_command):
amps = [data_models.Amphora(id=MOCK_AMP_ID, compute_id=MOCK_COMPUTE_ID,
lb_network_ip=MOCK_IP_ADDRESS)]
vip = data_models.Vip(ip_address=MOCK_IP_ADDRESS)
lb = data_models.LoadBalancer(amphorae=amps, vip=vip)
vip_network = network_models.Network(id=MOCK_NETWORK_ID)
exec_command.return_value = ('', '')
self.driver.post_vip_plug(lb, vip_network)
exec_command.assert_called_once_with(ssh_driver.CMD_GREP_DOWN_LINKS)
@mock.patch.object(ssh_driver.HaproxyManager, '_execute_command')
def test_post_vip_plug(self, exec_command):
amps = [data_models.Amphora(id=MOCK_AMP_ID, compute_id=MOCK_COMPUTE_ID,
lb_network_ip=MOCK_IP_ADDRESS)]
vip = data_models.Vip(ip_address=MOCK_IP_ADDRESS)
lb = data_models.LoadBalancer(amphorae=amps, vip=vip)
vip_subnet = network_models.Subnet(id=MOCK_SUBNET_ID,
gateway_ip=MOCK_IP_ADDRESS,
cidr=MOCK_CIDR)
vip_port = network_models.Port(id=MOCK_PORT_ID,
device_id=MOCK_COMPUTE_ID)
amphorae_net_config = {amps[0].id: network_models.AmphoraNetworkConfig(
amphora=amps[0],
vip_subnet=vip_subnet,
vip_port=vip_port
)}
iface = 'eth1'
exec_command.return_value = ('{0}: '.format(iface), '')
self.driver.post_vip_plug(lb, amphorae_net_config)
grep_call = mock.call(ssh_driver.CMD_GREP_DOWN_LINKS)
dhclient_call = mock.call(ssh_driver.CMD_DHCLIENT.format(iface),
run_as_root=True)
add_ip_call = mock.call(ssh_driver.CMD_ADD_IP_ADDR.format(
MOCK_IP_ADDRESS, iface), run_as_root=True)
show_ip_call = mock.call(ssh_driver.CMD_SHOW_IP_ADDR.format(iface))
create_vip_table_call = mock.call(
ssh_driver.CMD_CREATE_VIP_ROUTE_TABLE.format(
ssh_driver.VIP_ROUTE_TABLE),
run_as_root=True
)
add_route_call = mock.call(
ssh_driver.CMD_ADD_ROUTE_TO_TABLE.format(
MOCK_CIDR, iface, ssh_driver.VIP_ROUTE_TABLE),
run_as_root=True
)
add_default_route_call = mock.call(
ssh_driver.CMD_ADD_DEFAULT_ROUTE_TO_TABLE.format(
MOCK_IP_ADDRESS, iface, ssh_driver.VIP_ROUTE_TABLE),
run_as_root=True
)
add_rule_from_call = mock.call(
ssh_driver.CMD_ADD_RULE_FROM_NET_TO_TABLE.format(
MOCK_CIDR, ssh_driver.VIP_ROUTE_TABLE),
run_as_root=True
)
add_rule_to_call = mock.call(
ssh_driver.CMD_ADD_RULE_TO_NET_TO_TABLE.format(
MOCK_CIDR, ssh_driver.VIP_ROUTE_TABLE),
run_as_root=True
)
exec_command.assert_has_calls([grep_call, dhclient_call, add_ip_call,
show_ip_call, create_vip_table_call,
add_route_call, add_default_route_call,
add_rule_from_call, add_rule_to_call])
self.assertEqual(9, exec_command.call_count)
@mock.patch.object(ssh_driver.HaproxyManager, '_execute_command')
def test_post_network_plug_no_down_links(self, exec_command):
amp = data_models.Amphora(id=MOCK_AMP_ID, compute_id=MOCK_COMPUTE_ID,
lb_network_ip=MOCK_IP_ADDRESS)
exec_command.return_value = ('', '')
self.driver.post_network_plug(amp)
exec_command.assert_called_once_with(ssh_driver.CMD_GREP_DOWN_LINKS)
@mock.patch.object(ssh_driver.HaproxyManager, '_execute_command')
def test_post_network_plug(self, exec_command):
amp = data_models.Amphora(id=MOCK_AMP_ID, compute_id=MOCK_COMPUTE_ID,
lb_network_ip=MOCK_IP_ADDRESS)
iface = 'eth1'
exec_command.return_value = ('{0}: '.format(iface), '')
self.driver.post_network_plug(amp)
grep_call = mock.call(ssh_driver.CMD_GREP_DOWN_LINKS)
dhclient_call = mock.call(ssh_driver.CMD_DHCLIENT.format(iface),
run_as_root=True)
show_ip_call = mock.call(ssh_driver.CMD_SHOW_IP_ADDR.format(iface))
exec_command.assert_has_calls([grep_call, dhclient_call, show_ip_call])
self.assertEqual(3, exec_command.call_count)

View File

@ -16,7 +16,8 @@ from oslo_log import log as logging
from oslo_utils import uuidutils
from octavia.amphorae.drivers.noop_driver import driver as driver
from octavia.db import models as models
from octavia.common import data_models
from octavia.network import data_models as network_models
from octavia.tests.unit import base as base
@ -38,18 +39,28 @@ class LoggingMixIn(base.TestCase):
self.assertEqual('test update health', self.mixin.health)
class NoopAmphoraLoadBalancerDriver(base.TestCase):
class TestNoopAmphoraLoadBalancerDriver(base.TestCase):
FAKE_UUID_1 = uuidutils.generate_uuid()
def setUp(self):
super(NoopAmphoraLoadBalancerDriver, self).setUp()
super(TestNoopAmphoraLoadBalancerDriver, self).setUp()
self.driver = driver.NoopAmphoraLoadBalancerDriver()
self.listener = models.Listener()
self.listener = data_models.Listener()
self.listener.protocol_port = 80
self.vip = models.Vip()
self.vip = data_models.Vip()
self.vip.ip_address = "10.0.0.1"
self.amphora = models.Amphora()
self.amphora = data_models.Amphora()
self.amphora.id = self.FAKE_UUID_1
self.load_balancer = data_models.LoadBalancer(
id=FAKE_UUID_1, amphorae=[self.amphora], vip=self.vip,
listeners=[self.listener])
self.network = network_models.Network(id=self.FAKE_UUID_1)
self.amphorae_net_configs = {
self.amphora.id:
network_models.AmphoraNetworkConfig(
amphora=self.amphora,
vip_subnet=network_models.Subnet(id=self.FAKE_UUID_1))
}
def test_update(self):
self.driver.update(self.listener, self.vip)
@ -102,3 +113,14 @@ class NoopAmphoraLoadBalancerDriver(base.TestCase):
self.assertEqual((self.amphora.id, 'post_network_plug'),
self.driver.driver.amphoraconfig[
self.amphora.id])
def test_post_vip_plug(self):
self.driver.post_vip_plug(self.load_balancer,
self.amphorae_net_configs)
expected_method_and_args = (self.load_balancer.id,
self.amphorae_net_configs,
'post_vip_plug')
actual_method_and_args = self.driver.driver.amphoraconfig[(
self.load_balancer.id, id(self.amphorae_net_configs)
)]
self.assertEqual(expected_method_and_args, actual_method_and_args)

View File

@ -79,7 +79,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.COMPUTE_ID, amp_flow.provides)
self.assertIn(constants.COMPUTE_OBJ, amp_flow.provides)
self.assertEqual(len(amp_flow.provides), 7)
self.assertEqual(len(amp_flow.provides), 8)
self.assertEqual(len(amp_flow.requires), 1)
def test_get_cert_create_amphora_for_lb_flow(self):
@ -99,7 +99,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.COMPUTE_ID, amp_flow.provides)
self.assertIn(constants.COMPUTE_OBJ, amp_flow.provides)
self.assertEqual(len(amp_flow.provides), 8)
self.assertEqual(len(amp_flow.provides), 9)
self.assertEqual(len(amp_flow.requires), 1)
def test_get_delete_amphora_flow(self):

View File

@ -41,7 +41,7 @@ class TestLoadBalancerFlows(base.TestCase):
self.assertIn(constants.LOADBALANCER_ID, lb_flow.requires)
self.assertEqual(len(lb_flow.provides), 5)
self.assertEqual(len(lb_flow.provides), 6)
self.assertEqual(len(lb_flow.requires), 1)
def test_get_delete_load_balancer_flow(self):
@ -68,7 +68,7 @@ class TestLoadBalancerFlows(base.TestCase):
self.assertIn(constants.LOADBALANCER, lb_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, lb_flow.requires)
self.assertEqual(len(lb_flow.provides), 3)
self.assertEqual(len(lb_flow.provides), 4)
self.assertEqual(len(lb_flow.requires), 2)
def test_get_update_load_balancer_flow(self):

View File

@ -27,6 +27,7 @@ else:
import unittest.mock as mock
AMP_ID = uuidutils.generate_uuid()
COMPUTE_ID = uuidutils.generate_uuid()
LISTENER_ID = uuidutils.generate_uuid()
LB_ID = uuidutils.generate_uuid()
@ -37,6 +38,7 @@ _listener_mock.id = LISTENER_ID
_vip_mock = mock.MagicMock()
_LB_mock = mock.MagicMock()
_amphorae_mock = [_amphora_mock]
_network_mock = mock.MagicMock()
@mock.patch('octavia.db.repositories.AmphoraRepository.update')
@ -45,13 +47,13 @@ _amphorae_mock = [_amphora_mock]
@mock.patch('octavia.controller.worker.tasks.amphora_driver_tasks.LOG')
@mock.patch('oslo_utils.uuidutils.generate_uuid', return_value=AMP_ID)
@mock.patch('stevedore.driver.DriverManager.driver')
class TestDatabaseTasks(base.TestCase):
class TestAmphoraDriverTasks(base.TestCase):
def setUp(self):
_LB_mock.amphorae = _amphora_mock
_LB_mock.id = LB_ID
super(TestDatabaseTasks, self).setUp()
super(TestAmphoraDriverTasks, self).setUp()
def test_listener_update(self,
mock_driver,
@ -222,14 +224,16 @@ class TestDatabaseTasks(base.TestCase):
mock_get_session,
mock_listener_repo_update,
mock_amphora_repo_update):
mock_driver.get_network.return_value = _network_mock
_amphora_mock.id = AMP_ID
_amphora_mock.compute_id = COMPUTE_ID
_LB_mock.amphorae = [_amphora_mock]
amphora_post_network_plug_obj = (amphora_driver_tasks.
AmphoraePostNetworkPlug())
amphora_post_network_plug_obj.execute(_LB_mock)
(mock_driver.post_network_plug.
assert_called_once_with)(_amphora_mock)
assert_called_once_with(_amphora_mock))
# Test revert
amp = amphora_post_network_plug_obj.revert(None, _LB_mock)
@ -250,12 +254,12 @@ class TestDatabaseTasks(base.TestCase):
mock_listener_repo_update,
mock_amphora_repo_update):
amphora_post_vip_plug_obj = (amphora_driver_tasks.
AmphoraPostVIPPlug())
amphora_post_vip_plug_obj.execute(_LB_mock)
amphorae_net_config_mock = mock.Mock()
amphora_post_vip_plug_obj = amphora_driver_tasks.AmphoraPostVIPPlug()
amphora_post_vip_plug_obj.execute(_LB_mock, amphorae_net_config_mock)
(mock_driver.post_vip_plug.
assert_called_once_with)(_LB_mock)
mock_driver.post_vip_plug.assert_called_once_with(
_LB_mock, amphorae_net_config_mock)
# Test revert
amp = amphora_post_vip_plug_obj.revert(None, _LB_mock)

View File

@ -89,11 +89,14 @@ Establish a base class to model the desire functionality:
for that network on that instance.
"""
def post_vip_plug(self, load_balancer):
def post_vip_plug(self, load_balancer, amphorae_network_config):
"""OPTIONAL - called after plug_vip method of the network driver.
This is to do any additional work needed on the amphorae to plug
the vip, such as bring up interfaces.
amphorae_network_config is a dictionary of objects that include
network specific information about each amphora's connections.
"""
def start_health_check(self, health_mixin):