Make Disruptor a Provider.

This commit is contained in:
Deklan Dieterly 2014-02-24 15:50:08 -07:00
parent b87ae080ba
commit 411d72f10c
5 changed files with 16 additions and 16 deletions

View File

@@ -9,9 +9,10 @@ import com.hpcloud.configuration.MonPersisterConfiguration;
 import com.hpcloud.consumer.KafkaConsumerRunnableBasic;
 import com.hpcloud.consumer.KafkaConsumerRunnableBasicFactory;
 import com.hpcloud.consumer.MonConsumer;
-import com.hpcloud.disruptor.DisruptorFactory;
+import com.hpcloud.disruptor.DisruptorProvider;
 import com.hpcloud.event.StringEventHandler;
 import com.hpcloud.event.StringEventHandlerFactory;
+import com.lmax.disruptor.dsl.Disruptor;
 import com.yammer.dropwizard.config.Environment;
 import com.yammer.dropwizard.jdbi.DBIFactory;
 import org.skife.jdbi.v2.DBI;
@@ -39,7 +40,8 @@ public class MonPersisterModule extends AbstractModule {
         .implement(KafkaConsumerRunnableBasic.class, KafkaConsumerRunnableBasic.class)
         .build(KafkaConsumerRunnableBasicFactory.class));
-    bind(DisruptorFactory.class);
+    bind(Disruptor.class)
+        .toProvider(DisruptorProvider.class);
     bind(MonConsumer.class);

View File

@@ -3,7 +3,6 @@ package com.hpcloud.consumer;
 import com.google.inject.Inject;
 import com.hpcloud.configuration.KafkaConfiguration;
 import com.hpcloud.configuration.MonPersisterConfiguration;
-import com.hpcloud.disruptor.DisruptorFactory;
 import com.lmax.disruptor.dsl.Disruptor;
 import kafka.consumer.Consumer;
 import kafka.consumer.ConsumerConfig;
@@ -33,7 +32,7 @@ public class KafkaConsumer {
     @Inject
     public KafkaConsumer(MonPersisterConfiguration configuration,
-                         DisruptorFactory disruptorFactory,
+                         Disruptor disruptor,
                          KafkaConsumerRunnableBasicFactory kafkaConsumerRunnableBasicFactory) {
         this.topic = configuration.getKafkaConfiguration().getTopic();
@ -42,7 +41,7 @@ public class KafkaConsumer {
this.numThreads = configuration.getKafkaConfiguration().getNumThreads(); this.numThreads = configuration.getKafkaConfiguration().getNumThreads();
logger.info(KAFKA_CONFIGURATION + " numThreads = " + numThreads); logger.info(KAFKA_CONFIGURATION + " numThreads = " + numThreads);
this.disruptor = disruptorFactory.create(); this.disruptor = disruptor;
Properties kafkaProperties = createKafkaProperties(configuration.getKafkaConfiguration()); Properties kafkaProperties = createKafkaProperties(configuration.getKafkaConfiguration());
ConsumerConfig consumerConfig = createConsumerConfig(kafkaProperties); ConsumerConfig consumerConfig = createConsumerConfig(kafkaProperties);
this.consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); this.consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
@ -51,7 +50,7 @@ public class KafkaConsumer {
} }
public void run() { public void run() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put(topic, new Integer(numThreads)); topicCountMap.put(topic, new Integer(numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

View File

@@ -2,7 +2,6 @@ package com.hpcloud.consumer;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
-import com.hpcloud.disruptor.DisruptorFactory;
 import com.hpcloud.event.StringEvent;
 import com.lmax.disruptor.EventTranslator;
 import com.lmax.disruptor.dsl.Disruptor;
@@ -15,12 +14,12 @@ public class KafkaConsumerRunnableBasic implements Runnable {
     private Disruptor disruptor;

     @Inject
-    public KafkaConsumerRunnableBasic(DisruptorFactory disruptorFactory,
+    public KafkaConsumerRunnableBasic(Disruptor disruptor,
                                       @Assisted KafkaStream stream,
                                       @Assisted int threadNumber) {
         this.stream = stream;
         this.threadNumber = threadNumber;
-        this.disruptor = disruptorFactory.create();
+        this.disruptor = disruptor;
     }

     @SuppressWarnings("unchecked")

View File

@@ -1,7 +1,6 @@
 package com.hpcloud.consumer;
 import com.google.inject.Inject;
-import com.hpcloud.disruptor.DisruptorFactory;
 import com.lmax.disruptor.dsl.Disruptor;
 import com.yammer.dropwizard.lifecycle.Managed;
 import org.slf4j.Logger;
@@ -15,9 +14,9 @@ public class MonConsumer implements Managed {
     private Disruptor disruptor;

     @Inject
-    public MonConsumer(KafkaConsumer kafkaConsumer, DisruptorFactory disruptorFactory) {
+    public MonConsumer(KafkaConsumer kafkaConsumer, Disruptor disruptor) {
         this.kafkaConsumer = kafkaConsumer;
-        this.disruptor = disruptorFactory.create();
+        this.disruptor = disruptor;
     }

     @Override
@Override @Override

View File

@@ -1,6 +1,7 @@
 package com.hpcloud.disruptor;
 import com.google.inject.Inject;
+import com.google.inject.Provider;
 import com.hpcloud.configuration.MonPersisterConfiguration;
 import com.hpcloud.event.StringEvent;
 import com.hpcloud.event.StringEventFactory;
@@ -12,20 +13,20 @@ import com.lmax.disruptor.dsl.EventHandlerGroup;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;

-public class DisruptorFactory {
+public class DisruptorProvider implements Provider<Disruptor> {
     private MonPersisterConfiguration configuration;
     private StringEventHandlerFactory stringEventHandlerFactory;
     private Disruptor instance;

     @Inject
-    public DisruptorFactory(MonPersisterConfiguration configuration,
+    public DisruptorProvider(MonPersisterConfiguration configuration,
                             StringEventHandlerFactory stringEventHandlerFactory) {
         this.configuration = configuration;
         this.stringEventHandlerFactory = stringEventHandlerFactory;
     }

-    public synchronized Disruptor<StringEvent> create() {
+    public synchronized Disruptor<StringEvent> get() {
         if (instance == null) {
             Executor executor = Executors.newCachedThreadPool();