Only flush staging tables if enough time has elapsed since last flush.

This commit is contained in:
Deklan Dieterly 2014-03-10 09:01:19 -06:00
parent 9cf71380e1
commit a6ecbc35d1
5 changed files with 57 additions and 34 deletions

View File

@ -1,7 +1,6 @@
package com.hpcloud.dedupe; package com.hpcloud.dedupe;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.hpcloud.configuration.MonPersisterConfiguration;
import com.hpcloud.disruptor.event.MetricMessageEvent; import com.hpcloud.disruptor.event.MetricMessageEvent;
import com.lmax.disruptor.EventTranslator; import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.Disruptor;
@ -13,16 +12,13 @@ public class MonDeDuperHeartbeat implements Managed {
private static Logger logger = LoggerFactory.getLogger(MonDeDuperHeartbeat.class); private static Logger logger = LoggerFactory.getLogger(MonDeDuperHeartbeat.class);
private final MonPersisterConfiguration configuration;
private final Disruptor disruptor; private final Disruptor disruptor;
private final DeDuperRunnable deDuperRunnable; private final DeDuperRunnable deDuperRunnable;
@Inject @Inject
public MonDeDuperHeartbeat(MonPersisterConfiguration configuration, public MonDeDuperHeartbeat(Disruptor disruptor) {
Disruptor disruptor) {
this.configuration = configuration;
this.disruptor = disruptor; this.disruptor = disruptor;
this.deDuperRunnable = new DeDuperRunnable(configuration, disruptor); this.deDuperRunnable = new DeDuperRunnable(disruptor);
} }
@ -41,24 +37,22 @@ public class MonDeDuperHeartbeat implements Managed {
private static Logger logger = LoggerFactory.getLogger(DeDuperRunnable.class); private static Logger logger = LoggerFactory.getLogger(DeDuperRunnable.class);
private final MonPersisterConfiguration configuration;
private final Disruptor disruptor; private final Disruptor disruptor;
private DeDuperRunnable(MonPersisterConfiguration configuration, Disruptor disruptor) { private DeDuperRunnable(Disruptor disruptor) {
this.configuration = configuration;
this.disruptor = disruptor; this.disruptor = disruptor;
} }
@Override @Override
public void run() { public void run() {
int seconds = configuration.getMonDeDuperConfiguration().getDedupeRunFrequencySeconds();
for (; ; ) { for (; ; ) {
try { try {
Thread.sleep(seconds * 1000); // Send a heartbeat every second.
logger.debug("Waking up after sleeping " + seconds + " seconds, yawn..."); Thread.sleep(1000);
logger.debug("Waking up after sleeping 1 seconds, yawn...");
// Send heartbeat // Send heartbeat
logger.debug("Sending dedupe heartbeat message"); logger.debug("Sending heartbeat message");
disruptor.publishEvent(new EventTranslator<MetricMessageEvent>() { disruptor.publishEvent(new EventTranslator<MetricMessageEvent>() {
@Override @Override
@ -69,7 +63,7 @@ public class MonDeDuperHeartbeat implements Managed {
}); });
} catch (Exception e) { } catch (Exception e) {
logger.error("Failed to send dedupe heartbeat", e); logger.error("Failed to send heartbeat", e);
} }
} }

View File

@ -2,6 +2,7 @@ package com.hpcloud.disruptor.event;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted; import com.google.inject.assistedinject.Assisted;
import com.hpcloud.configuration.MonPersisterConfiguration;
import com.hpcloud.message.MetricMessage; import com.hpcloud.message.MetricMessage;
import com.hpcloud.repository.VerticaMetricRepository; import com.hpcloud.repository.VerticaMetricRepository;
import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.EventHandler;
@ -31,21 +32,32 @@ public class MetricMessageEventHandler implements EventHandler<MetricMessageEven
private final SimpleDateFormat simpleDateFormat; private final SimpleDateFormat simpleDateFormat;
private long millisSinceLastFlush = System.currentTimeMillis();
private final long millisBetweenFlushes;
private final int secondsBetweenFlushes;
private final VerticaMetricRepository verticaMetricRepository; private final VerticaMetricRepository verticaMetricRepository;
private final MonPersisterConfiguration configuration;
private final Counter metricCounter = Metrics.newCounter(this.getClass(), "metrics-added-to-batch-counter"); private final Counter metricCounter = Metrics.newCounter(this.getClass(), "metrics-added-to-batch-counter");
private final Counter definitionCounter = Metrics.newCounter(this.getClass(), "metric-definitions-added-to-batch-counter"); private final Counter definitionCounter = Metrics.newCounter(this.getClass(), "metric-definitions-added-to-batch-counter");
private final Counter dimensionCounter = Metrics.newCounter(this.getClass(), "metric-dimensions-added-to-batch-counter"); private final Counter dimensionCounter = Metrics.newCounter(this.getClass(), "metric-dimensions-added-to-batch-counter");
private final Meter metricMessageMeter = Metrics.newMeter(this.getClass(), "Metric", "metrics-messages-processed-meter", TimeUnit.SECONDS); private final Meter metricMessageMeter = Metrics.newMeter(this.getClass(), "Metric", "metrics-messages-processed-meter", TimeUnit.SECONDS);
private final Meter commitMeter = Metrics.newMeter(this.getClass(), "Metric", "commits-executed-meter", TimeUnit.SECONDS); private final Meter commitMeter = Metrics.newMeter(this.getClass(), "Metric", "commits-executed-meter", TimeUnit.SECONDS);
private final Timer commitTimer = Metrics.newTimer(this.getClass(), "commits-executed-timer"); private final Timer commitTimer = Metrics.newTimer(this.getClass(), "total-commit-and-flush-timer");
@Inject @Inject
public MetricMessageEventHandler(VerticaMetricRepository verticaMetricRepository, public MetricMessageEventHandler(VerticaMetricRepository verticaMetricRepository,
MonPersisterConfiguration configuration,
@Assisted("ordinal") int ordinal, @Assisted("ordinal") int ordinal,
@Assisted("numProcessors") int numProcessors, @Assisted("numProcessors") int numProcessors,
@Assisted("batchSize") int batchSize) { @Assisted("batchSize") int batchSize) {
this.verticaMetricRepository = verticaMetricRepository; this.verticaMetricRepository = verticaMetricRepository;
this.configuration = configuration;
this.secondsBetweenFlushes = configuration.getMonDeDuperConfiguration().getDedupeRunFrequencySeconds();
this.millisBetweenFlushes = secondsBetweenFlushes * 1000;
this.ordinal = ordinal; this.ordinal = ordinal;
this.numProcessors = numProcessors; this.numProcessors = numProcessors;
this.batchSize = batchSize; this.batchSize = batchSize;
@ -53,14 +65,20 @@ public class MetricMessageEventHandler implements EventHandler<MetricMessageEven
simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
simpleDateFormat.setTimeZone(TimeZone.getTimeZone("GMT-0")); simpleDateFormat.setTimeZone(TimeZone.getTimeZone("GMT-0"));
} }
@Override @Override
public void onEvent(MetricMessageEvent metricMessageEvent, long sequence, boolean b) throws Exception { public void onEvent(MetricMessageEvent metricMessageEvent, long sequence, boolean b) throws Exception {
if (metricMessageEvent.getMetricEnvelope() == null) { if (metricMessageEvent.getMetricEnvelope() == null) {
logger.debug("Received heartbeat message. Flushing staging tables."); logger.debug("Received heartbeat message. Checking last flush time.");
verticaMetricRepository.flush(); if (millisSinceLastFlush + millisBetweenFlushes < System.currentTimeMillis()) {
logger.debug("It's been more than " + secondsBetweenFlushes + " seconds since last flush. Flushing staging tables now...");
flush();
} else {
logger.debug("It has not been more than " + secondsBetweenFlushes + " seeconds since last flush. No need to perform flush at this time.");
}
return; return;
} }
@ -124,11 +142,15 @@ public class MetricMessageEventHandler implements EventHandler<MetricMessageEven
} }
if (sequence % batchSize == (batchSize - 1)) { if (sequence % batchSize == (batchSize - 1)) {
TimerContext context = commitTimer.time(); TimerContext context = commitTimer.time();
verticaMetricRepository.flush(); flush();
context.stop(); context.stop();
commitMeter.mark(); commitMeter.mark();
} }
}
private void flush() {
verticaMetricRepository.flush();
millisSinceLastFlush = System.currentTimeMillis();
} }
} }

View File

@ -4,11 +4,6 @@ import com.google.common.base.Preconditions;
import java.util.Map; import java.util.Map;
/**
* A metric envelope.
*
* @author Jonathan Halterman
*/
public class MetricEnvelope { public class MetricEnvelope {
public MetricMessage metric; public MetricMessage metric;
public Map<String, Object> meta; public Map<String, Object> meta;
@ -27,4 +22,12 @@ public class MetricEnvelope {
this.metric = metric; this.metric = metric;
this.meta = meta; this.meta = meta;
} }
@Override
public String toString() {
return "MetricEnvelope{" +
"metric=" + metric +
", meta=" + meta +
'}';
}
} }

