Add lombok dependency
Replace all explicitly-created loggers with @Slf4j annotations Replace explicit getter methods and public fields with @Getter annotations Change-Id: Ibbe0a021e2b37daff0b601933eb07ceb88ccc52b
This commit is contained in:
@@ -16,6 +16,8 @@ plugins {
|
||||
// add jshell support (https://github.com/mrsarm/jshell-plugin):
|
||||
// rlwrap ./gradlew --console plain jshell
|
||||
id "com.github.mrsarm.jshell.plugin" version "1.2.1"
|
||||
// https://docs.freefair.io/gradle-plugins/8.4/reference/#_lombok
|
||||
id "io.freefair.lombok" version "8.4"
|
||||
}
|
||||
|
||||
repositories {
|
||||
|
||||
@@ -7,10 +7,10 @@ import eu.nebulouscloud.exn.core.Handler;
|
||||
import eu.nebulouscloud.exn.core.Publisher;
|
||||
import eu.nebulouscloud.exn.handlers.ConnectorHandler;
|
||||
import eu.nebulouscloud.exn.settings.StaticExnConfig;
|
||||
import org.apache.qpid.protonj2.client.Message;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.qpid.protonj2.client.Message;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
@@ -27,10 +27,9 @@ import java.util.concurrent.CountDownLatch;
|
||||
* `Consumer` objects created in {@link ExnConnector#ExnConnector} receive
|
||||
* incoming messages and react to them, sending out messages in turn.
|
||||
*/
|
||||
@Slf4j
|
||||
public class ExnConnector {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(ExnConnector.class);
|
||||
|
||||
/** The Connector used to talk with ActiveMQ */
|
||||
private final Connector conn;
|
||||
/** if non-null, signals after the connector is stopped */
|
||||
@@ -43,9 +42,15 @@ public class ExnConnector {
|
||||
/** The topic where we send AMPL messages */
|
||||
// 1 object with key: filename, value: AMPL file (serialized)
|
||||
public static final String ampl_message_channel = "eu.nebulouscloud.optimiser.ampl";
|
||||
/** Message producer for sending AMPL files, shared between all
|
||||
* NebulousApp instances. */
|
||||
private final Publisher ampl_message_publisher;
|
||||
|
||||
/**
|
||||
* The Message producer for sending AMPL files, shared between all
|
||||
* NebulousApp instances.
|
||||
*
|
||||
* @return the publisher configured to send AMPL files to the solver.
|
||||
*/
|
||||
@Getter
|
||||
private final Publisher amplMessagePublisher;
|
||||
|
||||
/**
|
||||
* Create a connection to ActiveMQ via the exn middleware, and set up the
|
||||
@@ -60,22 +65,18 @@ public class ExnConnector {
|
||||
* Connector#start} method has connected and set up all handlers.
|
||||
*/
|
||||
public ExnConnector(String host, int port, String name, String password, ConnectorHandler callback) {
|
||||
ampl_message_publisher = new Publisher("controller_ampl", ampl_message_channel, true, true);
|
||||
amplMessagePublisher = new Publisher("controller_ampl", ampl_message_channel, true, true);
|
||||
|
||||
conn = new Connector("optimiser_controller",
|
||||
callback,
|
||||
// List.of(new Publisher("config", "config", true)),
|
||||
List.of(ampl_message_publisher),
|
||||
List.of(amplMessagePublisher),
|
||||
List.of(new Consumer("ui_app_messages", app_creation_channel, new AppCreationMessageHandler(), true, true)),
|
||||
false,
|
||||
false,
|
||||
new StaticExnConfig(host, port, name, password, 15, "eu.nebulouscloud"));
|
||||
}
|
||||
|
||||
public Publisher getAmplPublisher() {
|
||||
return ampl_message_publisher;
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect to ActiveMQ and activate all publishers and consumers. It is
|
||||
* an error to start the controller more than once.
|
||||
@@ -121,7 +122,7 @@ public class ExnConnector {
|
||||
try {
|
||||
String app_id = message.subject();
|
||||
log.info("App creation message received for app {}", app_id);
|
||||
NebulousApp app = NebulousApp.newFromAppMessage(mapper.valueToTree(body), ampl_message_publisher);
|
||||
NebulousApp app = NebulousApp.newFromAppMessage(mapper.valueToTree(body), amplMessagePublisher);
|
||||
NebulousApps.add(app);
|
||||
app.sendAMPL();
|
||||
} catch (Exception e) {
|
||||
|
||||
@@ -6,28 +6,23 @@ import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine.Command;
|
||||
import picocli.CommandLine.Parameters;
|
||||
import picocli.CommandLine.ParentCommand;
|
||||
|
||||
@Slf4j
|
||||
@Command(name = "local",
|
||||
aliases = {"l"},
|
||||
description = "Handle a single app creation message from the command line, printing its AMPL. If an ActiveMQ connection is specified, additionally send a message to the solver.",
|
||||
mixinStandardHelpOptions = true
|
||||
)
|
||||
mixinStandardHelpOptions = true)
|
||||
public class LocalExecution implements Callable<Integer> {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(LocalExecution.class);
|
||||
|
||||
/**
|
||||
* Reference to Main, to access activemq_connector, sal_connector etc.
|
||||
*/
|
||||
/** Reference to Main set up by PicoCLI. This lets us ask for the SAL and
|
||||
* ActiveMQ connectors. */
|
||||
@ParentCommand
|
||||
private Main main;
|
||||
|
||||
@@ -44,7 +39,7 @@ public class LocalExecution implements Callable<Integer> {
|
||||
return 1;
|
||||
}
|
||||
NebulousApp app = NebulousApp.newFromAppMessage(msg,
|
||||
main.activemq_connector == null ? null : main.activemq_connector.getAmplPublisher());
|
||||
main.getActiveMQConnector() == null ? null : main.getActiveMQConnector().getAmplMessagePublisher());
|
||||
System.out.println(app.generateAMPL());
|
||||
|
||||
return 0;
|
||||
|
||||
@@ -1,20 +1,13 @@
|
||||
package eu.nebulouscloud.optimiser.controller;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import eu.nebulouscloud.exn.core.Context;
|
||||
import eu.nebulouscloud.exn.handlers.ConnectorHandler;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import picocli.CommandLine;
|
||||
import picocli.CommandLine.ParseResult;
|
||||
@@ -26,6 +19,7 @@ import static picocli.CommandLine.Option;
|
||||
/**
|
||||
* The main class of the optimizer controller.
|
||||
*/
|
||||
@Slf4j
|
||||
@Command(name = "nebulous-optimizer-controller",
|
||||
version = "0.1", // TODO read this from Bundle-Version in the jar MANIFEST.MF
|
||||
mixinStandardHelpOptions = true,
|
||||
@@ -81,8 +75,6 @@ public class Main implements Callable<Integer> {
|
||||
defaultValue = "${ACTIVEMQ_PASSWORD}")
|
||||
private String activemq_password;
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(Main.class);
|
||||
|
||||
@Option(names = {"--verbose", "-v"},
|
||||
description = "Turn on more verbose logging output.",
|
||||
scope = ScopeType.INHERIT)
|
||||
@@ -96,10 +88,20 @@ public class Main implements Callable<Integer> {
|
||||
}
|
||||
}
|
||||
|
||||
/** Reference to SAL connector, used by Main and subcommands. */
|
||||
public SalConnector sal_connector = null;
|
||||
/** Reference to ActiveMQ connector, used by Main and subcommands. */
|
||||
public ExnConnector activemq_connector = null;
|
||||
/**
|
||||
* The connector to the SAL library.
|
||||
*
|
||||
* @return the SAL connector, or null if running offline.
|
||||
*/
|
||||
@Getter
|
||||
private SalConnector salConnector = null;
|
||||
/**
|
||||
* The ActiveMQ connector.
|
||||
*
|
||||
* @return the ActiveMQ connector wrapper, or null if running offline.
|
||||
*/
|
||||
@Getter
|
||||
private ExnConnector activeMQConnector = null;
|
||||
|
||||
/**
|
||||
* PicoCLI execution strategy that uses common initialization.
|
||||
@@ -116,13 +118,13 @@ public class Main implements Callable<Integer> {
|
||||
log.info("Beginning common startup of optimiser-controller");
|
||||
|
||||
if (sal_uri != null && sal_user != null && sal_password != null) {
|
||||
sal_connector = new SalConnector(sal_uri, sal_user, sal_password);
|
||||
if (!sal_connector.isConnected()) {
|
||||
salConnector = new SalConnector(sal_uri, sal_user, sal_password);
|
||||
if (!salConnector.isConnected()) {
|
||||
log.error("Connection to SAL unsuccessful");
|
||||
} else {
|
||||
log.info("Established connection to SAL");
|
||||
// FIXME: remove this once we have the exn connector
|
||||
NebulousApp.sal_connector = sal_connector;
|
||||
NebulousApp.setSalConnector(salConnector);
|
||||
}
|
||||
} else {
|
||||
log.info("SAL login information not specified, skipping");
|
||||
@@ -131,7 +133,7 @@ public class Main implements Callable<Integer> {
|
||||
if (activemq_user != null && activemq_password != null) {
|
||||
log.info("Preparing ActiveMQ connection: host={} port={}",
|
||||
activemq_host, activemq_port);
|
||||
activemq_connector
|
||||
activeMQConnector
|
||||
= new ExnConnector(activemq_host, activemq_port,
|
||||
activemq_user, activemq_password,
|
||||
new ConnectorHandler() {
|
||||
@@ -153,9 +155,9 @@ public class Main implements Callable<Integer> {
|
||||
@Override
|
||||
public Integer call() {
|
||||
CountDownLatch exn_synchronizer = new CountDownLatch(1);
|
||||
if (activemq_connector != null) {
|
||||
if (activeMQConnector != null) {
|
||||
log.info("Starting connection to ActiveMQ");
|
||||
activemq_connector.start(exn_synchronizer);
|
||||
activeMQConnector.start(exn_synchronizer);
|
||||
} else {
|
||||
log.error("ActiveMQ connector not initialized so we're unresponsive. Will keep running to keep CI/CD happy but don't expect anything more from me.");
|
||||
}
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package eu.nebulouscloud.optimiser.controller;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonPointer;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.node.ArrayNode;
|
||||
@@ -9,6 +8,9 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
|
||||
|
||||
import eu.nebulouscloud.exn.core.Publisher;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
@@ -35,14 +37,12 @@ import org.ow2.proactive.sal.model.NodeCandidate;
|
||||
import org.ow2.proactive.sal.model.Requirement;
|
||||
import org.ow2.proactive.sal.model.RequirementOperator;
|
||||
import org.ow2.proactive.sal.model.TaskDefinition;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Internal representation of a NebulOus app.
|
||||
*/
|
||||
@Slf4j
|
||||
public class NebulousApp {
|
||||
private static final Logger log = LoggerFactory.getLogger(NebulousApp.class);
|
||||
|
||||
/** Location of the kubevela yaml file in the app creation message (String) */
|
||||
private static final JsonPointer kubevela_path = JsonPointer.compile("/kubevela/original");
|
||||
@@ -62,16 +62,31 @@ public class NebulousApp {
|
||||
/** General-purpose object mapper */
|
||||
private static final ObjectMapper mapper = new ObjectMapper();
|
||||
|
||||
// FIXME: we use this until we talk to SAL via the exn middleware.
|
||||
public static SalConnector sal_connector;
|
||||
/**
|
||||
* The active SAL connector, or null if we operate offline.
|
||||
*
|
||||
* NOTE: this might only be used until we switch to the exn-sal
|
||||
* middleware, or maybe we keep the SalConnector class and send to exn
|
||||
* from there.
|
||||
*
|
||||
* @param salConnector the SAL connector.
|
||||
*/
|
||||
@Setter
|
||||
private static SalConnector salConnector;
|
||||
|
||||
private static final List<Requirement> controller_requirements
|
||||
= List.of(
|
||||
new AttributeRequirement("hardware", "memory", RequirementOperator.GEQ, "2048"),
|
||||
new AttributeRequirement("hardware", "cpu", RequirementOperator.GEQ, "2"));
|
||||
|
||||
/** The app UUID; used as SAL job id as well */
|
||||
private String app_uuid;
|
||||
/**
|
||||
* The UUID of the app. This is the UUID that identifies a specific
|
||||
* application's ActiveMQ messages.
|
||||
*
|
||||
* @return the UUID of the app
|
||||
*/
|
||||
@Getter
|
||||
private String UUID;
|
||||
/** The app name; used as SAL job name as well */
|
||||
private String app_name;
|
||||
private JsonNode original_app_message;
|
||||
@@ -145,7 +160,7 @@ public class NebulousApp {
|
||||
// What's left is neither a raw nor composite metric.
|
||||
performance_indicators.put(m.get("key").asText(), m);
|
||||
}
|
||||
this.app_uuid = app_message.at(uuid_path).textValue();
|
||||
this.UUID = app_message.at(uuid_path).textValue();
|
||||
this.app_name = app_message.at(name_path).textValue();
|
||||
}
|
||||
|
||||
@@ -175,16 +190,6 @@ public class NebulousApp {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The UUID of the app. This is the UUID that identifies a specific
|
||||
* application's ActiveMQ messages.
|
||||
*
|
||||
* @return the UUID of the app
|
||||
*/
|
||||
public String getUUID() {
|
||||
return app_uuid;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set "deployed" status. Will typically be set to true once, and then
|
||||
* never to false again.
|
||||
@@ -523,7 +528,7 @@ public class NebulousApp {
|
||||
* Start application with default (not rewritten) KubeVela.
|
||||
*/
|
||||
public void startApplication() {
|
||||
log.info("Starting application {} with original KubeVela", app_uuid);
|
||||
log.info("Starting application {} with original KubeVela", UUID);
|
||||
startApplication(original_kubevela);
|
||||
}
|
||||
|
||||
@@ -532,8 +537,8 @@ public class NebulousApp {
|
||||
* nodes and submit KubeVela.
|
||||
*/
|
||||
public void startApplication(JsonNode kubevela) {
|
||||
log.info("Starting application {} with KubeVela", app_uuid);
|
||||
if (sal_connector == null) {
|
||||
log.info("Starting application {} with KubeVela", UUID);
|
||||
if (salConnector == null) {
|
||||
log.error("Tried to submit job, but do not have a connection to SAL");
|
||||
return;
|
||||
}
|
||||
@@ -552,7 +557,7 @@ public class NebulousApp {
|
||||
// ------------------------------------------------------------
|
||||
// 1. Create SAL job
|
||||
log.info("Creating job info");
|
||||
JobInformation jobinfo = new JobInformation(app_uuid, app_name);
|
||||
JobInformation jobinfo = new JobInformation(UUID, app_name);
|
||||
// TODO: figure out what ports to specify here
|
||||
List<Communication> communications = List.of();
|
||||
// This task is deployed on the controller node (the one not specified
|
||||
@@ -573,11 +578,11 @@ public class NebulousApp {
|
||||
"nebulous-worker", nebulous_worker_init, List.of());
|
||||
List<TaskDefinition> tasks = List.of(nebulous_controller_task, nebulous_worker_task);
|
||||
JobDefinition job = new JobDefinition(communications, jobinfo, tasks);
|
||||
Boolean success = sal_connector.createJob(job);
|
||||
Boolean success = salConnector.createJob(job);
|
||||
if (!success) {
|
||||
// This can happen if the job has already been submitted
|
||||
log.error("Error trying to create the job; SAL createJob returned {}", success);
|
||||
log.info("Check if a job with id {} already exists, run stopJobs if yes", app_uuid);
|
||||
log.info("Check if a job with id {} already exists, run stopJobs if yes", UUID);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -588,7 +593,7 @@ public class NebulousApp {
|
||||
// ------------------------------------------------------------
|
||||
// 3. Create coordinator node
|
||||
log.info("Creating app coordinator node");
|
||||
List<NodeCandidate> controller_candidates = sal_connector.findNodeCandidates(controller_requirements);
|
||||
List<NodeCandidate> controller_candidates = salConnector.findNodeCandidates(controller_requirements);
|
||||
if (controller_candidates.isEmpty()) {
|
||||
log.error("Could not find node candidates for controller node; requirements: {}", controller_requirements);
|
||||
return;
|
||||
@@ -598,7 +603,7 @@ public class NebulousApp {
|
||||
IaasDefinition controller_def = new IaasDefinition(
|
||||
"nebulous-controller-node", "nebulous-controller",
|
||||
controller_candidate.getId(), controller_candidate.getCloud().getId());
|
||||
success = sal_connector.addNodes(List.of(controller_def), app_uuid);
|
||||
success = salConnector.addNodes(List.of(controller_def), UUID);
|
||||
if (!success) {
|
||||
log.error("Failed to add controller node: {}", controller_candidate);
|
||||
return;
|
||||
@@ -607,10 +612,10 @@ public class NebulousApp {
|
||||
// ------------------------------------------------------------
|
||||
// 4. Submit job
|
||||
log.info("Starting job");
|
||||
String return_job_id = sal_connector.submitJob(app_uuid);
|
||||
String return_job_id = salConnector.submitJob(UUID);
|
||||
if (return_job_id.equals("-1")) {
|
||||
log.error("Failed to add start job {}, SAL returned {}",
|
||||
app_uuid, return_job_id);
|
||||
UUID, return_job_id);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -623,7 +628,7 @@ public class NebulousApp {
|
||||
// 6. Create worker nodes from requirements
|
||||
log.info("Starting worker nodes");
|
||||
for (Map.Entry<String, List<Requirement>> e : requirements.entrySet()) {
|
||||
List<NodeCandidate> candidates = sal_connector.findNodeCandidates(e.getValue());
|
||||
List<NodeCandidate> candidates = salConnector.findNodeCandidates(e.getValue());
|
||||
if (candidates.isEmpty()) {
|
||||
log.error("Could not find node candidates for requirements: {}", e.getValue());
|
||||
return;
|
||||
@@ -633,7 +638,7 @@ public class NebulousApp {
|
||||
e.getKey(), "nebulous-worker", candidate.getId(), candidate.getCloud().getId()
|
||||
);
|
||||
// TODO: can we collect all nodes app-wide and submit them at once?
|
||||
success = sal_connector.addNodes(List.of(def), app_uuid);
|
||||
success = salConnector.addNodes(List.of(def), UUID);
|
||||
if (!success) {
|
||||
log.error("Failed to add node: {}", candidate);
|
||||
}
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package eu.nebulouscloud.optimiser.controller;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
@@ -10,8 +9,8 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
/**
|
||||
* Class that manages a collection of NebulousApp instances.
|
||||
*/
|
||||
@Slf4j
|
||||
public class NebulousApps {
|
||||
private static final Logger log = LoggerFactory.getLogger(NebulousApps.class);
|
||||
|
||||
/** The global app registry. */
|
||||
// (Putting this here until we find a better place.)
|
||||
|
||||
@@ -8,6 +8,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.handler.codec.http.HttpResponseStatus;
|
||||
import io.netty.handler.logging.LogLevel;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import org.ow2.proactive.sal.model.IaasDefinition;
|
||||
import org.ow2.proactive.sal.model.Job;
|
||||
@@ -15,8 +16,6 @@ import org.ow2.proactive.sal.model.JobDefinition;
|
||||
import org.ow2.proactive.sal.model.NodeCandidate;
|
||||
import org.ow2.proactive.sal.model.PACloud;
|
||||
import org.ow2.proactive.sal.model.Requirement;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.MediaType;
|
||||
import reactor.core.publisher.Mono;
|
||||
@@ -41,10 +40,9 @@ import java.util.List;
|
||||
* Documentation of the SAL REST API is here:
|
||||
* https://openproject.nebulouscloud.eu/projects/nebulous-collaboration-hub/wiki/deployment-manager-sal-1
|
||||
*/
|
||||
@Slf4j
|
||||
public class SalConnector {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(SalConnector.class);
|
||||
|
||||
private static final String connectStr = "sal/pagateway/connect";
|
||||
private static final String getAllCloudsStr = "sal/cloud";
|
||||
private static final String findNodeCandidatesStr = "sal/nodecandidates";
|
||||
|
||||
Reference in New Issue
Block a user