blob: 5a12256a3214c26e8d517b8b6a3b6ae97d49a5fc [file] [log] [blame]
Josh Bleecher Snyder4f84ab72025-04-22 16:40:54 -07001package conversation
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "log/slog"
9 "maps"
10 "math/rand/v2"
11 "slices"
12 "strings"
13 "sync"
14 "time"
15
16 "github.com/oklog/ulid/v2"
17 "github.com/richardlehane/crock32"
18 "sketch.dev/llm"
19 "sketch.dev/skribe"
20)
21
22type Listener interface {
23 // TODO: Content is leaking an anthropic API; should we avoid it?
24 // TODO: Where should we include start/end time and usage?
25 OnToolCall(ctx context.Context, convo *Convo, toolCallID string, toolName string, toolInput json.RawMessage, content llm.Content)
26 OnToolResult(ctx context.Context, convo *Convo, toolCallID string, toolName string, toolInput json.RawMessage, content llm.Content, result *string, err error)
27 OnRequest(ctx context.Context, convo *Convo, requestID string, msg *llm.Message)
28 OnResponse(ctx context.Context, convo *Convo, requestID string, msg *llm.Response)
29}
30
31type NoopListener struct{}
32
33func (n *NoopListener) OnToolCall(ctx context.Context, convo *Convo, id string, toolName string, toolInput json.RawMessage, content llm.Content) {
34}
35
36func (n *NoopListener) OnToolResult(ctx context.Context, convo *Convo, id string, toolName string, toolInput json.RawMessage, content llm.Content, result *string, err error) {
37}
38
39func (n *NoopListener) OnResponse(ctx context.Context, convo *Convo, id string, msg *llm.Response) {
40}
41func (n *NoopListener) OnRequest(ctx context.Context, convo *Convo, id string, msg *llm.Message) {}
42
// ErrDoNotRespond is a sentinel a tool may return from Run.
// When ToolResultContents sees it (via errors.Is), it emits no tool result
// and no OnToolResult notification for that tool call.
var ErrDoNotRespond = errors.New("do not respond")
44
// A Convo is a managed conversation with Claude.
// It automatically manages the state of the conversation,
// including appending messages sent/received,
// calling tools and sending their results,
// tracking usage, etc.
//
// Exported fields must not be altered concurrently with calling any method on Convo.
// Typical usage is to configure a Convo once before using it.
type Convo struct {
	// ID is a unique ID for the conversation.
	// New and SubConvo fill it in from newConvoID.
	ID string
	// Ctx is the context for the entire conversation.
	Ctx context.Context
	// Service is the LLM service to use.
	Service llm.Service
	// Tools are the tools available during the conversation.
	Tools []*llm.Tool
	// SystemPrompt is the system prompt for the conversation.
	SystemPrompt string
	// PromptCaching indicates whether to use Anthropic's prompt caching.
	// See https://docs.anthropic.com/en/docs/build-with-claude/prompt-caching#continuing-a-multi-turn-conversation
	// for the documentation. At request send time, we set the cache_control field on the
	// last message. We also cache the system prompt.
	// Default: true (as set by New).
	PromptCaching bool
	// ToolUseOnly indicates whether Claude may only use tools during this conversation.
	// TODO: add more fine-grained control over tool use?
	ToolUseOnly bool
	// Parent is the parent conversation, if any.
	// It is non-nil for "subagent" calls.
	// It is set automatically when calling SubConvo,
	// and usually should not be set manually.
	Parent *Convo
	// Budget is the budget for this conversation (and all sub-conversations).
	// The Conversation DOES NOT automatically enforce the budget.
	// It is up to the caller to call OverBudget() as appropriate.
	Budget Budget

	// messages tracks the messages so far in the conversation.
	// SendMessage appends each request/response pair after a successful round trip.
	messages []llm.Message

	// Listener receives messages being sent.
	Listener Listener

	// muToolUseCancel guards toolUseCancel.
	muToolUseCancel *sync.Mutex
	// toolUseCancel maps a tool use ID to the function that cancels that tool call.
	// Entries are added by newToolUseContext and removed by CancelToolUse.
	toolUseCancel map[string]context.CancelCauseFunc

	// Protects usage. This is used for subconversations (that share part of CumulativeUsage) as well.
	// SubConvo hands the parent's mutex to the child, so both use the same lock.
	mu *sync.Mutex
	// usage tracks usage for this conversation and all sub-conversations.
	usage *CumulativeUsage
}
97
98// newConvoID generates a new 8-byte random id.
99// The uniqueness/collision requirements here are very low.
100// They are not global identifiers,
101// just enough to distinguish different convos in a single session.
102func newConvoID() string {
103 u1 := rand.Uint32()
104 s := crock32.Encode(uint64(u1))
105 if len(s) < 7 {
106 s += strings.Repeat("0", 7-len(s))
107 }
108 return s[:3] + "-" + s[3:]
109}
110
111// New creates a new conversation with Claude with sensible defaults.
112// ctx is the context for the entire conversation.
113func New(ctx context.Context, srv llm.Service) *Convo {
114 id := newConvoID()
115 return &Convo{
116 Ctx: skribe.ContextWithAttr(ctx, slog.String("convo_id", id)),
117 Service: srv,
118 PromptCaching: true,
119 usage: newUsage(),
120 Listener: &NoopListener{},
121 ID: id,
122 muToolUseCancel: &sync.Mutex{},
123 toolUseCancel: map[string]context.CancelCauseFunc{},
124 mu: &sync.Mutex{},
125 }
126}
127
128// SubConvo creates a sub-conversation with the same configuration as the parent conversation.
129// (This propagates context for cancellation, HTTP client, API key, etc.)
130// The sub-conversation shares no messages with the parent conversation.
131// It does not inherit tools from the parent conversation.
132func (c *Convo) SubConvo() *Convo {
133 id := newConvoID()
134 return &Convo{
135 Ctx: skribe.ContextWithAttr(c.Ctx, slog.String("convo_id", id), slog.String("parent_convo_id", c.ID)),
136 Service: c.Service,
137 PromptCaching: c.PromptCaching,
138 Parent: c,
139 // For convenience, sub-convo usage shares tool uses map with parent,
140 // all other fields separate, propagated in AddResponse
141 usage: newUsageWithSharedToolUses(c.usage),
142 mu: c.mu,
143 Listener: c.Listener,
144 ID: id,
145 // Do not copy Budget. Each budget is independent,
146 // and OverBudget checks whether any ancestor is over budget.
147 }
148}
149
150func (c *Convo) SubConvoWithHistory() *Convo {
151 id := newConvoID()
152 return &Convo{
153 Ctx: skribe.ContextWithAttr(c.Ctx, slog.String("convo_id", id), slog.String("parent_convo_id", c.ID)),
154 Service: c.Service,
155 PromptCaching: c.PromptCaching,
156 Parent: c,
157 // For convenience, sub-convo usage shares tool uses map with parent,
158 // all other fields separate, propagated in AddResponse
159 usage: newUsageWithSharedToolUses(c.usage),
160 mu: c.mu,
161 Listener: c.Listener,
162 ID: id,
163 // Do not copy Budget. Each budget is independent,
164 // and OverBudget checks whether any ancestor is over budget.
165 messages: slices.Clone(c.messages),
166 }
167}
168
169// Depth reports how many "sub-conversations" deep this conversation is.
170// That it, it walks up parents until it finds a root.
171func (c *Convo) Depth() int {
172 x := c
173 var depth int
174 for x.Parent != nil {
175 x = x.Parent
176 depth++
177 }
178 return depth
179}
180
181// SendUserTextMessage sends a text message to the LLM in this conversation.
182// otherContents contains additional contents to send with the message, usually tool results.
183func (c *Convo) SendUserTextMessage(s string, otherContents ...llm.Content) (*llm.Response, error) {
184 contents := slices.Clone(otherContents)
185 if s != "" {
186 contents = append(contents, llm.Content{Type: llm.ContentTypeText, Text: s})
187 }
188 msg := llm.Message{
189 Role: llm.MessageRoleUser,
190 Content: contents,
191 }
192 return c.SendMessage(msg)
193}
194
195func (c *Convo) messageRequest(msg llm.Message) *llm.Request {
196 system := []llm.SystemContent{}
197 if c.SystemPrompt != "" {
198 var d llm.SystemContent
199 d = llm.SystemContent{Type: "text", Text: c.SystemPrompt}
200 if c.PromptCaching {
201 d.Cache = true
202 }
203 system = []llm.SystemContent{d}
204 }
205
206 // Claude is happy to return an empty response in response to our Done() call,
207 // and, if so, you'll see something like:
208 // API request failed with status 400 Bad Request
209 // {"type":"error","error": {"type":"invalid_request_error",
210 // "message":"messages.5: all messages must have non-empty content except for the optional final assistant message"}}
211 // So, we filter out those empty messages.
212 var nonEmptyMessages []llm.Message
213 for _, m := range c.messages {
214 if len(m.Content) > 0 {
215 nonEmptyMessages = append(nonEmptyMessages, m)
216 }
217 }
218
219 mr := &llm.Request{
220 Messages: append(nonEmptyMessages, msg), // not yet committed to keeping msg
221 System: system,
222 Tools: c.Tools,
223 }
224 if c.ToolUseOnly {
225 mr.ToolChoice = &llm.ToolChoice{Type: llm.ToolChoiceTypeAny}
226 }
227 return mr
228}
229
230func (c *Convo) findTool(name string) (*llm.Tool, error) {
231 for _, tool := range c.Tools {
232 if tool.Name == name {
233 return tool, nil
234 }
235 }
236 return nil, fmt.Errorf("tool %q not found", name)
237}
238
239// insertMissingToolResults adds error results for tool uses that were requested
240// but not included in the message, which can happen in error paths like "out of budget."
241// We only insert these if there were no tool responses at all, since an incorrect
242// number of tool results would be a programmer error. Mutates inputs.
243func (c *Convo) insertMissingToolResults(mr *llm.Request, msg *llm.Message) {
244 if len(mr.Messages) < 2 {
245 return
246 }
247 prev := mr.Messages[len(mr.Messages)-2]
248 var toolUsePrev int
249 for _, c := range prev.Content {
250 if c.Type == llm.ContentTypeToolUse {
251 toolUsePrev++
252 }
253 }
254 if toolUsePrev == 0 {
255 return
256 }
257 var toolUseCurrent int
258 for _, c := range msg.Content {
259 if c.Type == llm.ContentTypeToolResult {
260 toolUseCurrent++
261 }
262 }
263 if toolUseCurrent != 0 {
264 return
265 }
266 var prefix []llm.Content
267 for _, part := range prev.Content {
268 if part.Type != llm.ContentTypeToolUse {
269 continue
270 }
271 content := llm.Content{
272 Type: llm.ContentTypeToolResult,
273 ToolUseID: part.ID,
274 ToolError: true,
275 ToolResult: "not executed; retry possible",
276 }
277 prefix = append(prefix, content)
278 msg.Content = append(prefix, msg.Content...)
279 mr.Messages[len(mr.Messages)-1].Content = msg.Content
280 }
281 slog.DebugContext(c.Ctx, "inserted missing tool results")
282}
283
284// SendMessage sends a message to Claude.
285// The conversation records (internally) all messages succesfully sent and received.
286func (c *Convo) SendMessage(msg llm.Message) (*llm.Response, error) {
287 id := ulid.Make().String()
288 mr := c.messageRequest(msg)
289 var lastMessage *llm.Message
290 if c.PromptCaching {
291 lastMessage = &mr.Messages[len(mr.Messages)-1]
292 if len(lastMessage.Content) > 0 {
293 lastMessage.Content[len(lastMessage.Content)-1].Cache = true
294 }
295 }
296 defer func() {
297 if lastMessage == nil {
298 return
299 }
300 if len(lastMessage.Content) > 0 {
301 lastMessage.Content[len(lastMessage.Content)-1].Cache = false
302 }
303 }()
304 c.insertMissingToolResults(mr, &msg)
305 c.Listener.OnRequest(c.Ctx, c, id, &msg)
306
307 startTime := time.Now()
308 resp, err := c.Service.Do(c.Ctx, mr)
309 if resp != nil {
310 resp.StartTime = &startTime
311 endTime := time.Now()
312 resp.EndTime = &endTime
313 }
314
315 if err != nil {
316 c.Listener.OnResponse(c.Ctx, c, id, nil)
317 return nil, err
318 }
319 c.messages = append(c.messages, msg, resp.ToMessage())
320 // Propagate usage to all ancestors (including us).
321 for x := c; x != nil; x = x.Parent {
322 x.usage.Add(resp.Usage)
323 }
324 c.Listener.OnResponse(c.Ctx, c, id, resp)
325 return resp, err
326}
327
// toolCallInfoKeyType is the unexported type of the context key used for
// ToolCallInfo, so the key cannot collide with keys from other packages.
type toolCallInfoKeyType string

