Added initial Kafka support

This commit is contained in:
Jonathan Halterman
2014-02-19 21:06:12 -08:00
parent 551ff6966b
commit ce58b7fde2
6 changed files with 124 additions and 150 deletions

20
pom.xml
View File

@@ -79,6 +79,11 @@
<artifactId>mon-util</artifactId>
<version>${mon.common.version}</version>
</dependency>
<dependency>
<groupId>com.hpcloud</groupId>
<artifactId>mon-messaging</artifactId>
<version>${mon.common.version}</version>
</dependency>
<dependency>
<groupId>io.dropwizard</groupId>
<artifactId>dropwizard-core</artifactId>
@@ -145,6 +150,21 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.0</version>
<exclusions>
<exclusion>
<groupId>com.sun.jdmk</groupId>
<artifactId>jmxtools</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jmx</groupId>
<artifactId>jmxri</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlets</artifactId>

View File

@@ -20,10 +20,13 @@ import com.hpcloud.mon.infrastructure.identity.IdentityServiceClient;
import com.hpcloud.mon.infrastructure.servlet.AddressValidationProxyServlet;
import com.hpcloud.mon.infrastructure.servlet.PostAuthenticationFilter;
import com.hpcloud.mon.infrastructure.servlet.PreAuthenticationFilter;
import com.hpcloud.mon.infrastructure.servlet.TenantCallCountingFilter;
import com.hpcloud.mon.infrastructure.servlet.TenantValidationFilter;
import com.hpcloud.mon.resource.VersionResource;
import com.hpcloud.mon.resource.exception.EntityExistsExceptionMapper;
import com.hpcloud.mon.resource.exception.EntityNotFoundExceptionMapper;
import com.hpcloud.mon.resource.exception.IllegalArgumentExceptionMapper;
import com.hpcloud.mon.resource.exception.InvalidEntityExceptionMapper;
import com.hpcloud.mon.resource.exception.JsonMappingExceptionManager;
import com.hpcloud.mon.resource.exception.JsonProcessingExceptionMapper;
import com.hpcloud.mon.resource.exception.ResourceNotFoundExceptionMapper;
@@ -57,15 +60,6 @@ public class MonApiApplication extends Application<MonApiConfiguration> {
Injector.registerModules(new PlatformModule(environment, config), new ApplicationModule(config));
/** Configure managed services */
// RabbitMQService internalRabbitService = Injector.getInstance(RabbitMQService.class,
// "internal");
// RabbitMQService externalRabbitService = Injector.getInstance(RabbitMQService.class,
// "external");
// environment.manage(Management.managed(internalRabbitService));
// environment.manage(Management.managed(externalRabbitService));
// WorkService workService = Injector.getInstance(WorkService.class);
// environment.manage(Management.managed(workService));
// environment.manage(Management.managed(Injector.getInstance(LockService.class)));
/** Configure resources */
environment.jersey().register(Injector.getInstance(VersionResource.class));
@@ -76,6 +70,7 @@ public class MonApiApplication extends Application<MonApiConfiguration> {
environment.jersey().register(new EntityNotFoundExceptionMapper());
environment.jersey().register(new ResourceNotFoundExceptionMapper());
environment.jersey().register(new IllegalArgumentExceptionMapper());
environment.jersey().register(new InvalidEntityExceptionMapper());
environment.jersey().register(new JsonProcessingExceptionMapper());
environment.jersey().register(new JsonMappingExceptionManager());
environment.jersey().register(new ThrowableExceptionMapper<Throwable>() {
@@ -92,7 +87,7 @@ public class MonApiApplication extends Application<MonApiConfiguration> {
// environment.addHealthCheck(new RabbitMQAPIHealthCheck(
// Injector.getInstance(RabbitMQAdminService.class)));
/** Configure filters */
/** Configure servlet filters */
if (config.useMiddleware) {
Map<String, String> authInitParams = new HashMap<String, String>();
authInitParams.put("ServiceIds", config.middleware.serviceIds);
@@ -112,7 +107,7 @@ public class MonApiApplication extends Application<MonApiConfiguration> {
authInitParams.put("ConnRetryTimes", config.middleware.connRetryTimes);
authInitParams.put("ConnRetryInterval", config.middleware.connRetryInterval);
/** Setup auth filter chain */
/** Setup servlet filters */
environment.servlets()
.addFilter("pre-auth", new PreAuthenticationFilter())
.addMappingForUrlPatterns(null, true, "/*");
@@ -122,6 +117,12 @@ public class MonApiApplication extends Application<MonApiConfiguration> {
environment.servlets()
.addFilter("post-auth", new PostAuthenticationFilter(config.middleware.rolesToMatch))
.addMappingForUrlPatterns(null, true, "/*");
environment.servlets()
.addFilter("tenant-validation", new TenantValidationFilter())
.addMappingForUrlPatterns(null, true, "/*");
environment.servlets()
.addFilter("call-counting", new TenantCallCountingFilter())
.addMappingForUrlPatterns(null, true, "/*");
}
/** Initialize the identity service */

View File

@@ -14,9 +14,9 @@ import javax.validation.constraints.NotNull;
import org.hibernate.validator.constraints.NotEmpty;
import com.hpcloud.messaging.kafka.KafkaConfiguration;
import com.hpcloud.mon.infrastructure.identity.IdentityServiceConfiguration;
import com.hpcloud.mon.infrastructure.middleware.MiddlewareConfiguration;
import com.hpcloud.mon.infrastructure.zookeeper.ZookeeperConfiguration;
/**
* @author Jonathan Halterman
@@ -26,18 +26,17 @@ public class MonApiConfiguration extends Configuration {
@Valid @NotNull public IdentityServiceConfiguration identityService;
@NotNull public Boolean accessedViaHttps;
@NotNull public Boolean useMiddleware;
@NotEmpty public String controlExchange;
@NotEmpty public String controlEventRoutingKey;
@NotEmpty public String maasMetricsExchange;
@NotEmpty public String metricsTopic;
@NotEmpty public String eventsTopic;
@NotEmpty public String[] adminUsers;
@NotEmpty public String externalHost;
public String apiCallCountPersistDelta;
@Valid @NotNull public ZookeeperConfiguration zookeeper;
@Valid @NotNull public DataSourceFactory database = new DataSourceFactory();
// @Valid @NotNull public RabbitMQConfiguration internalRabbit = new RabbitMQConfiguration();
// @Valid @NotNull public RabbitMQConfiguration externalRabbit = new RabbitMQConfiguration();
@Valid @NotNull public DataSourceFactory database;
@Valid @NotNull public KafkaConfiguration kafka;
@Valid @NotNull public MiddlewareConfiguration middleware;
@Valid @NotNull public JerseyClientConfiguration jerseyClient = new JerseyClientConfiguration();
@Valid @NotNull public JerseyClientConfiguration jerseyClient;
@Valid @NotNull public AddressValidationProxyConfiguration addressValidation;
public static class CloudServiceConfiguration {

View File

@@ -5,18 +5,19 @@ import io.dropwizard.db.DataSourceFactory;
import io.dropwizard.jdbi.DBIFactory;
import io.dropwizard.setup.Environment;
import java.io.File;
import java.io.FileInputStream;
import java.security.KeyStore;
import java.util.Properties;
import javax.inject.Singleton;
import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;
import org.apache.http.conn.scheme.PlainSocketFactory;
import org.apache.http.conn.scheme.Scheme;
import org.apache.http.conn.scheme.SchemeRegistry;
import org.apache.http.conn.ssl.SSLSocketFactory;
import org.skife.jdbi.v2.DBI;
import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Joiner;
import com.google.inject.AbstractModule;
import com.google.inject.Provider;
import com.google.inject.Provides;
import com.google.inject.ProvisionException;
import com.google.inject.Scopes;
import com.sun.jersey.api.client.Client;
@@ -37,79 +38,37 @@ public class PlatformModule extends AbstractModule {
@Override
protected void configure() {
bind(MetricRegistry.class).in(Scopes.SINGLETON);
bind(DataSourceFactory.class).toInstance(config.database);
bind(DBI.class).toProvider(new Provider<DBI>() {
@Override
public DBI get() {
try {
return new DBIFactory().build(environment, config.database, "mysql");
} catch (ClassNotFoundException e) {
throw new ProvisionException("Failed to provision DBI", e);
}
}
}).in(Scopes.SINGLETON);
// install(new RabbitMQModule());
// bind(RabbitMQConfiguration.class).annotatedWith(Names.named("internal")).toInstance(
// config.internalRabbit);
// bind(RabbitMQConfiguration.class).annotatedWith(Names.named("external")).toInstance(
// config.externalRabbit);
bind(Client.class).toProvider(new Provider<Client>() {
@Override
@SuppressWarnings("deprecation")
public Client get() {
try {
// Keystore
KeyStore ks = KeyStore.getInstance("jks");
FileInputStream is1 = new FileInputStream(new File(config.middleware.keystore));
try {
ks.load(is1, config.middleware.keystorePass.toCharArray());
} finally {
is1.close();
}
// Truststore
KeyStore ts = KeyStore.getInstance("jks");
FileInputStream is2 = new FileInputStream(new File(config.middleware.truststore));
try {
ts.load(is2, config.middleware.truststorePass.toCharArray());
} finally {
is2.close();
}
SSLSocketFactory ssf = new SSLSocketFactory(ks, config.middleware.keystorePass, ts);
PlainSocketFactory psf = PlainSocketFactory.getSocketFactory();
SchemeRegistry sr = new SchemeRegistry();
sr.register(new Scheme("http", 80, psf));
sr.register(new Scheme("http", 8080, psf));
sr.register(new Scheme("https", 443, ssf));
return new JerseyClientBuilder(environment).using(config.jerseyClient)
.using(environment)
.using(sr)
.build("default");
} catch (Exception e) {
throw new ProvisionException("Failed to create jersey client", e);
}
}
}).in(Scopes.SINGLETON);
// // Bind external config by default for AdminService to use
// bind(RabbitMQConfiguration.class).toInstance(config.externalRabbit);
// bind(RabbitMQAdminService.class).in(Scopes.SINGLETON);
}
// @Provides
// @Named("internal")
// @Singleton
// public RabbitMQService getInternalRabbit() {
// return new RabbitMQService(config.internalRabbit);
// }
//
// @Provides
// @Named("external")
// @Singleton
// public RabbitMQService getExternalRabbit() {
// return new RabbitMQService(config.externalRabbit);
// }
@Provides
@Singleton
public DBI getDBI() {
try {
return new DBIFactory().build(environment, config.database, "mysql");
} catch (ClassNotFoundException e) {
throw new ProvisionException("Failed to provision DBI", e);
}
}
@Provides
@Singleton
public Client getClient() {
return new JerseyClientBuilder(environment).using(config.jerseyClient)
.using(environment)
.build("default");
}
@Provides
@Singleton
public Producer<?, ?> getProducer() {
Properties props = new Properties();
props.put("metadata.broker.list", Joiner.on(',').join(config.kafka.hosts));
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "example.producer.SimplePartitioner");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
return new Producer<String, String>(config);
}
}

View File

@@ -1,13 +1,14 @@
package com.hpcloud.mon.app;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.inject.Inject;
import javax.inject.Named;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -19,12 +20,12 @@ import com.hpcloud.mon.common.model.alarm.AlarmExpression;
import com.hpcloud.mon.common.model.alarm.AlarmSubExpression;
import com.hpcloud.mon.common.model.metric.MetricDefinition;
import com.hpcloud.mon.domain.exception.EntityExistsException;
import com.hpcloud.mon.domain.exception.InvalidEntityException;
import com.hpcloud.mon.domain.model.alarm.AlarmDetail;
import com.hpcloud.mon.domain.model.alarm.AlarmRepository;
import com.hpcloud.mon.domain.model.notificationmethod.NotificationMethodRepository;
import com.hpcloud.util.Exceptions;
import com.hpcloud.util.Serialization;
import com.yammer.dropwizard.validation.InvalidEntityException;
/**
* Services alarm related requests.
@@ -33,18 +34,17 @@ import com.yammer.dropwizard.validation.InvalidEntityException;
*/
public class AlarmService {
private static final Logger LOG = LoggerFactory.getLogger(AlarmService.class);
private final MonApiConfiguration config;
private final Producer<String, String> producer;
private final AlarmRepository repo;
private final String controlExchange;
private final String controlRoutingKey;
// private final RabbitMQService rabbitService;
private final NotificationMethodRepository notificationMethodRepo;
@Inject
public AlarmService(MonApiConfiguration config, @Named("external") RabbitMQService rabbitService,
public AlarmService(MonApiConfiguration config, Producer<String, String> producer,
AlarmRepository repo, NotificationMethodRepository notificationMethodRepo) {
controlExchange = config.controlExchange;
controlRoutingKey = config.controlEventRoutingKey;
this.rabbitService = rabbitService;
this.config = config;
this.producer = producer;
this.repo = repo;
this.notificationMethodRepo = notificationMethodRepo;
}
@@ -63,8 +63,7 @@ public class AlarmService {
// Assert notification methods exist for tenant
for (String alarmAction : alarmActions)
if (!notificationMethodRepo.exists(tenantId, alarmAction))
throw new InvalidEntityException("The alarm is invalid", Arrays.asList(String.format(
"No notification method exists for %s", alarmAction)));
throw new InvalidEntityException("No notification method exists for %s", alarmAction);
Map<String, AlarmSubExpression> subAlarms = new HashMap<String, AlarmSubExpression>();
for (AlarmSubExpression subExpression : alarmExpression.getSubExpressions()) {
@@ -82,7 +81,7 @@ public class AlarmService {
// Notify interested parties of new alarm
String event = Serialization.toJson(new AlarmCreatedEvent(tenantId, alarmId, name,
expression, subAlarms));
rabbitService.send(controlExchange, controlRoutingKey, event);
producer.send(new KeyedMessage<>(config.eventsTopic, tenantId, event));
return alarm;
} catch (Exception e) {
@@ -101,6 +100,6 @@ public class AlarmService {
// Notify interested parties of alarm deletion
String event = Serialization.toJson(new AlarmDeletedEvent(tenantId, alarmId, subAlarmMetricDefs));
rabbitService.send(controlExchange, controlRoutingKey, event);
producer.send(new KeyedMessage<>(config.eventsTopic, tenantId, event));
}
}

View File

@@ -1,22 +1,19 @@
package com.hpcloud.mon.app;
import java.util.Calendar;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Named;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.hash.HashCode;
import com.hpcloud.mon.MonApiConfiguration;
import com.hpcloud.mon.common.model.metric.Metric;
import com.hpcloud.mon.common.model.metric.Metrics;
import com.hpcloud.mon.common.model.metric.MetricEnvelope;
import com.hpcloud.mon.common.model.metric.MetricEnvelopes;
/**
* Metric service implementation.
@@ -24,45 +21,44 @@ import com.hpcloud.mon.common.model.metric.Metrics;
* @author Todd Walk
*/
public class MetricService {
private Map<HashCode, String> metricTypeCache = new ConcurrentHashMap<HashCode, String>();
private AtomicInteger cacheMonth = new AtomicInteger(-1); // Must empty out the metricTypeCache
// each month
private final RabbitMQService rabbitService;
private final MetricRepository repo;
private final String exchange;
// private Map<HashCode, String> metricTypeCache = new ConcurrentHashMap<HashCode, String>();
// private AtomicInteger cacheMonth = new AtomicInteger(-1); // Must empty out the metricTypeCache
// // each month
private final MonApiConfiguration config;
private final Producer<String, String> producer;
// private final MetricRepository repo;
private final Meter metricMeter;
@Inject
public MetricService(MonApiConfiguration config,
@Named("internal") RabbitMQService rabbitService, MetricRepository repo,
public MetricService(MonApiConfiguration config, Producer<String, String> producer,
MetricRegistry metricRegistry) {
exchange = config.maasMetricsExchange;
this.rabbitService = rabbitService;
this.repo = repo;
metricMeter = metricRegistry.meter("maas metrics published");
cacheMonth = new AtomicInteger(Calendar.getInstance().get(Calendar.MONTH));
this.config = config;
this.producer = producer;
metricMeter = metricRegistry.meter(MetricRegistry.name(MetricService.class, "metrics.published"));
// cacheMonth = new AtomicInteger(Calendar.getInstance().get(Calendar.MONTH));
}
public void create(Metric metric, String tenantId, @Nullable String crossTenantId,
String authToken) {
// Send metric
String event = Metrics.toJson(metric);
Builder<String, Object> builder = new ImmutableMap.Builder<String, Object>().put("tenantId",
Builder<String, Object> meta = new ImmutableMap.Builder<String, Object>().put("tenantId",
tenantId).put("authToken", authToken);
if (crossTenantId != null)
builder.put("crossTenantId", crossTenantId);
rabbitService.sendUTF8(exchange, tenantId, event, builder.build());
meta.put("crossTenantId", crossTenantId);
MetricEnvelope envelope = new MetricEnvelope(metric, meta.build());
KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(config.metricsTopic, tenantId,
MetricEnvelopes.toJson(envelope));
producer.send(keyedMessage);
// Add metric stats
int month = Calendar.getInstance().get(Calendar.MONTH);
int prevMonth = month > 0 ? month - 1 : 11;
if (cacheMonth.compareAndSet(prevMonth, month))
metricTypeCache.clear();
HashCode hash = metric.definition().toHashCode();
if (!metricTypeCache.containsKey(hash)) {
metricTypeCache.put(hash, tenantId);
repo.persist(tenantId, hash);
}
// // Add metric stats
// int month = Calendar.getInstance().get(Calendar.MONTH);
// int prevMonth = month > 0 ? month - 1 : 11;
// if (cacheMonth.compareAndSet(prevMonth, month))
// metricTypeCache.clear();
// HashCode hash = metric.definition().toHashCode();
// if (!metricTypeCache.containsKey(hash)) {
// metricTypeCache.put(hash, tenantId);
// repo.persist(tenantId, hash);
// }
metricMeter.mark();
}