From 4048b1fd8eae065652105d19892071b0a4fa5533 Mon Sep 17 00:00:00 2001 From: pooja jadhav Date: Mon, 10 Jul 2017 18:19:07 +0530 Subject: [PATCH] Add db purge support Added db purge support to delete the soft-deleted records from tables. In case of tables 'hosts' and 'failover_segments' the decision to purge the record is made on the basis of 'deleted' and 'deleted_at' column's value. However in case of 'notifications' table it is decided on the basis of 'updated_at' column's value as notifications don't have 'delete' API. Operator can use the below command to purge the records: "masakari-manage db purge --age_in_days --max_rows " Implements: blueprint db-purge-support Co-author: Pooja Jadhav Change-Id: Id959b0e2b84d5d2587d6b70b8f695f69634a1d17 --- masakari/cmd/manage.py | 43 ++++- masakari/db/api.py | 10 ++ masakari/db/sqlalchemy/api.py | 81 +++++++++ masakari/hacking/checks.py | 12 ++ masakari/tests/unit/db/test_purge.py | 167 ++++++++++++++++++ masakari/tests/unit/test_masakari_manage.py | 75 ++++++++ .../db-purge-support-7a33e2ea5d2a624b.yaml | 11 ++ 7 files changed, 395 insertions(+), 4 deletions(-) create mode 100644 masakari/tests/unit/db/test_purge.py create mode 100644 masakari/tests/unit/test_masakari_manage.py create mode 100644 releasenotes/notes/db-purge-support-7a33e2ea5d2a624b.yaml diff --git a/masakari/cmd/manage.py b/masakari/cmd/manage.py index 39ef249b..c83bc766 100644 --- a/masakari/cmd/manage.py +++ b/masakari/cmd/manage.py @@ -21,16 +21,21 @@ import logging as python_logging import sys +import time from oslo_config import cfg from oslo_db.sqlalchemy import migration from oslo_log import log as logging +import six import masakari.conf +from masakari import context +from masakari import db from masakari.db import api as db_api from masakari.db.sqlalchemy import migration as db_migration from masakari import exception from masakari.i18n import _ +from masakari import utils from masakari import version @@ -76,6 +81,35 @@ class DbCommands(object): 
db_migration.MIGRATE_REPO_PATH, db_migration.INIT_VERSION)) + @args('--age_in_days', type=int, default=30, + help='Purge deleted rows older than age in days (default: ' + '%(default)d)') + @args('--max_rows', type=int, default=-1, + help='Limit number of records to delete (default: %(default)d)') + def purge(self, age_in_days, max_rows): + """Purge rows older than a given age from masakari tables.""" + try: + max_rows = utils.validate_integer( + max_rows, 'max_rows', -1, db.MAX_INT) + except exception.Invalid as exc: + sys.exit(six.text_type(exc)) + + try: + age_in_days = int(age_in_days) + except ValueError: + msg = 'Invalid value for age, %(age)s' % {'age': age_in_days} + sys.exit(six.text_type(msg)) + + if max_rows == 0: + sys.exit(_("Must supply value greater than 0 for max_rows.")) + if age_in_days < 0: + sys.exit(_("Must supply a non-negative value for age.")) + if age_in_days >= (int(time.time()) / 86400): + sys.exit(_("Maximal age is count of days since epoch.")) + ctx = context.get_admin_context() + + db_api.purge_deleted_rows(ctx, age_in_days, max_rows) + CATEGORIES = { 'db': DbCommands, @@ -115,10 +149,10 @@ def add_command_parsers(subparsers): parser.set_defaults(action_kwargs=action_kwargs) -CONF.register_cli_opt(cfg.SubCommandOpt('category', - title='Command categories', - help='Available categories', - handler=add_command_parsers)) +command_opt = cfg.SubCommandOpt('category', + title='Command categories', + help='Available categories', + handler=add_command_parsers) def get_arg_string(args): @@ -149,6 +183,7 @@ def fetch_func_args(func): def main(): """Parse options and call the appropriate class/method.""" + CONF.register_cli_opt(command_opt) script_name = sys.argv[0] if len(sys.argv) < 2: print(_("\nOpenStack masakari version: %(version)s\n") % diff --git a/masakari/db/api.py b/masakari/db/api.py index faf2d24b..b81fbd7c 100644 --- a/masakari/db/api.py +++ b/masakari/db/api.py @@ -361,3 +361,13 @@ def notification_delete(context, 
notification_uuid): 'notification_uuid' doesn't exist """ return IMPL.notification_delete(context, notification_uuid) + + +def purge_deleted_rows(context, age_in_days, max_rows): + """Purge the soft deleted rows. + + :param context: context to query under + :param age_in_days: Purge deleted rows older than age in days + :param max_rows: Limit number of records to delete + """ + return IMPL.purge_deleted_rows(context, age_in_days, max_rows) diff --git a/masakari/db/sqlalchemy/api.py b/masakari/db/sqlalchemy/api.py index 74abce2f..b871e826 100644 --- a/masakari/db/sqlalchemy/api.py +++ b/masakari/db/sqlalchemy/api.py @@ -14,14 +14,21 @@ # under the License. """Implementation of SQLAlchemy backend.""" +import datetime import sys from oslo_db import api as oslo_db_api from oslo_db import exception as db_exc from oslo_db.sqlalchemy import enginefacade from oslo_db.sqlalchemy import utils as sqlalchemyutils +from oslo_log import log as logging from oslo_utils import timeutils +from sqlalchemy import or_, and_ +from sqlalchemy.ext.compiler import compiles +from sqlalchemy import MetaData from sqlalchemy.orm import joinedload +from sqlalchemy import sql +import sqlalchemy.sql as sa_sql from sqlalchemy.sql import func import masakari.conf @@ -29,6 +36,7 @@ from masakari.db.sqlalchemy import models from masakari import exception from masakari.i18n import _ +LOG = logging.getLogger(__name__) CONF = masakari.conf.CONF @@ -617,3 +625,76 @@ def notification_delete(context, notification_uuid): if count == 0: raise exception.NotificationNotFound(id=notification_uuid) + + +class DeleteFromSelect(sa_sql.expression.UpdateBase): + def __init__(self, table, select, column): + self.table = table + self.select = select + self.column = column + + +# NOTE(pooja_jadhav): MySQL doesn't yet support subquery with +# 'LIMIT & IN/ALL/ANY/SOME' We need work around this with nesting select. 
+@compiles(DeleteFromSelect) +def visit_delete_from_select(element, compiler, **kw): + return "DELETE FROM %s WHERE %s in (SELECT T1.%s FROM (%s) as T1)" % ( + compiler.process(element.table, asfrom=True), + compiler.process(element.column), + element.column.name, + compiler.process(element.select)) + + +@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True) +@main_context_manager.writer +def purge_deleted_rows(context, age_in_days, max_rows): + """Purges soft deleted rows + + Deleted rows get purged from hosts and segment tables based on + deleted_at column. As notifications table doesn't delete any of + the notification records so rows get purged from notifications + based on last updated_at and status column. + """ + engine = get_engine() + conn = engine.connect() + metadata = MetaData() + metadata.reflect(engine) + deleted_age = timeutils.utcnow() - datetime.timedelta(days=age_in_days) + total_rows_purged = 0 + for table in reversed(metadata.sorted_tables): + if 'deleted' not in table.columns.keys(): + continue + LOG.info('Purging deleted rows older than %(age_in_days)d day(s) ' + 'from table %(tbl)s', + {'age_in_days': age_in_days, 'tbl': table}) + column = table.c.id + updated_at_column = table.c.updated_at + deleted_at_column = table.c.deleted_at + + if table.name == 'notifications': + status_column = table.c.status + query_delete = sql.select([column]).where( + and_(updated_at_column < deleted_age, or_( + status_column == 'finished', status_column == 'failed', + status_column == 'ignored'))).order_by(status_column) + else: + query_delete = sql.select( + [column], deleted_at_column < deleted_age).order_by( + deleted_at_column) + + if max_rows > 0: + query_delete = query_delete.limit(max_rows - total_rows_purged) + + delete_statement = DeleteFromSelect(table, query_delete, column) + + result = conn.execute(delete_statement) + + rows = result.rowcount + LOG.info('Deleted %(rows)d row(s) from table %(tbl)s', + {'rows': rows, 'tbl': table}) + + 
total_rows_purged += rows + if max_rows > 0 and total_rows_purged == max_rows: + break + + LOG.info('Total deleted rows are %(rows)d', {'rows': total_rows_purged}) diff --git a/masakari/hacking/checks.py b/masakari/hacking/checks.py index 9617cd16..166cabd8 100644 --- a/masakari/hacking/checks.py +++ b/masakari/hacking/checks.py @@ -315,6 +315,18 @@ def check_config_option_in_central_place(logical_line, filename): if "masakari/conf/" in filename: return + # (pooja_jadhav) All config options (with exceptions that are clarified + # in the list below) were moved to the central place. List below is for + # all options that were impossible to move without doing a major impact + # on code. Add full path to a module or folder. + conf_exceptions = [ + # CLI opts are allowed to be outside of masakari/conf directory + 'masakari/cmd/manage.py', + ] + + if any(f in filename for f in conf_exceptions): + return + if cfg_opt_re.match(logical_line): yield(0, msg) diff --git a/masakari/tests/unit/db/test_purge.py b/masakari/tests/unit/db/test_purge.py new file mode 100644 index 00000000..03065cd2 --- /dev/null +++ b/masakari/tests/unit/db/test_purge.py @@ -0,0 +1,167 @@ +# Copyright 2017 NTT DATA +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +"""Tests for db purge.""" + +import datetime +import uuid + +from oslo_db.sqlalchemy import utils as sqlalchemyutils +from oslo_utils import timeutils +from sqlalchemy.dialects import sqlite + +from masakari import context +from masakari import db +from masakari.db.sqlalchemy import api as db_api +from masakari import test + + +class PurgeDeletedTest(test.TestCase): + + def setUp(self): + super(PurgeDeletedTest, self).setUp() + self.context = context.get_admin_context() + self.engine = db_api.get_engine() + self.conn = self.engine.connect() + self.notifications = sqlalchemyutils.get_table( + self.engine, "notifications") + self.failover_segments = sqlalchemyutils.get_table( + self.engine, "failover_segments") + # The hosts table has a FK of segment_id + self.hosts = sqlalchemyutils.get_table( + self.engine, "hosts") + + # Add 6 rows to table + self.uuidstrs = [] + self.uuid_fs_segments = [] + self.uuid_hosts = [] + for record in range(6): + notification_uuid = uuid.uuid4().hex + fs_segment_uuid = uuid.uuid4().hex + host_uuid = uuid.uuid4().hex + ins_stmt = self.notifications.insert().values( + notification_uuid=notification_uuid, + generated_time=timeutils.utcnow(), + source_host_uuid=host_uuid, + type='demo', + status='failed') + self.uuidstrs.append(notification_uuid) + self.conn.execute(ins_stmt) + + ins_stmt = self.failover_segments.insert().values( + uuid=fs_segment_uuid, + name='test', + service_type='demo', + recovery_method='auto') + self.uuid_fs_segments.append(fs_segment_uuid) + self.conn.execute(ins_stmt) + + ins_stmt = self.hosts.insert().values( + uuid=host_uuid, + failover_segment_id=fs_segment_uuid, + name='host1', + type='demo', + control_attributes='test') + self.uuid_hosts.append(host_uuid) + self.conn.execute(ins_stmt) + + # Set 4 of them deleted, 2 are 60 days ago, 2 are 20 days ago + self.age_in_days_20 = timeutils.utcnow() - datetime.timedelta(days=20) + self.age_in_days_60 = timeutils.utcnow() - datetime.timedelta(days=60) + + 
make_notifications_old = self.notifications.update().where( + self.notifications.c.notification_uuid.in_( + self.uuidstrs[1:3])).values(updated_at=self.age_in_days_20) + make_notifications_older = self.notifications.update().where( + self.notifications.c.notification_uuid.in_( + self.uuidstrs[4:6])).values(updated_at=self.age_in_days_60) + make_failover_segments_old = self.failover_segments.update().where( + self.failover_segments.c.uuid.in_( + self.uuid_fs_segments[1:3])).values( + deleted_at=self.age_in_days_20) + make_failover_segments_older = self.failover_segments.update().where( + self.failover_segments.c.uuid.in_( + self.uuid_fs_segments[4:6])).values( + deleted_at=self.age_in_days_60) + make_hosts_old = self.hosts.update().where( + self.hosts.c.uuid.in_(self.uuid_hosts[1:3])).values( + deleted_at=self.age_in_days_20) + make_hosts_older = self.hosts.update().where( + self.hosts.c.uuid.in_(self.uuid_hosts[4:6])).values( + deleted_at=self.age_in_days_60) + + self.conn.execute(make_notifications_old) + self.conn.execute(make_notifications_older) + self.conn.execute(make_failover_segments_old) + self.conn.execute(make_failover_segments_older) + self.conn.execute(make_hosts_old) + self.conn.execute(make_hosts_older) + + dialect = self.engine.url.get_dialect() + if dialect == sqlite.dialect: + # We're seeing issues with foreign key support in SQLite 3.6.20 + # SQLAlchemy doesn't support it at all with < SQLite 3.6.19 + # It works fine in SQLite 3.7. 
+ # Force foreign_key checking if running SQLite >= 3.7 + import sqlite3 + tup = sqlite3.sqlite_version_info + if tup[0] > 3 or (tup[0] == 3 and tup[1] >= 7): + self.conn.execute("PRAGMA foreign_keys = ON") + + def test_purge_deleted_rows_old(self): + # Purge at 30 days old, should only delete 2 rows + db.purge_deleted_rows(self.context, age_in_days=30, max_rows=10) + + notifications_rows = self.conn.execute( + self.notifications.count()).scalar() + failover_segments_rows = self.conn.execute( + self.failover_segments.count()).scalar() + hosts_rows = self.conn.execute(self.hosts.count()).scalar() + + # Verify that we only deleted 2 + self.assertEqual(4, notifications_rows) + self.assertEqual(4, failover_segments_rows) + self.assertEqual(4, hosts_rows) + + def test_purge_all_deleted_rows(self): + db.purge_deleted_rows(self.context, age_in_days=20, max_rows=-1) + + notifications_rows = self.conn.execute( + self.notifications.count()).scalar() + + failover_segments_rows = self.conn.execute( + self.failover_segments.count()).scalar() + + hosts_rows = self.conn.execute(self.hosts.count()).scalar() + + # Verify that we have purged all deleted rows + self.assertEqual(2, notifications_rows) + self.assertEqual(2, failover_segments_rows) + self.assertEqual(2, hosts_rows) + + def test_purge_maximum_rows_partial_deleted_records(self): + db.purge_deleted_rows(self.context, age_in_days=60, max_rows=3) + + notifications_rows = self.conn.execute( + self.notifications.count()).scalar() + + failover_segments_rows = self.conn.execute( + self.failover_segments.count()).scalar() + + hosts_rows = self.conn.execute(self.hosts.count()).scalar() + + # Verify that we have deleted 3 rows only + self.assertEqual(4, notifications_rows) + self.assertEqual(5, hosts_rows) + self.assertEqual(6, failover_segments_rows) diff --git a/masakari/tests/unit/test_masakari_manage.py b/masakari/tests/unit/test_masakari_manage.py new file mode 100644 index 00000000..eebcf4fc --- /dev/null +++ 
b/masakari/tests/unit/test_masakari_manage.py @@ -0,0 +1,75 @@ +# Copyright 2017 NTT DATA +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +import sys + +from masakari.cmd import manage +from masakari import context +from masakari.db import api as db_api +from masakari import test + + +class DBCommandsTestCase(test.TestCase): + + def setUp(self): + super(DBCommandsTestCase, self).setUp() + self.commands = manage.DbCommands() + self.context = context.get_admin_context() + sys.argv = ['masakari-manage'] + + @mock.patch.object(db_api, 'purge_deleted_rows') + @mock.patch.object(context, 'get_admin_context') + def test_purge_command(self, mock_context, mock_db_purge): + mock_context.return_value = self.context + self.commands.purge(0, 100) + mock_db_purge.assert_called_once_with(self.context, 0, 100) + + def test_purge_negative_age_in_days(self): + ex = self.assertRaises(SystemExit, self.commands.purge, -1, 100) + self.assertEqual("Must supply a non-negative value for age.", ex.code) + + def test_purge_invalid_age_in_days(self): + ex = self.assertRaises(SystemExit, self.commands.purge, "test", 100) + self.assertEqual("Invalid value for age, test", ex.code) + + def test_purge_command_exceeded_age_in_days(self): + ex = self.assertRaises(SystemExit, self.commands.purge, 1000000, 50) + self.assertEqual("Maximal age is count of days since epoch.", ex.code) + + def test_purge_invalid_max_rows(self): + ex = self.assertRaises(SystemExit, self.commands.purge, 0, 0) + 
self.assertEqual("Must supply value greater than 0 for max_rows.",
+                         ex.code)
+
+    def test_purge_negative_max_rows(self):
+        ex = self.assertRaises(SystemExit, self.commands.purge, 0, -5)
+        self.assertEqual("Invalid input received: max_rows must be >= -1",
+                         ex.code)
+
+    @mock.patch.object(db_api, 'purge_deleted_rows')
+    @mock.patch.object(context, 'get_admin_context')
+    def test_purge_max_rows(self, mock_context, mock_db_purge):
+        mock_context.return_value = self.context
+        value = (2 ** 31) - 1
+        self.commands.purge(age_in_days=1, max_rows=value)
+        mock_db_purge.assert_called_once_with(self.context, 1, value)
+
+    def test_purge_command_exceeded_maximum_rows(self):
+        # value(2 ** 31) is greater than max_rows(2147483647) by 1.
+        value = 2 ** 31
+        ex = self.assertRaises(SystemExit, self.commands.purge, age_in_days=1,
+                               max_rows=value)
+        expected = "Invalid input received: max_rows must be <= 2147483647"
+        self.assertEqual(expected, ex.code)
diff --git a/releasenotes/notes/db-purge-support-7a33e2ea5d2a624b.yaml b/releasenotes/notes/db-purge-support-7a33e2ea5d2a624b.yaml
new file mode 100644
index 00000000..cb55c284
--- /dev/null
+++ b/releasenotes/notes/db-purge-support-7a33e2ea5d2a624b.yaml
@@ -0,0 +1,11 @@
+---
+features:
+  - |
+    Operators can now purge the soft-deleted records from the database tables.
+    Use the following command to purge the records:
+
+    ``masakari-manage db purge --age_in_days <age_in_days> --max_rows <max_rows>``
+
+    NOTE: ``notifications`` db records will be purged on the basis of ``updated_at``
+    and ``status`` fields (finished, ignored, failed) as these records will not be
+    automatically soft-deleted by the system.