diff --git a/octavia/api/drivers/amphora_driver/v2/driver.py b/octavia/api/drivers/amphora_driver/v2/driver.py index 1d5785e0c2..9b63322268 100644 --- a/octavia/api/drivers/amphora_driver/v2/driver.py +++ b/octavia/api/drivers/amphora_driver/v2/driver.py @@ -196,22 +196,21 @@ class AmphoraProviderDriver(driver_base.ProviderDriver): id=pool_id) self._validate_members(db_pool, [member]) - payload = {consts.MEMBER_ID: member.member_id} + payload = {consts.MEMBER: member.to_dict()} self.client.cast({}, 'create_member', **payload) def member_delete(self, member): - member_id = member.member_id - payload = {consts.MEMBER_ID: member_id} + payload = {consts.MEMBER: member.to_dict()} self.client.cast({}, 'delete_member', **payload) def member_update(self, old_member, new_member): - member_dict = new_member.to_dict() - if 'admin_state_up' in member_dict: - member_dict['enabled'] = member_dict.pop('admin_state_up') - member_id = member_dict.pop('member_id') - - payload = {consts.MEMBER_ID: member_id, - consts.MEMBER_UPDATES: member_dict} + original_member = old_member.to_dict() + member_updates = new_member.to_dict() + if 'admin_state_up' in member_updates: + member_updates['enabled'] = member_updates.pop('admin_state_up') + member_updates.pop(consts.MEMBER_ID) + payload = {consts.ORIGINAL_MEMBER: original_member, + consts.MEMBER_UPDATES: member_updates} self.client.cast({}, 'update_member', **payload) def member_batch_update(self, pool_id, members): @@ -248,8 +247,8 @@ class AmphoraProviderDriver(driver_base.ProviderDriver): deleted_members.append(m) if deleted_members or new_members or updated_members: - payload = {'old_member_ids': [m.id for m in deleted_members], - 'new_member_ids': [m.member_id for m in new_members], + payload = {'old_members': [m.to_dict() for m in deleted_members], + 'new_members': [m.to_dict() for m in new_members], 'updated_members': updated_members} self.client.cast({}, 'batch_update_members', **payload) else: diff --git a/octavia/common/constants.py b/octavia/common/constants.py index 1cb007d381..ca9df63845 100644 --- a/octavia/common/constants.py +++ b/octavia/common/constants.py @@ -346,6 +346,7 @@ NETWORK_ID = 'network_id' NICS = 'nics' OBJECT = 'object' ORIGINAL_LISTENER = 'original_listener' +ORIGINAL_MEMBER = 'original_member' ORIGINAL_POOL = 'original_pool' PEER_PORT = 'peer_port' POOL = 'pool' diff --git a/octavia/controller/queue/v2/endpoints.py b/octavia/controller/queue/v2/endpoints.py index e213c0e30a..775e3d213a 100644 --- a/octavia/controller/queue/v2/endpoints.py +++ b/octavia/controller/queue/v2/endpoints.py @@ -100,28 +100,31 @@ class Endpoints(object): LOG.info('Deleting health monitor \'%s\'...', health_monitor_id) self.worker.delete_health_monitor(health_monitor_id) - def create_member(self, context, member_id): - LOG.info('Creating member \'%s\'...', member_id) - self.worker.create_member(member_id) + def create_member(self, context, member): + LOG.info('Creating member \'%s\'...', member.get(constants.ID)) + self.worker.create_member(member) - def update_member(self, context, member_id, member_updates): - LOG.info('Updating member \'%s\'...', member_id) - self.worker.update_member(member_id, member_updates) + def update_member(self, context, original_member, member_updates): + LOG.info('Updating member \'%s\'...', original_member.get( + constants.ID)) + self.worker.update_member(original_member, member_updates) - def batch_update_members(self, context, old_member_ids, new_member_ids, + def batch_update_members(self, context, old_members, new_members, updated_members): - updated_member_ids = [m.get('id') for m in updated_members] + updated_member_ids = [m.get(constants.ID) for m in updated_members] + new_member_ids = [m.get(constants.ID) for m in new_members] + old_member_ids = [m.get(constants.ID) for m in old_members] LOG.info( 'Batch updating members: old=\'%(old)s\', new=\'%(new)s\', ' 'updated=\'%(updated)s\'...', {'old': old_member_ids, 'new': new_member_ids, 'updated': updated_member_ids}) self.worker.batch_update_members( - old_member_ids, new_member_ids, updated_members) + old_members, new_members, updated_members) - def delete_member(self, context, member_id): - LOG.info('Deleting member \'%s\'...', member_id) - self.worker.delete_member(member_id) + def delete_member(self, context, member): + LOG.info('Deleting member \'%s\'...', member.get(constants.ID)) + self.worker.delete_member(member) def create_l7policy(self, context, l7policy_id): LOG.info('Creating l7policy \'%s\'...', l7policy_id) diff --git a/octavia/controller/worker/v2/controller_worker.py b/octavia/controller/worker/v2/controller_worker.py index 78c90dcffa..fc0061a42a 100644 --- a/octavia/controller/worker/v2/controller_worker.py +++ b/octavia/controller/worker/v2/controller_worker.py @@ -395,26 +395,15 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): log=LOG): update_lb_tf.run() - @tenacity.retry( - retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound), - wait=tenacity.wait_incrementing( - RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX), - stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS)) - def create_member(self, member_id): + def create_member(self, member): """Creates a pool member. - :param member_id: ID of the member to create + :param member: A member provider dictionary to create :returns: None :raises NoSuitablePool: Unable to find the node pool """ - member = self._member_repo.get(db_apis.get_session(), - id=member_id) - if not member: - LOG.warning('Failed to fetch %s %s from DB. Retrying for up to ' - '60 seconds.', 'member', member_id) - raise db_exceptions.NoResultFound - - pool = member.pool + pool = self._pool_repo.get(db_apis.get_session(), + id=member[constants.POOL_ID]) load_balancer = pool.load_balancer listeners_dicts = ( @@ -432,16 +421,16 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): log=LOG): create_member_tf.run() - def delete_member(self, member_id): + def delete_member(self, member): """Deletes a pool member. - :param member_id: ID of the member to delete + :param member: A member provider dictionary to delete :returns: None :raises MemberNotFound: The referenced member was not found """ - member = self._member_repo.get(db_apis.get_session(), - id=member_id) - pool = member.pool + pool = self._pool_repo.get(db_apis.get_session(), + id=member[constants.POOL_ID]) + load_balancer = pool.load_balancer listeners_dicts = ( @@ -454,27 +443,38 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): constants.LISTENERS: listeners_dicts, constants.LOADBALANCER: load_balancer, constants.LOADBALANCER_ID: load_balancer.id, - constants.POOL_ID: pool.id} + constants.POOL_ID: pool.id, + constants.PROJECT_ID: load_balancer.project_id + } + ) with tf_logging.DynamicLoggingListener(delete_member_tf, log=LOG): delete_member_tf.run() - def batch_update_members(self, old_member_ids, new_member_ids, + def batch_update_members(self, old_members, new_members, updated_members): - old_members = [self._member_repo.get(db_apis.get_session(), id=mid) - for mid in old_member_ids] - new_members = [self._member_repo.get(db_apis.get_session(), id=mid) - for mid in new_member_ids] updated_members = [ - (self._member_repo.get(db_apis.get_session(), id=m.get('id')), m) + (provider_utils.db_member_to_provider_member( + self._member_repo.get(db_apis.get_session(), + id=m.get(constants.ID))).to_dict(), + m) for m in updated_members] + provider_old_members = [ + provider_utils.db_member_to_provider_member( + self._member_repo.get(db_apis.get_session(), + id=m.get(constants.ID))).to_dict() + for m in old_members] if old_members: - pool = old_members[0].pool + pool = self._pool_repo.get(db_apis.get_session(), + id=old_members[0][constants.POOL_ID]) elif new_members: - pool = new_members[0].pool + pool = self._pool_repo.get(db_apis.get_session(), + id=new_members[0][constants.POOL_ID]) else: - pool = updated_members[0][0].pool + pool = self._pool_repo.get( + db_apis.get_session(), + id=updated_members[0][0][constants.POOL_ID]) load_balancer = pool.load_balancer listeners_dicts = ( @@ -483,36 +483,27 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): batch_update_members_tf = self._taskflow_load( self._member_flows.get_batch_update_members_flow( - old_members, new_members, updated_members), + provider_old_members, new_members, updated_members), store={constants.LISTENERS: listeners_dicts, constants.LOADBALANCER: load_balancer, constants.LOADBALANCER_ID: load_balancer.id, - constants.POOL_ID: pool.id}) + constants.POOL_ID: pool.id, + constants.PROJECT_ID: load_balancer.project_id}) with tf_logging.DynamicLoggingListener(batch_update_members_tf, log=LOG): batch_update_members_tf.run() - def update_member(self, member_id, member_updates): + def update_member(self, member, member_updates): """Updates a pool member. - :param member_id: ID of the member to update + :param member_id: A member provider dictionary to update :param member_updates: Dict containing updated member attributes :returns: None :raises MemberNotFound: The referenced member was not found """ - member = None - try: - member = self._get_db_obj_until_pending_update( - self._member_repo, member_id) - except tenacity.RetryError as e: - LOG.warning('Member did not go into %s in 60 seconds. ' - 'This either due to an in-progress Octavia upgrade ' - 'or an overloaded and failing database. Assuming ' - 'an upgrade is in progress and continuing.', - constants.PENDING_UPDATE) - member = e.last_attempt.result() - - pool = member.pool + # TODO(ataraday) when other flows will use dicts - revisit this + pool = self._pool_repo.get(db_apis.get_session(), + id=member[constants.POOL_ID]) load_balancer = pool.load_balancer listeners_dicts = ( diff --git a/octavia/controller/worker/v2/flows/member_flows.py b/octavia/controller/worker/v2/flows/member_flows.py index ed05c179f7..6d12991539 100644 --- a/octavia/controller/worker/v2/flows/member_flows.py +++ b/octavia/controller/worker/v2/flows/member_flows.py @@ -77,7 +77,7 @@ class MemberFlows(object): delete_member_flow.add(database_tasks.DeleteMemberInDB( requires=constants.MEMBER)) delete_member_flow.add(database_tasks.DecrementMemberQuota( - requires=constants.MEMBER)) + requires=constants.PROJECT_ID)) delete_member_flow.add(database_tasks.MarkPoolActiveInDB( requires=constants.POOL_ID)) delete_member_flow.add(database_tasks. @@ -138,11 +138,13 @@ class MemberFlows(object): unordered_members_flow.add(database_tasks.DeleteMemberInDB( inject={constants.MEMBER: m}, name='{flow}-{id}'.format( - id=m.id, flow=constants.DELETE_MEMBER_INDB))) + id=m[constants.MEMBER_ID], + flow=constants.DELETE_MEMBER_INDB))) unordered_members_flow.add(database_tasks.DecrementMemberQuota( - inject={constants.MEMBER: m}, + requires=constants.PROJECT_ID, name='{flow}-{id}'.format( - id=m.id, flow=constants.DECREMENT_MEMBER_QUOTA_FLOW))) + id=m[constants.MEMBER_ID], + flow=constants.DECREMENT_MEMBER_QUOTA_FLOW))) # Create new members unordered_members_flow.add( @@ -155,7 +157,8 @@ class MemberFlows(object): database_tasks.MarkMemberActiveInDB( inject={constants.MEMBER: m}, name='{flow}-{id}'.format( - id=m.id, flow=constants.MARK_MEMBER_ACTIVE_INDB))) + id=m[constants.MEMBER_ID], + flow=constants.MARK_MEMBER_ACTIVE_INDB))) # Update existing members unordered_members_flow.add( @@ -165,12 +168,13 @@ class MemberFlows(object): name='{flow}-updated'.format( flow=constants.MEMBER_TO_ERROR_ON_REVERT_FLOW))) for m, um in updated_members: - um.pop('id', None) + um.pop(constants.ID, None) unordered_members_active_flow.add( database_tasks.MarkMemberActiveInDB( inject={constants.MEMBER: m}, name='{flow}-{id}'.format( - id=m.id, flow=constants.MARK_MEMBER_ACTIVE_INDB))) + id=m[constants.MEMBER_ID], + flow=constants.MARK_MEMBER_ACTIVE_INDB))) batch_update_members_flow.add(unordered_members_flow) diff --git a/octavia/controller/worker/v2/tasks/database_tasks.py b/octavia/controller/worker/v2/tasks/database_tasks.py index 982b3573d9..ca0e7e1e79 100644 --- a/octavia/controller/worker/v2/tasks/database_tasks.py +++ b/octavia/controller/worker/v2/tasks/database_tasks.py @@ -222,8 +222,9 @@ class DeleteMemberInDB(BaseDatabaseTask): :returns: None """ - LOG.debug("DB delete member for id: %s ", member.id) - self.member_repo.delete(db_apis.get_session(), id=member.id) + LOG.debug("DB delete member for id: %s ", member[constants.MEMBER_ID]) + self.member_repo.delete(db_apis.get_session(), + id=member[constants.MEMBER_ID]) def revert(self, member, *args, **kwargs): """Mark the member ERROR since the delete couldn't happen @@ -232,14 +233,16 @@ class DeleteMemberInDB(BaseDatabaseTask): :returns: None """ - LOG.warning("Reverting delete in DB for member id %s", member.id) + LOG.warning("Reverting delete in DB for member id %s", + member[constants.MEMBER_ID]) try: - self.member_repo.update(db_apis.get_session(), member.id, + self.member_repo.update(db_apis.get_session(), + member[constants.MEMBER_ID], provisioning_status=constants.ERROR) except Exception as e: LOG.error("Failed to update member %(mem)s " "provisioning_status to ERROR due to: %(except)s", - {'mem': member.id, 'except': e}) + {'mem': member[constants.MEMBER_ID], 'except': e}) class DeleteListenerInDB(BaseDatabaseTask): @@ -1034,7 +1037,7 @@ class MarkLBActiveInDB(BaseDatabaseTask): def _mark_member_status(self, member, status): self.member_repo.update( - db_apis.get_session(), member.id, + db_apis.get_session(), member[constants.MEMBER_ID], provisioning_status=status) def revert(self, loadbalancer, *args, **kwargs): @@ -1465,8 +1468,9 @@ class UpdateMemberInDB(BaseDatabaseTask): :returns: None """ - LOG.debug("Update DB for member id: %s ", member.id) - self.member_repo.update(db_apis.get_session(), member.id, + LOG.debug("Update DB for member id: %s ", member[constants.MEMBER_ID]) + self.member_repo.update(db_apis.get_session(), + member[constants.MEMBER_ID], **update_dict) def revert(self, member, *args, **kwargs): @@ -1477,14 +1481,15 @@ class UpdateMemberInDB(BaseDatabaseTask): """ LOG.warning("Reverting update member in DB " - "for member id %s", member.id) + "for member id %s", member[constants.MEMBER_ID]) try: - self.member_repo.update(db_apis.get_session(), member.id, + self.member_repo.update(db_apis.get_session(), + member[constants.MEMBER_ID], provisioning_status=constants.ERROR) except Exception as e: LOG.error("Failed to update member %(member)s provisioning_status " - "to ERROR due to: %(except)s", {'member': member.id, - 'except': e}) + "to ERROR due to: %(except)s", + {'member': member[constants.MEMBER_ID], 'except': e}) class UpdatePoolInDB(BaseDatabaseTask): @@ -2153,9 +2158,10 @@ class MarkMemberActiveInDB(BaseDatabaseTask): :returns: None """ - LOG.debug("Mark ACTIVE in DB for member id: %s", member.id) + LOG.debug("Mark ACTIVE in DB for member id: %s", + member[constants.MEMBER_ID]) self.member_repo.update(db_apis.get_session(), - member.id, + member[constants.MEMBER_ID], provisioning_status=constants.ACTIVE) def revert(self, member, *args, **kwargs): @@ -2166,8 +2172,9 @@ class MarkMemberActiveInDB(BaseDatabaseTask): """ LOG.warning("Reverting mark member ACTIVE in DB " - "for member id %s", member.id) - self.task_utils.mark_member_prov_status_error(member.id) + "for member id %s", member[constants.MEMBER_ID]) + self.task_utils.mark_member_prov_status_error( + member[constants.MEMBER_ID]) class MarkMemberPendingCreateInDB(BaseDatabaseTask): @@ -2183,9 +2190,10 @@ class MarkMemberPendingCreateInDB(BaseDatabaseTask): :returns: None """ - LOG.debug("Mark PENDING CREATE in DB for member id: %s", member.id) + LOG.debug("Mark PENDING CREATE in DB for member id: %s", + member[constants.MEMBER_ID]) self.member_repo.update(db_apis.get_session(), - member.id, + member[constants.MEMBER_ID], provisioning_status=constants.PENDING_CREATE) def revert(self, member, *args, **kwargs): @@ -2196,8 +2204,9 @@ class MarkMemberPendingCreateInDB(BaseDatabaseTask): """ LOG.warning("Reverting mark member pending create in DB " - "for member id %s", member.id) - self.task_utils.mark_member_prov_status_error(member.id) + "for member id %s", member[constants.MEMBER_ID]) + self.task_utils.mark_member_prov_status_error( + member[constants.MEMBER_ID]) class MarkMemberPendingDeleteInDB(BaseDatabaseTask): @@ -2213,9 +2222,10 @@ class MarkMemberPendingDeleteInDB(BaseDatabaseTask): :returns: None """ - LOG.debug("Mark PENDING DELETE in DB for member id: %s", member.id) + LOG.debug("Mark PENDING DELETE in DB for member id: %s", + member[constants.MEMBER_ID]) self.member_repo.update(db_apis.get_session(), - member.id, + member[constants.MEMBER_ID], provisioning_status=constants.PENDING_DELETE) def revert(self, member, *args, **kwargs): @@ -2226,8 +2236,9 @@ class MarkMemberPendingDeleteInDB(BaseDatabaseTask): """ LOG.warning("Reverting mark member pending delete in DB " - "for member id %s", member.id) - self.task_utils.mark_member_prov_status_error(member.id) + "for member id %s", member[constants.MEMBER_ID]) + self.task_utils.mark_member_prov_status_error( + member[constants.MEMBER_ID]) class MarkMemberPendingUpdateInDB(BaseDatabaseTask): @@ -2244,9 +2255,9 @@ class MarkMemberPendingUpdateInDB(BaseDatabaseTask): """ LOG.debug("Mark PENDING UPDATE in DB for member id: %s", - member.id) + member[constants.MEMBER_ID]) self.member_repo.update(db_apis.get_session(), - member.id, + member[constants.MEMBER_ID], provisioning_status=constants.PENDING_UPDATE) def revert(self, member, *args, **kwargs): @@ -2257,8 +2268,9 @@ class MarkMemberPendingUpdateInDB(BaseDatabaseTask): """ LOG.warning("Reverting mark member pending update in DB " - "for member id %s", member.id) - self.task_utils.mark_member_prov_status_error(member.id) + "for member id %s", member[constants.MEMBER_ID]) + self.task_utils.mark_member_prov_status_error( + member[constants.MEMBER_ID]) class MarkPoolActiveInDB(BaseDatabaseTask): @@ -2567,7 +2579,7 @@ class DecrementMemberQuota(BaseDatabaseTask): Since sqlalchemy will likely retry by itself always revert if it fails """ - def execute(self, member): + def execute(self, project_id): """Decrements the member quota. :param member: The member to decrement the quota on. @@ -2575,22 +2587,22 @@ class DecrementMemberQuota(BaseDatabaseTask): """ LOG.debug("Decrementing member quota for " - "project: %s ", member.project_id) + "project: %s ", project_id) lock_session = db_apis.get_session(autocommit=False) try: self.repos.decrement_quota(lock_session, data_models.Member, - member.project_id) + project_id) lock_session.commit() except Exception: with excutils.save_and_reraise_exception(): LOG.error('Failed to decrement member quota for project: ' '%(proj)s the project may have excess quota in use.', - {'proj': member.project_id}) + {'proj': project_id}) lock_session.rollback() - def revert(self, member, result, *args, **kwargs): + def revert(self, project_id, result, *args, **kwargs): """Re-apply the quota :param member: The member to decrement the quota on. @@ -2599,7 +2611,7 @@ class DecrementMemberQuota(BaseDatabaseTask): LOG.warning('Reverting decrement quota for member on project %(proj)s ' 'Project quota counts may be incorrect.', - {'proj': member.project_id}) + {'proj': project_id}) # Increment the quota back if this task wasn't the failure if not isinstance(result, failure.Failure): @@ -2611,7 +2623,7 @@ class DecrementMemberQuota(BaseDatabaseTask): self.repos.check_quota_met(session, lock_session, data_models.Member, - member.project_id) + project_id) lock_session.commit() except Exception: lock_session.rollback() diff --git a/octavia/controller/worker/v2/tasks/lifecycle_tasks.py b/octavia/controller/worker/v2/tasks/lifecycle_tasks.py index 70f0272a50..cd4edfc0d5 100644 --- a/octavia/controller/worker/v2/tasks/lifecycle_tasks.py +++ b/octavia/controller/worker/v2/tasks/lifecycle_tasks.py @@ -145,7 +145,8 @@ class MemberToErrorOnRevertTask(BaseLifecycleTask): def revert(self, member, listeners, loadbalancer, pool_id, *args, **kwargs): - self.task_utils.mark_member_prov_status_error(member.id) + self.task_utils.mark_member_prov_status_error( + member[constants.MEMBER_ID]) for listener in listeners: self.task_utils.mark_listener_prov_status_active( listener[constants.LISTENER_ID]) @@ -162,7 +163,8 @@ class MembersToErrorOnRevertTask(BaseLifecycleTask): def revert(self, members, listeners, loadbalancer, pool_id, *args, **kwargs): for m in members: - self.task_utils.mark_member_prov_status_error(m.id) + self.task_utils.mark_member_prov_status_error( + m[constants.MEMBER_ID]) for listener in listeners: self.task_utils.mark_listener_prov_status_active( listener[constants.LISTENER_ID]) diff --git a/octavia/tests/unit/api/drivers/amphora_driver/v2/test_amphora_driver.py b/octavia/tests/unit/api/drivers/amphora_driver/v2/test_amphora_driver.py index d0a53c979f..26d24ebd88 100644 --- a/octavia/tests/unit/api/drivers/amphora_driver/v2/test_amphora_driver.py +++ b/octavia/tests/unit/api/drivers/amphora_driver/v2/test_amphora_driver.py @@ -239,9 +239,9 @@ class TestAmphoraDriver(base.TestRpc): @mock.patch('oslo_messaging.RPCClient.cast') def test_member_create(self, mock_cast, mock_pool_get, mock_session): provider_member = driver_dm.Member( - member_id=self.sample_data.member1_id) + member_id=self.sample_data) self.amp_driver.member_create(provider_member) - payload = {consts.MEMBER_ID: self.sample_data.member1_id} + payload = {consts.MEMBER: provider_member.to_dict()} mock_cast.assert_called_with({}, 'create_member', **payload) @mock.patch('octavia.db.api.get_session') @@ -263,7 +263,7 @@ class TestAmphoraDriver(base.TestRpc): member_id=self.sample_data.member1_id, address="192.0.2.1") self.amp_driver.member_create(provider_member) - payload = {consts.MEMBER_ID: self.sample_data.member1_id} + payload = {consts.MEMBER: provider_member.to_dict()} mock_cast.assert_called_with({}, 'create_member', **payload) @mock.patch('octavia.db.api.get_session') @@ -293,7 +293,7 @@ class TestAmphoraDriver(base.TestRpc): provider_member = driver_dm.Member( member_id=self.sample_data.member1_id) self.amp_driver.member_delete(provider_member) - payload = {consts.MEMBER_ID: self.sample_data.member1_id} + payload = {consts.MEMBER: provider_member.to_dict()} mock_cast.assert_called_with({}, 'delete_member', **payload) @mock.patch('oslo_messaging.RPCClient.cast') @@ -302,9 +302,12 @@ class TestAmphoraDriver(base.TestRpc): member_id=self.sample_data.member1_id) provider_member = driver_dm.Member( member_id=self.sample_data.member1_id, admin_state_up=True) - member_dict = {'enabled': True} - self.amp_driver.member_update(old_provider_member, provider_member) - payload = {consts.MEMBER_ID: self.sample_data.member1_id, + member_dict = provider_member.to_dict() + member_dict.pop(consts.MEMBER_ID) + member_dict['enabled'] = member_dict.pop('admin_state_up') + self.amp_driver.member_update(old_provider_member, + provider_member) + payload = {consts.ORIGINAL_MEMBER: old_provider_member.to_dict(), consts.MEMBER_UPDATES: member_dict} mock_cast.assert_called_with({}, 'update_member', **payload) @@ -314,9 +317,11 @@ class TestAmphoraDriver(base.TestRpc): member_id=self.sample_data.member1_id) provider_member = driver_dm.Member( member_id=self.sample_data.member1_id, name='Great member') - member_dict = {'name': 'Great member'} - self.amp_driver.member_update(old_provider_member, provider_member) - payload = {consts.MEMBER_ID: self.sample_data.member1_id, + member_dict = provider_member.to_dict() + member_dict.pop(consts.MEMBER_ID) + self.amp_driver.member_update(old_provider_member, + provider_member) + payload = {consts.ORIGINAL_MEMBER: old_provider_member.to_dict(), consts.MEMBER_UPDATES: member_dict} mock_cast.assert_called_with({}, 'update_member', **payload) @@ -351,9 +356,10 @@ class TestAmphoraDriver(base.TestRpc): self.amp_driver.member_batch_update( self.sample_data.pool1_id, prov_members) - payload = {'old_member_ids': [self.sample_data.member1_id], - 'new_member_ids': [self.sample_data.member3_id], - 'updated_members': [update_mem_dict]} + payload = { + 'old_members': [self.sample_data.db_pool1_members[0].to_dict()], + 'new_members': [prov_new_member.to_dict()], + 'updated_members': [update_mem_dict]} mock_cast.assert_called_with({}, 'batch_update_members', **payload) @mock.patch('octavia.db.api.get_session') @@ -386,9 +392,10 @@ class TestAmphoraDriver(base.TestRpc): self.amp_driver.member_batch_update( self.sample_data.pool1_id, prov_members) - payload = {'old_member_ids': [self.sample_data.member1_id], - 'new_member_ids': [self.sample_data.member3_id], - 'updated_members': [update_mem_dict]} + payload = { + 'old_members': [self.sample_data.db_pool1_members[0].to_dict()], + 'new_members': [prov_new_member.to_dict()], + 'updated_members': [update_mem_dict]} mock_cast.assert_called_with({}, 'batch_update_members', **payload) @mock.patch('octavia.db.api.get_session') @@ -461,8 +468,9 @@ class TestAmphoraDriver(base.TestRpc): self.amp_driver.member_batch_update( self.sample_data.pool1_id, prov_members) - payload = {'old_member_ids': [self.sample_data.member1_id], - 'new_member_ids': [self.sample_data.member3_id], + payload = {'old_members': + [self.sample_data.db_pool1_members[0].to_dict()], + 'new_members': [prov_new_member.to_dict()], 'updated_members': [update_mem_dict]} mock_cast.assert_called_with({}, 'batch_update_members', **payload) diff --git a/octavia/tests/unit/controller/queue/v2/test_endpoints.py b/octavia/tests/unit/controller/queue/v2/test_endpoints.py index 2954e71f06..3c06def931 100644 --- a/octavia/tests/unit/controller/queue/v2/test_endpoints.py +++ b/octavia/tests/unit/controller/queue/v2/test_endpoints.py @@ -124,26 +124,29 @@ class TestEndpoints(base.TestCase): self.resource_id) def test_create_member(self): - self.ep.create_member(self.context, self.resource_id) + self.ep.create_member(self.context, self.resource) self.ep.worker.create_member.assert_called_once_with( - self.resource_id) + self.resource) def test_update_member(self): - self.ep.update_member(self.context, self.resource_id, + self.ep.update_member(self.context, self.resource, self.resource_updates) self.ep.worker.update_member.assert_called_once_with( - self.resource_id, self.resource_updates) + self.resource, self.resource_updates) def test_batch_update_members(self): self.ep.batch_update_members( - self.context, [9], [11], [self.resource_updates]) + self.context, [{constants.MEMBER_ID: 9}], + [{constants.MEMBER_ID: 11}], + [self.resource_updates]) self.ep.worker.batch_update_members.assert_called_once_with( - [9], [11], [self.resource_updates]) + [{constants.MEMBER_ID: 9}], [{constants.MEMBER_ID: 11}], + [self.resource_updates]) def test_delete_member(self): - self.ep.delete_member(self.context, self.resource_id) + self.ep.delete_member(self.context, self.resource) self.ep.worker.delete_member.assert_called_once_with( - self.resource_id) + self.resource) def test_create_l7policy(self): self.ep.create_l7policy(self.context, self.resource_id) diff --git a/octavia/tests/unit/controller/worker/v2/flows/test_member_flows.py b/octavia/tests/unit/controller/worker/v2/flows/test_member_flows.py index cb276514e5..13dba2f91a 100644 --- a/octavia/tests/unit/controller/worker/v2/flows/test_member_flows.py +++ b/octavia/tests/unit/controller/worker/v2/flows/test_member_flows.py @@ -56,8 +56,9 @@ class TestMemberFlows(base.TestCase): self.assertIn(constants.LISTENERS, member_flow.requires) self.assertIn(constants.LOADBALANCER_ID, member_flow.requires) self.assertIn(constants.POOL_ID, member_flow.requires) + self.assertIn(constants.PROJECT_ID, member_flow.requires) - self.assertEqual(5, len(member_flow.requires)) + self.assertEqual(6, len(member_flow.requires)) self.assertEqual(0, len(member_flow.provides)) def test_get_update_member_flow(self, mock_get_net_driver): diff --git a/octavia/tests/unit/controller/worker/v2/tasks/test_database_tasks.py b/octavia/tests/unit/controller/worker/v2/tasks/test_database_tasks.py index 4dba40e934..10224c3109 100644 --- a/octavia/tests/unit/controller/worker/v2/tasks/test_database_tasks.py +++ b/octavia/tests/unit/controller/worker/v2/tasks/test_database_tasks.py @@ -121,6 +121,11 @@ class TestDatabaseTasks(base.TestCase): self.db_pool_mock.id = POOL_ID self.db_pool_mock.health_monitor = self.health_mon_mock + self.member_mock = { + constants.MEMBER_ID: MEMBER_ID, + constants.POOL_ID: POOL_ID, + } + self.l7policy_mock = mock.MagicMock() self.l7policy_mock.id = L7POLICY_ID @@ -1279,16 +1284,16 @@ class TestDatabaseTasks(base.TestCase): mock_amphora_repo_update, mock_amphora_repo_delete): unused_pool = data_models.Pool(id='unused_pool') - members1 = [data_models.Member(id='member1'), - data_models.Member(id='member2')] + members1 = [{constants.MEMBER_ID: 'member1'}, + {constants.MEMBER_ID: 'member2'}] health_monitor = data_models.HealthMonitor(id='hm1') default_pool = data_models.Pool(id='default_pool', members=members1, health_monitor=health_monitor) listener1 = data_models.Listener(id='listener1', default_pool=default_pool) - members2 = [data_models.Member(id='member3'), - data_models.Member(id='member4')] + members2 = [{constants.MEMBER_ID: 'member3'}, + {constants.MEMBER_ID: 'member4'}] redirect_pool = data_models.Pool(id='redirect_pool', members=members2) l7rules = [data_models.L7Rule(id='rule1')] @@ -1323,16 +1328,6 @@ class TestDatabaseTasks(base.TestCase): provisioning_status=constants.ACTIVE), mock.call('TEST', redirect_pool.id, provisioning_status=constants.ACTIVE)]) - self.assertEqual(4, repo.MemberRepository.update.call_count) - repo.MemberRepository.update.has_calls( - [mock.call('TEST', members1[0].id, - provisioning_status=constants.ACTIVE), - mock.call('TEST', members1[1].id, - provisioning_status=constants.ACTIVE), - mock.call('TEST', members2[0].id, - provisioning_status=constants.ACTIVE), - mock.call('TEST', members2[1].id, - provisioning_status=constants.ACTIVE)]) self.assertEqual(1, repo.HealthMonitorRepository.update.call_count) repo.HealthMonitorRepository.update.has_calls( [mock.call('TEST', health_monitor.id, @@ -1371,16 +1366,6 @@ class TestDatabaseTasks(base.TestCase): provisioning_status=constants.ERROR), mock.call('TEST', redirect_pool.id, provisioning_status=constants.ERROR)]) - self.assertEqual(4, repo.MemberRepository.update.call_count) - repo.MemberRepository.update.has_calls( - [mock.call('TEST', members1[0].id, - provisioning_status=constants.ERROR), - mock.call('TEST', members1[1].id, - provisioning_status=constants.ERROR), - mock.call('TEST', members2[0].id, - provisioning_status=constants.ERROR), - mock.call('TEST', members2[1].id, - provisioning_status=constants.ERROR)]) self.assertEqual(1, repo.HealthMonitorRepository.update.call_count) repo.HealthMonitorRepository.update.has_calls( [mock.call('TEST', health_monitor.id, diff --git a/octavia/tests/unit/controller/worker/v2/tasks/test_database_tasks_quota.py b/octavia/tests/unit/controller/worker/v2/tasks/test_database_tasks_quota.py index 6039444ddd..1c2275f98f 100644 --- a/octavia/tests/unit/controller/worker/v2/tasks/test_database_tasks_quota.py +++ b/octavia/tests/unit/controller/worker/v2/tasks/test_database_tasks_quota.py @@ -175,9 +175,11 @@ class TestDatabaseTasksQuota(base.TestCase): self._test_decrement_quota(task, data_model, project_id=project_id) def test_decrement_member_quota(self): + project_id = uuidutils.generate_uuid() task = database_tasks.DecrementMemberQuota() data_model = data_models.Member - self._test_decrement_quota(task, data_model) + self._test_decrement_quota(task, data_model, + project_id=project_id) @mock.patch('octavia.db.repositories.Repositories.decrement_quota') @mock.patch('octavia.db.repositories.Repositories.check_quota_met') diff --git a/octavia/tests/unit/controller/worker/v2/tasks/test_lifecycle_tasks.py b/octavia/tests/unit/controller/worker/v2/tasks/test_lifecycle_tasks.py index 71dfe62720..fb4d7669bc 100644 --- a/octavia/tests/unit/controller/worker/v2/tasks/test_lifecycle_tasks.py +++ b/octavia/tests/unit/controller/worker/v2/tasks/test_lifecycle_tasks.py @@ -304,7 +304,8 @@ class TestLifecycleTasks(base.TestCase): member_to_error_on_revert = lifecycle_tasks.MemberToErrorOnRevertTask() # Execute - member_to_error_on_revert.execute(self.MEMBER, + member_to_error_on_revert.execute({constants.MEMBER_ID: + self.MEMBER_ID}, self.LISTENERS, self.LOADBALANCER, self.POOL_ID) @@ -312,7 +313,7 @@ class TestLifecycleTasks(base.TestCase): self.assertFalse(mock_member_prov_status_error.called) # Revert - member_to_error_on_revert.revert(self.MEMBER, + member_to_error_on_revert.revert({constants.MEMBER_ID: self.MEMBER_ID}, self.LISTENERS, self.LOADBALANCER, self.POOL_ID) @@ -344,7 +345,8 @@ class TestLifecycleTasks(base.TestCase): lifecycle_tasks.MembersToErrorOnRevertTask()) # Execute - members_to_error_on_revert.execute(self.MEMBERS, + members_to_error_on_revert.execute([{constants.MEMBER_ID: + self.MEMBER_ID}], self.LISTENERS, self.LOADBALANCER, self.POOL_ID) @@ -352,7 +354,8 @@ class TestLifecycleTasks(base.TestCase): self.assertFalse(mock_member_prov_status_error.called) # Revert - members_to_error_on_revert.revert(self.MEMBERS, + members_to_error_on_revert.revert([{constants.MEMBER_ID: + self.MEMBER_ID}], self.LISTENERS, self.LOADBALANCER, self.POOL_ID) diff --git a/octavia/tests/unit/controller/worker/v2/test_controller_worker.py b/octavia/tests/unit/controller/worker/v2/test_controller_worker.py index 65458356c3..4777d8915d 100644 --- a/octavia/tests/unit/controller/worker/v2/test_controller_worker.py +++ b/octavia/tests/unit/controller/worker/v2/test_controller_worker.py @@ -53,6 +53,7 @@ _vip_mock = mock.MagicMock() _listener_mock = mock.MagicMock() _load_balancer_mock = mock.MagicMock() _load_balancer_mock.listeners = [_listener_mock] +_load_balancer_mock.project_id = PROJECT_ID _member_mock = mock.MagicMock() _pool_mock = {constants.POOL_ID: POOL_ID} _db_pool_mock = mock.MagicMock() @@ -740,13 +741,13 @@ class TestControllerWorker(base.TestCase): _flow_mock.reset_mock() mock_member_repo_get.side_effect = [None, _member_mock] - + _member = _member_mock.to_dict() cw = controller_worker.ControllerWorker() - cw.create_member(MEMBER_ID) + cw.create_member(_member) (base_taskflow.BaseTaskFlowEngine._taskflow_load. assert_called_once_with(_flow_mock, - store={constants.MEMBER: _member_mock, + store={constants.MEMBER: _member, constants.LISTENERS: [self.ref_listener_dict], constants.LOADBALANCER_ID: @@ -757,7 +758,6 @@ class TestControllerWorker(base.TestCase): POOL_ID})) _flow_mock.run.assert_called_once_with() - self.assertEqual(2, mock_member_repo_get.call_count) @mock.patch('octavia.controller.worker.v2.flows.' 'member_flows.MemberFlows.get_delete_member_flow', @@ -777,13 +777,13 @@ class TestControllerWorker(base.TestCase): mock_amp_repo_get): _flow_mock.reset_mock() - + _member = _member_mock.to_dict() cw = controller_worker.ControllerWorker() - cw.delete_member(MEMBER_ID) + cw.delete_member(_member) (base_taskflow.BaseTaskFlowEngine._taskflow_load. assert_called_once_with( - _flow_mock, store={constants.MEMBER: _member_mock, + _flow_mock, store={constants.MEMBER: _member, constants.LISTENERS: [self.ref_listener_dict], constants.LOADBALANCER_ID: @@ -791,7 +791,8 @@ class TestControllerWorker(base.TestCase): constants.LOADBALANCER: _load_balancer_mock, constants.POOL_ID: - POOL_ID})) + POOL_ID, + constants.PROJECT_ID: PROJECT_ID})) _flow_mock.run.assert_called_once_with() @@ -813,14 +814,15 @@ class TestControllerWorker(base.TestCase): mock_amp_repo_get): _flow_mock.reset_mock() - _member_mock.provisioning_status = constants.PENDING_UPDATE + _member = _member_mock.to_dict() + _member[constants.PROVISIONING_STATUS] = constants.PENDING_UPDATE cw = controller_worker.ControllerWorker() - cw.update_member(MEMBER_ID, MEMBER_UPDATE_DICT) + cw.update_member(_member, MEMBER_UPDATE_DICT) (base_taskflow.BaseTaskFlowEngine._taskflow_load. assert_called_once_with(_flow_mock, - store={constants.MEMBER: _member_mock, + store={constants.MEMBER: _member, constants.LISTENERS: [self.ref_listener_dict], constants.LOADBALANCER: @@ -854,15 +856,18 @@ class TestControllerWorker(base.TestCase): _flow_mock.reset_mock() cw = controller_worker.ControllerWorker() - cw.batch_update_members([9], [11], [MEMBER_UPDATE_DICT]) - + cw.batch_update_members([{constants.MEMBER_ID: 9, + constants.POOL_ID: 'testtest'}], + [{constants.MEMBER_ID: 11}], + [MEMBER_UPDATE_DICT]) (base_taskflow.BaseTaskFlowEngine._taskflow_load. assert_called_once_with( _flow_mock, store={constants.LISTENERS: [self.ref_listener_dict], constants.LOADBALANCER_ID: _load_balancer_mock.id, constants.LOADBALANCER: _load_balancer_mock, - constants.POOL_ID: POOL_ID})) + constants.POOL_ID: POOL_ID, + constants.PROJECT_ID: PROJECT_ID})) _flow_mock.run.assert_called_once_with()