Updates for calculate delta task

Member has subnet_id and we need to compare with network_id.
In order to do this CalculateDelta task now takes in a load balancer
loops through the amphora, collects the plugged networks
and gathers the network_ids from member subnet_ids and
builds the list accordingly.

Updates network_tasks CalculateDelta
Updates network_tasks adding method to handle deltas
Updates driver adding method to get network
Updates tests, spec and other required code

Change-Id: I698625e4e2e7352cb3000e22de808c33fa7636ed
This commit is contained in:
ptoohill1 2015-05-08 21:56:19 -05:00
parent 88049e87ae
commit a3f53841f8
8 changed files with 352 additions and 57 deletions

View File

@ -81,6 +81,7 @@ SUPPORTED_AMPHORA_TYPES = (AMPHORA_VM,)
AMPHORA = 'amphora'
AMPHORA_ID = 'amphora_id'
DELTA = 'delta'
DELTAS = 'deltas'
LISTENER = 'listener'
LOADBALANCER = 'loadbalancer'
LOADBALANCER_ID = 'loadbalancer_id'

View File

@ -51,9 +51,9 @@ class CalculateDelta(BaseNetworkTask):
plumbing them.
"""
default_provides = constants.DELTA
default_provides = constants.DELTAS
def execute(self, amphora, nics):
def execute(self, loadbalancer):
"""Compute which NICs need to be plugged
for the amphora to become operational.
@ -63,37 +63,50 @@ class CalculateDelta(BaseNetworkTask):
:returns the delta
"""
LOG.debug("Calculating network delta for amphora id: %s" % amphora.id)
deltas = []
for amphora in loadbalancer.amphorae:
# Figure out what networks we want
# seed with lb network(s)
desired_network_ids = set(CONF.controller_worker.amp_network)
if (not amphora.load_balancer) or (
not amphora.load_balancer.listeners):
return {'add': [], 'delete': []}
LOG.debug("Calculating network delta for amphora id: %s"
% amphora.id)
for listener in amphora.load_balancer.listeners:
if (not listener.default_pool) or (
not listener.default_pool.members):
continue
desired_network_ids.update(list(
member.subnet_id for member in listener.default_pool.members))
# Figure out what networks we want
# seed with lb network(s)
desired_network_ids = {CONF.controller_worker.amp_network,
loadbalancer.vip.network_id}
# assume we don't have two nics in the same network
actual_network_nics = dict((nic.network_id, nic) for nic in nics)
if not loadbalancer.listeners:
return []
del_ids = set(actual_network_nics) - desired_network_ids
delete_nics = list(
actual_network_nics[net_id] for net_id in del_ids)
for listener in loadbalancer.listeners:
if (not listener.default_pool) or (
not listener.default_pool.members):
continue
desired_network_ids.update(
list(
self.network_driver.get_network(
subnet_id=member.subnet_id).id
for member in listener.default_pool.members
if member.subnet_id
)
)
add_ids = desired_network_ids - set(actual_network_nics)
add_nics = list(
data_models.Interface(network_id=net_id) for net_id in add_ids)
nics = self.network_driver.get_plugged_networks(amphora.compute_id)
# assume we don't have two nics in the same network
actual_network_nics = dict((nic.network_id, nic) for nic in nics)
return {
'delete': delete_nics,
'add': add_nics
}
del_ids = set(actual_network_nics) - desired_network_ids
delete_nics = list(
actual_network_nics[net_id] for net_id in del_ids)
add_ids = desired_network_ids - set(actual_network_nics)
add_nics = list(data_models.Interface(
network_id=net_id) for net_id in add_ids)
deltas.append(data_models.Delta(amphora_id=amphora.id,
compute_id=amphora.compute_id,
add_nics=add_nics,
delete_nics=delete_nics))
return deltas
class GetPlumbedNetworks(BaseNetworkTask):
@ -129,7 +142,7 @@ class PlugNetworks(BaseNetworkTask):
return None
# add nics
for nic in delta['add']:
for nic in delta.add_nics:
self.network_driver.plug_network(amphora.compute_id,
nic.network_id)
@ -140,7 +153,7 @@ class PlugNetworks(BaseNetworkTask):
if not delta:
return
for nic in delta['add']:
for nic in delta.add_nics:
try:
self.network_driver.unplug_network(amphora.compute_id,
nic.network_id)
@ -163,7 +176,7 @@ class UnPlugNetworks(BaseNetworkTask):
LOG.debug("No network deltas for amphora id: %s" % amphora.id)
return None
for nic in delta['delete']:
for nic in delta.delete_nics:
try:
self.network_driver.unplug_network(amphora.compute_id,
nic.network_id)
@ -177,6 +190,51 @@ class UnPlugNetworks(BaseNetworkTask):
pass # Todo(german) follow up if that makes sense
class HandleNetworkDeltas(BaseNetworkTask):
"""Task to plug and unplug networks
Loop through the deltas and plug or unplug
networks based on delta
"""
def execute(self, deltas):
"""Handle network plugging based off deltas."""
for delta in deltas:
for nic in delta.add_nics:
self.network_driver.plug_network(delta.compute_id,
nic.network_id)
for nic in delta.delete_nics:
try:
self.network_driver.unplug_network(delta.compute_id,
nic.network_id)
except base.NetworkNotFound as e:
LOG.debug("Network %d not found ", nic.network_id)
pass
except Exception as e:
LOG.error(
_LE("Unable to unplug network - exception: %s"),
e.message)
pass
def revert(self, deltas):
"""Handle a network plug or unplug failures."""
for delta in deltas:
LOG.warn(_LW("Unable to plug networks for amp id %s"),
delta.amphora_id)
if not delta:
return
for nic in delta.add_nics:
try:
self.network_driver.unplug_network(delta.compute_id,
nic.network_id)
except base.NetworkNotFound:
pass
class PlugVIP(BaseNetworkTask):
"""Task to plumb a VIP."""

View File

@ -180,3 +180,13 @@ class AbstractNetworkDriver(object):
:return: None
"""
pass
@abc.abstractmethod
def get_network(self, network_id=None, subnet_id=None):
"""Retrieves network from network id or subnet id .
:param network_id: id of an network to retrieve
:param subnet_id: id of an subnet to retrieve network
:return: octavia.network.data_models.Network
:raises: NetworkException, NetworkNotFound
"""

