diff --git a/swift/common/db.py b/swift/common/db.py index 6d50516076..aeccf380ee 100644 --- a/swift/common/db.py +++ b/swift/common/db.py @@ -416,11 +416,7 @@ class DatabaseBroker(object): :param count: number to get :returns: list of objects between start and end """ - try: - self._commit_puts() - except LockTimeout: - if not self.stale_reads_ok: - raise + self._commit_puts_stale_ok() with self.get() as conn: curs = conn.execute(''' SELECT * FROM %s WHERE ROWID > ? ORDER BY ROWID ASC LIMIT ? @@ -469,11 +465,7 @@ class DatabaseBroker(object): :returns: dict containing keys: hash, id, created_at, put_timestamp, delete_timestamp, count, max_row, and metadata """ - try: - self._commit_puts() - except LockTimeout: - if not self.stale_reads_ok: - raise + self._commit_puts_stale_ok() query_part1 = ''' SELECT hash, id, created_at, put_timestamp, delete_timestamp, %s_count AS count, @@ -495,8 +487,58 @@ class DatabaseBroker(object): curs.row_factory = dict_factory return curs.fetchone() - def _commit_puts(self): - pass # stub to be overridden if need be + def _commit_puts(self, item_list=None): + """ + Scan for .pending files and commit the found records by feeding them + to merge_items(). + + :param item_list: A list of items to commit in addition to .pending + """ + if self.db_file == ':memory:' or not os.path.exists(self.pending_file): + return + if item_list is None: + item_list = [] + with lock_parent_directory(self.pending_file, self.pending_timeout): + self._preallocate() + if not os.path.getsize(self.pending_file): + if item_list: + self.merge_items(item_list) + return + with open(self.pending_file, 'r+b') as fp: + for entry in fp.read().split(':'): + if entry: + try: + self._commit_puts_load(item_list, entry) + except Exception: + self.logger.exception( + _('Invalid pending entry %(file)s: %(entry)s'), + {'file': self.pending_file, 'entry': entry}) + if item_list: + self.merge_items(item_list) + try: + os.ftruncate(fp.fileno(), 0) + except OSError, err: + if err.errno != errno.ENOENT: + raise + + def _commit_puts_stale_ok(self): + """ + Catch failures of _commit_puts() if broker is intended for + reading of stats, and thus does not care for pending updates. + """ + try: + self._commit_puts() + except LockTimeout: + if not self.stale_reads_ok: + raise + + def _commit_puts_load(self, item_list, entry): + """ + Unmarshall the :param:entry and append it to :param:item_list. + This is implemented by a particular broker to be compatible + with its merge_items(). + """ + raise NotImplementedError def merge_syncs(self, sync_points, incoming=True): """ @@ -792,58 +834,28 @@ class ContainerBroker(DatabaseBroker): status_changed_at = ? WHERE delete_timestamp < ? """, (timestamp, timestamp, timestamp)) + def _commit_puts_load(self, item_list, entry): + (name, timestamp, size, content_type, etag, deleted) = \ + pickle.loads(entry.decode('base64')) + item_list.append({'name': name, + 'created_at': timestamp, + 'size': size, + 'content_type': content_type, + 'etag': etag, + 'deleted': deleted}) + def empty(self): """ Check if the DB is empty. :returns: True if the database has no active objects, False otherwise """ - try: - self._commit_puts() - except LockTimeout: - if not self.stale_reads_ok: - raise + self._commit_puts_stale_ok() with self.get() as conn: row = conn.execute( 'SELECT object_count from container_stat').fetchone() return (row[0] == 0) - def _commit_puts(self, item_list=None): - """Handles committing rows in .pending files.""" - if self.db_file == ':memory:' or not os.path.exists(self.pending_file): - return - if item_list is None: - item_list = [] - with lock_parent_directory(self.pending_file, self.pending_timeout): - self._preallocate() - if not os.path.getsize(self.pending_file): - if item_list: - self.merge_items(item_list) - return - with open(self.pending_file, 'r+b') as fp: - for entry in fp.read().split(':'): - if entry: - try: - (name, timestamp, size, content_type, etag, - deleted) = pickle.loads(entry.decode('base64')) - item_list.append({'name': name, - 'created_at': timestamp, - 'size': size, - 'content_type': content_type, - 'etag': etag, - 'deleted': deleted}) - except Exception: - self.logger.exception( - _('Invalid pending entry %(file)s: %(entry)s'), - {'file': self.pending_file, 'entry': entry}) - if item_list: - self.merge_items(item_list) - try: - os.ftruncate(fp.fileno(), 0) - except OSError, err: - if err.errno != errno.ENOENT: - raise - def reclaim(self, object_timestamp, sync_timestamp): """ Delete rows from the object table that are marked deleted and @@ -934,11 +946,7 @@ class ContainerBroker(DatabaseBroker): """ if self.db_file != ':memory:' and not os.path.exists(self.db_file): return True - try: - self._commit_puts() - except LockTimeout: - if not self.stale_reads_ok: - raise + self._commit_puts_stale_ok() with self.get() as conn: row = conn.execute(''' SELECT put_timestamp, delete_timestamp, object_count @@ -962,11 +970,7 @@ class ContainerBroker(DatabaseBroker): reported_object_count, reported_bytes_used, hash, id, x_container_sync_point1, and x_container_sync_point2. """ - try: - self._commit_puts() - except LockTimeout: - if not self.stale_reads_ok: - raise + self._commit_puts_stale_ok() with self.get() as conn: data = None trailing = 'x_container_sync_point1, x_container_sync_point2' @@ -1075,11 +1079,7 @@ class ContainerBroker(DatabaseBroker): delim_force_gte = False (marker, end_marker, prefix, delimiter, path) = utf8encode( marker, end_marker, prefix, delimiter, path) - try: - self._commit_puts() - except LockTimeout: - if not self.stale_reads_ok: - raise + self._commit_puts_stale_ok() if path is not None: prefix = path if path: @@ -1342,43 +1342,17 @@ class AccountBroker(DatabaseBroker): status_changed_at = ? WHERE delete_timestamp < ? """, (timestamp, timestamp, timestamp)) - def _commit_puts(self, item_list=None): - """Handles committing rows in .pending files.""" - if self.db_file == ':memory:' or not os.path.exists(self.pending_file): - return - if item_list is None: - item_list = [] - with lock_parent_directory(self.pending_file, self.pending_timeout): - self._preallocate() - if not os.path.getsize(self.pending_file): - if item_list: - self.merge_items(item_list) - return - with open(self.pending_file, 'r+b') as fp: - for entry in fp.read().split(':'): - if entry: - try: - (name, put_timestamp, delete_timestamp, - object_count, bytes_used, deleted) = \ - pickle.loads(entry.decode('base64')) - item_list.append( - {'name': name, - 'put_timestamp': put_timestamp, - 'delete_timestamp': delete_timestamp, - 'object_count': object_count, - 'bytes_used': bytes_used, - 'deleted': deleted}) - except Exception: - self.logger.exception( - _('Invalid pending entry %(file)s: %(entry)s'), - {'file': self.pending_file, 'entry': entry}) - if item_list: - self.merge_items(item_list) - try: - os.ftruncate(fp.fileno(), 0) - except OSError, err: - if err.errno != errno.ENOENT: - raise + def _commit_puts_load(self, item_list, entry): + (name, put_timestamp, delete_timestamp, + object_count, bytes_used, deleted) = \ + pickle.loads(entry.decode('base64')) + item_list.append( + {'name': name, + 'put_timestamp': put_timestamp, + 'delete_timestamp': delete_timestamp, + 'object_count': object_count, + 'bytes_used': bytes_used, + 'deleted': deleted}) def empty(self): """ @@ -1386,11 +1360,7 @@ class AccountBroker(DatabaseBroker): :returns: True if the database has no active containers. """ - try: - self._commit_puts() - except LockTimeout: - if not self.stale_reads_ok: - raise + self._commit_puts_stale_ok() with self.get() as conn: row = conn.execute( 'SELECT container_count from account_stat').fetchone() @@ -1509,11 +1479,7 @@ class AccountBroker(DatabaseBroker): """ if self.db_file != ':memory:' and not os.path.exists(self.db_file): return True - try: - self._commit_puts() - except LockTimeout: - if not self.stale_reads_ok: - raise + self._commit_puts_stale_ok() with self.get() as conn: row = conn.execute(''' SELECT put_timestamp, delete_timestamp, container_count, status @@ -1538,11 +1504,7 @@ class AccountBroker(DatabaseBroker): delete_timestamp, container_count, object_count, bytes_used, hash, id """ - try: - self._commit_puts() - except LockTimeout: - if not self.stale_reads_ok: - raise + self._commit_puts_stale_ok() with self.get() as conn: return dict(conn.execute(''' SELECT account, created_at, put_timestamp, delete_timestamp, @@ -1567,11 +1529,7 @@ class AccountBroker(DatabaseBroker): """ (marker, end_marker, prefix, delimiter) = utf8encode( marker, end_marker, prefix, delimiter) - try: - self._commit_puts() - except LockTimeout: - if not self.stale_reads_ok: - raise + self._commit_puts_stale_ok() if delimiter and not prefix: prefix = '' orig_marker = marker