From 741397f1a9e0c5feda90f85b752a30ed7c23faf1 Mon Sep 17 00:00:00 2001
From: Adam Harwell <flux.adam@gmail.com>
Date: Thu, 30 Jan 2020 23:27:14 -0800
Subject: [PATCH] Network Delta calculations should respect AZs

The network delta calculations were all based on the static configured
amp_boot_network_list which is not correct if it's overridden by the AZ.

Change-Id: Ia930e17c76cd601ac005de10fb03231a19f1a776
---
 .../controller/worker/v1/controller_worker.py | 81 ++++++++++++------
 .../worker/v1/flows/amphora_flows.py          |  3 +-
 .../worker/v1/flows/load_balancer_flows.py    |  3 +-
 .../worker/v1/flows/member_flows.py           |  4 +-
 .../worker/v1/tasks/network_tasks.py          | 18 ++--
 .../controller/worker/v2/controller_worker.py | 82 +++++++++++++------
 .../worker/v2/flows/amphora_flows.py          |  3 +-
 .../worker/v2/flows/load_balancer_flows.py    |  3 +-
 .../worker/v2/flows/member_flows.py           |  4 +-
 .../worker/v2/tasks/network_tasks.py          | 18 ++--
 .../worker/v1/flows/test_member_flows.py      | 13 ++-
 .../worker/v1/tasks/test_network_tasks.py     | 15 ++--
 .../worker/v1/test_controller_worker.py       | 43 ++++++----
 .../worker/v2/flows/test_member_flows.py      | 13 ++-
 .../worker/v2/tasks/test_network_tasks.py     | 15 ++--
 .../worker/v2/test_controller_worker.py       | 45 ++++++----
 16 files changed, 249 insertions(+), 114 deletions(-)

diff --git a/octavia/controller/worker/v1/controller_worker.py b/octavia/controller/worker/v1/controller_worker.py
index 08ab2714a9..a33a43c08e 100644
--- a/octavia/controller/worker/v1/controller_worker.py
+++ b/octavia/controller/worker/v1/controller_worker.py
@@ -431,14 +431,21 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
         listeners = pool.listeners
         load_balancer = pool.load_balancer
 
-        create_member_tf = self._taskflow_load(self._member_flows.
-                                               get_create_member_flow(),
-                                               store={constants.MEMBER: member,
-                                                      constants.LISTENERS:
-                                                          listeners,
-                                                      constants.LOADBALANCER:
-                                                          load_balancer,
-                                                      constants.POOL: pool})
+        store = {
+            constants.MEMBER: member,
+            constants.LISTENERS: listeners,
+            constants.LOADBALANCER: load_balancer,
+            constants.POOL: pool}
+        if load_balancer.availability_zone:
+            store[constants.AVAILABILITY_ZONE] = (
+                self._az_repo.get_availability_zone_metadata_dict(
+                    db_apis.get_session(), load_balancer.availability_zone))
+        else:
+            store[constants.AVAILABILITY_ZONE] = {}
+
+        create_member_tf = self._taskflow_load(
+            self._member_flows.get_create_member_flow(),
+            store=store)
         with tf_logging.DynamicLoggingListener(create_member_tf,
                                                log=LOG):
             create_member_tf.run()
@@ -456,10 +463,21 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
         listeners = pool.listeners
         load_balancer = pool.load_balancer
 
+        store = {
+            constants.MEMBER: member,
+            constants.LISTENERS: listeners,
+            constants.LOADBALANCER: load_balancer,
+            constants.POOL: pool}
+        if load_balancer.availability_zone:
+            store[constants.AVAILABILITY_ZONE] = (
+                self._az_repo.get_availability_zone_metadata_dict(
+                    db_apis.get_session(), load_balancer.availability_zone))
+        else:
+            store[constants.AVAILABILITY_ZONE] = {}
+
         delete_member_tf = self._taskflow_load(
             self._member_flows.get_delete_member_flow(),
-            store={constants.MEMBER: member, constants.LISTENERS: listeners,
-                   constants.LOADBALANCER: load_balancer, constants.POOL: pool}
+            store=store
         )
         with tf_logging.DynamicLoggingListener(delete_member_tf,
                                                log=LOG):
@@ -483,12 +501,21 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
         listeners = pool.listeners
         load_balancer = pool.load_balancer
 
+        store = {
+            constants.LISTENERS: listeners,
+            constants.LOADBALANCER: load_balancer,
+            constants.POOL: pool}
+        if load_balancer.availability_zone:
+            store[constants.AVAILABILITY_ZONE] = (
+                self._az_repo.get_availability_zone_metadata_dict(
+                    db_apis.get_session(), load_balancer.availability_zone))
+        else:
+            store[constants.AVAILABILITY_ZONE] = {}
+
         batch_update_members_tf = self._taskflow_load(
             self._member_flows.get_batch_update_members_flow(
                 old_members, new_members, updated_members),
-            store={constants.LISTENERS: listeners,
-                   constants.LOADBALANCER: load_balancer,
-                   constants.POOL: pool})
+            store=store)
         with tf_logging.DynamicLoggingListener(batch_update_members_tf,
                                                log=LOG):
             batch_update_members_tf.run()
@@ -501,7 +528,6 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
         :returns: None
         :raises MemberNotFound: The referenced member was not found
         """
-        member = None
         try:
             member = self._get_db_obj_until_pending_update(
                 self._member_repo, member_id)
@@ -517,17 +543,22 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
         listeners = pool.listeners
         load_balancer = pool.load_balancer
 
-        update_member_tf = self._taskflow_load(self._member_flows.
-                                               get_update_member_flow(),
-                                               store={constants.MEMBER: member,
-                                                      constants.LISTENERS:
-                                                          listeners,
-                                                      constants.LOADBALANCER:
-                                                          load_balancer,
-                                                      constants.POOL:
-                                                          pool,
-                                                      constants.UPDATE_DICT:
-                                                          member_updates})
+        store = {
+            constants.MEMBER: member,
+            constants.LISTENERS: listeners,
+            constants.LOADBALANCER: load_balancer,
+            constants.POOL: pool,
+            constants.UPDATE_DICT: member_updates}
+        if load_balancer.availability_zone:
+            store[constants.AVAILABILITY_ZONE] = (
+                self._az_repo.get_availability_zone_metadata_dict(
+                    db_apis.get_session(), load_balancer.availability_zone))
+        else:
+            store[constants.AVAILABILITY_ZONE] = {}
+
+        update_member_tf = self._taskflow_load(
+            self._member_flows.get_update_member_flow(),
+            store=store)
         with tf_logging.DynamicLoggingListener(update_member_tf,
                                                log=LOG):
             update_member_tf.run()
diff --git a/octavia/controller/worker/v1/flows/amphora_flows.py b/octavia/controller/worker/v1/flows/amphora_flows.py
index 956788c617..523569e0ac 100644
--- a/octavia/controller/worker/v1/flows/amphora_flows.py
+++ b/octavia/controller/worker/v1/flows/amphora_flows.py
@@ -491,7 +491,8 @@ class AmphoraFlows(object):
 
         # Plug the member networks into the new amphora
         failover_amphora_flow.add(network_tasks.CalculateAmphoraDelta(
-            requires=(constants.LOADBALANCER, constants.AMPHORA),
+            requires=(constants.LOADBALANCER, constants.AMPHORA,
+                      constants.AVAILABILITY_ZONE),
             provides=constants.DELTA))
 
         failover_amphora_flow.add(network_tasks.HandleNetworkDelta(
diff --git a/octavia/controller/worker/v1/flows/load_balancer_flows.py b/octavia/controller/worker/v1/flows/load_balancer_flows.py
index 82478bd623..ddc34a2128 100644
--- a/octavia/controller/worker/v1/flows/load_balancer_flows.py
+++ b/octavia/controller/worker/v1/flows/load_balancer_flows.py
@@ -145,7 +145,8 @@ class LoadBalancerFlows(object):
         )
         flows.append(
             network_tasks.CalculateDelta(
-                requires=constants.LOADBALANCER, provides=constants.DELTAS
+                requires=(constants.LOADBALANCER, constants.AVAILABILITY_ZONE),
+                provides=constants.DELTAS
             )
         )
         flows.append(
diff --git a/octavia/controller/worker/v1/flows/member_flows.py b/octavia/controller/worker/v1/flows/member_flows.py
index deefd88eec..854e50f57a 100644
--- a/octavia/controller/worker/v1/flows/member_flows.py
+++ b/octavia/controller/worker/v1/flows/member_flows.py
@@ -40,7 +40,7 @@ class MemberFlows(object):
         create_member_flow.add(database_tasks.MarkMemberPendingCreateInDB(
             requires=constants.MEMBER))
         create_member_flow.add(network_tasks.CalculateDelta(
-            requires=constants.LOADBALANCER,
+            requires=(constants.LOADBALANCER, constants.AVAILABILITY_ZONE),
             provides=constants.DELTAS))
         create_member_flow.add(network_tasks.HandleNetworkDeltas(
             requires=constants.DELTAS, provides=constants.ADDED_PORTS))
@@ -185,7 +185,7 @@ class MemberFlows(object):
 
         # Done, do real updates
         batch_update_members_flow.add(network_tasks.CalculateDelta(
-            requires=constants.LOADBALANCER,
+            requires=(constants.LOADBALANCER, constants.AVAILABILITY_ZONE),
             provides=constants.DELTAS))
         batch_update_members_flow.add(network_tasks.HandleNetworkDeltas(
             requires=constants.DELTAS, provides=constants.ADDED_PORTS))
diff --git a/octavia/controller/worker/v1/tasks/network_tasks.py b/octavia/controller/worker/v1/tasks/network_tasks.py
index 0f1535f88d..529ffc9355 100644
--- a/octavia/controller/worker/v1/tasks/network_tasks.py
+++ b/octavia/controller/worker/v1/tasks/network_tasks.py
@@ -48,14 +48,19 @@ class CalculateAmphoraDelta(BaseNetworkTask):
 
     default_provides = constants.DELTA
 
-    def execute(self, loadbalancer, amphora):
+    def execute(self, loadbalancer, amphora, availability_zone):
         LOG.debug("Calculating network delta for amphora id: %s", amphora.id)
 
         # Figure out what networks we want
         # seed with lb network(s)
         vrrp_port = self.network_driver.get_port(amphora.vrrp_port_id)
-        desired_network_ids = {vrrp_port.network_id}.union(
-            CONF.controller_worker.amp_boot_network_list)
+        if availability_zone:
+            management_nets = (
+                [availability_zone.get(constants.MANAGEMENT_NETWORK)] or
+                CONF.controller_worker.amp_boot_network_list)
+        else:
+            management_nets = CONF.controller_worker.amp_boot_network_list
+        desired_network_ids = {vrrp_port.network_id}.union(management_nets)
 
         for pool in loadbalancer.pools:
             member_networks = [
@@ -92,13 +97,15 @@ class CalculateDelta(BaseNetworkTask):
 
     default_provides = constants.DELTAS
 
-    def execute(self, loadbalancer):
+    def execute(self, loadbalancer, availability_zone):
         """Compute which NICs need to be plugged
 
         for the amphora to become operational.
 
         :param loadbalancer: the loadbalancer to calculate deltas for all
                              amphorae