View File

@ -24,3 +24,32 @@ class Interface(data_models.BaseDataModel):
self.network_id = network_id
self.port_id = port_id
self.ip_address = ip_address
class Delta(data_models.BaseDataModel):
def __init__(self, amphora_id=None, compute_id=None,
add_nics=None, delete_nics=None):
self.compute_id = compute_id
self.amphora_id = amphora_id
self.add_nics = add_nics
self.delete_nics = delete_nics
class Network(data_models.BaseDataModel):
def __init__(self, id=None, name=None, subnets=None,
tenant_id=None, admin_state_up=None, mtu=None,
provider_network_type=None,
provider_physical_network=None,
provider_segmentation_id=None,
router_external=None):
self.id = id
self.name = name
self.subnets = subnets
self.tenant_id = tenant_id
self.admin_state_up = admin_state_up
self.provider_network_type = provider_network_type
self.provider_physical_network = provider_physical_network
self.provider_segmentation_id = provider_segmentation_id
self.router_external = router_external

View File

@ -179,6 +179,17 @@ class AllowedAddressPairsDriver(base.AbstractNetworkDriver):
self.nova_client.servers.add_security_group(
amphora.compute_id, sec_grp.get('id'))
def _map_network_to_data_model(self, network):
nw = network.get('network')
return network_models.Network(
id=nw.get('id'), name=nw.get('name'), subnets=nw.get('subnets'),
tenant_id=nw.get('tenant_id'),
admin_state_up=nw.get('admin_state_up'), mtu=nw.get('mtu'),
provider_network_type=nw.get('provider:network_type'),
provider_physical_network=nw.get('provider:physical_network'),
provider_segmentation_id=nw.get('provider:segmentation_id'),
router_external=nw.get('router:external'))
def deallocate_vip(self, vip):
try:
self.neutron_client.delete_port(vip.port_id)
@ -349,3 +360,31 @@ class AllowedAddressPairsDriver(base.AbstractNetworkDriver):
def update_vip(self, load_balancer):
sec_grp = self._get_lb_security_group(load_balancer.id)
self._update_security_group_rules(load_balancer, sec_grp.get('id'))
def get_network(self, network_id=None, subnet_id=None):
network = None
try:
if network_id:
network = self.neutron_client.show_network(network_id)
elif subnet_id:
subnet = (self.neutron_client.show_subnet(subnet_id)
.get('subnet').get('network_id'))
network = self.neutron_client.show_network(subnet)
except base.NetworkNotFound:
message = _LE('Network not found '
'(network id: {network_id} '
'and subnet id: {subnet_id}.').format(
network_id=network_id,
subnet_id=subnet_id)
LOG.exception(message)
raise base.NetworkNotFound(message)
except Exception:
message = _LE('Error retrieving network '
'(network id: {network_id} '
'and subnet id: {subnet_id}.').format(
network_id=network_id,
subnet_id=subnet_id)
LOG.exception(message)
raise base.NetworkException(message)
return self._map_network_to_data_model(network)

