From 5c2f9e7f71b5d96c807f691556a5e6c5b754f492 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20Weing=C3=A4rtner?= Date: Wed, 12 Jul 2023 19:27:04 -0300 Subject: [PATCH] Optimize CloudKitty reprocessing process Currently, when a reprocessing task is scheduled, CloudKitty executes the cleaning of the data for the reprocessing period in one hour fashion (the default period). Therefore, for each one of the timeframes, a delete query is sent to InfluxDB (when using it as a backend). However, InfluxDB is not a very optimized time series database for deletion; thus, this workflow generates quite some overhead and slowness when reprocessing. If we clean right away the whole time frame for the reprocessing task, and then we just reprocess it, it will execute a single delete query in InfluxDB, which has a similar cost as a delete to remove the data for a single time frame. This patch optimized the reprocessing workflow to execute batch cleaning of data in the storage backend of CloudKitty. Change-Id: I8282f44ad837c71df0cb6c73776eafc7014ebedf --- cloudkitty/orchestrator.py | 28 ++++++++++++++----- cloudkitty/tests/test_orchestrator.py | 13 +++++---- ...-delete-reprocessing-d46df15b078a42a5.yaml | 5 ++++ 3 files changed, 33 insertions(+), 13 deletions(-) create mode 100644 releasenotes/notes/batch-delete-reprocessing-d46df15b078a42a5.yaml diff --git a/cloudkitty/orchestrator.py b/cloudkitty/orchestrator.py index 87391411..92245e21 100644 --- a/cloudkitty/orchestrator.py +++ b/cloudkitty/orchestrator.py @@ -517,13 +517,27 @@ class ReprocessingWorker(Worker): end_of_this_processing = tzutils.local_to_utc(end_of_this_processing) - LOG.debug("Cleaning backend [%s] data for reprocessing scope [%s] " - "for timeframe[start=%s, end=%s].", - self._storage, self.scope, timestamp, end_of_this_processing) - - self._storage.delete( - begin=timestamp, end=end_of_this_processing, - filters={self.scope_key: self._tenant_id}) + # If the start_reprocess_time of the reprocessing task equals to + # the current reprocessing time, it means that we have just started + # executing it. Therefore, we can clean/erase the old data in the + # reprocessing task time frame. + if tzutils.local_to_utc(self.scope.start_reprocess_time) == timestamp: + LOG.info( + "Cleaning backend [%s] data for reprocessing scope [%s] for " + "timeframe[start=%s, end=%s].", self._storage, self.scope, + self.scope.start_reprocess_time, self.scope.end_reprocess_time) + self._storage.delete( + begin=self.scope.start_reprocess_time, + end=self.scope.end_reprocess_time, + filters={self.scope_key: self._tenant_id}) + else: + LOG.debug("No need to clean backend [%s] data for reprocessing " + "scope [%s] for timeframe[start=%s, end=%s]. We are " + "past the very first timestamp; therefore, the cleaning " + "for the reprocessing task period has already been " + "executed.", self._storage, self.scope, + self.scope.start_reprocess_time, + self.scope.end_reprocess_time) LOG.debug("Executing the reprocessing of scope [%s] for " "timeframe[start=%s, end=%s].", self.scope, timestamp, diff --git a/cloudkitty/tests/test_orchestrator.py b/cloudkitty/tests/test_orchestrator.py index 6e28024c..0afde85c 100644 --- a/cloudkitty/tests/test_orchestrator.py +++ b/cloudkitty/tests/test_orchestrator.py @@ -1035,15 +1035,16 @@ class ReprocessingWorkerTest(tests.TestCase): self, do_execute_scope_processing_mock_from_worker): now_timestamp = tzutils.localized_now() + self.reprocessing_worker.scope.start_reprocess_time = now_timestamp self.reprocessing_worker.do_execute_scope_processing(now_timestamp) - expected_end = tzutils.localized_now() + datetime.timedelta( - seconds=self.reprocessing_worker._period) - self.storage_mock.delete.assert_has_calls([ - mock.call(begin=now_timestamp, end=expected_end, - filters={self.reprocessing_worker.scope_key: - self.reprocessing_worker._tenant_id})]) + mock.call( + begin=self.reprocessing_worker.scope.start_reprocess_time, + end=self.reprocessing_worker.scope.end_reprocess_time, + filters={ + self.reprocessing_worker.scope_key: + self.reprocessing_worker._tenant_id})]) do_execute_scope_processing_mock_from_worker.assert_has_calls([ mock.call(now_timestamp)]) diff --git a/releasenotes/notes/batch-delete-reprocessing-d46df15b078a42a5.yaml b/releasenotes/notes/batch-delete-reprocessing-d46df15b078a42a5.yaml new file mode 100644 index 00000000..76cf01bb --- /dev/null +++ b/releasenotes/notes/batch-delete-reprocessing-d46df15b078a42a5.yaml @@ -0,0 +1,5 @@ +--- +features: + - | + Optimized the reprocessing workflow to execute batch cleaning + of data in the storage backend of CloudKitty.