From 988027b48e6e293620dc834fc8da28aa693f1657 Mon Sep 17 00:00:00 2001
From: alzimmermsft <48699787+alzimmermsft@users.noreply.github.com>
Date: Thu, 4 Dec 2025 18:00:34 -0500
Subject: [PATCH 1/3] Replace usages of RxJava2 in testing with Reactor Test
---
sdk/cosmos/azure-cosmos-encryption/pom.xml | 7 --
sdk/cosmos/azure-cosmos-tests/pom.xml | 6 --
.../com/azure/cosmos/CosmosMultiHashTest.java | 20 ++--
.../cosmos/RetryContextOnDiagnosticTest.java | 32 +++----
.../implementation/ClientRetryPolicyTest.java | 25 ++---
.../cosmos/implementation/RetryUtilsTest.java | 32 +++----
.../RxGatewayStoreModelTest.java | 14 +--
.../cosmos/implementation/TestSuiteBase.java | 53 +++--------
.../WebExceptionRetryPolicyTest.java | 15 +--
.../ConsistencyReaderTest.java | 45 ++++-----
.../ConsistencyWriterTest.java | 33 +++----
.../GatewayAddressCacheTest.java | 26 +++---
.../HttpTransportClientTest.java | 27 ++----
.../MetadataRequestRetryPolicyTests.java | 15 +--
.../directconnectivity/QuorumReaderTest.java | 53 +++++------
...catedResourceClientPartitionSplitTest.java | 41 +++------
...ReplicatedResourceClientRetryWithTest.java | 29 ++----
.../ReplicatedResourceClientTest.java | 17 +---
.../RntbdTransportClientTest.java | 16 +---
.../StoreReaderDotNetTest.java | 38 +++-----
.../directconnectivity/StoreReaderTest.java | 50 ++++------
.../query/DocumentProducerTest.java | 92 +++++++------------
.../implementation/query/FetcherTest.java | 19 ++--
.../rx/BackPressureCrossPartitionTest.java | 44 +++++----
.../com/azure/cosmos/rx/BackPressureTest.java | 73 ++++++++-------
.../azure/cosmos/rx/FeedRangeQueryTests.java | 16 ++--
.../cosmos/rx/OffsetLimitQueryTests.java | 21 +++--
.../cosmos/rx/OrderbyDocumentQueryTest.java | 57 ++++++------
.../cosmos/rx/ParallelDocumentQueryTest.java | 18 ++--
.../azure/cosmos/rx/QueryValidationTests.java | 14 +--
.../rx/ReadFeedExceptionHandlingTest.java | 15 +--
.../rx/SinglePartitionDocumentQueryTest.java | 33 +++----
.../com/azure/cosmos/rx/TestSuiteBase.java | 92 +++++++------------
.../com/azure/cosmos/rx/TopQueryTests.java | 34 +++----
.../implementation/DatabaseForTest.java | 9 +-
.../directconnectivity/ConsistencyWriter.java | 6 +-
.../directconnectivity/StoreReader.java | 27 ++----
37 files changed, 454 insertions(+), 710 deletions(-)
diff --git a/sdk/cosmos/azure-cosmos-encryption/pom.xml b/sdk/cosmos/azure-cosmos-encryption/pom.xml
index 1a2eeb22d564..7f29e7afa527 100644
--- a/sdk/cosmos/azure-cosmos-encryption/pom.xml
+++ b/sdk/cosmos/azure-cosmos-encryption/pom.xml
@@ -182,13 +182,6 @@ Licensed under the MIT License.
test
-
- io.reactivex.rxjava2
- rxjava
- 2.2.21
- test
-
-
org.mockito
mockito-core
diff --git a/sdk/cosmos/azure-cosmos-tests/pom.xml b/sdk/cosmos/azure-cosmos-tests/pom.xml
index 939815cdbc54..0e1ab0250ef0 100644
--- a/sdk/cosmos/azure-cosmos-tests/pom.xml
+++ b/sdk/cosmos/azure-cosmos-tests/pom.xml
@@ -184,12 +184,6 @@ Licensed under the MIT License.
3.7.11
test
-
- io.reactivex.rxjava2
- rxjava
- 2.2.21
- test
-
org.mockito
mockito-core
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosMultiHashTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosMultiHashTest.java
index 9d3f0795461d..5275af383f2a 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosMultiHashTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosMultiHashTest.java
@@ -24,19 +24,21 @@
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
-import io.reactivex.subscribers.TestSubscriber;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
+import reactor.test.StepVerifier;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
@@ -594,14 +596,14 @@ private void testPartialPKContinuationToken() {
options.setMaxDegreeOfParallelism(2);
CosmosPagedFlux queryObservable = cosmosAsyncContainer.queryItems(query, options, ObjectNode.class);
- TestSubscriber> testSubscriber = new TestSubscriber<>();
- queryObservable.byPage(requestContinuation, 1).subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(TIMEOUT, TimeUnit.MILLISECONDS);
- testSubscriber.assertNoErrors();
- testSubscriber.assertComplete();
+ AtomicReference> value = new AtomicReference<>();
+ StepVerifier.create(queryObservable.byPage(requestContinuation, 1))
+ .assertNext(value::set)
+ .thenConsumeWhile(Objects::nonNull)
+ .expectComplete()
+ .verify(Duration.ofMillis(TIMEOUT));
- @SuppressWarnings("unchecked")
- FeedResponse firstPage = (FeedResponse) testSubscriber.getEvents().get(0).get(0);
+ FeedResponse firstPage = value.get();
requestContinuation = firstPage.getContinuationToken();
receivedDocuments.addAll(firstPage.getResults());
assertThat(firstPage.getResults().size()).isEqualTo(1);
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/RetryContextOnDiagnosticTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/RetryContextOnDiagnosticTest.java
index ee59f0ac723e..5253d8af5d78 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/RetryContextOnDiagnosticTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/RetryContextOnDiagnosticTest.java
@@ -61,12 +61,12 @@
import io.netty.buffer.ByteBufUtil;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.util.ReferenceCountUtil;
-import io.reactivex.subscribers.TestSubscriber;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
import java.lang.reflect.Field;
import java.net.URISyntaxException;
@@ -955,30 +955,24 @@ public void throttlingExceptionGatewayModeScenario() {
}
private StoreResponse validateSuccess(Mono storeResponse) {
- TestSubscriber testSubscriber = new TestSubscriber<>();
- storeResponse.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(60000, TimeUnit.MILLISECONDS);
- testSubscriber.assertNoErrors();
- testSubscriber.assertComplete();
- testSubscriber.assertValueCount(1);
- return testSubscriber.values().get(0);
+ AtomicReference value = new AtomicReference<>();
+ StepVerifier.create(storeResponse)
+ .assertNext(value::set)
+ .expectComplete()
+ .verify(Duration.ofMillis(60_000));
+
+ return value.get();
}
private void validateFailure(Mono storeResponse) {
- TestSubscriber testSubscriber = new TestSubscriber<>();
- storeResponse.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(60000, TimeUnit.MILLISECONDS);
- testSubscriber.assertNotComplete();
- testSubscriber.assertTerminated();
+ StepVerifier.create(storeResponse).expectError().verify(Duration.ofMillis(60_000));
}
private void validateServiceResponseSuccess(Mono> documentServiceResponseMono) {
- TestSubscriber> testSubscriber = new TestSubscriber<>();
- documentServiceResponseMono.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(60000, TimeUnit.MILLISECONDS);
- testSubscriber.assertNoErrors();
- testSubscriber.assertComplete();
- testSubscriber.assertValueCount(1);
+ StepVerifier.create(documentServiceResponseMono)
+ .expectNextCount(1)
+ .expectComplete()
+ .verify(Duration.ofMillis(60_000));
}
private static class TestRetryPolicy extends DocumentClientRetryPolicy {
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ClientRetryPolicyTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ClientRetryPolicyTest.java
index 94575922f652..96d2969a8cb3 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ClientRetryPolicyTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ClientRetryPolicyTest.java
@@ -5,27 +5,26 @@
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosException;
-import com.azure.cosmos.implementation.directconnectivity.WFConstants;
-import java.util.Map;
-import java.util.HashMap;
import com.azure.cosmos.ThrottlingRetryOptions;
-import com.azure.cosmos.implementation.perPartitionCircuitBreaker.GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker;
import com.azure.cosmos.implementation.directconnectivity.ChannelAcquisitionException;
-import com.azure.cosmos.implementation.routing.RegionalRoutingContext;
+import com.azure.cosmos.implementation.directconnectivity.WFConstants;
import com.azure.cosmos.implementation.perPartitionAutomaticFailover.GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover;
+import com.azure.cosmos.implementation.perPartitionCircuitBreaker.GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker;
+import com.azure.cosmos.implementation.routing.RegionalRoutingContext;
import io.netty.handler.timeout.ReadTimeoutException;
-import io.reactivex.subscribers.TestSubscriber;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
import javax.net.ssl.SSLHandshakeException;
import java.net.SocketException;
import java.net.URI;
import java.time.Duration;
-import java.util.concurrent.TimeUnit;
+import java.util.HashMap;
+import java.util.Map;
import static com.azure.cosmos.implementation.TestUtils.mockDiagnosticsClientContext;
@@ -687,13 +686,9 @@ public static void validateSuccess(Mono single,
public static void validateSuccess(Mono single,
ShouldRetryValidator validator,
long timeout) {
- TestSubscriber testSubscriber = new TestSubscriber<>();
-
- single.flux().subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- testSubscriber.assertComplete();
- testSubscriber.assertNoErrors();
- testSubscriber.assertValueCount(1);
- validator.validate(testSubscriber.values().get(0));
+ StepVerifier.create(single)
+ .assertNext(validator::validate)
+ .expectComplete()
+ .verify(Duration.ofMillis(timeout));
}
}
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RetryUtilsTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RetryUtilsTest.java
index df602b2d287a..3b90300ed79b 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RetryUtilsTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RetryUtilsTest.java
@@ -6,7 +6,6 @@
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import com.azure.cosmos.implementation.directconnectivity.StoreResponseValidator;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import io.reactivex.subscribers.TestSubscriber;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
@@ -15,14 +14,13 @@
import org.testng.annotations.Test;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
import java.time.Duration;
-import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import static com.azure.cosmos.implementation.TestUtils.mockDiagnosticsClientContext;
import static com.azure.cosmos.implementation.TestUtils.mockDocumentServiceRequest;
-import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
public class RetryUtilsTest {
@@ -109,26 +107,20 @@ public void toRetryWithAlternateFuncTestingMethodTwo() {
}
private void validateFailure(Mono single, long timeout, Class extends Throwable> class1) {
-
- TestSubscriber testSubscriber = new TestSubscriber<>();
- single.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- testSubscriber.assertNotComplete();
- testSubscriber.assertTerminated();
- assertThat(testSubscriber.errorCount()).isEqualTo(1);
- Throwable throwable = Exceptions.unwrap(testSubscriber.errors().get(0));
- if (!(throwable.getClass().equals(class1))) {
- fail("Not expecting " + testSubscriber.getEvents().get(1).get(0));
- }
+ StepVerifier.create(single)
+ .expectErrorSatisfies(thrown -> {
+ Throwable throwable = Exceptions.unwrap(thrown);
+ if (!(throwable.getClass().equals(class1))) {
+ fail("Not expecting " + thrown);
+ }
+ }).verify(Duration.ofMillis(timeout));
}
private void validateSuccess(Mono single, StoreResponseValidator validator, long timeout) {
-
- TestSubscriber testSubscriber = new TestSubscriber<>();
- single.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- assertThat(testSubscriber.valueCount()).isEqualTo(1);
- validator.validate(testSubscriber.values().get(0));
+ StepVerifier.create(single)
+ .assertNext(validator::validate)
+ .expectComplete()
+ .verify(Duration.ofMillis(timeout));
}
private void toggleMockFuncBtwFailureSuccess(
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxGatewayStoreModelTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxGatewayStoreModelTest.java
index 5a54c807b6eb..aa10e2cede0a 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxGatewayStoreModelTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxGatewayStoreModelTest.java
@@ -5,7 +5,6 @@
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosException;
-import com.azure.cosmos.implementation.perPartitionCircuitBreaker.GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker;
import com.azure.cosmos.implementation.directconnectivity.GatewayServiceConfigurationReader;
import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils;
import com.azure.cosmos.implementation.http.HttpClient;
@@ -14,17 +13,16 @@
import com.azure.cosmos.implementation.routing.RegionalRoutingContext;
import io.netty.channel.ConnectTimeoutException;
import io.netty.handler.timeout.ReadTimeoutException;
-import io.reactivex.subscribers.TestSubscriber;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
import java.net.SocketException;
import java.net.URI;
import java.time.Duration;
-import java.util.concurrent.TimeUnit;
import static com.azure.cosmos.implementation.TestUtils.mockDiagnosticsClientContext;
import static org.assertj.core.api.Assertions.assertThat;
@@ -303,13 +301,9 @@ public void validateFailure(Mono observable,
public static void validateFailure(Mono observable,
FailureValidator validator,
long timeout) {
- TestSubscriber testSubscriber = new TestSubscriber<>();
- observable.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- testSubscriber.assertNotComplete();
- testSubscriber.assertTerminated();
- assertThat(testSubscriber.errorCount()).isEqualTo(1);
- validator.validate(testSubscriber.errors().get(0));
+ StepVerifier.create(observable)
+ .expectErrorSatisfies(validator::validate)
+ .verify(Duration.ofMillis(timeout));
}
enum SessionTokenType {
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java
index ce0b29a47de5..5c59341d59c0 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java
@@ -31,12 +31,10 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
-import io.reactivex.subscribers.TestSubscriber;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
-import org.testng.ITestContext;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeSuite;
import org.testng.annotations.DataProvider;
@@ -44,6 +42,7 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
+import reactor.test.StepVerifier;
import java.time.Duration;
import java.util.ArrayList;
@@ -52,7 +51,6 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.doAnswer;
@Listeners({TestNGLogListener.class, CosmosNettyLeakDetectorFactory.class})
@@ -766,15 +764,10 @@ public void validateSuccess(Mono> obser
public static void validateSuccess(Mono> observable,
ResourceResponseValidator validator, long timeout) {
-
- TestSubscriber> testSubscriber = new TestSubscriber<>();
-
- observable.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- testSubscriber.assertNoErrors();
- testSubscriber.assertComplete();
- testSubscriber.assertValueCount(1);
- validator.validate(testSubscriber.values().get(0));
+ StepVerifier.create(observable)
+ .assertNext(validator::validate)
+ .expectComplete()
+ .verify(Duration.ofMillis(timeout));
}
public void validateFailure(Mono> observable,
@@ -784,15 +777,9 @@ public void validateFailure(Mono> obser
public static void validateFailure(Mono> observable,
FailureValidator validator, long timeout) {
-
- TestSubscriber> testSubscriber = new TestSubscriber<>();
-
- observable.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- testSubscriber.assertNotComplete();
- testSubscriber.assertTerminated();
- assertThat(testSubscriber.errorCount()).isEqualTo(1);
- validator.validate((Throwable) testSubscriber.getEvents().get(1).get(0));
+ StepVerifier.create(observable)
+ .expectErrorSatisfies(validator::validate)
+ .verify(Duration.ofMillis(timeout));
}
public void validateQuerySuccess(Flux> observable,
@@ -802,14 +789,10 @@ public void validateQuerySuccess(Flux> obse
public static void validateQuerySuccess(Flux> observable,
FeedResponseListValidator validator, long timeout) {
-
- TestSubscriber> testSubscriber = new TestSubscriber<>();
-
- observable.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- testSubscriber.assertNoErrors();
- testSubscriber.assertComplete();
- validator.validate(testSubscriber.values());
+ StepVerifier.create(observable.collectList())
+ .assertNext(validator::validate)
+ .expectComplete()
+ .verify(Duration.ofMillis(timeout));
}
public void validateQueryFailure(Flux> observable,
@@ -819,15 +802,9 @@ public void validateQueryFailure(Flux> obse
public static void validateQueryFailure(Flux> observable,
FailureValidator validator, long timeout) {
-
- TestSubscriber> testSubscriber = new TestSubscriber<>();
-
- observable.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- testSubscriber.assertNotComplete();
- testSubscriber.assertTerminated();
- assertThat(testSubscriber.errorCount()).isEqualTo(1);
- validator.validate((Throwable) testSubscriber.getEvents().get(1).get(0));
+ StepVerifier.create(observable)
+ .expectErrorSatisfies(validator::validate)
+ .verify(Duration.ofMillis(timeout));
}
@DataProvider
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/WebExceptionRetryPolicyTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/WebExceptionRetryPolicyTest.java
index 41db724bf295..d1cd79cb8f54 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/WebExceptionRetryPolicyTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/WebExceptionRetryPolicyTest.java
@@ -7,15 +7,14 @@
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.routing.RegionalRoutingContext;
import io.netty.handler.timeout.ReadTimeoutException;
-import io.reactivex.subscribers.TestSubscriber;
import org.mockito.Mockito;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
import java.net.URI;
import java.time.Duration;
-import java.util.concurrent.TimeUnit;
import static com.azure.cosmos.implementation.TestUtils.mockDiagnosticsClientContext;
import static org.assertj.core.api.Assertions.assertThat;
@@ -314,13 +313,9 @@ public static void validateSuccess(Mono single,
public static void validateSuccess(Mono single,
ShouldRetryValidator validator,
long timeout) {
- TestSubscriber testSubscriber = new TestSubscriber<>();
-
- single.flux().subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- testSubscriber.assertComplete();
- testSubscriber.assertNoErrors();
- testSubscriber.assertValueCount(1);
- validator.validate(testSubscriber.values().get(0));
+ StepVerifier.create(single)
+ .assertNext(validator::validate)
+ .expectComplete()
+ .verify(Duration.ofMillis(timeout));
}
}
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyReaderTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyReaderTest.java
index 372730278a48..c8b7c14ce484 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyReaderTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyReaderTest.java
@@ -6,8 +6,6 @@
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.ReadConsistencyStrategy;
import com.azure.cosmos.SessionRetryOptions;
-import com.azure.cosmos.implementation.NotFoundException;
-import com.azure.cosmos.implementation.RequestRateTooLargeException;
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.DocumentServiceRequestContext;
import com.azure.cosmos.implementation.FailureValidator;
@@ -15,29 +13,31 @@
import com.azure.cosmos.implementation.IAuthorizationTokenProvider;
import com.azure.cosmos.implementation.ISessionContainer;
import com.azure.cosmos.implementation.ISessionToken;
+import com.azure.cosmos.implementation.NotFoundException;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.implementation.RequestChargeTracker;
+import com.azure.cosmos.implementation.RequestRateTooLargeException;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.StoreResponseBuilder;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.VectorSessionToken;
import com.azure.cosmos.implementation.guava25.collect.ImmutableList;
-import io.reactivex.subscribers.TestSubscriber;
import org.mockito.Mockito;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
import java.math.BigDecimal;
import java.math.RoundingMode;
+import java.time.Duration;
import java.util.List;
-import java.util.concurrent.TimeUnit;
+import static com.azure.cosmos.implementation.TestUtils.mockDiagnosticsClientContext;
import static com.azure.cosmos.implementation.Utils.ValueHolder;
import static org.assertj.core.api.Assertions.assertThat;
-import static com.azure.cosmos.implementation.TestUtils.*;
public class ConsistencyReaderTest {
private final Configs configs = new Configs();
@@ -854,14 +854,10 @@ public static void validateSuccess(Mono> single,
public static void validateSuccess(Mono> single,
MultiStoreResultValidator validator,
long timeout) {
- TestSubscriber> testSubscriber = new TestSubscriber<>();
-
- single.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- testSubscriber.assertNoErrors();
- testSubscriber.assertComplete();
- testSubscriber.assertValueCount(1);
- validator.validate(testSubscriber.values().get(0));
+ StepVerifier.create(single)
+ .assertNext(validator::validate)
+ .expectComplete()
+ .verify(Duration.ofMillis(timeout));
}
public static void validateSuccess(Mono single,
@@ -872,28 +868,19 @@ public static void validateSuccess(Mono single,
public static void validateSuccess(Mono single,
StoreResponseValidator validator,
long timeout) {
- TestSubscriber testSubscriber = new TestSubscriber<>();
-
- single.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- testSubscriber.assertNoErrors();
- testSubscriber.assertComplete();
- testSubscriber.assertValueCount(1);
- validator.validate(testSubscriber.values().get(0));
+ StepVerifier.create(single)
+ .assertNext(validator::validate)
+ .expectComplete()
+ .verify(Duration.ofMillis(timeout));
}
public static void validateException(Mono single,
FailureValidator validator,
long timeout) {
- TestSubscriber testSubscriber = new TestSubscriber<>();
-
- single.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- testSubscriber.assertNotComplete();
- testSubscriber.assertTerminated();
- assertThat(testSubscriber.errorCount()).isEqualTo(1);
- validator.validate(testSubscriber.errors().get(0));
+ StepVerifier.create(single)
+ .expectErrorSatisfies(validator::validate)
+ .verify(Duration.ofMillis(timeout));
}
public static void validateException(Mono single,
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java
index 9ede1c281219..7cb9320e17fe 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java
@@ -21,7 +21,6 @@
import com.azure.cosmos.implementation.StoreResponseBuilder;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.guava25.collect.ImmutableList;
-import io.reactivex.subscribers.TestSubscriber;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
@@ -29,7 +28,10 @@
import org.testng.annotations.Test;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+import reactor.test.subscriber.TestSubscriber;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -117,12 +119,7 @@ public void exception(Exception ex, Class klass, int expectedStatusCo
.subStatusCode(expectedSubStatusCode)
.build();
- TestSubscriber subscriber = new TestSubscriber<>();
- res.subscribe(subscriber);
- subscriber.awaitTerminalEvent();
- subscriber.assertNotComplete();
- assertThat(subscriber.errorCount()).isEqualTo(1);
- failureValidator.validate(subscriber.errors().get(0));
+ StepVerifier.create(res).verifyErrorSatisfies(failureValidator::validate);
}
@Test(groups = "unit")
@@ -228,20 +225,15 @@ public void timeout1() throws Exception {
TimeoutHelper timeoutHelper = Mockito.mock(TimeoutHelper.class);
Mockito.doReturn(true).when(timeoutHelper).isElapsed();
ConsistencyWriter spyConsistencyWriter = Mockito.spy(this.consistencyWriter);
- TestSubscriber subscriber = new TestSubscriber<>();
RxDocumentServiceRequest request = mockDocumentServiceRequest(clientContext);
ClientSideRequestStatistics clientSideRequestStatistics = BridgeInternal.getClientSideRequestStatics(request.requestContext.cosmosDiagnostics);
RetryContext retryContext = Mockito.mock(RetryContext.class);
ReflectionUtils.setRetryContext(clientSideRequestStatistics, retryContext);
Mockito.doReturn(2).when(retryContext).getRetryCount();
- spyConsistencyWriter.writeAsync(request, timeoutHelper, false)
- .subscribe(subscriber);
-
- subscriber.awaitTerminalEvent(10, TimeUnit.MILLISECONDS);
- subscriber.assertNoValues();
-
- subscriber.assertError(RequestTimeoutException.class);
+ StepVerifier.create(spyConsistencyWriter.writeAsync(request, timeoutHelper, false))
+ .expectError(RequestTimeoutException.class)
+ .verify(Duration.ofMillis(10));
}
@Test(groups = "unit")
@@ -250,18 +242,15 @@ public void timeout2() throws Exception {
TimeoutHelper timeoutHelper = Mockito.mock(TimeoutHelper.class);
Mockito.doReturn(false).doReturn(true).when(timeoutHelper).isElapsed();
ConsistencyWriter spyConsistencyWriter = Mockito.spy(this.consistencyWriter);
- TestSubscriber subscriber = new TestSubscriber<>();
RxDocumentServiceRequest request = mockDocumentServiceRequest(clientContext);
ClientSideRequestStatistics clientSideRequestStatistics = BridgeInternal.getClientSideRequestStatics(request.requestContext.cosmosDiagnostics);
RetryContext retryContext = Mockito.mock(RetryContext.class);
ReflectionUtils.setRetryContext(clientSideRequestStatistics, retryContext);
Mockito.doReturn(2).when(retryContext).getRetryCount();
- spyConsistencyWriter.writeAsync(request, timeoutHelper, false)
- .subscribe(subscriber);
-
- subscriber.awaitTerminalEvent(10, TimeUnit.MILLISECONDS);
- subscriber.assertError(RequestTimeoutException.class);
+ StepVerifier.create(spyConsistencyWriter.writeAsync(request, timeoutHelper, false))
+ .expectError(RequestTimeoutException.class)
+ .verify(Duration.ofMillis(10));
}
@Test(groups = "unit", dataProvider = "storeResponseArgProvider")
@@ -388,7 +377,7 @@ private void initializeConsistencyWriter(boolean useMultipleWriteLocation) {
public static void validateError(Mono single,
FailureValidator validator) {
- TestSubscriber testSubscriber = new TestSubscriber<>();
+ TestSubscriber testSubscriber = TestSubscriber.create();
try {
single.flux().subscribe(testSubscriber);
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCacheTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCacheTest.java
index 8285ea915603..172c00f799bc 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCacheTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCacheTest.java
@@ -32,7 +32,6 @@
import com.azure.cosmos.implementation.http.HttpClientConfig;
import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity;
import com.azure.cosmos.models.PartitionKeyDefinition;
-import io.reactivex.subscribers.TestSubscriber;
import org.assertj.core.api.AssertionsForClassTypes;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
@@ -45,6 +44,7 @@
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
@@ -1565,13 +1565,12 @@ public static void assertExactlyEqual(List actual, List T getSuccessResult(Mono observable, long timeout) {
- TestSubscriber testSubscriber = new TestSubscriber<>();
- observable.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- testSubscriber.assertNoErrors();
- testSubscriber.assertComplete();
- testSubscriber.assertValueCount(1);
- return testSubscriber.values().get(0);
+ AtomicReference value = new AtomicReference<>();
+ StepVerifier.create(observable)
+ .assertNext(value::set)
+ .expectComplete()
+ .verify(Duration.ofMillis(timeout));
+ return value.get();
}
public static void validateSuccess(Mono> observable,
@@ -1580,13 +1579,10 @@ public static void validateSuccess(Mono> observable,
RxDocumentServiceRequest serviceRequest,
int requestIndex,
long timeout) {
- TestSubscriber> testSubscriber = new TestSubscriber<>();
- observable.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- testSubscriber.assertNoErrors();
- testSubscriber.assertComplete();
- testSubscriber.assertValueCount(1);
- validator.validate(testSubscriber.values().get(0));
+ StepVerifier.create(observable)
+ .assertNext(validator::validate)
+ .expectComplete()
+ .verify(Duration.ofMillis(timeout));
// Verifying activity id is being set in header on address call to gateway.
String addressResolutionActivityId =
BridgeInternal.getClientSideRequestStatics(serviceRequest.requestContext.cosmosDiagnostics).getAddressResolutionStatistics().keySet().iterator().next();
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/HttpTransportClientTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/HttpTransportClientTest.java
index af649c2f505b..07c14f5c9711 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/HttpTransportClientTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/HttpTransportClientTest.java
@@ -35,16 +35,14 @@
import com.azure.cosmos.implementation.http.HttpRequest;
import com.azure.cosmos.implementation.http.HttpResponse;
import io.netty.channel.ConnectTimeoutException;
-import io.reactivex.subscribers.TestSubscriber;
-import org.assertj.core.api.Assertions;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.HashMap;
-import java.util.concurrent.TimeUnit;
import static com.azure.cosmos.implementation.TestUtils.mockDiagnosticsClientContext;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
@@ -611,14 +609,10 @@ public void validateSuccess(Mono single, StoreResponseValidator v
public static void validateSuccess(Mono single,
StoreResponseValidator validator, long timeout) {
-
- TestSubscriber testSubscriber = new TestSubscriber<>();
- single.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- testSubscriber.assertNoErrors();
- testSubscriber.assertComplete();
- testSubscriber.assertValueCount(1);
- validator.validate(testSubscriber.values().get(0));
+ StepVerifier.create(single)
+ .assertNext(validator::validate)
+ .expectComplete()
+ .verify(Duration.ofMillis(timeout));
}
public void validateFailure(Mono single,
@@ -628,13 +622,8 @@ public void validateFailure(Mono single,
public static void validateFailure(Mono single,
FailureValidator validator, long timeout) {
-
- TestSubscriber testSubscriber = new TestSubscriber<>();
- single.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- testSubscriber.assertNotComplete();
- testSubscriber.assertTerminated();
- Assertions.assertThat(testSubscriber.errorCount()).isEqualTo(1);
- validator.validate(testSubscriber.errors().get(0));
+ StepVerifier.create(single)
+ .expectErrorSatisfies(validator::validate)
+ .verify(Duration.ofMillis(timeout));
}
}
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/MetadataRequestRetryPolicyTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/MetadataRequestRetryPolicyTests.java
index 210112149959..175eceee540e 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/MetadataRequestRetryPolicyTests.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/MetadataRequestRetryPolicyTests.java
@@ -50,13 +50,13 @@
import com.azure.cosmos.test.faultinjection.FaultInjectionRuleBuilder;
import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorResult;
import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorType;
-import io.reactivex.subscribers.TestSubscriber;
import org.mockito.Mockito;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
import java.net.SocketException;
import java.net.URI;
@@ -67,7 +67,6 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.azure.cosmos.implementation.TestUtils.mockDiagnosticsClientContext;
@@ -533,14 +532,10 @@ public static void validateSuccess(Mono single,
public static void validateSuccess(Mono single,
ShouldRetryValidator validator,
long timeout) {
- TestSubscriber testSubscriber = new TestSubscriber<>();
-
- single.flux().subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- testSubscriber.assertComplete();
- testSubscriber.assertNoErrors();
- testSubscriber.assertValueCount(1);
- validator.validate(testSubscriber.values().get(0));
+ StepVerifier.create(single)
+ .assertNext(validator::validate)
+ .expectComplete()
+ .verify(Duration.ofMillis(timeout));
}
private static RxDocumentServiceRequest createRequest(
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/QuorumReaderTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/QuorumReaderTest.java
index 8a22146d0bf1..cc833ba9ab7f 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/QuorumReaderTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/QuorumReaderTest.java
@@ -17,17 +17,16 @@
import com.azure.cosmos.implementation.StoreResponseBuilder;
import com.azure.cosmos.implementation.guava25.base.Stopwatch;
import com.azure.cosmos.implementation.guava25.collect.ImmutableList;
-import io.reactivex.subscribers.TestSubscriber;
import org.mockito.Mockito;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.Duration;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import static com.azure.cosmos.implementation.TestUtils.mockDiagnosticsClientContext;
import static org.assertj.core.api.Assertions.assertThat;
@@ -44,15 +43,15 @@ public QuorumReaderTest() {
public Object[][] simpleReadStrongArgProvider() {
return new Object[][]{
//int replicaCountToRead, ReadMode readMode, Long lsn, Long localLSN
- { 1, ReadMode.Strong, 51l, 18l },
- { 2, ReadMode.Strong, 51l, 18l },
- { 3, ReadMode.Strong, 51l, 18l },
+ { 1, ReadMode.Strong, 51L, 18L },
+ { 2, ReadMode.Strong, 51L, 18L },
+ { 3, ReadMode.Strong, 51L, 18L },
- { 2, ReadMode.Any, 51l, 18l },
- { 1, ReadMode.Any, 51l, 18l },
+ { 2, ReadMode.Any, 51L, 18L },
+ { 1, ReadMode.Any, 51L, 18L },
- { 2, ReadMode.Any, null, 18l },
- { 1, ReadMode.Any, null, 18l },
+ { 2, ReadMode.Any, null, 18L },
+ { 1, ReadMode.Any, null, 18L },
};
}
@@ -268,8 +267,8 @@ public void readStrong_OnlySecondary_RequestBarrier_Success(int numberOfBarrierR
DocumentServiceRequestValidator requestValidator = DocumentServiceRequestValidator.builder()
.add(DocumentServiceRequestContextValidator.builder()
- .qurorumSelectedLSN(0l)
- .globalCommittedSelectedLSN(0l)
+ .qurorumSelectedLSN(0L)
+ .globalCommittedSelectedLSN(0L)
.storeResponses(null)
.build())
.build();
@@ -439,8 +438,8 @@ public void readStrong_SecondaryReadBarrierExhausted_ReadBarrierOnPrimary_Succes
DocumentServiceRequestValidator requestValidator = DocumentServiceRequestValidator.builder()
.add(DocumentServiceRequestContextValidator.builder()
- .qurorumSelectedLSN(0l)
- .globalCommittedSelectedLSN(0l)
+ .qurorumSelectedLSN(0L)
+ .globalCommittedSelectedLSN(0L)
.storeResponses(null)
.build())
.build();
@@ -545,8 +544,8 @@ public void readStrong_QuorumNotSelected_ReadPrimary() {
DocumentServiceRequestValidator requestValidator = DocumentServiceRequestValidator.builder()
.add(DocumentServiceRequestContextValidator.builder()
- .qurorumSelectedLSN(0l)
- .globalCommittedSelectedLSN(0l)
+ .qurorumSelectedLSN(0L)
+ .globalCommittedSelectedLSN(0L)
.storeResponses(null)
.build())
.build();
@@ -631,14 +630,10 @@ public static void validateSuccess(Mono> single,
public static void validateSuccess(Mono> single,
MultiStoreResultValidator validator,
long timeout) {
- TestSubscriber> testSubscriber = new TestSubscriber<>();
-
- single.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- testSubscriber.assertNoErrors();
- testSubscriber.assertComplete();
- testSubscriber.assertValueCount(1);
- validator.validate(testSubscriber.values().get(0));
+ StepVerifier.create(single)
+ .assertNext(validator::validate)
+ .expectComplete()
+ .verify(Duration.ofMillis(timeout));
}
public static void validateSuccess(Mono single,
@@ -649,13 +644,9 @@ public static void validateSuccess(Mono single,
public static void validateSuccess(Mono single,
StoreResponseValidator validator,
long timeout) {
- TestSubscriber testSubscriber = new TestSubscriber<>();
-
- single.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- testSubscriber.assertNoErrors();
- testSubscriber.assertComplete();
- testSubscriber.assertValueCount(1);
- validator.validate(testSubscriber.values().get(0));
+ StepVerifier.create(single)
+ .assertNext(validator::validate)
+ .expectComplete()
+ .verify(Duration.ofMillis(timeout));
}
}
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClientPartitionSplitTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClientPartitionSplitTest.java
index 3d3a49410362..abe696c021f5 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClientPartitionSplitTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClientPartitionSplitTest.java
@@ -18,16 +18,14 @@
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.SessionContainer;
import com.azure.cosmos.implementation.StoreResponseBuilder;
-import io.reactivex.subscribers.TestSubscriber;
-import org.assertj.core.api.Assertions;
import org.mockito.Mockito;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+import java.time.Duration;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import static com.azure.cosmos.implementation.TestUtils.mockDiagnosticsClientContext;
@@ -156,14 +154,10 @@ public static void validateSuccess(Mono> single,
public static void validateSuccess(Mono> single,
MultiStoreResultValidator validator, long timeout) {
- TestSubscriber> testSubscriber = new TestSubscriber<>();
-
- single.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- testSubscriber.assertNoErrors();
- testSubscriber.assertComplete();
- testSubscriber.assertValueCount(1);
- validator.validate(testSubscriber.values().get(0));
+ StepVerifier.create(single)
+ .assertNext(validator::validate)
+ .expectComplete()
+ .verify(Duration.ofMillis(timeout));
}
public static void validateSuccess(Mono single,
@@ -173,26 +167,17 @@ public static void validateSuccess(Mono single,
public static void validateSuccess(Mono single,
StoreResponseValidator validator, long timeout) {
- TestSubscriber testSubscriber = new TestSubscriber<>();
-
- single.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- testSubscriber.assertNoErrors();
- testSubscriber.assertComplete();
- testSubscriber.assertValueCount(1);
- validator.validate(testSubscriber.values().get(0));
+ StepVerifier.create(single)
+ .assertNext(validator::validate)
+ .expectComplete()
+ .verify(Duration.ofMillis(timeout));
}
public static void validateFailure(Mono single, FailureValidator validator, long timeout) {
-
- TestSubscriber testSubscriber = new TestSubscriber<>();
- single.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- testSubscriber.assertNotComplete();
- testSubscriber.assertTerminated();
- Assertions.assertThat(testSubscriber.errorCount()).isEqualTo(1);
- validator.validate(Exceptions.unwrap(testSubscriber.errors().get(0)));
+ StepVerifier.create(single)
+ .expectErrorSatisfies(validator::validate)
+ .verify(Duration.ofMillis(timeout));
}
private PartitionKeyRange partitionKeyRangeWithId(String id) {
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClientRetryWithTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClientRetryWithTest.java
index 9fce6118c374..cde946c5ed13 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClientRetryWithTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClientRetryWithTest.java
@@ -16,10 +16,10 @@
import com.azure.cosmos.implementation.SessionContainer;
import com.azure.cosmos.implementation.StoreResponseBuilder;
import com.azure.cosmos.implementation.http.HttpHeaders;
-import io.reactivex.subscribers.TestSubscriber;
import org.mockito.Mockito;
import org.testng.annotations.Test;
import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
import java.net.URI;
import java.net.URISyntaxException;
@@ -27,11 +27,10 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import java.util.function.Function;
-import static org.assertj.core.api.Assertions.assertThat;
import static com.azure.cosmos.implementation.TestUtils.mockDiagnosticsClientContext;
+import static org.assertj.core.api.Assertions.assertThat;
public class ReplicatedResourceClientRetryWithTest {
protected static final int TIMEOUT = 120000;
@@ -155,14 +154,10 @@ public static void validateSuccess(Mono> single,
public static void validateSuccess(Mono> single,
MultiStoreResultValidator validator, long timeout) {
- TestSubscriber> testSubscriber = new TestSubscriber<>();
-
- single.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- testSubscriber.assertNoErrors();
- testSubscriber.assertComplete();
- testSubscriber.assertValueCount(1);
- validator.validate(testSubscriber.values().get(0));
+ StepVerifier.create(single)
+ .assertNext(validator::validate)
+ .expectComplete()
+ .verify(Duration.ofMillis(timeout));
}
public static void validateSuccess(Mono single,
@@ -172,14 +167,10 @@ public static void validateSuccess(Mono single,
public static void validateSuccess(Mono single,
StoreResponseValidator validator, long timeout) {
- TestSubscriber testSubscriber = new TestSubscriber<>();
-
- single.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- testSubscriber.assertNoErrors();
- testSubscriber.assertComplete();
- testSubscriber.assertValueCount(1);
- validator.validate(testSubscriber.values().get(0));
+ StepVerifier.create(single)
+ .assertNext(validator::validate)
+ .expectComplete()
+ .verify(Duration.ofMillis(timeout));
}
private PartitionKeyRange partitionKeyRangeWithId(String id) {
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClientTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClientTest.java
index fe42ec93d0e6..26b2fe6bcbfe 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClientTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClientTest.java
@@ -11,16 +11,15 @@
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
-import io.reactivex.subscribers.TestSubscriber;
-import org.assertj.core.api.Assertions;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
-import java.util.concurrent.TimeUnit;
+import java.time.Duration;
import static com.azure.cosmos.implementation.TestUtils.mockDiagnosticsClientContext;
@@ -64,14 +63,8 @@ public void invokeAsyncWithGoneException() {
}
public static void validateFailure(Mono single, FailureValidator validator, long timeout) {
-
- TestSubscriber testSubscriber = new TestSubscriber<>();
- single.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- testSubscriber.assertNotComplete();
- testSubscriber.assertTerminated();
- Assertions.assertThat(testSubscriber.errorCount()).isEqualTo(1);
- Throwable throwable = Exceptions.unwrap(testSubscriber.errors().get(0));
- validator.validate(throwable);
+ StepVerifier.create(single)
+ .expectErrorSatisfies(thrown -> validator.validate(Exceptions.unwrap(thrown)))
+ .verify(Duration.ofMillis(timeout));
}
}
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java
index 7685ebb7dff4..37502af46ecf 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java
@@ -71,7 +71,6 @@
import io.netty.handler.logging.LogLevel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
-import io.reactivex.subscribers.TestSubscriber;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.mockito.Mockito;
@@ -79,6 +78,7 @@
import org.testng.annotations.Test;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
+import reactor.test.StepVerifier;
import javax.net.ssl.SSLException;
import java.io.IOException;
@@ -91,7 +91,6 @@
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import static com.azure.cosmos.implementation.HttpConstants.HttpHeaders;
@@ -907,17 +906,8 @@ private void validateFailure(final Mono extends StoreResponse> responseMono, f
}
private static void validateFailure(
- final Mono extends StoreResponse> mono, final FailureValidator validator, final long timeout
- ) {
-
- final TestSubscriber subscriber = new TestSubscriber<>();
- mono.subscribe(subscriber);
-
- subscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- assertThat(subscriber.errorCount()).isEqualTo(1);
- subscriber.assertSubscribed();
- subscriber.assertNoValues();
- validator.validate(subscriber.errors().get(0));
+ final Mono extends StoreResponse> mono, final FailureValidator validator, final long timeout) {
+ StepVerifier.create(mono).expectErrorSatisfies(validator::validate).verify(Duration.ofMillis(timeout));
}
// region Types
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreReaderDotNetTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreReaderDotNetTest.java
index c7b33aff323c..0adda7fd8268 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreReaderDotNetTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreReaderDotNetTest.java
@@ -26,7 +26,6 @@
import com.azure.cosmos.implementation.SessionContainer;
import com.azure.cosmos.implementation.StoreResponseBuilder;
import com.azure.cosmos.implementation.Strings;
-import io.reactivex.subscribers.TestSubscriber;
import org.apache.commons.lang3.StringUtils;
import org.assertj.core.api.Assertions;
import org.mockito.ArgumentMatchers;
@@ -35,6 +34,7 @@
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;
import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
import java.net.URISyntaxException;
import java.time.Duration;
@@ -42,7 +42,6 @@
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
-import java.util.concurrent.TimeUnit;
import static com.azure.cosmos.implementation.TestUtils.mockDiagnosticsClientContext;
import static org.assertj.core.api.Assertions.assertThat;
@@ -513,14 +512,10 @@ public static void validateSuccess(Mono> single,
public static void validateSuccess(Mono> single,
MultiStoreResultValidator validator, long timeout) {
- TestSubscriber> testSubscriber = new TestSubscriber<>();
-
- single.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- testSubscriber.assertNoErrors();
- testSubscriber.assertComplete();
- testSubscriber.assertValueCount(1);
- validator.validate(testSubscriber.values().get(0));
+ StepVerifier.create(single)
+ .assertNext(validator::validate)
+ .expectComplete()
+ .verify(Duration.ofMillis(timeout));
}
public static void validateSuccess(Mono single,
@@ -530,14 +525,10 @@ public static void validateSuccess(Mono single,
public static void validateSuccess(Mono single,
StoreResultValidator validator, long timeout) {
- TestSubscriber testSubscriber = new TestSubscriber<>();
-
- single.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- testSubscriber.assertNoErrors();
- testSubscriber.assertComplete();
- testSubscriber.assertValueCount(1);
- validator.validate(testSubscriber.values().get(0));
+ StepVerifier.create(single)
+ .assertNext(validator::validate)
+ .expectComplete()
+ .verify(Duration.ofMillis(timeout));
}
public static void validateException(Mono single,
@@ -547,14 +538,9 @@ public static void validateException(Mono single,
public static void validateException(Mono single,
FailureValidator validator, long timeout) {
- TestSubscriber testSubscriber = new TestSubscriber<>();
-
- single.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- testSubscriber.assertNotComplete();
- testSubscriber.assertTerminated();
- assertThat(testSubscriber.errorCount()).isEqualTo(1);
- validator.validate(testSubscriber.errors().get(0));
+ StepVerifier.create(single)
+ .expectErrorSatisfies(validator::validate)
+ .verify(Duration.ofMillis(timeout));
}
/**
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreReaderTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreReaderTest.java
index 27052ab0d7a5..047d796ea858 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreReaderTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreReaderTest.java
@@ -29,7 +29,6 @@
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.VectorSessionToken;
import com.azure.cosmos.implementation.guava25.collect.ImmutableList;
-import io.reactivex.subscribers.TestSubscriber;
import org.assertj.core.api.AssertionsForClassTypes;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
@@ -37,7 +36,10 @@
import org.testng.annotations.Test;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+import reactor.test.subscriber.TestSubscriber;
+import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@@ -189,12 +191,7 @@ public void exception(Exception ex, Class klass, int expectedStatusCo
.subStatusCode(expectedSubStatusCode)
.build();
- TestSubscriber> subscriber = new TestSubscriber<>();
- res.subscribe(subscriber);
- subscriber.awaitTerminalEvent();
- subscriber.assertNotComplete();
- assertThat(subscriber.errorCount()).isEqualTo(1);
- failureValidator.validate(subscriber.errors().get(0));
+ StepVerifier.create(res).verifyErrorSatisfies(failureValidator::validate);
if (expectedStatusCode == 410) {
assertThat(dsr.requestContext.getFailedEndpoints().size()).isEqualTo(1);
@@ -970,14 +967,10 @@ public static void validateSuccess(Mono> single,
public static void validateSuccess(Mono> single,
MultiStoreResultValidator validator, long timeout) {
- TestSubscriber> testSubscriber = new TestSubscriber<>();
-
- single.flux().subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- testSubscriber.assertNoErrors();
- testSubscriber.assertComplete();
- testSubscriber.assertValueCount(1);
- validator.validate(testSubscriber.values().get(0));
+ StepVerifier.create(single)
+ .assertNext(validator::validate)
+ .expectComplete()
+ .verify(Duration.ofMillis(timeout));
}
public static void validateSuccess(Mono single,
@@ -987,26 +980,17 @@ public static void validateSuccess(Mono single,
public static void validateSuccess(Mono single,
StoreResultValidator validator, long timeout) {
- TestSubscriber testSubscriber = new TestSubscriber<>();
-
- single.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- testSubscriber.assertNoErrors();
- testSubscriber.assertComplete();
- testSubscriber.assertValueCount(1);
- validator.validate(testSubscriber.values().get(0));
+ StepVerifier.create(single)
+ .assertNext(validator::validate)
+ .expectComplete()
+ .verify(Duration.ofMillis(timeout));
}
public static void validateException(Mono single,
FailureValidator validator, long timeout) {
- TestSubscriber testSubscriber = new TestSubscriber<>();
-
- single.flux().subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- testSubscriber.assertNotComplete();
- testSubscriber.assertTerminated();
- assertThat(testSubscriber.errorCount()).isEqualTo(1);
- validator.validate((Throwable) testSubscriber.getEvents().get(1).get(0));
+ StepVerifier.create(single)
+ .expectErrorSatisfies(validator::validate)
+ .verify(Duration.ofMillis(timeout));
}
public static void validateException(Mono single,
@@ -1016,10 +1000,8 @@ public static void validateException(Mono single,
public static void validateError(Mono single,
FailureValidator validator) {
- TestSubscriber testSubscriber = new TestSubscriber<>();
-
try {
- single.flux().subscribe(testSubscriber);
+ single.flux().subscribe(TestSubscriber.create());
} catch (Throwable throwable) {
assertThat(throwable).isInstanceOf(Error.class);
validator.validate(throwable);
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/DocumentProducerTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/DocumentProducerTest.java
index 754a4aa21c9e..d99c4238d00f 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/DocumentProducerTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/DocumentProducerTest.java
@@ -10,7 +10,6 @@
import com.azure.cosmos.implementation.Document;
import com.azure.cosmos.implementation.DocumentClientRetryPolicy;
import com.azure.cosmos.implementation.GlobalEndpointManager;
-import com.azure.cosmos.implementation.perPartitionCircuitBreaker.GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.IRetryPolicyFactory;
import com.azure.cosmos.implementation.PartitionKeyRange;
@@ -26,6 +25,7 @@
import com.azure.cosmos.implementation.guava25.collect.Iterables;
import com.azure.cosmos.implementation.guava25.collect.LinkedListMultimap;
import com.azure.cosmos.implementation.perPartitionAutomaticFailover.GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover;
+import com.azure.cosmos.implementation.perPartitionCircuitBreaker.GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker;
import com.azure.cosmos.implementation.query.orderbyquery.OrderByRowResult;
import com.azure.cosmos.implementation.query.orderbyquery.OrderbyRowComparer;
import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity;
@@ -33,7 +33,6 @@
import com.azure.cosmos.implementation.routing.RegionalRoutingContext;
import com.azure.cosmos.models.FeedResponse;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import io.reactivex.subscribers.TestSubscriber;
import org.assertj.core.api.Assertions;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
@@ -42,6 +41,7 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
import java.net.URI;
import java.time.Duration;
@@ -54,6 +54,7 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -210,13 +211,10 @@ public void partitionSplit(String initialContinuationToken, int numberOfResultPa
range1,
() -> "n/a");
- TestSubscriber.DocumentProducerFeedResponse> subscriber = new TestSubscriber<>();
-
- documentProducer.produceAsync().subscribe(subscriber);
- subscriber.awaitTerminalEvent();
-
- subscriber.assertNoErrors();
- subscriber.assertComplete();
+ AtomicReference.DocumentProducerFeedResponse>> value = new AtomicReference<>();
+ StepVerifier.create(documentProducer.produceAsync().collectList())
+ .consumeNextWith(value::set)
+ .verifyComplete();
validateSplitCaptureRequests(
requestCreator.invocations,
@@ -233,7 +231,7 @@ public void partitionSplit(String initialContinuationToken, int numberOfResultPa
.distinct().collect(Collectors.toList())).containsExactlyElementsOf(Collections.singleton(initialPageSize));
// expected results
- validateSplitResults(subscriber.values(),
+ validateSplitResults(value.get(),
parentPartitionId, leftChildPartitionId,
rightChildPartitionId, resultFromParentPartition, resultFromLeftChildPartition,
resultFromRightChildPartition, false);
@@ -327,13 +325,10 @@ public void orderByPartitionSplit(String initialContinuationToken, int numberOfR
new HashMap<>(),
() -> "n/a");
- TestSubscriber.DocumentProducerFeedResponse> subscriber = new TestSubscriber<>();
-
- documentProducer.produceAsync().subscribe(subscriber);
- subscriber.awaitTerminalEvent();
-
- subscriber.assertNoErrors();
- subscriber.assertComplete();
+ AtomicReference.DocumentProducerFeedResponse>> value = new AtomicReference<>();
+ StepVerifier.create(documentProducer.produceAsync().collectList())
+ .consumeNextWith(value::set)
+ .verifyComplete();
validateSplitCaptureRequests(requestCreator.invocations, initialContinuationToken, parentPartitionId,
leftChildPartitionId, rightChildPartitionId, resultFromParentPartition,
@@ -344,7 +339,7 @@ public void orderByPartitionSplit(String initialContinuationToken, int numberOfR
assertThat(requestCreator.invocations.stream().map(i -> i.maxItemCount).distinct().collect(Collectors.toList())).containsExactlyElementsOf(Collections.singleton(initialPageSize));
// expected results
- validateSplitResults(subscriber.values(),
+ validateSplitResults(value.get(),
parentPartitionId, leftChildPartitionId,
rightChildPartitionId, resultFromParentPartition, resultFromLeftChildPartition,
resultFromRightChildPartition, true);
@@ -420,13 +415,10 @@ public void partitionMerge(
currentFeedRange,
() -> "n/a");
- TestSubscriber.DocumentProducerFeedResponse> subscriber = new TestSubscriber<>();
-
- documentProducer.produceAsync().subscribe(subscriber);
- subscriber.awaitTerminalEvent();
-
- subscriber.assertNoErrors();
- subscriber.assertComplete();
+ AtomicReference.DocumentProducerFeedResponse>> value = new AtomicReference<>();
+ StepVerifier.create(documentProducer.produceAsync().collectList())
+ .consumeNextWith(value::set)
+ .verifyComplete();
validateMergeCaptureRequests(
requestCreator.invocations,
@@ -510,13 +502,10 @@ public void orderByPartitionMerge(
new HashMap<>(),
() -> "n/a");
- TestSubscriber.DocumentProducerFeedResponse> subscriber = new TestSubscriber<>();
-
- documentProducer.produceAsync().subscribe(subscriber);
- subscriber.awaitTerminalEvent();
-
- subscriber.assertNoErrors();
- subscriber.assertComplete();
+ AtomicReference.DocumentProducerFeedResponse>> value = new AtomicReference<>();
+ StepVerifier.create(documentProducer.produceAsync().collectList())
+ .consumeNextWith(value::set)
+ .verifyComplete();
validateMergeCaptureRequests(
requestCreator.invocations,
@@ -594,15 +583,9 @@ public void simple() {
range1,
() -> "n/a");
- TestSubscriber.DocumentProducerFeedResponse> subscriber = new TestSubscriber<>();
-
- documentProducer.produceAsync().subscribe(subscriber);
- subscriber.awaitTerminalEvent();
-
- subscriber.assertNoErrors();
- subscriber.assertComplete();
-
- subscriber.assertValueCount(responses.size());
+ StepVerifier.create(documentProducer.produceAsync().collectList())
+ .expectNextCount(responses.size())
+ .verifyComplete();
// requests match
assertThat(requestCreator.invocations.stream().map(i -> i.invocationResult).collect(Collectors.toList()))
@@ -696,15 +679,13 @@ public void retries() {
feedRangeEpk,
() -> "n/a");
- TestSubscriber.DocumentProducerFeedResponse> subscriber = new TestSubscriber<>();
+ AtomicReference.DocumentProducerFeedResponse>> value = new AtomicReference<>();
+ StepVerifier.create(documentProducer.produceAsync().collectList())
+ .consumeNextWith(value::set)
+ .verifyComplete();
- documentProducer.produceAsync().subscribe(subscriber);
- subscriber.awaitTerminalEvent();
-
- subscriber.assertNoErrors();
- subscriber.assertComplete();
-
- subscriber.assertValueCount(responsesBeforeThrottle.size() + responsesAfterThrottle.size());
+ Assertions.assertThat(value.get().size())
+ .isEqualTo(responsesBeforeThrottle.size() + responsesAfterThrottle.size());
// requested max page size match
assertThat(requestCreator.invocations.stream().map(i -> i.maxItemCount).distinct().collect(Collectors.toList())).containsExactlyElementsOf(Collections.singleton(7));
@@ -719,7 +700,7 @@ public void retries() {
.containsExactlyElementsOf(Collections.singletonList(feedRangeEpk));
List resultContinuationToken =
- subscriber.values().stream().map(r -> r.pageResult.getContinuationToken()).collect(Collectors.toList());
+ value.get().stream().map(r -> r.pageResult.getContinuationToken()).collect(Collectors.toList());
List beforeExceptionContinuationTokens =
responsesBeforeThrottle.stream().map(FeedResponse::getContinuationToken).collect(Collectors.toList());
List afterExceptionContinuationTokens =
@@ -802,13 +783,10 @@ public void retriesExhausted() {
feedRangeEpk,
() -> "n/a");
- TestSubscriber.DocumentProducerFeedResponse> subscriber = new TestSubscriber<>();
-
- documentProducer.produceAsync().subscribe(subscriber);
- subscriber.awaitTerminalEvent();
-
- subscriber.assertError(throttlingException);
- subscriber.assertValueCount(responsesBeforeThrottle.size());
+ List.DocumentProducerFeedResponse> values = new ArrayList<>();
+ StepVerifier.create(documentProducer.produceAsync())
+ .thenConsumeWhile(values::add)
+ .verifyErrorMatches(throttlingException::equals);
}
private CosmosException mockThrottlingException(Duration retriesAfterDuration) {
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/FetcherTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/FetcherTest.java
index c65918d46c7d..1283644ec296 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/FetcherTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/FetcherTest.java
@@ -3,25 +3,26 @@
package com.azure.cosmos.implementation.query;
+import com.azure.cosmos.implementation.Document;
import com.azure.cosmos.implementation.GlobalEndpointManager;
-import com.azure.cosmos.implementation.perPartitionCircuitBreaker.GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
+import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
+import com.azure.cosmos.implementation.perPartitionCircuitBreaker.GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedResponse;
-import com.azure.cosmos.implementation.Document;
-import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.models.ModelBridgeInternal;
-import io.reactivex.subscribers.TestSubscriber;
import org.mockito.Mockito;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
@@ -207,13 +208,9 @@ private void validateFetcher(ServerSideOnlyContinuationFetcherImpl fet
}
private FeedResponse validate(Mono> page) {
- TestSubscriber> subscriber = new TestSubscriber<>();
- page.subscribe(subscriber);
- subscriber.awaitTerminalEvent();
- subscriber.assertComplete();
- subscriber.assertNoErrors();
- subscriber.assertValueCount(1);
- return subscriber.values().get(0);
+ AtomicReference> value = new AtomicReference<>();
+ StepVerifier.create(page).consumeNextWith(value::set).verifyComplete();
+ return value.get();
}
private String getExpectedContinuationTokenInRequest(String continuationToken,
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/BackPressureCrossPartitionTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/BackPressureCrossPartitionTest.java
index ce9ae238c301..08d3927a162b 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/BackPressureCrossPartitionTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/BackPressureCrossPartitionTest.java
@@ -13,13 +13,12 @@
import com.azure.cosmos.implementation.TestUtils;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerRequestOptions;
+import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.IncludedPath;
import com.azure.cosmos.models.IndexingPolicy;
import com.azure.cosmos.models.PartitionKeyDefinition;
-import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.util.CosmosPagedFlux;
-import io.reactivex.subscribers.TestSubscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
@@ -28,7 +27,9 @@
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Signal;
import reactor.core.scheduler.Schedulers;
+import reactor.test.subscriber.TestSubscriber;
import reactor.util.concurrent.Queues;
import java.util.ArrayList;
@@ -112,13 +113,15 @@ public void queryPages(String query, int maxItemCount, int maxExpectedBufferedCo
rxClient.httpRequests.clear();
log.info("instantiating subscriber ...");
- TestSubscriber> subscriber = new TestSubscriber<>(1);
+ TestSubscriber> subscriber = TestSubscriber.builder()
+ .initialRequest(1)
+ .build();
queryObservable.byPage(maxItemCount).publishOn(Schedulers.boundedElastic(), 1).subscribe(subscriber);
int sleepTimeInMillis = 10000;
int i = 0;
// use a test subscriber and request for more result and sleep in between
- while (subscriber.completions() == 0 && subscriber.errorCount() == 0) {
+ while (!subscriber.isTerminated()) {
log.debug("loop " + i);
TimeUnit.MILLISECONDS.sleep(sleepTimeInMillis);
@@ -126,24 +129,24 @@ public void queryPages(String query, int maxItemCount, int maxExpectedBufferedCo
if (sleepTimeInMillis > 4000) {
// validate that only one item is returned to subscriber in each iteration
- assertThat(subscriber.valueCount() - i).isEqualTo(1);
+ assertThat(subscriber.getReceivedOnNext().size() - i).isEqualTo(1);
}
- log.debug("subscriber.getValueCount(): " + subscriber.valueCount());
+ log.debug("subscriber.getValueCount(): " + subscriber.getReceivedOnNext().size());
log.debug("client.httpRequests.size(): " + rxClient.httpRequests.size());
// validate that the difference between the number of requests to backend
// and the number of returned results is always less than a fixed threshold
- assertThat(rxClient.httpRequests.size() - subscriber.valueCount())
+ assertThat(rxClient.httpRequests.size() - subscriber.getReceivedOnNext().size())
.isLessThanOrEqualTo(maxExpectedBufferedCountForBackPressure);
log.debug("requesting more");
- subscriber.requestMore(1);
+ subscriber.request(1);
i++;
}
- subscriber.assertNoErrors();
- subscriber.assertComplete();
- assertThat(subscriber.values().stream().mapToInt(p -> p.getResults().size()).sum()).isEqualTo(expectedNumberOfResults);
+ assertThat(subscriber.expectTerminalSignal()).satisfies(Signal::isOnComplete);
+ assertThat(subscriber.getReceivedOnNext().stream().mapToInt(p -> p.getResults().size()).sum())
+ .isEqualTo(expectedNumberOfResults);
}
@Test(groups = { "long" }, dataProvider = "queryProvider", timeOut = 2 * TIMEOUT)
@@ -156,13 +159,15 @@ public void queryItems(String query, int maxItemCount, int maxExpectedBufferedCo
rxClient.httpRequests.clear();
log.info("instantiating subscriber ...");
- TestSubscriber subscriber = new TestSubscriber<>(1);
+ TestSubscriber subscriber = TestSubscriber.builder()
+ .initialRequest(1)
+ .build();
queryObservable.publishOn(Schedulers.boundedElastic(), 1).subscribe(subscriber);
int sleepTimeInMillis = 10000;
int i = 0;
// use a test subscriber and request for more result and sleep in between
- while (subscriber.completions() == 0 && subscriber.errorCount() == 0) {
+ while (!subscriber.isTerminated()) {
log.debug("loop " + i);
TimeUnit.MILLISECONDS.sleep(sleepTimeInMillis);
@@ -170,24 +175,23 @@ public void queryItems(String query, int maxItemCount, int maxExpectedBufferedCo
if (sleepTimeInMillis > 4000) {
// validate that only one item is returned to subscriber in each iteration
- assertThat(subscriber.valueCount() - i).isEqualTo(1);
+ assertThat(subscriber.getReceivedOnNext().size() - i).isEqualTo(1);
}
- log.debug("subscriber.getValueCount(): " + subscriber.valueCount());
+ log.debug("subscriber.getValueCount(): " + subscriber.getReceivedOnNext().size());
log.debug("client.httpRequests.size(): " + rxClient.httpRequests.size());
// validate that the difference between the number of requests to backend
// and the number of returned results is always less than a fixed threshold
- assertThat(rxClient.httpRequests.size() - subscriber.valueCount())
+ assertThat(rxClient.httpRequests.size() - subscriber.getReceivedOnNext().size())
.isLessThanOrEqualTo(maxExpectedBufferedCountForBackPressure);
log.debug("requesting more");
- subscriber.requestMore(1);
+ subscriber.request(1);
i++;
}
- subscriber.assertNoErrors();
- subscriber.assertComplete();
- assertThat(Integer.valueOf(subscriber.values().size())).isEqualTo(expectedNumberOfResults);
+ assertThat(subscriber.expectTerminalSignal()).satisfies(Signal::isOnComplete);
+ assertThat(subscriber.getReceivedOnNext().size()).isEqualTo(expectedNumberOfResults);
}
@BeforeClass(groups = { "long" }, timeOut = SETUP_TIMEOUT)
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/BackPressureTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/BackPressureTest.java
index 1a3c89736e8c..c133d0e81241 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/BackPressureTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/BackPressureTest.java
@@ -22,12 +22,13 @@
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.PartitionKeyDefinition;
import com.azure.cosmos.util.CosmosPagedFlux;
-import io.reactivex.subscribers.TestSubscriber;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
+import reactor.core.publisher.Signal;
import reactor.core.scheduler.Schedulers;
+import reactor.test.subscriber.TestSubscriber;
import reactor.util.concurrent.Queues;
import java.util.ArrayList;
@@ -81,7 +82,9 @@ public void readFeedPages() throws Exception {
AtomicInteger valueCount = new AtomicInteger();
rxClient.httpRequests.clear();
- TestSubscriber> subscriber = new TestSubscriber>(1);
+ TestSubscriber> subscriber = TestSubscriber.builder()
+ .initialRequest(1)
+ .build();
queryObservable.byPage(1).doOnNext(feedResponse -> {
if (!feedResponse.getResults().isEmpty()) {
valueCount.incrementAndGet();
@@ -92,26 +95,25 @@ public void readFeedPages() throws Exception {
int i = 0;
// use a test subscriber and request for more result and sleep in between
- while (subscriber.completions() == 0 && subscriber.getEvents().get(1).isEmpty()) {
+ while (!subscriber.isTerminated()) {
TimeUnit.MILLISECONDS.sleep(sleepTimeInMillis);
sleepTimeInMillis /= 2;
if (sleepTimeInMillis > 1000) {
// validate that only one item is returned to subscriber in each iteration
- assertThat(subscriber.valueCount() - i).isEqualTo(1);
+ assertThat(subscriber.getReceivedOnNext().size() - i).isEqualTo(1);
}
// validate that only one item is returned to subscriber in each iteration
// validate that the difference between the number of requests to backend
// and the number of returned results is always less than a fixed threshold
- assertThat(rxClient.httpRequests.size() - subscriber.valueCount())
+ assertThat(rxClient.httpRequests.size() - subscriber.getReceivedOnNext().size())
.isLessThanOrEqualTo(2 * Queues.SMALL_BUFFER_SIZE);
- subscriber.requestMore(1);
+ subscriber.request(1);
i++;
}
- subscriber.assertNoErrors();
- subscriber.assertComplete();
+ assertThat(subscriber.expectTerminalSignal()).satisfies(Signal::isOnComplete);
assertThat(valueCount.get()).isEqualTo(createdDocuments.size());
}
@@ -126,35 +128,34 @@ public void readFeedItems() throws Exception {
AtomicInteger valueCount = new AtomicInteger();
rxClient.httpRequests.clear();
- TestSubscriber subscriber = new TestSubscriber<>(1);
- queryObservable.doOnNext(feedResponse -> {
- valueCount.incrementAndGet();
- }).publishOn(Schedulers.boundedElastic(), 1).subscribe(subscriber);
+ TestSubscriber subscriber = TestSubscriber.builder().initialRequest(1).build();
+ queryObservable.doOnNext(feedResponse -> valueCount.incrementAndGet())
+ .publishOn(Schedulers.boundedElastic(), 1)
+ .subscribe(subscriber);
int sleepTimeInMillis = 10000; // 10 seconds
int i = 0;
// use a test subscriber and request for more result and sleep in between
- while (subscriber.completions() == 0 && subscriber.getEvents().get(1).isEmpty()) {
+ while (!subscriber.isTerminated()) {
TimeUnit.MILLISECONDS.sleep(sleepTimeInMillis);
sleepTimeInMillis /= 2;
if (sleepTimeInMillis > 1000) {
// validate that only one item is returned to subscriber in each iteration
- assertThat(subscriber.valueCount() - i).isEqualTo(1);
+ assertThat(subscriber.getReceivedOnNext().size() - i).isEqualTo(1);
}
// validate that only one item is returned to subscriber in each iteration
// validate that the difference between the number of requests to backend
// and the number of returned results is always less than a fixed threshold
- assertThat(rxClient.httpRequests.size() - subscriber.valueCount())
+ assertThat(rxClient.httpRequests.size() - subscriber.getReceivedOnNext().size())
.isLessThanOrEqualTo(Queues.SMALL_BUFFER_SIZE);
- subscriber.requestMore(1);
+ subscriber.request(1);
i++;
}
- subscriber.assertNoErrors();
- subscriber.assertComplete();
+ assertThat(subscriber.expectTerminalSignal()).satisfies(Signal::isOnComplete);
assertThat(valueCount.get()).isEqualTo(createdDocuments.size());
}
@@ -167,7 +168,9 @@ public void queryPages() throws Exception {
RxDocumentClientUnderTest rxClient = (RxDocumentClientUnderTest)CosmosBridgeInternal.getAsyncDocumentClient(client);
rxClient.httpRequests.clear();
- TestSubscriber> subscriber = new TestSubscriber>(1);
+ TestSubscriber> subscriber = TestSubscriber.builder()
+ .initialRequest(1)
+ .build();
AtomicInteger valueCount = new AtomicInteger();
queryObservable.byPage(1).doOnNext(feedResponse -> {
@@ -180,26 +183,24 @@ public void queryPages() throws Exception {
int i = 0;
// use a test subscriber and request for more result and sleep in between
- while(subscriber.completions() == 0 && subscriber.getEvents().get(1).isEmpty()) {
+ while (!subscriber.isTerminated()) {
TimeUnit.MILLISECONDS.sleep(sleepTimeInMillis);
sleepTimeInMillis /= 2;
if (sleepTimeInMillis > 1000) {
// validate that only one item is returned to subscriber in each iteration
- assertThat(subscriber.valueCount() - i).isEqualTo(1);
+ assertThat(subscriber.getReceivedOnNext().size() - i).isEqualTo(1);
}
// validate that the difference between the number of requests to backend
// and the number of returned results is always less than a fixed threshold
- assertThat(rxClient.httpRequests.size() - subscriber.valueCount())
+ assertThat(rxClient.httpRequests.size() - subscriber.getReceivedOnNext().size())
.isLessThanOrEqualTo(2 * Queues.SMALL_BUFFER_SIZE);
- subscriber.requestMore(1);
+ subscriber.request(1);
i++;
}
- subscriber.assertNoErrors();
- subscriber.assertComplete();
-
+ assertThat(subscriber.expectTerminalSignal()).satisfies(Signal::isOnComplete);
assertThat(valueCount.get()).isEqualTo(createdDocuments.size());
}
@@ -212,37 +213,35 @@ public void queryItems() throws Exception {
RxDocumentClientUnderTest rxClient = (RxDocumentClientUnderTest)CosmosBridgeInternal.getAsyncDocumentClient(client);
rxClient.httpRequests.clear();
- TestSubscriber subscriber = new TestSubscriber<>(1);
+ TestSubscriber subscriber = TestSubscriber.builder().initialRequest(1).build();
AtomicInteger valueCount = new AtomicInteger();
- queryObservable.doOnNext(internalObjectNode -> {
- valueCount.incrementAndGet();
- }).publishOn(Schedulers.boundedElastic(), 1).subscribe(subscriber);
+ queryObservable.doOnNext(internalObjectNode -> valueCount.incrementAndGet())
+ .publishOn(Schedulers.boundedElastic(), 1)
+ .subscribe(subscriber);
int sleepTimeInMillis = 10000;
int i = 0;
// use a test subscriber and request for more result and sleep in between
- while(subscriber.completions() == 0 && subscriber.getEvents().get(1).isEmpty()) {
+ while (!subscriber.isTerminated()) {
TimeUnit.MILLISECONDS.sleep(sleepTimeInMillis);
sleepTimeInMillis /= 2;
if (sleepTimeInMillis > 1000) {
// validate that only one item is returned to subscriber in each iteration
- assertThat(subscriber.valueCount() - i).isEqualTo(1);
+ assertThat(subscriber.getReceivedOnNext().size() - i).isEqualTo(1);
}
// validate that the difference between the number of requests to backend
// and the number of returned results is always less than a fixed threshold
- assertThat(rxClient.httpRequests.size() - subscriber.valueCount())
+ assertThat(rxClient.httpRequests.size() - subscriber.getReceivedOnNext().size())
.isLessThanOrEqualTo(Queues.SMALL_BUFFER_SIZE);
- subscriber.requestMore(1);
+ subscriber.request(1);
i++;
}
- subscriber.assertNoErrors();
- subscriber.assertComplete();
-
+ assertThat(subscriber.expectTerminalSignal()).satisfies(Signal::isOnComplete);
logger.debug("final value count {}", valueCount);
assertThat(valueCount.get()).isEqualTo(createdDocuments.size());
}
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/FeedRangeQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/FeedRangeQueryTests.java
index 1fa75cfd69c4..de2b0d9bddd0 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/FeedRangeQueryTests.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/FeedRangeQueryTests.java
@@ -20,12 +20,13 @@
import com.azure.cosmos.util.CosmosPagedFlux;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
-import io.reactivex.subscribers.TestSubscriber;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -33,6 +34,7 @@
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
@@ -147,12 +149,12 @@ public void queryWithPartitionKeyAndFeedRange() {
private List queryAndGetResults(SqlQuerySpec querySpec, CosmosQueryRequestOptions options, Class type) {
CosmosPagedFlux queryPagedFlux = createdContainer.queryItems(querySpec, options, type);
- TestSubscriber testSubscriber = new TestSubscriber<>();
- queryPagedFlux.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(TIMEOUT, TimeUnit.MILLISECONDS);
- testSubscriber.assertNoErrors();
- testSubscriber.assertComplete();
- return testSubscriber.values();
+ AtomicReference> value = new AtomicReference<>();
+ StepVerifier.create(queryPagedFlux.collectList())
+ .consumeNextWith(value::set)
+ .expectComplete()
+ .verify(Duration.ofMillis(TIMEOUT));
+ return value.get();
}
@BeforeClass(groups = {"query"}, timeOut = SETUP_TIMEOUT)
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OffsetLimitQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OffsetLimitQueryTests.java
index 3642833ad0c4..227c4db66133 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OffsetLimitQueryTests.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OffsetLimitQueryTests.java
@@ -17,19 +17,21 @@
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.util.CosmosPagedFlux;
import com.fasterxml.jackson.databind.JsonNode;
-import io.reactivex.subscribers.TestSubscriber;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
+import reactor.test.StepVerifier;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
@@ -284,15 +286,14 @@ private List queryWithContinuationTokens(String query, int p
CosmosPagedFlux queryObservable =
createdCollection.queryItems(query, options, InternalObjectNode.class);
- TestSubscriber> testSubscriber = new TestSubscriber<>();
- queryObservable.byPage(requestContinuation,5).subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(TIMEOUT, TimeUnit.MILLISECONDS);
- testSubscriber.assertNoErrors();
- testSubscriber.assertComplete();
+ AtomicReference> value = new AtomicReference<>();
+ StepVerifier.create(queryObservable.byPage(requestContinuation, 5))
+ .consumeNextWith(value::set)
+ .thenConsumeWhile(Objects::nonNull)
+ .expectComplete()
+ .verify(Duration.ofMillis(TIMEOUT));
- @SuppressWarnings("unchecked")
- FeedResponse firstPage =
- (FeedResponse) testSubscriber.getEvents().get(0).get(0);
+ FeedResponse firstPage = value.get();
requestContinuation = firstPage.getContinuationToken();
receivedDocuments.addAll(firstPage.getResults());
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java
index e210696bad82..e02330cbf9b6 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java
@@ -19,8 +19,8 @@
import com.azure.cosmos.implementation.InternalObjectNode;
import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.implementation.QueryMetrics;
-import com.azure.cosmos.implementation.ResourceValidator;
import com.azure.cosmos.implementation.Resource;
+import com.azure.cosmos.implementation.ResourceValidator;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.Utils.ValueHolder;
import com.azure.cosmos.implementation.query.CompositeContinuationToken;
@@ -39,7 +39,6 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import io.reactivex.subscribers.TestSubscriber;
import org.apache.commons.lang3.StringUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
@@ -49,7 +48,9 @@
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -57,8 +58,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.UUID;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -339,13 +341,14 @@ public void queryOrderByMixedTypes(String sortOrder) throws Exception {
int pageSize = 20;
CosmosPagedFlux queryFlux = createdCollection
.queryItems(query, options, InternalObjectNode.class);
- TestSubscriber> subscriber = new TestSubscriber<>();
- queryFlux.byPage(pageSize).subscribe(subscriber);
- subscriber.awaitTerminalEvent();
- subscriber.assertComplete();
- subscriber.assertNoErrors();
+
List results = new ArrayList<>();
- subscriber.values().forEach(feedResponse -> results.addAll(feedResponse.getResults()));
+ StepVerifier.create(queryFlux.byPage(pageSize))
+ .thenConsumeWhile(feedResponse -> {
+ results.addAll(feedResponse.getResults());
+ return true;
+ }).verifyComplete();
+
// Make sure all elements inserted are returned
assertThat(results.size()).isEqualTo(createdDocuments.size());
@@ -489,16 +492,12 @@ public void queryScopedToSinglePartition_StartWithContinuationToken() throws Exc
CosmosPagedFlux queryObservable = createdCollection.queryItems(query, options, InternalObjectNode.class);
int preferredPageSize = 3;
- TestSubscriber> subscriber = new TestSubscriber<>();
- queryObservable.byPage(preferredPageSize).take(1).subscribe(subscriber);
-
- subscriber.awaitTerminalEvent();
- subscriber.assertComplete();
- subscriber.assertNoErrors();
- assertThat(subscriber.valueCount()).isEqualTo(1);
+ AtomicReference> value = new AtomicReference<>();
+ StepVerifier.create(queryObservable.byPage(preferredPageSize).take(1))
+ .consumeNextWith(value::set)
+ .verifyComplete();
- @SuppressWarnings("unchecked")
- FeedResponse page = (FeedResponse) subscriber.getEvents().get(0).get(0);
+ FeedResponse page = value.get();
assertThat(page.getResults()).hasSize(3);
assertThat(page.getContinuationToken()).isNotEmpty();
@@ -881,10 +880,9 @@ private void assertInvalidContinuationToken(String query, int[] pageSize, List> firstPageObservable = queryObservable.first();
- TestSubscriber> testSubscriber = new TestSubscriber<>();
- queryObservable.byPage(orderByContinuationToken.toString(),1).subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(TIMEOUT, TimeUnit.MILLISECONDS);
- testSubscriber.assertError(CosmosException.class);
+ StepVerifier.create(queryObservable.byPage(orderByContinuationToken.toString(), 1))
+ .expectError(CosmosException.class)
+ .verify(Duration.ofMillis(TIMEOUT));
} while (requestContinuation != null);
}
@@ -912,14 +910,13 @@ private List queryWithContinuationTokens(String query, int p
options, InternalObjectNode.class);
//Observable> firstPageObservable = queryObservable.byPage().first();
- TestSubscriber> testSubscriber = new TestSubscriber<>();
- queryObservable.byPage(requestContinuation, pageSize).subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(TIMEOUT, TimeUnit.MILLISECONDS);
- testSubscriber.assertNoErrors();
- testSubscriber.assertComplete();
-
- @SuppressWarnings("unchecked")
- FeedResponse firstPage = (FeedResponse) testSubscriber.getEvents().get(0).get(0);
+ AtomicReference> value = new AtomicReference<>();
+ StepVerifier.create(queryObservable.byPage(requestContinuation, pageSize))
+ .consumeNextWith(value::set)
+ .thenConsumeWhile(Objects::nonNull)
+ .verifyComplete();
+
+ FeedResponse firstPage = value.get();
requestContinuation = firstPage.getContinuationToken();
receivedDocuments.addAll(firstPage.getResults());
continuationTokens.add(requestContinuation);
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java
index a2e4dd401504..ff84a3efa2b9 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java
@@ -34,7 +34,6 @@
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.util.CosmosPagedFlux;
import com.fasterxml.jackson.databind.JsonNode;
-import io.reactivex.subscribers.TestSubscriber;
import org.assertj.core.groups.Tuple;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -42,7 +41,9 @@
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -51,6 +52,7 @@
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static com.azure.cosmos.models.ModelBridgeInternal.setPartitionKeyRangeIdInternal;
@@ -672,14 +674,14 @@ private List queryWithContinuationTokens(String query, int p
options.setMaxDegreeOfParallelism(2);
CosmosPagedFlux queryObservable = createdCollection.queryItems(query, options, InternalObjectNode.class);
- TestSubscriber> testSubscriber = new TestSubscriber<>();
- queryObservable.byPage(requestContinuation, pageSize).subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(TIMEOUT, TimeUnit.MILLISECONDS);
- testSubscriber.assertNoErrors();
- testSubscriber.assertComplete();
+ AtomicReference> value = new AtomicReference<>();
+ StepVerifier.create(queryObservable.byPage(requestContinuation, pageSize))
+ .consumeNextWith(value::set)
+ .thenConsumeWhile(Objects::nonNull)
+ .expectComplete()
+ .verify(Duration.ofMillis(TIMEOUT));
- @SuppressWarnings("unchecked")
- FeedResponse firstPage = (FeedResponse) testSubscriber.getEvents().get(0).get(0);
+ FeedResponse firstPage = value.get();
requestContinuation = firstPage.getContinuationToken();
receivedDocuments.addAll(firstPage.getResults());
continuationTokens.add(requestContinuation);
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java
index baf6b455c0ab..7bc19c14626d 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java
@@ -38,13 +38,12 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import io.reactivex.subscribers.TestSubscriber;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
-import org.testng.annotations.Ignore;
import org.testng.annotations.Test;
import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
import java.time.Duration;
import java.util.ArrayList;
@@ -57,6 +56,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -610,10 +610,12 @@ private List getPartitionKeyRanges(
private List queryAndGetResults(SqlQuerySpec querySpec, CosmosQueryRequestOptions options, Class type) {
CosmosPagedFlux queryPagedFlux = createdContainer.queryItems(querySpec, options, type);
- TestSubscriber testSubscriber = new TestSubscriber<>();
- queryPagedFlux.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(TIMEOUT, TimeUnit.MILLISECONDS);
- return testSubscriber.values();
+ AtomicReference> value = new AtomicReference<>();
+ StepVerifier.create(queryPagedFlux.collectList())
+ .consumeNextWith(value::set)
+ .expectComplete()
+ .verify(Duration.ofMillis(TIMEOUT));
+ return value.get();
}
private List queryWithContinuationTokens(String query, int pageSize, CosmosAsyncContainer container, Class klass) {
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedExceptionHandlingTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedExceptionHandlingTest.java
index dc513bf148a9..480f818c4632 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedExceptionHandlingTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedExceptionHandlingTest.java
@@ -5,29 +5,25 @@
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosClientBuilder;
-import com.azure.cosmos.implementation.DiagnosticsProvider;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.QueryFeedOperationState;
import com.azure.cosmos.implementation.ResourceType;
-import com.azure.cosmos.models.CosmosClientTelemetryConfig;
import com.azure.cosmos.models.CosmosDatabaseProperties;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.util.CosmosPagedFlux;
import com.azure.cosmos.util.UtilBridgeInternal;
-import io.reactivex.subscribers.TestSubscriber;
import org.mockito.Mockito;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
import java.util.ArrayList;
-import static org.assertj.core.api.Assertions.assertThat;
-
public class ReadFeedExceptionHandlingTest extends TestSuiteBase {
private static final ImplementationBridgeHelpers.FeedResponseHelper.FeedResponseAccessor feedResponseAccessor =
@@ -72,12 +68,9 @@ public void readFeedException() throws Exception {
pagedFluxOptions.setFeedOperationState(state);
return response;
}));
- TestSubscriber> subscriber = new TestSubscriber<>();
- mockedClientWrapper.readAllDatabases().byPage().subscribe(subscriber);
- assertThat(subscriber.valueCount()).isEqualTo(2);
- subscriber.assertNotComplete();
- subscriber.assertTerminated();
- assertThat(subscriber.errorCount()).isEqualTo(1);
+ StepVerifier.create(mockedClientWrapper.readAllDatabases().byPage())
+ .expectNextCount(2)
+ .verifyError();
}
@BeforeClass(groups = { "query" }, timeOut = SETUP_TIMEOUT)
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionDocumentQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionDocumentQueryTest.java
index ba176b28700f..ddb421861ef0 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionDocumentQueryTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionDocumentQueryTest.java
@@ -8,35 +8,35 @@
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.CosmosException;
-import com.azure.cosmos.implementation.Configs;
+import com.azure.cosmos.implementation.Database;
+import com.azure.cosmos.implementation.FailureValidator;
+import com.azure.cosmos.implementation.FeedResponseListValidator;
+import com.azure.cosmos.implementation.FeedResponseValidator;
+import com.azure.cosmos.implementation.InternalObjectNode;
import com.azure.cosmos.implementation.RxDocumentClientImpl;
import com.azure.cosmos.implementation.RxStoreModel;
+import com.azure.cosmos.implementation.TestUtils;
import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils;
import com.azure.cosmos.implementation.guava25.collect.Lists;
-import com.azure.cosmos.models.PartitionKey;
-import com.azure.cosmos.util.CosmosPagedFlux;
-import com.azure.cosmos.implementation.InternalObjectNode;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedResponse;
+import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.SqlParameter;
import com.azure.cosmos.models.SqlQuerySpec;
-import com.azure.cosmos.implementation.Database;
-import com.azure.cosmos.implementation.FailureValidator;
-import com.azure.cosmos.implementation.FeedResponseListValidator;
-import com.azure.cosmos.implementation.FeedResponseValidator;
-import com.azure.cosmos.implementation.TestUtils;
-import io.reactivex.subscribers.TestSubscriber;
+import com.azure.cosmos.util.CosmosPagedFlux;
import org.mockito.Mockito;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
+import reactor.test.StepVerifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
@@ -258,15 +258,12 @@ public void continuationToken() throws Exception {
int maxItemCount = 3;
CosmosPagedFlux queryObservable = createdCollection.queryItems(query, options, InternalObjectNode.class);
- TestSubscriber> subscriber = new TestSubscriber<>();
- queryObservable.byPage(maxItemCount).take(1).subscribe(subscriber);
+ AtomicReference> value = new AtomicReference<>();
+ StepVerifier.create(queryObservable.byPage(maxItemCount).take(1))
+ .consumeNextWith(value::set)
+ .verifyComplete();
- subscriber.awaitTerminalEvent();
- subscriber.assertComplete();
- subscriber.assertNoErrors();
- assertThat(subscriber.valueCount()).isEqualTo(1);
- @SuppressWarnings("unchecked")
- FeedResponse page = ((FeedResponse) subscriber.getEvents().get(0).get(0));
+ FeedResponse page = value.get();
assertThat(page.getResults()).hasSize(3);
assertThat(page.getContinuationToken()).isNotEmpty();
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java
index d4e06ca7407b..cef37b160f3b 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java
@@ -22,7 +22,6 @@
import com.azure.cosmos.DirectConnectionConfig;
import com.azure.cosmos.GatewayConnectionConfig;
import com.azure.cosmos.Http2ConnectionConfig;
-import com.azure.cosmos.TestNGLogListener;
import com.azure.cosmos.ThrottlingRetryOptions;
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.ConnectionPolicy;
@@ -67,18 +66,16 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
-import io.reactivex.subscribers.TestSubscriber;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.mockito.stubbing.Answer;
-import org.testng.ITestContext;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeSuite;
import org.testng.annotations.DataProvider;
-import org.testng.annotations.Listeners;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
+import reactor.test.StepVerifier;
import java.io.ByteArrayOutputStream;
import java.time.Duration;
@@ -86,8 +83,10 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static com.azure.cosmos.BridgeInternal.extractConfigs;
@@ -984,15 +983,10 @@ public void validateSuccess(Mono single, CosmosRes
@SuppressWarnings("rawtypes")
public static void validateSuccess(Flux flowable,
CosmosResponseValidator validator, long timeout) {
-
- TestSubscriber testSubscriber = new TestSubscriber<>();
-
- flowable.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- testSubscriber.assertNoErrors();
- testSubscriber.assertComplete();
- testSubscriber.assertValueCount(1);
- validator.validate(testSubscriber.values().get(0));
+ StepVerifier.create(flowable)
+ .assertNext(validator::validate)
+ .expectComplete()
+ .verify(Duration.ofMillis(timeout));
}
@SuppressWarnings("rawtypes")
@@ -1004,40 +998,26 @@ public void validateFailure(Mono mono, FailureV
@SuppressWarnings("rawtypes")
public static void validateFailure(Flux flowable,
FailureValidator validator, long timeout) throws InterruptedException {
-
- TestSubscriber testSubscriber = new TestSubscriber<>();
-
- flowable.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- testSubscriber.assertNotComplete();
- testSubscriber.assertTerminated();
- assertThat(testSubscriber.errors()).hasSize(1);
- validator.validate((Throwable) testSubscriber.getEvents().get(1).get(0));
+ StepVerifier.create(flowable)
+ .expectErrorSatisfies(validator::validate)
+ .verify(Duration.ofMillis(timeout));
}
@SuppressWarnings("rawtypes")
public void validateItemSuccess(
Mono responseMono, CosmosItemResponseValidator validator) {
-
- TestSubscriber testSubscriber = new TestSubscriber<>();
- responseMono.subscribe(testSubscriber);
- testSubscriber.awaitTerminalEvent(subscriberValidationTimeout, TimeUnit.MILLISECONDS);
- testSubscriber.assertNoErrors();
- testSubscriber.assertComplete();
- testSubscriber.assertValueCount(1);
- validator.validate(testSubscriber.values().get(0));
+ StepVerifier.create(responseMono)
+ .assertNext(validator::validate)
+ .expectComplete()
+ .verify(Duration.ofMillis(subscriberValidationTimeout));
}
@SuppressWarnings("rawtypes")
public void validateItemFailure(
Mono