diff --git a/panko/cmd/storage.py b/panko/cmd/storage.py index 99175dc4..1ff81fbb 100644 --- a/panko/cmd/storage.py +++ b/panko/cmd/storage.py @@ -34,7 +34,20 @@ def expirer(): if conf.database.event_time_to_live > 0: LOG.debug("Clearing expired event data") conn = storage.get_connection_from_config(conf) - conn.clear_expired_data(conf.database.event_time_to_live) + max_count = conf.database.events_delete_batch_size + try: + if max_count > 0: + conn.clear_expired_data(conf.database.event_time_to_live, + max_count) + else: + deleted = max_count = 100 + while deleted and deleted > 0: + deleted = conn.clear_expired_data( + conf.database.event_time_to_live, + max_count) + except TypeError: + LOG.warning("Storage driver does not support " + "'events_delete_batch_size' config option.") else: LOG.info("Nothing to clean, database event time to live " "is disabled") diff --git a/panko/storage/__init__.py b/panko/storage/__init__.py index b4219792..ddeaa924 100644 --- a/panko/storage/__init__.py +++ b/panko/storage/__init__.py @@ -32,6 +32,11 @@ OPTS = [ default=-1, help=("Number of seconds that events are kept " "in the database for (<= 0 means forever).")), + cfg.IntOpt('events_delete_batch_size', + default=0, + min=0, + help=("Number of events to be deleted in one iteration " + "from the database for (0 means all).")), cfg.StrOpt('event_connection', secret=True, deprecated_for_removal=True, diff --git a/panko/storage/base.py b/panko/storage/base.py index c562c801..0d58d880 100644 --- a/panko/storage/base.py +++ b/panko/storage/base.py @@ -122,10 +122,11 @@ class Connection(object): return cls.STORAGE_CAPABILITIES @staticmethod - def clear_expired_data(ttl): + def clear_expired_data(ttl, max_count=None): """Clear expired data from the backend storage system. Clearing occurs according to the time-to-live. :param ttl: Number of seconds to keep records for. + :param max_count: Number of records to delete. """ raise panko.NotImplementedError('Clearing events not implemented') diff --git a/panko/storage/impl_log.py b/panko/storage/impl_log.py index 5513f2ba..4b2ddb11 100644 --- a/panko/storage/impl_log.py +++ b/panko/storage/impl_log.py @@ -22,11 +22,12 @@ class Connection(base.Connection): """Log event data.""" @staticmethod - def clear_expired_data(ttl): + def clear_expired_data(ttl, max_count): """Clear expired data from the backend storage system. Clearing occurs according to the time-to-live. :param ttl: Number of seconds to keep records for. + :param max_count: Number of records to delete. """ - LOG.info("Dropping event data with TTL %d", ttl) + LOG.info("Dropping %d events data with TTL %d", max_count, ttl) diff --git a/panko/storage/impl_mongodb.py b/panko/storage/impl_mongodb.py index ff5eb0c9..8484b350 100644 --- a/panko/storage/impl_mongodb.py +++ b/panko/storage/impl_mongodb.py @@ -96,12 +96,13 @@ class Connection(pymongo_base.Connection): # Connection will be reopened automatically if needed self.conn.close() - def clear_expired_data(self, ttl): + def clear_expired_data(self, ttl, max_count=None): """Clear expired data from the backend storage system. Clearing occurs according to the time-to-live. :param ttl: Number of seconds to keep records for. + :param max_count: Number of records to delete (not used for MongoDB). """ self.update_ttl(ttl, 'event_ttl', 'timestamp', self.db.event) LOG.info("Clearing expired event data is based on native " diff --git a/panko/storage/impl_sqlalchemy.py b/panko/storage/impl_sqlalchemy.py index 4ad05444..7013fdb0 100644 --- a/panko/storage/impl_sqlalchemy.py +++ b/panko/storage/impl_sqlalchemy.py @@ -462,12 +462,13 @@ class Connection(base.Connection): dtype=dtype, value=v) - def clear_expired_data(self, ttl): + def clear_expired_data(self, ttl, max_count): """Clear expired data from the backend storage system. Clearing occurs according to the time-to-live. :param ttl: Number of seconds to keep records for. + :param max_count: Number of records to delete. """ session = self._engine_facade.get_session() with session.begin(): @@ -475,17 +476,23 @@ class Connection(base.Connection): event_q = (session.query(models.Event.id) .filter(models.Event.generated < end)) - event_subq = event_q.subquery() + # NOTE(e0ne): it's not an optiomal from the performance point of + # view but it works with all databases. + ids = [i[0] for i in event_q.limit(max_count)] for trait_model in [models.TraitText, models.TraitInt, models.TraitFloat, models.TraitDatetime]: - (session.query(trait_model) - .filter(trait_model.event_id.in_(event_subq)) - .delete(synchronize_session="fetch")) - event_rows = event_q.delete() + session.query(trait_model).filter( + trait_model.event_id.in_(ids) + ).delete(synchronize_session="fetch") + event_rows = session.query(models.Event).filter( + models.Event.id.in_(ids) + ).delete(synchronize_session="fetch") # remove EventType and TraitType with no corresponding - # matching events and traits + # matching events (session.query(models.EventType) .filter(~models.EventType.events.any()) .delete(synchronize_session="fetch")) LOG.info("%d events are removed from database", event_rows) + + return event_rows diff --git a/panko/tests/functional/storage/test_impl_mongodb.py b/panko/tests/functional/storage/test_impl_mongodb.py index fc98ccc3..c2cda68e 100644 --- a/panko/tests/functional/storage/test_impl_mongodb.py +++ b/panko/tests/functional/storage/test_impl_mongodb.py @@ -31,22 +31,22 @@ class IndexTest(tests_db.TestBase): def test_event_ttl_index_absent(self): # create a fake index and check it is deleted - self.conn.clear_expired_data(-1) + self.conn.clear_expired_data(-1, 0) self.assertNotIn("event_ttl", self.conn.db.event.index_information()) - self.conn.clear_expired_data(456789) + self.conn.clear_expired_data(456789, 0) self.assertEqual(456789, self.conn.db.event.index_information() ["event_ttl"]['expireAfterSeconds']) def test_event_ttl_index_present(self): - self.conn.clear_expired_data(456789) + self.conn.clear_expired_data(456789, 0) self.assertEqual(456789, self.conn.db.event.index_information() ["event_ttl"]['expireAfterSeconds']) - self.conn.clear_expired_data(-1) + self.conn.clear_expired_data(-1, 0) self.assertNotIn("event_ttl", self.conn.db.event.index_information()) diff --git a/panko/tests/functional/storage/test_storage_scenarios.py b/panko/tests/functional/storage/test_storage_scenarios.py index 71b60600..4252f15f 100644 --- a/panko/tests/functional/storage/test_storage_scenarios.py +++ b/panko/tests/functional/storage/test_storage_scenarios.py @@ -68,7 +68,7 @@ class EventTTLTest(EventTestBase): @mock.patch.object(timeutils, 'utcnow') def test_clear_expired_data(self, mock_utcnow): mock_utcnow.return_value = datetime.datetime(2013, 12, 31, 10, 0) - self.conn.clear_expired_data(3600) + self.conn.clear_expired_data(3600, 100) events = list(self.conn.get_events(storage.EventFilter())) self.assertEqual(2, len(events)) diff --git a/panko/tests/functional/test_bin.py b/panko/tests/functional/test_bin.py index 4973588e..a6927cd8 100644 --- a/panko/tests/functional/test_bin.py +++ b/panko/tests/functional/test_bin.py @@ -66,7 +66,7 @@ class BinTestCase(base.BaseTestCase): stdout=subprocess.PIPE) out, __ = subp.communicate() self.assertEqual(0, subp.poll()) - msg = "Dropping %s data with TTL 1" % data_name + msg = "Dropping 100 %ss data with TTL 1" % data_name if six.PY3: msg = msg.encode('utf-8') self.assertIn(msg, out) diff --git a/releasenotes/notes/victoria-support-batch-delete-events-4c63a758bdda93d1.yaml b/releasenotes/notes/victoria-support-batch-delete-events-4c63a758bdda93d1.yaml new file mode 100644 index 00000000..e53b741b --- /dev/null +++ b/releasenotes/notes/victoria-support-batch-delete-events-4c63a758bdda93d1.yaml @@ -0,0 +1,12 @@ +--- +features: + - | + A new ``events_delete_batch_size`` config option is introduced to specify + a number of events to be deleted in one iteration from the database. It will + help when thare're a lot of events in the database and panko-expire + consumes a lot of memory to delete all records with a single + call. +fixes: + | + Fixed the issue that panko-expire is consuming too much memory during + events cleaning up.