Merge "Fixed concurrent PUT requests to accounts or containers"

This commit is contained in:
Jenkins 2013-11-21 01:12:19 +00:00 committed by Gerrit Code Review
commit 9081b33048
2 changed files with 47 additions and 4 deletions

View File

@ -89,22 +89,38 @@ class GreenDBConnection(sqlite3.Connection):
self.db_file = args[0] if args else'-' self.db_file = args[0] if args else'-'
sqlite3.Connection.__init__(self, *args, **kwargs) sqlite3.Connection.__init__(self, *args, **kwargs)
def cursor(self, cls=None):
if cls is None:
cls = GreenDBCursor
return sqlite3.Connection.cursor(self, cls)
class GreenDBCursor(sqlite3.Cursor):
"""SQLite Cursor handler that plays well with eventlet."""
def __init__(self, *args, **kwargs):
self.timeout = args[0].timeout
self.db_file = args[0].db_file
sqlite3.Cursor.__init__(self, *args, **kwargs)
def _timeout(self, call): def _timeout(self, call):
with LockTimeout(self.timeout, self.db_file): with LockTimeout(self.timeout, self.db_file):
retry_wait = 0.001
while True: while True:
try: try:
return call() return call()
except sqlite3.OperationalError as e: except sqlite3.OperationalError as e:
if 'locked' not in str(e): if 'locked' not in str(e):
raise raise
sleep(0.05) sleep(retry_wait)
retry_wait = min(retry_wait * 2, 0.05)
def execute(self, *args, **kwargs): def execute(self, *args, **kwargs):
return self._timeout(lambda: sqlite3.Connection.execute( return self._timeout(lambda: sqlite3.Cursor.execute(
self, *args, **kwargs)) self, *args, **kwargs))
def commit(self): def commit(self):
return self._timeout(lambda: sqlite3.Connection.commit(self)) return self._timeout(lambda: sqlite3.Cursor.commit(self))
def dict_factory(crs, row): def dict_factory(crs, row):

View File

@ -23,7 +23,9 @@ from uuid import uuid4
import simplejson import simplejson
import sqlite3 import sqlite3
from mock import patch from mock import patch, MagicMock
from eventlet.timeout import Timeout
import swift.common.db import swift.common.db
from swift.common.db import chexor, dict_factory, get_db_connection, \ from swift.common.db import chexor, dict_factory, get_db_connection, \
@ -90,6 +92,31 @@ class TestGetDBConnection(unittest.TestCase):
self.assertRaises(DatabaseConnectionError, get_db_connection, self.assertRaises(DatabaseConnectionError, get_db_connection,
'invalid database path / name') 'invalid database path / name')
def test_locked_db(self):
# This test is dependant on the code under test calling execute and
# commit as sqlite3.<Connection/Cursor>.<execute/commit> in a subclass.
class InterceptConnection(sqlite3.Connection):
pass
class InterceptCursor(sqlite3.Cursor):
pass
db_error = sqlite3.OperationalError('database is locked')
mock_db_cmd = MagicMock(side_effect=db_error)
InterceptConnection.execute = mock_db_cmd
InterceptConnection.commit = mock_db_cmd
InterceptCursor.execute = mock_db_cmd
InterceptCursor.commit = mock_db_cmd
with patch.multiple('sqlite3', Connection=InterceptConnection,
Cursor=InterceptCursor):
self.assertRaises(Timeout, get_db_connection, ':memory:',
timeout=0.1)
self.assertTrue(mock_db_cmd.called)
self.assertEqual(mock_db_cmd.call_args_list,
list((mock_db_cmd.call_args,) *
mock_db_cmd.call_count))
class TestDatabaseBroker(unittest.TestCase): class TestDatabaseBroker(unittest.TestCase):