diff --git a/pom.xml b/pom.xml
index 1a9b0a2..dd6db99 100644
--- a/pom.xml
+++ b/pom.xml
@@ -79,6 +79,11 @@
mon-util
${mon.common.version}
+
+ com.hpcloud
+ mon-messaging
+ ${mon.common.version}
+
io.dropwizard
dropwizard-core
@@ -145,6 +150,21 @@
+
+ org.apache.kafka
+ kafka_2.9.2
+ 0.8.0
+
+
+ com.sun.jdmk
+ jmxtools
+
+
+ com.sun.jmx
+ jmxri
+
+
+
org.eclipse.jetty
jetty-servlets
diff --git a/src/main/java/com/hpcloud/mon/MonApiApplication.java b/src/main/java/com/hpcloud/mon/MonApiApplication.java
index 845a9e8..774f859 100644
--- a/src/main/java/com/hpcloud/mon/MonApiApplication.java
+++ b/src/main/java/com/hpcloud/mon/MonApiApplication.java
@@ -20,10 +20,13 @@ import com.hpcloud.mon.infrastructure.identity.IdentityServiceClient;
import com.hpcloud.mon.infrastructure.servlet.AddressValidationProxyServlet;
import com.hpcloud.mon.infrastructure.servlet.PostAuthenticationFilter;
import com.hpcloud.mon.infrastructure.servlet.PreAuthenticationFilter;
+import com.hpcloud.mon.infrastructure.servlet.TenantCallCountingFilter;
+import com.hpcloud.mon.infrastructure.servlet.TenantValidationFilter;
import com.hpcloud.mon.resource.VersionResource;
import com.hpcloud.mon.resource.exception.EntityExistsExceptionMapper;
import com.hpcloud.mon.resource.exception.EntityNotFoundExceptionMapper;
import com.hpcloud.mon.resource.exception.IllegalArgumentExceptionMapper;
+import com.hpcloud.mon.resource.exception.InvalidEntityExceptionMapper;
import com.hpcloud.mon.resource.exception.JsonMappingExceptionManager;
import com.hpcloud.mon.resource.exception.JsonProcessingExceptionMapper;
import com.hpcloud.mon.resource.exception.ResourceNotFoundExceptionMapper;
@@ -57,15 +60,6 @@ public class MonApiApplication extends Application {
Injector.registerModules(new PlatformModule(environment, config), new ApplicationModule(config));
/** Configure managed services */
- // RabbitMQService internalRabbitService = Injector.getInstance(RabbitMQService.class,
- // "internal");
- // RabbitMQService externalRabbitService = Injector.getInstance(RabbitMQService.class,
- // "external");
- // environment.manage(Management.managed(internalRabbitService));
- // environment.manage(Management.managed(externalRabbitService));
- // WorkService workService = Injector.getInstance(WorkService.class);
- // environment.manage(Management.managed(workService));
- // environment.manage(Management.managed(Injector.getInstance(LockService.class)));
/** Configure resources */
environment.jersey().register(Injector.getInstance(VersionResource.class));
@@ -76,6 +70,7 @@ public class MonApiApplication extends Application {
environment.jersey().register(new EntityNotFoundExceptionMapper());
environment.jersey().register(new ResourceNotFoundExceptionMapper());
environment.jersey().register(new IllegalArgumentExceptionMapper());
+ environment.jersey().register(new InvalidEntityExceptionMapper());
environment.jersey().register(new JsonProcessingExceptionMapper());
environment.jersey().register(new JsonMappingExceptionManager());
environment.jersey().register(new ThrowableExceptionMapper() {
@@ -92,7 +87,7 @@ public class MonApiApplication extends Application {
// environment.addHealthCheck(new RabbitMQAPIHealthCheck(
// Injector.getInstance(RabbitMQAdminService.class)));
- /** Configure filters */
+ /** Configure servlet filters */
if (config.useMiddleware) {
Map authInitParams = new HashMap();
authInitParams.put("ServiceIds", config.middleware.serviceIds);
@@ -112,7 +107,7 @@ public class MonApiApplication extends Application {
authInitParams.put("ConnRetryTimes", config.middleware.connRetryTimes);
authInitParams.put("ConnRetryInterval", config.middleware.connRetryInterval);
- /** Setup auth filter chain */
+ /** Setup servlet filters */
environment.servlets()
.addFilter("pre-auth", new PreAuthenticationFilter())
.addMappingForUrlPatterns(null, true, "/*");
@@ -122,6 +117,12 @@ public class MonApiApplication extends Application {
environment.servlets()
.addFilter("post-auth", new PostAuthenticationFilter(config.middleware.rolesToMatch))
.addMappingForUrlPatterns(null, true, "/*");
+ environment.servlets()
+ .addFilter("tenant-validation", new TenantValidationFilter())
+ .addMappingForUrlPatterns(null, true, "/*");
+ environment.servlets()
+ .addFilter("call-counting", new TenantCallCountingFilter())
+ .addMappingForUrlPatterns(null, true, "/*");
}
/** Initialize the identity service */
diff --git a/src/main/java/com/hpcloud/mon/MonApiConfiguration.java b/src/main/java/com/hpcloud/mon/MonApiConfiguration.java
index afb0bfd..ba0030a 100644
--- a/src/main/java/com/hpcloud/mon/MonApiConfiguration.java
+++ b/src/main/java/com/hpcloud/mon/MonApiConfiguration.java
@@ -14,9 +14,9 @@ import javax.validation.constraints.NotNull;
import org.hibernate.validator.constraints.NotEmpty;
+import com.hpcloud.messaging.kafka.KafkaConfiguration;
import com.hpcloud.mon.infrastructure.identity.IdentityServiceConfiguration;
import com.hpcloud.mon.infrastructure.middleware.MiddlewareConfiguration;
-import com.hpcloud.mon.infrastructure.zookeeper.ZookeeperConfiguration;
/**
* @author Jonathan Halterman
@@ -26,18 +26,17 @@ public class MonApiConfiguration extends Configuration {
@Valid @NotNull public IdentityServiceConfiguration identityService;
@NotNull public Boolean accessedViaHttps;
@NotNull public Boolean useMiddleware;
- @NotEmpty public String controlExchange;
- @NotEmpty public String controlEventRoutingKey;
- @NotEmpty public String maasMetricsExchange;
+
+ @NotEmpty public String metricsTopic;
+ @NotEmpty public String eventsTopic;
+
@NotEmpty public String[] adminUsers;
@NotEmpty public String externalHost;
public String apiCallCountPersistDelta;
- @Valid @NotNull public ZookeeperConfiguration zookeeper;
- @Valid @NotNull public DataSourceFactory database = new DataSourceFactory();
- // @Valid @NotNull public RabbitMQConfiguration internalRabbit = new RabbitMQConfiguration();
- // @Valid @NotNull public RabbitMQConfiguration externalRabbit = new RabbitMQConfiguration();
+ @Valid @NotNull public DataSourceFactory database;
+ @Valid @NotNull public KafkaConfiguration kafka;
@Valid @NotNull public MiddlewareConfiguration middleware;
- @Valid @NotNull public JerseyClientConfiguration jerseyClient = new JerseyClientConfiguration();
+ @Valid @NotNull public JerseyClientConfiguration jerseyClient;
@Valid @NotNull public AddressValidationProxyConfiguration addressValidation;
public static class CloudServiceConfiguration {
diff --git a/src/main/java/com/hpcloud/mon/PlatformModule.java b/src/main/java/com/hpcloud/mon/PlatformModule.java
index d584968..027c6b0 100644
--- a/src/main/java/com/hpcloud/mon/PlatformModule.java
+++ b/src/main/java/com/hpcloud/mon/PlatformModule.java
@@ -5,18 +5,19 @@ import io.dropwizard.db.DataSourceFactory;
import io.dropwizard.jdbi.DBIFactory;
import io.dropwizard.setup.Environment;
-import java.io.File;
-import java.io.FileInputStream;
-import java.security.KeyStore;
+import java.util.Properties;
+
+import javax.inject.Singleton;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.ProducerConfig;
-import org.apache.http.conn.scheme.PlainSocketFactory;
-import org.apache.http.conn.scheme.Scheme;
-import org.apache.http.conn.scheme.SchemeRegistry;
-import org.apache.http.conn.ssl.SSLSocketFactory;
import org.skife.jdbi.v2.DBI;
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Joiner;
import com.google.inject.AbstractModule;
-import com.google.inject.Provider;
+import com.google.inject.Provides;
import com.google.inject.ProvisionException;
import com.google.inject.Scopes;
import com.sun.jersey.api.client.Client;
@@ -37,79 +38,37 @@ public class PlatformModule extends AbstractModule {
@Override
protected void configure() {
+ bind(MetricRegistry.class).in(Scopes.SINGLETON);
bind(DataSourceFactory.class).toInstance(config.database);
- bind(DBI.class).toProvider(new Provider() {
- @Override
- public DBI get() {
- try {
- return new DBIFactory().build(environment, config.database, "mysql");
- } catch (ClassNotFoundException e) {
- throw new ProvisionException("Failed to provision DBI", e);
- }
- }
- }).in(Scopes.SINGLETON);
-
- // install(new RabbitMQModule());
- // bind(RabbitMQConfiguration.class).annotatedWith(Names.named("internal")).toInstance(
- // config.internalRabbit);
- // bind(RabbitMQConfiguration.class).annotatedWith(Names.named("external")).toInstance(
- // config.externalRabbit);
-
- bind(Client.class).toProvider(new Provider() {
- @Override
- @SuppressWarnings("deprecation")
- public Client get() {
- try {
- // Keystore
- KeyStore ks = KeyStore.getInstance("jks");
- FileInputStream is1 = new FileInputStream(new File(config.middleware.keystore));
- try {
- ks.load(is1, config.middleware.keystorePass.toCharArray());
- } finally {
- is1.close();
- }
-
- // Truststore
- KeyStore ts = KeyStore.getInstance("jks");
- FileInputStream is2 = new FileInputStream(new File(config.middleware.truststore));
- try {
- ts.load(is2, config.middleware.truststorePass.toCharArray());
- } finally {
- is2.close();
- }
-
- SSLSocketFactory ssf = new SSLSocketFactory(ks, config.middleware.keystorePass, ts);
- PlainSocketFactory psf = PlainSocketFactory.getSocketFactory();
- SchemeRegistry sr = new SchemeRegistry();
- sr.register(new Scheme("http", 80, psf));
- sr.register(new Scheme("http", 8080, psf));
- sr.register(new Scheme("https", 443, ssf));
- return new JerseyClientBuilder(environment).using(config.jerseyClient)
- .using(environment)
- .using(sr)
- .build("default");
- } catch (Exception e) {
- throw new ProvisionException("Failed to create jersey client", e);
- }
- }
- }).in(Scopes.SINGLETON);
-
- // // Bind external config by default for AdminService to use
- // bind(RabbitMQConfiguration.class).toInstance(config.externalRabbit);
- // bind(RabbitMQAdminService.class).in(Scopes.SINGLETON);
}
- // @Provides
- // @Named("internal")
- // @Singleton
- // public RabbitMQService getInternalRabbit() {
- // return new RabbitMQService(config.internalRabbit);
- // }
- //
- // @Provides
- // @Named("external")
- // @Singleton
- // public RabbitMQService getExternalRabbit() {
- // return new RabbitMQService(config.externalRabbit);
- // }
+ @Provides
+ @Singleton
+ public DBI getDBI() {
+ try {
+ return new DBIFactory().build(environment, config.database, "mysql");
+ } catch (ClassNotFoundException e) {
+ throw new ProvisionException("Failed to provision DBI", e);
+ }
+ }
+
+ @Provides
+ @Singleton
+ public Client getClient() {
+ return new JerseyClientBuilder(environment).using(config.jerseyClient)
+ .using(environment)
+ .build("default");
+ }
+
+ @Provides
+ @Singleton
+ public Producer, ?> getProducer() {
+ Properties props = new Properties();
+ props.put("metadata.broker.list", Joiner.on(',').join(config.kafka.hosts));
+ props.put("serializer.class", "kafka.serializer.StringEncoder");
+ props.put("partitioner.class", "example.producer.SimplePartitioner");
+ props.put("request.required.acks", "1");
+ ProducerConfig config = new ProducerConfig(props);
+ return new Producer(config);
+ }
}
diff --git a/src/main/java/com/hpcloud/mon/app/AlarmService.java b/src/main/java/com/hpcloud/mon/app/AlarmService.java
index 8da1a20..b3ae929 100644
--- a/src/main/java/com/hpcloud/mon/app/AlarmService.java
+++ b/src/main/java/com/hpcloud/mon/app/AlarmService.java
@@ -1,13 +1,14 @@
package com.hpcloud.mon.app;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.inject.Inject;
-import javax.inject.Named;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -19,12 +20,12 @@ import com.hpcloud.mon.common.model.alarm.AlarmExpression;
import com.hpcloud.mon.common.model.alarm.AlarmSubExpression;
import com.hpcloud.mon.common.model.metric.MetricDefinition;
import com.hpcloud.mon.domain.exception.EntityExistsException;
+import com.hpcloud.mon.domain.exception.InvalidEntityException;
import com.hpcloud.mon.domain.model.alarm.AlarmDetail;
import com.hpcloud.mon.domain.model.alarm.AlarmRepository;
import com.hpcloud.mon.domain.model.notificationmethod.NotificationMethodRepository;
import com.hpcloud.util.Exceptions;
import com.hpcloud.util.Serialization;
-import com.yammer.dropwizard.validation.InvalidEntityException;
/**
* Services alarm related requests.
@@ -33,18 +34,17 @@ import com.yammer.dropwizard.validation.InvalidEntityException;
*/
public class AlarmService {
private static final Logger LOG = LoggerFactory.getLogger(AlarmService.class);
+
+ private final MonApiConfiguration config;
+ private final Producer producer;
private final AlarmRepository repo;
- private final String controlExchange;
- private final String controlRoutingKey;
- // private final RabbitMQService rabbitService;
private final NotificationMethodRepository notificationMethodRepo;
@Inject
- public AlarmService(MonApiConfiguration config, @Named("external") RabbitMQService rabbitService,
+ public AlarmService(MonApiConfiguration config, Producer producer,
AlarmRepository repo, NotificationMethodRepository notificationMethodRepo) {
- controlExchange = config.controlExchange;
- controlRoutingKey = config.controlEventRoutingKey;
- this.rabbitService = rabbitService;
+ this.config = config;
+ this.producer = producer;
this.repo = repo;
this.notificationMethodRepo = notificationMethodRepo;
}
@@ -63,8 +63,7 @@ public class AlarmService {
// Assert notification methods exist for tenant
for (String alarmAction : alarmActions)
if (!notificationMethodRepo.exists(tenantId, alarmAction))
- throw new InvalidEntityException("The alarm is invalid", Arrays.asList(String.format(
- "No notification method exists for %s", alarmAction)));
+ throw new InvalidEntityException("No notification method exists for %s", alarmAction);
Map subAlarms = new HashMap();
for (AlarmSubExpression subExpression : alarmExpression.getSubExpressions()) {
@@ -82,7 +81,7 @@ public class AlarmService {
// Notify interested parties of new alarm
String event = Serialization.toJson(new AlarmCreatedEvent(tenantId, alarmId, name,
expression, subAlarms));
- rabbitService.send(controlExchange, controlRoutingKey, event);
+ producer.send(new KeyedMessage<>(config.eventsTopic, tenantId, event));
return alarm;
} catch (Exception e) {
@@ -101,6 +100,6 @@ public class AlarmService {
// Notify interested parties of alarm deletion
String event = Serialization.toJson(new AlarmDeletedEvent(tenantId, alarmId, subAlarmMetricDefs));
- rabbitService.send(controlExchange, controlRoutingKey, event);
+ producer.send(new KeyedMessage<>(config.eventsTopic, tenantId, event));
}
}
diff --git a/src/main/java/com/hpcloud/mon/app/MetricService.java b/src/main/java/com/hpcloud/mon/app/MetricService.java
index ee63cbf..90c4483 100644
--- a/src/main/java/com/hpcloud/mon/app/MetricService.java
+++ b/src/main/java/com/hpcloud/mon/app/MetricService.java
@@ -1,22 +1,19 @@
package com.hpcloud.mon.app;
-import java.util.Calendar;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
import javax.annotation.Nullable;
import javax.inject.Inject;
-import javax.inject.Named;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
-import com.google.common.hash.HashCode;
import com.hpcloud.mon.MonApiConfiguration;
import com.hpcloud.mon.common.model.metric.Metric;
-import com.hpcloud.mon.common.model.metric.Metrics;
+import com.hpcloud.mon.common.model.metric.MetricEnvelope;
+import com.hpcloud.mon.common.model.metric.MetricEnvelopes;
/**
* Metric service implementation.
@@ -24,45 +21,44 @@ import com.hpcloud.mon.common.model.metric.Metrics;
* @author Todd Walk
*/
public class MetricService {
- private Map metricTypeCache = new ConcurrentHashMap();
- private AtomicInteger cacheMonth = new AtomicInteger(-1); // Must empty out the metricTypeCache
- // each month
- private final RabbitMQService rabbitService;
- private final MetricRepository repo;
- private final String exchange;
+ // private Map metricTypeCache = new ConcurrentHashMap();
+ // private AtomicInteger cacheMonth = new AtomicInteger(-1); // Must empty out the metricTypeCache
+ // // each month
+ private final MonApiConfiguration config;
+ private final Producer producer;
+ // private final MetricRepository repo;
private final Meter metricMeter;
@Inject
- public MetricService(MonApiConfiguration config,
- @Named("internal") RabbitMQService rabbitService, MetricRepository repo,
+ public MetricService(MonApiConfiguration config, Producer producer,
MetricRegistry metricRegistry) {
- exchange = config.maasMetricsExchange;
- this.rabbitService = rabbitService;
- this.repo = repo;
- metricMeter = metricRegistry.meter("maas metrics published");
- cacheMonth = new AtomicInteger(Calendar.getInstance().get(Calendar.MONTH));
+ this.config = config;
+ this.producer = producer;
+ metricMeter = metricRegistry.meter(MetricRegistry.name(MetricService.class, "metrics.published"));
+ // cacheMonth = new AtomicInteger(Calendar.getInstance().get(Calendar.MONTH));
}
public void create(Metric metric, String tenantId, @Nullable String crossTenantId,
String authToken) {
- // Send metric
- String event = Metrics.toJson(metric);
- Builder builder = new ImmutableMap.Builder().put("tenantId",
+ Builder meta = new ImmutableMap.Builder().put("tenantId",
tenantId).put("authToken", authToken);
if (crossTenantId != null)
- builder.put("crossTenantId", crossTenantId);
- rabbitService.sendUTF8(exchange, tenantId, event, builder.build());
+ meta.put("crossTenantId", crossTenantId);
+ MetricEnvelope envelope = new MetricEnvelope(metric, meta.build());
+ KeyedMessage keyedMessage = new KeyedMessage<>(config.metricsTopic, tenantId,
+ MetricEnvelopes.toJson(envelope));
+ producer.send(keyedMessage);
- // Add metric stats
- int month = Calendar.getInstance().get(Calendar.MONTH);
- int prevMonth = month > 0 ? month - 1 : 11;
- if (cacheMonth.compareAndSet(prevMonth, month))
- metricTypeCache.clear();
- HashCode hash = metric.definition().toHashCode();
- if (!metricTypeCache.containsKey(hash)) {
- metricTypeCache.put(hash, tenantId);
- repo.persist(tenantId, hash);
- }
+ // // Add metric stats
+ // int month = Calendar.getInstance().get(Calendar.MONTH);
+ // int prevMonth = month > 0 ? month - 1 : 11;
+ // if (cacheMonth.compareAndSet(prevMonth, month))
+ // metricTypeCache.clear();
+ // HashCode hash = metric.definition().toHashCode();
+ // if (!metricTypeCache.containsKey(hash)) {
+ // metricTypeCache.put(hash, tenantId);
+ // repo.persist(tenantId, hash);
+ // }
metricMeter.mark();
}