Add Storage Policy Support to Accounts

This change updates the account HEAD handler to report out per
policy object and byte usage for the account.  Cumulative values
are still reported and policy names are used in the report
out (unless request is sent to an account server directly in
which case policy indexes are used for easier accounting).

Below is an example of the relevant HEAD response for a cluster
with 3 policies and just a few small objects:

   X-Account-Container-Count: 3
   X-Account-Object-Count: 3
   X-Account-Bytes-Used: 21
   X-Account-Storage-Policy-Bronze-Object-Count: 1
   X-Account-Storage-Policy-Bronze-Bytes-Used: 7
   X-Account-Storage-Policy-Silver-Object-Count: 1
   X-Account-Storage-Policy-Silver-Bytes-Used: 7
   X-Account-Storage-Policy-Gold-Object-Count: 1
   X-Account-Storage-Policy-Gold-Bytes-Used: 7

Set a DEFAULT storage_policy_index for existing container rows during
migration.

Copy existing object_count and bytes_used in policy_stat table during
migration.

DocImpact
Implements: blueprint storage-policies
Change-Id: I5ec251f9a8014dd89764340de927d09466c72221
This commit is contained in:
Paul Luse 2014-05-27 19:02:52 -07:00 committed by Clay Gerrard
parent 81bc31e6ec
commit 00a162c4d4
9 changed files with 994 additions and 87 deletions

View File

@ -31,6 +31,28 @@ from swift.common.db import DatabaseBroker, DatabaseConnectionError, \
DATADIR = 'accounts'

# Triggers that keep the per-policy rollup in the policy_stat table in sync
# with the container table: every container row insert/delete folds that
# row's object_count and bytes_used into the policy_stat row matching its
# storage_policy_index (creating a zeroed row first if none exists).
POLICY_STAT_TRIGGER_SCRIPT = """
    CREATE TRIGGER container_insert_ps AFTER INSERT ON container
    BEGIN
        INSERT OR IGNORE INTO policy_stat
            (storage_policy_index, object_count, bytes_used)
            VALUES (new.storage_policy_index, 0, 0);
        UPDATE policy_stat
        SET object_count = object_count + new.object_count,
            bytes_used = bytes_used + new.bytes_used
        WHERE storage_policy_index = new.storage_policy_index;
    END;
    CREATE TRIGGER container_delete_ps AFTER DELETE ON container
    BEGIN
        UPDATE policy_stat
        SET object_count = object_count - old.object_count,
            bytes_used = bytes_used - old.bytes_used
        WHERE storage_policy_index = old.storage_policy_index;
    END;
"""
class AccountBroker(DatabaseBroker):
"""Encapsulates working with an account database."""
db_type = 'account'
@ -49,6 +71,7 @@ class AccountBroker(DatabaseBroker):
'Attempting to create a new database with no account set')
self.create_container_table(conn)
self.create_account_stat_table(conn, put_timestamp)
self.create_policy_stat_table(conn)
def create_container_table(self, conn):
"""
@ -64,7 +87,8 @@ class AccountBroker(DatabaseBroker):
delete_timestamp TEXT,
object_count INTEGER,
bytes_used INTEGER,
deleted INTEGER DEFAULT 0
deleted INTEGER DEFAULT 0,
storage_policy_index INTEGER DEFAULT 0
);
CREATE INDEX ix_container_deleted_name ON
@ -99,7 +123,7 @@ class AccountBroker(DatabaseBroker):
old.delete_timestamp || '-' ||
old.object_count || '-' || old.bytes_used);
END;
""")
""" + POLICY_STAT_TRIGGER_SCRIPT)
def create_account_stat_table(self, conn, put_timestamp):
"""
@ -134,6 +158,27 @@ class AccountBroker(DatabaseBroker):
''', (self.account, normalize_timestamp(time.time()), str(uuid4()),
put_timestamp))
def create_policy_stat_table(self, conn):
    """
    Create policy_stat table which is specific to the account DB.
    Not a part of Pluggable Back-ends, internal to the baseline code.

    The table is seeded with the account's existing cumulative totals
    under policy index 0 so pre-policy usage is preserved on migration;
    empty accounts (container_count == 0) get no seed row.

    :param conn: DB connection object
    """
    conn.executescript("""
        CREATE TABLE policy_stat (
            storage_policy_index INTEGER PRIMARY KEY,
            object_count INTEGER DEFAULT 0,
            bytes_used INTEGER DEFAULT 0
        );
        INSERT OR IGNORE INTO policy_stat (
            storage_policy_index, object_count, bytes_used
        )
        SELECT 0, object_count, bytes_used
        FROM account_stat
        WHERE container_count > 0;
    """)
def get_db_version(self, conn):
if self._db_version == -1:
self._db_version = 0
@ -159,16 +204,24 @@ class AccountBroker(DatabaseBroker):
def _commit_puts_load(self, item_list, entry):
"""See :func:`swift.common.db.DatabaseBroker._commit_puts_load`"""
(name, put_timestamp, delete_timestamp,
object_count, bytes_used, deleted) = \
pickle.loads(entry.decode('base64'))
loaded = pickle.loads(entry.decode('base64'))
# check to see if the update includes policy_index or not
(name, put_timestamp, delete_timestamp, object_count, bytes_used,
deleted) = loaded[:6]
if len(loaded) > 6:
storage_policy_index = loaded[6]
else:
# legacy support during upgrade until first non legacy storage
# policy is defined
storage_policy_index = 0
item_list.append(
{'name': name,
'put_timestamp': put_timestamp,
'delete_timestamp': delete_timestamp,
'object_count': object_count,
'bytes_used': bytes_used,
'deleted': deleted})
'deleted': deleted,
'storage_policy_index': storage_policy_index})
def empty(self):
"""
@ -183,7 +236,7 @@ class AccountBroker(DatabaseBroker):
return (row[0] == 0)
def put_container(self, name, put_timestamp, delete_timestamp,
object_count, bytes_used):
object_count, bytes_used, storage_policy_index):
"""
Create a container with the given attributes.
@ -192,6 +245,7 @@ class AccountBroker(DatabaseBroker):
:param delete_timestamp: delete_timestamp of the container to create
:param object_count: number of objects in the container
:param bytes_used: number of bytes used by the container
:param storage_policy_index: the storage policy for this container
"""
if delete_timestamp > put_timestamp and \
object_count in (None, '', 0, '0'):
@ -202,7 +256,8 @@ class AccountBroker(DatabaseBroker):
'delete_timestamp': delete_timestamp,
'object_count': object_count,
'bytes_used': bytes_used,
'deleted': deleted}
'deleted': deleted,
'storage_policy_index': storage_policy_index}
if self.db_file == ':memory:':
self.merge_items([record])
return
@ -225,7 +280,7 @@ class AccountBroker(DatabaseBroker):
fp.write(':')
fp.write(pickle.dumps(
(name, put_timestamp, delete_timestamp, object_count,
bytes_used, deleted),
bytes_used, deleted, storage_policy_index),
protocol=PICKLE_PROTOCOL).encode('base64'))
fp.flush()
@ -255,19 +310,47 @@ class AccountBroker(DatabaseBroker):
FROM account_stat''').fetchone()
return (row['status'] == "DELETED")
def get_policy_stats(self):
    """
    Get global policy stats for the account.

    :returns: dict of policy stats where the key is the policy index and
              the value is a dictionary like {'object_count': M,
              'bytes_used': N}
    """
    info = []
    self._commit_puts_stale_ok()
    with self.get() as conn:
        try:
            info = (conn.execute('''
                SELECT storage_policy_index, object_count, bytes_used
                FROM policy_stat
                ''').fetchall())
        except sqlite3.OperationalError as err:
            # pre-migration DBs have no policy_stat table; report no
            # per-policy usage instead of failing the request
            if "no such table: policy_stat" not in str(err):
                raise
    policy_stats = {}
    for row in info:
        stats = dict(row)
        key = stats.pop('storage_policy_index')
        policy_stats[key] = stats
    return policy_stats
def get_info(self):
    """
    Get global data for the account.

    :returns: dict with keys: account, created_at, put_timestamp,
              delete_timestamp, status_changed_at, container_count,
              object_count, bytes_used, hash, id
    """
    self._commit_puts_stale_ok()
    with self.get() as conn:
        return dict(conn.execute('''
            SELECT account, created_at, put_timestamp, delete_timestamp,
                   status_changed_at, container_count, object_count,
                   bytes_used, hash, id
            FROM account_stat
        ''').fetchone())
@ -359,18 +442,20 @@ class AccountBroker(DatabaseBroker):
:param item_list: list of dictionaries of {'name', 'put_timestamp',
'delete_timestamp', 'object_count', 'bytes_used',
'deleted'}
'deleted', 'storage_policy_index'}
:param source: if defined, update incoming_sync with the source
"""
with self.get() as conn:
def _really_merge_items(conn):
max_rowid = -1
for rec in item_list:
record = [rec['name'], rec['put_timestamp'],
rec['delete_timestamp'], rec['object_count'],
rec['bytes_used'], rec['deleted']]
rec['bytes_used'], rec['deleted'],
rec['storage_policy_index']]
query = '''
SELECT name, put_timestamp, delete_timestamp,
object_count, bytes_used, deleted
object_count, bytes_used, deleted,
storage_policy_index
FROM container WHERE name = ?
'''
if self.get_db_version(conn) >= 1:
@ -400,8 +485,8 @@ class AccountBroker(DatabaseBroker):
conn.execute('''
INSERT INTO container (name, put_timestamp,
delete_timestamp, object_count, bytes_used,
deleted)
VALUES (?, ?, ?, ?, ?, ?)
deleted, storage_policy_index)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', record)
if source:
max_rowid = max(max_rowid, rec['ROWID'])
@ -413,7 +498,33 @@ class AccountBroker(DatabaseBroker):
''', (max_rowid, source))
except sqlite3.IntegrityError:
conn.execute('''
UPDATE incoming_sync SET sync_point=max(?, sync_point)
UPDATE incoming_sync
SET sync_point=max(?, sync_point)
WHERE remote_id=?
''', (max_rowid, source))
conn.commit()
with self.get() as conn:
# create the policy stat table if needed and add spi to container
try:
_really_merge_items(conn)
except sqlite3.OperationalError as err:
if 'no such column: storage_policy_index' not in str(err):
raise
self._migrate_add_storage_policy_index(conn)
_really_merge_items(conn)
def _migrate_add_storage_policy_index(self, conn):
    """
    Add the storage_policy_index column to the 'container' table and
    set up triggers, creating the policy_stat table if needed.

    :param conn: DB connection object
    """
    try:
        self.create_policy_stat_table(conn)
    except sqlite3.OperationalError as err:
        # a partially applied migration may have already created the
        # table; anything else is a real failure
        if 'table policy_stat already exists' not in str(err):
            raise
    conn.executescript('''
        ALTER TABLE container
        ADD COLUMN storage_policy_index INTEGER DEFAULT 0;
    ''' + POLICY_STAT_TRIGGER_SCRIPT)

View File

@ -22,7 +22,7 @@ from eventlet import Timeout
import swift.common.db
from swift.account.backend import AccountBroker, DATADIR
from swift.account.utils import account_listing_response
from swift.account.utils import account_listing_response, get_response_headers
from swift.common.db import DatabaseConnectionError, DatabaseAlreadyExists
from swift.common.request_helpers import get_param, get_listing_content_type, \
split_and_validate_path
@ -38,6 +38,7 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, \
HTTPPreconditionFailed, HTTPConflict, Request, \
HTTPInsufficientStorage, HTTPException
from swift.common.request_helpers import is_sys_or_user_meta
from swift.common.storage_policy import POLICY_INDEX
class AccountController(object):
@ -108,6 +109,7 @@ class AccountController(object):
return HTTPInsufficientStorage(drive=drive, request=req)
if container: # put account container
pending_timeout = None
container_policy_index = req.headers.get(POLICY_INDEX, 0)
if 'x-trans-id' in req.headers:
pending_timeout = 3
broker = self._get_account_broker(drive, part, account,
@ -125,7 +127,8 @@ class AccountController(object):
broker.put_container(container, req.headers['x-put-timestamp'],
req.headers['x-delete-timestamp'],
req.headers['x-object-count'],
req.headers['x-bytes-used'])
req.headers['x-bytes-used'],
container_policy_index)
if req.headers['x-delete-timestamp'] > \
req.headers['x-put-timestamp']:
return HTTPNoContent(request=req)
@ -172,16 +175,7 @@ class AccountController(object):
stale_reads_ok=True)
if broker.is_deleted():
return self._deleted_response(broker, req, HTTPNotFound)
info = broker.get_info()
headers = {
'X-Account-Container-Count': info['container_count'],
'X-Account-Object-Count': info['object_count'],
'X-Account-Bytes-Used': info['bytes_used'],
'X-Timestamp': info['created_at'],
'X-PUT-Timestamp': info['put_timestamp']}
headers.update((key, value)
for key, (value, timestamp) in
broker.metadata.iteritems() if value != '')
headers = get_response_headers(broker)
headers['Content-Type'] = out_content_type
return HTTPNoContent(request=req, headers=headers, charset='utf-8')

View File

@ -18,6 +18,7 @@ from xml.sax import saxutils
from swift.common.swob import HTTPOk, HTTPNoContent
from swift.common.utils import json, normalize_timestamp
from swift.common.storage_policy import POLICIES
class FakeAccountBroker(object):
@ -40,13 +41,11 @@ class FakeAccountBroker(object):
def metadata(self):
return {}
def get_policy_stats(self):
    # a nonexistent account has no per-policy usage to report
    return {}
def account_listing_response(account, req, response_content_type, broker=None,
limit='', marker='', end_marker='', prefix='',
delimiter=''):
if broker is None:
broker = FakeAccountBroker()
def get_response_headers(broker):
    """
    Build the account response header dict from broker stats.

    Reports cumulative account totals plus, for each storage policy still
    configured, per-policy object and byte usage headers of the form
    X-Account-Storage-Policy-<name>-Object-Count / -Bytes-Used; non-empty
    user metadata values are folded in last.

    :param broker: an AccountBroker (or FakeAccountBroker) instance
    :returns: dict of response headers
    """
    info = broker.get_info()
    resp_headers = {
        'X-Account-Container-Count': info['container_count'],
        'X-Account-Object-Count': info['object_count'],
        'X-Account-Bytes-Used': info['bytes_used'],
        'X-Timestamp': info['created_at'],
        'X-PUT-Timestamp': info['put_timestamp']}
    policy_stats = broker.get_policy_stats()
    for policy_idx, stats in policy_stats.items():
        policy = POLICIES.get_by_index(policy_idx)
        if not policy:
            # skip stats for policies no longer configured on this node
            continue
        header_prefix = 'X-Account-Storage-Policy-%s-%%s' % policy.name
        for key, value in stats.items():
            header_name = header_prefix % key.replace('_', '-')
            resp_headers[header_name] = value
    resp_headers.update((key, value)
                        for key, (value, timestamp) in
                        broker.metadata.iteritems() if value != '')
    return resp_headers
def account_listing_response(account, req, response_content_type, broker=None,
limit='', marker='', end_marker='', prefix='',
delimiter=''):
if broker is None:
broker = FakeAccountBroker()
resp_headers = get_response_headers(broker)
account_list = broker.list_containers_iter(limit, marker, end_marker,
prefix, delimiter)

View File

@ -203,6 +203,7 @@ class ContainerController(object):
'x-object-count': info['object_count'],
'x-bytes-used': info['bytes_used'],
'x-trans-id': req.headers.get('x-trans-id', '-'),
POLICY_INDEX: info['storage_policy_index'],
'user-agent': 'container-server %s' % os.getpid(),
'referer': req.as_referer()})
if req.headers.get('x-account-override-deleted', 'no').lower() == \

View File

@ -33,6 +33,7 @@ from swift.common.utils import get_logger, config_true_value, ismount, \
dump_recon_cache, quorum_size
from swift.common.daemon import Daemon
from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR
from swift.common.storage_policy import POLICY_INDEX
class ContainerUpdater(Daemon):
@ -221,7 +222,8 @@ class ContainerUpdater(Daemon):
part, nodes = self.get_account_ring().get_nodes(info['account'])
events = [spawn(self.container_report, node, part, container,
info['put_timestamp'], info['delete_timestamp'],
info['object_count'], info['bytes_used'])
info['object_count'], info['bytes_used'],
info['storage_policy_index'])
for node in nodes]
successes = 0
for event in events:
@ -254,7 +256,8 @@ class ContainerUpdater(Daemon):
self.no_changes += 1
def container_report(self, node, part, container, put_timestamp,
delete_timestamp, count, bytes):
delete_timestamp, count, bytes,
storage_policy_index):
"""
Report container info to an account server.
@ -265,6 +268,7 @@ class ContainerUpdater(Daemon):
:param delete_timestamp: delete timestamp
:param count: object count in the container
:param bytes: bytes used in the container
:param storage_policy_index: the policy index for the container
"""
with ConnectionTimeout(self.conn_timeout):
try:
@ -274,6 +278,7 @@ class ContainerUpdater(Daemon):
'X-Object-Count': count,
'X-Bytes-Used': bytes,
'X-Account-Override-Deleted': 'yes',
POLICY_INDEX: storage_policy_index,
'user-agent': self.user_agent}
conn = http_connect(
node['ip'], node['port'], node['device'], part,

View File

@ -17,14 +17,24 @@
import hashlib
import unittest
import pickle
import os
from time import sleep, time
from uuid import uuid4
from tempfile import mkdtemp
from shutil import rmtree
import sqlite3
import itertools
from contextlib import contextmanager
from swift.account.backend import AccountBroker
from swift.common.utils import normalize_timestamp
from test.unit import patch_policies, with_tempdir
from swift.common.db import DatabaseConnectionError
from swift.common.storage_policy import StoragePolicy, POLICIES
@patch_policies
class TestAccountBroker(unittest.TestCase):
"""Tests for AccountBroker"""
@ -70,16 +80,19 @@ class TestAccountBroker(unittest.TestCase):
broker = AccountBroker(':memory:', account='a')
broker.initialize(normalize_timestamp('1'))
self.assert_(broker.empty())
broker.put_container('o', normalize_timestamp(time()), 0, 0, 0)
broker.put_container('o', normalize_timestamp(time()), 0, 0, 0,
POLICIES.default.idx)
self.assert_(not broker.empty())
sleep(.00001)
broker.put_container('o', 0, normalize_timestamp(time()), 0, 0)
broker.put_container('o', 0, normalize_timestamp(time()), 0, 0,
POLICIES.default.idx)
self.assert_(broker.empty())
def test_reclaim(self):
broker = AccountBroker(':memory:', account='test_account')
broker.initialize(normalize_timestamp('1'))
broker.put_container('c', normalize_timestamp(time()), 0, 0, 0)
broker.put_container('c', normalize_timestamp(time()), 0, 0, 0,
POLICIES.default.idx)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT count(*) FROM container "
@ -96,7 +109,8 @@ class TestAccountBroker(unittest.TestCase):
"SELECT count(*) FROM container "
"WHERE deleted = 1").fetchone()[0], 0)
sleep(.00001)
broker.put_container('c', 0, normalize_timestamp(time()), 0, 0)
broker.put_container('c', 0, normalize_timestamp(time()), 0, 0,
POLICIES.default.idx)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT count(*) FROM container "
@ -122,9 +136,9 @@ class TestAccountBroker(unittest.TestCase):
"SELECT count(*) FROM container "
"WHERE deleted = 1").fetchone()[0], 0)
# Test reclaim after deletion. Create 3 test containers
broker.put_container('x', 0, 0, 0, 0)
broker.put_container('y', 0, 0, 0, 0)
broker.put_container('z', 0, 0, 0, 0)
broker.put_container('x', 0, 0, 0, 0, POLICIES.default.idx)
broker.put_container('y', 0, 0, 0, 0, POLICIES.default.idx)
broker.put_container('z', 0, 0, 0, 0, POLICIES.default.idx)
broker.reclaim(normalize_timestamp(time()), time())
# self.assertEqual(len(res), 2)
# self.assert_(isinstance(res, tuple))
@ -144,11 +158,32 @@ class TestAccountBroker(unittest.TestCase):
# self.assert_('z' in containers)
# self.assert_('a' not in containers)
def test_delete_db_status(self):
    # deleting the db should stamp delete_timestamp and status_changed_at
    start = int(time())
    ts = itertools.count(start)
    broker = AccountBroker(':memory:', account='a')
    broker.initialize(normalize_timestamp(ts.next()))
    info = broker.get_info()
    self.assertEqual(info['put_timestamp'], normalize_timestamp(start))
    self.assert_(float(info['created_at']) >= start)
    self.assertEqual(info['delete_timestamp'], '0')
    self.assertEqual(info['status_changed_at'], '0')

    # delete it
    delete_timestamp = normalize_timestamp(ts.next())
    broker.delete_db(delete_timestamp)
    info = broker.get_info()
    self.assertEqual(info['put_timestamp'], normalize_timestamp(start))
    self.assert_(float(info['created_at']) >= start)
    self.assertEqual(info['delete_timestamp'], delete_timestamp)
    self.assertEqual(info['status_changed_at'], delete_timestamp)
def test_delete_container(self):
# Test AccountBroker.delete_container
broker = AccountBroker(':memory:', account='a')
broker.initialize(normalize_timestamp('1'))
broker.put_container('o', normalize_timestamp(time()), 0, 0, 0)
broker.put_container('o', normalize_timestamp(time()), 0, 0, 0,
POLICIES.default.idx)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT count(*) FROM container "
@ -157,7 +192,8 @@ class TestAccountBroker(unittest.TestCase):
"SELECT count(*) FROM container "
"WHERE deleted = 1").fetchone()[0], 0)
sleep(.00001)
broker.put_container('o', 0, normalize_timestamp(time()), 0, 0)
broker.put_container('o', 0, normalize_timestamp(time()), 0, 0,
POLICIES.default.idx)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT count(*) FROM container "
@ -173,7 +209,8 @@ class TestAccountBroker(unittest.TestCase):
# Create initial container
timestamp = normalize_timestamp(time())
broker.put_container('"{<container \'&\' name>}"', timestamp, 0, 0, 0)
broker.put_container('"{<container \'&\' name>}"', timestamp, 0, 0, 0,
POLICIES.default.idx)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT name FROM container").fetchone()[0],
@ -185,7 +222,8 @@ class TestAccountBroker(unittest.TestCase):
"SELECT deleted FROM container").fetchone()[0], 0)
# Reput same event
broker.put_container('"{<container \'&\' name>}"', timestamp, 0, 0, 0)
broker.put_container('"{<container \'&\' name>}"', timestamp, 0, 0, 0,
POLICIES.default.idx)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT name FROM container").fetchone()[0],
@ -199,7 +237,8 @@ class TestAccountBroker(unittest.TestCase):
# Put new event
sleep(.00001)
timestamp = normalize_timestamp(time())
broker.put_container('"{<container \'&\' name>}"', timestamp, 0, 0, 0)
broker.put_container('"{<container \'&\' name>}"', timestamp, 0, 0, 0,
POLICIES.default.idx)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT name FROM container").fetchone()[0],
@ -212,7 +251,8 @@ class TestAccountBroker(unittest.TestCase):
# Put old event
otimestamp = normalize_timestamp(float(timestamp) - 1)
broker.put_container('"{<container \'&\' name>}"', otimestamp, 0, 0, 0)
broker.put_container('"{<container \'&\' name>}"', otimestamp, 0, 0, 0,
POLICIES.default.idx)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT name FROM container").fetchone()[0],
@ -225,7 +265,8 @@ class TestAccountBroker(unittest.TestCase):
# Put old delete event
dtimestamp = normalize_timestamp(float(timestamp) - 1)
broker.put_container('"{<container \'&\' name>}"', 0, dtimestamp, 0, 0)
broker.put_container('"{<container \'&\' name>}"', 0, dtimestamp, 0, 0,
POLICIES.default.idx)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT name FROM container").fetchone()[0],
@ -242,7 +283,8 @@ class TestAccountBroker(unittest.TestCase):
# Put new delete event
sleep(.00001)
timestamp = normalize_timestamp(time())
broker.put_container('"{<container \'&\' name>}"', 0, timestamp, 0, 0)
broker.put_container('"{<container \'&\' name>}"', 0, timestamp, 0, 0,
POLICIES.default.idx)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT name FROM container").fetchone()[0],
@ -256,7 +298,8 @@ class TestAccountBroker(unittest.TestCase):
# Put new event
sleep(.00001)
timestamp = normalize_timestamp(time())
broker.put_container('"{<container \'&\' name>}"', timestamp, 0, 0, 0)
broker.put_container('"{<container \'&\' name>}"', timestamp, 0, 0, 0,
POLICIES.default.idx)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT name FROM container").fetchone()[0],
@ -275,31 +318,39 @@ class TestAccountBroker(unittest.TestCase):
info = broker.get_info()
self.assertEqual(info['account'], 'test1')
self.assertEqual(info['hash'], '00000000000000000000000000000000')
self.assertEqual(info['put_timestamp'], normalize_timestamp(1))
self.assertEqual(info['delete_timestamp'], '0')
self.assertEqual(info['status_changed_at'], '0')
info = broker.get_info()
self.assertEqual(info['container_count'], 0)
broker.put_container('c1', normalize_timestamp(time()), 0, 0, 0)
broker.put_container('c1', normalize_timestamp(time()), 0, 0, 0,
POLICIES.default.idx)
info = broker.get_info()
self.assertEqual(info['container_count'], 1)
sleep(.00001)
broker.put_container('c2', normalize_timestamp(time()), 0, 0, 0)
broker.put_container('c2', normalize_timestamp(time()), 0, 0, 0,
POLICIES.default.idx)
info = broker.get_info()
self.assertEqual(info['container_count'], 2)
sleep(.00001)
broker.put_container('c2', normalize_timestamp(time()), 0, 0, 0)
broker.put_container('c2', normalize_timestamp(time()), 0, 0, 0,
POLICIES.default.idx)
info = broker.get_info()
self.assertEqual(info['container_count'], 2)
sleep(.00001)
broker.put_container('c1', 0, normalize_timestamp(time()), 0, 0)
broker.put_container('c1', 0, normalize_timestamp(time()), 0, 0,
POLICIES.default.idx)
info = broker.get_info()
self.assertEqual(info['container_count'], 1)
sleep(.00001)
broker.put_container('c2', 0, normalize_timestamp(time()), 0, 0)
broker.put_container('c2', 0, normalize_timestamp(time()), 0, 0,
POLICIES.default.idx)
info = broker.get_info()
self.assertEqual(info['container_count'], 0)
@ -310,14 +361,17 @@ class TestAccountBroker(unittest.TestCase):
for cont1 in xrange(4):
for cont2 in xrange(125):
broker.put_container('%d-%04d' % (cont1, cont2),
normalize_timestamp(time()), 0, 0, 0)
normalize_timestamp(time()), 0, 0, 0,
POLICIES.default.idx)
for cont in xrange(125):
broker.put_container('2-0051-%04d' % cont,
normalize_timestamp(time()), 0, 0, 0)
normalize_timestamp(time()), 0, 0, 0,
POLICIES.default.idx)
for cont in xrange(125):
broker.put_container('3-%04d-0049' % cont,
normalize_timestamp(time()), 0, 0, 0)
normalize_timestamp(time()), 0, 0, 0,
POLICIES.default.idx)
listing = broker.list_containers_iter(100, '', None, None, '')
self.assertEqual(len(listing), 100)
@ -381,7 +435,8 @@ class TestAccountBroker(unittest.TestCase):
'3-0047-', '3-0048', '3-0048-', '3-0049',
'3-0049-', '3-0050'])
broker.put_container('3-0049-', normalize_timestamp(time()), 0, 0, 0)
broker.put_container('3-0049-', normalize_timestamp(time()), 0, 0, 0,
POLICIES.default.idx)
listing = broker.list_containers_iter(10, '3-0048', None, None, None)
self.assertEqual(len(listing), 10)
self.assertEqual([row[0] for row in listing],
@ -406,16 +461,26 @@ class TestAccountBroker(unittest.TestCase):
# account that has an odd container with a trailing delimiter
broker = AccountBroker(':memory:', account='a')
broker.initialize(normalize_timestamp('1'))
broker.put_container('a', normalize_timestamp(time()), 0, 0, 0)
broker.put_container('a-', normalize_timestamp(time()), 0, 0, 0)
broker.put_container('a-a', normalize_timestamp(time()), 0, 0, 0)
broker.put_container('a-a-a', normalize_timestamp(time()), 0, 0, 0)
broker.put_container('a-a-b', normalize_timestamp(time()), 0, 0, 0)
broker.put_container('a-b', normalize_timestamp(time()), 0, 0, 0)
broker.put_container('b', normalize_timestamp(time()), 0, 0, 0)
broker.put_container('b-a', normalize_timestamp(time()), 0, 0, 0)
broker.put_container('b-b', normalize_timestamp(time()), 0, 0, 0)
broker.put_container('c', normalize_timestamp(time()), 0, 0, 0)
broker.put_container('a', normalize_timestamp(time()), 0, 0, 0,
POLICIES.default.idx)
broker.put_container('a-', normalize_timestamp(time()), 0, 0, 0,
POLICIES.default.idx)
broker.put_container('a-a', normalize_timestamp(time()), 0, 0, 0,
POLICIES.default.idx)
broker.put_container('a-a-a', normalize_timestamp(time()), 0, 0, 0,
POLICIES.default.idx)
broker.put_container('a-a-b', normalize_timestamp(time()), 0, 0, 0,
POLICIES.default.idx)
broker.put_container('a-b', normalize_timestamp(time()), 0, 0, 0,
POLICIES.default.idx)
broker.put_container('b', normalize_timestamp(time()), 0, 0, 0,
POLICIES.default.idx)
broker.put_container('b-a', normalize_timestamp(time()), 0, 0, 0,
POLICIES.default.idx)
broker.put_container('b-b', normalize_timestamp(time()), 0, 0, 0,
POLICIES.default.idx)
broker.put_container('c', normalize_timestamp(time()), 0, 0, 0,
POLICIES.default.idx)
listing = broker.list_containers_iter(15, None, None, None, None)
self.assertEqual(len(listing), 10)
self.assertEqual([row[0] for row in listing],
@ -437,9 +502,11 @@ class TestAccountBroker(unittest.TestCase):
broker = AccountBroker(':memory:', account='a')
broker.initialize(normalize_timestamp('1'))
broker.put_container('a', normalize_timestamp(1),
normalize_timestamp(0), 0, 0)
normalize_timestamp(0), 0, 0,
POLICIES.default.idx)
broker.put_container('b', normalize_timestamp(2),
normalize_timestamp(0), 0, 0)
normalize_timestamp(0), 0, 0,
POLICIES.default.idx)
hasha = hashlib.md5(
'%s-%s' % ('a', '0000000001.00000-0000000000.00000-0-0')
).digest()
@ -450,7 +517,8 @@ class TestAccountBroker(unittest.TestCase):
''.join(('%02x' % (ord(a) ^ ord(b)) for a, b in zip(hasha, hashb)))
self.assertEqual(broker.get_info()['hash'], hashc)
broker.put_container('b', normalize_timestamp(3),
normalize_timestamp(0), 0, 0)
normalize_timestamp(0), 0, 0,
POLICIES.default.idx)
hashb = hashlib.md5(
'%s-%s' % ('b', '0000000003.00000-0000000000.00000-0-0')
).digest()
@ -463,15 +531,18 @@ class TestAccountBroker(unittest.TestCase):
broker1.initialize(normalize_timestamp('1'))
broker2 = AccountBroker(':memory:', account='a')
broker2.initialize(normalize_timestamp('1'))
broker1.put_container('a', normalize_timestamp(1), 0, 0, 0)
broker1.put_container('b', normalize_timestamp(2), 0, 0, 0)
broker1.put_container('a', normalize_timestamp(1), 0, 0, 0,
POLICIES.default.idx)
broker1.put_container('b', normalize_timestamp(2), 0, 0, 0,
POLICIES.default.idx)
id = broker1.get_info()['id']
broker2.merge_items(broker1.get_items_since(
broker2.get_sync(id), 1000), id)
items = broker2.get_items_since(-1, 1000)
self.assertEqual(len(items), 2)
self.assertEqual(['a', 'b'], sorted([rec['name'] for rec in items]))
broker1.put_container('c', normalize_timestamp(3), 0, 0, 0)
broker1.put_container('c', normalize_timestamp(3), 0, 0, 0,
POLICIES.default.idx)
broker2.merge_items(broker1.get_items_since(
broker2.get_sync(id), 1000), id)
items = broker2.get_items_since(-1, 1000)
@ -479,6 +550,145 @@ class TestAccountBroker(unittest.TestCase):
self.assertEqual(['a', 'b', 'c'],
sorted([rec['name'] for rec in items]))
def test_load_old_pending_puts(self):
    # pending puts from pre-storage-policy account brokers won't contain
    # the storage policy index
    tempdir = mkdtemp()
    broker_path = os.path.join(tempdir, 'test-load-old.db')
    try:
        broker = AccountBroker(broker_path, account='real')
        broker.initialize(normalize_timestamp(1))
        with open(broker_path + '.pending', 'a+b') as pending:
            pending.write(':')
            pending.write(pickle.dumps(
                # name, put_timestamp, delete_timestamp, object_count,
                # bytes_used, deleted
                ('oldcon', normalize_timestamp(200),
                 normalize_timestamp(0),
                 896, 9216695, 0)).encode('base64'))

        broker._commit_puts()
        with broker.get() as conn:
            results = list(conn.execute('''
                SELECT name, storage_policy_index FROM container
            '''))
        # the legacy 6-tuple entry is loaded with policy index 0
        self.assertEqual(len(results), 1)
        self.assertEqual(dict(results[0]),
                         {'name': 'oldcon', 'storage_policy_index': 0})
    finally:
        rmtree(tempdir)
@patch_policies([StoragePolicy(0, 'zero', False),
                 StoragePolicy(1, 'one', True),
                 StoragePolicy(2, 'two', False),
                 StoragePolicy(3, 'three', False)])
def test_get_policy_stats(self):
    # per-policy stats track puts, updates and deletes of containers
    ts = itertools.count()
    broker = AccountBroker(':memory:', account='a')
    broker.initialize(normalize_timestamp(ts.next()))

    # check empty policy_stats
    self.assertTrue(broker.empty())
    policy_stats = broker.get_policy_stats()
    self.assertEqual(policy_stats, {})

    # add some empty containers
    for policy in POLICIES:
        container_name = 'c-%s' % policy.name
        put_timestamp = normalize_timestamp(ts.next())
        broker.put_container(container_name,
                             put_timestamp, 0,
                             0, 0,
                             policy.idx)

        policy_stats = broker.get_policy_stats()
        stats = policy_stats[policy.idx]
        self.assertEqual(stats['object_count'], 0)
        self.assertEqual(stats['bytes_used'], 0)

    # update the containers object & byte count
    for policy in POLICIES:
        container_name = 'c-%s' % policy.name
        put_timestamp = normalize_timestamp(ts.next())
        count = policy.idx * 100  # good as any integer
        broker.put_container(container_name,
                             put_timestamp, 0,
                             count, count,
                             policy.idx)

        policy_stats = broker.get_policy_stats()
        stats = policy_stats[policy.idx]
        self.assertEqual(stats['object_count'], count)
        self.assertEqual(stats['bytes_used'], count)

    # check all the policy_stats at once
    for policy_index, stats in policy_stats.items():
        policy = POLICIES[policy_index]
        count = policy.idx * 100  # coupled with policy for test
        self.assertEqual(stats['object_count'], count)
        self.assertEqual(stats['bytes_used'], count)

    # now delete the containers one by one
    for policy in POLICIES:
        container_name = 'c-%s' % policy.name
        delete_timestamp = normalize_timestamp(ts.next())
        broker.put_container(container_name,
                             0, delete_timestamp,
                             0, 0,
                             policy.idx)

        policy_stats = broker.get_policy_stats()
        stats = policy_stats[policy.idx]
        self.assertEqual(stats['object_count'], 0)
        self.assertEqual(stats['bytes_used'], 0)
@patch_policies([StoragePolicy(0, 'zero', False),
                 StoragePolicy(1, 'one', True)])
def test_policy_stats_tracking(self):
    # repeated puts to a container collapse to one row per policy
    ts = itertools.count()
    broker = AccountBroker(':memory:', account='a')
    broker.initialize(normalize_timestamp(ts.next()))

    # policy 0
    broker.put_container('con1', ts.next(), 0, 12, 2798641, 0)
    broker.put_container('con1', ts.next(), 0, 13, 8156441, 0)
    # policy 1
    broker.put_container('con2', ts.next(), 0, 7, 5751991, 1)
    broker.put_container('con2', ts.next(), 0, 8, 6085379, 1)

    stats = broker.get_policy_stats()
    self.assertEqual(len(stats), 2)
    self.assertEqual(stats[0]['object_count'], 13)
    self.assertEqual(stats[0]['bytes_used'], 8156441)
    self.assertEqual(stats[1]['object_count'], 8)
    self.assertEqual(stats[1]['bytes_used'], 6085379)

    # Break encapsulation here to make sure that there's only 2 rows in
    # the stats table. It's possible that there could be 4 rows (one per
    # put_container) but that they came out in the right order so that
    # get_policy_stats() collapsed them down to the right number. To prove
    # that's not so, we have to go peek at the broker's internals.
    with broker.get() as conn:
        nrows = conn.execute(
            "SELECT COUNT(*) FROM policy_stat").fetchall()[0][0]
    self.assertEqual(nrows, 2)
def prespi_AccountBroker_initialize(self, conn, put_timestamp, **kwargs):
    """
    The AccountBroker initialize() function before we added the
    policy stat table. Used by test_policy_table_creation() to
    make sure that the AccountBroker will correctly add the table
    for cases where the DB existed before the policy support was added.

    :param conn: DB connection object
    :param put_timestamp: put timestamp
    """
    if not self.account:
        raise ValueError(
            'Attempting to create a new database with no account set')
    self.create_container_table(conn)
    self.create_account_stat_table(conn, put_timestamp)
def premetadata_create_account_stat_table(self, conn, put_timestamp):
"""
@ -543,3 +753,274 @@ class TestAccountBrokerBeforeMetadata(TestAccountBroker):
broker.initialize(normalize_timestamp('1'))
with broker.get() as conn:
conn.execute('SELECT metadata FROM account_stat')
def prespi_create_container_table(self, conn):
    """
    Copied from AccountBroker before the storage_policy_index column was
    added; used for testing with TestAccountBrokerBeforeSPI.

    Create container table which is specific to the account DB.

    :param conn: DB connection object
    """
    conn.executescript("""
        CREATE TABLE container (
            ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT,
            put_timestamp TEXT,
            delete_timestamp TEXT,
            object_count INTEGER,
            bytes_used INTEGER,
            deleted INTEGER DEFAULT 0
        );

        CREATE INDEX ix_container_deleted_name ON
            container (deleted, name);

        CREATE TRIGGER container_insert AFTER INSERT ON container
        BEGIN
            UPDATE account_stat
            SET container_count = container_count + (1 - new.deleted),
                object_count = object_count + new.object_count,
                bytes_used = bytes_used + new.bytes_used,
                hash = chexor(hash, new.name,
                              new.put_timestamp || '-' ||
                                new.delete_timestamp || '-' ||
                                new.object_count || '-' || new.bytes_used);
        END;

        CREATE TRIGGER container_update BEFORE UPDATE ON container
        BEGIN
            SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT');
        END;

        CREATE TRIGGER container_delete AFTER DELETE ON container
        BEGIN
            UPDATE account_stat
            SET container_count = container_count - (1 - old.deleted),
                object_count = object_count - old.object_count,
                bytes_used = bytes_used - old.bytes_used,
                hash = chexor(hash, old.name,
                              old.put_timestamp || '-' ||
                                old.delete_timestamp || '-' ||
                                old.object_count || '-' || old.bytes_used);
        END;
    """)
