Skip to content

Commit 8dfc016

Browse files
merge threadpool with connection state
1 parent fe41c87 commit 8dfc016

File tree

8 files changed

+470
-421
lines changed

8 files changed

+470
-421
lines changed

.mtslconfig.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
{ "ignore_dirs": [".git", "node_modules", "tests"] }
1+
{"ignore_dirs":[".git","node_modules","tests"]}

cpp/ConnectionPool.cpp

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -6,25 +6,24 @@
66

77
ConnectionPool::ConnectionPool(std::string dbName, std::string docPath,
88
unsigned int numReadConnections)
9-
: dbName(dbName), maxReads(numReadConnections) {
9+
: dbName(dbName), maxReads(numReadConnections),
10+
writeConnection(dbName, docPath,
11+
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE |
12+
SQLITE_OPEN_FULLMUTEX) {
1013

1114
onContextCallback = nullptr;
1215
isConcurrencyEnabled = maxReads > 0;
1316

14-
// Open the write connection
15-
writeConnection = new ConnectionState(
16-
dbName, docPath,
17-
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_FULLMUTEX);
18-
17+
readConnections = new ConnectionState *[maxReads];
1918
// Open the read connections
2019
for (int i = 0; i < maxReads; i++) {
21-
readConnections.push_back(new ConnectionState(
22-
dbName, docPath, SQLITE_OPEN_READONLY | SQLITE_OPEN_FULLMUTEX));
20+
readConnections[i] = new ConnectionState(
21+
dbName, docPath, SQLITE_OPEN_READONLY | SQLITE_OPEN_FULLMUTEX);
2322
}
2423

2524
if (true == isConcurrencyEnabled) {
2625
// Write connection WAL setup
27-
writeConnection->queueWork([](sqlite3 *db) {
26+
writeConnection.queueWork([](sqlite3 *db) {
2827
sqliteExecuteLiteralWithDB(db, "PRAGMA journal_mode = WAL;");
2928
sqliteExecuteLiteralWithDB(
3029
db,
@@ -44,10 +43,10 @@ ConnectionPool::ConnectionPool(std::string dbName, std::string docPath,
4443
};
4544

4645
ConnectionPool::~ConnectionPool() {
47-
delete writeConnection;
48-
for (auto con : readConnections) {
49-
delete con;
46+
for (int i = 0; i < maxReads; i++) {
47+
delete readConnections[i];
5048
}
49+
delete readConnections;
5150
}
5251

5352
void ConnectionPool::readLock(ConnectionLockId contextId) {
@@ -65,7 +64,7 @@ void ConnectionPool::readLock(ConnectionLockId contextId) {
6564
for (int i = 0; i < maxReads; i++) {
6665
if (readConnections[i]->isEmptyLock()) {
6766
// There is an open slot
68-
activateContext(readConnections[i], contextId);
67+
activateContext(*readConnections[i], contextId);
6968
return;
7069
}
7170
}
@@ -77,7 +76,7 @@ void ConnectionPool::readLock(ConnectionLockId contextId) {
7776

7877
void ConnectionPool::writeLock(ConnectionLockId contextId) {
7978
// Check if there are any available read connections
80-
if (writeConnection->isEmptyLock()) {
79+
if (writeConnection.isEmptyLock()) {
8180
activateContext(writeConnection, contextId);
8281
return;
8382
}
@@ -90,8 +89,8 @@ SQLiteOPResult
9089
ConnectionPool::queueInContext(ConnectionLockId contextId,
9190
std::function<void(sqlite3 *)> task) {
9291
ConnectionState *state = nullptr;
93-
if (writeConnection->matchesLock(contextId)) {
94-
state = writeConnection;
92+
if (writeConnection.matchesLock(contextId)) {
93+
state = &writeConnection;
9594
} else {
9695
// Check if it's a read connection
9796
for (int i = 0; i < maxReads; i++) {
@@ -124,27 +123,27 @@ void ConnectionPool::setOnContextAvailable(void (*callback)(std::string,
124123
void ConnectionPool::setTableUpdateHandler(
125124
void (*callback)(void *, int, const char *, const char *, sqlite3_int64)) {
126125
// Only the write connection can make changes
127-
sqlite3_update_hook(writeConnection->connection, callback,
126+
sqlite3_update_hook(writeConnection.connection, callback,
128127
(void *)(dbName.c_str()));
129128
}
130129

131130
void ConnectionPool::closeContext(ConnectionLockId contextId) {
132-
if (writeConnection->matchesLock(contextId)) {
131+
if (writeConnection.matchesLock(contextId)) {
133132
if (writeQueue.size() > 0) {
134133
// There are items in the queue, activate the next one
135134
activateContext(writeConnection, writeQueue[0]);
136135
writeQueue.erase(writeQueue.begin());
137136
} else {
138137
// No items in the queue, clear the context
139-
writeConnection->clearLock();
138+
writeConnection.clearLock();
140139
}
141140
} else {
142141
// Check if it's a read connection
143142
for (int i = 0; i < maxReads; i++) {
144143
if (readConnections[i]->matchesLock(contextId)) {
145144
if (readQueue.size() > 0) {
146145
// There are items in the queue, activate the next one
147-
activateContext(readConnections[i], readQueue[0]);
146+
activateContext(*readConnections[i], readQueue[0]);
148147
readQueue.erase(readQueue.begin());
149148
} else {
150149
// No items in the queue, clear the context
@@ -157,7 +156,7 @@ void ConnectionPool::closeContext(ConnectionLockId contextId) {
157156
}
158157

159158
void ConnectionPool::closeAll() {
160-
writeConnection->close();
159+
writeConnection.close();
161160
for (int i = 0; i < maxReads; i++) {
162161
readConnections[i]->close();
163162
}
@@ -177,7 +176,7 @@ SQLiteOPResult ConnectionPool::attachDatabase(std::string const dbFileName,
177176
auto dbConnections = getAllConnections();
178177

179178
for (auto &connectionState : dbConnections) {
180-
if (connectionState->isEmptyLock()) {
179+
if (!connectionState->isEmptyLock()) {
181180
return SQLiteOPResult{
182181
.type = SQLiteError,
183182
.errorMessage = dbName + " was unable to attach another database: " +
@@ -214,7 +213,7 @@ SQLiteOPResult ConnectionPool::detachDatabase(std::string const alias) {
214213
auto dbConnections = getAllConnections();
215214

216215
for (auto &connectionState : dbConnections) {
217-
if (connectionState->isEmptyLock()) {
216+
if (!connectionState->isEmptyLock()) {
218217
return SQLiteOPResult{
219218
.type = SQLiteError,
220219
.errorMessage = dbName + " was unable to detach another database: " +
@@ -243,16 +242,16 @@ SQLiteOPResult ConnectionPool::detachDatabase(std::string const alias) {
243242

244243
std::vector<ConnectionState *> ConnectionPool::getAllConnections() {
245244
std::vector<ConnectionState *> result;
246-
result.push_back(writeConnection);
245+
result.push_back(&writeConnection);
247246
for (int i = 0; i < maxReads; i++) {
248247
result.push_back(readConnections[i]);
249248
}
250249
return result;
251250
}
252251

253-
void ConnectionPool::activateContext(ConnectionState *state,
252+
void ConnectionPool::activateContext(ConnectionState &state,
254253
ConnectionLockId contextId) {
255-
state->activateLock(contextId);
254+
state.activateLock(contextId);
256255

257256
if (onContextCallback != nullptr) {
258257
onContextCallback(dbName, contextId);

cpp/ConnectionPool.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ class ConnectionPool {
5151
private:
5252
int maxReads;
5353
std::string dbName;
54-
std::vector<ConnectionState *> readConnections;
55-
ConnectionState *writeConnection;
54+
ConnectionState **readConnections;
55+
ConnectionState writeConnection;
5656

5757
std::vector<ConnectionLockId> readQueue;
5858
std::vector<ConnectionLockId> writeQueue;
@@ -118,7 +118,7 @@ class ConnectionPool {
118118
private:
119119
std::vector<ConnectionState *> getAllConnections();
120120

121-
void activateContext(ConnectionState *state, ConnectionLockId contextId);
121+
void activateContext(ConnectionState &state, ConnectionLockId contextId);
122122

123123
SQLiteOPResult genericSqliteOpenDb(string const dbName, string const docPath,
124124
sqlite3 **db, int sqlOpenFlags);

cpp/ConnectionState.cpp

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,14 @@ ConnectionState::~ConnectionState() {
2929
delete thread;
3030
}
3131

32-
void ConnectionState::clearLock() { _currentLockId = EMPTY_LOCK_ID; }
32+
void ConnectionState::clearLock() {
33+
if (!workQueue.empty()) {
34+
waitFinished();
35+
}
36+
_currentLockId = EMPTY_LOCK_ID;
37+
}
3338

34-
SQLiteOPResult ConnectionState::activateLock(const ConnectionLockId &lockId) {
39+
void ConnectionState::activateLock(const ConnectionLockId &lockId) {
3540
_currentLockId = lockId;
3641
}
3742

@@ -42,7 +47,11 @@ bool ConnectionState::matchesLock(const ConnectionLockId &lockId) {
4247
bool ConnectionState::isEmptyLock() { return _currentLockId == EMPTY_LOCK_ID; }
4348

4449
void ConnectionState::close() {
45-
waitFinished();
50+
if (!workQueue.empty()) {
51+
waitFinished();
52+
}
53+
// So that the thread can stop (if not already)
54+
threadDone = true;
4655
sqlite3_close_v2(connection);
4756
}
4857

@@ -84,6 +93,7 @@ void ConnectionState::doWork() {
8493
task(connection);
8594
--threadBusy;
8695
}
96+
workQueueConditionVariable.notify_all();
8797
}
8898

8999
void ConnectionState::waitFinished() {

cpp/ConnectionState.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class ConnectionState {
3737
~ConnectionState();
3838

3939
void clearLock();
40-
SQLiteOPResult activateLock(const ConnectionLockId &lockId);
40+
void activateLock(const ConnectionLockId &lockId);
4141
bool matchesLock(const ConnectionLockId &lockId);
4242
bool isEmptyLock();
4343

cpp/bindings.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,7 @@ void osp::install(jsi::Runtime &rt,
471471
module.setProperty(rt, "open", move(open));
472472
module.setProperty(rt, "requestLock", move(requestLock));
473473
module.setProperty(rt, "releaseLock", move(releaseLock));
474-
module.setProperty(rt, "executeInContextAsync", move(executeInContext));
474+
module.setProperty(rt, "executeInContext", move(executeInContext));
475475
module.setProperty(rt, "close", move(close));
476476

477477
module.setProperty(rt, "attach", move(attach));

cpp/sqliteBridge.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ SQLiteOPResult sqliteCloseDb(string const dbName) {
6565
ConnectionPool *connection = dbMap[dbName];
6666

6767
connection->closeAll();
68-
delete connection;
6968
dbMap.erase(dbName);
69+
delete connection;
7070

7171
return SQLiteOPResult{
7272
.type = SQLiteOk,

0 commit comments

Comments
 (0)