Implement Server-Sent Events (SSE) for Real-time Agent Communication
- Add server-side SSE endpoint (/stream?from=N) for streaming state updates and messages
- Replace polling with SSE in frontend for real-time updates with significant performance improvements
- Implement efficient connection handling with backoff strategy for reconnections
- Add visual network status indicator in UI to show connection state
- Use a non-blocking goroutine with a channel to handle SSE message delivery
- Ensure proper message sequencing and state synchronization between client and server
- Fix test suite to accommodate the new streaming architecture
- Update mocks to use conversation.Budget instead of ant.Budget
Co-Authored-By: sketch <hello@sketch.dev>
diff --git a/loop/server/loophttp.go b/loop/server/loophttp.go
index f7a3979..56fbdec 100644
--- a/loop/server/loophttp.go
+++ b/loop/server/loophttp.go
@@ -120,6 +120,7 @@
return nil, fmt.Errorf("failed to build web bundle, did you run 'go generate sketch.dev/loop/...'?: %w", err)
}
+ s.mux.HandleFunc("/stream", s.handleSSEStream)
s.mux.HandleFunc("/diff", func(w http.ResponseWriter, r *http.Request) {
// Check if a specific commit hash was requested
commit := r.URL.Query().Get("commit")
@@ -367,36 +368,10 @@
}
}
- serverMessageCount = agent.MessageCount()
- totalUsage := agent.TotalUsage()
-
w.Header().Set("Content-Type", "application/json")
- state := State{
- MessageCount: serverMessageCount,
- TotalUsage: &totalUsage,
- Hostname: s.hostname,
- WorkingDir: getWorkingDir(),
- InitialCommit: agent.InitialCommit(),
- Title: agent.Title(),
- BranchName: agent.BranchName(),
- OS: agent.OS(),
- OutsideHostname: agent.OutsideHostname(),
- InsideHostname: s.hostname,
- OutsideOS: agent.OutsideOS(),
- InsideOS: agent.OS(),
- OutsideWorkingDir: agent.OutsideWorkingDir(),
- InsideWorkingDir: getWorkingDir(),
- GitOrigin: agent.GitOrigin(),
- OutstandingLLMCalls: agent.OutstandingLLMCallCount(),
- OutstandingToolCalls: agent.OutstandingToolCalls(),
- SessionID: agent.SessionID(),
- SSHAvailable: s.sshAvailable,
- SSHError: s.sshError,
- InContainer: agent.IsInContainer(),
- FirstMessageIndex: agent.FirstMessageIndex(),
- AgentState: agent.CurrentStateName(),
- }
+ // Use the shared getState function
+ state := s.getState()
// Create a JSON encoder with indentation for pretty-printing
encoder := json.NewEncoder(w)
@@ -912,3 +887,162 @@
return true
}
+
+// handleSSEStream serves the /stream?from=N Server-Sent Events endpoint.
+// It sends one "state" event immediately, then for every agent message from
+// index N onward a "message" event followed by a fresh "state" event, plus a
+// "heartbeat" event every 45 seconds. A missing 'from' means 0, out-of-range
+// values are clamped to [0, MessageCount], and a non-integer value yields 400.
+// The handler returns when the request context is done, the message iterator
+// is exhausted, or a write to the client fails.
+func (s *Server) handleSSEStream(w http.ResponseWriter, r *http.Request) {
+	// Validate before committing to a streaming response, so a bad 'from' is
+	// rejected as a plain 400 without SSE headers attached.
+	fromIndex := 0
+	if fromParam := r.URL.Query().Get("from"); fromParam != "" {
+		n, err := strconv.Atoi(fromParam)
+		if err != nil {
+			http.Error(w, "Invalid 'from' parameter", http.StatusBadRequest)
+			return
+		}
+		fromIndex = n
+	}
+
+	// Ensure 'from' is valid
+	if currentCount := s.agent.MessageCount(); fromIndex > currentCount {
+		fromIndex = currentCount
+	}
+	if fromIndex < 0 {
+		fromIndex = 0
+	}
+
+	// Without a Flusher, events would sit in the response buffer and never
+	// reach the client, so fail loudly instead of leaving it hanging.
+	flusher, ok := w.(http.Flusher)
+	if !ok {
+		http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
+		return
+	}
+
+	w.Header().Set("Content-Type", "text/event-stream")
+	w.Header().Set("Cache-Control", "no-cache")
+	w.Header().Set("Connection", "keep-alive")
+	// NOTE(review): wildcard CORS lets any web page read this stream; confirm intent.
+	w.Header().Set("Access-Control-Allow-Origin", "*")
+
+	ctx := r.Context()
+
+	// sendEvent writes one SSE event. json.Marshal output contains no raw
+	// newlines, so it fits a single "data:" field and exactly one blank line
+	// terminates the event. Only write failures are returned; an unencodable
+	// payload is logged and skipped rather than emitted as a malformed event.
+	sendEvent := func(event string, payload any) error {
+		data, err := json.Marshal(payload)
+		if err != nil {
+			slog.ErrorContext(ctx, "Failed to encode SSE payload", "event", event, "error", err)
+			return nil
+		}
+		_, err = fmt.Fprintf(w, "event: %s\ndata: %s\n\n", event, data)
+		return err
+	}
+
+	// Send the current state immediately
+	if err := sendEvent("state", s.getState()); err != nil {
+		slog.WarnContext(ctx, "Failed to write to SSE stream", "error", err)
+		return
+	}
+	flusher.Flush()
+
+	// Create an iterator to receive new messages as they arrive
+	iterator := s.agent.NewIterator(ctx, fromIndex) // Start from the requested index
+	defer iterator.Close()
+
+	// Setup heartbeat timer
+	heartbeatTicker := time.NewTicker(45 * time.Second)
+	defer heartbeatTicker.Stop()
+
+	// iterator.Next can block, so it runs in its own goroutine and hands
+	// messages over a channel; the select loop below stays free to send
+	// heartbeats while no messages arrive.
+	messageChan := make(chan *loop.AgentMessage, 10)
+	go func() {
+		defer close(messageChan)
+		for {
+			newMessage := iterator.Next()
+			if newMessage == nil {
+				// No message available (likely due to context cancellation)
+				slog.InfoContext(ctx, "No more messages available, ending message stream")
+				return
+			}
+			select {
+			case messageChan <- newMessage:
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
+	// Stay connected and stream real-time updates
+	for {
+		select {
+		case <-heartbeatTicker.C:
+			if _, err := fmt.Fprintf(w, "event: heartbeat\ndata: %d\n\n", time.Now().Unix()); err != nil {
+				slog.WarnContext(ctx, "Failed to write to SSE stream", "error", err)
+				return
+			}
+			flusher.Flush()
+
+		case <-ctx.Done():
+			slog.InfoContext(ctx, "Client disconnected from SSE stream")
+			return
+
+		case newMessage, ok := <-messageChan:
+			if !ok {
+				slog.InfoContext(ctx, "Message channel closed, ending SSE stream")
+				return
+			}
+			// Follow each message with a fresh state snapshot.
+			if err := sendEvent("message", newMessage); err != nil {
+				slog.WarnContext(ctx, "Failed to write to SSE stream", "error", err)
+				return
+			}
+			if err := sendEvent("state", s.getState()); err != nil {
+				slog.WarnContext(ctx, "Failed to write to SSE stream", "error", err)
+				return
+			}
+			flusher.Flush()
+		}
+	}
+}
+
+// getState builds a State snapshot by querying the agent for its current
+// values and copying the server's own hostname and SSH fields.
+func (s *Server) getState() State {
+	var st State
+	st.MessageCount = s.agent.MessageCount()
+	usage := s.agent.TotalUsage()
+	st.TotalUsage = &usage
+	st.Hostname = s.hostname
+	st.WorkingDir = getWorkingDir()
+	st.InitialCommit = s.agent.InitialCommit()
+	st.Title = s.agent.Title()
+	st.BranchName = s.agent.BranchName()
+	st.OS = s.agent.OS()
+	st.OutsideHostname = s.agent.OutsideHostname()
+	st.InsideHostname = s.hostname
+	st.OutsideOS = s.agent.OutsideOS()
+	st.InsideOS = s.agent.OS()
+	st.OutsideWorkingDir = s.agent.OutsideWorkingDir()
+	st.InsideWorkingDir = getWorkingDir()
+	st.GitOrigin = s.agent.GitOrigin()
+	st.OutstandingLLMCalls = s.agent.OutstandingLLMCallCount()
+	st.OutstandingToolCalls = s.agent.OutstandingToolCalls()
+	st.SessionID = s.agent.SessionID()
+	st.SSHAvailable = s.sshAvailable
+	st.SSHError = s.sshError
+	st.InContainer = s.agent.IsInContainer()
+	st.FirstMessageIndex = s.agent.FirstMessageIndex()
+	st.AgentState = s.agent.CurrentStateName()
+
+	return st
+}