+        :param availability_zone: availability zone metadata dict
+
         :returns: dict of octavia.network.data_models.Delta keyed off amphora
                   id
         """
@@ -109,7 +116,8 @@ class CalculateDelta(BaseNetworkTask):
             lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
                 loadbalancer.amphorae):
 
-            delta = calculate_amp.execute(loadbalancer, amphora)
+            delta = calculate_amp.execute(loadbalancer, amphora,
+                                          availability_zone)
             deltas[amphora.id] = delta
         return deltas
 
diff --git a/octavia/controller/worker/v2/controller_worker.py b/octavia/controller/worker/v2/controller_worker.py
index 77417c96e7..e1984e4c60 100644
--- a/octavia/controller/worker/v2/controller_worker.py
+++ b/octavia/controller/worker/v2/controller_worker.py
@@ -438,13 +438,22 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
             provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
                 pool.listeners))
 
+        store = {
+            constants.MEMBER: member,
+            constants.LISTENERS: listeners_dicts,
+            constants.LOADBALANCER_ID: load_balancer.id,
+            constants.LOADBALANCER: provider_lb,
+            constants.POOL_ID: pool.id}
+        if load_balancer.availability_zone:
+            store[constants.AVAILABILITY_ZONE] = (
+                self._az_repo.get_availability_zone_metadata_dict(
+                    db_apis.get_session(), load_balancer.availability_zone))
+        else:
+            store[constants.AVAILABILITY_ZONE] = {}
+
         create_member_tf = self._taskflow_load(
             self._member_flows.get_create_member_flow(),
-            store={constants.MEMBER: member,
-                   constants.LISTENERS: listeners_dicts,
-                   constants.LOADBALANCER_ID: load_balancer.id,
-                   constants.LOADBALANCER: provider_lb,
-                   constants.POOL_ID: pool.id})
+            store=store)
         with tf_logging.DynamicLoggingListener(create_member_tf,
                                                log=LOG):
             create_member_tf.run()
@@ -467,16 +476,23 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
             provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
                 pool.listeners))
 
+        store = {
+            constants.MEMBER: member,
+            constants.LISTENERS: listeners_dicts,
+            constants.LOADBALANCER_ID: load_balancer.id,
+            constants.LOADBALANCER: provider_lb,
+            constants.POOL_ID: pool.id,
+            constants.PROJECT_ID: load_balancer.project_id}
+        if load_balancer.availability_zone:
+            store[constants.AVAILABILITY_ZONE] = (
+                self._az_repo.get_availability_zone_metadata_dict(
+                    db_apis.get_session(), load_balancer.availability_zone))
+        else:
+            store[constants.AVAILABILITY_ZONE] = {}
+
         delete_member_tf = self._taskflow_load(
             self._member_flows.get_delete_member_flow(),
-            store={constants.MEMBER: member,
-                   constants.LISTENERS: listeners_dicts,
-                   constants.LOADBALANCER: provider_lb,
-                   constants.LOADBALANCER_ID: load_balancer.id,
-                   constants.POOL_ID: pool.id,
-                   constants.PROJECT_ID: load_balancer.project_id
-                   }
-
+            store=store
         )
         with tf_logging.DynamicLoggingListener(delete_member_tf,
                                                log=LOG):
@@ -513,14 +529,23 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
         provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
             load_balancer).to_dict()
 
+        store = {
+            constants.LISTENERS: listeners_dicts,
+            constants.LOADBALANCER_ID: load_balancer.id,
+            constants.LOADBALANCER: provider_lb,
+            constants.POOL_ID: pool.id,
+            constants.PROJECT_ID: load_balancer.project_id}
+        if load_balancer.availability_zone:
+            store[constants.AVAILABILITY_ZONE] = (
+                self._az_repo.get_availability_zone_metadata_dict(
+                    db_apis.get_session(), load_balancer.availability_zone))
+        else:
+            store[constants.AVAILABILITY_ZONE] = {}
+
         batch_update_members_tf = self._taskflow_load(
             self._member_flows.get_batch_update_members_flow(
                 provider_old_members, new_members, updated_members),
-            store={constants.LISTENERS: listeners_dicts,
-                   constants.LOADBALANCER: provider_lb,
-                   constants.LOADBALANCER_ID: load_balancer.id,
-                   constants.POOL_ID: pool.id,
-                   constants.PROJECT_ID: load_balancer.project_id})
+            store=store)
         with tf_logging.DynamicLoggingListener(batch_update_members_tf,
                                                log=LOG):
             batch_update_members_tf.run()
@@ -544,14 +569,23 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
             provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
                 pool.listeners))
 
+        store = {
+            constants.MEMBER: member,
+            constants.LISTENERS: listeners_dicts,
+            constants.LOADBALANCER_ID: load_balancer.id,
+            constants.LOADBALANCER: provider_lb,
+            constants.POOL_ID: pool.id,
+            constants.UPDATE_DICT: member_updates}
+        if load_balancer.availability_zone:
+            store[constants.AVAILABILITY_ZONE] = (
+                self._az_repo.get_availability_zone_metadata_dict(
+                    db_apis.get_session(), load_balancer.availability_zone))
+        else:
+            store[constants.AVAILABILITY_ZONE] = {}
+
         update_member_tf = self._taskflow_load(
             self._member_flows.get_update_member_flow(),
-            store={constants.MEMBER: member,
-                   constants.LISTENERS: listeners_dicts,
-                   constants.LOADBALANCER: provider_lb,
-                   constants.LOADBALANCER_ID: load_balancer.id,
-                   constants.POOL_ID: pool.id,
-                   constants.UPDATE_DICT: member_updates})
+            store=store)
         with tf_logging.DynamicLoggingListener(update_member_tf,
                                                log=LOG):
             update_member_tf.run()
diff --git a/octavia/controller/worker/v2/flows/amphora_flows.py b/octavia/controller/worker/v2/flows/amphora_flows.py
index a0746c42c8..926bbeacd8 100644
--- a/octavia/controller/worker/v2/flows/amphora_flows.py
+++ b/octavia/controller/worker/v2/flows/amphora_flows.py
@@ -526,7 +526,8 @@ class AmphoraFlows(object):
 
         # Plug the member networks into the new amphora
         failover_amphora_flow.add(network_tasks.CalculateAmphoraDelta(
-            requires=(constants.LOADBALANCER, constants.AMPHORA),
+            requires=(constants.LOADBALANCER, constants.AMPHORA,
+                      constants.AVAILABILITY_ZONE),
             provides=constants.DELTA))
 
         failover_amphora_flow.add(network_tasks.HandleNetworkDelta(
diff --git a/octavia/controller/worker/v2/flows/load_balancer_flows.py b/octavia/controller/worker/v2/flows/load_balancer_flows.py
index fd46ae633f..e601bae86e 100644
--- a/octavia/controller/worker/v2/flows/load_balancer_flows.py
+++ b/octavia/controller/worker/v2/flows/load_balancer_flows.py
@@ -149,7 +149,8 @@ class LoadBalancerFlows(object):
         )
         flows.append(
             network_tasks.CalculateDelta(
-                requires=constants.LOADBALANCER, provides=constants.DELTAS
+                requires=(constants.LOADBALANCER, constants.AVAILABILITY_ZONE),
+                provides=constants.DELTAS
             )
         )
         flows.append(
diff --git a/octavia/controller/worker/v2/flows/member_flows.py b/octavia/controller/worker/v2/flows/member_flows.py
index b97c4e5c1d..b1927c83d9 100644
--- a/octavia/controller/worker/v2/flows/member_flows.py
+++ b/octavia/controller/worker/v2/flows/member_flows.py
@@ -39,7 +39,7 @@ class MemberFlows(object):
         create_member_flow.add(database_tasks.MarkMemberPendingCreateInDB(
             requires=constants.MEMBER))
         create_member_flow.add(network_tasks.CalculateDelta(
-            requires=constants.LOADBALANCER,
+            requires=(constants.LOADBALANCER, constants.AVAILABILITY_ZONE),
             provides=constants.DELTAS))
         create_member_flow.add(network_tasks.HandleNetworkDeltas(
             requires=constants.DELTAS, provides=constants.ADDED_PORTS))
@@ -180,7 +180,7 @@ class MemberFlows(object):
 
         # Done, do real updates
         batch_update_members_flow.add(network_tasks.CalculateDelta(
-            requires=constants.LOADBALANCER,
+            requires=(constants.LOADBALANCER, constants.AVAILABILITY_ZONE),
             provides=constants.DELTAS))
         batch_update_members_flow.add(network_tasks.HandleNetworkDeltas(
             requires=constants.DELTAS, provides=constants.ADDED_PORTS))
diff --git a/octavia/controller/worker/v2/tasks/network_tasks.py b/octavia/controller/worker/v2/tasks/network_tasks.py
index 2f415ea96a..2b2af707ed 100644
--- a/octavia/controller/worker/v2/tasks/network_tasks.py
+++ b/octavia/controller/worker/v2/tasks/network_tasks.py
@@ -53,7 +53,7 @@ class CalculateAmphoraDelta(BaseNetworkTask):
 
     default_provides = constants.DELTA
 
-    def execute(self, loadbalancer, amphora):
+    def execute(self, loadbalancer, amphora, availability_zone):
         LOG.debug("Calculating network delta for amphora id: %s",
                   amphora.get(constants.ID))
 
@@ -61,8 +61,13 @@ class CalculateAmphoraDelta(BaseNetworkTask):
         # seed with lb network(s)
         vrrp_port = self.network_driver.get_port(
             amphora[constants.VRRP_PORT_ID])
-        desired_network_ids = {vrrp_port.network_id}.union(
-            CONF.controller_worker.amp_boot_network_list)
+        if availability_zone:
+            management_nets = (
+                [availability_zone.get(constants.MANAGEMENT_NETWORK)] or
+                CONF.controller_worker.amp_boot_network_list)
+        else:
+            management_nets = CONF.controller_worker.amp_boot_network_list
+        desired_network_ids = {vrrp_port.network_id}.union(management_nets)
         db_lb = self.loadbalancer_repo.get(
             db_apis.get_session(), id=loadbalancer[constants.LOADBALANCER_ID])
         for pool in db_lb.pools:
@@ -102,13 +107,15 @@ class CalculateDelta(BaseNetworkTask):
 
     default_provides = constants.DELTAS
 
-    def execute(self, loadbalancer):
+    def execute(self, loadbalancer, availability_zone):
         """Compute which NICs need to be plugged
 
         for the amphora to become operational.
 
         :param loadbalancer: the loadbalancer to calculate deltas for all
                              amphorae
