(Optional) NumPy row parser
This commit is contained in:
committed by
Mark Florisson
parent
81ff98efc2
commit
1a6534b575
@@ -1,11 +1,12 @@
|
|||||||
cdef class LLDataType:
|
# cdef class LLDataType:
|
||||||
"""
|
# """
|
||||||
Low-level Cassandra datatype
|
# Low-level Cassandra datatype
|
||||||
"""
|
# """
|
||||||
|
#
|
||||||
cdef Py_ssize_t size
|
# cdef Py_ssize_t size
|
||||||
|
#
|
||||||
cdef void deserialize_ptr(self, char *buf, Py_ssize_t size, void *out, protocol_version)
|
# cdef void deserialize_ptr(self, char *buf, Py_ssize_t size,
|
||||||
|
# Py_ssize_t index, void *out, protocol_version)
|
||||||
|
|
||||||
cdef class DataType:
|
cdef class DataType:
|
||||||
cdef object deserialize(self, char *buf, Py_ssize_t size, protocol_version)
|
cdef object deserialize(self, char *buf, Py_ssize_t size, protocol_version)
|
||||||
|
|||||||
@@ -1,29 +1,10 @@
|
|||||||
include 'marshal.pyx'
|
include 'marshal.pyx'
|
||||||
|
|
||||||
from cassandra import cqltypes
|
|
||||||
|
|
||||||
|
|
||||||
cdef class LLDataType:
|
|
||||||
cdef void deserialize_ptr(self, char *buf, Py_ssize_t size,
|
|
||||||
void *out, protocol_version):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
cdef class DataType:
|
cdef class DataType:
|
||||||
cdef object deserialize(self, char *buf, Py_ssize_t size, protocol_version):
|
cdef object deserialize(self, char *buf, Py_ssize_t size, protocol_version):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
cdef class LLInt64(LLDataType):
|
|
||||||
"""
|
|
||||||
Low-level Cassandra datatype
|
|
||||||
"""
|
|
||||||
|
|
||||||
cdef void deserialize_ptr(self, char *buf, Py_ssize_t size, void *out, protocol_version):
|
|
||||||
cdef int64_t x = int64_unpack(buf)
|
|
||||||
(<int64_t *> out)[0] = x
|
|
||||||
|
|
||||||
|
|
||||||
cdef class Int64(DataType):
|
cdef class Int64(DataType):
|
||||||
|
|
||||||
cdef object deserialize(self, char *buf, Py_ssize_t size, protocol_version):
|
cdef object deserialize(self, char *buf, Py_ssize_t size, protocol_version):
|
||||||
|
|||||||
@@ -22,8 +22,8 @@ import math
|
|||||||
from libc.stdint cimport (int8_t, int16_t, int32_t, int64_t,
|
from libc.stdint cimport (int8_t, int16_t, int32_t, int64_t,
|
||||||
uint8_t, uint16_t, uint32_t, uint64_t)
|
uint8_t, uint16_t, uint32_t, uint64_t)
|
||||||
|
|
||||||
assert sys.byteorder in ('little', 'big')
|
cdef bint is_little_endian
|
||||||
cdef bint is_little_endian = sys.byteorder == 'little'
|
from cassandra.util import is_little_endian
|
||||||
|
|
||||||
# cdef extern from "marshal.h":
|
# cdef extern from "marshal.h":
|
||||||
# cdef str c_string_to_python(char *p, Py_ssize_t len)
|
# cdef str c_string_to_python(char *p, Py_ssize_t len)
|
||||||
|
|||||||
144
cassandra/numpyparser.pyx
Normal file
144
cassandra/numpyparser.pyx
Normal file
@@ -0,0 +1,144 @@
|
|||||||
|
"""
|
||||||
|
This module provider an optional protocol parser that returns
|
||||||
|
NumPy arrays.
|
||||||
|
|
||||||
|
=============================================================================
|
||||||
|
This module should not be imported by any of the main python-driver modules,
|
||||||
|
as numpy is an optional dependency.
|
||||||
|
=============================================================================
|
||||||
|
"""
|
||||||
|
|
||||||
|
include "ioutils.pyx"
|
||||||
|
|
||||||
|
from libc.stdint cimport uint64_t
|
||||||
|
|
||||||
|
from cassandra.rowparser cimport RowParser
|
||||||
|
from cassandra.bytesio cimport BytesIOReader
|
||||||
|
from cassandra.datatypes cimport DataType
|
||||||
|
from cassandra import cqltypes
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
cimport numpy as np
|
||||||
|
|
||||||
|
from cassandra.util import is_little_endian
|
||||||
|
|
||||||
|
from cpython.ref cimport Py_INCREF, PyObject
|
||||||
|
|
||||||
|
cdef extern from "Python.h":
|
||||||
|
# An integer type large enough to hold a pointer
|
||||||
|
ctypedef uint64_t Py_uintptr_t
|
||||||
|
|
||||||
|
# ctypedef struct TypeRepr:
|
||||||
|
# Py_ssize_t size
|
||||||
|
# int is_object
|
||||||
|
|
||||||
|
ctypedef struct ArrRepr:
|
||||||
|
# TypeRepr typ
|
||||||
|
Py_uintptr_t buf_ptr
|
||||||
|
Py_ssize_t stride
|
||||||
|
int is_object
|
||||||
|
|
||||||
|
_cqltype_to_numpy = {
|
||||||
|
cqltypes.LongType: np.dtype('>i8'),
|
||||||
|
cqltypes.CounterColumnType: np.dtype('>i8'),
|
||||||
|
cqltypes.Int32Type: np.dtype('>i4'),
|
||||||
|
cqltypes.ShortType: np.dtype('>i2'),
|
||||||
|
cqltypes.FloatType: np.dtype('>f4'),
|
||||||
|
cqltypes.DoubleType: np.dtype('>f8'),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# cdef type_repr(coltype):
|
||||||
|
# """
|
||||||
|
# Get a low-level type representation for the cqltype
|
||||||
|
# """
|
||||||
|
# cdef TypeRepr res
|
||||||
|
# if coltype in _cqltype_to_numpy:
|
||||||
|
# dtype = _cqltype_to_numpy[coltype]
|
||||||
|
# res.size = dtype.itemsize
|
||||||
|
# res.is_object = False
|
||||||
|
# else:
|
||||||
|
# res.size = sizeof(PyObject *)
|
||||||
|
# res.is_object = True
|
||||||
|
# return res
|
||||||
|
|
||||||
|
|
||||||
|
cdef ArrRepr array_repr(np.ndarray arr, coltype):
|
||||||
|
"""
|
||||||
|
Construct a low-level array representation
|
||||||
|
"""
|
||||||
|
assert arr.ndim == 1, "Expected a one-dimensional array"
|
||||||
|
|
||||||
|
cdef ArrRepr res
|
||||||
|
# Get the data pointer to the underlying memory of the numpy array
|
||||||
|
res.buf_ptr = arr.ctypes.data
|
||||||
|
res.stride = arr.strides[0]
|
||||||
|
res.is_object = coltype in _cqltype_to_numpy
|
||||||
|
return res
|
||||||
|
|
||||||
|
|
||||||
|
cdef class NativeRowParser(RowParser):
|
||||||
|
"""
|
||||||
|
This is a row parser that copies bytes into arrays (e.g. NumPy arrays)
|
||||||
|
for types it recognizes, such as int64. Values of other types are
|
||||||
|
converted to objects.
|
||||||
|
|
||||||
|
NOTE: This class is stateful, in that every time unpack_row is called it
|
||||||
|
advanced the pointer into the array by updates the buf_ptr field
|
||||||
|
of self.arrays
|
||||||
|
"""
|
||||||
|
|
||||||
|
# ArrRepr contains a 'buf_ptr' field, which is not supported as a memoryview dtype
|
||||||
|
cdef ArrRepr[::1] arrays
|
||||||
|
cdef DataType[::1] datatypes
|
||||||
|
cdef Py_ssize_t size
|
||||||
|
|
||||||
|
def __init__(self, ArrRepr[::1] arrays, DataType[::1] datatypes):
|
||||||
|
self.arrays = arrays
|
||||||
|
self.datatypes = datatypes
|
||||||
|
self.size = len(datatypes)
|
||||||
|
|
||||||
|
cpdef unpack_row(self, BytesIOReader reader, protocol_version):
|
||||||
|
cdef char *buf
|
||||||
|
cdef Py_ssize_t i, bufsize, rowsize = self.size
|
||||||
|
cdef ArrRepr arr
|
||||||
|
|
||||||
|
for i in range(rowsize):
|
||||||
|
buf = get_buf(reader, &bufsize)
|
||||||
|
if buf == NULL:
|
||||||
|
raise ValueError("Unexpected end of stream")
|
||||||
|
|
||||||
|
arr = self.arrays[i]
|
||||||
|
|
||||||
|
if arr.is_object:
|
||||||
|
dt = self.datatypes[i]
|
||||||
|
val = dt.deserialize(buf, bufsize, protocol_version)
|
||||||
|
Py_INCREF(val)
|
||||||
|
(<PyObject **> arr.buf_ptr)[0] = <PyObject *> val
|
||||||
|
else:
|
||||||
|
memcopy(buf, <char *> arr.buf_ptr, bufsize)
|
||||||
|
|
||||||
|
# Update the pointer into the array for the next time
|
||||||
|
self.arrays[i].buf_ptr += arr.stride
|
||||||
|
|
||||||
|
|
||||||
|
cdef inline memcopy(char *src, char *dst, Py_ssize_t size):
|
||||||
|
"""
|
||||||
|
Our own simple memcopy which can be inlined. This is useful because our data types
|
||||||
|
are only a few bytes.
|
||||||
|
"""
|
||||||
|
cdef Py_ssize_t i
|
||||||
|
for i in range(size):
|
||||||
|
dst[i] = src[i]
|
||||||
|
|
||||||
|
|
||||||
|
def make_native_byteorder(arr):
|
||||||
|
"""
|
||||||
|
Make sure all values have a native endian in the NumPy arrays.
|
||||||
|
"""
|
||||||
|
if is_little_endian:
|
||||||
|
# We have arrays in big-endian order. First swap the bytes
|
||||||
|
# into little endian order, and then update the numpy dtype
|
||||||
|
# accordingly (e.g. from '>i8' to '<i8')
|
||||||
|
return arr.byteswap().newbyteorder()
|
||||||
|
return arr
|
||||||
@@ -4,9 +4,12 @@ import datetime
|
|||||||
import random
|
import random
|
||||||
import six
|
import six
|
||||||
import uuid
|
import uuid
|
||||||
|
import sys
|
||||||
|
|
||||||
DATETIME_EPOC = datetime.datetime(1970, 1, 1)
|
DATETIME_EPOC = datetime.datetime(1970, 1, 1)
|
||||||
|
|
||||||
|
assert sys.byteorder in ('little', 'big')
|
||||||
|
is_little_endian = sys.byteorder == 'little'
|
||||||
|
|
||||||
def datetime_from_timestamp(timestamp):
|
def datetime_from_timestamp(timestamp):
|
||||||
"""
|
"""
|
||||||
|
|||||||
Reference in New Issue
Block a user