Merge "container merge_items speedup"

This commit is contained in:
Jenkins 2014-08-31 04:29:15 +00:00 committed by Gerrit Code Review
commit 85362fdf4e

View File

@ -28,6 +28,9 @@ from swift.common.utils import Timestamp, lock_parent_directory
from swift.common.db import DatabaseBroker, DatabaseConnectionError, \
PENDING_CAP, PICKLE_PROTOCOL, utf8encode
SQLITE_ARG_LIMIT = 999
DATADIR = 'containers'
POLICY_STAT_TABLE_CREATE = '''
@ -701,47 +704,65 @@ class ContainerBroker(DatabaseBroker):
:param source: if defined, update incoming_sync with the source
"""
def _really_merge_items(conn):
max_rowid = -1
curs = conn.cursor()
for rec in item_list:
rec.setdefault('storage_policy_index', 0) # legacy
query = '''
DELETE FROM object
WHERE name = ? AND (created_at < ?)
AND storage_policy_index = ?
'''
if self.get_db_version(conn) >= 1:
query += ' AND deleted IN (0, 1)'
curs.execute(query, (rec['name'], rec['created_at'],
rec['storage_policy_index']))
query = '''
SELECT 1 FROM object WHERE name = ?
AND storage_policy_index = ?
'''
if self.get_db_version(conn) >= 1:
query += ' AND deleted IN (0, 1)'
if not curs.execute(query, (
rec['name'], rec['storage_policy_index'])).fetchall():
curs.execute('''
INSERT INTO object (name, created_at, size,
content_type, etag, deleted, storage_policy_index)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', ([rec['name'], rec['created_at'], rec['size'],
rec['content_type'], rec['etag'], rec['deleted'],
rec['storage_policy_index']]))
if self.get_db_version(conn) >= 1:
query_mod = ' deleted IN (0, 1) AND '
else:
query_mod = ''
curs.execute('BEGIN IMMEDIATE')
# Get created_at times for objects in item_list that already exist.
# We must chunk it up to avoid sqlite's limit of 999 args.
created_at = {}
for offset in xrange(0, len(item_list), SQLITE_ARG_LIMIT):
chunk = [rec['name'] for rec in
item_list[offset:offset + SQLITE_ARG_LIMIT]]
created_at.update(
((rec[0], rec[1]), rec[2]) for rec in curs.execute(
'SELECT name, storage_policy_index, created_at '
'FROM object WHERE ' + query_mod + ' name IN (%s)' %
','.join('?' * len(chunk)), chunk))
# Sort item_list into things that need adding and deleting, based
# on results of created_at query.
to_delete = {}
to_add = {}
for item in item_list:
item.setdefault('storage_policy_index', 0) # legacy
item_ident = (item['name'], item['storage_policy_index'])
if created_at.get(item_ident) < item['created_at']:
if item_ident in created_at: # exists with older timestamp
to_delete[item_ident] = item
if item_ident in to_add: # duplicate entries in item_list
to_add[item_ident] = max(item, to_add[item_ident],
key=lambda i: i['created_at'])
else:
to_add[item_ident] = item
if to_delete:
curs.executemany(
'DELETE FROM object WHERE ' + query_mod +
'name=? AND storage_policy_index=?',
((rec['name'], rec['storage_policy_index'])
for rec in to_delete.itervalues()))
if to_add:
curs.executemany(
'INSERT INTO object (name, created_at, size, content_type,'
'etag, deleted, storage_policy_index)'
'VALUES (?, ?, ?, ?, ?, ?, ?)',
((rec['name'], rec['created_at'], rec['size'],
rec['content_type'], rec['etag'], rec['deleted'],
rec['storage_policy_index'])
for rec in to_add.itervalues()))
if source:
max_rowid = max(max_rowid, rec['ROWID'])
if source:
try:
conn.execute('''
INSERT INTO incoming_sync (sync_point, remote_id)
VALUES (?, ?)
''', (max_rowid, source))
except sqlite3.IntegrityError:
conn.execute('''
UPDATE incoming_sync SET sync_point=max(?, sync_point)
WHERE remote_id=?
max_rowid = max(rec['ROWID']
for rec in to_add.itervalues())
curs.execute('''
UPDATE incoming_sync SET
sync_point=max(?, sync_point) WHERE remote_id=?
''', (max_rowid, source))
if curs.rowcount < 1:
curs.execute('''
INSERT INTO incoming_sync (sync_point, remote_id)
VALUES (?, ?)
''', (max_rowid, source))
conn.commit()
with self.get() as conn: