# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Pluggable Back-ends for Container Server
"""
import errno
import os
from uuid import uuid4
import six
from six.moves import range
from six.moves.urllib.parse import unquote
import sqlite3
from eventlet import tpool
from swift.common.constraints import CONTAINER_LISTING_LIMIT
from swift.common.exceptions import LockTimeout
from swift.common.utils import Timestamp, encode_timestamps, \
decode_timestamps, extract_swift_bytes, storage_directory, hash_path, \
ShardRange, renamer, find_shard_range, MD5_OF_EMPTY_STRING, mkdirs, \
get_db_files, parse_db_filename, make_db_file_path, split_path, \
RESERVED_BYTE
from swift.common.db import DatabaseBroker, utf8encode, BROKER_TIMEOUT, \
zero_like, DatabaseAlreadyExists, SQLITE_ARG_LIMIT
DATADIR = 'containers'
RECORD_TYPE_OBJECT = 'object'
RECORD_TYPE_SHARD = 'shard'
SHARD_RANGE_TABLE = 'shard_range'
NOTFOUND = 'not_found'
UNSHARDED = 'unsharded'
SHARDING = 'sharding'
SHARDED = 'sharded'
COLLAPSED = 'collapsed'
SHARD_STATS_STATES = [ShardRange.ACTIVE, ShardRange.SHARDING,
ShardRange.SHRINKING]
SHARD_LISTING_STATES = SHARD_STATS_STATES + [ShardRange.CLEAVED]
SHARD_UPDATE_STATES = [ShardRange.CREATED, ShardRange.CLEAVED,
ShardRange.ACTIVE, ShardRange.SHARDING]
# attribute names in order used when transforming shard ranges from dicts to
# tuples and vice-versa
SHARD_RANGE_KEYS = ('name', 'timestamp', 'lower', 'upper', 'object_count',
'bytes_used', 'meta_timestamp', 'deleted', 'state',
'state_timestamp', 'epoch')
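# As an illustrative sketch (not part of the module logic): a row fetched from
# the shard_range table can be converted to a dict, and back to a tuple, by
# pairing it with SHARD_RANGE_KEYS, e.g.:
#
#     shard_dict = dict(zip(SHARD_RANGE_KEYS, row))
#     row = tuple(shard_dict[k] for k in SHARD_RANGE_KEYS)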
POLICY_STAT_TABLE_CREATE = '''
CREATE TABLE policy_stat (
storage_policy_index INTEGER PRIMARY KEY,
object_count INTEGER DEFAULT 0,
bytes_used INTEGER DEFAULT 0
);
'''
POLICY_STAT_TRIGGER_SCRIPT = '''
CREATE TRIGGER object_insert_policy_stat AFTER INSERT ON object
BEGIN
UPDATE policy_stat
SET object_count = object_count + (1 - new.deleted),
bytes_used = bytes_used + new.size
WHERE storage_policy_index = new.storage_policy_index;
INSERT INTO policy_stat (
storage_policy_index, object_count, bytes_used)
SELECT new.storage_policy_index,
(1 - new.deleted),
new.size
WHERE NOT EXISTS(
SELECT changes() as change
FROM policy_stat
WHERE change <> 0
);
UPDATE container_info
SET hash = chexor(hash, new.name, new.created_at);
END;
CREATE TRIGGER object_delete_policy_stat AFTER DELETE ON object
BEGIN
UPDATE policy_stat
SET object_count = object_count - (1 - old.deleted),
bytes_used = bytes_used - old.size
WHERE storage_policy_index = old.storage_policy_index;
UPDATE container_info
SET hash = chexor(hash, old.name, old.created_at);
END;
'''
CONTAINER_INFO_TABLE_SCRIPT = '''
CREATE TABLE container_info (
account TEXT,
container TEXT,
created_at TEXT,
put_timestamp TEXT DEFAULT '0',
delete_timestamp TEXT DEFAULT '0',
reported_put_timestamp TEXT DEFAULT '0',
reported_delete_timestamp TEXT DEFAULT '0',
reported_object_count INTEGER DEFAULT 0,
reported_bytes_used INTEGER DEFAULT 0,
hash TEXT default '00000000000000000000000000000000',
id TEXT,
status TEXT DEFAULT '',
status_changed_at TEXT DEFAULT '0',
metadata TEXT DEFAULT '',
x_container_sync_point1 INTEGER DEFAULT -1,
x_container_sync_point2 INTEGER DEFAULT -1,
storage_policy_index INTEGER DEFAULT 0,
reconciler_sync_point INTEGER DEFAULT -1
);
'''
CONTAINER_STAT_VIEW_SCRIPT = '''
CREATE VIEW container_stat
AS SELECT ci.account, ci.container, ci.created_at,
ci.put_timestamp, ci.delete_timestamp,
ci.reported_put_timestamp, ci.reported_delete_timestamp,
ci.reported_object_count, ci.reported_bytes_used, ci.hash,
ci.id, ci.status, ci.status_changed_at, ci.metadata,
ci.x_container_sync_point1, ci.x_container_sync_point2,
ci.reconciler_sync_point,
ci.storage_policy_index,
coalesce(ps.object_count, 0) AS object_count,
coalesce(ps.bytes_used, 0) AS bytes_used
FROM container_info ci LEFT JOIN policy_stat ps
ON ci.storage_policy_index = ps.storage_policy_index;
CREATE TRIGGER container_stat_update
INSTEAD OF UPDATE ON container_stat
BEGIN
UPDATE container_info
SET account = NEW.account,
container = NEW.container,
created_at = NEW.created_at,
put_timestamp = NEW.put_timestamp,
delete_timestamp = NEW.delete_timestamp,
reported_put_timestamp = NEW.reported_put_timestamp,
reported_delete_timestamp = NEW.reported_delete_timestamp,
reported_object_count = NEW.reported_object_count,
reported_bytes_used = NEW.reported_bytes_used,
hash = NEW.hash,
id = NEW.id,
status = NEW.status,
status_changed_at = NEW.status_changed_at,
metadata = NEW.metadata,
x_container_sync_point1 = NEW.x_container_sync_point1,
x_container_sync_point2 = NEW.x_container_sync_point2,
storage_policy_index = NEW.storage_policy_index,
reconciler_sync_point = NEW.reconciler_sync_point;
END;
'''
def update_new_item_from_existing(new_item, existing):
"""
Compare the data and meta related timestamps of a new object item with
the timestamps of an existing object record, and update the new item
with data and/or meta related attributes from the existing record if
their timestamps are newer.
The multiple timestamps are encoded into a single string for storing
in the 'created_at' column of the objects db table.
:param new_item: A dict of object update attributes
:param existing: A dict of existing object attributes
:return: True if any attributes of the new item dict were found to be
newer than the existing and therefore not updated, otherwise
False implying that the updated item is equal to the existing.
"""
# item[created_at] may be updated so keep a copy of the original
# value in case we process this item again
new_item.setdefault('data_timestamp', new_item['created_at'])
# content-type and metadata timestamps may be encoded in
# item[created_at], or may be set explicitly.
item_ts_data, item_ts_ctype, item_ts_meta = decode_timestamps(
new_item['data_timestamp'])
if new_item.get('ctype_timestamp'):
item_ts_ctype = Timestamp(new_item.get('ctype_timestamp'))
item_ts_meta = item_ts_ctype
if new_item.get('meta_timestamp'):
item_ts_meta = Timestamp(new_item.get('meta_timestamp'))
if not existing:
# encode new_item timestamps into one string for db record
new_item['created_at'] = encode_timestamps(
item_ts_data, item_ts_ctype, item_ts_meta)
return True
# decode existing timestamp into separate data, content-type and
# metadata timestamps
rec_ts_data, rec_ts_ctype, rec_ts_meta = decode_timestamps(
existing['created_at'])
# Extract any swift_bytes values from the content_type values. This is
# necessary because the swift_bytes value to persist should be that at the
# most recent data timestamp whereas the content-type value to persist is
# that at the most recent content-type timestamp. The two values happen to
# be stored in the same database column for historical reasons.
for item in (new_item, existing):
content_type, swift_bytes = extract_swift_bytes(item['content_type'])
item['content_type'] = content_type
item['swift_bytes'] = swift_bytes
newer_than_existing = [True, True, True]
if rec_ts_data >= item_ts_data:
# apply data attributes from existing record
new_item.update([(k, existing[k])
for k in ('size', 'etag', 'deleted', 'swift_bytes')])
item_ts_data = rec_ts_data
newer_than_existing[0] = False
if rec_ts_ctype >= item_ts_ctype:
# apply content-type attribute from existing record
new_item['content_type'] = existing['content_type']
item_ts_ctype = rec_ts_ctype
newer_than_existing[1] = False
if rec_ts_meta >= item_ts_meta:
# apply metadata timestamp from existing record
item_ts_meta = rec_ts_meta
newer_than_existing[2] = False
# encode updated timestamps into one string for db record
new_item['created_at'] = encode_timestamps(
item_ts_data, item_ts_ctype, item_ts_meta)
# append the most recent swift_bytes onto the most recent content_type in
# new_item and restore existing to its original state
for item in (new_item, existing):
if item['swift_bytes']:
item['content_type'] += ';swift_bytes=%s' % item['swift_bytes']
del item['swift_bytes']
return any(newer_than_existing)
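# Illustrative sketch (comment only; the timestamps and values are
# hypothetical): given an incoming update and an existing record for the same
# name, update_new_item_from_existing() merges them attribute by attribute.
#
#     new_item = {'name': 'o', 'created_at': t_new.internal, 'size': 0,
#                 'content_type': 'text/plain', 'etag': MD5_OF_EMPTY_STRING,
#                 'deleted': 0}
#     existing = {'name': 'o', 'created_at': t_old.internal, 'size': 10,
#                 'content_type': 'application/x-old', 'etag': 'oldetag',
#                 'deleted': 0}
#     changed = update_new_item_from_existing(new_item, existing)
#     # if t_new > t_old, changed is True and new_item keeps its own data
#     # attributes; otherwise the existing record's attributes are copied
#     # into new_item and its created_at is re-encoded accordingly.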
def merge_shards(shard_data, existing):
"""
Compares ``shard_data`` with ``existing`` and updates ``shard_data`` with
any items of ``existing`` that take precedence over the corresponding item
in ``shard_data``.
:param shard_data: a dict representation of shard range that may be
modified by this method.
:param existing: a dict representation of shard range.
:returns: True if ``shard_data`` has any item(s) that are considered to
take precedence over the corresponding item in ``existing``
"""
if not existing:
return True
if existing['timestamp'] < shard_data['timestamp']:
# note that currently we do not roll forward any meta or state from
# an item that was created at older time, newer created time trumps
return True
elif existing['timestamp'] > shard_data['timestamp']:
return False
new_content = False
# timestamp must be the same, so preserve existing range bounds and deleted
for k in ('lower', 'upper', 'deleted'):
shard_data[k] = existing[k]
# now we need to look for meta data updates
if existing['meta_timestamp'] >= shard_data['meta_timestamp']:
for k in ('object_count', 'bytes_used', 'meta_timestamp'):
shard_data[k] = existing[k]
else:
new_content = True
if (existing['state_timestamp'] == shard_data['state_timestamp']
and shard_data['state'] > existing['state']):
new_content = True
elif existing['state_timestamp'] >= shard_data['state_timestamp']:
for k in ('state', 'state_timestamp', 'epoch'):
shard_data[k] = existing[k]
else:
new_content = True
return new_content
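# Illustrative precedence sketch (comment only; t1 < t2 are hypothetical
# timestamps, and state/state_timestamp are assumed equal in both dicts):
#
#     incoming = {..., 'timestamp': t1, 'meta_timestamp': t1, ...}
#     existing = {..., 'timestamp': t1, 'meta_timestamp': t2, ...}
#     merge_shards(incoming, existing)
#     # returns False: the timestamps match, so the existing bounds and
#     # deleted flag are preserved, and the existing (newer) object_count,
#     # bytes_used and meta_timestamp are copied into 'incoming'.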
class ContainerBroker(DatabaseBroker):
"""
Encapsulates working with a container database.
Note that this may involve multiple on-disk DB files if the container
becomes sharded:
* :attr:`_db_file` is the path to the legacy container DB name, i.e.
``<hash>.db``. This file should exist for an initialised broker that
has never been sharded, but will not exist once a container has been
sharded.
* :attr:`db_files` is a list of existing db files for the broker. This
list should have at least one entry for an initialised broker, and
should have two entries while a broker is in SHARDING state.
* :attr:`db_file` is the path to whichever db is currently authoritative
for the container. Depending on the container's state, this may not be
the same as the ``db_file`` argument given to :meth:`~__init__`, unless
``force_db_file`` is True in which case :attr:`db_file` is always equal
to the ``db_file`` argument given to :meth:`~__init__`.
* :attr:`pending_file` is always equal to :attr:`_db_file` extended with
``.pending``, i.e. ``<hash>.db.pending``.
"""
db_type = 'container'
db_contains_type = 'object'
db_reclaim_timestamp = 'created_at'
def __init__(self, db_file, timeout=BROKER_TIMEOUT, logger=None,
account=None, container=None, pending_timeout=None,
stale_reads_ok=False, skip_commits=False,
force_db_file=False):
self._init_db_file = db_file
if db_file == ':memory:':
base_db_file = db_file
else:
base_db_file = make_db_file_path(db_file, None)
super(ContainerBroker, self).__init__(
base_db_file, timeout, logger, account, container, pending_timeout,
stale_reads_ok, skip_commits=skip_commits)
# the root account and container are populated on demand
self._root_account = self._root_container = None
self._force_db_file = force_db_file
self._db_files = None
@classmethod
def create_broker(cls, device_path, part, account, container, logger=None,
epoch=None, put_timestamp=None,
storage_policy_index=None):
"""
Create a ContainerBroker instance. If the db doesn't exist, initialize
the db file.
:param device_path: device path
:param part: partition number
:param account: account name string
:param container: container name string
:param logger: a logger instance
:param epoch: a timestamp to include in the db filename
:param put_timestamp: initial timestamp if broker needs to be
initialized
:param storage_policy_index: the storage policy index
:return: a :class:`swift.container.backend.ContainerBroker` instance
"""
hsh = hash_path(account, container)
db_dir = storage_directory(DATADIR, part, hsh)
db_path = make_db_file_path(
os.path.join(device_path, db_dir, hsh + '.db'), epoch)
broker = ContainerBroker(db_path, account=account, container=container,
logger=logger)
if not os.path.exists(broker.db_file):
try:
broker.initialize(put_timestamp, storage_policy_index)
except DatabaseAlreadyExists:
pass
return broker
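# Illustrative usage (comment only; the device path and partition number are
# hypothetical):
#
#     broker = ContainerBroker.create_broker(
#         '/srv/node/sda1', part=1017, account='AUTH_test', container='c',
#         put_timestamp=Timestamp.now().internal, storage_policy_index=0)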
def get_db_state(self):
"""
Returns the current state of on disk db files.
"""
if self._db_file == ':memory:':
return UNSHARDED
if not self.db_files:
return NOTFOUND
if len(self.db_files) > 1:
return SHARDING
if self.db_epoch is None:
# never been sharded
return UNSHARDED
if self.db_epoch != self._own_shard_range().epoch:
return UNSHARDED
if not self.get_shard_ranges():
return COLLAPSED
return SHARDED
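# Rough illustration (comment only) of how on-disk db files map to the states
# above, assuming make_db_file_path names epoch dbs like <hash>_<epoch>.db:
#
#     []                                  -> NOTFOUND
#     ['<hash>.db']                       -> UNSHARDED
#     ['<hash>.db', '<hash>_<epoch>.db']  -> SHARDING
#     ['<hash>_<epoch>.db']               -> SHARDED; COLLAPSED if the db has
#                                            no shard ranges; UNSHARDED if the
#                                            epoch does not match the broker's
#                                            own shard range epoch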
def sharding_initiated(self):
"""
Returns True if a broker has shard range state that would be necessary
for sharding to have been initiated, False otherwise.
"""
own_shard_range = self.get_own_shard_range()
if own_shard_range.state in (ShardRange.SHARDING,
ShardRange.SHRINKING,
ShardRange.SHARDED):
return bool(self.get_shard_ranges())
return False
def sharding_required(self):
"""
Returns True if a broker has shard range state that would be necessary
for sharding to have been initiated but has not yet completed sharding,
False otherwise.
"""
db_state = self.get_db_state()
return (db_state == SHARDING or
(db_state == UNSHARDED and self.sharding_initiated()))
def is_sharded(self):
return self.get_db_state() == SHARDED
def reload_db_files(self):
"""
Reloads the cached list of valid on disk db files for this broker.
"""
if self._db_file == ':memory:':
return
# reset connection so the next access will use the correct DB file
self.conn = None
self._db_files = get_db_files(self._init_db_file)
@property
def db_files(self):
"""
Gets the cached list of valid db files that exist on disk for this
broker.
The cached list may be refreshed by calling
:meth:`~swift.container.backend.ContainerBroker.reload_db_files`.
:return: A list of paths to db files ordered by ascending epoch;
the list may be empty.
"""
if not self._db_files:
self.reload_db_files()
return self._db_files
@property
def db_file(self):
"""
Get the path to the primary db file for this broker. This is typically
the db file for the most recent sharding epoch. However, if no db files
exist on disk, or if ``force_db_file`` was True when the broker was
constructed, then the primary db file is the file passed to the broker
constructor.
:return: A path to a db file; the file does not necessarily exist.
"""
if self._force_db_file:
return self._init_db_file
if self.db_files:
return self.db_files[-1]
return self._init_db_file
@property
def db_epoch(self):
hash_, epoch, ext = parse_db_filename(self.db_file)
return epoch
@property
def storage_policy_index(self):
if not hasattr(self, '_storage_policy_index'):
self._storage_policy_index = \
self.get_info()['storage_policy_index']
return self._storage_policy_index
@property
def path(self):
self._populate_instance_cache()
return '%s/%s' % (self.account, self.container)
def _initialize(self, conn, put_timestamp, storage_policy_index):
"""
Create a brand new container database (tables, indices, triggers, etc.)
"""
if not self.account:
raise ValueError(
'Attempting to create a new database with no account set')
if not self.container:
raise ValueError(
'Attempting to create a new database with no container set')
if storage_policy_index is None:
storage_policy_index = 0
self.create_object_table(conn)
self.create_policy_stat_table(conn, storage_policy_index)
self.create_container_info_table(conn, put_timestamp,
storage_policy_index)
self.create_shard_range_table(conn)
self._db_files = None
def create_object_table(self, conn):
"""
Create the object table which is specific to the container DB.
Not a part of Pluggable Back-ends, internal to the baseline code.
:param conn: DB connection object
"""
conn.executescript("""
CREATE TABLE object (
ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
created_at TEXT,
size INTEGER,
content_type TEXT,
etag TEXT,
deleted INTEGER DEFAULT 0,
storage_policy_index INTEGER DEFAULT 0
);
CREATE INDEX ix_object_deleted_name ON object (deleted, name);
CREATE TRIGGER object_update BEFORE UPDATE ON object
BEGIN
SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT');
END;
""" + POLICY_STAT_TRIGGER_SCRIPT)
def create_container_info_table(self, conn, put_timestamp,
storage_policy_index):
"""
Create the container_info table which is specific to the container DB.
Not a part of Pluggable Back-ends, internal to the baseline code.
Also creates the container_stat view.
:param conn: DB connection object
:param put_timestamp: put timestamp
:param storage_policy_index: storage policy index
"""
if put_timestamp is None:
put_timestamp = Timestamp(0).internal
# The container_stat view is for compatibility; old versions of Swift
# expected a container_stat table with columns "object_count" and
# "bytes_used", but when that stuff became per-storage-policy and
# moved to the policy_stat table, we stopped creating those columns in
# container_stat.
#
# To retain compatibility, we create the container_stat view with some
# triggers to make it behave like the old container_stat table. This
# way, if an old version of Swift encounters a database with the new
# schema, it can still work.
#
# Note that this can occur during a rolling Swift upgrade if a DB gets
# rsynced from an old node to a new, so it's necessary for
# availability during upgrades. The fact that it enables downgrades is
# a nice bonus.
conn.executescript(CONTAINER_INFO_TABLE_SCRIPT +
CONTAINER_STAT_VIEW_SCRIPT)
conn.execute("""
INSERT INTO container_info (account, container, created_at, id,
put_timestamp, status_changed_at, storage_policy_index)
VALUES (?, ?, ?, ?, ?, ?, ?);
""", (self.account, self.container, Timestamp.now().internal,
str(uuid4()), put_timestamp, put_timestamp,
storage_policy_index))
def create_policy_stat_table(self, conn, storage_policy_index=0):
"""
Create policy_stat table.
:param conn: DB connection object
:param storage_policy_index: the policy_index the container is
being created with
"""
conn.executescript(POLICY_STAT_TABLE_CREATE)
conn.execute("""
INSERT INTO policy_stat (storage_policy_index)
VALUES (?)
""", (storage_policy_index,))
def create_shard_range_table(self, conn):
"""
Create the shard_range table which is specific to the container DB.
:param conn: DB connection object
"""
# Use execute (not executescript) so we get the benefits of our
# GreenDBConnection. Creating a table requires a whole-DB lock;
# *any* in-progress cursor will otherwise trip a "database is locked"
# error.
conn.execute("""
CREATE TABLE %s (
ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
timestamp TEXT,
lower TEXT,
upper TEXT,
object_count INTEGER DEFAULT 0,
bytes_used INTEGER DEFAULT 0,
meta_timestamp TEXT,
deleted INTEGER DEFAULT 0,
state INTEGER,
state_timestamp TEXT,
epoch TEXT
);
""" % SHARD_RANGE_TABLE)
conn.execute("""
CREATE TRIGGER shard_range_update BEFORE UPDATE ON %s
BEGIN
SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT');
END;
""" % SHARD_RANGE_TABLE)
def get_db_version(self, conn):
if self._db_version == -1:
self._db_version = 0
for row in conn.execute('''
SELECT name FROM sqlite_master
WHERE name = 'ix_object_deleted_name' '''):
self._db_version = 1
return self._db_version
def _get_deleted_key(self, connection):
if self.get_db_version(connection) < 1:
return '+deleted'
return 'deleted'
def _newid(self, conn):
conn.execute('''
UPDATE container_stat
SET reported_put_timestamp = 0, reported_delete_timestamp = 0,
reported_object_count = 0, reported_bytes_used = 0''')
def _commit_puts_load(self, item_list, entry):
"""See :func:`swift.common.db.DatabaseBroker._commit_puts_load`"""
(name, timestamp, size, content_type, etag, deleted) = entry[:6]
if len(entry) > 6:
storage_policy_index = entry[6]
else:
storage_policy_index = 0
content_type_timestamp = meta_timestamp = None
if len(entry) > 7:
content_type_timestamp = entry[7]
if len(entry) > 8:
meta_timestamp = entry[8]
item_list.append({'name': name,
'created_at': timestamp,
'size': size,
'content_type': content_type,
'etag': etag,
'deleted': deleted,
'storage_policy_index': storage_policy_index,
'ctype_timestamp': content_type_timestamp,
'meta_timestamp': meta_timestamp})
def _empty(self):
self._commit_puts_stale_ok()
with self.get() as conn:
try:
row = conn.execute(
'SELECT max(object_count) from policy_stat').fetchone()
except sqlite3.OperationalError as err:
if not any(msg in str(err) for msg in (
"no such column: storage_policy_index",
"no such table: policy_stat")):
raise
row = conn.execute(
'SELECT object_count from container_stat').fetchone()
return zero_like(row[0])
def empty(self):
"""
Check if container DB is empty.
This method uses more stringent checks on object count than
:meth:`is_deleted`: this method checks that there are no objects in any
policy; if the container is in the process of sharding then both fresh
and retiring databases are checked to be empty; if a root container has
shard ranges then they are checked to be empty.
:returns: True if the database has no active objects, False otherwise
"""
if not all(broker._empty() for broker in self.get_brokers()):
return False
if self.is_root_container() and self.sharding_initiated():
# only a root's shard range usage can be relied upon here: sharded
# shards do not get stat updates from their own sub-shards
return self.get_shard_usage()['object_count'] <= 0
return True
def delete_object(self, name, timestamp, storage_policy_index=0):
"""
Mark an object deleted.
:param name: object name to be deleted
:param timestamp: timestamp when the object was marked as deleted
:param storage_policy_index: the storage policy index for the object
"""
self.put_object(name, timestamp, 0, 'application/deleted', 'noetag',
deleted=1, storage_policy_index=storage_policy_index)
def make_tuple_for_pickle(self, record):
return (record['name'], record['created_at'], record['size'],
record['content_type'], record['etag'], record['deleted'],
record['storage_policy_index'],
record['ctype_timestamp'],
record['meta_timestamp'])
def put_object(self, name, timestamp, size, content_type, etag, deleted=0,
storage_policy_index=0, ctype_timestamp=None,
meta_timestamp=None):
"""
Creates an object in the DB with its metadata.
:param name: object name to be created
:param timestamp: timestamp of when the object was created
:param size: object size
:param content_type: object content-type
:param etag: object etag
:param deleted: if True, marks the object as deleted and sets the
deleted_at timestamp to timestamp
:param storage_policy_index: the storage policy index for the object
:param ctype_timestamp: timestamp of when content_type was last
updated
:param meta_timestamp: timestamp of when metadata was last updated
"""
record = {'name': name, 'created_at': timestamp, 'size': size,
'content_type': content_type, 'etag': etag,
'deleted': deleted,
'storage_policy_index': storage_policy_index,
'ctype_timestamp': ctype_timestamp,
'meta_timestamp': meta_timestamp}
self.put_record(record)
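# Illustrative usage (comment only; the object name and values are
# hypothetical):
#
#     broker.put_object('o', Timestamp.now().internal, 0, 'text/plain',
#                       MD5_OF_EMPTY_STRING)
#     broker.delete_object('o', Timestamp.now().internal)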
def remove_objects(self, lower, upper, max_row=None):
"""
Removes object records in the given namespace range from the object
table.
Note that objects are removed regardless of their storage_policy_index.
:param lower: defines the lower bound of object names that will be
removed; names greater than this value will be removed; names less
than or equal to this value will not be removed.
:param upper: defines the upper bound of object names that will be
removed; names less than or equal to this value will be removed;
names greater than this value will not be removed. The empty string
is interpreted as there being no upper bound.
:param max_row: if specified only rows less than or equal to max_row
will be removed
"""
query_conditions = []
query_args = []
if max_row is not None:
query_conditions.append('ROWID <= ?')
query_args.append(str(max_row))
if lower:
query_conditions.append('name > ?')
query_args.append(lower)
if upper:
query_conditions.append('name <= ?')
query_args.append(upper)
query = 'DELETE FROM object WHERE deleted in (0, 1)'
if query_conditions:
query += ' AND ' + ' AND '.join(query_conditions)
with self.get() as conn:
conn.execute(query, query_args)
conn.commit()
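# Illustrative usage (comment only): remove object rows whose names fall in
# the namespace ('b', 'd'], i.e. greater than 'b' and up to and including 'd':
#
#     broker.remove_objects('b', 'd')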
def _is_deleted_info(self, object_count, put_timestamp, delete_timestamp,
**kwargs):
"""
Apply delete logic to database info.
:returns: True if the DB is considered to be deleted, False otherwise
"""
# The container is considered deleted if the delete_timestamp
# value is greater than the put_timestamp, and there are no
# objects in the container.
return zero_like(object_count) and (
Timestamp(delete_timestamp) > Timestamp(put_timestamp))
def _is_deleted(self, conn):
"""
Check if the DB is considered to be deleted.
The object count used in this check is the same as the container
object count that would be returned in the result of :meth:`get_info`
and exposed to a client i.e. it is based on the container_stat view for
the current storage policy index or relevant shard range usage.
:param conn: database conn
:returns: True if the DB is considered to be deleted, False otherwise
"""
info = conn.execute('''
SELECT put_timestamp, delete_timestamp, object_count
FROM container_stat''').fetchone()
info = dict(info)
info.update(self._get_alternate_object_stats()[1])
return self._is_deleted_info(**info)
def is_reclaimable(self, now, reclaim_age):
with self.get() as conn:
info = conn.execute('''
SELECT put_timestamp, delete_timestamp
FROM container_stat''').fetchone()
if (Timestamp(now - reclaim_age) >
Timestamp(info['delete_timestamp']) >
Timestamp(info['put_timestamp'])):
return self.empty()
return False
def get_info_is_deleted(self):
"""
Get the is_deleted status and info for the container.
:returns: a tuple, in the form (info, is_deleted), where info is a dict
as returned by get_info and is_deleted is a boolean.
"""
if self.db_file != ':memory:' and not os.path.exists(self.db_file):
return {}, True
info = self.get_info()
return info, self._is_deleted_info(**info)
def get_replication_info(self):
info = super(ContainerBroker, self).get_replication_info()
info['shard_max_row'] = self.get_max_row(SHARD_RANGE_TABLE)
return info
def _do_get_info_query(self, conn):
data = None
trailing_sync = 'x_container_sync_point1, x_container_sync_point2'
trailing_pol = 'storage_policy_index'
errors = set()
while not data:
try:
data = conn.execute(('''
SELECT account, container, created_at, put_timestamp,
delete_timestamp, status_changed_at,
object_count, bytes_used,
reported_put_timestamp, reported_delete_timestamp,
reported_object_count, reported_bytes_used, hash,
id, %s, %s
FROM container_stat
''') % (trailing_sync, trailing_pol)).fetchone()
except sqlite3.OperationalError as err:
err_msg = str(err)
if err_msg in errors:
# only attempt migration once
raise
errors.add(err_msg)
if 'no such column: storage_policy_index' in err_msg:
trailing_pol = '0 AS storage_policy_index'
elif 'no such column: x_container_sync_point' in err_msg:
trailing_sync = '-1 AS x_container_sync_point1, ' \
'-1 AS x_container_sync_point2'
else:
raise
data = dict(data)
# populate instance cache
self._storage_policy_index = data['storage_policy_index']
self.account = data['account']
self.container = data['container']
return data
def _get_info(self):
self._commit_puts_stale_ok()
with self.get() as conn:
return self._do_get_info_query(conn)
def _populate_instance_cache(self, conn=None):
# load cached instance attributes from the database if necessary
if self.container is None:
with self.maybe_get(conn) as conn:
self._do_get_info_query(conn)
def _get_alternate_object_stats(self):
state = self.get_db_state()
if state == SHARDING:
other_info = self.get_brokers()[0]._get_info()
stats = {'object_count': other_info['object_count'],
'bytes_used': other_info['bytes_used']}
elif state == SHARDED and self.is_root_container():
stats = self.get_shard_usage()
else:
stats = {}
return state, stats
def get_info(self):
"""
Get global data for the container.
:returns: dict with keys: account, container, created_at,
put_timestamp, delete_timestamp, status_changed_at,
object_count, bytes_used, reported_put_timestamp,
reported_delete_timestamp, reported_object_count,
reported_bytes_used, hash, id, x_container_sync_point1,
x_container_sync_point2, and storage_policy_index,
db_state.
"""
data = self._get_info()
state, stats = self._get_alternate_object_stats()
data.update(stats)
data['db_state'] = state
return data
def set_x_container_sync_points(self, sync_point1, sync_point2):
with self.get() as conn:
try:
self._set_x_container_sync_points(conn, sync_point1,
sync_point2)
except sqlite3.OperationalError as err:
if 'no such column: x_container_sync_point' not in \
str(err):
raise
self._migrate_add_container_sync_points(conn)
self._set_x_container_sync_points(conn, sync_point1,
sync_point2)
conn.commit()
def _set_x_container_sync_points(self, conn, sync_point1, sync_point2):
if sync_point1 is not None and sync_point2 is not None:
conn.execute('''
UPDATE container_stat
SET x_container_sync_point1 = ?,
x_container_sync_point2 = ?
''', (sync_point1, sync_point2))
elif sync_point1 is not None:
conn.execute('''
UPDATE container_stat
SET x_container_sync_point1 = ?
''', (sync_point1,))
elif sync_point2 is not None:
conn.execute('''
UPDATE container_stat
SET x_container_sync_point2 = ?
''', (sync_point2,))
def get_policy_stats(self):
with self.get() as conn:
try:
info = conn.execute('''
SELECT storage_policy_index, object_count, bytes_used
FROM policy_stat
''').fetchall()
except sqlite3.OperationalError as err:
if not any(msg in str(err) for msg in (
"no such column: storage_policy_index",
"no such table: policy_stat")):
raise
info = conn.execute('''
SELECT 0 as storage_policy_index, object_count, bytes_used
FROM container_stat
''').fetchall()
policy_stats = {}
for row in info:
stats = dict(row)
key = stats.pop('storage_policy_index')
policy_stats[key] = stats
return policy_stats
def has_multiple_policies(self):
with self.get() as conn:
try:
curs = conn.execute('''
SELECT count(storage_policy_index)
FROM policy_stat
''').fetchone()
except sqlite3.OperationalError as err:
if 'no such table: policy_stat' not in str(err):
raise
# no policy_stat row
return False
if curs and curs[0] > 1:
return True
# only one policy_stat row
return False
def set_storage_policy_index(self, policy_index, timestamp=None):
"""
Update the container_stat policy_index and status_changed_at.
"""
if timestamp is None:
timestamp = Timestamp.now().internal
def _setit(conn):
conn.execute('''
INSERT OR IGNORE INTO policy_stat (storage_policy_index)
VALUES (?)
''', (policy_index,))
conn.execute('''
UPDATE container_stat
SET storage_policy_index = ?,
status_changed_at = MAX(?, status_changed_at)
WHERE storage_policy_index <> ?
''', (policy_index, timestamp, policy_index))
conn.commit()
with self.get() as conn:
try:
_setit(conn)
except sqlite3.OperationalError as err:
if not any(msg in str(err) for msg in (
"no such column: storage_policy_index",
"no such table: policy_stat")):
raise
self._migrate_add_storage_policy(conn)
_setit(conn)
self._storage_policy_index = policy_index
def reported(self, put_timestamp, delete_timestamp, object_count,
bytes_used):
"""
Update reported stats, available with container's `get_info`.
:param put_timestamp: put_timestamp to update
:param delete_timestamp: delete_timestamp to update
:param object_count: object_count to update
:param bytes_used: bytes_used to update
"""
with self.get() as conn:
conn.execute('''
UPDATE container_stat
SET reported_put_timestamp = ?, reported_delete_timestamp = ?,
reported_object_count = ?, reported_bytes_used = ?
''', (put_timestamp, delete_timestamp, object_count, bytes_used))
conn.commit()
def list_objects_iter(self, limit, marker, end_marker, prefix, delimiter,
path=None, storage_policy_index=0, reverse=False,
include_deleted=False, since_row=None,
transform_func=None, all_policies=False,
allow_reserved=False):
"""
Get a list of objects sorted by name starting at marker onward, up
to limit entries. Entries will begin with the prefix and will not
have the delimiter after the prefix.
:param limit: maximum number of entries to get
:param marker: marker query
:param end_marker: end marker query
:param prefix: prefix query
:param delimiter: delimiter for query
:param path: if defined, will set the prefix and delimiter based on
the path
:param storage_policy_index: storage policy index for query
:param reverse: reverse the result order.
:param include_deleted: if True, include only deleted objects; if
False (default), include only undeleted objects; otherwise, include
both deleted and undeleted objects.
:param since_row: include only items whose ROWID is greater than
the given row id; by default all rows are included.
:param transform_func: an optional function that if given will be
called for each object to get a transformed version of the object
to include in the listing; should have same signature as
:meth:`~_transform_record`; defaults to :meth:`~_transform_record`.
:param all_policies: if True, include objects for all storage policies
ignoring any value given for ``storage_policy_index``
:param allow_reserved: if False (the default), exclude names that begin
with the reserved byte; if True, include them in the listing
:returns: list of tuples of (name, created_at, size, content_type,
etag, deleted)
"""
if include_deleted is True:
deleted_arg = ' = 1'
elif include_deleted is False:
deleted_arg = ' = 0'
else:
deleted_arg = ' in (0, 1)'
if transform_func is None:
transform_func = self._transform_record
delim_force_gte = False
if six.PY2:
(marker, end_marker, prefix, delimiter, path) = utf8encode(
marker, end_marker, prefix, delimiter, path)
self._commit_puts_stale_ok()
if reverse:
# Reverse the markers if we are reversing the listing.
marker, end_marker = end_marker, marker
if path is not None:
prefix = path
if path:
prefix = path = path.rstrip('/') + '/'
delimiter = '/'
elif delimiter and not prefix:
prefix = ''
if prefix:
end_prefix = prefix[:-1] + chr(ord(prefix[-1]) + 1)
orig_marker = marker
with self.get() as conn:
results = []
deleted_key = self._get_deleted_key(conn)
query_keys = ['name', 'created_at', 'size', 'content_type',
'etag', deleted_key]
while len(results) < limit:
query_args = []
query_conditions = []
if end_marker and (not prefix or end_marker < end_prefix):
query_conditions.append('name < ?')
query_args.append(end_marker)
elif prefix:
query_conditions.append('name < ?')
query_args.append(end_prefix)
if delim_force_gte:
query_conditions.append('name >= ?')
query_args.append(marker)
# Always set back to False
delim_force_gte = False
elif marker and (not prefix or marker >= prefix):
query_conditions.append('name > ?')
query_args.append(marker)
elif prefix:
query_conditions.append('name >= ?')
query_args.append(prefix)
if not allow_reserved:
query_conditions.append('name >= ?')
query_args.append(chr(ord(RESERVED_BYTE) + 1))
query_conditions.append(deleted_key + deleted_arg)
if since_row:
query_conditions.append('ROWID > ?')
query_args.append(since_row)
def build_query(keys, conditions, args):
query = 'SELECT ' + ', '.join(keys) + ' FROM object '
if conditions:
query += 'WHERE ' + ' AND '.join(conditions)
tail_query = '''
ORDER BY name %s LIMIT ?
''' % ('DESC' if reverse else '')
return query + tail_query, args + [limit - len(results)]
# storage policy filter
if all_policies:
query, args = build_query(
query_keys + ['storage_policy_index'],
query_conditions,
query_args)
else:
query, args = build_query(
query_keys + ['storage_policy_index'],
query_conditions + ['storage_policy_index = ?'],
query_args + [storage_policy_index])
try:
curs = conn.execute(query, tuple(args))
except sqlite3.OperationalError as err:
if 'no such column: storage_policy_index' not in str(err):
raise
query, args = build_query(
query_keys + ['0 as storage_policy_index'],
query_conditions, query_args)
curs = conn.execute(query, tuple(args))
curs.row_factory = None
# A delimiter without a prefix is ignored; further, if there is
# no delimiter then we can simply return the result, as prefixes
# are handled in the SQL statement.
if prefix is None or not delimiter:
return [transform_func(r) for r in curs]
# We have a delimiter and a prefix (possibly empty string) to
# handle
rowcount = 0
for row in curs:
rowcount += 1
name = row[0]
if reverse:
end_marker = name
else:
marker = name
if len(results) >= limit:
curs.close()
return results
end = name.find(delimiter, len(prefix))
if path is not None:
if name == path:
continue
if end >= 0 and len(name) > end + len(delimiter):
if reverse:
end_marker = name[:end + len(delimiter)]
else:
marker = ''.join([
name[:end],
delimiter[:-1],
chr(ord(delimiter[-1:]) + 1),
])
curs.close()
break
elif end >= 0:
if reverse:
end_marker = name[:end + len(delimiter)]
else:
marker = ''.join([
name[:end],
delimiter[:-1],
chr(ord(delimiter[-1:]) + 1),
])
# we want result to be inclusive of delim+1
delim_force_gte = True
dir_name = name[:end + len(delimiter)]
if dir_name != orig_marker:
results.append([dir_name, '0', 0, None, ''])
curs.close()
break
results.append(transform_func(row))
if not rowcount:
break
return results
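# Illustrative usage (comment only; the prefix is hypothetical): list up to
# 100 undeleted objects under 'photos/', grouping names on '/':
#
#     listing = broker.list_objects_iter(
#         100, marker='', end_marker='', prefix='photos/', delimiter='/')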
def get_objects(self, limit=None, marker='', end_marker='',
include_deleted=None, since_row=None):
"""
Returns a list of objects, including deleted objects, in all policies.
Each object in the list is described by a dict with keys {'name',
'created_at', 'size', 'content_type', 'etag', 'deleted',
'storage_policy_index'}.
:param limit: maximum number of entries to get
:param marker: if set, objects with names less than or equal to this
value will not be included in the list.
:param end_marker: if set, objects with names greater than or equal to
this value will not be included in the list.
:param include_deleted: if True, include only deleted objects; if
False, include only undeleted objects; otherwise (default), include
both deleted and undeleted objects.
:param since_row: include only items whose ROWID is greater than
the given row id; by default all rows are included.
:return: a list of dicts, each describing an object.
"""
limit = CONTAINER_LISTING_LIMIT if limit is None else limit
return self.list_objects_iter(
limit, marker, end_marker, prefix=None, delimiter=None, path=None,
reverse=False, include_deleted=include_deleted,
transform_func=self._record_to_dict, since_row=since_row,
all_policies=True, allow_reserved=True
)
def _transform_record(self, record):
"""
Returns a tuple of (name, last-modified time, size, content_type and
etag) for the given record.
The given record's created_at timestamp is decoded into separate data,
content-type and meta timestamps and the metadata timestamp is used as
the last-modified time value.
"""
t_data, t_ctype, t_meta = decode_timestamps(record[1])
return (record[0], t_meta.internal) + record[2:5]
def _record_to_dict(self, rec):
if rec:
keys = ('name', 'created_at', 'size', 'content_type', 'etag',
'deleted', 'storage_policy_index')
return dict(zip(keys, rec))
return None
def merge_items(self, item_list, source=None):
"""
Merge items into the object table.
:param item_list: list of dictionaries of {'name', 'created_at',
'size', 'content_type', 'etag', 'deleted',
'storage_policy_index', 'ctype_timestamp',
'meta_timestamp'}
:param source: if defined, update incoming_sync with the source
"""
for item in item_list:
if six.PY2 and isinstance(item['name'], six.text_type):
item['name'] = item['name'].encode('utf-8')
elif not six.PY2 and isinstance(item['name'], six.binary_type):
item['name'] = item['name'].decode('utf-8')
def _really_really_merge_items(conn):
curs = conn.cursor()
if self.get_db_version(conn) >= 1:
query_mod = ' deleted IN (0, 1) AND '
else:
query_mod = ''
curs.execute('BEGIN IMMEDIATE')
# Get sqlite records for objects in item_list that already exist.
# We must chunk it up to avoid sqlite's limit of 999 args.
records = {}
for offset in range(0, len(item_list), SQLITE_ARG_LIMIT):
chunk = [rec['name'] for rec in
item_list[offset:offset + SQLITE_ARG_LIMIT]]
records.update(
((rec[0], rec[6]), rec) for rec in curs.execute(
'SELECT name, created_at, size, content_type,'
'etag, deleted, storage_policy_index '
'FROM object WHERE ' + query_mod + ' name IN (%s)' %
','.join('?' * len(chunk)), chunk))
# Sort item_list into things that need adding and deleting, based
# on results of created_at query.
to_delete = set()
to_add = {}
for item in item_list:
item.setdefault('storage_policy_index', 0) # legacy
item_ident = (item['name'], item['storage_policy_index'])
existing = self._record_to_dict(records.get(item_ident))
if update_new_item_from_existing(item, existing):
if item_ident in records: # exists with older timestamp
to_delete.add(item_ident)
if item_ident in to_add: # duplicate entries in item_list
update_new_item_from_existing(item, to_add[item_ident])
to_add[item_ident] = item
if to_delete:
curs.executemany(
'DELETE FROM object WHERE ' + query_mod +
'name=? AND storage_policy_index=?',
(item_ident for item_ident in to_delete))
if to_add:
curs.executemany(
'INSERT INTO object (name, created_at, size, content_type,'
'etag, deleted, storage_policy_index) '
'VALUES (?, ?, ?, ?, ?, ?, ?)',
((rec['name'], rec['created_at'], rec['size'],
rec['content_type'], rec['etag'], rec['deleted'],
rec['storage_policy_index'])
for rec in to_add.values()))
if source:
# for replication we rely on the remote end sending merges in
# order with no gaps to increment sync_points
sync_point = item_list[-1]['ROWID']
curs.execute('''
UPDATE incoming_sync SET
sync_point=max(?, sync_point) WHERE remote_id=?
''', (sync_point, source))
if curs.rowcount < 1:
curs.execute('''
INSERT INTO incoming_sync (sync_point, remote_id)
VALUES (?, ?)
''', (sync_point, source))
conn.commit()
def _really_merge_items(conn):
return tpool.execute(_really_really_merge_items, conn)
with self.get() as conn:
try:
return _really_merge_items(conn)
except sqlite3.OperationalError as err:
if 'no such column: storage_policy_index' not in str(err):
raise
self._migrate_add_storage_policy(conn)
return _really_merge_items(conn)
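# Illustrative usage (comment only; the item dict mirrors the keys named in
# the merge_items docstring, with hypothetical values):
#
#     broker.merge_items([{
#         'name': 'o', 'created_at': Timestamp.now().internal, 'size': 0,
#         'content_type': 'text/plain', 'etag': MD5_OF_EMPTY_STRING,
#         'deleted': 0, 'storage_policy_index': 0,
#         'ctype_timestamp': None, 'meta_timestamp': None}])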
def merge_shard_ranges(self, shard_ranges):
"""
Merge shard ranges into the shard range table.
:param shard_ranges: a shard range or a list of shard ranges; each
shard range should be an instance of
:class:`~swift.common.utils.ShardRange` or a dict representation of
a shard range having ``SHARD_RANGE_KEYS``.
"""
if not shard_ranges:
return
if not isinstance(shard_ranges, list):
shard_ranges = [shard_ranges]
item_list = []
for item in shard_ranges:
if isinstance(item, ShardRange):
item = dict(item)
for col in ('name', 'lower', 'upper'):
if six.PY2 and isinstance(item[col], six.text_type):
item[col] = item[col].encode('utf-8')
elif not six.PY2 and isinstance(item[col], six.binary_type):
item[col] = item[col].decode('utf-8')
item_list.append(item)
def _really_merge_items(conn):
curs = conn.cursor()
curs.execute('BEGIN IMMEDIATE')
# Get rows for items that already exist.
# We must chunk it up to avoid sqlite's limit of 999 args.
records = {}
for offset in range(0, len(item_list), SQLITE_ARG_LIMIT):
chunk = [record['name'] for record
in item_list[offset:offset + SQLITE_ARG_LIMIT]]
records.update(
(rec[0], rec) for rec in curs.execute(
'SELECT %s FROM %s '
'WHERE deleted IN (0, 1) AND name IN (%s)' %
(', '.join(SHARD_RANGE_KEYS), SHARD_RANGE_TABLE,
','.join('?' * len(chunk))), chunk))
# Sort item_list into things that need adding and deleting
to_delete = set()
to_add = {}
for item in item_list:
item_ident = item['name']
existing = records.get(item_ident)
if existing:
existing = dict(zip(SHARD_RANGE_KEYS, existing))
if merge_shards(item, existing):
# exists with older timestamp
if item_ident in records:
to_delete.add(item_ident)
# duplicate entries in item_list
if (item_ident not in to_add or
merge_shards(item, to_add[item_ident])):
to_add[item_ident] = item
if to_delete:
curs.executemany(
'DELETE FROM %s WHERE deleted in (0, 1) '
'AND name = ?' % SHARD_RANGE_TABLE,
((item_ident,) for item_ident in to_delete))
if to_add:
vals = ','.join('?' * len(SHARD_RANGE_KEYS))
curs.executemany(
'INSERT INTO %s (%s) VALUES (%s)' %
(SHARD_RANGE_TABLE, ','.join(SHARD_RANGE_KEYS), vals),
tuple([item[k] for k in SHARD_RANGE_KEYS]
for item in to_add.values()))
conn.commit()
with self.get() as conn:
try:
return _really_merge_items(conn)
except sqlite3.OperationalError as err:
# Without the rollback, new enough (>= py37) python/sqlite3
# will panic:
# sqlite3.OperationalError: cannot start a transaction
# within a transaction
conn.rollback()
if ('no such table: %s' % SHARD_RANGE_TABLE) not in str(err):
raise
self.create_shard_range_table(conn)
return _really_merge_items(conn)
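# Illustrative usage (comment only; the shard range name and bounds are
# hypothetical):
#
#     sr = ShardRange('.shards_a/c-0', Timestamp.now(), lower='', upper='m',
#                     state=ShardRange.CREATED)
#     broker.merge_shard_ranges(sr)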
def get_reconciler_sync(self):
with self.get() as conn:
try:
return conn.execute('''
SELECT reconciler_sync_point FROM container_stat
''').fetchone()[0]
except sqlite3.OperationalError as err:
if "no such column: reconciler_sync_point" not in str(err):
raise
return -1
def update_reconciler_sync(self, point):
query = '''
UPDATE container_stat
SET reconciler_sync_point = ?
'''
with self.get() as conn:
try:
conn.execute(query, (point,))
except sqlite3.OperationalError as err:
if "no such column: reconciler_sync_point" not in str(err):
raise
self._migrate_add_storage_policy(conn)
conn.execute(query, (point,))
conn.commit()
def get_misplaced_since(self, start, count):
"""
Get a list of objects which are in a storage policy different
from the container's storage policy.
:param start: last reconciler sync point
:param count: maximum number of entries to get
:returns: list of dicts with keys: name, created_at, size,
content_type, etag, storage_policy_index
"""
qry = '''
SELECT ROWID, name, created_at, size, content_type, etag,
deleted, storage_policy_index
FROM object
WHERE ROWID > ?
AND storage_policy_index != (
SELECT storage_policy_index FROM container_stat LIMIT 1)
ORDER BY ROWID ASC LIMIT ?
'''
self._commit_puts_stale_ok()
with self.get() as conn:
try:
cur = conn.execute(qry, (start, count))
except sqlite3.OperationalError as err:
if "no such column: storage_policy_index" not in str(err):
raise
return []
return list(dict(row) for row in cur.fetchall())
def _migrate_add_container_sync_points(self, conn):
"""
Add the x_container_sync_point columns to the 'container_stat' table.
"""
conn.executescript('''
BEGIN;
ALTER TABLE container_stat
ADD COLUMN x_container_sync_point1 INTEGER DEFAULT -1;
ALTER TABLE container_stat
ADD COLUMN x_container_sync_point2 INTEGER DEFAULT -1;
COMMIT;
''')
def _migrate_add_storage_policy(self, conn):
"""
Migrate the container schema to support tracking objects from
multiple storage policies. If the container_stat table has any
pending migrations, they are applied now before copying into
container_info.
* create the 'policy_stat' table.
* copy the current 'object_count' and 'bytes_used' columns to a
row in the 'policy_stat' table.
* add the storage_policy_index column to the 'object' table.
* drop the 'object_insert' and 'object_delete' triggers.
* add the 'object_insert_policy_stat' and
'object_delete_policy_stat' triggers.
* create container_info table for non-policy container info
* insert values from container_stat into container_info
* drop container_stat table
* create container_stat view
"""
# I tried just getting the list of column names in the current
# container_stat table with a pragma table_info, but could never get
# it inside the same transaction as the DDL (non-DML) statements:
# https://docs.python.org/2/library/sqlite3.html
# #controlling-transactions
# So we just apply all pending migrations to container_stat and copy a
# static known list of column names into container_info.
try:
self._migrate_add_container_sync_points(conn)
except sqlite3.OperationalError as e:
if 'duplicate column' in str(e):
conn.execute('ROLLBACK;')
else:
raise
try:
conn.executescript("""
ALTER TABLE container_stat
ADD COLUMN metadata TEXT DEFAULT '';
""")
except sqlite3.OperationalError as e:
if 'duplicate column' not in str(e):
raise
column_names = ', '.join((
'account', 'container', 'created_at', 'put_timestamp',
'delete_timestamp', 'reported_put_timestamp',
'reported_object_count', 'reported_bytes_used', 'hash', 'id',
'status', 'status_changed_at', 'metadata',
'x_container_sync_point1', 'x_container_sync_point2'))
conn.executescript(
'BEGIN;' +
POLICY_STAT_TABLE_CREATE +
'''
INSERT INTO policy_stat (
storage_policy_index, object_count, bytes_used)
SELECT 0, object_count, bytes_used
FROM container_stat;
ALTER TABLE object
ADD COLUMN storage_policy_index INTEGER DEFAULT 0;
DROP TRIGGER object_insert;
DROP TRIGGER object_delete;
''' +
POLICY_STAT_TRIGGER_SCRIPT +
CONTAINER_INFO_TABLE_SCRIPT +
'''
INSERT INTO container_info (%s)
SELECT %s FROM container_stat;
DROP TABLE IF EXISTS container_stat;
''' % (column_names, column_names) +
CONTAINER_STAT_VIEW_SCRIPT +
'COMMIT;')
def _reclaim_other_stuff(self, conn, age_timestamp, sync_timestamp):
super(ContainerBroker, self)._reclaim_other_stuff(
conn, age_timestamp, sync_timestamp)
# populate instance cache, but use existing conn to avoid deadlock
# when it has a pending update
self._populate_instance_cache(conn=conn)
try:
conn.execute('''
DELETE FROM %s WHERE deleted = 1 AND timestamp < ?
AND name != ?
''' % SHARD_RANGE_TABLE, (sync_timestamp, self.path))
except sqlite3.OperationalError as err:
if ('no such table: %s' % SHARD_RANGE_TABLE) not in str(err):
raise
def _get_shard_range_rows(self, connection=None, include_deleted=False,
states=None, include_own=False,
exclude_others=False):
"""
Returns a list of shard range rows.
To get all shard ranges use ``include_own=True``. To get only the
broker's own shard range use ``include_own=True`` and
``exclude_others=True``.
:param connection: db connection
:param include_deleted: include rows marked as deleted
:param states: include only rows matching the given state(s); can be an
int or a list of ints.
:param include_own: boolean that governs whether the row whose name
matches the broker's path is included in the returned list. If
True, that row is included, otherwise it is not included. Default
is False.
:param exclude_others: boolean that governs whether the rows whose
names do not match the broker's path are included in the returned
list. If True, those rows are not included, otherwise they are
included. Default is False.
:return: a list of tuples.
"""
if exclude_others and not include_own:
return []
included_states = set()
if isinstance(states, (list, tuple, set)):
included_states.update(states)
elif states is not None:
included_states.add(states)
def do_query(conn):
condition = ''
conditions = []
params = []
if not include_deleted:
conditions.append('deleted=0')
if included_states:
conditions.append('state in (%s)' % ','.join(
'?' * len(included_states)))
params.extend(included_states)
if not include_own:
conditions.append('name != ?')
params.append(self.path)
if exclude_others:
conditions.append('name = ?')
params.append(self.path)
if conditions:
condition = ' WHERE ' + ' AND '.join(conditions)
sql = '''
SELECT %s
FROM %s%s;
''' % (', '.join(SHARD_RANGE_KEYS), SHARD_RANGE_TABLE, condition)
data = conn.execute(sql, params)
data.row_factory = None
return [row for row in data]
try:
with self.maybe_get(connection) as conn:
return do_query(conn)
except sqlite3.OperationalError as err:
if ('no such table: %s' % SHARD_RANGE_TABLE) not in str(err):
raise
return []
@classmethod
def resolve_shard_range_states(cls, states):
"""
Given a list of values each of which may be the name of a state, the
number of a state, or an alias, return the set of state numbers
described by the list.
The following alias values are supported: 'listing' maps to all states
that are considered valid when listing objects; 'updating' maps to all
states that are considered valid for redirecting an object update.
:param states: a list of values each of which may be the name of a
state, the number of a state, or an alias
:return: a set of integer state numbers, or None if no states are given
:raises ValueError: if any value in the given list is neither a valid
state nor a valid alias
"""
if states:
resolved_states = set()
for state in states:
if state == 'listing':
resolved_states.update(SHARD_LISTING_STATES)
elif state == 'updating':
resolved_states.update(SHARD_UPDATE_STATES)
else:
resolved_states.add(ShardRange.resolve_state(state)[0])
return resolved_states
return None
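# Illustrative usage (comment only):
#
#     ContainerBroker.resolve_shard_range_states(['listing'])
#     # -> set(SHARD_LISTING_STATES)
#     ContainerBroker.resolve_shard_range_states(['updating'])
#     # -> set(SHARD_UPDATE_STATES)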
def get_shard_ranges(self, marker=None, end_marker=None, includes=None,
reverse=False, include_deleted=False, states=None,
include_own=False,
exclude_others=False, fill_gaps=False):
"""
Returns a list of persisted shard ranges.
:param marker: restricts the returned list to shard ranges whose
namespace includes or is greater than the marker value.
:param end_marker: restricts the returned list to shard ranges whose
namespace includes or is less than the end_marker value.
:param includes: restricts the returned list to the shard range that
includes the given value; if ``includes`` is specified then
``marker`` and ``end_marker`` are ignored.
:param reverse: reverse the result order.
:param include_deleted: include items that have the delete marker set
:param states: if specified, restricts the returned list to shard
ranges that have the given state(s); can be a list of ints or a
single int.
:param include_own: boolean that governs whether the row whose name
matches the broker's path is included in the returned list. If
True, that row is included, otherwise it is not included. Default
is False.
:param exclude_others: boolean that governs whether the rows whose
names do not match the broker's path are included in the returned
list. If True, those rows are not included, otherwise they are
included. Default is False.
:param fill_gaps: if True, insert the broker's own shard range to fill
any gap at the tail of the other shard ranges.
:return: a list of instances of :class:`swift.common.utils.ShardRange`
"""
def shard_range_filter(sr):
end = start = True
if end_marker:
end = end_marker > sr.lower
if marker:
start = marker < sr.upper
return start and end
if reverse:
marker, end_marker = end_marker, marker
if marker and end_marker and marker >= end_marker:
return []
shard_ranges = [
ShardRange(*row)
for row in self._get_shard_range_rows(
include_deleted=include_deleted, states=states,
include_own=include_own,
exclude_others=exclude_others)]
# note: if this ever changes to *not* sort by upper first then it breaks
# a key assumption for bisect, which is used by utils.find_shard_range
shard_ranges.sort(key=lambda sr: (sr.upper, sr.state, sr.lower))
if includes:
shard_range = find_shard_range(includes, shard_ranges)
return [shard_range] if shard_range else []
if marker or end_marker:
shard_ranges = list(filter(shard_range_filter, shard_ranges))
if fill_gaps:
if shard_ranges:
last_upper = shard_ranges[-1].upper
else:
last_upper = marker or ShardRange.MIN
required_upper = end_marker or ShardRange.MAX
if required_upper > last_upper:
filler_sr = self.get_own_shard_range()
filler_sr.lower = last_upper
filler_sr.upper = required_upper
shard_ranges.append(filler_sr)
if reverse:
shard_ranges.reverse()
return shard_ranges
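# Illustrative usage (comment only): fetch the shard ranges to consult when
# listing objects in the namespace ('a', 'm'] (the bounds are hypothetical):
#
#     ranges = broker.get_shard_ranges(marker='a', end_marker='m',
#                                      states=SHARD_LISTING_STATES)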
def _own_shard_range(self, no_default=False):
shard_ranges = self.get_shard_ranges(include_own=True,
include_deleted=True,
exclude_others=True)
if shard_ranges:
own_shard_range = shard_ranges[0]
elif no_default:
return None
else:
own_shard_range = ShardRange(
self.path, Timestamp.now(), ShardRange.MIN, ShardRange.MAX,
state=ShardRange.ACTIVE)
return own_shard_range
def get_own_shard_range(self, no_default=False):
"""
Returns a shard range representing this broker's own shard range. If no
such range has been persisted in the broker's shard ranges table then a
default shard range representing the entire namespace will be returned.
The returned shard range will be updated with the current object stats
for this broker and a meta timestamp set to the current time. For these
values to be persisted the caller must merge the shard range.
:param no_default: if True and the broker's own shard range is not
found in the shard ranges table then None is returned, otherwise a
default shard range is returned.
:return: an instance of :class:`~swift.common.utils.ShardRange`
"""
own_shard_range = self._own_shard_range(no_default=no_default)
if own_shard_range:
info = self.get_info()
own_shard_range.update_meta(
info['object_count'], info['bytes_used'])
return own_shard_range
def is_own_shard_range(self, shard_range):
return shard_range.name == self.path
def enable_sharding(self, epoch):
"""
Updates this broker's own shard range with the given epoch, sets its
        state to SHARDING and persists it in the DB.

        :param epoch: a :class:`~swift.common.utils.Timestamp`
:return: the broker's updated own shard range.
"""
own_shard_range = self._own_shard_range()
own_shard_range.update_state(ShardRange.SHARDING, epoch)
own_shard_range.epoch = epoch
self.merge_shard_ranges(own_shard_range)
return own_shard_range
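
    # Illustrative usage sketch (comment only, not executed): sharding is
    # typically enabled by stamping the own shard range with a fresh epoch,
    # e.g. (assuming a ContainerBroker instance ``broker``):
    #
    #     own_sr = broker.enable_sharding(Timestamp.now())
    #     assert own_sr.epoch and own_sr.state == ShardRange.SHARDING
    #
    # The same epoch later forms part of the fresh DB's filename in
    # set_sharding_state().
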
def get_shard_usage(self):
"""
Get the aggregate object stats for all shard ranges in states ACTIVE,
SHARDING or SHRINKING.
:return: a dict with keys {bytes_used, object_count}
"""
shard_ranges = self.get_shard_ranges(states=SHARD_STATS_STATES)
return {'bytes_used': sum(sr.bytes_used for sr in shard_ranges),
'object_count': sum(sr.object_count for sr in shard_ranges)}
def get_all_shard_range_data(self):
"""
Returns a list of all shard range data, including own shard range and
deleted shard ranges.
:return: A list of dict representations of a ShardRange.
"""
shard_ranges = self.get_shard_ranges(include_deleted=True,
include_own=True)
return [dict(sr) for sr in shard_ranges]
def set_sharding_state(self):
"""
Creates and initializes a fresh DB file in preparation for sharding a
retiring DB. The broker's own shard range must have an epoch timestamp
for this method to succeed.
:return: True if the fresh DB was successfully created, False
otherwise.
"""
epoch = self.get_own_shard_range().epoch
if not epoch:
self.logger.warning("Container '%s' cannot be set to sharding "
"state: missing epoch", self.path)
return False
state = self.get_db_state()
        if state != UNSHARDED:
self.logger.warning("Container '%s' cannot be set to sharding "
"state while in %s state", self.path, state)
return False
info = self.get_info()
# The tmp_dir is cleaned up by the replicators after reclaim_age, so if
# we initially create the fresh DB there, we will already have cleanup
# covered if there is an error.
tmp_dir = os.path.join(self.get_device_path(), 'tmp')
if not os.path.exists(tmp_dir):
mkdirs(tmp_dir)
tmp_db_file = os.path.join(tmp_dir, "fresh%s.db" % str(uuid4()))
fresh_broker = ContainerBroker(tmp_db_file, self.timeout, self.logger,
self.account, self.container)
fresh_broker.initialize(info['put_timestamp'],
info['storage_policy_index'])
# copy relevant data from the retiring db to the fresh db
fresh_broker.update_metadata(self.metadata)
fresh_broker.merge_shard_ranges(self.get_all_shard_range_data())
# copy sync points so that any peer in sync with retiring db will
# appear to be in sync with the fresh db, although the peer shouldn't
# attempt to replicate objects to a db with shard ranges.
for incoming in (True, False):
syncs = self.get_syncs(incoming)
fresh_broker.merge_syncs(syncs, incoming)
max_row = self.get_max_row()
with fresh_broker.get() as fresh_broker_conn:
# Initialise the rowid to continue from where the retiring db ended
try:
sql = "INSERT into object " \
"(ROWID, name, created_at, size, content_type, etag) " \
"values (?, 'tmp_sharding', ?, 0, '', ?)"
fresh_broker_conn.execute(
sql, (max_row, Timestamp.now().internal,
MD5_OF_EMPTY_STRING))
fresh_broker_conn.execute(
'DELETE FROM object WHERE ROWID = ?', (max_row,))
fresh_broker_conn.commit()
except sqlite3.OperationalError as err:
self.logger.error(
'Failed to set the ROWID of the fresh database for %s: %s',
self.path, err)
return False
            # Set the created_at in the container_stat table to be the same
            # in both brokers
try:
fresh_broker_conn.execute(
'UPDATE container_stat SET created_at=?',
(info['created_at'],))
fresh_broker_conn.commit()
except sqlite3.OperationalError as err:
self.logger.error('Failed to set matching created_at time in '
'the fresh database for %s: %s',
self.path, err)
return False
# Rename to the new database
fresh_db_filename = make_db_file_path(self._db_file, epoch)
renamer(tmp_db_file, fresh_db_filename)
self.reload_db_files()
return True
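
    # Illustrative usage sketch (comment only, not executed): the expected
    # transition, typically driven by the sharder daemon, looks roughly like
    # (assuming ``broker`` is a ContainerBroker whose db state is UNSHARDED):
    #
    #     broker.enable_sharding(Timestamp.now())  # persist epoch + SHARDING
    #     if broker.set_sharding_state():          # create the fresh db
    #         assert broker.get_db_state() == SHARDING
    #
    # The temporary object row inserted and deleted above exists only to seed
    # the fresh db's ROWID sequence so that the sync points copied from the
    # retiring db remain meaningful to replication peers.
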
def set_sharded_state(self):
"""
        Unlinks the broker's retiring DB file.

        :return: True if the retiring DB was successfully unlinked, False
otherwise.
"""
state = self.get_db_state()
        if state != SHARDING:
self.logger.warning("Container %r cannot be set to sharded "
"state while in %s state",
self.path, state)
return False
self.reload_db_files()
if len(self.db_files) < 2:
self.logger.warning(
'Refusing to delete db file for %r: no fresher db file found '
'in %r.', self.path, self.db_files)
return False
retiring_file = self.db_files[-2]
try:
os.unlink(retiring_file)
self.logger.debug('Unlinked retiring db %r', retiring_file)
except OSError as err:
if err.errno != errno.ENOENT:
                self.logger.exception('Failed to unlink %r', retiring_file)
return False
self.reload_db_files()
if len(self.db_files) >= 2:
self.logger.warning(
'Still have multiple db files after unlinking %r: %r',
retiring_file, self.db_files)
return False
return True
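
    # Illustrative usage sketch (comment only, not executed): once cleaving
    # has moved the object rows into shard dbs, the retiring db can be
    # dropped (assuming ``broker`` is a ContainerBroker in SHARDING state):
    #
    #     if broker.set_sharded_state():
    #         assert broker.get_db_state() == SHARDED
    #
    # Whether cleaving is actually complete is the caller's responsibility;
    # this method only checks the db file bookkeeping.
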
def get_brokers(self):
"""
Return a list of brokers for component dbs. The list has two entries
while the db state is sharding: the first entry is a broker for the
retiring db with ``skip_commits`` set to ``True``; the second entry is
a broker for the fresh db with ``skip_commits`` set to ``False``. For
any other db state the list has one entry.
:return: a list of :class:`~swift.container.backend.ContainerBroker`
"""
if len(self.db_files) > 2:
self.logger.warning('Unexpected db files will be ignored: %s' %
self.db_files[:-2])
brokers = []
db_files = self.db_files[-2:]
while db_files:
db_file = db_files.pop(0)
sub_broker = ContainerBroker(
db_file, self.timeout, self.logger, self.account,
self.container, self.pending_timeout, self.stale_reads_ok,
force_db_file=True, skip_commits=bool(db_files))
brokers.append(sub_broker)
return brokers
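
    # Illustrative usage sketch (comment only, not executed): while the db
    # state is sharding, callers needing both component dbs can do:
    #
    #     retiring_broker, fresh_broker = broker.get_brokers()
    #
    # where the retiring broker is created with ``skip_commits=True`` and the
    # fresh broker with ``skip_commits=False``. In any other db state the
    # returned list has a single entry, so the unpacking above would fail.
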
def set_sharding_sysmeta(self, key, value):
"""
Updates the broker's metadata stored under the given key
prefixed with a sharding specific namespace.
:param key: metadata key in the sharding metadata namespace.
:param value: metadata value
"""
self.update_metadata({'X-Container-Sysmeta-Shard-' + key:
(value, Timestamp.now().internal)})
def get_sharding_sysmeta_with_timestamps(self):
"""
Returns sharding specific info from the broker's metadata with
timestamps.
:param key: if given the value stored under ``key`` in the sharding
info will be returned.
:return: a dict of sharding info with their timestamps.
"""
prefix = 'X-Container-Sysmeta-Shard-'
return {
k[len(prefix):]: v
for k, v in self.metadata.items()
if k.startswith(prefix)
}
def get_sharding_sysmeta(self, key=None):
"""
Returns sharding specific info from the broker's metadata.
:param key: if given the value stored under ``key`` in the sharding
info will be returned.
:return: either a dict of sharding info or the value stored under
``key`` in that dict.
"""
info = self.get_sharding_sysmeta_with_timestamps()
if key:
return info.get(key, (None, None))[0]
else:
return {k: v[0] for k, v in info.items()}
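
    # Illustrative usage sketch (comment only, not executed): the key passed
    # to these helpers is transparently prefixed with
    # 'X-Container-Sysmeta-Shard-', e.g. (assuming a ContainerBroker instance
    # ``broker``):
    #
    #     broker.set_sharding_sysmeta('Root', 'a/c')
    #     broker.get_sharding_sysmeta('Root')   # -> 'a/c'
    #     broker.get_sharding_sysmeta()         # -> {'Root': 'a/c', ...}
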
def _load_root_info(self):
"""
Load the root container name and account for the container represented
        by this broker.

        The root container path, if set, is stored in sysmeta under the key
        ``X-Container-Sysmeta-Shard-Quoted-Root`` (URL-quoted) or, failing
        that, ``X-Container-Sysmeta-Shard-Root``. If neither is set then the
        container is considered to be a root container and ``_root_account``
        and ``_root_container`` are set equal to the broker ``account`` and
        ``container`` attributes respectively.
"""
path = self.get_sharding_sysmeta('Quoted-Root')
hdr = 'X-Container-Sysmeta-Shard-Quoted-Root'
if path:
path = unquote(path)
else:
path = self.get_sharding_sysmeta('Root')
hdr = 'X-Container-Sysmeta-Shard-Root'
if not path:
# Ensure account/container get populated
self._populate_instance_cache()
self._root_account = self.account
self._root_container = self.container
return
try:
self._root_account, self._root_container = split_path(
'/' + path, 2, 2)
except ValueError:
raise ValueError("Expected %s to be of the form "
"'account/container', got %r" % (hdr, path))
@property
def root_account(self):
if not self._root_account:
self._load_root_info()
return self._root_account
@property
def root_container(self):
if not self._root_container:
self._load_root_info()
return self._root_container
@property
def root_path(self):
return '%s/%s' % (self.root_account, self.root_container)
def is_root_container(self):
"""
Returns True if this container is a root container, False otherwise.
A root container is a container that is not a shard of another
container.
"""
self._populate_instance_cache()
return (self.root_account == self.account and
self.root_container == self.container)
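
    # Illustrative usage sketch (comment only, not executed): a shard
    # container records its root via sharding sysmeta, which the properties
    # above resolve lazily, e.g. (assuming ``broker`` is a ContainerBroker
    # for a shard db whose root info has not yet been loaded):
    #
    #     broker.set_sharding_sysmeta('Quoted-Root', 'acct/cont')
    #     broker.root_path            # -> 'acct/cont'
    #     broker.is_root_container()  # -> False, since the shard's own
    #                                 #    account/container differ from the
    #                                 #    root path
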
def _get_next_shard_range_upper(self, shard_size, last_upper=None):
"""
Returns the name of the object that is ``shard_size`` rows beyond
``last_upper`` in the object table ordered by name. If ``last_upper``
        is not given then it defaults to the start of the object table ordered
        by name.

        :param shard_size: the number of object rows to advance beyond
            ``last_upper``.
        :param last_upper: the upper bound of the last found shard range.
:return: an object name, or None if the number of rows beyond
``last_upper`` is less than ``shard_size``.
"""
self._commit_puts_stale_ok()
with self.get() as connection:
sql = ('SELECT name FROM object WHERE %s=0 ' %
self._get_deleted_key(connection))
args = []
if last_upper:
sql += "AND name > ? "
args.append(str(last_upper))
sql += "ORDER BY name LIMIT 1 OFFSET %d" % (shard_size - 1)
row = connection.execute(sql, args).fetchone()
return row['name'] if row else None
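
    # Illustrative example (comment only): with shard_size=3 and undeleted
    # object names ['a', 'b', 'c', 'd', 'e'], the query built above is
    # equivalent to
    #
    #     SELECT name FROM object WHERE deleted=0
    #         ORDER BY name LIMIT 1 OFFSET 2
    #
    # which returns 'c', the upper bound of the first 3-row shard range. A
    # subsequent call with last_upper='c' adds "AND name > 'c'" and returns
    # None because fewer than 3 rows remain.
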
def find_shard_ranges(self, shard_size, limit=-1, existing_ranges=None):
"""
Scans the container db for shard ranges. Scanning will start at the
        upper bound of any ``existing_ranges`` that are given, otherwise at
        ``ShardRange.MIN``. Scanning will stop when ``limit`` shard ranges
        have been found or when no more shard ranges can be found. In the
        latter case, the upper bound of the final shard range will be equal to
        the upper bound of the container namespace.

        This method does not modify the state of the db; callers are
        responsible for persisting any shard range data in the db.

        :param shard_size: the number of object rows in each shard range
        :param limit: the maximum number of shard ranges to be found; a
negative value (default) implies no limit.
:param existing_ranges: an optional list of existing ShardRanges; if
given, this list should be sorted in order of upper bounds; the
scan for new shard ranges will start at the upper bound of the last
existing ShardRange.
:return: a tuple; the first value in the tuple is a list of
dicts each having keys {'index', 'lower', 'upper', 'object_count'}
in order of ascending 'upper'; the second value in the tuple is a
boolean which is True if the last shard range has been found, False
otherwise.
"""
existing_ranges = existing_ranges or []
object_count = self.get_info().get('object_count', 0)
if shard_size >= object_count:
# container not big enough to shard
return [], False
own_shard_range = self.get_own_shard_range()
progress = 0
progress_reliable = True
# update initial state to account for any existing shard ranges
if existing_ranges:
if all([sr.state == ShardRange.FOUND
for sr in existing_ranges]):
progress = sum([sr.object_count for sr in existing_ranges])
else:
                # object count in existing shard ranges may have changed
# since they were found so progress cannot be reliably
# calculated; use default progress of zero - that's ok,
# progress is used for optimisation not correctness
progress_reliable = False
last_shard_upper = existing_ranges[-1].upper
if last_shard_upper >= own_shard_range.upper:
# == implies all ranges were previously found
# > implies an acceptor range has been set into which this
# shard should cleave itself
return [], True
else:
last_shard_upper = own_shard_range.lower
found_ranges = []
sub_broker = self.get_brokers()[0]
index = len(existing_ranges)
while limit is None or limit < 0 or len(found_ranges) < limit:
if progress + shard_size >= object_count:
# next shard point is at or beyond final object name so don't
# bother with db query
next_shard_upper = None
else:
try:
next_shard_upper = sub_broker._get_next_shard_range_upper(
shard_size, last_shard_upper)
except (sqlite3.OperationalError, LockTimeout):
                    self.logger.exception(
                        "Problem finding shard upper in %r", self.db_file)
break
if (next_shard_upper is None or
next_shard_upper > own_shard_range.upper):
# We reached the end of the container namespace, or possibly
# beyond if the container has misplaced objects. In either case
# limit the final shard range to own_shard_range.upper.
next_shard_upper = own_shard_range.upper
if progress_reliable:
# object count may include misplaced objects so the final
# shard size may not be accurate until cleaved, but at
# least the sum of shard sizes will equal the unsharded
# object_count
shard_size = object_count - progress
# NB shard ranges are created with a non-zero object count so that
# the apparent container object count remains constant, and the
# container is non-deletable while shards have been found but not
# yet cleaved
found_ranges.append(
{'index': index,
'lower': str(last_shard_upper),
'upper': str(next_shard_upper),
'object_count': shard_size})
if next_shard_upper == own_shard_range.upper:
return found_ranges, True
progress += shard_size
last_shard_upper = next_shard_upper
index += 1
return found_ranges, False
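
    # Illustrative example (comment only): for a container holding 10 objects
    # named obj0000..obj0009 (hypothetical names) and shard_size=4, a call
    # such as
    #
    #     found, last = broker.find_shard_ranges(shard_size=4)
    #
    # would return
    #
    #     ([{'index': 0, 'lower': '', 'upper': 'obj0003', 'object_count': 4},
    #       {'index': 1, 'lower': 'obj0003', 'upper': 'obj0007',
    #        'object_count': 4},
    #       {'index': 2, 'lower': 'obj0007', 'upper': '', 'object_count': 2}],
    #      True)
    #
    # where '' is the str() form of the open bounds ShardRange.MIN/MAX.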