package bjc.utils.cli; import static bjc.utils.cli.TerminalCodes.*; import java.io.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.locks.*; import java.util.function.Consumer; import java.util.function.Function; import bjc.data.Either; import bjc.utils.funcutils.StringUtils; import bjc.utils.parserutils.TokenUtils; /** * Implementation of {@link Terminal} using {@link Reader} and {@link Writer} * * @author Ben Culkin * */ public class StreamTerminal implements Terminal, Runnable { private SortedSet pendingRequests; private ConcurrentMap pendingReplies; private Lock replyLock; private Condition replyCondition; private Queue pendingOutput; private boolean isRunning; private boolean isParagraphMode; private boolean isPromptOnContinue; private Scanner input; private Writer output; private Writer status; private String prefix; private String paragraphTerminator = ""; private String prompt = "..> "; private Consumer mode; private long currentRequest = -1; /** * Create a new stream terminal. * * @param input The input source * @param output The output source */ public StreamTerminal(Reader input, Writer output) { this(input, output, null, null); } /** * Create a new stream terminal which backs an application * * @param input The input source * @param output The output source * @param prefix The string any input must start with to be directed to the * terminal * @param mode The place where all input not directed to the terminal is * written. */ public StreamTerminal(Reader input, Writer output, String prefix, Consumer mode) { this.input = new Scanner(input); this.output = output; this.status = output; this.pendingRequests = new TreeSet<>(); this.pendingReplies = new ConcurrentHashMap<>(); this.pendingOutput = new ArrayDeque<>(); this.replyLock = new ReentrantLock(); this.replyCondition = replyLock.newCondition(); this.prefix = prefix; this.mode = mode; } @Override public void run() { isRunning = true; try { status.write(INFO_STARTCOMPROC.toString() + "\n"); status.flush(); writePendingOutput(); output.flush(); } catch (IOException e) { throw new RuntimeException(e); } StringBuilder pendingInput = new StringBuilder(); overall: while (isRunning && input.hasNextLine()) { try { writePendingOutput(); output.flush(); String ln = input.nextLine(); if (prefix != null && ln.startsWith(prefix)) { ln = ln.substring(prefix.length()); String com = ""; int spcIdx = ln.indexOf(' '); if (spcIdx == -1) { com = ln; } else { com = ln.substring(0, spcIdx); ln = ln.substring(spcIdx + 1); } comswt: switch (com) { case "c": handleConfigure(ln); break; case "r": { // General command format is 'r , String subRep = ln.substring(2); // Process a reply int comIndex = subRep.indexOf(','); long repNo = 0; if (comIndex == -1) { // Reply to the oldest message by default repNo = pendingRequests.first(); } else { String repStr = subRep.substring(0, comIndex); try { repNo = Long.parseLong(repStr); } catch (NumberFormatException nfex) { status.write(ERROR_INVREPNO.toString() + "\n"); status.flush(); continue overall; } // Skip over the comma subRep = subRep.substring(comIndex + 1); } if (!pendingRequests.contains(repNo)) { status.write(ERROR_UNKREPNO.toString() + "\n"); status.flush(); continue overall; } pendingRequests.remove(repNo); pendingReplies.put(repNo, subRep); replyLock.lock(); replyCondition.signalAll(); replyLock.unlock(); break comswt; } case "q": isRunning = false; break comswt; case "?": case "h": handleHelp(ln); break; default: status.write(ERROR_UNRECCOM.toString() + "\n"); status.flush(); } } else { if (isParagraphMode) { if (ln.equals(paragraphTerminator)) { mode.accept(pendingInput.toString()); pendingInput = new StringBuilder(); writePendingOutput(); output.flush(); continue; } // Handle line-continuation if (ln.endsWith("\\")) { pendingInput.append(ln.substring(0, ln.length() - 1)); pendingInput.append(" "); } else { pendingInput.append(ln); pendingInput.append("\n"); } addOutput(prompt); } else { mode.accept(ln); } } writePendingOutput(); output.flush(); } catch (IOException ioex) { throw new RuntimeException(ioex); } } isRunning = false; try { writePendingOutput(); output.flush(); status.write(INFO_ENDCOMPROC.toString() + "\n"); status.flush(); } catch (IOException e) { throw new RuntimeException(e); } } private void writePendingOutput() throws IOException { while (!pendingOutput.isEmpty()) output.write(pendingOutput.remove()); } private void handleHelp(String ln) { addOutput("Help not yet fully implemented"); } private void handleConfigure(String ln) { int spaceIdx = ln.indexOf(" "); String setting = ln.substring(0, spaceIdx); String setArgs = ln.substring(spaceIdx).strip(); List args = TokenUtils.processArguments(setArgs); switch(setting) { case "paraTerm": paragraphTerminator = args.get(0); break; case "para": isParagraphMode = !isParagraphMode; break; case "prompt?": isPromptOnContinue = !isPromptOnContinue; break; case "prompt": prompt = args.get(0); break; default: addOutput(ERROR_UNKCONFSET.toString() + "\n"); } } public boolean isParagraphMode() { return isParagraphMode; } public void setParagraphMode(boolean isParagraphMode) { this.isParagraphMode = isParagraphMode; } public boolean isPromptOnContinue() { return isPromptOnContinue; } public void setPromptOnContinue(boolean isPromptOnContinue) { this.isPromptOnContinue = isPromptOnContinue; } public String getParagraphTerminator() { return paragraphTerminator; } public void setParagraphTerminator(String paragraphTerminator) { this.paragraphTerminator = paragraphTerminator; } public String getPrompt() { return prompt; } public void setPrompt(String prompt) { this.prompt = prompt; } @Override public long submitRequest(String req) { long reqNo = currentRequest + 1; currentRequest += 1; pendingOutput.add(reqNo + " " + req + "\n"); pendingRequests.add(reqNo); return reqNo; } @Override public String awaitReply(long id) throws InterruptedException { if (pendingReplies.containsKey(id)) return pendingReplies.get(id); while (true) { replyLock.lock(); replyCondition.await(); replyLock.unlock(); // Explanation: Since the reply map is add-only, the lock isn't actually // protecting anything. We just want to wait until a response is received. if (pendingReplies.containsKey(id)) return pendingReplies.get(id); } } @Override public Optional awaitReply(long id, TimeUnit unit, long delay) throws InterruptedException { if (pendingReplies.containsKey(id)) return Optional.of(pendingReplies.get(id)); while (true) { replyLock.lock(); boolean stat = replyCondition.await(delay, unit); replyLock.unlock(); // If we timed out, say so if (stat == false) return Optional.empty(); // Explanation: Since the reply map is add-only, the lock isn't actually // protecting anything. We just want to wait until a response is received. if (pendingReplies.containsKey(id)) return Optional.of(pendingReplies.get(id)); } } @Override public Optional checkReply(long id) { return Optional.ofNullable(pendingReplies.get(id)); } @Override public String submitRequestSync(String req) throws InterruptedException { return awaitReply(submitRequest(req)); } @Override public Either submitRequestSync(String req, TimeUnit unit, long delay) throws InterruptedException { long id = submitRequest(req); Optional rep = awaitReply(id, unit, delay); return rep.isEmpty() ? Either.right(id) : Either.left(rep.get()); } /** * Add a string to be printed next time output is printed. * * @param out The output */ public void addOutput(String out) { pendingOutput.add(out); } /** * Set the mode for handling non-terminal input * * @param mode The new mode for handling non-terminal input */ public void setMode(Consumer mode) { this.mode = mode; } public Writer getStatus() { return status; } public void setStatus(Writer status) { this.status = status; } }