summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenjamin Culkin <scorpress@gmail.com>2025-12-08 21:37:46 -0500
committerBenjamin Culkin <scorpress@gmail.com>2025-12-08 21:37:46 -0500
commit85e3e06fef5f52250a40e39e0674886635ac08fa (patch)
treeff7507198fd69b8462fb009d2d9541c224d8ccf9
parenta8fd4101a74a948904c226a93be6c5b3d057823d (diff)
Fix parallelization
Fix the parallelization so that it actually works. There is still future stuff to do (a way to see overall progress of the save, not just task-based progress would be nice) but it is certainly a start
-rw-r--r--firmal/src/main/java/bjc/firmal/Firmal.java26
-rw-r--r--firmal/src/main/java/bjc/firmal/SharedDBUpdateFunction.java16
-rw-r--r--firmal/src/main/java/bjc/firmal/gptbrowser/GPTJSONBrowserFrame.java99
3 files changed, 84 insertions, 57 deletions
diff --git a/firmal/src/main/java/bjc/firmal/Firmal.java b/firmal/src/main/java/bjc/firmal/Firmal.java
index 29f0fca..b25d40f 100644
--- a/firmal/src/main/java/bjc/firmal/Firmal.java
+++ b/firmal/src/main/java/bjc/firmal/Firmal.java
@@ -5,6 +5,7 @@ import java.awt.event.WindowEvent;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -32,7 +33,6 @@ import bjc.functypes.ClosableThrowFunction;
import bjc.utils.gui.layout.VLayout;
import bjc.utils.gui.panels.BatchTaskProgressPanel;
import bjc.utils.gui.panels.SimpleInputPanel;
-import bjc.utils.misc.BoundPreparedStatement;
import bjc.utils.misc.NamedPreparedStatement;
/**
@@ -86,20 +86,30 @@ public class Firmal {
* single-threaded executor to serialize the execution of queries.
*
* @param sql The SQL statement to use (use {@link NamedPreparedStatement} syntax)
- * @return A function that will execute batches using {@link BoundPreparedStatement} to hold the data
+ * @param argShape The shape of the arguments for using the external-build functionality
+ * @return A function that will execute batches using {@link NamedPreparedStatement.Args} to hold the data
* @throws SQLException if something went wrong
*/
- public ClosableThrowFunction<BoundPreparedStatement, Future<List<Integer>>, SQLException> createQueuedUpdater(String sql) throws SQLException {
+ public ClosableThrowFunction<List<NamedPreparedStatement.Args>, Future<List<Integer>>, SQLException>
+ createQueuedUpdater(String sql, NamedPreparedStatement.Args argShape) throws SQLException {
if (dbConnection == null) {
// Establish connection - we close it elsewhere
dbConnection = DriverManager.getConnection(connectURL, connectUser, connectPassword);
}
- NamedPreparedStatement nameStatement = NamedPreparedStatement.prepare(dbConnection, sql);
+ NamedPreparedStatement.Executor nameStatement =
+ NamedPreparedStatement.Executor.create(dbConnection, sql, argShape);
- ClosableThrowFunction<BoundPreparedStatement, Future<List<Integer>>, SQLException> func =
+ // TODO do something to assist with this swallowing exceptions
+ ClosableThrowFunction<List<NamedPreparedStatement.Args>, Future<List<Integer>>, SQLException> func =
ClosableThrowFunction.bindFunction(record -> {
- Future<List<Integer>> result = dbExecutor.submit(() -> nameStatement.executeBatchFromRecord(record));
+ Future<List<Integer>> result = dbExecutor.submit(() -> {
+ List<Integer> resList = new ArrayList<>(record.size());
+
+ int[] dbRes = nameStatement.executeBatch(record);
+ for (int i : dbRes) resList.add(i);
+ return resList;
+ });
return result;
}, nameStatement);
@@ -115,10 +125,10 @@ public class Firmal {
public BatchTaskProgressPanel.BatchHandle createTaskBatch(String batchTitle) {
return taskPanel.startBatch(batchTitle);
}
-
+
private void buildGUI() {
try {
- UIManager.setLookAndFeel(UIManager.getCrossPlatformLookAndFeelClassName());
+ UIManager.setLookAndFeel(UIManager.getSystemLookAndFeelClassName());
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (InstantiationException e) {
diff --git a/firmal/src/main/java/bjc/firmal/SharedDBUpdateFunction.java b/firmal/src/main/java/bjc/firmal/SharedDBUpdateFunction.java
deleted file mode 100644
index c36e988..0000000
--- a/firmal/src/main/java/bjc/firmal/SharedDBUpdateFunction.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package bjc.firmal;
-
-import java.sql.SQLException;
-import java.util.List;
-import java.util.concurrent.Future;
-
-import bjc.functypes.ClosableThrowFunction;
-import bjc.utils.misc.BoundPreparedStatement;
-
-/**
- * Alias type for DB updates
- */
-public interface SharedDBUpdateFunction extends
- ClosableThrowFunction<BoundPreparedStatement, Future<List<Integer>>, SQLException> {
-
-}
diff --git a/firmal/src/main/java/bjc/firmal/gptbrowser/GPTJSONBrowserFrame.java b/firmal/src/main/java/bjc/firmal/gptbrowser/GPTJSONBrowserFrame.java
index 39c98ca..2b96910 100644
--- a/firmal/src/main/java/bjc/firmal/gptbrowser/GPTJSONBrowserFrame.java
+++ b/firmal/src/main/java/bjc/firmal/gptbrowser/GPTJSONBrowserFrame.java
@@ -11,6 +11,7 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -32,18 +33,18 @@ import javax.swing.JScrollPane;
import javax.swing.JSplitPane;
import javax.swing.JTextPane;
import javax.swing.ListSelectionModel;
+import javax.swing.SwingUtilities;
import javax.swing.SwingWorker;
import org.json.JSONObject;
import org.json.JSONTokener;
import bjc.firmal.Firmal;
-import bjc.firmal.SharedDBUpdateFunction;
import bjc.functypes.ClosableThrowFunction;
import bjc.utils.gui.DelegateListCellRenderer;
-import bjc.utils.gui.layout.VLayout;
import bjc.utils.gui.panels.BatchTaskProgressPanel.BatchHandle;
-import bjc.utils.misc.BoundPreparedStatement;
+import bjc.utils.misc.NamedPreparedStatement;
+
import org.json.JSONArray;
/**
@@ -61,39 +62,47 @@ public class GPTJSONBrowserFrame {
*/
private final class SaveConversationTask extends SwingWorker<Void, Integer> {
private GPTConversationDB conversation;
- private ClosableThrowFunction<BoundPreparedStatement, Future<List<Integer>>, SQLException> insertConvFunc;
- private ClosableThrowFunction<BoundPreparedStatement, Future<List<Integer>>, SQLException> insertMessageFunc;
+ private ClosableThrowFunction<List<NamedPreparedStatement.Args>, Future<List<Integer>>, SQLException> insertConvFunc;
+ private ClosableThrowFunction<List<NamedPreparedStatement.Args>, Future<List<Integer>>, SQLException> insertMessageFunc;
private String note;
+ private CountDownLatch msgCounter;
/**
* Create a DB save worker task
* @param conv The conversation to save
* @param convUpdate The DB function for conversation updates
* @param msgUpdate The DB function for message updates
+ * @param msgCounter The message counter
*/
public SaveConversationTask(GPTConversationDB conv,
- ClosableThrowFunction<BoundPreparedStatement, Future<List<Integer>>, SQLException> convUpdate,
- ClosableThrowFunction<BoundPreparedStatement, Future<List<Integer>>, SQLException> msgUpdate) {
+ ClosableThrowFunction<List<NamedPreparedStatement.Args>, Future<List<Integer>>, SQLException> convUpdate,
+ ClosableThrowFunction<List<NamedPreparedStatement.Args>, Future<List<Integer>>, SQLException> msgUpdate,
+ CountDownLatch msgCounter) {
this.conversation = conv;
this.insertConvFunc = convUpdate;
this.insertMessageFunc = msgUpdate;
-
+ this.msgCounter = msgCounter;
}
+
@Override
protected Void doInBackground() throws Exception {
- BoundPreparedStatement insertConvRecord = new BoundPreparedStatement();
- BoundPreparedStatement insertMessageRecord = new BoundPreparedStatement();
+ List<NamedPreparedStatement.Args> insertConvArgs = new ArrayList<>(1);
+ NamedPreparedStatement.Args.Builder insertConvRecord = NamedPreparedStatement.Args.builder();
insertConvRecord.setString("id", conversation.getID());
insertConvRecord.setString("title", conversation.getTitle());
- insertConvRecord.addBatch();
+ insertConvArgs.add(insertConvRecord.build());
List<RawMessageDB> messages = conversation.getMessages();
+ List<NamedPreparedStatement.Args> insertMessageArgs = new ArrayList<>(messages.size());
+
int totalNumMessages = messages.size();
int currNumMessages = 0;
for (RawMessageDB message : messages) {
+ NamedPreparedStatement.Args.Builder insertMessageRecord = NamedPreparedStatement.Args.builder();
+
if (isCancelled()) break;
int currProgress = Math.min(100, (currNumMessages / totalNumMessages) * 100);
@@ -114,7 +123,7 @@ public class GPTJSONBrowserFrame {
// Or do we need to be doing these as independent DB queries instead of batching them?
// That sounds rather inefficient, but so is doing a second pass to fill it later
// insertMessage.setString("parentid", message.getParentMessageID());
- insertMessageRecord.addBatch();
+ insertMessageArgs.add(insertMessageRecord.build());
} else {
// TODO: this message has a missing/incorrect parent link
}
@@ -125,7 +134,7 @@ public class GPTJSONBrowserFrame {
note = newNote;
// Save our changes for this conversation
- Future<List<Integer>> insertConversationResults = insertConvFunc.apply(insertConvRecord);
+ Future<List<Integer>> insertConversationResults = insertConvFunc.apply(insertConvArgs);
for (int i : insertConversationResults.get()) {
if (i != 0 && i != 1) {
@@ -133,19 +142,13 @@ public class GPTJSONBrowserFrame {
}
}
- // TODO this isn't working right, figure out why
- // Moved it here thinking it might have been a concurrency thing, but...
- Future<List<Integer>> insertMessageResults = insertMessageFunc.apply(insertMessageRecord);
+ Future<List<Integer>> insertMessageResults = insertMessageFunc.apply(insertMessageArgs);
for (int i : insertMessageResults.get()) {
if (i != 0 && i != 1) {
// TODO handle oddities
}
}
- // Reset batch records
- insertConvRecord = new BoundPreparedStatement();
- insertMessageRecord = new BoundPreparedStatement();
-
newNote = "Saved " + BATCH_THRESHOLD + " messages/conversations to the DB";
firePropertyChange("note", note, newNote);
@@ -153,6 +156,11 @@ public class GPTJSONBrowserFrame {
return null;
}
+ @Override
+ protected void done() {
+ msgCounter.countDown();
+ }
+
public String getNote() {
return note;
}
@@ -171,13 +179,24 @@ public class GPTJSONBrowserFrame {
@Override
public void actionPerformed(ActionEvent aev) {
Firmal fm = Firmal.fm;
- try (var insertConversation = fm.createQueuedUpdater(
+
+ NamedPreparedStatement.Args.Builder insertConvShape = NamedPreparedStatement.Args.builder();
+ insertConvShape.setString("id", null);
+ insertConvShape.setString("title", null);
+
+ NamedPreparedStatement.Args.Builder insertMessageShape = NamedPreparedStatement.Args.builder();
+ insertMessageShape.setString("selfid", null);
+ insertMessageShape.setString("convid", null);
+ insertMessageShape.setString("body", null);
+
+ try {
+ var insertConversation = fm.createQueuedUpdater(
"insert into chatgpt.conversations (conversation_id, conversation_title)"
- + " values (:id::uuid, :title) on conflict (conversation_id) do nothing");
- var insertMessage = fm.createQueuedUpdater(
- "insert into chatgpt.raw_messages (message_id, conversation_id, message_body) "
- + "values (:selfid::uuid, :convid::uuid, :body::json)"
- + " on conflict (message_id) do nothing")) {
+ + " values (:id::uuid, :title) on conflict (conversation_id) do nothing", insertConvShape.build());
+ var insertMessage = fm.createQueuedUpdater(
+ "insert into chatgpt.raw_messages (message_id, conversation_id, message_body) "
+ + "values (:selfid::uuid, :convid::uuid, :body::json)"
+ + " on conflict (message_id) do nothing", insertMessageShape.build());
Iterator<GPTConversationDB> conversations = conversationListModel.elements().asIterator();
@@ -186,17 +205,37 @@ public class GPTJSONBrowserFrame {
int totalConversations = conversationListModel.getSize();
int currConversation = 0;
+ CountDownLatch msgCounter = new CountDownLatch(totalConversations);
+
while (conversations.hasNext()) {
currConversation++;
GPTConversationDB conversation = conversations.next();
- SaveConversationTask saveTask = new SaveConversationTask(conversation, insertConversation, insertMessage);
+ SaveConversationTask saveTask = new SaveConversationTask(conversation, insertConversation, insertMessage, msgCounter);
String taskDesc = "Saving conversation " + currConversation + " of " + totalConversations + ": " + conversation.getTitle();
saveBatch.monitorSwingWorker(saveTask, taskDesc, true);
saveTask.execute();
}
+
+ // Make sure our statements are cleaned up once we are done
+ Thread cleanupThread = new Thread(() -> {
+ try {
+ msgCounter.await();
+
+ insertMessage.close();
+ insertConversation.close();
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ });
+ cleanupThread.start();
} catch (SQLException sqlex) {
JFrame mainFrame = null;
JDialog errorDialog = new JDialog(mainFrame, "Error interfacing with DB");
@@ -216,12 +255,6 @@ public class GPTJSONBrowserFrame {
errorDialog.pack();
errorDialog.setVisible(true);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (ExecutionException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
} catch (Exception e1) {
// TODO Auto-generated catch block
e1.printStackTrace();