Merge pull request #507 from dpkp/deprecation_warnings

Add DeprecationWarnings to legacy classes
This commit is contained in:
Dana Powers
2016-01-12 22:57:24 -08:00
8 changed files with 51 additions and 14 deletions

View File

@@ -1,5 +1,5 @@
KafkaClient KafkaClient
=========== ===========
.. autoclass:: kafka.KafkaClient .. autoclass:: kafka.client.KafkaClient
:members: :members:

View File

@@ -4,14 +4,28 @@ __author__ = 'Dana Powers'
__license__ = 'Apache License 2.0' __license__ = 'Apache License 2.0'
__copyright__ = 'Copyright 2016 Dana Powers, David Arthur, and Contributors' __copyright__ = 'Copyright 2016 Dana Powers, David Arthur, and Contributors'
from kafka.client import KafkaClient as SimpleClient from kafka.consumer import KafkaConsumer
from kafka.client_async import KafkaClient
from kafka.conn import BrokerConnection from kafka.conn import BrokerConnection
from kafka.protocol import ( from kafka.protocol import (
create_message, create_gzip_message, create_snappy_message) create_message, create_gzip_message, create_snappy_message)
from kafka.producer import SimpleProducer, KeyedProducer
from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner, Murmur2Partitioner from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner, Murmur2Partitioner
from kafka.consumer import KafkaConsumer, SimpleConsumer, MultiProcessConsumer
# To be deprecated when KafkaProducer interface is released
from kafka.client import SimpleClient
from kafka.producer import SimpleProducer, KeyedProducer
# deprecated in favor of KafkaConsumer
from kafka.consumer import SimpleConsumer, MultiProcessConsumer
import warnings
class KafkaClient(SimpleClient):
    """Backwards-compatible shim for the renamed legacy client.

    The old ``kafka.KafkaClient`` class now lives at
    ``kafka.SimpleClient``; this subclass keeps the old import path
    working while warning callers that it will be removed.
    """

    def __init__(self, *args, **kwargs):
        # Emit the warning once per instantiation, then defer entirely
        # to the real (renamed) implementation.
        warnings.warn(
            'The legacy KafkaClient interface has been moved to'
            ' kafka.SimpleClient - this import will break in a'
            ' future release',
            DeprecationWarning,
        )
        super(KafkaClient, self).__init__(*args, **kwargs)
