From 15fbf9fe7cf33ed4b56569078400a2ba070b6bce Mon Sep 17 00:00:00 2001 From: paul luse Date: Thu, 11 Sep 2014 06:55:45 -0700 Subject: [PATCH] Add container_count to policy_stat table Start tracking the container count per policy including reporting it in account HEAD and supporting installations where the DB existed before the updated schema. Migration is triggered by the account audtior; if the database is un-migrated it will continue to report policy_stats without the per policy container_count keys. Closes-Bug: #1367514 Change-Id: I07331cea177e19b3df303609a4ac510765a19162 --- swift/account/auditor.py | 35 ++- swift/account/backend.py | 115 ++++++++-- swift/common/exceptions.py | 4 + test/unit/account/test_auditor.py | 135 ++++++++++- test/unit/account/test_backend.py | 364 +++++++++++++++++++++++++++++- test/unit/account/test_server.py | 6 + test/unit/account/test_utils.py | 61 +++++ 7 files changed, 691 insertions(+), 29 deletions(-) diff --git a/swift/account/auditor.py b/swift/account/auditor.py index af38ed3bde..261acf7e92 100644 --- a/swift/account/auditor.py +++ b/swift/account/auditor.py @@ -20,6 +20,7 @@ from random import random import swift.common.db from swift.account.backend import AccountBroker, DATADIR +from swift.common.exceptions import InvalidAccountInfo from swift.common.utils import get_logger, audit_location_generator, \ config_true_value, dump_recon_cache, ratelimit_sleep from swift.common.daemon import Daemon @@ -30,9 +31,9 @@ from eventlet import Timeout class AccountAuditor(Daemon): """Audit accounts.""" - def __init__(self, conf): + def __init__(self, conf, logger=None): self.conf = conf - self.logger = get_logger(conf, log_route='account-auditor') + self.logger = logger or get_logger(conf, log_route='account-auditor') self.devices = conf.get('devices', '/srv/node') self.mount_check = config_true_value(conf.get('mount_check', 'true')) self.interval = int(conf.get('interval', 1800)) @@ -104,6 +105,29 @@ class AccountAuditor(Daemon): dump_recon_cache({'account_auditor_pass_completed': elapsed}, self.rcache, self.logger) + def validate_per_policy_counts(self, broker): + info = broker.get_info() + policy_stats = broker.get_policy_stats(do_migrations=True) + policy_totals = { + 'container_count': 0, + 'object_count': 0, + 'bytes_used': 0, + } + for policy_stat in policy_stats.values(): + for key in policy_totals: + policy_totals[key] += policy_stat[key] + + for key in policy_totals: + if policy_totals[key] == info[key]: + continue + raise InvalidAccountInfo(_( + 'The total %(key)s for the container (%(total)s) does not ' + 'match the sum of %(key)s across policies (%(sum)s)') % { + 'key': key, + 'total': info[key], + 'sum': policy_totals[key], + }) + def account_audit(self, path): """ Audits the given account path @@ -114,10 +138,15 @@ class AccountAuditor(Daemon): try: broker = AccountBroker(path) if not broker.is_deleted(): - broker.get_info() + self.validate_per_policy_counts(broker) self.logger.increment('passes') self.account_passes += 1 self.logger.debug('Audit passed for %s' % broker) + except InvalidAccountInfo as e: + self.logger.increment('failures') + self.account_failures += 1 + self.logger.error( + _('Audit Failed for %s: %s'), path, str(e)) except (Exception, Timeout): self.logger.increment('failures') self.account_failures += 1 diff --git a/swift/account/backend.py b/swift/account/backend.py index 1ad37c22c1..89c6cfb65e 100644 --- a/swift/account/backend.py +++ b/swift/account/backend.py @@ -32,17 +32,19 @@ POLICY_STAT_TRIGGER_SCRIPT = """ CREATE TRIGGER container_insert_ps AFTER INSERT ON container BEGIN INSERT OR IGNORE INTO policy_stat - (storage_policy_index, object_count, bytes_used) - VALUES (new.storage_policy_index, 0, 0); + (storage_policy_index, container_count, object_count, bytes_used) + VALUES (new.storage_policy_index, 0, 0, 0); UPDATE policy_stat - SET object_count = object_count + new.object_count, + SET container_count = container_count + (1 - new.deleted), + object_count = object_count + new.object_count, bytes_used = bytes_used + new.bytes_used WHERE storage_policy_index = new.storage_policy_index; END; CREATE TRIGGER container_delete_ps AFTER DELETE ON container BEGIN UPDATE policy_stat - SET object_count = object_count - old.object_count, + SET container_count = container_count - (1 - old.deleted), + object_count = object_count - old.object_count, bytes_used = bytes_used - old.bytes_used WHERE storage_policy_index = old.storage_policy_index; END; @@ -165,13 +167,15 @@ class AccountBroker(DatabaseBroker): conn.executescript(""" CREATE TABLE policy_stat ( storage_policy_index INTEGER PRIMARY KEY, + container_count INTEGER DEFAULT 0, object_count INTEGER DEFAULT 0, bytes_used INTEGER DEFAULT 0 ); INSERT OR IGNORE INTO policy_stat ( - storage_policy_index, object_count, bytes_used + storage_policy_index, container_count, object_count, + bytes_used ) - SELECT 0, object_count, bytes_used + SELECT 0, container_count, object_count, bytes_used FROM account_stat WHERE container_count > 0; """) @@ -296,24 +300,45 @@ class AccountBroker(DatabaseBroker): return row['status'] == "DELETED" or ( row['delete_timestamp'] > row['put_timestamp']) - def get_policy_stats(self): + def get_policy_stats(self, do_migrations=False): """ Get global policy stats for the account. + :param do_migrations: boolean, if True the policy stat dicts will + always include the 'container_count' key; + otherwise it may be ommited on legacy databases + until they are migrated. + :returns: dict of policy stats where the key is the policy index and the value is a dictionary like {'object_count': M, - 'bytes_used': N} + 'bytes_used': N, 'container_count': L} """ - info = [] + columns = [ + 'storage_policy_index', + 'container_count', + 'object_count', + 'bytes_used', + ] + + def run_query(): + return (conn.execute(''' + SELECT %s + FROM policy_stat + ''' % ', '.join(columns)).fetchall()) + self._commit_puts_stale_ok() + info = [] with self.get() as conn: try: - info = (conn.execute(''' - SELECT storage_policy_index, object_count, bytes_used - FROM policy_stat - ''').fetchall()) + info = run_query() except sqlite3.OperationalError as err: - if "no such table: policy_stat" not in str(err): + if "no such column: container_count" in str(err): + if do_migrations: + self._migrate_add_container_count(conn) + else: + columns.remove('container_count') + info = run_query() + elif "no such table: policy_stat" not in str(err): raise policy_stats = {} @@ -501,10 +526,72 @@ class AccountBroker(DatabaseBroker): self._migrate_add_storage_policy_index(conn) _really_merge_items(conn) + def _migrate_add_container_count(self, conn): + """ + Add the container_count column to the 'policy_stat' table and + update it + + :param conn: DB connection object + """ + # add the container_count column + curs = conn.cursor() + curs.executescript(''' + DROP TRIGGER container_delete_ps; + DROP TRIGGER container_insert_ps; + ALTER TABLE policy_stat + ADD COLUMN container_count INTEGER DEFAULT 0; + ''' + POLICY_STAT_TRIGGER_SCRIPT) + + # keep the simple case simple, if there's only one entry in the + # policy_stat table we just copy the total container count from the + # account_stat table + + # if that triggers an update then the where changes <> 0 *would* exist + # and the insert or replace from the count subqueries won't execute + + curs.executescript(""" + UPDATE policy_stat + SET container_count = ( + SELECT container_count + FROM account_stat) + WHERE ( + SELECT COUNT(storage_policy_index) + FROM policy_stat + ) <= 1; + + INSERT OR REPLACE INTO policy_stat ( + storage_policy_index, + container_count, + object_count, + bytes_used + ) + SELECT p.storage_policy_index, + c.count, + p.object_count, + p.bytes_used + FROM ( + SELECT storage_policy_index, + COUNT(*) as count + FROM container + WHERE deleted = 0 + GROUP BY storage_policy_index + ) c + JOIN policy_stat p + ON p.storage_policy_index = c.storage_policy_index + WHERE NOT EXISTS( + SELECT changes() as change + FROM policy_stat + WHERE change <> 0 + ); + """) + conn.commit() + def _migrate_add_storage_policy_index(self, conn): """ Add the storage_policy_index column to the 'container' table and set up triggers, creating the policy_stat table if needed. + + :param conn: DB connection object """ try: self.create_policy_stat_table(conn) diff --git a/swift/common/exceptions.py b/swift/common/exceptions.py index e7999bab97..c4506ad9e3 100644 --- a/swift/common/exceptions.py +++ b/swift/common/exceptions.py @@ -79,6 +79,10 @@ class DeviceUnavailable(SwiftException): pass +class InvalidAccountInfo(SwiftException): + pass + + class PathNotDir(OSError): pass diff --git a/test/unit/account/test_auditor.py b/test/unit/account/test_auditor.py index 499b44155d..c79209bc09 100644 --- a/test/unit/account/test_auditor.py +++ b/test/unit/account/test_auditor.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from collections import defaultdict +import itertools import unittest import mock import time @@ -23,7 +25,11 @@ from shutil import rmtree from eventlet import Timeout from swift.account import auditor -from test.unit import FakeLogger +from swift.common.storage_policy import POLICIES +from swift.common.utils import Timestamp +from test.unit import debug_logger, patch_policies, with_tempdir +from test.unit.account.test_backend import ( + AccountBrokerPreTrackContainerCountSetup) class FakeAccountBroker(object): @@ -37,16 +43,22 @@ class FakeAccountBroker(object): def get_info(self): if self.file.startswith('fail'): - raise ValueError + raise ValueError() if self.file.startswith('true'): - return 'ok' + return defaultdict(int) + + def get_policy_stats(self, **kwargs): + if self.file.startswith('fail'): + raise ValueError() + if self.file.startswith('true'): + return defaultdict(int) class TestAuditor(unittest.TestCase): def setUp(self): self.testdir = os.path.join(mkdtemp(), 'tmp_test_account_auditor') - self.logger = FakeLogger() + self.logger = debug_logger() rmtree(self.testdir, ignore_errors=1) os.mkdir(self.testdir) fnames = ['true1.db', 'true2.db', 'true3.db', @@ -69,9 +81,7 @@ class TestAuditor(unittest.TestCase): def sleep(self, sec): self.times += 1 - if self.times < sleep_times: - time.sleep(0.1) - else: + if self.times >= sleep_times: # stop forever by an error raise ValueError() @@ -79,7 +89,7 @@ class TestAuditor(unittest.TestCase): return time.time() conf = {} - test_auditor = auditor.AccountAuditor(conf) + test_auditor = auditor.AccountAuditor(conf, logger=self.logger) with mock.patch('swift.account.auditor.time', FakeTime()): def fake_audit_location_generator(*args, **kwargs): @@ -106,7 +116,7 @@ class TestAuditor(unittest.TestCase): @mock.patch('swift.account.auditor.AccountBroker', FakeAccountBroker) def test_run_once(self): conf = {} - test_auditor = auditor.AccountAuditor(conf) + test_auditor = auditor.AccountAuditor(conf, logger=self.logger) def fake_audit_location_generator(*args, **kwargs): files = os.listdir(self.testdir) @@ -121,7 +131,7 @@ class TestAuditor(unittest.TestCase): @mock.patch('swift.account.auditor.AccountBroker', FakeAccountBroker) def test_one_audit_pass(self): conf = {} - test_auditor = auditor.AccountAuditor(conf) + test_auditor = auditor.AccountAuditor(conf, logger=self.logger) def fake_audit_location_generator(*args, **kwargs): files = os.listdir(self.testdir) @@ -138,7 +148,7 @@ class TestAuditor(unittest.TestCase): @mock.patch('swift.account.auditor.AccountBroker', FakeAccountBroker) def test_account_auditor(self): conf = {} - test_auditor = auditor.AccountAuditor(conf) + test_auditor = auditor.AccountAuditor(conf, logger=self.logger) files = os.listdir(self.testdir) for f in files: path = os.path.join(self.testdir, f) @@ -146,5 +156,108 @@ class TestAuditor(unittest.TestCase): self.assertEqual(test_auditor.account_failures, 2) self.assertEqual(test_auditor.account_passes, 3) + +@patch_policies +class TestAuditorRealBrokerMigration( + AccountBrokerPreTrackContainerCountSetup, unittest.TestCase): + + def test_db_migration(self): + # add a few containers + policies = itertools.cycle(POLICIES) + num_containers = len(POLICIES) * 3 + per_policy_container_counts = defaultdict(int) + for i in range(num_containers): + name = 'test-container-%02d' % i + policy = next(policies) + self.broker.put_container(name, next(self.ts), + 0, 0, 0, int(policy)) + per_policy_container_counts[int(policy)] += 1 + + self.broker._commit_puts() + self.assertEqual(num_containers, + self.broker.get_info()['container_count']) + + # still un-migrated + self.assertUnmigrated(self.broker) + + # run auditor, and validate migration + conf = {'devices': self.tempdir, 'mount_check': False, + 'recon_cache_path': self.tempdir} + test_auditor = auditor.AccountAuditor(conf, logger=debug_logger()) + test_auditor.run_once() + + self.restore_account_broker() + + broker = auditor.AccountBroker(self.db_path) + # go after rows directly to avoid unintentional migration + with broker.get() as conn: + rows = conn.execute(''' + SELECT storage_policy_index, container_count + FROM policy_stat + ''').fetchall() + for policy_index, container_count in rows: + self.assertEqual(container_count, + per_policy_container_counts[policy_index]) + + +class TestAuditorRealBroker(unittest.TestCase): + + def setUp(self): + self.logger = debug_logger() + + @with_tempdir + def test_db_validate_fails(self, tempdir): + ts = (Timestamp(t).internal for t in itertools.count(int(time.time()))) + db_path = os.path.join(tempdir, 'sda', 'accounts', + '0', '0', '0', 'test.db') + broker = auditor.AccountBroker(db_path, account='a') + broker.initialize(next(ts)) + # add a few containers + policies = itertools.cycle(POLICIES) + num_containers = len(POLICIES) * 3 + per_policy_container_counts = defaultdict(int) + for i in range(num_containers): + name = 'test-container-%02d' % i + policy = next(policies) + broker.put_container(name, next(ts), 0, 0, 0, int(policy)) + per_policy_container_counts[int(policy)] += 1 + + broker._commit_puts() + self.assertEqual(broker.get_info()['container_count'], num_containers) + + messed_up_policy = random.choice(list(POLICIES)) + + # now mess up a policy_stats table count + with broker.get() as conn: + conn.executescript(''' + UPDATE policy_stat + SET container_count = container_count - 1 + WHERE storage_policy_index = %d; + ''' % int(messed_up_policy)) + + # validate it's messed up + policy_stats = broker.get_policy_stats() + self.assertEqual( + policy_stats[int(messed_up_policy)]['container_count'], + per_policy_container_counts[int(messed_up_policy)] - 1) + + # do an audit + conf = {'devices': tempdir, 'mount_check': False, + 'recon_cache_path': tempdir} + test_auditor = auditor.AccountAuditor(conf, logger=self.logger) + test_auditor.run_once() + + # validate errors + self.assertEqual(test_auditor.account_failures, 1) + error_lines = test_auditor.logger.get_lines_for_level('error') + self.assertEqual(len(error_lines), 1) + error_message = error_lines[0] + self.assert_(broker.db_file in error_message) + self.assert_('container_count' in error_message) + self.assert_('does not match' in error_message) + self.assertEqual(test_auditor.logger.get_increment_counts(), + {'failures': 1}) + + if __name__ == '__main__': unittest.main() diff --git a/test/unit/account/test_backend.py b/test/unit/account/test_backend.py index d98b803966..58c3560111 100644 --- a/test/unit/account/test_backend.py +++ b/test/unit/account/test_backend.py @@ -15,6 +15,7 @@ """ Tests for swift.account.backend """ +from collections import defaultdict import hashlib import unittest import pickle @@ -627,9 +628,10 @@ class TestAccountBroker(unittest.TestCase): put_timestamp, 0, 0, 0, policy.idx) - policy_stats = broker.get_policy_stats() stats = policy_stats[policy.idx] + if 'container_count' in stats: + self.assertEqual(stats['container_count'], 1) self.assertEqual(stats['object_count'], 0) self.assertEqual(stats['bytes_used'], 0) @@ -645,6 +647,8 @@ class TestAccountBroker(unittest.TestCase): policy_stats = broker.get_policy_stats() stats = policy_stats[policy.idx] + if 'container_count' in stats: + self.assertEqual(stats['container_count'], 1) self.assertEqual(stats['object_count'], count) self.assertEqual(stats['bytes_used'], count) @@ -652,6 +656,8 @@ class TestAccountBroker(unittest.TestCase): for policy_index, stats in policy_stats.items(): policy = POLICIES[policy_index] count = policy.idx * 100 # coupled with policy for test + if 'container_count' in stats: + self.assertEqual(stats['container_count'], 1) self.assertEqual(stats['object_count'], count) self.assertEqual(stats['bytes_used'], count) @@ -666,6 +672,8 @@ class TestAccountBroker(unittest.TestCase): policy_stats = broker.get_policy_stats() stats = policy_stats[policy.idx] + if 'container_count' in stats: + self.assertEqual(stats['container_count'], 0) self.assertEqual(stats['object_count'], 0) self.assertEqual(stats['bytes_used'], 0) @@ -685,8 +693,12 @@ class TestAccountBroker(unittest.TestCase): stats = broker.get_policy_stats() self.assertEqual(len(stats), 2) + if 'container_count' in stats[0]: + self.assertEqual(stats[0]['container_count'], 1) self.assertEqual(stats[0]['object_count'], 13) self.assertEqual(stats[0]['bytes_used'], 8156441) + if 'container_count' in stats[1]: + self.assertEqual(stats[1]['container_count'], 1) self.assertEqual(stats[1]['object_count'], 8) self.assertEqual(stats[1]['bytes_used'], 6085379) @@ -990,8 +1002,12 @@ class TestAccountBrokerBeforeSPI(TestAccountBroker): # we should have stats for both containers stats = broker.get_policy_stats() self.assertEqual(len(stats), 2) + if 'container_count' in stats[0]: + self.assertEqual(stats[0]['container_count'], 1) self.assertEqual(stats[0]['object_count'], 1) self.assertEqual(stats[0]['bytes_used'], 2) + if 'container_count' in stats[1]: + self.assertEqual(stats[1]['container_count'], 1) self.assertEqual(stats[1]['object_count'], 3) self.assertEqual(stats[1]['bytes_used'], 4) @@ -1003,8 +1019,12 @@ class TestAccountBrokerBeforeSPI(TestAccountBroker): conn.commit() stats = broker.get_policy_stats() self.assertEqual(len(stats), 2) + if 'container_count' in stats[0]: + self.assertEqual(stats[0]['container_count'], 0) self.assertEqual(stats[0]['object_count'], 0) self.assertEqual(stats[0]['bytes_used'], 0) + if 'container_count' in stats[1]: + self.assertEqual(stats[1]['container_count'], 1) self.assertEqual(stats[1]['object_count'], 3) self.assertEqual(stats[1]['bytes_used'], 4) @@ -1070,3 +1090,345 @@ class TestAccountBrokerBeforeSPI(TestAccountBroker): with broker.get() as conn: conn.execute('SELECT * FROM policy_stat') conn.execute('SELECT storage_policy_index FROM container') + + +def pre_track_containers_create_policy_stat(self, conn): + """ + Copied from AccountBroker before the container_count column was + added. + Create policy_stat table which is specific to the account DB. + Not a part of Pluggable Back-ends, internal to the baseline code. + + :param conn: DB connection object + """ + conn.executescript(""" + CREATE TABLE policy_stat ( + storage_policy_index INTEGER PRIMARY KEY, + object_count INTEGER DEFAULT 0, + bytes_used INTEGER DEFAULT 0 + ); + INSERT OR IGNORE INTO policy_stat ( + storage_policy_index, object_count, bytes_used + ) + SELECT 0, object_count, bytes_used + FROM account_stat + WHERE container_count > 0; + """) + + +def pre_track_containers_create_container_table(self, conn): + """ + Copied from AccountBroker before the container_count column was + added (using old stat trigger script) + Create container table which is specific to the account DB. + + :param conn: DB connection object + """ + # revert to old trigger script to support one of the tests + OLD_POLICY_STAT_TRIGGER_SCRIPT = """ + CREATE TRIGGER container_insert_ps AFTER INSERT ON container + BEGIN + INSERT OR IGNORE INTO policy_stat + (storage_policy_index, object_count, bytes_used) + VALUES (new.storage_policy_index, 0, 0); + UPDATE policy_stat + SET object_count = object_count + new.object_count, + bytes_used = bytes_used + new.bytes_used + WHERE storage_policy_index = new.storage_policy_index; + END; + CREATE TRIGGER container_delete_ps AFTER DELETE ON container + BEGIN + UPDATE policy_stat + SET object_count = object_count - old.object_count, + bytes_used = bytes_used - old.bytes_used + WHERE storage_policy_index = old.storage_policy_index; + END; + + """ + conn.executescript(""" + CREATE TABLE container ( + ROWID INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT, + put_timestamp TEXT, + delete_timestamp TEXT, + object_count INTEGER, + bytes_used INTEGER, + deleted INTEGER DEFAULT 0, + storage_policy_index INTEGER DEFAULT 0 + ); + + CREATE INDEX ix_container_deleted_name ON + container (deleted, name); + + CREATE TRIGGER container_insert AFTER INSERT ON container + BEGIN + UPDATE account_stat + SET container_count = container_count + (1 - new.deleted), + object_count = object_count + new.object_count, + bytes_used = bytes_used + new.bytes_used, + hash = chexor(hash, new.name, + new.put_timestamp || '-' || + new.delete_timestamp || '-' || + new.object_count || '-' || new.bytes_used); + END; + + CREATE TRIGGER container_update BEFORE UPDATE ON container + BEGIN + SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT'); + END; + + + CREATE TRIGGER container_delete AFTER DELETE ON container + BEGIN + UPDATE account_stat + SET container_count = container_count - (1 - old.deleted), + object_count = object_count - old.object_count, + bytes_used = bytes_used - old.bytes_used, + hash = chexor(hash, old.name, + old.put_timestamp || '-' || + old.delete_timestamp || '-' || + old.object_count || '-' || old.bytes_used); + END; + """ + OLD_POLICY_STAT_TRIGGER_SCRIPT) + + +class AccountBrokerPreTrackContainerCountSetup(object): + def assertUnmigrated(self, broker): + with broker.get() as conn: + try: + conn.execute(''' + SELECT container_count FROM policy_stat + ''').fetchone()[0] + except sqlite3.OperationalError as err: + # confirm that the column really isn't there + self.assert_('no such column: container_count' in str(err)) + else: + self.fail('broker did not raise sqlite3.OperationalError ' + 'trying to select container_count from policy_stat!') + + def setUp(self): + # use old version of policy_stat + self._imported_create_policy_stat_table = \ + AccountBroker.create_policy_stat_table + AccountBroker.create_policy_stat_table = \ + pre_track_containers_create_policy_stat + # use old container table so we use old trigger for + # updating policy_stat + self._imported_create_container_table = \ + AccountBroker.create_container_table + AccountBroker.create_container_table = \ + pre_track_containers_create_container_table + + broker = AccountBroker(':memory:', account='a') + broker.initialize(Timestamp('1').internal) + self.assertUnmigrated(broker) + + self.tempdir = mkdtemp() + self.ts = (Timestamp(t).internal for t in itertools.count(int(time()))) + + self.db_path = os.path.join(self.tempdir, 'sda', 'accounts', + '0', '0', '0', 'test.db') + self.broker = AccountBroker(self.db_path, account='a') + self.broker.initialize(next(self.ts)) + + # Common sanity-check that our starting, pre-migration state correctly + # does not have the container_count column. + self.assertUnmigrated(self.broker) + + def tearDown(self): + rmtree(self.tempdir, ignore_errors=True) + + self.restore_account_broker() + + broker = AccountBroker(':memory:', account='a') + broker.initialize(Timestamp('1').internal) + with broker.get() as conn: + conn.execute('SELECT container_count FROM policy_stat') + + def restore_account_broker(self): + AccountBroker.create_policy_stat_table = \ + self._imported_create_policy_stat_table + AccountBroker.create_container_table = \ + self._imported_create_container_table + + +@patch_policies([StoragePolicy(0, 'zero', False), + StoragePolicy(1, 'one', True), + StoragePolicy(2, 'two', False), + StoragePolicy(3, 'three', False)]) +class TestAccountBrokerBeforePerPolicyContainerTrack( + AccountBrokerPreTrackContainerCountSetup, TestAccountBroker): + """ + Tests for AccountBroker against databases created before + the container_count column was added to the policy_stat table. + """ + + def test_policy_table_cont_count_do_migrations(self): + # add a few containers + num_containers = 8 + policies = itertools.cycle(POLICIES) + per_policy_container_counts = defaultdict(int) + + # add a few container entries + for i in range(num_containers): + name = 'test-container-%02d' % i + policy = next(policies) + self.broker.put_container(name, next(self.ts), + 0, 0, 0, int(policy)) + per_policy_container_counts[int(policy)] += 1 + + total_container_count = self.broker.get_info()['container_count'] + self.assertEqual(total_container_count, num_containers) + + # still un-migrated + self.assertUnmigrated(self.broker) + + policy_stats = self.broker.get_policy_stats() + self.assertEqual(len(policy_stats), len(per_policy_container_counts)) + for stats in policy_stats.values(): + self.assertEqual(stats['object_count'], 0) + self.assertEqual(stats['bytes_used'], 0) + # un-migrated dbs should not return container_count + self.assertFalse('container_count' in stats) + + # now force the migration + policy_stats = self.broker.get_policy_stats(do_migrations=True) + self.assertEqual(len(policy_stats), len(per_policy_container_counts)) + for policy_index, stats in policy_stats.items(): + self.assertEqual(stats['object_count'], 0) + self.assertEqual(stats['bytes_used'], 0) + self.assertEqual(stats['container_count'], + per_policy_container_counts[policy_index]) + + def test_policy_table_cont_count_update_get_stats(self): + # add a few container entries + for policy in POLICIES: + for i in range(0, policy.idx + 1): + container_name = 'c%s_0' % policy.idx + self.broker.put_container('c%s_%s' % (policy.idx, i), + 0, 0, 0, 0, policy.idx) + # _commit_puts_stale_ok() called by get_policy_stats() + + # calling get_policy_stats() with do_migrations will alter the table + # and populate it based on what's in the container table now + stats = self.broker.get_policy_stats(do_migrations=True) + + # now confirm that the column was created + with self.broker.get() as conn: + conn.execute('SELECT container_count FROM policy_stat') + + # confirm stats reporting back correctly + self.assertEqual(len(stats), 4) + for policy in POLICIES: + self.assertEqual(stats[policy.idx]['container_count'], + policy.idx + 1) + + # now delete one from each policy and check the stats + with self.broker.get() as conn: + for policy in POLICIES: + container_name = 'c%s_0' % policy.idx + conn.execute(''' + DELETE FROM container + WHERE name = ? + ''', (container_name,)) + conn.commit() + stats = self.broker.get_policy_stats() + self.assertEqual(len(stats), 4) + for policy in POLICIES: + self.assertEqual(stats[policy.idx]['container_count'], + policy.idx) + + # now put them back and make sure things are still cool + for policy in POLICIES: + container_name = 'c%s_0' % policy.idx + self.broker.put_container(container_name, 0, 0, 0, 0, policy.idx) + # _commit_puts_stale_ok() called by get_policy_stats() + + # confirm stats reporting back correctly + stats = self.broker.get_policy_stats() + self.assertEqual(len(stats), 4) + for policy in POLICIES: + self.assertEqual(stats[policy.idx]['container_count'], + policy.idx + 1) + + def test_per_policy_cont_count_migration_with_deleted(self): + num_containers = 15 + policies = itertools.cycle(POLICIES) + container_policy_map = {} + + # add a few container entries + for i in range(num_containers): + name = 'test-container-%02d' % i + policy = next(policies) + self.broker.put_container(name, next(self.ts), + 0, 0, 0, int(policy)) + # keep track of stub container policies + container_policy_map[name] = policy + + # delete about half of the containers + for i in range(0, num_containers, 2): + name = 'test-container-%02d' % i + policy = container_policy_map[name] + self.broker.put_container(name, 0, next(self.ts), + 0, 0, int(policy)) + + total_container_count = self.broker.get_info()['container_count'] + self.assertEqual(total_container_count, num_containers / 2) + + # trigger migration + policy_info = self.broker.get_policy_stats(do_migrations=True) + self.assertEqual(len(policy_info), min(num_containers, len(POLICIES))) + policy_container_count = sum(p['container_count'] for p in + policy_info.values()) + self.assertEqual(total_container_count, policy_container_count) + + def test_per_policy_cont_count_migration_with_single_policy(self): + num_containers = 100 + + with patch_policies(legacy_only=True): + policy = POLICIES[0] + # add a few container entries + for i in range(num_containers): + name = 'test-container-%02d' % i + self.broker.put_container(name, next(self.ts), + 0, 0, 0, int(policy)) + # delete about half of the containers + for i in range(0, num_containers, 2): + name = 'test-container-%02d' % i + self.broker.put_container(name, 0, next(self.ts), + 0, 0, int(policy)) + + total_container_count = self.broker.get_info()['container_count'] + # trigger migration + policy_info = self.broker.get_policy_stats(do_migrations=True) + + self.assertEqual(total_container_count, num_containers / 2) + + self.assertEqual(len(policy_info), 1) + policy_container_count = sum(p['container_count'] for p in + policy_info.values()) + self.assertEqual(total_container_count, policy_container_count) + + def test_per_policy_cont_count_migration_impossible(self): + with patch_policies(legacy_only=True): + # add a container for the legacy policy + policy = POLICIES[0] + self.broker.put_container('test-legacy-container', next(self.ts), + 0, 0, 0, int(policy)) + + # now create an impossible situation by adding a container for a + # policy index that doesn't exist + non_existant_policy_index = int(policy) + 1 + self.broker.put_container('test-non-existant-policy', + next(self.ts), 0, 0, 0, + non_existant_policy_index) + + total_container_count = self.broker.get_info()['container_count'] + + # trigger migration + policy_info = self.broker.get_policy_stats(do_migrations=True) + + self.assertEqual(total_container_count, 2) + self.assertEqual(len(policy_info), 2) + for policy_stat in policy_info.values(): + self.assertEqual(policy_stat['container_count'], 1) diff --git a/test/unit/account/test_server.py b/test/unit/account/test_server.py index e515b3d221..c18c57edb1 100644 --- a/test/unit/account/test_server.py +++ b/test/unit/account/test_server.py @@ -1708,6 +1708,9 @@ class TestAccountController(unittest.TestCase): self.assertEquals( resp.headers['X-Account-Storage-Policy-%s-Bytes-Used' % POLICIES[0].name], '4') + self.assertEquals( + resp.headers['X-Account-Storage-Policy-%s-Container-Count' % + POLICIES[0].name], '1') def test_policy_stats_non_default(self): ts = itertools.count() @@ -1743,6 +1746,9 @@ class TestAccountController(unittest.TestCase): self.assertEquals( resp.headers['X-Account-Storage-Policy-%s-Bytes-Used' % policy.name], '4') + self.assertEquals( + resp.headers['X-Account-Storage-Policy-%s-Container-Count' % + policy.name], '1') def test_empty_policy_stats(self): ts = itertools.count() diff --git a/test/unit/account/test_utils.py b/test/unit/account/test_utils.py index 46f8835bff..ea90decfc9 100644 --- a/test/unit/account/test_utils.py +++ b/test/unit/account/test_utils.py @@ -117,9 +117,70 @@ class TestAccountUtils(unittest.TestCase): }) for policy in POLICIES: prefix = 'X-Account-Storage-Policy-%s-' % policy.name + expected[prefix + 'Container-Count'] = 1 expected[prefix + 'Object-Count'] = int(policy) expected[prefix + 'Bytes-Used'] = int(policy) * 10 resp_headers = utils.get_response_headers(broker) + per_policy_container_headers = [ + h for h in resp_headers if + h.lower().startswith('x-account-storage-policy-') and + h.lower().endswith('-container-count')] + self.assertTrue(per_policy_container_headers) + for key, value in resp_headers.items(): + expected_value = expected.pop(key) + self.assertEqual(expected_value, str(value), + 'value for %r was %r not %r' % ( + key, value, expected_value)) + self.assertFalse(expected) + + @patch_policies + def test_get_response_headers_with_legacy_data(self): + broker = backend.AccountBroker(':memory:', account='a') + now = time.time() + with mock.patch('time.time', new=lambda: now): + broker.initialize(Timestamp(now).internal) + # add some container data + ts = (Timestamp(t).internal for t in itertools.count(int(now))) + total_containers = 0 + total_objects = 0 + total_bytes = 0 + for policy in POLICIES: + delete_timestamp = ts.next() + put_timestamp = ts.next() + object_count = int(policy) + bytes_used = int(policy) * 10 + broker.put_container('c-%s' % policy.name, put_timestamp, + delete_timestamp, object_count, bytes_used, + int(policy)) + total_containers += 1 + total_objects += object_count + total_bytes += bytes_used + expected = HeaderKeyDict({ + 'X-Account-Container-Count': total_containers, + 'X-Account-Object-Count': total_objects, + 'X-Account-Bytes-Used': total_bytes, + 'X-Timestamp': Timestamp(now).normal, + 'X-PUT-Timestamp': Timestamp(now).normal, + }) + for policy in POLICIES: + prefix = 'X-Account-Storage-Policy-%s-' % policy.name + expected[prefix + 'Object-Count'] = int(policy) + expected[prefix + 'Bytes-Used'] = int(policy) * 10 + orig_policy_stats = broker.get_policy_stats + + def stub_policy_stats(*args, **kwargs): + policy_stats = orig_policy_stats(*args, **kwargs) + for stats in policy_stats.values(): + # legacy db's won't return container_count + del stats['container_count'] + return policy_stats + broker.get_policy_stats = stub_policy_stats + resp_headers = utils.get_response_headers(broker) + per_policy_container_headers = [ + h for h in resp_headers if + h.lower().startswith('x-account-storage-policy-') and + h.lower().endswith('-container-count')] + self.assertFalse(per_policy_container_headers) for key, value in resp_headers.items(): expected_value = expected.pop(key) self.assertEqual(expected_value, str(value),