Skip to content

Commit fe41c87

Browse files
wip threaded connection state
1 parent 71b87a7 commit fe41c87

16 files changed

+721
-1099
lines changed

android/CMakeLists.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,13 @@ add_library(
2929
../cpp/fileUtils.cpp
3030
../cpp/sqliteExecute.h
3131
../cpp/sqliteExecute.cpp
32-
../cpp/ThreadPool.h
33-
../cpp/ThreadPool.cpp
3432
../cpp/sqlbatchexecutor.h
3533
../cpp/sqlbatchexecutor.cpp
3634
../cpp/macros.h
3735
../cpp/ConnectionPool.cpp
3836
../cpp/ConnectionPool.h
37+
../cpp/ConnectionState.cpp
38+
../cpp/ConnectionState.h
3939
cpp-adapter.cpp
4040
)
4141

cpp/ConnectionPool.cpp

Lines changed: 60 additions & 150 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33
#include "sqlite3.h"
44
#include "sqliteBridge.h"
55
#include "sqliteExecute.h"
6-
#include <fstream>
7-
#include <iostream>
86

97
ConnectionPool::ConnectionPool(std::string dbName, std::string docPath,
108
unsigned int numReadConnections)
@@ -13,49 +11,44 @@ ConnectionPool::ConnectionPool(std::string dbName, std::string docPath,
1311
onContextCallback = nullptr;
1412
isConcurrencyEnabled = maxReads > 0;
1513

16-
struct ConnectionState writeCon;
17-
writeCon.connection = nullptr;
18-
writeCon.currentLockId = EMPTY_LOCK_ID;
19-
20-
writeConnection = writeCon;
21-
2214
// Open the write connection
23-
genericSqliteOpenDb(dbName, docPath, &writeConnection.connection,
24-
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE |
25-
SQLITE_OPEN_FULLMUTEX);
15+
writeConnection = new ConnectionState(
16+
dbName, docPath,
17+
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_FULLMUTEX);
2618

2719
// Open the read connections
2820
for (int i = 0; i < maxReads; i++) {
29-
sqlite3 *db;
30-
genericSqliteOpenDb(dbName, docPath, &db,
31-
SQLITE_OPEN_READONLY | SQLITE_OPEN_FULLMUTEX);
32-
struct ConnectionState readCon;
33-
readCon.connection = db;
34-
readCon.currentLockId = EMPTY_LOCK_ID;
35-
readConnections.push_back(readCon);
21+
readConnections.push_back(new ConnectionState(
22+
dbName, docPath, SQLITE_OPEN_READONLY | SQLITE_OPEN_FULLMUTEX));
3623
}
3724

3825
if (true == isConcurrencyEnabled) {
39-
// Write connection
40-
sqliteExecuteLiteralWithDB(this->writeConnection.connection,
41-
"PRAGMA journal_mode = WAL;");
42-
sqliteExecuteLiteralWithDB(
43-
this->writeConnection.connection,
44-
"PRAGMA journal_size_limit = 6291456"); // 6Mb 1.5x default checkpoint
45-
// size
46-
// Default to normal on all connections
47-
sqliteExecuteLiteralWithDB(this->writeConnection.connection,
48-
"PRAGMA synchronous = NORMAL;");
49-
50-
// Read connections
26+
// Write connection WAL setup
27+
writeConnection->queueWork([](sqlite3 *db) {
28+
sqliteExecuteLiteralWithDB(db, "PRAGMA journal_mode = WAL;");
29+
sqliteExecuteLiteralWithDB(
30+
db,
31+
"PRAGMA journal_size_limit = 6291456"); // 6Mb 1.5x default checkpoint
32+
// size
33+
// Default to normal on all connections
34+
sqliteExecuteLiteralWithDB(db, "PRAGMA synchronous = NORMAL;");
35+
});
36+
37+
// Read connections WAL setup
5138
for (int i = 0; i < this->maxReads; i++) {
52-
sqliteExecuteLiteralWithDB(this->readConnections[i].connection,
53-
"PRAGMA synchronous = NORMAL;");
39+
readConnections[i]->queueWork([](sqlite3 *db) {
40+
sqliteExecuteLiteralWithDB(db, "PRAGMA synchronous = NORMAL;");
41+
});
5442
}
5543
}
5644
};
5745

