diff --git a/cloudkitty/orchestrator.py b/cloudkitty/orchestrator.py
index 6b2e4328..52d293dd 100644
--- a/cloudkitty/orchestrator.py
+++ b/cloudkitty/orchestrator.py
@@ -517,13 +517,27 @@ class ReprocessingWorker(Worker):
         end_of_this_processing = tzutils.local_to_utc(end_of_this_processing)
 
-        LOG.debug("Cleaning backend [%s] data for reprocessing scope [%s] "
-                  "for timeframe[start=%s, end=%s].",
-                  self._storage, self.scope, timestamp, end_of_this_processing)
-
-        self._storage.delete(
-            begin=timestamp, end=end_of_this_processing,
-            filters={self.scope_key: self._tenant_id})
+        # If the start_reprocess_time of the reprocessing task equals to
+        # the current reprocessing time, it means that we have just started
+        # executing it. Therefore, we can clean/erase the old data in the
+        # reprocessing task time frame.
+        if tzutils.local_to_utc(self.scope.start_reprocess_time) == timestamp:
+            LOG.info(
+                "Cleaning backend [%s] data for reprocessing scope [%s] for "
+                "timeframe[start=%s, end=%s].", self._storage, self.scope,
+                self.scope.start_reprocess_time, self.scope.end_reprocess_time)
+            self._storage.delete(
+                begin=self.scope.start_reprocess_time,
+                end=self.scope.end_reprocess_time,
+                filters={self.scope_key: self._tenant_id})
+        else:
+            LOG.debug("No need to clean backend [%s] data for reprocessing "
+                      "scope [%s] for timeframe[start=%s, end=%s]. We are "
+                      "past the very first timestamp; therefore, the cleaning "
+                      "for the reprocessing task period has already been "
+                      "executed.", self._storage, self.scope,
+                      self.scope.start_reprocess_time,
+                      self.scope.end_reprocess_time)
 
         LOG.debug("Executing the reprocessing of scope [%s] for "
                   "timeframe[start=%s, end=%s].", self.scope, timestamp,
                   end_of_this_processing)
diff --git a/cloudkitty/tests/test_orchestrator.py b/cloudkitty/tests/test_orchestrator.py
index ba46c06c..e4453a26 100644
--- a/cloudkitty/tests/test_orchestrator.py
+++ b/cloudkitty/tests/test_orchestrator.py
@@ -1034,15 +1034,16 @@ class ReprocessingWorkerTest(tests.TestCase):
             self, do_execute_scope_processing_mock_from_worker):
         now_timestamp = tzutils.localized_now()
 
+        self.reprocessing_worker.scope.start_reprocess_time = now_timestamp
         self.reprocessing_worker.do_execute_scope_processing(now_timestamp)
 
-        expected_end = tzutils.localized_now() + datetime.timedelta(
-            seconds=self.reprocessing_worker._period)
-
         self.storage_mock.delete.assert_has_calls([
-            mock.call(begin=now_timestamp, end=expected_end,
-                      filters={self.reprocessing_worker.scope_key:
-                               self.reprocessing_worker._tenant_id})])
+            mock.call(
+                begin=self.reprocessing_worker.scope.start_reprocess_time,
+                end=self.reprocessing_worker.scope.end_reprocess_time,
+                filters={
+                    self.reprocessing_worker.scope_key:
+                        self.reprocessing_worker._tenant_id})])
 
         do_execute_scope_processing_mock_from_worker.assert_has_calls([
             mock.call(now_timestamp)])
diff --git a/releasenotes/notes/batch-delete-reprocessing-d46df15b078a42a5.yaml b/releasenotes/notes/batch-delete-reprocessing-d46df15b078a42a5.yaml
new file mode 100644
index 00000000..76cf01bb
--- /dev/null
+++ b/releasenotes/notes/batch-delete-reprocessing-d46df15b078a42a5.yaml
@@ -0,0 +1,5 @@
+---
+features:
+  - |
+    Optimized the reprocessing workflow to execute batch cleaning
+    of data in the storage backend of CloudKitty.