Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ It extends the [observer pattern](http://en.wikipedia.org/wiki/Observer_pattern)
- :+1: `java.util.concurrent.Flow`-based implementation.
- :+1: Virtual Thread support; `virtualCreate()`, `virtualTransform()`, :eye: `Schedulers.virtual()`.
- :+1: New `Streamable<T>` built around Virtual Threads & virtual blocking. Think `IAsyncEnumerable` for Java. :satellite: in progress.
- :+1: Using Java Cleaner API to detect resource leaks and using it for adaptive cleanups.
- :information_source: Reactive Streams Test Compatibility Kit usage; [Reactive-Streams](https://github.com/reactive-streams/reactive-streams-jvm).
- :satellite: Rewamp of the javadoc bloat in the base types via `sealed` interfaces.
- :satellite: Reduce overload bloat by using `record`-based configurations.
- :satellite: Internal optimizations now that I have the master :key:.
- :eye: Possible usages for Scoped variables for context and per-item resource management.
- :eye: Possible use for the Java Cleaner API.
- :eye: Possible inclusion of 2nd and 3rd party operators.
- :eye: Possible inclusion of the Iterable Extensions (Ix) 2nd party library. ju.Stream is sh|t wrt interfacing and composability.
- :question: Android compatibility depends on your API level and what desugaring is available.
Expand Down
109 changes: 106 additions & 3 deletions src/main/java/io/reactivex/rxjava4/core/CompletionStageDisposable.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,77 @@

package io.reactivex.rxjava4.core;

import java.lang.ref.Cleaner;
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import io.reactivex.rxjava4.annotations.NonNull;
import io.reactivex.rxjava4.disposables.*;
import io.reactivex.rxjava4.plugins.RxJavaPlugins;

/**
* Consist of a terminal stage and a disposable to be able to cancel a sequence.
* @param <T> the return and element type of the various stages
* @param stage the embedded stage to work with
* @param disposable the way to cancel the stage concurrently
* @since 4.0.0
*/
public record CompletionStageDisposable<T>(@NonNull CompletionStage<T> stage, @NonNull Disposable disposable) {
public final class CompletionStageDisposable<T> implements AutoCloseable {

// record classes can't have extra fields, why?
// also I have to write out the constructor instead of declaring it in the record definition, FFS

static final Cleaner cleaner = Cleaner.create();

static volatile Consumer<Cleaner.Cleanable> trackAllocations;

static final class State extends AtomicBoolean implements Runnable {

/** */
private static final long serialVersionUID = 262854674341831347L;

Throwable allocationTrace;

@Override
public void run() {
if (!get()) {
RxJavaPlugins.onError(
new IllegalStateException("CompletionStageDisposable was not awaited or ignored explicitly",
allocationTrace));
}
}

}

final CompletionStage<T> stage;
final Disposable disposable;
final State state;
final Cleaner.Cleanable cleanable;

/**
* Construct an instance with parameters
* @param stage the stage to be awaited
* @param disposable the disposable to cancel asynchronously
*/
public CompletionStageDisposable(@NonNull CompletionStage<T> stage, @NonNull Disposable disposable) {
Objects.requireNonNull(stage, "stage is null");
Objects.requireNonNull(disposable, "disposable is null");
this.stage = stage;
this.disposable = disposable;
this.state = new State();
this.cleanable = cleaner.register(this, state);
if (trackAllocations != null) {
state.allocationTrace = new StackOverflowError("CompletionStageDisposable::AllocationTrace");
trackAllocations.accept(this.cleanable);
} else {
state.allocationTrace = null;
}
}
/**
* Await the completion of the current stage.
*/
public void await() {
state.lazySet(true);;
Streamer.await(stage);
}

Expand All @@ -39,6 +92,56 @@ public void await() {
* @param canceller the canceller link
*/
public void await(DisposableContainer canceller) {
state.lazySet(true);;
Streamer.await(stage, canceller);
}

/**
* Indicate this instance is deliberately not awaiting its stage.
*/
public void ignore() {
state.lazySet(true);;
}

@Override
public void close() {
try {
state.lazySet(true);
disposable.dispose();
} finally {
cleanable.clean();
}
}

/**
* Set an allocator tracer callback to track where CompletionStageDisposables are leaking.
* @param callback the callback to call when a new trace is being established
*/
public static void setAllocationTrace(Consumer<Cleaner.Cleanable> callback) {
trackAllocations = callback;
}

/**
* Returns the current allocation stacktrace capturing consumer.
* @return the current allocation stacktrace capturing consumer.
*/
public static Consumer<Cleaner.Cleanable> getAllocationTrace() {
return trackAllocations;
}

/***
* Returns the associated completion stage value.
* @return the associated completion stage value.
*/
public CompletionStage<T> stage() {
return stage;
}

/**
* Returns the associated disposable value.
* @return the associated disposable value.
*/
public Disposable disposable() {
return disposable;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.rxjava4.internal.operators.streamable;

import java.lang.ref.Cleaner;
import java.util.*;

import org.junit.jupiter.api.*;

import io.reactivex.rxjava4.core.CompletionStageDisposable;
import io.reactivex.rxjava4.exceptions.CompositeException;
import io.reactivex.rxjava4.functions.Consumer;
import io.reactivex.rxjava4.plugins.RxJavaPlugins;

public abstract class StreamableBaseTest {

protected java.util.function.Consumer<Cleaner.Cleanable> stageTrackingState;

protected Consumer<? super Throwable> oldHandler;

protected List<Throwable> errors;

protected List<Cleaner.Cleanable> cleaners;

protected volatile boolean undeliverablesExpected;

@BeforeEach
protected final void beforeTest() {
errors = Collections.synchronizedList(new ArrayList<>());
cleaners = Collections.synchronizedList(new ArrayList<>());
undeliverablesExpected = false;

stageTrackingState = CompletionStageDisposable.getAllocationTrace();
CompletionStageDisposable.setAllocationTrace(cleaners::add);

oldHandler = RxJavaPlugins.getErrorHandler();
RxJavaPlugins.setErrorHandler(e -> {
if (!undeliverablesExpected) {
errors.add(e);
}
if (oldHandler != null) {
oldHandler.accept(e);
}
});
}

@AfterEach
protected final void afterTest(TestInfo testInfo) {
CompletionStageDisposable.setAllocationTrace(stageTrackingState);
for (var c : cleaners) {
c.clean();
}
if (errors.size() != 0) {
throw new AssertionError("Undeliverable exceptions during test detected: " + testInfo.getDisplayName(),
new CompositeException(errors));
}
}

protected final void setUndeliverablesExpected(boolean isExpected) {
undeliverablesExpected = isExpected;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@

package io.reactivex.rxjava4.internal.operators.streamable;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.*;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -29,7 +28,7 @@
import io.reactivex.rxjava4.testsupport.TestHelper;

@Isolated
public class StreamableTest {
public class StreamableTest extends StreamableBaseTest {

@Test
public void empty() throws Throwable {
Expand All @@ -38,16 +37,18 @@ public void empty() throws Throwable {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
ts.onSubscribe(EmptySubscription.INSTANCE);

var comp = Streamable.empty().forEach(e -> { ts.onError(new TestException("Element produced? " + e)); }, exec);
try (var comp = Streamable.empty().forEach(e -> { ts.onError(new TestException("Element produced? " + e)); }, exec)) {

comp.stage().toCompletableFuture().thenAccept(_ -> ts.onComplete()).exceptionally(e -> { ts.onError(e); return null; }).join();
comp.stage().toCompletableFuture().thenAccept(_ -> ts.onComplete())
.exceptionally(e -> { ts.onError(e); return null; });

ts
.awaitDone(5, TimeUnit.SECONDS)
.assertResult();
ts
.awaitDone(5, TimeUnit.SECONDS)
.assertResult();

assertFalse(exec.isShutdown(), "Exec::IsShutdown");
assertFalse(exec.isTerminated(), "Exec::IsTerminated");
assertFalse(exec.isShutdown(), "Exec::IsShutdown");
assertFalse(exec.isTerminated(), "Exec::IsTerminated");
}
});
}

Expand All @@ -58,16 +59,18 @@ public void just() throws Throwable {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
ts.onSubscribe(EmptySubscription.INSTANCE);

var comp = Streamable.just(1).forEach(e -> { ts.onNext(e); }, exec);
try (var comp = Streamable.just(1).forEach(e -> { ts.onNext(e); }, exec)) {

comp.stage().toCompletableFuture().thenAccept(_ -> ts.onComplete()).exceptionally(e -> { ts.onError(e); return null; }).join();
comp.stage().toCompletableFuture().thenAccept(_ -> ts.onComplete())
.exceptionally(e -> { ts.onError(e); return null; }).join();

ts
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1);
ts
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1);

assertFalse(exec.isShutdown(), "Exec::IsShutdown");
assertFalse(exec.isTerminated(), "Exec::IsTerminated");
assertFalse(exec.isShutdown(), "Exec::IsShutdown");
assertFalse(exec.isTerminated(), "Exec::IsTerminated");
}
});
}

Expand Down