summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--base/src/main/java/bjc/utils/cli/StreamTerminal.java156
-rw-r--r--base/src/test/java/bjc/utils/test/cli/StreamTerminalTest.java16
2 files changed, 120 insertions, 52 deletions
diff --git a/base/src/main/java/bjc/utils/cli/StreamTerminal.java b/base/src/main/java/bjc/utils/cli/StreamTerminal.java
index 87370f8..185891f 100644
--- a/base/src/main/java/bjc/utils/cli/StreamTerminal.java
+++ b/base/src/main/java/bjc/utils/cli/StreamTerminal.java
@@ -6,6 +6,7 @@ import java.io.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
+import java.util.function.Consumer;
import bjc.data.Either;
@@ -29,15 +30,32 @@ public class StreamTerminal implements Terminal, Runnable {
private Scanner inputScanner;
private Writer output;
+ private String prefix;
+ private Consumer<String> mode;
+
private long currentRequest = -1;
/**
* Create a new stream terminal.
*
- * @param input The input source
+ * @param input The input source
* @param output The output source
*/
public StreamTerminal(Reader input, Writer output) {
+ this(input, output, null, null);
+ }
+
+ /**
+ * Create a new stream terminal which backs an application
+ *
+ * @param input The input source
+ * @param output The output source
+ * @param prefix The string any input must start with to be directed to the
+ * terminal
+ * @param mode The place where all input not directed to the terminal is
+ * written.
+ */
+ public StreamTerminal(Reader input, Writer output, String prefix, Consumer<String> mode) {
this.inputScanner = new Scanner(input);
this.output = output;
@@ -47,6 +65,9 @@ public class StreamTerminal implements Terminal, Runnable {
this.replyLock = new ReentrantLock();
this.replyCondition = replyLock.newCondition();
+
+ this.prefix = prefix;
+ this.mode = mode;
}
@Override
@@ -54,6 +75,9 @@ public class StreamTerminal implements Terminal, Runnable {
running = true;
try {
output.write(INFO_STARTCOMPROC.toString() + "\n");
+ while (!pendingOutput.isEmpty())
+ output.write(pendingOutput.remove());
+ output.flush();
} catch (IOException e) {
// TODO Consider if there is some better way to handle these
throw new RuntimeException(e);
@@ -63,57 +87,71 @@ public class StreamTerminal implements Terminal, Runnable {
try {
while (!pendingOutput.isEmpty())
output.write(pendingOutput.remove());
+ output.flush();
String ln = inputScanner.nextLine();
- String com = "";
- int spcIdx = ln.indexOf(' ');
- if (spcIdx == -1) {
- com = ln;
- } else {
- com = ln.substring(0, spcIdx);
- ln = ln.substring(spcIdx + 1);
- }
- comswt: switch (com) {
- case "r": {
- // General command format is 'r <request no.>,<reply>
- String subRep = ln.substring(2);
- // Process a reply
- int comIndex = subRep.indexOf(',');
- long repNo = 0;
- if (comIndex == -1) {
- // Reply to the oldest message by default
- repNo = pendingRequests.first();
+ if (prefix != null && ln.startsWith(prefix)) {
+ ln = ln.substring(prefix.length());
+ String com = "";
+ int spcIdx = ln.indexOf(' ');
+ if (spcIdx == -1) {
+ com = ln;
} else {
- String repStr = subRep.substring(0, comIndex);
- try {
- repNo = Long.parseLong(repStr);
- } catch (NumberFormatException nfex) {
- output.write(ERROR_INVREPNO.toString() + "\n");
- continue overall;
- }
- // Skip over the comma
- subRep = subRep.substring(comIndex + 1);
+ com = ln.substring(0, spcIdx);
+ ln = ln.substring(spcIdx + 1);
}
- if (!pendingRequests.contains(repNo)) {
- output.write(ERROR_UNKREPNO.toString() + "\n");
- continue overall;
- }
+ comswt: switch (com) {
+ case "r": {
+ // General command format is 'r <request no.>,<reply>
+ String subRep = ln.substring(2);
+ // Process a reply
+ int comIndex = subRep.indexOf(',');
+ long repNo = 0;
+ if (comIndex == -1) {
+ // Reply to the oldest message by default
+ repNo = pendingRequests.first();
+ } else {
+ String repStr = subRep.substring(0, comIndex);
+ try {
+ repNo = Long.parseLong(repStr);
+ } catch (NumberFormatException nfex) {
+ output.write(ERROR_INVREPNO.toString() + "\n");
+ output.flush();
+ continue overall;
+ }
+ // Skip over the comma
+ subRep = subRep.substring(comIndex + 1);
+ }
- pendingRequests.remove(repNo);
- pendingReplies.put(repNo, subRep);
+ if (!pendingRequests.contains(repNo)) {
+ output.write(ERROR_UNKREPNO.toString() + "\n");
+ output.flush();
+ continue overall;
+ }
- replyLock.lock();
- replyCondition.signalAll();
- replyLock.unlock();
- break comswt;
- }
- case "q":
- running = false;
- break comswt;
- default:
- output.write(ERROR_UNRECCOM.toString() + "\n");
+ pendingRequests.remove(repNo);
+ pendingReplies.put(repNo, subRep);
+
+ replyLock.lock();
+ replyCondition.signalAll();
+ replyLock.unlock();
+ break comswt;
+ }
+ case "q":
+ running = false;
+ break comswt;
+ default:
+ output.write(ERROR_UNRECCOM.toString() + "\n");
+ output.flush();
+ }
+ } else {
+ mode.accept(ln);
}
+
+ while (!pendingOutput.isEmpty())
+ output.write(pendingOutput.remove());
+ output.flush();
} catch (IOException ioex) {
throw new RuntimeException(ioex);
}
@@ -121,7 +159,10 @@ public class StreamTerminal implements Terminal, Runnable {
running = false;
try {
+ while (!pendingOutput.isEmpty())
+ output.write(pendingOutput.remove());
output.write(INFO_ENDCOMPROC.toString() + "\n");
+ output.flush();
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -151,7 +192,7 @@ public class StreamTerminal implements Terminal, Runnable {
return pendingReplies.get(id);
}
}
-
+
@Override
public Optional<String> awaitReply(long id, TimeUnit unit, long delay) throws InterruptedException {
if (pendingReplies.containsKey(id))
@@ -160,9 +201,10 @@ public class StreamTerminal implements Terminal, Runnable {
replyLock.lock();
boolean stat = replyCondition.await(delay, unit);
replyLock.unlock();
-
+
// If we timed out, say so
- if (stat == false) return Optional.empty();
+ if (stat == false)
+ return Optional.empty();
// Explanation: Since the reply map is add-only, the lock isn't actually
// protecting anything. We just want to wait until a response is received.
if (pendingReplies.containsKey(id))
@@ -179,11 +221,29 @@ public class StreamTerminal implements Terminal, Runnable {
public String submitRequestSync(String req) throws InterruptedException {
return awaitReply(submitRequest(req));
}
-
+
@Override
public Either<String, Long> submitRequestSync(String req, TimeUnit unit, long delay) throws InterruptedException {
long id = submitRequest(req);
Optional<String> rep = awaitReply(id, unit, delay);
return rep.isEmpty() ? Either.right(id) : Either.left(rep.get());
}
+
+ /**
+ * Add a string to be printed next time output is printed.
+ *
+ * @param out The output
+ */
+ public void addOutput(String out) {
+ pendingOutput.add(out);
+ }
+
+ /**
+ * Set the mode for handling non-terminal input
+ *
+ * @param mode The new mode for handling non-terminal input
+ */
+ public void setMode(Consumer<String> mode) {
+ this.mode = mode;
+ }
} \ No newline at end of file
diff --git a/base/src/test/java/bjc/utils/test/cli/StreamTerminalTest.java b/base/src/test/java/bjc/utils/test/cli/StreamTerminalTest.java
index de300d2..c080d6f 100644
--- a/base/src/test/java/bjc/utils/test/cli/StreamTerminalTest.java
+++ b/base/src/test/java/bjc/utils/test/cli/StreamTerminalTest.java
@@ -3,7 +3,8 @@ package bjc.utils.test.cli;
import static org.junit.Assert.*;
import java.io.*;
-import java.util.Scanner;
+import java.util.*;
+import java.util.function.Consumer;
import org.junit.Test;
@@ -36,7 +37,10 @@ public class StreamTerminalTest {
InputStreamReader outPipeReader = new InputStreamReader(outPipeIn);
OutputStreamWriter outPipeWriter = new OutputStreamWriter(outPipeOut);
- StreamTerminal terminal = new StreamTerminal(inPipeReader, outPipeWriter);
+ List<String> repList = new ArrayList<>();
+ Consumer<String> repAction = repList::add;
+
+ StreamTerminal terminal = new StreamTerminal(inPipeReader, outPipeWriter, "/", repAction);
long reqID1 = terminal.submitRequest("Request 1");
long reqID2 = terminal.submitRequest("Request 2");
@@ -45,8 +49,9 @@ public class StreamTerminalTest {
assertEquals(1, reqID2);
inPipeWriter.write("r 0,A\n");
- inPipeWriter.write("r 1,B\n");
- inPipeWriter.write("q\n");
+ inPipeWriter.write("/r 0,A\n");
+ inPipeWriter.write("/r 1,B\n");
+ inPipeWriter.write("/q\n");
inPipeWriter.flush();
inPipeWriter.close();
@@ -75,6 +80,9 @@ public class StreamTerminalTest {
assertEquals("A", terminal.awaitReply(reqID1));
assertEquals("B", terminal.awaitReply(reqID2));
+
+ assertEquals(1, repList.size());
+ assertEquals("r 0,A", repList.get(0));
} catch (IOException ioex) {
throw new RuntimeException(ioex);
} catch (InterruptedException iex) {