+        :param availability_zone: availability zone metadata dict
+
         :returns: dict of octavia.network.data_models.Delta keyed off amphora
                   id
         """
@@ -121,7 +128,8 @@ class CalculateDelta(BaseNetworkTask):
             lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
                 db_lb.amphorae):
 
-            delta = calculate_amp.execute(loadbalancer, amphora.to_dict())
+            delta = calculate_amp.execute(loadbalancer, amphora.to_dict(),
+                                          availability_zone)
             deltas[amphora.id] = delta
         return deltas
 
diff --git a/octavia/tests/unit/controller/worker/v1/flows/test_member_flows.py b/octavia/tests/unit/controller/worker/v1/flows/test_member_flows.py
index 534fca8790..138dfb5056 100644
--- a/octavia/tests/unit/controller/worker/v1/flows/test_member_flows.py
+++ b/octavia/tests/unit/controller/worker/v1/flows/test_member_flows.py
@@ -40,8 +40,13 @@ class TestMemberFlows(base.TestCase):
         self.assertIn(constants.LISTENERS, member_flow.requires)
         self.assertIn(constants.LOADBALANCER, member_flow.requires)
         self.assertIn(constants.POOL, member_flow.requires)
+        self.assertIn(constants.MEMBER, member_flow.requires)
+        self.assertIn(constants.AVAILABILITY_ZONE, member_flow.requires)
 
-        self.assertEqual(4, len(member_flow.requires))
+        self.assertIn(constants.DELTAS, member_flow.provides)
+        self.assertIn(constants.ADDED_PORTS, member_flow.provides)
+
+        self.assertEqual(5, len(member_flow.requires))
         self.assertEqual(2, len(member_flow.provides))
 
     def test_get_delete_member_flow(self, mock_get_net_driver):
@@ -83,6 +88,10 @@ class TestMemberFlows(base.TestCase):
         self.assertIn(constants.LISTENERS, member_flow.requires)
         self.assertIn(constants.LOADBALANCER, member_flow.requires)
         self.assertIn(constants.POOL, member_flow.requires)
+        self.assertIn(constants.AVAILABILITY_ZONE, member_flow.requires)
 
-        self.assertEqual(3, len(member_flow.requires))
+        self.assertIn(constants.DELTAS, member_flow.provides)
+        self.assertIn(constants.ADDED_PORTS, member_flow.provides)
+
+        self.assertEqual(4, len(member_flow.requires))
         self.assertEqual(2, len(member_flow.provides))
diff --git a/octavia/tests/unit/controller/worker/v1/tasks/test_network_tasks.py b/octavia/tests/unit/controller/worker/v1/tasks/test_network_tasks.py
index 92de9b7ae7..711673136f 100644
--- a/octavia/tests/unit/controller/worker/v1/tasks/test_network_tasks.py
+++ b/octavia/tests/unit/controller/worker/v1/tasks/test_network_tasks.py
@@ -96,7 +96,8 @@ class TestNetworkTasks(base.TestCase):
 
         calc_delta = network_tasks.CalculateDelta()
 
-        self.assertEqual(EMPTY, calc_delta.execute(self.load_balancer_mock))
+        self.assertEqual(EMPTY,
+                         calc_delta.execute(self.load_balancer_mock, {}))
 
         # Test with one amp and no pools, nothing plugged
         # Delta should be empty
@@ -107,7 +108,7 @@ class TestNetworkTasks(base.TestCase):
         self.load_balancer_mock.pools = []
 
         self.assertEqual(empty_deltas,
-                         calc_delta.execute(self.load_balancer_mock))
+                         calc_delta.execute(self.load_balancer_mock, {}))
         mock_driver.get_plugged_networks.assert_called_once_with(COMPUTE_ID)
 
         # Pool mock should be configured explicitly for each test
@@ -118,7 +119,7 @@ class TestNetworkTasks(base.TestCase):
         # Delta should be empty
         pool_mock.members = []
         self.assertEqual(empty_deltas,
-                         calc_delta.execute(self.load_balancer_mock))
+                         calc_delta.execute(self.load_balancer_mock, {}))
 
         # Test with one amp and one pool and one member, nothing plugged
         # Delta should be one additional subnet to plug
@@ -135,7 +136,7 @@ class TestNetworkTasks(base.TestCase):
                                     data_models.Interface(network_id=2)],
                                 delete_nics=[])
         self.assertEqual({self.amphora_mock.id: ndm},
-                         calc_delta.execute(self.load_balancer_mock))
+                         calc_delta.execute(self.load_balancer_mock, {}))
 
         vrrp_port_call = mock.call(self.amphora_mock.vrrp_port_id)
         mock_driver.get_port.assert_has_calls([vrrp_port_call])
@@ -155,7 +156,7 @@ class TestNetworkTasks(base.TestCase):
             data_models.Interface(network_id=2)]
 
         self.assertEqual(empty_deltas,
-                         calc_delta.execute(self.load_balancer_mock))
+                         calc_delta.execute(self.load_balancer_mock, {}))
 
         # Test with one amp and one pool and one member, wrong network plugged
         # Delta should be one network to add and one to remove
@@ -173,7 +174,7 @@ class TestNetworkTasks(base.TestCase):
                                 delete_nics=[
                                     data_models.Interface(network_id=3)])
         self.assertEqual({self.amphora_mock.id: ndm},
-                         calc_delta.execute(self.load_balancer_mock))
+                         calc_delta.execute(self.load_balancer_mock, {}))
 
         # Test with one amp and one pool and no members, one network plugged
         # Delta should be one network to remove
@@ -188,7 +189,7 @@ class TestNetworkTasks(base.TestCase):
                                 delete_nics=[
                                     data_models.Interface(network_id=2)])
         self.assertEqual({self.amphora_mock.id: ndm},
-                         calc_delta.execute(self.load_balancer_mock))
+                         calc_delta.execute(self.load_balancer_mock, {}))
 
     def test_get_plumbed_networks(self, mock_get_net_driver):
         mock_driver = mock.MagicMock()
diff --git a/octavia/tests/unit/controller/worker/v1/test_controller_worker.py b/octavia/tests/unit/controller/worker/v1/test_controller_worker.py
index a82d5882fa..ec872ffa1b 100644
--- a/octavia/tests/unit/controller/worker/v1/test_controller_worker.py
+++ b/octavia/tests/unit/controller/worker/v1/test_controller_worker.py
@@ -748,7 +748,10 @@ class TestControllerWorker(base.TestCase):
     @mock.patch('octavia.controller.worker.v1.flows.'
                 'member_flows.MemberFlows.get_create_member_flow',
                 return_value=_flow_mock)
+    @mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
+                'get_availability_zone_metadata_dict')
     def test_create_member(self,
+                           mock_get_az_metadata_dict,
                            mock_get_create_member_flow,
                            mock_api_get_session,
                            mock_dyn_log_listener,
@@ -763,20 +766,20 @@ class TestControllerWorker(base.TestCase):
                            mock_amp_repo_get):
 
         _flow_mock.reset_mock()
+        mock_get_az_metadata_dict.return_value = {}
         mock_member_repo_get.side_effect = [None, _member_mock]
 
         cw = controller_worker.ControllerWorker()
         cw.create_member(MEMBER_ID)
 
         (base_taskflow.BaseTaskFlowEngine._taskflow_load.
-            assert_called_once_with(_flow_mock,
-                                    store={constants.MEMBER: _member_mock,
-                                           constants.LISTENERS:
-                                               [_listener_mock],
-                                           constants.LOADBALANCER:
-                                               _load_balancer_mock,
-                                           constants.POOL:
-                                               _pool_mock}))
+            assert_called_once_with(
+                _flow_mock,
+                store={constants.MEMBER: _member_mock,
+                       constants.LISTENERS: [_listener_mock],
+                       constants.LOADBALANCER: _load_balancer_mock,
+                       constants.POOL: _pool_mock,
+                       constants.AVAILABILITY_ZONE: {}}))
 
         _flow_mock.run.assert_called_once_with()
         self.assertEqual(2, mock_member_repo_get.call_count)
@@ -784,7 +787,10 @@ class TestControllerWorker(base.TestCase):
     @mock.patch('octavia.controller.worker.v1.flows.'
                 'member_flows.MemberFlows.get_delete_member_flow',
                 return_value=_flow_mock)
+    @mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
+                'get_availability_zone_metadata_dict')
     def test_delete_member(self,
+                           mock_get_az_metadata_dict,
                            mock_get_delete_member_flow,
                            mock_api_get_session,
                            mock_dyn_log_listener,
@@ -799,7 +805,7 @@ class TestControllerWorker(base.TestCase):
                            mock_amp_repo_get):
 
         _flow_mock.reset_mock()
-
+        mock_get_az_metadata_dict.return_value = {}
         cw = controller_worker.ControllerWorker()
         cw.delete_member(MEMBER_ID)
 
@@ -811,14 +817,18 @@ class TestControllerWorker(base.TestCase):
                                    constants.LOADBALANCER:
                                        _load_balancer_mock,
                                    constants.POOL:
-                                       _pool_mock}))
+                                       _pool_mock,
+                                   constants.AVAILABILITY_ZONE: {}}))
 
         _flow_mock.run.assert_called_once_with()
 
     @mock.patch('octavia.controller.worker.v1.flows.'
                 'member_flows.MemberFlows.get_update_member_flow',
                 return_value=_flow_mock)
+    @mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
+                'get_availability_zone_metadata_dict')
     def test_update_member(self,
+                           mock_get_az_metadata_dict,
                            mock_get_update_member_flow,
                            mock_api_get_session,
                            mock_dyn_log_listener,
@@ -834,7 +844,7 @@ class TestControllerWorker(base.TestCase):
 
         _flow_mock.reset_mock()
         _member_mock.provisioning_status = constants.PENDING_UPDATE
-
+        mock_get_az_metadata_dict.return_value = {}
         cw = controller_worker.ControllerWorker()
         cw.update_member(MEMBER_ID, MEMBER_UPDATE_DICT)
 
@@ -848,14 +858,18 @@ class TestControllerWorker(base.TestCase):
                                            constants.POOL:
                                                _pool_mock,
                                            constants.UPDATE_DICT:
-                                               MEMBER_UPDATE_DICT}))
+                                               MEMBER_UPDATE_DICT,
+                                           constants.AVAILABILITY_ZONE: {}}))
 
         _flow_mock.run.assert_called_once_with()
 
     @mock.patch('octavia.controller.worker.v1.flows.'
                 'member_flows.MemberFlows.get_batch_update_members_flow',
                 return_value=_flow_mock)
+    @mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
+                'get_availability_zone_metadata_dict')
     def test_batch_update_members(self,
+                                  mock_get_az_metadata_dict,
                                   mock_get_batch_update_members_flow,
                                   mock_api_get_session,
                                   mock_dyn_log_listener,
@@ -870,7 +884,7 @@ class TestControllerWorker(base.TestCase):
                                   mock_amp_repo_get):
 
         _flow_mock.reset_mock()
-
+        mock_get_az_metadata_dict.return_value = {}
         cw = controller_worker.ControllerWorker()
         cw.batch_update_members([9], [11], [MEMBER_UPDATE_DICT])
 
@@ -880,7 +894,8 @@ class TestControllerWorker(base.TestCase):
                                         constants.LISTENERS: [_listener_mock],
                                         constants.LOADBALANCER:
                                             _load_balancer_mock,
-                                        constants.POOL: _pool_mock}))
+                                        constants.POOL: _pool_mock,
+                                        constants.AVAILABILITY_ZONE: {}}))
 
         _flow_mock.run.assert_called_once_with()
 
diff --git a/octavia/tests/unit/controller/worker/v2/flows/test_member_flows.py b/octavia/tests/unit/controller/worker/v2/flows/test_member_flows.py
index 551d071edc..0713669fd1 100644
--- a/octavia/tests/unit/controller/worker/v2/flows/test_member_flows.py
+++ b/octavia/tests/unit/controller/worker/v2/flows/test_member_flows.py
@@ -41,8 +41,13 @@ class TestMemberFlows(base.TestCase):
         self.assertIn(constants.LOADBALANCER, member_flow.requires)
         self.assertIn(constants.LOADBALANCER_ID, member_flow.requires)
         self.assertIn(constants.POOL_ID, member_flow.requires)
+        self.assertIn(constants.MEMBER, member_flow.requires)
+        self.assertIn(constants.AVAILABILITY_ZONE, member_flow.requires)
 
-        self.assertEqual(5, len(member_flow.requires))
+        self.assertIn(constants.DELTAS, member_flow.provides)
+        self.assertIn(constants.ADDED_PORTS, member_flow.provides)
+
+        self.assertEqual(6, len(member_flow.requires))
         self.assertEqual(2, len(member_flow.provides))
 
     def test_get_delete_member_flow(self, mock_get_net_driver):
@@ -88,6 +93,10 @@ class TestMemberFlows(base.TestCase):
         self.assertIn(constants.LOADBALANCER, member_flow.requires)
         self.assertIn(constants.LOADBALANCER_ID, member_flow.requires)
         self.assertIn(constants.POOL_ID, member_flow.requires)
+        self.assertIn(constants.AVAILABILITY_ZONE, member_flow.requires)
 
-        self.assertEqual(4, len(member_flow.requires))
+        self.assertIn(constants.DELTAS, member_flow.provides)
+        self.assertIn(constants.ADDED_PORTS, member_flow.provides)
+
+        self.assertEqual(5, len(member_flow.requires))
         self.assertEqual(2, len(member_flow.provides))
diff --git a/octavia/tests/unit/controller/worker/v2/tasks/test_network_tasks.py b/octavia/tests/unit/controller/worker/v2/tasks/test_network_tasks.py
index bfae0f8248..538144a7c4 100644
--- a/octavia/tests/unit/controller/worker/v2/tasks/test_network_tasks.py
+++ b/octavia/tests/unit/controller/worker/v2/tasks/test_network_tasks.py
@@ -122,7 +122,8 @@ class TestNetworkTasks(base.TestCase):
 
         calc_delta = network_tasks.CalculateDelta()
 
-        self.assertEqual(EMPTY, calc_delta.execute(self.load_balancer_mock))
+        self.assertEqual(EMPTY,
+                         calc_delta.execute(self.load_balancer_mock, {}))
 
         # Test with one amp and no pools, nothing plugged
         # Delta should be empty
@@ -132,7 +133,7 @@ class TestNetworkTasks(base.TestCase):
         self.db_load_balancer_mock.amphorae = [self.db_amphora_mock]
         self.db_load_balancer_mock.pools = []
         self.assertEqual(empty_deltas,
-                         calc_delta.execute(self.load_balancer_mock))
+                         calc_delta.execute(self.load_balancer_mock, {}))
         mock_driver.get_plugged_networks.assert_called_once_with(COMPUTE_ID)
 
         # Pool mock should be configured explicitly for each test
@@ -143,7 +144,7 @@ class TestNetworkTasks(base.TestCase):
         # Delta should be empty
         pool_mock.members = []
         self.assertEqual(empty_deltas,
-                         calc_delta.execute(self.load_balancer_mock))
+                         calc_delta.execute(self.load_balancer_mock, {}))
 
         # Test with one amp and one pool and one member, nothing plugged
         # Delta should be one additional subnet to plug
@@ -160,7 +161,7 @@ class TestNetworkTasks(base.TestCase):
                                     data_models.Interface(network_id=3)],
                                 delete_nics=[]).to_dict(recurse=True)
         self.assertEqual({self.db_amphora_mock.id: ndm},
-                         calc_delta.execute(self.load_balancer_mock))
+                         calc_delta.execute(self.load_balancer_mock, {}))
 
         vrrp_port_call = mock.call(PORT_ID)
         mock_driver.get_port.assert_has_calls([vrrp_port_call])
@@ -181,7 +182,7 @@ class TestNetworkTasks(base.TestCase):
             data_models.Interface(network_id='netid')]
 
         self.assertEqual(empty_deltas,
-                         calc_delta.execute(self.load_balancer_mock))
+                         calc_delta.execute(self.load_balancer_mock, {}))
 
         # Test with one amp and one pool and one member, wrong network plugged
         # Delta should be one network to add and one to remove
@@ -201,7 +202,7 @@ class TestNetworkTasks(base.TestCase):
                                     data_models.Interface(network_id=2)]
                                 ).to_dict(recurse=True)
         self.assertEqual({self.db_amphora_mock.id: ndm},
-                         calc_delta.execute(self.load_balancer_mock))
+                         calc_delta.execute(self.load_balancer_mock, {}))
 
         # Test with one amp and one pool and no members, one network plugged
         # Delta should be one network to remove
@@ -219,7 +220,7 @@ class TestNetworkTasks(base.TestCase):
                                     data_models.Interface(network_id=2)]
                                 ).to_dict(recurse=True)
         self.assertEqual({self.db_amphora_mock.id: ndm},
-                         calc_delta.execute(self.load_balancer_mock))
+                         calc_delta.execute(self.load_balancer_mock, {}))
 
     def test_get_plumbed_networks(self, mock_get_net_driver):
         mock_driver = mock.MagicMock()
diff --git a/octavia/tests/unit/controller/worker/v2/test_controller_worker.py b/octavia/tests/unit/controller/worker/v2/test_controller_worker.py
index 851f63e27e..9face93ede 100644
--- a/octavia/tests/unit/controller/worker/v2/test_controller_worker.py
+++ b/octavia/tests/unit/controller/worker/v2/test_controller_worker.py
@@ -807,7 +807,10 @@ class TestControllerWorker(base.TestCase):
     @mock.patch('octavia.controller.worker.v2.flows.'
                 'member_flows.MemberFlows.get_create_member_flow',
                 return_value=_flow_mock)
+    @mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
+                'get_availability_zone_metadata_dict')
     def test_create_member(self,
+                           mock_get_az_metadata_dict,
                            mock_get_create_member_flow,
                            mock_api_get_session,
                            mock_dyn_log_listener,
@@ -822,6 +825,7 @@ class TestControllerWorker(base.TestCase):
                            mock_amp_repo_get):
 
         _flow_mock.reset_mock()
+        mock_get_az_metadata_dict.return_value = {}
         mock_member_repo_get.side_effect = [None, _member_mock]
         _member = _member_mock.to_dict()
         cw = controller_worker.ControllerWorker()
@@ -830,23 +834,24 @@ class TestControllerWorker(base.TestCase):
         provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
             _db_load_balancer_mock).to_dict()
         (base_taskflow.BaseTaskFlowEngine._taskflow_load.
-            assert_called_once_with(_flow_mock,
-                                    store={constants.MEMBER: _member,
-                                           constants.LISTENERS:
-                                               [self.ref_listener_dict],
-                                           constants.LOADBALANCER_ID:
-                                               LB_ID,
-                                           constants.LOADBALANCER:
-                                               provider_lb,
-                                           constants.POOL_ID:
-                                               POOL_ID}))
+            assert_called_once_with(
+                _flow_mock,
+                store={constants.MEMBER: _member,
+                       constants.LISTENERS: [self.ref_listener_dict],
+                       constants.LOADBALANCER_ID: LB_ID,
+                       constants.LOADBALANCER: provider_lb,
+                       constants.POOL_ID: POOL_ID,
+                       constants.AVAILABILITY_ZONE: {}}))
 
         _flow_mock.run.assert_called_once_with()
 
     @mock.patch('octavia.controller.worker.v2.flows.'
                 'member_flows.MemberFlows.get_delete_member_flow',
                 return_value=_flow_mock)
+    @mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
+                'get_availability_zone_metadata_dict')
     def test_delete_member(self,
+                           mock_get_az_metadata_dict,
                            mock_get_delete_member_flow,
                            mock_api_get_session,
                            mock_dyn_log_listener,
@@ -862,6 +867,7 @@ class TestControllerWorker(base.TestCase):
 
         _flow_mock.reset_mock()
         _member = _member_mock.to_dict()
+        mock_get_az_metadata_dict.return_value = {}
         cw = controller_worker.ControllerWorker()
         cw.delete_member(_member)
         provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
@@ -878,14 +884,18 @@ class TestControllerWorker(base.TestCase):
                                        provider_lb,
                                    constants.POOL_ID:
                                        POOL_ID,
-                                   constants.PROJECT_ID: PROJECT_ID}))
+                                   constants.PROJECT_ID: PROJECT_ID,
+                                   constants.AVAILABILITY_ZONE: {}}))
 
         _flow_mock.run.assert_called_once_with()
 
     @mock.patch('octavia.controller.worker.v2.flows.'
                 'member_flows.MemberFlows.get_update_member_flow',
                 return_value=_flow_mock)
+    @mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
+                'get_availability_zone_metadata_dict')
     def test_update_member(self,
+                           mock_get_az_metadata_dict,
                            mock_get_update_member_flow,
                            mock_api_get_session,
                            mock_dyn_log_listener,
@@ -902,7 +912,7 @@ class TestControllerWorker(base.TestCase):
         _flow_mock.reset_mock()
         _member = _member_mock.to_dict()
         _member[constants.PROVISIONING_STATUS] = constants.PENDING_UPDATE
-
+        mock_get_az_metadata_dict.return_value = {}
         cw = controller_worker.ControllerWorker()
         cw.update_member(_member, MEMBER_UPDATE_DICT)
         provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
@@ -920,14 +930,18 @@ class TestControllerWorker(base.TestCase):
                                            constants.LOADBALANCER_ID:
                                                LB_ID,
                                            constants.UPDATE_DICT:
-                                               MEMBER_UPDATE_DICT}))
+                                               MEMBER_UPDATE_DICT,
+                                           constants.AVAILABILITY_ZONE: {}}))
 
         _flow_mock.run.assert_called_once_with()
 
     @mock.patch('octavia.controller.worker.v2.flows.'
                 'member_flows.MemberFlows.get_batch_update_members_flow',
                 return_value=_flow_mock)
+    @mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
+                'get_availability_zone_metadata_dict')
     def test_batch_update_members(self,
+                                  mock_get_az_metadata_dict,
                                   mock_get_batch_update_members_flow,
                                   mock_api_get_session,
                                   mock_dyn_log_listener,
@@ -942,7 +956,7 @@ class TestControllerWorker(base.TestCase):
                                   mock_amp_repo_get):
 
         _flow_mock.reset_mock()
-
+        mock_get_az_metadata_dict.return_value = {}
         cw = controller_worker.ControllerWorker()
         cw.batch_update_members([{constants.MEMBER_ID: 9,
                                   constants.POOL_ID: 'testtest'}],
@@ -957,7 +971,8 @@ class TestControllerWorker(base.TestCase):
                        constants.LOADBALANCER_ID: LB_ID,
                        constants.LOADBALANCER: provider_lb,
                        constants.POOL_ID: POOL_ID,
-                       constants.PROJECT_ID: PROJECT_ID}))
+                       constants.PROJECT_ID: PROJECT_ID,
+                       constants.AVAILABILITY_ZONE: {}}))
 
         _flow_mock.run.assert_called_once_with()