From 8cfaaca7a6192087766e5f73652a7bd21cd4b73e Mon Sep 17 00:00:00 2001 From: Craig Bryant Date: Wed, 23 Apr 2014 17:12:50 -0600 Subject: [PATCH] JIRA JAH-11 Keep Alarms from switching to UNDETERMINED if the Threshold Engine has been down for long enough that Kafka has buffered enough Metrics that it takes the Threshold Engine minutes to catch up. Added code to MetricFilteringBolt to check on the lag between current time and the timestamp in the metric. If too large, a message is sent to the MetricAggregationBolt to hold off on evaluating alarms. Added unit tests. --- .../java/com/hpcloud/mon/TopologyModule.java | 1 + .../thresholding/MetricAggregationBolt.java | 22 +++++- .../thresholding/MetricFilteringBolt.java | 61 ++++++++++++++-- .../thresholding/PropertyFinder.java | 34 +++++++++ .../MetricAggregationBoltTest.java | 15 ++++ .../thresholding/MetricFilteringBoltTest.java | 69 ++++++++++++++++++- .../thresholding/PropertyFinderTest.java | 46 +++++++++++++ 7 files changed, 240 insertions(+), 8 deletions(-) create mode 100644 src/main/java/com/hpcloud/mon/infrastructure/thresholding/PropertyFinder.java create mode 100644 src/test/java/com/hpcloud/mon/infrastructure/thresholding/PropertyFinderTest.java diff --git a/src/main/java/com/hpcloud/mon/TopologyModule.java b/src/main/java/com/hpcloud/mon/TopologyModule.java index c8ca60c..0c393c2 100644 --- a/src/main/java/com/hpcloud/mon/TopologyModule.java +++ b/src/main/java/com/hpcloud/mon/TopologyModule.java @@ -111,6 +111,7 @@ public class TopologyModule extends AbstractModule { new MetricAggregationBolt(config.database, config.sporadicMetricNamespaces), config.aggregationBoltThreads) .fieldsGrouping("filtering-bolt", new Fields(MetricFilteringBolt.FIELDS[0])) + .allGrouping("filtering-bolt", MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM) .fieldsGrouping("event-bolt", EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_ID, new Fields(EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_FIELDS[1])) .fieldsGrouping("event-bolt", EventProcessingBolt.METRIC_ALARM_EVENT_STREAM_ID, diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBolt.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBolt.java index eb74165..2b62ca6 100644 --- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBolt.java +++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBolt.java @@ -51,8 +51,11 @@ import com.hpcloud.util.Injector; */ public class MetricAggregationBolt extends BaseRichBolt { private static final long serialVersionUID = 5624314196838090726L; - public static final String TICK_TUPLE_SECONDS_KEY = "maas.aggregation.tick.seconds"; + public static final String TICK_TUPLE_SECONDS_KEY = "com.hpcloud.mon.aggregation.tick.seconds"; public static final String[] FIELDS = new String[] { "alarmId", "subAlarm" }; + public static final String METRIC_AGGREGATION_CONTROL_STREAM = "MetricAggregationControl"; + public static final String[] METRIC_AGGREGATION_CONTROL_FIELDS = new String[] { "directive" }; + public static final String METRICS_BEHIND = "MetricsBehind"; final Map subAlarmStatsRepos = new HashMap<>(); private transient Logger LOG; @@ -62,6 +65,7 @@ public class MetricAggregationBolt extends BaseRichBolt { private Set sporadicMetricNamespaces = Collections.emptySet(); private OutputCollector collector; private int evaluationTimeOffset; + private boolean upToDate = true; public MetricAggregationBolt(SubAlarmDAO subAlarmDAO) { this.subAlarmDAO = subAlarmDAO; @@ -88,6 +92,8 @@ public class MetricAggregationBolt extends BaseRichBolt { MetricDefinitionAndTenantId metricDefinitionAndTenantId = (MetricDefinitionAndTenantId) tuple.getValue(0); Metric metric = (Metric) tuple.getValueByField("metric"); aggregateValues(metricDefinitionAndTenantId, metric); + } else if (METRIC_AGGREGATION_CONTROL_STREAM.equals(tuple.getSourceStreamId())) { + processControl(tuple.getString(0)); } else { String eventType = tuple.getString(0); MetricDefinitionAndTenantId metricDefinitionAndTenantId = (MetricDefinitionAndTenantId) tuple.getValue(1); @@ -112,6 +118,15 @@ public class MetricAggregationBolt extends BaseRichBolt { } } + private void processControl(final String directive) { + if (METRICS_BEHIND.equals(directive)) { + LOG.debug("Received {}", directive); + this.upToDate = false; + } + else + LOG.error("Unknown directive '{}'", directive); + } + @Override public Map getComponentConfiguration() { Map conf = new HashMap(); @@ -158,6 +173,11 @@ public class MetricAggregationBolt extends BaseRichBolt { * ago, then sliding the window to the current time. */ void evaluateAlarmsAndSlideWindows() { + if (!upToDate) { + LOG.info("Not evaluating SubAlarms because Metrics are not up to date"); + upToDate = true; + return; + } long newWindowTimestamp = System.currentTimeMillis() / 1000; for (SubAlarmStatsRepository subAlarmStatsRepo : subAlarmStatsRepos.values()) for (SubAlarmStats subAlarmStats : subAlarmStatsRepo.get()) { diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBolt.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBolt.java index c3190fe..65c7763 100644 --- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBolt.java +++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBolt.java @@ -16,6 +16,7 @@ import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; +import com.hpcloud.mon.common.model.metric.Metric; import com.hpcloud.mon.domain.model.MetricDefinitionAndTenantId; import com.hpcloud.mon.domain.model.MetricDefinitionAndTenantIdMatcher; import com.hpcloud.mon.domain.model.SubAlarm; @@ -60,15 +61,30 @@ import com.hpcloud.util.Injector; */ public class MetricFilteringBolt extends BaseRichBolt { private static final long serialVersionUID = 1096706128973976599L; + + public static final String MIN_LAG_VALUE_KEY = "com.hpcloud.mon.filtering.minLagValue"; + public static final int MIN_LAG_VALUE_DEFAULT = 10; + public static final String MAX_LAG_MESSAGES_KEY = "com.hpcloud.mon.filtering.maxLagMessages"; + public static final int MAX_LAG_MESSAGES_DEFAULT = 10; + public static final String LAG_MESSAGE_PERIOD_KEY = "com.hpcloud.mon.filtering.lagMessagePeriod"; + public static final int LAG_MESSAGE_PERIOD_DEFAULT = 30; + public static final String[] FIELDS = new String[] { "metricDefinitionAndTenantId", "metric" }; + + private static final int MIN_LAG_VALUE = PropertyFinder.getIntProperty(MIN_LAG_VALUE_KEY, MIN_LAG_VALUE_DEFAULT, 0, Integer.MAX_VALUE); + private static final int MAX_LAG_MESSAGES = PropertyFinder.getIntProperty(MAX_LAG_MESSAGES_KEY, MAX_LAG_MESSAGES_DEFAULT, 0, Integer.MAX_VALUE); + private static final int LAG_MESSAGE_PERIOD = PropertyFinder.getIntProperty(LAG_MESSAGE_PERIOD_KEY, LAG_MESSAGE_PERIOD_DEFAULT, 1, 600); private static final Map> METRIC_DEFS = new ConcurrentHashMap<>(); private static final MetricDefinitionAndTenantIdMatcher matcher = new MetricDefinitionAndTenantIdMatcher(); private static final Object SENTINAL = new Object(); - public static final String[] FIELDS = new String[] { "metricDefinitionAndTenantId", "metric" }; private transient Logger LOG; private DataSourceFactory dbConfig; private transient MetricDefinitionDAO metricDefDAO; private OutputCollector collector; + private long minLag = Long.MAX_VALUE; + private long lastMinLagMessageSent = 0; + private long minLagMessageSent = 0; + private boolean lagging = true; public MetricFilteringBolt(DataSourceFactory dbConfig) { this.dbConfig = dbConfig; @@ -81,6 +97,8 @@ public class MetricFilteringBolt extends BaseRichBolt { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields(FIELDS)); + declarer.declareStream(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM, + new Fields(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_FIELDS)); } @Override @@ -88,15 +106,15 @@ public class MetricFilteringBolt extends BaseRichBolt { LOG.debug("tuple: {}", tuple); try { if (Streams.DEFAULT_STREAM_ID.equals(tuple.getSourceStreamId())) { - MetricDefinitionAndTenantId metricDefinitionAndTenantId = (MetricDefinitionAndTenantId) tuple.getValue(0); + final MetricDefinitionAndTenantId metricDefinitionAndTenantId = (MetricDefinitionAndTenantId) tuple.getValue(0); + final Metric metric = (Metric)tuple.getValue(1); + checkLag(metric); LOG.debug("metric definition and tenant id: {}", metricDefinitionAndTenantId); // Check for exact matches as well as inexact matches final List matches = matcher.match(metricDefinitionAndTenantId); for (final MetricDefinitionAndTenantId match : matches) - // Must send with the MetricDefinitionAndTenantId that it matches, not one in the Metric although - // they may be the same - collector.emit(tuple, new Values(match, tuple.getValue(1))); + collector.emit(tuple, new Values(match, metric)); } else { String eventType = tuple.getString(0); MetricDefinitionAndTenantId metricDefinitionAndTenantId = (MetricDefinitionAndTenantId) tuple.getValue(1); @@ -121,6 +139,30 @@ public class MetricFilteringBolt extends BaseRichBolt { } } + private void checkLag(Metric metric) { + final long now = getCurrentSeconds(); + final long lag = now - metric.timestamp; + if (lag < minLag) + minLag = lag; + if (lagging) + if (minLag <= MIN_LAG_VALUE) { + lagging = false; + LOG.info("Metrics no longer lagging, minLag = {}", minLag); + } + else if (minLagMessageSent >= MAX_LAG_MESSAGES) { + LOG.info("Waited for {} seconds for Metrics to catch up. Giving up. minLag = {}", + MAX_LAG_MESSAGES * LAG_MESSAGE_PERIOD, minLag); + lagging = false; + } + else if ((now - lastMinLagMessageSent) >= LAG_MESSAGE_PERIOD) { + LOG.info("Sending {} message, minLag = {}", MetricAggregationBolt.METRICS_BEHIND, minLag); + collector.emit(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM, + new Values(MetricAggregationBolt.METRICS_BEHIND)); + lastMinLagMessageSent = now; + minLagMessageSent++; + } + } + private void removeSubAlarm(MetricDefinitionAndTenantId metricDefinitionAndTenantId, String subAlarmId) { synchronized(SENTINAL) { final List subAlarmIds = METRIC_DEFS.get(metricDefinitionAndTenantId); @@ -158,6 +200,15 @@ public class MetricFilteringBolt extends BaseRichBolt { } } } + // Not really when it was sent, but want to wait at least LAG_MESSAGE_PERIOD before sending a message + lastMinLagMessageSent = getCurrentSeconds(); + } + + /** + * Allow override of current time for testing. + */ + protected long getCurrentSeconds() { + return System.currentTimeMillis() / 1000; } private void addMetricDef(MetricDefinitionAndTenantId metricDefinitionAndTenantId, String subAlarmId) { diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/PropertyFinder.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/PropertyFinder.java new file mode 100644 index 0000000..6315d9f --- /dev/null +++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/PropertyFinder.java @@ -0,0 +1,34 @@ +package com.hpcloud.mon.infrastructure.thresholding; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PropertyFinder { + private static final Logger LOG = LoggerFactory.getLogger(PropertyFinder.class); + + private PropertyFinder() + { + } + + public static int getIntProperty(final String name, + final int defaultValue, + final int minValue, + final int maxValue) { + final String valueString = System.getProperty(name); + if ((valueString != null) && !valueString.isEmpty()) { + try { + final int newValue = Integer.parseInt(valueString); + if ((newValue >= minValue) && (newValue <= maxValue)) { + return newValue; + } + LOG.warn("Invalid value {} for property '{}' must be >= {} and <= {}, using default value of {}", + valueString, name, minValue, maxValue, defaultValue); + } + catch (NumberFormatException nfe) { + LOG.warn("Not an integer value '{}' for property '{}', using default value of {}", valueString, + name, defaultValue); + } + } + return defaultValue; + } +} diff --git a/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBoltTest.java b/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBoltTest.java index 54a10d7..d721bfb 100644 --- a/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBoltTest.java +++ b/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBoltTest.java @@ -3,6 +3,7 @@ package com.hpcloud.mon.infrastructure.thresholding; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.Mockito.reset; @@ -126,6 +127,7 @@ public class MetricAggregationBoltTest { bolt.execute(createMetricTuple(metricDef2, null)); + // Send metrics for subAlarm1 long t1 = System.currentTimeMillis() / 1000; bolt.execute(createMetricTuple(metricDef1, new Metric(metricDef1, t1, 100))); @@ -134,6 +136,7 @@ public class MetricAggregationBoltTest { final Tuple tickTuple = createTickTuple(); bolt.execute(tickTuple); + verify(collector, times(1)).ack(tickTuple); assertEquals(subAlarm1.getState(), AlarmState.OK); assertEquals(subAlarm2.getState(), AlarmState.UNDETERMINED); @@ -147,6 +150,7 @@ public class MetricAggregationBoltTest { bolt.execute(createMetricTuple(metricDef1, new Metric(metricDef1, t1, 99))); bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, System.currentTimeMillis() / 1000, 94))); bolt.execute(tickTuple); + verify(collector, times(1)).ack(tickTuple); assertEquals(subAlarm1.getState(), AlarmState.ALARM); assertEquals(subAlarm2.getState(), AlarmState.ALARM); @@ -174,8 +178,19 @@ public class MetricAggregationBoltTest { subAlarm2.setState(AlarmState.UNDETERMINED); bolt.execute(createMetricTuple(metricDef2, null)); + final MkTupleParam tupleParam = new MkTupleParam(); + tupleParam.setStream(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM); + final Tuple lagTuple = Testing.testTuple(Arrays.asList(MetricAggregationBolt.METRICS_BEHIND), tupleParam); + bolt.execute(lagTuple); + verify(collector, times(1)).ack(lagTuple); + final Tuple tickTuple = createTickTuple(); bolt.execute(tickTuple); + verify(collector, times(1)).ack(tickTuple); + verify(collector, never()).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); + + bolt.execute(tickTuple); + verify(collector, times(2)).ack(tickTuple); verify(collector, times(1)).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); } diff --git a/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBoltTest.java b/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBoltTest.java index 6f4f85f..7cdec69 100644 --- a/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBoltTest.java +++ b/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBoltTest.java @@ -66,11 +66,11 @@ public class MetricFilteringBoltTest { return result; } - private MetricFilteringBolt createBolt(List initialMetricDefinitions, + private MockMetricFilteringBolt createBolt(List initialMetricDefinitions, final OutputCollector collector, boolean willEmit) { final MetricDefinitionDAO dao = mock(MetricDefinitionDAO.class); when(dao.findForAlarms()).thenReturn(initialMetricDefinitions); - MetricFilteringBolt bolt = new MetricFilteringBolt(dao); + MockMetricFilteringBolt bolt = new MockMetricFilteringBolt(dao); final Map config = new HashMap<>(); final TopologyContext context = mock(TopologyContext.class); @@ -85,6 +85,71 @@ public class MetricFilteringBoltTest { return bolt; } + public void testLagging() { + final OutputCollector collector = mock(OutputCollector.class); + + final MockMetricFilteringBolt bolt = createBolt(new ArrayList(0), collector, true); + + final long prepareTime = bolt.getCurrentSeconds(); + final MetricDefinition metricDefinition = subAlarms.get(0).getExpression().getMetricDefinition(); + final Tuple lateMetricTuple = createMetricTuple(metricDefinition, new Metric(metricDefinition, prepareTime, 42.0)); + bolt.setCurrentSeconds(prepareTime + MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT); + bolt.execute(lateMetricTuple); + verify(collector, times(1)).ack(lateMetricTuple); + verify(collector, times(1)).emit(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM, + new Values(MetricAggregationBolt.METRICS_BEHIND)); + bolt.setCurrentSeconds(prepareTime + 2 * MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT); + final Tuple metricTuple = createMetricTuple(metricDefinition, new Metric(metricDefinition, bolt.getCurrentSeconds() - MetricFilteringBolt.MIN_LAG_VALUE_DEFAULT, 42.0)); + bolt.execute(metricTuple); + verify(collector, times(1)).ack(metricTuple); + verify(collector, times(1)).emit(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM, + new Values(MetricAggregationBolt.METRICS_BEHIND)); + } + + public void testLaggingTooLong() { + final OutputCollector collector = mock(OutputCollector.class); + + final MockMetricFilteringBolt bolt = createBolt(new ArrayList(0), collector, true); + + long prepareTime = bolt.getCurrentSeconds(); + final MetricDefinition metricDefinition = subAlarms.get(0).getExpression().getMetricDefinition(); + // Fake sending metrics for MetricFilteringBolt.MAX_LAG_MESSAGES_DEFAULT * MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT seconds + for (int i = 0; i < MetricFilteringBolt.MAX_LAG_MESSAGES_DEFAULT; i++) { + final Tuple lateMetricTuple = createMetricTuple(metricDefinition, new Metric(metricDefinition, prepareTime, 42.0)); + bolt.setCurrentSeconds(prepareTime + MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT); + bolt.execute(lateMetricTuple); + verify(collector, times(1)).ack(lateMetricTuple); + verify(collector, times(i + 1)).emit(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM, + new Values(MetricAggregationBolt.METRICS_BEHIND)); + prepareTime = bolt.getCurrentSeconds(); + } + // One more + final Tuple metricTuple = createMetricTuple(metricDefinition, new Metric(metricDefinition, bolt.getCurrentSeconds() - MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT, 42.0)); + bolt.execute(metricTuple); + verify(collector, times(1)).ack(metricTuple); + // Won't be any more of these + verify(collector, times(MetricFilteringBolt.MAX_LAG_MESSAGES_DEFAULT)).emit(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM, + new Values(MetricAggregationBolt.METRICS_BEHIND)); + } + + private static class MockMetricFilteringBolt extends MetricFilteringBolt { + private static final long serialVersionUID = 1L; + private long currentSeconds = System.currentTimeMillis() / 1000; + + public MockMetricFilteringBolt(MetricDefinitionDAO metricDefDAO) { + super(metricDefDAO); + } + + @Override + protected long getCurrentSeconds() { + return currentSeconds; + } + + public void setCurrentSeconds(final long currentSeconds) { + this.currentSeconds = currentSeconds; + } + } + public void testNoInitial() { MetricFilteringBolt.clearMetricDefinitions(); final OutputCollector collector1 = mock(OutputCollector.class); diff --git a/src/test/java/com/hpcloud/mon/infrastructure/thresholding/PropertyFinderTest.java b/src/test/java/com/hpcloud/mon/infrastructure/thresholding/PropertyFinderTest.java new file mode 100644 index 0000000..02a4cb4 --- /dev/null +++ b/src/test/java/com/hpcloud/mon/infrastructure/thresholding/PropertyFinderTest.java @@ -0,0 +1,46 @@ +package com.hpcloud.mon.infrastructure.thresholding; + +import static org.testng.Assert.assertEquals; + +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test +public class PropertyFinderTest { + + private static String PROPERTY_NAME = "com.hpcloud.mon.infrastructure.thresholding.Prop"; + + @BeforeMethod + public void beforeMethod() { + System.clearProperty(PROPERTY_NAME); + } + + public void shouldUseNewValue() { + final int expectedValue = 45; + System.setProperty(PROPERTY_NAME, String.valueOf(expectedValue)); + assertEquals(expectedValue, PropertyFinder.getIntProperty(PROPERTY_NAME, 30, 0, Integer.MAX_VALUE)); + } + + public void shouldUseDefaultValueBecausePropertyNotSet() { + final int defaultValue = 45; + assertEquals(defaultValue, PropertyFinder.getIntProperty(PROPERTY_NAME, defaultValue, 0, Integer.MAX_VALUE)); + } + + public void shouldUseDefaultValueBecausePropertyNotANumber() { + final int defaultValue = 45; + System.setProperty(PROPERTY_NAME, "AAA"); + assertEquals(defaultValue, PropertyFinder.getIntProperty(PROPERTY_NAME, defaultValue, 0, Integer.MAX_VALUE)); + } + + public void shouldUseDefaultValueBecausePropertyTooSmall() { + final int defaultValue = 45; + System.setProperty(PROPERTY_NAME, "0"); + assertEquals(defaultValue, PropertyFinder.getIntProperty(PROPERTY_NAME, defaultValue, 1, Integer.MAX_VALUE)); + } + + public void shouldUseDefaultValueBecausePropertyTooLarge() { + final int defaultValue = 45; + System.setProperty(PROPERTY_NAME, "10"); + assertEquals(defaultValue, PropertyFinder.getIntProperty(PROPERTY_NAME, defaultValue, 9, 9)); + } +}