Merge pull request #491 from vtermanis/warning_fixes
Warning propagation improvements
This commit is contained in:
@@ -30,6 +30,8 @@ class Cursor(object):
|
||||
#: Default value of max_allowed_packet is 1048576.
|
||||
max_stmt_length = 1024000
|
||||
|
||||
_defer_warnings = False
|
||||
|
||||
def __init__(self, connection):
|
||||
'''
|
||||
Do not create an instance of a Cursor yourself. Call
|
||||
@@ -43,6 +45,7 @@ class Cursor(object):
|
||||
self._executed = None
|
||||
self._result = None
|
||||
self._rows = None
|
||||
self._warnings_handled = False
|
||||
|
||||
def close(self):
|
||||
'''
|
||||
@@ -86,6 +89,9 @@ class Cursor(object):
|
||||
"""Get the next query set"""
|
||||
conn = self._get_db()
|
||||
current_result = self._result
|
||||
# for unbuffered queries warnings are only available once whole result has been read
|
||||
if unbuffered:
|
||||
self._show_warnings()
|
||||
if current_result is None or current_result is not conn._result:
|
||||
return None
|
||||
if not current_result.has_next:
|
||||
@@ -328,14 +334,18 @@ class Cursor(object):
|
||||
self.description = result.description
|
||||
self.lastrowid = result.insert_id
|
||||
self._rows = result.rows
|
||||
self._warnings_handled = False
|
||||
|
||||
if result.warning_count > 0:
|
||||
self._show_warnings(conn)
|
||||
if not self._defer_warnings:
|
||||
self._show_warnings()
|
||||
|
||||
def _show_warnings(self, conn):
|
||||
if self._result and self._result.has_next:
|
||||
def _show_warnings(self):
|
||||
if self._warnings_handled:
|
||||
return
|
||||
ws = conn.show_warnings()
|
||||
self._warnings_handled = True
|
||||
if self._result and (self._result.has_next or not self._result.warning_count):
|
||||
return
|
||||
ws = self._get_db().show_warnings()
|
||||
if ws is None:
|
||||
return
|
||||
for w in ws:
|
||||
@@ -343,7 +353,7 @@ class Cursor(object):
|
||||
if PY2:
|
||||
if isinstance(msg, unicode):
|
||||
msg = msg.encode('utf-8', 'replace')
|
||||
warnings.warn(str(msg), err.Warning, 4)
|
||||
warnings.warn(err.Warning(*w[1:3]), stacklevel=4)
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self.fetchone, None)
|
||||
@@ -404,6 +414,8 @@ class SSCursor(Cursor):
|
||||
possible to scroll backwards, as only the current row is held in memory.
|
||||
"""
|
||||
|
||||
_defer_warnings = True
|
||||
|
||||
def _conv_row(self, row):
|
||||
return row
|
||||
|
||||
@@ -440,6 +452,7 @@ class SSCursor(Cursor):
|
||||
self._check_executed()
|
||||
row = self.read_next()
|
||||
if row is None:
|
||||
self._show_warnings()
|
||||
return None
|
||||
self.rownumber += 1
|
||||
return row
|
||||
@@ -473,6 +486,7 @@ class SSCursor(Cursor):
|
||||
for i in range_type(size):
|
||||
row = self.read_next()
|
||||
if row is None:
|
||||
self._show_warnings()
|
||||
break
|
||||
rows.append(row)
|
||||
self.rownumber += 1
|
||||
|
||||
@@ -21,7 +21,9 @@ class TestDictCursor(base.PyMySQLTestCase):
|
||||
with warnings.catch_warnings():
|
||||
warnings.filterwarnings("ignore")
|
||||
c.execute("drop table if exists dictcursor")
|
||||
c.execute("""CREATE TABLE dictcursor (name char(20), age int , DOB datetime)""")
|
||||
# include in filterwarnings since for unbuffered dict cursor warning for lack of table
|
||||
# will only be propagated at start of next execute() call
|
||||
c.execute("""CREATE TABLE dictcursor (name char(20), age int , DOB datetime)""")
|
||||
data = [("bob", 21, "1990-02-06 23:04:56"),
|
||||
("jim", 56, "1955-05-09 13:12:45"),
|
||||
("fred", 100, "1911-09-12 01:01:01")]
|
||||
|
||||
@@ -4,6 +4,8 @@ import warnings
|
||||
import sys
|
||||
|
||||
import pymysql
|
||||
from pymysql import cursors
|
||||
from pymysql._compat import text_type
|
||||
from pymysql.tests import base
|
||||
import unittest2
|
||||
|
||||
@@ -486,3 +488,28 @@ class TestGitHubIssues(base.PyMySQLTestCase):
|
||||
# don't assert the exact internal binary value, as it could
|
||||
# vary across implementations
|
||||
self.assertTrue(isinstance(row[0], bytes))
|
||||
|
||||
def test_issue_491(self):
|
||||
""" Test warning propagation """
|
||||
conn = pymysql.connect(charset="utf8", **self.databases[0])
|
||||
|
||||
with warnings.catch_warnings():
|
||||
# Ignore all warnings other than pymysql generated ones
|
||||
warnings.simplefilter("ignore")
|
||||
warnings.simplefilter("error", category=pymysql.Warning)
|
||||
|
||||
# verify for both buffered and unbuffered cursor types
|
||||
for cursor_class in (cursors.Cursor, cursors.SSCursor):
|
||||
c = conn.cursor(cursor_class)
|
||||
try:
|
||||
c.execute("SELECT CAST('124b' AS SIGNED)")
|
||||
c.fetchall()
|
||||
except pymysql.Warning as e:
|
||||
# Warnings should have errorcode and string message, just like exceptions
|
||||
self.assertEqual(len(e.args), 2)
|
||||
self.assertEqual(e.args[0], 1292)
|
||||
self.assertTrue(isinstance(e.args[1], text_type))
|
||||
else:
|
||||
self.fail("Should raise Warning")
|
||||
finally:
|
||||
c.close()
|
||||
|
||||
Reference in New Issue
Block a user