Improve exception handling
Make exception handling more precise. Change-Id: Ib7fb3eed5945c12c92a5d871d7d3f6eb72275a76
This commit is contained in:
parent
1969fb74be
commit
cc67023c13
@ -32,6 +32,8 @@ import monasca.persister.repository.Repo;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class AlarmStateTransitionedEventHandler extends
|
||||
FlushableHandler<AlarmStateTransitionedEvent> {
|
||||
|
||||
@ -61,10 +63,10 @@ public class AlarmStateTransitionedEventHandler extends
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int process(String msg) throws Exception {
|
||||
protected int process(String msg) throws IOException {
|
||||
|
||||
AlarmStateTransitionedEvent alarmStateTransitionedEvent =
|
||||
objectMapper.readValue(msg, AlarmStateTransitionedEvent.class);
|
||||
this.objectMapper.readValue(msg, AlarmStateTransitionedEvent.class);
|
||||
|
||||
logger.debug("[{}]: [{}:{}] {}",
|
||||
this.threadId,
|
||||
@ -72,7 +74,7 @@ public class AlarmStateTransitionedEventHandler extends
|
||||
this.getMsgCount(),
|
||||
alarmStateTransitionedEvent);
|
||||
|
||||
alarmRepo.addToBatch(alarmStateTransitionedEvent);
|
||||
this.alarmRepo.addToBatch(alarmStateTransitionedEvent);
|
||||
|
||||
this.alarmStateTransitionCounter.inc();
|
||||
|
||||
@ -93,7 +95,7 @@ public class AlarmStateTransitionedEventHandler extends
|
||||
@Override
|
||||
protected int flushRepository() throws Exception {
|
||||
|
||||
return alarmRepo.flush(this.threadId);
|
||||
return this.alarmRepo.flush(this.threadId);
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -28,6 +28,8 @@ import io.dropwizard.setup.Environment;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public abstract class FlushableHandler<T> {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(FlushableHandler.class);
|
||||
@ -88,7 +90,7 @@ public abstract class FlushableHandler<T> {
|
||||
|
||||
protected abstract int flushRepository() throws Exception;
|
||||
|
||||
protected abstract int process(String msg) throws Exception;
|
||||
protected abstract int process(String msg) throws IOException;
|
||||
|
||||
public boolean onEvent(final String msg) throws Exception {
|
||||
|
||||
@ -124,7 +126,7 @@ public abstract class FlushableHandler<T> {
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isBatchSize() throws Exception {
|
||||
private boolean isBatchSize() {
|
||||
|
||||
logger.debug("[{}]: checking batch size", this.threadId);
|
||||
|
||||
@ -136,19 +138,21 @@ public abstract class FlushableHandler<T> {
|
||||
|
||||
} else {
|
||||
|
||||
logger.debug("[{}]: batch size now at {}, batch size {} not attained",
|
||||
this.threadId,
|
||||
this.msgCount,
|
||||
this.batchSize);
|
||||
|
||||
return false;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isFlushTime() throws Exception {
|
||||
private boolean isFlushTime() {
|
||||
|
||||
logger.debug("[{}}: checking flush time", this.threadId);
|
||||
|
||||
logger.debug(
|
||||
"[{}]: got heartbeat message, flush every {} seconds.",
|
||||
this.threadId,
|
||||
this.secondsBetweenFlushes);
|
||||
logger.debug("[{}}: got heartbeat message, checking flush time. flush every {} seconds.",
|
||||
this.threadId,
|
||||
this.secondsBetweenFlushes);
|
||||
|
||||
long now = System.currentTimeMillis();
|
||||
|
||||
|
@ -17,6 +17,10 @@
|
||||
|
||||
package monasca.persister.pipeline.event;
|
||||
|
||||
import monasca.common.model.metric.MetricEnvelope;
|
||||
import monasca.persister.configuration.PipelineConfig;
|
||||
import monasca.persister.repository.Repo;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.assistedinject.Assisted;
|
||||
|
||||
@ -27,10 +31,9 @@ import com.fasterxml.jackson.databind.PropertyNamingStrategy;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import io.dropwizard.setup.Environment;
|
||||
import monasca.common.model.metric.MetricEnvelope;
|
||||
import monasca.persister.configuration.PipelineConfig;
|
||||
import monasca.persister.repository.Repo;
|
||||
|
||||
public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
|
||||
|
||||
@ -60,10 +63,10 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public int process(String msg) throws Exception {
|
||||
public int process(String msg) throws IOException {
|
||||
|
||||
MetricEnvelope[] metricEnvelopesArry =
|
||||
objectMapper.readValue(msg, MetricEnvelope[].class);
|
||||
this.objectMapper.readValue(msg, MetricEnvelope[].class);
|
||||
|
||||
for (final MetricEnvelope metricEnvelope : metricEnvelopesArry) {
|
||||
|
||||
@ -103,7 +106,7 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
|
||||
@Override
|
||||
public int flushRepository() throws Exception {
|
||||
|
||||
return metricRepo.flush(this.threadId);
|
||||
return this.metricRepo.flush(this.threadId);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -26,8 +26,12 @@ public class InfluxPoint {
|
||||
private final String timestamp;
|
||||
private final Map<String, Object> fields;
|
||||
|
||||
public InfluxPoint(final String name, final Map<String, String> tags, final String timestamp,
|
||||
final Map<String, Object> fields) {
|
||||
public InfluxPoint(
|
||||
final String name,
|
||||
final Map<String, String> tags,
|
||||
final String timestamp,
|
||||
final Map<String, Object> fields) {
|
||||
|
||||
this.name = name;
|
||||
this.tags = tags;
|
||||
this.timestamp = timestamp;
|
||||
|
@ -59,17 +59,17 @@ public abstract class InfluxRepo<T> implements Repo<T> {
|
||||
|
||||
try {
|
||||
|
||||
final long startTime = System.currentTimeMillis();
|
||||
|
||||
final Timer.Context context = flushTimer.time();
|
||||
|
||||
final long startTime = System.currentTimeMillis();
|
||||
|
||||
int msgWriteCnt = write(id);
|
||||
|
||||
final long endTime = System.currentTimeMillis();
|
||||
|
||||
context.stop();
|
||||
|
||||
logger.debug("[{}]: flushing batch took {} millis", id, endTime - startTime);
|
||||
logger.debug("[{}]: writing to influxdb took {} millis", id, endTime - startTime);
|
||||
|
||||
clearBuffers();
|
||||
|
||||
@ -77,7 +77,7 @@ public abstract class InfluxRepo<T> implements Repo<T> {
|
||||
|
||||
} catch (Exception e) {
|
||||
|
||||
logger.error("[{}]: failed to write msg to influxdb", id, e);
|
||||
logger.error("[{}]: failed to write to influxdb", id, e);
|
||||
|
||||
throw e;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user