From 75787e5b37f44718f5d7dbb275343fefaab171f7 Mon Sep 17 00:00:00 2001 From: Deklan Dieterly Date: Fri, 7 Mar 2014 17:01:14 -0700 Subject: [PATCH] Use individual local temp staging tables. --- src/deb/etc/persister-config.yml-sample | 2 +- .../java/com/hpcloud/MonPersisterModule.java | 4 +- .../java/com/hpcloud/MonPersisterService.java | 6 +- .../java/com/hpcloud/dedupe/MonDeDuper.java | 140 ------------------ .../hpcloud/dedupe/MonDeDuperHeartbeat.java | 79 ++++++++++ .../event/MetricMessageEventHandler.java | 8 +- .../repository/VerticaMetricRepository.java | 47 +++++- .../com/hpcloud/mon-persister-config.yml | 5 +- 8 files changed, 136 insertions(+), 155 deletions(-) delete mode 100644 src/main/java/com/hpcloud/dedupe/MonDeDuper.java create mode 100644 src/main/java/com/hpcloud/dedupe/MonDeDuperHeartbeat.java diff --git a/src/deb/etc/persister-config.yml-sample b/src/deb/etc/persister-config.yml-sample index 470d4427..8493f619 100755 --- a/src/deb/etc/persister-config.yml-sample +++ b/src/deb/etc/persister-config.yml-sample @@ -31,7 +31,7 @@ disruptorConfiguration: numProcessors: 2 verticaOutputProcessorConfiguration: - batchSize: 10 + batchSize: 2 monDeDuperConfiguration: dedupeRunFrequencySeconds: 30 diff --git a/src/main/java/com/hpcloud/MonPersisterModule.java b/src/main/java/com/hpcloud/MonPersisterModule.java index b082bfc9..1cca98a8 100644 --- a/src/main/java/com/hpcloud/MonPersisterModule.java +++ b/src/main/java/com/hpcloud/MonPersisterModule.java @@ -9,7 +9,7 @@ import com.hpcloud.consumer.KafkaConsumerRunnableBasic; import com.hpcloud.consumer.KafkaConsumerRunnableBasicFactory; import com.hpcloud.consumer.MonConsumer; import com.hpcloud.dbi.DBIProvider; -import com.hpcloud.dedupe.MonDeDuper; +import com.hpcloud.dedupe.MonDeDuperHeartbeat; import com.hpcloud.disruptor.DisruptorExceptionHandler; import com.hpcloud.disruptor.DisruptorProvider; import com.hpcloud.disruptor.event.MetricMessageEventHandler; @@ -53,7 +53,7 @@ public class MonPersisterModule extends AbstractModule { bind(DBI.class).toProvider(DBIProvider.class).in(Scopes.SINGLETON); bind(MonConsumer.class); - bind(MonDeDuper.class); + bind(MonDeDuperHeartbeat.class); } } diff --git a/src/main/java/com/hpcloud/MonPersisterService.java b/src/main/java/com/hpcloud/MonPersisterService.java index e3413e0c..96ba1026 100644 --- a/src/main/java/com/hpcloud/MonPersisterService.java +++ b/src/main/java/com/hpcloud/MonPersisterService.java @@ -4,7 +4,7 @@ import com.google.inject.Guice; import com.google.inject.Injector; import com.hpcloud.configuration.MonPersisterConfiguration; import com.hpcloud.consumer.MonConsumer; -import com.hpcloud.dedupe.MonDeDuper; +import com.hpcloud.dedupe.MonDeDuperHeartbeat; import com.hpcloud.healthcheck.SimpleHealthCheck; import com.hpcloud.resource.Resource; import com.yammer.dropwizard.Service; @@ -35,8 +35,8 @@ public class MonPersisterService extends Service { MonConsumer monConsumer = injector.getInstance(MonConsumer.class); environment.manage(monConsumer); - MonDeDuper monDeDuper = injector.getInstance(MonDeDuper.class); - environment.manage(monDeDuper); + MonDeDuperHeartbeat monDeDuperHeartbeat = injector.getInstance(MonDeDuperHeartbeat.class); + environment.manage(monDeDuperHeartbeat); } } diff --git a/src/main/java/com/hpcloud/dedupe/MonDeDuper.java b/src/main/java/com/hpcloud/dedupe/MonDeDuper.java deleted file mode 100644 index 2897cef0..00000000 --- a/src/main/java/com/hpcloud/dedupe/MonDeDuper.java +++ /dev/null @@ -1,140 +0,0 @@ -package com.hpcloud.dedupe; - -import com.google.inject.Inject; -import com.hpcloud.configuration.MonPersisterConfiguration; -import com.yammer.dropwizard.lifecycle.Managed; -import com.yammer.metrics.Metrics; -import com.yammer.metrics.core.Timer; -import com.yammer.metrics.core.TimerContext; -import org.skife.jdbi.v2.DBI; -import org.skife.jdbi.v2.Handle; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.SQLException; - -public class MonDeDuper implements Managed { - - private static Logger logger = LoggerFactory.getLogger(MonDeDuper.class); - - private final MonPersisterConfiguration configuration; - private final DBI dbi; - private final DeDuperRunnable deDuperRunnable; - - @Inject - public MonDeDuper(MonPersisterConfiguration configuration, - DBI dbi) { - this.configuration = configuration; - this.dbi = dbi; - this.deDuperRunnable = new DeDuperRunnable(configuration, dbi); - - } - - @Override - public void start() throws Exception { - - Thread deduperThread = new Thread(deDuperRunnable); - deduperThread.start(); - } - - @Override - public void stop() throws Exception { - } - - private static class DeDuperRunnable implements Runnable { - - private static Logger logger = LoggerFactory.getLogger(DeDuperRunnable.class); - - private final MonPersisterConfiguration configuration; - private final DBI dbi; - private final Handle handle; - private final Timer dedupeTimer = Metrics.newTimer(this.getClass(), "dedupe-execution-timer"); - - private static final String DEDUPE_STAGING_DEFS = - "insert into MonMetrics.Definitions select distinct * from MonMetrics.StagedDefinitions where metric_definition_id not in (select metric_definition_id from MonMetrics.Definitions)"; - - private static final String DEDEUP_STAGING_DIMS = - "insert into MonMetrics.Dimensions select distinct * from MonMetrics.StagedDimensions where metric_definition_id not in (select metric_definition_id from MonMetrics.Dimensions)"; - - private static final String DELETE_STAGING_DEFS = - "delete from monmetrics.stageddefinitions where metric_definition_id in (select metric_definition_id from MonMetrics.Definitions)"; - - private static final String PURGE_STAGING_DEFS = - "select purge_table('monmetrics.stageddefinitions')"; - - private static final String DELETE_STAGING_DIMS = - "delete from monmetrics.stageddimensions where metric_definition_id in (select metric_definition_id from MonMetrics.Dimensions)"; - - private static final String PURGE_STAGING_DIMS = - "select purge_table('monmetrics.stageddimensions')"; - - private DeDuperRunnable(MonPersisterConfiguration configuration, DBI dbi) { - this.configuration = configuration; - this.dbi = dbi; - this.handle = this.dbi.open(); - this.handle.execute("SET TIME ZONE TO 'UTC'"); - try { - this.handle.getConnection().setAutoCommit(false); - } catch (SQLException e) { - logger.error("Failed to set autocommit to false", e); - System.exit(-1); - } - - } - - @Override - public void run() { - int seconds = configuration.getMonDeDuperConfiguration().getDedupeRunFrequencySeconds(); - long startTime; - long endTime; - for (; ; ) { - try { - Thread.sleep(seconds * 1000); - logger.debug("Waking up after sleeping " + seconds + " seconds, yawn..."); - - TimerContext context = dedupeTimer.time(); - - handle.begin(); - - startTime = System.currentTimeMillis(); - logger.debug("Executing: " + DELETE_STAGING_DEFS); - handle.execute(DELETE_STAGING_DEFS); - logger.debug("Executing: " + DEDUPE_STAGING_DEFS); - handle.execute(DEDUPE_STAGING_DEFS); - logger.debug("Executing: " + DELETE_STAGING_DEFS); - handle.execute(DELETE_STAGING_DEFS); - logger.debug("Executing: " + PURGE_STAGING_DEFS); - handle.execute(PURGE_STAGING_DEFS); - handle.commit(); - - endTime = System.currentTimeMillis(); - logger.debug("Deduping metric defintitions took " + (endTime - startTime) / 1000 + " seconds"); - - handle.begin(); - - startTime = System.currentTimeMillis(); - logger.debug("Executing: " + DELETE_STAGING_DIMS); - handle.execute(DELETE_STAGING_DIMS); - logger.debug("Executing: " + DEDEUP_STAGING_DIMS); - handle.execute(DEDEUP_STAGING_DIMS); - logger.debug("Executing: " + DELETE_STAGING_DIMS); - handle.execute(DELETE_STAGING_DIMS); - logger.debug("Executing: " + PURGE_STAGING_DIMS); - handle.execute(PURGE_STAGING_DIMS); - handle.commit(); - endTime = System.currentTimeMillis(); - logger.debug("Deduping metric dimensions took " + (endTime - startTime) / 1000 + " seconds"); - - context.stop(); - - } catch (InterruptedException e) { - logger.warn("Failed to wait for " + seconds + " between deduping", e); - } catch (Exception e) { - logger.error("Failed to dedupe", e); - } - - } - - } - } -} diff --git a/src/main/java/com/hpcloud/dedupe/MonDeDuperHeartbeat.java b/src/main/java/com/hpcloud/dedupe/MonDeDuperHeartbeat.java new file mode 100644 index 00000000..b682749b --- /dev/null +++ b/src/main/java/com/hpcloud/dedupe/MonDeDuperHeartbeat.java @@ -0,0 +1,79 @@ +package com.hpcloud.dedupe; + +import com.google.inject.Inject; +import com.hpcloud.configuration.MonPersisterConfiguration; +import com.hpcloud.disruptor.event.MetricMessageEvent; +import com.lmax.disruptor.EventTranslator; +import com.lmax.disruptor.dsl.Disruptor; +import com.yammer.dropwizard.lifecycle.Managed; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MonDeDuperHeartbeat implements Managed { + + private static Logger logger = LoggerFactory.getLogger(MonDeDuperHeartbeat.class); + + private final MonPersisterConfiguration configuration; + private final Disruptor disruptor; + private final DeDuperRunnable deDuperRunnable; + + @Inject + public MonDeDuperHeartbeat(MonPersisterConfiguration configuration, + Disruptor disruptor) { + this.configuration = configuration; + this.disruptor = disruptor; + this.deDuperRunnable = new DeDuperRunnable(configuration, disruptor); + + } + + @Override + public void start() throws Exception { + + Thread deduperThread = new Thread(deDuperRunnable); + deduperThread.start(); + } + + @Override + public void stop() throws Exception { + } + + private static class DeDuperRunnable implements Runnable { + + private static Logger logger = LoggerFactory.getLogger(DeDuperRunnable.class); + + private final MonPersisterConfiguration configuration; + private final Disruptor disruptor; + + private DeDuperRunnable(MonPersisterConfiguration configuration, Disruptor disruptor) { + this.configuration = configuration; + this.disruptor = disruptor; + } + + @Override + public void run() { + int seconds = configuration.getMonDeDuperConfiguration().getDedupeRunFrequencySeconds(); + for (; ; ) { + try { + Thread.sleep(seconds * 1000); + logger.debug("Waking up after sleeping " + seconds + " seconds, yawn..."); + + // Send heartbeat + logger.debug("Sending dedupe heartbeat message"); + disruptor.publishEvent(new EventTranslator() { + + @Override + public void translateTo(MetricMessageEvent event, long sequence) { + event.setMetricEnvelope(null); + + } + }); + + } catch (Exception e) { + logger.error("Failed to send dedupe heartbeat", e); + } + + } + + } + } +} diff --git a/src/main/java/com/hpcloud/disruptor/event/MetricMessageEventHandler.java b/src/main/java/com/hpcloud/disruptor/event/MetricMessageEventHandler.java index 8d4ea42e..8b763e43 100644 --- a/src/main/java/com/hpcloud/disruptor/event/MetricMessageEventHandler.java +++ b/src/main/java/com/hpcloud/disruptor/event/MetricMessageEventHandler.java @@ -58,6 +58,12 @@ public class MetricMessageEventHandler implements EventHandler meta = metricMessageEvent.getMetricEnvelope().meta; diff --git a/src/main/java/com/hpcloud/repository/VerticaMetricRepository.java b/src/main/java/com/hpcloud/repository/VerticaMetricRepository.java index 7e783ad8..6a7809f7 100644 --- a/src/main/java/com/hpcloud/repository/VerticaMetricRepository.java +++ b/src/main/java/com/hpcloud/repository/VerticaMetricRepository.java @@ -15,6 +15,7 @@ public class VerticaMetricRepository extends VerticaRepository { private static final String SQL_INSERT_INTO_METRICS = "insert into MonMetrics.metrics (metric_definition_id, time_stamp, value) values (:metric_definition_id, :time_stamp, :value)"; + private static final String SQL_INSERT_INTO_STAGING_DEFINITIONS = "insert into MonMetrics.stagedDefinitions values (:metric_definition_id, :name, :tenant_id," + ":region)"; @@ -35,26 +36,43 @@ public class VerticaMetricRepository extends VerticaRepository { ")"; private PreparedBatch metricsBatch; + private PreparedBatch stagedDefinitionsBatch; private PreparedBatch stagedDimensionsBatch; + private final String sDefs; + private final String sDims; + + private final String dsDefs; + private final String dsDims; + @Inject public VerticaMetricRepository(DBI dbi) throws NoSuchAlgorithmException, SQLException { super(dbi); logger.debug("Instantiating: " + this); - String sDefs = this.toString().replaceAll(".", "_").replaceAll("@", "_") + "staged_definitions"; - String sDims = this.toString().replaceAll(".", "_").replaceAll("@", "_") + "staged_dimensions"; - handle.execute("drop table if exists" + sDefs + " cascade"); - handle.execute("drop table if exists" + sDims + " cascade"); + this.sDefs = this.toString().replaceAll("\\.", "_").replaceAll("\\@", "_") + "_staged_definitions"; + logger.debug("temp staging definitions table: " + sDefs); + + this.sDims = this.toString().replaceAll("\\.", "_").replaceAll("\\@", "_") + "_staged_dimensions"; + logger.debug("temp staging dimensions table: " + sDims); + + this.dsDefs = "insert into MonMetrics.Definitions select distinct * from " + sDefs + " where metric_definition_id not in (select metric_definition_id from MonMetrics.Definitions)"; + logger.debug("insert stmt: " + dsDefs); + + this.dsDims = "insert into MonMetrics.Dimensions select distinct * from " + sDims + " where metric_definition_id not in (select metric_definition_id from MonMetrics.Dimensions)"; + logger.debug("insert stmt: " + dsDefs); + + handle.execute("drop table if exists " + sDefs + " cascade"); + handle.execute("drop table if exists " + sDims + " cascade"); handle.execute("create local temp table " + sDefs + " " + defs + " on commit preserve rows"); handle.execute("create local temp table " + sDims + " " + dims + " on commit preserve rows"); handle.getConnection().setAutoCommit(false); metricsBatch = handle.prepareBatch(SQL_INSERT_INTO_METRICS); - stagedDefinitionsBatch = handle.prepareBatch("insert into " + sDefs + " values (:metric_definition_id, :name, :tenant_id, "); - stagedDimensionsBatch = handle.prepareBatch(SQL_INSERT_INTO_STAGING_DIMENSIONS); + stagedDefinitionsBatch = handle.prepareBatch("insert into " + sDefs + " values (:metric_definition_id, :name, :tenant_id, :region)"); + stagedDimensionsBatch = handle.prepareBatch("insert into " + sDims + " values (:metric_definition_id, :name, :value)"); handle.begin(); } @@ -72,11 +90,28 @@ public class VerticaMetricRepository extends VerticaRepository { .bind(2, value); } + public void flush() { + commitBatch(); + long startTime = System.currentTimeMillis(); + handle.execute(dsDefs); + handle.execute("truncate table " + sDefs); + handle.execute(dsDims); + handle.execute("truncate table " + sDims); + handle.commit(); + handle.begin(); + long endTime = System.currentTimeMillis(); + logger.debug("Flushing staging tables took " + (endTime - startTime) / 1000 + " seconds"); + + } + public void commitBatch() { + long startTime = System.currentTimeMillis(); metricsBatch.execute(); stagedDefinitionsBatch.execute(); stagedDimensionsBatch.execute(); handle.commit(); handle.begin(); + long endTime = System.currentTimeMillis(); + logger.debug("Commiting batch took " + (endTime - startTime) / 1000 + " seconds"); } } diff --git a/src/main/resources/com/hpcloud/mon-persister-config.yml b/src/main/resources/com/hpcloud/mon-persister-config.yml index 10301d9f..aee80f57 100644 --- a/src/main/resources/com/hpcloud/mon-persister-config.yml +++ b/src/main/resources/com/hpcloud/mon-persister-config.yml @@ -31,7 +31,7 @@ disruptorConfiguration: numProcessors: 2 verticaOutputProcessorConfiguration: - batchSize: 50000 + batchSize: 25000 monDeDuperConfiguration: dedupeRunFrequencySeconds: 30 @@ -81,7 +81,8 @@ logging: # Sets the level for 'com.example.app' to DEBUG. com.example.app: DEBUG - com.hpcloud: DEBUG + com.hpcloud.repository: DEBUG + com.hpcloud.disruptor.event: INFO # Settings for logging to stdout. console: