Skip to content

Commit 8b22695

Browse files
committed
Fix CI examples and formatting
1 parent 4e8d895 commit 8b22695

156 files changed

Lines changed: 6430 additions & 6071 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/ci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ jobs:
5353
uses: gradle/actions/wrapper-validation@v4
5454

5555
- name: Check formatting (Spotless)
56+
if: ${{ matrix.jdk == '21' }}
5657
run: ./gradlew spotlessCheck --no-daemon
5758

5859
- name: Build

arcp-client/src/main/java/dev/arcp/client/ArcpClient.java

Lines changed: 486 additions & 476 deletions
Large diffs are not rendered by default.
Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package dev.arcp.client;
22

33
import dev.arcp.core.credentials.Credential;
4-
import dev.arcp.core.events.EventBody;
54
import dev.arcp.core.error.ArcpException;
5+
import dev.arcp.core.events.EventBody;
66
import dev.arcp.core.ids.JobId;
77
import dev.arcp.core.messages.JobAccepted;
88
import dev.arcp.core.messages.JobResult;
@@ -14,21 +14,21 @@
1414
/** Client-side handle to one submitted job. */
1515
public interface JobHandle {
1616

17-
JobId jobId();
17+
JobId jobId();
1818

19-
String resolvedAgent();
19+
String resolvedAgent();
2020

21-
JobAccepted accepted();
21+
JobAccepted accepted();
2222

23-
default Optional<List<Credential>> credentials() {
24-
return Optional.ofNullable(accepted().credentials());
25-
}
23+
default Optional<List<Credential>> credentials() {
24+
return Optional.ofNullable(accepted().credentials());
25+
}
2626

27-
/** Hot publisher of {@link EventBody} for this job's {@code job.event} stream. */
28-
Flow.Publisher<EventBody> events();
27+
/** Hot publisher of {@link EventBody} for this job's {@code job.event} stream. */
28+
Flow.Publisher<EventBody> events();
2929

30-
/** Completes with {@link JobResult} on success or fails with {@link ArcpException}. */
31-
CompletableFuture<JobResult> result();
30+
/** Completes with {@link JobResult} on success or fails with {@link ArcpException}. */
31+
CompletableFuture<JobResult> result();
3232

33-
void cancel();
33+
void cancel();
3434
}

arcp-client/src/main/java/dev/arcp/client/Page.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@
55
import org.jspecify.annotations.Nullable;
66

77
public record Page<T>(List<T> items, @Nullable String nextCursor) {
8-
public static Page<JobSummary> empty() {
9-
return new Page<>(List.of(), null);
10-
}
8+
public static Page<JobSummary> empty() {
9+
return new Page<>(List.of(), null);
10+
}
1111

12-
public boolean hasNext() {
13-
return nextCursor != null;
14-
}
12+
public boolean hasNext() {
13+
return nextCursor != null;
14+
}
1515
}

arcp-client/src/main/java/dev/arcp/client/ReplayingPublisher.java

Lines changed: 92 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -10,113 +10,113 @@
1010
import java.util.concurrent.locks.ReentrantLock;
1111

