34 lines
		
	
	
		
			1.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			34 lines
		
	
	
		
			1.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from __future__ import absolute_import
 | 
						|
 | 
						|
from kafka.protocol.struct import Struct
 | 
						|
from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String
 | 
						|
from kafka.structs import TopicPartition
 | 
						|
 | 
						|
 | 
						|
class ConsumerProtocolMemberMetadata(Struct):
 | 
						|
    SCHEMA = Schema(
 | 
						|
        ('version', Int16),
 | 
						|
        ('subscription', Array(String('utf-8'))),
 | 
						|
        ('user_data', Bytes))
 | 
						|
 | 
						|
 | 
						|
class ConsumerProtocolMemberAssignment(Struct):
 | 
						|
    SCHEMA = Schema(
 | 
						|
        ('version', Int16),
 | 
						|
        ('assignment', Array(
 | 
						|
            ('topic', String('utf-8')),
 | 
						|
            ('partitions', Array(Int32)))),
 | 
						|
        ('user_data', Bytes))
 | 
						|
 | 
						|
    def partitions(self):
 | 
						|
        return [TopicPartition(topic, partition)
 | 
						|
                for topic, partitions in self.assignment # pylint: disable-msg=no-member
 | 
						|
                for partition in partitions]
 | 
						|
 | 
						|
 | 
						|
class ConsumerProtocol(object):
 | 
						|
    PROTOCOL_TYPE = 'consumer'
 | 
						|
    ASSIGNMENT_STRATEGIES = ('range', 'roundrobin')
 | 
						|
    METADATA = ConsumerProtocolMemberMetadata
 | 
						|
    ASSIGNMENT = ConsumerProtocolMemberAssignment
 |