EMS: Replaced use of Melodic integration model classes with Map's and adapted the code accordingly. Renamed endpoint from 'camelModel...' to 'appModel...', and also removed occurrences of 'camel' and 'esb' (former replaced with 'app' and later with 'others')

This commit is contained in:
ipatini
2023-09-18 16:23:42 +03:00
parent c82733b8e8
commit e1a288f6a4
6 changed files with 75 additions and 108 deletions

View File

@@ -97,7 +97,7 @@ control.IP_SETTING=${EMS_IP_SETTING:PUBLIC_IP}
control.EXECUTIONWARE=PROACTIVE
### URLs of Upperware services being invoked by EMS
control.esb-url = ${ESB_URL:https://mule:8088}
control.notification-url = ${NOTIFICATION_URL:https://mule:8088}
control.metasolver-configuration-url = ${METASOLVER_URL:http://metasolver:8092/updateConfiguration}
### Log settings
@@ -111,7 +111,7 @@ control.log-requests = ${EMS_LOG_REQUESTS:false}
#control.skip-baguette = true
#control.skip-collectors = true
#control.skip-metasolver = true
#control.skip-esb-notification = true
#control.skip-notification = true
control.upperware-grouping = GLOBAL
### Debug settings - Load/Save translation results

View File

@@ -100,7 +100,7 @@ control:
EXECUTIONWARE: PROACTIVE
### URLs of Upperware services being invoked by EMS
esb-url: ${ESB_URL:https://mule:8088}
notification-url: ${NOTIFICATION_URL:https://mule:8088}
metasolver-configuration-url: ${METASOLVER_URL:http://metasolver:8092/updateConfiguration}
### Log settings
@@ -114,7 +114,7 @@ control:
#skip-baguette: true
#skip-collectors: true
#skip-metasolver: true
#skip-esb-notification: true
#skip-notification: true
upperware-grouping: GLOBAL
### Debug settings - Load/Save translation results

View File

@@ -10,7 +10,6 @@
package gr.iccs.imu.ems.control.controller;
import com.google.gson.reflect.TypeToken;
import eu.melodic.models.interfaces.CamelModelRequestImpl;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -45,48 +44,21 @@ public class ControlServiceController {
private List<String> controllerEndpointsShort;
// ------------------------------------------------------------------------------------------------------------
// ESB and Upperware interfacing methods
// Application Model methods
// ------------------------------------------------------------------------------------------------------------
@RequestMapping(value = "/camelModel", method = POST)
public String newAppModel(@RequestBody CamelModelRequestImpl request,
@RequestHeader(name = HttpHeaders.AUTHORIZATION, required = false) String jwtToken)
{
log.debug("ControlServiceController.newAppModel(): Received request: {}", request);
log.trace("ControlServiceController.newAppModel(): JWT token: {}", jwtToken);
// Get information from request
String applicationId = request.getApplicationId();
String notificationUri = request.getNotificationURI();
String requestUuid = request.getWatermark().getUuid();
log.info("ControlServiceController.newAppModel(): Request info: app-id={}, notification-uri={}, request-id={}",
applicationId, notificationUri, requestUuid);
// Check parameters
if (StringUtils.isBlank(applicationId)) {
log.warn("ControlServiceController.newAppModel(): Request does not contain an application id");
throw new RestControllerException(400, "Request does not contain an application id");
}
// Start translation and reconfiguration in a worker thread
coordinator.processAppModel(applicationId, null, ControlServiceRequestInfo.create(notificationUri, requestUuid, jwtToken));
log.debug("ControlServiceController.newAppModel()/camelModel: Model translation dispatched to a worker thread");
return "OK";
}
@RequestMapping(value = "/appModelJson", method = POST,
@RequestMapping(value = { "/appModel", "/appModelJson" }, method = POST,
consumes = MediaType.APPLICATION_JSON_VALUE)
public String newAppModel(@RequestBody String requestStr,
@RequestHeader(name = HttpHeaders.AUTHORIZATION, required = false) String jwtToken)
{
log.debug("ControlServiceController.newAppModel(): Received request: {}", requestStr);
log.trace("ControlServiceController.newAppModel()/camelModelJson: JWT token: {}", jwtToken);
log.trace("ControlServiceController.newAppModel(): JWT token: {}", jwtToken);
// Use Gson to get model id's from request body (in JSON format)
com.google.gson.JsonObject jobj = new com.google.gson.Gson().fromJson(requestStr, com.google.gson.JsonObject.class);
String appModelId = Optional.ofNullable(jobj.get("app-model-id")).map(je -> stripQuotes(je.toString())).orElse(null);
String cpModelId = Optional.ofNullable(jobj.get("cp-model-id")).map(je -> stripQuotes(je.toString())).orElse(null);
com.google.gson.JsonObject jObj = new com.google.gson.Gson().fromJson(requestStr, com.google.gson.JsonObject.class);
String appModelId = Optional.ofNullable(jObj.get("app-model-id")).map(je -> stripQuotes(je.toString())).orElse(null);
String cpModelId = Optional.ofNullable(jObj.get("cp-model-id")).map(je -> stripQuotes(je.toString())).orElse(null);
log.info("ControlServiceController.newAppModel(): App model id from request: {}", appModelId);
log.info("ControlServiceController.newAppModel(): CP model id from request: {}", cpModelId);
@@ -105,7 +77,7 @@ public class ControlServiceController {
// ------------------------------------------------------------------------------------------------------------
@RequestMapping(value = "/cpModelJson", method = POST,
@RequestMapping(value = {"/cpModel", "/cpModelJson"}, method = POST,
consumes = MediaType.APPLICATION_JSON_VALUE)
public String newCpModel(@RequestBody String requestStr,
@RequestHeader(name = HttpHeaders.AUTHORIZATION, required = false) String jwtToken)

View File

@@ -24,17 +24,10 @@ import gr.iccs.imu.ems.control.util.TopicBeacon;
import gr.iccs.imu.ems.control.util.TranslationContextMonitorGsonDeserializer;
import gr.iccs.imu.ems.control.util.mvv.NoopMetricVariableValuesServiceImpl;
import gr.iccs.imu.ems.util.EventBus;
import eu.melodic.models.commons.NotificationResult;
import eu.melodic.models.commons.NotificationResultImpl;
import eu.melodic.models.commons.Watermark;
import eu.melodic.models.commons.WatermarkImpl;
import eu.melodic.models.services.CamelModelNotificationRequest;
import eu.melodic.models.services.CamelModelNotificationRequestImpl;
import gr.iccs.imu.ems.translate.NoopTranslator;
import gr.iccs.imu.ems.translate.TranslationContext;
import gr.iccs.imu.ems.translate.TranslationContextPrinter;
import gr.iccs.imu.ems.translate.Translator;
import gr.iccs.imu.ems.translate.dag.DAGNode;
import gr.iccs.imu.ems.translate.model.Monitor;
import gr.iccs.imu.ems.translate.model.Sink;
import gr.iccs.imu.ems.translate.mvv.MetricVariableValuesService;
@@ -58,6 +51,8 @@ import org.springframework.web.reactive.function.client.WebClient;
import java.io.IOException;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@@ -194,7 +189,7 @@ public class ControlServiceCoordinator implements InitializingBean {
@Async
public void preloadModels() {
String preloadAppModel = properties.getPreload().getCamelModel();
String preloadAppModel = properties.getPreload().getAppModel();
String preloadCpModel = properties.getPreload().getCpModel();
if (StringUtils.isNotBlank(preloadAppModel)) {
log.info("===================================================================================================");
@@ -240,10 +235,10 @@ public class ControlServiceCoordinator implements InitializingBean {
if (!inUse.compareAndSet(false, true)) {
String mesg = "ControlServiceCoordinator."+caller+": ERROR: Coordinator is in use. Exits immediately";
log.warn(mesg);
if (!properties.isSkipEsbNotification()) {
if (!properties.isSkipNotification()) {
sendErrorNotification(appModelId, requestInfo, mesg, mesg);
} else {
log.warn("ControlServiceCoordinator."+caller+": Skipping ESB notification due to configuration");
log.warn("ControlServiceCoordinator.{}: Skipping notification due to configuration", caller);
}
return;
}
@@ -255,10 +250,10 @@ public class ControlServiceCoordinator implements InitializingBean {
String mesg = "ControlServiceCoordinator."+caller+": EXCEPTION: " + ex;
log.error(mesg, ex);
if (!properties.isSkipEsbNotification()) {
if (!properties.isSkipNotification()) {
sendErrorNotification(appModelId, requestInfo, mesg, mesg);
} else {
log.warn("ControlServiceCoordinator"+caller+": Skipping ESB notification due to configuration");
log.warn("ControlServiceCoordinator.{}: Skipping notification due to configuration", caller);
}
} finally {
// Release lock of this coordinator
@@ -350,11 +345,11 @@ public class ControlServiceCoordinator implements InitializingBean {
log.info("ControlServiceCoordinator._processAppModel(): Cache translation results: app-model-id={}", appModelId);
appModelToTcCache.put(_normalizeModelId(appModelId), _TC);
// Notify ESB, if 'notificationUri' is provided
if (!properties.isSkipEsbNotification()) {
notifyESB(appModelId, requestInfo, EMS_STATE.INITIALIZING);
// Notify others, if 'notificationUri' is provided
if (!properties.isSkipNotification()) {
notifyOthers(appModelId, requestInfo, EMS_STATE.INITIALIZING);
} else {
log.warn("ControlServiceCoordinator._processAppModel(): Skipping ESB notification due to configuration");
log.warn("ControlServiceCoordinator._processAppModel(): Skipping notification due to configuration");
}
this.currentTC = _TC;
@@ -376,7 +371,7 @@ public class ControlServiceCoordinator implements InitializingBean {
log.warn("ControlServiceCoordinator._processCpModel(): Skipping MVV retrieval due to configuration");
}
// Set MVV constants in Broker-CEP and Baguette Server, and then notify ESB
// Set MVV constants in Broker-CEP and Baguette Server, and then notify others
_setConstants(constants, requestInfo);
log.info("ControlServiceCoordinator._processCpModel(): END: cp-model-id={}", cpModelId);
@@ -407,11 +402,11 @@ public class ControlServiceCoordinator implements InitializingBean {
log.warn("ControlServiceCoordinator.setConstants(): Skipping Baguette Server setup due to configuration");
}
// Notify ESB, if 'notificationUri' is provided
if (!properties.isSkipEsbNotification()) {
notifyESB(null, requestInfo, EMS_STATE.RECONFIGURING);
// Notify others, if 'notificationUri' is provided
if (!properties.isSkipNotification()) {
notifyOthers(null, requestInfo, EMS_STATE.RECONFIGURING);
} else {
log.warn("ControlServiceCoordinator.setConstants(): Skipping ESB notification due to configuration");
log.warn("ControlServiceCoordinator.setConstants(): Skipping notification due to configuration");
}
log.info("ControlServiceCoordinator.setConstants(): END: constants={}", constants);
@@ -739,16 +734,16 @@ public class ControlServiceCoordinator implements InitializingBean {
}
}
private void notifyESB(String appModelId, ControlServiceRequestInfo requestInfo, @NonNull EMS_STATE emsState) {
private void notifyOthers(String appModelId, ControlServiceRequestInfo requestInfo, @NonNull EMS_STATE emsState) {
if (StringUtils.isNotBlank(requestInfo.getNotificationUri())) {
setCurrentEmsState(emsState, "Notifying ESB");
setCurrentEmsState(emsState, "Notifying others");
String notificationUri = requestInfo.getNotificationUri().trim();
log.debug("ControlServiceCoordinator.notifyESB(): Notifying ESB: {}", notificationUri);
log.debug("ControlServiceCoordinator.notifyOthers(): Notifying others: {}", notificationUri);
sendSuccessNotification(appModelId, requestInfo);
log.debug("ControlServiceCoordinator.notifyESB(): ESB notified: {}", notificationUri);
log.debug("ControlServiceCoordinator.notifyOthers(): Others notified: {}", notificationUri);
} else {
log.warn("ControlServiceCoordinator.notifyESB(): Notification URI is blank");
log.warn("ControlServiceCoordinator.notifyOthers(): Notification URI is blank");
}
}
@@ -774,15 +769,15 @@ public class ControlServiceCoordinator implements InitializingBean {
}
// ------------------------------------------------------------------------------------------------------------
// ESB notification methods
// Notification methods
// ------------------------------------------------------------------------------------------------------------
private void sendSuccessNotification(String applicationId, ControlServiceRequestInfo requestInfo) {
// Prepare success result notification
NotificationResultImpl result = new NotificationResultImpl();
result.setStatus(NotificationResult.StatusType.SUCCESS);
Map<String,String> result = new LinkedHashMap<>();
result.put("STATUS", "SUCCESS");
// Prepare and send CamelModelNotification
// Prepare and send Notification
try {
sendAppModelNotification(applicationId, result, requestInfo);
} catch (Exception ex) {
@@ -794,12 +789,12 @@ public class ControlServiceCoordinator implements InitializingBean {
String errorCode, String errorDescription)
{
// Prepare error result notification
NotificationResultImpl result = new NotificationResultImpl();
result.setStatus(NotificationResult.StatusType.ERROR);
result.setErrorCode(errorCode);
result.setErrorDescription(errorDescription);
Map<String,String> result = new LinkedHashMap<>();
result.put("STATUS", "ERROR");
result.put("ERROR-CODE", errorCode);
result.put("ERROR-DESCRIPTION", errorDescription);
// Prepare and send CamelModelNotification
// Prepare and send App Model notification
try {
sendAppModelNotification(applicationId, result, requestInfo);
} catch (Exception ex) {
@@ -807,56 +802,56 @@ public class ControlServiceCoordinator implements InitializingBean {
}
}
private void sendAppModelNotification(String applicationId, NotificationResult result, ControlServiceRequestInfo requestInfo) {
private void sendAppModelNotification(String applicationId, Map<String,String> result, ControlServiceRequestInfo requestInfo) {
// Create a new watermark
Watermark watermark = new WatermarkImpl();
watermark.setUser("EMS");
watermark.setSystem("EMS");
watermark.setDate(new java.util.Date());
Map<String,String> watermark = new LinkedHashMap<>();
watermark.put("USER", "EMS");
watermark.put("SYSTEM", "EMS");
watermark.put("DATE", DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").format(LocalDateTime.now()));
String uuid = Objects.requireNonNullElse( requestInfo.getRequestUuid(), UUID.randomUUID().toString().toLowerCase() );
watermark.setUuid(uuid);
watermark.put("UUID", uuid);
// Create a new CamelModelNotification
CamelModelNotificationRequest request = new CamelModelNotificationRequestImpl();
request.setApplicationId(applicationId);
request.setResult(result);
request.setWatermark(watermark);
// Create a new App Model notification
Map<String,Object> request = new LinkedHashMap<>();
request.put("application-id", applicationId);
request.put("result", request);
request.put("watermark", watermark);
// Send CamelModelNotification to ESB (Control Process)
// Send App Model notification
sendAppModelNotification(request, requestInfo);
}
private void sendAppModelNotification(CamelModelNotificationRequest notification, ControlServiceRequestInfo requestInfo) {
private void sendAppModelNotification(Map<String,Object> notification, ControlServiceRequestInfo requestInfo) {
String notificationUri = requestInfo.getNotificationUri();
String requestUuid = requestInfo.getRequestUuid();
String jwtToken = requestInfo.getJwtToken();
// Check if 'notificationUri' is blank
if (StringUtils.isBlank(notificationUri)) {
log.warn("ControlServiceCoordinator.sendAppModelNotification(): notificationUri not provided or is empty. No notification will be sent to ESB.");
log.warn("ControlServiceCoordinator.sendAppModelNotification(): notificationUri not provided or is empty. No notification will be sent.");
return;
}
notificationUri = notificationUri.trim();
// Get ESB url from control-service configuration
String esbUrl = properties.getEsbUrl();
if (StringUtils.isBlank(esbUrl)) {
log.warn("ControlServiceCoordinator.sendAppModelNotification(): esb-url property is empty. No notification will be sent to ESB.");
// Get Notification URL from control-service configuration
String notificationUrl = properties.getNotificationUrl();
if (StringUtils.isBlank(notificationUrl)) {
log.warn("ControlServiceCoordinator.sendAppModelNotification(): notification-url property is empty. No notification will be sent.");
return;
}
esbUrl = esbUrl.trim();
notificationUrl = notificationUrl.trim();
// Fixing ESB URL parts
if (esbUrl.endsWith("/")) {
esbUrl = esbUrl.substring(0, esbUrl.length() - 1);
// Fixing Notification URL parts
if (notificationUrl.endsWith("/")) {
notificationUrl = notificationUrl.substring(0, notificationUrl.length() - 1);
}
if (notificationUri.startsWith("/")) {
notificationUri = notificationUri.substring(1);
}
// Call ESB endpoint
String url = esbUrl + "/" + notificationUri;
log.info("ControlServiceCoordinator.sendAppModelNotification(): Invoking ESB endpoint: {}", url);
// Call Notification URL endpoint
String url = notificationUrl + "/" + notificationUri;
log.info("ControlServiceCoordinator.sendAppModelNotification(): Invoking Notification endpoint: {}", url);
log.trace("ControlServiceCoordinator.sendAppModelNotification(): JWT token: {}", jwtToken);
ResponseEntity<String> response;
@@ -875,11 +870,11 @@ public class ControlServiceCoordinator implements InitializingBean {
if (response!=null) {
String responseStatus = response.getStatusCode().toString();
if (response.getStatusCode().is2xxSuccessful())
log.info("ControlServiceCoordinator.sendAppModelNotification(): ESB endpoint invoked: {}, status={}, message={}", url, responseStatus, response.getBody());
log.info("ControlServiceCoordinator.sendAppModelNotification(): Notification endpoint invoked: {}, status={}, message={}", url, responseStatus, response.getBody());
else
log.info("ControlServiceCoordinator.sendAppModelNotification(): ESB endpoint invoked: {}, status={}, message={}", url, responseStatus, response.getBody());
log.info("ControlServiceCoordinator.sendAppModelNotification(): Notification endpoint invoked: {}, status={}, message={}", url, responseStatus, response.getBody());
} else {
log.warn("ControlServiceCoordinator.sendAppModelNotification(): ESB endpoint invoked: {}, response is NULL", url);
log.warn("ControlServiceCoordinator.sendAppModelNotification(): Notification endpoint invoked: {}, response is NULL", url);
}
}

View File

@@ -163,13 +163,13 @@ public class EmsInfoServiceImpl implements IEmsInfoService {
if (controlServiceProperties!=null && infoServiceProperties!=null) {
controlServiceInfo.put("prop-ip-setting", controlServiceProperties.getIpSetting());
controlServiceInfo.put("prop-executionware", controlServiceProperties.getExecutionware().toString());
controlServiceInfo.put("prop-esb-url", controlServiceProperties.getEsbUrl());
controlServiceInfo.put("prop-notification-url", controlServiceProperties.getNotificationUrl());
controlServiceInfo.put("prop-metasolver-config-url", controlServiceProperties.getMetasolverConfigurationUrl());
controlServiceInfo.put("prop-metrics-update-interval", infoServiceProperties.getMetricsUpdateInterval());
controlServiceInfo.put("prop-metrics-client-update-interval", infoServiceProperties.getMetricsClientUpdateInterval());
controlServiceInfo.put("prop-metrics-stream-event-name", infoServiceProperties.getMetricsStreamEventName());
controlServiceInfo.put("prop-metrics-stream-update-interval", infoServiceProperties.getMetricsStreamUpdateInterval());
controlServiceInfo.put("prop-preload-app-model", controlServiceProperties.getPreload().getCamelModel());
controlServiceInfo.put("prop-preload-app-model", controlServiceProperties.getPreload().getAppModel());
controlServiceInfo.put("prop-preload-cp-model", controlServiceProperties.getPreload().getCpModel());
controlServiceInfo.put("prop-upperware-grouping", controlServiceProperties.getUpperwareGrouping());
controlServiceInfo.put("prop-tc-load-file", controlServiceProperties.getTcLoadFile());
@@ -183,7 +183,7 @@ public class EmsInfoServiceImpl implements IEmsInfoService {
debugFlags.put("skip-baguette-server-init", controlServiceProperties.isSkipBaguette());
debugFlags.put("skip-mvv-retrieve", controlServiceProperties.isSkipMvvRetrieve());
debugFlags.put("skip-metasolver-configuration", controlServiceProperties.isSkipMetasolver());
debugFlags.put("skip-esb-notification", controlServiceProperties.isSkipEsbNotification());
debugFlags.put("skip-notification", controlServiceProperties.isSkipNotification());
controlServiceInfo.put("prop-debug-flags",debugFlags);
}
if (staticResourceProperties!=null) {

View File

@@ -50,7 +50,7 @@ public class ControlServiceProperties {
private String upperwareGrouping;
private String metasolverConfigurationUrl;
private String esbUrl;
private String notificationUrl;
private Preload preload = new Preload();
@@ -60,7 +60,7 @@ public class ControlServiceProperties {
private boolean skipBaguette;
private boolean skipCollectors;
private boolean skipMetasolver;
private boolean skipEsbNotification;
private boolean skipNotification;
private String tcLoadFile;
private String tcSaveFile;
@@ -76,7 +76,7 @@ public class ControlServiceProperties {
@Data
public static class Preload {
private String camelModel;
private String appModel;
private String cpModel;
}
}