93 lines
4.8 KiB
Java
93 lines
4.8 KiB
Java
/*
|
|
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
|
*
|
|
* Licensed to the Apache Software Foundation (ASF) under one
|
|
* or more contributor license agreements. See the NOTICE file
|
|
* distributed with this work for additional information
|
|
* regarding copyright ownership. The ASF licenses this file
|
|
* to you under the Apache License, Version 2.0 (the
|
|
* "License"); you may not use this file except in compliance
|
|
* with the License. You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
package com.hpcloud.mon.persister.consumer;
|
|
|
|
import com.hpcloud.mon.persister.configuration.KafkaConfiguration;
|
|
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
|
|
import kafka.consumer.Consumer;
|
|
import kafka.consumer.ConsumerConfig;
|
|
import kafka.consumer.KafkaStream;
|
|
import kafka.javaapi.consumer.ConsumerConnector;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
import java.util.HashMap;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Properties;
|
|
|
|
public class KafkaStreams {
|
|
private static final String KAFKA_CONFIGURATION = "Kafka configuration:";
|
|
private static final Logger logger = LoggerFactory.getLogger(KafkaStreams.class);
|
|
|
|
private final ConsumerConnector consumerConnector;
|
|
private final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap;
|
|
|
|
public KafkaStreams(MonPersisterConfiguration configuration) {
|
|
Properties kafkaProperties = createKafkaProperties(configuration.getKafkaConfiguration());
|
|
ConsumerConfig consumerConfig = createConsumerConfig(kafkaProperties);
|
|
consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
|
|
Map<String, Integer> topicCountMap = new HashMap<>();
|
|
Integer numThreads = configuration.getKafkaConfiguration().getNumThreads();
|
|
topicCountMap.put("metrics", (int) numThreads);
|
|
topicCountMap.put("alarm-state-transitions", (int) numThreads);
|
|
consumerMap = consumerConnector.createMessageStreams(topicCountMap);
|
|
}
|
|
|
|
public final Map<String, List<KafkaStream<byte[], byte[]>>> getStreams() {
|
|
return consumerMap;
|
|
}
|
|
|
|
private ConsumerConfig createConsumerConfig(Properties kafkaProperties) {
|
|
return new ConsumerConfig(kafkaProperties);
|
|
}
|
|
|
|
private Properties createKafkaProperties(KafkaConfiguration kafkaConfiguration) {
|
|
Properties properties = new Properties();
|
|
|
|
properties.put("group.id", kafkaConfiguration.getGroupId());
|
|
properties.put("zookeeper.connect", kafkaConfiguration.getZookeeperConnect());
|
|
properties.put("consumer.id", kafkaConfiguration.getConsumerId());
|
|
properties.put("socket.timeout.ms", kafkaConfiguration.getSocketTimeoutMs().toString());
|
|
properties.put("socket.receive.buffer.bytes", kafkaConfiguration.getSocketReceiveBufferBytes().toString());
|
|
properties.put("fetch.message.max.bytes", kafkaConfiguration.getFetchMessageMaxBytes().toString());
|
|
properties.put("auto.commit.enable", kafkaConfiguration.getAutoCommitEnable().toString());
|
|
properties.put("auto.commit.interval.ms", kafkaConfiguration.getAutoCommitIntervalMs().toString());
|
|
properties.put("queued.max.message.chunks", kafkaConfiguration.getQueuedMaxMessageChunks().toString());
|
|
properties.put("rebalance.max.retries", kafkaConfiguration.getRebalanceMaxRetries().toString());
|
|
properties.put("fetch.min.bytes", kafkaConfiguration.getFetchMinBytes().toString());
|
|
properties.put("fetch.wait.max.ms", kafkaConfiguration.getFetchWaitMaxMs().toString());
|
|
properties.put("rebalance.backoff.ms", kafkaConfiguration.getRebalanceBackoffMs().toString());
|
|
properties.put("refresh.leader.backoff.ms", kafkaConfiguration.getRefreshLeaderBackoffMs().toString());
|
|
properties.put("auto.offset.reset", kafkaConfiguration.getAutoOffsetReset());
|
|
properties.put("consumer.timeout.ms", kafkaConfiguration.getConsumerTimeoutMs().toString());
|
|
properties.put("client.id", kafkaConfiguration.getClientId());
|
|
properties.put("zookeeper.session.timeout.ms", kafkaConfiguration.getZookeeperSessionTimeoutMs().toString());
|
|
properties.put("zookeeper.connection.timeout.ms", kafkaConfiguration.getZookeeperConnectionTimeoutMs().toString());
|
|
properties.put("zookeeper.sync.time.ms", kafkaConfiguration.getZookeeperSyncTimeMs().toString());
|
|
|
|
for (String key : properties.stringPropertyNames()) {
|
|
logger.info(KAFKA_CONFIGURATION + " " + key + " = " + properties.getProperty(key));
|
|
}
|
|
|
|
return properties;
|
|
}
|
|
}
|