-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path5-pipe.kt
More file actions
62 lines (54 loc) · 1.69 KB
/
5-pipe.kt
File metadata and controls
62 lines (54 loc) · 1.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package ConcurrentQueue
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import java.time.LocalDateTime
open class PipeTask(
name: String,
interval: Long,
startTime: LocalDateTime = LocalDateTime.MIN,
waitTimeout: Long = Long.MAX_VALUE,
priority: Int = 0,
var processed: Boolean = false
): PriorityTask(name, interval, startTime, waitTimeout, priority) {
override fun toString() = "name = $name; interval = $interval; processed = $processed "
}
class PipeQueue<T: PipeTask>(
concurrency: Int,
scope: CoroutineScope = GlobalScope
): PriorityQueue<T>(concurrency, scope) {
protected var destination: PipeQueue<T>? = null
override fun finish(error: Throwable?, task: T) {
if (error != null) {
onFailure?.invoke(error, task)
}
else {
onSuccess?.invoke(task)
destination?.add(task)
}
onDone?.invoke(error, task)
if (count == 0) onDrain?.invoke()
}
fun pipe(recipient: PipeQueue<T>): PipeQueue<T> {
destination = recipient
return this
}
}
private fun main() = runBlocking {
val destination = PipeQueue<PipeTask>(2).apply {
wait(5000)
process { task, next -> next(null, task.apply { processed = true }) }
done { error, task -> println("task: $task") }
}
val source = PipeQueue<PipeTask>(3).apply {
timeout(4000)
process { task, next ->
delay(task.interval)
next(null, task)
}
pipe(destination)
}
for (i in 0 until 10) source.add(PipeTask("Task$i", 1000L))
delay(15000)
}