// toolCallInfoKey is the context key under which ToolResultContents stores
// the ToolCallInfo for a running tool call.
var toolCallInfoKey toolCallInfoKeyType
331
// ToolCallInfo identifies the tool call a context belongs to.
// ToolResultContents attaches one to the context passed to each tool's Run.
type ToolCallInfo struct {
	// ToolUseID is the ID of the tool use being executed.
	ToolUseID string
	// Convo is the conversation that is running the tool.
	Convo *Convo
}
336
337func ToolCallInfoFromContext(ctx context.Context) ToolCallInfo {
338 v := ctx.Value(toolCallInfoKey)
339 i, _ := v.(ToolCallInfo)
340 return i
341}
342
343func (c *Convo) ToolResultCancelContents(resp *llm.Response) ([]llm.Content, error) {
344 if resp.StopReason != llm.StopReasonToolUse {
345 return nil, nil
346 }
347 var toolResults []llm.Content
348
349 for _, part := range resp.Content {
350 if part.Type != llm.ContentTypeToolUse {
351 continue
352 }
353 c.incrementToolUse(part.ToolName)
354
355 content := llm.Content{
356 Type: llm.ContentTypeToolResult,
357 ToolUseID: part.ID,
358 }
359
360 content.ToolError = true
361 content.ToolResult = "user canceled this too_use"
362 toolResults = append(toolResults, content)
363 }
364 return toolResults, nil
365}
366
// GetID returns the conversation ID (the ID field).
func (c *Convo) GetID() string {
	return c.ID
}
371
372func (c *Convo) CancelToolUse(toolUseID string, err error) error {
373 c.muToolUseCancel.Lock()
374 defer c.muToolUseCancel.Unlock()
375 cancel, ok := c.toolUseCancel[toolUseID]
376 if !ok {
377 return fmt.Errorf("cannot cancel %s: no cancel function registered for this tool_use_id. All I have is %+v", toolUseID, c.toolUseCancel)
378 }
379 delete(c.toolUseCancel, toolUseID)
380 cancel(err)
381 return nil
382}
383
384func (c *Convo) newToolUseContext(ctx context.Context, toolUseID string) (context.Context, context.CancelFunc) {
385 c.muToolUseCancel.Lock()
386 defer c.muToolUseCancel.Unlock()
387 ctx, cancel := context.WithCancelCause(ctx)
388 c.toolUseCancel[toolUseID] = cancel
389 return ctx, func() { c.CancelToolUse(toolUseID, nil) }
390}
391
// ToolResultContents runs all tool uses requested by the response and returns their results.
// Cancelling ctx will cancel any running tool calls.
//
// It returns (nil, nil) when resp did not stop for tool use.
// Each tool use runs in its own goroutine; results are collected as they are
// produced, so their order need not match the order of the tool uses in resp.
// A tool that fails, cannot be found, or is cancelled yields an error result
// (ToolError set, the error text as ToolResult) rather than a Go error.
// A tool returning ErrDoNotRespond yields no result at all.
// If ctx is done once every tool has returned, the results are discarded and
// ctx.Err() is returned.
func (c *Convo) ToolResultContents(ctx context.Context, resp *llm.Response) ([]llm.Content, error) {
	if resp.StopReason != llm.StopReasonToolUse {
		return nil, nil
	}
	// Extract all tool calls from the response, call the tools, and gather the results.
	var wg sync.WaitGroup
	// Buffered for the maximum possible number of results, so goroutines never block on send.
	toolResultC := make(chan llm.Content, len(resp.Content))
	for _, part := range resp.Content {
		if part.Type != llm.ContentTypeToolUse {
			continue
		}
		c.incrementToolUse(part.ToolName)
		startTime := time.Now()

		c.Listener.OnToolCall(ctx, c, part.ID, part.ToolName, part.ToolInput, llm.Content{
			Type:             llm.ContentTypeToolUse,
			ToolUseID:        part.ID,
			ToolUseStartTime: &startTime,
		})

		wg.Add(1)
		// NOTE(review): the goroutine captures part and startTime directly; this
		// presumably relies on Go 1.22+ per-iteration loop variables — confirm go.mod.
		go func() {
			defer wg.Done()

			content := llm.Content{
				Type:             llm.ContentTypeToolResult,
				ToolUseID:        part.ID,
				ToolUseStartTime: &startTime,
			}
			// sendErr finalizes content as an error result, notifies the listener and publishes it.
			sendErr := func(err error) {
				// Record end time
				endTime := time.Now()
				content.ToolUseEndTime = &endTime

				content.ToolError = true
				content.ToolResult = err.Error()
				c.Listener.OnToolResult(ctx, c, part.ID, part.ToolName, part.ToolInput, content, nil, err)
				toolResultC <- content
			}
			// sendRes finalizes content as a successful result, notifies the listener and publishes it.
			sendRes := func(res string) {
				// Record end time
				endTime := time.Now()
				content.ToolUseEndTime = &endTime

				content.ToolResult = res
				c.Listener.OnToolResult(ctx, c, part.ID, part.ToolName, part.ToolInput, content, &res, nil)
				toolResultC <- content
			}

			tool, err := c.findTool(part.ToolName)
			if err != nil {
				sendErr(err)
				return
			}
			// Create a new context for just this tool_use call, and register its
			// cancel function so that it can be canceled individually.
			toolUseCtx, cancel := c.newToolUseContext(ctx, part.ID)
			defer cancel()
			// TODO: move this into newToolUseContext?
			toolUseCtx = context.WithValue(toolUseCtx, toolCallInfoKey, ToolCallInfo{ToolUseID: part.ID, Convo: c})
			toolResult, err := tool.Run(toolUseCtx, part.ToolInput)
			// The tool asked for no result to be sent: skip both the listener and the channel.
			if errors.Is(err, ErrDoNotRespond) {
				return
			}
			// Cancellation takes precedence over whatever the tool returned;
			// report the cancellation cause instead.
			if toolUseCtx.Err() != nil {
				sendErr(context.Cause(toolUseCtx))
				return
			}

			if err != nil {
				sendErr(err)
				return
			}
			sendRes(toolResult)
		}()
	}
	wg.Wait()
	// All senders are done, so closing lets the range below terminate.
	close(toolResultC)
	var toolResults []llm.Content
	for toolResult := range toolResultC {
		toolResults = append(toolResults, toolResult)
	}
	if ctx.Err() != nil {
		return nil, ctx.Err()
	}
	return toolResults, nil
}
481
482func (c *Convo) incrementToolUse(name string) {
483 c.mu.Lock()
484 defer c.mu.Unlock()
485
486 c.usage.ToolUses[name]++
487}
488
// CumulativeUsage represents cumulative usage across a Convo, including all sub-conversations.
type CumulativeUsage struct {
	// StartTime is when the counters were created; WallTime is measured from it.
	StartTime time.Time `json:"start_time"`
	// Note that the JSON name is "messages", not "responses".
	Responses                uint64         `json:"messages"` // count of responses
	InputTokens              uint64         `json:"input_tokens"`
	OutputTokens             uint64         `json:"output_tokens"`
	CacheReadInputTokens     uint64         `json:"cache_read_input_tokens"`
	CacheCreationInputTokens uint64         `json:"cache_creation_input_tokens"`
	TotalCostUSD             float64        `json:"total_cost_usd"`
	ToolUses                 map[string]int `json:"tool_uses"` // tool name -> number of uses
}
500
501func newUsage() *CumulativeUsage {
502 return &CumulativeUsage{ToolUses: make(map[string]int), StartTime: time.Now()}
503}
504
505func newUsageWithSharedToolUses(parent *CumulativeUsage) *CumulativeUsage {
506 return &CumulativeUsage{ToolUses: parent.ToolUses, StartTime: time.Now()}
507}
508
509func (u *CumulativeUsage) Clone() CumulativeUsage {
510 v := *u
511 v.ToolUses = maps.Clone(u.ToolUses)
512 return v
513}
514
515func (c *Convo) CumulativeUsage() CumulativeUsage {
516 if c == nil {
517 return CumulativeUsage{}
518 }
519 c.mu.Lock()
520 defer c.mu.Unlock()
521 return c.usage.Clone()
522}
523
524func (u *CumulativeUsage) WallTime() time.Duration {
525 return time.Since(u.StartTime)
526}
527
528func (u *CumulativeUsage) DollarsPerHour() float64 {
529 hours := u.WallTime().Hours()
530 // Prevent division by very small numbers that could cause issues
531 if hours < 1e-6 {
532 return 0
533 }
534 return u.TotalCostUSD / hours
535}
536
537func (u *CumulativeUsage) Add(usage llm.Usage) {
538 u.Responses++
539 u.InputTokens += usage.InputTokens
540 u.OutputTokens += usage.OutputTokens
541 u.CacheReadInputTokens += usage.CacheReadInputTokens
542 u.CacheCreationInputTokens += usage.CacheCreationInputTokens
543 u.TotalCostUSD += usage.CostUSD
544}
545
546// TotalInputTokens returns the grand total cumulative input tokens in u.
547func (u *CumulativeUsage) TotalInputTokens() uint64 {
548 return u.InputTokens + u.CacheReadInputTokens + u.CacheCreationInputTokens
549}
550
551// Attr returns the cumulative usage as a slog.Attr with key "usage".
552func (u CumulativeUsage) Attr() slog.Attr {
553 elapsed := time.Since(u.StartTime)
554 return slog.Group("usage",
555 slog.Duration("wall_time", elapsed),
556 slog.Uint64("responses", u.Responses),
557 slog.Uint64("input_tokens", u.InputTokens),
558 slog.Uint64("output_tokens", u.OutputTokens),
559 slog.Uint64("cache_read_input_tokens", u.CacheReadInputTokens),
560 slog.Uint64("cache_creation_input_tokens", u.CacheCreationInputTokens),
561 slog.Float64("total_cost_usd", u.TotalCostUSD),
562 slog.Float64("dollars_per_hour", u.TotalCostUSD/elapsed.Hours()),
563 slog.Any("tool_uses", maps.Clone(u.ToolUses)),
564 )
565}
566
// A Budget represents the maximum amount of resources that may be spent on a conversation.
// Note that the default (zero) budget is unlimited.
// A limit counts as exceeded once usage reaches it (usage >= limit).
type Budget struct {
	MaxResponses uint64        // if > 0, max number of iterations (=responses)
	MaxDollars   float64       // if > 0, max dollars that may be spent
	MaxWallTime  time.Duration // if > 0, max wall time that may be spent
}
574
575// OverBudget returns an error if the convo (or any of its parents) has exceeded its budget.
576// TODO: document parent vs sub budgets, multiple errors, etc, once we know the desired behavior.
577func (c *Convo) OverBudget() error {
578 for x := c; x != nil; x = x.Parent {
579 if err := x.overBudget(); err != nil {
580 return err
581 }
582 }
583 return nil
584}
585
586// ResetBudget sets the budget to the passed in budget and
587// adjusts it by what's been used so far.
588func (c *Convo) ResetBudget(budget Budget) {
589 c.Budget = budget
590 if c.Budget.MaxDollars > 0 {
591 c.Budget.MaxDollars += c.CumulativeUsage().TotalCostUSD
592 }
593 if c.Budget.MaxResponses > 0 {
594 c.Budget.MaxResponses += c.CumulativeUsage().Responses
595 }
596 if c.Budget.MaxWallTime > 0 {
597 c.Budget.MaxWallTime += c.usage.WallTime()
598 }
599}
600
601func (c *Convo) overBudget() error {
602 usage := c.CumulativeUsage()
603 // TODO: stop before we exceed the budget instead of after?
604 // Top priority is money, then time, then response count.
605 var err error
606 cont := "Continuing to chat will reset the budget."
607 if c.Budget.MaxDollars > 0 && usage.TotalCostUSD >= c.Budget.MaxDollars {
608 err = errors.Join(err, fmt.Errorf("$%.2f spent, budget is $%.2f. %s", usage.TotalCostUSD, c.Budget.MaxDollars, cont))
609 }
610 if c.Budget.MaxWallTime > 0 && usage.WallTime() >= c.Budget.MaxWallTime {
611 err = errors.Join(err, fmt.Errorf("%v elapsed, budget is %v. %s", usage.WallTime().Truncate(time.Second), c.Budget.MaxWallTime.Truncate(time.Second), cont))
612 }
613 if c.Budget.MaxResponses > 0 && usage.Responses >= c.Budget.MaxResponses {
614 err = errors.Join(err, fmt.Errorf("%d responses received, budget is %d. %s", usage.Responses, c.Budget.MaxResponses, cont))
615 }
616 return err
617}