For local processing, send out AMPL file

... only if connected to ActiveMQ, of course.

Change-Id: Id886f440f8fbe0dda1c620c5aabeeb82b17b5604
This commit is contained in:
Rudi Schlatte
2024-01-21 18:31:12 +01:00
parent 3860e14902
commit 9256d6231a
3 changed files with 31 additions and 13 deletions

View File

@@ -5,10 +5,12 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.nebulouscloud.exn.core.Publisher;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine.Command; import picocli.CommandLine.Command;
import picocli.CommandLine.Parameters; import picocli.CommandLine.Parameters;
@@ -31,17 +33,27 @@ public class LocalExecution implements Callable<Integer> {
@Override public Integer call() { @Override public Integer call() {
ObjectMapper mapper = new ObjectMapper(); ObjectMapper mapper = new ObjectMapper();
CountDownLatch exn_synchronizer = new CountDownLatch(1);
ExnConnector connector = main.getActiveMQConnector();
Publisher publisher = null;
if (connector != null) {
publisher = connector.getAmplMessagePublisher();
connector.start(exn_synchronizer);
}
JsonNode msg; JsonNode msg;
try { try {
msg = mapper.readTree(Files.readString(app_creation_msg, StandardCharsets.UTF_8)); msg = mapper.readTree(Files.readString(app_creation_msg, StandardCharsets.UTF_8));
} catch (IOException e) { } catch (IOException e) {
log.error("Could not read an input file: ", e); log.error("Could not read an input file: ", e);
return 1; return 1;
} }
NebulousApp app = NebulousApp.newFromAppMessage(msg, NebulousApp app = NebulousApp.newFromAppMessage(msg, publisher);
main.getActiveMQConnector() == null ? null : main.getActiveMQConnector().getAmplMessagePublisher()); if (connector != null) {
log.info("Sending AMPL to channel {}", publisher);
app.sendAMPL();
}
System.out.println(app.generateAMPL()); System.out.println(app.generateAMPL());
// TODO: wait for solver reply here?
return 0; return 0;
} }
} }

View File

@@ -112,7 +112,10 @@ public class Main implements Callable<Integer> {
} }
/** /**
* Initialization code shared between main and subcommands. * Initialization code shared between main and subcommands. Note that
* here we connect to SAL if possible, but (for now) do not start the EXN
* ActiveMQ middleware. Each main method needs to call
* `activeMQConnector.start`.
*/ */
private void init() { private void init() {
log.info("Beginning common startup of optimiser-controller"); log.info("Beginning common startup of optimiser-controller");
@@ -123,7 +126,6 @@ public class Main implements Callable<Integer> {
log.error("Connection to SAL unsuccessful"); log.error("Connection to SAL unsuccessful");
} else { } else {
log.info("Established connection to SAL"); log.info("Established connection to SAL");
// FIXME: remove this once we have the exn connector
NebulousApp.setSalConnector(salConnector); NebulousApp.setSalConnector(salConnector);
} }
} else { } else {
@@ -134,14 +136,13 @@ public class Main implements Callable<Integer> {
log.info("Preparing ActiveMQ connection: host={} port={}", log.info("Preparing ActiveMQ connection: host={} port={}",
activemq_host, activemq_port); activemq_host, activemq_port);
activeMQConnector activeMQConnector
= new ExnConnector(activemq_host, activemq_port, = new ExnConnector(activemq_host, activemq_port,
activemq_user, activemq_password, activemq_user, activemq_password,
new ConnectorHandler() { new ConnectorHandler() {
public void onReady(AtomicReference<Context> context) { public void onReady(AtomicReference<Context> context) {
log.info("Optimiser-controller connected to ActiveMQ"); log.info("Optimiser-controller connected to ActiveMQ");
}
} }
); });
} else { } else {
log.info("ActiveMQ login info not set, only operating locally."); log.info("ActiveMQ login info not set, only operating locally.");
} }

View File

@@ -162,6 +162,7 @@ public class NebulousApp {
} }
this.UUID = app_message.at(uuid_path).textValue(); this.UUID = app_message.at(uuid_path).textValue();
this.app_name = app_message.at(name_path).textValue(); this.app_name = app_message.at(name_path).textValue();
log.info("New App instantiated: Name='{}', UUID='{}'", app_name, UUID);
} }
/** /**
@@ -306,6 +307,10 @@ public class NebulousApp {
* Calculate AMPL file and send it off to the right channel. * Calculate AMPL file and send it off to the right channel.
*/ */
public void sendAMPL() { public void sendAMPL() {
if (ampl_message_channel == null) {
log.error("AMPL publisher not set, cannot send AMPL file");
return;
}
String ampl = generateAMPL(); String ampl = generateAMPL();
ObjectNode msg = mapper.createObjectNode(); ObjectNode msg = mapper.createObjectNode();
msg.put(getUUID() + ".ampl", ampl); msg.put(getUUID() + ".ampl", ampl);