Merge "Fix account replication during pre-storage-policy upgrade"
This commit is contained in:
@@ -460,6 +460,7 @@ class AccountBroker(DatabaseBroker):
|
|||||||
max_rowid = -1
|
max_rowid = -1
|
||||||
curs = conn.cursor()
|
curs = conn.cursor()
|
||||||
for rec in item_list:
|
for rec in item_list:
|
||||||
|
rec.setdefault('storage_policy_index', 0) # legacy
|
||||||
record = [rec['name'], rec['put_timestamp'],
|
record = [rec['name'], rec['put_timestamp'],
|
||||||
rec['delete_timestamp'], rec['object_count'],
|
rec['delete_timestamp'], rec['object_count'],
|
||||||
rec['bytes_used'], rec['deleted'],
|
rec['bytes_used'], rec['deleted'],
|
||||||
|
@@ -32,7 +32,7 @@ import random
|
|||||||
|
|
||||||
from swift.account.backend import AccountBroker
|
from swift.account.backend import AccountBroker
|
||||||
from swift.common.utils import Timestamp
|
from swift.common.utils import Timestamp
|
||||||
from test.unit import patch_policies, with_tempdir
|
from test.unit import patch_policies, with_tempdir, make_timestamp_iter
|
||||||
from swift.common.db import DatabaseConnectionError
|
from swift.common.db import DatabaseConnectionError
|
||||||
from swift.common.storage_policy import StoragePolicy, POLICIES
|
from swift.common.storage_policy import StoragePolicy, POLICIES
|
||||||
|
|
||||||
@@ -1120,6 +1120,45 @@ class TestAccountBrokerBeforeSPI(TestAccountBroker):
|
|||||||
conn.execute('SELECT * FROM policy_stat')
|
conn.execute('SELECT * FROM policy_stat')
|
||||||
conn.execute('SELECT storage_policy_index FROM container')
|
conn.execute('SELECT storage_policy_index FROM container')
|
||||||
|
|
||||||
|
@with_tempdir
|
||||||
|
def test_pre_storage_policy_replication(self, tempdir):
|
||||||
|
ts = make_timestamp_iter()
|
||||||
|
|
||||||
|
# make and two account database "replicas"
|
||||||
|
old_broker = AccountBroker(os.path.join(tempdir, 'old_account.db'),
|
||||||
|
account='a')
|
||||||
|
old_broker.initialize(ts.next().internal)
|
||||||
|
new_broker = AccountBroker(os.path.join(tempdir, 'new_account.db'),
|
||||||
|
account='a')
|
||||||
|
new_broker.initialize(ts.next().internal)
|
||||||
|
|
||||||
|
# manually insert an existing row to avoid migration for old database
|
||||||
|
with old_broker.get() as conn:
|
||||||
|
conn.execute('''
|
||||||
|
INSERT INTO container (name, put_timestamp,
|
||||||
|
delete_timestamp, object_count, bytes_used,
|
||||||
|
deleted)
|
||||||
|
VALUES (?, ?, ?, ?, ?, ?)
|
||||||
|
''', ('test_name', ts.next().internal, 0, 1, 2, 0))
|
||||||
|
conn.commit()
|
||||||
|
|
||||||
|
# get replication info and rows form old database
|
||||||
|
info = old_broker.get_info()
|
||||||
|
rows = old_broker.get_items_since(0, 10)
|
||||||
|
|
||||||
|
# "send" replication rows to new database
|
||||||
|
new_broker.merge_items(rows, info['id'])
|
||||||
|
|
||||||
|
# make sure "test_name" container in new database
|
||||||
|
self.assertEqual(new_broker.get_info()['container_count'], 1)
|
||||||
|
for c in new_broker.list_containers_iter(1, None, None, None, None):
|
||||||
|
self.assertEqual(c, ('test_name', 1, 2, 0))
|
||||||
|
|
||||||
|
# full migration successful
|
||||||
|
with new_broker.get() as conn:
|
||||||
|
conn.execute('SELECT * FROM policy_stat')
|
||||||
|
conn.execute('SELECT storage_policy_index FROM container')
|
||||||
|
|
||||||
|
|
||||||
def pre_track_containers_create_policy_stat(self, conn):
|
def pre_track_containers_create_policy_stat(self, conn):
|
||||||
"""
|
"""
|
||||||
|
Reference in New Issue
Block a user