loop: Add StateTransitionIterator and stream state updates
Implement CodingAgent.NewStateTransitionIterator to observe state transitions.
Update the /stream endpoint to send state updates when transitions occur.
Co-Authored-By: sketch <hello@sketch.dev>
Change-ID: s4b4f9a0689c94c54k
Co-Authored-By: sketch <hello@sketch.dev>
Change-ID: s4b4f9a0689c94c54k
diff --git a/loop/server/loophttp.go b/loop/server/loophttp.go
index 5047782..b186760 100644
--- a/loop/server/loophttp.go
+++ b/loop/server/loophttp.go
@@ -979,6 +979,10 @@
iterator := s.agent.NewIterator(ctx, fromIndex) // Start from the requested index
defer iterator.Close()
+ // Create an iterator to receive state transitions
+ stateIterator := s.agent.NewStateTransitionIterator(ctx)
+ defer stateIterator.Close()
+
// Setup heartbeat timer
heartbeatTicker := time.NewTicker(45 * time.Second)
defer heartbeatTicker.Stop()
@@ -986,6 +990,9 @@
// Create a channel for messages
messageChan := make(chan *loop.AgentMessage, 10)
+ // Create a channel for state transitions
+ stateChan := make(chan *loop.StateTransition, 10)
+
// Start a goroutine to read messages without blocking the heartbeat
go func() {
defer close(messageChan)
@@ -1008,6 +1015,28 @@
}
}()
+ // Start a goroutine to read state transitions
+ go func() {
+ defer close(stateChan)
+ for {
+ // This can block, but it's in its own goroutine
+ newTransition := stateIterator.Next()
+ if newTransition == nil {
+ // No transition available (likely due to context cancellation)
+ slog.InfoContext(ctx, "No more state transitions available, ending state stream")
+ return
+ }
+
+ select {
+ case stateChan <- newTransition:
+ // Transition sent to channel
+ case <-ctx.Done():
+ // Context cancelled
+ return
+ }
+ }
+ }()
+
// Stay connected and stream real-time updates
for {
select {
@@ -1026,6 +1055,27 @@
slog.InfoContext(ctx, "Client disconnected from SSE stream")
return
+ case _, ok := <-stateChan:
+ if !ok {
+ // Channel closed
+ slog.InfoContext(ctx, "State transition channel closed, ending SSE stream")
+ return
+ }
+
+ // Get updated state
+ state = s.getState()
+
+ // Send updated state after the state transition
+ fmt.Fprintf(w, "event: state\n")
+ fmt.Fprintf(w, "data: ")
+ encoder.Encode(state)
+ fmt.Fprintf(w, "\n\n")
+
+ // Flush to send the state immediately
+ if f, ok := w.(http.Flusher); ok {
+ f.Flush()
+ }
+
case newMessage, ok := <-messageChan:
if !ok {
// Channel closed