Optimize CloudKitty reprocessing process
Currently, when a reprocessing task is scheduled, CloudKitty executes the cleaning of the data for the reprocessing period in one hour fashion (the default period). Therefore, for each one of the timeframes, a delete query is sent to InfluxDB (when using it as a backend). However, InfluxDB is not a very optimized time series database for deletion; thus, this workflow generates quite some overhead and slowness when reprocessing. If we clean right away the whole time frame for the reprocessing task, and then we just reprocess it, it will execute a single delete query in InfluxDB, which has a similar cost as a delete to remove the data for a single time frame. This patch optimized the reprocessing workflow to execute batch cleaning of data in the storage backend of CloudKitty. Change-Id: I8282f44ad837c71df0cb6c73776eafc7014ebedf
This commit is contained in:
parent
60077a3cc4
commit
5c2f9e7f71
@ -517,13 +517,27 @@ class ReprocessingWorker(Worker):
|
||||
|
||||
end_of_this_processing = tzutils.local_to_utc(end_of_this_processing)
|
||||
|
||||
LOG.debug("Cleaning backend [%s] data for reprocessing scope [%s] "
|
||||
"for timeframe[start=%s, end=%s].",
|
||||
self._storage, self.scope, timestamp, end_of_this_processing)
|
||||
|
||||
self._storage.delete(
|
||||
begin=timestamp, end=end_of_this_processing,
|
||||
filters={self.scope_key: self._tenant_id})
|
||||
# If the start_reprocess_time of the reprocessing task equals to
|
||||
# the current reprocessing time, it means that we have just started
|
||||
# executing it. Therefore, we can clean/erase the old data in the
|
||||
# reprocessing task time frame.
|
||||
if tzutils.local_to_utc(self.scope.start_reprocess_time) == timestamp:
|
||||
LOG.info(
|
||||
"Cleaning backend [%s] data for reprocessing scope [%s] for "
|
||||
"timeframe[start=%s, end=%s].", self._storage, self.scope,
|
||||
self.scope.start_reprocess_time, self.scope.end_reprocess_time)
|
||||
self._storage.delete(
|
||||
begin=self.scope.start_reprocess_time,
|
||||
end=self.scope.end_reprocess_time,
|
||||
filters={self.scope_key: self._tenant_id})
|
||||
else:
|
||||
LOG.debug("No need to clean backend [%s] data for reprocessing "
|
||||
"scope [%s] for timeframe[start=%s, end=%s]. We are "
|
||||
"past the very first timestamp; therefore, the cleaning "
|
||||
"for the reprocessing task period has already been "
|
||||
"executed.", self._storage, self.scope,
|
||||
self.scope.start_reprocess_time,
|
||||
self.scope.end_reprocess_time)
|
||||
|
||||
LOG.debug("Executing the reprocessing of scope [%s] for "
|
||||
"timeframe[start=%s, end=%s].", self.scope, timestamp,
|
||||
|
@ -1035,15 +1035,16 @@ class ReprocessingWorkerTest(tests.TestCase):
|
||||
self, do_execute_scope_processing_mock_from_worker):
|
||||
|
||||
now_timestamp = tzutils.localized_now()
|
||||
self.reprocessing_worker.scope.start_reprocess_time = now_timestamp
|
||||
self.reprocessing_worker.do_execute_scope_processing(now_timestamp)
|
||||
|
||||
expected_end = tzutils.localized_now() + datetime.timedelta(
|
||||
seconds=self.reprocessing_worker._period)
|
||||
|
||||
self.storage_mock.delete.assert_has_calls([
|
||||
mock.call(begin=now_timestamp, end=expected_end,
|
||||
filters={self.reprocessing_worker.scope_key:
|
||||
self.reprocessing_worker._tenant_id})])
|
||||
mock.call(
|
||||
begin=self.reprocessing_worker.scope.start_reprocess_time,
|
||||
end=self.reprocessing_worker.scope.end_reprocess_time,
|
||||
filters={
|
||||
self.reprocessing_worker.scope_key:
|
||||
self.reprocessing_worker._tenant_id})])
|
||||
|
||||
do_execute_scope_processing_mock_from_worker.assert_has_calls([
|
||||
mock.call(now_timestamp)])
|
||||
|
@ -0,0 +1,5 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
Optimized the reprocessing workflow to execute batch cleaning
|
||||
of data in the storage backend of CloudKitty.
|
Loading…
x
Reference in New Issue
Block a user