Adaptations for NebulOuS CI/CD

- Use library template for Zuul
- Change from Gradle to Maven

Change-Id: I1128bf3f17b8782cc3984af33d4e13a4f18ad53d
robert.sanfeliu 2023-12-22 17:15:00 +01:00
parent fd7e34b3d1
commit 52c030334f
34 changed files with 1656 additions and 598 deletions

10
.gitignore vendored
View File

@@ -1,2 +1,12 @@
/iot_dpp/.classpath
/iot_dpp/.gitattributes
/iot_dpp/.project
/iot_dpp/.settings/org.eclipse.buildship.core.prefs
/iot_dpp/.settings/org.eclipse.jdt.core.prefs
/iot_dpp/gradlew.bat
/iot_dpp/gradle/wrapper/gradle-wrapper.jar
/iot_dpp/gradle/wrapper/gradle-wrapper.properties
/iot_dpp/gradlew
__pycache__/
.nox/
/iot_dpp/maven-repo

View File

@@ -1,23 +0,0 @@
# Patterns to ignore when building packages.
# This supports shell glob matching, relative path matching, and
# negation (prefixed with !). Only one pattern per line.
.DS_Store
# Common VCS dirs
.git/
.gitignore
.bzr/
.bzrignore
.hg/
.hgignore
.svn/
# Common backup files
*.swp
*.bak
*.tmp
*.orig
*~
# Various IDEs
.project
.idea/
*.tmproj
.vscode/

View File

@@ -1,24 +0,0 @@
apiVersion: v2
name: nebulous-iot-dpp-orchestrator
description: A Helm chart for Kubernetes
# A chart can be either an 'application' or a 'library' chart.
#
# Application charts are a collection of templates that can be packaged into versioned archives
# to be deployed.
#
# Library charts provide useful utilities or functions for the chart developer. They're included as
# a dependency of application charts to inject those utilities and functions into the rendering
# pipeline. Library charts do not define any templates and therefore cannot be deployed.
type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.1.0
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "latest"

View File

@@ -1,22 +0,0 @@
1. Get the application URL by running these commands:
{{- if .Values.ingress.enabled }}
{{- range $host := .Values.ingress.hosts }}
{{- range .paths }}
http{{ if $.Values.ingress.tls }}s{{ end }}://{{ $host.host }}{{ .path }}
{{- end }}
{{- end }}
{{- else if contains "NodePort" .Values.service.type }}
export NODE_PORT=$(kubectl get --namespace {{ .Release.Namespace }} -o jsonpath="{.spec.ports[0].nodePort}" services {{ include "nebulous-iot-dpp-orchestrator.fullname" . }})
export NODE_IP=$(kubectl get nodes --namespace {{ .Release.Namespace }} -o jsonpath="{.items[0].status.addresses[0].address}")
echo http://$NODE_IP:$NODE_PORT
{{- else if contains "LoadBalancer" .Values.service.type }}
NOTE: It may take a few minutes for the LoadBalancer IP to be available.
You can watch the status of it by running 'kubectl get --namespace {{ .Release.Namespace }} svc -w {{ include "nebulous-iot-dpp-orchestrator.fullname" . }}'
export SERVICE_IP=$(kubectl get svc --namespace {{ .Release.Namespace }} {{ include "nebulous-iot-dpp-orchestrator.fullname" . }} --template "{{"{{ range (index .status.loadBalancer.ingress 0) }}{{.}}{{ end }}"}}")
echo http://$SERVICE_IP:{{ .Values.service.port }}
{{- else if contains "ClusterIP" .Values.service.type }}
export POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ include "nebulous-iot-dpp-orchestrator.name" . }},app.kubernetes.io/instance={{ .Release.Name }}" -o jsonpath="{.items[0].metadata.name}")
export CONTAINER_PORT=$(kubectl get pod --namespace {{ .Release.Namespace }} $POD_NAME -o jsonpath="{.spec.containers[0].ports[0].containerPort}")
echo "Visit http://127.0.0.1:8080 to use your application"
kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8080:$CONTAINER_PORT
{{- end }}

View File

@@ -1,62 +0,0 @@
{{/*
Expand the name of the chart.
*/}}
{{- define "nebulous-iot-dpp-orchestrator.name" -}}
{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }}
{{- end }}
{{/*
Create a default fully qualified app name.
We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
If release name contains chart name it will be used as a full name.
*/}}
{{- define "nebulous-iot-dpp-orchestrator.fullname" -}}
{{- if .Values.fullnameOverride }}
{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }}
{{- else }}
{{- $name := default .Chart.Name .Values.nameOverride }}
{{- if contains $name .Release.Name }}
{{- .Release.Name | trunc 63 | trimSuffix "-" }}
{{- else }}
{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }}
{{- end }}
{{- end }}
{{- end }}
{{/*
Create chart name and version as used by the chart label.
*/}}
{{- define "nebulous-iot-dpp-orchestrator.chart" -}}
{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }}
{{- end }}
{{/*
Common labels
*/}}
{{- define "nebulous-iot-dpp-orchestrator.labels" -}}
helm.sh/chart: {{ include "nebulous-iot-dpp-orchestrator.chart" . }}
{{ include "nebulous-iot-dpp-orchestrator.selectorLabels" . }}
{{- if .Chart.AppVersion }}
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
{{- end }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
{{- end }}
{{/*
Selector labels
*/}}
{{- define "nebulous-iot-dpp-orchestrator.selectorLabels" -}}
app.kubernetes.io/name: {{ include "nebulous-iot-dpp-orchestrator.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
{{- end }}
{{/*
Create the name of the service account to use
*/}}
{{- define "nebulous-iot-dpp-orchestrator.serviceAccountName" -}}
{{- if .Values.serviceAccount.create }}
{{- default (include "nebulous-iot-dpp-orchestrator.fullname" .) .Values.serviceAccount.name }}
{{- else }}
{{- default "default" .Values.serviceAccount.name }}
{{- end }}
{{- end }}

View File

@@ -1,61 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ include "nebulous-iot-dpp-orchestrator.fullname" . }}
labels:
{{- include "nebulous-iot-dpp-orchestrator.labels" . | nindent 4 }}
spec:
{{- if not .Values.autoscaling.enabled }}
replicas: {{ .Values.replicaCount }}
{{- end }}
selector:
matchLabels:
{{- include "nebulous-iot-dpp-orchestrator.selectorLabels" . | nindent 6 }}
template:
metadata:
{{- with .Values.podAnnotations }}
annotations:
{{- toYaml . | nindent 8 }}
{{- end }}
labels:
{{- include "nebulous-iot-dpp-orchestrator.selectorLabels" . | nindent 8 }}
spec:
{{- with .Values.imagePullSecrets }}
imagePullSecrets:
{{- toYaml . | nindent 8 }}
{{- end }}
serviceAccountName: {{ include "nebulous-iot-dpp-orchestrator.serviceAccountName" . }}
securityContext:
{{- toYaml .Values.podSecurityContext | nindent 8 }}
containers:
- name: {{ .Chart.Name }}
securityContext:
{{- toYaml .Values.securityContext | nindent 12 }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
ports:
- name: http
containerPort: 8080
protocol: TCP
livenessProbe:
httpGet:
path: /
port: http
readinessProbe:
httpGet:
path: /
port: http
resources:
{{- toYaml .Values.resources | nindent 12 }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.affinity }}
affinity:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.tolerations }}
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}

View File

@@ -1,28 +0,0 @@
{{- if .Values.autoscaling.enabled }}
apiVersion: autoscaling/v2beta1
kind: HorizontalPodAutoscaler
metadata:
name: {{ include "nebulous-iot-dpp-orchestrator.fullname" . }}
labels:
{{- include "nebulous-iot-dpp-orchestrator.labels" . | nindent 4 }}
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: {{ include "nebulous-iot-dpp-orchestrator.fullname" . }}
minReplicas: {{ .Values.autoscaling.minReplicas }}
maxReplicas: {{ .Values.autoscaling.maxReplicas }}
metrics:
{{- if .Values.autoscaling.targetCPUUtilizationPercentage }}
- type: Resource
resource:
name: cpu
targetAverageUtilization: {{ .Values.autoscaling.targetCPUUtilizationPercentage }}
{{- end }}
{{- if .Values.autoscaling.targetMemoryUtilizationPercentage }}
- type: Resource
resource:
name: memory
targetAverageUtilization: {{ .Values.autoscaling.targetMemoryUtilizationPercentage }}
{{- end }}
{{- end }}

View File

@@ -1,61 +0,0 @@
{{- if .Values.ingress.enabled -}}
{{- $fullName := include "nebulous-iot-dpp-orchestrator.fullname" . -}}
{{- $svcPort := .Values.service.port -}}
{{- if and .Values.ingress.className (not (semverCompare ">=1.18-0" .Capabilities.KubeVersion.GitVersion)) }}
{{- if not (hasKey .Values.ingress.annotations "kubernetes.io/ingress.class") }}
{{- $_ := set .Values.ingress.annotations "kubernetes.io/ingress.class" .Values.ingress.className}}
{{- end }}
{{- end }}
{{- if semverCompare ">=1.19-0" .Capabilities.KubeVersion.GitVersion -}}
apiVersion: networking.k8s.io/v1
{{- else if semverCompare ">=1.14-0" .Capabilities.KubeVersion.GitVersion -}}
apiVersion: networking.k8s.io/v1beta1
{{- else -}}
apiVersion: extensions/v1beta1
{{- end }}
kind: Ingress
metadata:
name: {{ $fullName }}
labels:
{{- include "nebulous-iot-dpp-orchestrator.labels" . | nindent 4 }}
{{- with .Values.ingress.annotations }}
annotations:
{{- toYaml . | nindent 4 }}
{{- end }}
spec:
{{- if and .Values.ingress.className (semverCompare ">=1.18-0" .Capabilities.KubeVersion.GitVersion) }}
ingressClassName: {{ .Values.ingress.className }}
{{- end }}
{{- if .Values.ingress.tls }}
tls:
{{- range .Values.ingress.tls }}
- hosts:
{{- range .hosts }}
- {{ . | quote }}
{{- end }}
secretName: {{ .secretName }}
{{- end }}
{{- end }}
rules:
{{- range .Values.ingress.hosts }}
- host: {{ .host | quote }}
http:
paths:
{{- range .paths }}
- path: {{ .path }}
{{- if and .pathType (semverCompare ">=1.18-0" $.Capabilities.KubeVersion.GitVersion) }}
pathType: {{ .pathType }}
{{- end }}
backend:
{{- if semverCompare ">=1.19-0" $.Capabilities.KubeVersion.GitVersion }}
service:
name: {{ $fullName }}
port:
number: {{ $svcPort }}
{{- else }}
serviceName: {{ $fullName }}
servicePort: {{ $svcPort }}
{{- end }}
{{- end }}
{{- end }}
{{- end }}

View File

@@ -1,15 +0,0 @@
apiVersion: v1
kind: Service
metadata:
name: {{ include "nebulous-iot-dpp-orchestrator.fullname" . }}
labels:
{{- include "nebulous-iot-dpp-orchestrator.labels" . | nindent 4 }}
spec:
type: {{ .Values.service.type }}
ports:
- port: {{ .Values.service.port }}
targetPort: http
protocol: TCP
name: http
selector:
{{- include "nebulous-iot-dpp-orchestrator.selectorLabels" . | nindent 4 }}

View File

@@ -1,12 +0,0 @@
{{- if .Values.serviceAccount.create -}}
apiVersion: v1
kind: ServiceAccount
metadata:
name: {{ include "nebulous-iot-dpp-orchestrator.serviceAccountName" . }}
labels:
{{- include "nebulous-iot-dpp-orchestrator.labels" . | nindent 4 }}
{{- with .Values.serviceAccount.annotations }}
annotations:
{{- toYaml . | nindent 4 }}
{{- end }}
{{- end }}

View File

@@ -1,82 +0,0 @@
# Default values for nebulous-iot-dpp-orchestrator.
# This is a YAML-formatted file.
# Declare variables to be passed into your templates.
replicaCount: 1
image:
repository: "quay.io/nebulous/iot-dpp-orchestrator-java-spring-boot-demo"
pullPolicy: IfNotPresent
# Overrides the image tag whose default is the chart appVersion.
tag: ""
imagePullSecrets: []
nameOverride: ""
fullnameOverride: ""
serviceAccount:
# Specifies whether a service account should be created
create: true
# Annotations to add to the service account
annotations: {}
# The name of the service account to use.
# If not set and create is true, a name is generated using the fullname template
name: ""
podAnnotations: {}
podSecurityContext: {}
# fsGroup: 2000
securityContext: {}
# capabilities:
# drop:
# - ALL
# readOnlyRootFilesystem: true
# runAsNonRoot: true
# runAsUser: 1000
service:
type: ClusterIP
port: 80
ingress:
enabled: false
className: ""
annotations: {}
# kubernetes.io/ingress.class: nginx
# kubernetes.io/tls-acme: "true"
hosts:
- host: chart-example.local
paths:
- path: /
pathType: ImplementationSpecific
tls: []
# - secretName: chart-example-tls
# hosts:
# - chart-example.local
resources: {}
# We usually recommend not to specify default resources and to leave this as a conscious
# choice for the user. This also increases chances charts run on environments with little
# resources, such as Minikube. If you do want to specify resources, uncomment the following
# lines, adjust them as necessary, and remove the curly braces after 'resources:'.
# limits:
# cpu: 100m
# memory: 128Mi
# requests:
# cpu: 100m
# memory: 128Mi
autoscaling:
enabled: false
minReplicas: 1
maxReplicas: 100
targetCPUUtilizationPercentage: 80
# targetMemoryUtilizationPercentage: 80
nodeSelector: {}
tolerations: []
affinity: {}

31
iot_dpp/README.md Normal file
View File

@@ -0,0 +1,31 @@
# IoT data processing pipelines orchestration tool
The IoT data processing pipelines orchestration tool lets users declaratively express data transformation pipelines by identifying their main components (data sources, transformation operations and data consumers) and interlinking them to form data transformation flows. Once defined by the user, the orchestration of these pipelines is handled by the NebulOuS core, which allocates and deallocates computational resources as needed to adapt to the current workload. The tool also lets the user monitor the execution of these pipelines and detect any errors occurring in them.
## Documentation
The repository contains the source code for two ActiveMQ Artemis plugins necessary for implementing the NebulOuS IoT data processing pipelines orchestration tool.
### MessageGroupIDAnnotationPlugin
ActiveMQ Artemis plugin for extracting the value to be used as JMSXGroupID from the message.
### MessageLifecycleMonitoringPlugin
ActiveMQ Artemis plugin for tracking the lifecycle of messages inside an ActiveMQ cluster. At each step of the message lifecycle (a producer writes a message to the cluster, the message is delivered to a consumer, the consumer ACKs the message), the plugin generates events with relevant information that can be used to understand how IoT application components on NebulOuS are communicating.
## Installation
Build the source code with Maven: running `mvn package` inside the `iot_dpp` folder produces the plugin jar.
Configure your ActiveMQ Artemis broker to load the generated plugins; follow the [broker plugins documentation](https://activemq.apache.org/components/artemis/documentation/latest/broker-plugins.html). A configuration sketch is shown below.
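As an illustration only, a minimal `broker.xml` fragment registering both plugins could look as follows, assuming the jar built above has been copied onto the broker's classpath (e.g. into its `lib/` directory):
```xml
<!-- Illustrative sketch: register the plugins shipped in this module. -->
<broker-plugins>
   <!-- Annotates messages with a JMSXGroupID extracted from their body or properties. -->
   <broker-plugin class-name="eut.nebulouscloud.iot_dpp.MessageGroupIDAnnotationPlugin"/>
   <!-- Emits publish/deliver/acknowledge lifecycle events towards the EMS. -->
   <broker-plugin class-name="eut.nebulouscloud.iot_dpp.monitoring.EMSMessageLifecycleMonitoringPlugin"/>
</broker-plugins>
```
Note that `MessageGroupIDAnnotationPlugin` currently receives its address-to-expression map only through its Java constructor, so as committed it is mainly meant to be registered programmatically (as the unit tests do on an embedded broker); the `broker.xml` entry above is shown purely for illustration.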
## Authors
- [Robert Sanfeliu Prat (Eurecat)](mailto:robert.sanfeliu@eurecat.org)
## Acknowledgements
- NebulOuS is a project funded by the European Union. Views and opinions expressed are, however, those of the author(s) only and do not necessarily reflect those of the European Union or the European Commission. Neither the European Union nor the granting authority can be held responsible for them. | Grant Agreement No.: 101070516

150
iot_dpp/pom.xml Normal file
View File

@@ -0,0 +1,150 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>eu.nebulouscloud</groupId>
<artifactId>iot-dpp-orchestrator</artifactId>
<version>0.1.0</version>
<packaging>jar</packaging>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
<repositories>
<repository>
<id>exn-java-connector</id>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.19.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-commons</artifactId>
<version>2.31.2</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-protocols</artifactId>
<version>2.31.2</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-mqtt-protocol</artifactId>
<version>2.31.2</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jms-server</artifactId>
<version>2.31.2</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-amqp-protocol</artifactId>
<version>2.31.2</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-core-client</artifactId>
<version>2.31.2</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jms-client</artifactId>
<version>2.31.2</version>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-server</artifactId>
<version>2.31.2</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>eu.nebulouscloud</groupId>
<artifactId>exn-connector-java</artifactId>
<version>1.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>protonj2-client</artifactId>
<version>1.0.0-M18</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq.tooling</groupId>
<artifactId>activemq-tooling</artifactId>
<version>6.0.1</version>
<scope>test</scope>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifestEntries>
<Automatic-Module-Name>eu.nebulouscloud.iot_dpp_orchestrator</Automatic-Module-Name>
</manifestEntries>
<manifest>
<addDefaultSpecificationEntries>false</addDefaultSpecificationEntries>
<addDefaultImplementationEntries>false</addDefaultImplementationEntries>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,43 @@
package eut.nebulouscloud.iot_dpp;
/**
* DTO describing how to extract the ActiveMQ group ID value from messages sent to a certain address.
*/
public class GroupIDExtractionParameters {
public enum GroupIDExpressionSource {
PROPERTY, BODY_JSON, BODY_XML
}
/**
* Message part that contains the group ID
*/
private GroupIDExpressionSource source;
/**
* Expression to extract the group Id from the message
*/
private String expression;
public GroupIDExtractionParameters(GroupIDExpressionSource source, String expression) {
super();
this.source = source;
this.expression = expression;
}
public GroupIDExpressionSource getSource() {
return source;
}
public void setSource(GroupIDExpressionSource source) {
this.source = source;
}
public String getExpression() {
return expression;
}
public void setExpression(String expression) {
this.expression = expression;
}
}
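For illustration, a hedged sketch of how this DTO might be populated and handed to the group-ID plugin that appears later in this commit (the example class, addresses and expressions are invented; only the constructor and the nested enum come from the code above):
```java
package eut.nebulouscloud.iot_dpp.examples; // hypothetical example package

import java.util.Map;

import eut.nebulouscloud.iot_dpp.GroupIDExtractionParameters;
import eut.nebulouscloud.iot_dpp.GroupIDExtractionParameters.GroupIDExpressionSource;
import eut.nebulouscloud.iot_dpp.MessageGroupIDAnnotationPlugin;

/** Hypothetical wiring: one extraction rule per address, stating where the group ID lives. */
public class GroupIDConfigExample {
	public static void main(String[] args) {
		Map<String, GroupIDExtractionParameters> extractionPerAddress = Map.of(
				// JSON bodies such as {"sensor":{"id":"s-17"}, ...}: use a JSONPath-style expression.
				"sensor-readings",
				new GroupIDExtractionParameters(GroupIDExpressionSource.BODY_JSON, "sensor.id"),
				// Messages carrying the group ID in a message property named "deviceId".
				"commands",
				new GroupIDExtractionParameters(GroupIDExpressionSource.PROPERTY, "deviceId"));

		// The map is handed to the plugin, which annotates matching messages on send.
		MessageGroupIDAnnotationPlugin plugin = new MessageGroupIDAnnotationPlugin(extractionPerAddress);
		System.out.println("Configured addresses: " + extractionPerAddress.keySet() + " -> " + plugin);
	}
}
```
The unit tests at the end of this commit register an equivalent map on an embedded broker.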

View File

@@ -0,0 +1,176 @@
package eut.nebulouscloud.iot_dpp;
import java.io.ByteArrayInputStream;
import java.util.Map;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.commons.lang3.NotImplementedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jayway.jsonpath.JsonPath;
/**
* ActiveMQ Artemis plugin for extracting the value to be used as JMSXGroupID from the message.
*/
public class MessageGroupIDAnnotationPlugin implements ActiveMQServerMessagePlugin {
static Logger LOGGER = LoggerFactory.getLogger(MessageGroupIDAnnotationPlugin.class);
static ObjectMapper om = new ObjectMapper();
public static SimpleString MESSAGE_GROUP_ANNOTATION = new SimpleString("JMSXGroupID");
/**
* Dictionary with groupId extraction parameters.
* Each key in the dictionary corresponds to an address, the value is the information on how to extract the group Id from the messages on that address.
*/
Map<String, GroupIDExtractionParameters> groupIdExtractionParameterPerAddress;
public MessageGroupIDAnnotationPlugin()
{
}
public MessageGroupIDAnnotationPlugin(Map<String, GroupIDExtractionParameters> groupIdExtractionParameterPerTopic)
{
this.groupIdExtractionParameterPerAddress = groupIdExtractionParameterPerTopic;
}
/**
* Extracts the string value for an annotation from the message. Returns
* defaultValue if annotation is not found or null.
*
* @param message
* @param annotation
* @param defaultValue
* @return
*/
private String getStringAnnotationValueOrDefault(Message message, SimpleString annotation, String defaultValue) {
Object annotationValue = message.getAnnotation(annotation);
if (annotationValue == null)
return defaultValue;
if (annotationValue instanceof SimpleString)
return ((SimpleString) annotationValue).toString();
return (String) annotationValue;
}
@Override
public void beforeSend(ServerSession session, Transaction tx, Message message, boolean direct,
boolean noAutoCreateQueue) {
String destinationTopic = message.getAddress();
if (groupIdExtractionParameterPerAddress.containsKey(destinationTopic)) {
String groupID = getStringAnnotationValueOrDefault(message, MESSAGE_GROUP_ANNOTATION, null);
if (groupID != null) {
LOGGER.debug("Message already assigned to groupID " + groupID);
return;
}
groupID = extractGroupID(message, groupIdExtractionParameterPerAddress.get(destinationTopic));
if (groupID != null) {
LOGGER.debug(String.format("Message %s assigned group id %s", message.toString(), groupID));
message.putStringProperty(MESSAGE_GROUP_ANNOTATION, new SimpleString(groupID));
} else {
LOGGER.warn(String.format("GroupId assigned to message %s is null. Ignoring it", message.toString()));
}
} else {
LOGGER.debug("Ignoring message with address " + destinationTopic);
}
}
public static String extractGroupID(Message message, GroupIDExtractionParameters expression) {
switch (expression.getSource()) {
case BODY_JSON:
return extractGroupIDFromJSONBody(message, expression.getExpression());
case BODY_XML:
return extractGroupIdFromXMLBody(message,expression.getExpression());
case PROPERTY:
return extractGroupIDFromMessageProperties(message, expression.getExpression());
default:
throw new NotImplementedException(
String.format("Source %s not supported in extractGroupID", expression.getSource()));
}
}
public static String extractGroupIdFromXMLBody(Message message, String xpath) {
if (message instanceof LargeServerMessage) {
LOGGER.error("Can't extract group ID from XML body on LargeServerMessages");
return null;
}
String body = message.getStringBody();
if (body == null) {
LOGGER.error(
String.format("Can't extract group id from XML body on message %s since body is null", message));
return null;
}
try {
DocumentBuilderFactory builderFactory = DocumentBuilderFactory.newInstance();
DocumentBuilder builder = builderFactory.newDocumentBuilder();
Document xmlDocument = builder.parse(new ByteArrayInputStream(body.getBytes()));
XPath xPath = XPathFactory.newInstance().newXPath();
return (String) xPath.compile(xpath).evaluate(xmlDocument, XPathConstants.STRING);
} catch (Exception ex) {
LOGGER.error(
String.format("Can't extract group id on path %s from XML body of message %s", xpath, message),
ex);
return null;
}
}
/**
* https://restfulapi.net/json-jsonpath/
* @param message
* @param jsonPath
* @return
*/
public static String extractGroupIDFromJSONBody(Message message, String jsonPath) {
if (message instanceof LargeServerMessage) {
LOGGER.error("Can't extract group id from JSON body on LargeServerMessages");
return null;
}
String body = message.getStringBody();
if (body == null) {
LOGGER.error(
String.format("Can't extract group id from JSON body on message %s since body is null", message));
return null;
}
try {
return JsonPath.read(body, jsonPath).toString();
} catch (Exception ex) {
LOGGER.error(
String.format("Can't extract group id on path %s from JSON body of message %s", jsonPath, message),
ex);
return null;
}
}
public static String extractGroupIDFromMessageProperties(Message message, String sourceProperty) {
if (!message.getPropertyNames().contains(sourceProperty)) {
LOGGER.debug(String.format(
"Can't extract groupID from property '%s' since this property doesn't exist in the message %s ",
sourceProperty, message.toString()));
return null;
}
return message.getObjectProperty(sourceProperty).toString();
}
}
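The `BODY_JSON` and `BODY_XML` cases above delegate to `com.jayway.jsonpath.JsonPath` and the JDK's `javax.xml.xpath` API. A small stand-alone sketch, with invented payloads and expressions, of what such expressions evaluate to:
```java
package eut.nebulouscloud.iot_dpp.examples; // hypothetical example package

import java.io.ByteArrayInputStream;

import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;

import org.w3c.dom.Document;

import com.jayway.jsonpath.JsonPath;

/** Hypothetical demonstration of the expression styles accepted by MessageGroupIDAnnotationPlugin. */
public class GroupIDExpressionDemo {
	public static void main(String[] args) throws Exception {
		// BODY_JSON: a JSONPath expression applied to the message body.
		String json = "{\"vehicle\":{\"id\":\"v-42\"},\"speed\":57}";
		String fromJson = JsonPath.read(json, "vehicle.id").toString(); // -> "v-42"

		// BODY_XML: an XPath expression applied to the message body.
		String xml = "<reading><vehicle><id>v-42</id></vehicle><speed>57</speed></reading>";
		Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
				.parse(new ByteArrayInputStream(xml.getBytes()));
		String fromXml = (String) XPathFactory.newInstance().newXPath()
				.compile("/reading/vehicle/id").evaluate(doc, XPathConstants.STRING); // -> "v-42"

		System.out.println(fromJson + " / " + fromXml);
	}
}
```
In both cases the plugin stores the extracted value in the `JMSXGroupID` property, so Artemis routes all messages sharing a group ID to the same consumer.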

View File

@@ -0,0 +1,33 @@
package eut.nebulouscloud.iot_dpp.monitoring;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import eut.nebulouscloud.iot_dpp.monitoring.events.MessageLifecycleEvent;
/**
* Extension of the abstract class MessageLifecycleMonitoringPlugin that handles
* sending the generated events to the EMS
*/
public class EMSMessageLifecycleMonitoringPlugin extends MessageLifecycleMonitoringPlugin {
Logger LOGGER = LoggerFactory.getLogger(EMSMessageLifecycleMonitoringPlugin.class);
protected ObjectMapper om = new ObjectMapper();
private static final String EMS_METRICS_TOPIC = "eu.nebulouscloud.monitoring.realtime.iot.messaging_events";
@Override
protected void notifyEvent(MessageLifecycleEvent event) {
String str;
try {
str = om.writeValueAsString(event);
EventManagementSystemPublisher.send(EMS_METRICS_TOPIC, str);
} catch (Exception e) {
LOGGER.error("Unable to send event to the EMS", e);
}
}
}

View File

@@ -0,0 +1,23 @@
package eut.nebulouscloud.iot_dpp.monitoring;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class EventManagementSystemPublisher {
Logger LOGGER = LoggerFactory.getLogger(EventManagementSystemPublisher.class);
private static EventManagementSystemPublisher instance;
private EventManagementSystemPublisher() {
}
public static void send(String address, String payload) {
if (instance == null)
instance = new EventManagementSystemPublisher();
//TODO: implement actual sending of the message to the EMS using the EXN ActiveMQ library
}
}

View File

@@ -0,0 +1,252 @@
package eut.nebulouscloud.iot_dpp.monitoring;
import java.util.Date;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eut.nebulouscloud.iot_dpp.monitoring.events.MessageAcknowledgedEvent;
import eut.nebulouscloud.iot_dpp.monitoring.events.MessageDeliveredEvent;
import eut.nebulouscloud.iot_dpp.monitoring.events.MessageLifecycleEvent;
import eut.nebulouscloud.iot_dpp.monitoring.events.MessagePublishedEvent;
/**
* ActiveMQ Artemis plugin for tracking the lifecycle of messages inside an
* ActiveMQ cluster. On each step of the message lifecycle (a producer writes a
* message to the cluster, the message is delivered to a consumer, the consumer
* ACKs the message), the plugin generates events with relevant information that
* can be used to understand how IoT application components on NebulOuS are
* communicating.
*/
public abstract class MessageLifecycleMonitoringPlugin implements ActiveMQServerMessagePlugin {
Logger LOGGER = LoggerFactory.getLogger(MessageLifecycleMonitoringPlugin.class);
protected ObjectMapper om = new ObjectMapper();
/**
* Annotation used to identify the ActiveMQ cluster node that first received the
* message
*/
SimpleString RECEIVING_NODE_ANNOTATION = new SimpleString("NEBULOUS_RECEIVING_NODE");
/**
* Annotation used to track the original id of the message when it was first
* received by the cluster.
*
*/
SimpleString ORIGINAL_MESSAGE_ID_ANNOTATION = new SimpleString("NEBULOUS_ORIGINAL_MESSAGE_ID");
/**
* Annotation used to mark the moment a message was sent to the client
*/
SimpleString DELIVER_TIMESTAMP_ANNOTATION = new SimpleString("NEBULOUS_DELIVER_TIMESTAMP");
/**
* Abstract method called whenever a message event is raised: - a producer
* writes a message to the cluster - the message is delivered to a consumer -
* the consumer ACKs the message
*
* @param event
*/
protected abstract void notifyEvent(MessageLifecycleEvent event);
/**
* Retrieves server name from consumer reference.
*
* @param consumer
* @return
*/
private String getServerName(ServerConsumer consumer) {
// TODO: Find a better way of retrieving server name rather than parsing the
// string representation of the consumer object.
return consumer.toString().split("server=ActiveMQServerImpl::name=")[1].split("]")[0];
}
/**
* Retrieves the estimated message size.
*
* @param message
* @return
* @throws ActiveMQException
*/
private long getEstimatedMessageSize(Message message) throws ActiveMQException {
if (message instanceof LargeServerMessage)
return ((LargeServerMessage) message).getLargeBody().getBodySize();
return message.getEncodeSize();
}
/**
* Extracts the string value for an annotation from the message. Returns
* defaultValue if annotation is not found or null.
*
* @param message
* @param annotation
* @param defaultValue
* @return
*/
private String getStringAnnotationValueOrDefault(Message message, SimpleString annotation, String defaultValue) {
Object annotationValue = message.getAnnotation(annotation);
if (annotationValue == null)
return defaultValue;
if (annotationValue instanceof SimpleString)
return ((SimpleString) annotationValue).toString();
return (String) annotationValue;
}
private Long getLongAnnotationValueOrDefault(Message message, SimpleString annotation, Long defaultValue) {
Object annotationValue = message.getAnnotation(annotation);
if (annotationValue == null)
return defaultValue;
return (Long) annotationValue;
}
/**
* Constructs a MessagePublishedEvent from a Message reference
*
* @param message
* @return
* @throws Exception
*/
private MessagePublishedEvent buildPublishEvent(Message message) throws Exception {
String clientId = message.getStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME);
long size = getEstimatedMessageSize(message);
String sourceNode = getStringAnnotationValueOrDefault(message, RECEIVING_NODE_ANNOTATION, null);
long messageId = getLongAnnotationValueOrDefault(message, ORIGINAL_MESSAGE_ID_ANNOTATION, -1l);
return new MessagePublishedEvent(messageId, message.getAddress(), sourceNode, clientId, size,
message.getTimestamp());
}
/**
* Constructs a MessageDeliveredEvent from a Message and consumer
* reference
*
* @param message
* @param consumer
* @return
* @throws Exception
*/
private MessageDeliveredEvent buildDeliverEvent(Message message, ServerConsumer consumer)
throws Exception {
MessagePublishedEvent publishEvent = buildPublishEvent(message);
String nodeName = getServerName(consumer);
Long deliveryTimeStamp = getLongAnnotationValueOrDefault(message, DELIVER_TIMESTAMP_ANNOTATION, -1l);
return new MessageDeliveredEvent(publishEvent, consumer.getQueueAddress().toString(), nodeName,
consumer.getConnectionClientID(), deliveryTimeStamp);
}
@Override
public void afterSend(ServerSession session, Transaction tx, Message message, boolean direct,
boolean noAutoCreateQueue, RoutingStatus result) throws ActiveMQException {
try {
/**
* Ignore activemq control messages
*/
if (message.getAddress().startsWith("$sys") || message.getAddress().startsWith("activemq"))
return;
/**
* If the message is already annotated with any of our custom annotations, it means
* the message is not being sent by an external ActiveMQ client (it is due to
* intra-cluster communication).
* Ignore it.
*/
if (message.getAnnotation(RECEIVING_NODE_ANNOTATION) != null
|| message.getAnnotation(ORIGINAL_MESSAGE_ID_ANNOTATION) != null) {
return;
}
String nodeName = ((ActiveMQServerImpl) ((PostOfficeImpl) ((ServerSessionImpl) session).postOffice).getServer())
.getConfiguration().getName();
if (message.getAnnotation(RECEIVING_NODE_ANNOTATION) == null)
message.setAnnotation(RECEIVING_NODE_ANNOTATION, nodeName);
if (message.getAnnotation(ORIGINAL_MESSAGE_ID_ANNOTATION) == null)
message.setAnnotation(ORIGINAL_MESSAGE_ID_ANNOTATION, message.getMessageID());
MessageLifecycleEvent event = buildPublishEvent(message);
LOGGER.info("MessagePublishedMonitoringEvent: " + om.writeValueAsString(event));
notifyEvent(event);
} catch (Exception e) {
LOGGER.error("afterSend failed", e);
}
}
/**
* Annotates the message with the current time as the value for
* DELIVER_TIMESTAMP_ANNOTATION
*/
@Override
public void beforeDeliver(ServerConsumer consumer, MessageReference reference) throws ActiveMQException {
reference.getMessage().setAnnotation(DELIVER_TIMESTAMP_ANNOTATION, new Date().getTime());
}
@Override
public void afterDeliver(ServerConsumer consumer, MessageReference reference) throws ActiveMQException {
try {
/**
* Ignore activemq control messages
*/
if (reference.getQueue().getAddress().toString().startsWith("$sys")
|| reference.getQueue().getAddress().toString().startsWith("activemq"))
return;
Message message = reference.getMessage();
MessageDeliveredEvent deliverEvent = buildDeliverEvent(message, consumer);
LOGGER.info("MessageDeliveredMonitoringEvent: " + om.writeValueAsString(deliverEvent));
notifyEvent(deliverEvent);
} catch (Exception e) {
LOGGER.error("afterDeliver failed", e);
}
}
@Override
public void messageAcknowledged(Transaction tx, MessageReference reference, AckReason reason,
ServerConsumer consumer) throws ActiveMQException {
try {
/**
* Ignore activemq control messages
*/
if (reference.getQueue().getAddress().toString().startsWith("$sys")
|| reference.getQueue().getAddress().toString().startsWith("activemq") || consumer == null)
return;
Message message = reference.getMessage();
MessageDeliveredEvent deliverEvent = buildDeliverEvent(message, consumer);
MessageAcknowledgedEvent ackEvent = new MessageAcknowledgedEvent(deliverEvent,
new Date().getTime());
LOGGER.info("MessageAcknowledgedMonitoringEvent: " + om.writeValueAsString(ackEvent));
notifyEvent(ackEvent);
} catch (Exception e) {
LOGGER.error("messageAcknowledged failed", e);
}
}
}
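Since `notifyEvent` is the only abstract hook, concrete subclasses decide where events go (the EMS variant above, an in-memory list in the tests below). As a hedged illustration, a subclass that merely logs per-message latencies derived from the event timestamps might look like this (class name and log wording are invented):
```java
package eut.nebulouscloud.iot_dpp.examples; // hypothetical example package

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eut.nebulouscloud.iot_dpp.monitoring.MessageLifecycleMonitoringPlugin;
import eut.nebulouscloud.iot_dpp.monitoring.events.MessageAcknowledgedEvent;
import eut.nebulouscloud.iot_dpp.monitoring.events.MessageDeliveredEvent;
import eut.nebulouscloud.iot_dpp.monitoring.events.MessageLifecycleEvent;

/** Hypothetical subclass: turn lifecycle events into latency log lines. */
public class LatencyLoggingMonitoringPlugin extends MessageLifecycleMonitoringPlugin {
	private static final Logger LOG = LoggerFactory.getLogger(LatencyLoggingMonitoringPlugin.class);

	@Override
	protected void notifyEvent(MessageLifecycleEvent event) {
		if (event instanceof MessageDeliveredEvent) {
			// Time the message spent queued between publication and delivery.
			MessageDeliveredEvent delivered = (MessageDeliveredEvent) event;
			LOG.info("message {} waited {} ms before delivery", delivered.messageId,
					delivered.timestamp - delivered.publishTimestamp);
		} else if (event instanceof MessageAcknowledgedEvent) {
			// Time the consumer took between delivery and acknowledgement.
			MessageAcknowledgedEvent acked = (MessageAcknowledgedEvent) event;
			LOG.info("message {} processed in {} ms", acked.messageId,
					acked.timestamp - acked.deliverTimestamp);
		}
	}
}
```
Such a subclass can be registered like any other broker message plugin, for instance with `config.getBrokerMessagePlugins().add(new LatencyLoggingMonitoringPlugin())` on an embedded broker, as the tests in this commit do for their own anonymous subclass.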

View File

@@ -0,0 +1,56 @@
package eut.nebulouscloud.iot_dpp.monitoring.events;
/**
* Event generated when a message is acknowledged by a client of the pub/sub
* system
*/
public class MessageAcknowledgedEvent extends MessageLifecycleEvent {
/**
* Node where the client is connected
*/
public final String node;
/**
* Id of the client receiving the message
*/
public final String clientId;
/**
* Node where the message was originally published
*/
public final String publishNode;
/**
* Id of the client that originally published the message
*/
public final String publishClientId;
/**
* Address where the message was originally published
*/
public final String publishAddress;
/**
* Time when the message was published (milliseconds since epoch).
*/
public final long publishTimestamp;
/**
* Time when the message was delivered to the client.
*/
public final long deliverTimestamp;
public MessageAcknowledgedEvent(MessageDeliveredEvent deliverEvent,long timestamp) {
super(deliverEvent.messageId, deliverEvent.messageAddress, deliverEvent.messageSize, timestamp);
this.node = deliverEvent.node;
this.clientId = deliverEvent.clientId;
this.publishAddress = deliverEvent.messageAddress;
this.publishNode = deliverEvent.publishNode;
this.publishClientId = deliverEvent.publishClientId;
this.publishTimestamp = deliverEvent.publishTimestamp;
this.deliverTimestamp = deliverEvent.timestamp;
}
}

View File

@@ -0,0 +1,53 @@
package eut.nebulouscloud.iot_dpp.monitoring.events;
/**
* Event generated when a message is delivered to a client of the pub/sub
* system
*/
public class MessageDeliveredEvent extends MessageLifecycleEvent {
/**
* Node where the client is connected
*/
public final String node;
/**
* Id of the client receiving the message
*/
public final String clientId;
/**
* Node where the message was originally published
*/
public final String publishNode;
/**
* Address where the message was originally published
*/
public final String publishAddress;
/**
* Id of the client that originally published the message
*/
public final String publishClientId;
/**
* Time when the message was published (milliseconds since epoch).
*/
public final long publishTimestamp;
public MessageDeliveredEvent(MessagePublishedEvent publishEvent, String address, String node,
String clientId, long timestamp) {
super(publishEvent.messageId, address, publishEvent.messageSize, timestamp);
this.node = node;
this.clientId = clientId;
this.publishAddress = publishEvent.messageAddress;
this.publishNode = publishEvent.node;
this.publishClientId = publishEvent.clientId;
this.publishTimestamp = publishEvent.timestamp;
}
}

View File

@@ -0,0 +1,36 @@
package eut.nebulouscloud.iot_dpp.monitoring.events;
/**
* A base class for modelling events related to messages
*/
public abstract class MessageLifecycleEvent {
/**
* Id of the message
*/
public final long messageId;
/**
* Timestamp when the event occurred
*/
public final long timestamp;
/**
* Size of the message (in bytes)
*/
public final long messageSize;
public final String messageAddress;
public MessageLifecycleEvent(long messageId,String messageAddress,long size, long timestamp)
{
this.timestamp = timestamp;
this.messageAddress = messageAddress;
this.messageId = messageId;
this.messageSize = size;
}
}

View File

@@ -0,0 +1,24 @@
package eut.nebulouscloud.iot_dpp.monitoring.events;
/**
* Event generated when a message is published to the pub/sub system
*/
public class MessagePublishedEvent extends MessageLifecycleEvent{
/**
* The name of the pub/sub cluster node where the message was published
*/
public final String node;
/**
* The Id of the client that published the message
*/
public final String clientId;
public MessagePublishedEvent(long messageId, String address, String node, String clientId, long size,long timestamp) {
super(messageId,address,size,timestamp);
this.node = node;
this.clientId = clientId;
}
}
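For reference, the monitoring plugins serialize these event objects with Jackson before forwarding them (see `EMSMessageLifecycleMonitoringPlugin` above). A hedged sketch of what that serialization yields for a publish event; the concrete values below are invented:
```java
package eut.nebulouscloud.iot_dpp.examples; // hypothetical example package

import com.fasterxml.jackson.databind.ObjectMapper;

import eut.nebulouscloud.iot_dpp.monitoring.events.MessagePublishedEvent;

/** Hypothetical demonstration of how a lifecycle event looks once serialized to JSON. */
public class EventSerializationDemo {
	public static void main(String[] args) throws Exception {
		MessagePublishedEvent event = new MessagePublishedEvent(
				42L,                         // messageId
				"sensor-readings",           // address the message was published to
				"broker-node-1",             // cluster node that first received it
				"sensor-gateway-7",          // publishing client id
				128L,                        // estimated size in bytes
				System.currentTimeMillis()); // publish timestamp
		// Jackson picks up the public final fields, producing something like:
		// {"messageId":42,"timestamp":...,"messageSize":128,"messageAddress":"sensor-readings",
		//  "node":"broker-node-1","clientId":"sensor-gateway-7"}
		System.out.println(new ObjectMapper().writeValueAsString(event));
	}
}
```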

View File

@@ -0,0 +1,35 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Log4J 2 configuration
# Monitor config file every X seconds for updates
monitorInterval = 5
loggers = activemq
logger.activemq.name = org.apache.activemq
logger.activemq.level = INFO
#logger.org.apache.activemq.artemis.core.server.cluster.level = TRACE
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = console
# Console appender
appender.console.type=Console
appender.console.name=console
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d %-5level [%logger] %msg%n

View File

@@ -0,0 +1,249 @@
package eut.nebulouscloud.iot_dpp;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
//remove
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eut.nebulouscloud.iot_dpp.GroupIDExtractionParameters.GroupIDExpressionSource;
class MessageGroupIdAnnotationPluginTest {
static Logger LOGGER = LoggerFactory.getLogger(MessageGroupIdAnnotationPluginTest.class);
/**
* Creates a local ActiveMQ server listening on localhost at the given port. The server
* accepts requests from any user. Configures the MessageGroupIDAnnotationPlugin and
* sets it to use the provided groupIdExtractionParameterPerTopic dict.
*
* @param port the port the embedded broker listens on.
* @param groupIdExtractionParameterPerTopic The groupId extraction parameters dict. Its contents can be changed by the test code during the test execution and the plugin will react accordingly.
* @return the created EmbeddedActiveMQ instance.
* @throws Exception
*/
static private EmbeddedActiveMQ createLocalServer(int port,
Map<String, GroupIDExtractionParameters> groupIdExtractionParameterPerTopic) throws Exception {
Configuration config = new ConfigurationImpl();
String foldersRoot = "data/" + new Date().getTime() + "/data_" + port;
config.setBindingsDirectory(foldersRoot + "/bindings");
config.setJournalDirectory(foldersRoot + "/journal");
config.setJournalRetentionDirectory(foldersRoot + "/journalRetention");
config.setLargeMessagesDirectory(foldersRoot + "/lm");
config.setNodeManagerLockDirectory(foldersRoot + "/nodeManagerLock");
config.setPagingDirectory(foldersRoot + "/paging");
config.addConnectorConfiguration("serverAt" + port + "Connector", "tcp://localhost:" + port);
config.addAcceptorConfiguration("netty", "tcp://localhost:" + port);
config.getBrokerMessagePlugins().add(new MessageGroupIDAnnotationPlugin(groupIdExtractionParameterPerTopic));
EmbeddedActiveMQ server = new EmbeddedActiveMQ();
server.setSecurityManager(new ActiveMQSecurityManager() {
@Override
public boolean validateUserAndRole(String user, String password, Set<Role> roles, CheckType checkType) {
return true;
}
@Override
public boolean validateUser(String user, String password) {
return true;
}
});
server.setConfiguration(config);
server.start();
Thread.sleep(1000);
return server;
}
static EmbeddedActiveMQ server = null;
static Map<String, GroupIDExtractionParameters> groupIdExtractionParameterPerAddress = new HashMap<String, GroupIDExtractionParameters>();
static Session session;
static Connection connection;
@BeforeAll
static void createServer() throws Exception {
LOGGER.info("createServer");
server = createLocalServer(6161, groupIdExtractionParameterPerAddress);
ActiveMQJMSConnectionFactory connectionFactory = new ActiveMQJMSConnectionFactory("tcp://localhost:6161","artemis", "artemis"
);
connection = connectionFactory.createConnection();
connection.start();
}
@AfterAll
static void destroyServer() {
try {
server.stop();
} catch (Exception ex) {
}
}
@BeforeEach
void before() throws JMSException {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
groupIdExtractionParameterPerAddress.clear();
}
@AfterEach
void after() {
if(session!=null) {try{session.close();}catch(Exception ex) {}}
}
/**
* Test it can extract a simple JSON value
* @throws Exception
*/
@Test
void JSONTest1() throws Exception {
String address = "testaddress";
groupIdExtractionParameterPerAddress.put(address,
new GroupIDExtractionParameters(GroupIDExpressionSource.BODY_JSON, "address.city"));
Destination destination = session.createQueue(address);
MessageProducer producer = session.createProducer(destination);
MessageConsumer consumer = session.createConsumer(destination);
String text = "{\"address\":{\"city\":\"Lleida\",\"street\":\"C\\\\Cavallers\"},\"name\":\"Jon doe\",\"age\":\"22\"}";
TextMessage originalMessage = session.createTextMessage(text);
producer.send(originalMessage);
Message message = consumer.receive();
String value = ((ActiveMQTextMessage)message).getCoreMessage().getStringProperty(MessageGroupIDAnnotationPlugin.MESSAGE_GROUP_ANNOTATION.toString());
assertEquals("Lleida", value);
}
/**
* In case of invalid message body, MESSAGE_GROUP_ANNOTATION should remain null
* @throws Exception
*/
@Test
void JSONTest2() throws Exception {
String address = "testaddress";
groupIdExtractionParameterPerAddress.put(address,
new GroupIDExtractionParameters(GroupIDExpressionSource.BODY_JSON, "address.city"));
Destination destination = session.createQueue(address);
MessageProducer producer = session.createProducer(destination);
MessageConsumer consumer = session.createConsumer(destination);
String text = "{\"ad2\"}";
TextMessage originalMessage = session.createTextMessage(text);
producer.send(originalMessage);
Message message = consumer.receive();
String value = ((ActiveMQTextMessage)message).getCoreMessage().getStringProperty(MessageGroupIDAnnotationPlugin.MESSAGE_GROUP_ANNOTATION.toString());
assertEquals(null, value);
}
/**
* Test it can extract a complex JSON value
* @throws Exception
*/
@Test
void JSONTest3() throws Exception {
String address = "testaddress";
groupIdExtractionParameterPerAddress.put(address,
new GroupIDExtractionParameters(GroupIDExpressionSource.BODY_JSON, "address"));
Destination destination = session.createQueue(address);
MessageProducer producer = session.createProducer(destination);
MessageConsumer consumer = session.createConsumer(destination);
String text = "{\"address\":{\"city\":\"Lleida\",\"street\":\"C\\\\Cavallers\"},\"name\":\"Jon doe\",\"age\":\"22\"}";
TextMessage originalMessage = session.createTextMessage(text);
producer.send(originalMessage);
Message message = consumer.receive();
String value = ((ActiveMQTextMessage)message).getCoreMessage().getStringProperty(MessageGroupIDAnnotationPlugin.MESSAGE_GROUP_ANNOTATION.toString());
assertEquals("{city=Lleida, street=C\\Cavallers}", value);
}
/**
* Test it can extract a simple XML value
* @throws Exception
*/
@Test
void XMLTest1() throws Exception {
String address = "testaddress";
groupIdExtractionParameterPerAddress.put(address,
new GroupIDExtractionParameters(GroupIDExpressionSource.BODY_XML, "/root/address/city"));
Destination destination = session.createQueue(address);
MessageProducer producer = session.createProducer(destination);
MessageConsumer consumer = session.createConsumer(destination);
String text = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\r\n"
+ "<root>\r\n"
+ " <address>\r\n"
+ " <city>Lleida</city>\r\n"
+ " <street>C\\Cavallers</street>\r\n"
+ " </address>\r\n"
+ " <age>22</age>\r\n"
+ " <name>Jon doe</name>\r\n"
+ "</root>";
TextMessage originalMessage = session.createTextMessage(text);
producer.send(originalMessage);
Message message = consumer.receive();
String value = ((ActiveMQTextMessage)message).getCoreMessage().getStringProperty(MessageGroupIDAnnotationPlugin.MESSAGE_GROUP_ANNOTATION.toString());
assertEquals("Lleida", value);
}
/**
* Test it can extract a complex XML value
* @throws Exception
*/
@Test
void XMLTest2() throws Exception {
String address = "testaddress";
groupIdExtractionParameterPerAddress.put(address,
new GroupIDExtractionParameters(GroupIDExpressionSource.BODY_XML, "/root/address"));
Destination destination = session.createQueue(address);
MessageProducer producer = session.createProducer(destination);
MessageConsumer consumer = session.createConsumer(destination);
String text = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\r\n"
+ "<root>\r\n"
+ " <address>\r\n"
+ " <city>Lleida</city>\r\n"
+ " <street>C\\Cavallers</street>\r\n"
+ " </address>\r\n"
+ " <age>22</age>\r\n"
+ " <name>Jon doe</name>\r\n"
+ "</root>";
TextMessage originalMessage = session.createTextMessage(text);
producer.send(originalMessage);
Message message = consumer.receive();
String value = ((ActiveMQTextMessage)message).getCoreMessage().getStringProperty(MessageGroupIDAnnotationPlugin.MESSAGE_GROUP_ANNOTATION.toString());
assertTrue(value!=null);
assertTrue(value.contains("Lleida"));
assertTrue(value.contains("Cavallers"));
}
}

View File

@@ -0,0 +1,467 @@
package eut.nebulouscloud.iot_dpp;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
import org.apache.qpid.protonj2.client.ClientOptions;
import org.apache.qpid.protonj2.client.Delivery;
import org.apache.qpid.protonj2.client.DeliveryMode;
import org.apache.qpid.protonj2.client.Receiver;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eut.nebulouscloud.iot_dpp.monitoring.MessageLifecycleMonitoringPlugin;
import eut.nebulouscloud.iot_dpp.monitoring.events.MessageAcknowledgedEvent;
import eut.nebulouscloud.iot_dpp.monitoring.events.MessageDeliveredEvent;
import eut.nebulouscloud.iot_dpp.monitoring.events.MessageLifecycleEvent;
import eut.nebulouscloud.iot_dpp.monitoring.events.MessagePublishedEvent;
/**
* Test the correct functionality of MessageLifecycleMonitoringPlugin
*/
class MessageMonitoringPluginTest {
static Logger LOGGER = LoggerFactory.getLogger(MessageMonitoringPluginTest.class);
/**
* Creates a local ActiveMQ server listening on localhost at the given port. The server
* accepts requests from any user. Configures the MessageLifecycleMonitoringPlugin and
* sets it to store generated events in the provided events list.
*
* @param events A list that will contain all the events generated by the
* MessageLifecycleMonitoringPlugin
* @return the created EmbeddedActiveMQ instance.
* @throws Exception
*/
private EmbeddedActiveMQ createActiveMQBroker(String nodeName, List<MessageLifecycleEvent> events, int port,
List<Integer> otherServers) throws Exception {
Configuration config = new ConfigurationImpl();
config.setName(nodeName);
String foldersRoot = "data/" + new Date().getTime() + "/data_" + port;
config.setBindingsDirectory(foldersRoot + "/bindings");
config.setJournalDirectory(foldersRoot + "/journal");
config.setJournalRetentionDirectory(foldersRoot + "/journalRetention");
config.setLargeMessagesDirectory(foldersRoot + "/lm");
config.setNodeManagerLockDirectory(foldersRoot + "/nodeManagerLock");
config.setPagingDirectory(foldersRoot + "/paging");
config.addConnectorConfiguration("serverAt" + port + "Connector", "tcp://localhost:" + port);
config.addAcceptorConfiguration("netty", "tcp://localhost:" + port);
// config.setPersistenceEnabled(true);
ClusterConnectionConfiguration cluster = new ClusterConnectionConfiguration();
cluster.setAddress("");
cluster.setConnectorName("serverAt" + port + "Connector");
cluster.setName("my-cluster");
cluster.setAllowDirectConnectionsOnly(false);
cluster.setMessageLoadBalancingType(MessageLoadBalancingType.ON_DEMAND);
cluster.setRetryInterval(100);
config.setClusterConfigurations(List.of(cluster));
if (otherServers != null) {
for (Integer otherPort : otherServers) {
cluster.setStaticConnectors(List.of("serverAt" + otherPort + "Connector"));
config.addConnectorConfiguration("serverAt" + otherPort + "Connector", "tcp://localhost:" + otherPort);
}
}
MessageLifecycleMonitoringPlugin plugin = new MessageLifecycleMonitoringPlugin() {
@Override
protected void notifyEvent(MessageLifecycleEvent event) {
events.add(event);
}
};
config.getBrokerMessagePlugins().add(plugin);
EmbeddedActiveMQ server = new EmbeddedActiveMQ();
server.setSecurityManager(new ActiveMQSecurityManager() {
@Override
public boolean validateUserAndRole(String user, String password, Set<Role> roles, CheckType checkType) {
return true;
}
@Override
public boolean validateUser(String user, String password) {
return true;
}
});
server.setConfiguration(config);
server.start();
return server;
}
private EmbeddedActiveMQ createActiveMQBroker(String nodeName, List<MessageLifecycleEvent> events)
throws Exception {
return createActiveMQBroker(nodeName, events, 61616, null);
}
/**
* Test message monitoring plugin when MQTT clients are interacting with the
* ActiveMQ broker.
*
* @throws Exception
*/
@Test
void MQTTTestSingleNode() throws Exception {
/**
* Create a local ActiveMQ server
*/
EmbeddedActiveMQ broker = null;
try {
List<MessageLifecycleEvent> raisedEvents = new LinkedList<MessageLifecycleEvent>();
broker = createActiveMQBroker("test-server", raisedEvents);
/**
* Create a persistent subscription on a topic. Close the consumer afterwards.
*/
String testTopic = "test-topic";
IMqttClient consumer = new MqttClient("tcp://localhost:61616", "consumer");
consumer.setCallback(new MqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
LOGGER.info("messageArrived: " + topic + " " + new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
@Override
public void connectionLost(Throwable cause) {
}
});
MqttConnectOptions opts = new MqttConnectOptions();
opts.setCleanSession(false);
consumer.connect(opts);
consumer.subscribe(testTopic, 2);
consumer.disconnect();
/**
* Publish a message to the topic
*/
IMqttClient publisher = new MqttClient("tcp://localhost:61616", "publisher");
publisher.connect();
MqttMessage message = new MqttMessage("hola".getBytes());
message.setQos(2);
publisher.publish(testTopic, message);
/**
* Wait some time and re-connect the consumer.
*/
Thread.sleep(1000);
long reconnectTime = new Date().getTime();
consumer.connect(opts);
Thread.sleep(1000); // Give some time for consumer to receive messages
/**
* Validate that the MessagePublishedMonitoringEvent,
* MessageDeliveredMonitoringEvent and MessageAcknowledgedMonitoringEvent are
* properly generated
*/
Optional<MessageLifecycleEvent> publishEventOpt = raisedEvents.stream()
.filter(e -> e.getClass() == MessagePublishedEvent.class).findFirst();
assertEquals(true, publishEventOpt.isPresent());
MessagePublishedEvent publishEvent = (MessagePublishedEvent) publishEventOpt.get();
Optional<MessageLifecycleEvent> deliverEventOpt = raisedEvents.stream()
.filter(e -> e.getClass() == MessageDeliveredEvent.class).findFirst();
assertEquals(true, deliverEventOpt.isPresent());
MessageDeliveredEvent deliverEvent = (MessageDeliveredEvent) deliverEventOpt.get();
Optional<MessageLifecycleEvent> ackEventOptional = raisedEvents.stream()
.filter(e -> e.getClass() == MessageAcknowledgedEvent.class).findFirst();
assertEquals(true, ackEventOptional.isPresent());
MessageAcknowledgedEvent ackEvent = (MessageAcknowledgedEvent) ackEventOptional.get();
assertEquals(publishEvent.messageAddress, deliverEvent.publishAddress);
assertEquals(publishEvent.clientId, deliverEvent.publishClientId);
assertEquals(publishEvent.messageId, deliverEvent.messageId);
// assertEquals(publishEvent.messageSize, deliverEvent.messageSize);
assertEquals(publishEvent.node, deliverEvent.publishNode);
assertEquals(publishEvent.timestamp, deliverEvent.publishTimestamp);
assertTrue(deliverEvent.timestamp > reconnectTime);
assertEquals(publishEvent.messageId, ackEvent.messageId);
assertEquals(ackEvent.deliverTimestamp, deliverEvent.timestamp);
assertTrue(ackEvent.timestamp > reconnectTime);
} finally {
if (broker != null) {
try {
broker.stop();
} catch (Exception e) {
// ignore errors while shutting down the test broker
}
}
}
}
/**
* Test message monitoring plugin when AMQP clients are interacting with the
* ActiveMQ broker.
*
* @throws Exception
*/
@Test
void AMQPTestSingleNode() throws Exception {
EmbeddedActiveMQ broker = null;
try {
/**
* Create a local server
*/
List<MessageLifecycleEvent> raisedEvents = new LinkedList<MessageLifecycleEvent>();
broker = createActiveMQBroker("test-broker", raisedEvents);
/**
* Create a durable subscription on a topic. Close the receiver immediately.
*/
String address = "test-addr";
org.apache.qpid.protonj2.client.Connection receiverConnection;
Receiver receiver;
{
ClientOptions opts = new ClientOptions();
opts.id("receiver");
org.apache.qpid.protonj2.client.Client container = org.apache.qpid.protonj2.client.Client.create(opts);
receiverConnection = container.connect("localhost", 61616).openFuture().get();
org.apache.qpid.protonj2.client.ReceiverOptions options = new org.apache.qpid.protonj2.client.ReceiverOptions()
.deliveryMode(DeliveryMode.AT_LEAST_ONCE).autoAccept(false).autoSettle(false);
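// autoAccept/autoSettle are disabled so the test controls when the delivery is accepted, which is what raises the acknowledged event.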
org.apache.qpid.protonj2.client.Session session = receiverConnection.openSession().openFuture().get();
receiver = session.openDurableReceiver(address, "my-durable-sub", options);
receiver.openFuture().get(10, TimeUnit.SECONDS);
/* receiver.close(); */
session.close();
container.close();
}
/**
* Send some messages to the topic
*/
org.apache.qpid.protonj2.client.Sender sender;
org.apache.qpid.protonj2.client.Connection senderConnection;
{
ClientOptions opts = new ClientOptions();
opts.id("sender");
org.apache.qpid.protonj2.client.Client container = org.apache.qpid.protonj2.client.Client.create(opts);
senderConnection = container.connect("localhost", 61616).openFuture().get();
org.apache.qpid.protonj2.client.SenderOptions options = new org.apache.qpid.protonj2.client.SenderOptions()
.deliveryMode(DeliveryMode.AT_MOST_ONCE);
org.apache.qpid.protonj2.client.Session session = senderConnection.openSession().openFuture().get();
sender = session.openSender(address, options);
}
final org.apache.qpid.protonj2.client.Message<String> message = org.apache.qpid.protonj2.client.Message
.create("Hello World").durable(true);
Map<String, Object> deliveryAnnotations = new HashMap<>();
final org.apache.qpid.protonj2.client.Tracker tracker = sender.send(message, deliveryAnnotations);
// Block until the broker settles the delivery, so the message is stored before the receiver reconnects.
tracker.settlementFuture().get();
/**
* After waiting some time, re-connect the receiver to the durable subscription.
*/
LOGGER.info("Wait for read");
Thread.sleep(100);
long receiveTime = new Date().getTime();
{
ClientOptions opts = new ClientOptions();
opts.id("receiver");
org.apache.qpid.protonj2.client.Client container = org.apache.qpid.protonj2.client.Client.create(opts);
receiverConnection = container.connect("localhost", 61616).openFuture().get();
org.apache.qpid.protonj2.client.ReceiverOptions options = new org.apache.qpid.protonj2.client.ReceiverOptions()
.deliveryMode(DeliveryMode.AT_LEAST_ONCE).autoAccept(false);
org.apache.qpid.protonj2.client.Session session = receiverConnection.openSession().openFuture().get();
receiver = session.openDurableReceiver(address, "my-durable-sub", options);
receiver.openFuture().get(10, TimeUnit.SECONDS);
}
/**
* Receive a message and ACK it slightly after.
*/
Delivery d = receiver.receive();
LOGGER.info("Wait for ack");
Thread.sleep(1000);
long acceptTime = new Date().getTime();
d.accept();
sender.closeAsync().get(10, TimeUnit.SECONDS);
receiver.closeAsync().get(10, TimeUnit.SECONDS);
receiverConnection.close();
senderConnection.close();
/**
* Validate that the MessagePublishedMonitoringEvent,
* MessageDeliveredMonitoringEvent and MessageAcknowledgedMonitoringEvent are
* properly generated
*/
assertEquals(3, raisedEvents.size());
Optional<MessageLifecycleEvent> publishEventOpt = raisedEvents.stream()
.filter(e -> e.getClass() == MessagePublishedEvent.class).findFirst();
assertEquals(true, publishEventOpt.isPresent());
MessagePublishedEvent publishEvent = (MessagePublishedEvent) publishEventOpt.get();
Optional<MessageLifecycleEvent> deliverEventOpt = raisedEvents.stream()
.filter(e -> e.getClass() == MessageDeliveredEvent.class).findFirst();
assertEquals(true, deliverEventOpt.isPresent());
MessageDeliveredEvent deliverEvent = (MessageDeliveredEvent) deliverEventOpt.get();
Optional<MessageLifecycleEvent> ackEventOptional = raisedEvents.stream()
.filter(e -> e.getClass() == MessageAcknowledgedEvent.class).findFirst();
assertEquals(true, ackEventOptional.isPresent());
MessageAcknowledgedEvent ackEvent = (MessageAcknowledgedEvent) ackEventOptional.get();
assertEquals(sender.client().containerId(), publishEvent.clientId);
assertEquals(address, publishEvent.messageAddress);
assertEquals(receiver.client().containerId(), deliverEvent.clientId);
assertEquals(address, deliverEvent.messageAddress);
assertEquals(publishEvent.messageAddress, deliverEvent.publishAddress);
assertEquals(publishEvent.clientId, deliverEvent.publishClientId);
assertEquals(publishEvent.messageId, deliverEvent.messageId);
// assertEquals(publishEvent.messageSize, deliverEvent.messageSize);
assertEquals(publishEvent.node, deliverEvent.publishNode);
assertEquals(publishEvent.timestamp, deliverEvent.publishTimestamp);
assertTrue(receiveTime < deliverEvent.timestamp);
assertEquals(publishEvent.timestamp, ackEvent.publishTimestamp);
assertTrue(acceptTime < ackEvent.timestamp);
} finally {
if (broker != null) {
try {
broker.stop();
} catch (Exception e) {
// ignore errors while shutting down the test broker
}
}
}
}
/**
* Test that the MQTT message flow is properly registered in a clustered
* environment:
* - Create two ActiveMQ brokers (A and B).
* - Create a consumer on broker B subscribed to "test/atopic".
* - Create a producer on broker A and publish a message to "test/atopic".
* - Verify that the consumer on broker B receives the message.
*
* @throws Exception
*/
@Test
void MQTTTestClustered() throws Exception {
EmbeddedActiveMQ testBrokerA = null;
EmbeddedActiveMQ testBrokerB = null;
try {
/**
* Create two local ActiveMQ brokers that form a cluster
*/
final int brokerAPort = 6161;
final int brokerBPort = 6162;
List<MessageLifecycleEvent> raisedEventsBrokerA = new LinkedList<MessageLifecycleEvent>();
testBrokerA = createActiveMQBroker("test-broker-A", raisedEventsBrokerA, brokerAPort, List.of(brokerBPort));
List<MessageLifecycleEvent> raisedEventsBrokerB = new LinkedList<MessageLifecycleEvent>();
testBrokerB = createActiveMQBroker("test-broker-B", raisedEventsBrokerB, brokerBPort, null);
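// Broker A holds a static connector to broker B, so messages published on A can be forwarded to consumers on B.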
/**
* Create a consumer on broker B and subscribe it to the test topic; it stays connected for the whole test.
*/
String testTopic = "test/atopic";
IMqttClient consumerBrokerB = new MqttClient("tcp://localhost:" + brokerBPort, "consumer");
MqttConnectOptions opts = new MqttConnectOptions();
opts.setCleanSession(true);
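// A clean session is enough here: the consumer stays connected while the message travels from broker A to broker B.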
consumerBrokerB.connect(opts);
consumerBrokerB.setCallback(new MqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
LOGGER.info("messageArrived: " + topic + " " + new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
@Override
public void connectionLost(Throwable cause) {
}
});
consumerBrokerB.subscribe(testTopic, 2);
/**
* Publish a message to the topic
*/
IMqttClient publisherBrokerA = new MqttClient("tcp://localhost:" + brokerAPort, "publisher");
publisherBrokerA.connect();
MqttMessage message = new MqttMessage("hola from publisher".getBytes());
message.setQos(2);
publisherBrokerA.publish(testTopic, message);
Thread.sleep(5 * 1000); // Give some time for consumer to receive messages
/**
* Validate that the MessagePublishedMonitoringEvent,
* MessageDeliveredMonitoringEvent and MessageAcknowledgedMonitoringEvent are
* properly generated
*/
Optional<MessageLifecycleEvent> publishEventOpt = raisedEventsBrokerA.stream()
.filter(e -> e.getClass() == MessagePublishedEvent.class).findFirst();
assertEquals(true, publishEventOpt.isPresent());
MessagePublishedEvent publishEvent = (MessagePublishedEvent) publishEventOpt.get();
Optional<MessageLifecycleEvent> deliverEventOpt = raisedEventsBrokerB.stream()
.filter(e -> e.getClass() == MessageDeliveredEvent.class).findFirst();
assertEquals(true, deliverEventOpt.isPresent());
MessageDeliveredEvent deliverEvent = (MessageDeliveredEvent) deliverEventOpt.get();
Optional<MessageLifecycleEvent> ackEventOptional = raisedEventsBrokerB.stream()
.filter(e -> e.getClass() == MessageAcknowledgedEvent.class).findFirst();
assertEquals(true, ackEventOptional.isPresent());
MessageAcknowledgedEvent ackEvent = (MessageAcknowledgedEvent) ackEventOptional.get();
assertEquals(publishEvent.messageAddress, deliverEvent.publishAddress);
assertEquals(publishEvent.clientId, deliverEvent.publishClientId);
assertEquals(publishEvent.messageId, deliverEvent.messageId);
// assertEquals(publishEvent.messageSize, deliverEvent.messageSize);
assertEquals(publishEvent.node, deliverEvent.publishNode);
assertEquals(publishEvent.timestamp, deliverEvent.publishTimestamp);
// assertTrue(deliverEvent.timestamp > reconnectTime);
assertEquals(publishEvent.messageId, ackEvent.messageId);
assertEquals(ackEvent.deliverTimestamp, deliverEvent.timestamp);
// assertTrue(ackEvent.timestamp > reconnectTime);
} finally {
if (testBrokerA != null) {
try {
testBrokerA.stop();
} catch (Exception e) {
// ignore errors while shutting down broker A
}
}
if (testBrokerB != null) {
try {
testBrokerB.stop();
} catch (Exception e) {
// ignore errors while shutting down broker B
}
}
}
}
}

View File

@@ -1,33 +0,0 @@
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/

View File

@@ -1,15 +0,0 @@
#
# Build stage
#
FROM docker.io/library/maven:3.9.2-eclipse-temurin-17 AS build
COPY src /home/app/src
COPY pom.xml /home/app
RUN mvn -f /home/app/pom.xml clean package
#
# Package stage
#
FROM docker.io/library/eclipse-temurin:17-jre
COPY --from=build /home/app/target/demo-0.0.1-SNAPSHOT.jar /usr/local/lib/demo.jar
EXPOSE 8080
ENTRYPOINT ["java","-jar","/usr/local/lib/demo.jar"]

View File

@@ -1,42 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.0</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@@ -1,13 +0,0 @@
package com.example.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}

View File

@@ -1,14 +0,0 @@
package com.example.demo;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class DemoController {
@RequestMapping("/")
public String root() {
return "test";
}
}

View File

@@ -1,13 +0,0 @@
package com.example.demo;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class DemoApplicationTests {
@Test
void contextLoads() {
}
}

View File

@@ -1,72 +1,20 @@
- job:
name: nebulous-iot-dpp-orchestrator-build-container-images
parent: nebulous-build-container-images
dependencies:
- name: opendev-buildset-registry
soft: false
name: nebulous-iot-dpp-orchestrator-build-java-libraries
parent: nebulous-build-java-libraries
provides:
- nebulous-iot-dpp-orchestrator-container-images
description: Build the container images.
files: &image_files
- ^java-spring-boot-demo/
vars: &image_vars
promote_container_image_job: nebulous-iot-dpp-orchestrator-upload-container-images
container_images:
- context: java-spring-boot-demo
registry: quay.io
repository: quay.io/nebulous/iot-dpp-orchestrator-java-spring-boot-demo
namespace: nebulous
repo_shortname: iot-dpp-orchestrator-java-spring-boot-demo
repo_description: ""
- nebulous-iot-dpp-orchestrator-java-libraries
description: Build the java libraries to be used as ActiveMQ plugin.
files: &library_files
- ^iot_dpp/
vars: &library_vars
java_libraries:
- context: iot_dpp
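# The &library_files / &library_vars anchors above are reused by the upload job below via *library_files / *library_vars.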
- job:
name: nebulous-iot-dpp-orchestrator-upload-container-images
parent: nebulous-upload-container-images
dependencies:
- name: opendev-buildset-registry
soft: false
name: nebulous-iot-dpp-orchestrator-upload-java-libraries
parent: nebulous-upload-java-libraries
provides:
- nebulous-iot-dpp-orchestrator-container-images
description: Build and upload the container images.
files: *image_files
vars: *image_vars
- job:
name: nebulous-iot-dpp-orchestrator-promote-container-images
parent: nebulous-promote-container-images
description: Promote previously uploaded container images.
files: *image_files
vars: *image_vars
- job:
name: nebulous-iot-dpp-orchestrator-hadolint
parent: nebulous-hadolint
description: Run Hadolint on Dockerfile(s).
vars:
dockerfiles:
- java-spring-boot-demo/Dockerfile
- job:
name: nebulous-iot-dpp-orchestrator-helm-lint
parent: nebulous-helm-lint
description: Run helm lint on Helm charts.
vars:
helm_charts:
- ./charts/nebulous-iot-dpp-orchestrator
- job:
name: nebulous-iot-dpp-orchestrator-apply-helm-charts
parent: nebulous-apply-helm-charts
dependencies:
- name: opendev-buildset-registry
soft: false
- name: nebulous-iot-dpp-orchestrator-build-container-images
soft: true
- name: nebulous-iot-dpp-orchestrator-upload-container-images
soft: true
requires:
- nebulous-iot-dpp-orchestrator-container-images
description: Deploy a Kubernetes cluster and apply charts.
vars:
helm_charts:
nebulous-iot-dpp-orchestrator: ./charts/nebulous-iot-dpp-orchestrator
- nebulous-iot-dpp-orchestrator-java-libraries
description: Build and upload the java libraries.
files: *library_files
vars: *library_vars

View File

@@ -1,20 +1,12 @@
- project:
check:
jobs:
- opendev-buildset-registry
- nebulous-iot-dpp-orchestrator-helm-lint
- nebulous-iot-dpp-orchestrator-build-container-images
- nebulous-iot-dpp-orchestrator-hadolint
- nebulous-platform-apply-helm-charts
- nebulous-iot-dpp-orchestrator-build-java-libraries
- nox-linters
gate:
jobs:
- opendev-buildset-registry
- nebulous-iot-dpp-orchestrator-helm-lint
- nebulous-iot-dpp-orchestrator-upload-container-images
- nebulous-iot-dpp-orchestrator-hadolint
- nebulous-platform-apply-helm-charts
- nebulous-iot-dpp-orchestrator-build-java-libraries
- nox-linters
promote:
jobs:
- nebulous-iot-dpp-orchestrator-promote-container-images
- nebulous-iot-dpp-orchestrator-upload-java-libraries