Network Delta calculations should respect AZs

The network delta calculations were all based on the static configured
amp_boot_network_list which is not correct if it's overridden by the AZ.

Change-Id: Ia930e17c76cd601ac005de10fb03231a19f1a776
This commit is contained in:
Adam Harwell 2020-01-30 23:27:14 -08:00
parent 2c76209003
commit 741397f1a9
16 changed files with 249 additions and 114 deletions

@ -431,14 +431,21 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
listeners = pool.listeners
load_balancer = pool.load_balancer
create_member_tf = self._taskflow_load(self._member_flows.
get_create_member_flow(),
store={constants.MEMBER: member,
constants.LISTENERS:
listeners,
constants.LOADBALANCER:
load_balancer,
constants.POOL: pool})
store = {
constants.MEMBER: member,
constants.LISTENERS: listeners,
constants.LOADBALANCER: load_balancer,
constants.POOL: pool}
if load_balancer.availability_zone:
store[constants.AVAILABILITY_ZONE] = (
self._az_repo.get_availability_zone_metadata_dict(
db_apis.get_session(), load_balancer.availability_zone))
else:
store[constants.AVAILABILITY_ZONE] = {}
create_member_tf = self._taskflow_load(
self._member_flows.get_create_member_flow(),
store=store)
with tf_logging.DynamicLoggingListener(create_member_tf,
log=LOG):
create_member_tf.run()
@ -456,10 +463,21 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
listeners = pool.listeners
load_balancer = pool.load_balancer
store = {
constants.MEMBER: member,
constants.LISTENERS: listeners,
constants.LOADBALANCER: load_balancer,
constants.POOL: pool}
if load_balancer.availability_zone:
store[constants.AVAILABILITY_ZONE] = (
self._az_repo.get_availability_zone_metadata_dict(
db_apis.get_session(), load_balancer.availability_zone))
else:
store[constants.AVAILABILITY_ZONE] = {}
delete_member_tf = self._taskflow_load(
self._member_flows.get_delete_member_flow(),
store={constants.MEMBER: member, constants.LISTENERS: listeners,
constants.LOADBALANCER: load_balancer, constants.POOL: pool}
store=store
)
with tf_logging.DynamicLoggingListener(delete_member_tf,
log=LOG):
@ -483,12 +501,21 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
listeners = pool.listeners
load_balancer = pool.load_balancer
store = {
constants.LISTENERS: listeners,
constants.LOADBALANCER: load_balancer,
constants.POOL: pool}
if load_balancer.availability_zone:
store[constants.AVAILABILITY_ZONE] = (
self._az_repo.get_availability_zone_metadata_dict(
db_apis.get_session(), load_balancer.availability_zone))
else:
store[constants.AVAILABILITY_ZONE] = {}
batch_update_members_tf = self._taskflow_load(
self._member_flows.get_batch_update_members_flow(
old_members, new_members, updated_members),
store={constants.LISTENERS: listeners,
constants.LOADBALANCER: load_balancer,
constants.POOL: pool})
store=store)
with tf_logging.DynamicLoggingListener(batch_update_members_tf,
log=LOG):
batch_update_members_tf.run()
@ -501,7 +528,6 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
:returns: None
:raises MemberNotFound: The referenced member was not found
"""
member = None
try:
member = self._get_db_obj_until_pending_update(
self._member_repo, member_id)
@ -517,17 +543,22 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
listeners = pool.listeners
load_balancer = pool.load_balancer
update_member_tf = self._taskflow_load(self._member_flows.
get_update_member_flow(),
store={constants.MEMBER: member,
constants.LISTENERS:
listeners,
constants.LOADBALANCER:
load_balancer,
constants.POOL:
pool,
constants.UPDATE_DICT:
member_updates})
store = {
constants.MEMBER: member,
constants.LISTENERS: listeners,
constants.LOADBALANCER: load_balancer,
constants.POOL: pool,
constants.UPDATE_DICT: member_updates}
if load_balancer.availability_zone:
store[constants.AVAILABILITY_ZONE] = (
self._az_repo.get_availability_zone_metadata_dict(
db_apis.get_session(), load_balancer.availability_zone))
else:
store[constants.AVAILABILITY_ZONE] = {}
update_member_tf = self._taskflow_load(
self._member_flows.get_update_member_flow(),
store=store)
with tf_logging.DynamicLoggingListener(update_member_tf,
log=LOG):
update_member_tf.run()

@ -491,7 +491,8 @@ class AmphoraFlows(object):
# Plug the member networks into the new amphora
failover_amphora_flow.add(network_tasks.CalculateAmphoraDelta(
requires=(constants.LOADBALANCER, constants.AMPHORA),
requires=(constants.LOADBALANCER, constants.AMPHORA,
constants.AVAILABILITY_ZONE),
provides=constants.DELTA))
failover_amphora_flow.add(network_tasks.HandleNetworkDelta(

@ -145,7 +145,8 @@ class LoadBalancerFlows(object):
)
flows.append(
network_tasks.CalculateDelta(
requires=constants.LOADBALANCER, provides=constants.DELTAS
requires=(constants.LOADBALANCER, constants.AVAILABILITY_ZONE),
provides=constants.DELTAS
)
)
flows.append(

@ -40,7 +40,7 @@ class MemberFlows(object):
create_member_flow.add(database_tasks.MarkMemberPendingCreateInDB(
requires=constants.MEMBER))
create_member_flow.add(network_tasks.CalculateDelta(
requires=constants.LOADBALANCER,
requires=(constants.LOADBALANCER, constants.AVAILABILITY_ZONE),
provides=constants.DELTAS))
create_member_flow.add(network_tasks.HandleNetworkDeltas(
requires=constants.DELTAS, provides=constants.ADDED_PORTS))
@ -185,7 +185,7 @@ class MemberFlows(object):
# Done, do real updates
batch_update_members_flow.add(network_tasks.CalculateDelta(
requires=constants.LOADBALANCER,
requires=(constants.LOADBALANCER, constants.AVAILABILITY_ZONE),
provides=constants.DELTAS))
batch_update_members_flow.add(network_tasks.HandleNetworkDeltas(
requires=constants.DELTAS, provides=constants.ADDED_PORTS))

@ -48,14 +48,19 @@ class CalculateAmphoraDelta(BaseNetworkTask):
default_provides = constants.DELTA
def execute(self, loadbalancer, amphora):
def execute(self, loadbalancer, amphora, availability_zone):
LOG.debug("Calculating network delta for amphora id: %s", amphora.id)
# Figure out what networks we want
# seed with lb network(s)
vrrp_port = self.network_driver.get_port(amphora.vrrp_port_id)
desired_network_ids = {vrrp_port.network_id}.union(
CONF.controller_worker.amp_boot_network_list)
if availability_zone:
management_nets = (
[availability_zone.get(constants.MANAGEMENT_NETWORK)] or
CONF.controller_worker.amp_boot_network_list)
else:
management_nets = CONF.controller_worker.amp_boot_network_list
desired_network_ids = {vrrp_port.network_id}.union(management_nets)
for pool in loadbalancer.pools:
member_networks = [
@ -92,13 +97,15 @@ class CalculateDelta(BaseNetworkTask):
default_provides = constants.DELTAS
def execute(self, loadbalancer):
def execute(self, loadbalancer, availability_zone):
"""Compute which NICs need to be plugged
for the amphora to become operational.
:param loadbalancer: the loadbalancer to calculate deltas for all
amphorae
:param availability_zone: availability zone metadata dict
:returns: dict of octavia.network.data_models.Delta keyed off amphora
id
"""
@ -109,7 +116,8 @@ class CalculateDelta(BaseNetworkTask):
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
loadbalancer.amphorae):
delta = calculate_amp.execute(loadbalancer, amphora)
delta = calculate_amp.execute(loadbalancer, amphora,
availability_zone)
deltas[amphora.id] = delta
return deltas

@ -438,13 +438,22 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
pool.listeners))
store = {
constants.MEMBER: member,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: load_balancer.id,
constants.LOADBALANCER: provider_lb,
constants.POOL_ID: pool.id}
if load_balancer.availability_zone:
store[constants.AVAILABILITY_ZONE] = (
self._az_repo.get_availability_zone_metadata_dict(
db_apis.get_session(), load_balancer.availability_zone))
else:
store[constants.AVAILABILITY_ZONE] = {}
create_member_tf = self._taskflow_load(
self._member_flows.get_create_member_flow(),
store={constants.MEMBER: member,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: load_balancer.id,
constants.LOADBALANCER: provider_lb,
constants.POOL_ID: pool.id})
store=store)
with tf_logging.DynamicLoggingListener(create_member_tf,
log=LOG):
create_member_tf.run()
@ -467,16 +476,23 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
pool.listeners))
store = {
constants.MEMBER: member,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: load_balancer.id,
constants.LOADBALANCER: provider_lb,
constants.POOL_ID: pool.id,
constants.PROJECT_ID: load_balancer.project_id}
if load_balancer.availability_zone:
store[constants.AVAILABILITY_ZONE] = (
self._az_repo.get_availability_zone_metadata_dict(
db_apis.get_session(), load_balancer.availability_zone))
else:
store[constants.AVAILABILITY_ZONE] = {}
delete_member_tf = self._taskflow_load(
self._member_flows.get_delete_member_flow(),
store={constants.MEMBER: member,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER: provider_lb,
constants.LOADBALANCER_ID: load_balancer.id,
constants.POOL_ID: pool.id,
constants.PROJECT_ID: load_balancer.project_id
}
store=store
)
with tf_logging.DynamicLoggingListener(delete_member_tf,
log=LOG):
@ -513,14 +529,23 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
load_balancer).to_dict()
store = {
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: load_balancer.id,
constants.LOADBALANCER: provider_lb,
constants.POOL_ID: pool.id,
constants.PROJECT_ID: load_balancer.project_id}
if load_balancer.availability_zone:
store[constants.AVAILABILITY_ZONE] = (
self._az_repo.get_availability_zone_metadata_dict(
db_apis.get_session(), load_balancer.availability_zone))
else:
store[constants.AVAILABILITY_ZONE] = {}
batch_update_members_tf = self._taskflow_load(
self._member_flows.get_batch_update_members_flow(
provider_old_members, new_members, updated_members),
store={constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER: provider_lb,
constants.LOADBALANCER_ID: load_balancer.id,
constants.POOL_ID: pool.id,
constants.PROJECT_ID: load_balancer.project_id})
store=store)
with tf_logging.DynamicLoggingListener(batch_update_members_tf,
log=LOG):
batch_update_members_tf.run()
@ -544,14 +569,23 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
pool.listeners))
store = {
constants.MEMBER: member,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: load_balancer.id,
constants.LOADBALANCER: provider_lb,
constants.POOL_ID: pool.id,
constants.UPDATE_DICT: member_updates}
if load_balancer.availability_zone:
store[constants.AVAILABILITY_ZONE] = (
self._az_repo.get_availability_zone_metadata_dict(
db_apis.get_session(), load_balancer.availability_zone))
else:
store[constants.AVAILABILITY_ZONE] = {}
update_member_tf = self._taskflow_load(
self._member_flows.get_update_member_flow(),
store={constants.MEMBER: member,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER: provider_lb,
constants.LOADBALANCER_ID: load_balancer.id,
constants.POOL_ID: pool.id,
constants.UPDATE_DICT: member_updates})
store=store)
with tf_logging.DynamicLoggingListener(update_member_tf,
log=LOG):
update_member_tf.run()

@ -526,7 +526,8 @@ class AmphoraFlows(object):
# Plug the member networks into the new amphora
failover_amphora_flow.add(network_tasks.CalculateAmphoraDelta(
requires=(constants.LOADBALANCER, constants.AMPHORA),
requires=(constants.LOADBALANCER, constants.AMPHORA,
constants.AVAILABILITY_ZONE),
provides=constants.DELTA))
failover_amphora_flow.add(network_tasks.HandleNetworkDelta(

@ -149,7 +149,8 @@ class LoadBalancerFlows(object):
)
flows.append(
network_tasks.CalculateDelta(
requires=constants.LOADBALANCER, provides=constants.DELTAS
requires=(constants.LOADBALANCER, constants.AVAILABILITY_ZONE),
provides=constants.DELTAS
)
)
flows.append(

@ -39,7 +39,7 @@ class MemberFlows(object):
create_member_flow.add(database_tasks.MarkMemberPendingCreateInDB(
requires=constants.MEMBER))
create_member_flow.add(network_tasks.CalculateDelta(
requires=constants.LOADBALANCER,
requires=(constants.LOADBALANCER, constants.AVAILABILITY_ZONE),
provides=constants.DELTAS))
create_member_flow.add(network_tasks.HandleNetworkDeltas(
requires=constants.DELTAS, provides=constants.ADDED_PORTS))
@ -180,7 +180,7 @@ class MemberFlows(object):
# Done, do real updates
batch_update_members_flow.add(network_tasks.CalculateDelta(
requires=constants.LOADBALANCER,
requires=(constants.LOADBALANCER, constants.AVAILABILITY_ZONE),
provides=constants.DELTAS))
batch_update_members_flow.add(network_tasks.HandleNetworkDeltas(
requires=constants.DELTAS, provides=constants.ADDED_PORTS))

@ -53,7 +53,7 @@ class CalculateAmphoraDelta(BaseNetworkTask):
default_provides = constants.DELTA
def execute(self, loadbalancer, amphora):
def execute(self, loadbalancer, amphora, availability_zone):
LOG.debug("Calculating network delta for amphora id: %s",
amphora.get(constants.ID))
@ -61,8 +61,13 @@ class CalculateAmphoraDelta(BaseNetworkTask):
# seed with lb network(s)
vrrp_port = self.network_driver.get_port(
amphora[constants.VRRP_PORT_ID])
desired_network_ids = {vrrp_port.network_id}.union(
CONF.controller_worker.amp_boot_network_list)
if availability_zone:
management_nets = (
[availability_zone.get(constants.MANAGEMENT_NETWORK)] or
CONF.controller_worker.amp_boot_network_list)
else:
management_nets = CONF.controller_worker.amp_boot_network_list
desired_network_ids = {vrrp_port.network_id}.union(management_nets)
db_lb = self.loadbalancer_repo.get(
db_apis.get_session(), id=loadbalancer[constants.LOADBALANCER_ID])
for pool in db_lb.pools:
@ -102,13 +107,15 @@ class CalculateDelta(BaseNetworkTask):
default_provides = constants.DELTAS
def execute(self, loadbalancer):
def execute(self, loadbalancer, availability_zone):
"""Compute which NICs need to be plugged
for the amphora to become operational.
:param loadbalancer: the loadbalancer to calculate deltas for all
amphorae
:param availability_zone: availability zone metadata dict
:returns: dict of octavia.network.data_models.Delta keyed off amphora
id
"""
@ -121,7 +128,8 @@ class CalculateDelta(BaseNetworkTask):
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
db_lb.amphorae):
delta = calculate_amp.execute(loadbalancer, amphora.to_dict())
delta = calculate_amp.execute(loadbalancer, amphora.to_dict(),
availability_zone)
deltas[amphora.id] = delta
return deltas

@ -40,8 +40,13 @@ class TestMemberFlows(base.TestCase):
self.assertIn(constants.LISTENERS, member_flow.requires)
self.assertIn(constants.LOADBALANCER, member_flow.requires)
self.assertIn(constants.POOL, member_flow.requires)
self.assertIn(constants.MEMBER, member_flow.requires)
self.assertIn(constants.AVAILABILITY_ZONE, member_flow.requires)
self.assertEqual(4, len(member_flow.requires))
self.assertIn(constants.DELTAS, member_flow.provides)
self.assertIn(constants.ADDED_PORTS, member_flow.provides)
self.assertEqual(5, len(member_flow.requires))
self.assertEqual(2, len(member_flow.provides))
def test_get_delete_member_flow(self, mock_get_net_driver):
@ -83,6 +88,10 @@ class TestMemberFlows(base.TestCase):
self.assertIn(constants.LISTENERS, member_flow.requires)
self.assertIn(constants.LOADBALANCER, member_flow.requires)
self.assertIn(constants.POOL, member_flow.requires)
self.assertIn(constants.AVAILABILITY_ZONE, member_flow.requires)
self.assertEqual(3, len(member_flow.requires))
self.assertIn(constants.DELTAS, member_flow.provides)
self.assertIn(constants.ADDED_PORTS, member_flow.provides)
self.assertEqual(4, len(member_flow.requires))
self.assertEqual(2, len(member_flow.provides))

@ -96,7 +96,8 @@ class TestNetworkTasks(base.TestCase):
calc_delta = network_tasks.CalculateDelta()
self.assertEqual(EMPTY, calc_delta.execute(self.load_balancer_mock))
self.assertEqual(EMPTY,
calc_delta.execute(self.load_balancer_mock, {}))
# Test with one amp and no pools, nothing plugged
# Delta should be empty
@ -107,7 +108,7 @@ class TestNetworkTasks(base.TestCase):
self.load_balancer_mock.pools = []
self.assertEqual(empty_deltas,
calc_delta.execute(self.load_balancer_mock))
calc_delta.execute(self.load_balancer_mock, {}))
mock_driver.get_plugged_networks.assert_called_once_with(COMPUTE_ID)
# Pool mock should be configured explicitly for each test
@ -118,7 +119,7 @@ class TestNetworkTasks(base.TestCase):
# Delta should be empty
pool_mock.members = []
self.assertEqual(empty_deltas,
calc_delta.execute(self.load_balancer_mock))
calc_delta.execute(self.load_balancer_mock, {}))
# Test with one amp and one pool and one member, nothing plugged
# Delta should be one additional subnet to plug
@ -135,7 +136,7 @@ class TestNetworkTasks(base.TestCase):
data_models.Interface(network_id=2)],
delete_nics=[])
self.assertEqual({self.amphora_mock.id: ndm},
calc_delta.execute(self.load_balancer_mock))
calc_delta.execute(self.load_balancer_mock, {}))
vrrp_port_call = mock.call(self.amphora_mock.vrrp_port_id)
mock_driver.get_port.assert_has_calls([vrrp_port_call])
@ -155,7 +156,7 @@ class TestNetworkTasks(base.TestCase):
data_models.Interface(network_id=2)]
self.assertEqual(empty_deltas,
calc_delta.execute(self.load_balancer_mock))
calc_delta.execute(self.load_balancer_mock, {}))
# Test with one amp and one pool and one member, wrong network plugged
# Delta should be one network to add and one to remove
@ -173,7 +174,7 @@ class TestNetworkTasks(base.TestCase):
delete_nics=[
data_models.Interface(network_id=3)])
self.assertEqual({self.amphora_mock.id: ndm},
calc_delta.execute(self.load_balancer_mock))
calc_delta.execute(self.load_balancer_mock, {}))
# Test with one amp and one pool and no members, one network plugged
# Delta should be one network to remove
@ -188,7 +189,7 @@ class TestNetworkTasks(base.TestCase):
delete_nics=[
data_models.Interface(network_id=2)])
self.assertEqual({self.amphora_mock.id: ndm},
calc_delta.execute(self.load_balancer_mock))
calc_delta.execute(self.load_balancer_mock, {}))
def test_get_plumbed_networks(self, mock_get_net_driver):
mock_driver = mock.MagicMock()

@ -748,7 +748,10 @@ class TestControllerWorker(base.TestCase):
@mock.patch('octavia.controller.worker.v1.flows.'
'member_flows.MemberFlows.get_create_member_flow',
return_value=_flow_mock)
@mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
'get_availability_zone_metadata_dict')
def test_create_member(self,
mock_get_az_metadata_dict,
mock_get_create_member_flow,
mock_api_get_session,
mock_dyn_log_listener,
@ -763,20 +766,20 @@ class TestControllerWorker(base.TestCase):
mock_amp_repo_get):
_flow_mock.reset_mock()
mock_get_az_metadata_dict.return_value = {}
mock_member_repo_get.side_effect = [None, _member_mock]
cw = controller_worker.ControllerWorker()
cw.create_member(MEMBER_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.MEMBER: _member_mock,
constants.LISTENERS:
[_listener_mock],
constants.LOADBALANCER:
_load_balancer_mock,
constants.POOL:
_pool_mock}))
assert_called_once_with(
_flow_mock,
store={constants.MEMBER: _member_mock,
constants.LISTENERS: [_listener_mock],
constants.LOADBALANCER: _load_balancer_mock,
constants.POOL: _pool_mock,
constants.AVAILABILITY_ZONE: {}}))
_flow_mock.run.assert_called_once_with()
self.assertEqual(2, mock_member_repo_get.call_count)
@ -784,7 +787,10 @@ class TestControllerWorker(base.TestCase):
@mock.patch('octavia.controller.worker.v1.flows.'
'member_flows.MemberFlows.get_delete_member_flow',
return_value=_flow_mock)
@mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
'get_availability_zone_metadata_dict')
def test_delete_member(self,
mock_get_az_metadata_dict,
mock_get_delete_member_flow,
mock_api_get_session,
mock_dyn_log_listener,
@ -799,7 +805,7 @@ class TestControllerWorker(base.TestCase):
mock_amp_repo_get):
_flow_mock.reset_mock()
mock_get_az_metadata_dict.return_value = {}
cw = controller_worker.ControllerWorker()
cw.delete_member(MEMBER_ID)
@ -811,14 +817,18 @@ class TestControllerWorker(base.TestCase):
constants.LOADBALANCER:
_load_balancer_mock,
constants.POOL:
_pool_mock}))
_pool_mock,
constants.AVAILABILITY_ZONE: {}}))
_flow_mock.run.assert_called_once_with()
@mock.patch('octavia.controller.worker.v1.flows.'
'member_flows.MemberFlows.get_update_member_flow',
return_value=_flow_mock)
@mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
'get_availability_zone_metadata_dict')
def test_update_member(self,
mock_get_az_metadata_dict,
mock_get_update_member_flow,
mock_api_get_session,
mock_dyn_log_listener,
@ -834,7 +844,7 @@ class TestControllerWorker(base.TestCase):
_flow_mock.reset_mock()
_member_mock.provisioning_status = constants.PENDING_UPDATE
mock_get_az_metadata_dict.return_value = {}
cw = controller_worker.ControllerWorker()
cw.update_member(MEMBER_ID, MEMBER_UPDATE_DICT)
@ -848,14 +858,18 @@ class TestControllerWorker(base.TestCase):
constants.POOL:
_pool_mock,
constants.UPDATE_DICT:
MEMBER_UPDATE_DICT}))
MEMBER_UPDATE_DICT,
constants.AVAILABILITY_ZONE: {}}))
_flow_mock.run.assert_called_once_with()
@mock.patch('octavia.controller.worker.v1.flows.'
'member_flows.MemberFlows.get_batch_update_members_flow',
return_value=_flow_mock)
@mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
'get_availability_zone_metadata_dict')
def test_batch_update_members(self,
mock_get_az_metadata_dict,
mock_get_batch_update_members_flow,
mock_api_get_session,
mock_dyn_log_listener,
@ -870,7 +884,7 @@ class TestControllerWorker(base.TestCase):
mock_amp_repo_get):
_flow_mock.reset_mock()
mock_get_az_metadata_dict.return_value = {}
cw = controller_worker.ControllerWorker()
cw.batch_update_members([9], [11], [MEMBER_UPDATE_DICT])
@ -880,7 +894,8 @@ class TestControllerWorker(base.TestCase):
constants.LISTENERS: [_listener_mock],
constants.LOADBALANCER:
_load_balancer_mock,
constants.POOL: _pool_mock}))
constants.POOL: _pool_mock,
constants.AVAILABILITY_ZONE: {}}))
_flow_mock.run.assert_called_once_with()

@ -41,8 +41,13 @@ class TestMemberFlows(base.TestCase):
self.assertIn(constants.LOADBALANCER, member_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, member_flow.requires)
self.assertIn(constants.POOL_ID, member_flow.requires)
self.assertIn(constants.MEMBER, member_flow.requires)
self.assertIn(constants.AVAILABILITY_ZONE, member_flow.requires)
self.assertEqual(5, len(member_flow.requires))
self.assertIn(constants.DELTAS, member_flow.provides)
self.assertIn(constants.ADDED_PORTS, member_flow.provides)
self.assertEqual(6, len(member_flow.requires))
self.assertEqual(2, len(member_flow.provides))
def test_get_delete_member_flow(self, mock_get_net_driver):
@ -88,6 +93,10 @@ class TestMemberFlows(base.TestCase):
self.assertIn(constants.LOADBALANCER, member_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, member_flow.requires)
self.assertIn(constants.POOL_ID, member_flow.requires)
self.assertIn(constants.AVAILABILITY_ZONE, member_flow.requires)
self.assertEqual(4, len(member_flow.requires))
self.assertIn(constants.DELTAS, member_flow.provides)
self.assertIn(constants.ADDED_PORTS, member_flow.provides)
self.assertEqual(5, len(member_flow.requires))
self.assertEqual(2, len(member_flow.provides))

@ -122,7 +122,8 @@ class TestNetworkTasks(base.TestCase):
calc_delta = network_tasks.CalculateDelta()
self.assertEqual(EMPTY, calc_delta.execute(self.load_balancer_mock))
self.assertEqual(EMPTY,
calc_delta.execute(self.load_balancer_mock, {}))
# Test with one amp and no pools, nothing plugged
# Delta should be empty
@ -132,7 +133,7 @@ class TestNetworkTasks(base.TestCase):
self.db_load_balancer_mock.amphorae = [self.db_amphora_mock]
self.db_load_balancer_mock.pools = []
self.assertEqual(empty_deltas,
calc_delta.execute(self.load_balancer_mock))
calc_delta.execute(self.load_balancer_mock, {}))
mock_driver.get_plugged_networks.assert_called_once_with(COMPUTE_ID)
# Pool mock should be configured explicitly for each test
@ -143,7 +144,7 @@ class TestNetworkTasks(base.TestCase):
# Delta should be empty
pool_mock.members = []
self.assertEqual(empty_deltas,
calc_delta.execute(self.load_balancer_mock))
calc_delta.execute(self.load_balancer_mock, {}))
# Test with one amp and one pool and one member, nothing plugged
# Delta should be one additional subnet to plug
@ -160,7 +161,7 @@ class TestNetworkTasks(base.TestCase):
data_models.Interface(network_id=3)],
delete_nics=[]).to_dict(recurse=True)
self.assertEqual({self.db_amphora_mock.id: ndm},
calc_delta.execute(self.load_balancer_mock))
calc_delta.execute(self.load_balancer_mock, {}))
vrrp_port_call = mock.call(PORT_ID)
mock_driver.get_port.assert_has_calls([vrrp_port_call])
@ -181,7 +182,7 @@ class TestNetworkTasks(base.TestCase):
data_models.Interface(network_id='netid')]
self.assertEqual(empty_deltas,
calc_delta.execute(self.load_balancer_mock))
calc_delta.execute(self.load_balancer_mock, {}))
# Test with one amp and one pool and one member, wrong network plugged
# Delta should be one network to add and one to remove
@ -201,7 +202,7 @@ class TestNetworkTasks(base.TestCase):
data_models.Interface(network_id=2)]
).to_dict(recurse=True)
self.assertEqual({self.db_amphora_mock.id: ndm},
calc_delta.execute(self.load_balancer_mock))
calc_delta.execute(self.load_balancer_mock, {}))
# Test with one amp and one pool and no members, one network plugged
# Delta should be one network to remove
@ -219,7 +220,7 @@ class TestNetworkTasks(base.TestCase):
data_models.Interface(network_id=2)]
).to_dict(recurse=True)
self.assertEqual({self.db_amphora_mock.id: ndm},
calc_delta.execute(self.load_balancer_mock))
calc_delta.execute(self.load_balancer_mock, {}))
def test_get_plumbed_networks(self, mock_get_net_driver):
mock_driver = mock.MagicMock()

@ -807,7 +807,10 @@ class TestControllerWorker(base.TestCase):
@mock.patch('octavia.controller.worker.v2.flows.'
'member_flows.MemberFlows.get_create_member_flow',
return_value=_flow_mock)
@mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
'get_availability_zone_metadata_dict')
def test_create_member(self,
mock_get_az_metadata_dict,
mock_get_create_member_flow,
mock_api_get_session,
mock_dyn_log_listener,
@ -822,6 +825,7 @@ class TestControllerWorker(base.TestCase):
mock_amp_repo_get):
_flow_mock.reset_mock()
mock_get_az_metadata_dict.return_value = {}
mock_member_repo_get.side_effect = [None, _member_mock]
_member = _member_mock.to_dict()
cw = controller_worker.ControllerWorker()
@ -830,23 +834,24 @@ class TestControllerWorker(base.TestCase):
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
_db_load_balancer_mock).to_dict()
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.MEMBER: _member,
constants.LISTENERS:
[self.ref_listener_dict],
constants.LOADBALANCER_ID:
LB_ID,
constants.LOADBALANCER:
provider_lb,
constants.POOL_ID:
POOL_ID}))
assert_called_once_with(
_flow_mock,
store={constants.MEMBER: _member,
constants.LISTENERS: [self.ref_listener_dict],
constants.LOADBALANCER_ID: LB_ID,
constants.LOADBALANCER: provider_lb,
constants.POOL_ID: POOL_ID,
constants.AVAILABILITY_ZONE: {}}))
_flow_mock.run.assert_called_once_with()
@mock.patch('octavia.controller.worker.v2.flows.'
'member_flows.MemberFlows.get_delete_member_flow',
return_value=_flow_mock)
@mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
'get_availability_zone_metadata_dict')
def test_delete_member(self,
mock_get_az_metadata_dict,
mock_get_delete_member_flow,
mock_api_get_session,
mock_dyn_log_listener,
@ -862,6 +867,7 @@ class TestControllerWorker(base.TestCase):
_flow_mock.reset_mock()
_member = _member_mock.to_dict()
mock_get_az_metadata_dict.return_value = {}
cw = controller_worker.ControllerWorker()
cw.delete_member(_member)
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
@ -878,14 +884,18 @@ class TestControllerWorker(base.TestCase):
provider_lb,
constants.POOL_ID:
POOL_ID,
constants.PROJECT_ID: PROJECT_ID}))
constants.PROJECT_ID: PROJECT_ID,
constants.AVAILABILITY_ZONE: {}}))
_flow_mock.run.assert_called_once_with()
@mock.patch('octavia.controller.worker.v2.flows.'
'member_flows.MemberFlows.get_update_member_flow',
return_value=_flow_mock)
@mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
'get_availability_zone_metadata_dict')
def test_update_member(self,
mock_get_az_metadata_dict,
mock_get_update_member_flow,
mock_api_get_session,
mock_dyn_log_listener,
@ -902,7 +912,7 @@ class TestControllerWorker(base.TestCase):
_flow_mock.reset_mock()
_member = _member_mock.to_dict()
_member[constants.PROVISIONING_STATUS] = constants.PENDING_UPDATE
mock_get_az_metadata_dict.return_value = {}
cw = controller_worker.ControllerWorker()
cw.update_member(_member, MEMBER_UPDATE_DICT)
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
@ -920,14 +930,18 @@ class TestControllerWorker(base.TestCase):
constants.LOADBALANCER_ID:
LB_ID,
constants.UPDATE_DICT:
MEMBER_UPDATE_DICT}))
MEMBER_UPDATE_DICT,
constants.AVAILABILITY_ZONE: {}}))
_flow_mock.run.assert_called_once_with()
@mock.patch('octavia.controller.worker.v2.flows.'
'member_flows.MemberFlows.get_batch_update_members_flow',
return_value=_flow_mock)
@mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
'get_availability_zone_metadata_dict')
def test_batch_update_members(self,
mock_get_az_metadata_dict,
mock_get_batch_update_members_flow,
mock_api_get_session,
mock_dyn_log_listener,
@ -942,7 +956,7 @@ class TestControllerWorker(base.TestCase):
mock_amp_repo_get):
_flow_mock.reset_mock()
mock_get_az_metadata_dict.return_value = {}
cw = controller_worker.ControllerWorker()
cw.batch_update_members([{constants.MEMBER_ID: 9,
constants.POOL_ID: 'testtest'}],
@ -957,7 +971,8 @@ class TestControllerWorker(base.TestCase):
constants.LOADBALANCER_ID: LB_ID,
constants.LOADBALANCER: provider_lb,
constants.POOL_ID: POOL_ID,
constants.PROJECT_ID: PROJECT_ID}))
constants.PROJECT_ID: PROJECT_ID,
constants.AVAILABILITY_ZONE: {}}))
_flow_mock.run.assert_called_once_with()