diff --git a/src/main/java/com/hpcloud/MonPersisterModule.java b/src/main/java/com/hpcloud/MonPersisterModule.java index 5b98d3c6..5d61a4e4 100644 --- a/src/main/java/com/hpcloud/MonPersisterModule.java +++ b/src/main/java/com/hpcloud/MonPersisterModule.java @@ -9,9 +9,10 @@ import com.hpcloud.configuration.MonPersisterConfiguration; import com.hpcloud.consumer.KafkaConsumerRunnableBasic; import com.hpcloud.consumer.KafkaConsumerRunnableBasicFactory; import com.hpcloud.consumer.MonConsumer; -import com.hpcloud.disruptor.DisruptorFactory; +import com.hpcloud.disruptor.DisruptorProvider; import com.hpcloud.event.StringEventHandler; import com.hpcloud.event.StringEventHandlerFactory; +import com.lmax.disruptor.dsl.Disruptor; import com.yammer.dropwizard.config.Environment; import com.yammer.dropwizard.jdbi.DBIFactory; import org.skife.jdbi.v2.DBI; @@ -39,7 +40,8 @@ public class MonPersisterModule extends AbstractModule { .implement(KafkaConsumerRunnableBasic.class, KafkaConsumerRunnableBasic.class) .build(KafkaConsumerRunnableBasicFactory.class)); - bind(DisruptorFactory.class); + bind(Disruptor.class) + .toProvider(DisruptorProvider.class); bind(MonConsumer.class); diff --git a/src/main/java/com/hpcloud/consumer/KafkaConsumer.java b/src/main/java/com/hpcloud/consumer/KafkaConsumer.java index bd78d1a7..bbb48954 100644 --- a/src/main/java/com/hpcloud/consumer/KafkaConsumer.java +++ b/src/main/java/com/hpcloud/consumer/KafkaConsumer.java @@ -3,7 +3,6 @@ package com.hpcloud.consumer; import com.google.inject.Inject; import com.hpcloud.configuration.KafkaConfiguration; import com.hpcloud.configuration.MonPersisterConfiguration; -import com.hpcloud.disruptor.DisruptorFactory; import com.lmax.disruptor.dsl.Disruptor; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; @@ -33,7 +32,7 @@ public class KafkaConsumer { @Inject public KafkaConsumer(MonPersisterConfiguration configuration, - DisruptorFactory disruptorFactory, + Disruptor disruptor, KafkaConsumerRunnableBasicFactory kafkaConsumerRunnableBasicFactory) { this.topic = configuration.getKafkaConfiguration().getTopic(); @@ -42,7 +41,7 @@ public class KafkaConsumer { this.numThreads = configuration.getKafkaConfiguration().getNumThreads(); logger.info(KAFKA_CONFIGURATION + " numThreads = " + numThreads); - this.disruptor = disruptorFactory.create(); + this.disruptor = disruptor; Properties kafkaProperties = createKafkaProperties(configuration.getKafkaConfiguration()); ConsumerConfig consumerConfig = createConsumerConfig(kafkaProperties); this.consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); @@ -51,7 +50,7 @@ public class KafkaConsumer { } public void run() { - Map topicCountMap = new HashMap(); + Map topicCountMap = new HashMap<>(); topicCountMap.put(topic, new Integer(numThreads)); Map>> consumerMap = consumerConnector.createMessageStreams(topicCountMap); List> streams = consumerMap.get(topic); diff --git a/src/main/java/com/hpcloud/consumer/KafkaConsumerRunnableBasic.java b/src/main/java/com/hpcloud/consumer/KafkaConsumerRunnableBasic.java index 711ed3ab..c3d1873f 100644 --- a/src/main/java/com/hpcloud/consumer/KafkaConsumerRunnableBasic.java +++ b/src/main/java/com/hpcloud/consumer/KafkaConsumerRunnableBasic.java @@ -2,7 +2,6 @@ package com.hpcloud.consumer; import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; -import com.hpcloud.disruptor.DisruptorFactory; import com.hpcloud.event.StringEvent; import com.lmax.disruptor.EventTranslator; import com.lmax.disruptor.dsl.Disruptor; @@ -15,12 +14,12 @@ public class KafkaConsumerRunnableBasic implements Runnable { private Disruptor disruptor; @Inject - public KafkaConsumerRunnableBasic(DisruptorFactory disruptorFactory, + public KafkaConsumerRunnableBasic(Disruptor disruptor, @Assisted KafkaStream stream, @Assisted int threadNumber) { this.stream = stream; this.threadNumber = threadNumber; - this.disruptor = disruptorFactory.create(); + this.disruptor = disruptor; } @SuppressWarnings("unchecked") diff --git a/src/main/java/com/hpcloud/consumer/MonConsumer.java b/src/main/java/com/hpcloud/consumer/MonConsumer.java index 75e88221..5fde63a2 100644 --- a/src/main/java/com/hpcloud/consumer/MonConsumer.java +++ b/src/main/java/com/hpcloud/consumer/MonConsumer.java @@ -1,7 +1,6 @@ package com.hpcloud.consumer; import com.google.inject.Inject; -import com.hpcloud.disruptor.DisruptorFactory; import com.lmax.disruptor.dsl.Disruptor; import com.yammer.dropwizard.lifecycle.Managed; import org.slf4j.Logger; @@ -15,9 +14,9 @@ public class MonConsumer implements Managed { private Disruptor disruptor; @Inject - public MonConsumer(KafkaConsumer kafkaConsumer, DisruptorFactory disruptorFactory) { + public MonConsumer(KafkaConsumer kafkaConsumer, Disruptor disruptor) { this.kafkaConsumer = kafkaConsumer; - this.disruptor = disruptorFactory.create(); + this.disruptor = disruptor; } @Override diff --git a/src/main/java/com/hpcloud/disruptor/DisruptorFactory.java b/src/main/java/com/hpcloud/disruptor/DisruptorProvider.java similarity index 86% rename from src/main/java/com/hpcloud/disruptor/DisruptorFactory.java rename to src/main/java/com/hpcloud/disruptor/DisruptorProvider.java index 447ed904..084bad08 100644 --- a/src/main/java/com/hpcloud/disruptor/DisruptorFactory.java +++ b/src/main/java/com/hpcloud/disruptor/DisruptorProvider.java @@ -1,6 +1,7 @@ package com.hpcloud.disruptor; import com.google.inject.Inject; +import com.google.inject.Provider; import com.hpcloud.configuration.MonPersisterConfiguration; import com.hpcloud.event.StringEvent; import com.hpcloud.event.StringEventFactory; @@ -12,20 +13,20 @@ import com.lmax.disruptor.dsl.EventHandlerGroup; import java.util.concurrent.Executor; import java.util.concurrent.Executors; -public class DisruptorFactory { +public class DisruptorProvider implements Provider { private MonPersisterConfiguration configuration; private StringEventHandlerFactory stringEventHandlerFactory; private Disruptor instance; @Inject - public DisruptorFactory(MonPersisterConfiguration configuration, - StringEventHandlerFactory stringEventHandlerFactory) { + public DisruptorProvider(MonPersisterConfiguration configuration, + StringEventHandlerFactory stringEventHandlerFactory) { this.configuration = configuration; this.stringEventHandlerFactory = stringEventHandlerFactory; } - public synchronized Disruptor create() { + public synchronized Disruptor get() { if (instance == null) { Executor executor = Executors.newCachedThreadPool();