blob: 2130c21072154f1f78349a4add3327539c6910dd [file] [log] [blame]
Earl Lee2e463fb2025-04-17 11:22:22 -07001import { TimelineMessage } from "./types";
2import { formatNumber } from "./utils";
3
/**
 * Names of the events a DataManager can emit to its listeners:
 * - 'dataChanged': emitted by fetchData() after the state (and any new
 *   messages) have been loaded.
 * - 'connectionStatusChanged': emitted when the connection status changes
 *   or a connection failure is reported.
 */
export type DataManagerEventType =
  | 'dataChanged'
  | 'connectionStatusChanged';
8
/**
 * Connection states reported by DataManager:
 * - 'connected': the most recent fetch of messages succeeded.
 * - 'disconnected': a fetch or long-poll request failed.
 * - 'disabled': polling is switched off (also the initial value).
 */
export type ConnectionStatus =
  | 'connected'
  | 'disconnected'
  | 'disabled';
13
/**
 * Shape of the JSON object returned by the `state` endpoint, as stored by
 * DataManager. The response is assigned without validation, so every field
 * is optional.
 */
export interface TimelineState {
  /** NOTE(review): presumably the host the session runs on — confirm against the server. */
  hostname?: string;
  /** NOTE(review): presumably the session's working directory — confirm against the server. */
  working_dir?: string;
  /** NOTE(review): presumably the commit the session started from — confirm against the server. */
  initial_commit?: string;
  /** Message count reported by the server; DataManager compares it with the count it last saw. */
  message_count?: number;
  /** NOTE(review): presumably a display title for the timeline — confirm against the server. */
  title?: string;
  /** Aggregated usage counters; meanings are implied by the field names only. */
  total_usage?: {
    input_tokens: number;
    output_tokens: number;
    cache_read_input_tokens: number;
    cache_creation_input_tokens: number;
    total_cost_usd: number;
  };
}
31
/**
 * DataManager - Class to manage timeline data, fetching, and polling.
 *
 * It loads the `state` and `messages` endpoints, keeps the accumulated
 * message list and the latest state in memory, runs a long-poll loop against
 * `state?poll=true` to learn about server-side changes, and notifies
 * registered listeners through 'dataChanged' and 'connectionStatusChanged'
 * events.
 */
export class DataManager {
  /** Maximum time (ms) one long-poll request may stay open before it is aborted as stalled. */
  private static readonly POLL_TIMEOUT_MS = 120000;

  // State variables
  /** `message_count` seen at the last completed fetch; sent as `seen` when polling. */
  private lastMessageCount = 0;
  /** Index sent as `start` on the next `messages` request. */
  private nextFetchIndex = 0;
  /** `start` index used by the most recent `messages` request. */
  private currentFetchStartIndex = 0;
  /** Controller of the in-flight long-poll request, or null when none is active. */
  private currentPollController: AbortController | null = null;
  /** True while fetchData() is running; prevents overlapping fetches. */
  private isFetchingMessages = false;
  private isPollingEnabled = true;
  private isFirstLoad = true;
  private connectionStatus: ConnectionStatus = "disabled";
  private messages: TimelineMessage[] = [];
  private timelineState: TimelineState | null = null;

  // Event listeners
  private readonly eventListeners: Map<DataManagerEventType, Array<(...args: any[]) => void>> = new Map();

  constructor() {
    // Initialize empty arrays for each event type
    this.eventListeners.set('dataChanged', []);
    this.eventListeners.set('connectionStatusChanged', []);
  }

  /**
   * Initialize the data manager: fetch the initial data, then start polling.
   * Polling is started whether or not the initial fetch succeeded.
   */
  public async initialize(): Promise<void> {
    try {
      await this.fetchData();
    } catch (error) {
      console.error("Initial data fetch failed, will retry via polling", error);
    } finally {
      this.startPolling();
    }
  }

  /**
   * Get all messages fetched so far (the internal array, not a copy).
   */
  public getMessages(): TimelineMessage[] {
    return this.messages;
  }

  /**
   * Get the most recently fetched state, or null before the first successful fetch.
   */
  public getState(): TimelineState | null {
    return this.timelineState;
  }

  /**
   * Get the connection status
   */
  public getConnectionStatus(): ConnectionStatus {
    return this.connectionStatus;
  }

  /**
   * Get the isFirstLoad flag (true until the first messages fetch completes).
   */
  public getIsFirstLoad(): boolean {
    return this.isFirstLoad;
  }

  /**
   * Get the `start` index that was used for the most recent messages request.
   */
  public getCurrentFetchStartIndex(): number {
    return this.currentFetchStartIndex;
  }

  /**
   * Add an event listener
   *
   * @param event - Event to subscribe to.
   * @param callback - Invoked with the event's arguments each time it is emitted.
   */
  public addEventListener(event: DataManagerEventType, callback: (...args: any[]) => void): void {
    const listeners = this.eventListeners.get(event) ?? [];
    listeners.push(callback);
    this.eventListeners.set(event, listeners);
  }

  /**
   * Remove an event listener. Does nothing if the callback is not registered.
   *
   * @param event - Event the callback was registered for.
   * @param callback - The same function reference passed to addEventListener.
   */
  public removeEventListener(event: DataManagerEventType, callback: (...args: any[]) => void): void {
    const listeners = this.eventListeners.get(event) ?? [];
    const index = listeners.indexOf(callback);
    if (index !== -1) {
      listeners.splice(index, 1);
      this.eventListeners.set(event, listeners);
    }
  }

  /**
   * Emit an event to every listener registered for it.
   */
  private emitEvent(event: DataManagerEventType, ...args: any[]): void {
    const listeners = this.eventListeners.get(event) ?? [];
    // Dispatch over a snapshot: a listener that unsubscribes during dispatch
    // must not cause the next listener to be skipped.
    for (const callback of [...listeners]) {
      callback(...args);
    }
  }

  /**
   * Set polling enabled/disabled state and start or stop polling accordingly.
   */
  public setPollingEnabled(enabled: boolean): void {
    this.isPollingEnabled = enabled;

    if (enabled) {
      this.startPolling();
    } else {
      this.stopPolling();
    }
  }

  /**
   * Start polling for updates. Any poll already in flight is cancelled first.
   */
  public startPolling(): void {
    this.stopPolling(); // Stop any existing polling

    // Start long polling
    void this.longPoll();
  }

  /**
   * Stop polling for updates by aborting the in-flight long-poll request.
   * If polling has been disabled, the connection status becomes "disabled".
   */
  public stopPolling(): void {
    this.abortCurrentPoll();

    // If polling is disabled by user, set connection status to disabled
    if (!this.isPollingEnabled) {
      this.updateConnectionStatus("disabled");
    }
  }

  /**
   * Abort the in-flight long-poll request, if any, and forget its controller.
   */
  private abortCurrentPoll(): void {
    const controller = this.currentPollController;
    this.currentPollController = null;
    controller?.abort();
  }

  /**
   * Update the connection status, emitting 'connectionStatusChanged' only
   * when the value actually changes.
   */
  private updateConnectionStatus(status: ConnectionStatus): void {
    if (this.connectionStatus !== status) {
      this.connectionStatus = status;
      this.emitEvent('connectionStatusChanged', status);
    }
  }

  /**
   * Type guard: true when the thrown value is an object carrying a `name` property.
   */
  private static hasName(err: unknown): err is { name: string; message?: string } {
    return typeof err === "object" && err !== null && "name" in err;
  }

  /**
   * Long poll for updates.
   *
   * Issues one `state?poll=true&seen=<count>` request. When it completes, the
   * latest data is fetched and the next poll is started. A single
   * AbortController serves both manual cancellation (stopPolling / restart)
   * and the stall timeout; the `timedOut` flag tells the two apart.
   */
  private async longPoll(): Promise<void> {
    // Abort any existing poll request
    this.abortCurrentPoll();

    // If polling is disabled, don't start a new poll
    if (!this.isPollingEnabled) {
      return;
    }

    const controller = new AbortController();
    this.currentPollController = controller;

    let timedOut = false;
    const timeoutId = window.setTimeout(() => {
      timedOut = true;
      controller.abort();
    }, DataManager.POLL_TIMEOUT_MS);

    try {
      try {
        const pollUrl = `state?poll=true&seen=${this.lastMessageCount}`;
        const response = await fetch(pollUrl, { signal: controller.signal });
        // The body is only read to completion; its content is not used.
        await response.json();
      } finally {
        // The timeout guards the request and body read only, not the follow-up fetch.
        clearTimeout(timeoutId);
      }

      // If we got here, data has changed, so fetch the latest data
      await this.fetchData();
    } catch (error: unknown) {
      if (controller.signal.aborted && !timedOut) {
        // Cancelled via stopPolling() or superseded by a restart: not an error.
        console.log("Polling cancelled by user");
        return;
      }
      this.handlePollFailure(error);
      return;
    }

    // Continue the loop only if this poll was not stopped or superseded meanwhile.
    if (this.isPollingEnabled && !controller.signal.aborted) {
      void this.longPoll();
    }
  }

