Files
viettranx 2c1ef25392 feat(subagent): token tracking, edition limits, waitAll, auto-retry, producer-consumer announce (#600)
- Token cost tracking: accumulate input/output tokens per subagent,
  include in announce messages and persist to DB
- Per-edition rate limits: MaxSubagentConcurrent/Depth on Edition struct,
  tenant-scoped concurrency enforcement in Spawn/RunSync
- WaitAll action: spawn(action=wait, timeout=N) blocks until all
  children complete, returns merged summary
- Auto-retry: configurable MaxRetries (default 2) with linear backoff
  for transient LLM failures
- Producer-consumer announce queue: merges staggered subagent results
  into single LLM run (same pattern as team task announces)
- Raw metadata in bus messages to prevent double-formatting
- Fire-and-forget DB persistence with detached context + tenant scope
- Split oversized files for <200 line compliance
2026-03-31 11:45:16 +07:00

332 lines
11 KiB
Go

package tools
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"strings"
"time"
"github.com/nextlevelbuilder/goclaw/internal/bus"
"github.com/nextlevelbuilder/goclaw/internal/providers"
"github.com/nextlevelbuilder/goclaw/internal/store"
"github.com/nextlevelbuilder/goclaw/internal/tracing"
)
// runTask executes a subagent task to completion on the calling goroutine and
// then reports the outcome. It is intended to be run in its own goroutine.
//
// Reporting happens in two independent ways:
//   - When a message bus is configured and the task has an origin channel, the
//     result is announced back through the parent agent's session (so the
//     parent can reformulate it for the user). Delivery uses the batching
//     announce queue when one is configured; otherwise a single "system"
//     inbound message is published directly on the bus.
//   - When callback is non-nil, it receives a plain-text summary containing the
//     subagent's label, iteration count and result.
func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask, callback AsyncCallback) {
	iterations := sm.executeTask(ctx, task)

	// Mirrors the TS subagent-announce.ts flow.
	if canAnnounce := sm.msgBus != nil && task.OriginChannel != ""; canAnnounce {
		took := time.Since(time.UnixMilli(task.CreatedAt))
		entry := AnnounceQueueItem{
			SubagentID:   task.ID,
			Label:        task.Label,
			Status:       task.Status,
			Result:       task.Result,
			Media:        task.Media,
			Runtime:      took,
			Iterations:   iterations,
			InputTokens:  task.TotalInputTokens,
			OutputTokens: task.TotalOutputTokens,
		}

		if q := sm.announceQueue; q != nil {
			// Batched path: results for the same parent+chat share one queue key
			// so staggered completions can be merged (TS debounce pattern).
			origin := AnnounceMetadata{
				OriginChannel:    task.OriginChannel,
				OriginChatID:     task.OriginChatID,
				OriginPeerKind:   task.OriginPeerKind,
				OriginLocalKey:   task.OriginLocalKey,
				OriginUserID:     task.OriginUserID,
				OriginSessionKey: task.OriginSessionKey,
				OriginTenantID:   task.OriginTenantID,
				ParentAgent:      task.ParentID,
				OriginTraceID:    task.OriginTraceID.String(),
				OriginRootSpanID: task.OriginRootSpanID.String(),
			}
			q.Enqueue(fmt.Sprintf("announce:%s:%s", task.ParentID, task.OriginChatID), entry, origin)
		} else {
			// Unbatched path: format this single result and publish it now.
			body := FormatBatchedAnnounce([]AnnounceQueueItem{entry}, sm.RosterForParent(task.ParentID))
			md := map[string]string{
				MetaOriginChannel:      task.OriginChannel,
				MetaOriginPeerKind:     task.OriginPeerKind,
				MetaParentAgent:        task.ParentID,
				"subagent_id":          task.ID,
				MetaSubagentLabel:      task.Label,
				MetaSubagentStatus:     task.Status,
				MetaSubagentResult:     task.Result,
				MetaSubagentRuntime:    fmt.Sprint(took.Milliseconds()),
				MetaSubagentIterations: fmt.Sprint(iterations),
				MetaSubagentInputToks:  fmt.Sprint(task.TotalInputTokens),
				MetaSubagentOutputToks: fmt.Sprint(task.TotalOutputTokens),
				MetaOriginTraceID:      task.OriginTraceID.String(),
				MetaOriginRootSpanID:   task.OriginRootSpanID.String(),
			}
			// Optional routing keys are only attached when present.
			if task.OriginLocalKey != "" {
				md[MetaOriginLocalKey] = task.OriginLocalKey
			}
			if task.OriginSessionKey != "" {
				md[MetaOriginSessionKey] = task.OriginSessionKey
			}
			sm.msgBus.PublishInbound(bus.InboundMessage{
				Channel:  "system",
				SenderID: "subagent:" + task.ID,
				ChatID:   task.OriginChatID,
				Content:  body,
				UserID:   task.OriginUserID,
				TenantID: task.OriginTenantID,
				Metadata: md,
				Media:    task.Media,
			})
		}
	}

	if callback == nil {
		return
	}
	summary := fmt.Sprintf("Subagent '%s' completed in %d iterations.\n\nResult:\n%s",
		task.Label, iterations, task.Result)
	callback(ctx, NewResult(summary))
}
// executeTask runs the LLM tool loop for a subagent and returns the number of
// loop iterations started.
//
// The outcome is recorded on task (under sm.mu) before returning:
//   - TaskStatusCancelled when ctx is cancelled before the first iteration, at
//     the top of an iteration, or while an LLM call / retry backoff is in flight;
//   - TaskStatusFailed when the LLM call still errors after all retries;
//   - TaskStatusCompleted otherwise, with task.Result holding the final
//     assistant text (or a fallback message) and task.Media holding media files
//     collected from tool results.
//
// A deferred hook always stamps task.CompletedAt, finalizes the subagent root
// tracing span, and schedules auto-archival when ArchiveAfterMinutes > 0.
func (sm *SubagentManager) executeTask(ctx context.Context, task *SubagentTask) int {
	// Tracing: generate a root span ID for this subagent execution.
	// LLM/tool spans nest under this root span via parent_span_id; the root
	// span itself links to the parent agent's root span (from ctx).
	subRootSpanID := store.GenNewID()
	taskStart := time.Now().UTC()

	// Use a detached context for tracing so spans are emitted even if the parent
	// ctx is cancelled: copy the tracing values but drop the cancellation chain.
	traceCtx := context.Background()
	if collector := tracing.CollectorFromContext(ctx); collector != nil {
		traceCtx = tracing.WithCollector(traceCtx, collector)
		traceCtx = tracing.WithTraceID(traceCtx, tracing.TraceIDFromContext(ctx))
		// Keep the original parent_span_id (parent agent's root span) for the
		// subagent root span.
		traceCtx = tracing.WithParentSpanID(traceCtx, tracing.ParentSpanIDFromContext(ctx))
	}
	// subTraceCtx overrides parent_span_id so LLM/tool spans nest under
	// subRootSpanID; traceCtx keeps the original for the root span itself.
	subTraceCtx := tracing.WithParentSpanID(traceCtx, subRootSpanID)

	var model string
	var finalContent string
	iteration := 0

	defer func() {
		sm.mu.Lock()
		task.CompletedAt = time.Now().UnixMilli()
		sm.mu.Unlock()

		// Finalize the root subagent span (traceCtx is never cancelled).
		sm.emitSubagentSpanEnd(traceCtx, subRootSpanID, taskStart, task, finalContent)
		slog.Debug("subagent tracing: root span finalized",
			"id", task.ID, "span_id", subRootSpanID,
			"trace_id", tracing.TraceIDFromContext(traceCtx),
			"status", task.Status, "iterations", iteration)

		// Schedule auto-archive.
		if task.spawnConfig.ArchiveAfterMinutes > 0 {
			go sm.scheduleArchive(task.ID, time.Duration(task.spawnConfig.ArchiveAfterMinutes)*time.Minute)
		}
	}()

	// NOTE(review): unlike the failed/completed paths, the cancellation paths in
	// this function do not call persistStatus — confirm the DB row for a
	// cancelled task is finalized elsewhere.
	if ctx.Err() != nil {
		sm.mu.Lock()
		task.Status = TaskStatusCancelled
		task.Result = "cancelled before execution"
		sm.mu.Unlock()
		return 0
	}

	// Build tools for the subagent (no spawn/subagent tools, to prevent recursion).
	toolsReg := sm.createTools()
	sm.applyDenyList(toolsReg, task.Depth, task.spawnConfig)

	// Determine model (cascading priority, later assignments win):
	//  1. Per-task model override (highest — LLM specified model in spawn call)
	//  2. SubagentConfig.Model (agent-level subagent override)
	//  3. Parent agent's model (inherit from the agent that spawned us)
	//  4. SubagentManager default model (system-wide fallback)
	model = sm.model
	if parentModel := ParentModelFromCtx(ctx); parentModel != "" {
		model = parentModel
	}
	if task.spawnConfig.Model != "" {
		model = task.spawnConfig.Model
	}
	if task.Model != "" {
		model = task.Model
	}

	// Determine provider (cascading priority):
	//  1. Parent agent's provider (inherit so the model/provider combo stays valid)
	//  2. SubagentManager default provider (system-wide fallback)
	activeProvider := sm.provider
	if sm.providerReg != nil {
		if parentProviderName := ParentProviderFromCtx(ctx); parentProviderName != "" {
			if p, err := sm.providerReg.Get(ctx, parentProviderName); err == nil {
				activeProvider = p
			}
		}
	}

	// Emit the running root span after model resolution so it carries the model.
	sm.emitSubagentSpanStart(traceCtx, subRootSpanID, taskStart, task, model, activeProvider.Name())

	// Build the subagent system prompt (matching TS buildSubagentSystemPrompt).
	workspace := ToolWorkspaceFromCtx(ctx)
	systemPrompt := sm.buildSubagentSystemPrompt(task, task.spawnConfig, workspace)
	messages := []providers.Message{
		{Role: "system", Content: systemPrompt},
		{Role: "user", Content: task.Task},
	}

	// Retry budget for failed LLM calls; it does not change between iterations.
	maxRetries := task.spawnConfig.MaxRetries
	if maxRetries <= 0 {
		maxRetries = 2
	}

	// Run the LLM iteration loop (similar to the agent loop but simplified).
	var mediaFiles []bus.MediaFile
	maxIterations := 20
	gotFinalAnswer := false
	for iteration < maxIterations {
		iteration++
		if ctx.Err() != nil {
			sm.mu.Lock()
			task.Status = TaskStatusCancelled
			task.Result = "cancelled during execution"
			sm.mu.Unlock()
			return iteration
		}

		chatReq := providers.ChatRequest{
			Messages: messages,
			Tools:    toolsReg.ProviderDefs(),
			Model:    model,
			Options: map[string]any{
				"max_tokens":  4096,
				"temperature": 0.5,
			},
		}

		llmStart := time.Now().UTC()
		llmSpanID := sm.emitLLMSpanStart(subTraceCtx, llmStart, iteration, model, activeProvider.Name(), messages)

		var resp *providers.ChatResponse
		var err error
		for attempt := 0; attempt <= maxRetries; attempt++ {
			if attempt > 0 {
				// Linear backoff (2s, 4s, ...), interruptible by cancellation.
				timer := time.NewTimer(time.Duration(attempt) * 2 * time.Second)
				select {
				case <-ctx.Done():
					timer.Stop()
				case <-timer.C:
				}
				if ctx.Err() != nil {
					break // leaves the retry loop; handled as cancellation below
				}
				slog.Info("subagent LLM retry", "id", task.ID, "iteration", iteration, "attempt", attempt+1)
			}
			resp, err = activeProvider.Chat(ctx, chatReq)
			// Stop on success, and never retry once ctx is cancelled: the error is
			// then a cancellation rather than a transient provider failure.
			if err == nil || ctx.Err() != nil {
				break
			}
		}

		sm.emitLLMSpanEnd(subTraceCtx, llmSpanID, llmStart, resp, err)

		// Accumulate token usage for cost tracking.
		if resp != nil && resp.Usage != nil {
			sm.mu.Lock()
			task.TotalInputTokens += int64(resp.Usage.PromptTokens)
			task.TotalOutputTokens += int64(resp.Usage.CompletionTokens)
			sm.mu.Unlock()
		}

		if err != nil && ctx.Err() != nil {
			// Cancelled mid-call or during backoff: report as cancelled, not as an
			// LLM failure carrying a stale error from a previous attempt.
			sm.mu.Lock()
			task.Status = TaskStatusCancelled
			task.Result = "cancelled during execution"
			sm.mu.Unlock()
			return iteration
		}
		if err != nil {
			sm.mu.Lock()
			task.Status = TaskStatusFailed
			task.Result = fmt.Sprintf("LLM error at iteration %d: %v", iteration, err)
			sm.mu.Unlock()
			slog.Warn("subagent LLM error", "id", task.ID, "iteration", iteration, "error", err)
			go sm.persistStatus(ctx, task, iteration)
			return iteration
		}

		// No tool calls → the model produced its final answer.
		if len(resp.ToolCalls) == 0 {
			finalContent = resp.Content
			gotFinalAnswer = true
			break
		}

		messages = append(messages, providers.Message{
			Role:      "assistant",
			Content:   resp.Content,
			ToolCalls: resp.ToolCalls,
		})

		// Execute tools and feed each result back to the model.
		for _, tc := range resp.ToolCalls {
			slog.Debug("subagent tool call", "id", task.ID, "tool", tc.Name)
			// Marshal error ignored: the JSON is only used as a tracing attribute.
			argsJSON, _ := json.Marshal(tc.Arguments)
			toolStart := time.Now().UTC()
			toolSpanID := sm.emitToolSpanStart(subTraceCtx, toolStart, tc.Name, tc.ID, string(argsJSON))

			result := toolsReg.Execute(ctx, tc.Name, tc.Arguments)
			sm.emitToolSpanEnd(subTraceCtx, toolSpanID, toolStart, result.ForLLM, result.IsError)

			// Capture media file paths from tool results (e.g. image generation),
			// falling back to a "MEDIA:" prefix in the text output.
			if len(result.Media) > 0 {
				mediaFiles = append(mediaFiles, result.Media...)
			} else if p := mediaPathFromToolOutput(result.ForLLM); p != "" {
				mediaFiles = append(mediaFiles, bus.MediaFile{Path: p})
			}

			messages = append(messages, providers.Message{
				Role:       "tool",
				Content:    result.ForLLM,
				ToolCallID: tc.ID,
			})
		}
	}

	// Status stays Completed on iteration exhaustion for compatibility, but the
	// result now says the limit was hit instead of claiming normal completion.
	switch {
	case !gotFinalAnswer:
		slog.Warn("subagent reached iteration limit", "id", task.ID, "max_iterations", maxIterations)
		finalContent = fmt.Sprintf("Task stopped after reaching the iteration limit (%d) without producing a final response.", maxIterations)
	case finalContent == "":
		finalContent = "Task completed but no final response was generated."
	}

	sm.mu.Lock()
	task.Status = TaskStatusCompleted
	task.Result = finalContent
	task.Media = mediaFiles
	sm.mu.Unlock()

	slog.Info("subagent completed", "id", task.ID, "iterations", iteration)
	// Persist final status to DB (fire-and-forget).
	go sm.persistStatus(ctx, task, iteration)
	return iteration
}

// mediaPathFromToolOutput extracts a media file path from tool text output of
// the form "MEDIA: <path>" (surrounding whitespace ignored). Only the first line
// after the prefix is used. It returns "" when the prefix is absent or the path
// is empty. This is the same parsing as the agent loop's parseMediaResult.
func mediaPathFromToolOutput(output string) string {
	const prefix = "MEDIA:"
	trimmed := strings.TrimSpace(output)
	if !strings.HasPrefix(trimmed, prefix) {
		return ""
	}
	p := strings.TrimSpace(strings.TrimPrefix(trimmed, prefix))
	if nl := strings.IndexByte(p, '\n'); nl >= 0 {
		p = strings.TrimSpace(p[:nl])
	}
	return p
}