1212
/**
13-
* Multicast {@link Flow.Publisher} that buffers every emission and replays the
14-
* buffer to each newly attached subscriber before forwarding live deliveries.
15-
* Used by {@link JobHandle#events()} so callers subscribing after a job has
16-
* already emitted still see the full event history.
13+
* Multicast {@link Flow.Publisher} that buffers every emission and replays the buffer to each newly
14+
* attached subscriber before forwarding live deliveries. Used by {@link JobHandle#events()} so
15+
* callers subscribing after a job has already emitted still see the full event history.
1716
*/
1817
final class ReplayingPublisher<T> implements Flow.Publisher<T> {
1918

20-
private final List<T> buffer = new CopyOnWriteArrayList<>();
21-
private final SubmissionPublisher<T> live;
22-
private final ReentrantLock lock = new ReentrantLock();
23-
private volatile boolean closed;
19+
private final List<T> buffer = new CopyOnWriteArrayList<>();
20+
private final SubmissionPublisher<T> live;
21+
private final ReentrantLock lock = new ReentrantLock();
22+
private volatile boolean closed;
2423

25-
ReplayingPublisher() {
26-
this.live = new SubmissionPublisher<>(
27-
Executors.newVirtualThreadPerTaskExecutor(), 1024);
28-
}
24+
ReplayingPublisher() {
25+
this.live = new SubmissionPublisher<>(Executors.newVirtualThreadPerTaskExecutor(), 1024);
26+
}
2927

30-
// Lock spans live.submit so concurrent producers preserve buffer/live order;
31-
// back-pressure blocking on a full submission queue therefore stalls peers.
32-
void submit(T item) {
33-
lock.lock();
34-
try {
35-
buffer.add(item);
36-
live.submit(item);
37-
} finally {
38-
lock.unlock();
39-
}
28+
// Lock spans live.submit so concurrent producers preserve buffer/live order;
29+
// back-pressure blocking on a full submission queue therefore stalls peers.
30+
void submit(T item) {
31+
lock.lock();
32+
try {
33+
buffer.add(item);
34+
live.submit(item);
35+
} finally {
36+
lock.unlock();
4037
}
38+
}
4139

42-
void close() {
43-
lock.lock();
44-
try {
45-
if (closed) {
46-
return;
47-
}
48-
closed = true;
49-
live.close();
50-
} finally {
51-
lock.unlock();
52-
}
40+
void close() {
41+
lock.lock();
42+
try {
43+
if (closed) {
44+
return;
45+
}
46+
closed = true;
47+
live.close();
48+
} finally {
49+
lock.unlock();
5350
}
51+
}
5452

55-
@Override
56-
public void subscribe(Flow.Subscriber<? super T> downstream) {
57-
final List<T> snapshot;
58-
final boolean wasClosed;
59-
AtomicBoolean cancelled = new AtomicBoolean(false);
53+
@Override
54+
public void subscribe(Flow.Subscriber<? super T> downstream) {
55+
final List<T> snapshot;
56+
final boolean wasClosed;
57+
AtomicBoolean cancelled = new AtomicBoolean(false);
6058

61-
// Hold the publisher lock while snapshotting AND attaching to live so
62-
// no submit() can interleave between the two and produce a gap.
63-
lock.lock();
64-
try {
65-
snapshot = new ArrayList<>(buffer);
66-
wasClosed = closed;
67-
if (!wasClosed) {
68-
live.subscribe(new Flow.Subscriber<T>() {
69-
@Override
70-
public void onSubscribe(Flow.Subscription s) {
71-
s.request(Long.MAX_VALUE);
72-
}
59+
// Hold the publisher lock while snapshotting AND attaching to live so
60+
// no submit() can interleave between the two and produce a gap.
61+
lock.lock();
62+
try {
63+
snapshot = new ArrayList<>(buffer);
64+
wasClosed = closed;
65+
if (!wasClosed) {
66+
live.subscribe(
67+
new Flow.Subscriber<T>() {
68+
@Override
69+
public void onSubscribe(Flow.Subscription s) {
70+
s.request(Long.MAX_VALUE);
71+
}
7372

74-
@Override
75-
public void onNext(T item) {
76-
if (!cancelled.get()) {
77-
downstream.onNext(item);
78-
}
79-
}
73+
@Override
74+
public void onNext(T item) {
75+
if (!cancelled.get()) {
76+
downstream.onNext(item);
77+
}
78+
}
8079

81-
@Override
82-
public void onError(Throwable throwable) {
83-
if (!cancelled.get()) {
84-
downstream.onError(throwable);
85-
}
86-
}
80+
@Override
81+
public void onError(Throwable throwable) {
82+
if (!cancelled.get()) {
83+
downstream.onError(throwable);
84+
}
85+
}
8786

88-
@Override
89-
public void onComplete() {
90-
if (!cancelled.get()) {
91-
downstream.onComplete();
92-
}
93-
}
94-
});
95-
}
96-
} finally {
97-
lock.unlock();
98-
}
87+
@Override
88+
public void onComplete() {
89+
if (!cancelled.get()) {
90+
downstream.onComplete();
91+
}
92+
}
93+
});
94+
}
95+
} finally {
96+
lock.unlock();
97+
}
9998

100-
downstream.onSubscribe(new Flow.Subscription() {
101-
@Override
102-
public void request(long n) {
103-
// Replay is delivered eagerly below; live items honor the
104-
// forwarder's request(MAX_VALUE).
105-
}
99+
downstream.onSubscribe(
100+
new Flow.Subscription() {
101+
@Override
102+
public void request(long n) {
103+
// Replay is delivered eagerly below; live items honor the
104+
// forwarder's request(MAX_VALUE).
105+
}
106106

107-
@Override
108-
public void cancel() {
109-
cancelled.set(true);
110-
}
107+
@Override
108+
public void cancel() {
109+
cancelled.set(true);
110+
}
111111
});
112-
for (T item : snapshot) {
113-
if (cancelled.get()) {
114-
return;
115-
}
116-
downstream.onNext(item);
117-
}
118-
if (wasClosed) {
119-
downstream.onComplete();
120-
}
112+
for (T item : snapshot) {
113+
if (cancelled.get()) {
114+
return;
115+
}
116+
downstream.onNext(item);
117+
}
118+
if (wasClosed) {
119+
downstream.onComplete();
121120
}
121+
}
122122
}

0 commit comments

Comments
 (0)