merge from master
This commit is contained in:
16
.coveragerc
Normal file
16
.coveragerc
Normal file
@@ -0,0 +1,16 @@
|
||||
[run]
|
||||
branch = True
|
||||
source =
|
||||
pymysql
|
||||
|
||||
|
||||
[report]
|
||||
exclude_lines =
|
||||
pragma: no cover
|
||||
except ImportError:
|
||||
if DEBUG:
|
||||
def __repr__
|
||||
def __str__
|
||||
raise NotImplementedError
|
||||
def __getattr__
|
||||
raise ValueError
|
||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1,6 +1,7 @@
|
||||
*.pyc
|
||||
*.pyo
|
||||
__pycache__
|
||||
.coverage
|
||||
/dist
|
||||
/PyMySQL.egg-info
|
||||
/.tox
|
||||
|
||||
@@ -39,9 +39,8 @@ matrix:
|
||||
# - TOX_ENV=py34
|
||||
# - DB=5.7.8-rc
|
||||
|
||||
before_install:
|
||||
install:
|
||||
- pip install -U tox
|
||||
- pip install -U tox coveralls
|
||||
|
||||
before_script:
|
||||
- if [ ! -z "${DB}" ]; then
|
||||
@@ -64,9 +63,14 @@ before_script:
|
||||
- "mysql -e 'create database test_pymysql DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;'"
|
||||
- "mysql -e 'create database test_pymysql2 DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;'"
|
||||
- "mysql -e 'select VERSION();'"
|
||||
- cp .travis.databases.json pymysql/tests/databases.json
|
||||
- export COVERALLS_PARALLEL=true
|
||||
|
||||
script:
|
||||
- tox -e $TOX_ENV
|
||||
|
||||
after_success:
|
||||
- coveralls
|
||||
|
||||
after_failure:
|
||||
- cat /tmp/mysql.err
|
||||
|
||||
@@ -5,6 +5,8 @@ PyMySQL
|
||||
.. image:: https://travis-ci.org/PyMySQL/PyMySQL.svg?branch=master
|
||||
:target: https://travis-ci.org/PyMySQL/PyMySQL
|
||||
|
||||
.. image:: https://coveralls.io/repos/PyMySQL/PyMySQL/badge.svg?branch=master&service=github :target: https://coveralls.io/github/PyMySQL/PyMySQL?branch=master
|
||||
|
||||
.. contents::
|
||||
|
||||
This package contains a pure-Python MySQL client library. The goal of PyMySQL
|
||||
|
||||
@@ -91,7 +91,8 @@ TEXT_TYPES = set([
|
||||
FIELD_TYPE.STRING,
|
||||
FIELD_TYPE.TINY_BLOB,
|
||||
FIELD_TYPE.VAR_STRING,
|
||||
FIELD_TYPE.VARCHAR])
|
||||
FIELD_TYPE.VARCHAR,
|
||||
FIELD_TYPE.GEOMETRY])
|
||||
|
||||
sha_new = partial(hashlib.new, 'sha1')
|
||||
|
||||
@@ -106,7 +107,7 @@ DEFAULT_CHARSET = 'latin1'
|
||||
MAX_PACKET_LEN = 2**24-1
|
||||
|
||||
|
||||
def dump_packet(data):
|
||||
def dump_packet(data): # pragma: no cover
|
||||
def is_ascii(data):
|
||||
if 65 <= byte2int(data) <= 122:
|
||||
if isinstance(data, int):
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from __future__ import print_function, absolute_import
|
||||
from functools import partial
|
||||
import re
|
||||
import warnings
|
||||
|
||||
@@ -93,14 +94,30 @@ class Cursor(object):
|
||||
def nextset(self):
|
||||
return self._nextset(False)
|
||||
|
||||
def _ensure_bytes(self, x, encoding=None):
|
||||
if isinstance(x, text_type):
|
||||
x = x.encode(encoding)
|
||||
elif isinstance(x, (tuple, list)):
|
||||
x = type(x)(self._ensure_bytes(v, encoding=encoding) for v in x)
|
||||
return x
|
||||
|
||||
def _escape_args(self, args, conn):
|
||||
ensure_bytes = partial(self._ensure_bytes, encoding=conn.encoding)
|
||||
|
||||
if isinstance(args, (tuple, list)):
|
||||
if PY2:
|
||||
args = tuple(map(ensure_bytes, args))
|
||||
return tuple(conn.escape(arg) for arg in args)
|
||||
elif isinstance(args, dict):
|
||||
if PY2:
|
||||
args = dict((ensure_bytes(key), ensure_bytes(val)) for
|
||||
(key, val) in args.items())
|
||||
return dict((key, conn.escape(val)) for (key, val) in args.items())
|
||||
else:
|
||||
#If it's not a dictionary let's try escaping it anyways.
|
||||
#Worst case it will throw a Value error
|
||||
# If it's not a dictionary let's try escaping it anyways.
|
||||
# Worst case it will throw a Value error
|
||||
if PY2:
|
||||
ensure_bytes(args)
|
||||
return conn.escape(args)
|
||||
|
||||
def mogrify(self, query, args=None):
|
||||
@@ -111,24 +128,8 @@ class Cursor(object):
|
||||
This method follows the extension to the DB API 2.0 followed by Psycopg.
|
||||
"""
|
||||
conn = self._get_db()
|
||||
|
||||
if PY2: # Use bytes on Python 2 always
|
||||
encoding = conn.encoding
|
||||
|
||||
def ensure_bytes(x):
|
||||
if isinstance(x, unicode):
|
||||
x = x.encode(encoding)
|
||||
return x
|
||||
|
||||
query = ensure_bytes(query)
|
||||
|
||||
if args is not None:
|
||||
if isinstance(args, (tuple, list)):
|
||||
args = tuple(map(ensure_bytes, args))
|
||||
elif isinstance(args, dict):
|
||||
args = dict((ensure_bytes(key), ensure_bytes(val)) for (key, val) in args.items())
|
||||
else:
|
||||
args = ensure_bytes(args)
|
||||
query = self._ensure_bytes(query, encoding=conn.encoding)
|
||||
|
||||
if args is not None:
|
||||
query = query % self._escape_args(args, conn)
|
||||
@@ -173,6 +174,8 @@ class Cursor(object):
|
||||
escape = self._escape_args
|
||||
if isinstance(prefix, text_type):
|
||||
prefix = prefix.encode(encoding)
|
||||
if PY2 and isinstance(values, text_type):
|
||||
values = values.encode(encoding)
|
||||
if isinstance(postfix, text_type):
|
||||
postfix = postfix.encode(encoding)
|
||||
sql = bytearray(prefix)
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
import datetime
|
||||
import decimal
|
||||
import sys
|
||||
import pymysql
|
||||
import time
|
||||
import sys
|
||||
import unittest2
|
||||
import pymysql
|
||||
from pymysql.tests import base
|
||||
|
||||
|
||||
@@ -284,6 +284,13 @@ class TestConnection(base.PyMySQLTestCase):
|
||||
self.assertEqual(('foobar',), c.fetchone())
|
||||
conn.close()
|
||||
|
||||
@unittest2.skipUnless(sys.version_info[0:2] >= (3,2), "required py-3.2")
|
||||
def test_no_delay_warning(self):
|
||||
current_db = self.databases[0].copy()
|
||||
current_db['no_delay'] = True
|
||||
with self.assertWarns(DeprecationWarning) as cm:
|
||||
conn = pymysql.connect(**current_db)
|
||||
|
||||
|
||||
# A custom type and function to escape it
|
||||
class Foo(object):
|
||||
|
||||
@@ -377,3 +377,95 @@ class TestGitHubIssues(base.PyMySQLTestCase):
|
||||
warnings.filterwarnings("ignore")
|
||||
cur.execute('drop table if exists test_field_count')
|
||||
|
||||
def test_issue_321(self):
|
||||
""" Test iterable as query argument. """
|
||||
conn = pymysql.connect(charset="utf8", **self.databases[0])
|
||||
self.safe_create_table(
|
||||
conn, "issue321",
|
||||
"create table issue321 (value_1 varchar(1), value_2 varchar(1))")
|
||||
|
||||
sql_insert = "insert into issue321 (value_1, value_2) values (%s, %s)"
|
||||
sql_dict_insert = ("insert into issue321 (value_1, value_2) "
|
||||
"values (%(value_1)s, %(value_2)s)")
|
||||
sql_select = ("select * from issue321 where "
|
||||
"value_1 in %s and value_2=%s")
|
||||
data = [
|
||||
[(u"a", ), u"\u0430"],
|
||||
[[u"b"], u"\u0430"],
|
||||
{"value_1": [[u"c"]], "value_2": u"\u0430"}
|
||||
]
|
||||
cur = conn.cursor()
|
||||
self.assertEqual(cur.execute(sql_insert, data[0]), 1)
|
||||
self.assertEqual(cur.execute(sql_insert, data[1]), 1)
|
||||
self.assertEqual(cur.execute(sql_dict_insert, data[2]), 1)
|
||||
self.assertEqual(
|
||||
cur.execute(sql_select, [(u"a", u"b", u"c"), u"\u0430"]), 3)
|
||||
self.assertEqual(cur.fetchone(), (u"a", u"\u0430"))
|
||||
self.assertEqual(cur.fetchone(), (u"b", u"\u0430"))
|
||||
self.assertEqual(cur.fetchone(), (u"c", u"\u0430"))
|
||||
|
||||
def test_issue_364(self):
|
||||
""" Test mixed unicode/binary arguments in executemany. """
|
||||
conn = pymysql.connect(charset="utf8", **self.databases[0])
|
||||
self.safe_create_table(
|
||||
conn, "issue364",
|
||||
"create table issue364 (value_1 binary(3), value_2 varchar(3)) "
|
||||
"engine=InnoDB default charset=utf8")
|
||||
|
||||
sql = "insert into issue364 (value_1, value_2) values (%s, %s)"
|
||||
usql = u"insert into issue364 (value_1, value_2) values (%s, %s)"
|
||||
values = [b"\x00\xff\x00", u"\xe4\xf6\xfc"]
|
||||
|
||||
# test single insert and select
|
||||
cur = conn.cursor()
|
||||
cur.execute(sql, args=values)
|
||||
cur.execute("select * from issue364")
|
||||
self.assertEqual(cur.fetchone(), tuple(values))
|
||||
|
||||
# test single insert unicode query
|
||||
cur.execute(usql, args=values)
|
||||
|
||||
# test multi insert and select
|
||||
cur.executemany(sql, args=(values, values, values))
|
||||
cur.execute("select * from issue364")
|
||||
for row in cur.fetchall():
|
||||
self.assertEqual(row, tuple(values))
|
||||
|
||||
# test multi insert with unicode query
|
||||
cur.executemany(usql, args=(values, values, values))
|
||||
|
||||
def test_issue_363(self):
|
||||
""" Test binary / geometry types. """
|
||||
conn = pymysql.connect(charset="utf8", **self.databases[0])
|
||||
self.safe_create_table(
|
||||
conn, "issue363",
|
||||
"CREATE TABLE issue363 ( "
|
||||
"id INTEGER PRIMARY KEY, geom LINESTRING NOT NULL, "
|
||||
"SPATIAL KEY geom (geom)) "
|
||||
"ENGINE=MyISAM default charset=utf8")
|
||||
|
||||
cur = conn.cursor()
|
||||
cur.execute("INSERT INTO issue363 (id, geom) VALUES ("
|
||||
"1998, GeomFromText('LINESTRING(1.1 1.1,2.2 2.2)'))")
|
||||
|
||||
# select WKT
|
||||
cur.execute("SELECT AsText(geom) FROM issue363")
|
||||
row = cur.fetchone()
|
||||
self.assertEqual(row, ("LINESTRING(1.1 1.1,2.2 2.2)", ))
|
||||
|
||||
# select WKB
|
||||
cur.execute("SELECT AsBinary(geom) FROM issue363")
|
||||
row = cur.fetchone()
|
||||
self.assertEqual(row,
|
||||
(b"\x01\x02\x00\x00\x00\x02\x00\x00\x00"
|
||||
b"\x9a\x99\x99\x99\x99\x99\xf1?"
|
||||
b"\x9a\x99\x99\x99\x99\x99\xf1?"
|
||||
b"\x9a\x99\x99\x99\x99\x99\x01@"
|
||||
b"\x9a\x99\x99\x99\x99\x99\x01@", ))
|
||||
|
||||
# select internal binary
|
||||
cur.execute("SELECT geom FROM issue363")
|
||||
row = cur.fetchone()
|
||||
# don't assert the exact internal binary value, as it could
|
||||
# vary across implementations
|
||||
self.assertTrue(isinstance(row[0], bytes))
|
||||
|
||||
Reference in New Issue
Block a user