From fcb4de17277cb31eae4943bdbacde5682b480daa Mon Sep 17 00:00:00 2001 From: Deklan Dieterly Date: Mon, 24 Feb 2014 11:32:24 -0700 Subject: [PATCH] Basic functionality for sending string message to DB. DB schema 'test'. DB table 'test.metric'. DB column 'test.metric.metric'. --- pom.xml | 17 ++++++- .../java/com/hpcloud/DisruptorFactory.java | 51 +++++++++++++++++++ src/main/java/com/hpcloud/KafkaConsumer.java | 4 +- .../hpcloud/MonPersisterConfiguration.java | 11 ++++ .../java/com/hpcloud/MonPersisterModule.java | 35 +++++++++++-- .../java/com/hpcloud/MonPersisterService.java | 32 +----------- .../java/com/hpcloud/StringEventHandler.java | 35 ++++++++++++- .../hpcloud/StringEventHandlerFactory.java | 9 ++++ .../com/hpcloud/VerticaMetricRepository.java | 32 ++++++++++++ .../java/com/hpcloud/VerticaRepository.java | 31 +++++++++++ .../com/hpcloud/mon-persister-config.yml | 33 +++++++++++- 11 files changed, 249 insertions(+), 41 deletions(-) create mode 100644 src/main/java/com/hpcloud/DisruptorFactory.java create mode 100644 src/main/java/com/hpcloud/StringEventHandlerFactory.java create mode 100644 src/main/java/com/hpcloud/VerticaMetricRepository.java create mode 100644 src/main/java/com/hpcloud/VerticaRepository.java diff --git a/pom.xml b/pom.xml index a8f833ad..2c491e3f 100644 --- a/pom.xml +++ b/pom.xml @@ -40,7 +40,12 @@ 3.0 - org.mockito + com.google.inject.extensions + guice-assistedinject + 3.0 + + + org.mockito mockito-all 1.9.5 @@ -54,6 +59,16 @@ disruptor 3.2.0 + + com.yammer.dropwizard + dropwizard-jdbi + 0.6.2 + + + com.vertica + vertica-jdbc + 6.1.0 + diff --git a/src/main/java/com/hpcloud/DisruptorFactory.java b/src/main/java/com/hpcloud/DisruptorFactory.java new file mode 100644 index 00000000..47a45375 --- /dev/null +++ b/src/main/java/com/hpcloud/DisruptorFactory.java @@ -0,0 +1,51 @@ +package com.hpcloud; + +import com.google.inject.Inject; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.EventHandlerGroup; + +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +public class DisruptorFactory { + + private MonPersisterConfiguration configuration; + private StringEventHandlerFactory stringEventHandlerFactory; + private Disruptor instance; + + @Inject + public DisruptorFactory(MonPersisterConfiguration configuration, + StringEventHandlerFactory stringEventHandlerFactory) { + this.configuration = configuration; + this.stringEventHandlerFactory = stringEventHandlerFactory; + } + + public synchronized Disruptor create() { + if (instance == null) { + + Executor executor = Executors.newCachedThreadPool(); + StringEventFactory stringEventFactory = new StringEventFactory(); + + int buffersize = configuration.getDisruptorConfiguration().bufferSize; + Disruptor disruptor = new Disruptor(stringEventFactory, buffersize, executor); + + int batchSize = configuration.getVerticaOutputProcessorConfiguration().batchSize; + int numOutputProcessors = configuration.getVerticaOutputProcessorConfiguration().numProcessors; + EventHandlerGroup handlerGroup = null; + for (int i = 0; i < numOutputProcessors; ++i) { + + StringEventHandler stringEventHandler = stringEventHandlerFactory.create(i, numOutputProcessors, batchSize); + + if (handlerGroup == null) { + handlerGroup = disruptor.handleEventsWith(stringEventHandler); + } else { + handlerGroup.then(stringEventHandler); + } + + } + disruptor.start(); + instance = disruptor; + } + return instance; + } +} diff --git a/src/main/java/com/hpcloud/KafkaConsumer.java b/src/main/java/com/hpcloud/KafkaConsumer.java index 02a6dc17..7bca6705 100644 --- a/src/main/java/com/hpcloud/KafkaConsumer.java +++ b/src/main/java/com/hpcloud/KafkaConsumer.java @@ -27,11 +27,11 @@ public class KafkaConsumer { private final Disruptor disruptor; @Inject - public KafkaConsumer(MonPersisterConfiguration configuration, Disruptor disruptor) { + public KafkaConsumer(MonPersisterConfiguration configuration, DisruptorFactory disruptorFactory) { this.topic = configuration.getKafkaConfiguration().topic; this.numThreads = configuration.getKafkaConfiguration().numThreads; - this.disruptor = disruptor; + this.disruptor = disruptorFactory.create(); Properties kafkaProperties = createKafkaProperties(configuration.getKafkaConfiguration()); ConsumerConfig consumerConfig = createConsumerConfig(kafkaProperties); this.consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); diff --git a/src/main/java/com/hpcloud/MonPersisterConfiguration.java b/src/main/java/com/hpcloud/MonPersisterConfiguration.java index d3ab843c..9fe648d7 100644 --- a/src/main/java/com/hpcloud/MonPersisterConfiguration.java +++ b/src/main/java/com/hpcloud/MonPersisterConfiguration.java @@ -2,6 +2,7 @@ package com.hpcloud; import com.fasterxml.jackson.annotation.JsonProperty; import com.yammer.dropwizard.config.Configuration; +import com.yammer.dropwizard.db.DatabaseConfiguration; import javax.validation.Valid; import javax.validation.constraints.NotNull; @@ -41,4 +42,14 @@ public class MonPersisterConfiguration extends Configuration { public VerticaOutputProcessorConfiguration getVerticaOutputProcessorConfiguration() { return verticaOutputProcessorConfiguration; } + + @Valid + @NotNull + @JsonProperty + private DatabaseConfiguration databaseConfiguration = new DatabaseConfiguration(); + + public DatabaseConfiguration getDatabaseConfiguration() { + return databaseConfiguration; + } + } diff --git a/src/main/java/com/hpcloud/MonPersisterModule.java b/src/main/java/com/hpcloud/MonPersisterModule.java index f5a26888..14301869 100644 --- a/src/main/java/com/hpcloud/MonPersisterModule.java +++ b/src/main/java/com/hpcloud/MonPersisterModule.java @@ -1,23 +1,48 @@ package com.hpcloud; import com.google.inject.AbstractModule; -import com.lmax.disruptor.dsl.Disruptor; +import com.google.inject.Provider; +import com.google.inject.ProvisionException; +import com.google.inject.Scopes; +import com.google.inject.assistedinject.FactoryModuleBuilder; +import com.yammer.dropwizard.config.Environment; +import com.yammer.dropwizard.jdbi.DBIFactory; +import org.skife.jdbi.v2.DBI; public class MonPersisterModule extends AbstractModule { private final MonPersisterConfiguration configuration; - private final Disruptor disruptor; + private final Environment environment; - public MonPersisterModule(MonPersisterConfiguration configuration, Disruptor disruptor) { + public MonPersisterModule(MonPersisterConfiguration configuration, Environment environment) { this.configuration = configuration; - this.disruptor = disruptor; + this.environment = environment; } @Override protected void configure() { + bind(MonPersisterConfiguration.class).toInstance(configuration); - bind(Disruptor.class).toInstance(disruptor); + + install(new FactoryModuleBuilder() + .implement(StringEventHandler.class, StringEventHandler.class) + .build(StringEventHandlerFactory.class)); + + bind(DisruptorFactory.class); + bind(MonConsumer.class); + bind(DBI.class).toProvider(new Provider() { + @Override + public DBI get() { + try { + return new DBIFactory().build(environment, configuration.getDatabaseConfiguration(), "vertica"); + } catch (ClassNotFoundException e) { + throw new ProvisionException("Failed to provision DBI", e); + } + } + }).in(Scopes.SINGLETON); + + } } diff --git a/src/main/java/com/hpcloud/MonPersisterService.java b/src/main/java/com/hpcloud/MonPersisterService.java index cbdd15a8..85591c6d 100644 --- a/src/main/java/com/hpcloud/MonPersisterService.java +++ b/src/main/java/com/hpcloud/MonPersisterService.java @@ -2,15 +2,10 @@ package com.hpcloud; import com.google.inject.Guice; import com.google.inject.Injector; -import com.lmax.disruptor.dsl.Disruptor; -import com.lmax.disruptor.dsl.EventHandlerGroup; import com.yammer.dropwizard.Service; import com.yammer.dropwizard.config.Bootstrap; import com.yammer.dropwizard.config.Environment; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; - public class MonPersisterService extends Service { public static void main(String[] args) throws Exception { @@ -25,9 +20,7 @@ public class MonPersisterService extends Service { @Override public void run(MonPersisterConfiguration configuration, Environment environment) throws Exception { - Disruptor disruptor = createDisruptor(configuration); - - Injector injector = Guice.createInjector(new MonPersisterModule(configuration, disruptor)); + Injector injector = Guice.createInjector(new MonPersisterModule(configuration, environment)); // Sample resource. environment.addResource(new Resource()); @@ -38,27 +31,4 @@ public class MonPersisterService extends Service { environment.manage(monConsumer); } - private Disruptor createDisruptor(MonPersisterConfiguration configuration) { - Executor executor = Executors.newCachedThreadPool(); - StringEventFactory stringEventFactory = new StringEventFactory(); - - int buffersize = configuration.getDisruptorConfiguration().bufferSize; - Disruptor disruptor = new Disruptor(stringEventFactory, buffersize, executor); - - int numOutputProcessors = configuration.getVerticaOutputProcessorConfiguration().numProcessors; - EventHandlerGroup handlerGroup = null; - for (int i = 0; i < numOutputProcessors; ++i) { - - StringEventHandler stringEventHandler = new StringEventHandler(); - - if (handlerGroup == null) { - handlerGroup = disruptor.handleEventsWith(stringEventHandler); - } else { - handlerGroup.then(stringEventHandler); - } - - } - disruptor.start(); - return disruptor; - } } diff --git a/src/main/java/com/hpcloud/StringEventHandler.java b/src/main/java/com/hpcloud/StringEventHandler.java index b47969f9..12b89886 100644 --- a/src/main/java/com/hpcloud/StringEventHandler.java +++ b/src/main/java/com/hpcloud/StringEventHandler.java @@ -1,11 +1,44 @@ package com.hpcloud; +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; import com.lmax.disruptor.EventHandler; public class StringEventHandler implements EventHandler { + private final int ordinal; + private final int numProcessors; + private final int batchSize; + + VerticaMetricRepository verticaMetricRepository; + + @Inject + public StringEventHandler(VerticaMetricRepository verticaMetricRepository, + @Assisted("ordinal") int ordinal, + @Assisted("numProcessors") int numProcessors, + @Assisted("batchSize") int batchSize) { + + this.verticaMetricRepository = verticaMetricRepository; + this.ordinal = ordinal; + this.numProcessors = numProcessors; + this.batchSize = batchSize; + + } + @Override - public void onEvent(StringEvent stringEvent, long l, boolean b) throws Exception { + public void onEvent(StringEvent stringEvent, long sequence, boolean b) throws Exception { + + if (((sequence / batchSize) % this.numProcessors) != this.ordinal) { + return; + } + System.out.println("Event: " + stringEvent.getValue()); + verticaMetricRepository.addToBatch(stringEvent.getValue()); + + if (sequence % batchSize == (batchSize - 1)) { + verticaMetricRepository.commitBatch(); + } + + } } diff --git a/src/main/java/com/hpcloud/StringEventHandlerFactory.java b/src/main/java/com/hpcloud/StringEventHandlerFactory.java new file mode 100644 index 00000000..7bbf8f81 --- /dev/null +++ b/src/main/java/com/hpcloud/StringEventHandlerFactory.java @@ -0,0 +1,9 @@ +package com.hpcloud; + +import com.google.inject.assistedinject.Assisted; + +public interface StringEventHandlerFactory { + StringEventHandler create(@Assisted("ordinal") int ordinal, + @Assisted("numProcessors") int numProcessors, + @Assisted("batchSize") int batchSize); +} diff --git a/src/main/java/com/hpcloud/VerticaMetricRepository.java b/src/main/java/com/hpcloud/VerticaMetricRepository.java new file mode 100644 index 00000000..ad459db5 --- /dev/null +++ b/src/main/java/com/hpcloud/VerticaMetricRepository.java @@ -0,0 +1,32 @@ +package com.hpcloud; + +import org.skife.jdbi.v2.DBI; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.inject.Inject; +import java.security.NoSuchAlgorithmException; + +public class VerticaMetricRepository extends VerticaRepository { + + private static final Logger logger = LoggerFactory.getLogger(VerticaMetricRepository.class); + + private static final String SQL_INSERT_INTO_METRICS = + "INSERT INTO test.metric VALUES (:metric)"; + + private static final String METRIC_COLUMN_NAME = "metric"; + + @Inject + public VerticaMetricRepository(DBI dbi) throws NoSuchAlgorithmException { + super(dbi); + batch = handle.prepareBatch(SQL_INSERT_INTO_METRICS); + } + + public void addToBatch(String aString) { + batch.add().bind(METRIC_COLUMN_NAME, aString); + } + + public void commitBatch() { + batch.execute(); + } +} diff --git a/src/main/java/com/hpcloud/VerticaRepository.java b/src/main/java/com/hpcloud/VerticaRepository.java new file mode 100644 index 00000000..66403d06 --- /dev/null +++ b/src/main/java/com/hpcloud/VerticaRepository.java @@ -0,0 +1,31 @@ +package com.hpcloud; + +import org.skife.jdbi.v2.DBI; +import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.PreparedBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class VerticaRepository { + protected DBI dbi; + protected Handle handle; + protected PreparedBatch batch; + private static final Logger logger = LoggerFactory.getLogger(VerticaRepository.class); + + public VerticaRepository(DBI dbi) { + this.dbi = dbi; + this.handle = dbi.open(); + this.handle.execute("SET TIME ZONE TO 'UTC'"); + } + + public VerticaRepository() { + } + + public void setDBI(DBI dbi) + throws Exception { + this.dbi = dbi; + this.handle = dbi.open(); + } + + +} diff --git a/src/main/resources/com/hpcloud/mon-persister-config.yml b/src/main/resources/com/hpcloud/mon-persister-config.yml index 7b4673ce..118bcb94 100644 --- a/src/main/resources/com/hpcloud/mon-persister-config.yml +++ b/src/main/resources/com/hpcloud/mon-persister-config.yml @@ -31,9 +31,40 @@ disruptorConfiguration: numThreads: 1 verticaOutputProcessorConfiguration: - batchSize: 512 + batchSize: 10 numProcessors: 1 +databaseConfiguration: + driverClass: com.vertica.jdbc.Driver +# url: jdbc:vertica://mon-aw1rdd1-vertica0001.rndd.aw1.hpcloud.net:5433/som + url: jdbc:vertica://15.185.94.245:5433/som + user: persister + password: password + properties: + ssl: false + # the maximum amount of time to wait on an empty pool before throwing an exception + maxWaitForConnection: 1s + + # the SQL query to run when validating a connection's liveness + validationQuery: "/* MyService Health Check */ SELECT 1" + + # the minimum number of connections to keep open + minSize: 8 + + # the maximum number of connections to keep open + + + maxSize: 41 + + # whether or not idle connections should be validated + checkConnectionWhileIdle: false + + # how long a connection must be held before it can be validated + checkConnectionHealthWhenIdleFor: 10s + + # the maximum lifetime of an idle connection + closeConnectionIfIdleFor: 1 minute + # Logging settings. logging: