Skip to content

Commit db93265

Browse files
committed
Addressing the encoding issues
Addressing some general cases
1 parent b5c5335 commit db93265

File tree

5 files changed

+223
-188
lines changed

5 files changed

+223
-188
lines changed

src/main/java/io/lettuce/core/datastructure/queue/IncompleteQueue.java

Lines changed: 0 additions & 84 deletions
This file was deleted.

src/main/java/io/lettuce/core/protocol/CommandEncoder.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,10 @@ private void encode(ChannelHandlerContext ctx, ByteBuf out, RedisCommand<?, ?, ?
7676
try {
7777
out.markWriterIndex();
7878
command.encode(out);
79-
} catch (RuntimeException e) {
80-
out.resetWriterIndex();
81-
command.completeExceptionally(new EncoderException(
82-
"Cannot encode command. Please close the connection as the connection state may be out of sync.", e));
79+
} catch (Throwable e) {
80+
ctx.close();
81+
logger.error("{} Cannot encode command. Closing the connection as the connection state may be out of sync.", logPrefix(ctx.channel()), e);
82+
throw new EncoderException("Cannot encode command. Closing the connection as the connection state may be out of sync.", e);
8383
}
8484

8585
if (debugEnabled) {

src/main/java/io/lettuce/core/protocol/CommandHandler.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import io.lettuce.core.api.push.PushListener;
4747
import io.lettuce.core.api.push.PushMessage;
4848
import io.lettuce.core.datastructure.queue.HashIndexedQueue;
49-
import io.lettuce.core.datastructure.queue.IncompleteQueue;
5049
import io.lettuce.core.internal.LettuceAssert;
5150
import io.lettuce.core.internal.LettuceSets;
5251
import io.lettuce.core.metrics.CommandLatencyRecorder;
@@ -70,6 +69,7 @@
7069
import io.netty.util.internal.logging.InternalLogLevel;
7170
import io.netty.util.internal.logging.InternalLogger;
7271
import io.netty.util.internal.logging.InternalLoggerFactory;
72+
import net.bytebuddy.implementation.bytecode.Throw;
7373

7474
/**
7575
* A netty {@link ChannelHandler} responsible for writing redis commands and reading responses from the server.
@@ -161,11 +161,7 @@ public CommandHandler(ClientOptions clientOptions, ClientResources clientResourc
161161
this.commandLatencyRecorder = clientResources.commandLatencyRecorder();
162162
this.latencyMetricsEnabled = commandLatencyRecorder.isEnabled();
163163
this.boundedQueues = clientOptions.getRequestQueueSize() != Integer.MAX_VALUE;
164-
Queue<RedisCommand<?, ?, ?>> implementation = clientOptions.isUseHashIndexedQueue() ? new HashIndexedQueue<>()
165-
: new ArrayDeque<>();
166-
this.stack = new IncompleteQueue(implementation);
167-
168-
// this.stack = clientOptions.isUseHashIndexedQueue() ? new HashIndexedQueue<>() : new ArrayDeque<>();
164+
this.stack = clientOptions.isUseHashIndexedQueue() ? new HashIndexedQueue<>() : new ArrayDeque<>();
169165

170166
Tracing tracing = clientResources.tracing();
171167

@@ -664,8 +660,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup
664660
return;
665661
}
666662

667-
} catch (Exception e) {
668-
663+
} catch (Throwable e) {
669664
ctx.close();
670665
throw e;
671666
}
@@ -690,8 +685,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup
690685
decodeBufferPolicy.afterPartialDecode(buffer);
691686
return;
692687
}
693-
} catch (Exception e) {
694-
688+
} catch (Throwable e) {
695689
ctx.close();
696690
throw e;
697691
}
@@ -709,8 +703,9 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup
709703
logger.debug("{} Completing command {}", logPrefix(), command);
710704
}
711705
complete(command);
712-
} catch (Exception e) {
706+
} catch (Throwable e) {
713707
logger.warn("{} Unexpected exception during request: {}", logPrefix, e.toString(), e);
708+
command.completeExceptionally(e);
714709
}
715710
}
716711
}

0 commit comments

Comments
 (0)