expected = new HashSet<>();
+ for (int i = 0; i < NUM_DOCS; i++) {
+ expected.add(i);
+ }
+ assertEquals("Cursor must return every document exactly once", expected, values);
+ }
+
+ /**
+ * Subclass that injects a {@link FailingMongoCursor} for the first {@code maxFailures}
+ * resume calls, simulating {@link MongoCursorNotFoundException} mid-iteration.
+ *
+ * The {@code initialized} flag is false while {@code super()} runs (Java guarantees that
+ * subclass fields are still at their defaults during the superclass constructor), so the first
+ * cursor created during construction uses the real MongoDB cursor. Once our constructor body
+ * sets {@code initialized = true} and calls {@link #reset()}, subsequent calls to
+ * {@code newFindIterable} / {@code newAggregateIterable} wrap the cursor with a failing one.
+ *
+ * Every wrapped iterable hands out a {@link FailingMongoCursor} configured with the same
+ * {@code failAfter}, and each wrap consumes one unit of {@code failuresLeft}. Once
+ * {@code failuresLeft} reaches zero the real iterable is returned unchanged. Because the check
+ * is {@code failuresLeft == 0}, a negative {@code maxFailures} never stops wrapping.
+ */
+ private static class FailingCursorPersistentCursor extends MongoPersistentCursor {
+
+ private final int failAfter; // handed to every FailingMongoCursor created by the wrappers below
+ private int failuresLeft; // failing iterables still to hand out; decremented on every wrap
+ /** False while super() runs so the constructor-time reset uses a real cursor. */
+ private boolean initialized;
+
+ // Find mode
+ FailingCursorPersistentCursor(MongoDBCollection collection, Bson query, Bson projection,
+ QueryOptions options, int failAfter, int maxFailures) {
+ super(collection, query, projection, options); // initialized is still false at this point
+ this.failAfter = failAfter;
+ this.failuresLeft = maxFailures;
+ this.initialized = true;
+ reset(); // restart fresh – this time newFindIterable wraps with a failing cursor
+ }
+
+ // Aggregate mode
+ FailingCursorPersistentCursor(MongoDBCollection collection, List pipeline,
+ QueryOptions options, int failAfter, int maxFailures) {
+ super(collection, pipeline, options); // initialized is still false at this point
+ this.failAfter = failAfter;
+ this.failuresLeft = maxFailures;
+ this.initialized = true;
+ reset(); // restart fresh – this time newAggregateIterable wraps with a failing cursor
+ }
+
+ @Override
+ protected FindIterable newFindIterable(Bson query, Bson projection, QueryOptions options) {
+ FindIterable real = super.newFindIterable(query, projection, options);
+ if (!initialized || failuresLeft == 0) { // still inside super(), or the failure budget is spent
+ return real;
+ }
+ failuresLeft--;
+ return wrapFind(real);
+ }
+
+ @Override
+ protected AggregateIterable newAggregateIterable(List activePipeline) {
+ AggregateIterable real = super.newAggregateIterable(activePipeline);
+ if (!initialized || failuresLeft == 0) { // still inside super(), or the failure budget is spent
+ return real;
+ }
+ failuresLeft--;
+ return wrapAggregate(real);
+ }
+
+ private FindIterable wrapFind(FindIterable real) {
+ // Fluent calls are forwarded to `real` so that sort/skip/limit are applied
+ // before real.iterator() is opened — important for tests that use skip/limit.
+ // NOTE(review): only sort/batchSize/limit/skip/iterator are stubbed; presumably this is a
+ // Mockito mock, so any other call returns the mock default — confirm against the cursor code.
+ FindIterable mockIterable = mock(FindIterable.class);
+ when(mockIterable.sort(any(Bson.class))).thenAnswer(inv -> { real.sort(inv.getArgument(0)); return mockIterable; });
+ when(mockIterable.batchSize(anyInt())).thenAnswer(inv -> { real.batchSize(inv.getArgument(0)); return mockIterable; });
+ when(mockIterable.limit(anyInt())).thenAnswer(inv -> { real.limit(inv.getArgument(0)); return mockIterable; });
+ when(mockIterable.skip(anyInt())).thenAnswer(inv -> { real.skip(inv.getArgument(0)); return mockIterable; });
+ when(mockIterable.iterator()).thenAnswer(inv -> new FailingMongoCursor(real.iterator(), failAfter)); // real cursor opens lazily, on iterator()
+ return mockIterable;
+ }
+
+ private AggregateIterable wrapAggregate(AggregateIterable real) {
+ AggregateIterable mockIterable = mock(AggregateIterable.class); // only batchSize and iterator are stubbed
+ when(mockIterable.batchSize(anyInt())).thenAnswer(inv -> { real.batchSize(inv.getArgument(0)); return mockIterable; });
+ when(mockIterable.iterator()).thenAnswer(inv -> new FailingMongoCursor(real.iterator(), failAfter)); // real cursor opens lazily, on iterator()
+ return mockIterable;
+ }
+ }
+
+    /**
+     * Wraps a real {@link MongoCursor} and throws {@link MongoCursorNotFoundException} once
+     * exactly {@code failAfter} documents have been returned via {@link #next()} or
+     * {@link #tryNext()}.
+     *
+     * After that point {@link #hasNext()}, {@link #next()} and {@link #tryNext()} all fail,
+     * so the failure cannot be bypassed by a caller that skips {@code hasNext()}. A negative
+     * {@code failAfter} never reaches zero and therefore never fails. All other calls are
+     * delegated untouched.
+     */
+    private static class FailingMongoCursor implements MongoCursor<Document> {
+
+        /** Arbitrary cursor id reported in the simulated exception. */
+        private static final long FAKE_CURSOR_ID = 0xDEADBEEFL;
+
+        private final MongoCursor<Document> delegate;
+        /** Documents that may still be returned before the simulated failure. */
+        private int remaining;
+
+        /**
+         * @param delegate  Real cursor providing the documents
+         * @param failAfter Number of documents to return before failing
+         */
+        FailingMongoCursor(MongoCursor<Document> delegate, int failAfter) {
+            this.delegate = delegate;
+            this.remaining = failAfter;
+        }
+
+        @Override
+        public boolean hasNext() {
+            failIfExhausted();
+            return delegate.hasNext();
+        }
+
+        @Override
+        public Document next() {
+            // Check here too: a caller that invokes next() without hasNext() must not slip
+            // past the failure point and leave the counter negative (which never fails again).
+            failIfExhausted();
+            Document doc = delegate.next();
+            remaining--;
+            return doc;
+        }
+
+        @Override
+        public Document tryNext() {
+            failIfExhausted();
+            Document doc = delegate.tryNext();
+            if (doc != null) {
+                // Only documents actually returned count towards the failure point.
+                remaining--;
+            }
+            return doc;
+        }
+
+        @Override
+        public int available() {
+            return delegate.available();
+        }
+
+        @Override
+        public ServerCursor getServerCursor() {
+            return delegate.getServerCursor();
+        }
+
+        @Override
+        public ServerAddress getServerAddress() {
+            return delegate.getServerAddress();
+        }
+
+        @Override
+        public void close() {
+            delegate.close();
+        }
+
+        /** Simulates the server having dropped the cursor once the document budget is used up. */
+        private void failIfExhausted() {
+            if (remaining == 0) {
+                throw new MongoCursorNotFoundException(FAKE_CURSOR_ID, new ServerAddress());
+            }
+        }
+    }
+}
diff --git a/commons-lib/src/main/java/org/opencb/commons/ProgressLogger.java b/commons-lib/src/main/java/org/opencb/commons/ProgressLogger.java
index b87f3691d..6a694b381 100644
--- a/commons-lib/src/main/java/org/opencb/commons/ProgressLogger.java
+++ b/commons-lib/src/main/java/org/opencb/commons/ProgressLogger.java
@@ -310,10 +310,14 @@ public List apply(List batch) throws Exception {
if (batch == null || batch.isEmpty()) {
return batch;
}
- increment(batch.size(), () -> {
- T lastElement = batch.get(batch.size() - 1);
- return messageBuilder.apply(lastElement);
- });
+ if (messageBuilder == null) {
+ increment(batch.size());
+ } else {
+ increment(batch.size(), () -> {
+ T lastElement = batch.get(batch.size() - 1);
+ return messageBuilder.apply(lastElement);
+ });
+ }
return batch;
}
};
diff --git a/commons-lib/src/main/java/org/opencb/commons/io/DataReader.java b/commons-lib/src/main/java/org/opencb/commons/io/DataReader.java
index e203fa7fc..c3dde17a1 100644
--- a/commons-lib/src/main/java/org/opencb/commons/io/DataReader.java
+++ b/commons-lib/src/main/java/org/opencb/commons/io/DataReader.java
@@ -173,4 +173,22 @@ default void forEach(Consumer super T> action, int batchSize) {
action.accept(t);
}
}
+
+    /**
+     * Adapt an {@link Iterable} to a {@link DataReader}.
+     * The iterator is obtained once, when this method is called.
+     *
+     * @param iterable Source of elements
+     * @param <T>      Element type
+     * @return Reader that serves the elements of the iterable in batches
+     */
+    static <T> DataReader<T> wrap(Iterable<T> iterable) {
+        Iterator<T> source = iterable.iterator();
+        return wrap(source);
+    }
+
+    /**
+     * Adapt an {@link Iterator} to a {@link DataReader}.
+     *
+     * Each {@code read(batchSize)} call pulls up to {@code batchSize} elements from the
+     * iterator; an empty list is returned once the iterator is exhausted.
+     *
+     * @param iterator Source of elements
+     * @param <T>      Element type
+     * @return Reader that serves the remaining elements of the iterator in batches
+     */
+    static <T> DataReader<T> wrap(Iterator<T> iterator) {
+        return new DataReader<T>() {
+            @Override
+            public List<T> read(int batchSize) {
+                List<T> batch = new ArrayList<>(batchSize);
+                // Test the size first: once the batch is full there is no reason to call
+                // hasNext(), which may block or fetch on cursor-backed iterators.
+                while (batch.size() < batchSize && iterator.hasNext()) {
+                    batch.add(iterator.next());
+                }
+                return batch;
+            }
+        };
+    }
+
}
diff --git a/commons-lib/src/main/java/org/opencb/commons/io/DataWriter.java b/commons-lib/src/main/java/org/opencb/commons/io/DataWriter.java
index fc86358ad..5c00ab6c1 100644
--- a/commons-lib/src/main/java/org/opencb/commons/io/DataWriter.java
+++ b/commons-lib/src/main/java/org/opencb/commons/io/DataWriter.java
@@ -20,6 +20,10 @@
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -98,6 +102,158 @@ public void post() throws Exception {
};
}
+    /**
+     * Fan out to two writers receiving the same batches sequentially.
+     *
+     * @param dw1 First writer
+     * @param dw2 Second writer
+     * @param <T> Batch element type
+     * @return Composite writer that writes to both dw1 and dw2 in sequence.
+     */
+    static <T> DataWriter<T> tee(DataWriter<T> dw1, DataWriter<T> dw2) {
+        // then(DataWriter) wraps its argument with asTask() before chaining.
+        return dw1.then(dw2);
+    }
+
+ static DataWriter tee(DataWriter dw1, DataWriter dw2, boolean parallel) {
+ return tee(dw1, dw2, parallel, 1);
+ }
+
+ /**
+ * Fan out to two writers receiving the same batches.
+ *
+ * When {@code parallel=true} each writer runs in its own background thread.
+ * Batches are enqueued from the caller thread; the background threads consume and write.
+ * Any exception thrown by a background thread is rethrown from {@link #post()}.
+ *
+ * @param dw1 First writer
+ * @param dw2 Second writer
+ * @param parallel Whether to run each writer in its own background thread
+ * @param queueCapacity Maximum number of batches buffered per writer when parallel
+ * @param Batch element type
+ * @return Composite writer that writes to both dw1 and dw2.
+ */
+ static DataWriter tee(DataWriter dw1, DataWriter dw2, boolean parallel, int queueCapacity) {
+ if (!parallel) {
+ return tee(dw1, dw2);
+ }
+ return new DataWriter() {
+ private final BlockingQueue>> queue1 = new LinkedBlockingQueue<>(queueCapacity);
+ private final BlockingQueue>> queue2 = new LinkedBlockingQueue<>(queueCapacity);
+ private Thread thread1;
+ private Thread thread2;
+ private volatile Throwable error1;
+ private volatile Throwable error2;
+
+ @Override
+ public boolean pre() {
+ thread1 = new Thread(() -> {
+ try {
+ dw1.open();
+ dw1.pre();
+ Optional> item = queue1.take();
+ while (item.isPresent()) {
+ dw1.write(item.get());
+ item = queue1.take();
+ }
+ dw1.post();
+ dw1.close();
+ } catch (Throwable t) {
+ error1 = t;
+ }
+ }, Thread.currentThread().getName() + "-writer-1");
+ thread2 = new Thread(() -> {
+ try {
+ dw2.open();
+ dw2.pre();
+ Optional> item = queue2.take();
+ while (item.isPresent()) {
+ dw2.write(item.get());
+ item = queue2.take();
+ }
+ dw2.post();
+ dw2.close();
+ } catch (Throwable t) {
+ error2 = t;
+ }
+ }, Thread.currentThread().getName() + "-writer-2");
+ thread1.start();
+ thread2.start();
+ return true;
+ }
+
+ @Override
+ public boolean write(List batch) {
+ checkErrors();
+ try {
+ offerIfAlive(queue1, Optional.of(batch), thread1);
+ offerIfAlive(queue2, Optional.of(batch), thread2);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted while enqueueing batch", e);
+ }
+ return true;
+ }
+
+ @Override
+ public boolean post() {
+ boolean pill1Sent = false;
+ boolean pill2Sent = false;
+ try {
+ offerIfAlive(queue1, Optional.empty(), thread1);
+ pill1Sent = true;
+ offerIfAlive(queue2, Optional.empty(), thread2);
+ pill2Sent = true;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted while sending poison pill", e);
+ } finally {
+ if (!pill1Sent) {
+ thread1.interrupt();
+ }
+ if (!pill2Sent) {
+ thread2.interrupt();
+ }
+ try {
+ thread1.join();
+ thread2.join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ checkErrors();
+ return true;
+ }
+
+ /**
+ * Offer an item to the queue, retrying with a timeout. If the consumer thread
+ * is dead (can't drain the queue), clear the queue and return.
+ */
+ private void offerIfAlive(BlockingQueue>> queue, Optional> item, Thread thread)
+ throws InterruptedException {
+ while (!queue.offer(item, 100, TimeUnit.MILLISECONDS)) {
+ if (!thread.isAlive()) {
+ queue.clear();
+ checkErrors();
+ return;
+ }
+ }
+ }
+
+ private void checkErrors() {
+ if (error1 != null && error2 != null) {
+ RuntimeException e = new RuntimeException("Error in tee background writers");
+ e.addSuppressed(error1);
+ e.addSuppressed(error2);
+ throw e;
+ } else if (error1 != null) {
+ throw new RuntimeException("Error in tee background writer 1", error1);
+ } else if (error2 != null) {
+ throw new RuntimeException("Error in tee background writer 2", error2);
+ }
+ }
+ };
+ }
+
default DataWriter then(DataWriter nextWriter) {
return then(nextWriter.asTask());
}
diff --git a/commons-lib/src/main/java/org/opencb/commons/run/Task.java b/commons-lib/src/main/java/org/opencb/commons/run/Task.java
index 971057946..ab24c69b9 100644
--- a/commons-lib/src/main/java/org/opencb/commons/run/Task.java
+++ b/commons-lib/src/main/java/org/opencb/commons/run/Task.java
@@ -130,47 +130,62 @@ default Task then(DataWriter writer) {
/**
* Use to execute multiple Tasks with the same input.
- * Only the output of the main task will be propagated.
+ * Only the output of the main task will be propagated. The side task runs as a side-effect.
*
- * task = Task.join(task1, task2);
+ * task = Task.tee(task1, task2);
*
- * @param mainTask Main task to propagate
- * @param otherTask Task to execute with the same input. The output will be lost.
+ * @param mainTask Main task whose output is propagated
+ * @param sideTask Task to execute with the same input. The output will be discarded.
* @param <T> Input type.
* @param <R> Return type.
* @return Task that runs both tasks with the same input.
*/
- static Task join(Task mainTask, Task otherTask) {
+ static Task tee(Task mainTask, Task sideTask) {
return new Task() {
@Override
public void pre() throws Exception {
mainTask.pre();
- otherTask.pre();
+ sideTask.pre();
}
@Override
public List apply(List batch) throws Exception {
List apply1 = mainTask.apply(batch);
- otherTask.apply(batch); // ignore output
+ sideTask.apply(batch); // ignore output
return apply1;
}
@Override
public List drain() throws Exception {
- // Drain both tasks
List drain1 = mainTask.drain();
- otherTask.drain(); // ignore output
-
- // Return drain1
+ sideTask.drain(); // ignore output
return drain1;
}
@Override
public void post() throws Exception {
mainTask.post();
- otherTask.post();
+ sideTask.post();
}
};
}
+ /**
+ * Use to execute multiple Tasks with the same input.
+ * Only the output of the main task will be propagated.
+ * This method only forwards both arguments to {@link #tee(Task, Task)}.
+ *
+ * {@code task = Task.join(task1, task2);}
+ *
+ * @param mainTask Main task to propagate
+ * @param otherTask Task to execute with the same input. The output will be lost.
+ * @param <T> Input type.
+ * @param <R> Return type.
+ * @return Task that runs both tasks with the same input.
+ * @deprecated Use {@link #tee(Task, Task)} instead.
+ */
+ @Deprecated
+ static Task join(Task mainTask, Task otherTask) {
+ return tee(mainTask, otherTask);
+ }
+
}
diff --git a/commons-lib/src/test/java/org/opencb/commons/io/DataWriterTest.java b/commons-lib/src/test/java/org/opencb/commons/io/DataWriterTest.java
new file mode 100644
index 000000000..42ab6096b
--- /dev/null
+++ b/commons-lib/src/test/java/org/opencb/commons/io/DataWriterTest.java
@@ -0,0 +1,320 @@
+package org.opencb.commons.io;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.opencb.commons.run.Task;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Tests for the composition helpers of {@link DataWriter}: sequential and parallel
+ * {@code tee}, {@code asTask} and {@code then}.
+ */
+public class DataWriterTest {
+
+    // ===== tee(dw1, dw2) — sequential =====
+
+    @Test
+    public void testTeeSequentialBothReceiveSameBatches() {
+        List<List<String>> received1 = new ArrayList<>();
+        List<List<String>> received2 = new ArrayList<>();
+        DataWriter<String> dw1 = b -> { received1.add(new ArrayList<>(b)); return true; };
+        DataWriter<String> dw2 = b -> { received2.add(new ArrayList<>(b)); return true; };
+
+        DataWriter<String> tee = DataWriter.tee(dw1, dw2);
+        tee.write(Arrays.asList("a", "b"));
+        tee.write(Collections.singletonList("c"));
+
+        Assert.assertEquals(received1, received2);
+        Assert.assertEquals(2, received1.size());
+        Assert.assertEquals(Arrays.asList("a", "b"), received1.get(0));
+        Assert.assertEquals(Collections.singletonList("c"), received1.get(1));
+    }
+
+    @Test
+    public void testTeeSequentialLifecycleCalledOnBothWriters() {
+        AtomicBoolean pre1 = new AtomicBoolean();
+        AtomicBoolean post1 = new AtomicBoolean();
+        AtomicBoolean pre2 = new AtomicBoolean();
+        AtomicBoolean post2 = new AtomicBoolean();
+
+        DataWriter<String> dw1 = new DataWriter<String>() {
+            @Override public boolean pre() { pre1.set(true); return true; }
+            @Override public boolean write(List<String> b) { return true; }
+            @Override public boolean post() { post1.set(true); return true; }
+        };
+        DataWriter<String> dw2 = new DataWriter<String>() {
+            @Override public boolean pre() { pre2.set(true); return true; }
+            @Override public boolean write(List<String> b) { return true; }
+            @Override public boolean post() { post2.set(true); return true; }
+        };
+
+        DataWriter<String> tee = DataWriter.tee(dw1, dw2);
+        tee.pre();
+        tee.write(Collections.singletonList("x"));
+        tee.post();
+
+        Assert.assertTrue(pre1.get());
+        Assert.assertTrue(pre2.get());
+        Assert.assertTrue(post1.get());
+        Assert.assertTrue(post2.get());
+    }
+
+    // ===== tee(dw1, dw2, false) — explicit non-parallel =====
+
+    @Test
+    public void testTeeNonParallelEquivalentToSequential() {
+        List<List<String>> received1 = new ArrayList<>();
+        List<List<String>> received2 = new ArrayList<>();
+        DataWriter<String> dw1 = b -> { received1.add(new ArrayList<>(b)); return true; };
+        DataWriter<String> dw2 = b -> { received2.add(new ArrayList<>(b)); return true; };
+
+        DataWriter<String> tee = DataWriter.tee(dw1, dw2, false);
+        tee.write(Arrays.asList("x", "y"));
+
+        Assert.assertEquals(received1, received2);
+        Assert.assertEquals(1, received1.size());
+    }
+
+    // ===== tee(dw1, dw2, true) — parallel =====
+
+    @Test
+    public void testTeeParallelBothReceiveAllBatches() throws Exception {
+        List<String> received1 = Collections.synchronizedList(new ArrayList<>());
+        List<String> received2 = Collections.synchronizedList(new ArrayList<>());
+        DataWriter<String> dw1 = b -> { received1.addAll(b); return true; };
+        DataWriter<String> dw2 = b -> { received2.addAll(b); return true; };
+
+        DataWriter<String> tee = DataWriter.tee(dw1, dw2, true);
+        tee.pre();
+        tee.write(Arrays.asList("a", "b", "c"));
+        tee.write(Arrays.asList("d", "e"));
+        tee.post(); // blocks until both background threads complete
+
+        List<String> expected = Arrays.asList("a", "b", "c", "d", "e");
+        Assert.assertEquals(expected, received1);
+        Assert.assertEquals(expected, received2);
+    }
+
+    @Test
+    public void testTeeParallelOpenPrePostCloseCalledOnBothWriters() throws Exception {
+        AtomicBoolean open1 = new AtomicBoolean();
+        AtomicBoolean pre1 = new AtomicBoolean();
+        AtomicBoolean post1 = new AtomicBoolean();
+        AtomicBoolean close1 = new AtomicBoolean();
+        AtomicBoolean open2 = new AtomicBoolean();
+        AtomicBoolean pre2 = new AtomicBoolean();
+        AtomicBoolean post2 = new AtomicBoolean();
+        AtomicBoolean close2 = new AtomicBoolean();
+
+        DataWriter<String> dw1 = new DataWriter<String>() {
+            @Override public boolean open() { open1.set(true); return true; }
+            @Override public boolean pre() { pre1.set(true); return true; }
+            @Override public boolean write(List<String> b) { return true; }
+            @Override public boolean post() { post1.set(true); return true; }
+            @Override public boolean close() { close1.set(true); return true; }
+        };
+        DataWriter<String> dw2 = new DataWriter<String>() {
+            @Override public boolean open() { open2.set(true); return true; }
+            @Override public boolean pre() { pre2.set(true); return true; }
+            @Override public boolean write(List<String> b) { return true; }
+            @Override public boolean post() { post2.set(true); return true; }
+            @Override public boolean close() { close2.set(true); return true; }
+        };
+
+        DataWriter<String> tee = DataWriter.tee(dw1, dw2, true);
+        tee.pre();
+        tee.write(Collections.singletonList("x"));
+        tee.post(); // joins background threads → all lifecycle steps completed
+
+        Assert.assertTrue(open1.get());
+        Assert.assertTrue(pre1.get());
+        Assert.assertTrue(post1.get());
+        Assert.assertTrue(close1.get());
+        Assert.assertTrue(open2.get());
+        Assert.assertTrue(pre2.get());
+        Assert.assertTrue(post2.get());
+        Assert.assertTrue(close2.get());
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void testTeeParallelExceptionInBackgroundWriterRethrownOnPost() throws Exception {
+        // dw1 always throws; the error must surface when post() joins the background thread
+        DataWriter<String> failing = b -> { throw new RuntimeException("write failed"); };
+        DataWriter<String> ok = b -> true;
+
+        DataWriter<String> tee = DataWriter.tee(failing, ok, true);
+        tee.pre();
+        tee.write(Collections.singletonList("x"));
+        tee.post(); // background thread for 'failing' set error1; post() must rethrow it
+    }
+
+    @Test(timeout = 5000)
+    public void testTeeParallelExceptionDetectedOnSubsequentWrite() throws Exception {
+        // After the background thread has failed, a later write() should detect it
+        DataWriter<String> failing = b -> { throw new RuntimeException("write failed"); };
+        DataWriter<String> ok = b -> true;
+
+        DataWriter<String> tee = DataWriter.tee(failing, ok, true);
+        tee.pre();
+        boolean detected = false;
+        try {
+            tee.write(Collections.singletonList("trigger-failure"));
+            // The failure is recorded asynchronously, so keep writing until it surfaces
+            // instead of relying on a single fixed sleep.
+            for (int attempt = 0; attempt < 200 && !detected; attempt++) {
+                try {
+                    tee.write(Collections.singletonList("should-throw"));
+                    Thread.sleep(10);
+                } catch (RuntimeException e) {
+                    detected = true;
+                }
+            }
+        } finally {
+            // Always stop the background threads; the healthy writer would otherwise
+            // stay blocked on its queue after the test has finished.
+            try {
+                tee.post();
+            } catch (RuntimeException expected) {
+                // post() reports the same background failure again
+            }
+        }
+        Assert.assertTrue("write() never reported the background writer failure", detected);
+    }
+
+    @Test(timeout = 5000)
+    public void testTeeParallelPostShouldNotHangWhenBackgroundWriterDies() throws Exception {
+        // Regression scenario: thread1 dies (exception in write), leaving an unconsumed batch
+        // in its bounded queue. A blocking enqueue of the poison pill into that full queue
+        // would never return because no consumer is left to drain it; post() must notice the
+        // dead consumer and report the failure instead.
+        CountDownLatch writerStarted = new CountDownLatch(1);
+        CountDownLatch writerCanProceed = new CountDownLatch(1);
+        CountDownLatch okWriterConsumedBatch1 = new CountDownLatch(1);
+
+        DataWriter<String> failing = new DataWriter<String>() {
+            @Override
+            public boolean write(List<String> batch) {
+                writerStarted.countDown();
+                try {
+                    writerCanProceed.await();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+                throw new RuntimeException("write failed");
+            }
+        };
+
+        DataWriter<String> ok = b -> {
+            okWriterConsumedBatch1.countDown();
+            return true;
+        };
+
+        DataWriter<String> tee = DataWriter.tee(failing, ok, true, 1);
+        tee.pre();
+
+        // batch1: thread1 takes it, enters write(), signals writerStarted, blocks on latch
+        tee.write(Collections.singletonList("batch1"));
+        writerStarted.await();
+
+        // Ensure thread2 has consumed batch1 so enqueueing into queue2 won't block on next write
+        okWriterConsumedBatch1.await();
+
+        // batch2: fills queue1 to capacity (thread1 is blocked, can't consume)
+        tee.write(Collections.singletonList("batch2"));
+
+        // Let thread1 proceed → it throws and dies without ever taking batch2, so queue1
+        // stays full. No sleep is needed: post() keeps retrying until the thread is dead.
+        writerCanProceed.countDown();
+
+        // post() must complete and report the error, not hang on the full queue.
+        try {
+            tee.post();
+            Assert.fail("post() should have thrown due to failed background writer");
+        } catch (RuntimeException e) {
+            // Expected: error from failed writer should propagate
+        }
+    }
+
+    // ===== asTask() =====
+
+    @Test
+    public void testAsTaskDelegatesFullLifecycle() throws Exception {
+        AtomicBoolean opened = new AtomicBoolean();
+        AtomicBoolean pre = new AtomicBoolean();
+        AtomicBoolean post = new AtomicBoolean();
+        AtomicBoolean closed = new AtomicBoolean();
+        List<List<String>> written = new ArrayList<>();
+
+        DataWriter<String> dw = new DataWriter<String>() {
+            @Override public boolean open() { opened.set(true); return true; }
+            @Override public boolean pre() { pre.set(true); return true; }
+            @Override public boolean write(List<String> b) { written.add(new ArrayList<>(b)); return true; }
+            @Override public boolean post() { post.set(true); return true; }
+            @Override public boolean close() { closed.set(true); return true; }
+        };
+
+        Task<String, String> task = dw.asTask();
+        task.pre();
+        List<String> result = task.apply(Arrays.asList("x", "y"));
+        task.post();
+
+        Assert.assertTrue(opened.get());
+        Assert.assertTrue(pre.get());
+        Assert.assertTrue(post.get());
+        Assert.assertTrue(closed.get());
+        Assert.assertEquals(1, written.size());
+        Assert.assertEquals(Arrays.asList("x", "y"), result); // batch passed through
+    }
+
+    @Test
+    public void testAsTaskPreAndPostAreIdempotent() throws Exception {
+        // open/pre are called only once even if task.pre() is invoked multiple times,
+        // same for post/close.
+        AtomicInteger preCount = new AtomicInteger();
+        AtomicInteger postCount = new AtomicInteger();
+
+        DataWriter<String> dw = new DataWriter<String>() {
+            @Override public boolean pre() { preCount.incrementAndGet(); return true; }
+            @Override public boolean write(List<String> b) { return true; }
+            @Override public boolean post() { postCount.incrementAndGet(); return true; }
+        };
+
+        Task<String, String> task = dw.asTask();
+        task.pre();
+        task.pre();
+        task.post();
+        task.post();
+
+        Assert.assertEquals(1, preCount.get());
+        Assert.assertEquals(1, postCount.get());
+    }
+
+    // ===== then(DataWriter) =====
+
+    @Test
+    public void testThenDataWriterBothWritersReceiveSameBatches() {
+        List<String> received1 = new ArrayList<>();
+        List<String> received2 = new ArrayList<>();
+        DataWriter<String> dw1 = b -> { received1.addAll(b); return true; };
+        DataWriter<String> dw2 = b -> { received2.addAll(b); return true; };
+
+        DataWriter<String> chained = dw1.then(dw2);
+        chained.write(Arrays.asList("a", "b"));
+        chained.write(Collections.singletonList("c"));
+
+        Assert.assertEquals(received1, received2);
+        Assert.assertEquals(Arrays.asList("a", "b", "c"), received1);
+    }
+
+    @Test
+    public void testThenDataWriterLifecycleCalledOnBoth() {
+        AtomicBoolean pre1 = new AtomicBoolean();
+        AtomicBoolean post1 = new AtomicBoolean();
+        AtomicBoolean pre2 = new AtomicBoolean();
+        AtomicBoolean post2 = new AtomicBoolean();
+
+        DataWriter<String> dw1 = new DataWriter<String>() {
+            @Override public boolean pre() { pre1.set(true); return true; }
+            @Override public boolean write(List<String> b) { return true; }
+            @Override public boolean post() { post1.set(true); return true; }
+        };
+        DataWriter<String> dw2 = new DataWriter<String>() {
+            @Override public boolean pre() { pre2.set(true); return true; }
+            @Override public boolean write(List<String> b) { return true; }
+            @Override public boolean post() { post2.set(true); return true; }
+        };
+
+        DataWriter<String> chained = dw1.then(dw2);
+        chained.pre();
+        chained.write(Collections.singletonList("x"));
+        chained.post();
+
+        Assert.assertTrue(pre1.get());
+        Assert.assertTrue(pre2.get());
+        Assert.assertTrue(post1.get());
+        Assert.assertTrue(post2.get());
+    }
+}
diff --git a/commons-lib/src/test/java/org/opencb/commons/run/TaskTest.java b/commons-lib/src/test/java/org/opencb/commons/run/TaskTest.java
new file mode 100644
index 000000000..559b8d120
--- /dev/null
+++ b/commons-lib/src/test/java/org/opencb/commons/run/TaskTest.java
@@ -0,0 +1,231 @@
+package org.opencb.commons.run;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+public class TaskTest {
+
+ // ===== forEach(Function) =====
+
+ @Test // forEach(Function) maps every element; results keep the input order
+ public void testForEachFunctionTransforms() throws Exception {
+ Task task = Task.forEach(String::length);
+ List result = task.apply(Arrays.asList("a", "bb", "ccc"));
+ Assert.assertEquals(Arrays.asList(1, 2, 3), result); // lengths of "a", "bb", "ccc"
+ }
+
+ @Test // null results act as a filter
+ public void testForEachFunctionFiltersNullReturns() throws Exception {
+ // Elements where the function returns null are excluded from the output
+ Task task = Task.forEach(s -> s.startsWith("a") ? s.length() : null);
+ List result = task.apply(Arrays.asList("abc", "xyz", "ab"));
+ Assert.assertEquals(Arrays.asList(3, 2), result); // "xyz" is dropped, order of the rest is kept
+ }
+
+ @Test // when every element maps to null the result is an empty list, not null
+ public void testForEachFunctionAllNullReturnsEmptyList() throws Exception {
+ Task task = Task.forEach(s -> null);
+ List result = task.apply(Arrays.asList("a", "b"));
+ Assert.assertEquals(Collections.emptyList(), result);
+ }
+
+ @Test // an empty batch yields an empty result
+ public void testForEachFunctionEmptyBatch() throws Exception {
+ Task task = Task.forEach(String::length);
+ Assert.assertEquals(Collections.emptyList(), task.apply(Collections.emptyList()));
+ }
+
+ @Test // a null batch is expected to be treated like an empty one
+ public void testForEachFunctionNullBatch() throws Exception {
+ Task task = Task.forEach(String::length);
+ Assert.assertEquals(Collections.emptyList(), task.apply(null));
+ }
+
+ // ===== forEach(Consumer) =====
+
+ @Test // forEach(Consumer) is a side-effect step: it visits each element and returns the batch
+ public void testForEachConsumerRunsOnEachElementAndPassesThrough() throws Exception {
+ List visited = new ArrayList<>();
+ Task task = Task.forEach((Consumer) s -> { visited.add(s); }); // cast selects the Consumer overload
+ List input = Arrays.asList("a", "b", "c");
+
+ List result = task.apply(input);
+
+ Assert.assertEquals(input, visited); // consumer received every element
+ Assert.assertEquals(input, result); // original batch passed through unchanged
+ }
+
+ @Test // an empty batch yields an empty result for the Consumer overload as well
+ public void testForEachConsumerEmptyBatch() throws Exception {
+ Task task = Task.forEach((Consumer) s -> {});
+ Assert.assertEquals(Collections.emptyList(), task.apply(Collections.emptyList()));
+ }
+
+ @Test // a null batch is expected to come back as an empty list, not as null
+ public void testForEachConsumerNullBatch() throws Exception {
+ Task task = Task.forEach((Consumer) s -> {});
+ Assert.assertEquals(Collections.emptyList(), task.apply(null));
+ }
+
+ // ===== then(Task) =====
+
+ @Test // then(Task) feeds the output of the first task into the second one
+ public void testThenChainsOutputToNextTaskInput() throws Exception {
+ Task lengths = Task.forEach(String::length);
+ Task labels = Task.forEach(i -> "len=" + i);
+
+ List result = lengths.then(labels).apply(Arrays.asList("hello", "hi"));
+ Assert.assertEquals(Arrays.asList("len=5", "len=2"), result);
+ }
+
+ @Test // pre() and post() of a chained task must reach both members of the chain
+ public void testThenPreAndPostCalledOnBothTasks() throws Exception {
+ AtomicBoolean pre1 = new AtomicBoolean();
+ AtomicBoolean post1 = new AtomicBoolean();
+ AtomicBoolean pre2 = new AtomicBoolean();
+ AtomicBoolean post2 = new AtomicBoolean();
+
+ Task task1 = new Task() {
+ @Override public void pre() { pre1.set(true); }
+ @Override public List apply(List b) { return b; }
+ @Override public void post() { post1.set(true); }
+ };
+ Task task2 = new Task() {
+ @Override public void pre() { pre2.set(true); }
+ @Override public List apply(List b) { return b; }
+ @Override public void post() { post2.set(true); }
+ };
+
+ Task combined = task1.then(task2);
+ combined.pre();
+ combined.post(); // apply() is not exercised here, only the lifecycle hooks
+
+ Assert.assertTrue(pre1.get());
+ Assert.assertTrue(pre2.get());
+ Assert.assertTrue(post1.get());
+ Assert.assertTrue(post2.get());
+ }
+
+ @Test // elements drained from the first task must still pass through the second task
+ public void testThenDrainFromFirstTaskFeedsIntoSecond() throws Exception {
+ // task1 drains an extra element; task2 uppercases everything
+ Task task1 = new Task() {
+ @Override public List apply(List b) { return b; }
+ @Override public List drain() { return Collections.singletonList("drained"); }
+ };
+ Task task2 = Task.forEach(s -> {
+ return s.toUpperCase();
+ });
+
+ // drain1 = ["drained"] → task2.apply(drain1) = ["DRAINED"], task2.drain() = []
+ List drain = task1.then(task2).drain();
+ Assert.assertEquals(Collections.singletonList("DRAINED"), drain);
+ }
+
+ @Test // the drain of the chain is the first drain (passed through task2) followed by task2's own drain
+ public void testThenDrainCombinesBothDrains() throws Exception {
+ // Both tasks drain something; the combined result should include both contributions
+ Task task1 = new Task() {
+ @Override public List apply(List b) { return b; }
+ @Override public List drain() { return Collections.singletonList("from-task1"); }
+ };
+ Task task2 = new Task() {
+ @Override public List apply(List b) { return b; }
+ @Override public List drain() { return Collections.singletonList("from-task2"); }
+ };
+
+ // task2.apply(["from-task1"]) = ["from-task1"], task2.drain() = ["from-task2"]
+ List drain = task1.then(task2).drain();
+ Assert.assertEquals(Arrays.asList("from-task1", "from-task2"), drain);
+ }
+
+ // ===== tee(mainTask, sideTask) =====
+
+ @Test // tee returns the main task's output only
+ public void testTeeMainOutputPropagated() throws Exception {
+ Task main = Task.forEach(String::length);
+ Task side = Task.forEach(s -> {
+ return s.toUpperCase();
+ }); // output discarded
+
+ List result = Task.tee(main, side).apply(Arrays.asList("hello", "world"));
+ Assert.assertEquals(Arrays.asList(5, 5), result); // lengths from main, nothing from side
+ }
+
+ @Test // the side task sees the original input batch, not the main task's output
+ public void testTeeSideTaskReceivesSameInputAsBatch() throws Exception {
+ List sideInput = new ArrayList<>();
+ Task main = Task.forEach(String::length);
+ Task side = Task.forEach(s -> { sideInput.add(s); return s; }); // records what the side task is given
+
+ Task.tee(main, side).apply(Arrays.asList("hello", "world"));
+ Assert.assertEquals(Arrays.asList("hello", "world"), sideInput);
+ }
+
+ @Test // lifecycle hooks of the tee must reach the side task too, including drain()
+ public void testTeePrePostAndDrainCalledOnBothTasks() throws Exception {
+ AtomicBoolean preMain = new AtomicBoolean();
+ AtomicBoolean postMain = new AtomicBoolean();
+ AtomicBoolean preSide = new AtomicBoolean();
+ AtomicBoolean postSide = new AtomicBoolean();
+ AtomicBoolean drainSide = new AtomicBoolean();
+
+ Task main = new Task() {
+ @Override public void pre() { preMain.set(true); }
+ @Override public List apply(List b) { return b; }
+ @Override public void post() { postMain.set(true); }
+ };
+ Task side = new Task() {
+ @Override public void pre() { preSide.set(true); }
+ @Override public List apply(List b) { return b; }
+ @Override public List drain() { drainSide.set(true); return Collections.emptyList(); }
+ @Override public void post() { postSide.set(true); }
+ };
+
+ Task tee = Task.tee(main, side);
+ tee.pre();
+ tee.drain(); // main's drain is not tracked here; only the side task records the call
+ tee.post();
+
+ Assert.assertTrue(preMain.get());
+ Assert.assertTrue(preSide.get());
+ Assert.assertTrue(postMain.get());
+ Assert.assertTrue(postSide.get());
+ Assert.assertTrue(drainSide.get());
+ }
+
+ @Test // a failure in the side task is not swallowed: it reaches the caller unchanged
+ public void testTeeSideExceptionPropagates() throws Exception {
+ Task main = Task.forEach(String::length);
+ Task side = batch -> { throw new RuntimeException("side failed"); };
+
+ try {
+ Task.tee(main, side).apply(Collections.singletonList("x"));
+ Assert.fail("Expected exception from side task");
+ } catch (RuntimeException e) {
+ Assert.assertEquals("side failed", e.getMessage()); // same exception, not a wrapper with another message
+ }
+ }
+
+ // ===== join (deprecated alias for tee) =====
+
+ @Test
+ public void testJoinDelegatesToTee() throws Exception {
+ List sideInput = new ArrayList<>();
+ Task main = Task.forEach(String::length);
+ Task