Make properties all Strings for Kafka.

This commit is contained in:
Deklan Dieterly 2014-02-21 11:39:14 -07:00
parent 1a0f0c6359
commit 9d8efa53bc

View File

@ -20,10 +20,9 @@ public class KafkaConsumer {
private static Logger logger = LoggerFactory.getLogger(MonConsumer.class); private static Logger logger = LoggerFactory.getLogger(MonConsumer.class);
private final ConsumerConnector consumerConnector;
private final String topic; private final String topic;
private final Integer numThreads; private final Integer numThreads;
private final ConsumerConnector consumerConnector;
private ExecutorService executorService; private ExecutorService executorService;
@Inject @Inject
@ -68,23 +67,23 @@ public class KafkaConsumer {
properties.put("group.id", kafkaConfiguration.groupId); properties.put("group.id", kafkaConfiguration.groupId);
properties.put("zookeeper.connect", kafkaConfiguration.zookeeperConnect); properties.put("zookeeper.connect", kafkaConfiguration.zookeeperConnect);
properties.put("consumer.id", kafkaConfiguration.consumerId); properties.put("consumer.id", kafkaConfiguration.consumerId);
properties.put("socket.timeout.ms", kafkaConfiguration.socketTimeoutMs); properties.put("socket.timeout.ms", kafkaConfiguration.socketTimeoutMs.toString());
properties.put("socket.receive.buffer.bytes", kafkaConfiguration.socketReceiveBufferBytes); properties.put("socket.receive.buffer.bytes", kafkaConfiguration.socketReceiveBufferBytes.toString());
properties.put("fetch.message.max.bytes", kafkaConfiguration.fetchMessageMaxBytes); properties.put("fetch.message.max.bytes", kafkaConfiguration.fetchMessageMaxBytes.toString());
properties.put("auto.commit.enable", kafkaConfiguration.autoCommitEnable); properties.put("auto.commit.enable", kafkaConfiguration.autoCommitEnable.toString());
properties.put("auto.commit.interval.ms", kafkaConfiguration.autoCommitIntervalMs); properties.put("auto.commit.interval.ms", kafkaConfiguration.autoCommitIntervalMs.toString());
properties.put("queued.max.message.chunks", kafkaConfiguration.queuedMaxMessageChunks); properties.put("queued.max.message.chunks", kafkaConfiguration.queuedMaxMessageChunks.toString());
properties.put("rebalance.max.retries", kafkaConfiguration.rebalanceMaxRetries); properties.put("rebalance.max.retries", kafkaConfiguration.rebalanceMaxRetries.toString());
properties.put("fetch.min.bytes", kafkaConfiguration.fetchMinBytes); properties.put("fetch.min.bytes", kafkaConfiguration.fetchMinBytes.toString());
properties.put("fetch.wait.max.ms", kafkaConfiguration.fetchWaitMaxMs); properties.put("fetch.wait.max.ms", kafkaConfiguration.fetchWaitMaxMs.toString());
properties.put("rebalance.backoff.ms", kafkaConfiguration.rebalanceBackoffMs); properties.put("rebalance.backoff.ms", kafkaConfiguration.rebalanceBackoffMs.toString());
properties.put("refresh.leader.backoff.ms", kafkaConfiguration.refreshLeaderBackoffMs); properties.put("refresh.leader.backoff.ms", kafkaConfiguration.refreshLeaderBackoffMs.toString());
properties.put("auto.offset.reset", kafkaConfiguration.autoOffsetReset); properties.put("auto.offset.reset", kafkaConfiguration.autoOffsetReset);
properties.put("consumer.timeout.ms", kafkaConfiguration.consumerTimeoutMs); properties.put("consumer.timeout.ms", kafkaConfiguration.consumerTimeoutMs.toString());
properties.put("client.id", kafkaConfiguration.clientId); properties.put("client.id", kafkaConfiguration.clientId);
properties.put("zookeeper.session.timeout.ms", kafkaConfiguration.zookeeperSessionTimeoutMs); properties.put("zookeeper.session.timeout.ms", kafkaConfiguration.zookeeperSessionTimeoutMs.toString());
properties.put("zookeeper.connection.timeout.ms", kafkaConfiguration.zookeeperSessionTimeoutMs); properties.put("zookeeper.connection.timeout.ms", kafkaConfiguration.zookeeperSessionTimeoutMs.toString());
properties.put("zookeeper.sync.time.ms", kafkaConfiguration.zookeeperSyncTimeMs); properties.put("zookeeper.sync.time.ms", kafkaConfiguration.zookeeperSyncTimeMs.toString());
return properties; return properties;
} }