58-
ConnectionPool::~ConnectionPool() {}
46+
ConnectionPool::~ConnectionPool() {
47+
delete writeConnection;
48+
for (auto con : readConnections) {
49+
delete con;
50+
}
51+
}
5952

6053
void ConnectionPool::readLock(ConnectionLockId contextId) {
6154
// Maintain compatibility if no concurrent read connections are present
@@ -70,9 +63,9 @@ void ConnectionPool::readLock(ConnectionLockId contextId) {
7063
} else {
7164
// Check if there are open slots
7265
for (int i = 0; i < maxReads; i++) {
73-
if (readConnections[i].currentLockId == EMPTY_LOCK_ID) {
66+
if (readConnections[i]->isEmptyLock()) {
7467
// There is an open slot
75-
activateContext(&readConnections[i], contextId);
68+
activateContext(readConnections[i], contextId);
7669
return;
7770
}
7871
}
@@ -84,66 +77,43 @@ void ConnectionPool::readLock(ConnectionLockId contextId) {
8477

8578
void ConnectionPool::writeLock(ConnectionLockId contextId) {
8679
// Check if there are any available read connections
87-
if (writeConnection.currentLockId == EMPTY_LOCK_ID) {
88-
activateContext(&writeConnection, contextId);
80+
if (writeConnection->isEmptyLock()) {
81+
activateContext(writeConnection, contextId);
8982
return;
9083
}
9184

9285
// If we made it here, there were no open slots, need to queue
9386
writeQueue.push_back(contextId);
9487
}
9588

96-
SQLiteOPResult ConnectionPool::executeInContext(
97-
ConnectionLockId contextId, string const &query, vector<QuickValue> *params,
98-
vector<map<string, QuickValue>> *results,
99-
vector<QuickColumnMetadata> *metadata) {
100-
sqlite3 *db = nullptr;
101-
if (writeConnection.currentLockId == contextId) {
102-
db = writeConnection.connection;
89+
SQLiteOPResult
90+
ConnectionPool::queueInContext(ConnectionLockId contextId,
91+
std::function<void(sqlite3 *)> task) {
92+
ConnectionState *state = nullptr;
93+
if (writeConnection->matchesLock(contextId)) {
94+
state = writeConnection;
10395
} else {
10496
// Check if it's a read connection
10597
for (int i = 0; i < maxReads; i++) {
106-
if (readConnections[i].currentLockId == contextId) {
107-
db = readConnections[i].connection;
98+
if (readConnections[i]->matchesLock(contextId)) {
99+
state = readConnections[i];
108100
break;
109101
}
110102
}
111103
}
112-
if (db == nullptr) {
113-
// throw error that context is not available
104+
if (state == nullptr) {
105+
// return error that context is not available
114106
return SQLiteOPResult{
115107
.errorMessage = "Context is no longer available",
116108
.type = SQLiteError,
117109
};
118110
}
119111

120-
return sqliteExecuteWithDB(db, query, params, results, metadata);
121-
}
122-
123-
SequelLiteralUpdateResult
124-
ConnectionPool::executeLiteralInContext(ConnectionLockId contextId,
125-
string const &query) {
126-
sqlite3 *db = nullptr;
127-
if (writeConnection.currentLockId == contextId) {
128-
db = writeConnection.connection;
129-
} else {
130-
// Check if it's a read connection
131-
for (int i = 0; i < maxReads; i++) {
132-
if (readConnections[i].currentLockId == contextId) {
133-
db = readConnections[i].connection;
134-
break;
135-
}
136-
}
137-
}
138-
if (db == nullptr) {
139-
// throw error that context is not available
140-
return SequelLiteralUpdateResult{
141-
.type = SQLiteError,
142-
.message = "Context is no longer available",
143-
};
144-
}
112+
state->queueWork(task);
145113

146-
return sqliteExecuteLiteralWithDB(db, query);
114+
return SQLiteOPResult{
115+
.type = SQLiteOk,
116+
};
147117
}
148118

149119
void ConnectionPool::setOnContextAvailable(void (*callback)(std::string,
@@ -154,31 +124,31 @@ void ConnectionPool::setOnContextAvailable(void (*callback)(std::string,
154124
void ConnectionPool::setTableUpdateHandler(
155125
void (*callback)(void *, int, const char *, const char *, sqlite3_int64)) {
156126
// Only the write connection can make changes
157-
sqlite3_update_hook(writeConnection.connection, callback,
127+
sqlite3_update_hook(writeConnection->connection, callback,
158128
(void *)(dbName.c_str()));
159129
}
160130

161131
void ConnectionPool::closeContext(ConnectionLockId contextId) {
162-
if (writeConnection.currentLockId == contextId) {
132+
if (writeConnection->matchesLock(contextId)) {
163133
if (writeQueue.size() > 0) {
164134
// There are items in the queue, activate the next one
165-
activateContext(&writeConnection, writeQueue[0]);
135+
activateContext(writeConnection, writeQueue[0]);
166136
writeQueue.erase(writeQueue.begin());
167137
} else {
168138
// No items in the queue, clear the context
169-
writeConnection.currentLockId = EMPTY_LOCK_ID;
139+
writeConnection->clearLock();
170140
}
171141
} else {
172142
// Check if it's a read connection
173143
for (int i = 0; i < maxReads; i++) {
174-
if (readConnections[i].currentLockId == contextId) {
144+
if (readConnections[i]->matchesLock(contextId)) {
175145
if (readQueue.size() > 0) {
176146
// There are items in the queue, activate the next one
177-
activateContext(&readConnections[i], readQueue[0]);
147+
activateContext(readConnections[i], readQueue[0]);
178148
readQueue.erase(readQueue.begin());
179149
} else {
180150
// No items in the queue, clear the context
181-
readConnections[i].currentLockId = EMPTY_LOCK_ID;
151+
readConnections[i]->clearLock();
182152
}
183153
return;
184154
}
@@ -187,9 +157,9 @@ void ConnectionPool::closeContext(ConnectionLockId contextId) {
187157
}
188158

189159
void ConnectionPool::closeAll() {
190-
sqlite3_close_v2(writeConnection.connection);
160+
writeConnection->close();
191161
for (int i = 0; i < maxReads; i++) {
192-
sqlite3_close_v2(readConnections[i].connection);
162+
readConnections[i]->close();
193163
}
194164
}
195165

@@ -207,7 +177,7 @@ SQLiteOPResult ConnectionPool::attachDatabase(std::string const dbFileName,
207177
auto dbConnections = getAllConnections();
208178

209179
for (auto &connectionState : dbConnections) {
210-
if (connectionState.currentLockId != EMPTY_LOCK_ID) {
180+
if (connectionState->isEmptyLock()) {
211181
return SQLiteOPResult{
212182
.type = SQLiteError,
213183
.errorMessage = dbName + " was unable to attach another database: " +
@@ -218,7 +188,7 @@ SQLiteOPResult ConnectionPool::attachDatabase(std::string const dbFileName,
218188

219189
for (auto &connectionState : dbConnections) {
220190
SequelLiteralUpdateResult result =
221-
sqliteExecuteLiteralWithDB(connectionState.connection, statement);
191+
sqliteExecuteLiteralWithDB(connectionState->connection, statement);
222192
if (result.type == SQLiteError) {
223193
// Revert change on any successful connections
224194
detachDatabase(alias);
@@ -244,7 +214,7 @@ SQLiteOPResult ConnectionPool::detachDatabase(std::string const alias) {
244214
auto dbConnections = getAllConnections();
245215

246216
for (auto &connectionState : dbConnections) {
247-
if (connectionState.currentLockId != EMPTY_LOCK_ID) {
217+
if (connectionState->isEmptyLock()) {
248218
return SQLiteOPResult{
249219
.type = SQLiteError,
250220
.errorMessage = dbName + " was unable to detach another database: " +
@@ -255,7 +225,7 @@ SQLiteOPResult ConnectionPool::detachDatabase(std::string const alias) {
255225

256226
for (auto &connectionState : dbConnections) {
257227
SequelLiteralUpdateResult result =
258-
sqliteExecuteLiteralWithDB(connectionState.connection, statement);
228+
sqliteExecuteLiteralWithDB(connectionState->connection, statement);
259229
if (result.type == SQLiteError) {
260230
return SQLiteOPResult{
261231
.type = SQLiteError,
@@ -269,53 +239,10 @@ SQLiteOPResult ConnectionPool::detachDatabase(std::string const alias) {
269239
};
270240
}
271241

272-
SequelBatchOperationResult
273-
ConnectionPool::importSQLFile(std::string fileLocation) {
274-
std::string line;
275-
std::ifstream sqFile(fileLocation);
276-
277-
if (sqFile.is_open()) {
278-
sqlite3 *connection = writeConnection.connection;
279-
try {
280-
int affectedRows = 0;
281-
int commands = 0;
282-
sqliteExecuteLiteralWithDB(connection, "BEGIN EXCLUSIVE TRANSACTION");
283-
while (std::getline(sqFile, line, '\n')) {
284-
if (!line.empty()) {
285-
SequelLiteralUpdateResult result =
286-
sqliteExecuteLiteralWithDB(connection, line);
287-
if (result.type == SQLiteError) {
288-
sqliteExecuteLiteralWithDB(connection, "ROLLBACK");
289-
sqFile.close();
290-
return {SQLiteError, result.message, 0, commands};
291-
} else {
292-
affectedRows += result.affectedRows;
293-
commands++;
294-
}
295-
}
296-
}
297-
sqFile.close();
298-
sqliteExecuteLiteralWithDB(connection, "COMMIT");
299-
return {SQLiteOk, "", affectedRows, commands};
300-
} catch (...) {
301-
sqFile.close();
302-
sqliteExecuteLiteralWithDB(connection, "ROLLBACK");
303-
return {SQLiteError,
304-
"[react-native-quick-sqlite][loadSQLFile] Unexpected error, "
305-
"transaction was rolledback",
306-
0, 0};
307-
}
308-
} else {
309-
return {SQLiteError,
310-
"[react-native-quick-sqlite][loadSQLFile] Could not open file", 0,
311-
0};
312-
}
313-
}
314-
315242
// ===================== Private ===============
316243

317-
std::vector<ConnectionState> ConnectionPool::getAllConnections() {
318-
std::vector<ConnectionState> result;
244+
std::vector<ConnectionState *> ConnectionPool::getAllConnections() {
245+
std::vector<ConnectionState *> result;
319246
result.push_back(writeConnection);
320247
for (int i = 0; i < maxReads; i++) {
321248
result.push_back(readConnections[i]);
@@ -325,26 +252,9 @@ std::vector<ConnectionState> ConnectionPool::getAllConnections() {
325252

326253
void ConnectionPool::activateContext(ConnectionState *state,
327254
ConnectionLockId contextId) {
328-
state->currentLockId = contextId;
255+
state->activateLock(contextId);
329256

330257
if (onContextCallback != nullptr) {
331258
onContextCallback(dbName, contextId);
332259
}
333260
}
334-
335-
SQLiteOPResult ConnectionPool::genericSqliteOpenDb(string const dbName,
336-
string const docPath,
337-
sqlite3 **db,
338-
int sqlOpenFlags) {
339-
string dbPath = get_db_path(dbName, docPath);
340-
341-
int exit = 0;
342-
exit = sqlite3_open_v2(dbPath.c_str(), db, sqlOpenFlags, nullptr);
343-
344-
if (exit != SQLITE_OK) {
345-
return SQLiteOPResult{.type = SQLiteError,
346-
.errorMessage = sqlite3_errmsg(*db)};
347-
}
348-
349-
return SQLiteOPResult{.type = SQLiteOk, .rowsAffected = 0};
350-
}

0 commit comments

Comments
 (0)