Merge "Fix DB locked error on commit"
This commit is contained in:
commit
c850580566
@ -57,6 +57,19 @@ def utf8encodekeys(metadata):
|
|||||||
metadata[k.encode('utf-8')] = sv
|
metadata[k.encode('utf-8')] = sv
|
||||||
|
|
||||||
|
|
||||||
|
def _db_timeout(timeout, db_file, call):
|
||||||
|
with LockTimeout(timeout, db_file):
|
||||||
|
retry_wait = 0.001
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
return call()
|
||||||
|
except sqlite3.OperationalError as e:
|
||||||
|
if 'locked' not in str(e):
|
||||||
|
raise
|
||||||
|
sleep(retry_wait)
|
||||||
|
retry_wait = min(retry_wait * 2, 0.05)
|
||||||
|
|
||||||
|
|
||||||
class DatabaseConnectionError(sqlite3.DatabaseError):
|
class DatabaseConnectionError(sqlite3.DatabaseError):
|
||||||
"""More friendly error messages for DB Errors."""
|
"""More friendly error messages for DB Errors."""
|
||||||
|
|
||||||
@ -94,6 +107,11 @@ class GreenDBConnection(sqlite3.Connection):
|
|||||||
cls = GreenDBCursor
|
cls = GreenDBCursor
|
||||||
return sqlite3.Connection.cursor(self, cls)
|
return sqlite3.Connection.cursor(self, cls)
|
||||||
|
|
||||||
|
def commit(self):
|
||||||
|
return _db_timeout(
|
||||||
|
self.timeout, self.db_file,
|
||||||
|
lambda: sqlite3.Connection.commit(self))
|
||||||
|
|
||||||
|
|
||||||
class GreenDBCursor(sqlite3.Cursor):
|
class GreenDBCursor(sqlite3.Cursor):
|
||||||
"""SQLite Cursor handler that plays well with eventlet."""
|
"""SQLite Cursor handler that plays well with eventlet."""
|
||||||
@ -103,24 +121,10 @@ class GreenDBCursor(sqlite3.Cursor):
|
|||||||
self.db_file = args[0].db_file
|
self.db_file = args[0].db_file
|
||||||
sqlite3.Cursor.__init__(self, *args, **kwargs)
|
sqlite3.Cursor.__init__(self, *args, **kwargs)
|
||||||
|
|
||||||
def _timeout(self, call):
|
|
||||||
with LockTimeout(self.timeout, self.db_file):
|
|
||||||
retry_wait = 0.001
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
return call()
|
|
||||||
except sqlite3.OperationalError as e:
|
|
||||||
if 'locked' not in str(e):
|
|
||||||
raise
|
|
||||||
sleep(retry_wait)
|
|
||||||
retry_wait = min(retry_wait * 2, 0.05)
|
|
||||||
|
|
||||||
def execute(self, *args, **kwargs):
|
def execute(self, *args, **kwargs):
|
||||||
return self._timeout(lambda: sqlite3.Cursor.execute(
|
return _db_timeout(
|
||||||
self, *args, **kwargs))
|
self.timeout, self.db_file, lambda: sqlite3.Cursor.execute(
|
||||||
|
self, *args, **kwargs))
|
||||||
def commit(self):
|
|
||||||
return self._timeout(lambda: sqlite3.Cursor.commit(self))
|
|
||||||
|
|
||||||
|
|
||||||
def dict_factory(crs, row):
|
def dict_factory(crs, row):
|
||||||
|
@ -29,7 +29,8 @@ from eventlet.timeout import Timeout
|
|||||||
|
|
||||||
import swift.common.db
|
import swift.common.db
|
||||||
from swift.common.db import chexor, dict_factory, get_db_connection, \
|
from swift.common.db import chexor, dict_factory, get_db_connection, \
|
||||||
DatabaseBroker, DatabaseConnectionError, DatabaseAlreadyExists
|
DatabaseBroker, DatabaseConnectionError, DatabaseAlreadyExists, \
|
||||||
|
GreenDBConnection
|
||||||
from swift.common.utils import normalize_timestamp
|
from swift.common.utils import normalize_timestamp
|
||||||
from swift.common.exceptions import LockTimeout
|
from swift.common.exceptions import LockTimeout
|
||||||
|
|
||||||
@ -82,6 +83,41 @@ class TestChexor(unittest.TestCase):
|
|||||||
normalize_timestamp(1))
|
normalize_timestamp(1))
|
||||||
|
|
||||||
|
|
||||||
|
class TestGreenDBConnection(unittest.TestCase):
|
||||||
|
|
||||||
|
def test_execute_when_locked(self):
|
||||||
|
# This test is dependant on the code under test calling execute and
|
||||||
|
# commit as sqlite3.Cursor.execute in a subclass.
|
||||||
|
class InterceptCursor(sqlite3.Cursor):
|
||||||
|
pass
|
||||||
|
db_error = sqlite3.OperationalError('database is locked')
|
||||||
|
InterceptCursor.execute = MagicMock(side_effect=db_error)
|
||||||
|
with patch('sqlite3.Cursor', new=InterceptCursor):
|
||||||
|
conn = sqlite3.connect(':memory:', check_same_thread=False,
|
||||||
|
factory=GreenDBConnection, timeout=0.1)
|
||||||
|
self.assertRaises(Timeout, conn.execute, 'select 1')
|
||||||
|
self.assertTrue(InterceptCursor.execute.called)
|
||||||
|
self.assertEqual(InterceptCursor.execute.call_args_list,
|
||||||
|
list((InterceptCursor.execute.call_args,) *
|
||||||
|
InterceptCursor.execute.call_count))
|
||||||
|
|
||||||
|
def text_commit_when_locked(self):
|
||||||
|
# This test is dependant on the code under test calling commit and
|
||||||
|
# commit as sqlite3.Connection.commit in a subclass.
|
||||||
|
class InterceptConnection(sqlite3.Connection):
|
||||||
|
pass
|
||||||
|
db_error = sqlite3.OperationalError('database is locked')
|
||||||
|
InterceptConnection.commit = MagicMock(side_effect=db_error)
|
||||||
|
with patch('sqlite3.Connection', new=InterceptConnection):
|
||||||
|
conn = sqlite3.connect(':memory:', check_same_thread=False,
|
||||||
|
factory=GreenDBConnection, timeout=0.1)
|
||||||
|
self.assertRaises(Timeout, conn.commit)
|
||||||
|
self.assertTrue(InterceptConnection.commit.called)
|
||||||
|
self.assertEqual(InterceptConnection.commit.call_args_list,
|
||||||
|
list((InterceptConnection.commit.call_args,) *
|
||||||
|
InterceptConnection.commit.call_count))
|
||||||
|
|
||||||
|
|
||||||
class TestGetDBConnection(unittest.TestCase):
|
class TestGetDBConnection(unittest.TestCase):
|
||||||
|
|
||||||
def test_normal_case(self):
|
def test_normal_case(self):
|
||||||
@ -94,22 +130,15 @@ class TestGetDBConnection(unittest.TestCase):
|
|||||||
|
|
||||||
def test_locked_db(self):
|
def test_locked_db(self):
|
||||||
# This test is dependant on the code under test calling execute and
|
# This test is dependant on the code under test calling execute and
|
||||||
# commit as sqlite3.<Connection/Cursor>.<execute/commit> in a subclass.
|
# commit as sqlite3.Cursor.execute in a subclass.
|
||||||
class InterceptConnection(sqlite3.Connection):
|
|
||||||
pass
|
|
||||||
|
|
||||||
class InterceptCursor(sqlite3.Cursor):
|
class InterceptCursor(sqlite3.Cursor):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
db_error = sqlite3.OperationalError('database is locked')
|
db_error = sqlite3.OperationalError('database is locked')
|
||||||
mock_db_cmd = MagicMock(side_effect=db_error)
|
mock_db_cmd = MagicMock(side_effect=db_error)
|
||||||
InterceptConnection.execute = mock_db_cmd
|
|
||||||
InterceptConnection.commit = mock_db_cmd
|
|
||||||
InterceptCursor.execute = mock_db_cmd
|
InterceptCursor.execute = mock_db_cmd
|
||||||
InterceptCursor.commit = mock_db_cmd
|
|
||||||
|
|
||||||
with patch.multiple('sqlite3', Connection=InterceptConnection,
|
with patch('sqlite3.Cursor', new=InterceptCursor):
|
||||||
Cursor=InterceptCursor):
|
|
||||||
self.assertRaises(Timeout, get_db_connection, ':memory:',
|
self.assertRaises(Timeout, get_db_connection, ':memory:',
|
||||||
timeout=0.1)
|
timeout=0.1)
|
||||||
self.assertTrue(mock_db_cmd.called)
|
self.assertTrue(mock_db_cmd.called)
|
||||||
|
Loading…
Reference in New Issue
Block a user