| import { AgentMessage, State } from "./types"; |
| import { formatNumber } from "./utils"; |
| |
| /** |
| * Event types for data manager |
| */ |
| export type DataManagerEventType = "dataChanged" | "connectionStatusChanged"; |
| |
| /** |
| * Connection status types |
| */ |
| export type ConnectionStatus = |
| | "connected" |
| | "connecting" |
| | "disconnected" |
| | "disabled"; |
| |
| /** |
| * DataManager - Class to manage timeline data, fetching, and SSE streaming |
| */ |
| export class DataManager { |
| // State variables |
| private messages: AgentMessage[] = []; |
| private timelineState: State | null = null; |
| private isFirstLoad: boolean = true; |
| private lastHeartbeatTime: number = 0; |
| private connectionStatus: ConnectionStatus = "disconnected"; |
| private eventSource: EventSource | null = null; |
| private reconnectTimer: number | null = null; |
| private reconnectAttempt: number = 0; |
| private maxReconnectDelayMs: number = 60000; // Max delay of 60 seconds |
| private baseReconnectDelayMs: number = 1000; // Start with 1 second |
| |
| // Event listeners |
| private eventListeners: Map< |
| DataManagerEventType, |
| Array<(...args: any[]) => void> |
| > = new Map(); |
| |
| constructor() { |
| // Initialize empty arrays for each event type |
| this.eventListeners.set("dataChanged", []); |
| this.eventListeners.set("connectionStatusChanged", []); |
| |
| // Check connection status periodically |
| setInterval(() => this.checkConnectionStatus(), 5000); |
| } |
| |
| /** |
| * Initialize the data manager and connect to the SSE stream |
| */ |
| public async initialize(): Promise<void> { |
| // Connect to the SSE stream |
| this.connect(); |
| } |
| |
| /** |
| * Connect to the SSE stream |
| */ |
| private connect(): void { |
| // If we're already connecting or connected, don't start another connection attempt |
| if ( |
| this.eventSource && |
| (this.connectionStatus === "connecting" || |
| this.connectionStatus === "connected") |
| ) { |
| return; |
| } |
| |
| // Close any existing connection |
| this.closeEventSource(); |
| |
| // Update connection status to connecting |
| this.updateConnectionStatus("connecting", "Connecting..."); |
| |
| // Determine the starting point for the stream based on what we already have |
| const fromIndex = |
| this.messages.length > 0 |
| ? this.messages[this.messages.length - 1].idx + 1 |
| : 0; |
| |
| // Create a new EventSource connection |
| this.eventSource = new EventSource(`stream?from=${fromIndex}`); |
| |
| // Set up event handlers |
| this.eventSource.addEventListener("open", () => { |
| console.log("SSE stream opened"); |
| this.reconnectAttempt = 0; // Reset reconnect attempt counter on successful connection |
| this.updateConnectionStatus("connected"); |
| this.lastHeartbeatTime = Date.now(); // Set initial heartbeat time |
| }); |
| |
| this.eventSource.addEventListener("error", (event) => { |
| console.error("SSE stream error:", event); |
| this.closeEventSource(); |
| this.updateConnectionStatus("disconnected", "Connection lost"); |
| this.scheduleReconnect(); |
| }); |
| |
| // Handle incoming messages |
| this.eventSource.addEventListener("message", (event) => { |
| const message = JSON.parse(event.data) as AgentMessage; |
| this.processNewMessage(message); |
| }); |
| |
| // Handle state updates |
| this.eventSource.addEventListener("state", (event) => { |
| const state = JSON.parse(event.data) as State; |
| this.timelineState = state; |
| this.emitEvent("dataChanged", { state, newMessages: [] }); |
| }); |
| |
| // Handle heartbeats |
| this.eventSource.addEventListener("heartbeat", () => { |
| this.lastHeartbeatTime = Date.now(); |
| // Make sure connection status is updated if it wasn't already |
| if (this.connectionStatus !== "connected") { |
| this.updateConnectionStatus("connected"); |
| } |
| }); |
| } |
| |
| /** |
| * Close the current EventSource connection |
| */ |
| private closeEventSource(): void { |
| if (this.eventSource) { |
| this.eventSource.close(); |
| this.eventSource = null; |
| } |
| } |
| |
| /** |
| * Schedule a reconnection attempt with exponential backoff |
| */ |
| private scheduleReconnect(): void { |
| if (this.reconnectTimer !== null) { |
| window.clearTimeout(this.reconnectTimer); |
| this.reconnectTimer = null; |
| } |
| |
| // Calculate backoff delay with exponential increase and maximum limit |
| const delay = Math.min( |
| this.baseReconnectDelayMs * Math.pow(1.5, this.reconnectAttempt), |
| this.maxReconnectDelayMs, |
| ); |
| |
| console.log( |
| `Scheduling reconnect in ${delay}ms (attempt ${this.reconnectAttempt + 1})`, |
| ); |
| |
| // Increment reconnect attempt counter |
| this.reconnectAttempt++; |
| |
| // Schedule the reconnect |
| this.reconnectTimer = window.setTimeout(() => { |
| this.reconnectTimer = null; |
| this.connect(); |
| }, delay); |
| } |
| |
| /** |
| * Check heartbeat status to determine if connection is still active |
| */ |
| private checkConnectionStatus(): void { |
| if (this.connectionStatus !== "connected") { |
| return; // Only check if we think we're connected |
| } |
| |
| const timeSinceLastHeartbeat = Date.now() - this.lastHeartbeatTime; |
| if (timeSinceLastHeartbeat > 90000) { |
| // 90 seconds without heartbeat |
| console.warn( |
| "No heartbeat received in 90 seconds, connection appears to be lost", |
| ); |
| this.closeEventSource(); |
| this.updateConnectionStatus( |
| "disconnected", |
| "Connection timed out (no heartbeat)", |
| ); |
| this.scheduleReconnect(); |
| } |
| } |
| |
| /** |
| * Process a new message from the SSE stream |
| */ |
| private processNewMessage(message: AgentMessage): void { |
| // Find the message's position in the array |
| const existingIndex = this.messages.findIndex((m) => m.idx === message.idx); |
| |
| if (existingIndex >= 0) { |
| // This shouldn't happen - we should never receive duplicates |
| console.error( |
| `Received duplicate message with idx ${message.idx}`, |
| message, |
| ); |
| return; |
| } else { |
| // Add the new message to our array |
| this.messages.push(message); |
| // Sort messages by idx to ensure they're in the correct order |
| this.messages.sort((a, b) => a.idx - b.idx); |
| } |
| |
| // Mark that we've completed first load |
| if (this.isFirstLoad) { |
| this.isFirstLoad = false; |
| } |
| |
| // Emit an event that data has changed |
| this.emitEvent("dataChanged", { |
| state: this.timelineState, |
| newMessages: [message], |
| isFirstFetch: false, |
| }); |
| } |
| |
| /** |
| * Get all messages |
| */ |
| public getMessages(): AgentMessage[] { |
| return this.messages; |
| } |
| |
| /** |
| * Get the current state |
| */ |
| public getState(): State | null { |
| return this.timelineState; |
| } |
| |
| /** |
| * Get the connection status |
| */ |
| public getConnectionStatus(): ConnectionStatus { |
| return this.connectionStatus; |
| } |
| |
| /** |
| * Get the isFirstLoad flag |
| */ |
| public getIsFirstLoad(): boolean { |
| return this.isFirstLoad; |
| } |
| |
| /** |
| * Add an event listener |
| */ |
| public addEventListener( |
| event: DataManagerEventType, |
| callback: (...args: any[]) => void, |
| ): void { |
| const listeners = this.eventListeners.get(event) || []; |
| listeners.push(callback); |
| this.eventListeners.set(event, listeners); |
| } |
| |
| /** |
| * Remove an event listener |
| */ |
| public removeEventListener( |
| event: DataManagerEventType, |
| callback: (...args: any[]) => void, |
| ): void { |
| const listeners = this.eventListeners.get(event) || []; |
| const index = listeners.indexOf(callback); |
| if (index !== -1) { |
| listeners.splice(index, 1); |
| this.eventListeners.set(event, listeners); |
| } |
| } |
| |
| /** |
| * Emit an event |
| */ |
| private emitEvent(event: DataManagerEventType, ...args: any[]): void { |
| const listeners = this.eventListeners.get(event) || []; |
| listeners.forEach((callback) => callback(...args)); |
| } |
| |
| /** |
| * Update the connection status |
| */ |
| private updateConnectionStatus( |
| status: ConnectionStatus, |
| message?: string, |
| ): void { |
| if (this.connectionStatus !== status) { |
| this.connectionStatus = status; |
| this.emitEvent("connectionStatusChanged", status, message || ""); |
| } |
| } |
| |
| /** |
| * Send a message to the agent |
| */ |
| public async send(message: string): Promise<boolean> { |
| // Attempt to connect if we're not already connected |
| if ( |
| this.connectionStatus !== "connected" && |
| this.connectionStatus !== "connecting" |
| ) { |
| this.connect(); |
| } |
| |
| try { |
| const response = await fetch("chat", { |
| method: "POST", |
| headers: { |
| "Content-Type": "application/json", |
| }, |
| body: JSON.stringify({ message }), |
| }); |
| |
| if (!response.ok) { |
| throw new Error(`HTTP error! Status: ${response.status}`); |
| } |
| |
| return true; |
| } catch (error) { |
| console.error("Error sending message:", error); |
| return false; |
| } |
| } |
| |
| /** |
| * Cancel the current conversation |
| */ |
| public async cancel(): Promise<boolean> { |
| try { |
| const response = await fetch("cancel", { |
| method: "POST", |
| headers: { |
| "Content-Type": "application/json", |
| }, |
| body: JSON.stringify({ reason: "User cancelled" }), |
| }); |
| |
| if (!response.ok) { |
| throw new Error(`HTTP error! Status: ${response.status}`); |
| } |
| |
| return true; |
| } catch (error) { |
| console.error("Error cancelling conversation:", error); |
| return false; |
| } |
| } |
| |
| /** |
| * Cancel a specific tool call |
| */ |
| public async cancelToolUse(toolCallId: string): Promise<boolean> { |
| try { |
| const response = await fetch("cancel", { |
| method: "POST", |
| headers: { |
| "Content-Type": "application/json", |
| }, |
| body: JSON.stringify({ |
| reason: "User cancelled tool use", |
| tool_call_id: toolCallId, |
| }), |
| }); |
| |
| if (!response.ok) { |
| throw new Error(`HTTP error! Status: ${response.status}`); |
| } |
| |
| return true; |
| } catch (error) { |
| console.error("Error cancelling tool use:", error); |
| return false; |
| } |
| } |
| |
| /** |
| * Download the conversation data |
| */ |
| public downloadConversation(): void { |
| window.location.href = "download"; |
| } |
| |
| /** |
| * Get a suggested reprompt |
| */ |
| public async getSuggestedReprompt(): Promise<string | null> { |
| try { |
| const response = await fetch("suggest-reprompt"); |
| if (!response.ok) { |
| throw new Error(`HTTP error! Status: ${response.status}`); |
| } |
| const data = await response.json(); |
| return data.prompt; |
| } catch (error) { |
| console.error("Error getting suggested reprompt:", error); |
| return null; |
| } |
| } |
| |
| /** |
| * Get description for a commit |
| */ |
| public async getCommitDescription(revision: string): Promise<string | null> { |
| try { |
| const response = await fetch( |
| `commit-description?revision=${encodeURIComponent(revision)}`, |
| ); |
| if (!response.ok) { |
| throw new Error(`HTTP error! Status: ${response.status}`); |
| } |
| const data = await response.json(); |
| return data.description; |
| } catch (error) { |
| console.error("Error getting commit description:", error); |
| return null; |
| } |
| } |
| } |