From db2a4787b447b2907e8de7f604a500c31a2d1330 Mon Sep 17 00:00:00 2001 From: Pete Zaitcev Date: Thu, 25 Jul 2013 18:18:46 -0600 Subject: [PATCH] Unify _commit_puts for accounts and containers The _commit_puts method is exactly the same in AccountBroker and ContainerBroker, except for what it puts on the list pending records. It seems to be easy to factor out without victimizing the code clarity unduly. We also factor common checking for LockTimeout: not so much for any code savings, but to underscore that all these little segments really are the same, so a reader does not need to prove uniformity every time. Change-Id: Idf9b553a3cf28de3c8803aaa47cf9a2be6817f00 --- swift/common/db.py | 208 ++++++++++++++++++--------------------------- 1 file changed, 83 insertions(+), 125 deletions(-) diff --git a/swift/common/db.py b/swift/common/db.py index 6d50516076..aeccf380ee 100644 --- a/swift/common/db.py +++ b/swift/common/db.py @@ -416,11 +416,7 @@ class DatabaseBroker(object): :param count: number to get :returns: list of objects between start and end """ - try: - self._commit_puts() - except LockTimeout: - if not self.stale_reads_ok: - raise + self._commit_puts_stale_ok() with self.get() as conn: curs = conn.execute(''' SELECT * FROM %s WHERE ROWID > ? ORDER BY ROWID ASC LIMIT ? @@ -469,11 +465,7 @@ class DatabaseBroker(object): :returns: dict containing keys: hash, id, created_at, put_timestamp, delete_timestamp, count, max_row, and metadata """ - try: - self._commit_puts() - except LockTimeout: - if not self.stale_reads_ok: - raise + self._commit_puts_stale_ok() query_part1 = ''' SELECT hash, id, created_at, put_timestamp, delete_timestamp, %s_count AS count, @@ -495,8 +487,58 @@ class DatabaseBroker(object): curs.row_factory = dict_factory return curs.fetchone() - def _commit_puts(self): - pass # stub to be overridden if need be + def _commit_puts(self, item_list=None): + """ + Scan for .pending files and commit the found records by feeding them + to merge_items(). + + :param item_list: A list of items to commit in addition to .pending + """ + if self.db_file == ':memory:' or not os.path.exists(self.pending_file): + return + if item_list is None: + item_list = [] + with lock_parent_directory(self.pending_file, self.pending_timeout): + self._preallocate() + if not os.path.getsize(self.pending_file): + if item_list: + self.merge_items(item_list) + return + with open(self.pending_file, 'r+b') as fp: + for entry in fp.read().split(':'): + if entry: + try: + self._commit_puts_load(item_list, entry) + except Exception: + self.logger.exception( + _('Invalid pending entry %(file)s: %(entry)s'), + {'file': self.pending_file, 'entry': entry}) + if item_list: + self.merge_items(item_list) + try: + os.ftruncate(fp.fileno(), 0) + except OSError, err: + if err.errno != errno.ENOENT: + raise + + def _commit_puts_stale_ok(self): + """ + Catch failures of _commit_puts() if broker is intended for + reading of stats, and thus does not care for pending updates. + """ + try: + self._commit_puts() + except LockTimeout: + if not self.stale_reads_ok: + raise + + def _commit_puts_load(self, item_list, entry): + """ + Unmarshall the :param:entry and append it to :param:item_list. + This is implemented by a particular broker to be compatible + with its merge_items(). + """ + raise NotImplementedError def merge_syncs(self, sync_points, incoming=True): """ @@ -792,58 +834,28 @@ class ContainerBroker(DatabaseBroker): status_changed_at = ? WHERE delete_timestamp < ? """, (timestamp, timestamp, timestamp)) + def _commit_puts_load(self, item_list, entry): + (name, timestamp, size, content_type, etag, deleted) = \ + pickle.loads(entry.decode('base64')) + item_list.append({'name': name, + 'created_at': timestamp, + 'size': size, + 'content_type': content_type, + 'etag': etag, + 'deleted': deleted}) + def empty(self): """ Check if the DB is empty. :returns: True if the database has no active objects, False otherwise """ - try: - self._commit_puts() - except LockTimeout: - if not self.stale_reads_ok: - raise + self._commit_puts_stale_ok() with self.get() as conn: row = conn.execute( 'SELECT object_count from container_stat').fetchone() return (row[0] == 0) - def _commit_puts(self, item_list=None): - """Handles committing rows in .pending files.""" - if self.db_file == ':memory:' or not os.path.exists(self.pending_file): - return - if item_list is None: - item_list = [] - with lock_parent_directory(self.pending_file, self.pending_timeout): - self._preallocate() - if not os.path.getsize(self.pending_file): - if item_list: - self.merge_items(item_list) - return - with open(self.pending_file, 'r+b') as fp: - for entry in fp.read().split(':'): - if entry: - try: - (name, timestamp, size, content_type, etag, - deleted) = pickle.loads(entry.decode('base64')) - item_list.append({'name': name, - 'created_at': timestamp, - 'size': size, - 'content_type': content_type, - 'etag': etag, - 'deleted': deleted}) - except Exception: - self.logger.exception( - _('Invalid pending entry %(file)s: %(entry)s'), - {'file': self.pending_file, 'entry': entry}) - if item_list: - self.merge_items(item_list) - try: - os.ftruncate(fp.fileno(), 0) - except OSError, err: - if err.errno != errno.ENOENT: - raise - def reclaim(self, object_timestamp, sync_timestamp): """ Delete rows from the object table that are marked deleted and @@ -934,11 +946,7 @@ class ContainerBroker(DatabaseBroker): """ if self.db_file != ':memory:' and not os.path.exists(self.db_file): return True - try: - self._commit_puts() - except LockTimeout: - if not self.stale_reads_ok: - raise + self._commit_puts_stale_ok() with self.get() as conn: row = conn.execute(''' SELECT put_timestamp, delete_timestamp, object_count @@ -962,11 +970,7 @@ class ContainerBroker(DatabaseBroker): reported_object_count, reported_bytes_used, hash, id, x_container_sync_point1, and x_container_sync_point2. """ - try: - self._commit_puts() - except LockTimeout: - if not self.stale_reads_ok: - raise + self._commit_puts_stale_ok() with self.get() as conn: data = None trailing = 'x_container_sync_point1, x_container_sync_point2' @@ -1075,11 +1079,7 @@ class ContainerBroker(DatabaseBroker): delim_force_gte = False (marker, end_marker, prefix, delimiter, path) = utf8encode( marker, end_marker, prefix, delimiter, path) - try: - self._commit_puts() - except LockTimeout: - if not self.stale_reads_ok: - raise + self._commit_puts_stale_ok() if path is not None: prefix = path if path: @@ -1342,43 +1342,17 @@ class AccountBroker(DatabaseBroker): status_changed_at = ? WHERE delete_timestamp < ? """, (timestamp, timestamp, timestamp)) - def _commit_puts(self, item_list=None): - """Handles committing rows in .pending files.""" - if self.db_file == ':memory:' or not os.path.exists(self.pending_file): - return - if item_list is None: - item_list = [] - with lock_parent_directory(self.pending_file, self.pending_timeout): - self._preallocate() - if not os.path.getsize(self.pending_file): - if item_list: - self.merge_items(item_list) - return - with open(self.pending_file, 'r+b') as fp: - for entry in fp.read().split(':'): - if entry: - try: - (name, put_timestamp, delete_timestamp, - object_count, bytes_used, deleted) = \ - pickle.loads(entry.decode('base64')) - item_list.append( - {'name': name, - 'put_timestamp': put_timestamp, - 'delete_timestamp': delete_timestamp, - 'object_count': object_count, - 'bytes_used': bytes_used, - 'deleted': deleted}) - except Exception: - self.logger.exception( - _('Invalid pending entry %(file)s: %(entry)s'), - {'file': self.pending_file, 'entry': entry}) - if item_list: - self.merge_items(item_list) - try: - os.ftruncate(fp.fileno(), 0) - except OSError, err: - if err.errno != errno.ENOENT: - raise + def _commit_puts_load(self, item_list, entry): + (name, put_timestamp, delete_timestamp, + object_count, bytes_used, deleted) = \ + pickle.loads(entry.decode('base64')) + item_list.append( + {'name': name, + 'put_timestamp': put_timestamp, + 'delete_timestamp': delete_timestamp, + 'object_count': object_count, + 'bytes_used': bytes_used, + 'deleted': deleted}) def empty(self): """ @@ -1386,11 +1360,7 @@ class AccountBroker(DatabaseBroker): :returns: True if the database has no active containers. """ - try: - self._commit_puts() - except LockTimeout: - if not self.stale_reads_ok: - raise + self._commit_puts_stale_ok() with self.get() as conn: row = conn.execute( 'SELECT container_count from account_stat').fetchone() @@ -1509,11 +1479,7 @@ class AccountBroker(DatabaseBroker): """ if self.db_file != ':memory:' and not os.path.exists(self.db_file): return True - try: - self._commit_puts() - except LockTimeout: - if not self.stale_reads_ok: - raise + self._commit_puts_stale_ok() with self.get() as conn: row = conn.execute(''' SELECT put_timestamp, delete_timestamp, container_count, status @@ -1538,11 +1504,7 @@ class AccountBroker(DatabaseBroker): delete_timestamp, container_count, object_count, bytes_used, hash, id """ - try: - self._commit_puts() - except LockTimeout: - if not self.stale_reads_ok: - raise + self._commit_puts_stale_ok() with self.get() as conn: return dict(conn.execute(''' SELECT account, created_at, put_timestamp, delete_timestamp, @@ -1567,11 +1529,7 @@ class AccountBroker(DatabaseBroker): """ (marker, end_marker, prefix, delimiter) = utf8encode( marker, end_marker, prefix, delimiter) - try: - self._commit_puts() - except LockTimeout: - if not self.stale_reads_ok: - raise + self._commit_puts_stale_ok() if delimiter and not prefix: prefix = '' orig_marker = marker