Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions console/src/api/materialize/SubscribeManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ export class SubscribeManager<T extends object, R> implements Connectable {
);
};

setRequest = (request: SqlRequest) => {
this.sqlRequest = request;
};

disconnect = () => {
clearInterval(this.flushIntervalHandle);
this.socket.disconnect();
Expand Down
27 changes: 27 additions & 0 deletions console/src/api/materialize/WebsocketConnectionManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,33 @@ describe("WebsocketConnectionManager", () => {
vi.advanceTimersByTime(5000);
expect(mockTarget.reconnect).not.toHaveBeenCalled();
});

it("does not start a second connect while a handshake is in progress", () => {
const store = getStore();
store.set(
environmentsWithHealth,
new Map([
["aws/us-east-1", createHealthyEnvironment("localhost:6875")],
]) as unknown as EnvironmentsWithHealth,
);

// Constructor starts a connect; handshake is still in progress.
manager = new WebsocketConnectionManager(
mockTarget,
store,
reconnectionStateAtom,
);
expect(mockTarget.reconnect).toHaveBeenCalledTimes(1);

// Overlapping reconnect is dropped while the handshake is in progress.
manager.reconnect();
expect(mockTarget.reconnect).toHaveBeenCalledTimes(1);

// Once the handshake completes, the next reconnect proceeds.
mockTarget.simulateOpen();
manager.reconnect();
expect(mockTarget.reconnect).toHaveBeenCalledTimes(2);
});
});

describe("destroy", () => {
Expand Down
37 changes: 22 additions & 15 deletions console/src/api/materialize/WebsocketConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ export class WebsocketConnectionManager {
private hasEverConnected = false;
private retryTimer: ReturnType<typeof setTimeout> | undefined;
private initialized = false;
/** Set while a connect is mid-handshake. Reentry would strand the CONNECTING socket on Safari. */
private connectInFlight = false;

private unsubscribeFromClose: (() => void) | undefined;
private unsubscribeFromOpen: (() => void) | undefined;
Expand Down Expand Up @@ -148,17 +150,15 @@ export class WebsocketConnectionManager {
private handleEnvironmentChange = () => {
const currentEnvironment = this.getCurrentEnvironment();
const nowHealthy = this.isEnvironmentHealthy(currentEnvironment);
const prevHealthy = this.isHealthy;

this.isHealthy = nowHealthy;

// Update http address if environment is enabled
if (currentEnvironment?.state === "enabled") {
this.currentHttpAddress = currentEnvironment.httpAddress;
}

if (nowHealthy) {
if (!prevHealthy || !this.target.isConnected()) {
if (!this.target.isConnected()) {
this.resumeConnection();
}
} else {
Expand Down Expand Up @@ -192,19 +192,26 @@ export class WebsocketConnectionManager {
// --- Target event handlers ---

private handleTargetClose = () => {
this.connectInFlight = false;
if (this.isHealthy) {
this.scheduleRetry();
}
this.notifyStateChange();
};

private handleTargetOpen = () => {
this.connectInFlight = false;
this.hasEverConnected = true;
this.retryAttempt = 0;
this.clearRetryTimer();
this.notifyStateChange();
};

/** Tear down and reopen the socket. Used on SQL request changes. */
reconnect() {
this.attemptConnection();
}

// --- Retry scheduling ---

private scheduleRetry() {
Expand All @@ -225,18 +232,18 @@ export class WebsocketConnectionManager {
}

private attemptConnection() {
if (this.target.isConnected()) return;
if (this.currentHttpAddress) {
const sessionVariables = this.options.getSessionVariables?.({
hasEverConnected: this.hasEverConnected,
});
try {
this.target.reconnect(this.currentHttpAddress, sessionVariables);
} catch {
// If the WebSocket constructor throws (e.g. network blocked),
// schedule another retry
this.scheduleRetry();
}
if (this.connectInFlight) return;
if (!this.currentHttpAddress) return;

const sessionVariables = this.options.getSessionVariables?.({
hasEverConnected: this.hasEverConnected,
});
this.connectInFlight = true;
try {
this.target.reconnect(this.currentHttpAddress, sessionVariables);
} catch {
this.connectInFlight = false;
this.scheduleRetry();
}
}

Expand Down
8 changes: 7 additions & 1 deletion console/src/api/materialize/useSubscribe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,13 @@ export function useGlobalUpsertSubscribe<T extends object, R = SubscribeRow<T>>(
React.useEffect(() => {
const cleanup = subscribe.onChange(() => {
const snapshot = subscribe.getSnapshot();
if (getStore().get(options.atom) === snapshot) return;
const current = getStore().get(options.atom);
if (current === snapshot) return;

// Hold cached atom data through a fresh manager's empty pre-snapshot state.
const snapshotIsEmptyPreload =
!snapshot.snapshotComplete && !snapshot.data.length && !snapshot.error;
if (snapshotIsEmptyPreload && current.data.length) return;

setValue(snapshot);
});
Expand Down
22 changes: 5 additions & 17 deletions console/src/hooks/useAutomaticallyConnectSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import {
ReconnectionState,
WebsocketConnectionManager,
} from "~/api/materialize/WebsocketConnectionManager";
import { currentEnvironmentState } from "~/store/environments";

// Atom for reconnection state - can be shared across components if needed
export const reconnectionStateAtom = atom<ReconnectionState>({
Expand Down Expand Up @@ -74,27 +73,16 @@ export const useAutomaticallyConnectSocket = <T extends object, R>({
};
}, [target, store, getSessionVariablesRef]);

// Handle request changes for subscribe queries
const currentEnvironment = useAtomValue(currentEnvironmentState);
const previousRequest = usePrevious(request);

React.useEffect(() => {
if (!subscribe || !request) return;
if (previousRequest === request) return;
if (currentEnvironment?.state !== "enabled") return;

subscribe.connect(
request,
currentEnvironment.httpAddress,
getSessionVariablesRef.current?.({ hasEverConnected: false }),
);
}, [
subscribe,
request,
previousRequest,
currentEnvironment,
getSessionVariablesRef,
]);
subscribe.setRequest(request);
// The manager owns the initial connect; only force a reconnect on changes.
if (previousRequest === undefined) return;
managerRef.current?.reconnect();
}, [subscribe, request, previousRequest]);

return { reconnectionState };
};
Loading