Modified to use AlarmStateTransitionedEvent in mon.common and added alarm description.
This commit is contained in:
6
pom.xml
6
pom.xml
@@ -20,6 +20,7 @@
|
||||
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
|
||||
<artifactNamedVersion>${project.name}-${project.version}-${timestamp}-${buildNumber}
|
||||
</artifactNamedVersion>
|
||||
<mon.common.version>1.0.0.27</mon.common.version>
|
||||
</properties>
|
||||
|
||||
<!--Needed for buildnumber-maven-plugin-->
|
||||
@@ -30,6 +31,11 @@
|
||||
</scm>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.hpcloud</groupId>
|
||||
<artifactId>mon-model</artifactId>
|
||||
<version>${mon.common.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.10</artifactId>
|
||||
|
||||
@@ -7,8 +7,8 @@ import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
|
||||
import com.hpcloud.mon.persister.consumer.*;
|
||||
import com.hpcloud.mon.persister.dbi.DBIProvider;
|
||||
import com.hpcloud.mon.persister.disruptor.*;
|
||||
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionMessageEventHandler;
|
||||
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionMessageEventHandlerFactory;
|
||||
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedMessageEventHandler;
|
||||
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedMessageEventHandlerFactory;
|
||||
import com.hpcloud.mon.persister.disruptor.event.MetricMessageEventHandler;
|
||||
import com.hpcloud.mon.persister.disruptor.event.MetricMessageEventHandlerFactory;
|
||||
import com.hpcloud.mon.persister.repository.RepositoryCommitHeartbeat;
|
||||
@@ -37,8 +37,8 @@ public class MonPersisterModule extends AbstractModule {
|
||||
.build(MetricMessageEventHandlerFactory.class));
|
||||
|
||||
install(new FactoryModuleBuilder()
|
||||
.implement(AlarmStateTransitionMessageEventHandler.class, AlarmStateTransitionMessageEventHandler.class)
|
||||
.build(AlarmStateTransitionMessageEventHandlerFactory.class));
|
||||
.implement(AlarmStateTransitionedMessageEventHandler.class, AlarmStateTransitionedMessageEventHandler.class)
|
||||
.build(AlarmStateTransitionedMessageEventHandlerFactory.class));
|
||||
|
||||
install(new FactoryModuleBuilder()
|
||||
.implement(KafkaMetricsConsumerRunnableBasic.class, KafkaMetricsConsumerRunnableBasic.class)
|
||||
|
||||
@@ -5,13 +5,13 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.assistedinject.Assisted;
|
||||
import com.hpcloud.mon.persister.disruptor.AlarmStateHistoryDisruptor;
|
||||
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionMessageEvent;
|
||||
import com.hpcloud.mon.persister.message.AlarmStateTransitionMessage;
|
||||
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedMessageEvent;
|
||||
import com.lmax.disruptor.EventTranslator;
|
||||
import kafka.consumer.ConsumerIterator;
|
||||
import kafka.consumer.KafkaStream;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import com.hpcloud.mon.common.event.AlarmStateTransitionedEvent;
|
||||
|
||||
public class KafkaAlarmStateTransitionConsumerRunnableBasic implements Runnable {
|
||||
|
||||
@@ -45,13 +45,13 @@ public class KafkaAlarmStateTransitionConsumerRunnableBasic implements Runnable
|
||||
logger.debug("Thread " + threadNumber + ": " + s);
|
||||
|
||||
try {
|
||||
final AlarmStateTransitionMessage message = objectMapper.readValue(s, AlarmStateTransitionMessage.class);
|
||||
final AlarmStateTransitionedEvent message = objectMapper.readValue(s, AlarmStateTransitionedEvent.class);
|
||||
|
||||
logger.debug(message.toString());
|
||||
|
||||
disruptor.publishEvent(new EventTranslator<AlarmStateTransitionMessageEvent>() {
|
||||
disruptor.publishEvent(new EventTranslator<AlarmStateTransitionedMessageEvent>() {
|
||||
@Override
|
||||
public void translateTo(AlarmStateTransitionMessageEvent event, long sequence) {
|
||||
public void translateTo(AlarmStateTransitionedMessageEvent event, long sequence) {
|
||||
event.setMessage(message);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -3,8 +3,8 @@ package com.hpcloud.mon.persister.disruptor;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Provider;
|
||||
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
|
||||
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionMessageEventFactory;
|
||||
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionMessageEventHandlerFactory;
|
||||
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedMessageEventFactory;
|
||||
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedMessageEventHandlerFactory;
|
||||
import com.lmax.disruptor.EventHandler;
|
||||
import com.lmax.disruptor.ExceptionHandler;
|
||||
import org.slf4j.Logger;
|
||||
@@ -18,13 +18,13 @@ public class AlarmHistoryDisruptorProvider implements Provider<AlarmStateHistory
|
||||
private static final Logger logger = LoggerFactory.getLogger(AlarmHistoryDisruptorProvider.class);
|
||||
|
||||
private final MonPersisterConfiguration configuration;
|
||||
private final AlarmStateTransitionMessageEventHandlerFactory eventHandlerFactory;
|
||||
private final AlarmStateTransitionedMessageEventHandlerFactory eventHandlerFactory;
|
||||
private final ExceptionHandler exceptionHandler;
|
||||
private final AlarmStateHistoryDisruptor instance;
|
||||
|
||||
@Inject
|
||||
public AlarmHistoryDisruptorProvider(MonPersisterConfiguration configuration,
|
||||
AlarmStateTransitionMessageEventHandlerFactory eventHandlerFactory,
|
||||
AlarmStateTransitionedMessageEventHandlerFactory eventHandlerFactory,
|
||||
ExceptionHandler exceptionHandler) {
|
||||
this.configuration = configuration;
|
||||
this.eventHandlerFactory = eventHandlerFactory;
|
||||
@@ -37,7 +37,7 @@ public class AlarmHistoryDisruptorProvider implements Provider<AlarmStateHistory
|
||||
logger.debug("Creating disruptor...");
|
||||
|
||||
Executor executor = Executors.newCachedThreadPool();
|
||||
AlarmStateTransitionMessageEventFactory eventFactory = new AlarmStateTransitionMessageEventFactory();
|
||||
AlarmStateTransitionedMessageEventFactory eventFactory = new AlarmStateTransitionedMessageEventFactory();
|
||||
|
||||
int bufferSize = configuration.getDisruptorConfiguration().getBufferSize();
|
||||
logger.debug("Buffer size for instance of disruptor [" + bufferSize + "]");
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
package com.hpcloud.mon.persister.disruptor;
|
||||
|
||||
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionMessageEvent;
|
||||
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedMessageEvent;
|
||||
import com.lmax.disruptor.EventFactory;
|
||||
import com.lmax.disruptor.WaitStrategy;
|
||||
import com.lmax.disruptor.dsl.Disruptor;
|
||||
@@ -8,7 +8,7 @@ import com.lmax.disruptor.dsl.ProducerType;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
public class AlarmStateHistoryDisruptor extends Disruptor<AlarmStateTransitionMessageEvent> {
|
||||
public class AlarmStateHistoryDisruptor extends Disruptor<AlarmStateTransitionedMessageEvent> {
|
||||
public AlarmStateHistoryDisruptor(EventFactory eventFactory, int ringBufferSize, Executor executor) {
|
||||
super(eventFactory, ringBufferSize, executor);
|
||||
}
|
||||
|
||||
@@ -1,16 +0,0 @@
|
||||
package com.hpcloud.mon.persister.disruptor.event;
|
||||
|
||||
import com.hpcloud.mon.persister.message.AlarmStateTransitionMessage;
|
||||
|
||||
public class AlarmStateTransitionMessageEvent
|
||||
{
|
||||
private AlarmStateTransitionMessage message;
|
||||
|
||||
public AlarmStateTransitionMessage getMessage() {
|
||||
return message;
|
||||
}
|
||||
|
||||
public void setMessage(AlarmStateTransitionMessage message) {
|
||||
this.message = message;
|
||||
}
|
||||
}
|
||||
@@ -1,13 +0,0 @@
|
||||
package com.hpcloud.mon.persister.disruptor.event;
|
||||
|
||||
import com.lmax.disruptor.EventFactory;
|
||||
|
||||
public class AlarmStateTransitionMessageEventFactory implements EventFactory<AlarmStateTransitionMessageEvent> {
|
||||
|
||||
public static final AlarmStateTransitionMessageEventFactory INSTANCE = new AlarmStateTransitionMessageEventFactory();
|
||||
|
||||
@Override
|
||||
public AlarmStateTransitionMessageEvent newInstance() {
|
||||
return new AlarmStateTransitionMessageEvent();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,16 @@
|
||||
package com.hpcloud.mon.persister.disruptor.event;
|
||||
|
||||
import com.hpcloud.mon.common.event.AlarmStateTransitionedEvent;
|
||||
|
||||
public class AlarmStateTransitionedMessageEvent
|
||||
{
|
||||
private AlarmStateTransitionedEvent message;
|
||||
|
||||
public AlarmStateTransitionedEvent getMessage() {
|
||||
return message;
|
||||
}
|
||||
|
||||
public void setMessage(AlarmStateTransitionedEvent message) {
|
||||
this.message = message;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
package com.hpcloud.mon.persister.disruptor.event;
|
||||
|
||||
import com.lmax.disruptor.EventFactory;
|
||||
|
||||
public class AlarmStateTransitionedMessageEventFactory implements EventFactory<AlarmStateTransitionedMessageEvent> {
|
||||
|
||||
public static final AlarmStateTransitionedMessageEventFactory INSTANCE = new AlarmStateTransitionedMessageEventFactory();
|
||||
|
||||
@Override
|
||||
public AlarmStateTransitionedMessageEvent newInstance() {
|
||||
return new AlarmStateTransitionedMessageEvent();
|
||||
}
|
||||
}
|
||||
@@ -3,7 +3,7 @@ package com.hpcloud.mon.persister.disruptor.event;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.assistedinject.Assisted;
|
||||
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
|
||||
import com.hpcloud.mon.persister.message.AlarmStateTransitionMessage;
|
||||
import com.hpcloud.mon.common.event.AlarmStateTransitionedEvent;
|
||||
import com.hpcloud.mon.persister.repository.VerticaAlarmStateHistoryRepository;
|
||||
import com.lmax.disruptor.EventHandler;
|
||||
import com.yammer.metrics.Metrics;
|
||||
@@ -16,9 +16,9 @@ import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class AlarmStateTransitionMessageEventHandler implements EventHandler<AlarmStateTransitionMessageEvent> {
|
||||
public class AlarmStateTransitionedMessageEventHandler implements EventHandler<AlarmStateTransitionedMessageEvent> {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(AlarmStateTransitionMessageEventHandler.class);
|
||||
private static final Logger logger = LoggerFactory.getLogger(AlarmStateTransitionedMessageEventHandler.class);
|
||||
private final int ordinal;
|
||||
private final int numProcessors;
|
||||
private final int batchSize;
|
||||
@@ -36,11 +36,11 @@ public class AlarmStateTransitionMessageEventHandler implements EventHandler<Ala
|
||||
private final Timer commitTimer = Metrics.newTimer(this.getClass(), "total-commit-and-flush-timer");
|
||||
|
||||
@Inject
|
||||
public AlarmStateTransitionMessageEventHandler(VerticaAlarmStateHistoryRepository repository,
|
||||
MonPersisterConfiguration configuration,
|
||||
@Assisted("ordinal") int ordinal,
|
||||
@Assisted("numProcessors") int numProcessors,
|
||||
@Assisted("batchSize") int batchSize) {
|
||||
public AlarmStateTransitionedMessageEventHandler(VerticaAlarmStateHistoryRepository repository,
|
||||
MonPersisterConfiguration configuration,
|
||||
@Assisted("ordinal") int ordinal,
|
||||
@Assisted("numProcessors") int numProcessors,
|
||||
@Assisted("batchSize") int batchSize) {
|
||||
|
||||
this.repository = repository;
|
||||
this.configuration = configuration;
|
||||
@@ -53,7 +53,7 @@ public class AlarmStateTransitionMessageEventHandler implements EventHandler<Ala
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onEvent(AlarmStateTransitionMessageEvent event, long sequence, boolean b) throws Exception {
|
||||
public void onEvent(AlarmStateTransitionedMessageEvent event, long sequence, boolean b) throws Exception {
|
||||
|
||||
if (event.getMessage() == null) {
|
||||
logger.debug("Received heartbeat message. Checking last flush time.");
|
||||
@@ -76,7 +76,7 @@ public class AlarmStateTransitionMessageEventHandler implements EventHandler<Ala
|
||||
" Ordinal: " + ordinal +
|
||||
" Event: " + event.getMessage());
|
||||
|
||||
AlarmStateTransitionMessage message = event.getMessage();
|
||||
AlarmStateTransitionedEvent message = event.getMessage();
|
||||
repository.addToBatch(message);
|
||||
|
||||
if (sequence % batchSize == (batchSize - 1)) {
|
||||
@@ -2,8 +2,8 @@ package com.hpcloud.mon.persister.disruptor.event;
|
||||
|
||||
import com.google.inject.assistedinject.Assisted;
|
||||
|
||||
public interface AlarmStateTransitionMessageEventHandlerFactory {
|
||||
AlarmStateTransitionMessageEventHandler create(@Assisted("ordinal") int ordinal,
|
||||
public interface AlarmStateTransitionedMessageEventHandlerFactory {
|
||||
AlarmStateTransitionedMessageEventHandler create(@Assisted("ordinal") int ordinal,
|
||||
@Assisted("numProcessors") int numProcessors,
|
||||
@Assisted("batchSize") int batchSize);
|
||||
}
|
||||
@@ -1,36 +0,0 @@
|
||||
package com.hpcloud.mon.persister.message;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonRootName;
|
||||
|
||||
@JsonRootName(value = "alarm-transitioned")
|
||||
public class AlarmStateTransitionMessage {
|
||||
|
||||
public String tenantId;
|
||||
public String alarmId;
|
||||
public String alarmName;
|
||||
public String oldState;
|
||||
public String newState;
|
||||
public String stateChangeReason;
|
||||
public long timestamp;
|
||||
|
||||
private AlarmStateTransitionMessage() {
|
||||
}
|
||||
|
||||
public AlarmStateTransitionMessage(String tenantId, String alarmId, String alarmName,
|
||||
String oldState, String newState, String stateChangeReason, long timestamp) {
|
||||
this.tenantId = tenantId;
|
||||
this.alarmId = alarmId;
|
||||
this.alarmName = alarmName;
|
||||
this.oldState = oldState;
|
||||
this.newState = newState;
|
||||
this.stateChangeReason = stateChangeReason;
|
||||
this.timestamp = timestamp;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format(
|
||||
"AlarmStateTransitionEvent [tenantId=%s, alarmId=%s, alarmName=%s, oldState=%s, newState=%s, stateChangeReason=%s, timestamp=%s]",
|
||||
tenantId, alarmId, alarmName, oldState, newState, stateChangeReason, timestamp);
|
||||
}
|
||||
}
|
||||
@@ -3,7 +3,7 @@ package com.hpcloud.mon.persister.repository;
|
||||
import com.google.inject.Inject;
|
||||
import com.hpcloud.mon.persister.disruptor.AlarmStateHistoryDisruptor;
|
||||
import com.hpcloud.mon.persister.disruptor.MetricDisruptor;
|
||||
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionMessageEvent;
|
||||
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedMessageEvent;
|
||||
import com.hpcloud.mon.persister.disruptor.event.MetricMessageEvent;
|
||||
import com.lmax.disruptor.EventTranslator;
|
||||
import com.lmax.disruptor.dsl.Disruptor;
|
||||
@@ -65,10 +65,10 @@ public class RepositoryCommitHeartbeat implements Managed {
|
||||
event.setEnvelope(null);
|
||||
}
|
||||
});
|
||||
alarmHistoryDisruptor.publishEvent(new EventTranslator<AlarmStateTransitionMessageEvent>() {
|
||||
alarmHistoryDisruptor.publishEvent(new EventTranslator<AlarmStateTransitionedMessageEvent>() {
|
||||
|
||||
@Override
|
||||
public void translateTo(AlarmStateTransitionMessageEvent event, long sequence) {
|
||||
public void translateTo(AlarmStateTransitionedMessageEvent event, long sequence) {
|
||||
event.setMessage(null);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
package com.hpcloud.mon.persister.repository;
|
||||
|
||||
import com.hpcloud.mon.common.event.AlarmStateTransitionedEvent;
|
||||
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
|
||||
import com.hpcloud.mon.persister.message.AlarmStateTransitionMessage;
|
||||
import com.yammer.metrics.Metrics;
|
||||
import com.yammer.metrics.core.Timer;
|
||||
import com.yammer.metrics.core.TimerContext;
|
||||
@@ -22,7 +22,7 @@ public class VerticaAlarmStateHistoryRepository extends VerticaRepository {
|
||||
private static final Logger logger = LoggerFactory.getLogger(VerticaAlarmStateHistoryRepository.class);
|
||||
private final MonPersisterConfiguration configuration;
|
||||
private static final String SQL_INSERT_INTO_ALARM_HISTORY =
|
||||
"insert into MonAlarms.StateHistory (tenant_id, alarm_id, alarm_name, old_state, new_state, reason, time_stamp) values (:tenant_id, :alarm_id, :alarm_name, :old_state, :new_state, :reason, :time_stamp)";
|
||||
"insert into MonAlarms.StateHistory (tenant_id, alarm_id, alarm_name, alarm_description, old_state, new_state, reason, time_stamp) values (:tenant_id, :alarm_id, :alarm_name, :alarm_description, :old_state, :new_state, :reason, :time_stamp)";
|
||||
private PreparedBatch batch;
|
||||
private final Timer commitTimer = Metrics.newTimer(this.getClass(), "commits-timer");
|
||||
private final SimpleDateFormat simpleDateFormat;
|
||||
@@ -42,9 +42,17 @@ public class VerticaAlarmStateHistoryRepository extends VerticaRepository {
|
||||
simpleDateFormat.setTimeZone(TimeZone.getTimeZone("GMT-0"));
|
||||
}
|
||||
|
||||
public void addToBatch(AlarmStateTransitionMessage message) {
|
||||
public void addToBatch(AlarmStateTransitionedEvent message) {
|
||||
String timeStamp = simpleDateFormat.format(new Date(message.timestamp * 1000));
|
||||
batch.add().bind(0, message.tenantId).bind(1, message.alarmId).bind(2, message.alarmName).bind(3, message.oldState).bind(4, message.newState).bind(5, message.stateChangeReason).bind(6, timeStamp);
|
||||
batch.add()
|
||||
.bind(0, message.tenantId)
|
||||
.bind(1, message.alarmId)
|
||||
.bind(2, message.alarmName)
|
||||
.bind(3, message.alarmDescription)
|
||||
.bind(4, message.oldState.name())
|
||||
.bind(5, message.newState.name())
|
||||
.bind(6, message.stateChangeReason)
|
||||
.bind(7, timeStamp);
|
||||
}
|
||||
|
||||
public void flush() {
|
||||
|
||||
Reference in New Issue
Block a user