Use sqlite's new WAL mechanism as a replacement for .pending files.

Michael Barton, 2011-02-09 19:02:29 +00:00 (committed by Tarmac)
4 changed files with 134 additions and 258 deletions
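
The motivation: with a rollback journal (the old PRAGMA journal_mode = DELETE), readers and writers block one another, so puts were buffered in .pending files and replayed later under a directory lock. In WAL mode they don't, so the brokers can merge puts straight into the database, and the .pending machinery (PENDING_CAP, pending_timeout, stale_reads_ok, preallocation) can be dropped. WAL also leaves -wal and -shm sidecar files next to the database, which is why the replicator below starts checking for them before deciding on a block-level sync. A minimal sketch of the connection setup this commit adopts — illustrative only, with a placeholder path:

    import sqlite3

    # Placeholder path; Swift opens its account/container DBs under /srv/node.
    conn = sqlite3.connect('/tmp/example.db', timeout=25)
    # Write-ahead logging: readers no longer block writers (and vice versa).
    conn.execute('PRAGMA journal_mode = WAL')
    # NORMAL is safe in WAL mode: a crash can lose the newest transactions
    # but cannot corrupt the database file.
    conn.execute('PRAGMA synchronous = NORMAL')
    # Checkpoint (fold the -wal file back into the main db) after 8192 pages,
    # matching the AUTOCHECKPOINT constant introduced below.
    conn.execute('PRAGMA wal_autocheckpoint = 8192')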

swift/account/server.py

@@ -86,8 +86,6 @@ class AccountController(object):
             return Response(status='507 %s is not mounted' % drive)
         broker = self._get_account_broker(drive, part, account)
         if container:   # put account container
-            if 'x-cf-trans-id' in req.headers:
-                broker.pending_timeout = 3
             if req.headers.get('x-account-override-deleted', 'no').lower() != \
                     'yes' and broker.is_deleted():
                 return HTTPNotFound(request=req)
@@ -140,9 +138,6 @@ class AccountController(object):
         if self.mount_check and not check_mount(self.root, drive):
             return Response(status='507 %s is not mounted' % drive)
         broker = self._get_account_broker(drive, part, account)
-        if not container:
-            broker.pending_timeout = 0.1
-            broker.stale_reads_ok = True
         if broker.is_deleted():
             return HTTPNotFound(request=req)
         info = broker.get_info()
@@ -171,8 +166,6 @@ class AccountController(object):
         if self.mount_check and not check_mount(self.root, drive):
             return Response(status='507 %s is not mounted' % drive)
         broker = self._get_account_broker(drive, part, account)
-        broker.pending_timeout = 0.1
-        broker.stale_reads_ok = True
         if broker.is_deleted():
             return HTTPNotFound(request=req)
         info = broker.get_info()

swift/common/db.py

@@ -27,13 +27,14 @@ import cPickle as pickle
 import errno
 from random import randint
 from tempfile import mkstemp
+import traceback
 
 from eventlet import sleep
 import simplejson as json
 import sqlite3
 
 from swift.common.utils import normalize_timestamp, renamer, \
-        mkdirs, lock_parent_directory, fallocate
+        mkdirs, lock_parent_directory
 from swift.common.exceptions import LockTimeout
@@ -41,8 +42,9 @@ from swift.common.exceptions import LockTimeout
 BROKER_TIMEOUT = 25
 #: Pickle protocol to use
 PICKLE_PROTOCOL = 2
-#: Max number of pending entries
-PENDING_CAP = 131072
+CONNECT_ATTEMPTS = 4
+PENDING_COMMIT_TIMEOUT = 900
+AUTOCHECKPOINT = 8192
 
 
 class DatabaseConnectionError(sqlite3.DatabaseError):
@@ -123,48 +125,48 @@ def get_db_connection(path, timeout=30, okay_to_create=False):
     :param okay_to_create: if True, create the DB if it doesn't exist
     :returns: DB connection object
     """
-    try:
-        connect_time = time.time()
-        conn = sqlite3.connect(path, check_same_thread=False,
-                               factory=GreenDBConnection, timeout=timeout)
-        if path != ':memory:' and not okay_to_create:
+    # retry logic to address:
+    # http://www.mail-archive.com/sqlite-users@sqlite.org/msg57092.html
+    for attempt in xrange(CONNECT_ATTEMPTS):
+        try:
+            connect_time = time.time()
+            conn = sqlite3.connect(path, check_same_thread=False,
+                                   factory=GreenDBConnection, timeout=timeout)
             # attempt to detect and fail when connect creates the db file
-            stat = os.stat(path)
-            if stat.st_size == 0 and stat.st_ctime >= connect_time:
-                os.unlink(path)
-                raise DatabaseConnectionError(path,
-                    'DB file created by connect?')
-        conn.row_factory = sqlite3.Row
-        conn.text_factory = str
-        conn.execute('PRAGMA synchronous = NORMAL')
-        conn.execute('PRAGMA count_changes = OFF')
-        conn.execute('PRAGMA temp_store = MEMORY')
-        conn.execute('PRAGMA journal_mode = DELETE')
-        conn.create_function('chexor', 3, chexor)
-    except sqlite3.DatabaseError:
-        import traceback
-        raise DatabaseConnectionError(path, traceback.format_exc(),
-                                      timeout=timeout)
-    return conn
+            if path != ':memory:' and not okay_to_create:
+                stat = os.stat(path)
+                if stat.st_size == 0 and stat.st_ctime >= connect_time:
+                    os.unlink(path)
+                    raise DatabaseConnectionError(path,
+                        'DB file created by connect?')
+            conn.execute('PRAGMA journal_mode = WAL')
+            conn.execute('PRAGMA synchronous = NORMAL')
+            conn.execute('PRAGMA wal_autocheckpoint = %s' % AUTOCHECKPOINT)
+            conn.execute('PRAGMA count_changes = OFF')
+            conn.execute('PRAGMA temp_store = MEMORY')
+            conn.create_function('chexor', 3, chexor)
+            conn.row_factory = sqlite3.Row
+            conn.text_factory = str
+            return conn
+        except sqlite3.DatabaseError, e:
+            errstr = traceback.format_exc()
+    raise DatabaseConnectionError(path, errstr, timeout=timeout)
 
 
 class DatabaseBroker(object):
     """Encapsulates working with a database."""
 
     def __init__(self, db_file, timeout=BROKER_TIMEOUT, logger=None,
-                 account=None, container=None, pending_timeout=10,
-                 stale_reads_ok=False):
+                 account=None, container=None):
         """ Encapsulates working with a database. """
         self.conn = None
         self.db_file = db_file
-        self.pending_file = self.db_file + '.pending'
-        self.pending_timeout = pending_timeout
-        self.stale_reads_ok = stale_reads_ok
         self.db_dir = os.path.dirname(db_file)
         self.timeout = timeout
         self.logger = logger or logging.getLogger()
         self.account = account
         self.container = container
+        self._db_version = -1
 
     def initialize(self, put_timestamp=None):
         """
@@ -233,7 +235,7 @@ class DatabaseBroker(object):
             conn.close()
             with open(tmp_db_file, 'r+b') as fp:
                 os.fsync(fp.fileno())
-            with lock_parent_directory(self.db_file, self.pending_timeout):
+            with lock_parent_directory(self.db_file, self.timeout):
                 if os.path.exists(self.db_file):
                     # It's as if there was a "condition" where different parts
                     # of the system were "racing" each other.
@@ -285,6 +287,7 @@ class DatabaseBroker(object):
             self.conn = None
             orig_isolation_level = conn.isolation_level
             conn.isolation_level = None
+            conn.execute('PRAGMA journal_mode = DELETE')  # remove any journal files
             conn.execute('BEGIN IMMEDIATE')
             try:
                 yield True
@@ -292,6 +295,7 @@ class DatabaseBroker(object):
                 pass
             try:
                 conn.execute('ROLLBACK')
+                conn.execute('PRAGMA journal_mode = WAL')  # back to WAL mode
                 conn.isolation_level = orig_isolation_level
                 self.conn = conn
             except Exception:
@@ -348,11 +352,6 @@ class DatabaseBroker(object):
         :param count: number to get
 
         :returns: list of objects between start and end
         """
-        try:
-            self._commit_puts()
-        except LockTimeout:
-            if not self.stale_reads_ok:
-                raise
         with self.get() as conn:
             curs = conn.execute('''
                 SELECT * FROM %s WHERE ROWID > ? ORDER BY ROWID ASC LIMIT ?
@@ -401,11 +400,7 @@ class DatabaseBroker(object):
         :returns: dict containing keys: hash, id, created_at, put_timestamp,
                   delete_timestamp, count, max_row, and metadata
         """
-        try:
-            self._commit_puts()
-        except LockTimeout:
-            if not self.stale_reads_ok:
-                raise
+        self._commit_puts()
         query_part1 = '''
             SELECT hash, id, created_at, put_timestamp, delete_timestamp,
                 %s_count AS count,
@@ -455,34 +450,6 @@ class DatabaseBroker(object):
                          (rec['sync_point'], rec['remote_id']))
             conn.commit()
 
-    def _preallocate(self):
-        """
-        The idea is to allocate space in front of an expanding db.  If it gets
-        within 512k of a boundary, it allocates to the next boundary.
-        Boundaries are 2m, 5m, 10m, 25m, 50m, then every 50m after.
-        """
-        if self.db_file == ':memory:':
-            return
-        MB = (1024 * 1024)
-
-        def prealloc_points():
-            for pm in (1, 2, 5, 10, 25, 50):
-                yield pm * MB
-            while True:
-                pm += 50
-                yield pm * MB
-
-        stat = os.stat(self.db_file)
-        file_size = stat.st_size
-        allocated_size = stat.st_blocks * 512
-        for point in prealloc_points():
-            if file_size <= point - MB / 2:
-                prealloc_size = point
-                break
-        if allocated_size < prealloc_size:
-            with open(self.db_file, 'rb+') as fp:
-                fallocate(fp.fileno(), int(prealloc_size))
-
     @property
     def metadata(self):
         """
@@ -607,7 +574,7 @@ class ContainerBroker(DatabaseBroker):
             conn.executescript("""
                 CREATE TABLE object (
                     ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
-                    name TEXT UNIQUE,
+                    name TEXT,
                     created_at TEXT,
                     size INTEGER,
                     content_type TEXT,
@@ -615,7 +582,7 @@ class ContainerBroker(DatabaseBroker):
                     deleted INTEGER DEFAULT 0
                 );
 
-                CREATE INDEX ix_object_deleted ON object (deleted);
+                CREATE INDEX ix_object_deleted_name ON object (deleted, name);
 
                 CREATE TRIGGER object_insert AFTER INSERT ON object
                 BEGIN
@@ -678,6 +645,15 @@ class ContainerBroker(DatabaseBroker):
         ''', (self.account, self.container, normalize_timestamp(time.time()),
               str(uuid4()), put_timestamp))
 
+    def _get_db_version(self, conn):
+        if self._db_version == -1:
+            self._db_version = 0
+            for row in conn.execute('''
+                    SELECT name FROM sqlite_master
+                    WHERE name = 'ix_object_deleted_name' '''):
+                self._db_version = 1
+        return self._db_version
+
     def _newid(self, conn):
         conn.execute('''
             UPDATE container_stat
@@ -717,11 +693,6 @@ class ContainerBroker(DatabaseBroker):
 
         :returns: True if the database has no active objects, False otherwise
         """
-        try:
-            self._commit_puts()
-        except LockTimeout:
-            if not self.stale_reads_ok:
-                raise
         with self.get() as conn:
             row = conn.execute(
                 'SELECT object_count from container_stat').fetchone()
@@ -729,17 +700,16 @@ class ContainerBroker(DatabaseBroker):
 
     def _commit_puts(self, item_list=None):
         """Handles commiting rows in .pending files."""
-        if self.db_file == ':memory:' or not os.path.exists(self.pending_file):
+        pending_file = self.db_file + '.pending'
+        if self.db_file == ':memory:' or not os.path.exists(pending_file):
+            return
+        if not os.path.getsize(pending_file):
+            os.unlink(pending_file)
             return
         if item_list is None:
             item_list = []
-        with lock_parent_directory(self.pending_file, self.pending_timeout):
-            self._preallocate()
-            if not os.path.getsize(self.pending_file):
-                if item_list:
-                    self.merge_items(item_list)
-                return
-            with open(self.pending_file, 'r+b') as fp:
+        with lock_parent_directory(pending_file, PENDING_COMMIT_TIMEOUT):
+            with open(pending_file, 'r+b') as fp:
                 for entry in fp.read().split(':'):
                     if entry:
                         try:
@@ -752,11 +722,11 @@ class ContainerBroker(DatabaseBroker):
                         except Exception:
                             self.logger.exception(
                                 _('Invalid pending entry %(file)s: %(entry)s'),
-                                {'file': self.pending_file, 'entry': entry})
+                                {'file': pending_file, 'entry': entry})
                 if item_list:
                     self.merge_items(item_list)
                 try:
-                    os.ftruncate(fp.fileno(), 0)
+                    os.unlink(pending_file)
                 except OSError, err:
                     if err.errno != errno.ENOENT:
                         raise
@@ -774,7 +744,6 @@ class ContainerBroker(DatabaseBroker):
                 delete
         :param sync_timestamp: max update_at timestamp of sync rows to delete
         """
-        self._commit_puts()
         with self.get() as conn:
             conn.execute("""
                 DELETE FROM object
@@ -818,30 +787,9 @@ class ContainerBroker(DatabaseBroker):
         record = {'name': name, 'created_at': timestamp, 'size': size,
                   'content_type': content_type, 'etag': etag,
                   'deleted': deleted}
-        if self.db_file == ':memory:':
-            self.merge_items([record])
-            return
-        if not os.path.exists(self.db_file):
+        if self.db_file != ':memory:' and not os.path.exists(self.db_file):
             raise DatabaseConnectionError(self.db_file, "DB doesn't exist")
-        pending_size = 0
-        try:
-            pending_size = os.path.getsize(self.pending_file)
-        except OSError, err:
-            if err.errno != errno.ENOENT:
-                raise
-        if pending_size > PENDING_CAP:
-            self._commit_puts([record])
-        else:
-            with lock_parent_directory(
-                    self.pending_file, self.pending_timeout):
-                with open(self.pending_file, 'a+b') as fp:
-                    # Colons aren't used in base64 encoding; so they are our
-                    # delimiter
-                    fp.write(':')
-                    fp.write(pickle.dumps(
-                        (name, timestamp, size, content_type, etag, deleted),
-                        protocol=PICKLE_PROTOCOL).encode('base64'))
-                    fp.flush()
+        self.merge_items([record])
 
     def is_deleted(self, timestamp=None):
         """
@@ -851,11 +799,6 @@ class ContainerBroker(DatabaseBroker):
         """
         if self.db_file != ':memory:' and not os.path.exists(self.db_file):
             return True
-        try:
-            self._commit_puts()
-        except LockTimeout:
-            if not self.stale_reads_ok:
-                raise
         with self.get() as conn:
             row = conn.execute('''
                 SELECT put_timestamp, delete_timestamp, object_count
@@ -878,11 +821,6 @@ class ContainerBroker(DatabaseBroker):
                   reported_put_timestamp, reported_delete_timestamp,
                   reported_object_count, reported_bytes_used, hash, id)
         """
-        try:
-            self._commit_puts()
-        except LockTimeout:
-            if not self.stale_reads_ok:
-                raise
         with self.get() as conn:
             return conn.execute('''
                 SELECT account, container, created_at, put_timestamp,
@@ -919,11 +857,6 @@ class ContainerBroker(DatabaseBroker):
 
         :returns: list of object names
         """
-        try:
-            self._commit_puts()
-        except LockTimeout:
-            if not self.stale_reads_ok:
-                raise
         rv = []
         with self.get() as conn:
             row = conn.execute('''
@@ -960,11 +893,6 @@ class ContainerBroker(DatabaseBroker):
         :returns: list of tuples of (name, created_at, size, content_type,
                   etag)
         """
-        try:
-            self._commit_puts()
-        except LockTimeout:
-            if not self.stale_reads_ok:
-                raise
         if path is not None:
             prefix = path
             if path:
@@ -988,7 +916,10 @@ class ContainerBroker(DatabaseBroker):
             elif prefix:
                 query += ' name >= ? AND'
                 query_args.append(prefix)
-            query += ' +deleted = 0 ORDER BY name LIMIT ?'
+            if self._get_db_version(conn) < 1:
+                query += ' +deleted = 0 ORDER BY name LIMIT ?'
+            else:
+                query += ' deleted = 0 ORDER BY name LIMIT ?'
             query_args.append(limit - len(results))
             curs = conn.execute(query, query_args)
             curs.row_factory = None
@@ -1036,18 +967,19 @@ class ContainerBroker(DatabaseBroker):
             max_rowid = -1
             for rec in item_list:
                 conn.execute('''
-                    DELETE FROM object WHERE name = ? AND
-                        (created_at < ?)
+                    DELETE FROM object WHERE name = ? AND created_at < ? AND
+                        deleted IN (0, 1)
                 ''', (rec['name'], rec['created_at']))
-                try:
+                if not conn.execute('''
+                        SELECT name FROM object WHERE name = ? AND
+                            deleted IN (0, 1)
+                        ''', (rec['name'],)).fetchall():
                     conn.execute('''
                         INSERT INTO object (name, created_at, size,
                             content_type, etag, deleted)
                         VALUES (?, ?, ?, ?, ?, ?)
                     ''', ([rec['name'], rec['created_at'], rec['size'],
                           rec['content_type'], rec['etag'], rec['deleted']]))
-                except sqlite3.IntegrityError:
-                    pass
                 if source:
                     max_rowid = max(max_rowid, rec['ROWID'])
             if source:
@@ -1091,7 +1023,7 @@ class AccountBroker(DatabaseBroker):
             conn.executescript("""
                 CREATE TABLE container (
                     ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
-                    name TEXT UNIQUE,
+                    name TEXT,
                     put_timestamp TEXT,
                     delete_timestamp TEXT,
                     object_count INTEGER,
@@ -1099,8 +1031,9 @@ class AccountBroker(DatabaseBroker):
                     deleted INTEGER DEFAULT 0
                 );
 
-                CREATE INDEX ix_container_deleted ON container (deleted);
-                CREATE INDEX ix_container_name ON container (name);
+                CREATE INDEX ix_container_deleted_name ON
+                    container (deleted, name);
 
                 CREATE TRIGGER container_insert AFTER INSERT ON container
                 BEGIN
                     UPDATE account_stat
@@ -1164,6 +1097,15 @@ class AccountBroker(DatabaseBroker):
         ''', (self.account, normalize_timestamp(time.time()), str(uuid4()),
               put_timestamp))
 
+    def _get_db_version(self, conn):
+        if self._db_version == -1:
+            self._db_version = 0
+            for row in conn.execute('''
+                    SELECT name FROM sqlite_master
+                    WHERE name = 'ix_container_deleted_name' '''):
+                self._db_version = 1
+        return self._db_version
+
     def update_put_timestamp(self, timestamp):
         """
         Update the put_timestamp.  Only modifies it if it is greater than
@@ -1193,17 +1135,16 @@ class AccountBroker(DatabaseBroker):
 
     def _commit_puts(self, item_list=None):
         """Handles commiting rows in .pending files."""
-        if self.db_file == ':memory:' or not os.path.exists(self.pending_file):
+        pending_file = self.db_file + '.pending'
+        if self.db_file == ':memory:' or not os.path.exists(pending_file):
+            return
+        if not os.path.getsize(pending_file):
+            os.unlink(pending_file)
             return
         if item_list is None:
             item_list = []
-        with lock_parent_directory(self.pending_file, self.pending_timeout):
-            self._preallocate()
-            if not os.path.getsize(self.pending_file):
-                if item_list:
-                    self.merge_items(item_list)
-                return
-            with open(self.pending_file, 'r+b') as fp:
+        with lock_parent_directory(pending_file, PENDING_COMMIT_TIMEOUT):
+            with open(pending_file, 'r+b') as fp:
                 for entry in fp.read().split(':'):
                     if entry:
                         try:
@@ -1219,11 +1160,11 @@ class AccountBroker(DatabaseBroker):
                         except Exception:
                             self.logger.exception(
                                 _('Invalid pending entry %(file)s: %(entry)s'),
-                                {'file': self.pending_file, 'entry': entry})
+                                {'file': pending_file, 'entry': entry})
                 if item_list:
                     self.merge_items(item_list)
                 try:
-                    os.ftruncate(fp.fileno(), 0)
+                    os.unlink(pending_file)
                 except OSError, err:
                     if err.errno != errno.ENOENT:
                         raise
@@ -1234,11 +1175,6 @@ class AccountBroker(DatabaseBroker):
 
         :returns: True if the database has no active containers.
         """
-        try:
-            self._commit_puts()
-        except LockTimeout:
-            if not self.stale_reads_ok:
-                raise
         with self.get() as conn:
             row = conn.execute(
                 'SELECT container_count from account_stat').fetchone()
@@ -1258,7 +1194,6 @@ class AccountBroker(DatabaseBroker):
         :param sync_timestamp: max update_at timestamp of sync rows to delete
         """
-        self._commit_puts()
         with self.get() as conn:
             conn.execute('''
                 DELETE FROM container WHERE
@@ -1286,11 +1221,6 @@ class AccountBroker(DatabaseBroker):
 
         :returns: put_timestamp of the container
         """
-        try:
-            self._commit_puts()
-        except LockTimeout:
-            if not self.stale_reads_ok:
-                raise
         with self.get() as conn:
             ret = conn.execute('''
                 SELECT put_timestamp FROM container
@@ -1311,6 +1241,8 @@ class AccountBroker(DatabaseBroker):
         :param object_count: number of objects in the container
        :param bytes_used: number of bytes used by the container
         """
+        if self.db_file != ':memory:' and not os.path.exists(self.db_file):
+            raise DatabaseConnectionError(self.db_file, "DB doesn't exist")
         if delete_timestamp > put_timestamp and \
                 object_count in (None, '', 0, '0'):
             deleted = 1
@@ -1321,24 +1253,7 @@ class AccountBroker(DatabaseBroker):
                   'object_count': object_count,
                   'bytes_used': bytes_used,
                   'deleted': deleted}
-        if self.db_file == ':memory:':
-            self.merge_items([record])
-            return
-        commit = False
-        with lock_parent_directory(self.pending_file, self.pending_timeout):
-            with open(self.pending_file, 'a+b') as fp:
-                # Colons aren't used in base64 encoding; so they are our
-                # delimiter
-                fp.write(':')
-                fp.write(pickle.dumps(
-                    (name, put_timestamp, delete_timestamp, object_count,
-                     bytes_used, deleted),
-                    protocol=PICKLE_PROTOCOL).encode('base64'))
-                fp.flush()
-                if fp.tell() > PENDING_CAP:
-                    commit = True
-        if commit:
-            self._commit_puts()
+        self.merge_items([record])
 
     def can_delete_db(self, cutoff):
         """
@@ -1346,7 +1261,6 @@ class AccountBroker(DatabaseBroker):
         :returns: True if the account can be deleted, False otherwise
         """
-        self._commit_puts()
         with self.get() as conn:
             row = conn.execute('''
                 SELECT status, put_timestamp, delete_timestamp, container_count
@@ -1372,11 +1286,6 @@ class AccountBroker(DatabaseBroker):
         """
         if self.db_file != ':memory:' and not os.path.exists(self.db_file):
             return True
-        try:
-            self._commit_puts()
-        except LockTimeout:
-            if not self.stale_reads_ok:
-                raise
         with self.get() as conn:
             row = conn.execute('''
                 SELECT put_timestamp, delete_timestamp, container_count, status
@@ -1401,11 +1310,6 @@ class AccountBroker(DatabaseBroker):
                   delete_timestamp, container_count, object_count,
                   bytes_used, hash, id)
         """
-        try:
-            self._commit_puts()
-        except LockTimeout:
-            if not self.stale_reads_ok:
-                raise
         with self.get() as conn:
             return conn.execute('''
                 SELECT account, created_at, put_timestamp, delete_timestamp,
@@ -1422,11 +1326,6 @@ class AccountBroker(DatabaseBroker):
 
         :returns: list of container names
         """
-        try:
-            self._commit_puts()
-        except LockTimeout:
-            if not self.stale_reads_ok:
-                raise
         rv = []
         with self.get() as conn:
             row = conn.execute('''
@@ -1460,11 +1359,6 @@ class AccountBroker(DatabaseBroker):
         :returns: list of tuples of (name, object_count, bytes_used, 0)
         """
-        try:
-            self._commit_puts()
-        except LockTimeout:
-            if not self.stale_reads_ok:
-                raise
         if delimiter and not prefix:
             prefix = ''
         orig_marker = marker
@@ -1485,7 +1379,10 @@ class AccountBroker(DatabaseBroker):
             elif prefix:
                 query += ' name >= ? AND'
                 query_args.append(prefix)
-            query += ' +deleted = 0 ORDER BY name LIMIT ?'
+            if self._get_db_version(conn) < 1:
+                query += ' +deleted = 0 ORDER BY name LIMIT ?'
+            else:
+                query += ' deleted = 0 ORDER BY name LIMIT ?'
             query_args.append(limit - len(results))
             curs = conn.execute(query, query_args)
             curs.row_factory = None
@@ -1529,51 +1426,39 @@ class AccountBroker(DatabaseBroker):
                 record = [rec['name'], rec['put_timestamp'],
                           rec['delete_timestamp'], rec['object_count'],
                           rec['bytes_used'], rec['deleted']]
-                try:
-                    conn.execute('''
-                        INSERT INTO container (name, put_timestamp,
-                            delete_timestamp, object_count, bytes_used,
-                            deleted)
-                        VALUES (?, ?, ?, ?, ?, ?)
-                    ''', record)
-                except sqlite3.IntegrityError:
-                    curs = conn.execute('''
-                        SELECT name, put_timestamp, delete_timestamp,
-                            object_count, bytes_used, deleted
-                        FROM container WHERE name = ? AND
-                            (put_timestamp < ? OR delete_timestamp < ? OR
-                             object_count != ? OR bytes_used != ?)''',
-                        (rec['name'], rec['put_timestamp'],
-                         rec['delete_timestamp'], rec['object_count'],
-                         rec['bytes_used']))
-                    curs.row_factory = None
-                    row = curs.fetchone()
-                    if row:
-                        row = list(row)
-                        for i in xrange(5):
-                            if record[i] is None and row[i] is not None:
-                                record[i] = row[i]
-                        if row[1] > record[1]:  # Keep newest put_timestamp
-                            record[1] = row[1]
-                        if row[2] > record[2]:  # Keep newest delete_timestamp
-                            record[2] = row[2]
-                        conn.execute('DELETE FROM container WHERE name = ?',
-                                     (record[0],))
-                        # If deleted, mark as such
-                        if record[2] > record[1] and \
-                                record[3] in (None, '', 0, '0'):
-                            record[5] = 1
-                        else:
-                            record[5] = 0
-                        try:
-                            conn.execute('''
-                                INSERT INTO container (name, put_timestamp,
-                                    delete_timestamp, object_count, bytes_used,
-                                    deleted)
-                                VALUES (?, ?, ?, ?, ?, ?)
-                            ''', record)
-                        except sqlite3.IntegrityError:
-                            continue
+                curs = conn.execute('''
+                    SELECT name, put_timestamp, delete_timestamp,
+                        object_count, bytes_used, deleted
+                    FROM container WHERE name = ? AND
+                        deleted IN (0, 1)
+                ''', (rec['name'],))
+                curs.row_factory = None
+                row = curs.fetchone()
+                if row:
+                    row = list(row)
+                    for i in xrange(5):
+                        if record[i] is None and row[i] is not None:
+                            record[i] = row[i]
+                    if row[1] > record[1]:  # Keep newest put_timestamp
+                        record[1] = row[1]
+                    if row[2] > record[2]:  # Keep newest delete_timestamp
+                        record[2] = row[2]
+                    # If deleted, mark as such
+                    if record[2] > record[1] and \
+                            record[3] in (None, '', 0, '0'):
+                        record[5] = 1
+                    else:
+                        record[5] = 0
+                conn.execute('''
+                    DELETE FROM container WHERE name = ? AND
+                        deleted IN (0, 1)
+                ''', (record[0],))
+                conn.execute('''
+                    INSERT INTO container (name, put_timestamp,
+                        delete_timestamp, object_count, bytes_used,
+                        deleted)
+                    VALUES (?, ?, ?, ?, ?, ?)
+                ''', record)
                 if source:
                     max_rowid = max(max_rowid, rec['ROWID'])
             if source:

swift/common/db_replicator.py

@@ -180,7 +180,9 @@ class Replicator(Daemon):
             return False
         # perform block-level sync if the db was modified during the first sync
         if os.path.exists(broker.db_file + '-journal') or \
+                os.path.exists(broker.db_file + '-wal') or \
+                os.path.exists(broker.db_file + '-shm') or \
                 os.path.getmtime(broker.db_file) > mtime:
             # grab a lock so nobody else can modify it
             with broker.lock():
                 if not self._rsync_file(broker.db_file, remote_file, False):
@@ -316,7 +318,7 @@ class Replicator(Daemon):
         self.logger.debug(_('Replicating db %s'), object_file)
         self.stats['attempted'] += 1
         try:
-            broker = self.brokerclass(object_file, pending_timeout=30)
+            broker = self.brokerclass(object_file)
             broker.reclaim(time.time() - self.reclaim_age,
                            time.time() - (self.reclaim_age * 2))
             info = broker.get_replication_info()

swift/container/server.py

@@ -219,8 +219,6 @@ class ContainerController(object):
         if self.mount_check and not check_mount(self.root, drive):
             return Response(status='507 %s is not mounted' % drive)
         broker = self._get_container_broker(drive, part, account, container)
-        broker.pending_timeout = 0.1
-        broker.stale_reads_ok = True
         if broker.is_deleted():
             return HTTPNotFound(request=req)
         info = broker.get_info()
@@ -246,8 +244,6 @@ class ContainerController(object):
         if self.mount_check and not check_mount(self.root, drive):
             return Response(status='507 %s is not mounted' % drive)
         broker = self._get_container_broker(drive, part, account, container)
-        broker.pending_timeout = 0.1
-        broker.stale_reads_ok = True
         if broker.is_deleted():
             return HTTPNotFound(request=req)
         info = broker.get_info()