View File

@ -1,5 +1,8 @@
package com.hpcloud.repository; package com.hpcloud.repository;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Timer;
import com.yammer.metrics.core.TimerContext;
import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.PreparedBatch; import org.skife.jdbi.v2.PreparedBatch;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -16,12 +19,6 @@ public class VerticaMetricRepository extends VerticaRepository {
private static final String SQL_INSERT_INTO_METRICS = private static final String SQL_INSERT_INTO_METRICS =
"insert into MonMetrics.metrics (metric_definition_id, time_stamp, value) values (:metric_definition_id, :time_stamp, :value)"; "insert into MonMetrics.metrics (metric_definition_id, time_stamp, value) values (:metric_definition_id, :time_stamp, :value)";
private static final String SQL_INSERT_INTO_STAGING_DEFINITIONS =
"insert into MonMetrics.stagedDefinitions values (:metric_definition_id, :name, :tenant_id," +
":region)";
private static final String SQL_INSERT_INTO_STAGING_DIMENSIONS =
"insert into MonMetrics.stagedDimensions values (:metric_definition_id, :name, :value)";
private static final String defs = "(" + private static final String defs = "(" +
" metric_definition_id BINARY(20) NOT NULL," + " metric_definition_id BINARY(20) NOT NULL," +
" name VARCHAR NOT NULL," + " name VARCHAR NOT NULL," +
@ -46,6 +43,9 @@ public class VerticaMetricRepository extends VerticaRepository {
private final String dsDefs; private final String dsDefs;
private final String dsDims; private final String dsDims;
private final Timer commitTimer = Metrics.newTimer(this.getClass(), "commits-timer");
private final Timer flushTimer = Metrics.newTimer(this.getClass(), "staging-tables-flushed-timer");
@Inject @Inject
public VerticaMetricRepository(DBI dbi) throws NoSuchAlgorithmException, SQLException { public VerticaMetricRepository(DBI dbi) throws NoSuchAlgorithmException, SQLException {
super(dbi); super(dbi);
@ -93,12 +93,14 @@ public class VerticaMetricRepository extends VerticaRepository {
public void flush() { public void flush() {
commitBatch(); commitBatch();
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
TimerContext context = flushTimer.time();
handle.execute(dsDefs); handle.execute(dsDefs);
handle.execute("truncate table " + sDefs); handle.execute("truncate table " + sDefs);
handle.execute(dsDims); handle.execute(dsDims);
handle.execute("truncate table " + sDims); handle.execute("truncate table " + sDims);
handle.commit(); handle.commit();
handle.begin(); handle.begin();
context.stop();
long endTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis();
logger.debug("Flushing staging tables took " + (endTime - startTime) / 1000 + " seconds"); logger.debug("Flushing staging tables took " + (endTime - startTime) / 1000 + " seconds");
@ -106,11 +108,13 @@ public class VerticaMetricRepository extends VerticaRepository {
private void commitBatch() { private void commitBatch() {
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
TimerContext context = commitTimer.time();
metricsBatch.execute(); metricsBatch.execute();
stagedDefinitionsBatch.execute(); stagedDefinitionsBatch.execute();
stagedDimensionsBatch.execute(); stagedDimensionsBatch.execute();
handle.commit(); handle.commit();
handle.begin(); handle.begin();
context.stop();
long endTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis();
logger.debug("Commiting batch took " + (endTime - startTime) / 1000 + " seconds"); logger.debug("Commiting batch took " + (endTime - startTime) / 1000 + " seconds");
} }

View File

@ -28,7 +28,7 @@ kafkaConfiguration:
disruptorConfiguration: disruptorConfiguration:
bufferSize: 1048576 bufferSize: 1048576
numProcessors: 2 numProcessors: 1
verticaOutputProcessorConfiguration: verticaOutputProcessorConfiguration:
batchSize: 25000 batchSize: 25000
@ -41,8 +41,8 @@ databaseConfiguration:
# url: jdbc:vertica://mon-aw1rdd1-vertica0001.rndd.aw1.hpcloud.net:5433/som # url: jdbc:vertica://mon-aw1rdd1-vertica0001.rndd.aw1.hpcloud.net:5433/som
url: jdbc:vertica://15.185.94.245:5433/som url: jdbc:vertica://15.185.94.245:5433/som
# user: persister # user: persister
# user: mon_persister user: mon_persister
user: dbadmin # user: dbadmin
password: password password: password
properties: properties:
ssl: false ssl: false
@ -81,8 +81,8 @@ logging:
# Sets the level for 'com.example.app' to DEBUG. # Sets the level for 'com.example.app' to DEBUG.
com.example.app: DEBUG com.example.app: DEBUG
com.hpcloud: debug
com.hpcloud.repository: DEBUG com.hpcloud.repository: DEBUG
com.hpcloud.disruptor.event: INFO
# Settings for logging to stdout. # Settings for logging to stdout.
console: console: