mirror of
https://github.com/tiennm99/goclaw.git
synced 2026-06-10 00:13:42 +00:00
2c1ef25392
- Token cost tracking: accumulate input/output tokens per subagent, include in announce messages and persist to DB - Per-edition rate limits: MaxSubagentConcurrent/Depth on Edition struct, tenant-scoped concurrency enforcement in Spawn/RunSync - WaitAll action: spawn(action=wait, timeout=N) blocks until all children complete, returns merged summary - Auto-retry: configurable MaxRetries (default 2) with linear backoff for transient LLM failures - Producer-consumer announce queue: merges staggered subagent results into single LLM run (same pattern as team task announces) - Raw metadata in bus messages to prevent double-formatting - Fire-and-forget DB persistence with detached context + tenant scope - Split oversized files for <200 line compliance
191 lines
5.5 KiB
Go
191 lines
5.5 KiB
Go
package tools
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"github.com/nextlevelbuilder/goclaw/internal/edition"
|
|
"github.com/nextlevelbuilder/goclaw/internal/store"
|
|
"github.com/nextlevelbuilder/goclaw/internal/tracing"
|
|
)
|
|
|
|
// Spawn creates a new subagent task that runs asynchronously.
|
|
// Returns immediately with a status message. The subagent runs in a goroutine.
|
|
// modelOverride optionally overrides the LLM model for this subagent (matching TS sessions-spawn-tool.ts).
|
|
func (sm *SubagentManager) Spawn(
|
|
ctx context.Context,
|
|
parentID string,
|
|
depth int,
|
|
task, label, modelOverride string,
|
|
channel, chatID, peerKind string,
|
|
callback AsyncCallback,
|
|
) (string, error) {
|
|
cfg := sm.effectiveConfig(ctx)
|
|
|
|
// Apply edition ceilings (Lite edition enforces lower limits).
|
|
ed := edition.Current()
|
|
if ed.MaxSubagentConcurrent > 0 && cfg.MaxConcurrent > ed.MaxSubagentConcurrent {
|
|
cfg.MaxConcurrent = ed.MaxSubagentConcurrent
|
|
}
|
|
if ed.MaxSubagentDepth > 0 && cfg.MaxSpawnDepth > ed.MaxSubagentDepth {
|
|
cfg.MaxSpawnDepth = ed.MaxSubagentDepth
|
|
}
|
|
|
|
sm.mu.Lock()
|
|
|
|
// Check depth limit
|
|
if depth >= cfg.MaxSpawnDepth {
|
|
sm.mu.Unlock()
|
|
return "", fmt.Errorf("spawn depth limit reached (%d/%d)", depth, cfg.MaxSpawnDepth)
|
|
}
|
|
|
|
// Check concurrent limit (scoped per tenant for isolation).
|
|
tenantID := store.TenantIDFromContext(ctx)
|
|
running := 0
|
|
for _, t := range sm.tasks {
|
|
if t.Status == TaskStatusRunning && t.OriginTenantID == tenantID {
|
|
running++
|
|
}
|
|
}
|
|
if running >= cfg.MaxConcurrent {
|
|
sm.mu.Unlock()
|
|
return "", fmt.Errorf("max concurrent subagents reached (%d/%d)", running, cfg.MaxConcurrent)
|
|
}
|
|
|
|
// Check per-parent children limit
|
|
childCount := 0
|
|
for _, t := range sm.tasks {
|
|
if t.ParentID == parentID {
|
|
childCount++
|
|
}
|
|
}
|
|
if childCount >= cfg.MaxChildrenPerAgent {
|
|
sm.mu.Unlock()
|
|
return "", fmt.Errorf("max children per agent reached (%d/%d)", childCount, cfg.MaxChildrenPerAgent)
|
|
}
|
|
|
|
id := generateSubagentID()
|
|
if label == "" {
|
|
label = truncate(task, 50)
|
|
}
|
|
|
|
subTask := &SubagentTask{
|
|
ID: id,
|
|
ParentID: parentID,
|
|
Task: task,
|
|
Label: label,
|
|
Status: "running",
|
|
Depth: depth + 1,
|
|
Model: modelOverride,
|
|
OriginChannel: channel,
|
|
OriginChatID: chatID,
|
|
OriginPeerKind: peerKind,
|
|
OriginLocalKey: ToolLocalKeyFromCtx(ctx),
|
|
OriginUserID: store.UserIDFromContext(ctx),
|
|
OriginSessionKey: ToolSessionKeyFromCtx(ctx),
|
|
OriginTenantID: store.TenantIDFromContext(ctx),
|
|
OriginTraceID: tracing.TraceIDFromContext(ctx),
|
|
OriginRootSpanID: tracing.ParentSpanIDFromContext(ctx),
|
|
CreatedAt: time.Now().UnixMilli(),
|
|
spawnConfig: cfg,
|
|
}
|
|
// Detach from parent's cancellation chain so subagent survives after parent run completes.
|
|
// WithoutCancel preserves all context values (agent ID, workspace, trace info, etc.)
|
|
// but parent Done() no longer propagates. Manual cancel via taskCancel() still works.
|
|
detached := context.WithoutCancel(ctx)
|
|
taskCtx, taskCancel := context.WithCancel(detached)
|
|
subTask.cancelFunc = taskCancel
|
|
|
|
// Assign DB UUID inside lock to avoid race with runTask goroutine.
|
|
if sm.taskStore != nil {
|
|
subTask.dbID = store.GenNewID()
|
|
}
|
|
|
|
sm.tasks[id] = subTask
|
|
sm.mu.Unlock()
|
|
|
|
slog.Info("subagent spawned", "id", id, "parent", parentID, "depth", subTask.Depth, "label", label)
|
|
|
|
// Persist to DB (fire-and-forget).
|
|
if sm.taskStore != nil {
|
|
go sm.persistCreate(taskCtx, subTask)
|
|
}
|
|
|
|
go sm.runTask(taskCtx, subTask, callback)
|
|
|
|
return fmt.Sprintf("Spawned subagent '%s' (id=%s, depth=%d) for task: %s",
|
|
label, id, subTask.Depth, truncate(task, 100)), nil
|
|
}
|
|
|
|
// RunSync executes a subagent task synchronously, blocking until completion.
|
|
func (sm *SubagentManager) RunSync(
|
|
ctx context.Context,
|
|
parentID string,
|
|
depth int,
|
|
task, label string,
|
|
channel, chatID string,
|
|
) (string, int, error) {
|
|
cfg := sm.effectiveConfig(ctx)
|
|
|
|
// Apply edition ceilings (same as Spawn).
|
|
ed := edition.Current()
|
|
if ed.MaxSubagentConcurrent > 0 && cfg.MaxConcurrent > ed.MaxSubagentConcurrent {
|
|
cfg.MaxConcurrent = ed.MaxSubagentConcurrent
|
|
}
|
|
if ed.MaxSubagentDepth > 0 && cfg.MaxSpawnDepth > ed.MaxSubagentDepth {
|
|
cfg.MaxSpawnDepth = ed.MaxSubagentDepth
|
|
}
|
|
|
|
sm.mu.Lock()
|
|
|
|
if depth >= cfg.MaxSpawnDepth {
|
|
sm.mu.Unlock()
|
|
return "", 0, fmt.Errorf("spawn depth limit reached (%d/%d)", depth, cfg.MaxSpawnDepth)
|
|
}
|
|
|
|
id := generateSubagentID()
|
|
if label == "" {
|
|
label = truncate(task, 50)
|
|
}
|
|
|
|
subTask := &SubagentTask{
|
|
ID: id,
|
|
ParentID: parentID,
|
|
Task: task,
|
|
Label: label,
|
|
Status: "running",
|
|
Depth: depth + 1,
|
|
OriginChannel: channel,
|
|
OriginChatID: chatID,
|
|
OriginLocalKey: ToolLocalKeyFromCtx(ctx),
|
|
OriginUserID: store.UserIDFromContext(ctx),
|
|
OriginSessionKey: ToolSessionKeyFromCtx(ctx),
|
|
OriginTenantID: store.TenantIDFromContext(ctx),
|
|
OriginTraceID: tracing.TraceIDFromContext(ctx),
|
|
OriginRootSpanID: tracing.ParentSpanIDFromContext(ctx),
|
|
CreatedAt: time.Now().UnixMilli(),
|
|
spawnConfig: cfg,
|
|
}
|
|
if sm.taskStore != nil {
|
|
subTask.dbID = store.GenNewID()
|
|
}
|
|
sm.tasks[id] = subTask
|
|
sm.mu.Unlock()
|
|
|
|
slog.Info("subagent sync started", "id", id, "parent", parentID, "depth", subTask.Depth, "label", label)
|
|
|
|
if sm.taskStore != nil {
|
|
sm.persistCreate(ctx, subTask)
|
|
}
|
|
|
|
iterations := sm.executeTask(ctx, subTask)
|
|
|
|
if subTask.Status == TaskStatusFailed {
|
|
return subTask.Result, iterations, fmt.Errorf("subagent failed: %s", subTask.Result)
|
|
}
|
|
|
|
return subTask.Result, iterations, nil
|
|
}
|