Skip to content

Commit 6339bad

Browse files
authored
Refactoring errors (#20)
* Remove Raise from JVM * Update abi * Update abi
1 parent c8c48cc commit 6339bad

23 files changed

Lines changed: 148 additions & 490 deletions

File tree

delayedqueue-jvm/api/delayedqueue-jvm.api

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,7 @@ public final class org/funfix/delayedqueue/jvm/OfferOutcome$Updated : org/funfix
399399
public fun toString ()Ljava/lang/String;
400400
}
401401

402-
public final class org/funfix/delayedqueue/jvm/ResourceUnavailableException : java/lang/Exception {
402+
public final class org/funfix/delayedqueue/jvm/ResourceUnavailableException : java/io/IOException {
403403
public fun <init> (Ljava/lang/String;Ljava/lang/Throwable;)V
404404
}
405405

delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/DelayedQueueJDBC.kt

Lines changed: 26 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package org.funfix.delayedqueue.jvm
1818

1919
import java.security.MessageDigest
20-
import java.sql.SQLException
2120
import java.time.Clock
2221
import java.time.Instant
2322
import java.util.UUID
@@ -45,8 +44,6 @@ import org.funfix.delayedqueue.jvm.internals.jdbc.sqlite.SqliteMigrations
4544
import org.funfix.delayedqueue.jvm.internals.jdbc.withConnection
4645
import org.funfix.delayedqueue.jvm.internals.jdbc.withDbRetries
4746
import org.funfix.delayedqueue.jvm.internals.jdbc.withTransaction
48-
import org.funfix.delayedqueue.jvm.internals.utils.Raise
49-
import org.funfix.delayedqueue.jvm.internals.utils.unsafeSneakyRaises
5047
import org.slf4j.LoggerFactory
5148

5249
/**
@@ -118,14 +115,9 @@ private constructor(
118115
* This method has Raise context for ResourceUnavailableException and InterruptedException,
119116
* which matches what the public API declares via @Throws.
120117
*/
121-
context(_: Raise<ResourceUnavailableException>, _: Raise<InterruptedException>)
122-
private fun <T> withRetries(
123-
block:
124-
context(Raise<SQLException>, Raise<InterruptedException>)
125-
() -> T
126-
): T {
118+
private fun <T> withRetries(block: () -> T): T {
127119
return if (config.retryPolicy == null) {
128-
block(Raise._PRIVATE_AND_UNSAFE, Raise._PRIVATE_AND_UNSAFE)
120+
block()
129121
} else {
130122
withDbRetries(
131123
config = config.retryPolicy,
@@ -138,17 +130,16 @@ private constructor(
138130

139131
@Throws(ResourceUnavailableException::class, InterruptedException::class)
140132
override fun offerOrUpdate(key: String, payload: A, scheduleAt: Instant): OfferOutcome =
141-
unsafeSneakyRaises {
142-
withRetries { offer(key, payload, scheduleAt, canUpdate = true) }
133+
withRetries {
134+
offer(key, payload, scheduleAt, canUpdate = true)
143135
}
144136

145137
@Throws(ResourceUnavailableException::class, InterruptedException::class)
146138
override fun offerIfNotExists(key: String, payload: A, scheduleAt: Instant): OfferOutcome =
147-
unsafeSneakyRaises {
148-
withRetries { offer(key, payload, scheduleAt, canUpdate = false) }
139+
withRetries {
140+
offer(key, payload, scheduleAt, canUpdate = false)
149141
}
150142

151-
context(_: Raise<InterruptedException>, _: Raise<SQLException>)
152143
private fun offer(
153144
key: String,
154145
payload: A,
@@ -232,11 +223,10 @@ private constructor(
232223

233224
@Throws(ResourceUnavailableException::class, InterruptedException::class)
234225
override fun <In> offerBatch(messages: List<BatchedMessage<In, A>>): List<BatchedReply<In, A>> =
235-
unsafeSneakyRaises {
236-
withRetries { offerBatchImpl(messages) }
226+
withRetries {
227+
offerBatchImpl(messages)
237228
}
238229

239-
context(_: Raise<InterruptedException>, _: Raise<SQLException>)
240230
private fun <In> offerBatchImpl(
241231
messages: List<BatchedMessage<In, A>>
242232
): List<BatchedReply<In, A>> {
@@ -344,25 +334,20 @@ private constructor(
344334
}
345335

346336
@Throws(ResourceUnavailableException::class, InterruptedException::class)
347-
override fun tryPoll(): AckEnvelope<A>? = unsafeSneakyRaises { withRetries { tryPollImpl() } }
337+
override fun tryPoll(): AckEnvelope<A>? = withRetries { tryPollImpl() }
348338

349339
private fun acknowledgeByLockUuid(lockUuid: String): AcknowledgeFun = {
350-
unsafeSneakyRaises {
351-
withRetries {
352-
database.withTransaction { conn -> adapter.deleteRowsWithLock(conn, lockUuid) }
353-
}
340+
withRetries {
341+
database.withTransaction { conn -> adapter.deleteRowsWithLock(conn, lockUuid) }
354342
}
355343
}
356344

357345
private fun acknowledgeByFingerprint(row: DBTableRowWithId): AcknowledgeFun = {
358-
unsafeSneakyRaises {
359-
withRetries {
360-
database.withTransaction { conn -> adapter.deleteRowByFingerprint(conn, row) }
361-
}
346+
withRetries {
347+
database.withTransaction { conn -> adapter.deleteRowByFingerprint(conn, row) }
362348
}
363349
}
364350

365-
context(_: Raise<InterruptedException>, _: Raise<SQLException>)
366351
private fun tryPollImpl(): AckEnvelope<A>? {
367352
// Retry loop to handle failed acquires (concurrent modifications)
368353
// This matches the original Scala implementation which retries if acquire fails
@@ -422,11 +407,10 @@ private constructor(
422407
}
423408

424409
@Throws(ResourceUnavailableException::class, InterruptedException::class)
425-
override fun tryPollMany(batchMaxSize: Int): AckEnvelope<List<A>> = unsafeSneakyRaises {
426-
withRetries { tryPollManyImpl(batchMaxSize) }
410+
override fun tryPollMany(batchMaxSize: Int): AckEnvelope<List<A>> = withRetries {
411+
tryPollManyImpl(batchMaxSize)
427412
}
428413

429-
context(_: Raise<InterruptedException>, _: Raise<SQLException>)
430414
private fun tryPollManyImpl(batchMaxSize: Int): AckEnvelope<List<A>> {
431415
// Handle edge case: non-positive batch size
432416
if (batchMaxSize <= 0) {
@@ -508,11 +492,8 @@ private constructor(
508492
}
509493

510494
@Throws(ResourceUnavailableException::class, InterruptedException::class)
511-
override fun read(key: String): AckEnvelope<A>? = unsafeSneakyRaises {
512-
withRetries { readImpl(key) }
513-
}
495+
override fun read(key: String): AckEnvelope<A>? = withRetries { readImpl(key) }
514496

515-
context(_: Raise<InterruptedException>, _: Raise<SQLException>)
516497
private fun readImpl(key: String): AckEnvelope<A>? {
517498
return database.withConnection { connection ->
518499
val row = adapter.selectByKey(connection, pKind, key) ?: return@withConnection null
@@ -539,19 +520,13 @@ private constructor(
539520
}
540521

541522
@Throws(ResourceUnavailableException::class, InterruptedException::class)
542-
override fun dropMessage(key: String): Boolean = unsafeSneakyRaises {
543-
withRetries {
544-
database.withTransaction { connection -> adapter.deleteOneRow(connection, key, pKind) }
545-
}
523+
override fun dropMessage(key: String): Boolean = withRetries {
524+
database.withTransaction { connection -> adapter.deleteOneRow(connection, key, pKind) }
546525
}
547526

548527
@Throws(ResourceUnavailableException::class, InterruptedException::class)
549-
override fun containsMessage(key: String): Boolean = unsafeSneakyRaises {
550-
withRetries {
551-
database.withConnection { connection ->
552-
adapter.checkIfKeyExists(connection, key, pKind)
553-
}
554-
}
528+
override fun containsMessage(key: String): Boolean = withRetries {
529+
database.withConnection { connection -> adapter.checkIfKeyExists(connection, key, pKind) }
555530
}
556531

557532
@Throws(
@@ -564,12 +539,8 @@ private constructor(
564539
"To drop all messages, you must provide the exact confirmation string"
565540
}
566541

567-
return unsafeSneakyRaises {
568-
withRetries {
569-
database.withTransaction { connection ->
570-
adapter.dropAllMessages(connection, pKind)
571-
}
572-
}
542+
return withRetries {
543+
database.withTransaction { connection -> adapter.dropAllMessages(connection, pKind) }
573544
}
574545
}
575546

@@ -607,6 +578,8 @@ private constructor(
607578
public companion object {
608579
private val logger = LoggerFactory.getLogger(DelayedQueueJDBC::class.java)
609580

581+
private fun <T> withRetries(block: () -> T): T = block()
582+
610583
/**
611584
* Runs database migrations for the specified configuration.
612585
*
@@ -619,7 +592,7 @@ private constructor(
619592
*/
620593
@JvmStatic
621594
@Throws(ResourceUnavailableException::class, InterruptedException::class)
622-
public fun runMigrations(config: DelayedQueueJDBCConfig): Unit = unsafeSneakyRaises {
595+
public fun runMigrations(config: DelayedQueueJDBCConfig): Unit = withRetries {
623596
val database = Database(config.db)
624597
database.use {
625598
database.withConnection { connection ->
@@ -671,7 +644,7 @@ private constructor(
671644
serializer: MessageSerializer<A>,
672645
config: DelayedQueueJDBCConfig,
673646
clock: Clock = Clock.systemUTC(),
674-
): DelayedQueueJDBC<A> = unsafeSneakyRaises {
647+
): DelayedQueueJDBC<A> = withRetries {
675648
val database = Database(config.db)
676649
val adapter = SQLVendorAdapter.create(config.db.driver, config.tableName)
677650
DelayedQueueJDBC(

delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/exceptions.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,13 @@
1616

1717
package org.funfix.delayedqueue.jvm
1818

19+
import java.io.IOException
20+
1921
/**
2022
* Checked exception thrown in case of exceptions happening that are not recoverable, rendering
2123
* DelayedQueue inaccessible.
2224
*
2325
* Example: issues with the RDBMS (bugs, or connection unavailable, failing after multiple retries)
2426
*/
2527
public class ResourceUnavailableException(message: String?, cause: Throwable?) :
26-
Exception(message, cause)
28+
IOException(message, cause)

delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/CronServiceImpl.kt

Lines changed: 19 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,6 @@ import org.funfix.delayedqueue.jvm.CronPayloadGenerator
3333
import org.funfix.delayedqueue.jvm.CronService
3434
import org.funfix.delayedqueue.jvm.DelayedQueue
3535
import org.funfix.delayedqueue.jvm.ResourceUnavailableException
36-
import org.funfix.delayedqueue.jvm.internals.utils.Raise
37-
import org.funfix.delayedqueue.jvm.internals.utils.runAndRecoverRaised
38-
import org.funfix.delayedqueue.jvm.internals.utils.unsafeSneakyRaises
3936
import org.funfix.delayedqueue.jvm.internals.utils.withTimeout
4037
import org.slf4j.LoggerFactory
4138

@@ -45,9 +42,7 @@ import org.slf4j.LoggerFactory
4542
* Used by CronServiceImpl to delegate database operations to the DelayedQueue implementation while
4643
* maintaining proper exception flow tracking via Raise context.
4744
*/
48-
internal typealias CronDeleteOperation =
49-
context(Raise<ResourceUnavailableException>, Raise<InterruptedException>)
50-
(CronConfigHash, String) -> Unit
45+
internal typealias CronDeleteOperation = (CronConfigHash, String) -> Unit
5146

5247
/**
5348
* Base implementation of CronService that can be used by both in-memory and JDBC implementations.
@@ -66,19 +61,17 @@ internal class CronServiceImpl<A>(
6661
keyPrefix: String,
6762
messages: List<CronMessage<A>>,
6863
) {
69-
unsafeSneakyRaises {
70-
installTick0(
71-
configHash = configHash,
72-
keyPrefix = keyPrefix,
73-
messages = messages,
74-
canUpdate = false,
75-
)
76-
}
64+
installTick0(
65+
configHash = configHash,
66+
keyPrefix = keyPrefix,
67+
messages = messages,
68+
canUpdate = false,
69+
)
7770
}
7871

7972
@Throws(ResourceUnavailableException::class, InterruptedException::class)
8073
override fun uninstallTick(configHash: CronConfigHash, keyPrefix: String) {
81-
unsafeSneakyRaises { deleteCurrentCron(configHash, keyPrefix) }
74+
deleteCurrentCron(configHash, keyPrefix)
8275
}
8376

8477
@Throws(ResourceUnavailableException::class, InterruptedException::class)
@@ -157,7 +150,6 @@ internal class CronServiceImpl<A>(
157150
* @param canUpdate whether to update existing messages (false for installTick, varies for
158151
* install)
159152
*/
160-
context(_: Raise<ResourceUnavailableException>, _: Raise<InterruptedException>)
161153
private fun installTick0(
162154
configHash: CronConfigHash,
163155
keyPrefix: String,
@@ -208,21 +200,17 @@ internal class CronServiceImpl<A>(
208200

209201
val task = Runnable {
210202
try {
211-
runAndRecoverRaised({
212-
withTimeout(scheduleInterval) {
213-
val now = clock.instant()
214-
val firstRun = isFirst.getAndSet(false)
215-
val messages = generateMany(now)
216-
217-
installTick0(
218-
configHash = configHash,
219-
keyPrefix = keyPrefix,
220-
messages = messages,
221-
canUpdate = firstRun,
222-
)
223-
}
224-
}) { timeout ->
225-
throw timeout
203+
withTimeout(scheduleInterval) {
204+
val now = clock.instant()
205+
val firstRun = isFirst.getAndSet(false)
206+
val messages = generateMany(now)
207+
208+
installTick0(
209+
configHash = configHash,
210+
keyPrefix = keyPrefix,
211+
messages = messages,
212+
canUpdate = firstRun,
213+
)
226214
}
227215
} catch (e: Exception) {
228216
logger.error("Error in cron task for $keyPrefix", e)

delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/Migration.kt

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@
1616

1717
package org.funfix.delayedqueue.jvm.internals.jdbc
1818

19-
import java.sql.SQLException
20-
import org.funfix.delayedqueue.jvm.internals.utils.Raise
21-
2219
/**
2320
* Represents a database migration with SQL and a test to check if it needs to run.
2421
*
@@ -91,7 +88,6 @@ internal object MigrationRunner {
9188
* @param migrations List of migrations to run
9289
* @return Number of migrations executed
9390
*/
94-
context(_: Raise<InterruptedException>, _: Raise<SQLException>)
9591
fun runMigrations(conn: SafeConnection, migrations: List<Migration>): Int {
9692
var executed = 0
9793
for (migration in migrations) {

0 commit comments

Comments
 (0)