blob: 820d9282e722c7c389d39bfb1ce1f8ff87698052 [file] [log] [blame]
package loop
import (
"context"
"testing"
"time"
)
// TestIteratorBasic tests basic iterator functionality
func TestIteratorBasic(t *testing.T) {
// Create an agent with some predefined messages
agent := &Agent{
subscribers: []chan *AgentMessage{},
}
// Add some test messages to the history
agent.mu.Lock()
agent.history = []AgentMessage{
{Type: AgentMessageType, Content: "Message 1", Idx: 0},
{Type: AgentMessageType, Content: "Message 2", Idx: 1},
{Type: AgentMessageType, Content: "Message 3", Idx: 2},
}
agent.mu.Unlock()
// Create an iterator starting from the beginning
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
it := agent.NewIterator(ctx, 0)
defer it.Close()
// Read all messages and verify
for i := range 3 {
msg := it.Next()
if msg == nil {
t.Fatalf("Expected message %d but got nil", i)
}
expectedNum := i + 1
expectedContent := "Message " + string(rune('0')+rune(expectedNum))
if msg.Content != expectedContent {
t.Errorf("Expected message %d to be 'Message %d', got '%s'", i+1, i+1, msg.Content)
}
}
}
// TestIteratorStartFromMiddle tests starting an iterator from a specific index
func TestIteratorStartFromMiddle(t *testing.T) {
// Create an agent with some predefined messages
agent := &Agent{
subscribers: []chan *AgentMessage{},
}
// Add some test messages to the history
agent.mu.Lock()
agent.history = []AgentMessage{
{Type: AgentMessageType, Content: "Message 1", Idx: 0},
{Type: AgentMessageType, Content: "Message 2", Idx: 1},
{Type: AgentMessageType, Content: "Message 3", Idx: 2},
}
agent.mu.Unlock()
// Create an iterator starting from index 1
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
it := agent.NewIterator(ctx, 1)
defer it.Close()
// We should get Message 2 and 3
msg := it.Next()
if msg == nil || msg.Content != "Message 2" {
t.Errorf("Expected 'Message 2', got %v", msg)
}
msg = it.Next()
if msg == nil || msg.Content != "Message 3" {
t.Errorf("Expected 'Message 3', got %v", msg)
}
}
// TestIteratorWithNewMessages tests that the iterator properly waits for and receives new messages
func TestIteratorWithNewMessages(t *testing.T) {
// Create an agent
agent := &Agent{
subscribers: []chan *AgentMessage{},
}
// Create an iterator
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
it := agent.NewIterator(ctx, 0)
defer it.Close()
// Use channels to synchronize instead of sleeps
msg1Added := make(chan struct{})
msg2Ready := make(chan struct{})
// Add messages in another goroutine
go func() {
// Add first message immediately
agent.pushToOutbox(context.Background(), AgentMessage{Type: AgentMessageType, Content: "New message 1"})
// Signal that message 1 is added
close(msg1Added)
// Wait for signal that we're ready for message 2
<-msg2Ready
// Add second message
agent.pushToOutbox(context.Background(), AgentMessage{Type: AgentMessageType, Content: "New message 2"})
}()
// Read first message
msg := it.Next()
if msg == nil || msg.Content != "New message 1" {
t.Errorf("Expected 'New message 1', got %v", msg)
}
// Signal that we're ready for message 2
close(msg2Ready)
// Read second message
msg = it.Next()
if msg == nil || msg.Content != "New message 2" {
t.Errorf("Expected 'New message 2', got %v", msg)
}
}
// TestIteratorClose tests that closing an iterator removes it from the subscribers list
func TestIteratorClose(t *testing.T) {
// Create an agent
agent := &Agent{
subscribers: []chan *AgentMessage{},
}
// Create an iterator
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
it := agent.NewIterator(ctx, 0)
// Verify iterator was added to subscribers after it tries to get a message
// that doesn't exist yet
agent.mu.Lock()
agent.history = []AgentMessage{} // ensure history is empty
agent.mu.Unlock()
// Start a goroutine to call Next() which should subscribe
done := make(chan struct{})
go func() {
// This will block after subscribing
it.Next()
close(done)
}()
// Give a short time for the goroutine to run and subscribe
time.Sleep(10 * time.Millisecond)
// Check that we have a subscriber
agent.mu.Lock()
subscriberCount := len(agent.subscribers)
agent.mu.Unlock()
if subscriberCount != 1 {
t.Errorf("Expected 1 subscriber, got %d", subscriberCount)
}
// Close the iterator
it.Close()
// Add a message to trigger the goroutine to exit (in case it's still waiting)
agent.pushToOutbox(context.Background(), AgentMessage{Type: AgentMessageType, Content: "Test message"})
// Wait for the goroutine to finish
select {
case <-done:
// Good, it finished
case <-time.After(100 * time.Millisecond):
t.Error("Timed out waiting for iterator goroutine to finish")
}
// Verify the subscriber was removed
agent.mu.Lock()
subscriberCount = len(agent.subscribers)
agent.mu.Unlock()
if subscriberCount != 0 {
t.Errorf("Expected 0 subscribers after Close(), got %d", subscriberCount)
}
}
// TestIteratorContextCancel tests that an iterator stops properly when its context is cancelled
func TestIteratorContextCancel(t *testing.T) {
// Create an agent
agent := &Agent{
subscribers: []chan *AgentMessage{},
}
// Create a context that we can cancel
ctx, cancel := context.WithCancel(context.Background())
it := agent.NewIterator(ctx, 0)
defer it.Close()
// Start a goroutine to call Next() which will block
resultChan := make(chan *AgentMessage)
go func() {
resultChan <- it.Next()
}()
// Wait a minimal time, then cancel the context
time.Sleep(10 * time.Millisecond)
cancel()
// Verify we get nil from the iterator
select {
case result := <-resultChan:
if result != nil {
t.Errorf("Expected nil result after context cancel, got %v", result)
}
case <-time.After(100 * time.Millisecond):
t.Error("Timed out waiting for iterator to return after context cancel")
}
// Verify the subscriber was removed due to context cancellation
agent.mu.Lock()
subscriberCount := len(agent.subscribers)
agent.mu.Unlock()
if subscriberCount != 0 {
t.Errorf("Expected 0 subscribers after context cancel, got %d", subscriberCount)
}
}