up to trunk

This commit is contained in:
David Goetz 2011-03-16 08:01:23 -07:00
commit 8296bbb4f0
1 changed files with 81 additions and 122 deletions

View File

@ -165,6 +165,7 @@ class DatabaseBroker(object):
self.logger = logger or logging.getLogger()
self.account = account
self.container = container
self._db_version = -1
def initialize(self, put_timestamp=None):
"""
@ -607,7 +608,7 @@ class ContainerBroker(DatabaseBroker):
conn.executescript("""
CREATE TABLE object (
ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE,
name TEXT,
created_at TEXT,
size INTEGER,
content_type TEXT,
@ -615,7 +616,7 @@ class ContainerBroker(DatabaseBroker):
deleted INTEGER DEFAULT 0
);
CREATE INDEX ix_object_deleted ON object (deleted);
CREATE INDEX ix_object_deleted_name ON object (deleted, name);
CREATE TRIGGER object_insert AFTER INSERT ON object
BEGIN
@ -678,6 +679,15 @@ class ContainerBroker(DatabaseBroker):
''', (self.account, self.container, normalize_timestamp(time.time()),
str(uuid4()), put_timestamp))
def get_db_version(self, conn):
if self._db_version == -1:
self._db_version = 0
for row in conn.execute('''
SELECT name FROM sqlite_master
WHERE name = 'ix_object_deleted_name' '''):
self._db_version = 1
return self._db_version
def _newid(self, conn):
conn.execute('''
UPDATE container_stat
@ -910,37 +920,6 @@ class ContainerBroker(DatabaseBroker):
''', (put_timestamp, delete_timestamp, object_count, bytes_used))
conn.commit()
def get_random_objects(self, max_count=100):
"""
Get random objects from the DB. This is used by the container_auditor
when testing random objects for existence.
:param max_count: maximum number of objects to get
:returns: list of object names
"""
try:
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
rv = []
with self.get() as conn:
row = conn.execute('''
SELECT ROWID FROM object ORDER BY ROWID DESC LIMIT 1
''').fetchone()
if not row:
return []
max_rowid = row['ROWID']
for _junk in xrange(min(max_count, max_rowid)):
row = conn.execute('''
SELECT name FROM object WHERE ROWID >= ? AND +deleted = 0
LIMIT 1
''', (randint(0, max_rowid),)).fetchone()
if row:
rv.append(row['name'])
return list(set(rv))
def list_objects_iter(self, limit, marker, end_marker, prefix, delimiter,
path=None, format=None):
"""
@ -988,7 +967,11 @@ class ContainerBroker(DatabaseBroker):
elif prefix:
query += ' name >= ? AND'
query_args.append(prefix)
query += ' +deleted = 0 ORDER BY name LIMIT ?'
if self.get_db_version(conn) < 1:
query += ' +deleted = 0'
else:
query += ' deleted = 0'
query += ' ORDER BY name LIMIT ?'
query_args.append(limit - len(results))
curs = conn.execute(query, query_args)
curs.row_factory = None
@ -1035,19 +1018,23 @@ class ContainerBroker(DatabaseBroker):
with self.get() as conn:
max_rowid = -1
for rec in item_list:
conn.execute('''
DELETE FROM object WHERE name = ? AND
(created_at < ?)
''', (rec['name'], rec['created_at']))
try:
query = '''
DELETE FROM object
WHERE name = ? AND (created_at < ?)
'''
if self.get_db_version(conn) >= 1:
query += ' AND deleted IN (0, 1)'
conn.execute(query, (rec['name'], rec['created_at']))
query = 'SELECT 1 FROM object WHERE name = ?'
if self.get_db_version(conn) >= 1:
query += ' AND deleted IN (0, 1)'
if not conn.execute(query, (rec['name'],)).fetchall():
conn.execute('''
INSERT INTO object (name, created_at, size,
content_type, etag, deleted)
VALUES (?, ?, ?, ?, ?, ?)
''', ([rec['name'], rec['created_at'], rec['size'],
rec['content_type'], rec['etag'], rec['deleted']]))
except sqlite3.IntegrityError:
pass
if source:
max_rowid = max(max_rowid, rec['ROWID'])
if source:
@ -1091,7 +1078,7 @@ class AccountBroker(DatabaseBroker):
conn.executescript("""
CREATE TABLE container (
ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE,
name TEXT,
put_timestamp TEXT,
delete_timestamp TEXT,
object_count INTEGER,
@ -1099,8 +1086,9 @@ class AccountBroker(DatabaseBroker):
deleted INTEGER DEFAULT 0
);
CREATE INDEX ix_container_deleted ON container (deleted);
CREATE INDEX ix_container_name ON container (name);
CREATE INDEX ix_container_deleted_name ON
container (deleted, name);
CREATE TRIGGER container_insert AFTER INSERT ON container
BEGIN
UPDATE account_stat
@ -1164,6 +1152,15 @@ class AccountBroker(DatabaseBroker):
''', (self.account, normalize_timestamp(time.time()), str(uuid4()),
put_timestamp))
def get_db_version(self, conn):
if self._db_version == -1:
self._db_version = 0
for row in conn.execute('''
SELECT name FROM sqlite_master
WHERE name = 'ix_container_deleted_name' '''):
self._db_version = 1
return self._db_version
def update_put_timestamp(self, timestamp):
"""
Update the put_timestamp. Only modifies it if it is greater than
@ -1413,38 +1410,6 @@ class AccountBroker(DatabaseBroker):
FROM account_stat
''').fetchone()
def get_random_containers(self, max_count=100):
"""
Get random containers from the DB. This is used by the
account_auditor when testing random containerss for existence.
:param max_count: maximum number of containers to get
:returns: list of container names
"""
try:
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
rv = []
with self.get() as conn:
row = conn.execute('''
SELECT ROWID FROM container ORDER BY ROWID DESC LIMIT 1
''').fetchone()
if not row:
return []
max_rowid = row['ROWID']
for _junk in xrange(min(max_count, max_rowid)):
row = conn.execute('''
SELECT name FROM container WHERE
ROWID >= ? AND +deleted = 0
LIMIT 1
''', (randint(0, max_rowid),)).fetchone()
if row:
rv.append(row['name'])
return list(set(rv))
def list_containers_iter(self, limit, marker, end_marker, prefix,
delimiter):
"""
@ -1485,7 +1450,11 @@ class AccountBroker(DatabaseBroker):
elif prefix:
query += ' name >= ? AND'
query_args.append(prefix)
query += ' +deleted = 0 ORDER BY name LIMIT ?'
if self.get_db_version(conn) < 1:
query += ' +deleted = 0'
else:
query += ' deleted = 0'
query += ' ORDER BY name LIMIT ?'
query_args.append(limit - len(results))
curs = conn.execute(query, query_args)
curs.row_factory = None
@ -1529,51 +1498,41 @@ class AccountBroker(DatabaseBroker):
record = [rec['name'], rec['put_timestamp'],
rec['delete_timestamp'], rec['object_count'],
rec['bytes_used'], rec['deleted']]
try:
conn.execute('''
INSERT INTO container (name, put_timestamp,
delete_timestamp, object_count, bytes_used,
deleted)
VALUES (?, ?, ?, ?, ?, ?)
''', record)
except sqlite3.IntegrityError:
curs = conn.execute('''
SELECT name, put_timestamp, delete_timestamp,
object_count, bytes_used, deleted
FROM container WHERE name = ? AND
(put_timestamp < ? OR delete_timestamp < ? OR
object_count != ? OR bytes_used != ?)''',
(rec['name'], rec['put_timestamp'],
rec['delete_timestamp'], rec['object_count'],
rec['bytes_used']))
curs.row_factory = None
row = curs.fetchone()
if row:
row = list(row)
for i in xrange(5):
if record[i] is None and row[i] is not None:
record[i] = row[i]
if row[1] > record[1]: # Keep newest put_timestamp
record[1] = row[1]
if row[2] > record[2]: # Keep newest delete_timestamp
record[2] = row[2]
conn.execute('DELETE FROM container WHERE name = ?',
(record[0],))
# If deleted, mark as such
if record[2] > record[1] and \
record[3] in (None, '', 0, '0'):
record[5] = 1
else:
record[5] = 0
try:
conn.execute('''
INSERT INTO container (name, put_timestamp,
delete_timestamp, object_count, bytes_used,
deleted)
VALUES (?, ?, ?, ?, ?, ?)
''', record)
except sqlite3.IntegrityError:
continue
query = '''
SELECT name, put_timestamp, delete_timestamp,
object_count, bytes_used, deleted
FROM container WHERE name = ?
'''
if self.get_db_version(conn) >= 1:
query += ' AND deleted IN (0, 1)'
curs = conn.execute(query, (rec['name'],))
curs.row_factory = None
row = curs.fetchone()
if row:
row = list(row)
for i in xrange(5):
if record[i] is None and row[i] is not None:
record[i] = row[i]
if row[1] > record[1]: # Keep newest put_timestamp
record[1] = row[1]
if row[2] > record[2]: # Keep newest delete_timestamp
record[2] = row[2]
# If deleted, mark as such
if record[2] > record[1] and \
record[3] in (None, '', 0, '0'):
record[5] = 1
else:
record[5] = 0
conn.execute('''
DELETE FROM container WHERE name = ? AND
deleted IN (0, 1)
''', (record[0],))
conn.execute('''
INSERT INTO container (name, put_timestamp,
delete_timestamp, object_count, bytes_used,
deleted)
VALUES (?, ?, ?, ?, ?, ?)
''', record)
if source:
max_rowid = max(max_rowid, rec['ROWID'])
if source: