Merge "Add container_count to policy_stat table"
This commit is contained in:
commit
813cc0fe0d
@ -20,6 +20,7 @@ from random import random
|
||||
|
||||
import swift.common.db
|
||||
from swift.account.backend import AccountBroker, DATADIR
|
||||
from swift.common.exceptions import InvalidAccountInfo
|
||||
from swift.common.utils import get_logger, audit_location_generator, \
|
||||
config_true_value, dump_recon_cache, ratelimit_sleep
|
||||
from swift.common.daemon import Daemon
|
||||
@ -30,9 +31,9 @@ from eventlet import Timeout
|
||||
class AccountAuditor(Daemon):
|
||||
"""Audit accounts."""
|
||||
|
||||
def __init__(self, conf):
|
||||
def __init__(self, conf, logger=None):
|
||||
self.conf = conf
|
||||
self.logger = get_logger(conf, log_route='account-auditor')
|
||||
self.logger = logger or get_logger(conf, log_route='account-auditor')
|
||||
self.devices = conf.get('devices', '/srv/node')
|
||||
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
|
||||
self.interval = int(conf.get('interval', 1800))
|
||||
@ -104,6 +105,29 @@ class AccountAuditor(Daemon):
|
||||
dump_recon_cache({'account_auditor_pass_completed': elapsed},
|
||||
self.rcache, self.logger)
|
||||
|
||||
def validate_per_policy_counts(self, broker):
|
||||
info = broker.get_info()
|
||||
policy_stats = broker.get_policy_stats(do_migrations=True)
|
||||
policy_totals = {
|
||||
'container_count': 0,
|
||||
'object_count': 0,
|
||||
'bytes_used': 0,
|
||||
}
|
||||
for policy_stat in policy_stats.values():
|
||||
for key in policy_totals:
|
||||
policy_totals[key] += policy_stat[key]
|
||||
|
||||
for key in policy_totals:
|
||||
if policy_totals[key] == info[key]:
|
||||
continue
|
||||
raise InvalidAccountInfo(_(
|
||||
'The total %(key)s for the container (%(total)s) does not '
|
||||
'match the sum of %(key)s across policies (%(sum)s)') % {
|
||||
'key': key,
|
||||
'total': info[key],
|
||||
'sum': policy_totals[key],
|
||||
})
|
||||
|
||||
def account_audit(self, path):
|
||||
"""
|
||||
Audits the given account path
|
||||
@ -114,10 +138,15 @@ class AccountAuditor(Daemon):
|
||||
try:
|
||||
broker = AccountBroker(path)
|
||||
if not broker.is_deleted():
|
||||
broker.get_info()
|
||||
self.validate_per_policy_counts(broker)
|
||||
self.logger.increment('passes')
|
||||
self.account_passes += 1
|
||||
self.logger.debug('Audit passed for %s' % broker)
|
||||
except InvalidAccountInfo as e:
|
||||
self.logger.increment('failures')
|
||||
self.account_failures += 1
|
||||
self.logger.error(
|
||||
_('Audit Failed for %s: %s'), path, str(e))
|
||||
except (Exception, Timeout):
|
||||
self.logger.increment('failures')
|
||||
self.account_failures += 1
|
||||
|
@ -32,17 +32,19 @@ POLICY_STAT_TRIGGER_SCRIPT = """
|
||||
CREATE TRIGGER container_insert_ps AFTER INSERT ON container
|
||||
BEGIN
|
||||
INSERT OR IGNORE INTO policy_stat
|
||||
(storage_policy_index, object_count, bytes_used)
|
||||
VALUES (new.storage_policy_index, 0, 0);
|
||||
(storage_policy_index, container_count, object_count, bytes_used)
|
||||
VALUES (new.storage_policy_index, 0, 0, 0);
|
||||
UPDATE policy_stat
|
||||
SET object_count = object_count + new.object_count,
|
||||
SET container_count = container_count + (1 - new.deleted),
|
||||
object_count = object_count + new.object_count,
|
||||
bytes_used = bytes_used + new.bytes_used
|
||||
WHERE storage_policy_index = new.storage_policy_index;
|
||||
END;
|
||||
CREATE TRIGGER container_delete_ps AFTER DELETE ON container
|
||||
BEGIN
|
||||
UPDATE policy_stat
|
||||
SET object_count = object_count - old.object_count,
|
||||
SET container_count = container_count - (1 - old.deleted),
|
||||
object_count = object_count - old.object_count,
|
||||
bytes_used = bytes_used - old.bytes_used
|
||||
WHERE storage_policy_index = old.storage_policy_index;
|
||||
END;
|
||||
@ -165,13 +167,15 @@ class AccountBroker(DatabaseBroker):
|
||||
conn.executescript("""
|
||||
CREATE TABLE policy_stat (
|
||||
storage_policy_index INTEGER PRIMARY KEY,
|
||||
container_count INTEGER DEFAULT 0,
|
||||
object_count INTEGER DEFAULT 0,
|
||||
bytes_used INTEGER DEFAULT 0
|
||||
);
|
||||
INSERT OR IGNORE INTO policy_stat (
|
||||
storage_policy_index, object_count, bytes_used
|
||||
storage_policy_index, container_count, object_count,
|
||||
bytes_used
|
||||
)
|
||||
SELECT 0, object_count, bytes_used
|
||||
SELECT 0, container_count, object_count, bytes_used
|
||||
FROM account_stat
|
||||
WHERE container_count > 0;
|
||||
""")
|
||||
@ -296,24 +300,45 @@ class AccountBroker(DatabaseBroker):
|
||||
return row['status'] == "DELETED" or (
|
||||
row['delete_timestamp'] > row['put_timestamp'])
|
||||
|
||||
def get_policy_stats(self):
|
||||
def get_policy_stats(self, do_migrations=False):
|
||||
"""
|
||||
Get global policy stats for the account.
|
||||
|
||||
:param do_migrations: boolean, if True the policy stat dicts will
|
||||
always include the 'container_count' key;
|
||||
otherwise it may be ommited on legacy databases
|
||||
until they are migrated.
|
||||
|
||||
:returns: dict of policy stats where the key is the policy index and
|
||||
the value is a dictionary like {'object_count': M,
|
||||
'bytes_used': N}
|
||||
'bytes_used': N, 'container_count': L}
|
||||
"""
|
||||
info = []
|
||||
columns = [
|
||||
'storage_policy_index',
|
||||
'container_count',
|
||||
'object_count',
|
||||
'bytes_used',
|
||||
]
|
||||
|
||||
def run_query():
|
||||
return (conn.execute('''
|
||||
SELECT %s
|
||||
FROM policy_stat
|
||||
''' % ', '.join(columns)).fetchall())
|
||||
|
||||
self._commit_puts_stale_ok()
|
||||
info = []
|
||||
with self.get() as conn:
|
||||
try:
|
||||
info = (conn.execute('''
|
||||
SELECT storage_policy_index, object_count, bytes_used
|
||||
FROM policy_stat
|
||||
''').fetchall())
|
||||
info = run_query()
|
||||
except sqlite3.OperationalError as err:
|
||||
if "no such table: policy_stat" not in str(err):
|
||||
if "no such column: container_count" in str(err):
|
||||
if do_migrations:
|
||||
self._migrate_add_container_count(conn)
|
||||
else:
|
||||
columns.remove('container_count')
|
||||
info = run_query()
|
||||
elif "no such table: policy_stat" not in str(err):
|
||||
raise
|
||||
|
||||
policy_stats = {}
|
||||
@ -501,10 +526,72 @@ class AccountBroker(DatabaseBroker):
|
||||
self._migrate_add_storage_policy_index(conn)
|
||||
_really_merge_items(conn)
|
||||
|
||||
def _migrate_add_container_count(self, conn):
|
||||
"""
|
||||
Add the container_count column to the 'policy_stat' table and
|
||||
update it
|
||||
|
||||
:param conn: DB connection object
|
||||
"""
|
||||
# add the container_count column
|
||||
curs = conn.cursor()
|
||||
curs.executescript('''
|
||||
DROP TRIGGER container_delete_ps;
|
||||
DROP TRIGGER container_insert_ps;
|
||||
ALTER TABLE policy_stat
|
||||
ADD COLUMN container_count INTEGER DEFAULT 0;
|
||||
''' + POLICY_STAT_TRIGGER_SCRIPT)
|
||||
|
||||
# keep the simple case simple, if there's only one entry in the
|
||||
# policy_stat table we just copy the total container count from the
|
||||
# account_stat table
|
||||
|
||||
# if that triggers an update then the where changes <> 0 *would* exist
|
||||
# and the insert or replace from the count subqueries won't execute
|
||||
|
||||
curs.executescript("""
|
||||
UPDATE policy_stat
|
||||
SET container_count = (
|
||||
SELECT container_count
|
||||
FROM account_stat)
|
||||
WHERE (
|
||||
SELECT COUNT(storage_policy_index)
|
||||
FROM policy_stat
|
||||
) <= 1;
|
||||
|
||||
INSERT OR REPLACE INTO policy_stat (
|
||||
storage_policy_index,
|
||||
container_count,
|
||||
object_count,
|
||||
bytes_used
|
||||
)
|
||||
SELECT p.storage_policy_index,
|
||||
c.count,
|
||||
p.object_count,
|
||||
p.bytes_used
|
||||
FROM (
|
||||
SELECT storage_policy_index,
|
||||
COUNT(*) as count
|
||||
FROM container
|
||||
WHERE deleted = 0
|
||||
GROUP BY storage_policy_index
|
||||
) c
|
||||
JOIN policy_stat p
|
||||
ON p.storage_policy_index = c.storage_policy_index
|
||||
WHERE NOT EXISTS(
|
||||
SELECT changes() as change
|
||||
FROM policy_stat
|
||||
WHERE change <> 0
|
||||
);
|
||||
""")
|
||||
conn.commit()
|
||||
|
||||
def _migrate_add_storage_policy_index(self, conn):
|
||||
"""
|
||||
Add the storage_policy_index column to the 'container' table and
|
||||
set up triggers, creating the policy_stat table if needed.
|
||||
|
||||
:param conn: DB connection object
|
||||
"""
|
||||
try:
|
||||
self.create_policy_stat_table(conn)
|
||||
|
@ -79,6 +79,10 @@ class DeviceUnavailable(SwiftException):
|
||||
pass
|
||||
|
||||
|
||||
class InvalidAccountInfo(SwiftException):
|
||||
pass
|
||||
|
||||
|
||||
class PathNotDir(OSError):
|
||||
pass
|
||||
|
||||
|
@ -13,6 +13,8 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from collections import defaultdict
|
||||
import itertools
|
||||
import unittest
|
||||
import mock
|
||||
import time
|
||||
@ -23,7 +25,11 @@ from shutil import rmtree
|
||||
from eventlet import Timeout
|
||||
|
||||
from swift.account import auditor
|
||||
from test.unit import FakeLogger
|
||||
from swift.common.storage_policy import POLICIES
|
||||
from swift.common.utils import Timestamp
|
||||
from test.unit import debug_logger, patch_policies, with_tempdir
|
||||
from test.unit.account.test_backend import (
|
||||
AccountBrokerPreTrackContainerCountSetup)
|
||||
|
||||
|
||||
class FakeAccountBroker(object):
|
||||
@ -37,16 +43,22 @@ class FakeAccountBroker(object):
|
||||
|
||||
def get_info(self):
|
||||
if self.file.startswith('fail'):
|
||||
raise ValueError
|
||||
raise ValueError()
|
||||
if self.file.startswith('true'):
|
||||
return 'ok'
|
||||
return defaultdict(int)
|
||||
|
||||
def get_policy_stats(self, **kwargs):
|
||||
if self.file.startswith('fail'):
|
||||
raise ValueError()
|
||||
if self.file.startswith('true'):
|
||||
return defaultdict(int)
|
||||
|
||||
|
||||
class TestAuditor(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.testdir = os.path.join(mkdtemp(), 'tmp_test_account_auditor')
|
||||
self.logger = FakeLogger()
|
||||
self.logger = debug_logger()
|
||||
rmtree(self.testdir, ignore_errors=1)
|
||||
os.mkdir(self.testdir)
|
||||
fnames = ['true1.db', 'true2.db', 'true3.db',
|
||||
@ -69,9 +81,7 @@ class TestAuditor(unittest.TestCase):
|
||||
|
||||
def sleep(self, sec):
|
||||
self.times += 1
|
||||
if self.times < sleep_times:
|
||||
time.sleep(0.1)
|
||||
else:
|
||||
if self.times >= sleep_times:
|
||||
# stop forever by an error
|
||||
raise ValueError()
|
||||
|
||||
@ -79,7 +89,7 @@ class TestAuditor(unittest.TestCase):
|
||||
return time.time()
|
||||
|
||||
conf = {}
|
||||
test_auditor = auditor.AccountAuditor(conf)
|
||||
test_auditor = auditor.AccountAuditor(conf, logger=self.logger)
|
||||
|
||||
with mock.patch('swift.account.auditor.time', FakeTime()):
|
||||
def fake_audit_location_generator(*args, **kwargs):
|
||||
@ -106,7 +116,7 @@ class TestAuditor(unittest.TestCase):
|
||||
@mock.patch('swift.account.auditor.AccountBroker', FakeAccountBroker)
|
||||
def test_run_once(self):
|
||||
conf = {}
|
||||
test_auditor = auditor.AccountAuditor(conf)
|
||||
test_auditor = auditor.AccountAuditor(conf, logger=self.logger)
|
||||
|
||||
def fake_audit_location_generator(*args, **kwargs):
|
||||
files = os.listdir(self.testdir)
|
||||
@ -121,7 +131,7 @@ class TestAuditor(unittest.TestCase):
|
||||
@mock.patch('swift.account.auditor.AccountBroker', FakeAccountBroker)
|
||||
def test_one_audit_pass(self):
|
||||
conf = {}
|
||||
test_auditor = auditor.AccountAuditor(conf)
|
||||
test_auditor = auditor.AccountAuditor(conf, logger=self.logger)
|
||||
|
||||
def fake_audit_location_generator(*args, **kwargs):
|
||||
files = os.listdir(self.testdir)
|
||||
@ -138,7 +148,7 @@ class TestAuditor(unittest.TestCase):
|
||||
@mock.patch('swift.account.auditor.AccountBroker', FakeAccountBroker)
|
||||
def test_account_auditor(self):
|
||||
conf = {}
|
||||
test_auditor = auditor.AccountAuditor(conf)
|
||||
test_auditor = auditor.AccountAuditor(conf, logger=self.logger)
|
||||
files = os.listdir(self.testdir)
|
||||
for f in files:
|
||||
path = os.path.join(self.testdir, f)
|
||||
@ -146,5 +156,108 @@ class TestAuditor(unittest.TestCase):
|
||||
self.assertEqual(test_auditor.account_failures, 2)
|
||||
self.assertEqual(test_auditor.account_passes, 3)
|
||||
|
||||
|
||||
@patch_policies
|
||||
class TestAuditorRealBrokerMigration(
|
||||
AccountBrokerPreTrackContainerCountSetup, unittest.TestCase):
|
||||
|
||||
def test_db_migration(self):
|
||||
# add a few containers
|
||||
policies = itertools.cycle(POLICIES)
|
||||
num_containers = len(POLICIES) * 3
|
||||
per_policy_container_counts = defaultdict(int)
|
||||
for i in range(num_containers):
|
||||
name = 'test-container-%02d' % i
|
||||
policy = next(policies)
|
||||
self.broker.put_container(name, next(self.ts),
|
||||
0, 0, 0, int(policy))
|
||||
per_policy_container_counts[int(policy)] += 1
|
||||
|
||||
self.broker._commit_puts()
|
||||
self.assertEqual(num_containers,
|
||||
self.broker.get_info()['container_count'])
|
||||
|
||||
# still un-migrated
|
||||
self.assertUnmigrated(self.broker)
|
||||
|
||||
# run auditor, and validate migration
|
||||
conf = {'devices': self.tempdir, 'mount_check': False,
|
||||
'recon_cache_path': self.tempdir}
|
||||
test_auditor = auditor.AccountAuditor(conf, logger=debug_logger())
|
||||
test_auditor.run_once()
|
||||
|
||||
self.restore_account_broker()
|
||||
|
||||
broker = auditor.AccountBroker(self.db_path)
|
||||
# go after rows directly to avoid unintentional migration
|
||||
with broker.get() as conn:
|
||||
rows = conn.execute('''
|
||||
SELECT storage_policy_index, container_count
|
||||
FROM policy_stat
|
||||
''').fetchall()
|
||||
for policy_index, container_count in rows:
|
||||
self.assertEqual(container_count,
|
||||
per_policy_container_counts[policy_index])
|
||||
|
||||
|
||||
class TestAuditorRealBroker(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.logger = debug_logger()
|
||||
|
||||
@with_tempdir
|
||||
def test_db_validate_fails(self, tempdir):
|
||||
ts = (Timestamp(t).internal for t in itertools.count(int(time.time())))
|
||||
db_path = os.path.join(tempdir, 'sda', 'accounts',
|
||||
'0', '0', '0', 'test.db')
|
||||
broker = auditor.AccountBroker(db_path, account='a')
|
||||
broker.initialize(next(ts))
|
||||
# add a few containers
|
||||
policies = itertools.cycle(POLICIES)
|
||||
num_containers = len(POLICIES) * 3
|
||||
per_policy_container_counts = defaultdict(int)
|
||||
for i in range(num_containers):
|
||||
name = 'test-container-%02d' % i
|
||||
policy = next(policies)
|
||||
broker.put_container(name, next(ts), 0, 0, 0, int(policy))
|
||||
per_policy_container_counts[int(policy)] += 1
|
||||
|
||||
broker._commit_puts()
|
||||
self.assertEqual(broker.get_info()['container_count'], num_containers)
|
||||
|
||||
messed_up_policy = random.choice(list(POLICIES))
|
||||
|
||||
# now mess up a policy_stats table count
|
||||
with broker.get() as conn:
|
||||
conn.executescript('''
|
||||
UPDATE policy_stat
|
||||
SET container_count = container_count - 1
|
||||
WHERE storage_policy_index = %d;
|
||||
''' % int(messed_up_policy))
|
||||
|
||||
# validate it's messed up
|
||||
policy_stats = broker.get_policy_stats()
|
||||
self.assertEqual(
|
||||
policy_stats[int(messed_up_policy)]['container_count'],
|
||||
per_policy_container_counts[int(messed_up_policy)] - 1)
|
||||
|
||||
# do an audit
|
||||
conf = {'devices': tempdir, 'mount_check': False,
|
||||
'recon_cache_path': tempdir}
|
||||
test_auditor = auditor.AccountAuditor(conf, logger=self.logger)
|
||||
test_auditor.run_once()
|
||||
|
||||
# validate errors
|
||||
self.assertEqual(test_auditor.account_failures, 1)
|
||||
error_lines = test_auditor.logger.get_lines_for_level('error')
|
||||
self.assertEqual(len(error_lines), 1)
|
||||
error_message = error_lines[0]
|
||||
self.assert_(broker.db_file in error_message)
|
||||
self.assert_('container_count' in error_message)
|
||||
self.assert_('does not match' in error_message)
|
||||
self.assertEqual(test_auditor.logger.get_increment_counts(),
|
||||
{'failures': 1})
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -15,6 +15,7 @@
|
||||
|
||||
""" Tests for swift.account.backend """
|
||||
|
||||
from collections import defaultdict
|
||||
import hashlib
|
||||
import json
|
||||
import unittest
|
||||
@ -656,9 +657,10 @@ class TestAccountBroker(unittest.TestCase):
|
||||
put_timestamp, 0,
|
||||
0, 0,
|
||||
policy.idx)
|
||||
|
||||
policy_stats = broker.get_policy_stats()
|
||||
stats = policy_stats[policy.idx]
|
||||
if 'container_count' in stats:
|
||||
self.assertEqual(stats['container_count'], 1)
|
||||
self.assertEqual(stats['object_count'], 0)
|
||||
self.assertEqual(stats['bytes_used'], 0)
|
||||
|
||||
@ -674,6 +676,8 @@ class TestAccountBroker(unittest.TestCase):
|
||||
|
||||
policy_stats = broker.get_policy_stats()
|
||||
stats = policy_stats[policy.idx]
|
||||
if 'container_count' in stats:
|
||||
self.assertEqual(stats['container_count'], 1)
|
||||
self.assertEqual(stats['object_count'], count)
|
||||
self.assertEqual(stats['bytes_used'], count)
|
||||
|
||||
@ -681,6 +685,8 @@ class TestAccountBroker(unittest.TestCase):
|
||||
for policy_index, stats in policy_stats.items():
|
||||
policy = POLICIES[policy_index]
|
||||
count = policy.idx * 100 # coupled with policy for test
|
||||
if 'container_count' in stats:
|
||||
self.assertEqual(stats['container_count'], 1)
|
||||
self.assertEqual(stats['object_count'], count)
|
||||
self.assertEqual(stats['bytes_used'], count)
|
||||
|
||||
@ -695,6 +701,8 @@ class TestAccountBroker(unittest.TestCase):
|
||||
|
||||
policy_stats = broker.get_policy_stats()
|
||||
stats = policy_stats[policy.idx]
|
||||
if 'container_count' in stats:
|
||||
self.assertEqual(stats['container_count'], 0)
|
||||
self.assertEqual(stats['object_count'], 0)
|
||||
self.assertEqual(stats['bytes_used'], 0)
|
||||
|
||||
@ -714,8 +722,12 @@ class TestAccountBroker(unittest.TestCase):
|
||||
|
||||
stats = broker.get_policy_stats()
|
||||
self.assertEqual(len(stats), 2)
|
||||
if 'container_count' in stats[0]:
|
||||
self.assertEqual(stats[0]['container_count'], 1)
|
||||
self.assertEqual(stats[0]['object_count'], 13)
|
||||
self.assertEqual(stats[0]['bytes_used'], 8156441)
|
||||
if 'container_count' in stats[1]:
|
||||
self.assertEqual(stats[1]['container_count'], 1)
|
||||
self.assertEqual(stats[1]['object_count'], 8)
|
||||
self.assertEqual(stats[1]['bytes_used'], 6085379)
|
||||
|
||||
@ -1019,8 +1031,12 @@ class TestAccountBrokerBeforeSPI(TestAccountBroker):
|
||||
# we should have stats for both containers
|
||||
stats = broker.get_policy_stats()
|
||||
self.assertEqual(len(stats), 2)
|
||||
if 'container_count' in stats[0]:
|
||||
self.assertEqual(stats[0]['container_count'], 1)
|
||||
self.assertEqual(stats[0]['object_count'], 1)
|
||||
self.assertEqual(stats[0]['bytes_used'], 2)
|
||||
if 'container_count' in stats[1]:
|
||||
self.assertEqual(stats[1]['container_count'], 1)
|
||||
self.assertEqual(stats[1]['object_count'], 3)
|
||||
self.assertEqual(stats[1]['bytes_used'], 4)
|
||||
|
||||
@ -1032,8 +1048,12 @@ class TestAccountBrokerBeforeSPI(TestAccountBroker):
|
||||
conn.commit()
|
||||
stats = broker.get_policy_stats()
|
||||
self.assertEqual(len(stats), 2)
|
||||
if 'container_count' in stats[0]:
|
||||
self.assertEqual(stats[0]['container_count'], 0)
|
||||
self.assertEqual(stats[0]['object_count'], 0)
|
||||
self.assertEqual(stats[0]['bytes_used'], 0)
|
||||
if 'container_count' in stats[1]:
|
||||
self.assertEqual(stats[1]['container_count'], 1)
|
||||
self.assertEqual(stats[1]['object_count'], 3)
|
||||
self.assertEqual(stats[1]['bytes_used'], 4)
|
||||
|
||||
@ -1099,3 +1119,345 @@ class TestAccountBrokerBeforeSPI(TestAccountBroker):
|
||||
with broker.get() as conn:
|
||||
conn.execute('SELECT * FROM policy_stat')
|
||||
conn.execute('SELECT storage_policy_index FROM container')
|
||||
|
||||
|
||||
def pre_track_containers_create_policy_stat(self, conn):
|
||||
"""
|
||||
Copied from AccountBroker before the container_count column was
|
||||
added.
|
||||
Create policy_stat table which is specific to the account DB.
|
||||
Not a part of Pluggable Back-ends, internal to the baseline code.
|
||||
|
||||
:param conn: DB connection object
|
||||
"""
|
||||
conn.executescript("""
|
||||
CREATE TABLE policy_stat (
|
||||
storage_policy_index INTEGER PRIMARY KEY,
|
||||
object_count INTEGER DEFAULT 0,
|
||||
bytes_used INTEGER DEFAULT 0
|
||||
);
|
||||
INSERT OR IGNORE INTO policy_stat (
|
||||
storage_policy_index, object_count, bytes_used
|
||||
)
|
||||
SELECT 0, object_count, bytes_used
|
||||
FROM account_stat
|
||||
WHERE container_count > 0;
|
||||
""")
|
||||
|
||||
|
||||
def pre_track_containers_create_container_table(self, conn):
|
||||
"""
|
||||
Copied from AccountBroker before the container_count column was
|
||||
added (using old stat trigger script)
|
||||
Create container table which is specific to the account DB.
|
||||
|
||||
:param conn: DB connection object
|
||||
"""
|
||||
# revert to old trigger script to support one of the tests
|
||||
OLD_POLICY_STAT_TRIGGER_SCRIPT = """
|
||||
CREATE TRIGGER container_insert_ps AFTER INSERT ON container
|
||||
BEGIN
|
||||
INSERT OR IGNORE INTO policy_stat
|
||||
(storage_policy_index, object_count, bytes_used)
|
||||
VALUES (new.storage_policy_index, 0, 0);
|
||||
UPDATE policy_stat
|
||||
SET object_count = object_count + new.object_count,
|
||||
bytes_used = bytes_used + new.bytes_used
|
||||
WHERE storage_policy_index = new.storage_policy_index;
|
||||
END;
|
||||
CREATE TRIGGER container_delete_ps AFTER DELETE ON container
|
||||
BEGIN
|
||||
UPDATE policy_stat
|
||||
SET object_count = object_count - old.object_count,
|
||||
bytes_used = bytes_used - old.bytes_used
|
||||
WHERE storage_policy_index = old.storage_policy_index;
|
||||
END;
|
||||
|
||||
"""
|
||||
conn.executescript("""
|
||||
CREATE TABLE container (
|
||||
ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
name TEXT,
|
||||
put_timestamp TEXT,
|
||||
delete_timestamp TEXT,
|
||||
object_count INTEGER,
|
||||
bytes_used INTEGER,
|
||||
deleted INTEGER DEFAULT 0,
|
||||
storage_policy_index INTEGER DEFAULT 0
|
||||
);
|
||||
|
||||
CREATE INDEX ix_container_deleted_name ON
|
||||
container (deleted, name);
|
||||
|
||||
CREATE TRIGGER container_insert AFTER INSERT ON container
|
||||
BEGIN
|
||||
UPDATE account_stat
|
||||
SET container_count = container_count + (1 - new.deleted),
|
||||
object_count = object_count + new.object_count,
|
||||
bytes_used = bytes_used + new.bytes_used,
|
||||
hash = chexor(hash, new.name,
|
||||
new.put_timestamp || '-' ||
|
||||
new.delete_timestamp || '-' ||
|
||||
new.object_count || '-' || new.bytes_used);
|
||||
END;
|
||||
|
||||
CREATE TRIGGER container_update BEFORE UPDATE ON container
|
||||
BEGIN
|
||||
SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT');
|
||||
END;
|
||||
|
||||
|
||||
CREATE TRIGGER container_delete AFTER DELETE ON container
|
||||
BEGIN
|
||||
UPDATE account_stat
|
||||
SET container_count = container_count - (1 - old.deleted),
|
||||
object_count = object_count - old.object_count,
|
||||
bytes_used = bytes_used - old.bytes_used,
|
||||
hash = chexor(hash, old.name,
|
||||
old.put_timestamp || '-' ||
|
||||
old.delete_timestamp || '-' ||
|
||||
old.object_count || '-' || old.bytes_used);
|
||||
END;
|
||||
""" + OLD_POLICY_STAT_TRIGGER_SCRIPT)
|
||||
|
||||
|
||||
class AccountBrokerPreTrackContainerCountSetup(object):
|
||||
def assertUnmigrated(self, broker):
|
||||
with broker.get() as conn:
|
||||
try:
|
||||
conn.execute('''
|
||||
SELECT container_count FROM policy_stat
|
||||
''').fetchone()[0]
|
||||
except sqlite3.OperationalError as err:
|
||||
# confirm that the column really isn't there
|
||||
self.assert_('no such column: container_count' in str(err))
|
||||
else:
|
||||
self.fail('broker did not raise sqlite3.OperationalError '
|
||||
'trying to select container_count from policy_stat!')
|
||||
|
||||
def setUp(self):
|
||||
# use old version of policy_stat
|
||||
self._imported_create_policy_stat_table = \
|
||||
AccountBroker.create_policy_stat_table
|
||||
AccountBroker.create_policy_stat_table = \
|
||||
pre_track_containers_create_policy_stat
|
||||
# use old container table so we use old trigger for
|
||||
# updating policy_stat
|
||||
self._imported_create_container_table = \
|
||||
AccountBroker.create_container_table
|
||||
AccountBroker.create_container_table = \
|
||||
pre_track_containers_create_container_table
|
||||
|
||||
broker = AccountBroker(':memory:', account='a')
|
||||
broker.initialize(Timestamp('1').internal)
|
||||
self.assertUnmigrated(broker)
|
||||
|
||||
self.tempdir = mkdtemp()
|
||||
self.ts = (Timestamp(t).internal for t in itertools.count(int(time())))
|
||||
|
||||
self.db_path = os.path.join(self.tempdir, 'sda', 'accounts',
|
||||
'0', '0', '0', 'test.db')
|
||||
self.broker = AccountBroker(self.db_path, account='a')
|
||||
self.broker.initialize(next(self.ts))
|
||||
|
||||
# Common sanity-check that our starting, pre-migration state correctly
|
||||
# does not have the container_count column.
|
||||
self.assertUnmigrated(self.broker)
|
||||
|
||||
def tearDown(self):
|
||||
rmtree(self.tempdir, ignore_errors=True)
|
||||
|
||||
self.restore_account_broker()
|
||||
|
||||
broker = AccountBroker(':memory:', account='a')
|
||||
broker.initialize(Timestamp('1').internal)
|
||||
with broker.get() as conn:
|
||||
conn.execute('SELECT container_count FROM policy_stat')
|
||||
|
||||
def restore_account_broker(self):
|
||||
AccountBroker.create_policy_stat_table = \
|
||||
self._imported_create_policy_stat_table
|
||||
AccountBroker.create_container_table = \
|
||||
self._imported_create_container_table
|
||||
|
||||
|
||||
@patch_policies([StoragePolicy(0, 'zero', False),
|
||||
StoragePolicy(1, 'one', True),
|
||||
StoragePolicy(2, 'two', False),
|
||||
StoragePolicy(3, 'three', False)])
|
||||
class TestAccountBrokerBeforePerPolicyContainerTrack(
|
||||
AccountBrokerPreTrackContainerCountSetup, TestAccountBroker):
|
||||
"""
|
||||
Tests for AccountBroker against databases created before
|
||||
the container_count column was added to the policy_stat table.
|
||||
"""
|
||||
|
||||
def test_policy_table_cont_count_do_migrations(self):
|
||||
# add a few containers
|
||||
num_containers = 8
|
||||
policies = itertools.cycle(POLICIES)
|
||||
per_policy_container_counts = defaultdict(int)
|
||||
|
||||
# add a few container entries
|
||||
for i in range(num_containers):
|
||||
name = 'test-container-%02d' % i
|
||||
policy = next(policies)
|
||||
self.broker.put_container(name, next(self.ts),
|
||||
0, 0, 0, int(policy))
|
||||
per_policy_container_counts[int(policy)] += 1
|
||||
|
||||
total_container_count = self.broker.get_info()['container_count']
|
||||
self.assertEqual(total_container_count, num_containers)
|
||||
|
||||
# still un-migrated
|
||||
self.assertUnmigrated(self.broker)
|
||||
|
||||
policy_stats = self.broker.get_policy_stats()
|
||||
self.assertEqual(len(policy_stats), len(per_policy_container_counts))
|
||||
for stats in policy_stats.values():
|
||||
self.assertEqual(stats['object_count'], 0)
|
||||
self.assertEqual(stats['bytes_used'], 0)
|
||||
# un-migrated dbs should not return container_count
|
||||
self.assertFalse('container_count' in stats)
|
||||
|
||||
# now force the migration
|
||||
policy_stats = self.broker.get_policy_stats(do_migrations=True)
|
||||
self.assertEqual(len(policy_stats), len(per_policy_container_counts))
|
||||
for policy_index, stats in policy_stats.items():
|
||||
self.assertEqual(stats['object_count'], 0)
|
||||
self.assertEqual(stats['bytes_used'], 0)
|
||||
self.assertEqual(stats['container_count'],
|
||||
per_policy_container_counts[policy_index])
|
||||
|
||||
def test_policy_table_cont_count_update_get_stats(self):
|
||||
# add a few container entries
|
||||
for policy in POLICIES:
|
||||
for i in range(0, policy.idx + 1):
|
||||
container_name = 'c%s_0' % policy.idx
|
||||
self.broker.put_container('c%s_%s' % (policy.idx, i),
|
||||
0, 0, 0, 0, policy.idx)
|
||||
# _commit_puts_stale_ok() called by get_policy_stats()
|
||||
|
||||
# calling get_policy_stats() with do_migrations will alter the table
|
||||
# and populate it based on what's in the container table now
|
||||
stats = self.broker.get_policy_stats(do_migrations=True)
|
||||
|
||||
# now confirm that the column was created
|
||||
with self.broker.get() as conn:
|
||||
conn.execute('SELECT container_count FROM policy_stat')
|
||||
|
||||
# confirm stats reporting back correctly
|
||||
self.assertEqual(len(stats), 4)
|
||||
for policy in POLICIES:
|
||||
self.assertEqual(stats[policy.idx]['container_count'],
|
||||
policy.idx + 1)
|
||||
|
||||
# now delete one from each policy and check the stats
|
||||
with self.broker.get() as conn:
|
||||
for policy in POLICIES:
|
||||
container_name = 'c%s_0' % policy.idx
|
||||
conn.execute('''
|
||||
DELETE FROM container
|
||||
WHERE name = ?
|
||||
''', (container_name,))
|
||||
conn.commit()
|
||||
stats = self.broker.get_policy_stats()
|
||||
self.assertEqual(len(stats), 4)
|
||||
for policy in POLICIES:
|
||||
self.assertEqual(stats[policy.idx]['container_count'],
|
||||
policy.idx)
|
||||
|
||||
# now put them back and make sure things are still cool
|
||||
for policy in POLICIES:
|
||||
container_name = 'c%s_0' % policy.idx
|
||||
self.broker.put_container(container_name, 0, 0, 0, 0, policy.idx)
|
||||
# _commit_puts_stale_ok() called by get_policy_stats()
|
||||
|
||||
# confirm stats reporting back correctly
|
||||
stats = self.broker.get_policy_stats()
|
||||
self.assertEqual(len(stats), 4)
|
||||
for policy in POLICIES:
|
||||
self.assertEqual(stats[policy.idx]['container_count'],
|
||||
policy.idx + 1)
|
||||
|
||||
def test_per_policy_cont_count_migration_with_deleted(self):
|
||||
num_containers = 15
|
||||
policies = itertools.cycle(POLICIES)
|
||||
container_policy_map = {}
|
||||
|
||||
# add a few container entries
|
||||
for i in range(num_containers):
|
||||
name = 'test-container-%02d' % i
|
||||
policy = next(policies)
|
||||
self.broker.put_container(name, next(self.ts),
|
||||
0, 0, 0, int(policy))
|
||||
# keep track of stub container policies
|
||||
container_policy_map[name] = policy
|
||||
|
||||
# delete about half of the containers
|
||||
for i in range(0, num_containers, 2):
|
||||
name = 'test-container-%02d' % i
|
||||
policy = container_policy_map[name]
|
||||
self.broker.put_container(name, 0, next(self.ts),
|
||||
0, 0, int(policy))
|
||||
|
||||
total_container_count = self.broker.get_info()['container_count']
|
||||
self.assertEqual(total_container_count, num_containers / 2)
|
||||
|
||||
# trigger migration
|
||||
policy_info = self.broker.get_policy_stats(do_migrations=True)
|
||||
self.assertEqual(len(policy_info), min(num_containers, len(POLICIES)))
|
||||
policy_container_count = sum(p['container_count'] for p in
|
||||
policy_info.values())
|
||||
self.assertEqual(total_container_count, policy_container_count)
|
||||
|
||||
def test_per_policy_cont_count_migration_with_single_policy(self):
|
||||
num_containers = 100
|
||||
|
||||
with patch_policies(legacy_only=True):
|
||||
policy = POLICIES[0]
|
||||
# add a few container entries
|
||||
for i in range(num_containers):
|
||||
name = 'test-container-%02d' % i
|
||||
self.broker.put_container(name, next(self.ts),
|
||||
0, 0, 0, int(policy))
|
||||
# delete about half of the containers
|
||||
for i in range(0, num_containers, 2):
|
||||
name = 'test-container-%02d' % i
|
||||
self.broker.put_container(name, 0, next(self.ts),
|
||||
0, 0, int(policy))
|
||||
|
||||
total_container_count = self.broker.get_info()['container_count']
|
||||
# trigger migration
|
||||
policy_info = self.broker.get_policy_stats(do_migrations=True)
|
||||
|
||||
self.assertEqual(total_container_count, num_containers / 2)
|
||||
|
||||
self.assertEqual(len(policy_info), 1)
|
||||
policy_container_count = sum(p['container_count'] for p in
|
||||
policy_info.values())
|
||||
self.assertEqual(total_container_count, policy_container_count)
|
||||
|
||||
def test_per_policy_cont_count_migration_impossible(self):
|
||||
with patch_policies(legacy_only=True):
|
||||
# add a container for the legacy policy
|
||||
policy = POLICIES[0]
|
||||
self.broker.put_container('test-legacy-container', next(self.ts),
|
||||
0, 0, 0, int(policy))
|
||||
|
||||
# now create an impossible situation by adding a container for a
|
||||
# policy index that doesn't exist
|
||||
non_existant_policy_index = int(policy) + 1
|
||||
self.broker.put_container('test-non-existant-policy',
|
||||
next(self.ts), 0, 0, 0,
|
||||
non_existant_policy_index)
|
||||
|
||||
total_container_count = self.broker.get_info()['container_count']
|
||||
|
||||
# trigger migration
|
||||
policy_info = self.broker.get_policy_stats(do_migrations=True)
|
||||
|
||||
self.assertEqual(total_container_count, 2)
|
||||
self.assertEqual(len(policy_info), 2)
|
||||
for policy_stat in policy_info.values():
|
||||
self.assertEqual(policy_stat['container_count'], 1)
|
||||
|
@ -1708,6 +1708,9 @@ class TestAccountController(unittest.TestCase):
|
||||
self.assertEquals(
|
||||
resp.headers['X-Account-Storage-Policy-%s-Bytes-Used' %
|
||||
POLICIES[0].name], '4')
|
||||
self.assertEquals(
|
||||
resp.headers['X-Account-Storage-Policy-%s-Container-Count' %
|
||||
POLICIES[0].name], '1')
|
||||
|
||||
def test_policy_stats_non_default(self):
|
||||
ts = itertools.count()
|
||||
@ -1743,6 +1746,9 @@ class TestAccountController(unittest.TestCase):
|
||||
self.assertEquals(
|
||||
resp.headers['X-Account-Storage-Policy-%s-Bytes-Used' %
|
||||
policy.name], '4')
|
||||
self.assertEquals(
|
||||
resp.headers['X-Account-Storage-Policy-%s-Container-Count' %
|
||||
policy.name], '1')
|
||||
|
||||
def test_empty_policy_stats(self):
|
||||
ts = itertools.count()
|
||||
|
@ -117,9 +117,70 @@ class TestAccountUtils(unittest.TestCase):
|
||||
})
|
||||
for policy in POLICIES:
|
||||
prefix = 'X-Account-Storage-Policy-%s-' % policy.name
|
||||
expected[prefix + 'Container-Count'] = 1
|
||||
expected[prefix + 'Object-Count'] = int(policy)
|
||||
expected[prefix + 'Bytes-Used'] = int(policy) * 10
|
||||
resp_headers = utils.get_response_headers(broker)
|
||||
per_policy_container_headers = [
|
||||
h for h in resp_headers if
|
||||
h.lower().startswith('x-account-storage-policy-') and
|
||||
h.lower().endswith('-container-count')]
|
||||
self.assertTrue(per_policy_container_headers)
|
||||
for key, value in resp_headers.items():
|
||||
expected_value = expected.pop(key)
|
||||
self.assertEqual(expected_value, str(value),
|
||||
'value for %r was %r not %r' % (
|
||||
key, value, expected_value))
|
||||
self.assertFalse(expected)
|
||||
|
||||
@patch_policies
|
||||
def test_get_response_headers_with_legacy_data(self):
|
||||
broker = backend.AccountBroker(':memory:', account='a')
|
||||
now = time.time()
|
||||
with mock.patch('time.time', new=lambda: now):
|
||||
broker.initialize(Timestamp(now).internal)
|
||||
# add some container data
|
||||
ts = (Timestamp(t).internal for t in itertools.count(int(now)))
|
||||
total_containers = 0
|
||||
total_objects = 0
|
||||
total_bytes = 0
|
||||
for policy in POLICIES:
|
||||
delete_timestamp = ts.next()
|
||||
put_timestamp = ts.next()
|
||||
object_count = int(policy)
|
||||
bytes_used = int(policy) * 10
|
||||
broker.put_container('c-%s' % policy.name, put_timestamp,
|
||||
delete_timestamp, object_count, bytes_used,
|
||||
int(policy))
|
||||
total_containers += 1
|
||||
total_objects += object_count
|
||||
total_bytes += bytes_used
|
||||
expected = HeaderKeyDict({
|
||||
'X-Account-Container-Count': total_containers,
|
||||
'X-Account-Object-Count': total_objects,
|
||||
'X-Account-Bytes-Used': total_bytes,
|
||||
'X-Timestamp': Timestamp(now).normal,
|
||||
'X-PUT-Timestamp': Timestamp(now).normal,
|
||||
})
|
||||
for policy in POLICIES:
|
||||
prefix = 'X-Account-Storage-Policy-%s-' % policy.name
|
||||
expected[prefix + 'Object-Count'] = int(policy)
|
||||
expected[prefix + 'Bytes-Used'] = int(policy) * 10
|
||||
orig_policy_stats = broker.get_policy_stats
|
||||
|
||||
def stub_policy_stats(*args, **kwargs):
|
||||
policy_stats = orig_policy_stats(*args, **kwargs)
|
||||
for stats in policy_stats.values():
|
||||
# legacy db's won't return container_count
|
||||
del stats['container_count']
|
||||
return policy_stats
|
||||
broker.get_policy_stats = stub_policy_stats
|
||||
resp_headers = utils.get_response_headers(broker)
|
||||
per_policy_container_headers = [
|
||||
h for h in resp_headers if
|
||||
h.lower().startswith('x-account-storage-policy-') and
|
||||
h.lower().endswith('-container-count')]
|
||||
self.assertFalse(per_policy_container_headers)
|
||||
for key, value in resp_headers.items():
|
||||
expected_value = expected.pop(key)
|
||||
self.assertEqual(expected_value, str(value),
|
||||
|
Loading…
Reference in New Issue
Block a user