Unify _commit_puts for accounts and containers

The _commit_puts() method is exactly the same in AccountBroker and
ContainerBroker, except for what it puts on the list of pending
records. It is easy to factor out without unduly sacrificing code
clarity.

We also factor out the common handling of LockTimeout: not so much
for the code savings as to underscore that all these little segments
really are the same, so a reader does not need to re-verify their
uniformity every time.

Change-Id: Idf9b553a3cf28de3c8803aaa47cf9a2be6817f00
Author: Pete Zaitcev
Date:   2013-07-25 18:18:46 -06:00
Parent: 76f12c8c10
Commit: db2a4787b4

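For orientation before the diff: the change is a textbook template method. Below is a condensed sketch of the resulting shape, not the verbatim Swift code; _read_pending_entries() and the no-op merge_items() are stand-ins for the real file handling and SQL, and this LockTimeout stands in for Swift's eventlet-based one.

    import pickle


    class LockTimeout(Exception):
        pass


    class DatabaseBroker(object):
        stale_reads_ok = False

        def _read_pending_entries(self):
            return []  # stand-in: the real code locks, reads and truncates .pending

        def merge_items(self, item_list):
            pass  # stand-in: the real code upserts the rows into SQLite

        def _commit_puts(self, item_list=None):
            # Generic walk over pending entries; unmarshalling is delegated.
            if item_list is None:
                item_list = []
            for entry in self._read_pending_entries():
                self._commit_puts_load(item_list, entry)
            if item_list:
                self.merge_items(item_list)

        def _commit_puts_stale_ok(self):
            # Readers that tolerate stale stats swallow lock timeouts.
            try:
                self._commit_puts()
            except LockTimeout:
                if not self.stale_reads_ok:
                    raise

        def _commit_puts_load(self, item_list, entry):
            # The one per-broker piece: turn a pending entry into a row dict.
            raise NotImplementedError


    class ContainerBroker(DatabaseBroker):
        def _commit_puts_load(self, item_list, entry):
            (name, timestamp, size, content_type, etag, deleted) = \
                pickle.loads(entry.decode('base64'))
            item_list.append({'name': name, 'created_at': timestamp,
                              'size': size, 'content_type': content_type,
                              'etag': etag, 'deleted': deleted})

The scan/lock/truncate machinery lives once in DatabaseBroker; ContainerBroker and AccountBroker differ only in how one entry unmarshals, which is exactly what the diff below factors out.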

@@ -416,11 +416,7 @@ class DatabaseBroker(object):
         :param count: number to get
         :returns: list of objects between start and end
         """
-        try:
-            self._commit_puts()
-        except LockTimeout:
-            if not self.stale_reads_ok:
-                raise
+        self._commit_puts_stale_ok()
         with self.get() as conn:
             curs = conn.execute('''
                 SELECT * FROM %s WHERE ROWID > ? ORDER BY ROWID ASC LIMIT ?
@@ -469,11 +465,7 @@ class DatabaseBroker(object):
         :returns: dict containing keys: hash, id, created_at, put_timestamp,
                   delete_timestamp, count, max_row, and metadata
         """
-        try:
-            self._commit_puts()
-        except LockTimeout:
-            if not self.stale_reads_ok:
-                raise
+        self._commit_puts_stale_ok()
         query_part1 = '''
             SELECT hash, id, created_at, put_timestamp, delete_timestamp,
                 %s_count AS count,
@@ -495,8 +487,58 @@ class DatabaseBroker(object):
             curs.row_factory = dict_factory
             return curs.fetchone()
 
-    def _commit_puts(self):
-        pass  # stub to be overridden if need be
+    def _commit_puts(self, item_list=None):
+        """
+        Scan for .pending files and commit the found records by feeding them
+        to merge_items().
+
+        :param item_list: A list of items to commit in addition to .pending
+        """
+        if self.db_file == ':memory:' or not os.path.exists(self.pending_file):
+            return
+        if item_list is None:
+            item_list = []
+        with lock_parent_directory(self.pending_file, self.pending_timeout):
+            self._preallocate()
+            if not os.path.getsize(self.pending_file):
+                if item_list:
+                    self.merge_items(item_list)
+                return
+            with open(self.pending_file, 'r+b') as fp:
+                for entry in fp.read().split(':'):
+                    if entry:
+                        try:
+                            self._commit_puts_load(item_list, entry)
+                        except Exception:
+                            self.logger.exception(
+                                _('Invalid pending entry %(file)s: %(entry)s'),
+                                {'file': self.pending_file, 'entry': entry})
+                if item_list:
+                    self.merge_items(item_list)
+                try:
+                    os.ftruncate(fp.fileno(), 0)
+                except OSError, err:
+                    if err.errno != errno.ENOENT:
+                        raise
+
+    def _commit_puts_stale_ok(self):
+        """
+        Catch failures of _commit_puts() if broker is intended for
+        reading of stats, and thus does not care for pending updates.
+        """
+        try:
+            self._commit_puts()
+        except LockTimeout:
+            if not self.stale_reads_ok:
+                raise
+
+    def _commit_puts_load(self, item_list, entry):
+        """
+        Unmarshall the :param:entry and append it to :param:item_list.
+        This is implemented by a particular broker to be compatible
+        with its merge_items().
+        """
+        raise NotImplementedError
 
     def merge_syncs(self, sync_points, incoming=True):
         """
@@ -792,58 +834,28 @@ class ContainerBroker(DatabaseBroker):
                     status_changed_at = ?
                 WHERE delete_timestamp < ? """, (timestamp, timestamp, timestamp))
 
+    def _commit_puts_load(self, item_list, entry):
+        (name, timestamp, size, content_type, etag, deleted) = \
+            pickle.loads(entry.decode('base64'))
+        item_list.append({'name': name,
+                          'created_at': timestamp,
+                          'size': size,
+                          'content_type': content_type,
+                          'etag': etag,
+                          'deleted': deleted})
+
     def empty(self):
         """
         Check if the DB is empty.
 
         :returns: True if the database has no active objects, False otherwise
         """
-        try:
-            self._commit_puts()
-        except LockTimeout:
-            if not self.stale_reads_ok:
-                raise
+        self._commit_puts_stale_ok()
         with self.get() as conn:
             row = conn.execute(
                 'SELECT object_count from container_stat').fetchone()
             return (row[0] == 0)
 
-    def _commit_puts(self, item_list=None):
-        """Handles committing rows in .pending files."""
-        if self.db_file == ':memory:' or not os.path.exists(self.pending_file):
-            return
-        if item_list is None:
-            item_list = []
-        with lock_parent_directory(self.pending_file, self.pending_timeout):
-            self._preallocate()
-            if not os.path.getsize(self.pending_file):
-                if item_list:
-                    self.merge_items(item_list)
-                return
-            with open(self.pending_file, 'r+b') as fp:
-                for entry in fp.read().split(':'):
-                    if entry:
-                        try:
-                            (name, timestamp, size, content_type, etag,
-                             deleted) = pickle.loads(entry.decode('base64'))
-                            item_list.append({'name': name,
-                                              'created_at': timestamp,
-                                              'size': size,
-                                              'content_type': content_type,
-                                              'etag': etag,
-                                              'deleted': deleted})
-                        except Exception:
-                            self.logger.exception(
-                                _('Invalid pending entry %(file)s: %(entry)s'),
-                                {'file': self.pending_file, 'entry': entry})
-                if item_list:
-                    self.merge_items(item_list)
-                try:
-                    os.ftruncate(fp.fileno(), 0)
-                except OSError, err:
-                    if err.errno != errno.ENOENT:
-                        raise
-
     def reclaim(self, object_timestamp, sync_timestamp):
         """
         Delete rows from the object table that are marked deleted and
@@ -934,11 +946,7 @@ class ContainerBroker(DatabaseBroker):
""" """
if self.db_file != ':memory:' and not os.path.exists(self.db_file): if self.db_file != ':memory:' and not os.path.exists(self.db_file):
return True return True
try: self._commit_puts_stale_ok()
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
with self.get() as conn: with self.get() as conn:
row = conn.execute(''' row = conn.execute('''
SELECT put_timestamp, delete_timestamp, object_count SELECT put_timestamp, delete_timestamp, object_count
@@ -962,11 +970,7 @@ class ContainerBroker(DatabaseBroker):
                   reported_object_count, reported_bytes_used, hash, id,
                   x_container_sync_point1, and x_container_sync_point2.
         """
-        try:
-            self._commit_puts()
-        except LockTimeout:
-            if not self.stale_reads_ok:
-                raise
+        self._commit_puts_stale_ok()
         with self.get() as conn:
             data = None
             trailing = 'x_container_sync_point1, x_container_sync_point2'
@@ -1075,11 +1079,7 @@ class ContainerBroker(DatabaseBroker):
         delim_force_gte = False
         (marker, end_marker, prefix, delimiter, path) = utf8encode(
             marker, end_marker, prefix, delimiter, path)
-        try:
-            self._commit_puts()
-        except LockTimeout:
-            if not self.stale_reads_ok:
-                raise
+        self._commit_puts_stale_ok()
         if path is not None:
             prefix = path
             if path:
@@ -1342,43 +1342,17 @@ class AccountBroker(DatabaseBroker):
                     status_changed_at = ?
                 WHERE delete_timestamp < ? """, (timestamp, timestamp, timestamp))
 
-    def _commit_puts(self, item_list=None):
-        """Handles committing rows in .pending files."""
-        if self.db_file == ':memory:' or not os.path.exists(self.pending_file):
-            return
-        if item_list is None:
-            item_list = []
-        with lock_parent_directory(self.pending_file, self.pending_timeout):
-            self._preallocate()
-            if not os.path.getsize(self.pending_file):
-                if item_list:
-                    self.merge_items(item_list)
-                return
-            with open(self.pending_file, 'r+b') as fp:
-                for entry in fp.read().split(':'):
-                    if entry:
-                        try:
-                            (name, put_timestamp, delete_timestamp,
-                             object_count, bytes_used, deleted) = \
-                                pickle.loads(entry.decode('base64'))
-                            item_list.append(
-                                {'name': name,
-                                 'put_timestamp': put_timestamp,
-                                 'delete_timestamp': delete_timestamp,
-                                 'object_count': object_count,
-                                 'bytes_used': bytes_used,
-                                 'deleted': deleted})
-                        except Exception:
-                            self.logger.exception(
-                                _('Invalid pending entry %(file)s: %(entry)s'),
-                                {'file': self.pending_file, 'entry': entry})
-                if item_list:
-                    self.merge_items(item_list)
-                try:
-                    os.ftruncate(fp.fileno(), 0)
-                except OSError, err:
-                    if err.errno != errno.ENOENT:
-                        raise
-
+    def _commit_puts_load(self, item_list, entry):
+        (name, put_timestamp, delete_timestamp,
+         object_count, bytes_used, deleted) = \
+            pickle.loads(entry.decode('base64'))
+        item_list.append(
+            {'name': name,
+             'put_timestamp': put_timestamp,
+             'delete_timestamp': delete_timestamp,
+             'object_count': object_count,
+             'bytes_used': bytes_used,
+             'deleted': deleted})
+
     def empty(self):
         """
@@ -1386,11 +1360,7 @@ class AccountBroker(DatabaseBroker):
 
         :returns: True if the database has no active containers.
         """
-        try:
-            self._commit_puts()
-        except LockTimeout:
-            if not self.stale_reads_ok:
-                raise
+        self._commit_puts_stale_ok()
         with self.get() as conn:
             row = conn.execute(
                 'SELECT container_count from account_stat').fetchone()
@@ -1509,11 +1479,7 @@ class AccountBroker(DatabaseBroker):
""" """
if self.db_file != ':memory:' and not os.path.exists(self.db_file): if self.db_file != ':memory:' and not os.path.exists(self.db_file):
return True return True
try: self._commit_puts_stale_ok()
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
with self.get() as conn: with self.get() as conn:
row = conn.execute(''' row = conn.execute('''
SELECT put_timestamp, delete_timestamp, container_count, status SELECT put_timestamp, delete_timestamp, container_count, status
@@ -1538,11 +1504,7 @@ class AccountBroker(DatabaseBroker):
                   delete_timestamp, container_count, object_count,
                   bytes_used, hash, id
         """
-        try:
-            self._commit_puts()
-        except LockTimeout:
-            if not self.stale_reads_ok:
-                raise
+        self._commit_puts_stale_ok()
         with self.get() as conn:
             return dict(conn.execute('''
                 SELECT account, created_at, put_timestamp, delete_timestamp,
@@ -1567,11 +1529,7 @@ class AccountBroker(DatabaseBroker):
""" """
(marker, end_marker, prefix, delimiter) = utf8encode( (marker, end_marker, prefix, delimiter) = utf8encode(
marker, end_marker, prefix, delimiter) marker, end_marker, prefix, delimiter)
try: self._commit_puts_stale_ok()
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
if delimiter and not prefix: if delimiter and not prefix:
prefix = '' prefix = ''
orig_marker = marker orig_marker = marker