Refactoring, test infrastructure changes and cleanup

...in preparation for the container sharding feature.

Co-Authored-By: Matthew Oliver <matt@oliver.net.au>
Co-Authored-By: Tim Burke <tim.burke@gmail.com>
Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com>

Change-Id: I4455677abb114a645cff93cd41b394d227e805de
This commit is contained in:
Alistair Coles 2018-05-01 15:12:05 +01:00
parent c94acbcd23
commit 9d742b85ad
19 changed files with 1085 additions and 339 deletions

View File

@ -22,7 +22,7 @@ import six.moves.cPickle as pickle
import sqlite3
from swift.common.utils import Timestamp
from swift.common.db import DatabaseBroker, utf8encode
from swift.common.db import DatabaseBroker, utf8encode, zero_like
DATADIR = 'accounts'
@ -233,7 +233,7 @@ class AccountBroker(DatabaseBroker):
with self.get() as conn:
row = conn.execute(
'SELECT container_count from account_stat').fetchone()
return (row[0] == 0)
return zero_like(row[0])
def make_tuple_for_pickle(self, record):
return (record['name'], record['put_timestamp'],
@ -254,7 +254,7 @@ class AccountBroker(DatabaseBroker):
:param storage_policy_index: the storage policy for this container
"""
if Timestamp(delete_timestamp) > Timestamp(put_timestamp) and \
object_count in (None, '', 0, '0'):
zero_like(object_count):
deleted = 1
else:
deleted = 0
@ -273,8 +273,7 @@ class AccountBroker(DatabaseBroker):
:returns: True if the DB is considered to be deleted, False otherwise
"""
return status == 'DELETED' or (
container_count in (None, '', 0, '0') and
return status == 'DELETED' or zero_like(container_count) and (
Timestamp(delete_timestamp) > Timestamp(put_timestamp))
def _is_deleted(self, conn):
@ -509,7 +508,7 @@ class AccountBroker(DatabaseBroker):
record[2] = row[2]
# If deleted, mark as such
if Timestamp(record[2]) > Timestamp(record[1]) and \
record[3] in (None, '', 0, '0'):
zero_like(record[3]):
record[5] = 1
else:
record[5] = 0

View File

@ -71,6 +71,18 @@ def native_str_keys(metadata):
metadata[k.decode('utf-8')] = sv
ZERO_LIKE_VALUES = {None, '', 0, '0'}
def zero_like(count):
"""
We've cargo culted our consumers to be tolerant of various expressions of
zero in our databases for backwards compatibility with less disciplined
producers.
"""
return count in ZERO_LIKE_VALUES
def _db_timeout(timeout, db_file, call):
with LockTimeout(timeout, db_file):
retry_wait = 0.001
@ -208,11 +220,27 @@ class DatabaseBroker(object):
def __init__(self, db_file, timeout=BROKER_TIMEOUT, logger=None,
account=None, container=None, pending_timeout=None,
stale_reads_ok=False):
"""Encapsulates working with a database."""
stale_reads_ok=False, skip_commits=False):
"""Encapsulates working with a database.
:param db_file: path to a database file.
:param timeout: timeout used for database operations.
:param logger: a logger instance.
:param account: name of account.
:param container: name of container.
:param pending_timeout: timeout used when attempting to take a lock to
write to pending file.
:param stale_reads_ok: if True then no error is raised if pending
commits cannot be committed before the database is read, otherwise
an error is raised.
:param skip_commits: if True then this broker instance will never
commit records from the pending file to the database;
:meth:`~swift.common.db.DatabaseBroker.put_record` should not
called on brokers with skip_commits True.
"""
self.conn = None
self.db_file = db_file
self.pending_file = self.db_file + '.pending'
self._db_file = db_file
self.pending_file = self._db_file + '.pending'
self.pending_timeout = pending_timeout or 10
self.stale_reads_ok = stale_reads_ok
self.db_dir = os.path.dirname(db_file)
@ -221,6 +249,7 @@ class DatabaseBroker(object):
self.account = account
self.container = container
self._db_version = -1
self.skip_commits = skip_commits
def __str__(self):
"""
@ -240,9 +269,9 @@ class DatabaseBroker(object):
:param put_timestamp: internalized timestamp of initial PUT request
:param storage_policy_index: only required for containers
"""
if self.db_file == ':memory:':
if self._db_file == ':memory:':
tmp_db_file = None
conn = get_db_connection(self.db_file, self.timeout)
conn = get_db_connection(self._db_file, self.timeout)
else:
mkdirs(self.db_dir)
fd, tmp_db_file = mkstemp(suffix='.tmp', dir=self.db_dir)
@ -329,15 +358,22 @@ class DatabaseBroker(object):
self._delete_db(conn, timestamp)
conn.commit()
@property
def db_file(self):
return self._db_file
def get_device_path(self):
suffix_path = os.path.dirname(self.db_dir)
partition_path = os.path.dirname(suffix_path)
dbs_path = os.path.dirname(partition_path)
return os.path.dirname(dbs_path)
def quarantine(self, reason):
"""
The database will be quarantined and a
sqlite3.DatabaseError will be raised indicating the action taken.
"""
prefix_path = os.path.dirname(self.db_dir)
partition_path = os.path.dirname(prefix_path)
dbs_path = os.path.dirname(partition_path)
device_path = os.path.dirname(dbs_path)
device_path = self.get_device_path()
quar_path = os.path.join(device_path, 'quarantined',
self.db_type + 's',
os.path.basename(self.db_dir))
@ -377,6 +413,20 @@ class DatabaseBroker(object):
self.quarantine(exc_hint)
@contextmanager
def updated_timeout(self, new_timeout):
"""Use with "with" statement; updates ``timeout`` within the block."""
old_timeout = self.timeout
try:
self.timeout = new_timeout
if self.conn:
self.conn.timeout = new_timeout
yield old_timeout
finally:
self.timeout = old_timeout
if self.conn:
self.conn.timeout = old_timeout
@contextmanager
def get(self):
"""Use with the "with" statement; returns a database connection."""
@ -477,6 +527,23 @@ class DatabaseBroker(object):
with self.get() as conn:
return self._is_deleted(conn)
def empty(self):
"""
Check if the broker abstraction contains any undeleted records.
"""
raise NotImplementedError()
def is_reclaimable(self, now, reclaim_age):
"""
Check if the broker abstraction is empty, and has been marked deleted
for at least a reclaim age.
"""
info = self.get_replication_info()
return (zero_like(info['count']) and
(Timestamp(now - reclaim_age) >
Timestamp(info['delete_timestamp']) >
Timestamp(info['put_timestamp'])))
def merge_timestamps(self, created_at, put_timestamp, delete_timestamp):
"""
Used in replication to handle updating timestamps.
@ -548,13 +615,15 @@ class DatabaseBroker(object):
result.append({'remote_id': row[0], 'sync_point': row[1]})
return result
def get_max_row(self):
def get_max_row(self, table=None):
if not table:
table = self.db_contains_type
query = '''
SELECT SQLITE_SEQUENCE.seq
FROM SQLITE_SEQUENCE
WHERE SQLITE_SEQUENCE.name == '%s'
LIMIT 1
''' % (self.db_contains_type)
''' % (table, )
with self.get() as conn:
row = conn.execute(query).fetchone()
return row[0] if row else -1
@ -582,11 +651,26 @@ class DatabaseBroker(object):
return curs.fetchone()
def put_record(self, record):
if self.db_file == ':memory:':
"""
Put a record into the DB. If the DB has an associated pending file with
space then the record is appended to that file and a commit to the DB
is deferred. If the DB is in-memory or its pending file is full then
the record will be committed immediately.
:param record: a record to be added to the DB.
:raises DatabaseConnectionError: if the DB file does not exist or if
``skip_commits`` is True.
:raises LockTimeout: if a timeout occurs while waiting to take a lock
to write to the pending file.
"""
if self._db_file == ':memory:':
self.merge_items([record])
return
if not os.path.exists(self.db_file):
raise DatabaseConnectionError(self.db_file, "DB doesn't exist")
if self.skip_commits:
raise DatabaseConnectionError(self.db_file,
'commits not accepted')
with lock_parent_directory(self.pending_file, self.pending_timeout):
pending_size = 0
try:
@ -606,6 +690,10 @@ class DatabaseBroker(object):
protocol=PICKLE_PROTOCOL).encode('base64'))
fp.flush()
def _skip_commit_puts(self):
return (self._db_file == ':memory:' or self.skip_commits or not
os.path.exists(self.pending_file))
def _commit_puts(self, item_list=None):
"""
Scan for .pending files and commit the found records by feeding them
@ -614,7 +702,13 @@ class DatabaseBroker(object):
:param item_list: A list of items to commit in addition to .pending
"""
if self.db_file == ':memory:' or not os.path.exists(self.pending_file):
if self._skip_commit_puts():
if item_list:
# this broker instance should not be used to commit records,
# but if it is then raise an error rather than quietly
# discarding the records in item_list.
raise DatabaseConnectionError(self.db_file,
'commits not accepted')
return
if item_list is None:
item_list = []
@ -645,7 +739,7 @@ class DatabaseBroker(object):
Catch failures of _commit_puts() if broker is intended for
reading of stats, and thus does not care for pending updates.
"""
if self.db_file == ':memory:' or not os.path.exists(self.pending_file):
if self._skip_commit_puts():
return
try:
with lock_parent_directory(self.pending_file,
@ -663,6 +757,12 @@ class DatabaseBroker(object):
"""
raise NotImplementedError
def merge_items(self, item_list, source=None):
"""
Save :param:item_list to the database.
"""
raise NotImplementedError
def make_tuple_for_pickle(self, record):
"""
Turn this db record dict into the format this service uses for
@ -701,7 +801,7 @@ class DatabaseBroker(object):
within 512k of a boundary, it allocates to the next boundary.
Boundaries are 2m, 5m, 10m, 25m, 50m, then every 50m after.
"""
if not DB_PREALLOCATION or self.db_file == ':memory:':
if not DB_PREALLOCATION or self._db_file == ':memory:':
return
MB = (1024 * 1024)
@ -830,40 +930,46 @@ class DatabaseBroker(object):
def reclaim(self, age_timestamp, sync_timestamp):
"""
Delete rows from the db_contains_type table that are marked deleted
and whose created_at timestamp is < age_timestamp. Also deletes rows
from incoming_sync and outgoing_sync where the updated_at timestamp is
< sync_timestamp.
Delete reclaimable rows and metadata from the db.
In addition, this calls the DatabaseBroker's :func:`_reclaim` method.
By default this method will delete rows from the db_contains_type table
that are marked deleted and whose created_at timestamp is <
age_timestamp, and deletes rows from incoming_sync and outgoing_sync
where the updated_at timestamp is < sync_timestamp. In addition, this
calls the :meth:`_reclaim_metadata` method.
Subclasses may reclaim other items by overriding :meth:`_reclaim`.
:param age_timestamp: max created_at timestamp of object rows to delete
:param sync_timestamp: max update_at timestamp of sync rows to delete
"""
if self.db_file != ':memory:' and os.path.exists(self.pending_file):
if not self._skip_commit_puts():
with lock_parent_directory(self.pending_file,
self.pending_timeout):
self._commit_puts()
with self.get() as conn:
conn.execute('''
DELETE FROM %s WHERE deleted = 1 AND %s < ?
''' % (self.db_contains_type, self.db_reclaim_timestamp),
(age_timestamp,))
try:
conn.execute('''
DELETE FROM outgoing_sync WHERE updated_at < ?
''', (sync_timestamp,))
conn.execute('''
DELETE FROM incoming_sync WHERE updated_at < ?
''', (sync_timestamp,))
except sqlite3.OperationalError as err:
# Old dbs didn't have updated_at in the _sync tables.
if 'no such column: updated_at' not in str(err):
raise
DatabaseBroker._reclaim(self, conn, age_timestamp)
self._reclaim(conn, age_timestamp, sync_timestamp)
self._reclaim_metadata(conn, age_timestamp)
conn.commit()
def _reclaim(self, conn, timestamp):
def _reclaim(self, conn, age_timestamp, sync_timestamp):
conn.execute('''
DELETE FROM %s WHERE deleted = 1 AND %s < ?
''' % (self.db_contains_type, self.db_reclaim_timestamp),
(age_timestamp,))
try:
conn.execute('''
DELETE FROM outgoing_sync WHERE updated_at < ?
''', (sync_timestamp,))
conn.execute('''
DELETE FROM incoming_sync WHERE updated_at < ?
''', (sync_timestamp,))
except sqlite3.OperationalError as err:
# Old dbs didn't have updated_at in the _sync tables.
if 'no such column: updated_at' not in str(err):
raise
def _reclaim_metadata(self, conn, timestamp):
"""
Removes any empty metadata values older than the timestamp using the
given database connection. This function will not call commit on the

View File

@ -33,10 +33,11 @@ from swift.common.direct_client import quote
from swift.common.utils import get_logger, whataremyips, storage_directory, \
renamer, mkdirs, lock_parent_directory, config_true_value, \
unlink_older_than, dump_recon_cache, rsync_module_interpolation, \
json, Timestamp, parse_override_options, round_robin_iter, Everything
json, parse_override_options, round_robin_iter, Everything
from swift.common import ring
from swift.common.ring.utils import is_local_device
from swift.common.http import HTTP_NOT_FOUND, HTTP_INSUFFICIENT_STORAGE
from swift.common.http import HTTP_NOT_FOUND, HTTP_INSUFFICIENT_STORAGE, \
is_success
from swift.common.bufferedhttp import BufferedHTTPConnection
from swift.common.exceptions import DriveNotMounted
from swift.common.daemon import Daemon
@ -87,11 +88,14 @@ def roundrobin_datadirs(datadirs):
found (in their proper places). The partitions within each data
dir are walked randomly, however.
:param datadirs: a list of (path, node_id, partition_filter) to walk
:returns: A generator of (partition, path_to_db_file, node_id)
:param datadirs: a list of tuples of (path, context, partition_filter) to
walk. The context may be any object; the context is not
used by this function but is included with each yielded
tuple.
:returns: A generator of (partition, path_to_db_file, context)
"""
def walk_datadir(datadir, node_id, part_filter):
def walk_datadir(datadir, context, part_filter):
partitions = [pd for pd in os.listdir(datadir)
if looks_like_partition(pd) and part_filter(pd)]
random.shuffle(partitions)
@ -117,7 +121,7 @@ def roundrobin_datadirs(datadirs):
continue
object_file = os.path.join(hash_dir, hsh + '.db')
if os.path.exists(object_file):
yield (partition, object_file, node_id)
yield (partition, object_file, context)
else:
try:
os.rmdir(hash_dir)
@ -125,8 +129,8 @@ def roundrobin_datadirs(datadirs):
if e.errno != errno.ENOTEMPTY:
raise
its = [walk_datadir(datadir, node_id, filt)
for datadir, node_id, filt in datadirs]
its = [walk_datadir(datadir, context, filt)
for datadir, context, filt in datadirs]
rr_its = round_robin_iter(its)
for datadir in rr_its:
@ -312,6 +316,16 @@ class Replicator(Daemon):
response = http.replicate(replicate_method, local_id)
return response and 200 <= response.status < 300
def _send_merge_items(self, http, local_id, items):
with Timeout(self.node_timeout):
response = http.replicate('merge_items', items, local_id)
if not response or not is_success(response.status):
if response:
self.logger.error('ERROR Bad response %s from %s',
response.status, http.host)
return False
return True
def _usync_db(self, point, broker, http, remote_id, local_id):
"""
Sync a db by sending all records since the last sync.
@ -326,26 +340,28 @@ class Replicator(Daemon):
"""
self.stats['diff'] += 1
self.logger.increment('diffs')
self.logger.debug('Syncing chunks with %s, starting at %s',
http.host, point)
self.logger.debug('%s usyncing chunks to %s, starting at row %s',
broker.db_file,
'%(ip)s:%(port)s/%(device)s' % http.node,
point)
start = time.time()
sync_table = broker.get_syncs()
objects = broker.get_items_since(point, self.per_diff)
diffs = 0
while len(objects) and diffs < self.max_diffs:
diffs += 1
with Timeout(self.node_timeout):
response = http.replicate('merge_items', objects, local_id)
if not response or response.status >= 300 or response.status < 200:
if response:
self.logger.error(_('ERROR Bad response %(status)s from '
'%(host)s'),
{'status': response.status,
'host': http.host})
if not self._send_merge_items(http, local_id, objects):
return False
# replication relies on db order to send the next merge batch in
# order with no gaps
point = objects[-1]['ROWID']
objects = broker.get_items_since(point, self.per_diff)
self.logger.debug('%s usyncing chunks to %s, finished at row %s (%gs)',
broker.db_file,
'%(ip)s:%(port)s/%(device)s' % http.node,
point, time.time() - start)
if objects:
self.logger.debug(
'Synchronization for %s has fallen more than '
@ -449,32 +465,79 @@ class Replicator(Daemon):
if rinfo.get('metadata', ''):
broker.update_metadata(json.loads(rinfo['metadata']))
if self._in_sync(rinfo, info, broker, local_sync):
self.logger.debug('%s in sync with %s, nothing to do',
broker.db_file,
'%(ip)s:%(port)s/%(device)s' % node)
return True
# if the difference in rowids between the two differs by
# more than 50% and the difference is greater than per_diff,
# rsync then do a remote merge.
# NOTE: difference > per_diff stops us from dropping to rsync
# on smaller containers, who have only a few rows to sync.
if rinfo['max_row'] / float(info['max_row']) < 0.5 and \
info['max_row'] - rinfo['max_row'] > self.per_diff:
self.stats['remote_merge'] += 1
self.logger.increment('remote_merges')
return self._rsync_db(broker, node, http, info['id'],
replicate_method='rsync_then_merge',
replicate_timeout=(info['count'] / 2000),
different_region=different_region)
# else send diffs over to the remote server
return self._usync_db(max(rinfo['point'], local_sync),
broker, http, rinfo['id'], info['id'])
return self._choose_replication_mode(
node, rinfo, info, local_sync, broker, http,
different_region)
return False
def _choose_replication_mode(self, node, rinfo, info, local_sync, broker,
http, different_region):
# if the difference in rowids between the two differs by
# more than 50% and the difference is greater than per_diff,
# rsync then do a remote merge.
# NOTE: difference > per_diff stops us from dropping to rsync
# on smaller containers, who have only a few rows to sync.
if (rinfo['max_row'] / float(info['max_row']) < 0.5 and
info['max_row'] - rinfo['max_row'] > self.per_diff):
self.stats['remote_merge'] += 1
self.logger.increment('remote_merges')
return self._rsync_db(broker, node, http, info['id'],
replicate_method='rsync_then_merge',
replicate_timeout=(info['count'] / 2000),
different_region=different_region)
# else send diffs over to the remote server
return self._usync_db(max(rinfo['point'], local_sync),
broker, http, rinfo['id'], info['id'])
def _post_replicate_hook(self, broker, info, responses):
"""
:param broker: the container that just replicated
:param broker: broker instance for the database that just replicated
:param info: pre-replication full info dict
:param responses: a list of bools indicating success from nodes
"""
pass
def cleanup_post_replicate(self, broker, orig_info, responses):
"""
Cleanup non primary database from disk if needed.
:param broker: the broker for the database we're replicating
:param orig_info: snapshot of the broker replication info dict taken
before replication
:param responses: a list of boolean success values for each replication
request to other nodes
:return success: returns False if deletion of the database was
attempted but unsuccessful, otherwise returns True.
"""
log_template = 'Not deleting db %s (%%s)' % broker.db_file
max_row_delta = broker.get_max_row() - orig_info['max_row']
if max_row_delta < 0:
reason = 'negative max_row_delta: %s' % max_row_delta
self.logger.error(log_template, reason)
return True
if max_row_delta:
reason = '%s new rows' % max_row_delta
self.logger.debug(log_template, reason)
return True
if not (responses and all(responses)):
reason = '%s/%s success' % (responses.count(True), len(responses))
self.logger.debug(log_template, reason)
return True
# If the db has been successfully synced to all of its peers, it can be
# removed. Callers should have already checked that the db is not on a
# primary node.
if not self.delete_db(broker):
self.logger.debug(
'Failed to delete db %s', broker.db_file)
return False
self.logger.debug('Successfully deleted db %s', broker.db_file)
return True
def _replicate_object(self, partition, object_file, node_id):
"""
Replicate the db, choosing method based on whether or not it
@ -483,12 +546,20 @@ class Replicator(Daemon):
:param partition: partition to be replicated to
:param object_file: DB file name to be replicated
:param node_id: node id of the node to be replicated to
:returns: a tuple (success, responses). ``success`` is a boolean that
is True if the method completed successfully, False otherwise.
``responses`` is a list of booleans each of which indicates the
success or not of replicating to a peer node if replication has
been attempted. ``success`` is False if any of ``responses`` is
False; when ``responses`` is empty, ``success`` may be either True
or False.
"""
start_time = now = time.time()
self.logger.debug('Replicating db %s', object_file)
self.stats['attempted'] += 1
self.logger.increment('attempts')
shouldbehere = True
responses = []
try:
broker = self.brokerclass(object_file, pending_timeout=30)
broker.reclaim(now - self.reclaim_age,
@ -518,18 +589,12 @@ class Replicator(Daemon):
failure_dev['device'])
for failure_dev in nodes])
self.logger.increment('failures')
return
# The db is considered deleted if the delete_timestamp value is greater
# than the put_timestamp, and there are no objects.
delete_timestamp = Timestamp(info.get('delete_timestamp') or 0)
put_timestamp = Timestamp(info.get('put_timestamp') or 0)
if (now - self.reclaim_age) > delete_timestamp > put_timestamp and \
info['count'] in (None, '', 0, '0'):
return False, responses
if broker.is_reclaimable(now, self.reclaim_age):
if self.report_up_to_date(info):
self.delete_db(broker)
self.logger.timing_since('timing', start_time)
return
responses = []
return True, responses
failure_devs_info = set()
nodes = self.ring.get_part_nodes(int(partition))
local_dev = None
@ -587,14 +652,11 @@ class Replicator(Daemon):
except (Exception, Timeout):
self.logger.exception('UNHANDLED EXCEPTION: in post replicate '
'hook for %s', broker.db_file)
if not shouldbehere and responses and all(responses):
# If the db shouldn't be on this node and has been successfully
# synced to all of its peers, it can be removed.
if not self.delete_db(broker):
if not shouldbehere:
if not self.cleanup_post_replicate(broker, info, responses):
failure_devs_info.update(
[(failure_dev['replication_ip'], failure_dev['device'])
for failure_dev in repl_nodes])
target_devs_info = set([(target_dev['replication_ip'],
target_dev['device'])
for target_dev in repl_nodes])
@ -602,6 +664,9 @@ class Replicator(Daemon):
self._add_failure_stats(failure_devs_info)
self.logger.timing_since('timing', start_time)
if shouldbehere:
responses.append(True)
return all(responses), responses
def delete_db(self, broker):
object_file = broker.db_file
@ -746,6 +811,9 @@ class ReplicatorRpc(object):
self.mount_check = mount_check
self.logger = logger or get_logger({}, log_route='replicator-rpc')
def _db_file_exists(self, db_path):
return os.path.exists(db_path)
def dispatch(self, replicate_args, args):
if not hasattr(args, 'pop'):
return HTTPBadRequest(body='Invalid object type')
@ -764,7 +832,7 @@ class ReplicatorRpc(object):
# someone might be about to rsync a db to us,
# make sure there's a tmp dir to receive it.
mkdirs(os.path.join(self.root, drive, 'tmp'))
if not os.path.exists(db_file):
if not self._db_file_exists(db_file):
return HTTPNotFound()
return getattr(self, op)(self.broker_class(db_file), args)
@ -872,12 +940,17 @@ class ReplicatorRpc(object):
renamer(old_filename, db_file)
return HTTPNoContent()
def _abort_rsync_then_merge(self, db_file, tmp_filename):
return not (self._db_file_exists(db_file) and
os.path.exists(tmp_filename))
def rsync_then_merge(self, drive, db_file, args):
old_filename = os.path.join(self.root, drive, 'tmp', args[0])
if not os.path.exists(db_file) or not os.path.exists(old_filename):
tmp_filename = os.path.join(self.root, drive, 'tmp', args[0])
if self._abort_rsync_then_merge(db_file, tmp_filename):
return HTTPNotFound()
new_broker = self.broker_class(old_filename)
new_broker = self.broker_class(tmp_filename)
existing_broker = self.broker_class(db_file)
db_file = existing_broker.db_file
point = -1
objects = existing_broker.get_items_since(point, 1000)
while len(objects):
@ -885,9 +958,12 @@ class ReplicatorRpc(object):
point = objects[-1]['ROWID']
objects = existing_broker.get_items_since(point, 1000)
sleep()
new_broker.merge_syncs(existing_broker.get_syncs())
new_broker.newid(args[0])
new_broker.update_metadata(existing_broker.metadata)
renamer(old_filename, db_file)
if self._abort_rsync_then_merge(db_file, tmp_filename):
return HTTPNotFound()
renamer(tmp_filename, db_file)
return HTTPNoContent()
# Footnote [1]:

View File

@ -45,6 +45,9 @@ from swift.common.utils import capture_stdio, disable_fallocate, \
validate_configuration, get_hub, config_auto_int_value, \
reiterate
SIGNUM_TO_NAME = {getattr(signal, n): n for n in dir(signal)
if n.startswith('SIG') and '_' not in n}
# Set maximum line size of message headers to be accepted.
wsgi.MAX_HEADER_LINE = constraints.MAX_HEADER_SIZE
@ -559,7 +562,8 @@ class WorkersStrategy(object):
:param int pid: The new worker process' PID
"""
self.logger.notice('Started child %s' % pid)
self.logger.notice('Started child %s from parent %s',
pid, os.getpid())
self.children.append(pid)
def register_worker_exit(self, pid):
@ -569,7 +573,8 @@ class WorkersStrategy(object):
:param int pid: The PID of the worker that exited.
"""
self.logger.error('Removing dead child %s' % pid)
self.logger.error('Removing dead child %s from parent %s',
pid, os.getpid())
self.children.remove(pid)
def shutdown_sockets(self):
@ -935,24 +940,17 @@ def run_wsgi(conf_path, app_section, *args, **kwargs):
run_server(conf, logger, no_fork_sock, global_conf=global_conf)
return 0
def kill_children(*args):
"""Kills the entire process group."""
logger.error('SIGTERM received')
signal.signal(signal.SIGTERM, signal.SIG_IGN)
running[0] = False
os.killpg(0, signal.SIGTERM)
def stop_with_signal(signum, *args):
"""Set running flag to False and capture the signum"""
running_context[0] = False
running_context[1] = signum
def hup(*args):
"""Shuts down the server, but allows running requests to complete"""
logger.error('SIGHUP received')
signal.signal(signal.SIGHUP, signal.SIG_IGN)
running[0] = False
# context to hold boolean running state and stop signum
running_context = [True, None]
signal.signal(signal.SIGTERM, stop_with_signal)
signal.signal(signal.SIGHUP, stop_with_signal)
running = [True]
signal.signal(signal.SIGTERM, kill_children)
signal.signal(signal.SIGHUP, hup)
while running[0]:
while running_context[0]:
for sock, sock_info in strategy.new_worker_socks():
pid = os.fork()
if pid == 0:
@ -992,11 +990,23 @@ def run_wsgi(conf_path, app_section, *args, **kwargs):
sleep(0.01)
except KeyboardInterrupt:
logger.notice('User quit')
running[0] = False
running_context[0] = False
break
if running_context[1] is not None:
try:
signame = SIGNUM_TO_NAME[running_context[1]]
except KeyError:
logger.error('Stopping with unexpected signal %r' %
running_context[1])
else:
logger.error('%s received', signame)
if running_context[1] == signal.SIGTERM:
os.killpg(0, signal.SIGTERM)
strategy.shutdown_sockets()
logger.notice('Exited')
signal.signal(signal.SIGTERM, signal.SIG_IGN)
logger.notice('Exited (%s)', os.getpid())
return 0

View File

@ -15,7 +15,6 @@
"""
Pluggable Back-ends for Container Server
"""
import os
from uuid import uuid4
@ -25,9 +24,9 @@ from six.moves import range
import sqlite3
from swift.common.utils import Timestamp, encode_timestamps, \
decode_timestamps, extract_swift_bytes
from swift.common.db import DatabaseBroker, utf8encode
decode_timestamps, extract_swift_bytes, storage_directory, hash_path
from swift.common.db import DatabaseBroker, utf8encode, \
zero_like, DatabaseAlreadyExists
SQLITE_ARG_LIMIT = 999
@ -227,6 +226,35 @@ class ContainerBroker(DatabaseBroker):
db_contains_type = 'object'
db_reclaim_timestamp = 'created_at'
@classmethod
def create_broker(self, device_path, part, account, container, logger=None,
put_timestamp=None, storage_policy_index=None):
"""
Create a ContainerBroker instance. If the db doesn't exist, initialize
the db file.
:param device_path: device path
:param part: partition number
:param account: account name string
:param container: container name string
:param logger: a logger instance
:param put_timestamp: initial timestamp if broker needs to be
initialized
:param storage_policy_index: the storage policy index
:return: a :class:`swift.container.backend.ContainerBroker` instance
"""
hsh = hash_path(account, container)
db_dir = storage_directory(DATADIR, part, hsh)
db_path = os.path.join(device_path, db_dir, hsh + '.db')
broker = ContainerBroker(db_path, account=account, container=container,
logger=logger)
if not os.path.exists(broker.db_file):
try:
broker.initialize(put_timestamp, storage_policy_index)
except DatabaseAlreadyExists:
pass
return broker
@property
def storage_policy_index(self):
if not hasattr(self, '_storage_policy_index'):
@ -401,7 +429,7 @@ class ContainerBroker(DatabaseBroker):
raise
row = conn.execute(
'SELECT object_count from container_stat').fetchone()
return (row[0] == 0)
return zero_like(row[0])
def delete_object(self, name, timestamp, storage_policy_index=0):
"""
@ -457,7 +485,7 @@ class ContainerBroker(DatabaseBroker):
# The container is considered deleted if the delete_timestamp
# value is greater than the put_timestamp, and there are no
# objects in the container.
return (object_count in (None, '', 0, '0')) and (
return zero_like(object_count) and (
Timestamp(delete_timestamp) > Timestamp(put_timestamp))
def _is_deleted(self, conn):
@ -473,6 +501,17 @@ class ContainerBroker(DatabaseBroker):
FROM container_stat''').fetchone()
return self._is_deleted_info(**info)
def is_reclaimable(self, now, reclaim_age):
with self.get() as conn:
info = conn.execute('''
SELECT put_timestamp, delete_timestamp
FROM container_stat''').fetchone()
if (Timestamp(now - reclaim_age) >
Timestamp(info['delete_timestamp']) >
Timestamp(info['put_timestamp'])):
return self.empty()
return False
def get_info_is_deleted(self):
"""
Get the is_deleted status and info for the container.

View File

@ -28,9 +28,7 @@ from swift.common import db_replicator
from swift.common.storage_policy import POLICIES
from swift.common.exceptions import DeviceUnavailable
from swift.common.http import is_success
from swift.common.db import DatabaseAlreadyExists
from swift.common.utils import (Timestamp, hash_path,
storage_directory, majority_size)
from swift.common.utils import Timestamp, majority_size
class ContainerReplicator(db_replicator.Replicator):
@ -39,6 +37,10 @@ class ContainerReplicator(db_replicator.Replicator):
datadir = DATADIR
default_port = 6201
def __init__(self, conf, logger=None):
super(ContainerReplicator, self).__init__(conf, logger=logger)
self.reconciler_cleanups = self.sync_store = None
def report_up_to_date(self, full_info):
reported_key_map = {
'reported_put_timestamp': 'put_timestamp',
@ -61,8 +63,7 @@ class ContainerReplicator(db_replicator.Replicator):
return sync_args
def _handle_sync_response(self, node, response, info, broker, http,
different_region):
parent = super(ContainerReplicator, self)
different_region=False):
if is_success(response.status):
remote_info = json.loads(response.data)
if incorrect_policy_index(info, remote_info):
@ -75,9 +76,8 @@ class ContainerReplicator(db_replicator.Replicator):
if any(info[key] != remote_info[key] for key in sync_timestamps):
broker.merge_timestamps(*(remote_info[key] for key in
sync_timestamps))
rv = parent._handle_sync_response(
return super(ContainerReplicator, self)._handle_sync_response(
node, response, info, broker, http, different_region)
return rv
def find_local_handoff_for_part(self, part):
"""
@ -114,15 +114,10 @@ class ContainerReplicator(db_replicator.Replicator):
raise DeviceUnavailable(
'No mounted devices found suitable to Handoff reconciler '
'container %s in partition %s' % (container, part))
hsh = hash_path(account, container)
db_dir = storage_directory(DATADIR, part, hsh)
db_path = os.path.join(self.root, node['device'], db_dir, hsh + '.db')
broker = ContainerBroker(db_path, account=account, container=container)
if not os.path.exists(broker.db_file):
try:
broker.initialize(timestamp, 0)
except DatabaseAlreadyExists:
pass
broker = ContainerBroker.create_broker(
os.path.join(self.root, node['device']), part, account, container,
logger=self.logger, put_timestamp=timestamp,
storage_policy_index=0)
if self.reconciler_containers is not None:
self.reconciler_containers[container] = part, broker, node['id']
return broker
@ -217,12 +212,13 @@ class ContainerReplicator(db_replicator.Replicator):
# this container shouldn't be here, make sure it's cleaned up
self.reconciler_cleanups[broker.container] = broker
return
try:
# DB is going to get deleted. Be preemptive about it
self.sync_store.remove_synced_container(broker)
except Exception:
self.logger.exception('Failed to remove sync_store entry %s' %
broker.db_file)
if self.sync_store:
try:
# DB is going to get deleted. Be preemptive about it
self.sync_store.remove_synced_container(broker)
except Exception:
self.logger.exception('Failed to remove sync_store entry %s' %
broker.db_file)
return super(ContainerReplicator, self).delete_db(broker)

View File

@ -343,6 +343,40 @@ class ContainerController(BaseStorageServer):
broker.update_status_changed_at(timestamp)
return recreated
def _maybe_autocreate(self, broker, req_timestamp, account,
policy_index):
created = False
if account.startswith(self.auto_create_account_prefix) and \
not os.path.exists(broker.db_file):
if policy_index is None:
raise HTTPBadRequest(
'X-Backend-Storage-Policy-Index header is required')
try:
broker.initialize(req_timestamp.internal, policy_index)
except DatabaseAlreadyExists:
pass
else:
created = True
if not os.path.exists(broker.db_file):
raise HTTPNotFound()
return created
def _update_metadata(self, req, broker, req_timestamp, method):
metadata = {}
metadata.update(
(key, (value, req_timestamp.internal))
for key, value in req.headers.items()
if key.lower() in self.save_headers or
is_sys_or_user_meta('container', key))
if metadata:
if 'X-Container-Sync-To' in metadata:
if 'X-Container-Sync-To' not in broker.metadata or \
metadata['X-Container-Sync-To'][0] != \
broker.metadata['X-Container-Sync-To'][0]:
broker.set_x_container_sync_points(-1, -1)
broker.update_metadata(metadata, validate_metadata=True)
self._update_sync_store(broker, method)
@public
@timing_stats()
def PUT(self, req):
@ -364,14 +398,8 @@ class ContainerController(BaseStorageServer):
# obj put expects the policy_index header, default is for
# legacy support during upgrade.
obj_policy_index = requested_policy_index or 0
if account.startswith(self.auto_create_account_prefix) and \
not os.path.exists(broker.db_file):
try:
broker.initialize(req_timestamp.internal, obj_policy_index)
except DatabaseAlreadyExists:
pass
if not os.path.exists(broker.db_file):
return HTTPNotFound()
self._maybe_autocreate(broker, req_timestamp, account,
obj_policy_index)
broker.put_object(obj, req_timestamp.internal,
int(req.headers['x-size']),
req.headers['x-content-type'],
@ -391,20 +419,7 @@ class ContainerController(BaseStorageServer):
req_timestamp.internal,
new_container_policy,
requested_policy_index)
metadata = {}
metadata.update(
(key, (value, req_timestamp.internal))
for key, value in req.headers.items()
if key.lower() in self.save_headers or
is_sys_or_user_meta('container', key))
if 'X-Container-Sync-To' in metadata:
if 'X-Container-Sync-To' not in broker.metadata or \
metadata['X-Container-Sync-To'][0] != \
broker.metadata['X-Container-Sync-To'][0]:
broker.set_x_container_sync_points(-1, -1)
broker.update_metadata(metadata, validate_metadata=True)
if metadata:
self._update_sync_store(broker, 'PUT')
self._update_metadata(req, broker, req_timestamp, 'PUT')
resp = self.account_update(req, account, container, broker)
if resp:
return resp
@ -562,20 +577,7 @@ class ContainerController(BaseStorageServer):
if broker.is_deleted():
return HTTPNotFound(request=req)
broker.update_put_timestamp(req_timestamp.internal)
metadata = {}
metadata.update(
(key, (value, req_timestamp.internal))
for key, value in req.headers.items()
if key.lower() in self.save_headers or
is_sys_or_user_meta('container', key))
if metadata:
if 'X-Container-Sync-To' in metadata:
if 'X-Container-Sync-To' not in broker.metadata or \
metadata['X-Container-Sync-To'][0] != \
broker.metadata['X-Container-Sync-To'][0]:
broker.set_x_container_sync_points(-1, -1)
broker.update_metadata(metadata, validate_metadata=True)
self._update_sync_store(broker, 'POST')
self._update_metadata(req, broker, req_timestamp, 'POST')
return HTTPNoContent(request=req)
def __call__(self, env, start_response):

View File

@ -17,7 +17,11 @@
# The code below enables nosetests to work with i18n _() blocks
from __future__ import print_function
import sys
from contextlib import contextmanager
import os
from six import reraise
try:
from unittest.util import safe_repr
except ImportError:
@ -86,3 +90,26 @@ def listen_zero():
sock.bind(("127.0.0.1", 0))
sock.listen(50)
return sock
@contextmanager
def annotate_failure(msg):
"""
Catch AssertionError and annotate it with a message. Useful when making
assertions in a loop where the message can indicate the loop index or
richer context about the failure.
:param msg: A message to be prefixed to the AssertionError message.
"""
try:
yield
except AssertionError as err:
err_typ, err_val, err_tb = sys.exc_info()
if err_val.args:
msg = '%s Failed with %s' % (msg, err_val.args[0])
err_val.args = (msg, ) + err_val.args[1:]
else:
# workaround for some IDE's raising custom AssertionErrors
err_val = '%s Failed with %s' % (msg, err)
err_typ = AssertionError
reraise(err_typ, err_val, err_tb)

View File

@ -99,9 +99,11 @@ class BrainSplitter(object):
raise ValueError('Unknown server_type: %r' % server_type)
self.server_type = server_type
part, nodes = self.ring.get_nodes(self.account, c, o)
self.part, self.nodes = self.ring.get_nodes(self.account, c, o)
node_ids = [n['id'] for n in self.nodes]
self.node_numbers = [n + 1 for n in node_ids]
node_ids = [n['id'] for n in nodes]
if all(n_id in node_ids for n_id in (0, 1)):
self.primary_numbers = (1, 2)
self.handoff_numbers = (3, 4)

View File

@ -14,6 +14,8 @@
# limitations under the License.
from __future__ import print_function
import errno
import os
from subprocess import Popen, PIPE
import sys
@ -125,13 +127,17 @@ def kill_server(ipport, ipport2server):
if err:
raise Exception('unable to kill %s' % (server if not number else
'%s%s' % (server, number)))
return wait_for_server_to_hangup(ipport)
def wait_for_server_to_hangup(ipport):
try_until = time() + 30
while True:
try:
conn = HTTPConnection(*ipport)
conn.request('GET', '/')
conn.getresponse()
except Exception as err:
except Exception:
break
if time() > try_until:
raise Exception(
@ -334,33 +340,35 @@ class ProbeTest(unittest.TestCase):
Don't instantiate this directly, use a child class instead.
"""
def _load_rings_and_configs(self):
self.ipport2server = {}
self.configs = defaultdict(dict)
self.account_ring = get_ring(
'account',
self.acct_cont_required_replicas,
self.acct_cont_required_devices,
ipport2server=self.ipport2server,
config_paths=self.configs)
self.container_ring = get_ring(
'container',
self.acct_cont_required_replicas,
self.acct_cont_required_devices,
ipport2server=self.ipport2server,
config_paths=self.configs)
self.policy = get_policy(**self.policy_requirements)
self.object_ring = get_ring(
self.policy.ring_name,
self.obj_required_replicas,
self.obj_required_devices,
server='object',
ipport2server=self.ipport2server,
config_paths=self.configs)
def setUp(self):
resetswift()
kill_orphans()
self._load_rings_and_configs()
try:
self.ipport2server = {}
self.configs = defaultdict(dict)
self.account_ring = get_ring(
'account',
self.acct_cont_required_replicas,
self.acct_cont_required_devices,
ipport2server=self.ipport2server,
config_paths=self.configs)
self.container_ring = get_ring(
'container',
self.acct_cont_required_replicas,
self.acct_cont_required_devices,
ipport2server=self.ipport2server,
config_paths=self.configs)
self.policy = get_policy(**self.policy_requirements)
self.object_ring = get_ring(
self.policy.ring_name,
self.obj_required_replicas,
self.obj_required_devices,
server='object',
ipport2server=self.ipport2server,
config_paths=self.configs)
self.servers_per_port = any(
int(readconf(c, section_name='object-replicator').get(
'servers_per_port', '0'))
@ -489,6 +497,49 @@ class ProbeTest(unittest.TestCase):
finally:
shutil.rmtree(tempdir)
def get_all_object_nodes(self):
"""
Returns a list of all nodes in all object storage policies.
:return: a list of node dicts.
"""
all_obj_nodes = {}
for policy in ENABLED_POLICIES:
for dev in policy.object_ring.devs:
all_obj_nodes[dev['device']] = dev
return all_obj_nodes.values()
def gather_async_pendings(self, onodes):
"""
Returns a list of paths to async pending files found on given nodes.
:param onodes: a list of nodes.
:return: a list of file paths.
"""
async_pendings = []
for onode in onodes:
device_dir = self.device_dir('', onode)
for ap_pol_dir in os.listdir(device_dir):
if not ap_pol_dir.startswith('async_pending'):
# skip 'objects', 'containers', etc.
continue
async_pending_dir = os.path.join(device_dir, ap_pol_dir)
try:
ap_dirs = os.listdir(async_pending_dir)
except OSError as err:
if err.errno == errno.ENOENT:
pass
else:
raise
else:
for ap_dir in ap_dirs:
ap_dir_fullpath = os.path.join(
async_pending_dir, ap_dir)
async_pendings.extend([
os.path.join(ap_dir_fullpath, ent)
for ent in os.listdir(ap_dir_fullpath)])
return async_pendings
class ReplProbeTest(ProbeTest):

View File

@ -12,8 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import errno
import os
import random
import time
import uuid
@ -143,31 +141,6 @@ class TestObjectExpirer(ReplProbeTest):
# tha the object server does not write out any async pendings; this
# test asserts that this is the case.
def gather_async_pendings(onodes):
async_pendings = []
for onode in onodes:
device_dir = self.device_dir('', onode)
for ap_pol_dir in os.listdir(device_dir):
if not ap_pol_dir.startswith('async_pending'):
# skip 'objects', 'containers', etc.
continue
async_pending_dir = os.path.join(device_dir, ap_pol_dir)
try:
ap_dirs = os.listdir(async_pending_dir)
except OSError as err:
if err.errno == errno.ENOENT:
pass
else:
raise
else:
for ap_dir in ap_dirs:
ap_dir_fullpath = os.path.join(
async_pending_dir, ap_dir)
async_pendings.extend([
os.path.join(ap_dir_fullpath, ent)
for ent in os.listdir(ap_dir_fullpath)])
return async_pendings
# Make an expiring object in each policy
for policy in ENABLED_POLICIES:
container_name = "expirer-test-%d" % policy.idx
@ -191,15 +164,12 @@ class TestObjectExpirer(ReplProbeTest):
# Make sure there's no async_pendings anywhere. Probe tests only run
# on single-node installs anyway, so this set should be small enough
# that an exhaustive check doesn't take too long.
all_obj_nodes = {}
for policy in ENABLED_POLICIES:
for dev in policy.object_ring.devs:
all_obj_nodes[dev['device']] = dev
pendings_before = gather_async_pendings(all_obj_nodes.values())
all_obj_nodes = self.get_all_object_nodes()
pendings_before = self.gather_async_pendings(all_obj_nodes)
# expire the objects
Manager(['object-expirer']).once()
pendings_after = gather_async_pendings(all_obj_nodes.values())
pendings_after = self.gather_async_pendings(all_obj_nodes)
self.assertEqual(pendings_after, pendings_before)
def test_expirer_object_should_not_be_expired(self):

View File

@ -751,6 +751,8 @@ class FakeStatus(object):
:param response_sleep: float, time to eventlet sleep during response
"""
# connect exception
if inspect.isclass(status) and issubclass(status, Exception):
raise status('FakeStatus Error')
if isinstance(status, (Exception, eventlet.Timeout)):
raise status
if isinstance(status, tuple):
@ -1063,6 +1065,15 @@ def make_timestamp_iter(offset=0):
for t in itertools.count(int(time.time()) + offset))
@contextmanager
def mock_timestamp_now(now=None):
if now is None:
now = Timestamp.now()
with mocklib.patch('swift.common.utils.Timestamp.now',
classmethod(lambda c: now)):
yield now
class Timeout(object):
def __init__(self, seconds):
self.seconds = seconds
@ -1323,3 +1334,12 @@ def skip_if_no_xattrs():
if not xattr_supported_check():
raise SkipTest('Large xattrs not supported in `%s`. Skipping test' %
gettempdir())
def unlink_files(paths):
for path in paths:
try:
os.unlink(path)
except OSError as err:
if err.errno != errno.ENOENT:
raise

View File

@ -404,7 +404,7 @@ class TestAccountController(unittest.TestCase):
elif state[0] == 'race':
# Save the original db_file attribute value
self._saved_db_file = self.db_file
self.db_file += '.doesnotexist'
self._db_file += '.doesnotexist'
def initialize(self, *args, **kwargs):
if state[0] == 'initial':
@ -413,7 +413,7 @@ class TestAccountController(unittest.TestCase):
elif state[0] == 'race':
# Restore the original db_file attribute to get the race
# behavior
self.db_file = self._saved_db_file
self._db_file = self._saved_db_file
return super(InterceptedAcBr, self).initialize(*args, **kwargs)
with mock.patch("swift.account.server.AccountBroker", InterceptedAcBr):

View File

@ -38,7 +38,7 @@ from swift.common.constraints import \
MAX_META_VALUE_LENGTH, MAX_META_COUNT, MAX_META_OVERALL_SIZE
from swift.common.db import chexor, dict_factory, get_db_connection, \
DatabaseBroker, DatabaseConnectionError, DatabaseAlreadyExists, \
GreenDBConnection, PICKLE_PROTOCOL
GreenDBConnection, PICKLE_PROTOCOL, zero_like
from swift.common.utils import normalize_timestamp, mkdirs, Timestamp
from swift.common.exceptions import LockTimeout
from swift.common.swob import HTTPException
@ -46,6 +46,30 @@ from swift.common.swob import HTTPException
from test.unit import with_tempdir
class TestHelperFunctions(unittest.TestCase):
def test_zero_like(self):
expectations = {
# value => expected
None: True,
True: False,
'': True,
'asdf': False,
0: True,
1: False,
'0': True,
'1': False,
}
errors = []
for value, expected in expectations.items():
rv = zero_like(value)
if rv != expected:
errors.append('zero_like(%r) => %r expected %r' % (
value, rv, expected))
if errors:
self.fail('Some unexpected return values:\n' + '\n'.join(errors))
class TestDatabaseConnectionError(unittest.TestCase):
def test_str(self):
@ -989,6 +1013,19 @@ class TestDatabaseBroker(unittest.TestCase):
self.assertEqual(broker.get_sync(uuid3), 2)
broker.merge_syncs([{'sync_point': 5, 'remote_id': uuid2}])
self.assertEqual(broker.get_sync(uuid2), 5)
# max sync point sticks
broker.merge_syncs([{'sync_point': 5, 'remote_id': uuid2}])
self.assertEqual(broker.get_sync(uuid2), 5)
self.assertEqual(broker.get_sync(uuid3), 2)
broker.merge_syncs([{'sync_point': 4, 'remote_id': uuid2}])
self.assertEqual(broker.get_sync(uuid2), 5)
self.assertEqual(broker.get_sync(uuid3), 2)
broker.merge_syncs([{'sync_point': -1, 'remote_id': uuid2},
{'sync_point': 3, 'remote_id': uuid3}])
self.assertEqual(broker.get_sync(uuid2), 5)
self.assertEqual(broker.get_sync(uuid3), 3)
self.assertEqual(broker.get_sync(uuid2, incoming=False), 3)
self.assertEqual(broker.get_sync(uuid3, incoming=False), 4)
def test_get_replication_info(self):
self.get_replication_info_tester(metadata=False)
@ -1089,11 +1126,9 @@ class TestDatabaseBroker(unittest.TestCase):
'max_row': 1, 'id': broker_uuid, 'metadata': broker_metadata})
return broker
def test_metadata(self):
def reclaim(broker, timestamp):
with broker.get() as conn:
broker._reclaim(conn, timestamp)
conn.commit()
# only testing _reclaim_metadata here
@patch.object(DatabaseBroker, '_reclaim')
def test_metadata(self, mock_reclaim):
# Initializes a good broker for us
broker = self.get_replication_info_tester(metadata=True)
# Add our first item
@ -1134,7 +1169,7 @@ class TestDatabaseBroker(unittest.TestCase):
self.assertEqual(broker.metadata['Second'],
[second_value, second_timestamp])
# Reclaim at point before second item was deleted
reclaim(broker, normalize_timestamp(3))
broker.reclaim(normalize_timestamp(3), normalize_timestamp(3))
self.assertIn('First', broker.metadata)
self.assertEqual(broker.metadata['First'],
[first_value, first_timestamp])
@ -1142,7 +1177,7 @@ class TestDatabaseBroker(unittest.TestCase):
self.assertEqual(broker.metadata['Second'],
[second_value, second_timestamp])
# Reclaim at point second item was deleted
reclaim(broker, normalize_timestamp(4))
broker.reclaim(normalize_timestamp(4), normalize_timestamp(4))
self.assertIn('First', broker.metadata)
self.assertEqual(broker.metadata['First'],
[first_value, first_timestamp])
@ -1150,11 +1185,18 @@ class TestDatabaseBroker(unittest.TestCase):
self.assertEqual(broker.metadata['Second'],
[second_value, second_timestamp])
# Reclaim after point second item was deleted
reclaim(broker, normalize_timestamp(5))
broker.reclaim(normalize_timestamp(5), normalize_timestamp(5))
self.assertIn('First', broker.metadata)
self.assertEqual(broker.metadata['First'],
[first_value, first_timestamp])
self.assertNotIn('Second', broker.metadata)
# Delete first item (by setting to empty string)
first_timestamp = normalize_timestamp(6)
broker.update_metadata({'First': ['', first_timestamp]})
self.assertIn('First', broker.metadata)
# Check that sync_timestamp doesn't cause item to be reclaimed
broker.reclaim(normalize_timestamp(5), normalize_timestamp(99))
self.assertIn('First', broker.metadata)
def test_update_metadata_missing_container_info(self):
# Test missing container_info/container_stat row
@ -1197,7 +1239,7 @@ class TestDatabaseBroker(unittest.TestCase):
exc = None
try:
with broker.get() as conn:
broker._reclaim(conn, 0)
broker._reclaim_metadata(conn, 0)
except Exception as err:
exc = err
self.assertEqual(
@ -1333,5 +1375,141 @@ class TestDatabaseBroker(unittest.TestCase):
else:
self.fail('Expected an exception to be raised')
def test_skip_commits(self):
broker = DatabaseBroker(':memory:')
self.assertTrue(broker._skip_commit_puts())
broker._initialize = MagicMock()
broker.initialize(Timestamp.now())
self.assertTrue(broker._skip_commit_puts())
# not initialized
db_file = os.path.join(self.testdir, '1.db')
broker = DatabaseBroker(db_file)
self.assertFalse(os.path.exists(broker.db_file)) # sanity check
self.assertTrue(broker._skip_commit_puts())
# no pending file
broker._initialize = MagicMock()
broker.initialize(Timestamp.now())
self.assertTrue(os.path.exists(broker.db_file)) # sanity check
self.assertFalse(os.path.exists(broker.pending_file)) # sanity check
self.assertTrue(broker._skip_commit_puts())
# pending file exists
with open(broker.pending_file, 'wb'):
pass
self.assertTrue(os.path.exists(broker.pending_file)) # sanity check
self.assertFalse(broker._skip_commit_puts())
# skip_commits is True
broker.skip_commits = True
self.assertTrue(broker._skip_commit_puts())
# re-init
broker = DatabaseBroker(db_file)
self.assertFalse(broker._skip_commit_puts())
# constructor can override
broker = DatabaseBroker(db_file, skip_commits=True)
self.assertTrue(broker._skip_commit_puts())
def test_commit_puts(self):
db_file = os.path.join(self.testdir, '1.db')
broker = DatabaseBroker(db_file)
broker._initialize = MagicMock()
broker.initialize(Timestamp.now())
with open(broker.pending_file, 'wb'):
pass
# merge given list
with patch.object(broker, 'merge_items') as mock_merge_items:
broker._commit_puts(['test'])
mock_merge_items.assert_called_once_with(['test'])
# load file and merge
with open(broker.pending_file, 'wb') as fd:
fd.write(':1:2:99')
with patch.object(broker, 'merge_items') as mock_merge_items:
broker._commit_puts_load = lambda l, e: l.append(e)
broker._commit_puts()
mock_merge_items.assert_called_once_with(['1', '2', '99'])
self.assertEqual(0, os.path.getsize(broker.pending_file))
# load file and merge with given list
with open(broker.pending_file, 'wb') as fd:
fd.write(':bad')
with patch.object(broker, 'merge_items') as mock_merge_items:
broker._commit_puts_load = lambda l, e: l.append(e)
broker._commit_puts(['not'])
mock_merge_items.assert_called_once_with(['not', 'bad'])
self.assertEqual(0, os.path.getsize(broker.pending_file))
# skip_commits True - no merge
db_file = os.path.join(self.testdir, '2.db')
broker = DatabaseBroker(db_file, skip_commits=True)
broker._initialize = MagicMock()
broker.initialize(Timestamp.now())
with open(broker.pending_file, 'wb') as fd:
fd.write(':ignored')
with patch.object(broker, 'merge_items') as mock_merge_items:
with self.assertRaises(DatabaseConnectionError) as cm:
broker._commit_puts(['hmmm'])
mock_merge_items.assert_not_called()
self.assertIn('commits not accepted', str(cm.exception))
with open(broker.pending_file, 'rb') as fd:
self.assertEqual(':ignored', fd.read())
def test_put_record(self):
db_file = os.path.join(self.testdir, '1.db')
broker = DatabaseBroker(db_file)
broker._initialize = MagicMock()
broker.initialize(Timestamp.now())
# pending file created and record written
broker.make_tuple_for_pickle = lambda x: x.upper()
with patch.object(broker, '_commit_puts') as mock_commit_puts:
broker.put_record('pinky')
mock_commit_puts.assert_not_called()
with open(broker.pending_file, 'rb') as fd:
pending = fd.read()
items = pending.split(':')
self.assertEqual(['PINKY'],
[pickle.loads(i.decode('base64')) for i in items[1:]])
# record appended
with patch.object(broker, '_commit_puts') as mock_commit_puts:
broker.put_record('perky')
mock_commit_puts.assert_not_called()
with open(broker.pending_file, 'rb') as fd:
pending = fd.read()
items = pending.split(':')
self.assertEqual(['PINKY', 'PERKY'],
[pickle.loads(i.decode('base64')) for i in items[1:]])
# pending file above cap
cap = swift.common.db.PENDING_CAP
while os.path.getsize(broker.pending_file) < cap:
with open(broker.pending_file, 'ab') as fd:
fd.write('x' * 100000)
with patch.object(broker, '_commit_puts') as mock_commit_puts:
broker.put_record('direct')
mock_commit_puts.called_once_with(['direct'])
# records shouldn't be put to brokers with skip_commits True because
# they cannot be accepted if the pending file is full
broker.skip_commits = True
with open(broker.pending_file, 'wb'):
# empty the pending file
pass
with patch.object(broker, '_commit_puts') as mock_commit_puts:
with self.assertRaises(DatabaseConnectionError) as cm:
broker.put_record('unwelcome')
self.assertIn('commits not accepted', str(cm.exception))
mock_commit_puts.assert_not_called()
with open(broker.pending_file, 'rb') as fd:
pending = fd.read()
self.assertFalse(pending)
if __name__ == '__main__':
unittest.main()

View File

@ -16,6 +16,8 @@
from __future__ import print_function
import unittest
from contextlib import contextmanager
import eventlet
import os
import logging
import errno
@ -26,6 +28,7 @@ from tempfile import mkdtemp, NamedTemporaryFile
import json
import mock
from copy import deepcopy
from mock import patch, call
from six.moves import reload_module
@ -37,6 +40,7 @@ from swift.common.exceptions import DriveNotMounted
from swift.common.swob import HTTPException
from test import unit
from test.unit import FakeLogger
from test.unit.common.test_db import ExampleBroker
@ -160,6 +164,11 @@ class ReplHttp(object):
self.set_status = set_status
replicated = False
host = 'localhost'
node = {
'ip': '127.0.0.1',
'port': '6000',
'device': 'sdb',
}
def replicate(self, *args):
self.replicated = True
@ -230,11 +239,27 @@ class FakeBroker(object):
'put_timestamp': 1,
'created_at': 1,
'count': 0,
'max_row': 99,
'id': 'ID',
'metadata': {}
})
if self.stub_replication_info:
info.update(self.stub_replication_info)
return info
def get_max_row(self, table=None):
return self.get_replication_info()['max_row']
def is_reclaimable(self, now, reclaim_age):
info = self.get_replication_info()
return info['count'] == 0 and (
(now - reclaim_age) >
info['delete_timestamp'] >
info['put_timestamp'])
def get_other_replication_items(self):
return None
def reclaim(self, item_timestamp, sync_timestamp):
pass
@ -273,6 +298,7 @@ class TestDBReplicator(unittest.TestCase):
self.recon_cache = mkdtemp()
rmtree(self.recon_cache, ignore_errors=1)
os.mkdir(self.recon_cache)
self.logger = unit.debug_logger('test-replicator')
def tearDown(self):
for patcher in self._patchers:
@ -287,6 +313,7 @@ class TestDBReplicator(unittest.TestCase):
def stub_delete_db(self, broker):
self.delete_db_calls.append('/path/to/file')
return True
def test_creation(self):
# later config should be extended to assert more config options
@ -647,11 +674,107 @@ class TestDBReplicator(unittest.TestCase):
})
def test_replicate_object(self):
# verify return values from replicate_object
db_replicator.ring = FakeRingWithNodes()
replicator = TestReplicator({})
replicator.delete_db = self.stub_delete_db
replicator._replicate_object('0', '/path/to/file', 'node_id')
self.assertEqual([], self.delete_db_calls)
db_path = '/path/to/file'
replicator = TestReplicator({}, logger=FakeLogger())
info = FakeBroker().get_replication_info()
# make remote appear to be in sync
rinfo = {'point': info['max_row'], 'id': 'remote_id'}
class FakeResponse(object):
def __init__(self, status, rinfo):
self._status = status
self.data = json.dumps(rinfo)
@property
def status(self):
if isinstance(self._status, (Exception, eventlet.Timeout)):
raise self._status
return self._status
# all requests fail
replicate = 'swift.common.db_replicator.ReplConnection.replicate'
with mock.patch(replicate) as fake_replicate:
fake_replicate.side_effect = [
FakeResponse(500, None),
FakeResponse(500, None),
FakeResponse(500, None)]
with mock.patch.object(replicator, 'delete_db') as mock_delete:
res = replicator._replicate_object('0', db_path, 'node_id')
self.assertRaises(StopIteration, next, fake_replicate.side_effect)
self.assertEqual((False, [False, False, False]), res)
self.assertEqual(0, mock_delete.call_count)
self.assertFalse(replicator.logger.get_lines_for_level('error'))
self.assertFalse(replicator.logger.get_lines_for_level('warning'))
replicator.logger.clear()
with mock.patch(replicate) as fake_replicate:
fake_replicate.side_effect = [
FakeResponse(Exception('ugh'), None),
FakeResponse(eventlet.Timeout(), None),
FakeResponse(200, rinfo)]
with mock.patch.object(replicator, 'delete_db') as mock_delete:
res = replicator._replicate_object('0', db_path, 'node_id')
self.assertRaises(StopIteration, next, fake_replicate.side_effect)
self.assertEqual((False, [False, False, True]), res)
self.assertEqual(0, mock_delete.call_count)
lines = replicator.logger.get_lines_for_level('error')
self.assertIn('ERROR syncing', lines[0])
self.assertIn('ERROR syncing', lines[1])
self.assertFalse(lines[2:])
self.assertFalse(replicator.logger.get_lines_for_level('warning'))
replicator.logger.clear()
# partial success
with mock.patch(replicate) as fake_replicate:
fake_replicate.side_effect = [
FakeResponse(200, rinfo),
FakeResponse(200, rinfo),
FakeResponse(500, None)]
with mock.patch.object(replicator, 'delete_db') as mock_delete:
res = replicator._replicate_object('0', db_path, 'node_id')
self.assertRaises(StopIteration, next, fake_replicate.side_effect)
self.assertEqual((False, [True, True, False]), res)
self.assertEqual(0, mock_delete.call_count)
self.assertFalse(replicator.logger.get_lines_for_level('error'))
self.assertFalse(replicator.logger.get_lines_for_level('warning'))
replicator.logger.clear()
# 507 triggers additional requests
with mock.patch(replicate) as fake_replicate:
fake_replicate.side_effect = [
FakeResponse(200, rinfo),
FakeResponse(200, rinfo),
FakeResponse(507, None),
FakeResponse(507, None),
FakeResponse(200, rinfo)]
with mock.patch.object(replicator, 'delete_db') as mock_delete:
res = replicator._replicate_object('0', db_path, 'node_id')
self.assertRaises(StopIteration, next, fake_replicate.side_effect)
self.assertEqual((False, [True, True, False, False, True]), res)
self.assertEqual(0, mock_delete.call_count)
lines = replicator.logger.get_lines_for_level('error')
self.assertIn('Remote drive not mounted', lines[0])
self.assertIn('Remote drive not mounted', lines[1])
self.assertFalse(lines[2:])
self.assertFalse(replicator.logger.get_lines_for_level('warning'))
replicator.logger.clear()
# all requests succeed; node id == 'node_id' causes node to be
# considered a handoff so expect the db to be deleted
with mock.patch(replicate) as fake_replicate:
fake_replicate.side_effect = [
FakeResponse(200, rinfo),
FakeResponse(200, rinfo),
FakeResponse(200, rinfo)]
with mock.patch.object(replicator, 'delete_db') as mock_delete:
res = replicator._replicate_object('0', db_path, 'node_id')
self.assertRaises(StopIteration, next, fake_replicate.side_effect)
self.assertEqual((True, [True, True, True]), res)
self.assertEqual(1, mock_delete.call_count)
self.assertFalse(replicator.logger.get_lines_for_level('error'))
self.assertFalse(replicator.logger.get_lines_for_level('warning'))
def test_replicate_object_quarantine(self):
replicator = TestReplicator({})
@ -695,8 +818,122 @@ class TestDBReplicator(unittest.TestCase):
replicator.brokerclass = FakeAccountBroker
replicator._repl_to_node = lambda *args: True
replicator.delete_db = self.stub_delete_db
replicator._replicate_object('0', '/path/to/file', 'node_id')
orig_cleanup = replicator.cleanup_post_replicate
with mock.patch.object(replicator, 'cleanup_post_replicate',
side_effect=orig_cleanup) as mock_cleanup:
replicator._replicate_object('0', '/path/to/file', 'node_id')
mock_cleanup.assert_called_once_with(mock.ANY, mock.ANY, [True] * 3)
self.assertIsInstance(mock_cleanup.call_args[0][0],
replicator.brokerclass)
self.assertEqual(['/path/to/file'], self.delete_db_calls)
self.assertEqual(0, replicator.stats['failure'])
def test_replicate_object_delete_delegated_to_cleanup_post_replicate(self):
replicator = TestReplicator({})
replicator.ring = FakeRingWithNodes().Ring('path')
replicator.brokerclass = FakeAccountBroker
replicator._repl_to_node = lambda *args: True
replicator.delete_db = self.stub_delete_db
# cleanup succeeds
with mock.patch.object(replicator, 'cleanup_post_replicate',
return_value=True) as mock_cleanup:
replicator._replicate_object('0', '/path/to/file', 'node_id')
mock_cleanup.assert_called_once_with(mock.ANY, mock.ANY, [True] * 3)
self.assertIsInstance(mock_cleanup.call_args[0][0],
replicator.brokerclass)
self.assertFalse(self.delete_db_calls)
self.assertEqual(0, replicator.stats['failure'])
self.assertEqual(3, replicator.stats['success'])
# cleanup fails
replicator._zero_stats()
with mock.patch.object(replicator, 'cleanup_post_replicate',
return_value=False) as mock_cleanup:
replicator._replicate_object('0', '/path/to/file', 'node_id')
mock_cleanup.assert_called_once_with(mock.ANY, mock.ANY, [True] * 3)
self.assertIsInstance(mock_cleanup.call_args[0][0],
replicator.brokerclass)
self.assertFalse(self.delete_db_calls)
self.assertEqual(3, replicator.stats['failure'])
self.assertEqual(0, replicator.stats['success'])
# shouldbehere True - cleanup not required
replicator._zero_stats()
primary_node_id = replicator.ring.get_part_nodes('0')[0]['id']
with mock.patch.object(replicator, 'cleanup_post_replicate',
return_value=True) as mock_cleanup:
replicator._replicate_object('0', '/path/to/file', primary_node_id)
mock_cleanup.assert_not_called()
self.assertFalse(self.delete_db_calls)
self.assertEqual(0, replicator.stats['failure'])
self.assertEqual(2, replicator.stats['success'])
def test_cleanup_post_replicate(self):
replicator = TestReplicator({}, logger=self.logger)
replicator.ring = FakeRingWithNodes().Ring('path')
broker = FakeBroker()
replicator._repl_to_node = lambda *args: True
info = broker.get_replication_info()
with mock.patch.object(replicator, 'delete_db') as mock_delete_db:
res = replicator.cleanup_post_replicate(
broker, info, [False] * 3)
mock_delete_db.assert_not_called()
self.assertTrue(res)
self.assertEqual(['Not deleting db %s (0/3 success)' % broker.db_file],
replicator.logger.get_lines_for_level('debug'))
replicator.logger.clear()
with mock.patch.object(replicator, 'delete_db') as mock_delete_db:
res = replicator.cleanup_post_replicate(
broker, info, [True, False, True])
mock_delete_db.assert_not_called()
self.assertTrue(res)
self.assertEqual(['Not deleting db %s (2/3 success)' % broker.db_file],
replicator.logger.get_lines_for_level('debug'))
replicator.logger.clear()
broker.stub_replication_info = {'max_row': 101}
with mock.patch.object(replicator, 'delete_db') as mock_delete_db:
res = replicator.cleanup_post_replicate(
broker, info, [True] * 3)
mock_delete_db.assert_not_called()
self.assertTrue(res)
self.assertEqual(['Not deleting db %s (2 new rows)' % broker.db_file],
replicator.logger.get_lines_for_level('debug'))
replicator.logger.clear()
broker.stub_replication_info = {'max_row': 98}
with mock.patch.object(replicator, 'delete_db') as mock_delete_db:
res = replicator.cleanup_post_replicate(
broker, info, [True] * 3)
mock_delete_db.assert_not_called()
self.assertTrue(res)
broker.stub_replication_info = None
self.assertEqual(['Not deleting db %s (negative max_row_delta: -1)' %
broker.db_file],
replicator.logger.get_lines_for_level('error'))
replicator.logger.clear()
with mock.patch.object(replicator, 'delete_db') as mock_delete_db:
res = replicator.cleanup_post_replicate(
broker, info, [True] * 3)
mock_delete_db.assert_called_once_with(broker)
self.assertTrue(res)
self.assertEqual(['Successfully deleted db %s' % broker.db_file],
replicator.logger.get_lines_for_level('debug'))
replicator.logger.clear()
with mock.patch.object(replicator, 'delete_db',
return_value=False) as mock_delete_db:
res = replicator.cleanup_post_replicate(
broker, info, [True] * 3)
mock_delete_db.assert_called_once_with(broker)
self.assertFalse(res)
self.assertEqual(['Failed to delete db %s' % broker.db_file],
replicator.logger.get_lines_for_level('debug'))
replicator.logger.clear()
def test_replicate_object_with_exception(self):
replicator = TestReplicator({})
@ -949,6 +1186,8 @@ class TestDBReplicator(unittest.TestCase):
response = rpc.dispatch(('drive', 'part', 'hash'),
['rsync_then_merge', 'arg1', 'arg2'])
expected_calls = [call('/part/ash/hash/hash.db'),
call('/drive/tmp/arg1'),
call(FakeBroker.db_file),
call('/drive/tmp/arg1')]
self.assertEqual(mock_os.path.exists.call_args_list,
expected_calls)
@ -1010,7 +1249,8 @@ class TestDBReplicator(unittest.TestCase):
def mock_renamer(old, new):
self.assertEqual('/drive/tmp/arg1', old)
self.assertEqual('/data/db.db', new)
# FakeBroker uses module filename as db_file!
self.assertEqual(__file__, new)
self._patch(patch.object, db_replicator, 'renamer', mock_renamer)
@ -1023,7 +1263,7 @@ class TestDBReplicator(unittest.TestCase):
self.assertEqual('204 No Content', response.status)
self.assertEqual(204, response.status_int)
def test_complete_rsync_db_does_not_exist(self):
def test_complete_rsync_db_exists(self):
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
mount_check=False)
@ -1740,7 +1980,7 @@ class TestReplToNode(unittest.TestCase):
def test_repl_to_node_300_status(self):
self.http = ReplHttp('{"id": 3, "point": -1}', set_status=300)
self.assertIsNone(self.replicator._repl_to_node(
self.assertFalse(self.replicator._repl_to_node(
self.fake_node, FakeBroker(), '0', self.fake_info))
def test_repl_to_node_not_response(self):
@ -1783,7 +2023,7 @@ class FakeHTTPResponse(object):
return self.resp.body
def attach_fake_replication_rpc(rpc, replicate_hook=None):
def attach_fake_replication_rpc(rpc, replicate_hook=None, errors=None):
class FakeReplConnection(object):
def __init__(self, node, partition, hash_, logger):
@ -1795,12 +2035,16 @@ def attach_fake_replication_rpc(rpc, replicate_hook=None):
def replicate(self, op, *sync_args):
print('REPLICATE: %s, %s, %r' % (self.path, op, sync_args))
replicate_args = self.path.lstrip('/').split('/')
args = [op] + list(sync_args)
with unit.mock_check_drive(isdir=not rpc.mount_check,
ismount=rpc.mount_check):
swob_response = rpc.dispatch(replicate_args, args)
resp = FakeHTTPResponse(swob_response)
resp = None
if errors and op in errors and errors[op]:
resp = errors[op].pop(0)
if not resp:
replicate_args = self.path.lstrip('/').split('/')
args = [op] + deepcopy(list(sync_args))
with unit.mock_check_drive(isdir=not rpc.mount_check,
ismount=rpc.mount_check):
swob_response = rpc.dispatch(replicate_args, args)
resp = FakeHTTPResponse(swob_response)
if replicate_hook:
replicate_hook(op, *sync_args)
return resp
@ -1872,15 +2116,19 @@ class TestReplicatorSync(unittest.TestCase):
conf.update(conf_updates)
return self.replicator_daemon(conf, logger=self.logger)
def _run_once(self, node, conf_updates=None, daemon=None):
daemon = daemon or self._get_daemon(node, conf_updates)
def _install_fake_rsync_file(self, daemon, captured_calls=None):
def _rsync_file(db_file, remote_file, **kwargs):
if captured_calls is not None:
captured_calls.append((db_file, remote_file, kwargs))
remote_server, remote_path = remote_file.split('/', 1)
dest_path = os.path.join(self.root, remote_path)
copy(db_file, dest_path)
return True
daemon._rsync_file = _rsync_file
def _run_once(self, node, conf_updates=None, daemon=None):
daemon = daemon or self._get_daemon(node, conf_updates)
self._install_fake_rsync_file(daemon)
with mock.patch('swift.common.db_replicator.whataremyips',
new=lambda *a, **kw: [node['replication_ip']]), \
unit.mock_check_drive(isdir=not daemon.mount_check,

View File

@ -1454,6 +1454,15 @@ class TestUtils(unittest.TestCase):
with open(testcache_file) as fd:
file_dict = json.loads(fd.readline())
self.assertEqual(expect_dict, file_dict)
# nested dict items are not sticky
submit_dict = {'key1': {'key2': {'value3': 3}}}
expect_dict = {'key0': 101,
'key1': {'key2': {'value3': 3},
'value1': 1, 'value2': 2}}
utils.dump_recon_cache(submit_dict, testcache_file, logger)
with open(testcache_file) as fd:
file_dict = json.loads(fd.readline())
self.assertEqual(expect_dict, file_dict)
# cached entries are sticky
submit_dict = {}
utils.dump_recon_cache(submit_dict, testcache_file, logger)

View File

@ -1270,9 +1270,10 @@ class TestWorkersStrategy(unittest.TestCase):
pid += 1
sock_count += 1
mypid = os.getpid()
self.assertEqual([
'Started child %s' % 88,
'Started child %s' % 89,
'Started child %s from parent %s' % (88, mypid),
'Started child %s from parent %s' % (89, mypid),
], self.logger.get_lines_for_level('notice'))
self.assertEqual(2, sock_count)
@ -1282,7 +1283,7 @@ class TestWorkersStrategy(unittest.TestCase):
self.strategy.register_worker_exit(88)
self.assertEqual([
'Removing dead child %s' % 88,
'Removing dead child %s from parent %s' % (88, mypid)
], self.logger.get_lines_for_level('error'))
for s, i in self.strategy.new_worker_socks():
@ -1294,9 +1295,9 @@ class TestWorkersStrategy(unittest.TestCase):
self.assertEqual(1, sock_count)
self.assertEqual([
'Started child %s' % 88,
'Started child %s' % 89,
'Started child %s' % 90,
'Started child %s from parent %s' % (88, mypid),
'Started child %s from parent %s' % (89, mypid),
'Started child %s from parent %s' % (90, mypid),
], self.logger.get_lines_for_level('notice'))
def test_post_fork_hook(self):

View File

@ -20,7 +20,6 @@ import hashlib
import unittest
from time import sleep, time
from uuid import uuid4
import itertools
import random
from collections import defaultdict
from contextlib import contextmanager
@ -30,7 +29,7 @@ import json
from swift.container.backend import ContainerBroker, \
update_new_item_from_existing
from swift.common.utils import Timestamp, encode_timestamps
from swift.common.utils import Timestamp, encode_timestamps, hash_path
from swift.common.storage_policy import POLICIES
import mock
@ -46,7 +45,7 @@ class TestContainerBroker(unittest.TestCase):
def test_creation(self):
# Test ContainerBroker.__init__
broker = ContainerBroker(':memory:', account='a', container='c')
self.assertEqual(broker.db_file, ':memory:')
self.assertEqual(broker._db_file, ':memory:')
broker.initialize(Timestamp('1').internal, 0)
with broker.get() as conn:
curs = conn.cursor()
@ -55,11 +54,11 @@ class TestContainerBroker(unittest.TestCase):
@patch_policies
def test_storage_policy_property(self):
ts = (Timestamp(t).internal for t in itertools.count(int(time())))
ts = make_timestamp_iter()
for policy in POLICIES:
broker = ContainerBroker(':memory:', account='a',
container='policy_%s' % policy.name)
broker.initialize(next(ts), policy.idx)
broker.initialize(next(ts).internal, policy.idx)
with broker.get() as conn:
try:
conn.execute('''SELECT storage_policy_index
@ -165,17 +164,17 @@ class TestContainerBroker(unittest.TestCase):
broker.delete_db(Timestamp.now().internal)
def test_get_info_is_deleted(self):
start = int(time())
ts = (Timestamp(t).internal for t in itertools.count(start))
ts = make_timestamp_iter()
start = next(ts)
broker = ContainerBroker(':memory:', account='test_account',
container='test_container')
# create it
broker.initialize(next(ts), POLICIES.default.idx)
broker.initialize(start.internal, POLICIES.default.idx)
info, is_deleted = broker.get_info_is_deleted()
self.assertEqual(is_deleted, broker.is_deleted())
self.assertEqual(is_deleted, False) # sanity
self.assertEqual(info, broker.get_info())
self.assertEqual(info['put_timestamp'], Timestamp(start).internal)
self.assertEqual(info['put_timestamp'], start.internal)
self.assertTrue(Timestamp(info['created_at']) >= start)
self.assertEqual(info['delete_timestamp'], '0')
if self.__class__ in (TestContainerBrokerBeforeMetadata,
@ -184,28 +183,28 @@ class TestContainerBroker(unittest.TestCase):
self.assertEqual(info['status_changed_at'], '0')
else:
self.assertEqual(info['status_changed_at'],
Timestamp(start).internal)
start.internal)
# delete it
delete_timestamp = next(ts)
broker.delete_db(delete_timestamp)
broker.delete_db(delete_timestamp.internal)
info, is_deleted = broker.get_info_is_deleted()
self.assertEqual(is_deleted, True) # sanity
self.assertEqual(is_deleted, broker.is_deleted())
self.assertEqual(info, broker.get_info())
self.assertEqual(info['put_timestamp'], Timestamp(start).internal)
self.assertEqual(info['put_timestamp'], start.internal)
self.assertTrue(Timestamp(info['created_at']) >= start)
self.assertEqual(info['delete_timestamp'], delete_timestamp)
self.assertEqual(info['status_changed_at'], delete_timestamp)
# bring back to life
broker.put_object('obj', next(ts), 0, 'text/plain', 'etag',
broker.put_object('obj', next(ts).internal, 0, 'text/plain', 'etag',
storage_policy_index=broker.storage_policy_index)
info, is_deleted = broker.get_info_is_deleted()
self.assertEqual(is_deleted, False) # sanity
self.assertEqual(is_deleted, broker.is_deleted())
self.assertEqual(info, broker.get_info())
self.assertEqual(info['put_timestamp'], Timestamp(start).internal)
self.assertEqual(info['put_timestamp'], start.internal)
self.assertTrue(Timestamp(info['created_at']) >= start)
self.assertEqual(info['delete_timestamp'], delete_timestamp)
self.assertEqual(info['status_changed_at'], delete_timestamp)
@ -559,7 +558,7 @@ class TestContainerBroker(unittest.TestCase):
"SELECT deleted FROM object").fetchone()[0], deleted)
def _test_put_object_multiple_encoded_timestamps(self, broker):
ts = (Timestamp(t) for t in itertools.count(int(time())))
ts = make_timestamp_iter()
broker.initialize(next(ts).internal, 0)
t = [next(ts) for _ in range(9)]
@ -629,7 +628,7 @@ class TestContainerBroker(unittest.TestCase):
self._test_put_object_multiple_encoded_timestamps(broker)
def _test_put_object_multiple_explicit_timestamps(self, broker):
ts = (Timestamp(t) for t in itertools.count(int(time())))
ts = make_timestamp_iter()
broker.initialize(next(ts).internal, 0)
t = [next(ts) for _ in range(11)]
@ -733,7 +732,7 @@ class TestContainerBroker(unittest.TestCase):
def test_last_modified_time(self):
# Test container listing reports the most recent of data or metadata
# timestamp as last-modified time
ts = (Timestamp(t) for t in itertools.count(int(time())))
ts = make_timestamp_iter()
broker = ContainerBroker(':memory:', account='a', container='c')
broker.initialize(next(ts).internal, 0)
@ -786,18 +785,17 @@ class TestContainerBroker(unittest.TestCase):
@patch_policies
def test_put_misplaced_object_does_not_effect_container_stats(self):
policy = random.choice(list(POLICIES))
ts = (Timestamp(t).internal for t in
itertools.count(int(time())))
ts = make_timestamp_iter()
broker = ContainerBroker(':memory:',
account='a', container='c')
broker.initialize(next(ts), policy.idx)
broker.initialize(next(ts).internal, policy.idx)
# migration tests may not honor policy on initialize
if isinstance(self, ContainerBrokerMigrationMixin):
real_storage_policy_index = \
broker.get_info()['storage_policy_index']
policy = [p for p in POLICIES
if p.idx == real_storage_policy_index][0]
broker.put_object('correct_o', next(ts), 123, 'text/plain',
broker.put_object('correct_o', next(ts).internal, 123, 'text/plain',
'5af83e3196bf99f440f31f2e1a6c9afe',
storage_policy_index=policy.idx)
info = broker.get_info()
@ -805,7 +803,7 @@ class TestContainerBroker(unittest.TestCase):
self.assertEqual(123, info['bytes_used'])
other_policy = random.choice([p for p in POLICIES
if p is not policy])
broker.put_object('wrong_o', next(ts), 123, 'text/plain',
broker.put_object('wrong_o', next(ts).internal, 123, 'text/plain',
'5af83e3196bf99f440f31f2e1a6c9afe',
storage_policy_index=other_policy.idx)
self.assertEqual(1, info['object_count'])
@ -814,23 +812,22 @@ class TestContainerBroker(unittest.TestCase):
@patch_policies
def test_has_multiple_policies(self):
policy = random.choice(list(POLICIES))
ts = (Timestamp(t).internal for t in
itertools.count(int(time())))
ts = make_timestamp_iter()
broker = ContainerBroker(':memory:',
account='a', container='c')
broker.initialize(next(ts), policy.idx)
broker.initialize(next(ts).internal, policy.idx)
# migration tests may not honor policy on initialize
if isinstance(self, ContainerBrokerMigrationMixin):
real_storage_policy_index = \
broker.get_info()['storage_policy_index']
policy = [p for p in POLICIES
if p.idx == real_storage_policy_index][0]
broker.put_object('correct_o', next(ts), 123, 'text/plain',
broker.put_object('correct_o', next(ts).internal, 123, 'text/plain',
'5af83e3196bf99f440f31f2e1a6c9afe',
storage_policy_index=policy.idx)
self.assertFalse(broker.has_multiple_policies())
other_policy = [p for p in POLICIES if p is not policy][0]
broker.put_object('wrong_o', next(ts), 123, 'text/plain',
broker.put_object('wrong_o', next(ts).internal, 123, 'text/plain',
'5af83e3196bf99f440f31f2e1a6c9afe',
storage_policy_index=other_policy.idx)
self.assertTrue(broker.has_multiple_policies())
@ -838,11 +835,10 @@ class TestContainerBroker(unittest.TestCase):
@patch_policies
def test_get_policy_info(self):
policy = random.choice(list(POLICIES))
ts = (Timestamp(t).internal for t in
itertools.count(int(time())))
ts = make_timestamp_iter()
broker = ContainerBroker(':memory:',
account='a', container='c')
broker.initialize(next(ts), policy.idx)
broker.initialize(next(ts).internal, policy.idx)
# migration tests may not honor policy on initialize
if isinstance(self, ContainerBrokerMigrationMixin):
real_storage_policy_index = \
@ -854,7 +850,7 @@ class TestContainerBroker(unittest.TestCase):
self.assertEqual(policy_stats, expected)
# add an object
broker.put_object('correct_o', next(ts), 123, 'text/plain',
broker.put_object('correct_o', next(ts).internal, 123, 'text/plain',
'5af83e3196bf99f440f31f2e1a6c9afe',
storage_policy_index=policy.idx)
policy_stats = broker.get_policy_stats()
@ -864,7 +860,7 @@ class TestContainerBroker(unittest.TestCase):
# add a misplaced object
other_policy = random.choice([p for p in POLICIES
if p is not policy])
broker.put_object('wrong_o', next(ts), 123, 'text/plain',
broker.put_object('wrong_o', next(ts).internal, 123, 'text/plain',
'5af83e3196bf99f440f31f2e1a6c9afe',
storage_policy_index=other_policy.idx)
policy_stats = broker.get_policy_stats()
@ -876,15 +872,14 @@ class TestContainerBroker(unittest.TestCase):
@patch_policies
def test_policy_stat_tracking(self):
ts = (Timestamp(t).internal for t in
itertools.count(int(time())))
ts = make_timestamp_iter()
broker = ContainerBroker(':memory:',
account='a', container='c')
# Note: in subclasses of this TestCase that inherit the
# ContainerBrokerMigrationMixin, passing POLICIES.default.idx here has
# no effect and broker.get_policy_stats() returns a dict with a single
# entry mapping policy index 0 to the container stats
broker.initialize(next(ts), POLICIES.default.idx)
broker.initialize(next(ts).internal, POLICIES.default.idx)
stats = defaultdict(dict)
def assert_empty_default_policy_stats(policy_stats):
@ -904,7 +899,7 @@ class TestContainerBroker(unittest.TestCase):
policy_index = random.randint(0, iters * 0.1)
name = 'object-%s' % random.randint(0, iters * 0.1)
size = random.randint(0, iters)
broker.put_object(name, next(ts), size, 'text/plain',
broker.put_object(name, next(ts).internal, size, 'text/plain',
'5af83e3196bf99f440f31f2e1a6c9afe',
storage_policy_index=policy_index)
# track the size of the latest timestamp put for each object
@ -1930,12 +1925,11 @@ class TestContainerBroker(unittest.TestCase):
self.assertEqual(rec['content_type'], 'text/plain')
def test_set_storage_policy_index(self):
ts = (Timestamp(t).internal for t in
itertools.count(int(time())))
ts = make_timestamp_iter()
broker = ContainerBroker(':memory:', account='test_account',
container='test_container')
timestamp = next(ts)
broker.initialize(timestamp, 0)
broker.initialize(timestamp.internal, 0)
info = broker.get_info()
self.assertEqual(0, info['storage_policy_index']) # sanity check
@ -1946,39 +1940,40 @@ class TestContainerBroker(unittest.TestCase):
TestContainerBrokerBeforeSPI):
self.assertEqual(info['status_changed_at'], '0')
else:
self.assertEqual(timestamp, info['status_changed_at'])
self.assertEqual(timestamp.internal, info['status_changed_at'])
expected = {0: {'object_count': 0, 'bytes_used': 0}}
self.assertEqual(expected, broker.get_policy_stats())
timestamp = next(ts)
broker.set_storage_policy_index(111, timestamp)
broker.set_storage_policy_index(111, timestamp.internal)
self.assertEqual(broker.storage_policy_index, 111)
info = broker.get_info()
self.assertEqual(111, info['storage_policy_index'])
self.assertEqual(0, info['object_count'])
self.assertEqual(0, info['bytes_used'])
self.assertEqual(timestamp, info['status_changed_at'])
self.assertEqual(timestamp.internal, info['status_changed_at'])
expected[111] = {'object_count': 0, 'bytes_used': 0}
self.assertEqual(expected, broker.get_policy_stats())
timestamp = next(ts)
broker.set_storage_policy_index(222, timestamp)
broker.set_storage_policy_index(222, timestamp.internal)
self.assertEqual(broker.storage_policy_index, 222)
info = broker.get_info()
self.assertEqual(222, info['storage_policy_index'])
self.assertEqual(0, info['object_count'])
self.assertEqual(0, info['bytes_used'])
self.assertEqual(timestamp, info['status_changed_at'])
self.assertEqual(timestamp.internal, info['status_changed_at'])
expected[222] = {'object_count': 0, 'bytes_used': 0}
self.assertEqual(expected, broker.get_policy_stats())
old_timestamp, timestamp = timestamp, next(ts)
broker.set_storage_policy_index(222, timestamp) # it's idempotent
# setting again is idempotent
broker.set_storage_policy_index(222, timestamp.internal)
info = broker.get_info()
self.assertEqual(222, info['storage_policy_index'])
self.assertEqual(0, info['object_count'])
self.assertEqual(0, info['bytes_used'])
self.assertEqual(old_timestamp, info['status_changed_at'])
self.assertEqual(old_timestamp.internal, info['status_changed_at'])
self.assertEqual(expected, broker.get_policy_stats())
def test_set_storage_policy_index_empty(self):
@ -2004,19 +1999,18 @@ class TestContainerBroker(unittest.TestCase):
@with_tempdir
def test_legacy_pending_files(self, tempdir):
ts = (Timestamp(t).internal for t in
itertools.count(int(time())))
ts = make_timestamp_iter()
db_path = os.path.join(tempdir, 'container.db')
# first init an acct DB without the policy_stat table present
broker = ContainerBroker(db_path, account='a', container='c')
broker.initialize(next(ts), 1)
broker.initialize(next(ts).internal, 1)
# manually make some pending entries lacking storage_policy_index
with open(broker.pending_file, 'a+b') as fp:
for i in range(10):
name, timestamp, size, content_type, etag, deleted = (
'o%s' % i, next(ts), 0, 'c', 'e', 0)
'o%s' % i, next(ts).internal, 0, 'c', 'e', 0)
fp.write(':')
fp.write(pickle.dumps(
(name, timestamp, size, content_type, etag, deleted),
@ -2033,7 +2027,7 @@ class TestContainerBroker(unittest.TestCase):
else:
size = 2
storage_policy_index = 1
broker.put_object(name, next(ts), size, 'c', 'e', 0,
broker.put_object(name, next(ts).internal, size, 'c', 'e', 0,
storage_policy_index=storage_policy_index)
broker._commit_puts_stale_ok()
@ -2049,8 +2043,7 @@ class TestContainerBroker(unittest.TestCase):
@with_tempdir
def test_get_info_no_stale_reads(self, tempdir):
ts = (Timestamp(t).internal for t in
itertools.count(int(time())))
ts = make_timestamp_iter()
db_path = os.path.join(tempdir, 'container.db')
def mock_commit_puts():
@ -2058,13 +2051,13 @@ class TestContainerBroker(unittest.TestCase):
broker = ContainerBroker(db_path, account='a', container='c',
stale_reads_ok=False)
broker.initialize(next(ts), 1)
broker.initialize(next(ts).internal, 1)
# manually make some pending entries
with open(broker.pending_file, 'a+b') as fp:
for i in range(10):
name, timestamp, size, content_type, etag, deleted = (
'o%s' % i, next(ts), 0, 'c', 'e', 0)
'o%s' % i, next(ts).internal, 0, 'c', 'e', 0)
fp.write(':')
fp.write(pickle.dumps(
(name, timestamp, size, content_type, etag, deleted),
@ -2079,8 +2072,7 @@ class TestContainerBroker(unittest.TestCase):
@with_tempdir
def test_get_info_stale_read_ok(self, tempdir):
ts = (Timestamp(t).internal for t in
itertools.count(int(time())))
ts = make_timestamp_iter()
db_path = os.path.join(tempdir, 'container.db')
def mock_commit_puts():
@ -2088,13 +2080,13 @@ class TestContainerBroker(unittest.TestCase):
broker = ContainerBroker(db_path, account='a', container='c',
stale_reads_ok=True)
broker.initialize(next(ts), 1)
broker.initialize(next(ts).internal, 1)
# manually make some pending entries
with open(broker.pending_file, 'a+b') as fp:
for i in range(10):
name, timestamp, size, content_type, etag, deleted = (
'o%s' % i, next(ts), 0, 'c', 'e', 0)
'o%s' % i, next(ts).internal, 0, 'c', 'e', 0)
fp.write(':')
fp.write(pickle.dumps(
(name, timestamp, size, content_type, etag, deleted),
@ -2104,6 +2096,26 @@ class TestContainerBroker(unittest.TestCase):
broker._commit_puts = mock_commit_puts
broker.get_info()
@with_tempdir
def test_create_broker(self, tempdir):
broker = ContainerBroker.create_broker(tempdir, 0, 'a', 'c')
hsh = hash_path('a', 'c')
expected_path = os.path.join(
tempdir, 'containers', '0', hsh[-3:], hsh, hsh + '.db')
self.assertEqual(expected_path, broker.db_file)
self.assertTrue(os.path.isfile(expected_path))
ts = Timestamp.now()
broker = ContainerBroker.create_broker(tempdir, 0, 'a', 'c1',
put_timestamp=ts.internal)
hsh = hash_path('a', 'c1')
expected_path = os.path.join(
tempdir, 'containers', '0', hsh[-3:], hsh, hsh + '.db')
self.assertEqual(expected_path, broker.db_file)
self.assertTrue(os.path.isfile(expected_path))
self.assertEqual(ts.internal, broker.get_info()['put_timestamp'])
self.assertEqual(0, broker.get_info()['storage_policy_index'])
class TestCommonContainerBroker(test_db.TestExampleBroker):

View File

@ -424,7 +424,7 @@ class TestContainerController(unittest.TestCase):
elif state[0] == 'race':
# Save the original db_file attribute value
self._saved_db_file = self.db_file
self.db_file += '.doesnotexist'
self._db_file += '.doesnotexist'
def initialize(self, *args, **kwargs):
if state[0] == 'initial':
@ -433,7 +433,7 @@ class TestContainerController(unittest.TestCase):
elif state[0] == 'race':
# Restore the original db_file attribute to get the race
# behavior
self.db_file = self._saved_db_file
self._db_file = self._saved_db_file
return super(InterceptedCoBr, self).initialize(*args, **kwargs)
with mock.patch("swift.container.server.ContainerBroker",