180 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			Cython
		
	
	
	
	
	
			
		
		
	
	
			180 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			Cython
		
	
	
	
	
	
# Copyright 2013-2016 DataStax, Inc.
 | 
						|
#
 | 
						|
# Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
# you may not use this file except in compliance with the License.
 | 
						|
# You may obtain a copy of the License at
 | 
						|
#
 | 
						|
# http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
# Unless required by applicable law or agreed to in writing, software
 | 
						|
# distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
# See the License for the specific language governing permissions and
 | 
						|
# limitations under the License.
 | 
						|
 | 
						|
"""
 | 
						|
This module provides an optional protocol parser that returns
 | 
						|
NumPy arrays.
 | 
						|
 | 
						|
=============================================================================
 | 
						|
This module should not be imported by any of the main python-driver modules,
 | 
						|
as numpy is an optional dependency.
 | 
						|
=============================================================================
 | 
						|
"""
 | 
						|
 | 
						|
include "ioutils.pyx"
 | 
						|
 | 
						|
cimport cython
 | 
						|
from libc.stdint cimport uint64_t
 | 
						|
from cpython.ref cimport Py_INCREF, PyObject
 | 
						|
 | 
						|
from cassandra.bytesio cimport BytesIOReader
 | 
						|
from cassandra.deserializers cimport Deserializer, from_binary
 | 
						|
from cassandra.parsing cimport ParseDesc, ColumnParser, RowParser
 | 
						|
from cassandra import cqltypes
 | 
						|
from cassandra.util import is_little_endian
 | 
						|
 | 
						|
import numpy as np
 | 
						|
# import pandas as pd
 | 
						|
 | 
						|
cdef extern from "numpyFlags.h":
    # Nothing is declared here: naming the header is enough to get
    # 'numpyFlags.h' included in the generated C code, which disables the
    # deprecated NumPy API
    pass

cdef extern from "Python.h":
    # Integer type wide enough to hold a pointer value
    ctypedef uint64_t Py_uintptr_t
 | 
						|
 | 
						|
 | 
						|
# Minimal per-column array descriptor used while parsing rows into NumPy
# arrays. The field order and types must stay in sync with 'arrDescDtype'.
ctypedef struct ArrDesc:
    # Address of the array slot that the next parsed value is written to
    Py_uintptr_t buf_ptr
    # Byte distance between consecutive slots; an int should be large
    # enough as we allocate contiguous arrays
    int stride
    # Non-zero when the slots hold Python object pointers, not raw values
    int is_object
    # Non-zero when a zero-length value is acceptable for this column
    int empty_binary_ok
 | 
						|
 | 
						|
# NumPy view of the ArrDesc C struct: same fields, same order, C alignment.
arrDescDtype = np.dtype(
    [
        ('buf_ptr', np.uintp),
        ('stride', np.intc),
        ('is_object', np.intc),
        ('empty_binary_ok', np.intc),
    ],
    align=True,
)
 | 
						|
 | 
						|
# CQL types that are stored as raw fixed-width values, mapped to the
# big-endian NumPy dtype used for their array. Any type missing from this
# table is stored in an object array instead.
_cqltype_to_numpy = {
    cqltype: np.dtype(code)
    for cqltype, code in (
        (cqltypes.LongType, '>i8'),
        (cqltypes.CounterColumnType, '>i8'),
        (cqltypes.Int32Type, '>i4'),
        (cqltypes.ShortType, '>i2'),
        (cqltypes.FloatType, '>f4'),
        (cqltypes.DoubleType, '>f8'),
    )
}

# dtype used for every column that has no entry in the table above
obj_dtype = np.dtype('O')
 | 
						|
 | 
						|
 | 
						|
cdef class NumpyParser(ColumnParser):
    """Column parser that decodes a ResultMessage into NumPy arrays"""

    cpdef parse_rows(self, BytesIOReader reader, ParseDesc desc):
        """
        Read the row count followed by that many rows from 'reader'.

        Returns a dict mapping each name in 'desc.colnames' to the array
        holding that column's values, converted by make_native_byteorder().
        """
        cdef Py_ssize_t n_rows = read_int(reader)
        cdef ArrDesc[::1] descs
        cdef ArrDesc *first_desc

        # One array per column, plus the raw descriptors the row loop uses
        descs, columns = make_arrays(desc, n_rows)
        first_desc = &descs[0]

        _parse_rows(reader, desc, first_desc, n_rows)

        native_columns = [make_native_byteorder(col) for col in columns]
        return dict(zip(desc.colnames, native_columns))
 | 
						|
 | 
						|
 | 
						|
