Re-factored to remove some duplicate code.

This commit is contained in:
Roland Hochmuth
2014-04-10 21:12:10 -06:00
parent c58633526e
commit c4fbd727c2
7 changed files with 108 additions and 117 deletions

View File

@@ -2,32 +2,11 @@ package com.hpcloud.mon.persister.consumer;
import com.google.inject.Inject;
import com.hpcloud.mon.persister.disruptor.AlarmStateHistoryDisruptor;
import com.yammer.dropwizard.lifecycle.Managed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AlarmStateTransitionsConsumer implements Managed {
private static final Logger logger = LoggerFactory.getLogger(AlarmStateTransitionsConsumer.class);
private final KafkaAlarmStateTransitionConsumer consumer;
private final AlarmStateHistoryDisruptor disruptor;
public class AlarmStateTransitionsConsumer extends Consumer {
@Inject
public AlarmStateTransitionsConsumer(KafkaAlarmStateTransitionConsumer kafkaConsumer, AlarmStateHistoryDisruptor disruptor) {
this.consumer = kafkaConsumer;
this.disruptor = disruptor;
}
@Override
public void start() throws Exception {
logger.debug("start");
consumer.run();
}
@Override
public void stop() throws Exception {
logger.debug("stop");
consumer.stop();
disruptor.shutdown();
super(kafkaConsumer, disruptor);
}
}

View File

@@ -0,0 +1,33 @@
package com.hpcloud.mon.persister.consumer;
import com.google.inject.Inject;
import com.lmax.disruptor.dsl.Disruptor;
import com.yammer.dropwizard.lifecycle.Managed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Consumer implements Managed {
private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
private final KafkaConsumer consumer;
private final Disruptor disruptor;
@Inject
public Consumer(KafkaConsumer kafkaConsumer, Disruptor disruptor) {
this.consumer = kafkaConsumer;
this.disruptor = disruptor;
}
@Override
public void start() throws Exception {
logger.debug("start");
consumer.run();
}
@Override
public void stop() throws Exception {
logger.debug("stop");
consumer.stop();
disruptor.shutdown();
}
}

View File

@@ -3,48 +3,24 @@ package com.hpcloud.mon.persister.consumer;
import com.google.inject.Inject;
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
import kafka.consumer.KafkaStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class KafkaAlarmStateTransitionConsumer {
private static final String KAFKA_CONFIGURATION = "Kafka configuration:";
private static final Logger logger = LoggerFactory.getLogger(KafkaAlarmStateTransitionConsumer.class);
private final Integer numThreads;
private ExecutorService executorService;
private final KafkaAlarmStateTransitionConsumerRunnableBasicFactory kafkaConsumerRunnableBasicFactory;
@Inject
private KafkaStreams kafkaStreams;
public class KafkaAlarmStateTransitionConsumer extends KafkaConsumer {
@Inject
public KafkaAlarmStateTransitionConsumer(MonPersisterConfiguration configuration,
KafkaAlarmStateTransitionConsumerRunnableBasicFactory kafkaConsumerRunnableBasicFactory) {
private KafkaAlarmStateTransitionConsumerRunnableBasicFactory factory;
this.numThreads = configuration.getKafkaConfiguration().getNumThreads();
logger.info(KAFKA_CONFIGURATION + " numThreads = " + numThreads);
this.kafkaConsumerRunnableBasicFactory = kafkaConsumerRunnableBasicFactory;
@Inject
public KafkaAlarmStateTransitionConsumer(MonPersisterConfiguration configuration) {
super(configuration);
}
public void run() {
List<KafkaStream<byte[], byte[]>> streams = kafkaStreams.getStreams().get("alarm-state-transitions");
executorService = Executors.newFixedThreadPool(numThreads);
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executorService.submit(kafkaConsumerRunnableBasicFactory.create(stream, threadNumber));
threadNumber++;
}
@Override
protected Runnable createRunnable(KafkaStream stream, int threadNumber) {
return factory.create(stream, threadNumber);
}
public void stop() {
if (executorService != null) {
executorService.shutdown();
}
@Override
protected String getStreamName() {
return "alarm-state-transitions";
}
}

View File

@@ -0,0 +1,49 @@
package com.hpcloud.mon.persister.consumer;
import com.google.inject.Inject;
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
import kafka.consumer.KafkaStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public abstract class KafkaConsumer {
private static final String KAFKA_CONFIGURATION = "Kafka configuration:";
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
private final Integer numThreads;
private ExecutorService executorService;
@Inject
private KafkaStreams kafkaStreams;
protected abstract Runnable createRunnable(KafkaStream stream, int threadNumber);
protected abstract String getStreamName();
@Inject
public KafkaConsumer(MonPersisterConfiguration configuration) {
this.numThreads = configuration.getKafkaConfiguration().getNumThreads();
logger.info(KAFKA_CONFIGURATION + " numThreads = " + numThreads);
}
public void run() {
List<KafkaStream<byte[], byte[]>> streams = kafkaStreams.getStreams().get(getStreamName());
executorService = Executors.newFixedThreadPool(numThreads);
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executorService.submit(createRunnable(stream, threadNumber));
threadNumber++;
}
}
public void stop() {
if (executorService != null) {
executorService.shutdown();
}
}
}

View File

@@ -3,48 +3,24 @@ package com.hpcloud.mon.persister.consumer;
import com.google.inject.Inject;
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
import kafka.consumer.KafkaStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class KafkaMetricsConsumer {
private static final String KAFKA_CONFIGURATION = "Kafka configuration:";
private static final Logger logger = LoggerFactory.getLogger(KafkaMetricsConsumer.class);
private final Integer numThreads;
private ExecutorService executorService;
private final KafkaMetricsConsumerRunnableBasicFactory kafkaConsumerRunnableBasicFactory;
@Inject
private KafkaStreams kafkaStreams;
public class KafkaMetricsConsumer extends KafkaConsumer {
@Inject
public KafkaMetricsConsumer(MonPersisterConfiguration configuration,
KafkaMetricsConsumerRunnableBasicFactory kafkaConsumerRunnableBasicFactory) {
private KafkaMetricsConsumerRunnableBasicFactory factory;
this.numThreads = configuration.getKafkaConfiguration().getNumThreads();
logger.info(KAFKA_CONFIGURATION + " numThreads = " + numThreads);
this.kafkaConsumerRunnableBasicFactory = kafkaConsumerRunnableBasicFactory;
@Inject
public KafkaMetricsConsumer(MonPersisterConfiguration configuration) {
super(configuration);
}
public void run() {
List<KafkaStream<byte[], byte[]>> streams = kafkaStreams.getStreams().get("metrics");
executorService = Executors.newFixedThreadPool(numThreads);
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executorService.submit(kafkaConsumerRunnableBasicFactory.create(stream, threadNumber));
threadNumber++;
}
@Override
protected Runnable createRunnable(KafkaStream stream, int threadNumber) {
return factory.create(stream, threadNumber);
}
public void stop() {
if (executorService != null) {
executorService.shutdown();
}
@Override
protected String getStreamName() {
return "metrics";
}
}

View File

@@ -4,5 +4,4 @@ import kafka.consumer.KafkaStream;
public interface KafkaMetricsConsumerRunnableBasicFactory {
KafkaMetricsConsumerRunnableBasic create(KafkaStream stream, int threadNumber);
}

View File

@@ -2,32 +2,11 @@ package com.hpcloud.mon.persister.consumer;
import com.google.inject.Inject;
import com.hpcloud.mon.persister.disruptor.MetricDisruptor;
import com.yammer.dropwizard.lifecycle.Managed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MetricsConsumer implements Managed {
private static final Logger logger = LoggerFactory.getLogger(MetricsConsumer.class);
private final KafkaMetricsConsumer consumer;
private final MetricDisruptor disruptor;
public class MetricsConsumer extends Consumer {
@Inject
public MetricsConsumer(KafkaMetricsConsumer kafkaConsumer, MetricDisruptor disruptor) {
this.consumer = kafkaConsumer;
this.disruptor = disruptor;
}
@Override
public void start() throws Exception {
logger.debug("start");
consumer.run();
}
@Override
public void stop() throws Exception {
logger.debug("stop");
consumer.stop();
disruptor.shutdown();
super(kafkaConsumer, disruptor);
}
}