loop: Add StateTransitionIterator and stream state updates
Implement CodingAgent.NewStateTransitionIterator to observe state transitions.
Update the /stream endpoint to send state updates when transitions occur.
Co-Authored-By: sketch <hello@sketch.dev>
Change-ID: s4b4f9a0689c94c54k
Co-Authored-By: sketch <hello@sketch.dev>
Change-ID: s4b4f9a0689c94c54k
diff --git a/loop/server/loophttp.go b/loop/server/loophttp.go
index 5047782..b186760 100644
--- a/loop/server/loophttp.go
+++ b/loop/server/loophttp.go
@@ -979,6 +979,10 @@
iterator := s.agent.NewIterator(ctx, fromIndex) // Start from the requested index
defer iterator.Close()
+ // Create an iterator to receive state transitions
+ stateIterator := s.agent.NewStateTransitionIterator(ctx)
+ defer stateIterator.Close()
+
// Setup heartbeat timer
heartbeatTicker := time.NewTicker(45 * time.Second)
defer heartbeatTicker.Stop()
@@ -986,6 +990,9 @@
// Create a channel for messages
messageChan := make(chan *loop.AgentMessage, 10)
+ // Create a channel for state transitions
+ stateChan := make(chan *loop.StateTransition, 10)
+
// Start a goroutine to read messages without blocking the heartbeat
go func() {
defer close(messageChan)
@@ -1008,6 +1015,28 @@
}
}()
+ // Start a goroutine to read state transitions
+ go func() {
+ defer close(stateChan)
+ for {
+ // This can block, but it's in its own goroutine
+ newTransition := stateIterator.Next()
+ if newTransition == nil {
+ // No transition available (likely due to context cancellation)
+ slog.InfoContext(ctx, "No more state transitions available, ending state stream")
+ return
+ }
+
+ select {
+ case stateChan <- newTransition:
+ // Transition sent to channel
+ case <-ctx.Done():
+ // Context cancelled
+ return
+ }
+ }
+ }()
+
// Stay connected and stream real-time updates
for {
select {
@@ -1026,6 +1055,27 @@
slog.InfoContext(ctx, "Client disconnected from SSE stream")
return
+ case _, ok := <-stateChan:
+ if !ok {
+ // Channel closed
+ slog.InfoContext(ctx, "State transition channel closed, ending SSE stream")
+ return
+ }
+
+ // Get updated state
+ state = s.getState()
+
+ // Send updated state after the state transition
+ fmt.Fprintf(w, "event: state\n")
+ fmt.Fprintf(w, "data: ")
+ encoder.Encode(state)
+ fmt.Fprintf(w, "\n\n")
+
+ // Flush to send the state immediately
+ if f, ok := w.(http.Flusher); ok {
+ f.Flush()
+ }
+
case newMessage, ok := <-messageChan:
if !ok {
// Channel closed
diff --git a/loop/server/loophttp_test.go b/loop/server/loophttp_test.go
index 22ad237..cb50642 100644
--- a/loop/server/loophttp_test.go
+++ b/loop/server/loophttp_test.go
@@ -18,14 +18,15 @@
// mockAgent is a mock implementation of loop.CodingAgent for testing
type mockAgent struct {
- mu sync.RWMutex
- messages []loop.AgentMessage
- messageCount int
- currentState string
- subscribers []chan *loop.AgentMessage
- initialCommit string
- title string
- branchName string
+ mu sync.RWMutex
+ messages []loop.AgentMessage
+ messageCount int
+ currentState string
+ subscribers []chan *loop.AgentMessage
+ stateTransitionListeners []chan loop.StateTransition
+ initialCommit string
+ title string
+ branchName string
}
func (m *mockAgent) NewIterator(ctx context.Context, nextMessageIdx int) loop.MessageIterator {
@@ -115,12 +116,76 @@
}
}
+func (m *mockAgent) NewStateTransitionIterator(ctx context.Context) loop.StateTransitionIterator {
+ m.mu.Lock()
+ ch := make(chan loop.StateTransition, 10)
+ m.stateTransitionListeners = append(m.stateTransitionListeners, ch)
+ m.mu.Unlock()
+
+ return &mockStateTransitionIterator{
+ agent: m,
+ ctx: ctx,
+ ch: ch,
+ }
+}
+
+type mockStateTransitionIterator struct {
+ agent *mockAgent
+ ctx context.Context
+ ch chan loop.StateTransition
+}
+
+func (m *mockStateTransitionIterator) Next() *loop.StateTransition {
+ select {
+ case <-m.ctx.Done():
+ return nil
+ case transition, ok := <-m.ch:
+ if !ok {
+ return nil
+ }
+ transitionCopy := transition
+ return &transitionCopy
+ }
+}
+
+func (m *mockStateTransitionIterator) Close() {
+ m.agent.mu.Lock()
+ for i, ch := range m.agent.stateTransitionListeners {
+ if ch == m.ch {
+ m.agent.stateTransitionListeners = slices.Delete(m.agent.stateTransitionListeners, i, i+1)
+ break
+ }
+ }
+ m.agent.mu.Unlock()
+ close(m.ch)
+}
+
func (m *mockAgent) CurrentStateName() string {
m.mu.RLock()
defer m.mu.RUnlock()
return m.currentState
}
+func (m *mockAgent) TriggerStateTransition(from, to loop.State, event loop.TransitionEvent) {
+ m.mu.Lock()
+ m.currentState = to.String()
+ transition := loop.StateTransition{
+ From: from,
+ To: to,
+ Event: event,
+ }
+
+ // Create a copy of listeners to avoid holding the lock while sending
+ listeners := make([]chan loop.StateTransition, len(m.stateTransitionListeners))
+ copy(listeners, m.stateTransitionListeners)
+ m.mu.Unlock()
+
+ // Notify listeners
+ for _, ch := range listeners {
+ ch <- transition
+ }
+}
+
func (m *mockAgent) InitialCommit() string {
m.mu.RLock()
defer m.mu.RUnlock()
@@ -171,13 +236,14 @@
func TestSSEStream(t *testing.T) {
// Create a mock agent with initial messages
mockAgent := &mockAgent{
- messages: []loop.AgentMessage{},
- messageCount: 0,
- currentState: "Ready",
- subscribers: []chan *loop.AgentMessage{},
- initialCommit: "abcd1234",
- title: "Test Title",
- branchName: "sketch/test-branch",
+ messages: []loop.AgentMessage{},
+ messageCount: 0,
+ currentState: "Ready",
+ subscribers: []chan *loop.AgentMessage{},
+ stateTransitionListeners: []chan loop.StateTransition{},
+ initialCommit: "abcd1234",
+ title: "Test Title",
+ branchName: "sketch/test-branch",
}
// Add the initial messages before creating the server
@@ -262,6 +328,13 @@
ToolName: "test_tool",
})
+ // Trigger a state transition to test state updates
+ time.Sleep(200 * time.Millisecond)
+ mockAgent.TriggerStateTransition(loop.StateReady, loop.StateSendingToLLM, loop.TransitionEvent{
+ Description: "Agent started thinking",
+ Data: "start_thinking",
+ })
+
// Let it process for longer
time.Sleep(1000 * time.Millisecond)
cancel() // Cancel to end the test