add test for defer_connect
This commit is contained in:
@@ -651,7 +651,9 @@ class Connection(object):
|
|||||||
self.sql_mode = sql_mode
|
self.sql_mode = sql_mode
|
||||||
self.init_command = init_command
|
self.init_command = init_command
|
||||||
self.max_allowed_packet = max_allowed_packet
|
self.max_allowed_packet = max_allowed_packet
|
||||||
if not defer_connect:
|
if defer_connect:
|
||||||
|
self.socket = None
|
||||||
|
else:
|
||||||
self.connect()
|
self.connect()
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
|
|||||||
@@ -107,6 +107,26 @@ class TestConnection(base.PyMySQLTestCase):
|
|||||||
c.set_charset('utf8')
|
c.set_charset('utf8')
|
||||||
# TODO validate setting here
|
# TODO validate setting here
|
||||||
|
|
||||||
|
def test_defer_connect(self):
|
||||||
|
import socket
|
||||||
|
for db in self.databases:
|
||||||
|
d = db.copy()
|
||||||
|
try:
|
||||||
|
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||||
|
sock.connect(d['unix_socket'])
|
||||||
|
except KeyError:
|
||||||
|
sock = socket.create_connection(
|
||||||
|
(d.get('host', 'localhost'), d.get('port', 3306)))
|
||||||
|
for k in ['unix_socket', 'host', 'port']:
|
||||||
|
try:
|
||||||
|
del d[k]
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
c = pymysql.connect(defer_connect=True, **d)
|
||||||
|
self.assertFalse(c.open)
|
||||||
|
c.connect(sock)
|
||||||
|
|
||||||
|
|
||||||
# A custom type and function to escape it
|
# A custom type and function to escape it
|
||||||
class Foo(object):
|
class Foo(object):
|
||||||
|
|||||||
Reference in New Issue
Block a user