Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
import static io.lettuce.TestTags.INTEGRATION_TEST;
import static org.assertj.core.api.Assertions.*;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;

import javax.inject.Inject;

Expand Down Expand Up @@ -463,6 +466,105 @@ public void message(RedisClusterNode node, String pattern, String channel, Strin
assertThat(nodes.poll()).isNull();
}

@Test
void testKeyspaceNotifications() throws Exception {
// This test demonstrates the documented workaround for keyspace notifications in cluster mode
// See: https://redis.github.io/lettuce/user-guide/pubsub/#redis-cluster

// Custom listener to capture complete keyspace notification events
class KeyspaceEvent {

final RedisClusterNode node;

final String pattern;

final String channel;

final String message;

KeyspaceEvent(RedisClusterNode node, String pattern, String channel, String message) {
this.node = node;
this.pattern = pattern;
this.channel = channel;
this.message = message;
}

String extractKey() {
// Extract key name from channel: __keyspace@0__:keyname -> keyname
return channel.substring("__keyspace@0__:".length());
}

}

List<KeyspaceEvent> events = new ArrayList<>();
RedisClusterPubSubAdapter<String, String> keyspaceListener = new RedisClusterPubSubAdapter<String, String>() {

@Override
public void message(RedisClusterNode node, String pattern, String channel, String message) {
events.add(new KeyspaceEvent(node, pattern, channel, message));
}

};

try {
// Step 1: Enable keyspace notifications on all master nodes
// KEA = Keyspace events + Keyevent events + All commands
connection.sync().upstream().commands().configSet("notify-keyspace-events", "KEA");

// Step 2: Add dedicated listener for this test
pubSubConnection.addListener(keyspaceListener);

// Step 3: Enable node message propagation (critical for cluster-wide notifications)
pubSubConnection.setNodeMessagePropagation(true);

// Step 4: Subscribe to keyspace notifications on ALL master nodes (not replicas)
pubSubConnection.sync().upstream().commands().psubscribe("__keyspace@0__:*");

// Wait for subscription to be established (subscription confirmations go to regular listener)
Wait.untilTrue(() -> !events.isEmpty() || connectionListener.getPatterns().peek() != null).waitOrTimeout();
connectionListener.getPatterns().poll(); // Consume subscription confirmation

// Step 5: Perform operations on keys that will be distributed to different nodes
// Use hash tags to ensure keys go to different slots/nodes
// {a} and {b} will hash to different slots in a multi-node cluster
String key1 = "{a}keyspace-test";
String key2 = "{b}keyspace-test";

connection.sync().set(key1, "value1");
connection.sync().set(key2, "value2");

// Step 6: Wait for notifications and verify
// Note: {a} hashes to slot 15495 (node 7380), {b} hashes to slot 3300 (node 7379)
Wait.untilEquals(2, events::size).waitOrTimeout();

// Verify exact count of events
assertThat(events).hasSize(2);

// Verify all events have the correct pattern (what we subscribed to)
assertThat(events).allMatch(event -> "__keyspace@0__:*".equals(event.pattern),
"All events should match pattern __keyspace@0__:*");

// Verify all events have the correct message (operation type)
assertThat(events).allMatch(event -> "set".equals(event.message), "All events should be 'set' operations");

// Extract keys from channels
List<String> keys = events.stream().map(KeyspaceEvent::extractKey).collect(Collectors.toList());
assertThat(keys).containsExactlyInAnyOrder(key1, key2);

// Verify channels contain the full keyspace notification format
List<String> channels = events.stream().map(event -> event.channel).collect(Collectors.toList());
assertThat(channels).containsExactlyInAnyOrder("__keyspace@0__:" + key1, "__keyspace@0__:" + key2);

// Verify events came from different cluster nodes (proving multi-node distribution)
Set<String> nodeIds = events.stream().map(event -> event.node.getNodeId()).collect(Collectors.toSet());
assertThat(nodeIds).hasSize(2).as("Events should come from 2 different cluster nodes");

} finally {
// Clean up: disable keyspace notifications
connection.sync().upstream().commands().configSet("notify-keyspace-events", "");
}
}

private RedisClusterNode getOtherThan(String nodeId) {
for (RedisClusterNode redisClusterNode : clusterClient.getPartitions()) {
if (redisClusterNode.getNodeId().equals(nodeId)) {
Expand Down
Loading