Port to thriftpy

This commit is contained in:
Wouter Bolsterlee
2016-03-27 21:46:42 +02:00
parent 1ded319c92
commit 9212da0fd4
8 changed files with 25 additions and 24 deletions

View File

@@ -3,11 +3,12 @@ HappyBase, a developer-friendly Python library to interact with Apache
HBase. HBase.
""" """
import thriftpy as _thriftpy
_thriftpy.install_import_hook()
from ._version import __version__ from ._version import __version__
from .connection import DEFAULT_HOST, DEFAULT_PORT, Connection from .connection import DEFAULT_HOST, DEFAULT_PORT, Connection
from .table import Table from .table import Table
from .batch import Batch from .batch import Batch
from .pool import ConnectionPool, NoConnectionsAvailable from .pool import ConnectionPool, NoConnectionsAvailable
# TODO: properly handle errors defined in Thrift specification

View File

@@ -6,7 +6,7 @@ from collections import defaultdict
import logging import logging
from numbers import Integral from numbers import Integral
from .hbase.ttypes import BatchMutation, Mutation from .Hbase_thrift import BatchMutation, Mutation
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View File

@@ -6,12 +6,11 @@ HappyBase connection module.
import logging import logging
from thrift.transport.TSocket import TSocket from thriftpy.thrift import TClient
from thrift.transport.TTransport import TBufferedTransport, TFramedTransport from thriftpy.transport import TBufferedTransport, TFramedTransport, TSocket
from thrift.protocol import TBinaryProtocol, TCompactProtocol from thriftpy.protocol import TBinaryProtocol, TCompactProtocol
from .hbase import Hbase from .Hbase_thrift import Hbase, ColumnDescriptor
from .hbase.ttypes import ColumnDescriptor
from .table import Table from .table import Table
from .util import pep8_to_camel_case from .util import pep8_to_camel_case
@@ -23,8 +22,8 @@ THRIFT_TRANSPORTS = dict(
framed=TFramedTransport, framed=TFramedTransport,
) )
THRIFT_PROTOCOLS = dict( THRIFT_PROTOCOLS = dict(
binary=TBinaryProtocol.TBinaryProtocolAccelerated, binary=TBinaryProtocol,
compact=TCompactProtocol.TCompactProtocol, compact=TCompactProtocol,
) )
DEFAULT_HOST = 'localhost' DEFAULT_HOST = 'localhost'
@@ -78,8 +77,8 @@ class Connection(object):
since otherwise you might see non-obvious connection errors or since otherwise you might see non-obvious connection errors or
program hangs when making a connection. ``TCompactProtocol`` is program hangs when making a connection. ``TCompactProtocol`` is
a more compact binary format that is typically more efficient to a more compact binary format that is typically more efficient to
process as well. ``TBinaryAccelerated`` is the default protocol that process as well. ``TBinaryProtocol`` is the default protocol that
happybase uses. Happybase uses.
.. versionadded:: 0.9 .. versionadded:: 0.9
`protocol` argument `protocol` argument
@@ -148,11 +147,11 @@ class Connection(object):
"""Refresh the Thrift socket, transport, and client.""" """Refresh the Thrift socket, transport, and client."""
socket = TSocket(self.host, self.port) socket = TSocket(self.host, self.port)
if self.timeout is not None: if self.timeout is not None:
socket.setTimeout(self.timeout) socket.set_timeout(self.timeout)
self.transport = self._transport_class(socket) self.transport = self._transport_class(socket)
protocol = self._protocol_class(self.transport) protocol = self._protocol_class(self.transport, decode_response=False)
self.client = Hbase.Client(protocol) self.client = TClient(Hbase, protocol)
def _table_name(self, name): def _table_name(self, name):
"""Construct a table name by optionally adding a table name prefix.""" """Construct a table name by optionally adding a table name prefix."""
@@ -166,7 +165,7 @@ class Connection(object):
This method opens the underlying Thrift transport (TCP connection). This method opens the underlying Thrift transport (TCP connection).
""" """
if self.transport.isOpen(): if self.transport.is_open():
return return
logger.debug("Opening Thrift transport to %s:%d", self.host, self.port) logger.debug("Opening Thrift transport to %s:%d", self.host, self.port)
@@ -177,7 +176,7 @@ class Connection(object):
This method closes the underlying Thrift transport (TCP connection). This method closes the underlying Thrift transport (TCP connection).
""" """
if not self.transport.isOpen(): if not self.transport.is_open():
return return
if logger is not None: if logger is not None:

View File

@@ -8,7 +8,7 @@ import Queue
import socket import socket
import threading import threading
from thrift.Thrift import TException from thriftpy.thrift import TException
from .connection import Connection from .connection import Connection

View File

@@ -7,7 +7,8 @@ from numbers import Integral
from operator import attrgetter from operator import attrgetter
from struct import Struct from struct import Struct
from .hbase.ttypes import TScan from .Hbase_thrift import TScan
from .util import thrift_type_to_dict, str_increment, OrderedDict from .util import thrift_type_to_dict, str_increment, OrderedDict
from .batch import Batch from .batch import Batch

View File

@@ -45,7 +45,7 @@ def pep8_to_camel_case(name, initial=False):
def thrift_attrs(obj_or_cls): def thrift_attrs(obj_or_cls):
"""Obtain Thrift data type attribute names for an instance or class.""" """Obtain Thrift data type attribute names for an instance or class."""
return [v[2] for v in obj_or_cls.thrift_spec[1:]] return [v[1] for v in obj_or_cls.thrift_spec.values()]
def thrift_type_to_dict(obj): def thrift_type_to_dict(obj):

View File

@@ -1 +1 @@
thrift>=0.8.0 thriftpy>=0.3.7

View File

@@ -490,7 +490,7 @@ def test_connection_pool_construction():
def test_connection_pool(): def test_connection_pool():
from thrift.transport.TTransport import TTransportException from thriftpy.thrift import TException
def run(): def run():
name = threading.current_thread().name name = threading.current_thread().name
@@ -505,7 +505,7 @@ def test_connection_pool():
if random.random() < .25: if random.random() < .25:
print "Introducing random failure" print "Introducing random failure"
connection.transport.close() connection.transport.close()
raise TTransportException("Fake transport exception") raise TException("Fake transport exception")
for i in xrange(50): for i in xrange(50):
with pool.connection() as connection: with pool.connection() as connection:
@@ -513,7 +513,7 @@ def test_connection_pool():
try: try:
inner_function() inner_function()
except TTransportException: except TException:
# This error should have been picked up by the # This error should have been picked up by the
# connection pool, and the connection should have # connection pool, and the connection should have
# been replaced by a fresh one # been replaced by a fresh one