diff --git a/cmd/gateway.go b/cmd/gateway.go index 87f3f231..ea8d0aa3 100644 --- a/cmd/gateway.go +++ b/cmd/gateway.go @@ -788,6 +788,9 @@ func runGateway() { } }) + // Slow tool notification subscriber — direct outbound when tool exceeds adaptive threshold. + wireSlowToolNotifySubscriber(msgBus) + // Start inbound message consumer (channel → scheduler → agent → channel) consumerTeamStore := pgStores.Teams diff --git a/cmd/gateway_slow_tool_notify.go b/cmd/gateway_slow_tool_notify.go new file mode 100644 index 00000000..622cc44b --- /dev/null +++ b/cmd/gateway_slow_tool_notify.go @@ -0,0 +1,48 @@ +package cmd + +import ( + "fmt" + + "github.com/nextlevelbuilder/goclaw/internal/agent" + "github.com/nextlevelbuilder/goclaw/internal/bus" + "github.com/nextlevelbuilder/goclaw/pkg/protocol" +) + +// wireSlowToolNotifySubscriber registers a subscriber that sends direct outbound +// notifications when a tool call exceeds its adaptive slow threshold. +// Always uses direct mode (never leader) to avoid wasting LLM calls. +// Team config (slow_tool enabled/disabled) is resolved in the loop before emitting +// the event, so no DB query is needed here. +func wireSlowToolNotifySubscriber(msgBus *bus.MessageBus) { + msgBus.Subscribe("consumer.slow-tool-notify", func(event bus.Event) { + if event.Name != protocol.EventAgent { + return + } + agentEvent, ok := event.Payload.(agent.AgentEvent) + if !ok || agentEvent.Type != protocol.AgentEventActivity { + return + } + payloadMap, _ := agentEvent.Payload.(map[string]any) + phase, _ := payloadMap["phase"].(string) + if phase != "tool_slow" { + return + } + if agentEvent.Channel == "" || agentEvent.ChatID == "" { + return + } + + tool, _ := payloadMap["tool"].(string) + thresholdMs, _ := payloadMap["threshold_ms"].(int64) + thresholdSec := thresholdMs / 1000 + if thresholdSec <= 0 { + thresholdSec = 120 + } + + content := fmt.Sprintf("⏳ %s: tool %s running longer than usual (>%ds)", agentEvent.AgentID, tool, thresholdSec) + msgBus.PublishOutbound(bus.OutboundMessage{ + Channel: agentEvent.Channel, + ChatID: agentEvent.ChatID, + Content: content, + }) + }) +} diff --git a/internal/agent/loop.go b/internal/agent/loop.go index 980e34ce..366c88b0 100644 --- a/internal/agent/loop.go +++ b/internal/agent/loop.go @@ -164,8 +164,12 @@ func (l *Loop) runLoop(ctx context.Context, req RunRequest) (*RunResult, error) // Auto-resolve team workspace for agents not dispatched via team task. // Lead agents default to team workspace (primary job is team coordination). // Non-lead members keep own workspace; team workspace is accessible via absolute path. + // resolvedTeamSettings caches team settings from workspace resolution + // to avoid re-querying when checking slow_tool notification config. + var resolvedTeamSettings json.RawMessage if req.TeamWorkspace == "" && l.teamStore != nil && l.agentUUID != uuid.Nil { if team, _ := l.teamStore.GetTeamForAgent(ctx, l.agentUUID); team != nil { + resolvedTeamSettings = team.Settings // Shared workspace: scope by teamID only. Isolated (default): scope by chatID too. wsChat := req.ChatID if wsChat == "" { @@ -243,6 +247,12 @@ func (l *Loop) runLoop(ctx context.Context, req RunRequest) (*RunResult, error) l.sessions.SetContextWindow(req.SessionKey, l.contextWindow) } + // 0b. Load adaptive tool timing from session metadata. + toolTiming := ParseToolTiming(l.sessions.GetSessionMetadata(req.SessionKey)) + + // Resolve slow_tool notification config from already-loaded team settings (no extra DB query). + slowToolEnabled := tools.ParseTeamNotifyConfig(resolvedTeamSettings).SlowTool + // 1. Build messages from session history history := l.sessions.GetHistory(req.SessionKey) summary := l.sessions.GetSummary(req.SessionKey) @@ -869,6 +879,8 @@ func (l *Loop) runLoop(ctx context.Context, req RunRequest) (*RunResult, error) toolSpanStart := time.Now().UTC() toolSpanID := l.emitToolSpanStart(ctx, toolSpanStart, tc.Name, tc.ID, string(argsJSON)) + + stopSlowTimer := toolTiming.StartSlowTimer(tc.Name, l.id, req.RunID, slowToolEnabled, emitRun) var result *tools.Result if allowedTools != nil && !allowedTools[tc.Name] { // Attempt lazy activation: deferred MCP tools can be activated on first call @@ -890,9 +902,13 @@ func (l *Loop) runLoop(ctx context.Context, req RunRequest) (*RunResult, error) if result == nil { result = l.tools.ExecuteWithContext(ctx, tc.Name, tc.Arguments, req.Channel, req.ChatID, req.PeerKind, req.SessionKey, nil) } + stopSlowTimer() l.emitToolSpanEnd(ctx, toolSpanID, toolSpanStart, result) + // Record tool execution time for adaptive thresholds. + toolTiming.Record(tc.Name, time.Since(toolSpanStart).Milliseconds()) + // Record result for loop detection. loopDetector.recordResult(argsHash, result.ForLLM) @@ -1009,6 +1025,8 @@ func (l *Loop) runLoop(ctx context.Context, req RunRequest) (*RunResult, error) // Emit running span inside goroutine — goroutine-safe (channel send only). // End is also emitted here to prevent orphans on ctx cancellation. spanID := l.emitToolSpanStart(ctx, spanStart, tc.Name, tc.ID, string(argsJSON)) + + stopSlowTimer := toolTiming.StartSlowTimer(tc.Name, l.id, req.RunID, slowToolEnabled, emitRun) var result *tools.Result if allowedTools != nil && !allowedTools[tc.Name] { // Attempt lazy activation for deferred MCP tools. @@ -1030,6 +1048,7 @@ func (l *Loop) runLoop(ctx context.Context, req RunRequest) (*RunResult, error) if result == nil { result = l.tools.ExecuteWithContext(ctx, tc.Name, tc.Arguments, req.Channel, req.ChatID, req.PeerKind, req.SessionKey, nil) } + stopSlowTimer() l.emitToolSpanEnd(ctx, spanID, spanStart, result) resultCh <- indexedResult{idx: idx, tc: tc, result: result, argsJSON: string(argsJSON), spanStart: spanStart} }(i, tc) @@ -1053,6 +1072,8 @@ func (l *Loop) runLoop(ctx context.Context, req RunRequest) (*RunResult, error) // Note: tool span start/end already emitted inside goroutines above. var loopStuck bool for _, r := range collected { + // Record tool execution time for adaptive thresholds. + toolTiming.Record(r.tc.Name, time.Since(r.spanStart).Milliseconds()) // Record for loop detection. argsHash := loopDetector.record(r.tc.Name, r.tc.Arguments) @@ -1222,6 +1243,11 @@ func (l *Loop) runLoop(ctx context.Context, req RunRequest) (*RunResult, error) l.sessions.AddMessage(req.SessionKey, msg) } + // Persist adaptive tool timing to session metadata. + if serialized := toolTiming.Serialize(); serialized != "" { + l.sessions.SetSessionMetadata(req.SessionKey, map[string]string{"tool_timing": serialized}) + } + // Write session metadata (matching TS session entry updates) l.sessions.UpdateMetadata(req.SessionKey, l.model, l.provider.Name(), req.Channel) l.sessions.AccumulateTokens(req.SessionKey, int64(totalUsage.PromptTokens), int64(totalUsage.CompletionTokens)) diff --git a/internal/agent/tool_timing.go b/internal/agent/tool_timing.go new file mode 100644 index 00000000..e7996556 --- /dev/null +++ b/internal/agent/tool_timing.go @@ -0,0 +1,123 @@ +package agent + +import ( + "encoding/json" + "log/slog" + "time" + + "github.com/nextlevelbuilder/goclaw/pkg/protocol" +) + +// defaultSlowToolThreshold is used when no historical data is available for a tool. +const defaultSlowToolThreshold = 120 * time.Second + +// toolTimingMultiplier determines how much slower than the historical max +// a tool call must be before it's considered abnormally slow. +const toolTimingMultiplier = 2.0 + +// minTimingSamples is the minimum number of samples needed before using +// adaptive thresholds instead of the default. +const minTimingSamples = 3 + +// ToolTimingStat tracks execution time statistics for a single tool. +type ToolTimingStat struct { + Min int64 `json:"min"` // minimum duration in ms + Max int64 `json:"max"` // maximum duration in ms + Sum int64 `json:"sum"` // total duration in ms (for avg calculation) + Count int `json:"n"` // number of samples +} + +// ToolTimingMap maps tool names to their timing statistics. +// Concurrency contract: SlowThreshold (read) may be called from goroutines, +// but Record (write) must only be called sequentially after parallel tools complete. +type ToolTimingMap map[string]*ToolTimingStat + +// ParseToolTiming reads tool timing data from session metadata. +// Returns an empty map if the key is missing or malformed. +func ParseToolTiming(metadata map[string]string) ToolTimingMap { + raw, ok := metadata["tool_timing"] + if !ok || raw == "" { + return make(ToolTimingMap) + } + var m ToolTimingMap + if err := json.Unmarshal([]byte(raw), &m); err != nil { + return make(ToolTimingMap) + } + return m +} + +// Serialize returns the JSON string for storage in session metadata. +func (m ToolTimingMap) Serialize() string { + if len(m) == 0 { + return "" + } + data, err := json.Marshal(m) + if err != nil { + return "" + } + return string(data) +} + +// Record adds a new timing sample for the given tool. +func (m ToolTimingMap) Record(toolName string, durationMs int64) { + stat, ok := m[toolName] + if !ok { + m[toolName] = &ToolTimingStat{ + Min: durationMs, + Max: durationMs, + Sum: durationMs, + Count: 1, + } + return + } + if durationMs < stat.Min { + stat.Min = durationMs + } + if durationMs > stat.Max { + stat.Max = durationMs + } + stat.Sum += durationMs + stat.Count++ +} + +// SlowThreshold returns the duration after which a tool call is considered +// abnormally slow. Uses adaptive threshold if enough samples exist, +// otherwise falls back to the default. +func (m ToolTimingMap) SlowThreshold(toolName string) time.Duration { + stat, ok := m[toolName] + if !ok || stat.Count < minTimingSamples { + return defaultSlowToolThreshold + } + threshold := time.Duration(float64(stat.Max)*toolTimingMultiplier) * time.Millisecond + // Never go below the default — short tools shouldn't trigger on tiny spikes. + if threshold < defaultSlowToolThreshold { + return defaultSlowToolThreshold + } + return threshold +} + +// StartSlowTimer starts a timer that emits a tool_slow activity event if the +// tool call exceeds the adaptive threshold. Returns a stop function that MUST +// be called after tool execution to cancel the timer. +// If enabled is false, returns a no-op stop function (no timer started). +func (m ToolTimingMap) StartSlowTimer(toolName, agentID, runID string, enabled bool, emitRun func(AgentEvent)) func() { + if !enabled { + return func() {} + } + threshold := m.SlowThreshold(toolName) + timer := time.AfterFunc(threshold, func() { + slog.Warn("tool.slow", "agent", agentID, "tool", toolName, "threshold_ms", threshold.Milliseconds()) + emitRun(AgentEvent{ + Type: protocol.AgentEventActivity, + AgentID: agentID, + RunID: runID, + Payload: map[string]any{ + "phase": "tool_slow", + "tool": toolName, + "threshold_ms": threshold.Milliseconds(), + }, + }) + }) + return func() { timer.Stop() } +} + diff --git a/internal/store/pg/sessions.go b/internal/store/pg/sessions.go index d4414db2..04efb579 100644 --- a/internal/store/pg/sessions.go +++ b/internal/store/pg/sessions.go @@ -142,6 +142,17 @@ func (s *PGSessionStore) SetLabel(key, label string) { } } +func (s *PGSessionStore) GetSessionMetadata(key string) map[string]string { + s.mu.RLock() + defer s.mu.RUnlock() + if data, ok := s.cache[key]; ok && data.Metadata != nil { + out := make(map[string]string, len(data.Metadata)) + maps.Copy(out, data.Metadata) + return out + } + return nil +} + func (s *PGSessionStore) SetSessionMetadata(key string, metadata map[string]string) { s.mu.Lock() defer s.mu.Unlock() diff --git a/internal/store/session_store.go b/internal/store/session_store.go index caa8da61..9b4d08e1 100644 --- a/internal/store/session_store.go +++ b/internal/store/session_store.go @@ -97,6 +97,7 @@ type SessionStore interface { GetCompactionCount(key string) int GetMemoryFlushCompactionCount(key string) int SetMemoryFlushDone(key string) + GetSessionMetadata(key string) map[string]string SetSessionMetadata(key string, metadata map[string]string) SetSpawnInfo(key, spawnedBy string, depth int) SetContextWindow(key string, cw int) diff --git a/internal/tools/team_notify_config.go b/internal/tools/team_notify_config.go index 0bae7806..089bf28f 100644 --- a/internal/tools/team_notify_config.go +++ b/internal/tools/team_notify_config.go @@ -8,6 +8,7 @@ type TeamNotifyConfig struct { Progress bool `json:"progress"` // member updates progress Failed bool `json:"failed"` // task failed Completed bool `json:"completed"` // task completed + SlowTool bool `json:"slow_tool"` // system alert when tool call exceeds adaptive threshold (always direct, never through leader) Mode string `json:"mode"` // "direct" (outbound) or "leader" (through leader agent) } @@ -18,6 +19,7 @@ func DefaultTeamNotifyConfig() TeamNotifyConfig { Progress: true, Failed: true, Completed: true, + SlowTool: true, Mode: "direct", } } @@ -35,6 +37,7 @@ func ParseTeamNotifyConfig(settings json.RawMessage) TeamNotifyConfig { Progress *bool `json:"progress"` Failed *bool `json:"failed"` Completed *bool `json:"completed"` + SlowTool *bool `json:"slow_tool"` Mode string `json:"mode"` } `json:"notifications"` } @@ -54,6 +57,9 @@ func ParseTeamNotifyConfig(settings json.RawMessage) TeamNotifyConfig { if n.Completed != nil { cfg.Completed = *n.Completed } + if n.SlowTool != nil { + cfg.SlowTool = *n.SlowTool + } if n.Mode == "leader" { cfg.Mode = "leader" } diff --git a/ui/web/src/i18n/locales/en/teams.json b/ui/web/src/i18n/locales/en/teams.json index ecf328f3..f81cadf4 100644 --- a/ui/web/src/i18n/locales/en/teams.json +++ b/ui/web/src/i18n/locales/en/teams.json @@ -189,6 +189,8 @@ "notifyProgressHint": "Notify when a member updates task progress.", "notifyFailed": "Task failed", "notifyFailedHint": "Notify when a task fails.", + "notifySlowTool": "Slow tool alert", + "notifySlowToolHint": "System alert when a tool call takes abnormally long. Always direct — never through leader.", "notifyMode": "Delivery mode", "notifyModeHint": "How notifications are delivered to the chat channel.", "notifyModeDirect": "Direct", diff --git a/ui/web/src/i18n/locales/vi/teams.json b/ui/web/src/i18n/locales/vi/teams.json index 9aa61690..13d089fb 100644 --- a/ui/web/src/i18n/locales/vi/teams.json +++ b/ui/web/src/i18n/locales/vi/teams.json @@ -189,6 +189,8 @@ "notifyProgressHint": "Thông báo khi thành viên cập nhật tiến trình.", "notifyFailed": "Thất bại", "notifyFailedHint": "Thông báo khi nhiệm vụ thất bại.", + "notifySlowTool": "Cảnh báo tool chạy lâu", + "notifySlowToolHint": "Thông báo hệ thống khi tool chạy lâu bất thường. Luôn gửi trực tiếp — không qua leader.", "notifyMode": "Chế độ gửi", "notifyModeHint": "Cách thông báo được gửi đến kênh chat.", "notifyModeDirect": "Trực tiếp", diff --git a/ui/web/src/i18n/locales/zh/teams.json b/ui/web/src/i18n/locales/zh/teams.json index 7a741c6b..dfa6ef3d 100644 --- a/ui/web/src/i18n/locales/zh/teams.json +++ b/ui/web/src/i18n/locales/zh/teams.json @@ -189,6 +189,8 @@ "notifyProgressHint": "成员更新进度时通知。", "notifyFailed": "任务失败", "notifyFailedHint": "任务失败时通知。", + "notifySlowTool": "工具慢速警报", + "notifySlowToolHint": "工具调用异常耗时时发送系统警报。始终直接发送,不通过 leader。", "notifyMode": "发送方式", "notifyModeHint": "通知如何发送到聊天频道。", "notifyModeDirect": "直接发送", diff --git a/ui/web/src/pages/teams/team-settings-tab.tsx b/ui/web/src/pages/teams/team-settings-tab.tsx index 582ef11c..9c125ba2 100644 --- a/ui/web/src/pages/teams/team-settings-tab.tsx +++ b/ui/web/src/pages/teams/team-settings-tab.tsx @@ -75,6 +75,7 @@ export function TeamSettingsTab({ teamId, team, onSaved }: TeamSettingsTabProps) const [notifyDispatched, setNotifyDispatched] = useState(initNotify.dispatched ?? true); const [notifyProgress, setNotifyProgress] = useState(initNotify.progress ?? true); const [notifyFailed, setNotifyFailed] = useState(initNotify.failed ?? true); + const [notifySlowTool, setNotifySlowTool] = useState(initNotify.slow_tool ?? true); const [notifyMode, setNotifyMode] = useState<"direct" | "leader">(initNotify.mode ?? "direct"); const [escalationMode, setEscalationMode] = useState(initial.escalation_mode ?? ""); const [escalationActions, setEscalationActions] = useState(initial.escalation_actions ?? []); @@ -105,6 +106,7 @@ export function TeamSettingsTab({ teamId, team, onSaved }: TeamSettingsTabProps) setNotifyDispatched(sn.dispatched ?? true); setNotifyProgress(sn.progress ?? true); setNotifyFailed(sn.failed ?? true); + setNotifySlowTool(sn.slow_tool ?? true); setNotifyMode(sn.mode ?? "direct"); setEscalationMode(s.escalation_mode ?? ""); setEscalationActions(s.escalation_actions ?? []); @@ -129,6 +131,7 @@ export function TeamSettingsTab({ teamId, team, onSaved }: TeamSettingsTabProps) dispatched: notifyDispatched, progress: notifyProgress, failed: notifyFailed, + slow_tool: notifySlowTool, mode: notifyMode, }; if (escalationMode) { @@ -148,7 +151,7 @@ export function TeamSettingsTab({ teamId, team, onSaved }: TeamSettingsTabProps) } finally { setSaving(false); } - }, [teamId, version, allowUserIds, denyUserIds, allowChannels, denyChannels, notifyDispatched, notifyProgress, notifyFailed, notifyMode, escalationMode, escalationActions, followupInterval, followupMaxReminders, workspaceScope, updateTeamSettings, onSaved, t]); + }, [teamId, version, allowUserIds, denyUserIds, allowChannels, denyChannels, notifyDispatched, notifyProgress, notifyFailed, notifySlowTool, notifyMode, escalationMode, escalationActions, followupInterval, followupMaxReminders, workspaceScope, updateTeamSettings, onSaved, t]); const userOptions = knownUsers.map((u) => ({ value: u, label: u })); const channelOptions = CHANNEL_TYPES.map((c) => ({ value: c.value, label: c.label })); @@ -235,6 +238,13 @@ export function TeamSettingsTab({ teamId, team, onSaved }: TeamSettingsTabProps) +
+
+ {t("settings.notifySlowTool")} +

{t("settings.notifySlowToolHint")}

+
+ +
{t("settings.notifyMode")}
diff --git a/ui/web/src/types/team.ts b/ui/web/src/types/team.ts index 7e979a7a..595cb649 100644 --- a/ui/web/src/types/team.ts +++ b/ui/web/src/types/team.ts @@ -9,6 +9,7 @@ export interface TeamNotifyConfig { dispatched?: boolean; progress?: boolean; failed?: boolean; + slow_tool?: boolean; mode?: "direct" | "leader"; }