Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 63 additions & 32 deletions src/websocket/reconnectingWebSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,24 @@ import type {
UnidirectionalStream,
} from "./eventStreamConnection";

/**
* Connection states for the ReconnectingWebSocket state machine.
*/
export enum ConnectionState {
/** Initial state, ready to connect */
IDLE = "IDLE",
/** Actively running connect() - WS factory in progress */
CONNECTING = "CONNECTING",
/** Socket is open and working */
CONNECTED = "CONNECTED",
/** Waiting for backoff timer before attempting reconnection */
AWAITING_RETRY = "AWAITING_RETRY",
/** Temporarily paused - user must call reconnect() to resume */
DISCONNECTED = "DISCONNECTED",
/** Permanently closed - cannot be reused */
DISPOSED = "DISPOSED",
}

export type SocketFactory<TData> = () => Promise<UnidirectionalStream<TData>>;

export interface ReconnectingWebSocketOptions {
Expand Down Expand Up @@ -46,10 +64,8 @@ export class ReconnectingWebSocket<
#lastRoute = "unknown"; // Cached route for logging when socket is closed
#backoffMs: number;
#reconnectTimeoutId: NodeJS.Timeout | null = null;
#isDisconnected = false; // Temporary pause, can be resumed via reconnect()
#isDisposed = false; // Permanent disposal, cannot be resumed
#isConnecting = false;
#pendingReconnect = false;
#state: ConnectionState = ConnectionState.IDLE;
#pendingReconnect = false; // Queue reconnect during CONNECTING state
#certRefreshAttempted = false; // Tracks if cert refresh was already attempted this connection cycle
readonly #onDispose?: () => void;

Expand Down Expand Up @@ -94,11 +110,10 @@ export class ReconnectingWebSocket<
}

/**
* Returns true if the socket is temporarily disconnected and not attempting to reconnect.
* Use reconnect() to resume.
* Returns the current connection state.
*/
get isDisconnected(): boolean {
return this.#isDisconnected;
get state(): string {
return this.#state;
}

/**
Expand Down Expand Up @@ -133,22 +148,22 @@ export class ReconnectingWebSocket<
* Resumes the socket if previously disconnected via disconnect().
*/
reconnect(): void {
if (this.#isDisconnected) {
this.#isDisconnected = false;
this.#backoffMs = this.#options.initialBackoffMs;
this.#certRefreshAttempted = false; // User-initiated reconnect, allow retry
if (this.#state === ConnectionState.DISPOSED) {
return;
}

if (this.#isDisposed) {
return;
if (this.#state === ConnectionState.DISCONNECTED) {
this.#state = ConnectionState.IDLE;
this.#backoffMs = this.#options.initialBackoffMs;
this.#certRefreshAttempted = false; // User-initiated reconnect, allow retry
}

if (this.#reconnectTimeoutId !== null) {
clearTimeout(this.#reconnectTimeoutId);
this.#reconnectTimeoutId = null;
}

if (this.#isConnecting) {
if (this.#state === ConnectionState.CONNECTING) {
this.#pendingReconnect = true;
return;
}
Expand All @@ -161,16 +176,19 @@ export class ReconnectingWebSocket<
* Temporarily disconnect the socket. Can be resumed via reconnect().
*/
disconnect(code?: number, reason?: string): void {
if (this.#isDisposed || this.#isDisconnected) {
if (
this.#state === ConnectionState.DISPOSED ||
this.#state === ConnectionState.DISCONNECTED
) {
return;
}

this.#isDisconnected = true;
this.#state = ConnectionState.DISCONNECTED;
this.clearCurrentSocket(code, reason);
}

close(code?: number, reason?: string): void {
if (this.#isDisposed) {
if (this.#state === ConnectionState.DISPOSED) {
return;
}

Expand All @@ -187,11 +205,16 @@ export class ReconnectingWebSocket<
}

private async connect(): Promise<void> {
if (this.#isDisposed || this.#isDisconnected || this.#isConnecting) {
// Only allow connecting from IDLE, CONNECTED (reconnect), or AWAITING_RETRY states
if (
this.#state === ConnectionState.DISPOSED ||
this.#state === ConnectionState.DISCONNECTED ||
this.#state === ConnectionState.CONNECTING
) {
return;
}

this.#isConnecting = true;
this.#state = ConnectionState.CONNECTING;
try {
// Close any existing socket before creating a new one
if (this.#currentSocket) {
Expand All @@ -204,18 +227,20 @@ export class ReconnectingWebSocket<

const socket = await this.#socketFactory();

// Check if disconnected/disposed while waiting for factory
if (this.#isDisposed || this.#isDisconnected) {
// Check if state changed while waiting for factory (e.g., disconnect/dispose called)
if (this.#state !== ConnectionState.CONNECTING) {
socket.close(WebSocketCloseCode.NORMAL, "Cancelled during connection");
return;
}

this.#currentSocket = socket;
this.#lastRoute = this.#route;
this.#state = ConnectionState.CONNECTED;

socket.addEventListener("open", (event) => {
// Reset backoff on successful connection
this.#backoffMs = this.#options.initialBackoffMs;
this.#certRefreshAttempted = false; // Reset on successful connection
this.#certRefreshAttempted = false;
this.executeHandlers("open", event);
});

Expand All @@ -233,7 +258,10 @@ export class ReconnectingWebSocket<
});

socket.addEventListener("close", (event) => {
if (this.#isDisposed || this.#isDisconnected) {
if (
this.#state === ConnectionState.DISPOSED ||
this.#state === ConnectionState.DISCONNECTED
) {
return;
}

Expand All @@ -256,8 +284,6 @@ export class ReconnectingWebSocket<
} catch (error) {
await this.handleConnectionError(error);
} finally {
this.#isConnecting = false;

if (this.#pendingReconnect) {
this.#pendingReconnect = false;
this.reconnect();
Expand All @@ -267,13 +293,15 @@ export class ReconnectingWebSocket<

private scheduleReconnect(): void {
if (
this.#isDisposed ||
this.#isDisconnected ||
this.#reconnectTimeoutId !== null
this.#state === ConnectionState.DISPOSED ||
this.#state === ConnectionState.DISCONNECTED ||
this.#state === ConnectionState.AWAITING_RETRY
) {
return;
}

this.#state = ConnectionState.AWAITING_RETRY;

const jitter =
this.#backoffMs * this.#options.jitterFactor * (Math.random() * 2 - 1);
const delayMs = Math.max(0, this.#backoffMs + jitter);
Expand Down Expand Up @@ -354,7 +382,10 @@ export class ReconnectingWebSocket<
* otherwise schedules a reconnect.
*/
private async handleConnectionError(error: unknown): Promise<void> {
if (this.#isDisposed || this.#isDisconnected) {
if (
this.#state === ConnectionState.DISPOSED ||
this.#state === ConnectionState.DISCONNECTED
) {
return;
}

Expand Down Expand Up @@ -396,11 +427,11 @@ export class ReconnectingWebSocket<
}

private dispose(code?: number, reason?: string): void {
if (this.#isDisposed) {
if (this.#state === ConnectionState.DISPOSED) {
return;
}

this.#isDisposed = true;
this.#state = ConnectionState.DISPOSED;
this.clearCurrentSocket(code, reason);

for (const set of Object.values(this.#eventHandlers)) {
Expand Down
Loading