From a6ecbc35d1daae06a73b6d4f2ad8ffd2b97bf94d Mon Sep 17 00:00:00 2001 From: Deklan Dieterly Date: Mon, 10 Mar 2014 09:01:19 -0600 Subject: [PATCH] Only flush staging tables if enough time has elapsed since last flush. --- .../hpcloud/dedupe/MonDeDuperHeartbeat.java | 22 +++++-------- .../event/MetricMessageEventHandler.java | 32 ++++++++++++++++--- .../com/hpcloud/message/MetricEnvelope.java | 13 +++++--- .../repository/VerticaMetricRepository.java | 16 ++++++---- .../com/hpcloud/mon-persister-config.yml | 8 ++--- 5 files changed, 57 insertions(+), 34 deletions(-) diff --git a/src/main/java/com/hpcloud/dedupe/MonDeDuperHeartbeat.java b/src/main/java/com/hpcloud/dedupe/MonDeDuperHeartbeat.java index b682749b..2f160cff 100644 --- a/src/main/java/com/hpcloud/dedupe/MonDeDuperHeartbeat.java +++ b/src/main/java/com/hpcloud/dedupe/MonDeDuperHeartbeat.java @@ -1,7 +1,6 @@ package com.hpcloud.dedupe; import com.google.inject.Inject; -import com.hpcloud.configuration.MonPersisterConfiguration; import com.hpcloud.disruptor.event.MetricMessageEvent; import com.lmax.disruptor.EventTranslator; import com.lmax.disruptor.dsl.Disruptor; @@ -13,16 +12,13 @@ public class MonDeDuperHeartbeat implements Managed { private static Logger logger = LoggerFactory.getLogger(MonDeDuperHeartbeat.class); - private final MonPersisterConfiguration configuration; private final Disruptor disruptor; private final DeDuperRunnable deDuperRunnable; @Inject - public MonDeDuperHeartbeat(MonPersisterConfiguration configuration, - Disruptor disruptor) { - this.configuration = configuration; + public MonDeDuperHeartbeat(Disruptor disruptor) { this.disruptor = disruptor; - this.deDuperRunnable = new DeDuperRunnable(configuration, disruptor); + this.deDuperRunnable = new DeDuperRunnable(disruptor); } @@ -41,24 +37,22 @@ public class MonDeDuperHeartbeat implements Managed { private static Logger logger = LoggerFactory.getLogger(DeDuperRunnable.class); - private final MonPersisterConfiguration configuration; private final Disruptor disruptor; - private DeDuperRunnable(MonPersisterConfiguration configuration, Disruptor disruptor) { - this.configuration = configuration; + private DeDuperRunnable(Disruptor disruptor) { this.disruptor = disruptor; } @Override public void run() { - int seconds = configuration.getMonDeDuperConfiguration().getDedupeRunFrequencySeconds(); for (; ; ) { try { - Thread.sleep(seconds * 1000); - logger.debug("Waking up after sleeping " + seconds + " seconds, yawn..."); + // Send a heartbeat every second. + Thread.sleep(1000); + logger.debug("Waking up after sleeping 1 seconds, yawn..."); // Send heartbeat - logger.debug("Sending dedupe heartbeat message"); + logger.debug("Sending heartbeat message"); disruptor.publishEvent(new EventTranslator() { @Override @@ -69,7 +63,7 @@ public class MonDeDuperHeartbeat implements Managed { }); } catch (Exception e) { - logger.error("Failed to send dedupe heartbeat", e); + logger.error("Failed to send heartbeat", e); } } diff --git a/src/main/java/com/hpcloud/disruptor/event/MetricMessageEventHandler.java b/src/main/java/com/hpcloud/disruptor/event/MetricMessageEventHandler.java index 8d5231c5..99ca1db2 100644 --- a/src/main/java/com/hpcloud/disruptor/event/MetricMessageEventHandler.java +++ b/src/main/java/com/hpcloud/disruptor/event/MetricMessageEventHandler.java @@ -2,6 +2,7 @@ package com.hpcloud.disruptor.event; import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; +import com.hpcloud.configuration.MonPersisterConfiguration; import com.hpcloud.message.MetricMessage; import com.hpcloud.repository.VerticaMetricRepository; import com.lmax.disruptor.EventHandler; @@ -31,21 +32,32 @@ public class MetricMessageEventHandler implements EventHandler meta; @@ -27,4 +22,12 @@ public class MetricEnvelope { this.metric = metric; this.meta = meta; } + + @Override + public String toString() { + return "MetricEnvelope{" + + "metric=" + metric + + ", meta=" + meta + + '}'; + } } \ No newline at end of file diff --git a/src/main/java/com/hpcloud/repository/VerticaMetricRepository.java b/src/main/java/com/hpcloud/repository/VerticaMetricRepository.java index b050e588..7f37511f 100644 --- a/src/main/java/com/hpcloud/repository/VerticaMetricRepository.java +++ b/src/main/java/com/hpcloud/repository/VerticaMetricRepository.java @@ -1,5 +1,8 @@ package com.hpcloud.repository; +import com.yammer.metrics.Metrics; +import com.yammer.metrics.core.Timer; +import com.yammer.metrics.core.TimerContext; import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.PreparedBatch; import org.slf4j.Logger; @@ -16,12 +19,6 @@ public class VerticaMetricRepository extends VerticaRepository { private static final String SQL_INSERT_INTO_METRICS = "insert into MonMetrics.metrics (metric_definition_id, time_stamp, value) values (:metric_definition_id, :time_stamp, :value)"; - private static final String SQL_INSERT_INTO_STAGING_DEFINITIONS = - "insert into MonMetrics.stagedDefinitions values (:metric_definition_id, :name, :tenant_id," + - ":region)"; - private static final String SQL_INSERT_INTO_STAGING_DIMENSIONS = - "insert into MonMetrics.stagedDimensions values (:metric_definition_id, :name, :value)"; - private static final String defs = "(" + " metric_definition_id BINARY(20) NOT NULL," + " name VARCHAR NOT NULL," + @@ -46,6 +43,9 @@ public class VerticaMetricRepository extends VerticaRepository { private final String dsDefs; private final String dsDims; + private final Timer commitTimer = Metrics.newTimer(this.getClass(), "commits-timer"); + private final Timer flushTimer = Metrics.newTimer(this.getClass(), "staging-tables-flushed-timer"); + @Inject public VerticaMetricRepository(DBI dbi) throws NoSuchAlgorithmException, SQLException { super(dbi); @@ -93,12 +93,14 @@ public class VerticaMetricRepository extends VerticaRepository { public void flush() { commitBatch(); long startTime = System.currentTimeMillis(); + TimerContext context = flushTimer.time(); handle.execute(dsDefs); handle.execute("truncate table " + sDefs); handle.execute(dsDims); handle.execute("truncate table " + sDims); handle.commit(); handle.begin(); + context.stop(); long endTime = System.currentTimeMillis(); logger.debug("Flushing staging tables took " + (endTime - startTime) / 1000 + " seconds"); @@ -106,11 +108,13 @@ public class VerticaMetricRepository extends VerticaRepository { private void commitBatch() { long startTime = System.currentTimeMillis(); + TimerContext context = commitTimer.time(); metricsBatch.execute(); stagedDefinitionsBatch.execute(); stagedDimensionsBatch.execute(); handle.commit(); handle.begin(); + context.stop(); long endTime = System.currentTimeMillis(); logger.debug("Commiting batch took " + (endTime - startTime) / 1000 + " seconds"); } diff --git a/src/main/resources/com/hpcloud/mon-persister-config.yml b/src/main/resources/com/hpcloud/mon-persister-config.yml index aee80f57..c44fbeb7 100644 --- a/src/main/resources/com/hpcloud/mon-persister-config.yml +++ b/src/main/resources/com/hpcloud/mon-persister-config.yml @@ -28,7 +28,7 @@ kafkaConfiguration: disruptorConfiguration: bufferSize: 1048576 - numProcessors: 2 + numProcessors: 1 verticaOutputProcessorConfiguration: batchSize: 25000 @@ -41,8 +41,8 @@ databaseConfiguration: # url: jdbc:vertica://mon-aw1rdd1-vertica0001.rndd.aw1.hpcloud.net:5433/som url: jdbc:vertica://15.185.94.245:5433/som # user: persister -# user: mon_persister - user: dbadmin + user: mon_persister +# user: dbadmin password: password properties: ssl: false @@ -81,8 +81,8 @@ logging: # Sets the level for 'com.example.app' to DEBUG. com.example.app: DEBUG + com.hpcloud: debug com.hpcloud.repository: DEBUG - com.hpcloud.disruptor.event: INFO # Settings for logging to stdout. console: