Skip to content

Commit e180f63

Browse files
committed
Added the ability to batch load children objects and have their
resolution deferred until the batch completed. Also unit tests on the same.
1 parent 6f5b350 commit e180f63

File tree

15 files changed

+553
-52
lines changed

15 files changed

+553
-52
lines changed

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1625,3 +1625,10 @@ When mapping a Java object to Aerospike the most common operations to do are to
16251625
- Document creation of builder -- multiple configuration files are allowed, if the same class is declared in both the first one encountered wins.
16261626
- Document methods with 2 parameters for keys and setters, the second one either a Key or a Value
16271627
- Document subclasses and the mapping to tables + references stored as lists
1628+
- Batch load of child items on Maps and References. Ensure testing of non-parameterized classes too. Also of methods on Virtual LIsts
1629+
- Test (and fix) batch loading of children on virtual lists.
1630+
- Document batch loading
1631+
- Ensure batchloading option exists in AerospikeReference Configuration
1632+
- handle object graph circularities (A->B->C). Be careful of: A->B(Lazy), A->C->B: B should end up fully hydrated in both instances, not lazy in both instances
1633+
- Consider the items on virtual list which return a list to be able to return a map as well (ELEMENT_LIST, ELEMENT_MAP)
1634+
- Test if map supports lazy loading of referenced objects.

src/main/java/com/aerospike/mapper/annotations/AerospikeReference.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@
1717
* @return
1818
*/
1919
boolean lazy() default false;
20+
/**
21+
* When a reference is to be loaded, it can either be loaded inline or it can be loaded via a batch load. The
22+
* batch load is typically significantly more efficient. Set this flag to <pre>false</pre> to prevent the batch load
23+
* @return
24+
*/
25+
boolean batchLoad() default true;
2026

