Deploy application using cluster endpoints
- Implement calls to SAL cluster endpoints. - Sort node candidates returned from broker: first by rank (lower is better), then by score (higher is better). - Find node candidates for each replica individually, since multiple replicas can be deployed on different node candidates in the case of edge nodes. - Make sure to assign BYON, EDGE nodes only once. Change-Id: Ibae503fdb11446995d1b216b05429742657d5ac0
This commit is contained in:
		| @@ -23,18 +23,23 @@ import com.fasterxml.jackson.databind.node.ArrayNode; | |||||||
| import com.fasterxml.jackson.databind.node.ObjectNode; | import com.fasterxml.jackson.databind.node.ObjectNode; | ||||||
|  |  | ||||||
| import java.util.Arrays; | import java.util.Arrays; | ||||||
| import java.util.HashMap; |  | ||||||
| import java.util.List; | import java.util.List; | ||||||
| import java.util.Map; | import java.util.Map; | ||||||
|  | import java.util.Set; | ||||||
| import java.util.concurrent.CountDownLatch; | import java.util.concurrent.CountDownLatch; | ||||||
|  | import java.util.stream.Collectors; | ||||||
|  |  | ||||||
| /** | /** | ||||||
|  * A class that connects to the EXN middleware and starts listening to |  * A class that connects to the EXN middleware and starts listening to | ||||||
|  * messages from the ActiveMQ server. |  * messages from the ActiveMQ server. | ||||||
|  * |  * | ||||||
|  * This class will drive the main behavior of the optimiser-controller: the |  * <p>This class will drive the main behavior of the optimiser-controller: the | ||||||
|  * `Consumer` objects created in {@link ExnConnector#ExnConnector} receive |  * `Consumer` objects created in {@link ExnConnector#ExnConnector} receive | ||||||
|  * incoming messages and react to them, sending out messages in turn. |  * incoming messages and react to them, sending out messages in turn. | ||||||
|  |  * | ||||||
|  |  * <p>The class also provides methods wrapping the exn-sal middleware | ||||||
|  |  * endpoints, converting from raw JSON responses to sal-common datatypes where | ||||||
|  |  * possible. | ||||||
|  */ |  */ | ||||||
| @Slf4j | @Slf4j | ||||||
| public class ExnConnector { | public class ExnConnector { | ||||||
| @@ -67,31 +72,24 @@ public class ExnConnector { | |||||||
|     // ---------------------------------------- |     // ---------------------------------------- | ||||||
|     // Communication with SAL |     // Communication with SAL | ||||||
|  |  | ||||||
|     // We define these publishers here instead of in the `SalConnector` |  | ||||||
|     // class since they need to be registered here and I'm afraid I will |  | ||||||
|     // forget to do it when adding new endpoints over in another class. |  | ||||||
|  |  | ||||||
|     /** The createJob endpoint. */ |     /** The createJob endpoint. */ | ||||||
|     public static final SyncedPublisher createJob |     public final SyncedPublisher createJob; | ||||||
|         = new SyncedPublisher("createJob", |  | ||||||
|             "eu.nebulouscloud.exn.sal.job.post", true, true); |  | ||||||
|     /** The findNodeCandidates endpoint.  Should not be used during normal |     /** The findNodeCandidates endpoint.  Should not be used during normal | ||||||
|       * operation--ask the broker instead. */ |       * operation--ask the broker instead. */ | ||||||
|     public static final SyncedPublisher findNodeCandidates |     public final SyncedPublisher findSalNodeCandidates; | ||||||
|         = new SyncedPublisher("findNodeCandidates", |     /** The findNodeCandidates endpoint (Broker's version).  This one adds | ||||||
|             "eu.nebulouscloud.exn.sal.nodecandidate.get", true, true); |       * attributes "score", "rank" to the answer it gets from SAL. */ | ||||||
|     /** The findNodeCandidates endpoint (Broker's version). */ |     public final SyncedPublisher findBrokerNodeCandidates; | ||||||
|     public static final SyncedPublisher findBrokerNodeCandidates |     /** The defineCluster endpoint. */ | ||||||
|         = new SyncedPublisher("findBrokerNodeCandidates", |     public final SyncedPublisher defineCluster; | ||||||
|             "eu.nebulouscloud.cfsb.get_node_candidates", true, true); |     /** The deployCluster endpoint. */ | ||||||
|     /** The addNodes endpoint. */ |     public final SyncedPublisher deployCluster; | ||||||
|     public static final SyncedPublisher addNodes |     /** The deployApplication endpoint. */ | ||||||
|         = new SyncedPublisher("addNodes", |     public final SyncedPublisher deployApplication; | ||||||
|             "eu.nebulouscloud.exn.sal.nodes.add", true, true); |     /** The scaleOut endpoint. */ | ||||||
|     /** The submitJob endpoint. */ |     public final SyncedPublisher scaleOut; | ||||||
|     public static final SyncedPublisher submitJob |     /** The scaleIn endpoint. */ | ||||||
|         = new SyncedPublisher("submitJob", |     public final SyncedPublisher scaleIn; | ||||||
|             "eu.nebulouscloud.exn.sal.job.update", true, true); |  | ||||||
|  |  | ||||||
|     /** |     /** | ||||||
|      * Create a connection to ActiveMQ via the exn middleware, and set up the |      * Create a connection to ActiveMQ via the exn middleware, and set up the | ||||||
| @@ -107,12 +105,19 @@ public class ExnConnector { | |||||||
|      */ |      */ | ||||||
|     public ExnConnector(String host, int port, String name, String password, ConnectorHandler callback) { |     public ExnConnector(String host, int port, String name, String password, ConnectorHandler callback) { | ||||||
|         amplMessagePublisher = new Publisher("controller_ampl", ampl_message_channel, true, true); |         amplMessagePublisher = new Publisher("controller_ampl", ampl_message_channel, true, true); | ||||||
|  |         createJob = new SyncedPublisher("createJob", "eu.nebulouscloud.exn.sal.job.post", true, true); | ||||||
|  |         findSalNodeCandidates = new SyncedPublisher("findSalNodeCandidates", "eu.nebulouscloud.exn.sal.nodecandidate.get", true, true); | ||||||
|  |         findBrokerNodeCandidates = new SyncedPublisher("findBrokerNodeCandidates", "eu.nebulouscloud.cfsb.get_node_candidates", true, true); | ||||||
|  |         defineCluster = new SyncedPublisher("defineCluster", "eu.nebulouscloud.exn.sal.cluster.define", true, true); | ||||||
|  |         deployCluster = new SyncedPublisher("deployCluster", "eu.nebulouscloud.exn.sal.cluster.deploy", true, true); | ||||||
|  |         deployApplication = new SyncedPublisher("deployApplication", "eu.nebulouscloud.exn.sal.cluster.deployApplication", true, true); | ||||||
|  |         scaleOut = new SyncedPublisher("scaleOut", "eu.nebulouscloud.exn.sal.cluster.scaleout", true, true); | ||||||
|  |         scaleIn = new SyncedPublisher("scaleIn", "eu.nebulouscloud.exn.sal.cluster.scalein", true, true); | ||||||
|  |  | ||||||
|         conn = new Connector("optimiser_controller", |         conn = new Connector("optimiser_controller", | ||||||
|             callback, |             callback, | ||||||
|             // List.of(new Publisher("config", "config", true)), |  | ||||||
|             List.of(amplMessagePublisher, |             List.of(amplMessagePublisher, | ||||||
|                 createJob, findNodeCandidates, findBrokerNodeCandidates, addNodes, submitJob), |                 findSalNodeCandidates, findBrokerNodeCandidates, defineCluster, deployCluster, deployApplication, scaleOut, scaleIn), | ||||||
|             List.of( |             List.of( | ||||||
|                 new Consumer("ui_app_messages", app_creation_channel, |                 new Consumer("ui_app_messages", app_creation_channel, | ||||||
|                     new AppCreationMessageHandler(), true, true), |                     new AppCreationMessageHandler(), true, true), | ||||||
| @@ -140,7 +145,7 @@ public class ExnConnector { | |||||||
|     /** |     /** | ||||||
|      * Disconnect from ActiveMQ and stop all Consumer processes.  Also count |      * Disconnect from ActiveMQ and stop all Consumer processes.  Also count | ||||||
|      * down the countdown latch passed in the {@link |      * down the countdown latch passed in the {@link | ||||||
|      * ExnConnector#start(CountDownLatch)} method if applicable. |      * #start(CountDownLatch)} method if applicable. | ||||||
|      */ |      */ | ||||||
|     public synchronized void stop() { |     public synchronized void stop() { | ||||||
|         conn.stop(); |         conn.stop(); | ||||||
| @@ -150,6 +155,9 @@ public class ExnConnector { | |||||||
|         log.debug("ExnConnector stopped."); |         log.debug("ExnConnector stopped."); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     // ---------------------------------------- | ||||||
|  |     // Message Handlers | ||||||
|  |  | ||||||
|     /** |     /** | ||||||
|      * A message handler that processes app creation messages coming in via |      * A message handler that processes app creation messages coming in via | ||||||
|      * `eu.nebulouscloud.ui.dsl.generic`.  Such messages contain, among |      * `eu.nebulouscloud.ui.dsl.generic`.  Such messages contain, among | ||||||
| @@ -215,4 +223,334 @@ public class ExnConnector { | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     // ---------------------------------------- | ||||||
|  |     // Communication with SAL | ||||||
|  |  | ||||||
|  |     /** | ||||||
|  |      * Extract and check the SAL response from an exn-middleware response. | ||||||
|  |      * The SAL response will be valid JSON encoded as a string in the "body" | ||||||
|  |      * field of the response.  If the response is of the following form, log | ||||||
|  |      * an error and return a missing node instead: | ||||||
|  |      * | ||||||
|  |      * <pre>{@code | ||||||
|  |      * { | ||||||
|  |      *   "key": <known exception key>, | ||||||
|  |      *   "message": "some error message" | ||||||
|  |      * } | ||||||
|  |      * }</pre> | ||||||
|  |      * | ||||||
|  |      * @param response The response from exn-middleware. | ||||||
|  |      * @param appID The application ID, used for logging only. | ||||||
|  |      * @return The SAL response as a parsed JsonNode, or a node where {@code | ||||||
|  |      *  isMissingNode()} will return true if SAL reported an error. | ||||||
|  |      */ | ||||||
|  |     private static JsonNode extractPayloadFromExnResponse(Map<String, Object> response, String appID) { | ||||||
|  |         String body = (String)response.get("body"); | ||||||
|  |         JsonNode payload = mapper.missingNode(); | ||||||
|  | 	try { | ||||||
|  | 	    payload = mapper.readTree(body); | ||||||
|  | 	} catch (JsonProcessingException e) { | ||||||
|  |             log.error("Could not read message body as JSON: " + body, keyValue("appId", appID), e); | ||||||
|  |             return mapper.missingNode(); | ||||||
|  | 	} | ||||||
|  |         // These messages are listed in the {@code AbstractProcessor} class of | ||||||
|  |         // the exn-middleware project. | ||||||
|  |         if (Set.of("generic-exception-error", | ||||||
|  |             "gateway-client-exception-error", | ||||||
|  |             "gateway-server-exception-error") | ||||||
|  |             .contains(payload.at("/key").asText()) | ||||||
|  |             && !payload.at("/message").isMissingNode()) { | ||||||
|  |             log.error("exn-middleware-sal request failed with error type '{}' and message '{}'", | ||||||
|  |                 payload.at("/key").asText(), | ||||||
|  |                 payload.at("/message").asText(), | ||||||
|  |                 keyValue("appId", appID)); | ||||||
|  |             return mapper.missingNode(); | ||||||
|  |         } | ||||||
|  |         return payload; | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /** | ||||||
|  |      * Get list of node candidates from the resource broker that fulfill the | ||||||
|  |      * given requirements, and sort them by rank and score so that better node | ||||||
|  |      * candidates come first in the result. | ||||||
|  |      * | ||||||
|  |      * <p>A candidate is better than another one if it has a lower rank or, if | ||||||
|  |      * the rank is equal, a higher score. | ||||||
|  |      * | ||||||
|  |      * @param requirements The list of requirements. | ||||||
|  |      * @param appID The application ID. | ||||||
|  |      * @return A sorted List containing node candidates, better candidates | ||||||
|  |      *  first. | ||||||
|  |      */ | ||||||
|  |     public List<NodeCandidate> findNodeCandidates(List<Requirement> requirements, String appID) { | ||||||
|  |         Map<String, Object> msg; | ||||||
|  |         try { | ||||||
|  |             msg = Map.of( | ||||||
|  |                 "metaData", Map.of("user", "admin"), | ||||||
|  |                 "body", mapper.writeValueAsString(requirements)); | ||||||
|  |         } catch (JsonProcessingException e) { | ||||||
|  |             log.error("Could not convert requirements list to JSON string (this should never happen)", | ||||||
|  |                 keyValue("appId", appID), e); | ||||||
|  |             return null; | ||||||
|  |         } | ||||||
|  |         Map<String, Object> response = findBrokerNodeCandidates.sendSync(msg, appID, null, false); | ||||||
|  |         // Note: we do not call extractPayloadFromExnResponse here, since this | ||||||
|  |         // response does not come from the exn-middleware. | ||||||
|  |         ObjectNode jsonBody = mapper.convertValue(response, ObjectNode.class); | ||||||
|  |         // Note: what we would really like to do here is something like: | ||||||
|  |         //     return Arrays.asList(mapper.readValue(response, NodeCandidate[].class)); | ||||||
|  |         // But since the broker adds two attributes, the array elements cannot | ||||||
|  |         // be deserialized into org.ow2.proactive.sal.model.NodeCandidate | ||||||
|  |         // objects. | ||||||
|  |         List<JsonNode> result = Arrays.asList(mapper.convertValue(jsonBody.withArray("/body"), JsonNode[].class)); | ||||||
|  |         result.sort((JsonNode c1, JsonNode c2) -> { | ||||||
|  |                 long rank1 = c1.at("/rank").longValue(); | ||||||
|  |                 long rank2 = c2.at("/rank").longValue(); | ||||||
|  |                 double score1 = c1.at("/score").doubleValue(); | ||||||
|  |                 double score2 = c2.at("/score").doubleValue(); | ||||||
|  |                 // We return < 0 if c1 < c2.  Since we want to sort better | ||||||
|  |                 // candidates first, c1 < c2 if rank is lower or rank is equal | ||||||
|  |                 // and score is higher. (Lower rank = better, higher score = | ||||||
|  |                 // better.) | ||||||
|  |                 if (rank1 != rank2) return Math.toIntExact(rank1 - rank2); | ||||||
|  |                 else return Math.toIntExact(Math.round(score2 - score1)); | ||||||
|  |             }); | ||||||
|  |         return result.stream() | ||||||
|  |             .map(candidate -> | ||||||
|  |                 mapper.convertValue( | ||||||
|  |                     ((ObjectNode)candidate).deepCopy().remove(List.of("score", "rank")), | ||||||
|  |                     NodeCandidate.class)) | ||||||
|  |             .collect(Collectors.toList()); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /** | ||||||
|  |      * Get list of node candidates from the resource broker that fulfil the | ||||||
|  |      * given requirements. | ||||||
|  |      * | ||||||
|  |      * <p>Note that we cannot convert the result to a list containing {@code | ||||||
|  |      * org.ow2.proactive.sal.model.NodeCandidate} instances, since the broker | ||||||
|  |      * adds the additional fields {@code score} and {@code ranking}.  Instead | ||||||
|  |      * we return a JSON {@code ArrayNode} containing {@code ObjectNode}s in | ||||||
|  |      * the format specified at | ||||||
|  |      * https://github.com/ow2-proactive/scheduling-abstraction-layer/blob/master/documentation/nodecandidates-endpoints.md#71--filter-node-candidates-endpoint | ||||||
|  |      * but with these two additional attributes. | ||||||
|  |      * | ||||||
|  |      * @param requirements The list of requirements. | ||||||
|  |      * @param appID The application ID. | ||||||
|  |      * @return A list containing node candidates, or null in case of error. | ||||||
|  |      */ | ||||||
|  |     public List<NodeCandidate> findNodeCandidatesFromSal(List<Requirement> requirements, String appID) { | ||||||
|  |         Map<String, Object> msg; | ||||||
|  |         try { | ||||||
|  |             msg = Map.of( | ||||||
|  |                 "metaData", Map.of("user", "admin"), | ||||||
|  |                 "body", mapper.writeValueAsString(requirements)); | ||||||
|  |         } catch (JsonProcessingException e) { | ||||||
|  |             log.error("Could not convert requirements list to JSON string (this should never happen)", | ||||||
|  |                 keyValue("appId", appID), e); | ||||||
|  |             return null; | ||||||
|  |         } | ||||||
|  |         Map<String, Object> response = findSalNodeCandidates.sendSync(msg, appID, null, false); | ||||||
|  |         JsonNode payload = extractPayloadFromExnResponse(response, appID); | ||||||
|  |         if (payload.isMissingNode()) return null; | ||||||
|  |         try { | ||||||
|  | 	    return Arrays.asList(mapper.treeToValue(payload, NodeCandidate[].class)); | ||||||
|  | 	} catch (JsonProcessingException e) { | ||||||
|  |             log.error("Could not decode node candidates payload", keyValue("appId", appID), e); | ||||||
|  |             return null; | ||||||
|  | 	} | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /** | ||||||
|  |      * Define a cluster with the given name and node list. | ||||||
|  |      * | ||||||
|  |      * <p>The nodes are passed in a JSON array containing objects of the | ||||||
|  |      * following shape: | ||||||
|  |      * | ||||||
|  |      * <pre>{@code | ||||||
|  |      * { | ||||||
|  |      *   "nodeName": "some-component", | ||||||
|  |      *   "nodeCandidateId": "some-candidate-id", | ||||||
|  |      *   "cloudId": "some-cloud-id" | ||||||
|  |      * } | ||||||
|  |      * }</pre> | ||||||
|  |      * | ||||||
|  |      * <p>Each value for {@code nodeName} has to be unique, and should be | ||||||
|  |      * either the name of the master node or the name of a node that will | ||||||
|  |      * subsequently be referenced in the affinity trait of the modified | ||||||
|  |      * kubevela file (see {@link NebulousAppDeployer#addNodeAffinities()}). | ||||||
|  |      * | ||||||
|  |      * <p>The values for {@code nodeCandidateId} and {@code cloudId} come from | ||||||
|  |      * the return value of a call to {@link #findNodeCandidates()}. | ||||||
|  |      * | ||||||
|  |      * <p>Note that this method could be rewritten to accept the nodes as a | ||||||
|  |      * {@code List<org.ow2.proactive.sal.model.IaasNode>} instead, if that is | ||||||
|  |      * more convenient. | ||||||
|  |      * | ||||||
|  |      * @param appID The application's id, used to name the cluster. | ||||||
|  |      * @param masterNodeName The name of the master node. | ||||||
|  |      * @param nodes A JSON array containing the node definitions. | ||||||
|  |      * @return true if the cluster was successfully defined, false otherwise. | ||||||
|  |      */ | ||||||
|  |     public boolean defineCluster(String appID, String masterNodeName, ArrayNode nodes) { | ||||||
|  |         // https://openproject.nebulouscloud.eu/projects/nebulous-collaboration-hub/wiki/deployment-manager-sal-1#specification-of-endpoints-being-developed | ||||||
|  |         ObjectNode body = mapper.createObjectNode() | ||||||
|  |             .put("name", appID) | ||||||
|  |             .put("master-node", masterNodeName); | ||||||
|  |         body.putArray("nodes").addAll(nodes); | ||||||
|  |         Map<String, Object> msg; | ||||||
|  |         try { | ||||||
|  |             msg = Map.of("metaData", Map.of("user", "admin"), | ||||||
|  |                 "body", mapper.writeValueAsString(body)); | ||||||
|  |         } catch (JsonProcessingException e) { | ||||||
|  |             log.error("Could not convert JSON to string (this should never happen)", | ||||||
|  |                 keyValue("appId", appID), e); | ||||||
|  |             return false; | ||||||
|  |         } | ||||||
|  |         Map<String, Object> response = defineCluster.sendSync(msg, appID, null, false); | ||||||
|  |         JsonNode payload = extractPayloadFromExnResponse(response, appID); | ||||||
|  |         return payload.asBoolean(); | ||||||
|  |         // TODO: check if we still need to unwrap this; see | ||||||
|  |         // `AbstractProcessor.groovy#normalizeResponse` and bug 2055053 | ||||||
|  |         // https://opendev.org/nebulous/exn-middleware/src/commit/ffc2ca7bdf657b3831d2b803ff2b84d5e8e1bdcd/exn-middleware-core/src/main/groovy/eu/nebulouscloud/exn/modules/sal/processors/AbstractProcessor.groovy#L111 | ||||||
|  |         // https://bugs.launchpad.net/nebulous/+bug/2055053 | ||||||
|  |         // return payload.at("/success").asBoolean(); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /** | ||||||
|  |      * Get the definition of a cluster created by {@link #defineCluster}. | ||||||
|  |      * | ||||||
|  |      * @param appID The application ID, as used to define the cluster. | ||||||
|  |      * @return The cluster definition, or null in case of error. | ||||||
|  |      */ | ||||||
|  |     public JsonNode getCluster(String appID) { | ||||||
|  |         Map<String, Object> msg; | ||||||
|  |         msg = Map.of("metaData", Map.of("user", "admin", "clusterName", appID)); | ||||||
|  |         Map<String, Object> response = deployCluster.sendSync(msg, appID, null, false); | ||||||
|  |         JsonNode payload = extractPayloadFromExnResponse(response, appID); | ||||||
|  |         return payload.isMissingNode() ? null : payload; | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /** | ||||||
|  |      * Deploy a cluster created by {@link #defineCluster}. | ||||||
|  |      * | ||||||
|  |      * @param appID The application's id, used to name the cluster. | ||||||
|  |      * @return true if the cluster was successfully deployed, false otherwise. | ||||||
|  |      */ | ||||||
|  |     public boolean deployCluster(String appID) { | ||||||
|  |         // https://openproject.nebulouscloud.eu/projects/nebulous-collaboration-hub/wiki/deployment-manager-sal-1#specification-of-endpoints-being-developed | ||||||
|  |         ObjectNode body = mapper.createObjectNode() | ||||||
|  |             .put("applicationId", appID); | ||||||
|  |         Map<String, Object> msg; | ||||||
|  |         try { | ||||||
|  |             msg = Map.of("metaData", Map.of("user", "admin"), | ||||||
|  |                 "body", mapper.writeValueAsString(body)); | ||||||
|  |         } catch (JsonProcessingException e) { | ||||||
|  |             log.error("Could not convert JSON to string (this should never happen)", | ||||||
|  |                 keyValue("appId", appID), e); | ||||||
|  |             return false; | ||||||
|  |         } | ||||||
|  |         Map<String, Object> response = deployCluster.sendSync(msg, appID, null, false); | ||||||
|  |         JsonNode payload = extractPayloadFromExnResponse(response, appID); | ||||||
|  |         return payload.asBoolean(); | ||||||
|  |         // TODO: check if we still need to unwrap this; see | ||||||
|  |         // `AbstractProcessor.groovy#normalizeResponse` and bug 2055053 | ||||||
|  |         // https://opendev.org/nebulous/exn-middleware/src/commit/ffc2ca7bdf657b3831d2b803ff2b84d5e8e1bdcd/exn-middleware-core/src/main/groovy/eu/nebulouscloud/exn/modules/sal/processors/AbstractProcessor.groovy#L111 | ||||||
|  |         // https://bugs.launchpad.net/nebulous/+bug/2055053 | ||||||
|  |         // return payload.at("/success").asBoolean(); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /** | ||||||
|  |      * Submit a KubeVela file to a deployed cluster. | ||||||
|  |      * | ||||||
|  |      * @param appID The application's id. | ||||||
|  |      * @param kubevela The KubeVela file, with node affinity traits | ||||||
|  |      *  corresponding to the cluster definintion. | ||||||
|  |      * @return true if the application was successfully deployed, false otherwise. | ||||||
|  |      */ | ||||||
|  |     public boolean deployApplication(String appID, String kubevela) { | ||||||
|  |         // https://openproject.nebulouscloud.eu/projects/nebulous-collaboration-hub/wiki/deployment-manager-sal-1#specification-of-endpoints-being-developed | ||||||
|  |         ObjectNode body = mapper.createObjectNode() | ||||||
|  |             .put("applicationId", appID) | ||||||
|  |             .put("KubevelaYaml", kubevela); | ||||||
|  |         Map<String, Object> msg; | ||||||
|  |         try { | ||||||
|  |             msg = Map.of("metaData", Map.of("user", "admin"), | ||||||
|  |                 "body", mapper.writeValueAsString(body)); | ||||||
|  |         } catch (JsonProcessingException e) { | ||||||
|  |             log.error("Could not convert JSON to string (this should never happen)", | ||||||
|  |                 keyValue("appId", appID), e); | ||||||
|  |             return false; | ||||||
|  |         } | ||||||
|  |         Map<String, Object> response = deployApplication.sendSync(msg, appID, null, false); | ||||||
|  |         JsonNode payload = extractPayloadFromExnResponse(response, appID); | ||||||
|  |         return payload.asBoolean(); | ||||||
|  |         // TODO: check if we still need to unwrap this; see | ||||||
|  |         // `AbstractProcessor.groovy#normalizeResponse` and bug 2055053 | ||||||
|  |         // https://opendev.org/nebulous/exn-middleware/src/commit/ffc2ca7bdf657b3831d2b803ff2b84d5e8e1bdcd/exn-middleware-core/src/main/groovy/eu/nebulouscloud/exn/modules/sal/processors/AbstractProcessor.groovy#L111 | ||||||
|  |         // https://bugs.launchpad.net/nebulous/+bug/2055053 | ||||||
|  |         // return payload.at("/success").asBoolean(); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /** | ||||||
|  |      * Add new nodes to a deployed cluster. | ||||||
|  |      * | ||||||
|  |      * <p>The new nodes are specified in the same way as in {@link | ||||||
|  |      * #defineCluster()}. | ||||||
|  |      * | ||||||
|  |      * @param appID The application's id. | ||||||
|  |      * @param additionalWorkers The additional nodes to add. | ||||||
|  |      */ | ||||||
|  |     // TODO: deserialize response into sal-common `Cluster` | ||||||
|  |     public void scaleOut(String appID, ArrayNode additionalNodes) { | ||||||
|  |         // https://openproject.nebulouscloud.eu/projects/nebulous-collaboration-hub/wiki/deployment-manager-sal-1#specification-of-endpoints-being-developed | ||||||
|  |         ObjectNode body = mapper.createObjectNode() | ||||||
|  |             .put("applicationId", appID); | ||||||
|  |         body.putArray("workers").addAll(additionalNodes); | ||||||
|  |         Map<String, Object> msg; | ||||||
|  |         try { | ||||||
|  |             msg = Map.of("metaData", Map.of("user", "admin"), | ||||||
|  |                 "body", mapper.writeValueAsString(body)); | ||||||
|  |         } catch (JsonProcessingException e) { | ||||||
|  |             log.error("Could not convert JSON to string (this should never happen)", | ||||||
|  |                 keyValue("appId", appID), e); | ||||||
|  |             return; | ||||||
|  |         } | ||||||
|  |         Map<String, Object> response = scaleOut.sendSync(msg, appID, null, false); | ||||||
|  |         // Called for side-effect only; we want to log errors | ||||||
|  |         JsonNode payload = extractPayloadFromExnResponse(response, appID); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /** | ||||||
|  |      * Remove nodes from a deployed cluster. | ||||||
|  |      * | ||||||
|  |      * @param appID The application's id. | ||||||
|  |      * @param superfluousNodes The names of nodes to be removed. | ||||||
|  |      * @return true if the call was successful, false otherwise. | ||||||
|  |      */ | ||||||
|  |     public boolean scaleIn(String appID, List<String> superfluousNodes) { | ||||||
|  |         // NOTE: not yet defined in | ||||||
|  |         // https://openproject.nebulouscloud.eu/projects/nebulous-collaboration-hub/wiki/deployment-manager-sal-1#specification-of-endpoints-being-developed | ||||||
|  |         ArrayNode body = mapper.createArrayNode(); | ||||||
|  |         superfluousNodes.forEach(nodeName -> body.add(nodeName)); | ||||||
|  |         Map<String, Object> msg; | ||||||
|  |         try { | ||||||
|  |             msg = Map.of("metaData", Map.of("user", "admin"), | ||||||
|  |                 "body", mapper.writeValueAsString(body)); | ||||||
|  |         } catch (JsonProcessingException e) { | ||||||
|  |             log.error("Could not convert JSON to string (this should never happen)", | ||||||
|  |                 keyValue("appId", appID), e); | ||||||
|  |             return false; | ||||||
|  |         } | ||||||
|  |         Map<String, Object> response = scaleIn.sendSync(msg, appID, null, false); | ||||||
|  |         JsonNode payload = extractPayloadFromExnResponse(response, appID); | ||||||
|  |         return payload.asBoolean(); | ||||||
|  |         // TODO: check if we still need to unwrap this; see | ||||||
|  |         // `AbstractProcessor.groovy#normalizeResponse` and bug 2055053 | ||||||
|  |         // https://opendev.org/nebulous/exn-middleware/src/commit/ffc2ca7bdf657b3831d2b803ff2b84d5e8e1bdcd/exn-middleware-core/src/main/groovy/eu/nebulouscloud/exn/modules/sal/processors/AbstractProcessor.groovy#L111 | ||||||
|  |         // https://bugs.launchpad.net/nebulous/+bug/2055053 | ||||||
|  |         // return payload.at("/success").asBoolean(); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |  | ||||||
| } | } | ||||||
|   | |||||||
| @@ -10,7 +10,6 @@ import java.util.concurrent.CountDownLatch; | |||||||
| import com.fasterxml.jackson.databind.JsonNode; | import com.fasterxml.jackson.databind.JsonNode; | ||||||
| import com.fasterxml.jackson.databind.ObjectMapper; | import com.fasterxml.jackson.databind.ObjectMapper; | ||||||
|  |  | ||||||
| import eu.nebulouscloud.exn.core.Publisher; |  | ||||||
| import lombok.extern.slf4j.Slf4j; | import lombok.extern.slf4j.Slf4j; | ||||||
| import picocli.CommandLine.Command; | import picocli.CommandLine.Command; | ||||||
| import picocli.CommandLine.Parameters; | import picocli.CommandLine.Parameters; | ||||||
|   | |||||||
| @@ -12,6 +12,7 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; | |||||||
|  |  | ||||||
| import eu.nebulouscloud.exn.core.Publisher; | import eu.nebulouscloud.exn.core.Publisher; | ||||||
| import lombok.Getter; | import lombok.Getter; | ||||||
|  | import lombok.Setter; | ||||||
| import lombok.extern.slf4j.Slf4j; | import lombok.extern.slf4j.Slf4j; | ||||||
| import static net.logstash.logback.argument.StructuredArguments.keyValue; | import static net.logstash.logback.argument.StructuredArguments.keyValue; | ||||||
|  |  | ||||||
| @@ -22,6 +23,7 @@ import java.nio.file.Path; | |||||||
| import java.util.HashMap; | import java.util.HashMap; | ||||||
| import java.util.HashSet; | import java.util.HashSet; | ||||||
| import java.util.Iterator; | import java.util.Iterator; | ||||||
|  | import java.util.List; | ||||||
| import java.util.Map; | import java.util.Map; | ||||||
| import java.util.Set; | import java.util.Set; | ||||||
| import java.util.Spliterator; | import java.util.Spliterator; | ||||||
| @@ -29,6 +31,8 @@ import java.util.Spliterators; | |||||||
| import java.util.stream.Collectors; | import java.util.stream.Collectors; | ||||||
| import java.util.stream.StreamSupport; | import java.util.stream.StreamSupport; | ||||||
|  |  | ||||||
|  | import org.ow2.proactive.sal.model.Requirement; | ||||||
|  |  | ||||||
| /** | /** | ||||||
|  * Internal representation of a NebulOus app. |  * Internal representation of a NebulOus app. | ||||||
|  */ |  */ | ||||||
| @@ -97,6 +101,14 @@ public class NebulousApp { | |||||||
|     @Getter private JsonNode originalAppMessage; |     @Getter private JsonNode originalAppMessage; | ||||||
|     private ObjectNode original_kubevela; |     private ObjectNode original_kubevela; | ||||||
|  |  | ||||||
|  |     /** | ||||||
|  |      * The current "generation" of deployment.  Initial deployment sets this | ||||||
|  |      * to 1, each subsequent redeployment increases by 1.  This value is used | ||||||
|  |      * to name node instances generated during that deployment. | ||||||
|  |      */ | ||||||
|  |     @Getter @Setter | ||||||
|  |     private int deployGeneration = 0; | ||||||
|  |  | ||||||
|     /** |     /** | ||||||
|      * Map of component name to machine name(s) deployed for that component. |      * Map of component name to machine name(s) deployed for that component. | ||||||
|      * Component names are defined in the KubeVela file.  We assume that |      * Component names are defined in the KubeVela file.  We assume that | ||||||
| @@ -111,9 +123,20 @@ public class NebulousApp { | |||||||
|  |  | ||||||
|     /** When an app gets deployed or redeployed, this is where we send the AMPL file */ |     /** When an app gets deployed or redeployed, this is where we send the AMPL file */ | ||||||
|     private Publisher ampl_message_channel; |     private Publisher ampl_message_channel; | ||||||
|     /** Have we ever been deployed?  I.e., when we rewrite KubeVela, are there |     // /** Have we ever been deployed?  I.e., when we rewrite KubeVela, are there | ||||||
|      * already nodes running for us? */ |     //  * already nodes running for us? */ | ||||||
|     private boolean deployed = false; |     // private boolean deployed = false; | ||||||
|  |  | ||||||
|  |     /** The KubeVela as it was most recently sent to the app's controller. */ | ||||||
|  |     @Getter @Setter | ||||||
|  |     private JsonNode deployedKubevela; | ||||||
|  |     /** For each KubeVela component, the number of deployed nodes.  All nodes | ||||||
|  |       * will be identical wrt machine type etc. */ | ||||||
|  |     @Getter @Setter | ||||||
|  |     private Map<String, Integer> deployedNodeCounts; | ||||||
|  |     /** For each KubeVela component, the requirements for its node(s). */ | ||||||
|  |     @Getter @Setter | ||||||
|  |     private Map<String, List<Requirement>> deployedNodeRequirements; | ||||||
|  |  | ||||||
|     /** |     /** | ||||||
|      * The EXN connector for this class.  At the moment all apps share the |      * The EXN connector for this class.  At the moment all apps share the | ||||||
| @@ -250,25 +273,6 @@ public class NebulousApp { | |||||||
|         return readKubevelaString(Files.readString(Path.of(path), StandardCharsets.UTF_8)); |         return readKubevelaString(Files.readString(Path.of(path), StandardCharsets.UTF_8)); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     /** |  | ||||||
|      * Set "deployed" status. Will typically be set to true once, and then |  | ||||||
|      * never to false again. |  | ||||||
|      * |  | ||||||
|      * @param deployed the new status. |  | ||||||
|      */ |  | ||||||
|     public void setDeployed(boolean deployed) { |  | ||||||
|         this.deployed = deployed; |  | ||||||
|     } |  | ||||||
|     /** |  | ||||||
|      * Check if the app has been deployed, i.e., if there are already VMs |  | ||||||
|      * allocated from SAL for us. |  | ||||||
|      * |  | ||||||
|      * @return false if we never asked for nodes, true otherwise. |  | ||||||
|      */ |  | ||||||
|     public boolean isDeployed() { |  | ||||||
|         return deployed; |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     /** |     /** | ||||||
|      * Check that all parameters have a name, type and path, and that the |      * Check that all parameters have a name, type and path, and that the | ||||||
|      * target path can be found in the original KubeVela file. |      * target path can be found in the original KubeVela file. | ||||||
| @@ -433,7 +437,7 @@ public class NebulousApp { | |||||||
|         } |         } | ||||||
|         ObjectNode variables = solution.withObjectProperty("VariableValues"); |         ObjectNode variables = solution.withObjectProperty("VariableValues"); | ||||||
|         ObjectNode kubevela = rewriteKubevelaWithSolution(variables); |         ObjectNode kubevela = rewriteKubevelaWithSolution(variables); | ||||||
|         if (isDeployed()) { |         if (deployGeneration > 0) { | ||||||
|             // We assume that killing a node will confuse the application's |             // We assume that killing a node will confuse the application's | ||||||
|             // Kubernetes cluster, therefore: |             // Kubernetes cluster, therefore: | ||||||
|             // 1. Recalculate node sets |             // 1. Recalculate node sets | ||||||
|   | |||||||
| @@ -7,8 +7,8 @@ import java.util.Map; | |||||||
| import java.util.Set; | import java.util.Set; | ||||||
| import eu.nebulouscloud.optimiser.kubevela.KubevelaAnalyzer; | import eu.nebulouscloud.optimiser.kubevela.KubevelaAnalyzer; | ||||||
| import org.ow2.proactive.sal.model.AttributeRequirement; | import org.ow2.proactive.sal.model.AttributeRequirement; | ||||||
| import org.ow2.proactive.sal.model.CommandsInstallation; |  | ||||||
| import org.ow2.proactive.sal.model.NodeCandidate; | import org.ow2.proactive.sal.model.NodeCandidate; | ||||||
|  | import org.ow2.proactive.sal.model.NodeCandidate.NodeCandidateTypeEnum; | ||||||
| import org.ow2.proactive.sal.model.NodeType; | import org.ow2.proactive.sal.model.NodeType; | ||||||
| import org.ow2.proactive.sal.model.NodeTypeRequirement; | import org.ow2.proactive.sal.model.NodeTypeRequirement; | ||||||
| import org.ow2.proactive.sal.model.OperatingSystemFamily; | import org.ow2.proactive.sal.model.OperatingSystemFamily; | ||||||
| @@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.node.ArrayNode; | |||||||
| import com.fasterxml.jackson.databind.node.ObjectNode; | import com.fasterxml.jackson.databind.node.ObjectNode; | ||||||
| import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; | import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; | ||||||
|  |  | ||||||
| import lombok.Getter; |  | ||||||
| import lombok.extern.slf4j.Slf4j; | import lombok.extern.slf4j.Slf4j; | ||||||
| import static net.logstash.logback.argument.StructuredArguments.keyValue; | import static net.logstash.logback.argument.StructuredArguments.keyValue; | ||||||
|  |  | ||||||
| @@ -33,24 +32,9 @@ import static net.logstash.logback.argument.StructuredArguments.keyValue; | |||||||
| @Slf4j | @Slf4j | ||||||
| public class NebulousAppDeployer { | public class NebulousAppDeployer { | ||||||
|  |  | ||||||
|     // TODO: find out the commands to initialize the controller |  | ||||||
|     /** |  | ||||||
|      * The installation scripts to send to SAL for the NebulOuS controller |  | ||||||
|      * node. |  | ||||||
|      */ |  | ||||||
|     @Getter |  | ||||||
|     private static CommandsInstallation controllerInstallation = new CommandsInstallation(); |  | ||||||
|  |  | ||||||
|     private static final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory()); |     private static final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory()); | ||||||
|     private static final ObjectMapper mapper = new ObjectMapper(); |     private static final ObjectMapper mapper = new ObjectMapper(); | ||||||
|  |  | ||||||
|     // TODO: find out the commands to initialize the workers |  | ||||||
|     /** |  | ||||||
|      * The installation scripts to send to SAL for a NebulOuS worker node. |  | ||||||
|      */ |  | ||||||
|     @Getter |  | ||||||
|     private static CommandsInstallation nodeInstallation = new CommandsInstallation(); |  | ||||||
|  |  | ||||||
|     /** |     /** | ||||||
|      * The requirements of the node running the NebulOuS controller. |      * The requirements of the node running the NebulOuS controller. | ||||||
|      * This machine runs the Kubernetes cluster and KubeVela.  For |      * This machine runs the Kubernetes cluster and KubeVela.  For | ||||||
| @@ -122,23 +106,38 @@ public class NebulousAppDeployer { | |||||||
|      * Given a KubeVela file, extract node requirements, create the job, start |      * Given a KubeVela file, extract node requirements, create the job, start | ||||||
|      * its nodes and submit KubeVela. |      * its nodes and submit KubeVela. | ||||||
|      * |      * | ||||||
|      * NOTE: this method is under reconstruction, pending the new endpoints. |      * <p>NOTE: this method modifies the NebulousApp object state, storing | ||||||
|  |      * various facts about the deployed cluster. | ||||||
|      * |      * | ||||||
|      * @param app the NebulOuS app object. |      * <p>NOTE: this method is under reconstruction, pending the new | ||||||
|  |      * endpoints. | ||||||
|  |      * | ||||||
|  |      * @param app The NebulOuS app object. | ||||||
|      * @param kubevela the KubeVela file to deploy. |      * @param kubevela the KubeVela file to deploy. | ||||||
|      */ |      */ | ||||||
|     public static void deployApplication(NebulousApp app, JsonNode kubevela) { |     public static void deployApplication(NebulousApp app, JsonNode kubevela) { | ||||||
|         String appUUID = app.getUUID(); |         String appUUID = app.getUUID(); | ||||||
|  |         ExnConnector conn = app.getExnConnector(); | ||||||
|  |         Set<NodeCandidate> chosenEdgeCandidates = new HashSet<>(); | ||||||
|         log.info("Starting initial deployment for application", keyValue("appId", appUUID)); |         log.info("Starting initial deployment for application", keyValue("appId", appUUID)); | ||||||
|  |  | ||||||
|  |         int deployGeneration = app.getDeployGeneration() + 1; | ||||||
|  |         app.setDeployGeneration(deployGeneration); | ||||||
|  |  | ||||||
|         // The overall flow: |         // The overall flow: | ||||||
|         // |         // | ||||||
|         // 1. Extract node requirements and node counts from the KubeVela |         // 1. Extract node requirements and node counts from the KubeVela | ||||||
|         //    definition. |         //    definition. | ||||||
|         // 2. Find node candidates for all workers and the controller. |         // 2. Ask resource broker for node candidates for all workers and the | ||||||
|         // 3. Select node candidates. |         //    controller. | ||||||
|  |         // 3. Select node candidates, making sure to only select edge nodes | ||||||
|  |         //    once. | ||||||
|         // 4. Create a SAL cluster. |         // 4. Create a SAL cluster. | ||||||
|         // 5. Deploy the SAL cluster. |         // 5. Deploy the SAL cluster. | ||||||
|  |         // 6. Add node affinity traits to the KubeVela file. | ||||||
|  |         // 7. Deploy the SAL application. | ||||||
|  |         // 8. Store cluster state (deployed KubeVela file, etc.) in | ||||||
|  |         //    NebulousApp object. | ||||||
|  |  | ||||||
|         // ------------------------------------------------------------ |         // ------------------------------------------------------------ | ||||||
|         // 1. Extract node requirements |         // 1. Extract node requirements | ||||||
| @@ -152,29 +151,46 @@ public class NebulousAppDeployer { | |||||||
|         // ---------------------------------------- |         // ---------------------------------------- | ||||||
|         // 2. Find node candidates |         // 2. Find node candidates | ||||||
|  |  | ||||||
|         // ArrayNode controllerCandidates = SalConnector.findNodeCandidates(controllerRequirements, appUUID); |         List<NodeCandidate> controllerCandidates = conn.findNodeCandidates(controllerRequirements, appUUID); | ||||||
|         // if (controllerCandidates.isEmpty()) { |         if (controllerCandidates.isEmpty()) { | ||||||
|         //     log.error("Could not find node candidates for requirements: {}", |             log.error("Could not find node candidates for requirements: {}", | ||||||
|         //         controllerRequirements, keyValue("appId", appUUID)); |                 controllerRequirements, keyValue("appId", appUUID)); | ||||||
|         //     // Continue here while we don't really deploy |             // Continue here while we don't really deploy | ||||||
|         //     // return; |             // return; | ||||||
|         // } |         } | ||||||
|         // Map<String, ArrayNode> workerCandidates = new HashMap<>(); |         Map<String, List<NodeCandidate>> workerCandidates = new HashMap<>(); | ||||||
|         // for (Map.Entry<String, List<Requirement>> e : workerRequirements.entrySet()) { |         for (Map.Entry<String, List<Requirement>> e : workerRequirements.entrySet()) { | ||||||
|         //     String nodeName = e.getKey(); |             String nodeName = e.getKey(); | ||||||
|         //     List<Requirement> requirements = e.getValue(); |             List<Requirement> requirements = e.getValue(); | ||||||
|         //     ArrayNode candidates = SalConnector.findNodeCandidates(requirements, appUUID); |             List<NodeCandidate> candidates = conn.findNodeCandidates(requirements, appUUID); | ||||||
|         //     if (candidates.isEmpty()) { |             if (candidates.isEmpty()) { | ||||||
|         //         log.error("Could not find node candidates for requirements: {}", requirements); |                 log.error("Could not find node candidates for requirements: {}", requirements, | ||||||
|         //         // Continue here while we don't really deploy |                     keyValue("appId", appUUID)); | ||||||
|         //         // return; |                 // Continue here while we don't really deploy | ||||||
|         //     } |                 // return; | ||||||
|         //     workerCandidates.put(nodeName, candidates); |             } | ||||||
|         // } |             workerCandidates.put(nodeName, candidates); | ||||||
|  |         } | ||||||
|  |  | ||||||
|         // ------------------------------------------------------------ |         // ------------------------------------------------------------ | ||||||
|         // 3. Select node candidates |         // 3. Select node candidates | ||||||
|  |  | ||||||
|  |         // Controller node | ||||||
|  |         log.debug("Deciding on controller node candidate", keyValue("appId", appUUID)); | ||||||
|  |         NodeCandidate masterNodeCandidate = null; | ||||||
|  |         if (controllerCandidates.size() > 0) { | ||||||
|  |             masterNodeCandidate = controllerCandidates.get(0); | ||||||
|  |             if (Set.of(NodeCandidateTypeEnum.BYON, NodeCandidateTypeEnum.EDGE) | ||||||
|  |                 .contains(masterNodeCandidate.getNodeCandidateType())) { | ||||||
|  |                 // Mark this candidate as already chosen | ||||||
|  |                 chosenEdgeCandidates.add(masterNodeCandidate); | ||||||
|  |             } | ||||||
|  |         } else { | ||||||
|  |             log.error("Empty node candidate list for controller, continuing without creating node", | ||||||
|  |                 keyValue("appId", appUUID)); | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         // Component nodes | ||||||
|         log.debug("Collecting worker nodes for {}", appUUID, keyValue("appId", appUUID)); |         log.debug("Collecting worker nodes for {}", appUUID, keyValue("appId", appUUID)); | ||||||
|         Map<String, NodeCandidate> nodeNameToCandidate = new HashMap<>(); |         Map<String, NodeCandidate> nodeNameToCandidate = new HashMap<>(); | ||||||
|         for (Map.Entry<String, List<Requirement>> e : workerRequirements.entrySet()) { |         for (Map.Entry<String, List<Requirement>> e : workerRequirements.entrySet()) { | ||||||
| @@ -185,28 +201,30 @@ public class NebulousAppDeployer { | |||||||
|             int numberOfNodes = nodeCounts.get(componentName); |             int numberOfNodes = nodeCounts.get(componentName); | ||||||
|             Set<String> nodeNames = new HashSet<>(); |             Set<String> nodeNames = new HashSet<>(); | ||||||
|             for (int i = 1; i <= numberOfNodes; i++) { |             for (int i = 1; i <= numberOfNodes; i++) { | ||||||
|                 String nodeName = String.format("%s-%s", componentName, i); |                 String nodeName = String.format("%s-%s-%s", componentName, deployGeneration, i); | ||||||
|  |                 List<NodeCandidate> candidates = workerCandidates.get(componentName); | ||||||
|  |  | ||||||
|  |                 if (candidates.size() == 0) { | ||||||
|  |                     log.error("Empty node candidate list for component ~s, continuing without creating node", componentName, keyValue("appId", appUUID)); | ||||||
|  |                     continue; | ||||||
|  |                 } | ||||||
|  |  | ||||||
|  |                 NodeCandidate candidate = candidates.stream() | ||||||
|  |                     .filter(each -> !chosenEdgeCandidates.contains(each)) | ||||||
|  |                     .findFirst() | ||||||
|  |                     .orElse(null); | ||||||
|  |                 if (Set.of(NodeCandidateTypeEnum.BYON, NodeCandidateTypeEnum.EDGE).contains(candidate.getNodeCandidateType())) { | ||||||
|  |                     // We could remove this candidate from `candidates` here, | ||||||
|  |                     // to save skipping over already-assigned edge nodes for | ||||||
|  |                     // the next replica of this component, but we don't want | ||||||
|  |                     // to make assumptions on whether the candidate list can | ||||||
|  |                     // be modified.  Note that we have to keep track of all | ||||||
|  |                     // assigned edge nodes in any case, since they might be | ||||||
|  |                     // candidates in subsequent components. | ||||||
|  |                     chosenEdgeCandidates.add(candidate); | ||||||
|  |                 } | ||||||
|  |                 nodeNameToCandidate.put(nodeName, candidate); | ||||||
|                 nodeNames.add(nodeName); |                 nodeNames.add(nodeName); | ||||||
|                 // TODO: choose the node candidate with the highest score |  | ||||||
|                 // and/or ranking. |  | ||||||
|  |  | ||||||
|                 // TODO: Here we need to discriminate between edge and cloud |  | ||||||
|                 // node candidates: we can deploy an edge node only once, but |  | ||||||
|                 // cloud nodes arbitrarily often.  So if the best node |  | ||||||
|                 // candidate is an edge node, we should select it and fill the |  | ||||||
|                 // rest of the nodes with second-best cloud nodes. |  | ||||||
|  |  | ||||||
|                 // // TODO: make sure we only choose the same edge node once; it |  | ||||||
|                 // // might be in all node candidate lists :) |  | ||||||
|                 // if (!workerCandidates.get(componentName).isEmpty()) { |  | ||||||
|                 //     // should always be true, except currently we don't abort |  | ||||||
|                 //     // in Step 2 if we don't find candidates. |  | ||||||
|                 //     JsonNode candidate = workerCandidates.get(componentName).get(0); |  | ||||||
|                 //     NodeCandidate c = mapper.convertValue(((ObjectNode)candidate).deepCopy() |  | ||||||
|                 //         .remove(List.of("score", "ranking")), |  | ||||||
|                 //         NodeCandidate.class); |  | ||||||
|                 //     nodeNameToCandidate.put(nodeName, c); |  | ||||||
|                 // } |  | ||||||
|             } |             } | ||||||
|             app.getComponentMachineNames().put(componentName, nodeNames); |             app.getComponentMachineNames().put(componentName, nodeNames); | ||||||
|         } |         } | ||||||
| @@ -216,30 +234,57 @@ public class NebulousAppDeployer { | |||||||
|         // ------------------------------------------------------------ |         // ------------------------------------------------------------ | ||||||
|         // 4. Create cluster |         // 4. Create cluster | ||||||
|  |  | ||||||
|         // TODO: call defineCluster endpoint with nodename -> candidate |         String masterNodeName = "masternode"; // safe because all component node names end with a number | ||||||
|         // mapping etc. |         ObjectNode cluster = mapper.createObjectNode(); | ||||||
|  |         cluster.put("name", appUUID) | ||||||
|  |             .put("master-node", masterNodeName); | ||||||
|  |         ArrayNode nodes = cluster.withArray("nodes"); | ||||||
|  |         if (masterNodeCandidate != null) { | ||||||
|  |             nodes.addObject() | ||||||
|  |                 .put("nodeName", masterNodeName) | ||||||
|  |                 .put("nodeCandidateId", masterNodeCandidate.getId()) | ||||||
|  |                 .put("cloudId", masterNodeCandidate.getCloud().getId()); | ||||||
|  |         } | ||||||
|  |         nodeNameToCandidate.forEach((name, candidate) -> { | ||||||
|  |                 nodes.addObject() | ||||||
|  |                     .put("nodeName", name) | ||||||
|  |                     .put("nodeCandidateId", candidate.getId()) | ||||||
|  |                     .put("cloudId", candidate.getCloud().getId()); | ||||||
|  |             }); | ||||||
|  |         boolean defineClusterSuccess = conn.defineCluster(appUUID, masterNodeName, null); | ||||||
|  |  | ||||||
|         // ------------------------------------------------------------ |         // ------------------------------------------------------------ | ||||||
|         // 5. Deploy cluster |         // 5. Deploy cluster | ||||||
|  |         boolean deployClusterSuccess = conn.deployCluster(appUUID); | ||||||
|  |  | ||||||
|         // TODO: call deployCluster endpoint |         // ------------------------------------------------------------ | ||||||
|  |         // 6. Rewrite KubeVela | ||||||
|         JsonNode rewritten = addNodeAffinities(kubevela, app.getComponentMachineNames()); |         JsonNode rewritten = addNodeAffinities(kubevela, app.getComponentMachineNames()); | ||||||
|         String rewritten_kubevela = "---\n# Did not manage to create rewritten KubeVela"; |         String rewritten_kubevela = "---\n# Did not manage to create rewritten KubeVela"; | ||||||
|         try { |         try { | ||||||
|             rewritten_kubevela = yamlMapper.writeValueAsString(rewritten); |             rewritten_kubevela = yamlMapper.writeValueAsString(rewritten); | ||||||
|         } catch (JsonProcessingException e) { |         } catch (JsonProcessingException e) { | ||||||
|             log.error("Failed to convert KubeVela to YAML; this should never happen", e); |             log.error("Failed to convert KubeVela to YAML; this should never happen", keyValue("appId", appUUID), e); | ||||||
|         } |         } | ||||||
|         Main.logFile("rewritten-kubevela-" + appUUID + ".yaml", rewritten_kubevela); |         Main.logFile("rewritten-kubevela-" + appUUID + ".yaml", rewritten_kubevela); | ||||||
|  |  | ||||||
|  |         // ------------------------------------------------------------ | ||||||
|  |         // 7. Deploy application | ||||||
|  |  | ||||||
|         // TODO: call deployApplication endpoint |         // TODO: call deployApplication endpoint | ||||||
|  |  | ||||||
|  |         // ------------------------------------------------------------ | ||||||
|  |         // 8. Update NebulousApp state | ||||||
|  |  | ||||||
|  |         // TODO: store rewritten KubeVela in application object | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     /** |     /** | ||||||
|      * Given a KubeVela file, adapt the running application to its specification. |      * Given a KubeVela file, adapt the running application to its | ||||||
|  |        specification. | ||||||
|      * |      * | ||||||
|      * The KubeVela file will have been rewritten with updated |      * The KubeVela file will have been rewritten with updated information | ||||||
|      * information from the solver. |      * from the solver. | ||||||
|      * |      * | ||||||
|      * NOTE: this method is under development, pending the new endpoints. |      * NOTE: this method is under development, pending the new endpoints. | ||||||
|      * |      * | ||||||
| @@ -248,16 +293,17 @@ public class NebulousAppDeployer { | |||||||
|      */ |      */ | ||||||
|     public static void redeployApplication(NebulousApp app, ObjectNode kubevela) { |     public static void redeployApplication(NebulousApp app, ObjectNode kubevela) { | ||||||
|         String appUUID = app.getUUID(); |         String appUUID = app.getUUID(); | ||||||
|         log.info("Starting redeployment of {}", appUUID); |         int deployGeneration = app.getDeployGeneration() + 1; | ||||||
|  |         app.setDeployGeneration(deployGeneration); | ||||||
|  |  | ||||||
|  |         log.info("Starting redeployment generation {}", deployGeneration, keyValue("appId", appUUID)); | ||||||
|         // The overall flow: |         // The overall flow: | ||||||
|         // |         // | ||||||
|         // 1. Extract node requirements and node counts from the updated |         // 1. Extract node requirements and node counts from the updated | ||||||
|         //    KubeVela definition. |         //    KubeVela definition. | ||||||
|         // 2. Extract current nodes from running SAL job |         // 2. Calculate new (to be started) and superfluous (to be shutdown) | ||||||
|         // 3. Calculate new (to be started) and superfluous (to be shutdown) |         //    nodes by comparing against previous deployment. | ||||||
|         //    nodes |         // 3. Find node candidates for new nodes (from Step 3) according to | ||||||
|         // 4. Find node candidates for new nodes (from Step 3) according to |  | ||||||
|         //    their requirements (from Step 1) |         //    their requirements (from Step 1) | ||||||
|         // 5. Rewrite KubeVela with updated node affinities |         // 5. Rewrite KubeVela with updated node affinities | ||||||
|         // 6. Call clusterScaleOut endpoint with list of added nodes |         // 6. Call clusterScaleOut endpoint with list of added nodes | ||||||
| @@ -269,11 +315,11 @@ public class NebulousAppDeployer { | |||||||
|         // 1. Extract node requirements |         // 1. Extract node requirements | ||||||
|         Map<String, List<Requirement>> workerRequirements = KubevelaAnalyzer.getRequirements(kubevela); |         Map<String, List<Requirement>> workerRequirements = KubevelaAnalyzer.getRequirements(kubevela); | ||||||
|         Map<String, Integer> nodeCounts = KubevelaAnalyzer.getNodeCount(kubevela); |         Map<String, Integer> nodeCounts = KubevelaAnalyzer.getNodeCount(kubevela); | ||||||
|         List<Requirement> controllerRequirements = getControllerRequirements(appUUID); |  | ||||||
|  |          | ||||||
|  |  | ||||||
|         Main.logFile("worker-requirements-" + appUUID + ".txt", workerRequirements); |         Main.logFile("worker-requirements-" + appUUID + ".txt", workerRequirements); | ||||||
|         Main.logFile("worker-counts-" + appUUID + ".txt", nodeCounts); |         Main.logFile("worker-counts-" + appUUID + ".txt", nodeCounts); | ||||||
|         Main.logFile("controller-requirements-" + appUUID + ".txt", controllerRequirements); |  | ||||||
|          |          | ||||||
|     } |     } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -1,73 +0,0 @@ | |||||||
| package eu.nebulouscloud.optimiser.controller; |  | ||||||
|  |  | ||||||
| import java.util.Arrays; |  | ||||||
| import java.util.HashMap; |  | ||||||
| import java.util.List; |  | ||||||
| import java.util.Map; |  | ||||||
|  |  | ||||||
| import org.ow2.proactive.sal.model.NodeCandidate; |  | ||||||
| import org.ow2.proactive.sal.model.Requirement; |  | ||||||
|  |  | ||||||
| import com.fasterxml.jackson.core.JsonProcessingException; |  | ||||||
| import com.fasterxml.jackson.databind.ObjectMapper; |  | ||||||
| import com.fasterxml.jackson.databind.node.ArrayNode; |  | ||||||
| import com.fasterxml.jackson.databind.node.ObjectNode; |  | ||||||
|  |  | ||||||
| import lombok.extern.slf4j.Slf4j; |  | ||||||
| import static net.logstash.logback.argument.StructuredArguments.keyValue; |  | ||||||
|  |  | ||||||
|  |  | ||||||
| /** |  | ||||||
|  * A class that wraps communication with SAL (the Scheduling Abstraction Layer |  | ||||||
|  * of ProActive) over EXN. |  | ||||||
|  * |  | ||||||
|  * Documentation of the SAL REST API is here: |  | ||||||
|  * https://github.com/ow2-proactive/scheduling-abstraction-layer/tree/master/documentation |  | ||||||
|  */ |  | ||||||
| @Slf4j |  | ||||||
| public class SalConnector { |  | ||||||
|  |  | ||||||
|     private SalConnector() {} |  | ||||||
|  |  | ||||||
|     private static final ObjectMapper mapper = new ObjectMapper(); |  | ||||||
|  |  | ||||||
|     /** |  | ||||||
|      * Get list of node candidates from the resource broker that fulfil the |  | ||||||
|        given requirements. |  | ||||||
|      * |  | ||||||
|      * <p>Note that we cannot convert the result to a list containing {@code |  | ||||||
|      * org.ow2.proactive.sal.model.NodeCandidate} instances, since the broker |  | ||||||
|      * adds the additional fields {@code score} and {@code ranking}.  Instead |  | ||||||
|      * we return a JSON {@code ArrayNode} containing {@code ObjectNode}s in |  | ||||||
|      * the format specified at |  | ||||||
|      * https://github.com/ow2-proactive/scheduling-abstraction-layer/blob/master/documentation/nodecandidates-endpoints.md#71--filter-node-candidates-endpoint |  | ||||||
|      * but with these two additional attributes. |  | ||||||
|      * |  | ||||||
|      * @param requirements The list of requirements. |  | ||||||
|      * @param appID The application ID. |  | ||||||
|      * @return A JSON array containing node candidates. |  | ||||||
|      */ |  | ||||||
|     public static ArrayNode findNodeCandidates(List<Requirement> requirements, String appID) { |  | ||||||
|         Map<String, Object> msg; |  | ||||||
|         try { |  | ||||||
|             msg = Map.of( |  | ||||||
|                 "metaData", Map.of("user", "admin"), |  | ||||||
|                 "body", mapper.writeValueAsString(requirements)); |  | ||||||
| 	} catch (JsonProcessingException e) { |  | ||||||
|             log.error("Could not convert requirements list to JSON string (this should never happen)", |  | ||||||
|                 keyValue("appId", appID), e); |  | ||||||
|             return null; |  | ||||||
| 	} |  | ||||||
|         Map<String, Object> response = ExnConnector.findBrokerNodeCandidates.sendSync(msg, appID, null, false); |  | ||||||
|         ObjectNode jsonBody = mapper.convertValue(response, ObjectNode.class); |  | ||||||
|         // Note: what we would really like to do here is something like: |  | ||||||
|         // |  | ||||||
|         //     return Arrays.asList(mapper.readValue(response, NodeCandidate[].class)); |  | ||||||
|         // |  | ||||||
|         // But since the broker adds two attributes, the array elements cannot |  | ||||||
|         // be deserialized into org.ow2.proactive.sal.model.NodeCandidate |  | ||||||
|         // objects. |  | ||||||
|         return jsonBody.withArray("/nodes"); |  | ||||||
|     } |  | ||||||
|  |  | ||||||
| } |  | ||||||
		Reference in New Issue
	
	Block a user
	 Rudi Schlatte
					Rudi Schlatte