tests: db_pool: test .clear() updates .current_size GH-139

This commit is contained in:
Sergey Shepelev
2014-12-21 02:40:52 +03:00
parent 67b18bd327
commit 68b5641f22

View File

@@ -453,7 +453,8 @@ class DBConnectionPool(DBTester):
class DummyConnection(object):
pass
def rollback(self):
pass
class DummyDBModule(object):
@@ -505,54 +506,69 @@ class RawConnectionPool(DBConnectionPool):
**self._auth)
class TestRawConnectionPool(TestCase):
def test_issue_125(self):
# pool = self.create_pool(min_size=3, max_size=5)
pool = db_pool.RawConnectionPool(
DummyDBModule(),
dsn="dbname=test user=jessica port=5433",
min_size=3, max_size=5)
conn = pool.get()
def test_raw_pool_issue_125():
# pool = self.create_pool(min_size=3, max_size=5)
pool = db_pool.RawConnectionPool(
DummyDBModule(),
dsn="dbname=test user=jessica port=5433",
min_size=3, max_size=5)
conn = pool.get()
pool.put(conn)
def test_raw_pool_custom_cleanup_ok():
cleanup_mock = mock.Mock()
pool = db_pool.RawConnectionPool(DummyDBModule(), cleanup=cleanup_mock)
conn = pool.get()
pool.put(conn)
assert cleanup_mock.call_count == 1
with pool.item() as conn:
pass
assert cleanup_mock.call_count == 2
def test_raw_pool_custom_cleanup_arg_error():
cleanup_mock = mock.Mock(side_effect=NotImplementedError)
pool = db_pool.RawConnectionPool(DummyDBModule())
conn = pool.get()
pool.put(conn, cleanup=cleanup_mock)
assert cleanup_mock.call_count == 1
with pool.item(cleanup=cleanup_mock):
pass
assert cleanup_mock.call_count == 2
def test_raw_pool_custom_cleanup_fatal():
state = [0]
def cleanup(conn):
state[0] += 1
raise KeyboardInterrupt
pool = db_pool.RawConnectionPool(DummyDBModule(), cleanup=cleanup)
conn = pool.get()
try:
pool.put(conn)
except KeyboardInterrupt:
pass
else:
assert False, 'Expected KeyboardInterrupt'
assert state[0] == 1
def test_custom_cleanup_ok(self):
cleanup_mock = mock.Mock()
pool = db_pool.RawConnectionPool(DummyDBModule(), cleanup=cleanup_mock)
conn = pool.get()
pool.put(conn)
assert cleanup_mock.call_count == 1
with pool.item() as conn:
pass
assert cleanup_mock.call_count == 2
def test_custom_cleanup_arg_error(self):
cleanup_mock = mock.Mock(side_effect=NotImplementedError)
pool = db_pool.RawConnectionPool(DummyDBModule())
conn = pool.get()
pool.put(conn, cleanup=cleanup_mock)
assert cleanup_mock.call_count == 1
with pool.item(cleanup=cleanup_mock):
pass
assert cleanup_mock.call_count == 2
def test_custom_cleanup_fatal(self):
state = [0]
def cleanup(conn):
state[0] += 1
raise KeyboardInterrupt
pool = db_pool.RawConnectionPool(DummyDBModule(), cleanup=cleanup)
conn = pool.get()
try:
pool.put(conn)
except KeyboardInterrupt:
pass
else:
assert False, 'Expected KeyboardInterrupt'
assert state[0] == 1
def test_raw_pool_clear_update_current_size():
# https://github.com/eventlet/eventlet/issues/139
# BaseConnectionPool.clear does not update .current_size.
# That leads to situation when new connections could not be created.
pool = db_pool.RawConnectionPool(DummyDBModule())
pool.get().close()
assert pool.current_size == 1
assert len(pool.free_items) == 1
pool.clear()
assert pool.current_size == 0
assert len(pool.free_items) == 0
get_auth = get_database_auth