From 25b3821d9de03faa932681bd281ab2f4ea440cbb Mon Sep 17 00:00:00 2001 From: Thomas Segismont Date: Fri, 13 Feb 2026 12:21:11 +0100 Subject: [PATCH] Emit PgNotification on a duplicated context Currently, notifications are emitted on the context captured when the connection is created. If a PgSubscriber was created while running on a DuplicatedContext, this would be context of emissions. This is wrong and could cause local data (e.g. tracing) to be still present when notification handlers are executed. This change consists in duplicating the connection context before emitting notifications. Signed-off-by: Thomas Segismont --- .../vertx/pgclient/impl/PgConnectionImpl.java | 10 ++-- .../io/vertx/tests/pgclient/PubSubTest.java | 52 +++++++++++++++++++ 2 files changed, 57 insertions(+), 5 deletions(-) diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java index 4ce2cc2e0..4b0cab61f 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java @@ -28,9 +28,9 @@ import io.vertx.pgclient.impl.codec.NoticeResponse; import io.vertx.pgclient.impl.codec.TxFailedEvent; import io.vertx.pgclient.spi.PgDriver; -import io.vertx.sqlclient.spi.connection.Connection; import io.vertx.sqlclient.codec.SocketConnectionBase; import io.vertx.sqlclient.internal.SqlConnectionBase; +import io.vertx.sqlclient.spi.connection.Connection; public class PgConnectionImpl extends SqlConnectionBase implements PgConnection { @@ -67,10 +67,11 @@ public void handleEvent(Object event) { Handler handler = notificationHandler; if (handler != null) { Notification notification = (Notification) event; - handler.handle(new PgNotification() + PgNotification pgNotification = new PgNotification() .setChannel(notification.getChannel()) .setProcessId(notification.getProcessId()) - .setPayload(notification.getPayload())); + .setPayload(notification.getPayload()); + context.duplicate().emit(pgNotification, handler); } } else if (event instanceof NoticeResponse) { Handler handler = noticeHandler; @@ -94,8 +95,7 @@ public void handleEvent(Object event) { .setDataType(noticeEvent.getDataType()) .setConstraint(noticeEvent.getConstraint()); if (handler != null) { - handler.handle(notice - ); + context.duplicate().emit(notice, handler); } else { notice.log(SocketConnectionBase.logger); } diff --git a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PubSubTest.java b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PubSubTest.java index 751ce9fd7..af53a6bb5 100644 --- a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PubSubTest.java +++ b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PubSubTest.java @@ -18,6 +18,7 @@ import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; +import io.vertx.core.internal.ContextInternal; import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.pgclient.PgConnectOptions; @@ -32,9 +33,12 @@ import java.util.Arrays; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import static io.vertx.core.internal.ContextInternal.LOCAL_MAP; + public class PubSubTest extends PgTestBase { Vertx vertx; @@ -361,4 +365,52 @@ public void testNoticedRaised(TestContext ctx) { })); })); } + + @Test + public void testSubscriberEmitsOnDuplicatedContext(TestContext ctx) { + String channelName = "test_channel"; + String quotedChannelName = "\"" + channelName + "\""; + + // Create a duplicated context with local data BEFORE creating the subscriber + ContextInternal originalContext = (ContextInternal) vertx.getOrCreateContext(); + ContextInternal duplicatedContext = originalContext.duplicate(); + duplicatedContext.getLocal(LOCAL_MAP, ConcurrentHashMap::new).put("foo", "bar"); + + Async connectLatch = ctx.async(); + + duplicatedContext.runOnContext(v -> { + ctx.assertEquals("bar", ContextInternal.current().getLocal(LOCAL_MAP, ConcurrentHashMap::new).get("foo")); + + subscriber = PgSubscriber.subscriber(vertx, options); + subscriber.connect().onComplete(v2 -> connectLatch.complete()); + }); + + connectLatch.awaitSuccess(10000); + + Async notificationLatch = ctx.async(); + + duplicatedContext.runOnContext(v -> { + ctx.assertEquals("bar", ContextInternal.current().getLocal(LOCAL_MAP, ConcurrentHashMap::new).get("foo")); + + PgChannel channel = subscriber.channel(channelName); + channel.handler(notif -> { + ContextInternal currentContext = ContextInternal.current(); + ctx.assertTrue(currentContext.isDuplicate()); + ctx.assertNotEquals(duplicatedContext, currentContext); + ctx.assertNull(currentContext.getLocal(LOCAL_MAP, ConcurrentHashMap::new).get("foo")); + ctx.assertEquals("msg", notif); + notificationLatch.countDown(); + }); + }); + + vertx.setTimer(100, t -> { + subscriber + .actualConnection() + .query("NOTIFY " + quotedChannelName + ", 'msg'") + .execute() + .onComplete(ctx.asyncAssertSuccess()); + }); + + notificationLatch.awaitSuccess(10000); + } }