Prevent premature evaluation of Sliding Window

Added the ability to configure how long thresh should wait for metrics
to show up before evaluating the sliding window. Ensure the
current time is past the end of the sliding window plus the delay
before sliding.

Only evaluate SubAlarm if current time is past slot end timestamp

This change depends on the monasca-common changes in
https://review.openstack.org/161941

Change-Id: Iab7cb1580253f2fc7c114cfb95c009dba6b23331
This commit is contained in:
Craig Bryant 2015-03-05 13:55:49 -07:00
parent 35d3976342
commit b4f0e5fcf6
8 changed files with 81 additions and 61 deletions

View File

@ -20,8 +20,7 @@ package monasca.thresh;
import monasca.common.configuration.KafkaProducerConfiguration;
import monasca.thresh.infrastructure.thresholding.DataSourceFactory;
import org.hibernate.validator.constraints.NotEmpty;
import java.io.Serializable;
import java.util.Set;
import javax.validation.Valid;
@ -30,7 +29,9 @@ import javax.validation.constraints.NotNull;
/**
* Thresholding configuration.
*/
public class ThresholdingConfiguration {
public class ThresholdingConfiguration implements Serializable {
private static final long serialVersionUID = 8939559160479071931L;
/** Total number of workers processes across the cluster. */
@NotNull public Integer numWorkerProcesses = 12;
/** Total number of acker threads across the cluster. */
@ -57,6 +58,8 @@ public class ThresholdingConfiguration {
@NotNull public Integer thresholdingBoltThreads = 6;
@NotNull public Integer thresholdingBoltTasks = 15;
@NotNull public Integer alarmDelay = 30;
/** Namespaces for which metrics are received sporadically. */
@NotNull public Set<String> sporadicMetricNamespaces;

View File

@ -116,21 +116,19 @@ public class TopologyModule extends AbstractModule {
// Filtering /Event -> Alarm Creation
builder
.setBolt("alarm-creation-bolt", new AlarmCreationBolt(config.database),
1)
config.alarmCreationBoltThreads)
.fieldsGrouping("filtering-bolt",
MetricFilteringBolt.NEW_METRIC_FOR_ALARM_DEFINITION_STREAM,
new Fields(AlarmCreationBolt.ALARM_CREATION_FIELDS[3]))
.allGrouping("event-bolt", EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_ID)
.allGrouping("event-bolt", EventProcessingBolt.ALARM_EVENT_STREAM_ID)
.allGrouping("event-bolt", EventProcessingBolt.ALARM_DEFINITION_EVENT_STREAM_ID)
.setNumTasks(1); // This has to be a single bolt right now because there is no
// database protection for adding metrics and dimensions
.setNumTasks(config.alarmCreationBoltTasks);
// Filtering / Event / Alarm Creation -> Aggregation
builder
.setBolt("aggregation-bolt",
new MetricAggregationBolt(config.sporadicMetricNamespaces),
config.aggregationBoltThreads)
new MetricAggregationBolt(config), config.aggregationBoltThreads)
.fieldsGrouping("filtering-bolt", new Fields(MetricFilteringBolt.FIELDS[0]))
.allGrouping("filtering-bolt", MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM)
.fieldsGrouping("filtering-bolt", AlarmCreationBolt.ALARM_CREATION_STREAM,

View File

@ -72,14 +72,14 @@ public class SubAlarmStats {
*
* @return true if the alarm's state changed, else false.
*/
public boolean evaluateAndSlideWindow(long slideToTimestamp) {
public boolean evaluateAndSlideWindow(long slideToTimestamp, long alarmDelay) {
try {
return evaluate();
return evaluate(slideToTimestamp, alarmDelay);
} catch (Exception e) {
logger.error("Failed to evaluate {}", this, e);
return false;
} finally {
slideWindow(slideToTimestamp);
slideWindow(slideToTimestamp, alarmDelay);
}
}
@ -89,8 +89,8 @@ public class SubAlarmStats {
*
* @param slideToTimestamp
*/
public void slideWindow(long slideToTimestamp) {
stats.slideViewTo(slideToTimestamp);
public void slideWindow(long slideToTimestamp, long alarmDelay) {
stats.slideViewTo(slideToTimestamp, alarmDelay);
}
/**
@ -116,9 +116,13 @@ public class SubAlarmStats {
}
/**
* @throws IllegalStateException if the {@code timestamp} is outside of the {@link #stats} window
* @param now Current time
* @param alarmDelay How long to give metrics a chance to arrive
*/
boolean evaluate() {
boolean evaluate(final long now, long alarmDelay) {
if (!stats.shouldEvaluate(now, alarmDelay)) {
return false;
}
double[] values = stats.getViewValues();
boolean thresholdExceeded = false;
boolean hasEmptyWindows = false;

View File

@ -30,6 +30,7 @@ import monasca.common.model.metric.Metric;
import monasca.common.streaming.storm.Logging;
import monasca.common.streaming.storm.Streams;
import monasca.common.streaming.storm.Tuples;
import monasca.thresh.ThresholdingConfiguration;
import monasca.thresh.domain.model.MetricDefinitionAndTenantId;
import monasca.thresh.domain.model.SubAlarm;
import monasca.thresh.domain.model.SubAlarmStats;
@ -71,6 +72,7 @@ public class MetricAggregationBolt extends BaseRichBolt {
public static final String[] METRIC_AGGREGATION_CONTROL_FIELDS = new String[] {"directive"};
public static final String METRICS_BEHIND = "MetricsBehind";
private final ThresholdingConfiguration config;
final Map<MetricDefinitionAndTenantId, SubAlarmStatsRepository> metricDefToSubAlarmStatsRepos =
new HashMap<>();
private final Set<SubAlarmStats> subAlarmStatsSet = new HashSet<>();
@ -82,11 +84,8 @@ public class MetricAggregationBolt extends BaseRichBolt {
private OutputCollector collector;
private boolean upToDate = true;
public MetricAggregationBolt() {
}
public MetricAggregationBolt(Set<String> sporadicMetricNamespaces) {
this.sporadicMetricNamespaces = sporadicMetricNamespaces;
public MetricAggregationBolt(ThresholdingConfiguration config) {
this.config = config;
}
@Override
@ -201,13 +200,13 @@ public class MetricAggregationBolt extends BaseRichBolt {
for (SubAlarmStats subAlarmStats : subAlarmStatsSet) {
if (upToDate) {
logger.debug("Evaluating {}", subAlarmStats);
if (subAlarmStats.evaluateAndSlideWindow(newWindowTimestamp)) {
if (subAlarmStats.evaluateAndSlideWindow(newWindowTimestamp, config.alarmDelay)) {
logger.debug("Alarm state changed for {}", subAlarmStats);
collector.emit(new Values(subAlarmStats.getSubAlarm().getAlarmId(), subAlarmStats
.getSubAlarm()));
}
} else {
subAlarmStats.slideWindow(newWindowTimestamp);
subAlarmStats.slideWindow(newWindowTimestamp, config.alarmDelay);
}
}
if (!upToDate) {

View File

@ -120,6 +120,7 @@ public class ThresholdingEngineAlarmTest extends TopologyTestCase {
// Config
ThresholdingConfiguration threshConfig = new ThresholdingConfiguration();
threshConfig.alarmDelay = 1;
threshConfig.sporadicMetricNamespaces = new HashSet<String>();
Serialization.registerTarget(KafkaProducerConfiguration.class);
@ -135,7 +136,7 @@ public class ThresholdingEngineAlarmTest extends TopologyTestCase {
.registerModules(new TopologyModule(threshConfig, stormConfig, metricSpout, eventSpout));
Injector.registerModules(new ProducerModule(alarmEventForwarder));
// Evaluate alarm stats every 1 seconds
// Evaluate alarm stats every 5 seconds
System.setProperty(MetricAggregationBolt.TICK_TUPLE_SECONDS_KEY, "5");
startTopology();
@ -205,7 +206,7 @@ public class ThresholdingEngineAlarmTest extends TopologyTestCase {
Alarm alarm = null;
Stages stage = Stages.INITIAL_WAIT;
int finishAt = 0;
for (int i = 1; i < 100 && stage != Stages.FINISHED; i++) {
for (int i = 1; i < 600 && stage != Stages.FINISHED; i++) {
switch (stage) {
case INITIAL_WAIT:
if (i == 5) {

View File

@ -125,6 +125,7 @@ public class ThresholdingEngineTest extends TopologyTestCase {
// Config
ThresholdingConfiguration threshConfig = new ThresholdingConfiguration();
threshConfig.alarmDelay = 1;
threshConfig.sporadicMetricNamespaces = new HashSet<String>();
Serialization.registerTarget(KafkaProducerConfiguration.class);

View File

@ -46,31 +46,31 @@ public class SubAlarmStatsTest {
public void shouldBeOkIfAnySlotsInViewAreBelowThreshold() {
subAlarmStats.getStats().addValue(5, 1);
assertFalse(subAlarmStats.evaluateAndSlideWindow(61));
assertFalse(subAlarmStats.evaluateAndSlideWindow(62, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
subAlarmStats.getStats().addValue(1, 62);
assertTrue(subAlarmStats.evaluateAndSlideWindow(121));
assertTrue(subAlarmStats.evaluateAndSlideWindow(122, 1));
// This went to OK because at least one period is under the threshold
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK);
subAlarmStats.getStats().addValue(5, 123);
assertFalse(subAlarmStats.evaluateAndSlideWindow(181));
assertFalse(subAlarmStats.evaluateAndSlideWindow(182, 1));
// Still one under the threshold
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK);
}
public void shouldBeAlarmedIfAllSlotsInViewExceedThreshold() {
subAlarmStats.getStats().addValue(5, 1);
assertFalse(subAlarmStats.evaluateAndSlideWindow(61));
assertFalse(subAlarmStats.evaluateAndSlideWindow(62, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
subAlarmStats.getStats().addValue(5, 62);
assertFalse(subAlarmStats.evaluateAndSlideWindow(121));
assertFalse(subAlarmStats.evaluateAndSlideWindow(122, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
subAlarmStats.getStats().addValue(5, 123);
assertTrue(subAlarmStats.evaluateAndSlideWindow(181));
assertTrue(subAlarmStats.evaluateAndSlideWindow(182, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM);
}
@ -81,29 +81,29 @@ public class SubAlarmStatsTest {
long initialTime = 11;
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60));
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60));
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60));
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
// Add value and trigger OK
subAlarmStats.getStats().addValue(1, initialTime - 1);
assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60));
assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK);
// Slide in some values that exceed the threshold
subAlarmStats.getStats().addValue(5, initialTime - 1);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60));
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
subAlarmStats.getStats().addValue(5, initialTime - 1);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60));
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
subAlarmStats.getStats().addValue(5, initialTime - 1);
// Trigger ALARM
assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60));
assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM);
// Add value and trigger OK
subAlarmStats.getStats().addValue(1, initialTime - 1);
assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60));
assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK);
// Must slide 9 times total from the last added value to trigger UNDETERMINED. This is
@ -111,9 +111,9 @@ public class SubAlarmStatsTest {
// slides to move the value outside of the window and 6 more to exceed the observation
// threshold.
for (int i = 0; i < 7; i++) {
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60));
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
}
assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60));
assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
subAlarmStats.getStats().addValue(5, initialTime - 1);
}
@ -123,16 +123,16 @@ public class SubAlarmStatsTest {
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60));
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
subAlarmStats.getStats().addValue(5, initialTime - 1);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60));
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
subAlarmStats.getStats().addValue(5, initialTime - 1);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60));
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
subAlarmStats.getStats().addValue(5, initialTime - 1);
assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60));
assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM);
}
@ -156,13 +156,11 @@ public class SubAlarmStatsTest {
for (int i = 0; i < 360; i++) {
t1++;
stats.getStats().addValue(1.0, t1);
if ((t1 % 60) == 0) {
stats.evaluateAndSlideWindow(t1);
if (i <= 60) {
// First check will show it is OK. You could argue that this is incorrect
// as we have not waited for the whole period so we can't really evaluate it.
// That is true for sum and count
assertEquals(stats.getSubAlarm().getState(), AlarmState.OK);
if ((t1 % 60) == 2) {
stats.evaluateAndSlideWindow(t1, 1);
if (i <= subExpr.getAlarmSubExpression().getPeriod()) {
// Haven't waited long enough to evaluate
assertEquals(stats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
} else {
assertEquals(stats.getSubAlarm().getState(), AlarmState.ALARM);
}

View File

@ -42,6 +42,7 @@ import backtype.storm.testing.MkTupleParam;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import monasca.thresh.ThresholdingConfiguration;
import monasca.thresh.domain.model.MetricDefinitionAndTenantId;
import monasca.thresh.domain.model.SubAlarm;
import monasca.thresh.domain.model.SubAlarmStats;
@ -98,7 +99,9 @@ public class MetricAggregationBoltTest {
subAlarms.add(subAlarm2);
subAlarms.add(subAlarm3);
bolt = new MockMetricAggregationBolt();
final ThresholdingConfiguration config = new ThresholdingConfiguration();
config.alarmDelay = 1;
bolt = new MockMetricAggregationBolt(config);
context = mock(TopologyContext.class);
collector = mock(OutputCollector.class);
bolt.prepare(null, context, collector);
@ -136,16 +139,19 @@ public class MetricAggregationBoltTest {
// Ensure subAlarm2 and subAlarm3 map to the same Metric Definition
assertEquals(metricDef3, metricDef2);
long t1 = 170;
bolt.setCurrentTime(t1);
sendSubAlarmCreated(metricDef1, subAlarm1);
sendSubAlarmCreated(metricDef2, subAlarm2);
sendSubAlarmCreated(metricDef3, subAlarm3);
// Send metrics for subAlarm1
long t1 = System.currentTimeMillis() / 1000;
bolt.execute(createMetricTuple(metricDef1, new Metric(metricDef1, t1, 100)));
bolt.execute(createMetricTuple(metricDef1, new Metric(metricDef1, t1 -= 60, 95)));
bolt.execute(createMetricTuple(metricDef1, new Metric(metricDef1, t1 -= 60, 88)));
bolt.execute(createMetricTuple(metricDef1, new Metric(metricDef1, t1 - 60, 95)));
bolt.execute(createMetricTuple(metricDef1, new Metric(metricDef1, t1 - 120, 88)));
t1 += 20;
bolt.setCurrentTime(t1);
final Tuple tickTuple = createTickTuple();
bolt.execute(tickTuple);
verify(collector, times(1)).ack(tickTuple);
@ -162,8 +168,10 @@ public class MetricAggregationBoltTest {
// Drive subAlarm1 to ALARM
bolt.execute(createMetricTuple(metricDef1, new Metric(metricDef1, t1, 99)));
// Drive subAlarm2 to ALARM and subAlarm3 to OK since they use the same MetricDefinition
bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2,
System.currentTimeMillis() / 1000, 94)));
t1 += 10;
bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1, 94)));
t1 += 50;
bolt.setCurrentTime(t1);
bolt.execute(tickTuple);
verify(collector, times(1)).ack(tickTuple);
@ -237,9 +245,9 @@ public class MetricAggregationBoltTest {
}
public void shouldSendUndeterminedIfStateChanges() {
sendSubAlarmCreated(metricDef2, subAlarm2);
long t1 = System.currentTimeMillis() / 1000;
long t1 = 50;
bolt.setCurrentTime(t1);
sendSubAlarmCreated(metricDef2, subAlarm2);
bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1, 1.0)));
t1 += 1;
bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1, 1.0)));
@ -264,6 +272,8 @@ public class MetricAggregationBoltTest {
}
public void shouldSendUndeterminedOnStartup() {
long t1 = 14;
bolt.setCurrentTime(t1);
sendSubAlarmCreated(metricDef2, subAlarm2);
final MkTupleParam tupleParam = new MkTupleParam();
@ -274,14 +284,20 @@ public class MetricAggregationBoltTest {
verify(collector, times(1)).ack(lagTuple);
final Tuple tickTuple = createTickTuple();
t1 += 60;
bolt.setCurrentTime(t1);
bolt.execute(tickTuple);
verify(collector, times(1)).ack(tickTuple);
verify(collector, never()).emit(new Values(subAlarm2.getAlarmId(), subAlarm2));
t1 += 60;
bolt.setCurrentTime(t1);
bolt.execute(tickTuple);
verify(collector, times(2)).ack(tickTuple);
verify(collector, never()).emit(new Values(subAlarm2.getAlarmId(), subAlarm2));
t1 += 60;
bolt.setCurrentTime(t1);
bolt.execute(tickTuple);
verify(collector, times(3)).ack(tickTuple);
assertEquals(subAlarm2.getState(), AlarmState.UNDETERMINED);
@ -419,8 +435,8 @@ public class MetricAggregationBoltTest {
private long currentTime;
public MockMetricAggregationBolt() {
super();
public MockMetricAggregationBolt(ThresholdingConfiguration config) {
super(config);
}
@Override