From 75cb1cf85dacefce863cbacea57dff33ccf39e5f Mon Sep 17 00:00:00 2001 From: Benjamin Culkin Date: Sun, 7 Dec 2025 21:12:19 -0500 Subject: Part one of DB updates - Centralization Part one of the DB updates - switching to using a centralized source that uses a executor to parallelize. I'd imagine the performance would be more arse than it already was if I ran it now, which is why the next step is to change the save action to use SwingWorker and be parallelized per conversation. After that, set-up the UI for BatchTaskProgressPanel, and configure those SwingWorkers to use it --- firmal/src/main/java/bjc/firmal/Firmal.java | 54 ++++++++++++++- .../java/bjc/firmal/SharedDBUpdateFunction.java | 16 +++++ .../bjc/firmal/gptbrowser/GPTJSONBrowserFrame.java | 76 +++++++++++++--------- firmal/src/main/java/module-info.java | 2 +- 4 files changed, 114 insertions(+), 34 deletions(-) create mode 100644 firmal/src/main/java/bjc/firmal/SharedDBUpdateFunction.java diff --git a/firmal/src/main/java/bjc/firmal/Firmal.java b/firmal/src/main/java/bjc/firmal/Firmal.java index 6e99889..9e9659f 100644 --- a/firmal/src/main/java/bjc/firmal/Firmal.java +++ b/firmal/src/main/java/bjc/firmal/Firmal.java @@ -5,6 +5,11 @@ import java.awt.event.WindowEvent; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import javax.swing.JButton; import javax.swing.JDialog; @@ -20,11 +25,14 @@ import javax.swing.JPasswordField; import javax.swing.SwingUtilities; import javax.swing.UIManager; import javax.swing.UnsupportedLookAndFeelException; -import javax.swing.WindowConstants; import bjc.firmal.gptbrowser.GPTJSONBrowserFrame; +import bjc.functypes.ClosableFunction; +import bjc.functypes.ClosableThrowFunction; import bjc.utils.gui.layout.VLayout; import bjc.utils.gui.panels.SimpleInputPanel; +import bjc.utils.misc.BoundPreparedStatement; +import bjc.utils.misc.NamedPreparedStatement; /** * Main class for Firmal. @@ -46,6 +54,8 @@ public class Firmal { private Connection dbConnection; + private final ExecutorService dbExecutor; + /** * The public instance of the application */ @@ -59,6 +69,40 @@ public class Firmal { SwingUtilities.invokeLater(() -> fm.buildGUI()); } + /** + * Create a new Firmal instance + */ + public Firmal() { + dbExecutor = Executors.newSingleThreadExecutor(); + } + + /** + * Create a function that batches updates to the database from multiple threads. + * + * This function is thread-safe, though that's because it (behind the scenes), uses an + * single-threaded executor to serialize the execution of queries. + * + * @param sql The SQL statement to use (use {@link NamedPreparedStatement} syntax) + * @return A function that will execute batches using {@link BoundPreparedStatement} to hold the data + * @throws SQLException if something went wrong + */ + public SharedDBUpdateFunction createQueuedUpdater(String sql) throws SQLException { + if (dbConnection == null) { + // Establish connection - we close it elsewhere + dbConnection = DriverManager.getConnection(connectURL, connectUser, connectPassword); + } + + NamedPreparedStatement nameStatement = NamedPreparedStatement.prepare(dbConnection, sql); + + ClosableThrowFunction>, SQLException> func = + ClosableThrowFunction.bindFunction(record -> { + Future> result = dbExecutor.submit(() -> nameStatement.executeBatchFromRecord(record)); + return result; + }, nameStatement); + + return (SharedDBUpdateFunction) func; + } + private void buildGUI() { try { UIManager.setLookAndFeel(UIManager.getCrossPlatformLookAndFeelClassName()); @@ -83,6 +127,14 @@ public class Firmal { } catch (Exception e) { e.printStackTrace(); } + if (!dbExecutor.isShutdown()) { + dbExecutor.shutdown(); + try { + dbExecutor.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } mainFrame.dispose(); // TODO verify if we need to call System.exit() here, or if disposing our only window is enough } diff --git a/firmal/src/main/java/bjc/firmal/SharedDBUpdateFunction.java b/firmal/src/main/java/bjc/firmal/SharedDBUpdateFunction.java new file mode 100644 index 0000000..c36e988 --- /dev/null +++ b/firmal/src/main/java/bjc/firmal/SharedDBUpdateFunction.java @@ -0,0 +1,16 @@ +package bjc.firmal; + +import java.sql.SQLException; +import java.util.List; +import java.util.concurrent.Future; + +import bjc.functypes.ClosableThrowFunction; +import bjc.utils.misc.BoundPreparedStatement; + +/** + * Alias type for DB updates + */ +public interface SharedDBUpdateFunction extends + ClosableThrowFunction>, SQLException> { + +} diff --git a/firmal/src/main/java/bjc/firmal/gptbrowser/GPTJSONBrowserFrame.java b/firmal/src/main/java/bjc/firmal/gptbrowser/GPTJSONBrowserFrame.java index 0a87390..64bccf0 100644 --- a/firmal/src/main/java/bjc/firmal/gptbrowser/GPTJSONBrowserFrame.java +++ b/firmal/src/main/java/bjc/firmal/gptbrowser/GPTJSONBrowserFrame.java @@ -16,6 +16,8 @@ import java.util.ArrayList; import java.util.Enumeration; import java.util.Iterator; import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import javax.swing.DefaultListModel; import javax.swing.JButton; @@ -45,12 +47,15 @@ import javax.swing.SwingWorker; import org.json.JSONObject; import org.json.JSONTokener; +import bjc.firmal.Firmal; +import bjc.firmal.SharedDBUpdateFunction; import bjc.utils.gui.DelegateListCellRenderer; import bjc.utils.gui.SimpleJList; import bjc.utils.gui.layout.HLayout; import bjc.utils.gui.layout.VLayout; import bjc.utils.gui.panels.SimpleInputPanel; import bjc.utils.ioutils.TextAreaOutputStream; +import bjc.utils.misc.BoundPreparedStatement; import bjc.utils.misc.NamedPreparedStatement; import org.json.JSONArray; @@ -60,11 +65,6 @@ public class GPTJSONBrowserFrame { private JList conversationListUI; private DefaultListModel conversationListModel; - // DB Connection details - private String connectURL; - private String connectUser; - private String connectPassword; - private final class SaveConversationTask extends SwingWorker, Integer> { @Override @@ -83,15 +83,14 @@ public class GPTJSONBrowserFrame { @Override public void actionPerformed(ActionEvent aev) { - try (Connection testConnection = DriverManager.getConnection(connectURL, connectUser, connectPassword)) { - NamedPreparedStatement insertConversation = NamedPreparedStatement.prepare(testConnection, - "insert into chatgpt.conversations (conversation_id, conversation_title) values (:id::uuid, :title)" - + " on conflict (conversation_id) do nothing"); - NamedPreparedStatement insertMessage = NamedPreparedStatement.prepare(testConnection, - // TODO fix not passing parent_message b/c of constraint reasons - "insert into chatgpt.raw_messages (message_id, conversation_id, message_body) " - + "values (:selfid::uuid, :convid::uuid, :body::json)" - + " on conflict (message_id) do nothing"); + Firmal fm = Firmal.fm; + try (SharedDBUpdateFunction insertConversation = fm.createQueuedUpdater( + "insert into chatgpt.conversations (conversation_id, conversation_title)" + + " values (:id::uuid, :title) on conflict (conversation_id) do nothing"); + SharedDBUpdateFunction insertMessage = fm.createQueuedUpdater( + "insert into chatgpt.raw_messages (message_id, conversation_id, message_body) " + + "values (:selfid::uuid, :convid::uuid, :body::json)" + + " on conflict (message_id) do nothing")) { Iterator conversations = conversationListModel.elements().asIterator(); int totalNumConversations = conversationListModel.getSize(); @@ -143,6 +142,9 @@ public class GPTJSONBrowserFrame { int overallMessageCount = 0; final int BATCH_THRESHOLD = 500; + BoundPreparedStatement insertConvRecord = new BoundPreparedStatement(); + BoundPreparedStatement insertMessageRecord = new BoundPreparedStatement(); + while (conversations.hasNext()) { currNumConversations++; @@ -155,9 +157,9 @@ public class GPTJSONBrowserFrame { System.out.println("Saving Conversation " + currNumConversations + ": " + conversation.getTitle()); - insertConversation.setString("id", conversation.getID()); - insertConversation.setString("title", conversation.getTitle()); - insertConversation.addBatch(); + insertConvRecord.setString("id", conversation.getID()); + insertConvRecord.setString("title", conversation.getTitle()); + insertConvRecord.addBatch(); List messages = conversation.getMessages(); int totalNumMessages = messages.size(); @@ -173,15 +175,15 @@ public class GPTJSONBrowserFrame { // DEBUG System.out.println("Saving message " + currNumMessages + " of " + totalNumMessages); - insertMessage.setString("selfid", message.getMessageID()); - insertMessage.setString("convid", message.getConversationID()); - insertMessage.setString("body", message.getMessageBody()); + insertMessageRecord.setString("selfid", message.getMessageID()); + insertMessageRecord.setString("convid", message.getConversationID()); + insertMessageRecord.setString("body", message.getMessageBody()); // TODO figure out why we are getting constraint violations here. // Do we really need to leave this null initially, then backfill it? // Or do we need to be doing these as independent DB queries instead of batching them? // That sounds rather inefficient, but so is doing a second pass to fill it later // insertMessage.setString("parentid", message.getParentMessageID()); - insertMessage.addBatch(); + insertMessageRecord.addBatch(); if (overallMessageCount == BATCH_THRESHOLD) { overallMessageCount = 0; @@ -190,21 +192,25 @@ public class GPTJSONBrowserFrame { System.out.println("Starting save of " + BATCH_THRESHOLD + " messages/conversations to the DB"); // Commit what we have so far to prevent overloading. - int[] insertConversationResults = insertConversation.executeBatch(); - int[] insertMessageResults = insertMessage.executeBatch(); + Future> insertConversationResults = insertConversation.apply(insertConvRecord); + Future> insertMessageResults = insertMessage.apply(insertMessageRecord); - for (int i : insertConversationResults) { + for (int i : insertConversationResults.get()) { if (i != 0 && i != 1) { // TODO: do something about an oddity } } - for (int i : insertMessageResults) { + for (int i : insertMessageResults.get()) { if (i != 0 && i != 1) { // TODO: do something about an oddity } } + // Reset batch records + insertConvRecord = new BoundPreparedStatement(); + insertMessageRecord = new BoundPreparedStatement(); + // DEBUG System.out.println("Saved " + BATCH_THRESHOLD + " messages/conversations to the DB"); } @@ -215,25 +221,22 @@ public class GPTJSONBrowserFrame { } // Prepare to save things - int[] insertConversationResults = insertConversation.executeBatch(); - int[] insertMessageResults = insertMessage.executeBatch(); + Future> insertConversationResults = insertConversation.apply(insertConvRecord); + Future> insertMessageResults = insertMessage.apply(insertMessageRecord); - for (int i : insertConversationResults) { + for (int i : insertConversationResults.get()) { if (i != 0 && i != 1) { // TODO: do something about an oddity } } - for (int i : insertMessageResults) { + for (int i : insertMessageResults.get()) { if (i != 0 && i != 1) { // TODO: do something about an oddity } } okButton.setEnabled(true); - - insertMessage.close(); - insertConversation.close(); } catch (SQLException sqlex) { JDialog errorDialog = new JDialog(parentFrame, "Error interfacing with DB"); @@ -252,6 +255,15 @@ public class GPTJSONBrowserFrame { errorDialog.pack(); errorDialog.setVisible(true); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (ExecutionException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (Exception e1) { + // TODO Auto-generated catch block + e1.printStackTrace(); } } } diff --git a/firmal/src/main/java/module-info.java b/firmal/src/main/java/module-info.java index 7c434fa..7719a71 100644 --- a/firmal/src/main/java/module-info.java +++ b/firmal/src/main/java/module-info.java @@ -7,7 +7,7 @@ module firmal { exports bjc.firmal; - requires bjc.utils; + requires transitive bjc.utils; requires esodata; requires transitive java.desktop; requires json; -- cgit v1.2.3