From 7c6394b1f847838535d1ccd7f4c37f3ac4ccdda1 Mon Sep 17 00:00:00 2001 From: Craig Bryant Date: Mon, 5 May 2014 17:53:37 -0600 Subject: [PATCH] Use the new timestamp in MetricEnvelope which is the time the API creates the MetricEnvelope. The API immediately hands the MetricEnvelope to Kafka so the Threshold Engine can determine its progress in emptying the Queue. Change the code that checks for "lagging" metrics to use this timestamp instead of the Metric timestamp since the lagging code deals with emptying the Kafka queue and this timestamp is a much better measure of how backed up the kafka queue is. The metric timestamp is set by the agent and it is much likelier for the time to be off. Switch to mon-common build 48 which has the new timestamp. The MetricFilteringBolt will work correctly if the API is using an older version of MetricEnvelope without the timestamp, the lagging code just won't be invoked. Change the tests to work with the new timestamp. Had to back down to an older version of scala or the Threshold Engine would not start with a java.lang.NoClassDefFoundError: scala/reflect/ClassManifest --- pom.xml | 15 ++++- .../thresholding/MetricFilteringBolt.java | 63 +++++++++++-------- .../thresholding/MetricSpout.java | 5 +- .../mon/ThresholdingEngineAlarmTest.java | 6 +- .../hpcloud/mon/ThresholdingEngineTest.java | 10 +-- .../thresholding/MetricFilteringBoltTest.java | 53 +++++++++------- 6 files changed, 90 insertions(+), 62 deletions(-) diff --git a/pom.xml b/pom.xml index 43f2273..cd3d073 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,7 @@ - 1.0.0.43 + 1.0.0.48 0.9.1-incubating true UTF-8 @@ -63,8 +63,8 @@ - + org.slf4j slf4j-api @@ -75,6 +75,17 @@ slf4j-log4j12 1.7.6 + + + org.scala-lang + scala-compiler + 2.9.2 + + + org.scala-lang + scala-library + 2.9.2 + diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBolt.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBolt.java index 
28270c8..a4e497c 100644 --- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBolt.java +++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBolt.java @@ -84,9 +84,9 @@ public class MetricFilteringBolt extends BaseRichBolt { public static final int LAG_MESSAGE_PERIOD_DEFAULT = 30; public static final String[] FIELDS = new String[] { "metricDefinitionAndTenantId", "metric" }; - private static final int MIN_LAG_VALUE = PropertyFinder.getIntProperty(MIN_LAG_VALUE_KEY, MIN_LAG_VALUE_DEFAULT, 0, Integer.MAX_VALUE); + private static final int MIN_LAG_VALUE = 1000 * PropertyFinder.getIntProperty(MIN_LAG_VALUE_KEY, MIN_LAG_VALUE_DEFAULT, 0, Integer.MAX_VALUE); private static final int MAX_LAG_MESSAGES = PropertyFinder.getIntProperty(MAX_LAG_MESSAGES_KEY, MAX_LAG_MESSAGES_DEFAULT, 0, Integer.MAX_VALUE); - private static final int LAG_MESSAGE_PERIOD = PropertyFinder.getIntProperty(LAG_MESSAGE_PERIOD_KEY, LAG_MESSAGE_PERIOD_DEFAULT, 1, 600); + private static final int LAG_MESSAGE_PERIOD = 1000 * PropertyFinder.getIntProperty(LAG_MESSAGE_PERIOD_KEY, LAG_MESSAGE_PERIOD_DEFAULT, 1, 600); private static final Map> METRIC_DEFS = new ConcurrentHashMap<>(); private static final MetricDefinitionAndTenantIdMatcher matcher = new MetricDefinitionAndTenantIdMatcher(); private static final Object SENTINAL = new Object(); @@ -121,8 +121,9 @@ public class MetricFilteringBolt extends BaseRichBolt { try { if (Streams.DEFAULT_STREAM_ID.equals(tuple.getSourceStreamId())) { final MetricDefinitionAndTenantId metricDefinitionAndTenantId = (MetricDefinitionAndTenantId) tuple.getValue(0); - final Metric metric = (Metric)tuple.getValue(1); - checkLag(metric); + final Long timestamp = (Long)tuple.getValue(1); + final Metric metric = (Metric)tuple.getValue(2); + checkLag(timestamp); LOG.debug("metric definition and tenant id: {}", metricDefinitionAndTenantId); // Check for exact matches as well as inexact matches @@ -153,31 +154,34 @@ public class 
MetricFilteringBolt extends BaseRichBolt { } } - private void checkLag(Metric metric) { - final long now = getCurrentSeconds(); - final long lag = now - metric.timestamp; + private void checkLag(Long apiTimeStamp) { + if (!lagging) + return; + if ((apiTimeStamp == null) || (apiTimeStamp.longValue() == 0)) + return; // Remove this code at some point, just to handle old metrics without a NPE + final long now = getCurrentTime(); + final long lag = now - apiTimeStamp.longValue(); if (lag < minLag) minLag = lag; - if (lagging) - if (minLag <= MIN_LAG_VALUE) { - lagging = false; - LOG.info("Metrics no longer lagging, minLag = {}", minLag); - } - else if (minLagMessageSent >= MAX_LAG_MESSAGES) { - LOG.info("Waited for {} seconds for Metrics to catch up. Giving up. minLag = {}", + if (minLag <= MIN_LAG_VALUE) { + lagging = false; + LOG.info("Metrics no longer lagging, minLag = {}", minLag); + } + else if (minLagMessageSent >= MAX_LAG_MESSAGES) { + LOG.info("Waited for {} seconds for Metrics to catch up. Giving up. 
minLag = {}", MAX_LAG_MESSAGES * LAG_MESSAGE_PERIOD, minLag); - lagging = false; - } - else if (lastMinLagMessageSent == 0) { - lastMinLagMessageSent = now; - } - else if ((now - lastMinLagMessageSent) >= LAG_MESSAGE_PERIOD) { - LOG.info("Sending {} message, minLag = {}", MetricAggregationBolt.METRICS_BEHIND, minLag); - collector.emit(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM, + lagging = false; + } + else if (lastMinLagMessageSent == 0) { + lastMinLagMessageSent = now; + } + else if ((now - lastMinLagMessageSent) >= LAG_MESSAGE_PERIOD) { + LOG.info("Sending {} message, minLag = {}", MetricAggregationBolt.METRICS_BEHIND, minLag); + collector.emit(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM, new Values(MetricAggregationBolt.METRICS_BEHIND)); - lastMinLagMessageSent = now; - minLagMessageSent++; - } + lastMinLagMessageSent = now; + minLagMessageSent++; + } } private void removeSubAlarm(MetricDefinitionAndTenantId metricDefinitionAndTenantId, String subAlarmId) { @@ -214,6 +218,11 @@ public class MetricFilteringBolt extends BaseRichBolt { // Iterate again to ensure we only emit each metricDef once for (MetricDefinitionAndTenantId metricDefinitionAndTenantId : METRIC_DEFS.keySet()) collector.emit(new Values(metricDefinitionAndTenantId, null)); + LOG.info("Found {} Metric Definitions", METRIC_DEFS.size()); + // Just output these here so they are only output once per JVM + LOG.info("MIN_LAG_VALUE set to {} seconds", MIN_LAG_VALUE/1000); + LOG.info("MAX_LAG_MESSAGES set to {}", MAX_LAG_MESSAGES); + LOG.info("LAG_MESSAGE_PERIOD set to {} seconds", LAG_MESSAGE_PERIOD/1000); } } } @@ -223,8 +232,8 @@ public class MetricFilteringBolt extends BaseRichBolt { /** * Allow override of current time for testing. 
*/ - protected long getCurrentSeconds() { - return System.currentTimeMillis() / 1000; + protected long getCurrentTime() { + return System.currentTimeMillis(); } private void addMetricDef(MetricDefinitionAndTenantId metricDefinitionAndTenantId, String subAlarmId) { diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricSpout.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricSpout.java index 08c6d31..03eda73 100644 --- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricSpout.java +++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricSpout.java @@ -34,7 +34,7 @@ public class MetricSpout extends KafkaSpout { private static final long serialVersionUID = 744004533863562119L; - public static final String[] FIELDS = new String[] { "metricDefinitionAndTenantId", "metric" }; + public static final String[] FIELDS = new String[] { "metricDefinitionAndTenantId", "apiTimeStamp", "metric" }; public static final String DEFAULT_TENANT_ID = "TENANT_ID_NOT_SET"; public MetricSpout(MetricSpoutConfig metricSpoutConfig) { @@ -58,7 +58,8 @@ public class MetricSpout extends KafkaSpout { LOG.error("No tenantId so using default tenantId {} for Metric {}", DEFAULT_TENANT_ID, metricEnvelope.metric); tenantId = DEFAULT_TENANT_ID; } - collector.emit(new Values(new MetricDefinitionAndTenantId(metricEnvelope.metric.definition(), tenantId), metricEnvelope.metric)); + collector.emit(new Values(new MetricDefinitionAndTenantId(metricEnvelope.metric.definition(), tenantId), + metricEnvelope.creationTime, metricEnvelope.metric)); } @Override diff --git a/src/test/java/com/hpcloud/mon/ThresholdingEngineAlarmTest.java b/src/test/java/com/hpcloud/mon/ThresholdingEngineAlarmTest.java index ddbb051..435936c 100644 --- a/src/test/java/com/hpcloud/mon/ThresholdingEngineAlarmTest.java +++ b/src/test/java/com/hpcloud/mon/ThresholdingEngineAlarmTest.java @@ -241,13 +241,13 @@ public class ThresholdingEngineAlarmTest extends 
TopologyTestCase { else { System.out.println("Feeding metrics..."); - long time = System.currentTimeMillis() / 1000; + long time = System.currentTimeMillis(); ++goodValueCount; for (final SubAlarm subAlarm : subAlarms) { final MetricDefinitionAndTenantId metricDefinitionAndTenantId = new MetricDefinitionAndTenantId(subAlarm.getExpression().getMetricDefinition(), TEST_ALARM_TENANT_ID); - metricSpout.feed(new Values(metricDefinitionAndTenantId, - new Metric(metricDefinitionAndTenantId.metricDefinition, time, (double) (goodValueCount == 15 ? 1 : 555)))); + metricSpout.feed(new Values(metricDefinitionAndTenantId, time, + new Metric(metricDefinitionAndTenantId.metricDefinition, time / 1000, (double) (goodValueCount == 15 ? 1 : 555)))); } } try { diff --git a/src/test/java/com/hpcloud/mon/ThresholdingEngineTest.java b/src/test/java/com/hpcloud/mon/ThresholdingEngineTest.java index 25b7c0d..5999c03 100644 --- a/src/test/java/com/hpcloud/mon/ThresholdingEngineTest.java +++ b/src/test/java/com/hpcloud/mon/ThresholdingEngineTest.java @@ -192,11 +192,11 @@ public class ThresholdingEngineTest extends TopologyTestCase { if (feedCount > 0) { System.out.println("Feeding metrics..."); - long time = System.currentTimeMillis() / 1000; - metricSpout.feed(new Values(new MetricDefinitionAndTenantId(cpuMetricDef, TEST_ALARM_TENANT_ID), new Metric(cpuMetricDef.name, - cpuMetricDef.dimensions, time, (double) (++goodValueCount == 15 ? 1 : 555)))); - metricSpout.feed(new Values(new MetricDefinitionAndTenantId(memMetricDef, TEST_ALARM_TENANT_ID), new Metric(memMetricDef.name, - extraMemMetricDefDimensions, time, (double) (goodValueCount == 15 ? 1 : 555)))); + long time = System.currentTimeMillis(); + metricSpout.feed(new Values(new MetricDefinitionAndTenantId(cpuMetricDef, TEST_ALARM_TENANT_ID), time, + new Metric(cpuMetricDef.name, cpuMetricDef.dimensions, time / 1000, (double) (++goodValueCount == 15 ? 
1 : 555)))); + metricSpout.feed(new Values(new MetricDefinitionAndTenantId(memMetricDef, TEST_ALARM_TENANT_ID), time, + new Metric(memMetricDef.name, extraMemMetricDefDimensions, time / 1000, (double) (goodValueCount == 15 ? 1 : 555)))); if (--feedCount == 0) waitCount = 3; diff --git a/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBoltTest.java b/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBoltTest.java index 80cf7a1..801d9ce 100644 --- a/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBoltTest.java +++ b/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBoltTest.java @@ -107,20 +107,23 @@ public class MetricFilteringBoltTest { final MockMetricFilteringBolt bolt = createBolt(new ArrayList(0), collector, true); - final long prepareTime = bolt.getCurrentSeconds(); + final long prepareTime = bolt.getCurrentTime(); final MetricDefinition metricDefinition = subAlarms.get(0).getExpression().getMetricDefinition(); - final Tuple lateMetricTuple = createMetricTuple(metricDefinition, new Metric(metricDefinition, prepareTime - MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT, 42.0)); + final long oldestTimestamp = prepareTime - MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT*1000; + final Tuple lateMetricTuple = createMetricTuple(metricDefinition, oldestTimestamp, new Metric(metricDefinition, oldestTimestamp/1000, 42.0)); bolt.execute(lateMetricTuple); verify(collector, times(1)).ack(lateMetricTuple); - bolt.setCurrentSeconds(prepareTime + MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT); - final Tuple lateMetricTuple2 = createMetricTuple(metricDefinition, new Metric(metricDefinition, prepareTime, 42.0)); + bolt.setCurrentTime(prepareTime + MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT*1000); + final Tuple lateMetricTuple2 = createMetricTuple(metricDefinition, prepareTime, new Metric(metricDefinition, prepareTime/1000, 42.0)); bolt.execute(lateMetricTuple2); verify(collector, 
times(1)).ack(lateMetricTuple2); verify(collector, times(1)).emit(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM, new Values(MetricAggregationBolt.METRICS_BEHIND)); - bolt.setCurrentSeconds(prepareTime + 2 * MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT); - final Tuple metricTuple = createMetricTuple(metricDefinition, new Metric(metricDefinition, bolt.getCurrentSeconds() - MetricFilteringBolt.MIN_LAG_VALUE_DEFAULT, 42.0)); + bolt.setCurrentTime(prepareTime + 2 * MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT * 1000); + long caughtUpTimestamp = bolt.getCurrentTime() - MetricFilteringBolt.MIN_LAG_VALUE_DEFAULT; + final Tuple metricTuple = createMetricTuple(metricDefinition, caughtUpTimestamp, new Metric(metricDefinition, caughtUpTimestamp/1000, 42.0)); bolt.execute(metricTuple); + // Metrics are caught up so there should not be another METRICS_BEHIND message verify(collector, times(1)).ack(metricTuple); verify(collector, times(1)).emit(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM, new Values(MetricAggregationBolt.METRICS_BEHIND)); @@ -131,15 +134,15 @@ public class MetricFilteringBoltTest { final MockMetricFilteringBolt bolt = createBolt(new ArrayList(0), collector, true); - long prepareTime = bolt.getCurrentSeconds(); + long prepareTime = bolt.getCurrentTime(); final MetricDefinition metricDefinition = subAlarms.get(0).getExpression().getMetricDefinition(); // Fake sending metrics for MetricFilteringBolt.MAX_LAG_MESSAGES_DEFAULT * MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT seconds boolean first = true; // Need to send MetricFilteringBolt.MAX_LAG_MESSAGES_DEFAULT + 1 metrics because the lag message is not // output on the first one. 
for (int i = 0; i < MetricFilteringBolt.MAX_LAG_MESSAGES_DEFAULT + 1; i++) { - final Tuple lateMetricTuple = createMetricTuple(metricDefinition, new Metric(metricDefinition, prepareTime, 42.0)); - bolt.setCurrentSeconds(prepareTime + MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT); + final Tuple lateMetricTuple = createMetricTuple(metricDefinition, prepareTime, new Metric(metricDefinition, prepareTime/1000, 42.0)); + bolt.setCurrentTime(prepareTime + MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT * 1000); bolt.execute(lateMetricTuple); verify(collector, times(1)).ack(lateMetricTuple); if (!first) { @@ -147,10 +150,11 @@ public class MetricFilteringBoltTest { new Values(MetricAggregationBolt.METRICS_BEHIND)); } first = false; - prepareTime = bolt.getCurrentSeconds(); + prepareTime = bolt.getCurrentTime(); } // One more - final Tuple metricTuple = createMetricTuple(metricDefinition, new Metric(metricDefinition, bolt.getCurrentSeconds() - MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT, 42.0)); + long timestamp = bolt.getCurrentTime() - MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT * 1000; + final Tuple metricTuple = createMetricTuple(metricDefinition, timestamp, new Metric(metricDefinition, timestamp/1000, 42.0)); bolt.execute(metricTuple); verify(collector, times(1)).ack(metricTuple); // Won't be any more of these @@ -160,19 +164,19 @@ public class MetricFilteringBoltTest { private static class MockMetricFilteringBolt extends MetricFilteringBolt { private static final long serialVersionUID = 1L; - private long currentSeconds = System.currentTimeMillis() / 1000; + private long currentTimeMillis = System.currentTimeMillis(); public MockMetricFilteringBolt(MetricDefinitionDAO metricDefDAO) { super(metricDefDAO); } @Override - protected long getCurrentSeconds() { - return currentSeconds; + protected long getCurrentTime() { + return currentTimeMillis; } - public void setCurrentSeconds(final long currentSeconds) { - this.currentSeconds = currentSeconds; + public void 
setCurrentTime(final long currentTimeMillis) { + this.currentTimeMillis = currentTimeMillis; } } @@ -220,17 +224,17 @@ public class MetricFilteringBoltTest { for (final SubAlarm subAlarm : subAlarms) { // First do a MetricDefinition that is an exact match final MetricDefinition metricDefinition = subAlarm.getExpression().getMetricDefinition(); - final Tuple exactTuple = createMetricTuple(metricDefinition, new Metric(metricDefinition, metricTimestamp++, 42.0)); + final Tuple exactTuple = createMetricTuple(metricDefinition, metricTimestamp++ * 1000, new Metric(metricDefinition, metricTimestamp, 42.0)); bolt1.execute(exactTuple); verify(collector1, times(1)).ack(exactTuple); - verify(collector1, howMany).emit(exactTuple.getValues()); + verify(collector1, howMany).emit(new Values(exactTuple.getValue(0), exactTuple.getValue(2))); // Now do a MetricDefinition with an extra dimension that should still match the SubAlarm final Map extraDimensions = new HashMap<>(metricDefinition.dimensions); extraDimensions.put("group", "group_a"); final MetricDefinition inexactMetricDef = new MetricDefinition(metricDefinition.name, extraDimensions); - Metric inexactMetric = new Metric(inexactMetricDef, metricTimestamp++, 42.0); - final Tuple inexactTuple = createMetricTuple(metricDefinition, inexactMetric); + Metric inexactMetric = new Metric(inexactMetricDef, metricTimestamp, 42.0); + final Tuple inexactTuple = createMetricTuple(metricDefinition, metricTimestamp++ * 1000, inexactMetric); bolt1.execute(inexactTuple); verify(collector1, times(1)).ack(inexactTuple); // We want the MetricDefinitionAndTenantId from the exact tuple, but the inexactMetric @@ -310,11 +314,14 @@ public class MetricFilteringBoltTest { } private Tuple createMetricTuple(final MetricDefinition metricDefinition, + final long timestamp, final Metric metric) { final MkTupleParam tupleParam = new MkTupleParam(); - tupleParam.setFields(MetricFilteringBolt.FIELDS); - tupleParam.setStream(Streams.DEFAULT_STREAM_ID); final 
Tuple tuple = Testing.testTuple(Arrays.asList( - new MetricDefinitionAndTenantId(metricDefinition, TEST_TENANT_ID), metric), tupleParam); + tupleParam.setFields(MetricSpout.FIELDS); + tupleParam.setStream(Streams.DEFAULT_STREAM_ID); + final Tuple tuple = Testing.testTuple(Arrays.asList( + new MetricDefinitionAndTenantId(metricDefinition, TEST_TENANT_ID), + timestamp, metric), tupleParam); return tuple; } }