Implement Server-Sent Events (SSE) for Real-time Agent Communication

- Add server-side SSE endpoint (/stream?from=N) for streaming state updates and messages
- Replace polling with SSE in frontend for real-time updates with significant performance improvements
- Implement efficient connection handling with backoff strategy for reconnections
- Add visual network status indicator in UI to show connection state
- Use non-blocking goroutine with channel pattern to handle SSE message delivery
- Ensure proper message sequencing and state synchronization between client and server
- Fix test suite to accommodate the new streaming architecture
- Update mocks to use conversation.Budget instead of ant.Budget

Co-Authored-By: sketch <hello@sketch.dev>
diff --git a/loop/server/loophttp.go b/loop/server/loophttp.go
index f7a3979..56fbdec 100644
--- a/loop/server/loophttp.go
+++ b/loop/server/loophttp.go
@@ -120,6 +120,7 @@
 		return nil, fmt.Errorf("failed to build web bundle, did you run 'go generate sketch.dev/loop/...'?: %w", err)
 	}
 
+	s.mux.HandleFunc("/stream", s.handleSSEStream)
 	s.mux.HandleFunc("/diff", func(w http.ResponseWriter, r *http.Request) {
 		// Check if a specific commit hash was requested
 		commit := r.URL.Query().Get("commit")
@@ -367,36 +368,10 @@
 			}
 		}
 
-		serverMessageCount = agent.MessageCount()
-		totalUsage := agent.TotalUsage()
-
 		w.Header().Set("Content-Type", "application/json")
 
-		state := State{
-			MessageCount:         serverMessageCount,
-			TotalUsage:           &totalUsage,
-			Hostname:             s.hostname,
-			WorkingDir:           getWorkingDir(),
-			InitialCommit:        agent.InitialCommit(),
-			Title:                agent.Title(),
-			BranchName:           agent.BranchName(),
-			OS:                   agent.OS(),
-			OutsideHostname:      agent.OutsideHostname(),
-			InsideHostname:       s.hostname,
-			OutsideOS:            agent.OutsideOS(),
-			InsideOS:             agent.OS(),
-			OutsideWorkingDir:    agent.OutsideWorkingDir(),
-			InsideWorkingDir:     getWorkingDir(),
-			GitOrigin:            agent.GitOrigin(),
-			OutstandingLLMCalls:  agent.OutstandingLLMCallCount(),
-			OutstandingToolCalls: agent.OutstandingToolCalls(),
-			SessionID:            agent.SessionID(),
-			SSHAvailable:         s.sshAvailable,
-			SSHError:             s.sshError,
-			InContainer:          agent.IsInContainer(),
-			FirstMessageIndex:    agent.FirstMessageIndex(),
-			AgentState:           agent.CurrentStateName(),
-		}
+		// Use the shared getState function
+		state := s.getState()
 
 		// Create a JSON encoder with indentation for pretty-printing
 		encoder := json.NewEncoder(w)
@@ -912,3 +887,162 @@
 
 	return true
 }
+
+// /stream?from=N endpoint for Server-Sent Events
+func (s *Server) handleSSEStream(w http.ResponseWriter, r *http.Request) {
+	w.Header().Set("Content-Type", "text/event-stream")
+	w.Header().Set("Cache-Control", "no-cache")
+	w.Header().Set("Connection", "keep-alive")
+	w.Header().Set("Access-Control-Allow-Origin", "*")
+
+	// Extract the 'from' parameter
+	fromParam := r.URL.Query().Get("from")
+	var fromIndex int
+	var err error
+	if fromParam != "" {
+		fromIndex, err = strconv.Atoi(fromParam)
+		if err != nil {
+			http.Error(w, "Invalid 'from' parameter", http.StatusBadRequest)
+			return
+		}
+	}
+
+	// Ensure 'from' is valid
+	currentCount := s.agent.MessageCount()
+	if fromIndex < 0 {
+		fromIndex = 0
+	} else if fromIndex > currentCount {
+		fromIndex = currentCount
+	}
+
+	// Send the current state immediately
+	state := s.getState()
+
+	// Create JSON encoder
+	encoder := json.NewEncoder(w)
+
+	// Send state as an event
+	fmt.Fprintf(w, "event: state\n")
+	fmt.Fprintf(w, "data: ")
+	encoder.Encode(state)
+	fmt.Fprintf(w, "\n\n")
+
+	if f, ok := w.(http.Flusher); ok {
+		f.Flush()
+	}
+
+	// Create a context for the SSE stream
+	ctx := r.Context()
+
+	// Create an iterator to receive new messages as they arrive
+	iterator := s.agent.NewIterator(ctx, fromIndex) // Start from the requested index
+	defer iterator.Close()
+
+	// Setup heartbeat timer
+	heartbeatTicker := time.NewTicker(45 * time.Second)
+	defer heartbeatTicker.Stop()
+
+	// Create a channel for messages
+	messageChan := make(chan *loop.AgentMessage, 10)
+
+	// Start a goroutine to read messages without blocking the heartbeat
+	go func() {
+		defer close(messageChan)
+		for {
+			// This can block, but it's in its own goroutine
+			newMessage := iterator.Next()
+			if newMessage == nil {
+				// No message available (likely due to context cancellation)
+				slog.InfoContext(ctx, "No more messages available, ending message stream")
+				return
+			}
+
+			select {
+			case messageChan <- newMessage:
+				// Message sent to channel
+			case <-ctx.Done():
+				// Context cancelled
+				return
+			}
+		}
+	}()
+
+	// Stay connected and stream real-time updates
+	for {
+		select {
+		case <-heartbeatTicker.C:
+			// Send heartbeat event
+			fmt.Fprintf(w, "event: heartbeat\n")
+			fmt.Fprintf(w, "data: %d\n\n", time.Now().Unix())
+
+			// Flush to send the heartbeat immediately
+			if f, ok := w.(http.Flusher); ok {
+				f.Flush()
+			}
+
+		case <-ctx.Done():
+			// Client disconnected
+			slog.InfoContext(ctx, "Client disconnected from SSE stream")
+			return
+
+		case newMessage, ok := <-messageChan:
+			if !ok {
+				// Channel closed
+				slog.InfoContext(ctx, "Message channel closed, ending SSE stream")
+				return
+			}
+
+			// Send the new message as an event
+			fmt.Fprintf(w, "event: message\n")
+			fmt.Fprintf(w, "data: ")
+			encoder.Encode(newMessage)
+			fmt.Fprintf(w, "\n\n")
+
+			// Get updated state
+			state = s.getState()
+
+			// Send updated state after the message
+			fmt.Fprintf(w, "event: state\n")
+			fmt.Fprintf(w, "data: ")
+			encoder.Encode(state)
+			fmt.Fprintf(w, "\n\n")
+
+			// Flush to send the message and state immediately
+			if f, ok := w.(http.Flusher); ok {
+				f.Flush()
+			}
+		}
+	}
+}
+
+// Helper function to get the current state
+func (s *Server) getState() State {
+	serverMessageCount := s.agent.MessageCount()
+	totalUsage := s.agent.TotalUsage()
+
+	return State{
+		MessageCount:         serverMessageCount,
+		TotalUsage:           &totalUsage,
+		Hostname:             s.hostname,
+		WorkingDir:           getWorkingDir(),
+		InitialCommit:        s.agent.InitialCommit(),
+		Title:                s.agent.Title(),
+		BranchName:           s.agent.BranchName(),
+		OS:                   s.agent.OS(),
+		OutsideHostname:      s.agent.OutsideHostname(),
+		InsideHostname:       s.hostname,
+		OutsideOS:            s.agent.OutsideOS(),
+		InsideOS:             s.agent.OS(),
+		OutsideWorkingDir:    s.agent.OutsideWorkingDir(),
+		InsideWorkingDir:     getWorkingDir(),
+		GitOrigin:            s.agent.GitOrigin(),
+		OutstandingLLMCalls:  s.agent.OutstandingLLMCallCount(),
+		OutstandingToolCalls: s.agent.OutstandingToolCalls(),
+		SessionID:            s.agent.SessionID(),
+		SSHAvailable:         s.sshAvailable,
+		SSHError:             s.sshError,
+		InContainer:          s.agent.IsInContainer(),
+		FirstMessageIndex:    s.agent.FirstMessageIndex(),
+		AgentState:           s.agent.CurrentStateName(),
+	}
+}