Add DeprecationWarnings to legacy KafkaClient, Simple/MultiProcess/Consumer, and KafkaConnection
This commit is contained in:
		@@ -1,5 +1,5 @@
 | 
			
		||||
KafkaClient
 | 
			
		||||
===========
 | 
			
		||||
 | 
			
		||||
.. autoclass:: kafka.KafkaClient
 | 
			
		||||
.. autoclass:: kafka.client.KafkaClient
 | 
			
		||||
    :members:
 | 
			
		||||
 
 | 
			
		||||
@@ -4,14 +4,28 @@ __author__ = 'Dana Powers'
 | 
			
		||||
__license__ = 'Apache License 2.0'
 | 
			
		||||
__copyright__ = 'Copyright 2016 Dana Powers, David Arthur, and Contributors'
 | 
			
		||||
 | 
			
		||||
from kafka.client import KafkaClient as SimpleClient
 | 
			
		||||
from kafka.client_async import KafkaClient
 | 
			
		||||
from kafka.consumer import KafkaConsumer
 | 
			
		||||
from kafka.conn import BrokerConnection
 | 
			
		||||
from kafka.protocol import (
 | 
			
		||||
    create_message, create_gzip_message, create_snappy_message)
 | 
			
		||||
from kafka.producer import SimpleProducer, KeyedProducer
 | 
			
		||||
from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner, Murmur2Partitioner
 | 
			
		||||
from kafka.consumer import KafkaConsumer, SimpleConsumer, MultiProcessConsumer
 | 
			
		||||
 | 
			
		||||
# To be deprecated when KafkaProducer interface is released
 | 
			
		||||
from kafka.client import SimpleClient
 | 
			
		||||
from kafka.producer import SimpleProducer, KeyedProducer
 | 
			
		||||
 | 
			
		||||
# deprecated in favor of KafkaConsumer
 | 
			
		||||
from kafka.consumer import SimpleConsumer, MultiProcessConsumer
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
import warnings
 | 
			
		||||
class KafkaClient(SimpleClient):
 | 
			
		||||
    def __init__(self, *args, **kwargs):
 | 
			
		||||
        warnings.warn('The legacy KafkaClient interface has been moved to'
 | 
			
		||||
                      ' kafka.SimpleClient - this import will break in a'
 | 
			
		||||
                      ' future release', DeprecationWarning)
 | 
			
		||||
        super(KafkaClient, self).__init__(*args, **kwargs)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