View File

@ -77,6 +77,12 @@ class NoopManager(object):
self.__class__.__name__, load_balancer)
self.networkconfigconfig[load_balancer] = (load_balancer, 'update_vip')
def get_network(self, network_id=None, subnet_id=None):
LOG.debug("Network %s no-op, get_network network_id %s",
self.__class__.__name__, network_id)
self.networkconfigconfig[network_id, subnet_id] = (
network_id, subnet_id, 'get_network')
class NoopNetworkDriver(driver_base.AbstractNetworkDriver):
def __init__(self):
@ -106,3 +112,6 @@ class NoopNetworkDriver(driver_base.AbstractNetworkDriver):
def update_vip(self, load_balancer):
self.driver.update_vip(load_balancer)
def get_network(self, network_id=None, subnet_id=None):
self.driver.get_network(network_id, subnet_id)

View File

@ -49,54 +49,78 @@ class TestNetworkTasks(base.TestCase):
def setUp(self):
network_tasks.LOG = mock.MagicMock()
self.amphora_mock = mock.MagicMock()
self.load_balancer_mock = mock.MagicMock()
self.load_balancer_mock.amphorae = []
self.amphora_mock.id = AMPHORA_ID
self.amphora_mock.compute_id = COMPUTE_ID
conf = oslo_fixture.Config(cfg.CONF)
conf.config(group="controller_worker", amp_network=[])
conf.config(group="controller_worker", amp_network='netid')
super(TestNetworkTasks, self).setUp()
def test_calculate_delta(self,
mock_driver):
EMPTY = {'add': [], 'delete': []}
EMPTY = []
empty_deltas = [data_models.Delta(
amphora_id=self.amphora_mock.id,
compute_id=self.amphora_mock.compute_id,
add_nics=[],
delete_nics=[])]
def _interface(network_id):
return [data_models.Interface(network_id=network_id)]
net = network_tasks.CalculateDelta()
self.amphora_mock.load_balancer = None
self.assertEqual(EMPTY, net.execute(self.amphora_mock, []))
self.assertEqual(EMPTY, net.execute(self.load_balancer_mock))
lb_mock = mock.MagicMock()
self.amphora_mock.load_balancer = lb_mock
lb_mock.listeners = None
self.assertEqual(EMPTY, net.execute(self.amphora_mock, []))
self.amphora_mock.load_balancer = self.load_balancer_mock
self.load_balancer_mock.amphorae = [self.amphora_mock]
self.load_balancer_mock.listeners = None
self.assertEqual(EMPTY, net.execute(self.load_balancer_mock))
listener_mock = mock.MagicMock()
lb_mock.listeners = [listener_mock]
self.load_balancer_mock.listeners = [listener_mock]
listener_mock.default_pool = None
self.assertEqual(EMPTY, net.execute(self.amphora_mock, []))
self.assertEqual(empty_deltas, net.execute(self.load_balancer_mock))
mock_driver.get_plugged_networks.assert_called_once_with(COMPUTE_ID)
pool_mock = mock.MagicMock()
listener_mock.default_pool = pool_mock
pool_mock.members = None
self.assertEqual(EMPTY, net.execute(self.amphora_mock, []))
self.assertEqual(empty_deltas, net.execute(self.load_balancer_mock))
member_mock = mock.MagicMock()
pool_mock.members = [member_mock]
member_mock.subnet_id = 1
self.assertEqual({'add': _interface(1), 'delete': []},
net.execute(self.amphora_mock, []))
self.assertEqual(EMPTY, net.execute(self.amphora_mock, _interface(1)))
mock_driver.get_network.return_value = data_models.Network(id=2)
result = {'add': _interface(1), 'delete': _interface(2)}
self.assertEqual(result, net.execute(self.amphora_mock, _interface(2)))
ndm = data_models.Delta(amphora_id=self.amphora_mock.id,
compute_id=self.amphora_mock.compute_id,
add_nics=_interface(2),
delete_nics=[])
self.assertEqual([ndm], net.execute(self.load_balancer_mock))
mock_driver.get_network.assert_called_once_with(subnet_id=1)
mock_driver.get_plugged_networks.return_value = _interface(2)
self.assertEqual(empty_deltas, net.execute(self.load_balancer_mock))
mock_driver.get_plugged_networks.return_value = _interface(3)
ndm = data_models.Delta(amphora_id=self.amphora_mock.id,
compute_id=self.amphora_mock.compute_id,
add_nics=_interface(2),
delete_nics=_interface(3))
self.assertEqual([ndm], net.execute(self.load_balancer_mock))
pool_mock.members = []
self.assertEqual({'add': [], 'delete': _interface(2)},
net.execute(self.amphora_mock, _interface(2)))
mock_driver.get_plugged_networks.return_value = _interface(2)
ndm = data_models.Delta(amphora_id=self.amphora_mock.id,
compute_id=self.amphora_mock.compute_id,
add_nics=[],
delete_nics=_interface(2))
self.assertEqual([ndm], net.execute(self.load_balancer_mock))
def test_get_plumbed_networks(self,
mock_driver):
@ -109,16 +133,26 @@ class TestNetworkTasks(base.TestCase):
def test_plug_networks(self,
mock_driver):
def _interface(network_id):
return [data_models.Interface(network_id=network_id)]
net = network_tasks.PlugNetworks()
net.execute(self.amphora_mock, None)
self.assertFalse(mock_driver.plug_network.called)
delta = {'add': []}
delta = data_models.Delta(amphora_id=self.amphora_mock.id,
compute_id=self.amphora_mock.compute_id,
add_nics=[],
delete_nics=[])
net.execute(self.amphora_mock, delta)
self.assertFalse(mock_driver.plug_network.called)
delta = {'add': [data_models.Interface(network_id=1)]}
delta = data_models.Delta(amphora_id=self.amphora_mock.id,
compute_id=self.amphora_mock.compute_id,
add_nics=_interface(1),
delete_nics=[])
net.execute(self.amphora_mock, delta)
mock_driver.plug_network.assert_called_once_with(COMPUTE_ID, 1)
@ -126,11 +160,17 @@ class TestNetworkTasks(base.TestCase):
net.revert(self.amphora_mock, None)
self.assertFalse(mock_driver.unplug_network.called)
delta = {'add': []}
delta = data_models.Delta(amphora_id=self.amphora_mock.id,
compute_id=self.amphora_mock.compute_id,
add_nics=[],
delete_nics=[])
net.revert(self.amphora_mock, delta)
self.assertFalse(mock_driver.unplug_network.called)
delta = {'add': [data_models.Interface(network_id=1)]}
delta = data_models.Delta(amphora_id=self.amphora_mock.id,
compute_id=self.amphora_mock.compute_id,
add_nics=_interface(1),
delete_nics=[])
net.revert(self.amphora_mock, delta)
mock_driver.unplug_network.assert_called_once_with(COMPUTE_ID, 1)
@ -147,18 +187,26 @@ class TestNetworkTasks(base.TestCase):
delta)
mock_driver.unplug_network.assert_called_once_with(COMPUTE_ID, 1)
def test_unplug_networks(self,
mock_driver):
def test_unplug_networks(self, mock_driver):
def _interface(network_id):
return [data_models.Interface(network_id=network_id)]
net = network_tasks.UnPlugNetworks()
net.execute(self.amphora_mock, None)
self.assertFalse(mock_driver.unplug_network.called)
delta = {'delete': []}
delta = data_models.Delta(amphora_id=self.amphora_mock.id,
compute_id=self.amphora_mock.compute_id,
add_nics=[],
delete_nics=[])
net.execute(self.amphora_mock, delta)
self.assertFalse(mock_driver.unplug_network.called)
delta = {'delete': [data_models.Interface(network_id=1)]}
delta = data_models.Delta(amphora_id=self.amphora_mock.id,
compute_id=self.amphora_mock.compute_id,
add_nics=[],
delete_nics=_interface(1))
net.execute(self.amphora_mock, delta)
mock_driver.unplug_network.assert_called_once_with(COMPUTE_ID, 1)
@ -173,6 +221,82 @@ class TestNetworkTasks(base.TestCase):
net.execute(self.amphora_mock, delta) # No exception
mock_driver.unplug_network.assert_called_once_with(COMPUTE_ID, 1)
def test_handle_network_delta(self, mock_driver):
def _interface(network_id):
return [data_models.Interface(network_id=network_id)]
net = network_tasks.HandleNetworkDeltas()
net.execute([])
self.assertFalse(mock_driver.plug_network.called)
delta = data_models.Delta(amphora_id=self.amphora_mock.id,
compute_id=self.amphora_mock.compute_id,
add_nics=[],
delete_nics=[])
net.execute([delta])
self.assertFalse(mock_driver.plug_network.called)
delta = data_models.Delta(amphora_id=self.amphora_mock.id,
compute_id=self.amphora_mock.compute_id,
add_nics=_interface(1),
delete_nics=[])
net.execute([delta])
mock_driver.plug_network.assert_called_once_with(COMPUTE_ID, 1)
# revert
net.execute([delta])
self.assertFalse(mock_driver.unplug_network.called)
delta = data_models.Delta(amphora_id=self.amphora_mock.id,
compute_id=self.amphora_mock.compute_id,
add_nics=[],
delete_nics=[])
net.execute([delta])
self.assertFalse(mock_driver.unplug_network.called)
delta = data_models.Delta(amphora_id=self.amphora_mock.id,
compute_id=self.amphora_mock.compute_id,
add_nics=_interface(1),
delete_nics=[])
mock_driver.reset_mock()
mock_driver.unplug_network.side_effect = net_base.NetworkNotFound
mock_driver.reset_mock()
mock_driver.unplug_network.side_effect = TestException('test')
self.assertRaises(TestException, net.revert, [delta])
mock_driver.unplug_network.assert_called_once_with(COMPUTE_ID, 1)
mock_driver.reset_mock()
net.execute([])
self.assertFalse(mock_driver.unplug_network.called)
delta = data_models.Delta(amphora_id=self.amphora_mock.id,
compute_id=self.amphora_mock.compute_id,
add_nics=[],
delete_nics=[])
net.execute([delta])
self.assertFalse(mock_driver.unplug_network.called)
delta = data_models.Delta(amphora_id=self.amphora_mock.id,
compute_id=self.amphora_mock.compute_id,
add_nics=[],
delete_nics=_interface(1))
net.execute([delta])
mock_driver.unplug_network.assert_called_once_with(COMPUTE_ID, 1)
mock_driver.reset_mock()
mock_driver.unplug_network.side_effect = net_base.NetworkNotFound
net.execute([delta])
mock_driver.unplug_network.assert_called_once_with(COMPUTE_ID, 1)
# Do a test with a general exception in case behavior changes
mock_driver.reset_mock()
mock_driver.unplug_network.side_effect = Exception()
net.execute([delta])
mock_driver.unplug_network.assert_called_once_with(COMPUTE_ID, 1)
def test_plug_vip(self,
mock_driver):
net = network_tasks.PlugVIP()

View File

@ -60,7 +60,7 @@ Existing data model:
topology it may exist on both amphorae. In the end, it is up
to the amphora driver to decide how to use this.
New data model:
New data models:
* class Interface
* id
@ -68,6 +68,23 @@ New data model:
* amphora_id
* ip_address - (IPv4 or IPv6)
* class Delta
* amphora_id
* compute_id
* add_nics
* delete_nics
* class Network
* id
* name
* subnets - (list of subnet ids)
* tenant_id
* admin_state_up
* provider_network_type
* provider_physical_network
* provider_segmentation_id
* router_external
New Exceptions defined in the octavia.network package:
* NetworkException - Base Exception
@ -162,6 +179,14 @@ class AbstractNetworkDriver
of the passed in loadbalancer
* loadbalancer: instance of a data_models.LoadBalancer
* get_network(network_id=None, subnet_id=None):
* Retrieves the network from network_id or subnet_id
* network_id = id of an network to retrieve
* subnet_id = id of an subnet to retrieve network
* returns = Network data model
* raises NetworkException, NetworkNotFound
Alternatives
------------