2127
public static enum ReferenceType {
2228
ID,

src/main/java/com/aerospike/mapper/tools/AeroMapper.java

Lines changed: 133 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import com.aerospike.client.query.RecordSet;
2727
import com.aerospike.client.query.Statement;
2828
import com.aerospike.mapper.tools.ClassCache.PolicyType;
29+
import com.aerospike.mapper.tools.DeferredObjectLoader.DeferredObject;
30+
import com.aerospike.mapper.tools.DeferredObjectLoader.DeferredObjectSetter;
2931
import com.aerospike.mapper.tools.TypeUtils.AnnotatedType;
3032
import com.aerospike.mapper.tools.configuration.ClassConfig;
3133
import com.aerospike.mapper.tools.configuration.Configuration;
@@ -239,9 +241,11 @@ public Object translateToAerospike(Object obj) {
239241
* @return
240242
*/
241243
@SuppressWarnings("unchecked")
242-
public <T> T translateFromAerospike(Object obj, Class<T> expectedClazz) {
244+
public <T> T translateFromAerospike(@NotNull Object obj, @NotNull Class<T> expectedClazz) {
243245
TypeMapper thisMapper = TypeUtils.getMapper(expectedClazz, AnnotatedType.getDefaultAnnotateType(), this);
244-
return (T)(thisMapper == null ? obj : thisMapper.fromAerospikeFormat(obj));
246+
T result = (T)(thisMapper == null ? obj : thisMapper.fromAerospikeFormat(obj));
247+
resolveDependencies(ClassCache.getInstance().loadClass(expectedClazz, this));
248+
return result;
245249
}
246250

247251

@@ -276,34 +280,61 @@ public void update(@NotNull Object object, String ... binNames) throws Aerospike
276280
save(null, object, RecordExistsAction.UPDATE, binNames);
277281
}
278282

279-
280283
public <T> T readFromDigest(Policy readPolicy, @NotNull Class<T> clazz, @NotNull byte[] digest) throws AerospikeException {
284+
return this.readFromDigest(readPolicy, clazz, digest, true);
285+
}
286+
287+
/**
288+
* This method should not be used except by mappers
289+
*/
290+
public <T> T readFromDigest(Policy readPolicy, @NotNull Class<T> clazz, @NotNull byte[] digest, boolean resolveDependencies) throws AerospikeException {
281291
ClassCacheEntry<T> entry = getEntryAndValidateNamespace(clazz);
282292
Key key = new Key(entry.getNamespace(), digest, entry.getSetName(), null);
283-
return this.read(readPolicy, clazz, key, entry);
293+
return this.read(readPolicy, clazz, key, entry, resolveDependencies);
284294
}
285295

286296
public <T> T readFromDigest(@NotNull Class<T> clazz, @NotNull byte[] digest) throws AerospikeException {
297+
return this.readFromDigest(clazz, digest, true);
298+
}
299+
300+
/**
301+
* This method should not be used except by mappers
302+
*/
303+
public <T> T readFromDigest(@NotNull Class<T> clazz, @NotNull byte[] digest, boolean resolveDependencies) throws AerospikeException {
287304
ClassCacheEntry<T> entry = getEntryAndValidateNamespace(clazz);
288305
Key key = new Key(entry.getNamespace(), digest, entry.getSetName(), null);
289-
return this.read(null, clazz, key, entry);
306+
return this.read(null, clazz, key, entry, resolveDependencies);
290307
}
291308

292309
public <T> T read(Policy readPolicy, @NotNull Class<T> clazz, @NotNull Object userKey) throws AerospikeException {
310+
return this.read(readPolicy, clazz, userKey, true);
311+
}
312+
313+
/**
314+
* This method should not be used except by mappers
315+
*/
316+
public <T> T read(Policy readPolicy, @NotNull Class<T> clazz, @NotNull Object userKey, boolean resolveDependencies) throws AerospikeException {
293317
ClassCacheEntry<T> entry = getEntryAndValidateNamespace(clazz);
294318
String set = entry.getSetName();
295319
Key key = new Key(entry.getNamespace(), set, Value.get(entry.translateKeyToAerospikeKey(userKey)));
296-
return read(readPolicy, clazz, key, entry);
320+
return read(readPolicy, clazz, key, entry, resolveDependencies);
297321
}
298322

299323
public <T> T read(@NotNull Class<T> clazz, @NotNull Object userKey) throws AerospikeException {
324+
return this.read(clazz, userKey, true);
325+
}
326+
327+
/**
328+
* This method should not be used: It is used by mappers to correctly resolved dependencies. Use read(clazz, userkey) instead
329+
*/
330+
public <T> T read(@NotNull Class<T> clazz, @NotNull Object userKey, boolean resolveDependencies) throws AerospikeException {
300331
ClassCacheEntry<T> entry = getEntryAndValidateNamespace(clazz);
301332
String set = entry.getSetName();
302333
Key key = new Key(entry.getNamespace(), set, Value.get(entry.translateKeyToAerospikeKey(userKey)));
303-
return read(null, clazz, key, entry);
334+
return read(null, clazz, key, entry, resolveDependencies);
304335
}
305336

306-
private <T> T read(Policy readPolicy, @NotNull Class<T> clazz, @NotNull Key key, @NotNull ClassCacheEntry<T> entry) {
337+
private <T> T read(Policy readPolicy, @NotNull Class<T> clazz, @NotNull Key key, @NotNull ClassCacheEntry<T> entry, boolean resolveDepenencies) {
307338
if (readPolicy == null) {
308339
readPolicy = entry.getReadPolicy();
309340
}
@@ -314,7 +345,7 @@ private <T> T read(Policy readPolicy, @NotNull Class<T> clazz, @NotNull Key key,
314345
} else {
315346
try {
316347
ThreadLocalKeySaver.save(key);
317-
T result = (T) convertToObject(clazz, record, entry);
348+
T result = (T) convertToObject(clazz, record, entry, resolveDepenencies);
318349
return result;
319350
} catch (ReflectiveOperationException e) {
320351
throw new AerospikeException(e);
@@ -325,6 +356,7 @@ private <T> T read(Policy readPolicy, @NotNull Class<T> clazz, @NotNull Key key,
325356
}
326357
}
327358

359+
328360
public <T> boolean delete(@NotNull Class<T> clazz, @NotNull Object userKey) throws AerospikeException {
329361
return this.delete(null, clazz, userKey);
330362
}
@@ -461,18 +493,39 @@ public <T> T convertToObject(Class<T> clazz, Record record) {
461493
}
462494

463495
public <T> T convertToObject(Class<T> clazz, Record record, ClassCacheEntry<T> entry) throws ReflectiveOperationException {
496+
return this.convertToObject(clazz, record, entry, true);
497+
}
498+
499+
/**
500+
* This method should not be used, it is public only to allow mappers to see it.
501+
*/
502+
public <T> T convertToObject(Class<T> clazz, Record record, ClassCacheEntry<T> entry, boolean resolveDependencies) throws ReflectiveOperationException {
464503
if (entry == null) {
465504
entry = ClassCache.getInstance().loadClass(clazz, this);
466505
}
467-
return entry.constructAndHydrate(clazz, record);
506+
T result = entry.constructAndHydrate(clazz, record);
507+
if (resolveDependencies) {
508+
resolveDependencies(entry);
509+
}
510+
return result;
468511
}
469512

470513
public <T> T convertToObject(Class<T> clazz, List<Object> record) {
514+
return this.convertToObject(clazz, record, true);
515+
}
516+
517+
/**
518+
* This method should not be used, it is public only to allow mappers to see it.
519+
*/
520+
public <T> T convertToObject(Class<T> clazz, List<Object> record, boolean resolveDependencies) {
471521
try {
472522
ClassCacheEntry<T> entry = ClassCache.getInstance().loadClass(clazz, this);
473523
T result;
474524
result = clazz.getConstructor().newInstance();
475525
entry.hydrateFromList(record, result);
526+
if (resolveDependencies) {
527+
resolveDependencies(entry);
528+
}
476529
return result;
477530
} catch (ReflectiveOperationException e) {
478531
throw new AerospikeException(e);
@@ -499,4 +552,74 @@ public <T> Map<String, Object> convertToMap(@NotNull T instance) {
499552
ClassCacheEntry<T> entry = (ClassCacheEntry<T>) ClassCache.getInstance().loadClass(instance.getClass(), this);
500553
return entry.getMap(instance, false);
501554
}
555+
556+
/**
557+
* If an object refers to other objects (eg A has a list of B via references), then reading the object will populate the
558+
* ids. If configured to do so, these objects can be loaded via a batch load and populated back into the references which
559+
* contain them. This method performs this batch load, translating the records to objects and mapping them back to the
560+
* references.
561+
* <p/>
562+
* These loaded child objects can themselves have other references to other objects, so we iterate through this until
563+
* the list of deferred objects is empty. The deferred objects are stored in a <pre>ThreadLocalData<pre> list, so are thread safe
564+
* @param parentEntity - the ClassCacheEntry of the parent entity. This is used to get the batch policy to use.
565+
*/
566+
private void resolveDependencies(ClassCacheEntry<?> parentEntity) {
567+
List<DeferredObjectSetter> deferredObjects = DeferredObjectLoader.getAndClear();
568+
569+
if (deferredObjects.size() == 0) {
570+
return;
571+
}
572+
573+
BatchPolicy batchPolicy = parentEntity == null ? mClient.getBatchPolicyDefault() : parentEntity.getBatchPolicy();
574+
BatchPolicy batchPolicyClone = new BatchPolicy(batchPolicy);
575+
576+
while (deferredObjects != null && !deferredObjects.isEmpty()) {
577+
int size = deferredObjects.size();
578+
579+
ClassCacheEntry<?>[] classCaches = new ClassCacheEntry<?>[size];
580+
Key[] keys = new Key[size];
581+
582+
for (int i = 0; i < size; i++) {
583+
DeferredObjectSetter thisObjectSetter = deferredObjects.get(i);
584+
DeferredObject deferredObject = thisObjectSetter.getObject();
585+
Class<?> clazz = deferredObject.getType();
586+
ClassCacheEntry<?> entry = (ClassCacheEntry<?>) ClassCache.getInstance().loadClass(clazz, this);
587+
classCaches[i] = entry;
588+
589+
if (deferredObject.isDigest()) {
590+
keys[i] = new Key(entry.getNamespace(), (byte[])deferredObject.getKey(), entry.getSetName(), null);
591+
}
592+
else {
593+
keys[i] = new Key(entry.getNamespace(), entry.getSetName(), Value.get(entry.translateKeyToAerospikeKey(deferredObject.getKey())));
594+
}
595+
}
596+
597+
// Load the data
598+
if (keys.length <= 2) {
599+
// Just single-thread these keys for speed
600+
batchPolicyClone.maxConcurrentThreads = 1;
601+
}
602+
else {
603+
batchPolicyClone.maxConcurrentThreads = batchPolicy.maxConcurrentThreads;
604+
}
605+
Record[] records = this.mClient.get(batchPolicyClone, keys);
606+
607+
for (int i = 0; i < size; i++) {
608+
DeferredObjectSetter thisObjectSetter = deferredObjects.get(i);
609+
try {
610+
ThreadLocalKeySaver.save(keys[i]);
611+
Object result = this.convertToObject((Class)thisObjectSetter.getObject().getType(), records[i], classCaches[i], false);
612+
thisObjectSetter.getSetter().setValue(result);
613+
} catch (ReflectiveOperationException e) {
614+
throw new AerospikeException(e);
615+
}
616+
finally {
617+
ThreadLocalKeySaver.clear();
618+
}
619+
}
620+
deferredObjects = DeferredObjectLoader.getAndClear();
621+
}
622+
}
623+
624+
502625
}

src/main/java/com/aerospike/mapper/tools/ClassCache.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ private ClassCache() {
4545
}
4646

4747
public <T> ClassCacheEntry<T> loadClass(@NotNull Class<T> clazz, AeroMapper mapper) {
48+
if (clazz.isPrimitive() || clazz.equals(Object.class) || clazz.equals(String.class) || Number.class.isAssignableFrom(clazz)) {
49+
return null;
50+
}
4851
ClassCacheEntry<T> entry = cacheMap.get(clazz);
4952
if (entry == null) {
5053
try {

src/main/java/com/aerospike/mapper/tools/ClassCacheEntry.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020

2121
import com.aerospike.client.AerospikeException;
2222
import com.aerospike.client.Bin;
23+
import com.aerospike.client.Key;
2324
import com.aerospike.client.Record;
25+
import com.aerospike.client.Value;
2426
import com.aerospike.client.cdt.MapOrder;
2527
import com.aerospike.client.policy.BatchPolicy;
2628
import com.aerospike.client.policy.Policy;
@@ -36,6 +38,8 @@
3638
import com.aerospike.mapper.annotations.AerospikeRecord;
3739
import com.aerospike.mapper.annotations.AerospikeSetter;
3840
import com.aerospike.mapper.annotations.ParamFrom;
41+
import com.aerospike.mapper.tools.DeferredObjectLoader.DeferredObject;
42+
import com.aerospike.mapper.tools.DeferredObjectLoader.DeferredObjectSetter;
3943
import com.aerospike.mapper.tools.TypeUtils.AnnotatedType;
4044
import com.aerospike.mapper.tools.configuration.BinConfig;
4145
import com.aerospike.mapper.tools.configuration.ClassConfig;
@@ -137,6 +141,15 @@ public Policy getReadPolicy() {
137141
public WritePolicy getWritePolicy() {
138142
return writePolicy;
139143
}
144+
public BatchPolicy getBatchPolicy() {
145+
return batchPolicy;
146+
}
147+
public QueryPolicy getQueryPolicy() {
148+
return queryPolicy;
149+
}
150+
public ScanPolicy getScanPolicy() {
151+
return scanPolicy;
152+
}
140153

141154
public Class<?> getUnderlyingClass() {
142155
return this.clazz;
@@ -752,6 +765,7 @@ private T constructAndHydrate(Class<T> clazz, Record record, Map<String, Object>
752765
valueMap.clear();
753766
thisClass = thisClass.superClazz;
754767
}
768+
755769
return result;
756770
}
757771
catch (ReflectiveOperationException ref) {
@@ -909,7 +923,7 @@ public void hydrateFromList(List<Object> list, Object instance, boolean skipKey)
909923
try {
910924
int index = 0;
911925
int endIndex = list.size();
912-
ClassCacheEntry thisClass = this;
926+
ClassCacheEntry<?> thisClass = this;
913927
while (thisClass != null) {
914928
if (index < endIndex) {
915929
Object lastValue = list.get(endIndex-1);
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package com.aerospike.mapper.tools;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
6+
public class DeferredObjectLoader {
7+
public static interface DeferredSetter {
8+
public void setValue(Object object);
9+
}
10+
11+
public static class DeferredObject {
12+
private final Object key;
13+
private final Class<?> type;
14+
private final boolean isDigest;
15+
public DeferredObject(Object key, Class<?> type, boolean isDigest) {
16+
super();
17+
this.key = key;
18+
this.type = type;
19+
this.isDigest = isDigest;
20+
}
21+
public Object getKey() {
22+
return key;
23+
}
24+
public Class<?> getType() {
25+
return type;
26+
}
27+
public boolean isDigest() {
28+
return isDigest;
29+
}
30+
}
31+
32+
public static class DeferredObjectSetter {
33+
private final DeferredSetter setter;
34+
private final DeferredObject object;
35+
36+
public DeferredObjectSetter(DeferredSetter setter, DeferredObject object) {
37+
super();
38+
this.setter = setter;
39+
this.object = object;
40+
}
41+
public DeferredSetter getSetter() {
42+
return setter;
43+
}
44+
public DeferredObject getObject() {
45+
return object;
46+
}
47+
}
48+
49+
50+
private static ThreadLocal<List<DeferredObjectSetter>> threadLocalLoader = new ThreadLocal<List<DeferredObjectSetter>>() {
51+
@Override
52+
public List<DeferredObjectSetter> initialValue() {
53+
return new ArrayList<DeferredObjectSetter>();
54+
}
55+
};
56+
57+
public static void save(DeferredObjectSetter object) {
58+
threadLocalLoader.get().add(object);
59+
}
60+
61+
public static void clear() {
62+
threadLocalLoader.get().clear();
63+
}
64+
65+
public static List<DeferredObjectSetter> get() {
66+
return threadLocalLoader.get();
67+
}
68+
69+
public static void add(DeferredObjectSetter deferredSetter) {
70+
threadLocalLoader.get().add(deferredSetter);
71+
}
72+
73+
public static List<DeferredObjectSetter> getAndClear() {
74+
List<DeferredObjectSetter> localArray = threadLocalLoader.get();
75+
List<DeferredObjectSetter> setters = new ArrayList<DeferredObjectSetter>(localArray);
76+
localArray.clear();
77+
return setters;
78+
}
79+
}

0 commit comments

Comments
 (0)