class TestAccountBrokerBeforeSPI(TestAccountBroker):
    """
    Tests for AccountBroker against databases created before
    the storage_policy_index column was added.
    """

    def setUp(self):
        # monkey-patch the broker back to its pre-storage-policy schema so
        # every inherited test runs against a legacy database
        self._imported_create_container_table = \
            AccountBroker.create_container_table
        AccountBroker.create_container_table = \
            prespi_create_container_table
        self._imported_initialize = AccountBroker._initialize
        AccountBroker._initialize = prespi_AccountBroker_initialize
        # sanity: a freshly-initialized DB must lack the new column...
        broker = AccountBroker(':memory:', account='a')
        broker.initialize(normalize_timestamp('1'))
        exc = None
        with broker.get() as conn:
            try:
                conn.execute('SELECT storage_policy_index FROM container')
            except BaseException as err:
                exc = err
        self.assert_('no such column: storage_policy_index' in str(exc))
        # ...and lack the policy_stat table
        with broker.get() as conn:
            try:
                conn.execute('SELECT * FROM policy_stat')
            except sqlite3.OperationalError as err:
                self.assert_('no such table: policy_stat' in str(err))
            else:
                self.fail('database created with policy_stat table')

    def tearDown(self):
        # restore the real schema methods and sanity-check that new
        # databases get the storage_policy_index column again
        AccountBroker.create_container_table = \
            self._imported_create_container_table
        AccountBroker._initialize = self._imported_initialize
        broker = AccountBroker(':memory:', account='a')
        broker.initialize(normalize_timestamp('1'))
        with broker.get() as conn:
            conn.execute('SELECT storage_policy_index FROM container')

    @with_tempdir
    def test_policy_table_migration(self, tempdir):
        """A container PUT on a legacy DB creates the policy_stat table."""
        db_path = os.path.join(tempdir, 'account.db')

        # first init an acct DB without the policy_stat table present
        broker = AccountBroker(db_path, account='a')
        broker.initialize(normalize_timestamp('1'))
        with broker.get() as conn:
            try:
                conn.execute('''
                    SELECT * FROM policy_stat
                    ''').fetchone()[0]
            except sqlite3.OperationalError as err:
                # confirm that the table really isn't there
                self.assert_('no such table: policy_stat' in str(err))
            else:
                self.fail('broker did not raise sqlite3.OperationalError '
                          'trying to select from policy_stat table!')

        # make sure we can HEAD this thing w/o the table
        stats = broker.get_policy_stats()
        self.assertEqual(len(stats), 0)

        # now do a PUT to create the table
        broker.put_container('o', normalize_timestamp(time()), 0, 0, 0,
                             POLICIES.default.idx)
        broker._commit_puts_stale_ok()

        # now confirm that the table was created
        with broker.get() as conn:
            conn.execute('SELECT * FROM policy_stat')
        stats = broker.get_policy_stats()
        self.assertEqual(len(stats), 1)

    @patch_policies
    @with_tempdir
    def test_container_table_migration(self, tempdir):
        """
        Migrating a legacy container table should add the
        storage_policy_index column and account for pre-existing
        (SPI-less) rows under policy 0.
        """
        db_path = os.path.join(tempdir, 'account.db')

        # first init an acct DB without the policy_stat table present
        broker = AccountBroker(db_path, account='a')
        broker.initialize(normalize_timestamp('1'))
        with broker.get() as conn:
            try:
                conn.execute('''
                    SELECT storage_policy_index FROM container
                    ''').fetchone()[0]
            except sqlite3.OperationalError as err:
                # confirm that the table doesn't have this column
                self.assert_('no such column: storage_policy_index' in
                             str(err))
            else:
                self.fail('broker did not raise sqlite3.OperationalError '
                          'trying to select from storage_policy_index '
                          'from container table!')

        # manually insert an existing row to avoid migration
        with broker.get() as conn:
            conn.execute('''
                INSERT INTO container (name, put_timestamp,
                    delete_timestamp, object_count, bytes_used,
                    deleted)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', ('test_name', normalize_timestamp(time()), 0, 1, 2, 0))
            conn.commit()

        # make sure we can iter containers without the migration
        for c in broker.list_containers_iter(1, None, None, None, None):
            self.assertEqual(c, ('test_name', 1, 2, 0))

        # stats table is mysteriously empty...
        stats = broker.get_policy_stats()
        self.assertEqual(len(stats), 0)

        # now do a PUT with a different value for storage_policy_index
        # which will update the DB schema as well as update policy_stats
        # for legacy containers in the DB (those without an SPI)
        other_policy = [p for p in POLICIES if p.idx != 0][0]
        broker.put_container('test_second', normalize_timestamp(time()),
                             0, 3, 4, other_policy.idx)
        broker._commit_puts_stale_ok()

        with broker.get() as conn:
            rows = conn.execute('''
                SELECT name, storage_policy_index FROM container
                ''').fetchall()
            for row in rows:
                if row[0] == 'test_name':
                    # migrated legacy row lands in policy 0
                    self.assertEqual(row[1], 0)
                else:
                    self.assertEqual(row[1], other_policy.idx)

        # we should have stats for both containers
        stats = broker.get_policy_stats()
        self.assertEqual(len(stats), 2)
        self.assertEqual(stats[0]['object_count'], 1)
        self.assertEqual(stats[0]['bytes_used'], 2)
        self.assertEqual(stats[1]['object_count'], 3)
        self.assertEqual(stats[1]['bytes_used'], 4)

        # now lets delete a container and make sure policy_stats is OK
        with broker.get() as conn:
            conn.execute('''
                DELETE FROM container WHERE name = ?
            ''', ('test_name',))
            conn.commit()
        stats = broker.get_policy_stats()
        self.assertEqual(len(stats), 2)
        self.assertEqual(stats[0]['object_count'], 0)
        self.assertEqual(stats[0]['bytes_used'], 0)
        self.assertEqual(stats[1]['object_count'], 3)
        self.assertEqual(stats[1]['bytes_used'], 4)

    @with_tempdir
    def test_half_upgraded_database(self, tempdir):
        """
        If the schema migration dies partway through, nothing may be
        committed, and a later attempt must complete the migration.
        """
        db_path = os.path.join(tempdir, 'account.db')
        ts = itertools.count()

        broker = AccountBroker(db_path, account='a')
        broker.initialize(normalize_timestamp(ts.next()))

        self.assertTrue(broker.empty())

        # add a container (to pending file)
        broker.put_container('c', normalize_timestamp(ts.next()), 0, 0, 0,
                             POLICIES.default.idx)

        real_get = broker.get
        called = []

        # wrap the connection so the migration script blows up on its
        # second use, simulating a crash mid-upgrade
        @contextmanager
        def mock_get():
            with real_get() as conn:

                def mock_executescript(script):
                    if called:
                        raise Exception('kaboom!')
                    called.append(script)

                conn.executescript = mock_executescript
                yield conn

        broker.get = mock_get

        try:
            broker._commit_puts()
        except Exception:
            pass
        else:
            self.fail('mock exception was not raised')

        self.assertEqual(len(called), 1)
        self.assert_('CREATE TABLE policy_stat' in called[0])

        # nothing was committed
        broker = AccountBroker(db_path, account='a')
        with broker.get() as conn:
            try:
                conn.execute('SELECT * FROM policy_stat')
            except sqlite3.OperationalError as err:
                self.assert_('no such table: policy_stat' in str(err))
            else:
                self.fail('half upgraded database!')
            container_count = conn.execute(
                'SELECT count(*) FROM container').fetchone()[0]
            self.assertEqual(container_count, 0)

        # try again to commit puts
        self.assertFalse(broker.empty())

        # full migration successful
        with broker.get() as conn:
            conn.execute('SELECT * FROM policy_stat')
            conn.execute('SELECT storage_policy_index FROM container')

View File

@ -13,18 +13,132 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
import unittest
import shutil
from swift.account import replicator, backend, server
from swift.common.utils import normalize_timestamp
from swift.common.storage_policy import POLICIES
from test.unit.common import test_db_replicator
class TestReplicator(unittest.TestCase):
"""
swift.account.replicator is currently just a subclass with some class
variables overridden, but at least this test stub will ensure proper Python
syntax.
"""
class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
def test_placeholder(self):
pass
backend = backend.AccountBroker
datadir = server.DATADIR
replicator_daemon = replicator.AccountReplicator
def test_sync(self):
    """Replicating a DB to itself is detected as a no-op."""
    broker = self._get_broker('a', node_index=0)
    put_timestamp = normalize_timestamp(time.time())
    broker.initialize(put_timestamp)
    # "replicate" to same database
    daemon = replicator.AccountReplicator({})
    part, node = self._get_broker_part_node(broker)
    info = broker.get_replication_info()
    success = daemon._repl_to_node(node, broker, part, info)
    # nothing to do
    self.assertTrue(success)
    self.assertEqual(1, daemon.stats['no_change'])
def test_sync_remote_missing(self):
    """A DB missing on the peers is pushed out whole via rsync."""
    broker = self._get_broker('a', node_index=0)
    put_timestamp = time.time()
    broker.initialize(put_timestamp)

    # "replicate" to all other nodes
    part, node = self._get_broker_part_node(broker)
    daemon = self._run_once(node)

    # complete rsync to both of the other replicas
    self.assertEqual(2, daemon.stats['rsync'])
    local_info = self._get_broker(
        'a', node_index=0).get_info()
    for i in range(1, 3):
        remote_broker = self._get_broker('a', node_index=i)
        self.assertTrue(os.path.exists(remote_broker.db_file))
        remote_info = remote_broker.get_info()
        # everything except the db id should match the local copy
        for k, v in local_info.items():
            if k == 'id':
                continue
            self.assertEqual(remote_info[k], v,
                             "mismatch remote %s %r != %r" % (
                                 k, remote_info[k], v))
def test_sync_remote_missing_most_rows(self):
    """A mostly-empty remote DB is caught up via a remote merge."""
    put_timestamp = time.time()
    # create "local" broker
    broker = self._get_broker('a', node_index=0)
    broker.initialize(put_timestamp)
    # create "remote" broker
    remote_broker = self._get_broker('a', node_index=1)
    remote_broker.initialize(put_timestamp)
    # add a row to "local" db
    broker.put_container('/a/c', time.time(), 0, 0, 0,
                         POLICIES.default.idx)
    # replicate
    daemon = replicator.AccountReplicator({})

    def _rsync_file(db_file, remote_file, **kwargs):
        # fake rsync by copying the db file under the "remote" root
        remote_server, remote_path = remote_file.split('/', 1)
        dest_path = os.path.join(self.root, remote_path)
        shutil.copy(db_file, dest_path)
        return True
    daemon._rsync_file = _rsync_file
    part, node = self._get_broker_part_node(remote_broker)
    info = broker.get_replication_info()
    success = daemon._repl_to_node(node, broker, part, info)
    self.assertTrue(success)
    # row merge
    self.assertEqual(1, daemon.stats['remote_merge'])
    local_info = self._get_broker(
        'a', node_index=0).get_info()
    remote_info = self._get_broker(
        'a', node_index=1).get_info()
    # everything except the db id should match after the merge
    for k, v in local_info.items():
        if k == 'id':
            continue
        self.assertEqual(remote_info[k], v,
                         "mismatch remote %s %r != %r" % (
                             k, remote_info[k], v))
def test_sync_remote_missing_one_rows(self):
    """A remote missing only one row is caught up via a row diff."""
    put_timestamp = time.time()
    # create "local" broker
    broker = self._get_broker('a', node_index=0)
    broker.initialize(put_timestamp)
    # create "remote" broker
    remote_broker = self._get_broker('a', node_index=1)
    remote_broker.initialize(put_timestamp)
    # add some rows to both db
    for i in range(10):
        put_timestamp = time.time()
        for db in (broker, remote_broker):
            path = '/a/c_%s' % i
            db.put_container(path, put_timestamp, 0, 0, 0,
                             POLICIES.default.idx)
    # now a row to the "local" broker only
    broker.put_container('/a/c_missing', time.time(), 0, 0, 0,
                         POLICIES.default.idx)
    # replicate
    daemon = replicator.AccountReplicator({})
    part, node = self._get_broker_part_node(remote_broker)
    info = broker.get_replication_info()
    success = daemon._repl_to_node(node, broker, part, info)
    self.assertTrue(success)
    # row merge via the usync/diff path
    self.assertEqual(1, daemon.stats['diff'])
    local_info = self._get_broker(
        'a', node_index=0).get_info()
    remote_info = self._get_broker(
        'a', node_index=1).get_info()
    # everything except the db id should match after the diff
    for k, v in local_info.items():
        if k == 'id':
            continue
        self.assertEqual(remote_info[k], v,
                         "mismatch remote %s %r != %r" % (
                             k, remote_info[k], v))
if __name__ == '__main__':

View File

@ -22,6 +22,8 @@ from shutil import rmtree
from StringIO import StringIO
from time import gmtime
from test.unit import FakeLogger
import itertools
import random
import simplejson
import xml.dom.minidom
@ -31,8 +33,11 @@ from swift.common import constraints
from swift.account.server import AccountController
from swift.common.utils import normalize_timestamp, replication, public
from swift.common.request_helpers import get_sys_meta_prefix
from test.unit import patch_policies
from swift.common.storage_policy import StoragePolicy, POLICIES, POLICY_INDEX
@patch_policies
class TestAccountController(unittest.TestCase):
"""Test swift.account.server.AccountController"""
def setUp(self):
@ -1670,6 +1675,182 @@ class TestAccountController(unittest.TestCase):
[(('1.2.3.4 - - [01/Jan/1970:02:46:41 +0000] "HEAD /sda1/p/a" 404 '
'- "-" "-" "-" 2.0000 "-"',), {})])
def test_policy_stats_with_legacy(self):
    """
    A container PUT without a policy index header (legacy client)
    should show up in the rollup and in the POLICIES[0] per-policy
    headers on account GET/HEAD.
    """
    ts = itertools.count()
    # create the account
    req = Request.blank('/sda1/p/a', method='PUT', headers={
        'X-Timestamp': normalize_timestamp(ts.next())})
    resp = req.get_response(self.controller)
    self.assertEqual(resp.status_int, 201)  # sanity

    # add a container (no policy index header on purpose)
    req = Request.blank('/sda1/p/a/c1', method='PUT', headers={
        'X-Put-Timestamp': normalize_timestamp(ts.next()),
        'X-Delete-Timestamp': '0',
        'X-Object-Count': '2',
        'X-Bytes-Used': '4',
    })
    resp = req.get_response(self.controller)
    self.assertEqual(resp.status_int, 201)

    # read back rollup
    for method in ('GET', 'HEAD'):
        req = Request.blank('/sda1/p/a', method=method)
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int // 100, 2)
        self.assertEquals(resp.headers['X-Account-Object-Count'], '2')
        self.assertEquals(resp.headers['X-Account-Bytes-Used'], '4')
        self.assertEquals(
            resp.headers['X-Account-Storage-Policy-%s-Object-Count' %
                         POLICIES[0].name], '2')
        self.assertEquals(
            resp.headers['X-Account-Storage-Policy-%s-Bytes-Used' %
                         POLICIES[0].name], '4')
def test_policy_stats_non_default(self):
    """
    A container PUT with an explicit non-default policy index should be
    reported under that policy's name in the account GET/HEAD headers.
    """
    ts = itertools.count()
    # create the account
    req = Request.blank('/sda1/p/a', method='PUT', headers={
        'X-Timestamp': normalize_timestamp(ts.next())})
    resp = req.get_response(self.controller)
    self.assertEqual(resp.status_int, 201)  # sanity

    # add a container with a randomly chosen non-default policy
    non_default_policies = [p for p in POLICIES if not p.is_default]
    policy = random.choice(non_default_policies)
    req = Request.blank('/sda1/p/a/c1', method='PUT', headers={
        'X-Put-Timestamp': normalize_timestamp(ts.next()),
        'X-Delete-Timestamp': '0',
        'X-Object-Count': '2',
        'X-Bytes-Used': '4',
        POLICY_INDEX: policy.idx,
    })
    resp = req.get_response(self.controller)
    self.assertEqual(resp.status_int, 201)

    # read back rollup
    for method in ('GET', 'HEAD'):
        req = Request.blank('/sda1/p/a', method=method)
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int // 100, 2)
        self.assertEquals(resp.headers['X-Account-Object-Count'], '2')
        self.assertEquals(resp.headers['X-Account-Bytes-Used'], '4')
        self.assertEquals(
            resp.headers['X-Account-Storage-Policy-%s-Object-Count' %
                         policy.name], '2')
        self.assertEquals(
            resp.headers['X-Account-Storage-Policy-%s-Bytes-Used' %
                         policy.name], '4')
def test_empty_policy_stats(self):
    """An account with no containers reports no per-policy headers."""
    timestamps = itertools.count()
    # create the account
    put_req = Request.blank('/sda1/p/a', method='PUT', headers={
        'X-Timestamp': normalize_timestamp(timestamps.next())})
    put_resp = put_req.get_response(self.controller)
    self.assertEqual(put_resp.status_int, 201)  # sanity
    # neither GET nor HEAD may expose any storage-policy header
    for verb in ('GET', 'HEAD'):
        resp = Request.blank(
            '/sda1/p/a', method=verb).get_response(self.controller)
        self.assertEqual(resp.status_int // 100, 2)
        policy_keys = [key for key in resp.headers
                       if 'storage-policy' in key.lower()]
        self.assertEqual(policy_keys, [])
def test_empty_except_for_used_policies(self):
    """
    Per-policy headers should appear only for policies that actually
    have containers in the account.
    """
    ts = itertools.count()
    # create the account
    req = Request.blank('/sda1/p/a', method='PUT', headers={
        'X-Timestamp': normalize_timestamp(ts.next())})
    resp = req.get_response(self.controller)
    self.assertEqual(resp.status_int, 201)  # sanity

    # starts empty
    for method in ('GET', 'HEAD'):
        req = Request.blank('/sda1/p/a', method=method)
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int // 100, 2)
        for key in resp.headers:
            self.assert_('storage-policy' not in key.lower())

    # add a container
    policy = random.choice(POLICIES)
    req = Request.blank('/sda1/p/a/c1', method='PUT', headers={
        'X-Put-Timestamp': normalize_timestamp(ts.next()),
        'X-Delete-Timestamp': '0',
        'X-Object-Count': '2',
        'X-Bytes-Used': '4',
        POLICY_INDEX: policy.idx,
    })
    resp = req.get_response(self.controller)
    self.assertEqual(resp.status_int, 201)

    # only policy of the created container should be in headers
    for method in ('GET', 'HEAD'):
        req = Request.blank('/sda1/p/a', method=method)
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int // 100, 2)
        for key in resp.headers:
            if 'storage-policy' in key.lower():
                self.assert_(policy.name.lower() in key.lower())
def test_multiple_policies_in_use(self):
    """
    With containers in several policies, each per-policy header pair
    should carry that policy's counts and the pairs should sum to the
    expected account-wide totals.
    """
    ts = itertools.count()
    # create the account
    req = Request.blank('/sda1/p/a', method='PUT', headers={
        'X-Timestamp': normalize_timestamp(ts.next())})
    resp = req.get_response(self.controller)
    self.assertEqual(resp.status_int, 201)  # sanity

    # add some containers, one per policy, each with a distinct count
    for policy in POLICIES:
        count = policy.idx * 100  # good as any integer
        container_path = '/sda1/p/a/c_%s' % policy.name
        req = Request.blank(
            container_path, method='PUT', headers={
                'X-Put-Timestamp': normalize_timestamp(ts.next()),
                'X-Delete-Timestamp': '0',
                'X-Object-Count': count,
                'X-Bytes-Used': count,
                POLICY_INDEX: policy.idx,
            })
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 201)

    req = Request.blank('/sda1/p/a', method='HEAD')
    resp = req.get_response(self.controller)
    self.assertEqual(resp.status_int // 100, 2)

    # check container counts in roll up headers
    total_object_count = 0
    total_bytes_used = 0
    for key in resp.headers:
        if 'storage-policy' not in key.lower():
            continue
        # match each storage-policy header back to its policy by name
        for policy in POLICIES:
            if policy.name.lower() not in key.lower():
                continue
            if key.lower().endswith('object-count'):
                object_count = int(resp.headers[key])
                self.assertEqual(policy.idx * 100, object_count)
                total_object_count += object_count
            if key.lower().endswith('bytes-used'):
                bytes_used = int(resp.headers[key])
                self.assertEqual(policy.idx * 100, bytes_used)
                total_bytes_used += bytes_used

    expected_total_count = sum([p.idx * 100 for p in POLICIES])
    self.assertEqual(expected_total_count, total_object_count)
    self.assertEqual(expected_total_count, total_bytes_used)
@patch_policies([StoragePolicy(0, 'zero', False),
                 StoragePolicy(1, 'one', True),
                 StoragePolicy(2, 'two', False),
                 StoragePolicy(3, 'three', False)])
class TestNonLegacyDefaultStoragePolicy(TestAccountController):
    """
    Re-run all of TestAccountController with a policy set whose default
    is not policy 0.
    """
    pass
# allow running this test module directly
if __name__ == '__main__':
    unittest.main()

View File

@ -2268,6 +2268,7 @@ class TestContainerController(unittest.TestCase):
'x-delete-timestamp': '0',
'x-object-count': 0,
'x-put-timestamp': '0000012345.00000',
POLICY_INDEX: '%s' % POLICIES.default.idx,
'referer': 'PUT http://localhost/sda1/p/a/c',
'user-agent': 'container-server %d' % os.getpid(),
'x-trans-id': '-'})})
@ -2285,6 +2286,7 @@ class TestContainerController(unittest.TestCase):
'x-delete-timestamp': '0',
'x-object-count': 0,
'x-put-timestamp': '0000012345.00000',
POLICY_INDEX: '%s' % POLICIES.default.idx,
'referer': 'PUT http://localhost/sda1/p/a/c',
'user-agent': 'container-server %d' % os.getpid(),
'x-trans-id': '-'})})