diff --git a/CHANGELOG.md b/CHANGELOG.md index b2431f6c..f44644ed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## XX.XX.XX +* Fixed a bug where the request queue would stall after sending the first request, preventing subsequent persisted requests from being sent. + ## 24.1.5 * Fixed a bug where a non-JSON server response would cause a permanent networking deadlock, preventing all subsequent requests from being sent. * Fixed a bug where a NullPointerException in SDKCore.recover() would permanently block SDK initialization when a crash file from a previous session existed on disk. diff --git a/sdk-java/src/main/java/ly/count/sdk/java/internal/Tasks.java b/sdk-java/src/main/java/ly/count/sdk/java/internal/Tasks.java index 6eb7cb8c..0f279c4d 100644 --- a/sdk-java/src/main/java/ly/count/sdk/java/internal/Tasks.java +++ b/sdk-java/src/main/java/ly/count/sdk/java/internal/Tasks.java @@ -92,12 +92,9 @@ Future run(final Task task, final Callback callback) { @Override public T call() throws Exception { running = task.id; + T result; try { - T result = task.call(); - if (callback != null) { - callback.call(result); - } - return result; + result = task.call(); } finally { synchronized (pending) { if (!task.id.equals(0L)) { @@ -106,6 +103,10 @@ public T call() throws Exception { running = null; } } + if (callback != null) { + callback.call(result); + } + return result; } }); diff --git a/sdk-java/src/test/java/ly/count/sdk/java/internal/ScenarioRequestQueueStallTests.java b/sdk-java/src/test/java/ly/count/sdk/java/internal/ScenarioRequestQueueStallTests.java new file mode 100644 index 00000000..5f4c46f4 --- /dev/null +++ b/sdk-java/src/test/java/ly/count/sdk/java/internal/ScenarioRequestQueueStallTests.java @@ -0,0 +1,136 @@ +package ly.count.sdk.java.internal; + +import com.sun.net.httpserver.HttpServer; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import 
java.util.concurrent.atomic.AtomicInteger; +import ly.count.sdk.java.Config; +import ly.count.sdk.java.Countly; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * End-to-end reproducer for GitHub issue #271: + * "Request queue stalls after the first request in SDK 24.1.5". + * + * The race the user hit in production: while request #1 is in-flight, the SDK + * queues additional requests to disk. When #1 completes, its callback inside + * Tasks.java re-enters DefaultNetworking.check() — which short-circuits if + * tasks.isRunning() returns true. In 24.1.5 the callback fired before + * `running` was cleared, so check() saw isRunning()=true and skipped + * scheduling the next request. The queue silently stopped draining. + * + * To reproduce deterministically we hold request #1 open with a CountDownLatch + * while we generate a backlog, then release it and assert the backlog drains + * without any external trigger. + */ +@RunWith(JUnit4.class) +public class ScenarioRequestQueueStallTests { + + private HttpServer server; + private int port; + + @Before + public void setUp() { + TestUtils.createCleanTestState(); + } + + @After + public void tearDown() { + Countly.instance().halt(); + if (server != null) { + server.stop(0); + } + } + + private Config configForLocalServer() { + return new Config("http://localhost:" + port, TestUtils.SERVER_APP_KEY, TestUtils.getTestSDirectory()) + .setLoggingLevel(Config.LoggingLevel.VERBOSE) + .setDeviceIdStrategy(Config.DeviceIdStrategy.UUID) + .enableFeatures(Config.Feature.Events, Config.Feature.Sessions) + .setEventQueueSizeToSend(1); + } + + /** + * Reproduces the user-reported symptom: with multiple requests piled up + * on disk while the network is busy, the queue must drain without + * external prompting once the network is free again. + * + * Buggy 24.1.5 Tasks.java: only request #1 reaches the server. 
+ * Fixed: all queued requests reach the server. + */ + @Test + public void backloggedRequests_drainAfterInFlightCompletes() throws Exception { + AtomicInteger requestCount = new AtomicInteger(0); + CountDownLatch firstRequestArrived = new CountDownLatch(1); + CountDownLatch releaseFirstRequest = new CountDownLatch(1); + + server = HttpServer.create(new InetSocketAddress(0), 0); + port = server.getAddress().getPort(); + server.createContext("/", exchange -> { + int n = requestCount.incrementAndGet(); + if (n == 1) { + // Hold request #1 open until the test has built up a backlog. + firstRequestArrived.countDown(); + try { + releaseFirstRequest.await(10, TimeUnit.SECONDS); + } catch (InterruptedException ignored) { + } + } + byte[] body = "{\"result\":\"Success\"}".getBytes(java.nio.charset.StandardCharsets.UTF_8); // encode once so Content-Length matches the bytes written + exchange.sendResponseHeaders(200, body.length); + try (OutputStream os = exchange.getResponseBody()) { + os.write(body); + } + }); + server.start(); + + Countly.instance().init(configForLocalServer()); + Countly.session().begin(); + + // Wait until request #1 has reached the server and is being held. + Assert.assertTrue( + "request #1 should reach the server within 5s", + firstRequestArrived.await(5, TimeUnit.SECONDS) + ); + + // Build up a backlog: each recordEvent flushes a new request to disk. + // While the server holds #1, all of these queue up because + // DefaultNetworking.check() short-circuits on isRunning() == true. + final int backlogSize = 5; + for (int i = 0; i < backlogSize; i++) { + Countly.instance().events().recordEvent("backlog_evt_" + i); + } + + // Give the event flushes time to actually write request files to disk. + Thread.sleep(500); + + // Release #1. From this point on no external code calls check() — + // the queue must self-drain via the callback re-entry path that + // issue #271 broke. + releaseFirstRequest.countDown(); + + // Poll for drain. Total expected = 1 (begin_session) + backlogSize. 
+ // Use >= because device-id resolution or merge requests may add extras. + // The regression is "queue stops at 1"; a full drain to expectedMinimum + // within the generous deadline below shows the queue kept moving. + int expectedMinimum = 1 + backlogSize; + long deadline = System.currentTimeMillis() + 10_000; + while (System.currentTimeMillis() < deadline && requestCount.get() < expectedMinimum) { + Thread.sleep(100); + } + + Assert.assertTrue( + "request queue should drain to >= " + expectedMinimum + " requests " + + "without external check() calls — got " + requestCount.get() + + " (queue stalled if << expected)", + requestCount.get() >= expectedMinimum + ); + } +} diff --git a/sdk-java/src/test/java/ly/count/sdk/java/internal/TasksExceptionRecoveryTests.java b/sdk-java/src/test/java/ly/count/sdk/java/internal/TasksExceptionRecoveryTests.java index af67ec05..b94e15f6 100644 --- a/sdk-java/src/test/java/ly/count/sdk/java/internal/TasksExceptionRecoveryTests.java +++ b/sdk-java/src/test/java/ly/count/sdk/java/internal/TasksExceptionRecoveryTests.java @@ -1,6 +1,8 @@ package ly.count.sdk.java.internal; import java.io.IOException; +import java.lang.reflect.Field; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -131,10 +133,19 @@ public Boolean call() { Assert.assertFalse("Executor should not be running", tasks.isRunning()); } + private static Object getField(Object target, String fieldName) throws Exception { + Field field = target.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + return field.get(target); + } + /** * When a callback throws an exception, the executor should still recover - * and not deadlock. The callback runs inside the try block, so its exception - * is caught by the finally block. + * and not deadlock. 
After the issue #271 fix the callback runs outside the + * try/finally, so `running = null` has already executed before the throw — + * the exception propagates into the Future without leaving the executor + * stuck. This test uses id=0L; for the `pending.remove` branch on a + * non-zero-id task see callbackExceptionDoesNotLeakPendingForIdTask. */ @Test public void callbackException_executorRecovers() throws Exception { @@ -287,4 +298,101 @@ public Boolean call() throws Exception { tasks.isRunning() ); } + + /** + * After a task with a non-zero id throws, the `pending` map must be empty. + * Locks the cleanup contract directly rather than inferring it from dedup + * behavior — guards against future refactors that move pending.remove() + * out of the finally block. + */ + @Test + public void pendingIsEmptyAfterIdTaskThrows() throws Exception { + Long taskId = 42L; + + tasks.run(new Tasks.Task(taskId) { + @Override + public Boolean call() { + throw new RuntimeException("Simulated failure"); + } + }); + + tasks.await(); + + Map pending = (Map) getField(tasks, "pending"); + synchronized (pending) { + Assert.assertTrue("pending map should be empty after task throws", pending.isEmpty()); + } + } + + /** + * Issue #271 + #264 interaction: when a callback throws on a task with a + * non-zero id, both `running` and `pending` must be cleared. The existing + * callbackException_executorRecovers test uses id=0L, so it never exercises + * the `pending.remove(task.id)` branch — this test does. 
+ */ + @Test + public void callbackExceptionDoesNotLeakPendingForIdTask() throws Exception { + Long taskId = 99L; + + tasks.run(new Tasks.Task(taskId) { + @Override + public Boolean call() { + return true; + } + }, result -> { + throw new RuntimeException("Simulated callback failure"); + }); + + tasks.await(); + + Assert.assertFalse("running should be cleared after callback throws", tasks.isRunning()); + Map pending = (Map) getField(tasks, "pending"); + synchronized (pending) { + Assert.assertTrue("pending should be empty after callback throws on id-task", pending.isEmpty()); + } + } + + /** + * Issue #271 + #264 recovery path: after a callback throws (e.g. transient + * parse error in DefaultNetworking.check), a subsequent task's callback + * must still be able to re-enter the scheduler and queue the next request. + * This is the production scenario — taking down the request queue requires + * BOTH "callback A throws" AND "callback B can no longer reschedule". + */ + @Test + public void callbackThrows_subsequentCallbackCanReschedule() throws Exception { + // Task A: succeeds, callback throws. + tasks.run(new Tasks.Task(0L) { + @Override + public Boolean call() { + return true; + } + }, result -> { + throw new RuntimeException("Simulated callback A failure"); + }); + tasks.await(); + + // Task B: succeeds, callback re-enters scheduler to run task C + // (mirrors DefaultNetworking.check() recovering after a failed callback). 
+ CountDownLatch taskCRan = new CountDownLatch(1); + tasks.run(new Tasks.Task(0L) { + @Override + public Boolean call() { + return true; + } + }, paramB -> { + if (!tasks.isRunning()) { + tasks.run(new Tasks.Task(0L) { + @Override + public Boolean call() { + taskCRan.countDown(); + return true; + } + }); + } + }); + + Assert.assertTrue("task C should run — recovery path after a failed callback must still allow re-entry", + taskCRan.await(2, TimeUnit.SECONDS)); + } } diff --git a/sdk-java/src/test/java/ly/count/sdk/java/internal/TasksTests.java b/sdk-java/src/test/java/ly/count/sdk/java/internal/TasksTests.java index 6cbf8825..8d13f53d 100644 --- a/sdk-java/src/test/java/ly/count/sdk/java/internal/TasksTests.java +++ b/sdk-java/src/test/java/ly/count/sdk/java/internal/TasksTests.java @@ -1,8 +1,10 @@ package ly.count.sdk.java.internal; import java.lang.reflect.Field; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -76,6 +78,65 @@ public Object call() { Assert.assertEquals(Boolean.TRUE, called[1]); } + /** + * Regression for issue #271, end-to-end: mirrors the + * DefaultNetworking.check() pattern where task A's callback re-enters the + * scheduler with `if (!isRunning()) tasks.run(taskB)`. The actual user-facing + * failure was not "isRunning() reports the wrong value" but "the next request + * never runs". This test proves task B is both scheduled AND executed. 
+ */ + @Test + public void testCallbackCanScheduleAndRunNextTask() throws Exception { + final CountDownLatch taskBRan = new CountDownLatch(1); + final boolean[] taskBScheduled = { false }; + + tasks.run(new Tasks.Task(0L) { + @Override + public Integer call() { + return 1; + } + }, paramA -> { + if (!tasks.isRunning()) { + taskBScheduled[0] = true; + tasks.run(new Tasks.Task(0L) { + @Override + public Integer call() { + taskBRan.countDown(); + return 2; + } + }); + } + }); + + Assert.assertTrue("task B should have been scheduled and run by callback A", + taskBRan.await(2, TimeUnit.SECONDS)); + Assert.assertTrue("callback A should have observed isRunning() == false and entered the schedule branch", + taskBScheduled[0]); + } + + /** + * Regression for issue #271: callback must observe isRunning() == false + * so that a callback re-entering the scheduler (e.g. DefaultNetworking.check) + * can schedule the next task. In 24.1.5 the callback was invoked before + * `running` was cleared, deadlocking the request queue. + */ + @Test + public void testCallbackSeesIsRunningFalse() throws Exception { + final Boolean[] runningInsideCallback = new Boolean[] { null }; + + tasks.run(new Tasks.Task(0L) { + @Override + public Integer call() { + return 1; + } + }, param -> runningInsideCallback[0] = tasks.isRunning()); + + tasks.await(); + + Assert.assertNotNull("callback was not invoked", runningInsideCallback[0]); + Assert.assertEquals(Boolean.FALSE, runningInsideCallback[0]); + } + @Test public void testTaskIdsWork() throws Exception { final int result = 123;