Merge pull request #303 from PyMySQL/improve-format
Improve datetime and bytes formats
This commit is contained in:
@@ -67,7 +67,8 @@ def escape_unicode(value):
|
||||
return escape_str(value)
|
||||
|
||||
def escape_bytes(value):
|
||||
return "x'%s'" % binascii.hexlify(value).decode(sys.getdefaultencoding())
|
||||
# escape_bytes is calld only on Python 3.
|
||||
return escape_str(value.decode('ascii', 'surrogateescape'))
|
||||
|
||||
def escape_None(value):
|
||||
return 'NULL'
|
||||
@@ -76,21 +77,29 @@ def escape_timedelta(obj):
|
||||
seconds = int(obj.seconds) % 60
|
||||
minutes = int(obj.seconds // 60) % 60
|
||||
hours = int(obj.seconds // 3600) % 24 + int(obj.days) * 24
|
||||
return escape_str('%02d:%02d:%02d' % (hours, minutes, seconds))
|
||||
if obj.microseconds:
|
||||
fmt = "'{0:02d}:{1:02d}:{2:02d}.{3:06d}'"
|
||||
else:
|
||||
fmt = "'{0:02d}:{1:02d}:{2:02d}'"
|
||||
return fmt.format(hours, minutes, seconds, obj.microseconds)
|
||||
|
||||
def escape_time(obj):
|
||||
s = "%02d:%02d:%02d" % (int(obj.hour), int(obj.minute),
|
||||
int(obj.second))
|
||||
if obj.microsecond:
|
||||
s += ".{0:06}".format(obj.microsecond)
|
||||
|
||||
return escape_str(s)
|
||||
fmt = "'{0.hour:02}:{0.minute:02}:{0.second:02}.{0.microsecond:06}'"
|
||||
else:
|
||||
fmt = "'{0.hour:02}:{0.minute:02}:{0.second:02}'"
|
||||
return fmt.format(obj)
|
||||
|
||||
def escape_datetime(obj):
|
||||
return escape_str(obj.isoformat(' '))
|
||||
if obj.microsecond:
|
||||
fmt = "'{0.year:04}-{0.month:02}-{0.day:02} {0.hour:02}:{0.minute:02}:{0.second:02}.{0.microsecond:06}'"
|
||||
else:
|
||||
fmt = "'{0.year:04}-{0.month:02}-{0.day:02} {0.hour:02}:{0.minute:02}:{0.second:02}'"
|
||||
return fmt.format(obj)
|
||||
|
||||
def escape_date(obj):
|
||||
return escape_str(obj.isoformat())
|
||||
fmt = "'{0.year:04}-{0.month:02}-{0.day:02}'"
|
||||
return fmt.format(obj)
|
||||
|
||||
def escape_struct_time(obj):
|
||||
return escape_datetime(datetime.datetime(*obj[:6]))
|
||||
|
||||
@@ -161,13 +161,19 @@ class Cursor(object):
|
||||
args = iter(args)
|
||||
v = values % escape(next(args), conn)
|
||||
if isinstance(v, text_type):
|
||||
v = v.encode(encoding)
|
||||
if PY2:
|
||||
v = v.encode(encoding)
|
||||
else:
|
||||
v = v.encode(encoding, 'surrogateescape')
|
||||
sql += v
|
||||
rows = 0
|
||||
for arg in args:
|
||||
v = values % escape(arg, conn)
|
||||
if isinstance(v, text_type):
|
||||
v = v.encode(encoding)
|
||||
if PY2:
|
||||
v = v.encode(encoding)
|
||||
else:
|
||||
v = v.encode(encoding, 'surrogateescape')
|
||||
if len(sql) + len(v) + len(postfix) + 1 > max_stmt_length:
|
||||
rows += self.execute(sql + postfix)
|
||||
sql = bytearray(prefix)
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import datetime
|
||||
import time
|
||||
import warnings
|
||||
|
||||
import pymysql
|
||||
@@ -226,6 +227,9 @@ class TestNewIssues(base.PyMySQLTestCase):
|
||||
|
||||
# check the process list from the other connection
|
||||
try:
|
||||
# Wait since Travis-CI sometimes fail this test.
|
||||
time.sleep(0.1)
|
||||
|
||||
c = self.connections[1].cursor()
|
||||
c.execute("show processlist")
|
||||
ids = [row[0] for row in c.fetchall()]
|
||||
|
||||
Reference in New Issue
Block a user