diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java index 4ce2cc2e0..4b0cab61f 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java @@ -28,9 +28,9 @@ import io.vertx.pgclient.impl.codec.NoticeResponse; import io.vertx.pgclient.impl.codec.TxFailedEvent; import io.vertx.pgclient.spi.PgDriver; -import io.vertx.sqlclient.spi.connection.Connection; import io.vertx.sqlclient.codec.SocketConnectionBase; import io.vertx.sqlclient.internal.SqlConnectionBase; +import io.vertx.sqlclient.spi.connection.Connection; public class PgConnectionImpl extends SqlConnectionBase implements PgConnection { @@ -67,10 +67,11 @@ public void handleEvent(Object event) { Handler handler = notificationHandler; if (handler != null) { Notification notification = (Notification) event; - handler.handle(new PgNotification() + PgNotification pgNotification = new PgNotification() .setChannel(notification.getChannel()) .setProcessId(notification.getProcessId()) - .setPayload(notification.getPayload())); + .setPayload(notification.getPayload()); + context.duplicate().emit(pgNotification, handler); } } else if (event instanceof NoticeResponse) { Handler handler = noticeHandler; @@ -94,8 +95,7 @@ public void handleEvent(Object event) { .setDataType(noticeEvent.getDataType()) .setConstraint(noticeEvent.getConstraint()); if (handler != null) { - handler.handle(notice - ); + context.duplicate().emit(notice, handler); } else { notice.log(SocketConnectionBase.logger); } diff --git a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PubSubTest.java b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PubSubTest.java index 751ce9fd7..af53a6bb5 100644 --- a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PubSubTest.java +++ b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PubSubTest.java @@ -18,6 +18,7 @@ import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; +import io.vertx.core.internal.ContextInternal; import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.pgclient.PgConnectOptions; @@ -32,9 +33,12 @@ import java.util.Arrays; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import static io.vertx.core.internal.ContextInternal.LOCAL_MAP; + public class PubSubTest extends PgTestBase { Vertx vertx; @@ -361,4 +365,52 @@ public void testNoticedRaised(TestContext ctx) { })); })); } + + @Test + public void testSubscriberEmitsOnDuplicatedContext(TestContext ctx) { + String channelName = "test_channel"; + String quotedChannelName = "\"" + channelName + "\""; + + // Create a duplicated context with local data BEFORE creating the subscriber + ContextInternal originalContext = (ContextInternal) vertx.getOrCreateContext(); + ContextInternal duplicatedContext = originalContext.duplicate(); + duplicatedContext.getLocal(LOCAL_MAP, ConcurrentHashMap::new).put("foo", "bar"); + + Async connectLatch = ctx.async(); + + duplicatedContext.runOnContext(v -> { + ctx.assertEquals("bar", ContextInternal.current().getLocal(LOCAL_MAP, ConcurrentHashMap::new).get("foo")); + + subscriber = PgSubscriber.subscriber(vertx, options); + subscriber.connect().onComplete(v2 -> connectLatch.complete()); + }); + + connectLatch.awaitSuccess(10000); + + Async notificationLatch = ctx.async(); + + duplicatedContext.runOnContext(v -> { + ctx.assertEquals("bar", ContextInternal.current().getLocal(LOCAL_MAP, ConcurrentHashMap::new).get("foo")); + + PgChannel channel = subscriber.channel(channelName); + channel.handler(notif -> { + ContextInternal currentContext = ContextInternal.current(); + ctx.assertTrue(currentContext.isDuplicate()); + ctx.assertNotEquals(duplicatedContext, currentContext); + ctx.assertNull(currentContext.getLocal(LOCAL_MAP, ConcurrentHashMap::new).get("foo")); + ctx.assertEquals("msg", notif); + notificationLatch.countDown(); + }); + }); + + vertx.setTimer(100, t -> { + subscriber + .actualConnection() + .query("NOTIFY " + quotedChannelName + ", 'msg'") + .execute() + .onComplete(ctx.asyncAssertSuccess()); + }); + + notificationLatch.awaitSuccess(10000); + } }