Skip to content

Commit 0363ff3

Browse files
committed
Ensure async data consumers can avoid NPE if they have been canceled / released from another thread at the same with concurrent data processing
1 parent 024b199 commit 0363ff3

4 files changed

Lines changed: 44 additions & 15 deletions

File tree

httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncRequesterConsumer.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,19 +113,27 @@ public void completed(final E entity) {
113113
@Override
114114
public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
115115
final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.get();
116-
dataConsumer.updateCapacity(capacityChannel);
116+
if (dataConsumer != null) {
117+
dataConsumer.updateCapacity(capacityChannel);
118+
} else {
119+
capacityChannel.update(Integer.MAX_VALUE);
120+
}
117121
}
118122

119123
@Override
120124
public final void consume(final ByteBuffer src) throws IOException {
121125
final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.get();
122-
dataConsumer.consume(src);
126+
if (dataConsumer != null) {
127+
dataConsumer.consume(src);
128+
}
123129
}
124130

125131
@Override
126132
public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
127133
final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.get();
128-
dataConsumer.streamEnd(trailers);
134+
if (dataConsumer != null) {
135+
dataConsumer.streamEnd(trailers);
136+
}
129137
}
130138

131139
@Override

httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractServerExchangeHandler.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -174,22 +174,27 @@ public void cancelled() {
174174
@Override
175175
public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
176176
final AsyncRequestConsumer<T> requestConsumer = requestConsumerRef.get();
177-
Asserts.notNull(requestConsumer, "Data consumer");
178-
requestConsumer.updateCapacity(capacityChannel);
177+
if (requestConsumer != null) {
178+
requestConsumer.updateCapacity(capacityChannel);
179+
} else {
180+
capacityChannel.update(Integer.MAX_VALUE);
181+
}
179182
}
180183

181184
@Override
182185
public final void consume(final ByteBuffer src) throws IOException {
183186
final AsyncRequestConsumer<T> requestConsumer = requestConsumerRef.get();
184-
Asserts.notNull(requestConsumer, "Data consumer");
185-
requestConsumer.consume(src);
187+
if (requestConsumer != null) {
188+
requestConsumer.consume(src);
189+
}
186190
}
187191

188192
@Override
189193
public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
190194
final AsyncRequestConsumer<T> requestConsumer = requestConsumerRef.get();
191-
Asserts.notNull(requestConsumer, "Data consumer");
192-
requestConsumer.streamEnd(trailers);
195+
if (requestConsumer != null) {
196+
requestConsumer.streamEnd(trailers);
197+
}
193198
}
194199

195200
@Override

httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/BasicRequestConsumer.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,19 +100,27 @@ public void completed(final T body) {
100100
@Override
101101
public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
102102
final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
103-
dataConsumer.updateCapacity(capacityChannel);
103+
if (dataConsumer != null) {
104+
dataConsumer.updateCapacity(capacityChannel);
105+
} else {
106+
capacityChannel.update(Integer.MAX_VALUE);
107+
}
104108
}
105109

106110
@Override
107111
public void consume(final ByteBuffer src) throws IOException {
108112
final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
109-
dataConsumer.consume(src);
113+
if (dataConsumer != null) {
114+
dataConsumer.consume(src);
115+
}
110116
}
111117

112118
@Override
113119
public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
114120
final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
115-
dataConsumer.streamEnd(trailers);
121+
if (dataConsumer != null) {
122+
dataConsumer.streamEnd(trailers);
123+
}
116124
}
117125

118126
@Override

httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/BasicResponseConsumer.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,19 +104,27 @@ public void informationResponse(final HttpResponse response, final HttpContext h
104104
@Override
105105
public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
106106
final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
107-
dataConsumer.updateCapacity(capacityChannel);
107+
if (dataConsumer != null) {
108+
dataConsumer.updateCapacity(capacityChannel);
109+
} else {
110+
capacityChannel.update(Integer.MAX_VALUE);
111+
}
108112
}
109113

110114
@Override
111115
public void consume(final ByteBuffer src) throws IOException {
112116
final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
113-
dataConsumer.consume(src);
117+
if (dataConsumer != null) {
118+
dataConsumer.consume(src);
119+
}
114120
}
115121

116122
@Override
117123
public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
118124
final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
119-
dataConsumer.streamEnd(trailers);
125+
if (dataConsumer != null) {
126+
dataConsumer.streamEnd(trailers);
127+
}
120128
}
121129

122130
@Override

0 commit comments

Comments
 (0)