Skip to content

Commit 79969e7

Browse files
committed
Cookbook example bridging from QueryBatcher to Data Services.
1 parent 5448e8e commit 79969e7

File tree

6 files changed

+268
-0
lines changed

6 files changed

+268
-0
lines changed

marklogic-client-api/build.gradle

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
plugins {
2+
id 'com.marklogic.ml-development-tools' version '5.1.0'
3+
}
14
apply plugin: 'maven'
25
apply plugin: 'maven-publish'
36
apply plugin: 'distribution'
@@ -199,3 +202,7 @@ task testServerTeardown(type: JavaExec) {
199202
main = 'com.marklogic.client.test.util.TestServerBootstrapper'
200203
args = ["teardown"]
201204
}
205+
206+
task generateBulkExportServices(type: com.marklogic.client.tools.gradle.EndpointProxiesGenTask) {
207+
serviceDeclarationFile = 'marklogic-client-api/src/main/resources/scripts/bulkExport/service.json'
208+
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright 2020 MarkLogic Corporation
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.marklogic.client.example.cookbook.datamovement;
17+
18+
// IMPORTANT: Do not edit. This file is generated.
19+
20+
import java.util.stream.Stream;
21+
import com.marklogic.client.io.Format;
22+
import java.io.Reader;
23+
24+
25+
import com.marklogic.client.DatabaseClient;
26+
import com.marklogic.client.io.marker.JSONWriteHandle;
27+
28+
import com.marklogic.client.impl.BaseProxy;
29+
30+
/**
31+
* Provides a set of operations on the database server
32+
*/
33+
public interface BulkExportServices {
34+
/**
35+
* Creates a BulkExportServices object for executing operations on the database server.
36+
*
37+
* The DatabaseClientFactory class can create the DatabaseClient parameter. A single
38+
* client object can be used for any number of requests and in multiple threads.
39+
*
40+
* @param db provides a client for communicating with the database server
41+
* @return an object for executing database operations
42+
*/
43+
static BulkExportServices on(DatabaseClient db) {
44+
return on(db, null);
45+
}
46+
/**
47+
* Creates a BulkExportServices object for executing operations on the database server.
48+
*
49+
* The DatabaseClientFactory class can create the DatabaseClient parameter. A single
50+
* client object can be used for any number of requests and in multiple threads.
51+
*
52+
* The service declaration uses a custom implementation of the same service instead
53+
* of the default implementation of the service by specifying an endpoint directory
54+
* in the modules database with the implementation. A service.json file with the
55+
* declaration can be read with FileHandle or a string serialization of the JSON
56+
* declaration with StringHandle.
57+
*
58+
* @param db provides a client for communicating with the database server
59+
* @param serviceDeclaration substitutes a custom implementation of the service
60+
* @return an object for executing database operations
61+
*/
62+
static BulkExportServices on(DatabaseClient db, JSONWriteHandle serviceDeclaration) {
63+
final class BulkExportServicesImpl implements BulkExportServices {
64+
private DatabaseClient dbClient;
65+
private BaseProxy baseProxy;
66+
67+
private BaseProxy.DBFunctionRequest req_readJsonDocs;
68+
69+
private BulkExportServicesImpl(DatabaseClient dbClient, JSONWriteHandle servDecl) {
70+
this.dbClient = dbClient;
71+
this.baseProxy = new BaseProxy("/example/cookbook/bulkExport/", servDecl);
72+
73+
this.req_readJsonDocs = this.baseProxy.request(
74+
"readJsonDocs.sjs", BaseProxy.ParameterValuesKind.MULTIPLE_ATOMICS);
75+
}
76+
77+
@Override
78+
public Stream<Reader> readJsonDocs(Stream<String> uris) {
79+
return readJsonDocs(
80+
this.req_readJsonDocs.on(this.dbClient), uris
81+
);
82+
}
83+
private Stream<Reader> readJsonDocs(BaseProxy.DBFunctionRequest request, Stream<String> uris) {
84+
return BaseProxy.JsonDocumentType.toReader(
85+
request
86+
.withParams(
87+
BaseProxy.atomicParam("uris", false, BaseProxy.StringType.fromString(uris))
88+
).responseMultiple(true, Format.JSON)
89+
);
90+
}
91+
}
92+
93+
return new BulkExportServicesImpl(db, serviceDeclaration);
94+
}
95+
96+
/**
97+
* Invokes the readJsonDocs operation on the database server
98+
*
99+
* @param uris provides input
100+
* @return as output
101+
*/
102+
Stream<Reader> readJsonDocs(Stream<String> uris);
103+
104+
}
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Copyright 2020 MarkLogic Corporation
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.marklogic.client.example.cookbook.datamovement;
17+
18+
import com.marklogic.client.DatabaseClient;
19+
import com.marklogic.client.datamovement.DataMovementManager;
20+
import com.marklogic.client.datamovement.QueryBatch;
21+
import com.marklogic.client.datamovement.QueryBatchListener;
22+
import com.marklogic.client.datamovement.QueryBatcher;
23+
import com.marklogic.client.document.DocumentWriteSet;
24+
import com.marklogic.client.document.JSONDocumentManager;
25+
import com.marklogic.client.document.TextDocumentManager;
26+
import com.marklogic.client.io.DocumentMetadataHandle;
27+
import com.marklogic.client.io.StringHandle;
28+
import com.marklogic.client.example.cookbook.Util;
29+
import com.marklogic.client.query.*;
30+
31+
import java.io.*;
32+
import java.util.ArrayList;
33+
import java.util.Arrays;
34+
import java.util.List;
35+
import java.util.stream.Stream;
36+
37+
38+
public class BulkExportWithDataService implements QueryBatchListener {
39+
40+
private DatabaseClient dbClient = DatabaseClientSingleton.getAdmin("java-unittest");
41+
private DatabaseClient dbModulesClient = DatabaseClientSingleton.getAdmin("java-unittest-modules");
42+
private DataMovementManager moveMgr = dbClient.newDataMovementManager();
43+
private List<String> urisList = new ArrayList<>();
44+
45+
@Override
46+
public void processEvent(QueryBatch batch) {
47+
}
48+
49+
public static void main(String args[]) throws IOException {
50+
new BulkExportWithDataService().run();
51+
}
52+
53+
private void run() throws IOException {
54+
setup();
55+
exportJson();
56+
tearDown();
57+
}
58+
59+
private void setup() throws IOException {
60+
61+
writeDocuments(100,"BulkExportWithDataService");
62+
writeScriptFile("readJsonDocs.api");
63+
writeScriptFile("readJsonDocs.sjs");
64+
}
65+
66+
private void tearDown(){
67+
68+
QueryManager queryMgr = dbClient.newQueryManager();
69+
QueryManager queryMgrModules = dbModulesClient.newQueryManager();
70+
DeleteQueryDefinition deletedef = queryMgr.newDeleteDefinition();
71+
deletedef.setCollections("BulkExportWithDataService");
72+
queryMgr.delete(deletedef);
73+
queryMgrModules.delete(deletedef);
74+
}
75+
76+
77+
private void exportJson(){
78+
BulkExportServices bulkExportServices = uris -> {
79+
List<Object> list = Arrays.asList(uris.toArray());
80+
for(Object i:list) {
81+
if(!(urisList.contains(i)))
82+
throw new InternalError("urisList does not contain "+i.toString());
83+
urisList.remove(i);
84+
}
85+
return null;
86+
};
87+
88+
StructuredQueryBuilder structuredQueryBuilder = new StructuredQueryBuilder();
89+
structuredQueryBuilder.directory(1,"/example/cookbook/bulkExport/");
90+
QueryBatcher queryBatcher = moveMgr.newQueryBatcher(structuredQueryBuilder.collection("BulkExportWithDataService"))
91+
.withBatchSize(3)
92+
.withThreadCount(3)
93+
.onQueryFailure(batch -> new InternalError("An exception occured in queryBatcher"))
94+
.onUrisReady(batch -> bulkExportServices.readJsonDocs(Stream.of(batch.getItems())));
95+
moveMgr.startJob(queryBatcher);
96+
queryBatcher.awaitCompletion();
97+
moveMgr.stopJob(queryBatcher);
98+
99+
}
100+
101+
private void writeDocuments(int count, String collection) {
102+
JSONDocumentManager manager = dbClient.newJSONDocumentManager();
103+
DocumentMetadataHandle metadata = new DocumentMetadataHandle().withCollections(collection);
104+
105+
for(int i=1;i<=count;i++) {
106+
StringHandle data = new StringHandle("{\"docNum\":"+i+", \"docName\":\"doc"+i+"\"}");
107+
String docId = "/example/cookbook/bulkExport/"+i+".json";
108+
manager.write(docId, metadata, data);
109+
urisList.add(docId);
110+
}
111+
}
112+
113+
private void writeScriptFile(String fileName) throws IOException {
114+
TextDocumentManager modMgr = dbModulesClient.newTextDocumentManager();
115+
DocumentWriteSet writeSet = modMgr.newWriteSet();
116+
DocumentMetadataHandle metadata = new DocumentMetadataHandle().withCollections("BulkExportWithDataService");
117+
metadata.getPermissions().add("rest-writer", DocumentMetadataHandle.Capability.UPDATE, DocumentMetadataHandle.Capability.READ);
118+
metadata.getPermissions().add("rest-reader", DocumentMetadataHandle.Capability.READ);
119+
120+
InputStream in = (Util.openStream("scripts"+ File.separator+"bulkExport"+File.separator+fileName));
121+
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
122+
StringBuilder out = new StringBuilder();
123+
String line;
124+
while ((line = reader.readLine()) != null) {
125+
out.append(line);
126+
}
127+
reader.close();
128+
129+
writeSet.add("/example/cookbook/bulkExport/"+fileName, metadata,
130+
(new StringHandle(out.toString())));
131+
modMgr.write(writeSet);
132+
}
133+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
2+
{
3+
"functionName" : "readJsonDocs",
4+
"params" : [ {
5+
"name" : "uris",
6+
"datatype" : "string",
7+
"multiple" : "true"
8+
} ],
9+
"return" : {
10+
"datatype" : "jsonDocument",
11+
"multiple" : "true",
12+
"nullable" : "true"
13+
}
14+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
'use strict';
2+
const uris = external.uris;
3+
4+
const output = fn.doc(uris);
5+
console.log(output);
6+
output;
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
{
2+
"endpointDirectory" : "/example/cookbook/bulkExport/",
3+
"$javaClass" : "com.marklogic.client.example.cookbook.datamovement.BulkExportServices"
4+
}

0 commit comments

Comments
 (0)