  /**
   * Log a failed long poll, disable polling, and report the disconnection.
   *
   * Listeners may be notified twice: once with the bare status (if it
   * changed) and once with the status plus a human-readable message.
   */
  private handlePollFailure(error: unknown): void {
    // Handle different types of errors with specific messages
    let errorMessage = "Not connected";

    if (DataManager.hasName(error)) {
      if (error.name === "AbortError") {
        // This was our timeout abort
        errorMessage = "Connection timeout - not connected";
        console.error("Long polling timeout");
      } else if (error.name === "SyntaxError") {
        // JSON parsing error
        errorMessage = "Invalid response from server - not connected";
        console.error("JSON parsing error:", error);
      } else if (
        error.name === "TypeError" &&
        error.message?.includes("NetworkError")
      ) {
        // Network connectivity issues
        errorMessage = "Network connection lost - not connected";
        console.error("Network error during polling:", error);
      } else {
        // Generic error
        console.error("Long polling error:", error);
      }
    }

    // Disable polling on error
    this.isPollingEnabled = false;

    // Update connection status to disconnected
    this.updateConnectionStatus("disconnected");

    // Emit an event that we're disconnected with the error message
    this.emitEvent('connectionStatusChanged', this.connectionStatus, errorMessage);
  }

  /**
   * Fetch timeline data.
   *
   * Loads `state`; if its message count differs from the last one seen (or
   * nothing has been loaded yet), loads `messages?start=<next index>` and
   * appends the result. Emits 'dataChanged' on success. Failures are caught
   * here: the status becomes "disconnected" and 'connectionStatusChanged' is
   * emitted with "Not connected". A call made while another fetch is running
   * returns immediately without fetching.
   */
  public async fetchData(): Promise<void> {
    // If we're already fetching messages, don't start another fetch
    if (this.isFetchingMessages) {
      console.log("Already fetching messages, skipping request");
      return;
    }

    this.isFetchingMessages = true;

    try {
      // Fetch state first
      const stateResponse = await fetch("state");
      const state = await stateResponse.json();
      this.timelineState = state;

      // Check if new messages are available
      if (
        state.message_count === this.lastMessageCount &&
        this.lastMessageCount > 0
      ) {
        // No new messages, but the server answered: restore "connected"
        // in case an earlier failure left the status at "disconnected".
        this.updateConnectionStatus("connected");
        this.emitEvent('dataChanged', { state, newMessages: [] });
        return;
      }

      // Fetch messages with a start parameter
      const startIndex = this.nextFetchIndex;
      this.currentFetchStartIndex = startIndex;
      const messagesResponse = await fetch(`messages?start=${startIndex}`);
      const newMessages = (await messagesResponse.json()) || [];

      // The first fetch replaces the array; later fetches append to it
      this.messages =
        startIndex === 0 ? [...newMessages] : [...this.messages, ...newMessages];

      // Update connection status to connected
      this.updateConnectionStatus("connected");

      // Advance the index for the next fetch
      this.nextFetchIndex = startIndex + newMessages.length;

      // Update the message count
      this.lastMessageCount = state?.message_count ?? 0;

      // Mark that we've completed first load
      this.isFirstLoad = false;

      // Emit an event that data has changed
      this.emitEvent('dataChanged', { state, newMessages, isFirstFetch: startIndex === 0 });
    } catch (error) {
      console.error("Error fetching data:", error);

      // Update connection status to disconnected
      this.updateConnectionStatus("disconnected");

      // Emit an event that we're disconnected
      this.emitEvent('connectionStatusChanged', this.connectionStatus, "Not connected");
    } finally {
      this.isFetchingMessages = false;
    }
  }
}
379}