DictCursor: use table.column when column name conflicts.

DictCurosr now implemented via DictCursorMixin.
This commit is contained in:
INADA Naoki
2013-10-01 02:26:56 +09:00
parent fe3cc1f8b2
commit 6c2cf255bc
3 changed files with 43 additions and 26 deletions

View File

@@ -414,10 +414,10 @@ class FieldDescriptorPacket(MysqlPacket):
"""
self.catalog = self.read_length_coded_string()
self.db = self.read_length_coded_string()
self.table_name = self.read_length_coded_string()
self.org_table = self.read_length_coded_string()
self.table_name = self.read_length_coded_string().decode(encoding)
self.org_table = self.read_length_coded_string().decode(encoding)
self.name = self.read_length_coded_string().decode(encoding)
self.org_name = self.read_length_coded_string()
self.org_name = self.read_length_coded_string().decode(encoding)
self.advance(1) # non-null filler
self.charsetnr = struct.unpack('<H', self.read(2))[0]
self.length = struct.unpack('<I', self.read(4))[0]

View File

@@ -233,39 +233,48 @@ class Cursor(object):
NotSupportedError = NotSupportedError
class DictCursor(Cursor):
"""A cursor which returns results as a dictionary"""
class DictCursorMixin(object):
# You can override this to use OrderedDict or other dict-like types.
dict_type = dict
def execute(self, query, args=None):
result = super(DictCursor, self).execute(query, args)
result = super(DictCursorMixin, self).execute(query, args)
if self.description:
self._fields = [field[0] for field in self.description]
fields = []
for f in self._result.fields:
name = f.name
if name in fields:
name = f.table_name + '.' + name
fields.append(name)
self._fields = fields
return result
def fetchone(self):
''' Fetch the next row '''
result = super(DictCursor, self).fetchone()
result = super(DictCursorMixin, self).fetchone()
if result is None:
return None
return self.dict_type(zip(self._fields, result))
def fetchmany(self, size=None):
''' Fetch several rows '''
rows = super(DictCursor, self).fetchmany(size)
rows = super(DictCursorMixin, self).fetchmany(size)
if rows is None:
return None
return [self.dict_type(zip(self._fields, r)) for r in rows]
def fetchall(self):
''' Fetch all the rows '''
rows = super(DictCursor, self).fetchall()
rows = super(DictCursorMixin, self).fetchall()
if rows is None:
return None
return [self.dict_type(zip(self._fields, r)) for r in rows]
class DictCursor(DictCursorMixin, Cursor):
"""A cursor which returns results as a dictionary"""
class SSCursor(Cursor):
"""
Unbuffered Cursor, mainly useful for queries that return a lot of data,
@@ -379,20 +388,5 @@ class SSCursor(Cursor):
raise ProgrammingError("unknown scroll mode %s" % mode)
class SSDictCursor(SSCursor):
class SSDictCursor(DictCursorMixin, SSCursor):
""" An unbuffered cursor, which returns results as a dictionary """
# You can override this to use OrderedDict or other dict-like types.
dict_type = dict
def execute(self, query, args=None):
result = super(SSDictCursor, self).execute(query, args)
if self.description:
self._fields = [field[0] for field in self.description]
return result
def read_next(self):
""" Read next row """
row = super(SSDictCursor, self).read_next()
if row is None:
return None
return self.dict_type(zip(self._fields, row))

View File

@@ -275,6 +275,29 @@ class TestGitHubIssues(base.PyMySQLTestCase):
self.assertTrue(c.fetchone()[0])
conn.close()
def test_Duplicate_field(self):
'''#79'''
conn = self.connections[0]
c = conn.cursor(pymysql.cursors.DictCursor)
c.execute("""CREATE TABLE a (id int, value int)""")
c.execute("""CREATE TABLE b (id int, value int)""")
a=(1,11)
b=(1,22)
try:
c.execute("insert into a values (%s, %s)", a)
c.execute("insert into b values (%s, %s)", b)
c.execute("SELECT * FROM a inner join b on a.id = b.id")
r = c.fetchall()[0]
self.assertEqual(r['id'], 1)
self.assertEqual(r['value'], 11)
self.assertEqual(r['b.value'], 22)
finally:
c.execute("drop table a")
c.execute("drop table b")
def test_issue_95(self):
conn = self.connections[0]
cur = conn.cursor()