diff --git a/.gitignore b/.gitignore index e0e9b8b..33debbf 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,12 @@ +/iot_dpp/.classpath +/iot_dpp/.gitattributes +/iot_dpp/.project +/iot_dpp/.settings/org.eclipse.buildship.core.prefs +/iot_dpp/.settings/org.eclipse.jdt.core.prefs +/iot_dpp/gradlew.bat +/iot_dpp/gradle/wrapper/gradle-wrapper.jar +/iot_dpp/gradle/wrapper/gradle-wrapper.properties +/iot_dpp/gradlew __pycache__/ .nox/ +/iot_dpp/maven-repo diff --git a/charts/nebulous-iot-dpp-orchestrator/.helmignore b/charts/nebulous-iot-dpp-orchestrator/.helmignore deleted file mode 100644 index 0e8a0eb..0000000 --- a/charts/nebulous-iot-dpp-orchestrator/.helmignore +++ /dev/null @@ -1,23 +0,0 @@ -# Patterns to ignore when building packages. -# This supports shell glob matching, relative path matching, and -# negation (prefixed with !). Only one pattern per line. -.DS_Store -# Common VCS dirs -.git/ -.gitignore -.bzr/ -.bzrignore -.hg/ -.hgignore -.svn/ -# Common backup files -*.swp -*.bak -*.tmp -*.orig -*~ -# Various IDEs -.project -.idea/ -*.tmproj -.vscode/ diff --git a/charts/nebulous-iot-dpp-orchestrator/Chart.yaml b/charts/nebulous-iot-dpp-orchestrator/Chart.yaml deleted file mode 100644 index 2d6930e..0000000 --- a/charts/nebulous-iot-dpp-orchestrator/Chart.yaml +++ /dev/null @@ -1,24 +0,0 @@ -apiVersion: v2 -name: nebulous-iot-dpp-orchestrator -description: A Helm chart for Kubernetes - -# A chart can be either an 'application' or a 'library' chart. -# -# Application charts are a collection of templates that can be packaged into versioned archives -# to be deployed. -# -# Library charts provide useful utilities or functions for the chart developer. They're included as -# a dependency of application charts to inject those utilities and functions into the rendering -# pipeline. Library charts do not define any templates and therefore cannot be deployed. -type: application - -# This is the chart version. This version number should be incremented each time you make changes -# to the chart and its templates, including the app version. -# Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.1.0 - -# This is the version number of the application being deployed. This version number should be -# incremented each time you make changes to the application. Versions are not expected to -# follow Semantic Versioning. They should reflect the version the application is using. -# It is recommended to use it with quotes. -appVersion: "latest" diff --git a/charts/nebulous-iot-dpp-orchestrator/templates/NOTES.txt b/charts/nebulous-iot-dpp-orchestrator/templates/NOTES.txt deleted file mode 100644 index 3ff9284..0000000 --- a/charts/nebulous-iot-dpp-orchestrator/templates/NOTES.txt +++ /dev/null @@ -1,22 +0,0 @@ -1. Get the application URL by running these commands: -{{- if .Values.ingress.enabled }} -{{- range $host := .Values.ingress.hosts }} - {{- range .paths }} - http{{ if $.Values.ingress.tls }}s{{ end }}://{{ $host.host }}{{ .path }} - {{- end }} -{{- end }} -{{- else if contains "NodePort" .Values.service.type }} - export NODE_PORT=$(kubectl get --namespace {{ .Release.Namespace }} -o jsonpath="{.spec.ports[0].nodePort}" services {{ include "nebulous-iot-dpp-orchestrator.fullname" . 
}}) - export NODE_IP=$(kubectl get nodes --namespace {{ .Release.Namespace }} -o jsonpath="{.items[0].status.addresses[0].address}") - echo http://$NODE_IP:$NODE_PORT -{{- else if contains "LoadBalancer" .Values.service.type }} - NOTE: It may take a few minutes for the LoadBalancer IP to be available. - You can watch the status of by running 'kubectl get --namespace {{ .Release.Namespace }} svc -w {{ include "nebulous-iot-dpp-orchestrator.fullname" . }}' - export SERVICE_IP=$(kubectl get svc --namespace {{ .Release.Namespace }} {{ include "nebulous-iot-dpp-orchestrator.fullname" . }} --template "{{"{{ range (index .status.loadBalancer.ingress 0) }}{{.}}{{ end }}"}}") - echo http://$SERVICE_IP:{{ .Values.service.port }} -{{- else if contains "ClusterIP" .Values.service.type }} - export POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ include "nebulous-iot-dpp-orchestrator.name" . }},app.kubernetes.io/instance={{ .Release.Name }}" -o jsonpath="{.items[0].metadata.name}") - export CONTAINER_PORT=$(kubectl get pod --namespace {{ .Release.Namespace }} $POD_NAME -o jsonpath="{.spec.containers[0].ports[0].containerPort}") - echo "Visit http://127.0.0.1:8080 to use your application" - kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8080:$CONTAINER_PORT -{{- end }} diff --git a/charts/nebulous-iot-dpp-orchestrator/templates/_helpers.tpl b/charts/nebulous-iot-dpp-orchestrator/templates/_helpers.tpl deleted file mode 100644 index 005b1cd..0000000 --- a/charts/nebulous-iot-dpp-orchestrator/templates/_helpers.tpl +++ /dev/null @@ -1,62 +0,0 @@ -{{/* -Expand the name of the chart. -*/}} -{{- define "nebulous-iot-dpp-orchestrator.name" -}} -{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }} -{{- end }} - -{{/* -Create a default fully qualified app name. -We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec). -If release name contains chart name it will be used as a full name. -*/}} -{{- define "nebulous-iot-dpp-orchestrator.fullname" -}} -{{- if .Values.fullnameOverride }} -{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }} -{{- else }} -{{- $name := default .Chart.Name .Values.nameOverride }} -{{- if contains $name .Release.Name }} -{{- .Release.Name | trunc 63 | trimSuffix "-" }} -{{- else }} -{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }} -{{- end }} -{{- end }} -{{- end }} - -{{/* -Create chart name and version as used by the chart label. -*/}} -{{- define "nebulous-iot-dpp-orchestrator.chart" -}} -{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }} -{{- end }} - -{{/* -Common labels -*/}} -{{- define "nebulous-iot-dpp-orchestrator.labels" -}} -helm.sh/chart: {{ include "nebulous-iot-dpp-orchestrator.chart" . }} -{{ include "nebulous-iot-dpp-orchestrator.selectorLabels" . }} -{{- if .Chart.AppVersion }} -app.kubernetes.io/version: {{ .Chart.AppVersion | quote }} -{{- end }} -app.kubernetes.io/managed-by: {{ .Release.Service }} -{{- end }} - -{{/* -Selector labels -*/}} -{{- define "nebulous-iot-dpp-orchestrator.selectorLabels" -}} -app.kubernetes.io/name: {{ include "nebulous-iot-dpp-orchestrator.name" . 
}} -app.kubernetes.io/instance: {{ .Release.Name }} -{{- end }} - -{{/* -Create the name of the service account to use -*/}} -{{- define "nebulous-iot-dpp-orchestrator.serviceAccountName" -}} -{{- if .Values.serviceAccount.create }} -{{- default (include "nebulous-iot-dpp-orchestrator.fullname" .) .Values.serviceAccount.name }} -{{- else }} -{{- default "default" .Values.serviceAccount.name }} -{{- end }} -{{- end }} diff --git a/charts/nebulous-iot-dpp-orchestrator/templates/deployment.yaml b/charts/nebulous-iot-dpp-orchestrator/templates/deployment.yaml deleted file mode 100644 index 997d367..0000000 --- a/charts/nebulous-iot-dpp-orchestrator/templates/deployment.yaml +++ /dev/null @@ -1,61 +0,0 @@ -apiVersion: apps/v1 -kind: Deployment -metadata: - name: {{ include "nebulous-iot-dpp-orchestrator.fullname" . }} - labels: - {{- include "nebulous-iot-dpp-orchestrator.labels" . | nindent 4 }} -spec: - {{- if not .Values.autoscaling.enabled }} - replicas: {{ .Values.replicaCount }} - {{- end }} - selector: - matchLabels: - {{- include "nebulous-iot-dpp-orchestrator.selectorLabels" . | nindent 6 }} - template: - metadata: - {{- with .Values.podAnnotations }} - annotations: - {{- toYaml . | nindent 8 }} - {{- end }} - labels: - {{- include "nebulous-iot-dpp-orchestrator.selectorLabels" . | nindent 8 }} - spec: - {{- with .Values.imagePullSecrets }} - imagePullSecrets: - {{- toYaml . | nindent 8 }} - {{- end }} - serviceAccountName: {{ include "nebulous-iot-dpp-orchestrator.serviceAccountName" . }} - securityContext: - {{- toYaml .Values.podSecurityContext | nindent 8 }} - containers: - - name: {{ .Chart.Name }} - securityContext: - {{- toYaml .Values.securityContext | nindent 12 }} - image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}" - imagePullPolicy: {{ .Values.image.pullPolicy }} - ports: - - name: http - containerPort: 8080 - protocol: TCP - livenessProbe: - httpGet: - path: / - port: http - readinessProbe: - httpGet: - path: / - port: http - resources: - {{- toYaml .Values.resources | nindent 12 }} - {{- with .Values.nodeSelector }} - nodeSelector: - {{- toYaml . | nindent 8 }} - {{- end }} - {{- with .Values.affinity }} - affinity: - {{- toYaml . | nindent 8 }} - {{- end }} - {{- with .Values.tolerations }} - tolerations: - {{- toYaml . | nindent 8 }} - {{- end }} diff --git a/charts/nebulous-iot-dpp-orchestrator/templates/hpa.yaml b/charts/nebulous-iot-dpp-orchestrator/templates/hpa.yaml deleted file mode 100644 index 044871a..0000000 --- a/charts/nebulous-iot-dpp-orchestrator/templates/hpa.yaml +++ /dev/null @@ -1,28 +0,0 @@ -{{- if .Values.autoscaling.enabled }} -apiVersion: autoscaling/v2beta1 -kind: HorizontalPodAutoscaler -metadata: - name: {{ include "nebulous-iot-dpp-orchestrator.fullname" . }} - labels: - {{- include "nebulous-iot-dpp-orchestrator.labels" . | nindent 4 }} -spec: - scaleTargetRef: - apiVersion: apps/v1 - kind: Deployment - name: {{ include "nebulous-iot-dpp-orchestrator.fullname" . 
}} - minReplicas: {{ .Values.autoscaling.minReplicas }} - maxReplicas: {{ .Values.autoscaling.maxReplicas }} - metrics: - {{- if .Values.autoscaling.targetCPUUtilizationPercentage }} - - type: Resource - resource: - name: cpu - targetAverageUtilization: {{ .Values.autoscaling.targetCPUUtilizationPercentage }} - {{- end }} - {{- if .Values.autoscaling.targetMemoryUtilizationPercentage }} - - type: Resource - resource: - name: memory - targetAverageUtilization: {{ .Values.autoscaling.targetMemoryUtilizationPercentage }} - {{- end }} -{{- end }} diff --git a/charts/nebulous-iot-dpp-orchestrator/templates/ingress.yaml b/charts/nebulous-iot-dpp-orchestrator/templates/ingress.yaml deleted file mode 100644 index 2e48e69..0000000 --- a/charts/nebulous-iot-dpp-orchestrator/templates/ingress.yaml +++ /dev/null @@ -1,61 +0,0 @@ -{{- if .Values.ingress.enabled -}} -{{- $fullName := include "nebulous-iot-dpp-orchestrator.fullname" . -}} -{{- $svcPort := .Values.service.port -}} -{{- if and .Values.ingress.className (not (semverCompare ">=1.18-0" .Capabilities.KubeVersion.GitVersion)) }} - {{- if not (hasKey .Values.ingress.annotations "kubernetes.io/ingress.class") }} - {{- $_ := set .Values.ingress.annotations "kubernetes.io/ingress.class" .Values.ingress.className}} - {{- end }} -{{- end }} -{{- if semverCompare ">=1.19-0" .Capabilities.KubeVersion.GitVersion -}} -apiVersion: networking.k8s.io/v1 -{{- else if semverCompare ">=1.14-0" .Capabilities.KubeVersion.GitVersion -}} -apiVersion: networking.k8s.io/v1beta1 -{{- else -}} -apiVersion: extensions/v1beta1 -{{- end }} -kind: Ingress -metadata: - name: {{ $fullName }} - labels: - {{- include "nebulous-iot-dpp-orchestrator.labels" . | nindent 4 }} - {{- with .Values.ingress.annotations }} - annotations: - {{- toYaml . | nindent 4 }} - {{- end }} -spec: - {{- if and .Values.ingress.className (semverCompare ">=1.18-0" .Capabilities.KubeVersion.GitVersion) }} - ingressClassName: {{ .Values.ingress.className }} - {{- end }} - {{- if .Values.ingress.tls }} - tls: - {{- range .Values.ingress.tls }} - - hosts: - {{- range .hosts }} - - {{ . | quote }} - {{- end }} - secretName: {{ .secretName }} - {{- end }} - {{- end }} - rules: - {{- range .Values.ingress.hosts }} - - host: {{ .host | quote }} - http: - paths: - {{- range .paths }} - - path: {{ .path }} - {{- if and .pathType (semverCompare ">=1.18-0" $.Capabilities.KubeVersion.GitVersion) }} - pathType: {{ .pathType }} - {{- end }} - backend: - {{- if semverCompare ">=1.19-0" $.Capabilities.KubeVersion.GitVersion }} - service: - name: {{ $fullName }} - port: - number: {{ $svcPort }} - {{- else }} - serviceName: {{ $fullName }} - servicePort: {{ $svcPort }} - {{- end }} - {{- end }} - {{- end }} -{{- end }} diff --git a/charts/nebulous-iot-dpp-orchestrator/templates/service.yaml b/charts/nebulous-iot-dpp-orchestrator/templates/service.yaml deleted file mode 100644 index c26e205..0000000 --- a/charts/nebulous-iot-dpp-orchestrator/templates/service.yaml +++ /dev/null @@ -1,15 +0,0 @@ -apiVersion: v1 -kind: Service -metadata: - name: {{ include "nebulous-iot-dpp-orchestrator.fullname" . }} - labels: - {{- include "nebulous-iot-dpp-orchestrator.labels" . | nindent 4 }} -spec: - type: {{ .Values.service.type }} - ports: - - port: {{ .Values.service.port }} - targetPort: http - protocol: TCP - name: http - selector: - {{- include "nebulous-iot-dpp-orchestrator.selectorLabels" . 
| nindent 4 }} diff --git a/charts/nebulous-iot-dpp-orchestrator/templates/serviceaccount.yaml b/charts/nebulous-iot-dpp-orchestrator/templates/serviceaccount.yaml deleted file mode 100644 index edda3d2..0000000 --- a/charts/nebulous-iot-dpp-orchestrator/templates/serviceaccount.yaml +++ /dev/null @@ -1,12 +0,0 @@ -{{- if .Values.serviceAccount.create -}} -apiVersion: v1 -kind: ServiceAccount -metadata: - name: {{ include "nebulous-iot-dpp-orchestrator.serviceAccountName" . }} - labels: - {{- include "nebulous-iot-dpp-orchestrator.labels" . | nindent 4 }} - {{- with .Values.serviceAccount.annotations }} - annotations: - {{- toYaml . | nindent 4 }} - {{- end }} -{{- end }} diff --git a/charts/nebulous-iot-dpp-orchestrator/values.yaml b/charts/nebulous-iot-dpp-orchestrator/values.yaml deleted file mode 100644 index 29c2570..0000000 --- a/charts/nebulous-iot-dpp-orchestrator/values.yaml +++ /dev/null @@ -1,82 +0,0 @@ -# Default values for nebulous-iot-dpp-orchestrator. -# This is a YAML-formatted file. -# Declare variables to be passed into your templates. - -replicaCount: 1 - -image: - repository: "quay.io/nebulous/iot-dpp-orchestrator-java-spring-boot-demo" - pullPolicy: IfNotPresent - # Overrides the image tag whose default is the chart appVersion. - tag: "" - -imagePullSecrets: [] -nameOverride: "" -fullnameOverride: "" - -serviceAccount: - # Specifies whether a service account should be created - create: true - # Annotations to add to the service account - annotations: {} - # The name of the service account to use. - # If not set and create is true, a name is generated using the fullname template - name: "" - -podAnnotations: {} - -podSecurityContext: {} - # fsGroup: 2000 - -securityContext: {} - # capabilities: - # drop: - # - ALL - # readOnlyRootFilesystem: true - # runAsNonRoot: true - # runAsUser: 1000 - -service: - type: ClusterIP - port: 80 - -ingress: - enabled: false - className: "" - annotations: {} - # kubernetes.io/ingress.class: nginx - # kubernetes.io/tls-acme: "true" - hosts: - - host: chart-example.local - paths: - - path: / - pathType: ImplementationSpecific - tls: [] - # - secretName: chart-example-tls - # hosts: - # - chart-example.local - -resources: {} - # We usually recommend not to specify default resources and to leave this as a conscious - # choice for the user. This also increases chances charts run on environments with little - # resources, such as Minikube. If you do want to specify resources, uncomment the following - # lines, adjust them as necessary, and remove the curly braces after 'resources:'. - # limits: - # cpu: 100m - # memory: 128Mi - # requests: - # cpu: 100m - # memory: 128Mi - -autoscaling: - enabled: false - minReplicas: 1 - maxReplicas: 100 - targetCPUUtilizationPercentage: 80 - # targetMemoryUtilizationPercentage: 80 - -nodeSelector: {} - -tolerations: [] - -affinity: {} diff --git a/iot_dpp/README.md b/iot_dpp/README.md new file mode 100644 index 0000000..dbb4f21 --- /dev/null +++ b/iot_dpp/README.md @@ -0,0 +1,31 @@ +# IoT data processing pipelines orchestration tool + +The IoT data processing pipelines orchestration tool allows users to declaratively express data transformation pipelines by identifying their main components (data sources, transformation operations and data consumers) and interlace them to conform data transformation flows. 
Once defined by the user, the orchestration of these data transformation pipelines is handled by NebulOuS core, allocating/deallocating computational resources whenever needed to adapt to the current workload. The tool also offers the opportunity to the user to monitor the execution of these pipelines and detect any error occurring on them. + + +## Documentation + +The repository contains the source code for two ActiveMQ Artemis plugins necessary for implementing the NebulOuS IoT data processing pipelines orchestration tool. + +### MessageGroupIDAnnotationPlugin + +ActiveMQ Artemis plugin for extracting the value to be used as JMSXGroupID from the message. + +### MessageLifecycleMonitoringPlugin + +ActiveMQ Artemis plugin for tracking the lifecycle of messages inside an ActiveMQ cluster. On each step of the message lifecycle (a producer writes a message to the cluster, the message is delivered to a consumer, the consumer ACKs the message), the plugin generates events with relevant information that can be used for understanding how IoT applications components on NebulOuS are communicating. +## Installation + +Build the source code. + +Configure your ActiveMQ Artemis to use the generated plugins. Follow the [documentation](https://activemq.apache.org/components/artemis/documentation/latest/broker-plugins.html) + + +## Authors + +- [Robert Sanfeliu Prat (Eurecat)](robert.sanfeliu@eurecat.org) + + +## Acknowledgements + + - NebulOuS is a project Funded by the European Union. Views and opinions expressed are however those of the author(s) only and do not necessarily reflect those of the European Union or European Commission. Neither the European Union nor the granting authority can be held responsible for them. | Grant Agreement No.: 101070516 diff --git a/iot_dpp/pom.xml b/iot_dpp/pom.xml new file mode 100644 index 0000000..cf33fad --- /dev/null +++ b/iot_dpp/pom.xml @@ -0,0 +1,150 @@ + + + 4.0.0 + + eu.nebulouscloud + iot-dpp-orchestrator + 0.1.0 + jar + + 11 + 11 + + + + + exn-java-connector + https://s01.oss.sonatype.org/content/repositories/snapshots/ + + + + + + + org.apache.logging.log4j + log4j-slf4j-impl + 2.19.0 + + + + org.apache.activemq + artemis-commons + 2.31.2 + + + + + org.apache.activemq + artemis-protocols + 2.31.2 + pom + + + + + org.apache.activemq + artemis-mqtt-protocol + 2.31.2 + + + + + org.apache.activemq + artemis-jms-server + 2.31.2 + + + + org.apache.activemq + artemis-amqp-protocol + 2.31.2 + + + org.apache.activemq + artemis-core-client + 2.31.2 + + + org.apache.activemq + artemis-jms-client + 2.31.2 + + + com.jayway.jsonpath + json-path + 2.8.0 + + + com.fasterxml.jackson.core + jackson-databind + 2.16.0 + + + + org.apache.activemq + artemis-server + 2.31.2 + + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.5 + test + + + + eu.nebulouscloud + exn-connector-java + 1.0-SNAPSHOT + test + + + + org.apache.qpid + protonj2-client + 1.0.0-M18 + test + + + + org.apache.activemq.tooling + activemq-tooling + 6.0.1 + test + pom + + + + org.junit.jupiter + junit-jupiter-api + 5.8.1 + test + + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + + eu.nebulouscloud.iot_dpp_orchestrator + + + false + false + + + + + + + + \ No newline at end of file diff --git a/iot_dpp/src/main/java/eut/nebulouscloud/iot_dpp/GroupIDExtractionParameters.java b/iot_dpp/src/main/java/eut/nebulouscloud/iot_dpp/GroupIDExtractionParameters.java new file mode 100644 index 0000000..102bf74 --- /dev/null +++ 
b/iot_dpp/src/main/java/eut/nebulouscloud/iot_dpp/GroupIDExtractionParameters.java @@ -0,0 +1,43 @@ +package eut.nebulouscloud.iot_dpp; + +/** + * DTO for handling information regarding the ActiveMQ Group Id value extraction criteria for a certain address. + */ +public class GroupIDExtractionParameters { + + public enum GroupIDExpressionSource { + PROPERTY, BODY_JSON, BODY_XML + } + /** + * Message part that contains the group ID + */ + private GroupIDExpressionSource source; + + /** + * Expression to extract the group Id from the message + */ + private String expression; + + public GroupIDExtractionParameters(GroupIDExpressionSource source, String expression) { + super(); + this.source = source; + this.expression = expression; + } + + public GroupIDExpressionSource getSource() { + return source; + } + + public void setSource(GroupIDExpressionSource source) { + this.source = source; + } + + public String getExpression() { + return expression; + } + + public void setExpression(String expression) { + this.expression = expression; + } + +} diff --git a/iot_dpp/src/main/java/eut/nebulouscloud/iot_dpp/MessageGroupIDAnnotationPlugin.java b/iot_dpp/src/main/java/eut/nebulouscloud/iot_dpp/MessageGroupIDAnnotationPlugin.java new file mode 100644 index 0000000..472c758 --- /dev/null +++ b/iot_dpp/src/main/java/eut/nebulouscloud/iot_dpp/MessageGroupIDAnnotationPlugin.java @@ -0,0 +1,176 @@ +package eut.nebulouscloud.iot_dpp; + +import java.io.ByteArrayInputStream; +import java.util.Map; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.xpath.XPath; +import javax.xml.xpath.XPathConstants; +import javax.xml.xpath.XPathFactory; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.LargeServerMessage; +import org.apache.activemq.artemis.core.server.ServerSession; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin; +import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.commons.lang3.NotImplementedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.w3c.dom.Document; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.jayway.jsonpath.JsonPath; + +/** + * ActiveMQ Artemis plugin for extracting the value to be used as JMSXGroupID from the message. + */ +public class MessageGroupIDAnnotationPlugin implements ActiveMQServerMessagePlugin { + static Logger LOGGER = LoggerFactory.getLogger(MessageGroupIDAnnotationPlugin.class); + static ObjectMapper om = new ObjectMapper(); + public static SimpleString MESSAGE_GROUP_ANNOTATION = new SimpleString("JMSXGroupID"); + + /** + * Dictionary with groupId extraction parameters. + * Each key in the dictionary corresponds to an address, the value is the information on how to extract the group Id from the messages on that address. + */ + Map groupIdExtractionParameterPerAddress; + + + public MessageGroupIDAnnotationPlugin() + { + + } + + public MessageGroupIDAnnotationPlugin(Map groupIdExtractionParameterPerTopic) + { + this.groupIdExtractionParameterPerAddress = groupIdExtractionParameterPerTopic; + } + + /** + * Extracts the string value for an annotation from the message. Returns + * defaultValue if annotation is not found or null. 
+ * + * @param message + * @param annotation + * @param defaultValue + * @return + */ + private String getStringAnnotationValueOrDefault(Message message, SimpleString annotation, String defaultValue) { + + Object annotationValue = message.getAnnotation(annotation); + if (annotationValue == null) + return defaultValue; + if (annotationValue instanceof SimpleString) + return ((SimpleString) annotationValue).toString(); + return (String) annotationValue; + } + + @Override + public void beforeSend(ServerSession session, Transaction tx, Message message, boolean direct, + boolean noAutoCreateQueue) { + + String destinationTopic = message.getAddress(); + if (groupIdExtractionParameterPerAddress.containsKey(destinationTopic)) { + String groupID = getStringAnnotationValueOrDefault(message, MESSAGE_GROUP_ANNOTATION, null); + if (groupID != null) { + LOGGER.debug("Message already assigned to groupID " + groupID); + return; + } + groupID = extractGroupID(message, groupIdExtractionParameterPerAddress.get(destinationTopic)); + if (groupID != null) { + LOGGER.debug(String.format("Message %s assigned group id %s", message.toString(), groupID)); + message.putStringProperty(MESSAGE_GROUP_ANNOTATION, new SimpleString(groupID)); + } else { + LOGGER.warn(String.format("GroupId assigned to message %s is null. Ignoring it", message.toString())); + } + + } else { + LOGGER.debug("Ignoring message with address " + destinationTopic); + } + + } + + public static String extractGroupID(Message message, GroupIDExtractionParameters expression) { + + switch (expression.getSource()) { + case BODY_JSON: + return extractGroupIDFromJSONBody(message, expression.getExpression()); + case BODY_XML: + return extractGroupIdFromXMLBody(message,expression.getExpression()); + case PROPERTY: + return extractGroupIDFromMessageProperties(message, expression.getExpression()); + default: + throw new NotImplementedException( + String.format("Source %s not supported in extractGroupID", expression.getSource())); + + } + + } + + public static String extractGroupIdFromXMLBody(Message message, String xpath) { + if (message instanceof LargeServerMessage) { + LOGGER.error("Can't extract group ID from XML body on LargeServerMessages"); + return null; + } + String body = message.getStringBody(); + if (body == null) { + LOGGER.error( + String.format("Can't extract group id from XML body on message %s since body is null", message)); + } + + try { + DocumentBuilderFactory builderFactory = DocumentBuilderFactory.newInstance(); + DocumentBuilder builder = builderFactory.newDocumentBuilder(); + Document xmlDocument = builder.parse( new ByteArrayInputStream(message.getStringBody().getBytes())); + XPath xPath = XPathFactory.newInstance().newXPath(); + return (String) xPath.compile(xpath).evaluate(xmlDocument, XPathConstants.STRING); + } catch (Exception ex) { + LOGGER.error( + String.format("Can't extract group id on path %s from XML body of message %s", xpath, message), + ex); + return null; + } + + } + + /** + * https://restfulapi.net/json-jsonpath/ + * @param message + * @param jsonPath + * @return + */ + public static String extractGroupIDFromJSONBody(Message message, String jsonPath) { + if (message instanceof LargeServerMessage) { + LOGGER.error("Can't extract group id from JSON body on LargeServerMessages"); + return null; + } + String body = message.getStringBody(); + if (body == null) { + LOGGER.error( + String.format("Can't extract group id from JSON body on message %s since body is null", message)); + } + + try { + return JsonPath.read(body, 
jsonPath).toString(); + } catch (Exception ex) { + LOGGER.error( + String.format("Can't extract group id on path %s from JSON body of message %s", jsonPath, message), + ex); + return null; + } + + } + + public static String extractGroupIDFromMessageProperties(Message message, String sourceProperty) { + if (!message.getPropertyNames().contains(sourceProperty)) { + LOGGER.debug(String.format( + "Can't extract groupID from property '%s' since this property doesn't exist in the message %s ", + sourceProperty, message.toString())); + return null; + } + return message.getObjectProperty(sourceProperty).toString(); + } + +} diff --git a/iot_dpp/src/main/java/eut/nebulouscloud/iot_dpp/monitoring/EMSMessageLifecycleMonitoringPlugin.java b/iot_dpp/src/main/java/eut/nebulouscloud/iot_dpp/monitoring/EMSMessageLifecycleMonitoringPlugin.java new file mode 100644 index 0000000..78a8393 --- /dev/null +++ b/iot_dpp/src/main/java/eut/nebulouscloud/iot_dpp/monitoring/EMSMessageLifecycleMonitoringPlugin.java @@ -0,0 +1,33 @@ +package eut.nebulouscloud.iot_dpp.monitoring; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eut.nebulouscloud.iot_dpp.monitoring.events.MessageLifecycleEvent; + +/** + * Extension of the abstract class MessageLifecycleMonitoringPlugin that handles + * sending the generated events to the EMS + */ +public class EMSMessageLifecycleMonitoringPlugin extends MessageLifecycleMonitoringPlugin { + + Logger LOGGER = LoggerFactory.getLogger(EMSMessageLifecycleMonitoringPlugin.class); + protected ObjectMapper om = new ObjectMapper(); + private static final String EMS_METRICS_TOPIC = "eu.nebulouscloud.monitoring.realtime.iot.messaging_events"; + + @Override + protected void notifyEvent(MessageLifecycleEvent event) { + String str; + try { + str = om.writeValueAsString(event); + EventManagementSystemPublisher.send(EMS_METRICS_TOPIC, str); + } catch (Exception e) { + LOGGER.error("Unable to send event to the EMS", e); + } + + } + +} diff --git a/iot_dpp/src/main/java/eut/nebulouscloud/iot_dpp/monitoring/EventManagementSystemPublisher.java b/iot_dpp/src/main/java/eut/nebulouscloud/iot_dpp/monitoring/EventManagementSystemPublisher.java new file mode 100644 index 0000000..0e4a47d --- /dev/null +++ b/iot_dpp/src/main/java/eut/nebulouscloud/iot_dpp/monitoring/EventManagementSystemPublisher.java @@ -0,0 +1,23 @@ +package eut.nebulouscloud.iot_dpp.monitoring; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EventManagementSystemPublisher { + + Logger LOGGER = LoggerFactory.getLogger(EventManagementSystemPublisher.class); + + private static EventManagementSystemPublisher instance; + + private EventManagementSystemPublisher() { + + } + + public static void send(String address, String payload) { + if (instance == null) + instance = new EventManagementSystemPublisher(); + + //TODO: implement actual sending of the message to the EMS using the EXN ActiveMQ library + + } +} diff --git a/iot_dpp/src/main/java/eut/nebulouscloud/iot_dpp/monitoring/MessageLifecycleMonitoringPlugin.java b/iot_dpp/src/main/java/eut/nebulouscloud/iot_dpp/monitoring/MessageLifecycleMonitoringPlugin.java new file mode 100644 index 0000000..36e21b1 --- /dev/null +++ b/iot_dpp/src/main/java/eut/nebulouscloud/iot_dpp/monitoring/MessageLifecycleMonitoringPlugin.java @@ -0,0 +1,252 @@ +package eut.nebulouscloud.iot_dpp.monitoring; + +import java.util.Date; + +import 
org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.postoffice.RoutingStatus; +import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl; +import org.apache.activemq.artemis.core.server.LargeServerMessage; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.ServerConsumer; +import org.apache.activemq.artemis.core.server.ServerSession; +import org.apache.activemq.artemis.core.server.impl.AckReason; +import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; +import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin; +import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.reader.MessageUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eut.nebulouscloud.iot_dpp.monitoring.events.MessageAcknowledgedEvent; +import eut.nebulouscloud.iot_dpp.monitoring.events.MessageDeliveredEvent; +import eut.nebulouscloud.iot_dpp.monitoring.events.MessageLifecycleEvent; +import eut.nebulouscloud.iot_dpp.monitoring.events.MessagePublishedEvent; + +/** + * ActiveMQ Artemis plugin for tracking the lifecycle of messages inside an + * ActiveMQ cluster. On each step of the message lifecycle (a producer writes a + * message to the cluster, the message is delivered to a consumer, the consumer + * ACKs the message), the plugin generates events with relevant information that + * can be used for understanding how IoT applications components on NebulOuS are + * communicating. + */ +public abstract class MessageLifecycleMonitoringPlugin implements ActiveMQServerMessagePlugin { + + Logger LOGGER = LoggerFactory.getLogger(MessageLifecycleMonitoringPlugin.class); + protected ObjectMapper om = new ObjectMapper(); + /** + * Annotation used to identify the ActiveMQ cluster node that first received a + * message receiving + */ + SimpleString RECEIVING_NODE_ANNOTATION = new SimpleString("NEBULOUS_RECEIVING_NODE"); + + /** + * Annotation used to track the original id of the message when it was first + * received by the cluster. + * + */ + SimpleString ORIGINAL_MESSAGE_ID_ANNOTATION = new SimpleString("NEBULOUS_ORIGINAL_MESSAGE_ID"); + + /** + * Annotation used to mark the moment a message was sent to the client + */ + SimpleString DELIVER_TIMESTAMP_ANNOTATION = new SimpleString("NEBULOUS_DELIVER_TIMESTAMP"); + + /** + * Abstract method called whenever a message event is raised: - a producer + * writes a message to the cluster - the message is delivered to a consumer - + * the consumer ACKs the message + * + * @param event + */ + protected abstract void notifyEvent(MessageLifecycleEvent event); + + /** + * Retrieves server name from consumer reference. + * + * @param consumer + * @return + */ + private String getServerName(ServerConsumer consumer) { + // TODO: Find a better way of retrieving server name rather than parsing the + // string representation of the consumer object. + return consumer.toString().split("server=ActiveMQServerImpl::name=")[1].split("]")[0]; + } + + /** + * Retrieves the estimated message size. 
+ * + * @param message + * @return + * @throws ActiveMQException + */ + private long getEstimatedMessageSize(Message message) throws ActiveMQException { + if (message instanceof LargeServerMessage) + return ((LargeServerMessage) message).getLargeBody().getBodySize(); + return message.getEncodeSize(); + } + + /** + * Extracts the string value for an annotation from the message. Returns + * defaultValue if annotation is not found or null. + * + * @param message + * @param annotation + * @param defaultValue + * @return + */ + private String getStringAnnotationValueOrDefault(Message message, SimpleString annotation, String defaultValue) { + + Object annotationValue = message.getAnnotation(annotation); + if (annotationValue == null) + return defaultValue; + if (annotationValue instanceof SimpleString) + return ((SimpleString) annotationValue).toString(); + return (String) annotationValue; + } + + private Long getLongAnnotationValueOrDefault(Message message, SimpleString annotation, Long defaultValue) { + + Object annotationValue = message.getAnnotation(annotation); + if (annotationValue == null) + return defaultValue; + return (Long) annotationValue; + } + + /** + * Constructs a MessagePublishedMonitoringEvent from a Message reference + * + * @param message + * @return + * @throws Exception + */ + private MessagePublishedEvent buildPublishEvent(Message message) throws Exception { + String clientId = message.getStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME); + long size = getEstimatedMessageSize(message); + String sourceNode = getStringAnnotationValueOrDefault(message, RECEIVING_NODE_ANNOTATION, null); + long messageId = getLongAnnotationValueOrDefault(message, ORIGINAL_MESSAGE_ID_ANNOTATION, -1l); + return new MessagePublishedEvent(messageId, message.getAddress(), sourceNode, clientId, size, + message.getTimestamp()); + } + + /** + * Constructs a MessageDeliveredMonitoringEvent from a Message and consumer + * reference + * + * @param message + * @param consumer + * @return + * @throws Exception + */ + private MessageDeliveredEvent buildDeliverEvent(Message message, ServerConsumer consumer) + throws Exception { + MessagePublishedEvent publishEvent = buildPublishEvent(message); + String nodeName = getServerName(consumer); + Long deliveryTimeStamp = getLongAnnotationValueOrDefault(message, DELIVER_TIMESTAMP_ANNOTATION, -1l); + return new MessageDeliveredEvent(publishEvent, consumer.getQueueAddress().toString(), nodeName, + consumer.getConnectionClientID(), deliveryTimeStamp); + } + + + + @Override + public void afterSend(ServerSession session, Transaction tx, Message message, boolean direct, + boolean noAutoCreateQueue, RoutingStatus result) throws ActiveMQException { + try { + /** + * Ignore activemq control messages + */ + if (message.getAddress().startsWith("$sys") || message.getAddress().startsWith("activemq")) + return; + + + /** + * If message is already annotated with any of our custom annotations means that + * the message is not being sent by a ActiveMQ external client (it is due to + * intra-cluster communication). 
+ * Ignore it + */ + if (message.getAnnotation(RECEIVING_NODE_ANNOTATION) != null + || message.getAnnotation(ORIGINAL_MESSAGE_ID_ANNOTATION) != null) { + return; + } + + String nodeName = ((ActiveMQServerImpl) ((PostOfficeImpl) ((ServerSessionImpl) session).postOffice).getServer()) + .getConfiguration().getName(); + if (message.getAnnotation(RECEIVING_NODE_ANNOTATION) == null) + message.setAnnotation(RECEIVING_NODE_ANNOTATION, nodeName); + + if (message.getAnnotation(ORIGINAL_MESSAGE_ID_ANNOTATION) == null) + message.setAnnotation(ORIGINAL_MESSAGE_ID_ANNOTATION, message.getMessageID()); + + MessageLifecycleEvent event = buildPublishEvent(message); + LOGGER.info("MessagePublishedMonitoringEvent: " + om.writeValueAsString(event)); + notifyEvent(event); + } catch (Exception e) { + LOGGER.error("afterSend failed", e); + } + + } + + /** + * Anotates the message with the current time as value for + * DELIVER_TIMESTAMP_ANNOTATION + */ + @Override + public void beforeDeliver(ServerConsumer consumer, MessageReference reference) throws ActiveMQException { + reference.getMessage().setAnnotation(DELIVER_TIMESTAMP_ANNOTATION, new Date().getTime()); + } + + @Override + public void afterDeliver(ServerConsumer consumer, MessageReference reference) throws ActiveMQException { + + try { + + /** + * Ignore activemq control messages + */ + if (reference.getQueue().getAddress().toString().startsWith("$sys") + || reference.getQueue().getAddress().toString().startsWith("activemq")) + return; + + Message message = reference.getMessage(); + + MessageDeliveredEvent deliverEvent = buildDeliverEvent(message, consumer); + LOGGER.info("MessageDeliveredMonitoringEvent: " + om.writeValueAsString(deliverEvent)); + notifyEvent(deliverEvent); + + } catch (Exception e) { + LOGGER.error("afterDeliver failed", e); + } + } + + @Override + public void messageAcknowledged(Transaction tx, MessageReference reference, AckReason reason, + ServerConsumer consumer) throws ActiveMQException { + + try { + + /** + * Ignore activemq control messages + */ + if (reference.getQueue().getAddress().toString().startsWith("$sys") + || reference.getQueue().getAddress().toString().startsWith("activemq") || consumer == null) + return; + + Message message = reference.getMessage(); + MessageDeliveredEvent deliverEvent = buildDeliverEvent(message, consumer); + MessageAcknowledgedEvent ackEvent = new MessageAcknowledgedEvent(deliverEvent, + new Date().getTime()); + LOGGER.info("MessageAcknowledgedMonitoringEvent: " + om.writeValueAsString(ackEvent)); + notifyEvent(ackEvent); + } catch (Exception e) { + LOGGER.error("messageAcknowledged failed", e); + } + } + +} diff --git a/iot_dpp/src/main/java/eut/nebulouscloud/iot_dpp/monitoring/events/MessageAcknowledgedEvent.java b/iot_dpp/src/main/java/eut/nebulouscloud/iot_dpp/monitoring/events/MessageAcknowledgedEvent.java new file mode 100644 index 0000000..a0abc13 --- /dev/null +++ b/iot_dpp/src/main/java/eut/nebulouscloud/iot_dpp/monitoring/events/MessageAcknowledgedEvent.java @@ -0,0 +1,56 @@ +package eut.nebulouscloud.iot_dpp.monitoring.events; + +/** + * Event generated when a message is acknowledged by a client of the the pub/sub + * system + */ +public class MessageAcknowledgedEvent extends MessageLifecycleEvent { + + /** + * Node where the client is connected + */ + public final String node; + + /** + * Id of the client recieving the message + */ + public final String clientId; + + /** + * Node where the message was originaly published + */ + public final String publishNode; + + /** + * Id of the client 
that originally published the message + */ + public final String publishClientId; + + /** + * Address where the message was originally published + */ + public final String publishAddress; + + /** + * Time when the message was published (milliseconds since epoch). + */ + public final long publishTimestamp; + + /** + * Time when the message was delivered to the client. + */ + public final long deliverTimestamp; + + public MessageAcknowledgedEvent(MessageDeliveredEvent deliverEvent,long timestamp) { + super(deliverEvent.messageId, deliverEvent.messageAddress, deliverEvent.messageSize, timestamp); + this.node = deliverEvent.node; + this.clientId = deliverEvent.clientId; + this.publishAddress = deliverEvent.messageAddress; + this.publishNode = deliverEvent.publishNode; + this.publishClientId = deliverEvent.publishClientId; + this.publishTimestamp = deliverEvent.publishTimestamp; + this.deliverTimestamp = deliverEvent.timestamp; + } + + +} diff --git a/iot_dpp/src/main/java/eut/nebulouscloud/iot_dpp/monitoring/events/MessageDeliveredEvent.java b/iot_dpp/src/main/java/eut/nebulouscloud/iot_dpp/monitoring/events/MessageDeliveredEvent.java new file mode 100644 index 0000000..4a68f07 --- /dev/null +++ b/iot_dpp/src/main/java/eut/nebulouscloud/iot_dpp/monitoring/events/MessageDeliveredEvent.java @@ -0,0 +1,53 @@ +package eut.nebulouscloud.iot_dpp.monitoring.events; + +/** + * Event generated when a message is delivered to a client of the pub/sub + * system + */ +public class MessageDeliveredEvent extends MessageLifecycleEvent { + + /** + * Node where the client is connected + */ + public final String node; + + /** + * Id of the client receiving the message + */ + public final String clientId; + + /** + * Node where the message was originally published + */ + public final String publishNode; + + /** + * Address where the message was originally published + */ + public final String publishAddress; + + /** + * Id of the client that originally published the message + */ + public final String publishClientId; + + /** + * Time when the message was published (milliseconds since epoch).
+ */ + public final long publishTimestamp; + + public MessageDeliveredEvent(MessagePublishedEvent publishEvent, String address, String node, + String clientId, long timestamp) { + super(publishEvent.messageId, address, publishEvent.messageSize, timestamp); + this.node = node; + this.clientId = clientId; + this.publishAddress = publishEvent.messageAddress; + this.publishNode = publishEvent.node; + this.publishClientId = publishEvent.clientId; + this.publishTimestamp = publishEvent.timestamp; + } + + + +} diff --git a/iot_dpp/src/main/java/eut/nebulouscloud/iot_dpp/monitoring/events/MessageLifecycleEvent.java b/iot_dpp/src/main/java/eut/nebulouscloud/iot_dpp/monitoring/events/MessageLifecycleEvent.java new file mode 100644 index 0000000..42ef634 --- /dev/null +++ b/iot_dpp/src/main/java/eut/nebulouscloud/iot_dpp/monitoring/events/MessageLifecycleEvent.java @@ -0,0 +1,36 @@ +package eut.nebulouscloud.iot_dpp.monitoring.events; + +/** + * A base class for modelling events related to messages + */ +public abstract class MessageLifecycleEvent { + /** + * Id of the message + */ + public final long messageId; + + /** + * Timestamp when the event occurred + */ + public final long timestamp; + + /** + * Size of the message (in bytes) + */ + public final long messageSize; + + + public final String messageAddress; + + + public MessageLifecycleEvent(long messageId,String messageAddress,long size, long timestamp) + { + this.timestamp = timestamp; + this.messageAddress = messageAddress; + this.messageId = messageId; + this.messageSize = size; + + } + + +} diff --git a/iot_dpp/src/main/java/eut/nebulouscloud/iot_dpp/monitoring/events/MessagePublishedEvent.java b/iot_dpp/src/main/java/eut/nebulouscloud/iot_dpp/monitoring/events/MessagePublishedEvent.java new file mode 100644 index 0000000..d9d6520 --- /dev/null +++ b/iot_dpp/src/main/java/eut/nebulouscloud/iot_dpp/monitoring/events/MessagePublishedEvent.java @@ -0,0 +1,24 @@ +package eut.nebulouscloud.iot_dpp.monitoring.events; + +/** + * Event generated when a message is published to the pub/sub system + */ +public class MessagePublishedEvent extends MessageLifecycleEvent{ + + /** + * The name of the pub/sub cluster where the messag was published + */ + public final String node; + + /** + * The Id of the client that published the message + */ + public final String clientId; + + public MessagePublishedEvent(long messageId, String address, String node, String clientId, long size,long timestamp) { + super(messageId,address,size,timestamp); + this.node = node; + this.clientId = clientId; + } + +} diff --git a/iot_dpp/src/main/resources/log4j2.properties b/iot_dpp/src/main/resources/log4j2.properties new file mode 100644 index 0000000..60fae2b --- /dev/null +++ b/iot_dpp/src/main/resources/log4j2.properties @@ -0,0 +1,35 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +# Log4J 2 configuration + +# Monitor config file every X seconds for updates +monitorInterval = 5 + +loggers = activemq +logger.activemq.name = org.apache.activemq +logger.activemq.level = INFO + + +#logger.org.apache.activemq.artemis.core.server.cluster.level = TRACE + +rootLogger.level = INFO +rootLogger.appenderRef.console.ref = console + +# Console appender +appender.console.type=Console +appender.console.name=console +appender.console.layout.type=PatternLayout +appender.console.layout.pattern=%d %-5level [%logger] %msg%n diff --git a/iot_dpp/src/test/java/eut/nebulouscloud/iot_dpp/MessageGroupIdAnnotationPluginTest.java b/iot_dpp/src/test/java/eut/nebulouscloud/iot_dpp/MessageGroupIdAnnotationPluginTest.java new file mode 100644 index 0000000..b68f389 --- /dev/null +++ b/iot_dpp/src/test/java/eut/nebulouscloud/iot_dpp/MessageGroupIdAnnotationPluginTest.java @@ -0,0 +1,249 @@ +package eut.nebulouscloud.iot_dpp; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +//remove +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; +import org.apache.activemq.artemis.core.security.CheckType; +import org.apache.activemq.artemis.core.security.Role; +import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ; +import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory; +import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage; +import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eut.nebulouscloud.iot_dpp.GroupIDExtractionParameters.GroupIDExpressionSource; + +class MessageGroupIdAnnotationPluginTest { + static Logger LOGGER = LoggerFactory.getLogger(MessageGroupIdAnnotationPluginTest.class); + + /** + * Creates a local ActiveMQ server listening at localhost:61616. The server + * accepts requests from any user. Configures the MessageGroupIDAnnotationPlugin and + * sets it to use the provided groupIdExtractionParameterPerTopic dict. + * + * @param events The groupIdExtractionParameterPerTopic dict. Its contents can be changed by the test code during the test execution and the plugin will react accordingly. + * @return the created EmbeddedActiveMQ instance. 
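+ * <p>For reference only (an illustrative assumption, not part of this change set): on a standalone Artemis broker the same plugin would be registered declaratively in broker.xml rather than through getBrokerMessagePlugins(), e.g. <pre>{@code <broker-plugins> <broker-plugin class-name="eut.nebulouscloud.iot_dpp.MessageGroupIDAnnotationPlugin"/> </broker-plugins> }</pre> How the per-address group-id extraction settings would be supplied to a standalone broker is not shown in this repository; the snippet only illustrates the registration step described in the README.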
+ * @throws Exception + */ + static private EmbeddedActiveMQ createLocalServer(int port, + Map groupIdExtractionParameterPerTopic) throws Exception { + Configuration config = new ConfigurationImpl(); + + String foldersRoot = "data/" + new Date().getTime() + "/data_" + port; + config.setBindingsDirectory(foldersRoot + "/bindings"); + config.setJournalDirectory(foldersRoot + "/journal"); + config.setJournalRetentionDirectory(foldersRoot + "/journalRetention"); + config.setLargeMessagesDirectory(foldersRoot + "/lm"); + config.setNodeManagerLockDirectory(foldersRoot + "/nodeManagerLock"); + config.setPagingDirectory(foldersRoot + "/paging"); + config.addConnectorConfiguration("serverAt" + port + "Connector", "tcp://localhost:" + port); + config.addAcceptorConfiguration("netty", "tcp://localhost:" + port); + config.getBrokerMessagePlugins().add(new MessageGroupIDAnnotationPlugin(groupIdExtractionParameterPerTopic)); + EmbeddedActiveMQ server = new EmbeddedActiveMQ(); + server.setSecurityManager(new ActiveMQSecurityManager() { + @Override + public boolean validateUserAndRole(String user, String password, Set roles, CheckType checkType) { + return true; + } + + @Override + public boolean validateUser(String user, String password) { + return true; + } + }); + server.setConfiguration(config); + server.start(); + Thread.sleep(1000); + return server; + } + + static EmbeddedActiveMQ server = null; + static Map groupIdExtractionParameterPerAddress = new HashMap(); + static Session session; + static Connection connection; + @BeforeAll + static void createServer() throws Exception { + LOGGER.info("createServer"); + server = createLocalServer(6161, groupIdExtractionParameterPerAddress); + ActiveMQJMSConnectionFactory connectionFactory = new ActiveMQJMSConnectionFactory("tcp://localhost:6161","artemis", "artemis" + ); + connection = connectionFactory.createConnection(); + connection.start(); + + + + } + + @AfterAll + static void destroyServer() { + try { + server.stop(); + } catch (Exception ex) { + } + } + + @BeforeEach + void before() throws JMSException { + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + groupIdExtractionParameterPerAddress.clear(); + } + + @AfterEach + void adter() { + if(session!=null) {try{session.close();}catch(Exception ex) {}} + } + + /** + * Test it can extract a simple JSON value + * @throws Exception + */ + @Test + void JSONTest1() throws Exception { + + String address = "testaddress"; + groupIdExtractionParameterPerAddress.put(address, + new GroupIDExtractionParameters(GroupIDExpressionSource.BODY_JSON, "address.city")); + Destination destination = session.createQueue(address); + MessageProducer producer = session.createProducer(destination); + MessageConsumer consumer = session.createConsumer(destination); + String text = "{\"address\":{\"city\":\"Lleida\",\"street\":\"C\\\\Cavallers\"},\"name\":\"Jon doe\",\"age\":\"22\"}"; + TextMessage originalMessage = session.createTextMessage(text); + producer.send(originalMessage); + Message message = consumer.receive(); + String value = ((ActiveMQTextMessage)message).getCoreMessage().getStringProperty(MessageGroupIDAnnotationPlugin.MESSAGE_GROUP_ANNOTATION.toString()); + assertEquals("Lleida", value); + + } + + /** + * In case of invalid message body, MESSAGE_GROUP_ANNOTATION should remain null + * @throws Exception + */ + @Test + void JSONTest2() throws Exception { + + String address = "testaddress"; + groupIdExtractionParameterPerAddress.put(address, + new 
GroupIDExtractionParameters(GroupIDExpressionSource.BODY_JSON, "address.city")); + Destination destination = session.createQueue(address); + MessageProducer producer = session.createProducer(destination); + MessageConsumer consumer = session.createConsumer(destination); + String text = "{\"ad2\"}"; + TextMessage originalMessage = session.createTextMessage(text); + producer.send(originalMessage); + Message message = consumer.receive(); + String value = ((ActiveMQTextMessage)message).getCoreMessage().getStringProperty(MessageGroupIDAnnotationPlugin.MESSAGE_GROUP_ANNOTATION.toString()); + assertEquals(null, value); + + } + + /** + * Test it can extract a complex JSON value + * @throws Exception + */ + @Test + void JSONTest3() throws Exception { + + String address = "testaddress"; + groupIdExtractionParameterPerAddress.put(address, + new GroupIDExtractionParameters(GroupIDExpressionSource.BODY_JSON, "address")); + Destination destination = session.createQueue(address); + MessageProducer producer = session.createProducer(destination); + MessageConsumer consumer = session.createConsumer(destination); + String text = "{\"address\":{\"city\":\"Lleida\",\"street\":\"C\\\\Cavallers\"},\"name\":\"Jon doe\",\"age\":\"22\"}"; + TextMessage originalMessage = session.createTextMessage(text); + producer.send(originalMessage); + Message message = consumer.receive(); + String value = ((ActiveMQTextMessage)message).getCoreMessage().getStringProperty(MessageGroupIDAnnotationPlugin.MESSAGE_GROUP_ANNOTATION.toString()); + assertEquals("{city=Lleida, street=C\\Cavallers}", value); + + } + + + /** + * Test it can extract a simple XML value + * @throws Exception + */ + @Test + void XMLTest1() throws Exception { + + String address = "testaddress"; + groupIdExtractionParameterPerAddress.put(address, + new GroupIDExtractionParameters(GroupIDExpressionSource.BODY_XML, "/root/address/city")); + Destination destination = session.createQueue(address); + MessageProducer producer = session.createProducer(destination); + MessageConsumer consumer = session.createConsumer(destination); + String text = "\r\n" + + "\r\n" + + "
<root>\r\n" + + "   <address>\r\n" + + "      <city>Lleida</city>\r\n" + + "      <street>C\\Cavallers</street>\r\n" + + "   </address>\r\n" + + "   <age>22</age>\r\n" + + "   <name>Jon doe</name>\r\n" + + "</root>
"; + TextMessage originalMessage = session.createTextMessage(text); + producer.send(originalMessage); + Message message = consumer.receive(); + + String value = ((ActiveMQTextMessage)message).getCoreMessage().getStringProperty(MessageGroupIDAnnotationPlugin.MESSAGE_GROUP_ANNOTATION.toString()); + assertEquals("Lleida", value); + + } + + /** + * Test it can extract a complex XML value + * @throws Exception + */ + @Test + void XMLTest2() throws Exception { + + String address = "testaddress"; + groupIdExtractionParameterPerAddress.put(address, + new GroupIDExtractionParameters(GroupIDExpressionSource.BODY_XML, "/root/address")); + Destination destination = session.createQueue(address); + MessageProducer producer = session.createProducer(destination); + MessageConsumer consumer = session.createConsumer(destination); + String text = "\r\n" + + "\r\n" + + "
<root>\r\n" + + "   <address>\r\n" + + "      <city>Lleida</city>\r\n" + + "      <street>C\\Cavallers</street>\r\n" + + "   </address>\r\n" + + "   <age>22</age>\r\n" + + "   <name>Jon doe</name>\r\n" + + "</root>
"; + TextMessage originalMessage = session.createTextMessage(text); + producer.send(originalMessage); + Message message = consumer.receive(); + String value = ((ActiveMQTextMessage)message).getCoreMessage().getStringProperty(MessageGroupIDAnnotationPlugin.MESSAGE_GROUP_ANNOTATION.toString()); + assertTrue(value!=null); + assertTrue(value.contains("Lleida")); + assertTrue(value.contains("Cavallers")); + } + +} diff --git a/iot_dpp/src/test/java/eut/nebulouscloud/iot_dpp/MessageMonitoringPluginTest.java b/iot_dpp/src/test/java/eut/nebulouscloud/iot_dpp/MessageMonitoringPluginTest.java new file mode 100644 index 0000000..b9afefb --- /dev/null +++ b/iot_dpp/src/test/java/eut/nebulouscloud/iot_dpp/MessageMonitoringPluginTest.java @@ -0,0 +1,467 @@ +package eut.nebulouscloud.iot_dpp; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Date; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; +import org.apache.activemq.artemis.core.security.CheckType; +import org.apache.activemq.artemis.core.security.Role; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ; +import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager; +import org.apache.qpid.protonj2.client.ClientOptions; +import org.apache.qpid.protonj2.client.Delivery; +import org.apache.qpid.protonj2.client.DeliveryMode; +import org.apache.qpid.protonj2.client.Receiver; +import org.eclipse.paho.client.mqttv3.IMqttClient; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eut.nebulouscloud.iot_dpp.monitoring.MessageLifecycleMonitoringPlugin; +import eut.nebulouscloud.iot_dpp.monitoring.events.MessageAcknowledgedEvent; +import eut.nebulouscloud.iot_dpp.monitoring.events.MessageDeliveredEvent; +import eut.nebulouscloud.iot_dpp.monitoring.events.MessageLifecycleEvent; +import eut.nebulouscloud.iot_dpp.monitoring.events.MessagePublishedEvent; + +/** + * Test the correct functionaltiy of MessageLifecycleMonitoringPlugin + */ +class MessageMonitoringPluginTest { + + static Logger LOGGER = LoggerFactory.getLogger(MessageMonitoringPluginTest.class); + + /** + * Creates a local ActiveMQ server listening at localhost:61616. The server + * accepts requests from any user. Configures the MessageMonitoringPluging and + * sets it to store generated events in the provided events list + * + * @param events A list that will contain all the events generated by the + * MessageMonitoringPluging + * @return the created MessageMonitoringPluging instance. 
+ * @throws Exception + */ + private EmbeddedActiveMQ createActiveMQBroker(String nodeName, List<MessageLifecycleEvent> events, int port, + List<Integer> otherServers) throws Exception { + Configuration config = new ConfigurationImpl(); + config.setName(nodeName); + String foldersRoot = "data/" + new Date().getTime() + "/data_" + port; + config.setBindingsDirectory(foldersRoot + "/bindings"); + config.setJournalDirectory(foldersRoot + "/journal"); + config.setJournalRetentionDirectory(foldersRoot + "/journalRetention"); + config.setLargeMessagesDirectory(foldersRoot + "/lm"); + config.setNodeManagerLockDirectory(foldersRoot + "/nodeManagerLock"); + config.setPagingDirectory(foldersRoot + "/paging"); + config.addConnectorConfiguration("serverAt" + port + "Connector", "tcp://localhost:" + port); + config.addAcceptorConfiguration("netty", "tcp://localhost:" + port); + // config.setPersistenceEnabled(true); + + + ClusterConnectionConfiguration cluster = new ClusterConnectionConfiguration(); + cluster.setAddress(""); + cluster.setConnectorName("serverAt" + port + "Connector"); + cluster.setName("my-cluster"); + cluster.setAllowDirectConnectionsOnly(false); + cluster.setMessageLoadBalancingType(MessageLoadBalancingType.ON_DEMAND); + cluster.setRetryInterval(100); + config.setClusterConfigurations(List.of(cluster)); + if (otherServers != null) { + for (Integer otherPort : otherServers) { + cluster.setStaticConnectors(List.of("serverAt" + otherPort + "Connector")); + config.addConnectorConfiguration("serverAt" + otherPort + "Connector", "tcp://localhost:" + otherPort); + } + } + MessageLifecycleMonitoringPlugin plugin = new MessageLifecycleMonitoringPlugin() { + @Override + protected void notifyEvent(MessageLifecycleEvent event) { + events.add(event); + } + }; + + config.getBrokerMessagePlugins().add(plugin); + EmbeddedActiveMQ server = new EmbeddedActiveMQ(); + server.setSecurityManager(new ActiveMQSecurityManager() { + @Override + public boolean validateUserAndRole(String user, String password, Set<Role> roles, CheckType checkType) { + return true; + } + + @Override + public boolean validateUser(String user, String password) { + return true; + } + }); + server.setConfiguration(config); + server.start(); + return server; + } + + private EmbeddedActiveMQ createActiveMQBroker(String nodeName, List<MessageLifecycleEvent> events) + throws Exception { + return createActiveMQBroker(nodeName, events, 61616, null); + } + + /** + * Test message monitoring plugin when MQTT clients are interacting with the + * ActiveMQ broker. + * + * @throws Exception + */ + @Test + void MQTTTestSingleNode() throws Exception { + + /** + * Create a local ActiveMQ server + */ + + EmbeddedActiveMQ broker = null; + try { + List<MessageLifecycleEvent> raisedEvents = new LinkedList<MessageLifecycleEvent>(); + broker = createActiveMQBroker("test-server", raisedEvents); + + /** + * Create a persistent subscription on a topic. Close the consumer afterwards.
+ */ + String testTopic = "test-topic"; + IMqttClient consumer = new MqttClient("tcp://localhost:61616", "consumer"); + consumer.setCallback(new MqttCallback() { + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + LOGGER.info("messageArrived: " + topic + " " + new String(message.getPayload())); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + } + + @Override + public void connectionLost(Throwable cause) { + } + }); + MqttConnectOptions opts = new MqttConnectOptions(); + opts.setCleanSession(false); + consumer.connect(opts); + consumer.subscribe(testTopic, 2); + consumer.disconnect(); + + /** + * Publish a message to the topic + */ + IMqttClient publisher = new MqttClient("tcp://localhost:61616", "publisher"); + publisher.connect(); + MqttMessage message = new MqttMessage("hola".getBytes()); + message.setQos(2); + publisher.publish(testTopic, message); + + /** + * Wait some time and re-connect the consumer. + */ + Thread.sleep(1000); + long reconnectTime = new Date().getTime(); + consumer.connect(opts); + Thread.sleep(1000); // Give some time for consumer to receive messages + + /** + * Validate that the MessagePublishedEvent, + * MessageDeliveredEvent and MessageAcknowledgedEvent are + * properly generated + */ + Optional<MessageLifecycleEvent> publishEventOpt = raisedEvents.stream() + .filter(e -> e.getClass() == MessagePublishedEvent.class).findFirst(); + assertEquals(true, publishEventOpt.isPresent()); + MessagePublishedEvent publishEvent = (MessagePublishedEvent) publishEventOpt.get(); + + Optional<MessageLifecycleEvent> deliverEventOpt = raisedEvents.stream() + .filter(e -> e.getClass() == MessageDeliveredEvent.class).findFirst(); + assertEquals(true, deliverEventOpt.isPresent()); + MessageDeliveredEvent deliverEvent = (MessageDeliveredEvent) deliverEventOpt.get(); + + Optional<MessageLifecycleEvent> ackEventOptional = raisedEvents.stream() + .filter(e -> e.getClass() == MessageAcknowledgedEvent.class).findFirst(); + assertEquals(true, ackEventOptional.isPresent()); + MessageAcknowledgedEvent ackEvent = (MessageAcknowledgedEvent) ackEventOptional.get(); + + assertEquals(publishEvent.messageAddress, deliverEvent.publishAddress); + assertEquals(publishEvent.clientId, deliverEvent.publishClientId); + assertEquals(publishEvent.messageId, deliverEvent.messageId); + // assertEquals(publishEvent.messageSize, deliverEvent.messageSize); + assertEquals(publishEvent.node, deliverEvent.publishNode); + assertEquals(publishEvent.timestamp, deliverEvent.publishTimestamp); + assertTrue(deliverEvent.timestamp > reconnectTime); + assertEquals(publishEvent.messageId, ackEvent.messageId); + assertEquals(ackEvent.deliverTimestamp, deliverEvent.timestamp); + assertTrue(ackEvent.timestamp > reconnectTime); + + } finally { + try { + broker.stop(); + } catch (Exception e) { + } + } + + } + + /** + * Test message monitoring plugin when AMQP clients are interacting with the + * ActiveMQ broker. + * + * @throws Exception + */ + @Test + void AMQPTestSingleNode() throws Exception { + + EmbeddedActiveMQ broker = null; + try { + /** + * Create a local server + */ + List<MessageLifecycleEvent> raisedEvents = new LinkedList<MessageLifecycleEvent>(); + broker = createActiveMQBroker("test-broker", raisedEvents); + /** + * Create a durable subscription on a topic. Close the receiver immediately.
+ */ + String address = "test-addr"; + org.apache.qpid.protonj2.client.Connection receiverConnection; + Receiver receiver; + { + ClientOptions opts = new ClientOptions(); + opts.id("receiver"); + org.apache.qpid.protonj2.client.Client container = org.apache.qpid.protonj2.client.Client.create(opts); + receiverConnection = container.connect("localhost", 61616).openFuture().get(); + org.apache.qpid.protonj2.client.ReceiverOptions options = new org.apache.qpid.protonj2.client.ReceiverOptions() + .deliveryMode(DeliveryMode.AT_LEAST_ONCE).autoAccept(false).autoSettle(false); + org.apache.qpid.protonj2.client.Session session = receiverConnection.openSession().openFuture().get(); + + receiver = session.openDurableReceiver(address, "my-durable-sub", options); + receiver.openFuture().get(10, TimeUnit.SECONDS); + /* receiver.close(); */ + session.close(); + container.close(); + } + + /** + * Send a message to the topic + */ + org.apache.qpid.protonj2.client.Sender sender; + org.apache.qpid.protonj2.client.Connection senderConnection; + { + ClientOptions opts = new ClientOptions(); + opts.id("sender"); + + org.apache.qpid.protonj2.client.Client container = org.apache.qpid.protonj2.client.Client.create(opts); + senderConnection = container.connect("localhost", 61616).openFuture().get(); + org.apache.qpid.protonj2.client.SenderOptions options = new org.apache.qpid.protonj2.client.SenderOptions() + .deliveryMode(DeliveryMode.AT_MOST_ONCE); + org.apache.qpid.protonj2.client.Session session = senderConnection.openSession().openFuture().get(); + sender = session.openSender(address, options); + } + + final org.apache.qpid.protonj2.client.Message<String> message = org.apache.qpid.protonj2.client.Message + .create("Hello World").durable(true); + Map<String, Object> deliveryAnnotations = new HashMap<>(); + final org.apache.qpid.protonj2.client.Tracker tracker = sender.send(message, deliveryAnnotations); + tracker.settlementFuture().isDone(); + tracker.settlementFuture().get().settled(); + + /** + * After waiting some time, re-connect the receiver to the durable subscription. + */ + LOGGER.info("Wait for read"); + Thread.sleep(100); + + long receiveTime = new Date().getTime(); + { + ClientOptions opts = new ClientOptions(); + opts.id("receiver"); + org.apache.qpid.protonj2.client.Client container = org.apache.qpid.protonj2.client.Client.create(opts); + receiverConnection = container.connect("localhost", 61616).openFuture().get(); + org.apache.qpid.protonj2.client.ReceiverOptions options = new org.apache.qpid.protonj2.client.ReceiverOptions() + .deliveryMode(DeliveryMode.AT_LEAST_ONCE).autoAccept(false); + org.apache.qpid.protonj2.client.Session session = receiverConnection.openSession().openFuture().get(); + + receiver = session.openDurableReceiver(address, "my-durable-sub", options); + receiver.openFuture().get(10, TimeUnit.SECONDS); + } + + /** + * Receive a message and ACK it shortly afterwards.
+ */ + Delivery d = receiver.receive(); + LOGGER.info("Wait for ack"); + Thread.sleep(1000); + long acceptTime = new Date().getTime(); + d.accept(); + + sender.closeAsync().get(10, TimeUnit.SECONDS); + receiver.closeAsync().get(10, TimeUnit.SECONDS); + receiverConnection.close(); + senderConnection.close(); + + /** + * Validate that the MessagePublishedEvent, + * MessageDeliveredEvent and MessageAcknowledgedEvent are + * properly generated + */ + assertEquals(3, raisedEvents.size()); + + Optional<MessageLifecycleEvent> publishEventOpt = raisedEvents.stream() + .filter(e -> e.getClass() == MessagePublishedEvent.class).findFirst(); + assertEquals(true, publishEventOpt.isPresent()); + MessagePublishedEvent publishEvent = (MessagePublishedEvent) publishEventOpt.get(); + + Optional<MessageLifecycleEvent> deliverEventOpt = raisedEvents.stream() + .filter(e -> e.getClass() == MessageDeliveredEvent.class).findFirst(); + assertEquals(true, deliverEventOpt.isPresent()); + MessageDeliveredEvent deliverEvent = (MessageDeliveredEvent) deliverEventOpt.get(); + + Optional<MessageLifecycleEvent> ackEventOptional = raisedEvents.stream() + .filter(e -> e.getClass() == MessageAcknowledgedEvent.class).findFirst(); + assertEquals(true, ackEventOptional.isPresent()); + MessageAcknowledgedEvent ackEvent = (MessageAcknowledgedEvent) ackEventOptional.get(); + + assertEquals(sender.client().containerId(), publishEvent.clientId); + assertEquals(address, publishEvent.messageAddress); + assertEquals(receiver.client().containerId(), deliverEvent.clientId); + assertEquals(address, deliverEvent.messageAddress); + assertEquals(publishEvent.messageAddress, deliverEvent.publishAddress); + assertEquals(publishEvent.clientId, deliverEvent.publishClientId); + assertEquals(publishEvent.messageId, deliverEvent.messageId); + // assertEquals(publishEvent.messageSize, deliverEvent.messageSize); + assertEquals(publishEvent.node, deliverEvent.publishNode); + assertEquals(publishEvent.timestamp, deliverEvent.publishTimestamp); + + assertTrue(receiveTime < deliverEvent.timestamp); + assertEquals(publishEvent.timestamp, ackEvent.publishTimestamp); + assertTrue(acceptTime < ackEvent.timestamp); + } finally { + try { + broker.stop(); + } catch (Exception e) { + } + } + } + + /** + * Test that MQTT message flow is properly registered in a clustered + * environment. - Create two ActiveMQ brokers (A and B) - Create a consumer on + * broker B on "test/atopic" - Create a producer on broker A and send a message + * to "test/atopic" - Consumer on broker B receives the message. + * + * @throws Exception + */ + @Test + void MQTTTestClustered() throws Exception { + + EmbeddedActiveMQ testBrokerA = null; + EmbeddedActiveMQ testBrokerB = null; + try { + /** + * Create two local clustered ActiveMQ brokers + */ + final int brokerAPort = 6161; + final int brokerBPort = 6162; + List<MessageLifecycleEvent> raisedEventsBrokerA = new LinkedList<MessageLifecycleEvent>(); + testBrokerA = createActiveMQBroker("test-broker-A", raisedEventsBrokerA, brokerAPort, List.of(brokerBPort)); + + List<MessageLifecycleEvent> raisedEventsBrokerB = new LinkedList<MessageLifecycleEvent>(); + testBrokerB = createActiveMQBroker("test-broker-B", raisedEventsBrokerB, brokerBPort, null); + + /** + * Create a subscription on a topic on broker B.
+ */ + String testTopic = "test/atopic"; + IMqttClient consumerBrokerB = new MqttClient("tcp://localhost:" + brokerBPort, "consumer"); + MqttConnectOptions opts = new MqttConnectOptions(); + opts.setCleanSession(true); + consumerBrokerB.connect(opts); + consumerBrokerB.setCallback(new MqttCallback() { + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + LOGGER.info("messageArrived: " + topic + " " + new String(message.getPayload())); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + } + + @Override + public void connectionLost(Throwable cause) { + } + }); + consumerBrokerB.subscribe(testTopic, 2); + /** + * Publish a message to the topic + */ + IMqttClient publisherBrokerA = new MqttClient("tcp://localhost:" + brokerAPort, "publisher"); + publisherBrokerA.connect(); + MqttMessage message = new MqttMessage("hola from publisher".getBytes()); + message.setQos(2); + publisherBrokerA.publish(testTopic, message); + + Thread.sleep(5 * 1000); // Give some time for consumer to receive messages + + /** + * Validate that the MessagePublishedEvent, + * MessageDeliveredEvent and MessageAcknowledgedEvent are + * properly generated + */ + Optional<MessageLifecycleEvent> publishEventOpt = raisedEventsBrokerA.stream() + .filter(e -> e.getClass() == MessagePublishedEvent.class).findFirst(); + assertEquals(true, publishEventOpt.isPresent()); + MessagePublishedEvent publishEvent = (MessagePublishedEvent) publishEventOpt.get(); + + Optional<MessageLifecycleEvent> deliverEventOpt = raisedEventsBrokerB.stream() + .filter(e -> e.getClass() == MessageDeliveredEvent.class).findFirst(); + assertEquals(true, deliverEventOpt.isPresent()); + MessageDeliveredEvent deliverEvent = (MessageDeliveredEvent) deliverEventOpt.get(); + + Optional<MessageLifecycleEvent> ackEventOptional = raisedEventsBrokerB.stream() + .filter(e -> e.getClass() == MessageAcknowledgedEvent.class).findFirst(); + assertEquals(true, ackEventOptional.isPresent()); + MessageAcknowledgedEvent ackEvent = (MessageAcknowledgedEvent) ackEventOptional.get(); + + assertEquals(publishEvent.messageAddress, deliverEvent.publishAddress); + assertEquals(publishEvent.clientId, deliverEvent.publishClientId); + assertEquals(publishEvent.messageId, deliverEvent.messageId); + // assertEquals(publishEvent.messageSize, deliverEvent.messageSize); + assertEquals(publishEvent.node, deliverEvent.publishNode); + assertEquals(publishEvent.timestamp, deliverEvent.publishTimestamp); + // assertTrue(deliverEvent.timestamp > reconnectTime); + assertEquals(publishEvent.messageId, ackEvent.messageId); + assertEquals(ackEvent.deliverTimestamp, deliverEvent.timestamp); + // assertTrue(ackEvent.timestamp > reconnectTime); + + } finally { + try { + testBrokerA.stop(); + } catch (Exception e) { + } + try { + testBrokerB.stop(); + } catch (Exception e) { + } + } + } +} diff --git a/java-spring-boot-demo/.gitignore b/java-spring-boot-demo/.gitignore deleted file mode 100644 index 549e00a..0000000 --- a/java-spring-boot-demo/.gitignore +++ /dev/null @@ -1,33 +0,0 @@ -HELP.md -target/ -!.mvn/wrapper/maven-wrapper.jar -!**/src/main/**/target/ -!**/src/test/**/target/ - -### STS ### -.apt_generated -.classpath -.factorypath -.project -.settings -.springBeans -.sts4-cache - -### IntelliJ IDEA ### -.idea -*.iws -*.iml -*.ipr - -### NetBeans ### -/nbproject/private/ -/nbbuild/ -/dist/ -/nbdist/ -/.nb-gradle/ -build/ -!**/src/main/**/build/ -!**/src/test/**/build/ - -### VS Code ###
-.vscode/ diff --git a/java-spring-boot-demo/Dockerfile b/java-spring-boot-demo/Dockerfile deleted file mode 100644 index 427e30e..0000000 --- a/java-spring-boot-demo/Dockerfile +++ /dev/null @@ -1,15 +0,0 @@ -# -# Build stage -# -FROM docker.io/library/maven:3.9.2-eclipse-temurin-17 AS build -COPY src /home/app/src -COPY pom.xml /home/app -RUN mvn -f /home/app/pom.xml clean package - -# -# Package stage -# -FROM docker.io/library/eclipse-temurin:17-jre -COPY --from=build /home/app/target/demo-0.0.1-SNAPSHOT.jar /usr/local/lib/demo.jar -EXPOSE 8080 -ENTRYPOINT ["java","-jar","/usr/local/lib/demo.jar"] diff --git a/java-spring-boot-demo/pom.xml b/java-spring-boot-demo/pom.xml deleted file mode 100644 index 76e0f0e..0000000 --- a/java-spring-boot-demo/pom.xml +++ /dev/null @@ -1,42 +0,0 @@ - - - 4.0.0 - - org.springframework.boot - spring-boot-starter-parent - 3.1.0 - - - com.example - demo - 0.0.1-SNAPSHOT - demo - Demo project for Spring Boot - - 17 - - - - org.springframework.boot - spring-boot-starter - - - org.springframework.boot - spring-boot-starter-web - - - org.springframework.boot - spring-boot-starter-test - test - - - - - - org.springframework.boot - spring-boot-maven-plugin - - - - diff --git a/java-spring-boot-demo/src/main/java/com/example/demo/DemoApplication.java b/java-spring-boot-demo/src/main/java/com/example/demo/DemoApplication.java deleted file mode 100644 index 094d95b..0000000 --- a/java-spring-boot-demo/src/main/java/com/example/demo/DemoApplication.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.example.demo; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; - -@SpringBootApplication -public class DemoApplication { - - public static void main(String[] args) { - SpringApplication.run(DemoApplication.class, args); - } - -} diff --git a/java-spring-boot-demo/src/main/java/com/example/demo/DemoController.java b/java-spring-boot-demo/src/main/java/com/example/demo/DemoController.java deleted file mode 100644 index 101591b..0000000 --- a/java-spring-boot-demo/src/main/java/com/example/demo/DemoController.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.example.demo; - -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; - -@RestController -public class DemoController { - - @RequestMapping("/") - public String root() { - return "test"; - } - -} diff --git a/java-spring-boot-demo/src/main/resources/application.properties b/java-spring-boot-demo/src/main/resources/application.properties deleted file mode 100644 index e69de29..0000000 diff --git a/java-spring-boot-demo/src/test/java/com/example/demo/DemoApplicationTests.java b/java-spring-boot-demo/src/test/java/com/example/demo/DemoApplicationTests.java deleted file mode 100644 index eaa9969..0000000 --- a/java-spring-boot-demo/src/test/java/com/example/demo/DemoApplicationTests.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.example.demo; - -import org.junit.jupiter.api.Test; -import org.springframework.boot.test.context.SpringBootTest; - -@SpringBootTest -class DemoApplicationTests { - - @Test - void contextLoads() { - } - -} diff --git a/zuul.d/jobs.yaml b/zuul.d/jobs.yaml index 05995fc..4a0e220 100644 --- a/zuul.d/jobs.yaml +++ b/zuul.d/jobs.yaml @@ -1,72 +1,20 @@ - job: - name: nebulous-iot-dpp-orchestrator-build-container-images - parent: nebulous-build-container-images - dependencies: - - name: opendev-buildset-registry - soft: false + name: 
nebulous-iot-dpp-orchestrator-build-java-libraries + parent: nebulous-build-java-libraries provides: - - nebulous-iot-dpp-orchestrator-container-images - description: Build the container images. - files: &image_files - - ^java-spring-boot-demo/ - vars: &image_vars - promote_container_image_job: nebulous-iot-dpp-orchestrator-upload-container-images - container_images: - - context: java-spring-boot-demo - registry: quay.io - repository: quay.io/nebulous/iot-dpp-orchestrator-java-spring-boot-demo - namespace: nebulous - repo_shortname: iot-dpp-orchestrator-java-spring-boot-demo - repo_description: "" + - nebulous-iot-dpp-orchestrator-java-libraries + description: Build the Java libraries to be used as an ActiveMQ plugin. + files: &library_files + - ^iot_dpp/ + vars: &library_vars + java_libraries: + - context: iot_dpp - job: - name: nebulous-iot-dpp-orchestrator-upload-container-images - parent: nebulous-upload-container-images - dependencies: - - name: opendev-buildset-registry - soft: false + name: nebulous-iot-dpp-orchestrator-upload-java-libraries + parent: nebulous-upload-java-libraries provides: - - nebulous-iot-dpp-orchestrator-container-images - description: Build and upload the container images. - files: *image_files - vars: *image_vars - -- job: - name: nebulous-iot-dpp-orchestrator-promote-container-images - parent: nebulous-promote-container-images - description: Promote previously uploaded container images. - files: *image_files - vars: *image_vars - -- job: - name: nebulous-iot-dpp-orchestrator-hadolint - parent: nebulous-hadolint - description: Run Hadolint on Dockerfile(s). - vars: - dockerfiles: - - java-spring-boot-demo/Dockerfile - -- job: - name: nebulous-iot-dpp-orchestrator-helm-lint - parent: nebulous-helm-lint - description: Run helm lint on Helm charts. - vars: - helm_charts: - - ./charts/nebulous-iot-dpp-orchestrator - -- job: - name: nebulous-iot-dpp-orchestrator-apply-helm-charts - parent: nebulous-apply-helm-charts - dependencies: - - name: opendev-buildset-registry - soft: false - - name: nebulous-iot-dpp-orchestrator-build-container-images - soft: true - - name: nebulous-iot-dpp-orchestrator-upload-container-images - soft: true - requires: - - nebulous-iot-dpp-orchestrator-container-images - description: Deploy a Kubernetes cluster and apply charts. - vars: - helm_charts: - nebulous-iot-dpp-orchestrator: ./charts/nebulous-iot-dpp-orchestrator + - nebulous-iot-dpp-orchestrator-java-libraries + description: Build and upload the Java libraries. + files: *library_files + vars: *library_vars diff --git a/zuul.d/project.yaml b/zuul.d/project.yaml index 5fa00bd..f913056 100644 --- a/zuul.d/project.yaml +++ b/zuul.d/project.yaml @@ -1,20 +1,12 @@ - project: check: jobs: - - opendev-buildset-registry - - nebulous-iot-dpp-orchestrator-helm-lint - - nebulous-iot-dpp-orchestrator-build-container-images - - nebulous-iot-dpp-orchestrator-hadolint - - nebulous-platform-apply-helm-charts + - nebulous-iot-dpp-orchestrator-build-java-libraries - nox-linters gate: jobs: - - opendev-buildset-registry - - nebulous-iot-dpp-orchestrator-helm-lint - - nebulous-iot-dpp-orchestrator-upload-container-images - - nebulous-iot-dpp-orchestrator-hadolint - - nebulous-platform-apply-helm-charts + - nebulous-iot-dpp-orchestrator-build-java-libraries - nox-linters promote: jobs: - - nebulous-iot-dpp-orchestrator-promote-container-images + - nebulous-iot-dpp-orchestrator-upload-java-libraries
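Usage note (a minimal sketch, not part of the patch above): the Zuul jobs now publish iot_dpp as a Java library intended to be loaded as ActiveMQ Artemis broker plugins. The sketch below mirrors what MessageMonitoringPluginTest already does with an embedded broker; class and package names are taken from the test imports, and the overridable notifyEvent(...) hook is assumed from the anonymous subclass used in the tests.

package eut.nebulouscloud.iot_dpp.examples; // hypothetical example package, not in this patch

import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;

import eut.nebulouscloud.iot_dpp.monitoring.MessageLifecycleMonitoringPlugin;
import eut.nebulouscloud.iot_dpp.monitoring.events.MessageLifecycleEvent;

/**
 * Minimal sketch: embed an Artemis broker and register the lifecycle monitoring
 * plugin shipped by the iot_dpp library, as the tests in this patch do.
 */
public class EmbeddedBrokerWithMonitoringExample {

	public static void main(String[] args) throws Exception {
		Configuration config = new ConfigurationImpl();
		// Accept client connections (MQTT/AMQP/CORE) on the default Artemis port.
		config.addAcceptorConfiguration("netty", "tcp://localhost:61616");

		// Register the monitoring plugin; notifyEvent is overridden here only to log
		// events (a production deployment would presumably forward them elsewhere).
		config.getBrokerMessagePlugins().add(new MessageLifecycleMonitoringPlugin() {
			@Override
			protected void notifyEvent(MessageLifecycleEvent event) {
				System.out.println("Message lifecycle event: " + event.getClass().getSimpleName());
			}
		});

		EmbeddedActiveMQ broker = new EmbeddedActiveMQ();
		broker.setConfiguration(config);
		broker.start();
	}
}

A standalone (non-embedded) broker would instead put the built jar on the broker classpath and reference the plugin class from the broker-plugins section of broker.xml; that wiring is assumed rather than defined by this change.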