diff --git a/src/main/java/com/hpcloud/mon/domain/model/SubAlarmStats.java b/src/main/java/com/hpcloud/mon/domain/model/SubAlarmStats.java index b6b411f..40f0dbe 100644 --- a/src/main/java/com/hpcloud/mon/domain/model/SubAlarmStats.java +++ b/src/main/java/com/hpcloud/mon/domain/model/SubAlarmStats.java @@ -102,7 +102,6 @@ public class SubAlarmStats { */ boolean evaluate() { double[] values = stats.getViewValues(); - AlarmState initialState = subAlarm.getState(); boolean thresholdExceeded = false; boolean hasEmptyWindows = false; for (double value : values) { @@ -115,7 +114,7 @@ public class SubAlarmStats { if (!subAlarm.getExpression() .getOperator() .evaluate(value, subAlarm.getExpression().getThreshold())) { - if (AlarmState.OK.equals(initialState)) + if (!shouldSendStateChange(AlarmState.OK)) return false; setSubAlarmState(AlarmState.OK); return true; @@ -125,7 +124,7 @@ public class SubAlarmStats { } if (thresholdExceeded && !hasEmptyWindows) { - if (AlarmState.ALARM.equals(initialState)) + if (!shouldSendStateChange(AlarmState.ALARM)) return false; setSubAlarmState(AlarmState.ALARM); return true; @@ -135,7 +134,7 @@ public class SubAlarmStats { emptyWindowObservations++; if ((emptyWindowObservations >= emptyWindowObservationThreshold) && - (subAlarm.isNoState() || !AlarmState.UNDETERMINED.equals(initialState)) && + shouldSendStateChange(AlarmState.UNDETERMINED) && !subAlarm.isSporadicMetric()) { setSubAlarmState(AlarmState.UNDETERMINED); return true; @@ -144,10 +143,14 @@ public class SubAlarmStats { return false; } -private void setSubAlarmState(AlarmState newState) { + private boolean shouldSendStateChange(AlarmState newState) { + return !subAlarm.getState().equals(newState) || subAlarm.isNoState(); + } + + private void setSubAlarmState(AlarmState newState) { subAlarm.setState(newState); subAlarm.setNoState(false); -} + } /** * This MUST only be used for compatible SubAlarms, i.e. where diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/EventProcessingBolt.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/EventProcessingBolt.java index 114d276..6b2f843 100644 --- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/EventProcessingBolt.java +++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/EventProcessingBolt.java @@ -69,6 +69,7 @@ public class EventProcessingBolt extends BaseRichBolt { public static final String CREATED = "created"; public static final String DELETED = "deleted"; public static final String UPDATED = "updated"; + public static final String RESEND = "resend"; private transient Logger LOG; private OutputCollector collector; @@ -120,6 +121,10 @@ public class EventProcessingBolt extends BaseRichBolt { sendSubAlarm(UPDATED, alarmId, subAlarmId, tenantId, alarmSubExpression); } + private void sendResendSubAlarm(String alarmId, String subAlarmId, String tenantId, AlarmSubExpression alarmSubExpression) { + sendSubAlarm(RESEND, alarmId, subAlarmId, tenantId, alarmSubExpression); + } + private void sendSubAlarm(String eventType, String alarmId, String subAlarmId, String tenantId, AlarmSubExpression alarmSubExpression) { MetricDefinition metricDef = alarmSubExpression.getMetricDefinition(); @@ -141,6 +146,13 @@ public class EventProcessingBolt extends BaseRichBolt { } void handle(AlarmUpdatedEvent event) { + if ((!event.oldAlarmState.equals(event.alarmState) || + !event.oldAlarmSubExpressions.isEmpty()) && event.changedSubExpressions.isEmpty() && + event.newAlarmSubExpressions.isEmpty()) { + for (Map.Entry entry : event.unchangedSubExpressions.entrySet()) { + sendResendSubAlarm(event.alarmId, entry.getKey(), event.tenantId, entry.getValue()); + } + } for (Map.Entry entry : event.oldAlarmSubExpressions.entrySet()) { sendDeletedSubAlarm(entry.getKey(), event.tenantId, entry.getValue().getMetricDefinition()); } diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBolt.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBolt.java index d22591d..3469216 100644 --- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBolt.java +++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBolt.java @@ -121,6 +121,8 @@ public class MetricAggregationBolt extends BaseRichBolt { handleAlarmCreated(metricDefinitionAndTenantId, subAlarm); else if (EventProcessingBolt.UPDATED.equals(eventType)) handleAlarmUpdated(metricDefinitionAndTenantId, subAlarm); + else if (EventProcessingBolt.RESEND.equals(eventType)) + handleAlarmResend(metricDefinitionAndTenantId, subAlarm); } } } @@ -246,6 +248,34 @@ public class MetricAggregationBolt extends BaseRichBolt { addSubAlarm(metricDefinitionAndTenantId, subAlarm); } + void handleAlarmResend(MetricDefinitionAndTenantId metricDefinitionAndTenantId, SubAlarm resendSubAlarm) { + final RepoAndStats repoAndStats = findExistingSubAlarmStats(metricDefinitionAndTenantId, resendSubAlarm); + if (repoAndStats == null) + return; + + final SubAlarmStats oldSubAlarmStats = repoAndStats.subAlarmStats; + final SubAlarm oldSubAlarm = oldSubAlarmStats.getSubAlarm(); + resendSubAlarm.setState(oldSubAlarm.getState()); + resendSubAlarm.setNoState(true); // Have it send its state again so the Alarm can be evaluated + LOG.debug("Forcing SubAlarm {} to send state at next evaluation", oldSubAlarm); + oldSubAlarmStats.updateSubAlarm(resendSubAlarm); + } + + private RepoAndStats findExistingSubAlarmStats(MetricDefinitionAndTenantId metricDefinitionAndTenantId, + SubAlarm oldSubAlarm) { + final SubAlarmStatsRepository oldSubAlarmStatsRepo = subAlarmStatsRepos.get(metricDefinitionAndTenantId); + if (oldSubAlarmStatsRepo == null) { + LOG.error("Did not find SubAlarmStatsRepository for MetricDefinition {}", metricDefinitionAndTenantId); + return null; + } + final SubAlarmStats oldSubAlarmStats = oldSubAlarmStatsRepo.get(oldSubAlarm.getId()); + if (oldSubAlarmStats == null) { + LOG.error("Did not find existing SubAlarm {} in SubAlarmStatsRepository", oldSubAlarm); + return null; + } + return new RepoAndStats(oldSubAlarmStatsRepo, oldSubAlarmStats); + } + private void addSubAlarm(MetricDefinitionAndTenantId metricDefinitionAndTenantId, SubAlarm subAlarm) { SubAlarmStatsRepository subAlarmStatsRepo = getOrCreateSubAlarmStatsRepo(metricDefinitionAndTenantId); if (subAlarmStatsRepo == null) @@ -262,33 +292,26 @@ public class MetricAggregationBolt extends BaseRichBolt { */ void handleAlarmUpdated(MetricDefinitionAndTenantId metricDefinitionAndTenantId, SubAlarm subAlarm) { LOG.debug("Received AlarmUpdatedEvent for {}", subAlarm); - // Clear the old SubAlarm, but save the SubAlarm state - final SubAlarmStatsRepository oldSubAlarmStatsRepo = subAlarmStatsRepos.get(metricDefinitionAndTenantId); - if (oldSubAlarmStatsRepo == null) { - LOG.error("Did not find SubAlarmStatsRepository for MetricDefinition {}", metricDefinitionAndTenantId); - } - else { - final SubAlarmStats oldSubAlarmStats = oldSubAlarmStatsRepo.get(subAlarm.getId()); - if (oldSubAlarmStats == null) - LOG.error("Did not find existing SubAlarm {} in SubAlarmStatsRepository", subAlarm); - else { - final SubAlarm oldSubAlarm = oldSubAlarmStats.getSubAlarm(); - subAlarm.setState(oldSubAlarm.getState()); - subAlarm.setNoState(true); // Doesn't hurt to send too many state changes, just too few - if (oldSubAlarm.isCompatible(subAlarm)) { - LOG.debug("Changing SubAlarm {} to SubAlarm {} and keeping measurements", oldSubAlarm, subAlarm); - oldSubAlarmStats.updateSubAlarm(subAlarm); - return; - } - // Have to completely change the SubAlarmStats - LOG.debug("Changing SubAlarm {} to SubAlarm {} and flushing measurements", oldSubAlarm, subAlarm); - oldSubAlarmStatsRepo.remove(subAlarm.getId()); + final RepoAndStats repoAndStats = findExistingSubAlarmStats(metricDefinitionAndTenantId, subAlarm); + if (repoAndStats != null) { + // Clear the old SubAlarm, but save the SubAlarm state + final SubAlarmStats oldSubAlarmStats = repoAndStats.subAlarmStats; + final SubAlarm oldSubAlarm = oldSubAlarmStats.getSubAlarm(); + subAlarm.setState(oldSubAlarm.getState()); + subAlarm.setNoState(true); // Doesn't hurt to send too many state changes, just too few + if (oldSubAlarm.isCompatible(subAlarm)) { + LOG.debug("Changing SubAlarm {} to SubAlarm {} and keeping measurements", oldSubAlarm, subAlarm); + oldSubAlarmStats.updateSubAlarm(subAlarm); + return; } + // Have to completely change the SubAlarmStats + LOG.debug("Changing SubAlarm {} to SubAlarm {} and flushing measurements", oldSubAlarm, subAlarm); + repoAndStats.subAlarmStatsRepository.remove(subAlarm.getId()); } addSubAlarm(metricDefinitionAndTenantId, subAlarm); } -/** + /** * Removes the sub-alarm for the {@code subAlarmId} from the subAlarmStatsRepo for the * {@code metricDefinitionAndTenantId}. */ @@ -301,4 +324,15 @@ public class MetricAggregationBolt extends BaseRichBolt { subAlarmStatsRepos.remove(metricDefinitionAndTenantId); } } + + private static class RepoAndStats { + public final SubAlarmStatsRepository subAlarmStatsRepository; + public final SubAlarmStats subAlarmStats; + + public RepoAndStats(SubAlarmStatsRepository subAlarmStatsRepository, + SubAlarmStats subAlarmStats) { + this.subAlarmStatsRepository = subAlarmStatsRepository; + this.subAlarmStats = subAlarmStats; + } + } } diff --git a/src/test/java/com/hpcloud/mon/ThresholdingEngineAlarmTest.java b/src/test/java/com/hpcloud/mon/ThresholdingEngineAlarmTest.java index 801243f..8adf1b5 100644 --- a/src/test/java/com/hpcloud/mon/ThresholdingEngineAlarmTest.java +++ b/src/test/java/com/hpcloud/mon/ThresholdingEngineAlarmTest.java @@ -87,7 +87,7 @@ public class ThresholdingEngineAlarmTest extends TopologyTestCase { private AlarmExpression expression = new AlarmExpression( "max(hpcs.compute.cpu{id=5}) >= 3 or max(hpcs.compute.mem{id=5}) >= 557"); - private AlarmState currentState = AlarmState.OK; + private AlarmState currentState = AlarmState.UNDETERMINED; private volatile int alarmsSent = 0; public ThresholdingEngineAlarmTest() { @@ -171,7 +171,7 @@ public class ThresholdingEngineAlarmTest extends TopologyTestCase { return result; } - final AlarmState[] expectedStates = { AlarmState.ALARM, AlarmState.OK, AlarmState.ALARM }; + final AlarmState[] expectedStates = { AlarmState.ALARM, AlarmState.OK, AlarmState.ALARM, AlarmState.OK }; public void shouldThreshold() throws Exception { doAnswer(new Answer() { public Object answer(InvocationOnMock invocation) { @@ -192,9 +192,11 @@ public class ThresholdingEngineAlarmTest extends TopologyTestCase { int goodValueCount = 0; boolean firstUpdate = true; boolean secondUpdate = true; + boolean thirdUpdate = true; final Alarm initialAlarm = new Alarm(TEST_ALARM_ID, TEST_ALARM_TENANT_ID, TEST_ALARM_NAME, TEST_ALARM_DESCRIPTION, expression, subAlarms, AlarmState.UNDETERMINED, Boolean.TRUE); final int expectedAlarms = expectedStates.length; + AlarmExpression savedAlarmExpression = null; for (int i = 1; alarmsSent != expectedAlarms && i < 300; i++) { if (i == 5) { final Map exprs = createSubExpressionMap(); @@ -215,15 +217,18 @@ public class ThresholdingEngineAlarmTest extends TopologyTestCase { updatedSubAlarms.add(new SubAlarm(subAlarm.getId(), initialAlarm.getId(), subAlarm.getExpression())); } - final AlarmUpdatedEvent event = EventProcessingBoltTest.createAlarmUpdatedEvent(initialAlarm, expression, updatedSubAlarms); - event.alarmState = currentState; + initialAlarm.setState(currentState); + final AlarmUpdatedEvent event = EventProcessingBoltTest.createAlarmUpdatedEvent(initialAlarm, initialAlarm.getState(), expression, + updatedSubAlarms); subAlarms = updatedSubAlarms; + initialAlarm.setSubAlarms(updatedSubAlarms); eventSpout.feed(new Values(event)); System.out.printf("Send AlarmUpdatedEvent for expression %s%n", expression.getExpression()); } else if (alarmsSent == 2 && secondUpdate) { secondUpdate = false; + savedAlarmExpression = expression; expression = new AlarmExpression("max(hpcs.compute.load{id=5}) > 551 and (" + expression.getExpression().replace("556", "554") + ")"); final List updatedSubAlarms = new ArrayList<>(); updatedSubAlarms.add(new SubAlarm(UUID.randomUUID().toString(), initialAlarm.getId(), expression.getSubExpressions().get(0))); @@ -231,9 +236,30 @@ public class ThresholdingEngineAlarmTest extends TopologyTestCase { updatedSubAlarms.add(new SubAlarm(subAlarms.get(index).getId(), initialAlarm.getId(), expression.getSubExpressions().get(index+1))); } - final AlarmUpdatedEvent event = EventProcessingBoltTest.createAlarmUpdatedEvent(initialAlarm, expression, updatedSubAlarms); - event.alarmState = currentState; + initialAlarm.setState(currentState); + final AlarmUpdatedEvent event = EventProcessingBoltTest.createAlarmUpdatedEvent(initialAlarm, initialAlarm.getState(), expression, + updatedSubAlarms); subAlarms = updatedSubAlarms; + initialAlarm.setSubAlarms(updatedSubAlarms); + eventSpout.feed(new Values(event)); + + System.out.printf("Send AlarmUpdatedEvent for expression %s%n", expression.getExpression()); + } + else if (alarmsSent == 3 && thirdUpdate) { + thirdUpdate = false; + expression = savedAlarmExpression; + final List updatedSubAlarms = new ArrayList<>(); + int index = 1; + for (AlarmSubExpression subExpression : expression.getSubExpressions()) { + updatedSubAlarms.add(new SubAlarm(subAlarms.get(index).getId(), initialAlarm.getId(), subExpression)); + index++; + } + + initialAlarm.setState(currentState); + final AlarmUpdatedEvent event = EventProcessingBoltTest.createAlarmUpdatedEvent(initialAlarm, initialAlarm.getState(), expression, + updatedSubAlarms); + subAlarms = updatedSubAlarms; + initialAlarm.setSubAlarms(updatedSubAlarms); eventSpout.feed(new Values(event)); System.out.printf("Send AlarmUpdatedEvent for expression %s%n", expression.getExpression()); @@ -265,7 +291,7 @@ public class ThresholdingEngineAlarmTest extends TopologyTestCase { } } assertEquals(alarmsSent, expectedAlarms); - assertEquals(currentState, AlarmState.ALARM); + assertEquals(currentState, expectedStates[expectedStates.length - 1]); } private Map createSubExpressionMap() { diff --git a/src/test/java/com/hpcloud/mon/infrastructure/thresholding/AlarmThresholdingBoltTest.java b/src/test/java/com/hpcloud/mon/infrastructure/thresholding/AlarmThresholdingBoltTest.java index 7ef8d35..c2eb235 100644 --- a/src/test/java/com/hpcloud/mon/infrastructure/thresholding/AlarmThresholdingBoltTest.java +++ b/src/test/java/com/hpcloud/mon/infrastructure/thresholding/AlarmThresholdingBoltTest.java @@ -152,7 +152,7 @@ public class AlarmThresholdingBoltTest { final AlarmState newState = AlarmState.OK; boolean newEnabled = false; final AlarmUpdatedEvent event = new AlarmUpdatedEvent(tenantId, alarmId, newName, newDescription, alarm.getAlarmExpression().getExpression(), - newState, newEnabled, empty, empty, empty); + alarm.getState(), newState, newEnabled, empty, empty, empty, empty); final Tuple updateTuple = createAlarmUpdateTuple(event); bolt.execute(updateTuple); verify(collector, times(1)).ack(updateTuple); @@ -168,6 +168,7 @@ public class AlarmThresholdingBoltTest { final Map newSubExpressions = new HashMap<>(); final Map oldSubExpressions = new HashMap<>(); final Map changedSubExpressions = new HashMap<>(); + final Map unchangedSubExpressions = new HashMap<>(); final String newExpression = subExpressions[1] + " or " + subExpressions[2].replace("max", "avg") + " or " + "sum(diskio{instance_id=123,device=4242}, 1) > 5000"; @@ -180,13 +181,14 @@ public class AlarmThresholdingBoltTest { final SubAlarm changedSubAlarm = new SubAlarm(subAlarms.get(2).getId(), alarmId, newAlarmExpression.getSubExpressions().get(1)); changedSubExpressions.put(changedSubAlarm.getId(), changedSubAlarm.getExpression()); final SubAlarm unChangedSubAlarm = new SubAlarm(subAlarms.get(1).getId(), alarmId, subAlarms.get(1).getExpression()); + unchangedSubExpressions.put(unChangedSubAlarm.getId(), unChangedSubAlarm.getExpression()); emitSubAlarmStateChange(alarmId, changedSubAlarm, AlarmState.OK); emitSubAlarmStateChange(alarmId, unChangedSubAlarm, AlarmState.OK); unChangedSubAlarm.setState(AlarmState.OK); final AlarmUpdatedEvent event = new AlarmUpdatedEvent(tenantId, alarmId, alarm.getName(), alarm.getDescription(), newExpression, - alarm.getState(), alarm.isActionsEnabled(), oldSubExpressions, changedSubExpressions, newSubExpressions); + alarm.getState(), alarm.getState(), alarm.isActionsEnabled(), oldSubExpressions, changedSubExpressions, unchangedSubExpressions, newSubExpressions); final Tuple updateTuple = createAlarmUpdateTuple(event); bolt.execute(updateTuple); verify(collector, times(1)).ack(updateTuple); diff --git a/src/test/java/com/hpcloud/mon/infrastructure/thresholding/EventProcessingBoltTest.java b/src/test/java/com/hpcloud/mon/infrastructure/thresholding/EventProcessingBoltTest.java index 3622fa2..1fc88e7 100644 --- a/src/test/java/com/hpcloud/mon/infrastructure/thresholding/EventProcessingBoltTest.java +++ b/src/test/java/com/hpcloud/mon/infrastructure/thresholding/EventProcessingBoltTest.java @@ -23,8 +23,11 @@ import static org.mockito.Mockito.verify; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import org.testng.annotations.BeforeMethod; @@ -37,6 +40,9 @@ import backtype.storm.testing.MkTupleParam; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import com.google.common.collect.Sets; import com.hpcloud.mon.common.event.AlarmCreatedEvent; import com.hpcloud.mon.common.event.AlarmDeletedEvent; import com.hpcloud.mon.common.event.AlarmUpdatedEvent; @@ -144,43 +150,73 @@ public class EventProcessingBoltTest { } public static AlarmUpdatedEvent createAlarmUpdatedEvent(final Alarm alarm, - final AlarmExpression updatedAlarmExpression, List updatedSubAlarms) { - final Alarm updatedAlarm = new Alarm(); - updatedAlarm.setId(alarm.getId()); - updatedAlarm.setTenantId(alarm.getTenantId()); - updatedAlarm.setName(alarm.getName()); - updatedAlarm.setExpression(updatedAlarmExpression.getExpression()); - updatedAlarm.setDescription(alarm.getDescription()); - updatedAlarm.setState(alarm.getState()); + final AlarmState newState, + final AlarmExpression updatedAlarmExpression, + List updatedSubAlarms) { + final Map oldAlarmSubExpressions = new HashMap<>(); + for (final SubAlarm subAlarm : alarm.getSubAlarms()) + oldAlarmSubExpressions.put(subAlarm.getId(), subAlarm.getExpression()); + BiMap oldExpressions = HashBiMap.create(oldAlarmSubExpressions); + Set oldSet = oldExpressions.inverse().keySet(); + Set newSet = new HashSet<>(); + for (final SubAlarm subAlarm : updatedSubAlarms) + newSet.add(subAlarm.getExpression()); - final List toDelete = new ArrayList<>(alarm.getSubAlarms()); - final Map newAlarmSubExpressions = new HashMap<>(); - final Map updatedSubExpressions = new HashMap<>(); - for (final SubAlarm newSubAlarm : updatedSubAlarms) { - final SubAlarm oldSubAlarm = alarm.getSubAlarm(newSubAlarm.getId()); - if (oldSubAlarm == null) { - newAlarmSubExpressions.put(newSubAlarm.getId(), newSubAlarm.getExpression()); - } - else { - toDelete.remove(oldSubAlarm); - if (!newSubAlarm.getExpression().equals(oldSubAlarm.getExpression())) { - updatedSubExpressions.put(newSubAlarm.getId(), newSubAlarm.getExpression()); - } + // Identify old or changed expressions + Set oldOrChangedExpressions = new HashSet<>(Sets.difference(oldSet, newSet)); + + // Identify new or changed expressions + Set newOrChangedExpressions = new HashSet<>(Sets.difference(newSet, oldSet)); + + // Find changed expressions + Map changedExpressions = new HashMap<>(); + for (Iterator oldIt = oldOrChangedExpressions.iterator(); oldIt.hasNext();) { + AlarmSubExpression oldExpr = oldIt.next(); + for (Iterator newIt = newOrChangedExpressions.iterator(); newIt.hasNext();) { + AlarmSubExpression newExpr = newIt.next(); + if (sameKeyFields(oldExpr, newExpr)) { + oldIt.remove(); + newIt.remove(); + changedExpressions.put(oldExpressions.inverse().get(oldExpr), newExpr); + break; } + } } - final Map deletedSubExpressions = new HashMap<>(toDelete.size()); - for (final SubAlarm oldSubAlarm : toDelete) { - deletedSubExpressions.put(oldSubAlarm.getId(), oldSubAlarm.getExpression()); - } - final AlarmUpdatedEvent event = new AlarmUpdatedEvent(updatedAlarm.getTenantId(), updatedAlarm.getId(), - updatedAlarm.getName(), updatedAlarm.getDescription(), updatedAlarm.getAlarmExpression().getExpression(), alarm.getState(), true, deletedSubExpressions, - updatedSubExpressions, newAlarmSubExpressions); - return event; + + BiMap unchangedExpressions = HashBiMap.create(oldExpressions); + unchangedExpressions.values().removeAll(oldOrChangedExpressions); + unchangedExpressions.keySet().removeAll(changedExpressions.keySet()); + + // Remove old sub expressions + oldExpressions.values().retainAll(oldOrChangedExpressions); + + // Create IDs for new expressions + Map newExpressions = new HashMap<>(); + for (AlarmSubExpression expression : newOrChangedExpressions) + for (final SubAlarm subAlarm : updatedSubAlarms) + if (subAlarm.getExpression().equals(expression)) + newExpressions.put(subAlarm.getId(), expression); + + final AlarmUpdatedEvent event = new AlarmUpdatedEvent(alarm.getTenantId(), alarm.getId(), + alarm.getName(), alarm.getDescription(), updatedAlarmExpression.getExpression(), newState, alarm.getState(), + true, oldExpressions, + changedExpressions, unchangedExpressions, newExpressions); + return event; + } + + /** + * Returns whether all of the fields of {@code a} and {@code b} are the same except the operator + * and threshold. + */ + private static boolean sameKeyFields(AlarmSubExpression a, AlarmSubExpression b) { + return a.getMetricDefinition().equals(b.getMetricDefinition()) + && a.getFunction().equals(b.getFunction()) && a.getPeriod() == b.getPeriod() + && a.getPeriods() == b.getPeriods(); } public void testAlarmUpdatedEvent() { final String updatedExpression = "avg(hpcs.compute.cpu{instance_id=123,device=42}, 1) > 5 " + - "and max(hpcs.compute.Mem{instance_id=123,device=42}) > 90 " + + "and max(hpcs.compute.mem{instance_id=123,device=42}) > 90 " + "and max(hpcs.compute.newLoad{instance_id=123,device=42}) > 5"; final AlarmExpression updatedAlarmExpression = new AlarmExpression(updatedExpression); @@ -190,7 +226,8 @@ public class EventProcessingBoltTest { updatedSubAlarms.add(new SubAlarm(subAlarms.get(1).getId(), alarm.getId(), updatedAlarmExpression.getSubExpressions().get(1))); updatedSubAlarms.add(new SubAlarm(UUID.randomUUID().toString(), alarm.getId(), updatedAlarmExpression.getSubExpressions().get(2))); - final AlarmUpdatedEvent event = createAlarmUpdatedEvent(alarm, updatedAlarmExpression, updatedSubAlarms); + final AlarmUpdatedEvent event = createAlarmUpdatedEvent(alarm, alarm.getState(), updatedAlarmExpression, + updatedSubAlarms); final Tuple tuple = createTuple(event); bolt.execute(tuple); diff --git a/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBoltTest.java b/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBoltTest.java index f27c113..f224940 100644 --- a/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBoltTest.java +++ b/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBoltTest.java @@ -172,6 +172,43 @@ public class MetricAggregationBoltTest { verify(collector, times(1)).emit(new Values(subAlarm3.getAlarmId(), subAlarm3)); } + public void shouldSendAlarmAgain() { + long t1 = 10; + bolt.setCurrentTime(t1); + bolt.execute(createMetricTuple(metricDef2, null)); + + bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1, 100))); + bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1++, 95))); + bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1++, 88))); + + t1 += 60; + bolt.setCurrentTime(t1); + final Tuple tickTuple = createTickTuple(); + bolt.execute(tickTuple); + verify(collector, times(1)).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); + assertEquals(subAlarm2.getState(), AlarmState.ALARM); + verify(collector, times(1)).ack(tickTuple); + + final MkTupleParam tupleParam = new MkTupleParam(); + tupleParam.setFields(EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_FIELDS); + tupleParam.setStream(EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_ID); + final Tuple resendTuple = Testing.testTuple(Arrays.asList(EventProcessingBolt.RESEND, + new MetricDefinitionAndTenantId(metricDef2, TENANT_ID), subAlarm2), tupleParam); + bolt.execute(resendTuple); + + bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1, 100))); + bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1++, 95))); + bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1++, 88))); + + t1 += 60; + bolt.setCurrentTime(t1); + bolt.execute(tickTuple); + verify(collector, times(2)).ack(tickTuple); + + assertEquals(subAlarm2.getState(), AlarmState.ALARM); + verify(collector, times(2)).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); + } + public void shouldSendUndeterminedIfStateChanges() { long t1 = System.currentTimeMillis() / 1000; bolt.setCurrentTime(t1); diff --git a/src/test/java/com/hpcloud/mon/infrastructure/thresholding/deserializer/EventDeserializerTest.java b/src/test/java/com/hpcloud/mon/infrastructure/thresholding/deserializer/EventDeserializerTest.java index bf09a94..2266488 100644 --- a/src/test/java/com/hpcloud/mon/infrastructure/thresholding/deserializer/EventDeserializerTest.java +++ b/src/test/java/com/hpcloud/mon/infrastructure/thresholding/deserializer/EventDeserializerTest.java @@ -26,6 +26,7 @@ import org.testng.annotations.Test; import com.hpcloud.mon.common.event.AlarmCreatedEvent; import com.hpcloud.mon.common.event.AlarmDeletedEvent; import com.hpcloud.mon.common.event.AlarmUpdatedEvent; +import com.hpcloud.mon.common.model.alarm.AlarmState; import com.hpcloud.util.Serialization; @Test @@ -46,7 +47,8 @@ public class EventDeserializerTest { } public void shouldDeserializeAlarmUpdatedEvent() { - roundTrip(new AlarmUpdatedEvent(TENANT_ID, ALARM_ID, ALARM_NAME, ALARM_DESCRIPTION, ALARM_EXPRESSION, null, false, null, null, null)); + roundTrip(new AlarmUpdatedEvent(TENANT_ID, ALARM_ID, ALARM_NAME, ALARM_DESCRIPTION, ALARM_EXPRESSION, + AlarmState.OK, AlarmState.OK, false, null, null, null, null)); } private void roundTrip(Object event) {