JIRA JAH-11 Keep Alarms from switching to UNDETERMINED if the Threshold Engine has been down for long enough that Kafka has buffered enough Metrics that it takes the Threshold Engine minutes to catch up.

Added code to MetricFilteringBolt to check on the lag between current time and the timestamp in the metric. If too large, a message is sent to the MetricAggregationBolt to hold off on evaluating alarms.

Added unit tests.
This commit is contained in:
Craig Bryant 2014-04-23 17:12:50 -06:00
parent 6deb113700
commit 8cfaaca7a6
7 changed files with 240 additions and 8 deletions

View File

@ -111,6 +111,7 @@ public class TopologyModule extends AbstractModule {
new MetricAggregationBolt(config.database, config.sporadicMetricNamespaces),
config.aggregationBoltThreads)
.fieldsGrouping("filtering-bolt", new Fields(MetricFilteringBolt.FIELDS[0]))
.allGrouping("filtering-bolt", MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM)
.fieldsGrouping("event-bolt", EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_ID,
new Fields(EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_FIELDS[1]))
.fieldsGrouping("event-bolt", EventProcessingBolt.METRIC_ALARM_EVENT_STREAM_ID,

View File

@ -51,8 +51,11 @@ import com.hpcloud.util.Injector;
*/
public class MetricAggregationBolt extends BaseRichBolt {
private static final long serialVersionUID = 5624314196838090726L;
public static final String TICK_TUPLE_SECONDS_KEY = "maas.aggregation.tick.seconds";
public static final String TICK_TUPLE_SECONDS_KEY = "com.hpcloud.mon.aggregation.tick.seconds";
public static final String[] FIELDS = new String[] { "alarmId", "subAlarm" };
public static final String METRIC_AGGREGATION_CONTROL_STREAM = "MetricAggregationControl";
public static final String[] METRIC_AGGREGATION_CONTROL_FIELDS = new String[] { "directive" };
public static final String METRICS_BEHIND = "MetricsBehind";
final Map<MetricDefinitionAndTenantId, SubAlarmStatsRepository> subAlarmStatsRepos = new HashMap<>();
private transient Logger LOG;
@ -62,6 +65,7 @@ public class MetricAggregationBolt extends BaseRichBolt {
private Set<String> sporadicMetricNamespaces = Collections.emptySet();
private OutputCollector collector;
private int evaluationTimeOffset;
private boolean upToDate = true;
public MetricAggregationBolt(SubAlarmDAO subAlarmDAO) {
this.subAlarmDAO = subAlarmDAO;
@ -88,6 +92,8 @@ public class MetricAggregationBolt extends BaseRichBolt {
MetricDefinitionAndTenantId metricDefinitionAndTenantId = (MetricDefinitionAndTenantId) tuple.getValue(0);
Metric metric = (Metric) tuple.getValueByField("metric");
aggregateValues(metricDefinitionAndTenantId, metric);
} else if (METRIC_AGGREGATION_CONTROL_STREAM.equals(tuple.getSourceStreamId())) {
processControl(tuple.getString(0));
} else {
String eventType = tuple.getString(0);
MetricDefinitionAndTenantId metricDefinitionAndTenantId = (MetricDefinitionAndTenantId) tuple.getValue(1);
@ -112,6 +118,15 @@ public class MetricAggregationBolt extends BaseRichBolt {
}
}
private void processControl(final String directive) {
if (METRICS_BEHIND.equals(directive)) {
LOG.debug("Received {}", directive);
this.upToDate = false;
}
else
LOG.error("Unknown directive '{}'", directive);
}
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<String, Object>();
@ -158,6 +173,11 @@ public class MetricAggregationBolt extends BaseRichBolt {
* ago, then sliding the window to the current time.
*/
void evaluateAlarmsAndSlideWindows() {
if (!upToDate) {
LOG.info("Not evaluating SubAlarms because Metrics are not up to date");
upToDate = true;
return;
}
long newWindowTimestamp = System.currentTimeMillis() / 1000;
for (SubAlarmStatsRepository subAlarmStatsRepo : subAlarmStatsRepos.values())
for (SubAlarmStats subAlarmStats : subAlarmStatsRepo.get()) {

View File

@ -16,6 +16,7 @@ import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import com.hpcloud.mon.common.model.metric.Metric;
import com.hpcloud.mon.domain.model.MetricDefinitionAndTenantId;
import com.hpcloud.mon.domain.model.MetricDefinitionAndTenantIdMatcher;
import com.hpcloud.mon.domain.model.SubAlarm;
@ -60,15 +61,30 @@ import com.hpcloud.util.Injector;
*/
public class MetricFilteringBolt extends BaseRichBolt {
private static final long serialVersionUID = 1096706128973976599L;
public static final String MIN_LAG_VALUE_KEY = "com.hpcloud.mon.filtering.minLagValue";
public static final int MIN_LAG_VALUE_DEFAULT = 10;
public static final String MAX_LAG_MESSAGES_KEY = "com.hpcloud.mon.filtering.maxLagMessages";
public static final int MAX_LAG_MESSAGES_DEFAULT = 10;
public static final String LAG_MESSAGE_PERIOD_KEY = "com.hpcloud.mon.filtering.lagMessagePeriod";
public static final int LAG_MESSAGE_PERIOD_DEFAULT = 30;
public static final String[] FIELDS = new String[] { "metricDefinitionAndTenantId", "metric" };
private static final int MIN_LAG_VALUE = PropertyFinder.getIntProperty(MIN_LAG_VALUE_KEY, MIN_LAG_VALUE_DEFAULT, 0, Integer.MAX_VALUE);
private static final int MAX_LAG_MESSAGES = PropertyFinder.getIntProperty(MAX_LAG_MESSAGES_KEY, MAX_LAG_MESSAGES_DEFAULT, 0, Integer.MAX_VALUE);
private static final int LAG_MESSAGE_PERIOD = PropertyFinder.getIntProperty(LAG_MESSAGE_PERIOD_KEY, LAG_MESSAGE_PERIOD_DEFAULT, 1, 600);
private static final Map<MetricDefinitionAndTenantId, List<String>> METRIC_DEFS = new ConcurrentHashMap<>();
private static final MetricDefinitionAndTenantIdMatcher matcher = new MetricDefinitionAndTenantIdMatcher();
private static final Object SENTINAL = new Object();
public static final String[] FIELDS = new String[] { "metricDefinitionAndTenantId", "metric" };
private transient Logger LOG;
private DataSourceFactory dbConfig;
private transient MetricDefinitionDAO metricDefDAO;
private OutputCollector collector;
private long minLag = Long.MAX_VALUE;
private long lastMinLagMessageSent = 0;
private long minLagMessageSent = 0;
private boolean lagging = true;
public MetricFilteringBolt(DataSourceFactory dbConfig) {
this.dbConfig = dbConfig;
@ -81,6 +97,8 @@ public class MetricFilteringBolt extends BaseRichBolt {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(FIELDS));
declarer.declareStream(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM,
new Fields(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_FIELDS));
}
@Override
@ -88,15 +106,15 @@ public class MetricFilteringBolt extends BaseRichBolt {
LOG.debug("tuple: {}", tuple);
try {
if (Streams.DEFAULT_STREAM_ID.equals(tuple.getSourceStreamId())) {
MetricDefinitionAndTenantId metricDefinitionAndTenantId = (MetricDefinitionAndTenantId) tuple.getValue(0);
final MetricDefinitionAndTenantId metricDefinitionAndTenantId = (MetricDefinitionAndTenantId) tuple.getValue(0);
final Metric metric = (Metric)tuple.getValue(1);
checkLag(metric);
LOG.debug("metric definition and tenant id: {}", metricDefinitionAndTenantId);
// Check for exact matches as well as inexact matches
final List<MetricDefinitionAndTenantId> matches = matcher.match(metricDefinitionAndTenantId);
for (final MetricDefinitionAndTenantId match : matches)
// Must send with the MetricDefinitionAndTenantId that it matches, not one in the Metric although
// they may be the same
collector.emit(tuple, new Values(match, tuple.getValue(1)));
collector.emit(tuple, new Values(match, metric));
} else {
String eventType = tuple.getString(0);
MetricDefinitionAndTenantId metricDefinitionAndTenantId = (MetricDefinitionAndTenantId) tuple.getValue(1);
@ -121,6 +139,30 @@ public class MetricFilteringBolt extends BaseRichBolt {
}
}
private void checkLag(Metric metric) {
final long now = getCurrentSeconds();
final long lag = now - metric.timestamp;
if (lag < minLag)
minLag = lag;
if (lagging)
if (minLag <= MIN_LAG_VALUE) {
lagging = false;
LOG.info("Metrics no longer lagging, minLag = {}", minLag);
}
else if (minLagMessageSent >= MAX_LAG_MESSAGES) {
LOG.info("Waited for {} seconds for Metrics to catch up. Giving up. minLag = {}",
MAX_LAG_MESSAGES * LAG_MESSAGE_PERIOD, minLag);
lagging = false;
}
else if ((now - lastMinLagMessageSent) >= LAG_MESSAGE_PERIOD) {
LOG.info("Sending {} message, minLag = {}", MetricAggregationBolt.METRICS_BEHIND, minLag);
collector.emit(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM,
new Values(MetricAggregationBolt.METRICS_BEHIND));
lastMinLagMessageSent = now;
minLagMessageSent++;
}
}
private void removeSubAlarm(MetricDefinitionAndTenantId metricDefinitionAndTenantId, String subAlarmId) {
synchronized(SENTINAL) {
final List<String> subAlarmIds = METRIC_DEFS.get(metricDefinitionAndTenantId);
@ -158,6 +200,15 @@ public class MetricFilteringBolt extends BaseRichBolt {
}
}
}
// Not really when it was sent, but want to wait at least LAG_MESSAGE_PERIOD before sending a message
lastMinLagMessageSent = getCurrentSeconds();
}
/**
* Allow override of current time for testing.
*/
protected long getCurrentSeconds() {
return System.currentTimeMillis() / 1000;
}
private void addMetricDef(MetricDefinitionAndTenantId metricDefinitionAndTenantId, String subAlarmId) {

View File

@ -0,0 +1,34 @@
package com.hpcloud.mon.infrastructure.thresholding;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PropertyFinder {
private static final Logger LOG = LoggerFactory.getLogger(PropertyFinder.class);
private PropertyFinder()
{
}
public static int getIntProperty(final String name,
final int defaultValue,
final int minValue,
final int maxValue) {
final String valueString = System.getProperty(name);
if ((valueString != null) && !valueString.isEmpty()) {
try {
final int newValue = Integer.parseInt(valueString);
if ((newValue >= minValue) && (newValue <= maxValue)) {
return newValue;
}
LOG.warn("Invalid value {} for property '{}' must be >= {} and <= {}, using default value of {}",
valueString, name, minValue, maxValue, defaultValue);
}
catch (NumberFormatException nfe) {
LOG.warn("Not an integer value '{}' for property '{}', using default value of {}", valueString,
name, defaultValue);
}
}
return defaultValue;
}
}

View File

@ -3,6 +3,7 @@ package com.hpcloud.mon.infrastructure.thresholding;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.reset;
@ -126,6 +127,7 @@ public class MetricAggregationBoltTest {
bolt.execute(createMetricTuple(metricDef2, null));
// Send metrics for subAlarm1
long t1 = System.currentTimeMillis() / 1000;
bolt.execute(createMetricTuple(metricDef1, new Metric(metricDef1, t1, 100)));
@ -134,6 +136,7 @@ public class MetricAggregationBoltTest {
final Tuple tickTuple = createTickTuple();
bolt.execute(tickTuple);
verify(collector, times(1)).ack(tickTuple);
assertEquals(subAlarm1.getState(), AlarmState.OK);
assertEquals(subAlarm2.getState(), AlarmState.UNDETERMINED);
@ -147,6 +150,7 @@ public class MetricAggregationBoltTest {
bolt.execute(createMetricTuple(metricDef1, new Metric(metricDef1, t1, 99)));
bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, System.currentTimeMillis() / 1000, 94)));
bolt.execute(tickTuple);
verify(collector, times(1)).ack(tickTuple);
assertEquals(subAlarm1.getState(), AlarmState.ALARM);
assertEquals(subAlarm2.getState(), AlarmState.ALARM);
@ -174,8 +178,19 @@ public class MetricAggregationBoltTest {
subAlarm2.setState(AlarmState.UNDETERMINED);
bolt.execute(createMetricTuple(metricDef2, null));
final MkTupleParam tupleParam = new MkTupleParam();
tupleParam.setStream(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM);
final Tuple lagTuple = Testing.testTuple(Arrays.asList(MetricAggregationBolt.METRICS_BEHIND), tupleParam);
bolt.execute(lagTuple);
verify(collector, times(1)).ack(lagTuple);
final Tuple tickTuple = createTickTuple();
bolt.execute(tickTuple);
verify(collector, times(1)).ack(tickTuple);
verify(collector, never()).emit(new Values(subAlarm2.getAlarmId(), subAlarm2));
bolt.execute(tickTuple);
verify(collector, times(2)).ack(tickTuple);
verify(collector, times(1)).emit(new Values(subAlarm2.getAlarmId(), subAlarm2));
}

View File

@ -66,11 +66,11 @@ public class MetricFilteringBoltTest {
return result;
}
private MetricFilteringBolt createBolt(List<SubAlarmMetricDefinition> initialMetricDefinitions,
private MockMetricFilteringBolt createBolt(List<SubAlarmMetricDefinition> initialMetricDefinitions,
final OutputCollector collector, boolean willEmit) {
final MetricDefinitionDAO dao = mock(MetricDefinitionDAO.class);
when(dao.findForAlarms()).thenReturn(initialMetricDefinitions);
MetricFilteringBolt bolt = new MetricFilteringBolt(dao);
MockMetricFilteringBolt bolt = new MockMetricFilteringBolt(dao);
final Map<String, String> config = new HashMap<>();
final TopologyContext context = mock(TopologyContext.class);
@ -85,6 +85,71 @@ public class MetricFilteringBoltTest {
return bolt;
}
public void testLagging() {
final OutputCollector collector = mock(OutputCollector.class);
final MockMetricFilteringBolt bolt = createBolt(new ArrayList<SubAlarmMetricDefinition>(0), collector, true);
final long prepareTime = bolt.getCurrentSeconds();
final MetricDefinition metricDefinition = subAlarms.get(0).getExpression().getMetricDefinition();
final Tuple lateMetricTuple = createMetricTuple(metricDefinition, new Metric(metricDefinition, prepareTime, 42.0));
bolt.setCurrentSeconds(prepareTime + MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT);
bolt.execute(lateMetricTuple);
verify(collector, times(1)).ack(lateMetricTuple);
verify(collector, times(1)).emit(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM,
new Values(MetricAggregationBolt.METRICS_BEHIND));
bolt.setCurrentSeconds(prepareTime + 2 * MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT);
final Tuple metricTuple = createMetricTuple(metricDefinition, new Metric(metricDefinition, bolt.getCurrentSeconds() - MetricFilteringBolt.MIN_LAG_VALUE_DEFAULT, 42.0));
bolt.execute(metricTuple);
verify(collector, times(1)).ack(metricTuple);
verify(collector, times(1)).emit(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM,
new Values(MetricAggregationBolt.METRICS_BEHIND));
}
public void testLaggingTooLong() {
final OutputCollector collector = mock(OutputCollector.class);
final MockMetricFilteringBolt bolt = createBolt(new ArrayList<SubAlarmMetricDefinition>(0), collector, true);
long prepareTime = bolt.getCurrentSeconds();
final MetricDefinition metricDefinition = subAlarms.get(0).getExpression().getMetricDefinition();
// Fake sending metrics for MetricFilteringBolt.MAX_LAG_MESSAGES_DEFAULT * MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT seconds
for (int i = 0; i < MetricFilteringBolt.MAX_LAG_MESSAGES_DEFAULT; i++) {
final Tuple lateMetricTuple = createMetricTuple(metricDefinition, new Metric(metricDefinition, prepareTime, 42.0));
bolt.setCurrentSeconds(prepareTime + MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT);
bolt.execute(lateMetricTuple);
verify(collector, times(1)).ack(lateMetricTuple);
verify(collector, times(i + 1)).emit(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM,
new Values(MetricAggregationBolt.METRICS_BEHIND));
prepareTime = bolt.getCurrentSeconds();
}
// One more
final Tuple metricTuple = createMetricTuple(metricDefinition, new Metric(metricDefinition, bolt.getCurrentSeconds() - MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT, 42.0));
bolt.execute(metricTuple);
verify(collector, times(1)).ack(metricTuple);
// Won't be any more of these
verify(collector, times(MetricFilteringBolt.MAX_LAG_MESSAGES_DEFAULT)).emit(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM,
new Values(MetricAggregationBolt.METRICS_BEHIND));
}
private static class MockMetricFilteringBolt extends MetricFilteringBolt {
private static final long serialVersionUID = 1L;
private long currentSeconds = System.currentTimeMillis() / 1000;
public MockMetricFilteringBolt(MetricDefinitionDAO metricDefDAO) {
super(metricDefDAO);
}
@Override
protected long getCurrentSeconds() {
return currentSeconds;
}
public void setCurrentSeconds(final long currentSeconds) {
this.currentSeconds = currentSeconds;
}
}
public void testNoInitial() {
MetricFilteringBolt.clearMetricDefinitions();
final OutputCollector collector1 = mock(OutputCollector.class);

View File

@ -0,0 +1,46 @@
package com.hpcloud.mon.infrastructure.thresholding;
import static org.testng.Assert.assertEquals;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test
public class PropertyFinderTest {
private static String PROPERTY_NAME = "com.hpcloud.mon.infrastructure.thresholding.Prop";
@BeforeMethod
public void beforeMethod() {
System.clearProperty(PROPERTY_NAME);
}
public void shouldUseNewValue() {
final int expectedValue = 45;
System.setProperty(PROPERTY_NAME, String.valueOf(expectedValue));
assertEquals(expectedValue, PropertyFinder.getIntProperty(PROPERTY_NAME, 30, 0, Integer.MAX_VALUE));
}
public void shouldUseDefaultValueBecausePropertyNotSet() {
final int defaultValue = 45;
assertEquals(defaultValue, PropertyFinder.getIntProperty(PROPERTY_NAME, defaultValue, 0, Integer.MAX_VALUE));
}
public void shouldUseDefaultValueBecausePropertyNotANumber() {
final int defaultValue = 45;
System.setProperty(PROPERTY_NAME, "AAA");
assertEquals(defaultValue, PropertyFinder.getIntProperty(PROPERTY_NAME, defaultValue, 0, Integer.MAX_VALUE));
}
public void shouldUseDefaultValueBecausePropertyTooSmall() {
final int defaultValue = 45;
System.setProperty(PROPERTY_NAME, "0");
assertEquals(defaultValue, PropertyFinder.getIntProperty(PROPERTY_NAME, defaultValue, 1, Integer.MAX_VALUE));
}
public void shouldUseDefaultValueBecausePropertyTooLarge() {
final int defaultValue = 45;
System.setProperty(PROPERTY_NAME, "10");
assertEquals(defaultValue, PropertyFinder.getIntProperty(PROPERTY_NAME, defaultValue, 9, 9));
}
}