77import com .aerospike .mapper .annotations .AerospikeRecord ;
88import com .aerospike .mapper .annotations .ParamFrom ;
99import com .aerospike .mapper .tools .ReactiveAeroMapper ;
10+ import lombok .SneakyThrows ;
1011import org .junit .jupiter .api .AfterAll ;
1112import org .junit .jupiter .api .BeforeAll ;
1213import org .junit .jupiter .api .Test ;
1516
1617import java .util .ArrayList ;
1718import java .util .Arrays ;
19+ import java .util .Comparator ;
1820import java .util .List ;
21+ import java .util .stream .Collectors ;
22+ import java .util .stream .Stream ;
23+
24+ import static org .junit .jupiter .api .Assertions .assertNotNull ;
1925
2026@ TestInstance (TestInstance .Lifecycle .PER_CLASS )
2127public class ReactiveBatchLoadTest extends ReactiveAeroMapperBaseTest {
@@ -24,39 +30,6 @@ public class ReactiveBatchLoadTest extends ReactiveAeroMapperBaseTest {
2430 private final B [] bees = new B [100 ];
2531 private final A [] as = new A [10 ];
2632
27- @ AerospikeRecord (namespace = "test" , set = "batchB" )
28- public static class B {
29- @ AerospikeKey
30- public int id ;
31- public String name ;
32-
33- public B (@ ParamFrom ("id" ) int id , @ ParamFrom ("name" ) String name ) {
34- this .id = id ;
35- this .name = name ;
36- }
37- }
38-
39- @ AerospikeRecord (namespace = "test" , set = "batchA" )
40- public static class A {
41- @ AerospikeKey
42- public int id ;
43- public String name ;
44- public List <B > data ;
45-
46- public A (int id , String name ) {
47- this .id = id ;
48- this .name = name ;
49- data = new ArrayList <>();
50- }
51-
52- public void setBList (List <B > bees ) {
53- data = bees ;
54- }
55-
56- public A () {
57- }
58- }
59-
6033 @ BeforeAll
6134 public void populateStaticData () {
6235 for (int i = 0 ; i < 100 ; i ++) {
@@ -76,26 +49,28 @@ public void clear() {
7649 client .truncate (null , "test" , "batchB" , null );
7750 }
7851
52+ @ SneakyThrows
7953 private ReactiveAeroMapper populate () {
8054 client .truncate (null , "test" , "batchA" , null );
8155 client .truncate (null , "test" , "batchB" , null );
56+ Thread .sleep (1000 );
8257
8358 ReactiveAeroMapper reactiveMapper = new ReactiveAeroMapper .Builder (reactorClient ).build ();
8459
85- reactiveMapper .save ((Object []) bees ).subscribeOn (Schedulers .parallel ()).collectList ().block ();
86- reactiveMapper .save ((Object []) as ).subscribeOn (Schedulers .parallel ()).collectList ().block ();
60+ reactiveMapper .save ((Object []) bees ).subscribeOn (Schedulers .single ()).collectList ().block ();
61+ reactiveMapper .save ((Object []) as ).subscribeOn (Schedulers .single ()).collectList ().block ();
8762
8863 return reactiveMapper ;
8964 }
9065
9166 @ Test
92- public void testBatchLoad () {
67+ void testBatchLoad () {
9368 ReactiveAeroMapper reactiveMapper = populate ();
9469
95- B resultB = reactiveMapper .read (B .class , bees [1 ].id ).subscribeOn (Schedulers .parallel ()).block ();
70+ B resultB = reactiveMapper .read (B .class , bees [1 ].id ).subscribeOn (Schedulers .single ()).block ();
9671 compare (bees [1 ], resultB );
9772
98- A resultA = reactiveMapper .read (A .class , as [1 ].id ).subscribeOn (Schedulers .parallel ()).block ();
73+ A resultA = reactiveMapper .read (A .class , as [1 ].id ).subscribeOn (Schedulers .single ()).block ();
9974 compare (as [1 ], resultA );
10075
10176 Integer [] ids = new Integer [6 ];
@@ -106,28 +81,26 @@ public void testBatchLoad() {
10681 ids [4 ] = as [1 ].id ;
10782 ids [5 ] = 3000 ;
10883
109- A [] results = new A [6 ];
110- List <A > resultsObjects = reactiveMapper .read (A .class , ids )
111- .subscribeOn (Schedulers .parallel ()).collectList ().block ();
112-
113- assert resultsObjects != null ;
114- results = resultsObjects .toArray (results );
115- compare (results [0 ], as [4 ]);
116- compare (results [1 ], as [7 ]);
117- compare (results [2 ], as [5 ]);
118- compare (results [3 ], as [0 ]);
119- compare (results [4 ], as [1 ]);
120- compare (results [5 ], null );
84+ List <A > expected = Stream .of (as [4 ], as [7 ], as [5 ], as [0 ], as [1 ])
85+ .sorted (Comparator .nullsFirst (A ::compareTo ))
86+ .collect (Collectors .toList ());
87+
88+ List <A > resultsList = reactiveMapper .read (A .class , ids )
89+ .subscribeOn (Schedulers .single ()).collectList ().block ();
90+
91+ assertNotNull (resultsList );
92+ resultsList .sort (Comparator .nullsFirst (A ::compareTo ));
93+ compare (expected , resultsList );
12194 }
12295
12396 @ Test
124- public void testBatchLoadWithOperations () {
97+ void testBatchLoadWithOperations () {
12598 ReactiveAeroMapper reactiveMapper = populate ();
12699
127- B resultB = reactiveMapper .read (B .class , bees [1 ].id ).subscribeOn (Schedulers .parallel ()).block ();
100+ B resultB = reactiveMapper .read (B .class , bees [1 ].id ).subscribeOn (Schedulers .single ()).block ();
128101 compare (bees [1 ], resultB );
129102
130- A resultA = reactiveMapper .read (A .class , as [1 ].id ).subscribeOn (Schedulers .parallel ()).block ();
103+ A resultA = reactiveMapper .read (A .class , as [1 ].id ).subscribeOn (Schedulers .single ()).block ();
131104 compare (as [1 ], resultA );
132105
133106 Integer [] userKeys = new Integer [6 ];
@@ -142,21 +115,66 @@ public void testBatchLoadWithOperations() {
142115 ops [0 ] = ListOperation .size (DATA_BIN );
143116 ops [1 ] = ListOperation .getByIndex (DATA_BIN , -1 , ListReturnType .VALUE );
144117
145- A [] results = new A [6 ];
146- List <A > resultsList = reactiveMapper .read (A .class , userKeys , ops ).subscribeOn (Schedulers .parallel ()).collectList ().block ();
147- assert resultsList != null ;
148- results = resultsList .toArray (results );
149-
150- compare (results [0 ].data .get (0 ).id , 10 );
151- compare (results [0 ].data .get (1 ).id , as [4 ].data .get (as [4 ].data .size () - 1 ).id );
152- compare (results [1 ].data .get (0 ).id , 10 );
153- compare (results [1 ].data .get (1 ).id , as [7 ].data .get (as [7 ].data .size () - 1 ).id );
154- compare (results [2 ].data .get (0 ).id , 10 );
155- compare (results [2 ].data .get (1 ).id , as [5 ].data .get (as [5 ].data .size () - 1 ).id );
156- compare (results [3 ].data .get (0 ).id , 10 );
157- compare (results [3 ].data .get (1 ).id , as [0 ].data .get (as [0 ].data .size () - 1 ).id );
158- compare (results [4 ].data .get (0 ).id , 10 );
159- compare (results [4 ].data .get (1 ).id , as [1 ].data .get (as [1 ].data .size () - 1 ).id );
160- compare (results [5 ], null );
118+ List <List <B >> expected = Stream .of (as [4 ], as [7 ], as [5 ], as [0 ], as [1 ])
119+ .map (a -> a .data )
120+ .sorted (Comparator .comparing ((List <B > o ) -> o .get (1 )))
121+ .collect (Collectors .toList ());
122+
123+ List <List <B >> resultsList = reactiveMapper .read (A .class , userKeys , ops )
124+ .subscribeOn (Schedulers .parallel ()).collectList ().block ()
125+ .stream ().map (a -> a .data )
126+ .sorted (Comparator .comparing ((List <B > o ) -> o .get (1 )))
127+ .collect (Collectors .toList ());
128+
129+ assertNotNull (resultsList );
130+ for (int i = 0 ; i < expected .size (); i ++) {
131+ compare (resultsList .get (i ).get (0 ).id , 10 );
132+ compare (expected .get (i ).get (expected .get (i ).size () - 1 ), resultsList .get (i ).get (1 ));
133+ }
134+ }
135+
136+ @ AerospikeRecord (namespace = "test" , set = "batchB" )
137+ public static class B implements Comparable <B > {
138+ @ AerospikeKey
139+ public int id ;
140+ public String name ;
141+
142+ public B (@ ParamFrom ("id" ) int id , @ ParamFrom ("name" ) String name ) {
143+ this .id = id ;
144+ this .name = name ;
145+ }
146+
147+ @ Override
148+ public int compareTo (B o ) {
149+ if (o == null ) return 1 ;
150+ return Integer .compare (id , o .id );
151+ }
152+ }
153+
154+ @ AerospikeRecord (namespace = "test" , set = "batchA" )
155+ public static class A implements Comparable <A > {
156+ @ AerospikeKey
157+ public int id ;
158+ public String name ;
159+ public List <B > data ;
160+
161+ public A (int id , String name ) {
162+ this .id = id ;
163+ this .name = name ;
164+ data = new ArrayList <>();
165+ }
166+
167+ public A () {
168+ }
169+
170+ public void setBList (List <B > bees ) {
171+ data = bees ;
172+ }
173+
174+ @ Override
175+ public int compareTo (A o ) {
176+ if (o == null ) return 1 ;
177+ return Integer .compare (id , o .id );
178+ }
161179 }
162180}
0 commit comments