Add our node affinities to kubevela file

Change-Id: I9a920af9b5b72989f4e97720eaf8ddcf0dc11ddc
This commit is contained in:
Rudi Schlatte
2024-02-02 13:47:54 +01:00
parent 3b6ba806d2
commit 57fd7e967e
3 changed files with 136 additions and 63 deletions

View File

@@ -18,10 +18,8 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Spliterator;
@@ -29,13 +27,6 @@ import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.ow2.proactive.sal.model.AttributeRequirement;
import org.ow2.proactive.sal.model.NodeType;
import org.ow2.proactive.sal.model.NodeTypeRequirement;
import org.ow2.proactive.sal.model.OperatingSystemFamily;
import org.ow2.proactive.sal.model.Requirement;
import org.ow2.proactive.sal.model.RequirementOperator;
/**
* Internal representation of a NebulOus app.
*/
@@ -74,23 +65,6 @@ public class NebulousApp {
@Setter @Getter
private static SalConnector salConnector;
/**
 * Build the SAL requirements for the node that runs the NebulOuS
 * controller, i.e., the machine hosting the Kubernetes cluster and
 * KubeVela.
 *
 * @param jobID the job identifier; it is passed as both trailing arguments
 *  of the {@code NodeTypeRequirement} constructor.
 * @return an unmodifiable list holding, in this order: the node type
 *  requirement (IAAS), the operating system family requirement (Ubuntu),
 *  the minimum memory requirement and the minimum cpu requirement.
 */
public static List<Requirement> getControllerRequirements(String jobID) {
    Requirement nodeType = new NodeTypeRequirement(List.of(NodeType.IAAS), jobID, jobID);
    // TODO: untested; we rely on the fact that SAL has an abstraction
    // over operating systems.  See
    // https://github.com/ow2-proactive/scheduling-abstraction-layer/blob/master/sal-common/src/main/java/org/ow2/proactive/sal/model/OperatingSystemFamily.java#L39
    // and
    // https://github.com/ow2-proactive/scheduling-abstraction-layer/blob/master/sal-service/src/main/java/org/ow2/proactive/sal/service/nc/NodeCandidateUtils.java#L159
    Requirement operatingSystem = new AttributeRequirement(
        "image", "operatingSystem.family",
        RequirementOperator.IN, OperatingSystemFamily.UBUNTU.toString());
    Requirement minimumMemory = new AttributeRequirement(
        "hardware", "memory", RequirementOperator.GEQ, "2048");
    Requirement minimumCpu = new AttributeRequirement(
        "hardware", "cpu", RequirementOperator.GEQ, "2");
    return List.of(nodeType, operatingSystem, minimumMemory, minimumCpu);
}
/**
* The UUID of the app. This is the UUID that identifies a specific
* application's ActiveMQ messages.
@@ -316,7 +290,7 @@ public class NebulousApp {
* @return the modified KubeVela YAML, deserialized into a string, or
* null if no KubeVela could be generated.
*/
public ObjectNode rewriteKubevela(ObjectNode variable_values) {
public ObjectNode rewriteKubevelaWithSolution(ObjectNode variable_values) {
ObjectNode fresh_kubevela = original_kubevela.deepCopy();
for (Map.Entry<String, JsonNode> entry : variable_values.properties()) {
// look up the prepared path in the variable |-> location map
@@ -397,7 +371,7 @@ public class NebulousApp {
return;
}
ObjectNode variables = solution.withObjectProperty("VariableValues");
ObjectNode kubevela = rewriteKubevela(variables);
ObjectNode kubevela = rewriteKubevelaWithSolution(variables);
if (isDeployed()) {
// We assume that killing a node will confuse the application's
// Kubernetes cluster, therefore:
@@ -406,13 +380,13 @@ public class NebulousApp {
// scripts
// 3. Send updated KubeVela for redeployment
// 4. Shut down superfluous nodes
NebulousAppDeployer.redeployApplication(this, kubevela);
} else {
// 1. Calculate node sets, including Nebulous controller node
// 2. Tell SAL to start all nodes, passing in the deployment
// scripts
// 3. Send KubeVela file for deployment
NebulousAppDeployer.startApplication(kubevela, UUID, name);
NebulousAppDeployer.deployApplication(kubevela, UUID, name);
}
}

View File

@@ -1,6 +1,7 @@
package eu.nebulouscloud.optimiser.controller;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -12,6 +13,8 @@ import org.ow2.proactive.sal.model.IaasDefinition;
import org.ow2.proactive.sal.model.JobDefinition;
import org.ow2.proactive.sal.model.JobInformation;
import org.ow2.proactive.sal.model.NodeCandidate;
import org.ow2.proactive.sal.model.NodeType;
import org.ow2.proactive.sal.model.NodeTypeRequirement;
import org.ow2.proactive.sal.model.OperatingSystemFamily;
import org.ow2.proactive.sal.model.Requirement;
import org.ow2.proactive.sal.model.RequirementOperator;
@@ -19,7 +22,9 @@ import org.ow2.proactive.sal.model.TaskDefinition;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
/**
@@ -30,6 +35,39 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class NebulousAppDeployer {
// TODO: find out the commands to initialize the controller
/**
 * The installation scripts to send to SAL for the NebulOuS controller
 * node.  Currently a default-constructed placeholder (see the TODO
 * above); it is used as the installation of the "nebulous-controller"
 * task when the SAL job is created in deployApplication.
 */
@Getter
private static CommandsInstallation controllerInstallation = new CommandsInstallation();
// TODO: find out the commands to initialize the workers
/**
 * The installation scripts to send to SAL for a NebulOuS worker node.
 * Currently a default-constructed placeholder (see the TODO above); it
 * is used as the installation of the "nebulous-worker" task when the SAL
 * job is created in deployApplication.
 */
@Getter
private static CommandsInstallation nodeInstallation = new CommandsInstallation();
/**
 * Build the SAL requirements for the node that runs the NebulOuS
 * controller, i.e., the machine hosting the Kubernetes cluster and
 * KubeVela.
 *
 * @param jobID the job identifier; it is passed as both trailing arguments
 *  of the {@code NodeTypeRequirement} constructor.
 * @return an unmodifiable list holding, in this order: the node type
 *  requirement (IAAS), the operating system family requirement (Ubuntu),
 *  the minimum memory requirement and the minimum cpu requirement.
 */
public static List<Requirement> getControllerRequirements(String jobID) {
    Requirement nodeType = new NodeTypeRequirement(List.of(NodeType.IAAS), jobID, jobID);
    // TODO: untested; we rely on the fact that SAL has an abstraction
    // over operating systems.  See
    // https://github.com/ow2-proactive/scheduling-abstraction-layer/blob/master/sal-common/src/main/java/org/ow2/proactive/sal/model/OperatingSystemFamily.java#L39
    // and
    // https://github.com/ow2-proactive/scheduling-abstraction-layer/blob/master/sal-service/src/main/java/org/ow2/proactive/sal/service/nc/NodeCandidateUtils.java#L159
    Requirement operatingSystem = new AttributeRequirement(
        "image", "operatingSystem.family",
        RequirementOperator.IN, OperatingSystemFamily.UBUNTU.toString());
    Requirement minimumMemory = new AttributeRequirement(
        "hardware", "memory", RequirementOperator.GEQ, "2048");
    Requirement minimumCpu = new AttributeRequirement(
        "hardware", "cpu", RequirementOperator.GEQ, "2");
    return List.of(nodeType, operatingSystem, minimumMemory, minimumCpu);
}
/**
* Given a KubeVela file, extract how many nodes to deploy for each
* component.
@@ -155,6 +193,45 @@ public class NebulousAppDeployer {
return result;
}
/**
 * Add an affinity trait to all components of a KubeVela specification.
 *
 * <p>For every entry of {@code /spec/components}, a trait of the following
 * shape is appended to the component's {@code traits} array, with the
 * component's own name as the single selector value:
 *
 * <pre>{@code
 * traits:
 *   - type: affinity
 *     properties:
 *       nodeAffinity:
 *         required:
 *           nodeSelectorTerms:
 *           - matchExpressions:
 *             - key: label
 *               operator: In
 *               values: ["machinelabel"]
 * }</pre>
 *
 * <p>A component without a (non-null) {@code name} property cannot be
 * matched to a node; it is left without an affinity trait and a warning is
 * logged, instead of aborting the whole rewrite with a
 * {@code NullPointerException}.
 *
 * <p>TODO: we need to find out which key to use to get the node labels, as
 * assigned by the SAL `addNodes` endpoint.
 *
 * @param kubevela the KubeVela specification to modify.  This parameter is
 *  not modified; all changes are applied to a deep copy.
 * @return a fresh KubeVela specification with added nodeAffinity traits.
 */
public static JsonNode addNodeAffinities(JsonNode kubevela) {
    JsonNode result = kubevela.deepCopy();
    for (final JsonNode c : result.withArray("/spec/components")) {
        // Check the name before touching the component, so that a nameless
        // component does not end up with a half-built trait.
        if (!c.hasNonNull("name")) {
            log.warn("Cannot add node affinity to KubeVela component without a name: {}", c);
            continue;
        }
        String componentName = c.get("name").asText();
        ArrayNode traits = c.withArray("traits");
        ObjectNode trait = traits.addObject();
        trait.put("type", "affinity");
        ArrayNode nodeSelectorTerms = trait.withArray("/properties/nodeAffinity/required/nodeSelectorTerms");
        ArrayNode matchExpressions = nodeSelectorTerms.addObject().withArray("matchExpressions");
        ObjectNode term = matchExpressions.addObject();
        term.put("key", "label")
            .put("operator", "In");
        // The selector value is the component name, which is also the node
        // name passed to SAL when the worker nodes are created.
        term.withArray("values").add(componentName);
    }
    return result;
}
/**
* Given a KubeVela file, extract node requirements, create the job, start
* its nodes and submit KubeVela.
@@ -163,46 +240,48 @@ public class NebulousAppDeployer {
* @param appUUID the application UUID.
* @param appName the application name.
*/
public static void startApplication(JsonNode kubevela, String appUUID, String appName) {
log.info("Starting application {} with KubeVela", appUUID);
public static void deployApplication(JsonNode kubevela, String appUUID, String appName) {
log.info("Starting initial deployment of {}", appUUID);
if (NebulousApp.getSalConnector() == null) {
log.warn("Tried to submit job, but do not have a connection to SAL");
return;
}
// The overall flow:
// 1. Create a SAL job, with the uuid and name of the NebulOuS app
// 2. Extract node requirements from the KubeVela definition
// 3. Create a coordinator node; this will run the Kubernetes
// controller. This node is in addition to the nodes required by
// KubeVela.
//
// 1. Extract node requirements and node counts from the KubeVela
// definition.
// 2. Create a SAL job, with the uuid and name of the NebulOuS app
// 3. Create a coordinator node with hardcoded requirements; this node
// will run the Kubernetes controller. This node is in addition to
// the nodes required by KubeVela.
// 4. Submit the job, thereby starting the coordinator node
// 5. Extract information (IP address, ...) from the coordinator node
// 6. Add the worker nodes, as specified by KubeVela, to the job
// 7. Rewrite the KubeVela file to add node affinities, etc.
// 6. Add the worker nodes to the job
// 7. Rewrite the KubeVela file to add node affinities to each
// component
// 8. Send the KubeVela file to the coordinator node
// ------------------------------------------------------------
// 1. Create SAL job
log.debug("Creating job info");
// 1. Extract node requirements
Map<String, List<Requirement>> requirements = getSalRequirementsFromKubevela(kubevela);
Map<String, Integer> nodeCounts = getNodeCountFromKubevela(kubevela);
// ------------------------------------------------------------
// 2. Create SAL job
log.debug("Creating job info for {}", appUUID);
JobInformation jobinfo = new JobInformation(appUUID, appName);
// TODO: figure out what ports to specify here
List<Communication> communications = List.of();
// This task is deployed on the controller node (the one not specified
// in the app KubeVela file)
// TODO: find out the commands to initialize the controller
// TODO: specify ubuntu in CommandsInstallation operatingSystem
// argument (class OperatingSystemType)
CommandsInstallation nebulous_controller_init = new CommandsInstallation();
TaskDefinition nebulous_controller_task = new TaskDefinition(
"nebulous-controller", nebulous_controller_init, List.of());
"nebulous-controller", controllerInstallation, List.of());
// This task is deployed on all worker nodes (the ones specified by
// the app KubeVela file and optimized by NebulOuS)
// TODO: find out the commands to initialize the workers
// TODO: find out how to modify `nebulous_worker_task` to pass in
// TODO: find out if/how to modify `nebulous_worker_task` to pass in
// information about the controller
CommandsInstallation nebulous_worker_init = new CommandsInstallation();
TaskDefinition nebulous_worker_task = new TaskDefinition(
"nebulous-worker", nebulous_worker_init, List.of());
"nebulous-worker", nodeInstallation, List.of());
List<TaskDefinition> tasks = List.of(nebulous_controller_task, nebulous_worker_task);
JobDefinition job = new JobDefinition(communications, jobinfo, tasks);
Boolean success = NebulousApp.getSalConnector().createJob(job);
@@ -213,18 +292,14 @@ public class NebulousAppDeployer {
return;
}
// ------------------------------------------------------------
// 2. Extract node requirements
Map<String, List<Requirement>> requirements = getSalRequirementsFromKubevela(kubevela);
// ------------------------------------------------------------
// 3. Create coordinator node
log.debug("Creating app coordinator node");
log.debug("Creating app coordinator node for {}", appUUID);
List<NodeCandidate> controller_candidates
= NebulousApp.getSalConnector().findNodeCandidates(NebulousApp.getControllerRequirements(appUUID));
= NebulousApp.getSalConnector().findNodeCandidates(getControllerRequirements(appUUID));
if (controller_candidates.isEmpty()) {
log.error("Could not find node candidates for controller node; requirements: {}",
NebulousApp.getControllerRequirements(appUUID));
getControllerRequirements(appUUID));
return;
}
NodeCandidate controller_candidate = controller_candidates.get(0);
@@ -240,7 +315,7 @@ public class NebulousAppDeployer {
// ------------------------------------------------------------
// 4. Submit job
log.debug("Starting job");
log.debug("Starting job {}", appUUID);
String return_job_id = NebulousApp.getSalConnector().submitJob(appUUID);
if (return_job_id.equals("-1")) {
log.error("Failed to add start job {}, SAL returned {}",
@@ -255,7 +330,7 @@ public class NebulousAppDeployer {
// ------------------------------------------------------------
// 6. Create worker nodes from requirements
log.debug("Starting worker nodes");
log.debug("Starting worker nodes for {}", appUUID);
for (Map.Entry<String, List<Requirement>> e : requirements.entrySet()) {
List<NodeCandidate> candidates = NebulousApp.getSalConnector().findNodeCandidates(e.getValue());
if (candidates.isEmpty()) {
@@ -263,11 +338,14 @@ public class NebulousAppDeployer {
return;
}
NodeCandidate candidate = candidates.get(0);
// Here we specify the node names that we (hope to) use for node
// affinity declarations in KubeVela
IaasDefinition def = new IaasDefinition(
e.getKey(), "nebulous-worker", candidate.getId(), candidate.getCloud().getId()
);
// TODO: can we collect all nodes app-wide and submit them at once?
success = NebulousApp.getSalConnector().addNodes(List.of(def), appUUID);
int n = nodeCounts.get(e.getKey());
log.debug("Asking for {} copies of {} for application {}", n, candidate, appUUID);
success = NebulousApp.getSalConnector().addNodes(Collections.nCopies(n, def), appUUID);
if (!success) {
log.error("Failed to add node: {}", candidate);
}
@@ -277,6 +355,7 @@ public class NebulousAppDeployer {
// 7. Rewrite KubeVela file, based on running node names
// TODO
JsonNode rewritten = addNodeAffinities(kubevela);
// ------------------------------------------------------------
// 8. Submit KubeVela file to coordinator node
@@ -284,4 +363,24 @@ public class NebulousAppDeployer {
// TODO
}
/**
 * Redeploy a running application.
 *
 * NOTE: this method is currently a stub; its body only documents the
 * intended flow and performs no work.
 *
 * @param app the application to redeploy.
 * @param kubevela the updated KubeVela definition; the visible caller in
 *  NebulousApp passes the result of rewriteKubevelaWithSolution.
 */
public static void redeployApplication(NebulousApp app, ObjectNode kubevela) {
    // The overall flow:
    //
    // 1. Extract node requirements and node counts from the updated
    //    KubeVela definition.
    // 2. Extract current nodes from running SAL job
    // 3. Calculate new (to be started) and superfluous (to be shutdown)
    //    nodes
    // 4. Find node candidates for new nodes (from Step 3) according to
    //    their requirements (from Step 1)
    // 5. Create nodes, add them to SAL job
    // 6. Rewrite KubeVela with updated node affinities
    // 7. Send updated KubeVela to running cluster
    // 8. Shut down superfluous nodes (from Step 3)
}
}