Remove four classes: the metric- and alarm-specific Kafka consumer subclasses and their factories, which are no longer needed.
Standardize logging on SLF4J parameterized messages.
Tag every log message with the owning thread's threadId.
Remove the need for casting by deserializing JSON messages directly to their target types in the handlers.
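In practice, the consumers and handlers now carry a String threadId (e.g. "metric-0", "alarm-state-transition-1") instead of an int thread number, every log line is prefixed with that id through SLF4J placeholders, and each handler deserializes the raw JSON string itself rather than casting a pre-deserialized Object. A minimal sketch of the two patterns, for illustration only (PatternSketch is a hypothetical class; the logging style and the readValue() call mirror the diff below):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import monasca.common.model.metric.MetricEnvelope;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class PatternSketch {

      private static final Logger logger = LoggerFactory.getLogger(PatternSketch.class);
      private static final ObjectMapper objectMapper = new ObjectMapper();

      public static MetricEnvelope[] handle(String threadId, String msg) throws Exception {
        // every log line carries the owning consumer thread's id, e.g. "metric-0"
        logger.debug("[{}]: handling message", threadId);
        // read the JSON string directly into the target type; no cast needed
        return objectMapper.readValue(msg, MetricEnvelope[].class);
      }
    }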

Change-Id: I7aeff8a0ed9d7ec7afdf0a09863e3b514e5f9d1e
Authored by Deklan Dieterly on 2015-04-18 10:46:03 -06:00; committed by Craig Bryant
parent 213fc06707
commit 7f634a5dc8
31 changed files with 436 additions and 448 deletions

View File

@@ -31,14 +31,14 @@ import io.dropwizard.setup.Environment;
import monasca.common.model.event.AlarmStateTransitionedEvent;
import monasca.common.model.metric.MetricEnvelope;
import monasca.persister.configuration.PersisterConfig;
import monasca.persister.consumer.Consumer;
import monasca.persister.consumer.ConsumerFactory;
import monasca.persister.consumer.ManagedConsumer;
import monasca.persister.consumer.ManagedConsumerFactory;
import monasca.persister.consumer.KafkaChannel;
import monasca.persister.consumer.KafkaChannelFactory;
import monasca.persister.consumer.alarmstate.KafkaAlarmStateTransitionConsumer;
import monasca.persister.consumer.alarmstate.KafkaAlarmStateTransitionConsumerFactory;
import monasca.persister.consumer.metric.KafkaMetricsConsumer;
import monasca.persister.consumer.metric.KafkaMetricsConsumerFactory;
import monasca.persister.consumer.KafkaConsumer;
import monasca.persister.consumer.KafkaConsumerFactory;
import monasca.persister.consumer.KafkaConsumerRunnableBasic;
import monasca.persister.consumer.KafkaConsumerRunnableBasicFactory;
import monasca.persister.healthcheck.SimpleHealthCheck;
import monasca.persister.pipeline.ManagedPipeline;
import monasca.persister.pipeline.ManagedPipelineFactory;
@@ -97,70 +97,88 @@ public class PersisterApplication extends Application<PersisterConfig> {
final KafkaChannelFactory kafkaChannelFactory = injector.getInstance(KafkaChannelFactory.class);
final ConsumerFactory<MetricEnvelope[]> metricsConsumerFactory =
injector.getInstance(Key.get(new TypeLiteral<ConsumerFactory<MetricEnvelope[]>>() {
}));
final ManagedConsumerFactory<MetricEnvelope[]> metricManagedConsumerFactory =
injector.getInstance(Key.get(new TypeLiteral<ManagedConsumerFactory<MetricEnvelope[]>>() {}));
// Metrics
final KafkaMetricsConsumerFactory<MetricEnvelope[]> kafkaMetricsConsumerFactory =
injector.getInstance(Key.get(new TypeLiteral<KafkaMetricsConsumerFactory<MetricEnvelope[]>>(){}));
final KafkaConsumerFactory<MetricEnvelope[]> kafkaMetricConsumerFactory =
injector.getInstance(Key.get(new TypeLiteral<KafkaConsumerFactory<MetricEnvelope[]>>(){}));
final KafkaConsumerRunnableBasicFactory<MetricEnvelope[]> kafkaMetricConsumerRunnableBasicFactory =
injector.getInstance(Key.get(new TypeLiteral<KafkaConsumerRunnableBasicFactory
<MetricEnvelope[]>>(){}));
for (int i = 0; i < configuration.getMetricConfiguration().getNumThreads(); i++) {
final KafkaChannel kafkaChannel =
kafkaChannelFactory.create(configuration, configuration.getMetricConfiguration(), i);
String threadId = "metric-" + String.valueOf(i);
final ManagedPipeline<MetricEnvelope[]> metricPipeline = getMetricPipeline(
configuration, i, injector);
final KafkaChannel kafkaMetricChannel =
kafkaChannelFactory.create(configuration.getMetricConfiguration(), threadId);
final KafkaMetricsConsumer<MetricEnvelope[]> kafkaMetricsConsumer =
kafkaMetricsConsumerFactory.create(MetricEnvelope[].class, kafkaChannel, i, metricPipeline);
final ManagedPipeline<MetricEnvelope[]> managedMetricPipeline =
getMetricPipeline(configuration, threadId, injector);
Consumer<MetricEnvelope[]> metricsConsumer =
metricsConsumerFactory.create(kafkaMetricsConsumer, metricPipeline);
KafkaConsumerRunnableBasic<MetricEnvelope[]> kafkaMetricConsumerRunnableBasic =
kafkaMetricConsumerRunnableBasicFactory.create(managedMetricPipeline, kafkaMetricChannel, threadId);
environment.lifecycle().manage(metricsConsumer);
final KafkaConsumer<MetricEnvelope[]> kafkaMetricConsumer =
kafkaMetricConsumerFactory.create(kafkaMetricConsumerRunnableBasic, threadId);
ManagedConsumer<MetricEnvelope[]> managedMetricConsumer =
metricManagedConsumerFactory.create(kafkaMetricConsumer, threadId);
environment.lifecycle().manage(managedMetricConsumer);
}
// AlarmStateTransitions
final ConsumerFactory<AlarmStateTransitionedEvent>
alarmStateTransitionsConsumerFactory = injector.getInstance(Key.get(new TypeLiteral
<ConsumerFactory<AlarmStateTransitionedEvent>>(){}));
final ManagedConsumerFactory<AlarmStateTransitionedEvent>
alarmStateTransitionsManagedConsumerFactory = injector.getInstance(Key.get(new TypeLiteral
<ManagedConsumerFactory<AlarmStateTransitionedEvent>>(){}));
final KafkaAlarmStateTransitionConsumerFactory<AlarmStateTransitionedEvent>
final KafkaConsumerFactory<AlarmStateTransitionedEvent>
kafkaAlarmStateTransitionConsumerFactory =
injector.getInstance(Key.get(new TypeLiteral<KafkaAlarmStateTransitionConsumerFactory<AlarmStateTransitionedEvent
>>() {}));
injector.getInstance(Key.get(new TypeLiteral<KafkaConsumerFactory<AlarmStateTransitionedEvent>>() { }));
final KafkaConsumerRunnableBasicFactory<AlarmStateTransitionedEvent> kafkaAlarmStateTransitionConsumerRunnableBasicFactory =
injector.getInstance(Key.get(new TypeLiteral<KafkaConsumerRunnableBasicFactory
<AlarmStateTransitionedEvent>>(){}));
for (int i = 0; i < configuration.getAlarmHistoryConfiguration().getNumThreads(); i++) {
final KafkaChannel kafkaChannel =
String threadId = "alarm-state-transition-" + String.valueOf(i);
final KafkaChannel kafkaAlarmStateTransitionChannel =
kafkaChannelFactory
.create(configuration, configuration.getAlarmHistoryConfiguration(), i);
.create(configuration.getAlarmHistoryConfiguration(), threadId);
final ManagedPipeline<AlarmStateTransitionedEvent> pipeline =
getAlarmStateHistoryPipeline(configuration, i, injector);
final ManagedPipeline<AlarmStateTransitionedEvent> managedAlarmStateTransitionPipeline =
getAlarmStateHistoryPipeline(configuration, threadId, injector);
final KafkaAlarmStateTransitionConsumer<AlarmStateTransitionedEvent> kafkaAlarmStateTransitionConsumer =
kafkaAlarmStateTransitionConsumerFactory.create(AlarmStateTransitionedEvent.class, kafkaChannel, i, pipeline);
KafkaConsumerRunnableBasic<AlarmStateTransitionedEvent> kafkaAlarmStateTransitionConsumerRunnableBasic =
kafkaAlarmStateTransitionConsumerRunnableBasicFactory.create(managedAlarmStateTransitionPipeline, kafkaAlarmStateTransitionChannel, threadId);
Consumer<AlarmStateTransitionedEvent> alarmStateTransitionConsumer =
alarmStateTransitionsConsumerFactory.create(kafkaAlarmStateTransitionConsumer, pipeline);
final KafkaConsumer<AlarmStateTransitionedEvent> kafkaAlarmStateTransitionConsumer =
kafkaAlarmStateTransitionConsumerFactory.create(kafkaAlarmStateTransitionConsumerRunnableBasic, threadId);
environment.lifecycle().manage(alarmStateTransitionConsumer);
ManagedConsumer<AlarmStateTransitionedEvent> managedAlarmStateTransitionConsumer =
alarmStateTransitionsManagedConsumerFactory.create(kafkaAlarmStateTransitionConsumer, threadId);
environment.lifecycle().manage(managedAlarmStateTransitionConsumer);
}
}
private ManagedPipeline<MetricEnvelope[]> getMetricPipeline(PersisterConfig configuration, int threadNum,
private ManagedPipeline<MetricEnvelope[]> getMetricPipeline(
PersisterConfig configuration,
String threadId,
Injector injector) {
logger.debug("Creating metric pipeline...");
logger.debug("Creating metric pipeline [{}]...", threadId);
final int batchSize = configuration.getMetricConfiguration().getBatchSize();
logger.debug("Batch size for metric pipeline [" + batchSize + "]");
logger.debug("Batch size for metric pipeline [{}]", batchSize);
MetricHandlerFactory<MetricEnvelope[]> metricEventHandlerFactory =
injector.getInstance(Key.get(new TypeLiteral<MetricHandlerFactory<MetricEnvelope[]>>(){}));
MetricHandlerFactory metricEventHandlerFactory =
injector.getInstance(MetricHandlerFactory.class);
ManagedPipelineFactory<MetricEnvelope[]>
managedPipelineFactory = injector.getInstance(Key.get(new TypeLiteral
@@ -168,33 +186,34 @@ public class PersisterApplication extends Application<PersisterConfig> {
final ManagedPipeline<MetricEnvelope[]> pipeline =
managedPipelineFactory.create(metricEventHandlerFactory.create(
configuration.getMetricConfiguration(), threadNum, batchSize));
configuration.getMetricConfiguration(), threadId, batchSize), threadId);
logger.debug("Instance of metric pipeline fully created");
logger.debug("Instance of metric pipeline [{}] fully created", threadId);
return pipeline;
}
public ManagedPipeline<AlarmStateTransitionedEvent> getAlarmStateHistoryPipeline(
PersisterConfig configuration, int threadNum, Injector injector) {
PersisterConfig configuration,
String threadId,
Injector injector) {
logger.debug("Creating alarm state history pipeline...");
logger.debug("Creating alarm state history pipeline [{}]...", threadId);
int batchSize = configuration.getAlarmHistoryConfiguration().getBatchSize();
logger.debug("Batch size for each AlarmStateHistoryPipeline [" + batchSize + "]");
logger.debug("Batch size for each AlarmStateHistoryPipeline [{}]", batchSize);
AlarmStateTransitionedEventHandlerFactory<AlarmStateTransitionedEvent> alarmHistoryEventHandlerFactory =
injector.getInstance(Key.get(new TypeLiteral<AlarmStateTransitionedEventHandlerFactory
<AlarmStateTransitionedEvent>>(){}));
AlarmStateTransitionedEventHandlerFactory alarmHistoryEventHandlerFactory =
injector.getInstance(AlarmStateTransitionedEventHandlerFactory.class);
ManagedPipelineFactory<AlarmStateTransitionedEvent> alarmStateTransitionPipelineFactory =
injector.getInstance(new Key<ManagedPipelineFactory<AlarmStateTransitionedEvent>>(){});
ManagedPipeline<AlarmStateTransitionedEvent> pipeline =
alarmStateTransitionPipelineFactory.create(alarmHistoryEventHandlerFactory.create(
configuration.getAlarmHistoryConfiguration(), threadNum, batchSize));
configuration.getAlarmHistoryConfiguration(), threadId, batchSize), threadId);
logger.debug("Instance of alarm state history pipeline fully created");
logger.debug("Instance of alarm state history pipeline [{}] fully created", threadId);
return pipeline;
}
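Aside: the repeated injector.getInstance(Key.get(new TypeLiteral<...>() {})) calls above are the standard Guice idiom for looking up bindings of parameterized types. Java erases generics, so there is no ManagedConsumerFactory<MetricEnvelope[]>.class to hand to getInstance(); the anonymous TypeLiteral subclass captures the full type for the Key. A self-contained sketch of the idiom, with List<String> standing in for the factory types:

    import com.google.inject.Guice;
    import com.google.inject.Injector;
    import com.google.inject.Key;
    import com.google.inject.TypeLiteral;

    import java.util.Arrays;
    import java.util.List;

    public class TypeLiteralSketch {

      public static void main(String[] args) {
        Injector injector = Guice.createInjector(binder ->
            // bind the parameterized type List<String>...
            binder.bind(new TypeLiteral<List<String>>() {})
                .toInstance(Arrays.asList("a", "b")));
        // ...and look it up again through an equivalent TypeLiteral
        List<String> strings =
            injector.getInstance(Key.get(new TypeLiteral<List<String>>() {}));
        System.out.println(strings);
      }
    }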

View File

@@ -30,16 +30,14 @@ import io.dropwizard.setup.Environment;
import monasca.common.model.event.AlarmStateTransitionedEvent;
import monasca.common.model.metric.MetricEnvelope;
import monasca.persister.configuration.PersisterConfig;
import monasca.persister.consumer.Consumer;
import monasca.persister.consumer.ConsumerFactory;
import monasca.persister.consumer.ManagedConsumer;
import monasca.persister.consumer.ManagedConsumerFactory;
import monasca.persister.consumer.KafkaChannel;
import monasca.persister.consumer.KafkaChannelFactory;
import monasca.persister.consumer.KafkaConsumer;
import monasca.persister.consumer.KafkaConsumerFactory;
import monasca.persister.consumer.KafkaConsumerRunnableBasic;
import monasca.persister.consumer.KafkaConsumerRunnableBasicFactory;
import monasca.persister.consumer.alarmstate.KafkaAlarmStateTransitionConsumer;
import monasca.persister.consumer.alarmstate.KafkaAlarmStateTransitionConsumerFactory;
import monasca.persister.consumer.metric.KafkaMetricsConsumer;
import monasca.persister.consumer.metric.KafkaMetricsConsumerFactory;
import monasca.persister.dbi.DBIProvider;
import monasca.persister.pipeline.ManagedPipeline;
import monasca.persister.pipeline.ManagedPipelineFactory;
@@ -78,15 +76,15 @@ public class PersisterModule extends AbstractModule {
install(
new FactoryModuleBuilder().implement(
new TypeLiteral<MetricHandler<MetricEnvelope[]>>() {},
new TypeLiteral<MetricHandler<MetricEnvelope[]>>() {})
.build(new TypeLiteral<MetricHandlerFactory<MetricEnvelope[]>>() {}));
MetricHandler.class,
MetricHandler.class)
.build(MetricHandlerFactory.class));
install(
new FactoryModuleBuilder().implement(
new TypeLiteral<AlarmStateTransitionedEventHandler<AlarmStateTransitionedEvent>>() {},
new TypeLiteral<AlarmStateTransitionedEventHandler<AlarmStateTransitionedEvent>>() {})
.build(new TypeLiteral<AlarmStateTransitionedEventHandlerFactory<AlarmStateTransitionedEvent>>() {}));
AlarmStateTransitionedEventHandler.class,
AlarmStateTransitionedEventHandler.class)
.build(AlarmStateTransitionedEventHandlerFactory.class));
install(
new FactoryModuleBuilder().implement(
@@ -102,14 +100,14 @@ public class PersisterModule extends AbstractModule {
install(
new FactoryModuleBuilder().implement(
new TypeLiteral<KafkaMetricsConsumer<MetricEnvelope[]>>() {},
new TypeLiteral<KafkaMetricsConsumer<MetricEnvelope[]>>() {})
.build(new TypeLiteral<KafkaMetricsConsumerFactory<MetricEnvelope[]>>() {}));
new TypeLiteral<KafkaConsumer<MetricEnvelope[]>>() {},
new TypeLiteral<KafkaConsumer<MetricEnvelope[]>>() {})
.build(new TypeLiteral<KafkaConsumerFactory<MetricEnvelope[]>>() {}));
install(
new FactoryModuleBuilder().implement(
new TypeLiteral<ManagedPipeline<MetricEnvelope[]>>() {},
new TypeLiteral<ManagedPipeline<MetricEnvelope[]>>() {})
.build(new TypeLiteral<ManagedPipelineFactory<MetricEnvelope[]>>() {}));
install(
@@ -120,21 +118,21 @@ public class PersisterModule extends AbstractModule {
install(
new FactoryModuleBuilder().implement(
new TypeLiteral<Consumer<AlarmStateTransitionedEvent>>() {},
new TypeLiteral<Consumer<AlarmStateTransitionedEvent>>() {})
.build(new TypeLiteral<ConsumerFactory<AlarmStateTransitionedEvent>>() {}));
new TypeLiteral<ManagedConsumer<AlarmStateTransitionedEvent>>() {},
new TypeLiteral<ManagedConsumer<AlarmStateTransitionedEvent>>() {})
.build(new TypeLiteral<ManagedConsumerFactory<AlarmStateTransitionedEvent>>() {}));
install(
new FactoryModuleBuilder().implement(
new TypeLiteral<KafkaAlarmStateTransitionConsumer<AlarmStateTransitionedEvent>>() {},
new TypeLiteral<KafkaAlarmStateTransitionConsumer<AlarmStateTransitionedEvent>>() {})
.build(new TypeLiteral<KafkaAlarmStateTransitionConsumerFactory<AlarmStateTransitionedEvent>>() {}));
new TypeLiteral<KafkaConsumer<AlarmStateTransitionedEvent>>() {},
new TypeLiteral<KafkaConsumer<AlarmStateTransitionedEvent>>() {})
.build(new TypeLiteral<KafkaConsumerFactory<AlarmStateTransitionedEvent>>() {}));
install(
new FactoryModuleBuilder().implement(
new TypeLiteral<Consumer<MetricEnvelope[]>>() {},
new TypeLiteral<Consumer<MetricEnvelope[]>>() {})
.build(new TypeLiteral<ConsumerFactory<MetricEnvelope[]>>() {}));
new TypeLiteral<ManagedConsumer<MetricEnvelope[]>>() {},
new TypeLiteral<ManagedConsumer<MetricEnvelope[]>>() {})
.build(new TypeLiteral<ManagedConsumerFactory<MetricEnvelope[]>>() {}));
install(
new FactoryModuleBuilder().implement(

View File

@@ -43,16 +43,16 @@ public class KafkaChannel {
private final String topic;
private final ConsumerConnector consumerConnector;
private final int threadNum;
private final String threadId;
@Inject
public KafkaChannel(
@Assisted PersisterConfig configuration,
PersisterConfig configuration,
@Assisted PipelineConfig pipelineConfig,
@Assisted int threadNum) {
@Assisted String threadId) {
this.topic = pipelineConfig.getTopic();
this.threadNum = threadNum;
this.threadId = threadId;
Properties kafkaProperties =
createKafkaProperties(configuration.getKafkaConfig(), pipelineConfig);
consumerConnector = Consumer.createJavaConsumerConnector(createConsumerConfig(kafkaProperties));
@@ -90,7 +90,7 @@ public class KafkaChannel {
properties.put("group.id", pipelineConfig.getGroupId());
properties.put("zookeeper.connect", kafkaConfig.getZookeeperConnect());
properties.put("consumer.id",
String.format("%s_%d", pipelineConfig.getConsumerId(), this.threadNum));
String.format("%s_%s", pipelineConfig.getConsumerId(), this.threadId));
properties.put("socket.timeout.ms", kafkaConfig.getSocketTimeoutMs().toString());
properties.put("socket.receive.buffer.bytes", kafkaConfig.getSocketReceiveBufferBytes()
.toString());
@@ -108,7 +108,7 @@ public class KafkaChannel {
.toString());
properties.put("auto.offset.reset", kafkaConfig.getAutoOffsetReset());
properties.put("consumer.timeout.ms", kafkaConfig.getConsumerTimeoutMs().toString());
properties.put("client.id", String.format("%s_%d", pipelineConfig.getClientId(), threadNum));
properties.put("client.id", String.format("%s_%s", pipelineConfig.getClientId(), threadId));
properties.put("zookeeper.session.timeout.ms", kafkaConfig
.getZookeeperSessionTimeoutMs().toString());
properties.put("zookeeper.connection.timeout.ms", kafkaConfig
@@ -117,7 +117,7 @@
.put("zookeeper.sync.time.ms", kafkaConfig.getZookeeperSyncTimeMs().toString());
for (String key : properties.stringPropertyNames()) {
logger.info(KAFKA_CONFIGURATION + " " + key + " = " + properties.getProperty(key));
logger.info("[{}]: " + KAFKA_CONFIGURATION + " " + key + " = " + properties.getProperty(key), threadId);
}
return properties;
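With string thread ids, the generated Kafka identifiers now embed the pipeline name as well as the thread index, which makes per-thread log lines and Kafka-side metrics easier to correlate. For example, assuming a configured consumer id of "persister" (a hypothetical value):

    consumer.id = persister_metric-0                    (was persister_0)
    client.id   = persister_alarm-state-transition-1    (was persister_1)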

View File

@@ -17,13 +17,11 @@
package monasca.persister.consumer;
import monasca.persister.configuration.PersisterConfig;
import monasca.persister.configuration.PipelineConfig;
public interface KafkaChannelFactory {
KafkaChannel create(
PersisterConfig configuration,
PipelineConfig pipelineConfig,
int threadNum);
String threadId);
}

View File

@@ -17,6 +17,9 @@
package monasca.persister.consumer;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -24,42 +27,65 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public abstract class KafkaConsumer<T> {
public class KafkaConsumer<T> {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
private static final int WAIT_TIME = 10;
private ExecutorService executorService;
private final KafkaChannel kafkaChannel;
private final int threadNum;
private KafkaConsumerRunnableBasic<T> kafkaConsumerRunnableBasic;
public KafkaConsumer(KafkaChannel kafkaChannel, int threadNum) {
this.kafkaChannel = kafkaChannel;
this.threadNum = threadNum;
private final KafkaConsumerRunnableBasic<T> kafkaConsumerRunnableBasic;
private final String threadId;
@Inject
public KafkaConsumer(
@Assisted KafkaConsumerRunnableBasic<T> kafkaConsumerRunnableBasic,
@Assisted String threadId) {
this.kafkaConsumerRunnableBasic = kafkaConsumerRunnableBasic;
this.threadId = threadId;
}
protected abstract KafkaConsumerRunnableBasic<T> createRunnable(
KafkaChannel kafkaChannel,
int threadNumber);
public void start() {
logger.info("[{}]: start", this.threadId);
executorService = Executors.newFixedThreadPool(1);
kafkaConsumerRunnableBasic = createRunnable(kafkaChannel, this.threadNum);
executorService.submit(kafkaConsumerRunnableBasic);
}
public void stop() {
logger.info("[{}]: stop", this.threadId);
kafkaConsumerRunnableBasic.stop();
if (executorService != null) {
logger.info("[{}]: shutting down executor service", this.threadId);
executorService.shutdown();
try {
logger.info("[{}]: awaiting termination...", this.threadId);
if (!executorService.awaitTermination(WAIT_TIME, TimeUnit.SECONDS)) {
logger.warn("Did not shut down in {} seconds", WAIT_TIME);
logger.warn("[{}]: did not shut down in {} seconds", this.threadId, WAIT_TIME);
}
logger.info("[{}]: terminated", this.threadId);
} catch (InterruptedException e) {
logger.info("awaitTermination interrupted", e);
logger.info("[{}]: awaitTermination interrupted", this.threadId, e);
}
}
}
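Note the design change here: KafkaConsumer is no longer an abstract template whose subclasses supply a runnable via createRunnable(); it is a concrete class that receives its KafkaConsumerRunnableBasic and threadId through Guice assisted injection, which is what allows the per-message-type subclasses and their factories to be deleted below. A minimal sketch of the assisted-injection pattern, with hypothetical Worker/WorkerFactory names:

    import com.google.inject.Guice;
    import com.google.inject.Inject;
    import com.google.inject.Injector;
    import com.google.inject.assistedinject.Assisted;
    import com.google.inject.assistedinject.FactoryModuleBuilder;

    public class AssistedSketch {

      public static class Worker {
        final String threadId;

        @Inject
        Worker(@Assisted String threadId) {
          this.threadId = threadId;
        }
      }

      public interface WorkerFactory {
        Worker create(String threadId);
      }

      public static void main(String[] args) {
        // FactoryModuleBuilder generates the factory implementation,
        // passing create()'s arguments to the @Assisted constructor params
        Injector injector = Guice.createInjector(
            new FactoryModuleBuilder().build(WorkerFactory.class));
        Worker w = injector.getInstance(WorkerFactory.class).create("metric-0");
        System.out.println(w.threadId);
      }
    }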

View File

@@ -14,17 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.consumer;
package monasca.persister.consumer.metric;
import monasca.persister.consumer.KafkaChannel;
import monasca.persister.pipeline.ManagedPipeline;
public interface KafkaConsumerFactory<T> {
public interface KafkaMetricsConsumerFactory<T> {
KafkaConsumer<T> create(
KafkaConsumerRunnableBasic<T> kafkaConsumerRunnableBasic,
String threadId);
KafkaMetricsConsumer<T> create(
Class<T> clazz,
KafkaChannel kafkaChannel,
int threadNum,
ManagedPipeline<T> pipeline);
}

View File

@@ -20,8 +20,6 @@ package monasca.persister.consumer;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,61 +29,68 @@ import monasca.persister.pipeline.ManagedPipeline;
public class KafkaConsumerRunnableBasic<T> implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerRunnableBasic.class);
private final KafkaChannel kafkaChannel;
private final int threadNumber;
private final String threadId;
private final ManagedPipeline<T> pipeline;
private volatile boolean stop = false;
private final ObjectMapper objectMapper;
private final Class<T> clazz;
@Inject
public KafkaConsumerRunnableBasic(
@Assisted Class<T> clazz,
@Assisted ObjectMapper objectMapper,
@Assisted KafkaChannel kafkaChannel,
@Assisted ManagedPipeline<T> pipeline,
@Assisted int threadNumber) {
@Assisted String threadId) {
this.kafkaChannel = kafkaChannel;
this.pipeline = pipeline;
this.threadNumber = threadNumber;
this.objectMapper = objectMapper;
this.clazz = clazz;
this.threadId = threadId;
}
protected void publishHeartbeat() {
publishEvent(null);
}
protected void handleMessage(String message) {
protected void handleMessage(String msg) {
try {
final T o = objectMapper.readValue(message, this.clazz);
publishEvent(o);
publishEvent(msg);
} catch (Exception e) {
logger.error("Failed to deserialize JSON message and send to handler: " + message, e);
logger.error(
"[{}]: failed to deserialize JSON message and send to handler: {} ",
threadId,
msg,
e);
}
}
private void markRead() {
logger.debug("[{}]: marking read", this.threadId);
this.kafkaChannel.markRead();
}
public void stop() {
logger.info("[{}]: stop", this.threadId);
this.stop = true;
this.pipeline.shutdown();
}
public void run() {
logger.info("[{}]: run", this.threadId);
final ConsumerIterator<byte[], byte[]> it = kafkaChannel.getKafkaStream().iterator();
logger.debug("KafkaChannel {} has stream", this.threadNumber);
logger.debug("[{}]: KafkaChannel has stream iterator", this.threadId);
while (!this.stop) {
@@ -93,11 +98,11 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
if (it.hasNext()) {
final String s = new String(it.next().message());
final String msg = new String(it.next().message());
logger.debug("Thread {}: {}", threadNumber, s);
logger.debug("[{}]: {}", this.threadId, msg);
handleMessage(s);
handleMessage(msg);
}
@@ -108,14 +113,14 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
}
}
logger.debug("Shutting down Thread: {}", threadNumber);
logger.info("[{}]: shutting down", this.threadId);
this.kafkaChannel.stop();
}
protected void publishEvent(final T event) {
protected void publishEvent(final String msg) {
if (pipeline.publishEvent(event)) {
if (pipeline.publishEvent(msg)) {
markRead();

View File

@@ -16,17 +16,13 @@
*/
package monasca.persister.consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import monasca.persister.pipeline.ManagedPipeline;
public interface KafkaConsumerRunnableBasicFactory<T> {
KafkaConsumerRunnableBasic<T> create(
ObjectMapper objectMapper,
Class<T> clazz,
ManagedPipeline<T> pipeline,
KafkaChannel kafkaChannel,
int threadNumber);
String threadId);
}

View File

@@ -17,8 +17,6 @@
package monasca.persister.consumer;
import monasca.persister.pipeline.ManagedPipeline;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
@@ -27,31 +25,36 @@ import io.dropwizard.lifecycle.Managed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Consumer<T> implements Managed {
public class ManagedConsumer<T> implements Managed {
private static final Logger logger = LoggerFactory.getLogger(ManagedConsumer.class);
private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
private final KafkaConsumer<T> consumer;
private final ManagedPipeline<T> pipeline;
private final String threadId;
@Inject
public Consumer(
public ManagedConsumer(
@Assisted KafkaConsumer<T> kafkaConsumer,
@Assisted ManagedPipeline<T> pipeline) {
@Assisted String threadId) {
this.consumer = kafkaConsumer;
this.pipeline = pipeline;
this.threadId = threadId;
}
@Override
public void start() throws Exception {
logger.debug("start");
consumer.start();
logger.debug("[{}]: start", this.threadId);
this.consumer.start();
}
@Override
public void stop() throws Exception {
logger.debug("stop");
consumer.stop();
pipeline.shutdown();
logger.debug("[{}]: stop", this.threadId);
this.consumer.stop();
}
}

View File

@@ -17,12 +17,10 @@
package monasca.persister.consumer;
import monasca.persister.pipeline.ManagedPipeline;
public interface ManagedConsumerFactory<T> {
public interface ConsumerFactory<T> {
Consumer<T> create(
ManagedConsumer<T> create(
KafkaConsumer<T> kafkaConsumer,
ManagedPipeline<T> pipeline);
String threadId);
}

View File

@@ -1,67 +0,0 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.consumer.alarmstate;
import monasca.persister.consumer.KafkaChannel;
import monasca.persister.consumer.KafkaConsumer;
import monasca.persister.consumer.KafkaConsumerRunnableBasic;
import monasca.persister.consumer.KafkaConsumerRunnableBasicFactory;
import monasca.persister.pipeline.ManagedPipeline;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
public class KafkaAlarmStateTransitionConsumer<T> extends KafkaConsumer<T> {
@Inject
private KafkaConsumerRunnableBasicFactory<T> factory;
private final ManagedPipeline<T> pipeline;
private final Class<T> clazz;
@Inject
public KafkaAlarmStateTransitionConsumer(
@Assisted Class<T> clazz,
@Assisted KafkaChannel kafkaChannel,
@Assisted int threadNum,
@Assisted final ManagedPipeline<T> pipeline) {
super(kafkaChannel, threadNum);
this.pipeline = pipeline;
this.clazz = clazz;
}
@Override
protected KafkaConsumerRunnableBasic<T> createRunnable(
KafkaChannel kafkaChannel,
int threadNumber) {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
objectMapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY);
objectMapper.enable(DeserializationFeature.UNWRAP_ROOT_VALUE);
return factory.create(objectMapper, clazz, pipeline, kafkaChannel, threadNumber);
}
}

View File

@@ -1,29 +0,0 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.consumer.alarmstate;
import monasca.persister.consumer.KafkaChannel;
import monasca.persister.pipeline.ManagedPipeline;
public interface KafkaAlarmStateTransitionConsumerFactory<T> {
KafkaAlarmStateTransitionConsumer<T> create(
Class<T> clazz,
KafkaChannel kafkaChannel, int threadNum,
final ManagedPipeline<T> pipeline);
}

View File

@@ -1,68 +0,0 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.consumer.metric;
import monasca.persister.consumer.KafkaChannel;
import monasca.persister.consumer.KafkaConsumer;
import monasca.persister.consumer.KafkaConsumerRunnableBasic;
import monasca.persister.consumer.KafkaConsumerRunnableBasicFactory;
import monasca.persister.pipeline.ManagedPipeline;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
public class KafkaMetricsConsumer<T> extends KafkaConsumer<T> {
@Inject
private KafkaConsumerRunnableBasicFactory<T> factory;
private final ManagedPipeline<T> pipeline;
private final Class<T> clazz;
@Inject
public KafkaMetricsConsumer(
@Assisted Class<T> clazz,
@Assisted KafkaChannel kafkaChannel,
@Assisted int threadNum,
@Assisted ManagedPipeline<T> pipeline) {
super(kafkaChannel, threadNum);
this.pipeline = pipeline;
this.clazz = clazz;
}
@Override
protected KafkaConsumerRunnableBasic<T> createRunnable(
KafkaChannel kafkaChannel,
int threadNumber) {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
objectMapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY);
objectMapper.setPropertyNamingStrategy(
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
return factory.create(objectMapper, clazz, pipeline, kafkaChannel, threadNumber);
}
}

View File

@@ -30,28 +30,34 @@ public class ManagedPipeline<T> {
private static final Logger logger = LoggerFactory.getLogger(ManagedPipeline.class);
private final FlushableHandler<T> handler;
private final String threadId;
@Inject
public ManagedPipeline(
@Assisted FlushableHandler<T> handler) {
@Assisted FlushableHandler<T> handler,
@Assisted String threadId) {
this.handler = handler;
this.threadId = threadId;
}
public void shutdown() {
logger.info("[{}]: shutdown", this.threadId);
handler.flush();
}
public boolean publishEvent(T holder) {
public boolean publishEvent(String msg) {
try {
return this.handler.onEvent(holder);
return this.handler.onEvent(msg);
} catch (Exception e) {
logger.error("Failed to handle event", e);
logger.error("[{}]: failed to handle msg: {}", msg, e);
return false;

View File

@@ -20,6 +20,8 @@ import monasca.persister.pipeline.event.FlushableHandler;
public interface ManagedPipelineFactory<T> {
ManagedPipeline<T> create(FlushableHandler<T> handler);
ManagedPipeline<T> create(
FlushableHandler<T> handler,
String threadId);
}

View File

@@ -25,55 +25,75 @@ import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.codahale.metrics.Counter;
import com.fasterxml.jackson.databind.DeserializationFeature;
import io.dropwizard.setup.Environment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AlarmStateTransitionedEventHandler<T> extends
FlushableHandler<T> {
public class AlarmStateTransitionedEventHandler extends
FlushableHandler<AlarmStateTransitionedEvent> {
private static final Logger logger = LoggerFactory
.getLogger(AlarmStateTransitionedEventHandler.class);
private static final Logger logger =
LoggerFactory.getLogger(AlarmStateTransitionedEventHandler.class);
private final AlarmRepo alarmRepo;
private final int ordinal;
private final Counter alarmStateTransitionCounter;
@Inject
public AlarmStateTransitionedEventHandler(
AlarmRepo alarmRepo,
@Assisted PipelineConfig configuration,
Environment environment,
@Assisted("ordinal") int ordinal,
@Assisted PipelineConfig configuration,
@Assisted("threadId") String threadId,
@Assisted("batchSize") int batchSize) {
super(configuration, environment, ordinal, batchSize,
AlarmStateTransitionedEventHandler.class.getName());
super(configuration, environment, threadId, batchSize);
this.alarmRepo = alarmRepo;
this.ordinal = ordinal;
final String handlerName = String.format("%s[%d]", AlarmStateTransitionedEventHandler.class.getName(), ordinal);
this.alarmStateTransitionCounter =
environment.metrics().counter(handlerName + "." + "alarm-state-transitions-added-to-batch-counter");
environment.metrics()
.counter(this.handlerName + "." + "alarm-state-transitions-added-to-batch-counter");
}
@Override
protected int process(T event) throws Exception {
protected int process(String msg) throws Exception {
logger.debug("Ordinal: {}: {}", this.ordinal, event);
AlarmStateTransitionedEvent alarmStateTransitionedEvent =
objectMapper.readValue(msg, AlarmStateTransitionedEvent.class);
alarmRepo.addToBatch((AlarmStateTransitionedEvent) event);
logger.debug("[{}]: [{}:{}]: {}",
this.threadId,
this.getBatchCount(),
this.getMsgCount(),
alarmStateTransitionedEvent);
alarmRepo.addToBatch(alarmStateTransitionedEvent);
this.alarmStateTransitionCounter.inc();
return 1;
}
@Override
protected void initObjectMapper() {
this.objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
this.objectMapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY);
this.objectMapper.enable(DeserializationFeature.UNWRAP_ROOT_VALUE);
}
@Override
protected void flushRepository() {
alarmRepo.flush();
alarmRepo.flush(this.threadId);
}
}

View File

@@ -21,10 +21,10 @@ import monasca.persister.configuration.PipelineConfig;
import com.google.inject.assistedinject.Assisted;
public interface AlarmStateTransitionedEventHandlerFactory<T> {
public interface AlarmStateTransitionedEventHandlerFactory {
AlarmStateTransitionedEventHandler<T> create(
AlarmStateTransitionedEventHandler create(
PipelineConfig configuration,
@Assisted("ordinal") int ordinal,
@Assisted("threadId") String threadId,
@Assisted("batchSize") int batchSize);
}

View File

@ -21,6 +21,7 @@ import monasca.persister.configuration.PipelineConfig;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dropwizard.setup.Environment;
@@ -31,61 +32,75 @@ public abstract class FlushableHandler<T> {
private static final Logger logger = LoggerFactory.getLogger(FlushableHandler.class);
private final int ordinal;
private final int batchSize;
private final String handlerName;
private long millisSinceLastFlush = System.currentTimeMillis();
private long flushTimeMillis = System.currentTimeMillis();
private final long millisBetweenFlushes;
private final int secondsBetweenFlushes;
private int eventCount = 0;
private final Environment environment;
private int msgCount = 0;
private long batchCount = 0;
private final Meter processedMeter;
private final Meter commitMeter;
private final Timer commitTimer;
protected final String threadId;
protected ObjectMapper objectMapper = new ObjectMapper();
protected final String handlerName;
protected FlushableHandler(
PipelineConfig configuration,
Environment environment,
int ordinal,
int batchSize,
String baseName) {
String threadId,
int batchSize) {
this.threadId = threadId;
this.handlerName =
String.format(
"%s[%s]",
this.getClass().getName(),
threadId);
this.handlerName = String.format("%s[%d]", baseName, ordinal);
this.environment = environment;
this.processedMeter =
this.environment.metrics()
environment.metrics()
.meter(handlerName + "." + "events-processed-processedMeter");
this.commitMeter =
this.environment.metrics().meter(handlerName + "." + "commits-executed-processedMeter");
environment.metrics().meter(handlerName + "." + "commits-executed-processedMeter");
this.commitTimer =
this.environment.metrics().timer(handlerName + "." + "total-commit-and-flush-timer");
environment.metrics().timer(handlerName + "." + "total-commit-and-flush-timer");
this.secondsBetweenFlushes = configuration.getMaxBatchTime();
this.millisBetweenFlushes = secondsBetweenFlushes * 1000;
this.ordinal = ordinal;
this.batchSize = batchSize;
initObjectMapper();
}
protected abstract void initObjectMapper();
protected abstract void flushRepository();
protected abstract int process(T metricEvent) throws Exception;
protected abstract int process(String msg) throws Exception;
public boolean onEvent(final T event) throws Exception {
public boolean onEvent(final String msg) throws Exception {
if (event == null) {
if (msg == null) {
long delta = millisSinceLastFlush + millisBetweenFlushes;
logger.debug("{} received heartbeat message, flush every {} seconds.", this.handlerName,
logger.debug("[{}]: got heartbeat message, flush every {} seconds.", this.threadId,
this.secondsBetweenFlushes);
if (delta < System.currentTimeMillis()) {
if (this.flushTimeMillis < System.currentTimeMillis()) {
logger.debug("{}: {} seconds since last flush. Flushing to repository now.",
this.handlerName, delta);
logger.debug("[{}]: {} millis past flush time. flushing to repository now.",
this.threadId, (System.currentTimeMillis() - this.flushTimeMillis));
flush();
@@ -93,35 +108,67 @@ public abstract class FlushableHandler<T> {
} else {
logger.debug("{}: {} seconds since last flush. No need to flush at this time.",
this.handlerName, delta);
logger.debug("[{}]: {} millis to next flush time. no need to flush at this time.",
this.threadId, this.flushTimeMillis - System.currentTimeMillis());
return false;
}
}
processedMeter.mark();
this.processedMeter.mark();
eventCount += process(event);
this.msgCount += process(msg);
if (this.msgCount >= this.batchSize) {
logger.debug("[{}]: batch sized {} attained", this.threadId, this.batchSize);
if (eventCount >= batchSize) {
flush();
return true;
} else {
return false;
}
}
public void flush() {
if (eventCount == 0) {
logger.debug("{}: Nothing to flush", this.handlerName);
logger.debug("[{}]: flush", this.threadId);
if (this.msgCount == 0) {
logger.debug("[{}]: nothing to flush", this.threadId);
}
Timer.Context context = commitTimer.time();
Timer.Context context = this.commitTimer.time();
flushRepository();
context.stop();
commitMeter.mark();
millisSinceLastFlush = System.currentTimeMillis();
logger.debug("{}: Flushed {} events", this.handlerName, this.eventCount);
eventCount = 0;
this.commitMeter.mark();
this.flushTimeMillis = System.currentTimeMillis() + this.millisBetweenFlushes;
logger.debug("[{}]: flushed {} msg", this.threadId, this.msgCount);
this.msgCount = 0;
this.batchCount++;
}
public long getBatchCount() {
return this.batchCount;
}
public int getMsgCount() {
return this.msgCount;
}
}
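The flush-timing bookkeeping also changed shape in this class: instead of recording when the last flush happened and re-deriving a delta on each heartbeat, the handler now arms an absolute deadline (flushTimeMillis = now + millisBetweenFlushes) after every flush and simply compares it to the clock. A minimal sketch of the new scheme, for illustration only (names are hypothetical; the 30-second interval stands in for PipelineConfig.getMaxBatchTime()):

    import java.util.concurrent.TimeUnit;

    public class FlushDeadlineSketch {

      private final long millisBetweenFlushes = TimeUnit.SECONDS.toMillis(30);

      // armed to "now" initially, so an idle pipeline flushes on an early heartbeat
      private long flushTimeMillis = System.currentTimeMillis();

      boolean shouldFlushOnHeartbeat() {
        // flush once the precomputed deadline has passed
        return flushTimeMillis < System.currentTimeMillis();
      }

      void flushed() {
        // after each flush, arm the next deadline
        flushTimeMillis = System.currentTimeMillis() + millisBetweenFlushes;
      }
    }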

View File

@@ -21,6 +21,8 @@ import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.codahale.metrics.Counter;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,47 +32,43 @@ import monasca.common.model.metric.MetricEnvelope;
import monasca.persister.configuration.PipelineConfig;
import monasca.persister.repository.MetricRepo;
public class MetricHandler<T> extends FlushableHandler<T> {
public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
private static final Logger logger = LoggerFactory
.getLogger(MetricHandler.class);
private static final Logger logger =
LoggerFactory.getLogger(MetricHandler.class);
private final MetricRepo metricRepo;
private final int ordinal;
private final Counter metricCounter;
@Inject
public MetricHandler(
MetricRepo metricRepo,
@Assisted PipelineConfig configuration,
Environment environment,
@Assisted("ordinal") int ordinal,
@Assisted PipelineConfig configuration,
@Assisted("threadId") String threadId,
@Assisted("batchSize") int batchSize) {
super(configuration,
environment,
ordinal,
batchSize,
MetricHandler.class.getName());
super(configuration, environment, threadId, batchSize);
this.metricRepo = metricRepo;
this.ordinal = ordinal;
final String handlerName = String.format("%s[%d]", MetricHandler.class.getName(), ordinal);
this.metricCounter =
environment.metrics().counter(handlerName + "." + "metrics-added-to-batch-counter");
environment.metrics()
.counter(this.handlerName + "." + "metrics-added-to-batch-counter");
}
@Override
public int process(T metricEnvelopes) throws Exception {
public int process(String msg) throws Exception {
MetricEnvelope[] metricEnvelopesArry =
objectMapper.readValue(msg, MetricEnvelope[].class);
MetricEnvelope[] metricEnvelopesArry = (MetricEnvelope[]) metricEnvelopes;
for (final MetricEnvelope metricEnvelope : metricEnvelopesArry) {
processEnvelope(metricEnvelope);
}
return metricEnvelopesArry.length;
@@ -78,17 +76,34 @@ public class MetricHandler<T> extends FlushableHandler<T> {
private void processEnvelope(MetricEnvelope metricEnvelope) {
logger.debug("Ordinal: {}: {}", this.ordinal, metricEnvelope);
logger.debug("[{}]: [{}:{}]: {}",
this.threadId,
this.getBatchCount(),
this.getMsgCount(),
metricEnvelope);
this.metricRepo.addToBatch(metricEnvelope);
metricCounter.inc();
this.metricCounter.inc();
}
@Override
protected void initObjectMapper() {
this.objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
this.objectMapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY);
this.objectMapper.setPropertyNamingStrategy(
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
}
@Override
public void flushRepository() {
metricRepo.flush();
metricRepo.flush(this.threadId);
}
}

View File

@@ -21,10 +21,10 @@ import monasca.persister.configuration.PipelineConfig;
import com.google.inject.assistedinject.Assisted;
public interface MetricHandlerFactory<T> {
public interface MetricHandlerFactory {
MetricHandler<T> create(
MetricHandler create(
PipelineConfig pipelineConfig,
@Assisted("ordinal") int ordinal,
@Assisted("threadId") String threadId,
@Assisted("batchSize") int batchSize);
}

View File

@@ -23,5 +23,5 @@ public interface AlarmRepo {
void addToBatch(final AlarmStateTransitionedEvent message);
void flush();
void flush(String id);
}

View File

@@ -23,5 +23,5 @@ public interface MetricRepo {
void addToBatch(final MetricEnvelope metricEnvelope);
void flush();
void flush(String id);
}

View File

@@ -50,7 +50,7 @@ public abstract class InfluxAlarmRepo implements AlarmRepo {
MetricRegistry.name(getClass(), "alarm_state_history-meter"));
}
protected abstract void write () throws Exception;
protected abstract void write(String id) throws Exception;
@Override
public void addToBatch(AlarmStateTransitionedEvent alarmStateTransitionedEvent) {
@@ -61,25 +61,26 @@ public abstract class InfluxAlarmRepo implements AlarmRepo {
}
@Override
public void flush() {
public void flush(String id) {
try {
if (this.alarmStateTransitionedEventList.isEmpty()) {
logger.debug("There are no alarm state transition events to be written to the influxDB");
logger.debug("Returning from flush");
logger.debug("[{}]: no alarm state transition msg to be written to the influxDB", id);
logger.debug("[{}]: returning from flush", id);
return;
}
long startTime = System.currentTimeMillis();
Timer.Context context = flushTimer.time();
write();
write(id);
context.stop();
long endTime = System.currentTimeMillis();
logger.debug("Commiting batch took {} seconds", (endTime - startTime) / 1000);
logger.debug("[{}]: flushing batch took {} seconds", id, (endTime - startTime) / 1000);
} catch (Exception e) {
logger.error("Failed to write alarm state history to database", e);
logger.error("[{}]: failed to write alarm state history to database", id, e);
}
this.alarmStateTransitionedEventList.clear();

View File

@@ -40,7 +40,7 @@ public abstract class InfluxMetricRepo implements MetricRepo {
public final com.codahale.metrics.Timer flushTimer;
public final Meter measurementMeter;
protected abstract void write() throws Exception;
protected abstract void write(String id) throws Exception;
public InfluxMetricRepo(final Environment env) {
@@ -72,22 +72,31 @@ public abstract class InfluxMetricRepo implements MetricRepo {
@Override
public void flush() {
public void flush(String id) {
try {
if (this.measurementBuffer.isEmpty()) {
logger.debug("[{}]: no metric msg to be written to the influxDB", id);
logger.debug("[{}]: returning from flush", id);
return;
}
final long startTime = System.currentTimeMillis();
final Timer.Context context = flushTimer.time();
write();
write(id);
final long endTime = System.currentTimeMillis();
context.stop();
logger.debug("Writing measurements, definitions, and dimensions to InfluxDB took {} seconds",
(endTime - startTime) / 1000);
logger.debug("[{}]: flushing batch took {} seconds",
id, (endTime - startTime) / 1000);
} catch (Exception e) {
logger.error("Failed to write measurements to InfluxDB", e);
logger.error("[{}]: failed to write measurements to InfluxDB", id, e);
}
clearBuffers();

View File

@@ -28,8 +28,6 @@ import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.LinkedList;
@@ -40,8 +38,6 @@ import io.dropwizard.setup.Environment;
public class InfluxV9AlarmRepo extends InfluxAlarmRepo {
private static final Logger logger = LoggerFactory.getLogger(InfluxV9AlarmRepo.class);
private final InfluxV9RepoWriter influxV9RepoWriter;
private final ObjectMapper objectMapper = new ObjectMapper();
@@ -60,9 +56,9 @@ public class InfluxV9AlarmRepo extends InfluxAlarmRepo {
}
@Override
protected void write() throws Exception {
protected void write(String id) throws Exception {
this.influxV9RepoWriter.write(getInfluxPointArry());
this.influxV9RepoWriter.write(getInfluxPointArry(), id);
}

View File

@@ -41,9 +41,9 @@ public class InfluxV9MetricRepo extends InfluxMetricRepo {
}
@Override
protected void write() throws Exception {
protected void write(String id) throws Exception {
this.influxV9RepoWriter.write(getInfluxPointArry());
this.influxV9RepoWriter.write(getInfluxPointArry(), id);
}
@@ -77,9 +77,7 @@ public class InfluxV9MetricRepo extends InfluxMetricRepo {
influxPointList.add(influxPoint);
}
}
}
return influxPointList.toArray(new InfluxPoint[influxPointList.size()]);
@@ -89,11 +87,17 @@ public class InfluxV9MetricRepo extends InfluxMetricRepo {
private Map<String, Object> buildValueMap(Measurement measurement) {
Map<String, Object> valueMap = new HashMap<>();
valueMap.put("value", measurement.getValue());
String valueMetaJSONString = measurement.getValueMetaJSONString();
if (valueMetaJSONString != null) {
valueMap.put("value_meta", valueMetaJSONString);
}
return valueMap;
}
@@ -111,10 +115,10 @@ public class InfluxV9MetricRepo extends InfluxMetricRepo {
}
tagMap.put("_tenant_id", definition.getTenantId());
tagMap.put("_region", definition.getRegion());
return tagMap;
}
}

View File

@@ -123,7 +123,7 @@ public class InfluxV9RepoWriter {
}
}
protected void write(final InfluxPoint[] influxPointArry) throws Exception {
protected void write(final InfluxPoint[] influxPointArry, String id) throws Exception {
HttpPost request = new HttpPost(this.influxUrl);
@@ -133,12 +133,14 @@
InfluxWrite
influxWrite =
new InfluxWrite(this.influxName, this.influxRetentionPolicy, influxPointArry,
new HashMap());
new HashMap<String, String>());
String json = this.objectMapper.writeValueAsString(influxWrite);
if (this.gzip) {
logger.debug("[{}]: gzip set to true. sending gzip msg", id);
HttpEntity
requestEntity =
EntityBuilder
@@ -155,6 +157,8 @@
} else {
logger.debug("[{}]: gzip set to false. sending non-gzip msg", id);
StringEntity stringEntity = new StringEntity(json, "UTF-8");
request.setEntity(stringEntity);
@@ -163,8 +167,8 @@
try {
logger.debug("Writing {} points to influxdb database {} at {}", influxPointArry.length,
this.influxName, this.influxUrl);
logger.debug("[{}]: sending {} points to influxdb database {} at {}", id,
influxPointArry.length, this.influxName, this.influxUrl);
HttpResponse response = this.httpClient.execute(request);
@@ -173,17 +177,20 @@
if (rc != HttpStatus.SC_OK) {
HttpEntity responseEntity = response.getEntity();
String responseString = EntityUtils.toString(responseEntity, "UTF-8");
logger.error("Failed to write data to influx database {} at {}: {}", this.influxName,
this.influxUrl, String.valueOf(rc));
logger.error("Http response: {}", responseString);
logger.error("[{}]: failed to send data to influxdb database {} at {}: {}", id,
this.influxName, this.influxUrl, String.valueOf(rc));
logger.error("[{}]: http response: {}", id, responseString);
throw new Exception(rc + ":" + responseString);
}
logger
.debug("Successfully wrote {} points to influx database {} at {}", influxPointArry.length,
this.influxName, this.influxUrl);
.debug("[{}]: successfully sent {} points to influxdb database {} at {}", id,
influxPointArry.length, this.influxName, this.influxUrl);
} finally {

View File

@@ -55,6 +55,12 @@ public class MeasurementBuffer {
}
public boolean isEmpty() {
return this.measurementMap.isEmpty();
}
private Map<Dimensions, List<Measurement>> initDimensionsMap(Definition definition,
Dimensions dimensions) {

View File

@@ -73,7 +73,7 @@ public class VerticaAlarmRepo extends VerticaRepo implements AlarmRepo {
.bind(6, timeStamp);
}
public void flush() {
public void flush(String id) {
try {
commitBatch();
} catch (Exception e) {

View File

@@ -406,7 +406,7 @@ public class VerticaMetricRepo extends VerticaRepo implements MetricRepo {
}
@Override
public void flush() {
public void flush(String id) {
try {
long startTime = System.currentTimeMillis();
Timer.Context context = flushTimer.time();

View File

@@ -18,8 +18,8 @@
package monasca.persister;
import monasca.common.model.metric.MetricEnvelope;
import monasca.persister.consumer.Consumer;
import monasca.persister.consumer.metric.KafkaMetricsConsumer;
import monasca.persister.consumer.ManagedConsumer;
import monasca.persister.consumer.KafkaConsumer;
import monasca.persister.pipeline.ManagedPipeline;
import monasca.persister.pipeline.event.MetricHandler;
@@ -32,10 +32,10 @@ import org.mockito.MockitoAnnotations;
public class MonPersisterConsumerTest {
@Mock
private KafkaMetricsConsumer kafkaConsumer;
private KafkaConsumer<MetricEnvelope[]> kafkaConsumer;
@Mock
private Consumer monConsumer;
private ManagedConsumer<MetricEnvelope[]> monManagedConsumer;
private MetricHandler metricHandler;
@@ -44,14 +44,14 @@ public class MonPersisterConsumerTest {
@Before
public void initMocks() {
metricHandler = Mockito.mock(MetricHandler.class);
metricPipeline = Mockito.spy(new ManagedPipeline<MetricEnvelope[]>(metricHandler));
metricPipeline = Mockito.spy(new ManagedPipeline<MetricEnvelope[]>(metricHandler, "metric-1"));
MockitoAnnotations.initMocks(this);
}
@Test
public void testKafkaConsumerLifecycle() throws Exception {
monConsumer.start();
monConsumer.stop();
monManagedConsumer.start();
monManagedConsumer.stop();
metricPipeline.shutdown();
Mockito.verify(metricHandler).flush();
}