From c4fbd727c27849750ee40be23cfa0d1337ee19ff Mon Sep 17 00:00:00 2001 From: Roland Hochmuth Date: Thu, 10 Apr 2014 21:12:10 -0600 Subject: [PATCH] Re-factored to remove some duplicate code. --- .../AlarmStateTransitionsConsumer.java | 25 +--------- .../mon/persister/consumer/Consumer.java | 33 +++++++++++++ .../KafkaAlarmStateTransitionConsumer.java | 46 +++++------------ .../mon/persister/consumer/KafkaConsumer.java | 49 +++++++++++++++++++ .../consumer/KafkaMetricsConsumer.java | 46 +++++------------ ...kaMetricsConsumerRunnableBasicFactory.java | 1 - .../persister/consumer/MetricsConsumer.java | 25 +--------- 7 files changed, 108 insertions(+), 117 deletions(-) create mode 100644 src/main/java/com/hpcloud/mon/persister/consumer/Consumer.java create mode 100644 src/main/java/com/hpcloud/mon/persister/consumer/KafkaConsumer.java diff --git a/src/main/java/com/hpcloud/mon/persister/consumer/AlarmStateTransitionsConsumer.java b/src/main/java/com/hpcloud/mon/persister/consumer/AlarmStateTransitionsConsumer.java index 5d864f98..51102138 100644 --- a/src/main/java/com/hpcloud/mon/persister/consumer/AlarmStateTransitionsConsumer.java +++ b/src/main/java/com/hpcloud/mon/persister/consumer/AlarmStateTransitionsConsumer.java @@ -2,32 +2,11 @@ package com.hpcloud.mon.persister.consumer; import com.google.inject.Inject; import com.hpcloud.mon.persister.disruptor.AlarmStateHistoryDisruptor; -import com.yammer.dropwizard.lifecycle.Managed; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public class AlarmStateTransitionsConsumer implements Managed { - - private static final Logger logger = LoggerFactory.getLogger(AlarmStateTransitionsConsumer.class); - private final KafkaAlarmStateTransitionConsumer consumer; - private final AlarmStateHistoryDisruptor disruptor; +public class AlarmStateTransitionsConsumer extends Consumer { @Inject public AlarmStateTransitionsConsumer(KafkaAlarmStateTransitionConsumer kafkaConsumer, AlarmStateHistoryDisruptor disruptor) { - this.consumer = kafkaConsumer; - this.disruptor = disruptor; - } - - @Override - public void start() throws Exception { - logger.debug("start"); - consumer.run(); - } - - @Override - public void stop() throws Exception { - logger.debug("stop"); - consumer.stop(); - disruptor.shutdown(); + super(kafkaConsumer, disruptor); } } diff --git a/src/main/java/com/hpcloud/mon/persister/consumer/Consumer.java b/src/main/java/com/hpcloud/mon/persister/consumer/Consumer.java new file mode 100644 index 00000000..59ad05ef --- /dev/null +++ b/src/main/java/com/hpcloud/mon/persister/consumer/Consumer.java @@ -0,0 +1,33 @@ +package com.hpcloud.mon.persister.consumer; + +import com.google.inject.Inject; +import com.lmax.disruptor.dsl.Disruptor; +import com.yammer.dropwizard.lifecycle.Managed; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Consumer implements Managed { + + private static final Logger logger = LoggerFactory.getLogger(Consumer.class); + private final KafkaConsumer consumer; + private final Disruptor disruptor; + + @Inject + public Consumer(KafkaConsumer kafkaConsumer, Disruptor disruptor) { + this.consumer = kafkaConsumer; + this.disruptor = disruptor; + } + + @Override + public void start() throws Exception { + logger.debug("start"); + consumer.run(); + } + + @Override + public void stop() throws Exception { + logger.debug("stop"); + consumer.stop(); + disruptor.shutdown(); + } +} diff --git a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaAlarmStateTransitionConsumer.java b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaAlarmStateTransitionConsumer.java index f213f6e9..7d033f06 100644 --- a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaAlarmStateTransitionConsumer.java +++ b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaAlarmStateTransitionConsumer.java @@ -3,48 +3,24 @@ package com.hpcloud.mon.persister.consumer; import com.google.inject.Inject; import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration; import kafka.consumer.KafkaStream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -public class KafkaAlarmStateTransitionConsumer { - - private static final String KAFKA_CONFIGURATION = "Kafka configuration:"; - private static final Logger logger = LoggerFactory.getLogger(KafkaAlarmStateTransitionConsumer.class); - - private final Integer numThreads; - private ExecutorService executorService; - private final KafkaAlarmStateTransitionConsumerRunnableBasicFactory kafkaConsumerRunnableBasicFactory; - @Inject - private KafkaStreams kafkaStreams; +public class KafkaAlarmStateTransitionConsumer extends KafkaConsumer { @Inject - public KafkaAlarmStateTransitionConsumer(MonPersisterConfiguration configuration, - KafkaAlarmStateTransitionConsumerRunnableBasicFactory kafkaConsumerRunnableBasicFactory) { + private KafkaAlarmStateTransitionConsumerRunnableBasicFactory factory; - this.numThreads = configuration.getKafkaConfiguration().getNumThreads(); - logger.info(KAFKA_CONFIGURATION + " numThreads = " + numThreads); - - this.kafkaConsumerRunnableBasicFactory = kafkaConsumerRunnableBasicFactory; + @Inject + public KafkaAlarmStateTransitionConsumer(MonPersisterConfiguration configuration) { + super(configuration); } - public void run() { - List> streams = kafkaStreams.getStreams().get("alarm-state-transitions"); - executorService = Executors.newFixedThreadPool(numThreads); - - int threadNumber = 0; - for (final KafkaStream stream : streams) { - executorService.submit(kafkaConsumerRunnableBasicFactory.create(stream, threadNumber)); - threadNumber++; - } + @Override + protected Runnable createRunnable(KafkaStream stream, int threadNumber) { + return factory.create(stream, threadNumber); } - public void stop() { - if (executorService != null) { - executorService.shutdown(); - } + @Override + protected String getStreamName() { + return "alarm-state-transitions"; } } diff --git a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaConsumer.java b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaConsumer.java new file mode 100644 index 00000000..4c612821 --- /dev/null +++ b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaConsumer.java @@ -0,0 +1,49 @@ +package com.hpcloud.mon.persister.consumer; + +import com.google.inject.Inject; +import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration; +import kafka.consumer.KafkaStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public abstract class KafkaConsumer { + + private static final String KAFKA_CONFIGURATION = "Kafka configuration:"; + private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); + + private final Integer numThreads; + private ExecutorService executorService; + @Inject + private KafkaStreams kafkaStreams; + + protected abstract Runnable createRunnable(KafkaStream stream, int threadNumber); + protected abstract String getStreamName(); + + @Inject + public KafkaConsumer(MonPersisterConfiguration configuration) { + + this.numThreads = configuration.getKafkaConfiguration().getNumThreads(); + logger.info(KAFKA_CONFIGURATION + " numThreads = " + numThreads); + } + + public void run() { + List> streams = kafkaStreams.getStreams().get(getStreamName()); + executorService = Executors.newFixedThreadPool(numThreads); + + int threadNumber = 0; + for (final KafkaStream stream : streams) { + executorService.submit(createRunnable(stream, threadNumber)); + threadNumber++; + } + } + + public void stop() { + if (executorService != null) { + executorService.shutdown(); + } + } +} diff --git a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaMetricsConsumer.java b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaMetricsConsumer.java index 7229319b..13aed4d8 100644 --- a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaMetricsConsumer.java +++ b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaMetricsConsumer.java @@ -3,48 +3,24 @@ package com.hpcloud.mon.persister.consumer; import com.google.inject.Inject; import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration; import kafka.consumer.KafkaStream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -public class KafkaMetricsConsumer { - - private static final String KAFKA_CONFIGURATION = "Kafka configuration:"; - private static final Logger logger = LoggerFactory.getLogger(KafkaMetricsConsumer.class); - - private final Integer numThreads; - private ExecutorService executorService; - private final KafkaMetricsConsumerRunnableBasicFactory kafkaConsumerRunnableBasicFactory; - @Inject - private KafkaStreams kafkaStreams; +public class KafkaMetricsConsumer extends KafkaConsumer { @Inject - public KafkaMetricsConsumer(MonPersisterConfiguration configuration, - KafkaMetricsConsumerRunnableBasicFactory kafkaConsumerRunnableBasicFactory) { + private KafkaMetricsConsumerRunnableBasicFactory factory; - this.numThreads = configuration.getKafkaConfiguration().getNumThreads(); - logger.info(KAFKA_CONFIGURATION + " numThreads = " + numThreads); - - this.kafkaConsumerRunnableBasicFactory = kafkaConsumerRunnableBasicFactory; + @Inject + public KafkaMetricsConsumer(MonPersisterConfiguration configuration) { + super(configuration); } - public void run() { - List> streams = kafkaStreams.getStreams().get("metrics"); - executorService = Executors.newFixedThreadPool(numThreads); - - int threadNumber = 0; - for (final KafkaStream stream : streams) { - executorService.submit(kafkaConsumerRunnableBasicFactory.create(stream, threadNumber)); - threadNumber++; - } + @Override + protected Runnable createRunnable(KafkaStream stream, int threadNumber) { + return factory.create(stream, threadNumber); } - public void stop() { - if (executorService != null) { - executorService.shutdown(); - } + @Override + protected String getStreamName() { + return "metrics"; } } diff --git a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaMetricsConsumerRunnableBasicFactory.java b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaMetricsConsumerRunnableBasicFactory.java index 861b9c10..97bf74b1 100644 --- a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaMetricsConsumerRunnableBasicFactory.java +++ b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaMetricsConsumerRunnableBasicFactory.java @@ -4,5 +4,4 @@ import kafka.consumer.KafkaStream; public interface KafkaMetricsConsumerRunnableBasicFactory { KafkaMetricsConsumerRunnableBasic create(KafkaStream stream, int threadNumber); - } diff --git a/src/main/java/com/hpcloud/mon/persister/consumer/MetricsConsumer.java b/src/main/java/com/hpcloud/mon/persister/consumer/MetricsConsumer.java index 3edb14c9..02fd4f1e 100644 --- a/src/main/java/com/hpcloud/mon/persister/consumer/MetricsConsumer.java +++ b/src/main/java/com/hpcloud/mon/persister/consumer/MetricsConsumer.java @@ -2,32 +2,11 @@ package com.hpcloud.mon.persister.consumer; import com.google.inject.Inject; import com.hpcloud.mon.persister.disruptor.MetricDisruptor; -import com.yammer.dropwizard.lifecycle.Managed; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public class MetricsConsumer implements Managed { - - private static final Logger logger = LoggerFactory.getLogger(MetricsConsumer.class); - private final KafkaMetricsConsumer consumer; - private final MetricDisruptor disruptor; +public class MetricsConsumer extends Consumer { @Inject public MetricsConsumer(KafkaMetricsConsumer kafkaConsumer, MetricDisruptor disruptor) { - this.consumer = kafkaConsumer; - this.disruptor = disruptor; - } - - @Override - public void start() throws Exception { - logger.debug("start"); - consumer.run(); - } - - @Override - public void stop() throws Exception { - logger.debug("stop"); - consumer.stop(); - disruptor.shutdown(); + super(kafkaConsumer, disruptor); } }