156 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			156 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import copy
 | |
| import logging
 | |
| import socket
 | |
| import struct
 | |
| from random import shuffle
 | |
| from threading import local
 | |
| 
 | |
| from kafka.common import ConnectionError
 | |
| 
 | |
| log = logging.getLogger("kafka")
 | |
| 
 | |
| DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
 | |
| DEFAULT_KAFKA_PORT = 9092
 | |
| 
 | |
| 
 | |
def collect_hosts(hosts, randomize=True):
    """
    Collect a comma-separated set of hosts (host:port) and optionally
    randomize the returned list.

    hosts:     either one comma-separated string such as
               "broker1:9092,broker2", or an iterable of "host[:port]"
               strings
    randomize: when true (the default), the result is shuffled before it is
               returned

    Returns a list of (host, port) tuples. Entries that carry no explicit
    port get DEFAULT_KAFKA_PORT. A non-numeric port raises ValueError (from
    int()).
    """
    # `basestring` only exists on Python 2; on Python 3 the bare name raises
    # NameError, so fall back to `str` there.
    try:
        string_types = basestring
    except NameError:
        string_types = str

    if isinstance(hosts, string_types):
        hosts = hosts.strip().split(',')

    result = []
    for host_port in hosts:
        res = host_port.split(':')
        host = res[0]
        port = int(res[1]) if len(res) > 1 else DEFAULT_KAFKA_PORT
        result.append((host.strip(), port))

    if randomize:
        shuffle(result)

    return result
 | |
| 
 | |
| 
 | |
class KafkaConnection(local):
    """
    A socket connection to a single Kafka broker

    This class is _not_ thread safe. Each call to `send` must be followed
    by a call to `recv` in order to get the correct response. Eventually,
    we can do something in here to facilitate multiplexed requests/responses
    since the Kafka API includes a correlation id.

    host:    the host name or IP address of a kafka broker
    port:    the port number the kafka broker is listening on
    timeout: default 120. The socket timeout for connecting, sending and
             receiving data in seconds. None means no timeout, so a request
             can block forever.
    """
    def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
        super(KafkaConnection, self).__init__()
        self.host = host
        self.port = port
        self.timeout = timeout
        self._sock = None
        # Not usable until reinit() has connected successfully.
        self._dirty = True

        self.reinit()

    def __repr__(self):
        return "<KafkaConnection host=%s port=%d>" % (self.host, self.port)

    ###################
    #   Private API   #
    ###################

    def _raise_connection_error(self):
        """
        Mark the connection dirty (so the next send/read reconnects) and
        raise ConnectionError.
        """
        self._dirty = True
        raise ConnectionError("Kafka @ {0}:{1} went away".format(self.host, self.port))

    def _read_bytes(self, num_bytes):
        """
        Read exactly `num_bytes` bytes from the socket and return them as one
        byte string.

        Raises ConnectionError if the socket errors out or the peer closes
        the connection before `num_bytes` bytes have arrived.
        """
        bytes_left = num_bytes
        responses = []

        log.debug("About to read %d bytes from Kafka", num_bytes)
        if self._dirty:
            self.reinit()

        while bytes_left:
            try:
                data = self._sock.recv(min(bytes_left, 4096))
            except socket.error:
                log.exception('Unable to receive data from Kafka')
                self._raise_connection_error()

            # recv() hands back an empty byte string once the peer has closed
            # the connection. Test truthiness rather than `== ''`, which is
            # never true for Python 3 bytes and would loop forever.
            if not data:
                log.error("Not enough data to read this response")
                self._raise_connection_error()

            bytes_left -= len(data)
            log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes)
            responses.append(data)

        # b'' is plain `str` on Python 2 and `bytes` on Python 3, matching
        # what recv() returns on either version.
        return b''.join(responses)

    ##################
    #   Public API   #
    ##################

    # TODO multiplex socket communication to allow for multi-threaded clients

    def send(self, request_id, payload):
        """
        Send a request to Kafka

        request_id: only used for logging here
        payload:    the already-encoded request bytes

        Reconnects first if the connection was marked dirty. Raises
        ConnectionError if reconnecting or sending fails.
        """
        log.debug("About to send %d bytes to Kafka, request %d", len(payload), request_id)
        try:
            if self._dirty:
                self.reinit()
            # sendall() returns None on success and raises on failure, so
            # there is no return value worth checking.
            self._sock.sendall(payload)
        except socket.error:
            log.exception('Unable to send payload to Kafka')
            self._raise_connection_error()

    def recv(self, request_id):
        """
        Get a response from Kafka

        request_id: only used for logging here

        Reads a 4-byte big-endian signed length header followed by that many
        bytes, and returns those bytes (without the header). Raises
        ConnectionError if the data cannot be read.
        """
        log.debug("Reading response %d from Kafka", request_id)
        # Read the size off of the header
        resp = self._read_bytes(4)

        (size,) = struct.unpack('>i', resp)

        # Read the remainder of the response. It is returned as-is: wrapping
        # it in str() would yield the "b'...'" repr on Python 3.
        return self._read_bytes(size)

    def copy(self):
        """
        Create an inactive copy of the connection object

        The copy has the same host, port and timeout but no socket. It is
        marked dirty, so it reconnects on first use; calling reinit() on it
        explicitly works as well.
        """
        cls = type(self)
        # Build the copy without running __init__ (which would connect) and
        # without deep-copying the live socket. The constructor arguments are
        # still handed to __new__ because threading.local keeps them to call
        # __init__ when the object is first touched from another thread.
        c = cls.__new__(cls, self.host, self.port, self.timeout)
        c.host = self.host
        c.port = self.port
        c.timeout = self.timeout
        c._sock = None
        c._dirty = True
        return c

    def close(self):
        """
        Close this connection
        """
        if self._sock:
            self._sock.close()

    def reinit(self):
        """
        Re-initialize the socket connection

        Closes the current socket (if any) and opens a new one to
        (host, port). If connecting fails the socket error propagates and the
        connection stays marked dirty.
        """
        self.close()
        self._dirty = True

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            # Apply the timeout before connecting so that connect() is
            # bounded by it too, not only later send/recv calls.
            sock.settimeout(self.timeout)
            sock.connect((self.host, self.port))
        except Exception:
            # Do not leak the half-open socket when the connect fails.
            sock.close()
            raise

        self._sock = sock
        self._dirty = False
 | 