cdef _parse_rows(BytesIOReader reader, ParseDesc desc,
                 ArrDesc *arrs, Py_ssize_t rowcount):
    """Call unpack_row() 'rowcount' times, filling the arrays behind 'arrs'."""
    cdef Py_ssize_t remaining = rowcount

    while remaining > 0:
        unpack_row(reader, desc, arrs)
        remaining -= 1
 | 
						|
 | 
						|
 | 
						|
### Helper functions to create NumPy arrays and array descriptors
 | 
						|
 | 
						|
def make_arrays(ParseDesc desc, array_size):
    """
    Allocate one array of 'array_size' elements per result column.

    returns a tuple of (array_descs, arrays), where
        'array_descs' is a structured array of dtype 'arrDescDtype' with
            'desc.rowsize' records, one filled in per column, and
        'arrays' is the list of allocated arrays, in the order of
            'desc.coltypes'
    """
    descriptors = np.empty((desc.rowsize,), arrDescDtype)
    columns = []

    for idx, coltype in enumerate(desc.coltypes):
        column = make_array(coltype, array_size)

        # Record where the column's data starts and how to step through it
        record = descriptors[idx]
        record['buf_ptr'] = column.ctypes.data
        record['stride'] = column.strides[0]
        # Types without a fixed-width NumPy dtype live in object arrays
        record['is_object'] = coltype not in _cqltype_to_numpy
        record['empty_binary_ok'] = coltype.empty_binary_ok

        columns.append(column)

    return descriptors, columns
 | 
						|
 | 
						|
 | 
						|
def make_array(coltype, array_size):
    """
    Return a new, uninitialised 1-d NumPy array with 'array_size' elements.

    The dtype is looked up in '_cqltype_to_numpy'; column types that have
    no entry there get an object array.
    """
    column_dtype = _cqltype_to_numpy.get(coltype, obj_dtype)
    shape = (array_size,)
    return np.empty(shape, dtype=column_dtype)
 | 
						|
 | 
						|
 | 
						|
#### Parse rows into NumPy arrays
 | 
						|
 | 
						|
@cython.boundscheck(False)
@cython.wraparound(False)
cdef inline int unpack_row(
        BytesIOReader reader, ParseDesc desc, ArrDesc *arrays) except -1:
    """
    Read one row from 'reader' and store every column value into the
    current slot of the matching array, then advance that array's slot.

    'arrays' must point to 'desc.rowsize' descriptors. Returns 0 on success.

    Raises ValueError for a NULL value, for an empty value in a column that
    does not accept one, and for a fixed-width value whose length differs
    from the width of the array slot.
    """
    cdef Buffer buf
    cdef Py_ssize_t i, rowsize = desc.rowsize
    cdef ArrDesc arr
    cdef Deserializer deserializer

    for i in range(rowsize):
        get_buf(reader, &buf)
        # Local copy of the descriptor; the update at the end of the loop
        # body is written back through 'arrays[i]'
        arr = arrays[i]

        if buf.size < 0 or (buf.size == 0 and not arr.empty_binary_ok):
            raise ValueError("Cannot handle NULL value")

        if arr.is_object:
            deserializer = desc.deserializers[i]
            val = from_binary(deserializer, &buf, desc.protocol_version)
            # The array slot keeps its own reference to the object
            Py_INCREF(val)
            (<PyObject **> arr.buf_ptr)[0] = <PyObject *> val
        else:
            # The slot is exactly one element ('stride' bytes) wide, so any
            # other length would either write past the slot or leave part
            # of it uninitialised
            if buf.size != arr.stride:
                raise ValueError(
                    "Expected %d bytes for a fixed-width column, got %d"
                    % (arr.stride, buf.size))
            memcpy(<char *> arr.buf_ptr, buf.ptr, buf.size)

        # Update the pointer into the array for the next time
        arrays[i].buf_ptr += arr.stride

    return 0
 | 
						|
 | 
						|
 | 
						|
def make_native_byteorder(arr):
    """
    Make sure all values have a native endian in the NumPy arrays.

    Returns 'arr' itself when its dtype is already native; this includes
    object arrays of dtype('O'), which have no byte order. Otherwise
    returns a byte-swapped copy whose dtype has the opposite byte order
    (e.g. from '>i8' to '<i8' on a little-endian host), so the values are
    unchanged.
    """
    dtype = arr.dtype
    if dtype.isnative:
        return arr

    # byteswap() gives a copy with the bytes of every element reversed;
    # viewing it under the flipped dtype restores the original values.
    # ndarray.newbyteorder() is not used as it was removed in NumPy 2.0.
    return arr.byteswap().view(dtype.newbyteorder())
 |