loop: Add StateTransitionIterator and stream state updates
Implement CodingAgent.NewStateTransitionIterator to observe state transitions.
Update the /stream endpoint to send state updates when transitions occur.
Co-Authored-By: sketch <hello@sketch.dev>
Change-ID: s4b4f9a0689c94c54k
diff --git a/loop/server/loophttp_test.go b/loop/server/loophttp_test.go
index 22ad237..cb50642 100644
--- a/loop/server/loophttp_test.go
+++ b/loop/server/loophttp_test.go
@@ -18,14 +18,15 @@
// mockAgent is a mock implementation of loop.CodingAgent for testing
type mockAgent struct {
- mu sync.RWMutex
- messages []loop.AgentMessage
- messageCount int
- currentState string
- subscribers []chan *loop.AgentMessage
- initialCommit string
- title string
- branchName string
+ mu sync.RWMutex // guards mutable state such as currentState and stateTransitionListeners
+ messages []loop.AgentMessage
+ messageCount int
+ currentState string // state name reported by CurrentStateName; updated by TriggerStateTransition
+ subscribers []chan *loop.AgentMessage
+ stateTransitionListeners []chan loop.StateTransition // one channel per live state-transition iterator; fed by TriggerStateTransition
+ initialCommit string
+ title string
+ branchName string
}
func (m *mockAgent) NewIterator(ctx context.Context, nextMessageIdx int) loop.MessageIterator {
@@ -115,12 +116,76 @@
}
}
+// NewStateTransitionIterator registers a fresh buffered channel as a
+// state-transition listener on the mock and returns an iterator that reads
+// from that channel. The iterator keeps ctx so that Next can stop waiting
+// once the context is done.
+func (m *mockAgent) NewStateTransitionIterator(ctx context.Context) loop.StateTransitionIterator {
+	updates := make(chan loop.StateTransition, 10)
+
+	// Only the listener list is shared state, so only the append is locked.
+	m.mu.Lock()
+	m.stateTransitionListeners = append(m.stateTransitionListeners, updates)
+	m.mu.Unlock()
+
+	it := &mockStateTransitionIterator{agent: m, ctx: ctx, ch: updates}
+	return it
+}
+
+type mockStateTransitionIterator struct {
+ agent *mockAgent // mock whose listener list contains ch; Close uses it to deregister
+ ctx context.Context // when done, Next returns nil
+ ch chan loop.StateTransition // receives the transitions sent by TriggerStateTransition
+}
+
+// Next blocks until a transition arrives on the iterator's channel or the
+// iterator's context is done. It returns a pointer to the iterator's own copy
+// of the received transition, or nil when the context is done or the channel
+// has been closed.
+func (m *mockStateTransitionIterator) Next() *loop.StateTransition {
+	select {
+	case next, open := <-m.ch:
+		if open {
+			// next is scoped to this case, so its address is not shared.
+			return &next
+		}
+		return nil
+	case <-m.ctx.Done():
+		return nil
+	}
+}
+
+// Close deregisters the iterator's channel from the mock agent and closes it,
+// which makes any pending or later Next call return nil.
+//
+// Close is idempotent: the channel is closed only while it is still present
+// in the agent's listener list, so a second call is a no-op instead of a
+// "close of closed channel" panic. Removal and close both happen under the
+// agent's lock.
+func (m *mockStateTransitionIterator) Close() {
+	m.agent.mu.Lock()
+	defer m.agent.mu.Unlock()
+
+	idx := slices.Index(m.agent.stateTransitionListeners, m.ch)
+	if idx < 0 {
+		// Already deregistered (and therefore already closed).
+		return
+	}
+	m.agent.stateTransitionListeners = slices.Delete(m.agent.stateTransitionListeners, idx, idx+1)
+	close(m.ch)
+}
+
+// CurrentStateName returns the mock's current state name, read under the read lock.
func (m *mockAgent) CurrentStateName() string {
m.mu.RLock()
defer m.mu.RUnlock()
return m.currentState
}
+// TriggerStateTransition is a test helper that sets the mock's current state
+// to to.String() and fans the resulting StateTransition{From, To, Event} out
+// to every registered listener channel.
+//
+// Delivery happens while m.mu is held and never blocks. Close removes a
+// listener from the list under the same lock before its channel is closed,
+// so a send made under the lock cannot land on a closed channel. A listener
+// whose buffer is full misses this update instead of stalling the caller
+// (and the test) indefinitely.
+func (m *mockAgent) TriggerStateTransition(from, to loop.State, event loop.TransitionEvent) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	m.currentState = to.String()
+	transition := loop.StateTransition{
+		From:  from,
+		To:    to,
+		Event: event,
+	}
+
+	for _, ch := range m.stateTransitionListeners {
+		select {
+		case ch <- transition:
+		default:
+			// Listener is not draining its buffer; drop rather than block
+			// while holding the lock.
+		}
+	}
+}
+
+
func (m *mockAgent) InitialCommit() string {
m.mu.RLock()
defer m.mu.RUnlock()
@@ -171,13 +236,14 @@
func TestSSEStream(t *testing.T) {
// Create a mock agent with initial messages
mockAgent := &mockAgent{
- messages: []loop.AgentMessage{},
- messageCount: 0,
- currentState: "Ready",
- subscribers: []chan *loop.AgentMessage{},
- initialCommit: "abcd1234",
- title: "Test Title",
- branchName: "sketch/test-branch",
+ messages: []loop.AgentMessage{},
+ messageCount: 0,
+ currentState: "Ready",
+ subscribers: []chan *loop.AgentMessage{},
+ stateTransitionListeners: []chan loop.StateTransition{},
+ initialCommit: "abcd1234",
+ title: "Test Title",
+ branchName: "sketch/test-branch",
}
// Add the initial messages before creating the server
@@ -262,6 +328,13 @@
ToolName: "test_tool",
})
+ // Trigger a state transition to test state updates
+ time.Sleep(200 * time.Millisecond)
+ mockAgent.TriggerStateTransition(loop.StateReady, loop.StateSendingToLLM, loop.TransitionEvent{
+ Description: "Agent started thinking",
+ Data: "start_thinking",
+ })
+
// Let it process for longer
time.Sleep(1000 * time.Millisecond)
cancel() // Cancel to end the test