diff options
| author | Benjamin Culkin <scorpress@gmail.com> | 2025-12-08 21:37:46 -0500 |
|---|---|---|
| committer | Benjamin Culkin <scorpress@gmail.com> | 2025-12-08 21:37:46 -0500 |
| commit | 85e3e06fef5f52250a40e39e0674886635ac08fa (patch) | |
| tree | ff7507198fd69b8462fb009d2d9541c224d8ccf9 | |
| parent | a8fd4101a74a948904c226a93be6c5b3d057823d (diff) | |
Fix parallelization
Fix the parallelization so that it actually works.
There is still future stuff to do (a way to see overall progress of the
save, not just task-based progress would be nice) but it is certainly a
start
3 files changed, 84 insertions, 57 deletions
diff --git a/firmal/src/main/java/bjc/firmal/Firmal.java b/firmal/src/main/java/bjc/firmal/Firmal.java index 29f0fca..b25d40f 100644 --- a/firmal/src/main/java/bjc/firmal/Firmal.java +++ b/firmal/src/main/java/bjc/firmal/Firmal.java @@ -5,6 +5,7 @@ import java.awt.event.WindowEvent; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -32,7 +33,6 @@ import bjc.functypes.ClosableThrowFunction; import bjc.utils.gui.layout.VLayout; import bjc.utils.gui.panels.BatchTaskProgressPanel; import bjc.utils.gui.panels.SimpleInputPanel; -import bjc.utils.misc.BoundPreparedStatement; import bjc.utils.misc.NamedPreparedStatement; /** @@ -86,20 +86,30 @@ public class Firmal { * single-threaded executor to serialize the execution of queries. * * @param sql The SQL statement to use (use {@link NamedPreparedStatement} syntax) - * @return A function that will execute batches using {@link BoundPreparedStatement} to hold the data + * @param argShape The shape of the arguments for using the external-build functionality + * @return A function that will execute batches using {@link NamedPreparedStatement.Args} to hold the data * @throws SQLException if something went wrong */ - public ClosableThrowFunction<BoundPreparedStatement, Future<List<Integer>>, SQLException> createQueuedUpdater(String sql) throws SQLException { + public ClosableThrowFunction<List<NamedPreparedStatement.Args>, Future<List<Integer>>, SQLException> + createQueuedUpdater(String sql, NamedPreparedStatement.Args argShape) throws SQLException { if (dbConnection == null) { // Establish connection - we close it elsewhere dbConnection = DriverManager.getConnection(connectURL, connectUser, connectPassword); } - NamedPreparedStatement nameStatement = NamedPreparedStatement.prepare(dbConnection, sql); + NamedPreparedStatement.Executor nameStatement = + NamedPreparedStatement.Executor.create(dbConnection, sql, argShape); - ClosableThrowFunction<BoundPreparedStatement, Future<List<Integer>>, SQLException> func = + // TODO do something to assist with this swallowing exceptions + ClosableThrowFunction<List<NamedPreparedStatement.Args>, Future<List<Integer>>, SQLException> func = ClosableThrowFunction.bindFunction(record -> { - Future<List<Integer>> result = dbExecutor.submit(() -> nameStatement.executeBatchFromRecord(record)); + Future<List<Integer>> result = dbExecutor.submit(() -> { + List<Integer> resList = new ArrayList<>(record.size()); + + int[] dbRes = nameStatement.executeBatch(record); + for (int i : dbRes) resList.add(i); + return resList; + }); return result; }, nameStatement); @@ -115,10 +125,10 @@ public class Firmal { public BatchTaskProgressPanel.BatchHandle createTaskBatch(String batchTitle) { return taskPanel.startBatch(batchTitle); } - + private void buildGUI() { try { - UIManager.setLookAndFeel(UIManager.getCrossPlatformLookAndFeelClassName()); + UIManager.setLookAndFeel(UIManager.getSystemLookAndFeelClassName()); } catch (ClassNotFoundException e) { e.printStackTrace(); } catch (InstantiationException e) { diff --git a/firmal/src/main/java/bjc/firmal/SharedDBUpdateFunction.java b/firmal/src/main/java/bjc/firmal/SharedDBUpdateFunction.java deleted file mode 100644 index c36e988..0000000 --- a/firmal/src/main/java/bjc/firmal/SharedDBUpdateFunction.java +++ /dev/null @@ -1,16 +0,0 @@ -package bjc.firmal; - -import java.sql.SQLException; -import java.util.List; -import java.util.concurrent.Future; - -import bjc.functypes.ClosableThrowFunction; -import bjc.utils.misc.BoundPreparedStatement; - -/** - * Alias type for DB updates - */ -public interface SharedDBUpdateFunction extends - ClosableThrowFunction<BoundPreparedStatement, Future<List<Integer>>, SQLException> { - -} diff --git a/firmal/src/main/java/bjc/firmal/gptbrowser/GPTJSONBrowserFrame.java b/firmal/src/main/java/bjc/firmal/gptbrowser/GPTJSONBrowserFrame.java index 39c98ca..2b96910 100644 --- a/firmal/src/main/java/bjc/firmal/gptbrowser/GPTJSONBrowserFrame.java +++ b/firmal/src/main/java/bjc/firmal/gptbrowser/GPTJSONBrowserFrame.java @@ -11,6 +11,7 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -32,18 +33,18 @@ import javax.swing.JScrollPane; import javax.swing.JSplitPane; import javax.swing.JTextPane; import javax.swing.ListSelectionModel; +import javax.swing.SwingUtilities; import javax.swing.SwingWorker; import org.json.JSONObject; import org.json.JSONTokener; import bjc.firmal.Firmal; -import bjc.firmal.SharedDBUpdateFunction; import bjc.functypes.ClosableThrowFunction; import bjc.utils.gui.DelegateListCellRenderer; -import bjc.utils.gui.layout.VLayout; import bjc.utils.gui.panels.BatchTaskProgressPanel.BatchHandle; -import bjc.utils.misc.BoundPreparedStatement; +import bjc.utils.misc.NamedPreparedStatement; + import org.json.JSONArray; /** @@ -61,39 +62,47 @@ public class GPTJSONBrowserFrame { */ private final class SaveConversationTask extends SwingWorker<Void, Integer> { private GPTConversationDB conversation; - private ClosableThrowFunction<BoundPreparedStatement, Future<List<Integer>>, SQLException> insertConvFunc; - private ClosableThrowFunction<BoundPreparedStatement, Future<List<Integer>>, SQLException> insertMessageFunc; + private ClosableThrowFunction<List<NamedPreparedStatement.Args>, Future<List<Integer>>, SQLException> insertConvFunc; + private ClosableThrowFunction<List<NamedPreparedStatement.Args>, Future<List<Integer>>, SQLException> insertMessageFunc; private String note; + private CountDownLatch msgCounter; /** * Create a DB save worker task * @param conv The conversation to save * @param convUpdate The DB function for conversation updates * @param msgUpdate The DB function for message updates + * @param msgCounter The message counter */ public SaveConversationTask(GPTConversationDB conv, - ClosableThrowFunction<BoundPreparedStatement, Future<List<Integer>>, SQLException> convUpdate, - ClosableThrowFunction<BoundPreparedStatement, Future<List<Integer>>, SQLException> msgUpdate) { + ClosableThrowFunction<List<NamedPreparedStatement.Args>, Future<List<Integer>>, SQLException> convUpdate, + ClosableThrowFunction<List<NamedPreparedStatement.Args>, Future<List<Integer>>, SQLException> msgUpdate, + CountDownLatch msgCounter) { this.conversation = conv; this.insertConvFunc = convUpdate; this.insertMessageFunc = msgUpdate; - + this.msgCounter = msgCounter; } + @Override protected Void doInBackground() throws Exception { - BoundPreparedStatement insertConvRecord = new BoundPreparedStatement(); - BoundPreparedStatement insertMessageRecord = new BoundPreparedStatement(); + List<NamedPreparedStatement.Args> insertConvArgs = new ArrayList<>(1); + NamedPreparedStatement.Args.Builder insertConvRecord = NamedPreparedStatement.Args.builder(); insertConvRecord.setString("id", conversation.getID()); insertConvRecord.setString("title", conversation.getTitle()); - insertConvRecord.addBatch(); + insertConvArgs.add(insertConvRecord.build()); List<RawMessageDB> messages = conversation.getMessages(); + List<NamedPreparedStatement.Args> insertMessageArgs = new ArrayList<>(messages.size()); + int totalNumMessages = messages.size(); int currNumMessages = 0; for (RawMessageDB message : messages) { + NamedPreparedStatement.Args.Builder insertMessageRecord = NamedPreparedStatement.Args.builder(); + if (isCancelled()) break; int currProgress = Math.min(100, (currNumMessages / totalNumMessages) * 100); @@ -114,7 +123,7 @@ public class GPTJSONBrowserFrame { // Or do we need to be doing these as independent DB queries instead of batching them? // That sounds rather inefficient, but so is doing a second pass to fill it later // insertMessage.setString("parentid", message.getParentMessageID()); - insertMessageRecord.addBatch(); + insertMessageArgs.add(insertMessageRecord.build()); } else { // TODO: this message has a missing/incorrect parent link } @@ -125,7 +134,7 @@ public class GPTJSONBrowserFrame { note = newNote; // Save our changes for this conversation - Future<List<Integer>> insertConversationResults = insertConvFunc.apply(insertConvRecord); + Future<List<Integer>> insertConversationResults = insertConvFunc.apply(insertConvArgs); for (int i : insertConversationResults.get()) { if (i != 0 && i != 1) { @@ -133,19 +142,13 @@ public class GPTJSONBrowserFrame { } } - // TODO this isn't working right, figure out why - // Moved it here thinking it might have been a concurrency thing, but... - Future<List<Integer>> insertMessageResults = insertMessageFunc.apply(insertMessageRecord); + Future<List<Integer>> insertMessageResults = insertMessageFunc.apply(insertMessageArgs); for (int i : insertMessageResults.get()) { if (i != 0 && i != 1) { // TODO handle oddities } } - // Reset batch records - insertConvRecord = new BoundPreparedStatement(); - insertMessageRecord = new BoundPreparedStatement(); - newNote = "Saved " + BATCH_THRESHOLD + " messages/conversations to the DB"; firePropertyChange("note", note, newNote); @@ -153,6 +156,11 @@ public class GPTJSONBrowserFrame { return null; } + @Override + protected void done() { + msgCounter.countDown(); + } + public String getNote() { return note; } @@ -171,13 +179,24 @@ public class GPTJSONBrowserFrame { @Override public void actionPerformed(ActionEvent aev) { Firmal fm = Firmal.fm; - try (var insertConversation = fm.createQueuedUpdater( + + NamedPreparedStatement.Args.Builder insertConvShape = NamedPreparedStatement.Args.builder(); + insertConvShape.setString("id", null); + insertConvShape.setString("title", null); + + NamedPreparedStatement.Args.Builder insertMessageShape = NamedPreparedStatement.Args.builder(); + insertMessageShape.setString("selfid", null); + insertMessageShape.setString("convid", null); + insertMessageShape.setString("body", null); + + try { + var insertConversation = fm.createQueuedUpdater( "insert into chatgpt.conversations (conversation_id, conversation_title)" - + " values (:id::uuid, :title) on conflict (conversation_id) do nothing"); - var insertMessage = fm.createQueuedUpdater( - "insert into chatgpt.raw_messages (message_id, conversation_id, message_body) " - + "values (:selfid::uuid, :convid::uuid, :body::json)" - + " on conflict (message_id) do nothing")) { + + " values (:id::uuid, :title) on conflict (conversation_id) do nothing", insertConvShape.build()); + var insertMessage = fm.createQueuedUpdater( + "insert into chatgpt.raw_messages (message_id, conversation_id, message_body) " + + "values (:selfid::uuid, :convid::uuid, :body::json)" + + " on conflict (message_id) do nothing", insertMessageShape.build()); Iterator<GPTConversationDB> conversations = conversationListModel.elements().asIterator(); @@ -186,17 +205,37 @@ public class GPTJSONBrowserFrame { int totalConversations = conversationListModel.getSize(); int currConversation = 0; + CountDownLatch msgCounter = new CountDownLatch(totalConversations); + while (conversations.hasNext()) { currConversation++; GPTConversationDB conversation = conversations.next(); - SaveConversationTask saveTask = new SaveConversationTask(conversation, insertConversation, insertMessage); + SaveConversationTask saveTask = new SaveConversationTask(conversation, insertConversation, insertMessage, msgCounter); String taskDesc = "Saving conversation " + currConversation + " of " + totalConversations + ": " + conversation.getTitle(); saveBatch.monitorSwingWorker(saveTask, taskDesc, true); saveTask.execute(); } + + // Make sure our statements are cleaned up once we are done + Thread cleanupThread = new Thread(() -> { + try { + msgCounter.await(); + + insertMessage.close(); + insertConversation.close(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + }); + cleanupThread.start(); } catch (SQLException sqlex) { JFrame mainFrame = null; JDialog errorDialog = new JDialog(mainFrame, "Error interfacing with DB"); @@ -216,12 +255,6 @@ public class GPTJSONBrowserFrame { errorDialog.pack(); errorDialog.setVisible(true); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (ExecutionException e) { - // TODO Auto-generated catch block - e.printStackTrace(); } catch (Exception e1) { // TODO Auto-generated catch block e1.printStackTrace(); |