__all__ = [
 | 
			
		||||
    'KafkaConsumer', 'KafkaClient', 'BrokerConnection',
 | 
			
		||||
 
 | 
			
		||||
@@ -19,11 +19,17 @@ from kafka.conn import (
 | 
			
		||||
    ConnectionStates)
 | 
			
		||||
from kafka.protocol import KafkaProtocol
 | 
			
		||||
 | 
			
		||||
# New KafkaClient
 | 
			
		||||
# this is not exposed in top-level imports yet,
 | 
			
		||||
# due to conflicts with legacy SimpleConsumer / SimpleProducer usage
 | 
			
		||||
from kafka.client_async import KafkaClient
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
log = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class KafkaClient(object):
 | 
			
		||||
# Legacy KafkaClient interface -- will be deprecated soon
 | 
			
		||||
class SimpleClient(object):
 | 
			
		||||
 | 
			
		||||
    CLIENT_ID = b'kafka-python'
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -9,6 +9,7 @@ import socket
 | 
			
		||||
import struct
 | 
			
		||||
from threading import local
 | 
			
		||||
import time
 | 
			
		||||
import warnings
 | 
			
		||||
 | 
			
		||||
import six
 | 
			
		||||
 | 
			
		||||
@@ -375,6 +376,8 @@ class KafkaConnection(local):
 | 
			
		||||
            in seconds. None means no timeout, so a request can block forever.
 | 
			
		||||
    """
 | 
			
		||||
    def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
 | 
			
		||||
        warnings.warn('KafkaConnection has been deprecated and will be'
 | 
			
		||||
                      ' removed in a future release', DeprecationWarning)
 | 
			
		||||
        super(KafkaConnection, self).__init__()
 | 
			
		||||
        self.host = host
 | 
			
		||||
        self.port = port
 | 
			
		||||
 
 | 
			
		||||
@@ -4,6 +4,7 @@ import atexit
 | 
			
		||||
import logging
 | 
			
		||||
import numbers
 | 
			
		||||
from threading import Lock
 | 
			
		||||
import warnings
 | 
			
		||||
 | 
			
		||||
import kafka.common
 | 
			
		||||
from kafka.common import (
 | 
			
		||||
@@ -46,6 +47,9 @@ class Consumer(object):
 | 
			
		||||
                 auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
 | 
			
		||||
                 auto_commit_every_t=AUTO_COMMIT_INTERVAL):
 | 
			
		||||
 | 
			
		||||
        warnings.warn('deprecated -- this class will be removed in a future'
 | 
			
		||||
                      ' release. Use KafkaConsumer instead.',
 | 
			
		||||
                      DeprecationWarning)
 | 
			
		||||
        self.client = client
 | 
			
		||||
        self.topic = topic
 | 
			
		||||
        self.group = group
 | 
			
		||||
 
 | 
			
		||||
@@ -4,6 +4,7 @@ from collections import namedtuple
 | 
			
		||||
import logging
 | 
			
		||||
from multiprocessing import Process, Manager as MPManager
 | 
			
		||||
import time
 | 
			
		||||
import warnings
 | 
			
		||||
 | 
			
		||||
from six.moves import queue
 | 
			
		||||
 | 
			
		||||
@@ -135,6 +136,10 @@ class MultiProcessConsumer(Consumer):
 | 
			
		||||
                 partitions_per_proc=0,
 | 
			
		||||
                 **simple_consumer_options):
 | 
			
		||||
 | 
			
		||||
        warnings.warn('This class has been deprecated and will be removed in a'
 | 
			
		||||
                      ' future release. Use KafkaConsumer instead',
 | 
			
		||||
                      DeprecationWarning)
 | 
			
		||||
 | 
			
		||||
        # Initiate the base consumer class
 | 
			
		||||
        super(MultiProcessConsumer, self).__init__(
 | 
			
		||||
            client, group, topic,
 | 
			
		||||
 
 | 
			
		||||
@@ -7,6 +7,7 @@ except ImportError:
 | 
			
		||||
import logging
 | 
			
		||||
import sys
 | 
			
		||||
import time
 | 
			
		||||
import warnings
 | 
			
		||||
 | 
			
		||||
import six
 | 
			
		||||
from six.moves import queue
 | 
			
		||||
@@ -40,6 +41,8 @@ class FetchContext(object):
 | 
			
		||||
    Class for managing the state of a consumer during fetch
 | 
			
		||||
    """
 | 
			
		||||
    def __init__(self, consumer, block, timeout):
 | 
			
		||||
        warnings.warn('deprecated - this class will be removed in a future'
 | 
			
		||||
                      ' release', DeprecationWarning)
 | 
			
		||||
        self.consumer = consumer
 | 
			
		||||
        self.block = block
 | 
			
		||||
 | 
			
		||||
@@ -116,6 +119,9 @@ class SimpleConsumer(Consumer):
 | 
			
		||||
                 max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
 | 
			
		||||
                 iter_timeout=None,
 | 
			
		||||
                 auto_offset_reset='largest'):
 | 
			
		||||
        warnings.warn('deprecated - this class will be removed in a future'
 | 
			
		||||
                      ' release. Use KafkaConsumer instead.',
 | 
			
		||||
                      DeprecationWarning)
 | 
			
		||||
        super(SimpleConsumer, self).__init__(
 | 
			
		||||
            client, group, topic,
 | 
			
		||||
            partitions=partitions,
 | 
			
		||||
 
 | 
			
		||||
@@ -1,29 +1,28 @@
 | 
			
		||||
from . import unittest
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class TestPackage(unittest.TestCase):
 | 
			
		||||
    def test_top_level_namespace(self):
 | 
			
		||||
        import kafka as kafka1
 | 
			
		||||
        self.assertEqual(kafka1.KafkaClient.__name__, "KafkaClient")
 | 
			
		||||
        self.assertEqual(kafka1.client.__name__, "kafka.client")
 | 
			
		||||
        self.assertEqual(kafka1.KafkaConsumer.__name__, "KafkaConsumer")
 | 
			
		||||
        self.assertEqual(kafka1.consumer.__name__, "kafka.consumer")
 | 
			
		||||
        self.assertEqual(kafka1.codec.__name__, "kafka.codec")
 | 
			
		||||
 | 
			
		||||
    def test_submodule_namespace(self):
 | 
			
		||||
        import kafka.client as client1
 | 
			
		||||
        self.assertEqual(client1.__name__, "kafka.client")
 | 
			
		||||
        self.assertEqual(client1.KafkaClient.__name__, "KafkaClient")
 | 
			
		||||
 | 
			
		||||
        from kafka import client as client2
 | 
			
		||||
        self.assertEqual(client2.__name__, "kafka.client")
 | 
			
		||||
        self.assertEqual(client2.KafkaClient.__name__, "KafkaClient")
 | 
			
		||||
 | 
			
		||||
        from kafka.client import KafkaClient as KafkaClient1
 | 
			
		||||
        self.assertEqual(KafkaClient1.__name__, "KafkaClient")
 | 
			
		||||
        from kafka.client import SimpleClient as SimpleClient1
 | 
			
		||||
        self.assertEqual(SimpleClient1.__name__, "SimpleClient")
 | 
			
		||||
 | 
			
		||||
        from kafka.codec import gzip_encode as gzip_encode1
 | 
			
		||||
        self.assertEqual(gzip_encode1.__name__, "gzip_encode")
 | 
			
		||||
 | 
			
		||||
        from kafka import KafkaClient as KafkaClient2
 | 
			
		||||
        self.assertEqual(KafkaClient2.__name__, "KafkaClient")
 | 
			
		||||
        from kafka import SimpleClient as SimpleClient2
 | 
			
		||||
        self.assertEqual(SimpleClient2.__name__, "SimpleClient")
 | 
			
		||||
 | 
			
		||||
        from kafka.codec import snappy_encode
 | 
			
		||||
        self.assertEqual(snappy_encode.__name__, "snappy_encode")
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user