__all__ = [ __all__ = [
'KafkaConsumer', 'KafkaClient', 'BrokerConnection', 'KafkaConsumer', 'KafkaClient', 'BrokerConnection',

View File

@@ -19,11 +19,17 @@ from kafka.conn import (
ConnectionStates) ConnectionStates)
from kafka.protocol import KafkaProtocol from kafka.protocol import KafkaProtocol
# New KafkaClient
# this is not exposed in top-level imports yet,
# due to conflicts with legacy SimpleConsumer / SimpleProducer usage
from kafka.client_async import KafkaClient
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class KafkaClient(object): # Legacy KafkaClient interface -- will be deprecated soon
class SimpleClient(object):
CLIENT_ID = b'kafka-python' CLIENT_ID = b'kafka-python'

View File

@@ -9,6 +9,7 @@ import socket
import struct import struct
from threading import local from threading import local
import time import time
import warnings
import six import six
@@ -375,6 +376,8 @@ class KafkaConnection(local):
in seconds. None means no timeout, so a request can block forever. in seconds. None means no timeout, so a request can block forever.
""" """
def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
warnings.warn('KafkaConnection has been deprecated and will be'
' removed in a future release', DeprecationWarning)
super(KafkaConnection, self).__init__() super(KafkaConnection, self).__init__()
self.host = host self.host = host
self.port = port self.port = port

View File

@@ -4,6 +4,7 @@ import atexit
import logging import logging
import numbers import numbers
from threading import Lock from threading import Lock
import warnings
import kafka.common import kafka.common
from kafka.common import ( from kafka.common import (
@@ -46,6 +47,9 @@ class Consumer(object):
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
auto_commit_every_t=AUTO_COMMIT_INTERVAL): auto_commit_every_t=AUTO_COMMIT_INTERVAL):
warnings.warn('deprecated -- this class will be removed in a future'
' release. Use KafkaConsumer instead.',
DeprecationWarning)
self.client = client self.client = client
self.topic = topic self.topic = topic
self.group = group self.group = group

View File

@@ -4,6 +4,7 @@ from collections import namedtuple
import logging import logging
from multiprocessing import Process, Manager as MPManager from multiprocessing import Process, Manager as MPManager
import time import time
import warnings
from six.moves import queue from six.moves import queue
@@ -135,6 +136,10 @@ class MultiProcessConsumer(Consumer):
partitions_per_proc=0, partitions_per_proc=0,
**simple_consumer_options): **simple_consumer_options):
warnings.warn('This class has been deprecated and will be removed in a'
' future release. Use KafkaConsumer instead',
DeprecationWarning)
# Initiate the base consumer class # Initiate the base consumer class
super(MultiProcessConsumer, self).__init__( super(MultiProcessConsumer, self).__init__(
client, group, topic, client, group, topic,

View File

@@ -7,6 +7,7 @@ except ImportError:
import logging import logging
import sys import sys
import time import time
import warnings
import six import six
from six.moves import queue from six.moves import queue
@@ -40,6 +41,8 @@ class FetchContext(object):
Class for managing the state of a consumer during fetch Class for managing the state of a consumer during fetch
""" """
def __init__(self, consumer, block, timeout): def __init__(self, consumer, block, timeout):
warnings.warn('deprecated - this class will be removed in a future'
' release', DeprecationWarning)
self.consumer = consumer self.consumer = consumer
self.block = block self.block = block
@@ -116,6 +119,9 @@ class SimpleConsumer(Consumer):
max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES, max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
iter_timeout=None, iter_timeout=None,
auto_offset_reset='largest'): auto_offset_reset='largest'):
warnings.warn('deprecated - this class will be removed in a future'
' release. Use KafkaConsumer instead.',
DeprecationWarning)
super(SimpleConsumer, self).__init__( super(SimpleConsumer, self).__init__(
client, group, topic, client, group, topic,
partitions=partitions, partitions=partitions,

View File

@@ -1,29 +1,28 @@
from . import unittest from . import unittest
class TestPackage(unittest.TestCase): class TestPackage(unittest.TestCase):
def test_top_level_namespace(self): def test_top_level_namespace(self):
import kafka as kafka1 import kafka as kafka1
self.assertEqual(kafka1.KafkaClient.__name__, "KafkaClient") self.assertEqual(kafka1.KafkaConsumer.__name__, "KafkaConsumer")
self.assertEqual(kafka1.client.__name__, "kafka.client") self.assertEqual(kafka1.consumer.__name__, "kafka.consumer")
self.assertEqual(kafka1.codec.__name__, "kafka.codec") self.assertEqual(kafka1.codec.__name__, "kafka.codec")
def test_submodule_namespace(self): def test_submodule_namespace(self):
import kafka.client as client1 import kafka.client as client1
self.assertEqual(client1.__name__, "kafka.client") self.assertEqual(client1.__name__, "kafka.client")
self.assertEqual(client1.KafkaClient.__name__, "KafkaClient")
from kafka import client as client2 from kafka import client as client2
self.assertEqual(client2.__name__, "kafka.client") self.assertEqual(client2.__name__, "kafka.client")
self.assertEqual(client2.KafkaClient.__name__, "KafkaClient")
from kafka.client import KafkaClient as KafkaClient1 from kafka.client import SimpleClient as SimpleClient1
self.assertEqual(KafkaClient1.__name__, "KafkaClient") self.assertEqual(SimpleClient1.__name__, "SimpleClient")
from kafka.codec import gzip_encode as gzip_encode1 from kafka.codec import gzip_encode as gzip_encode1
self.assertEqual(gzip_encode1.__name__, "gzip_encode") self.assertEqual(gzip_encode1.__name__, "gzip_encode")
from kafka import KafkaClient as KafkaClient2 from kafka import SimpleClient as SimpleClient2
self.assertEqual(KafkaClient2.__name__, "KafkaClient") self.assertEqual(SimpleClient2.__name__, "SimpleClient")
from kafka.codec import snappy_encode from kafka.codec import snappy_encode
self.assertEqual(snappy_encode.__name__, "snappy_encode") self.assertEqual(snappy_encode.__name__, "snappy_encode")