Skip to content

Commit 5161c82

Browse files
author
ehennum
committed
#1296 reuse bufferable handle converted from input for retry
1 parent 2c28da2 commit 5161c82

File tree

6 files changed

+24
-20
lines changed

6 files changed

+24
-20
lines changed

marklogic-client-api/src/main/java/com/marklogic/client/dataservices/impl/IOCallerImpl.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -190,11 +190,6 @@ BaseProxy.DBFunctionRequest makeRequest(DatabaseClient db, CallContextImpl<I,O>
190190
return makeRequest(db, callCtxt, (RESTServices.CallField) null);
191191
}
192192
BaseProxy.DBFunctionRequest makeRequest(
193-
DatabaseClient db, CallContextImpl<I,O> callCtxt, I[] input
194-
) {
195-
return makeRequest(db, callCtxt, bufferableInputHandleOn(input));
196-
}
197-
private BaseProxy.DBFunctionRequest makeRequest(
198193
DatabaseClient db, CallContextImpl<I,O> callCtxt, BufferableContentHandle<?,?>[] input
199194
) {
200195
RESTServices.CallField inputField = null;

marklogic-client-api/src/main/java/com/marklogic/client/dataservices/impl/InputCallerImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.marklogic.client.dataservices.impl;
1717

1818
import com.marklogic.client.DatabaseClient;
19+
import com.marklogic.client.io.marker.BufferableContentHandle;
1920
import com.marklogic.client.io.marker.JSONWriteHandle;
2021

2122
final public class InputCallerImpl<I,O> extends IOCallerImpl<I,O> {
@@ -38,7 +39,7 @@ public InputCallerImpl(JSONWriteHandle apiDeclaration, HandleProvider<I,O> handl
3839
}
3940
}
4041

41-
public void arrayCall(DatabaseClient db, CallContextImpl<I,O> callCtxt, I[] input) {
42+
public void arrayCall(DatabaseClient db, CallContextImpl<I,O> callCtxt, BufferableContentHandle<?,?>[] input) {
4243
responseWithState(makeRequest(db, callCtxt, input), callCtxt);
4344
}
4445
}

marklogic-client-api/src/main/java/com/marklogic/client/dataservices/impl/InputEndpointImpl.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.concurrent.LinkedBlockingQueue;
2121

2222
import com.marklogic.client.dataservices.InputCaller;
23+
import com.marklogic.client.io.marker.BufferableContentHandle;
2324
import org.slf4j.Logger;
2425
import org.slf4j.LoggerFactory;
2526

@@ -58,7 +59,8 @@ public void call(I[] input) {
5859
@Override
5960
public void call(CallContext callContext, I[] input) {
6061
InputCallerImpl<I,O> callerImpl = getCaller();
61-
callerImpl.arrayCall(getClient(), checkAllowedArgs(callContext), input);
62+
BufferableContentHandle<?,?>[] inputHandles = callerImpl.bufferableInputHandleOn(input);
63+
callerImpl.arrayCall(getClient(), checkAllowedArgs(callContext), inputHandles);
6264
}
6365

6466
@Deprecated
@@ -196,10 +198,11 @@ private void processInput(CallContextImpl<I,O> callContext, I[] inputBatch) {
196198

197199
ErrorDisposition error = ErrorDisposition.RETRY;
198200

201+
BufferableContentHandle<?,?>[] inputHandles = callerImpl.bufferableInputHandleOn(inputBatch);
199202
for (int retryCount = 0; retryCount < DEFAULT_MAX_RETRIES && error == ErrorDisposition.RETRY; retryCount++) {
200203
Throwable throwable = null;
201204
try {
202-
getEndpoint().getCaller().arrayCall(callContext.getClient(), callContext, inputBatch);
205+
getEndpoint().getCaller().arrayCall(callContext.getClient(), callContext, inputHandles);
203206
incrementCallCount();
204207
return;
205208
} catch (Throwable catchedThrowable) {
@@ -211,11 +214,10 @@ private void processInput(CallContextImpl<I,O> callContext, I[] inputBatch) {
211214
logger.error("No error listener set. Stop all calls. " + getEndpoint().getEndpointPath(), throwable);
212215
error = ErrorDisposition.STOP_ALL_CALLS;
213216
} else {
214-
215217
try {
216218
if (retryCount < DEFAULT_MAX_RETRIES - 1) {
217219
error = getErrorListener().processError(
218-
retryCount, throwable, callContext, callerImpl.bufferableInputHandleOn(inputBatch)
220+
retryCount, throwable, callContext, inputHandles
219221
);
220222
} else {
221223
error = ErrorDisposition.SKIP_CALL;

marklogic-client-api/src/main/java/com/marklogic/client/dataservices/impl/InputOutputCallerImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.marklogic.client.dataservices.impl;
1717

1818
import com.marklogic.client.DatabaseClient;
19+
import com.marklogic.client.io.marker.BufferableContentHandle;
1920
import com.marklogic.client.io.marker.JSONWriteHandle;
2021

2122
final public class InputOutputCallerImpl<I,O> extends IOCallerImpl<I,O> {
@@ -34,7 +35,7 @@ public InputOutputCallerImpl(JSONWriteHandle apiDeclaration, HandleProvider<I,O>
3435
}
3536
}
3637

37-
public O[] arrayCall(DatabaseClient db, CallContextImpl<I,O> callCtxt, I[] input) {
38+
public O[] arrayCall(DatabaseClient db, CallContextImpl<I,O> callCtxt, BufferableContentHandle<?,?>[] input) {
3839
return responseMultipleAsArray(makeRequest(db, callCtxt, input), callCtxt);
3940
}
4041
}

marklogic-client-api/src/main/java/com/marklogic/client/dataservices/impl/InputOutputEndpointImpl.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.marklogic.client.DatabaseClient;
1919
import com.marklogic.client.SessionState;
2020
import com.marklogic.client.dataservices.InputOutputCaller;
21+
import com.marklogic.client.io.marker.BufferableContentHandle;
2122
import com.marklogic.client.io.marker.JSONWriteHandle;
2223
import org.slf4j.Logger;
2324
import org.slf4j.LoggerFactory;
@@ -99,9 +100,8 @@ public BulkInputOutputCaller<I,O> bulkCaller(CallContext[] callContexts, int thr
99100

100101
private O[] getResponseData(CallContext callContext, I[] input) {
101102
InputOutputCallerImpl<I,O> callerImpl = getCaller();
102-
return callerImpl.arrayCall(
103-
getClient(), checkAllowedArgs(callContext), input
104-
);
103+
BufferableContentHandle<?,?>[] inputHandles = callerImpl.bufferableInputHandleOn(input);
104+
return callerImpl.arrayCall(getClient(), checkAllowedArgs(callContext), inputHandles);
105105
}
106106

107107
static public class BulkInputOutputCallerImpl<I,O> extends IOEndpointImpl.BulkIOEndpointCallerImpl<I,O>
@@ -198,11 +198,12 @@ private void processInput(CallContextImpl<I,O> callContext, I[] inputBatch) {
198198

199199
ErrorDisposition error = ErrorDisposition.RETRY;
200200

201+
BufferableContentHandle<?,?>[] inputHandles = callerImpl.bufferableInputHandleOn(inputBatch);
201202
for (int retryCount = 0; retryCount < DEFAULT_MAX_RETRIES && error == ErrorDisposition.RETRY; retryCount++) {
202203
Throwable throwable = null;
203204
O[] output = null;
204205
try {
205-
output = callerImpl.arrayCall(callContext.getClient(), callContext, inputBatch);
206+
output = callerImpl.arrayCall(callContext.getClient(), callContext, inputHandles);
206207

207208
incrementCallCount();
208209
processOutputBatch(output, getOutputListener());
@@ -216,7 +217,7 @@ private void processInput(CallContextImpl<I,O> callContext, I[] inputBatch) {
216217
try {
217218
if (retryCount < DEFAULT_MAX_RETRIES - 1) {
218219
error = getErrorListener().processError(
219-
retryCount, throwable, callContext, callerImpl.bufferableInputHandleOn(inputBatch)
220+
retryCount, throwable, callContext, inputHandles
220221
);
221222
} else {
222223
error = ErrorDisposition.SKIP_CALL;

marklogic-client-api/src/test/java/com/marklogic/client/test/dataservices/ErrorListenerInputOutputEndpointTest.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,13 +91,17 @@ public void testInputOutputCallerWithRetry() {
9191

9292

9393
InputOutputCaller.BulkInputOutputCaller.ErrorListener errorListener =
94-
(retryCount, throwable, callContext, inputHandles)
95-
-> IOEndpoint.BulkIOEndpointCaller.ErrorDisposition.RETRY;
94+
(retryCount, throwable, callContext, inputHandles) -> {
95+
// System.out.println("retry count is "+retryCount);
96+
return IOEndpoint.BulkIOEndpointCaller.ErrorDisposition.RETRY;
97+
};
9698
bulkCaller.setErrorListener(errorListener);
9799

98100

99-
bulkCaller.setOutputListener(value -> {output.add(NodeConverter.InputStreamToString(value));
100-
//System.out.println("value is "+value);
101+
bulkCaller.setOutputListener(value -> {
102+
String v = NodeConverter.InputStreamToString(value);
103+
output.add(v);
104+
// System.out.println("value is "+v);
101105
});
102106

103107
input.stream().forEach(value -> bulkCaller.accept(IOTestUtil.asInputStream(value)));

0 commit comments

Comments
 (0)