summaryrefslogtreecommitdiff
path: root/base/src/main/java/bjc/utils/cli/StreamTerminal.java
diff options
context:
space:
mode:
Diffstat (limited to 'base/src/main/java/bjc/utils/cli/StreamTerminal.java')
-rw-r--r--base/src/main/java/bjc/utils/cli/StreamTerminal.java185
1 files changed, 141 insertions, 44 deletions
diff --git a/base/src/main/java/bjc/utils/cli/StreamTerminal.java b/base/src/main/java/bjc/utils/cli/StreamTerminal.java
index a45a22e..185891f 100644
--- a/base/src/main/java/bjc/utils/cli/StreamTerminal.java
+++ b/base/src/main/java/bjc/utils/cli/StreamTerminal.java
@@ -6,7 +6,16 @@ import java.io.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
+import java.util.function.Consumer;
+import bjc.data.Either;
+
+/**
+ * Implementation of {@link Terminal} using {@link Reader} and {@link Writer}
+ *
+ * @author bjcul
+ *
+ */
public class StreamTerminal implements Terminal, Runnable {
private SortedSet<Long> pendingRequests;
private ConcurrentMap<Long, String> pendingReplies;
@@ -21,9 +30,32 @@ public class StreamTerminal implements Terminal, Runnable {
private Scanner inputScanner;
private Writer output;
+ private String prefix;
+ private Consumer<String> mode;
+
private long currentRequest = -1;
+ /**
+ * Create a new stream terminal.
+ *
+ * @param input The input source
+ * @param output The output source
+ */
public StreamTerminal(Reader input, Writer output) {
+ this(input, output, null, null);
+ }
+
+ /**
+ * Create a new stream terminal which backs an application
+ *
+ * @param input The input source
+ * @param output The output source
+ * @param prefix The string any input must start with to be directed to the
+ * terminal
+ * @param mode The place where all input not directed to the terminal is
+ * written.
+ */
+ public StreamTerminal(Reader input, Writer output, String prefix, Consumer<String> mode) {
this.inputScanner = new Scanner(input);
this.output = output;
@@ -33,6 +65,9 @@ public class StreamTerminal implements Terminal, Runnable {
this.replyLock = new ReentrantLock();
this.replyCondition = replyLock.newCondition();
+
+ this.prefix = prefix;
+ this.mode = mode;
}
@Override
@@ -40,6 +75,9 @@ public class StreamTerminal implements Terminal, Runnable {
running = true;
try {
output.write(INFO_STARTCOMPROC.toString() + "\n");
+ while (!pendingOutput.isEmpty())
+ output.write(pendingOutput.remove());
+ output.flush();
} catch (IOException e) {
// TODO Consider if there is some better way to handle these
throw new RuntimeException(e);
@@ -49,57 +87,71 @@ public class StreamTerminal implements Terminal, Runnable {
try {
while (!pendingOutput.isEmpty())
output.write(pendingOutput.remove());
+ output.flush();
String ln = inputScanner.nextLine();
- String com = "";
- int spcIdx = ln.indexOf(' ');
- if (spcIdx == -1) {
- com = ln;
- } else {
- com = ln.substring(0, spcIdx);
- ln = ln.substring(spcIdx + 1);
- }
- comswt: switch (com) {
- case "r": {
- // General command format is 'r <request no.>,<reply>
- String subRep = ln.substring(2);
- // Process a reply
- int comIndex = subRep.indexOf(',');
- long repNo = 0;
- if (comIndex == -1) {
- // Reply to the oldest message by default
- repNo = pendingRequests.first();
+ if (prefix != null && ln.startsWith(prefix)) {
+ ln = ln.substring(prefix.length());
+ String com = "";
+ int spcIdx = ln.indexOf(' ');
+ if (spcIdx == -1) {
+ com = ln;
} else {
- String repStr = subRep.substring(0, comIndex);
- try {
- repNo = Long.parseLong(repStr);
- } catch (NumberFormatException nfex) {
- output.write(ERROR_INVREPNO.toString() + "\n");
- continue overall;
- }
- // Skip over the comma
- subRep = subRep.substring(comIndex + 1);
+ com = ln.substring(0, spcIdx);
+ ln = ln.substring(spcIdx + 1);
}
- if (!pendingRequests.contains(repNo)) {
- output.write(ERROR_UNKREPNO.toString() + "\n");
- continue overall;
- }
+ comswt: switch (com) {
+ case "r": {
+ // General command format is 'r <request no.>,<reply>
+ String subRep = ln.substring(2);
+ // Process a reply
+ int comIndex = subRep.indexOf(',');
+ long repNo = 0;
+ if (comIndex == -1) {
+ // Reply to the oldest message by default
+ repNo = pendingRequests.first();
+ } else {
+ String repStr = subRep.substring(0, comIndex);
+ try {
+ repNo = Long.parseLong(repStr);
+ } catch (NumberFormatException nfex) {
+ output.write(ERROR_INVREPNO.toString() + "\n");
+ output.flush();
+ continue overall;
+ }
+ // Skip over the comma
+ subRep = subRep.substring(comIndex + 1);
+ }
- pendingRequests.remove(repNo);
- pendingReplies.put(repNo, subRep);
+ if (!pendingRequests.contains(repNo)) {
+ output.write(ERROR_UNKREPNO.toString() + "\n");
+ output.flush();
+ continue overall;
+ }
- replyLock.lock();
- replyCondition.signalAll();
- replyLock.unlock();
- break comswt;
- }
- case "q":
- running = false;
- break comswt;
- default:
- output.write(ERROR_UNRECCOM.toString() + "\n");
+ pendingRequests.remove(repNo);
+ pendingReplies.put(repNo, subRep);
+
+ replyLock.lock();
+ replyCondition.signalAll();
+ replyLock.unlock();
+ break comswt;
+ }
+ case "q":
+ running = false;
+ break comswt;
+ default:
+ output.write(ERROR_UNRECCOM.toString() + "\n");
+ output.flush();
+ }
+ } else {
+ mode.accept(ln);
}
+
+ while (!pendingOutput.isEmpty())
+ output.write(pendingOutput.remove());
+ output.flush();
} catch (IOException ioex) {
throw new RuntimeException(ioex);
}
@@ -107,7 +159,10 @@ public class StreamTerminal implements Terminal, Runnable {
running = false;
try {
+ while (!pendingOutput.isEmpty())
+ output.write(pendingOutput.remove());
output.write(INFO_ENDCOMPROC.toString() + "\n");
+ output.flush();
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -139,6 +194,25 @@ public class StreamTerminal implements Terminal, Runnable {
}
@Override
+ public Optional<String> awaitReply(long id, TimeUnit unit, long delay) throws InterruptedException {
+ if (pendingReplies.containsKey(id))
+ return Optional.of(pendingReplies.get(id));
+ while (true) {
+ replyLock.lock();
+ boolean stat = replyCondition.await(delay, unit);
+ replyLock.unlock();
+
+ // If we timed out, say so
+ if (stat == false)
+ return Optional.empty();
+ // Explanation: Since the reply map is add-only, the lock isn't actually
+ // protecting anything. We just want to wait until a response is received.
+ if (pendingReplies.containsKey(id))
+ return Optional.of(pendingReplies.get(id));
+ }
+ }
+
+ @Override
public Optional<String> checkReply(long id) {
return Optional.ofNullable(pendingReplies.get(id));
}
@@ -147,6 +221,29 @@ public class StreamTerminal implements Terminal, Runnable {
public String submitRequestSync(String req) throws InterruptedException {
return awaitReply(submitRequest(req));
}
+
+ @Override
+ public Either<String, Long> submitRequestSync(String req, TimeUnit unit, long delay) throws InterruptedException {
+ long id = submitRequest(req);
+ Optional<String> rep = awaitReply(id, unit, delay);
+ return rep.isEmpty() ? Either.right(id) : Either.left(rep.get());
+ }
- // TODO add variants of the two blocking methods above with timeout support
+ /**
+ * Add a string to be printed next time output is printed.
+ *
+ * @param out The output
+ */
+ public void addOutput(String out) {
+ pendingOutput.add(out);
+ }
+
+ /**
+ * Set the mode for handling non-terminal input
+ *
+ * @param mode The new mode for handling non-terminal input
+ */
+ public void setMode(Consumer<String> mode) {
+ this.mode = mode;
+ }
} \ No newline at end of file