diff --git a/src/main/java/com/hpcloud/DisruptorConfiguration.java b/src/main/java/com/hpcloud/DisruptorConfiguration.java deleted file mode 100644 index 40b279de..00000000 --- a/src/main/java/com/hpcloud/DisruptorConfiguration.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.hpcloud; - -import com.fasterxml.jackson.annotation.JsonProperty; - -public class DisruptorConfiguration { - - @JsonProperty - Integer bufferSize; - - @JsonProperty - Integer numThreads; - - -} diff --git a/src/main/java/com/hpcloud/KafkaConfiguration.java b/src/main/java/com/hpcloud/KafkaConfiguration.java deleted file mode 100644 index e4f62d71..00000000 --- a/src/main/java/com/hpcloud/KafkaConfiguration.java +++ /dev/null @@ -1,72 +0,0 @@ -package com.hpcloud; - -import com.fasterxml.jackson.annotation.JsonProperty; - -public class KafkaConfiguration { - - @JsonProperty - String topic; - - @JsonProperty - Integer numThreads; - - @JsonProperty - String groupId; - - @JsonProperty - String zookeeperConnect; - - @JsonProperty - String consumerId; - - @JsonProperty - Integer socketTimeoutMs; - - @JsonProperty - Integer socketReceiveBufferBytes; - - @JsonProperty - Integer fetchMessageMaxBytes; - - @JsonProperty - Boolean autoCommitEnable; - - @JsonProperty - Integer autoCommitIntervalMs; - - @JsonProperty - Integer queuedMaxMessageChunks; - - @JsonProperty - Integer rebalanceMaxRetries; - - @JsonProperty - Integer fetchMinBytes; - - @JsonProperty - Integer fetchWaitMaxMs; - - @JsonProperty - Integer rebalanceBackoffMs; - - @JsonProperty - Integer refreshLeaderBackoffMs; - - @JsonProperty - String autoOffsetReset; - - @JsonProperty - Integer consumerTimeoutMs; - - @JsonProperty - String clientId; - - @JsonProperty - Integer zookeeperSessionTimeoutMs; - - @JsonProperty - Integer zookeeperConnectionTimeoutMs; - - @JsonProperty - Integer zookeeperSyncTimeMs; -} diff --git a/src/main/java/com/hpcloud/MonPersisterModule.java b/src/main/java/com/hpcloud/MonPersisterModule.java index 14301869..0fc77b83 100644 --- a/src/main/java/com/hpcloud/MonPersisterModule.java +++ b/src/main/java/com/hpcloud/MonPersisterModule.java @@ -5,6 +5,11 @@ import com.google.inject.Provider; import com.google.inject.ProvisionException; import com.google.inject.Scopes; import com.google.inject.assistedinject.FactoryModuleBuilder; +import com.hpcloud.configuration.MonPersisterConfiguration; +import com.hpcloud.consumer.MonConsumer; +import com.hpcloud.disruptor.DisruptorFactory; +import com.hpcloud.event.StringEventHandler; +import com.hpcloud.event.StringEventHandlerFactory; import com.yammer.dropwizard.config.Environment; import com.yammer.dropwizard.jdbi.DBIFactory; import org.skife.jdbi.v2.DBI; diff --git a/src/main/java/com/hpcloud/MonPersisterService.java b/src/main/java/com/hpcloud/MonPersisterService.java index 85591c6d..76879adf 100644 --- a/src/main/java/com/hpcloud/MonPersisterService.java +++ b/src/main/java/com/hpcloud/MonPersisterService.java @@ -2,6 +2,10 @@ package com.hpcloud; import com.google.inject.Guice; import com.google.inject.Injector; +import com.hpcloud.configuration.MonPersisterConfiguration; +import com.hpcloud.consumer.MonConsumer; +import com.hpcloud.healthcheck.SimpleHealthCheck; +import com.hpcloud.resource.Resource; import com.yammer.dropwizard.Service; import com.yammer.dropwizard.config.Bootstrap; import com.yammer.dropwizard.config.Environment; diff --git a/src/main/java/com/hpcloud/configuration/DisruptorConfiguration.java b/src/main/java/com/hpcloud/configuration/DisruptorConfiguration.java new file mode 100644 index 00000000..0a7977d6 --- /dev/null +++ b/src/main/java/com/hpcloud/configuration/DisruptorConfiguration.java @@ -0,0 +1,21 @@ +package com.hpcloud.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class DisruptorConfiguration { + + @JsonProperty + Integer bufferSize; + + public Integer getBufferSize() { + return bufferSize; + } + + @JsonProperty + Integer numProcessors; + + public Integer getNumProcessors() { + return numProcessors; + } + +} diff --git a/src/main/java/com/hpcloud/configuration/KafkaConfiguration.java b/src/main/java/com/hpcloud/configuration/KafkaConfiguration.java new file mode 100644 index 00000000..2be80bc9 --- /dev/null +++ b/src/main/java/com/hpcloud/configuration/KafkaConfiguration.java @@ -0,0 +1,161 @@ +package com.hpcloud.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class KafkaConfiguration { + + @JsonProperty + String topic; + + @JsonProperty + Integer numThreads; + + @JsonProperty + String groupId; + + @JsonProperty + String zookeeperConnect; + + @JsonProperty + String consumerId; + + @JsonProperty + Integer socketTimeoutMs; + + @JsonProperty + Integer socketReceiveBufferBytes; + + @JsonProperty + Integer fetchMessageMaxBytes; + + @JsonProperty + Boolean autoCommitEnable; + + @JsonProperty + Integer autoCommitIntervalMs; + + @JsonProperty + Integer queuedMaxMessageChunks; + + @JsonProperty + Integer rebalanceMaxRetries; + + @JsonProperty + Integer fetchMinBytes; + + @JsonProperty + Integer fetchWaitMaxMs; + + @JsonProperty + Integer rebalanceBackoffMs; + + @JsonProperty + Integer refreshLeaderBackoffMs; + + @JsonProperty + String autoOffsetReset; + + @JsonProperty + Integer consumerTimeoutMs; + + @JsonProperty + String clientId; + + @JsonProperty + Integer zookeeperSessionTimeoutMs; + + @JsonProperty + Integer zookeeperConnectionTimeoutMs; + + @JsonProperty + Integer zookeeperSyncTimeMs; + + public String getTopic() { + return topic; + } + + public Integer getNumThreads() { + return numThreads; + } + + public String getGroupId() { + return groupId; + } + + public String getZookeeperConnect() { + return zookeeperConnect; + } + + public String getConsumerId() { + return consumerId; + } + + public Integer getSocketTimeoutMs() { + return socketTimeoutMs; + } + + public Integer getSocketReceiveBufferBytes() { + return socketReceiveBufferBytes; + } + + public Integer getFetchMessageMaxBytes() { + return fetchMessageMaxBytes; + } + + public Boolean getAutoCommitEnable() { + return autoCommitEnable; + } + + public Integer getAutoCommitIntervalMs() { + return autoCommitIntervalMs; + } + + public Integer getQueuedMaxMessageChunks() { + return queuedMaxMessageChunks; + } + + public Integer getRebalanceMaxRetries() { + return rebalanceMaxRetries; + } + + public Integer getFetchMinBytes() { + return fetchMinBytes; + } + + public Integer getFetchWaitMaxMs() { + return fetchWaitMaxMs; + } + + public Integer getRebalanceBackoffMs() { + return rebalanceBackoffMs; + } + + public Integer getRefreshLeaderBackoffMs() { + return refreshLeaderBackoffMs; + } + + public String getAutoOffsetReset() { + return autoOffsetReset; + } + + public Integer getConsumerTimeoutMs() { + return consumerTimeoutMs; + } + + public String getClientId() { + return clientId; + } + + public Integer getZookeeperSessionTimeoutMs() { + return zookeeperSessionTimeoutMs; + } + + public Integer getZookeeperConnectionTimeoutMs() { + return zookeeperConnectionTimeoutMs; + } + + public Integer getZookeeperSyncTimeMs() { + return zookeeperSyncTimeMs; + } + +} diff --git a/src/main/java/com/hpcloud/MonPersisterConfiguration.java b/src/main/java/com/hpcloud/configuration/MonPersisterConfiguration.java similarity index 97% rename from src/main/java/com/hpcloud/MonPersisterConfiguration.java rename to src/main/java/com/hpcloud/configuration/MonPersisterConfiguration.java index 9fe648d7..7704cd3b 100644 --- a/src/main/java/com/hpcloud/MonPersisterConfiguration.java +++ b/src/main/java/com/hpcloud/configuration/MonPersisterConfiguration.java @@ -1,4 +1,4 @@ -package com.hpcloud; +package com.hpcloud.configuration; import com.fasterxml.jackson.annotation.JsonProperty; import com.yammer.dropwizard.config.Configuration; diff --git a/src/main/java/com/hpcloud/VerticaOutputProcessorConfiguration.java b/src/main/java/com/hpcloud/configuration/VerticaOutputProcessorConfiguration.java similarity index 59% rename from src/main/java/com/hpcloud/VerticaOutputProcessorConfiguration.java rename to src/main/java/com/hpcloud/configuration/VerticaOutputProcessorConfiguration.java index ca735ee3..f40f6c86 100644 --- a/src/main/java/com/hpcloud/VerticaOutputProcessorConfiguration.java +++ b/src/main/java/com/hpcloud/configuration/VerticaOutputProcessorConfiguration.java @@ -1,4 +1,4 @@ -package com.hpcloud; +package com.hpcloud.configuration; import com.fasterxml.jackson.annotation.JsonProperty; @@ -7,6 +7,9 @@ public class VerticaOutputProcessorConfiguration { @JsonProperty Integer batchSize; - @JsonProperty - Integer numProcessors; + public Integer getBatchSize() { + return batchSize; + } + + } diff --git a/src/main/java/com/hpcloud/KafkaConsumer.java b/src/main/java/com/hpcloud/consumer/KafkaConsumer.java similarity index 68% rename from src/main/java/com/hpcloud/KafkaConsumer.java rename to src/main/java/com/hpcloud/consumer/KafkaConsumer.java index 7bca6705..a0eb587c 100644 --- a/src/main/java/com/hpcloud/KafkaConsumer.java +++ b/src/main/java/com/hpcloud/consumer/KafkaConsumer.java @@ -1,6 +1,9 @@ -package com.hpcloud; +package com.hpcloud.consumer; import com.google.inject.Inject; +import com.hpcloud.configuration.KafkaConfiguration; +import com.hpcloud.configuration.MonPersisterConfiguration; +import com.hpcloud.disruptor.DisruptorFactory; import com.lmax.disruptor.dsl.Disruptor; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; @@ -29,8 +32,8 @@ public class KafkaConsumer { @Inject public KafkaConsumer(MonPersisterConfiguration configuration, DisruptorFactory disruptorFactory) { - this.topic = configuration.getKafkaConfiguration().topic; - this.numThreads = configuration.getKafkaConfiguration().numThreads; + this.topic = configuration.getKafkaConfiguration().getTopic(); + this.numThreads = configuration.getKafkaConfiguration().getNumThreads(); this.disruptor = disruptorFactory.create(); Properties kafkaProperties = createKafkaProperties(configuration.getKafkaConfiguration()); ConsumerConfig consumerConfig = createConsumerConfig(kafkaProperties); @@ -66,26 +69,26 @@ public class KafkaConsumer { private Properties createKafkaProperties(KafkaConfiguration kafkaConfiguration) { Properties properties = new Properties(); - properties.put("group.id", kafkaConfiguration.groupId); - properties.put("zookeeper.connect", kafkaConfiguration.zookeeperConnect); - properties.put("consumer.id", kafkaConfiguration.consumerId); - properties.put("socket.timeout.ms", kafkaConfiguration.socketTimeoutMs.toString()); - properties.put("socket.receive.buffer.bytes", kafkaConfiguration.socketReceiveBufferBytes.toString()); - properties.put("fetch.message.max.bytes", kafkaConfiguration.fetchMessageMaxBytes.toString()); - properties.put("auto.commit.enable", kafkaConfiguration.autoCommitEnable.toString()); - properties.put("auto.commit.interval.ms", kafkaConfiguration.autoCommitIntervalMs.toString()); - properties.put("queued.max.message.chunks", kafkaConfiguration.queuedMaxMessageChunks.toString()); - properties.put("rebalance.max.retries", kafkaConfiguration.rebalanceMaxRetries.toString()); - properties.put("fetch.min.bytes", kafkaConfiguration.fetchMinBytes.toString()); - properties.put("fetch.wait.max.ms", kafkaConfiguration.fetchWaitMaxMs.toString()); - properties.put("rebalance.backoff.ms", kafkaConfiguration.rebalanceBackoffMs.toString()); - properties.put("refresh.leader.backoff.ms", kafkaConfiguration.refreshLeaderBackoffMs.toString()); - properties.put("auto.offset.reset", kafkaConfiguration.autoOffsetReset); - properties.put("consumer.timeout.ms", kafkaConfiguration.consumerTimeoutMs.toString()); - properties.put("client.id", kafkaConfiguration.clientId); - properties.put("zookeeper.session.timeout.ms", kafkaConfiguration.zookeeperSessionTimeoutMs.toString()); - properties.put("zookeeper.connection.timeout.ms", kafkaConfiguration.zookeeperSessionTimeoutMs.toString()); - properties.put("zookeeper.sync.time.ms", kafkaConfiguration.zookeeperSyncTimeMs.toString()); + properties.put("group.id", kafkaConfiguration.getGroupId()); + properties.put("zookeeper.connect", kafkaConfiguration.getZookeeperConnect()); + properties.put("consumer.id", kafkaConfiguration.getConsumerId()); + properties.put("socket.timeout.ms", kafkaConfiguration.getSocketTimeoutMs().toString()); + properties.put("socket.receive.buffer.bytes", kafkaConfiguration.getSocketReceiveBufferBytes().toString()); + properties.put("fetch.message.max.bytes", kafkaConfiguration.getFetchMessageMaxBytes().toString()); + properties.put("auto.commit.enable", kafkaConfiguration.getAutoCommitEnable().toString()); + properties.put("auto.commit.interval.ms", kafkaConfiguration.getAutoCommitIntervalMs().toString()); + properties.put("queued.max.message.chunks", kafkaConfiguration.getQueuedMaxMessageChunks().toString()); + properties.put("rebalance.max.retries", kafkaConfiguration.getRebalanceMaxRetries().toString()); + properties.put("fetch.min.bytes", kafkaConfiguration.getFetchMinBytes().toString()); + properties.put("fetch.wait.max.ms", kafkaConfiguration.getFetchWaitMaxMs().toString()); + properties.put("rebalance.backoff.ms", kafkaConfiguration.getRebalanceBackoffMs().toString()); + properties.put("refresh.leader.backoff.ms", kafkaConfiguration.getRefreshLeaderBackoffMs().toString()); + properties.put("auto.offset.reset", kafkaConfiguration.getAutoOffsetReset()); + properties.put("consumer.timeout.ms", kafkaConfiguration.getConsumerTimeoutMs().toString()); + properties.put("client.id", kafkaConfiguration.getClientId()); + properties.put("zookeeper.session.timeout.ms", kafkaConfiguration.getZookeeperSessionTimeoutMs().toString()); + properties.put("zookeeper.connection.timeout.ms", kafkaConfiguration.getZookeeperConnectionTimeoutMs().toString()); + properties.put("zookeeper.sync.time.ms", kafkaConfiguration.getZookeeperSyncTimeMs().toString()); return properties; } diff --git a/src/main/java/com/hpcloud/KafkaConsumerRunnableBasic.java b/src/main/java/com/hpcloud/consumer/KafkaConsumerRunnableBasic.java similarity index 94% rename from src/main/java/com/hpcloud/KafkaConsumerRunnableBasic.java rename to src/main/java/com/hpcloud/consumer/KafkaConsumerRunnableBasic.java index a4856774..c6f835b4 100644 --- a/src/main/java/com/hpcloud/KafkaConsumerRunnableBasic.java +++ b/src/main/java/com/hpcloud/consumer/KafkaConsumerRunnableBasic.java @@ -1,5 +1,6 @@ -package com.hpcloud; +package com.hpcloud.consumer; +import com.hpcloud.event.StringEvent; import com.lmax.disruptor.EventTranslator; import com.lmax.disruptor.dsl.Disruptor; import kafka.consumer.ConsumerIterator; diff --git a/src/main/java/com/hpcloud/MonConsumer.java b/src/main/java/com/hpcloud/consumer/MonConsumer.java similarity index 95% rename from src/main/java/com/hpcloud/MonConsumer.java rename to src/main/java/com/hpcloud/consumer/MonConsumer.java index 678ffd87..99bcf33b 100644 --- a/src/main/java/com/hpcloud/MonConsumer.java +++ b/src/main/java/com/hpcloud/consumer/MonConsumer.java @@ -1,4 +1,4 @@ -package com.hpcloud; +package com.hpcloud.consumer; import com.google.inject.Inject; import com.yammer.dropwizard.lifecycle.Managed; diff --git a/src/main/java/com/hpcloud/DisruptorFactory.java b/src/main/java/com/hpcloud/disruptor/DisruptorFactory.java similarity index 80% rename from src/main/java/com/hpcloud/DisruptorFactory.java rename to src/main/java/com/hpcloud/disruptor/DisruptorFactory.java index 47a45375..447ed904 100644 --- a/src/main/java/com/hpcloud/DisruptorFactory.java +++ b/src/main/java/com/hpcloud/disruptor/DisruptorFactory.java @@ -1,6 +1,11 @@ -package com.hpcloud; +package com.hpcloud.disruptor; import com.google.inject.Inject; +import com.hpcloud.configuration.MonPersisterConfiguration; +import com.hpcloud.event.StringEvent; +import com.hpcloud.event.StringEventFactory; +import com.hpcloud.event.StringEventHandler; +import com.hpcloud.event.StringEventHandlerFactory; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.EventHandlerGroup; @@ -26,11 +31,11 @@ public class DisruptorFactory { Executor executor = Executors.newCachedThreadPool(); StringEventFactory stringEventFactory = new StringEventFactory(); - int buffersize = configuration.getDisruptorConfiguration().bufferSize; + int buffersize = configuration.getDisruptorConfiguration().getBufferSize(); Disruptor disruptor = new Disruptor(stringEventFactory, buffersize, executor); - int batchSize = configuration.getVerticaOutputProcessorConfiguration().batchSize; - int numOutputProcessors = configuration.getVerticaOutputProcessorConfiguration().numProcessors; + int batchSize = configuration.getVerticaOutputProcessorConfiguration().getBatchSize(); + int numOutputProcessors = configuration.getDisruptorConfiguration().getNumProcessors(); EventHandlerGroup handlerGroup = null; for (int i = 0; i < numOutputProcessors; ++i) { diff --git a/src/main/java/com/hpcloud/StringEvent.java b/src/main/java/com/hpcloud/event/StringEvent.java similarity index 87% rename from src/main/java/com/hpcloud/StringEvent.java rename to src/main/java/com/hpcloud/event/StringEvent.java index a27f25a2..e6b7b54c 100644 --- a/src/main/java/com/hpcloud/StringEvent.java +++ b/src/main/java/com/hpcloud/event/StringEvent.java @@ -1,4 +1,4 @@ -package com.hpcloud; +package com.hpcloud.event; public class StringEvent { diff --git a/src/main/java/com/hpcloud/StringEventFactory.java b/src/main/java/com/hpcloud/event/StringEventFactory.java similarity index 91% rename from src/main/java/com/hpcloud/StringEventFactory.java rename to src/main/java/com/hpcloud/event/StringEventFactory.java index 7c4699d1..b641ade0 100644 --- a/src/main/java/com/hpcloud/StringEventFactory.java +++ b/src/main/java/com/hpcloud/event/StringEventFactory.java @@ -1,4 +1,4 @@ -package com.hpcloud; +package com.hpcloud.event; import com.lmax.disruptor.EventFactory; diff --git a/src/main/java/com/hpcloud/StringEventHandler.java b/src/main/java/com/hpcloud/event/StringEventHandler.java similarity index 94% rename from src/main/java/com/hpcloud/StringEventHandler.java rename to src/main/java/com/hpcloud/event/StringEventHandler.java index 12b89886..e4afd16b 100644 --- a/src/main/java/com/hpcloud/StringEventHandler.java +++ b/src/main/java/com/hpcloud/event/StringEventHandler.java @@ -1,7 +1,8 @@ -package com.hpcloud; +package com.hpcloud.event; import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; +import com.hpcloud.repository.VerticaMetricRepository; import com.lmax.disruptor.EventHandler; public class StringEventHandler implements EventHandler { diff --git a/src/main/java/com/hpcloud/StringEventHandlerFactory.java b/src/main/java/com/hpcloud/event/StringEventHandlerFactory.java similarity index 91% rename from src/main/java/com/hpcloud/StringEventHandlerFactory.java rename to src/main/java/com/hpcloud/event/StringEventHandlerFactory.java index 7bbf8f81..8d656c2b 100644 --- a/src/main/java/com/hpcloud/StringEventHandlerFactory.java +++ b/src/main/java/com/hpcloud/event/StringEventHandlerFactory.java @@ -1,4 +1,4 @@ -package com.hpcloud; +package com.hpcloud.event; import com.google.inject.assistedinject.Assisted; diff --git a/src/main/java/com/hpcloud/SimpleHealthCheck.java b/src/main/java/com/hpcloud/healthcheck/SimpleHealthCheck.java similarity index 75% rename from src/main/java/com/hpcloud/SimpleHealthCheck.java rename to src/main/java/com/hpcloud/healthcheck/SimpleHealthCheck.java index b93abc5b..9e180066 100644 --- a/src/main/java/com/hpcloud/SimpleHealthCheck.java +++ b/src/main/java/com/hpcloud/healthcheck/SimpleHealthCheck.java @@ -1,10 +1,10 @@ -package com.hpcloud; +package com.hpcloud.healthcheck; import com.yammer.metrics.core.HealthCheck; public class SimpleHealthCheck extends HealthCheck { - protected SimpleHealthCheck(String name) { + public SimpleHealthCheck(String name) { super(name); } diff --git a/src/main/java/com/hpcloud/VerticaMetricRepository.java b/src/main/java/com/hpcloud/repository/VerticaMetricRepository.java similarity index 96% rename from src/main/java/com/hpcloud/VerticaMetricRepository.java rename to src/main/java/com/hpcloud/repository/VerticaMetricRepository.java index ad459db5..d92dff33 100644 --- a/src/main/java/com/hpcloud/VerticaMetricRepository.java +++ b/src/main/java/com/hpcloud/repository/VerticaMetricRepository.java @@ -1,4 +1,4 @@ -package com.hpcloud; +package com.hpcloud.repository; import org.skife.jdbi.v2.DBI; import org.slf4j.Logger; diff --git a/src/main/java/com/hpcloud/VerticaRepository.java b/src/main/java/com/hpcloud/repository/VerticaRepository.java similarity index 95% rename from src/main/java/com/hpcloud/VerticaRepository.java rename to src/main/java/com/hpcloud/repository/VerticaRepository.java index 66403d06..9c849454 100644 --- a/src/main/java/com/hpcloud/VerticaRepository.java +++ b/src/main/java/com/hpcloud/repository/VerticaRepository.java @@ -1,4 +1,4 @@ -package com.hpcloud; +package com.hpcloud.repository; import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.Handle; diff --git a/src/main/java/com/hpcloud/PlaceHolder.java b/src/main/java/com/hpcloud/resource/PlaceHolder.java similarity index 87% rename from src/main/java/com/hpcloud/PlaceHolder.java rename to src/main/java/com/hpcloud/resource/PlaceHolder.java index a682cae0..bdb8c3e2 100644 --- a/src/main/java/com/hpcloud/PlaceHolder.java +++ b/src/main/java/com/hpcloud/resource/PlaceHolder.java @@ -1,4 +1,4 @@ -package com.hpcloud; +package com.hpcloud.resource; public class PlaceHolder { private final String content; diff --git a/src/main/java/com/hpcloud/Resource.java b/src/main/java/com/hpcloud/resource/Resource.java similarity index 91% rename from src/main/java/com/hpcloud/Resource.java rename to src/main/java/com/hpcloud/resource/Resource.java index ed313490..79b70fe1 100644 --- a/src/main/java/com/hpcloud/Resource.java +++ b/src/main/java/com/hpcloud/resource/Resource.java @@ -1,4 +1,4 @@ -package com.hpcloud; +package com.hpcloud.resource; import javax.ws.rs.GET; import javax.ws.rs.Path; diff --git a/src/main/resources/com/hpcloud/mon-persister-config.yml b/src/main/resources/com/hpcloud/mon-persister-config.yml index 118bcb94..70b9d600 100644 --- a/src/main/resources/com/hpcloud/mon-persister-config.yml +++ b/src/main/resources/com/hpcloud/mon-persister-config.yml @@ -4,7 +4,7 @@ name: mon-persister kafkaConfiguration: # See http://kafka.apache.org/documentation.html#api for semantics and defaults. topic: test - numThreads: 1 + numThreads: 2 groupId: 1 zookeeperConnect: localhost:2181 consumerId: 1 @@ -28,11 +28,10 @@ kafkaConfiguration: disruptorConfiguration: bufferSize: 1048576 - numThreads: 1 + numProcessors: 2 verticaOutputProcessorConfiguration: batchSize: 10 - numProcessors: 1 databaseConfiguration: driverClass: com.vertica.jdbc.Driver diff --git a/src/test/java/com/hpcloud/MonConsumerTest.java b/src/test/java/com/hpcloud/MonConsumerTest.java index 62a704ed..ea924549 100644 --- a/src/test/java/com/hpcloud/MonConsumerTest.java +++ b/src/test/java/com/hpcloud/MonConsumerTest.java @@ -1,5 +1,7 @@ package com.hpcloud; +import com.hpcloud.consumer.KafkaConsumer; +import com.hpcloud.consumer.MonConsumer; import org.junit.After; import org.junit.Before; import org.junit.Test;