Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion eng/versioning/external_dependencies.txt
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ com.nimbusds:nimbus-jose-jwt;9.37.3
com.puppycrawl.tools:checkstyle;9.3
com.toasttab.android:gummy-bears-api-26;0.12.0
commons-io:commons-io;2.17.0
io.reactivex.rxjava2:rxjava;2.2.21
net.java.dev.jna:jna-platform;5.17.0
net.oneandone.reflections8:reflections8;0.11.7
net.jonathangiles.tools:dependencyChecker-maven-plugin;1.0.6
Expand Down
7 changes: 0 additions & 7 deletions sdk/cosmos/azure-cosmos-encryption/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,6 @@ Licensed under the MIT License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.21</version> <!-- {x-version-update;io.reactivex.rxjava2:rxjava;external_dependency} -->
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
Expand Down
6 changes: 0 additions & 6 deletions sdk/cosmos/azure-cosmos-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,6 @@ Licensed under the MIT License.
<version>3.7.11</version> <!-- {x-version-update;io.projectreactor:reactor-test;external_dependency} -->
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.21</version> <!-- {x-version-update;io.reactivex.rxjava2:rxjava;external_dependency} -->
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,21 @@
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import io.reactivex.subscribers.TestSubscriber;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
import reactor.test.StepVerifier;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -594,14 +596,14 @@ private void testPartialPKContinuationToken() {
options.setMaxDegreeOfParallelism(2);
CosmosPagedFlux<ObjectNode> queryObservable = cosmosAsyncContainer.queryItems(query, options, ObjectNode.class);

TestSubscriber<FeedResponse<ObjectNode>> testSubscriber = new TestSubscriber<>();
queryObservable.byPage(requestContinuation, 1).subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent(TIMEOUT, TimeUnit.MILLISECONDS);
testSubscriber.assertNoErrors();
testSubscriber.assertComplete();
AtomicReference<FeedResponse<ObjectNode>> value = new AtomicReference<>();
StepVerifier.create(queryObservable.byPage(requestContinuation, 1))
.assertNext(value::set)
.thenConsumeWhile(Objects::nonNull)
.expectComplete()
.verify(Duration.ofMillis(TIMEOUT));

@SuppressWarnings("unchecked")
FeedResponse<ObjectNode> firstPage = (FeedResponse<ObjectNode>) testSubscriber.getEvents().get(0).get(0);
FeedResponse<ObjectNode> firstPage = value.get();
requestContinuation = firstPage.getContinuationToken();
receivedDocuments.addAll(firstPage.getResults());
assertThat(firstPage.getResults().size()).isEqualTo(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@
import io.netty.buffer.ByteBufUtil;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.util.ReferenceCountUtil;
import io.reactivex.subscribers.TestSubscriber;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.lang.reflect.Field;
import java.net.URISyntaxException;
Expand Down Expand Up @@ -955,30 +955,24 @@ public void throttlingExceptionGatewayModeScenario() {
}

private StoreResponse validateSuccess(Mono<StoreResponse> storeResponse) {
TestSubscriber<StoreResponse> testSubscriber = new TestSubscriber<>();
storeResponse.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent(60000, TimeUnit.MILLISECONDS);
testSubscriber.assertNoErrors();
testSubscriber.assertComplete();
testSubscriber.assertValueCount(1);
return testSubscriber.values().get(0);
AtomicReference<StoreResponse> value = new AtomicReference<>();
StepVerifier.create(storeResponse)
.assertNext(value::set)
.expectComplete()
.verify(Duration.ofMillis(60_000));

return value.get();
}

private void validateFailure(Mono<StoreResponse> storeResponse) {
TestSubscriber<StoreResponse> testSubscriber = new TestSubscriber<>();
storeResponse.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent(60000, TimeUnit.MILLISECONDS);
testSubscriber.assertNotComplete();
testSubscriber.assertTerminated();
StepVerifier.create(storeResponse).expectError().verify(Duration.ofMillis(60_000));
}

private void validateServiceResponseSuccess(Mono<ResourceResponse<Document>> documentServiceResponseMono) {
TestSubscriber<ResourceResponse<Document>> testSubscriber = new TestSubscriber<>();
documentServiceResponseMono.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent(60000, TimeUnit.MILLISECONDS);
testSubscriber.assertNoErrors();
testSubscriber.assertComplete();
testSubscriber.assertValueCount(1);
StepVerifier.create(documentServiceResponseMono)
.expectNextCount(1)
.expectComplete()
.verify(Duration.ofMillis(60_000));
}

private static class TestRetryPolicy extends DocumentClientRetryPolicy {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,26 @@

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.directconnectivity.WFConstants;
import java.util.Map;
import java.util.HashMap;
import com.azure.cosmos.ThrottlingRetryOptions;
import com.azure.cosmos.implementation.perPartitionCircuitBreaker.GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker;
import com.azure.cosmos.implementation.directconnectivity.ChannelAcquisitionException;
import com.azure.cosmos.implementation.routing.RegionalRoutingContext;
import com.azure.cosmos.implementation.directconnectivity.WFConstants;
import com.azure.cosmos.implementation.perPartitionAutomaticFailover.GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover;
import com.azure.cosmos.implementation.perPartitionCircuitBreaker.GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker;
import com.azure.cosmos.implementation.routing.RegionalRoutingContext;
import io.netty.handler.timeout.ReadTimeoutException;
import io.reactivex.subscribers.TestSubscriber;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import javax.net.ssl.SSLHandshakeException;
import java.net.SocketException;
import java.net.URI;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.HashMap;
import java.util.Map;

import static com.azure.cosmos.implementation.TestUtils.mockDiagnosticsClientContext;

Expand Down Expand Up @@ -687,13 +686,9 @@ public static void validateSuccess(Mono<ShouldRetryResult> single,
public static void validateSuccess(Mono<ShouldRetryResult> single,
ShouldRetryValidator validator,
long timeout) {
TestSubscriber<ShouldRetryResult> testSubscriber = new TestSubscriber<>();

single.flux().subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
testSubscriber.assertComplete();
testSubscriber.assertNoErrors();
testSubscriber.assertValueCount(1);
validator.validate(testSubscriber.values().get(0));
StepVerifier.create(single)
.assertNext(validator::validate)
.expectComplete()
.verify(Duration.ofMillis(timeout));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import com.azure.cosmos.implementation.directconnectivity.StoreResponseValidator;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.reactivex.subscribers.TestSubscriber;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
Expand All @@ -15,14 +14,13 @@
import org.testng.annotations.Test;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static com.azure.cosmos.implementation.TestUtils.mockDiagnosticsClientContext;
import static com.azure.cosmos.implementation.TestUtils.mockDocumentServiceRequest;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

public class RetryUtilsTest {
Expand Down Expand Up @@ -109,26 +107,20 @@ public void toRetryWithAlternateFuncTestingMethodTwo() {
}

private void validateFailure(Mono<StoreResponse> single, long timeout, Class<? extends Throwable> class1) {

TestSubscriber<StoreResponse> testSubscriber = new TestSubscriber<>();
single.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
testSubscriber.assertNotComplete();
testSubscriber.assertTerminated();
assertThat(testSubscriber.errorCount()).isEqualTo(1);
Throwable throwable = Exceptions.unwrap(testSubscriber.errors().get(0));
if (!(throwable.getClass().equals(class1))) {
fail("Not expecting " + testSubscriber.getEvents().get(1).get(0));
}
StepVerifier.create(single)
.expectErrorSatisfies(thrown -> {
Throwable throwable = Exceptions.unwrap(thrown);
if (!(throwable.getClass().equals(class1))) {
fail("Not expecting " + thrown);
}
}).verify(Duration.ofMillis(timeout));
}

private void validateSuccess(Mono<StoreResponse> single, StoreResponseValidator validator, long timeout) {

TestSubscriber<StoreResponse> testSubscriber = new TestSubscriber<>();
single.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
assertThat(testSubscriber.valueCount()).isEqualTo(1);
validator.validate(testSubscriber.values().get(0));
StepVerifier.create(single)
.assertNext(validator::validate)
.expectComplete()
.verify(Duration.ofMillis(timeout));
}

private void toggleMockFuncBtwFailureSuccess(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.perPartitionCircuitBreaker.GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker;
import com.azure.cosmos.implementation.directconnectivity.GatewayServiceConfigurationReader;
import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils;
import com.azure.cosmos.implementation.http.HttpClient;
Expand All @@ -14,17 +13,16 @@
import com.azure.cosmos.implementation.routing.RegionalRoutingContext;
import io.netty.channel.ConnectTimeoutException;
import io.netty.handler.timeout.ReadTimeoutException;
import io.reactivex.subscribers.TestSubscriber;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.net.SocketException;
import java.net.URI;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

import static com.azure.cosmos.implementation.TestUtils.mockDiagnosticsClientContext;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -303,13 +301,9 @@ public void validateFailure(Mono<RxDocumentServiceResponse> observable,
public static void validateFailure(Mono<RxDocumentServiceResponse> observable,
FailureValidator validator,
long timeout) {
TestSubscriber<RxDocumentServiceResponse> testSubscriber = new TestSubscriber<>();
observable.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
testSubscriber.assertNotComplete();
testSubscriber.assertTerminated();
assertThat(testSubscriber.errorCount()).isEqualTo(1);
validator.validate(testSubscriber.errors().get(0));
StepVerifier.create(observable)
.expectErrorSatisfies(validator::validate)
.verify(Duration.ofMillis(timeout));
}

enum SessionTokenType {
Expand Down
Loading
Loading