mirror of
https://github.com/tiennm99/goclaw.git
synced 2026-06-10 04:10:26 +00:00
014f74ec15
Two fixes: 1. Remove assistant prefill from team task reminders. The injected [user]+[assistant]+[user] pattern caused LLMs to treat the canned ack as "turn complete", returning NO_REPLY for every user message in group sessions with active tasks. Reminders are now merged into the user message as prefix tags. 2. Add PeerKind propagation to team notification routing. TaskTicker and progress notifications were missing PeerKind on InboundMessage, causing them to route to phantom DM sessions instead of the correct group session. PeerKind is now carried through event payloads, notify queue metadata, and all inbound message publications.
397 lines
12 KiB
Go
397 lines
12 KiB
Go
package tasks
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log/slog"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
|
|
"github.com/nextlevelbuilder/goclaw/internal/bus"
|
|
"github.com/nextlevelbuilder/goclaw/internal/store"
|
|
"github.com/nextlevelbuilder/goclaw/internal/tools"
|
|
"github.com/nextlevelbuilder/goclaw/pkg/protocol"
|
|
)
|
|
|
|
const (
|
|
defaultRecoveryInterval = 5 * time.Minute
|
|
defaultStaleThreshold = 2 * time.Hour
|
|
defaultInReviewThreshold = 4 * time.Hour
|
|
followupCooldown = 5 * time.Minute
|
|
defaultFollowupInterval = 30 * time.Minute
|
|
)
|
|
|
|
// TaskTicker periodically recovers stale tasks and re-dispatches pending work.
|
|
// All recovery/stale/followup queries are batched across v2 active teams (single SQL each).
|
|
type TaskTicker struct {
|
|
teams store.TeamStore
|
|
agents store.AgentStore
|
|
msgBus *bus.MessageBus
|
|
interval time.Duration
|
|
|
|
stopCh chan struct{}
|
|
wg sync.WaitGroup
|
|
|
|
mu sync.Mutex
|
|
lastFollowupSent map[uuid.UUID]time.Time // taskID → last followup sent time
|
|
}
|
|
|
|
func NewTaskTicker(teams store.TeamStore, agents store.AgentStore, msgBus *bus.MessageBus, intervalSec int) *TaskTicker {
|
|
interval := defaultRecoveryInterval
|
|
if intervalSec > 0 {
|
|
interval = time.Duration(intervalSec) * time.Second
|
|
}
|
|
return &TaskTicker{
|
|
teams: teams,
|
|
agents: agents,
|
|
msgBus: msgBus,
|
|
interval: interval,
|
|
stopCh: make(chan struct{}),
|
|
lastFollowupSent: make(map[uuid.UUID]time.Time),
|
|
}
|
|
}
|
|
|
|
// Start launches the background recovery loop.
|
|
func (t *TaskTicker) Start() {
|
|
t.wg.Add(1)
|
|
go t.loop()
|
|
slog.Info("task ticker started", "interval", t.interval)
|
|
}
|
|
|
|
// Stop signals the ticker to stop and waits for completion.
|
|
func (t *TaskTicker) Stop() {
|
|
close(t.stopCh)
|
|
t.wg.Wait()
|
|
slog.Info("task ticker stopped")
|
|
}
|
|
|
|
func (t *TaskTicker) loop() {
|
|
defer t.wg.Done()
|
|
|
|
// On startup: force-recover ALL in_progress tasks (lock may not be expired yet,
|
|
// but no agent is running after a restart).
|
|
t.recoverAll(true)
|
|
|
|
ticker := time.NewTicker(t.interval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-t.stopCh:
|
|
return
|
|
case <-ticker.C:
|
|
// Periodic: only recover tasks with expired locks.
|
|
t.recoverAll(false)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (t *TaskTicker) recoverAll(forceRecover bool) {
|
|
// Step 1: Batch followups with own timeout (before recovery — recovery resets
|
|
// in_progress→pending, which would make followup tasks invisible since followup
|
|
// queries status='in_progress').
|
|
followupCtx, followupCancel := context.WithTimeout(context.Background(), 15*time.Second)
|
|
t.processFollowups(followupCtx)
|
|
followupCancel()
|
|
|
|
// Step 2: Batch recovery — single query across all v2 active teams.
|
|
// Separate timeout so followup duration doesn't eat into recovery budget.
|
|
recoverCtx, recoverCancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer recoverCancel()
|
|
|
|
var recovered []store.RecoveredTaskInfo
|
|
var err error
|
|
if forceRecover {
|
|
recovered, err = t.teams.ForceRecoverAllTasks(recoverCtx)
|
|
} else {
|
|
recovered, err = t.teams.RecoverAllStaleTasks(recoverCtx)
|
|
}
|
|
if err != nil {
|
|
slog.Warn("task_ticker: batch recovery", "force", forceRecover, "error", err)
|
|
}
|
|
if len(recovered) > 0 {
|
|
slog.Info("task_ticker: recovered tasks", "count", len(recovered), "force", forceRecover)
|
|
t.notifyLeaders(recoverCtx, recovered, "recovered (lock expired)",
|
|
"These tasks were reset to pending because the assigned agent stopped responding.\n"+
|
|
"To re-dispatch: use team_tasks(action=\"retry\", task_id=\"<task_id>\") for each task above.\n"+
|
|
"To cancel: use team_tasks(action=\"update\", task_id=\"<task_id>\", status=\"cancelled\").\n"+
|
|
"To view all tasks: use team_tasks(action=\"list\").")
|
|
}
|
|
|
|
// Step 3: Batch mark stale — pending tasks older than 2h.
|
|
staleThreshold := time.Now().Add(-defaultStaleThreshold)
|
|
stale, err := t.teams.MarkAllStaleTasks(recoverCtx, staleThreshold)
|
|
if err != nil {
|
|
slog.Warn("task_ticker: batch mark stale", "error", err)
|
|
}
|
|
if len(stale) > 0 {
|
|
slog.Info("task_ticker: marked stale", "count", len(stale))
|
|
t.notifyLeaders(recoverCtx, stale, "marked stale (no progress for 2+ hours)",
|
|
"These tasks have been pending too long without being picked up.\n"+
|
|
"To re-dispatch: use team_tasks(action=\"retry\", task_id=\"<task_id>\").\n"+
|
|
"To cancel: use team_tasks(action=\"update\", task_id=\"<task_id>\", status=\"cancelled\").\n"+
|
|
"To view current board: use team_tasks(action=\"list\").")
|
|
t.broadcastStaleEvents(recoverCtx, stale)
|
|
}
|
|
|
|
// Step 4: Mark in_review tasks stale after 4 hours.
|
|
inReviewThreshold := time.Now().Add(-defaultInReviewThreshold)
|
|
staleReview, err := t.teams.MarkInReviewStaleTasks(recoverCtx, inReviewThreshold)
|
|
if err != nil {
|
|
slog.Warn("task_ticker: batch mark in_review stale", "error", err)
|
|
}
|
|
if len(staleReview) > 0 {
|
|
slog.Info("task_ticker: marked in_review stale", "count", len(staleReview))
|
|
t.notifyLeaders(recoverCtx, staleReview, "in review too long (4+ hours) — marked stale",
|
|
"These tasks have been waiting for approval too long.\n"+
|
|
"To approve: use team_tasks(action=\"approve\", task_id=\"<task_id>\").\n"+
|
|
"To reject: use team_tasks(action=\"reject\", task_id=\"<task_id>\", text=\"reason\").\n"+
|
|
"To retry: use team_tasks(action=\"retry\", task_id=\"<task_id>\").")
|
|
t.broadcastStaleEvents(recoverCtx, staleReview)
|
|
}
|
|
|
|
// Step 5: Fix orphaned blocked tasks where all blockers are terminal.
|
|
fixed, err := t.teams.FixOrphanedBlockedTasks(recoverCtx)
|
|
if err != nil {
|
|
slog.Warn("task_ticker: fix orphaned blocked tasks", "error", err)
|
|
}
|
|
if len(fixed) > 0 {
|
|
slog.Info("task_ticker: auto-unblocked orphaned tasks", "count", len(fixed))
|
|
t.notifyLeaders(recoverCtx, fixed, "auto-unblocked (all blockers resolved)",
|
|
"These blocked tasks were automatically unblocked because all their dependencies completed.\n"+
|
|
"They are now pending and will be dispatched if assigned.")
|
|
}
|
|
|
|
// Step 6: Prune old cooldown entries to prevent memory leak.
|
|
t.pruneCooldowns()
|
|
}
|
|
|
|
// ============================================================
|
|
// Leader notifications (batched per scope)
|
|
// ============================================================
|
|
|
|
type taskScope struct {
|
|
TeamID uuid.UUID
|
|
TenantID uuid.UUID
|
|
Channel string // from task's origin channel
|
|
ChatID string
|
|
}
|
|
|
|
// notifyLeaders sends a batched system message per (teamID, channel, chatID) scope to the leader.
|
|
func (t *TaskTicker) notifyLeaders(ctx context.Context, tasks []store.RecoveredTaskInfo, action, hint string) {
|
|
if t.msgBus == nil {
|
|
return
|
|
}
|
|
|
|
// Group by (team_id, channel, chat_id) → one message per scope.
|
|
byScope := map[taskScope][]store.RecoveredTaskInfo{}
|
|
for _, task := range tasks {
|
|
key := taskScope{TeamID: task.TeamID, TenantID: task.TenantID, Channel: task.Channel, ChatID: task.ChatID}
|
|
byScope[key] = append(byScope[key], task)
|
|
}
|
|
|
|
// Cache team+lead lookups (same team may have multiple scopes).
|
|
teamCache := map[uuid.UUID]*store.TeamData{}
|
|
leadCache := map[uuid.UUID]*store.AgentData{}
|
|
|
|
for scope, scopeTasks := range byScope {
|
|
team := teamCache[scope.TeamID]
|
|
if team == nil {
|
|
var err error
|
|
team, err = t.teams.GetTeam(ctx, scope.TeamID)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
teamCache[scope.TeamID] = team
|
|
}
|
|
lead := leadCache[team.LeadAgentID]
|
|
if lead == nil {
|
|
var err error
|
|
lead, err = t.agents.GetByID(ctx, team.LeadAgentID)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
leadCache[team.LeadAgentID] = lead
|
|
}
|
|
|
|
// Build batched task list with clear actionable hints.
|
|
var lines []string
|
|
for _, task := range scopeTasks {
|
|
lines = append(lines, fmt.Sprintf(" - Task #%d (id: %s): %s",
|
|
task.TaskNumber, task.ID, task.Subject))
|
|
}
|
|
content := fmt.Sprintf("[System] %d task(s) %s:\n%s\n\n%s",
|
|
len(scopeTasks), action, strings.Join(lines, "\n"), hint)
|
|
|
|
// Route using task's channel directly (from RETURNING); fallback to dashboard.
|
|
channel := scope.Channel
|
|
chatID := scope.ChatID
|
|
if channel == "" || channel == "system" || channel == "teammate" {
|
|
channel = "dashboard"
|
|
chatID = scope.TeamID.String()
|
|
}
|
|
|
|
// Resolve PeerKind from first task's metadata for correct session routing (#266).
|
|
var peerKind string
|
|
if fullTask, err := t.teams.GetTask(ctx, scopeTasks[0].ID); err == nil && fullTask != nil && fullTask.Metadata != nil {
|
|
if pk, ok := fullTask.Metadata["peer_kind"].(string); ok {
|
|
peerKind = pk
|
|
}
|
|
}
|
|
|
|
if !t.msgBus.TryPublishInbound(bus.InboundMessage{
|
|
Channel: channel,
|
|
SenderID: "ticker:system",
|
|
ChatID: chatID,
|
|
AgentID: lead.AgentKey,
|
|
UserID: team.CreatedBy,
|
|
PeerKind: peerKind,
|
|
TenantID: scope.TenantID,
|
|
Content: content,
|
|
}) {
|
|
slog.Warn("task_ticker: inbound buffer full, notification dropped",
|
|
"team_id", scope.TeamID, "scope_chat", scope.ChatID)
|
|
}
|
|
}
|
|
}
|
|
|
|
// broadcastStaleEvents sends UI broadcast events per team (for dashboard real-time updates).
|
|
func (t *TaskTicker) broadcastStaleEvents(ctx context.Context, tasks []store.RecoveredTaskInfo) {
|
|
if t.msgBus == nil {
|
|
return
|
|
}
|
|
// Deduplicate by team_id — one event per team.
|
|
seen := map[uuid.UUID]bool{}
|
|
for _, task := range tasks {
|
|
if seen[task.TeamID] {
|
|
continue
|
|
}
|
|
seen[task.TeamID] = true
|
|
bus.BroadcastForTenant(t.msgBus, protocol.EventTeamTaskStale, task.TenantID, tools.BuildTaskEventPayload(
|
|
task.TeamID.String(), "",
|
|
store.TeamTaskStatusStale,
|
|
"system", "task_ticker",
|
|
))
|
|
}
|
|
}
|
|
|
|
// ============================================================
|
|
// Follow-up reminders (batch)
|
|
// ============================================================
|
|
|
|
func (t *TaskTicker) processFollowups(ctx context.Context) {
|
|
tasks, err := t.teams.ListAllFollowupDueTasks(ctx)
|
|
if err != nil {
|
|
slog.Warn("task_ticker: list all followup tasks", "error", err)
|
|
return
|
|
}
|
|
if len(tasks) == 0 {
|
|
return
|
|
}
|
|
|
|
// Group by team_id for per-team interval resolution.
|
|
byTeam := map[uuid.UUID][]store.TeamTaskData{}
|
|
for _, task := range tasks {
|
|
byTeam[task.TeamID] = append(byTeam[task.TeamID], task)
|
|
}
|
|
for teamID, teamTasks := range byTeam {
|
|
team, err := t.teams.GetTeam(ctx, teamID)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
interval := followupInterval(*team)
|
|
t.processTeamFollowups(ctx, teamTasks, interval)
|
|
}
|
|
}
|
|
|
|
// processTeamFollowups sends follow-up reminders for a batch of tasks sharing the same team.
|
|
func (t *TaskTicker) processTeamFollowups(ctx context.Context, tasks []store.TeamTaskData, interval time.Duration) {
|
|
now := time.Now()
|
|
|
|
for i := range tasks {
|
|
task := &tasks[i]
|
|
|
|
// Cooldown: don't send more often than followupCooldown.
|
|
t.mu.Lock()
|
|
lastSent, exists := t.lastFollowupSent[task.ID]
|
|
t.mu.Unlock()
|
|
if exists && now.Sub(lastSent) < followupCooldown {
|
|
continue
|
|
}
|
|
|
|
if task.FollowupChannel == "" || task.FollowupChatID == "" {
|
|
continue
|
|
}
|
|
|
|
// Format reminder message.
|
|
countLabel := fmt.Sprintf("%d", task.FollowupCount+1)
|
|
if task.FollowupMax > 0 {
|
|
countLabel = fmt.Sprintf("%d/%d", task.FollowupCount+1, task.FollowupMax)
|
|
}
|
|
content := fmt.Sprintf("Reminder (%s): %s", countLabel, task.FollowupMessage)
|
|
|
|
if !t.msgBus.TryPublishOutbound(bus.OutboundMessage{
|
|
Channel: task.FollowupChannel,
|
|
ChatID: task.FollowupChatID,
|
|
Content: content,
|
|
}) {
|
|
slog.Warn("task_ticker: outbound buffer full, skipping followup", "task_id", task.ID)
|
|
continue
|
|
}
|
|
|
|
// Compute next followup_at.
|
|
newCount := task.FollowupCount + 1
|
|
var nextAt *time.Time
|
|
if task.FollowupMax == 0 || newCount < task.FollowupMax {
|
|
next := now.Add(interval)
|
|
nextAt = &next
|
|
}
|
|
// nextAt = nil when max reached → stops future reminders.
|
|
|
|
if err := t.teams.IncrementFollowupCount(ctx, task.ID, nextAt); err != nil {
|
|
slog.Warn("task_ticker: increment followup count", "task_id", task.ID, "error", err)
|
|
}
|
|
|
|
t.mu.Lock()
|
|
t.lastFollowupSent[task.ID] = now
|
|
t.mu.Unlock()
|
|
|
|
slog.Info("task_ticker: sent followup reminder",
|
|
"task_id", task.ID,
|
|
"task_number", task.TaskNumber,
|
|
"count", newCount,
|
|
"channel", task.FollowupChannel,
|
|
"team_id", task.TeamID,
|
|
)
|
|
}
|
|
}
|
|
|
|
// followupInterval parses the team's followup_interval_minutes setting.
|
|
func followupInterval(team store.TeamData) time.Duration {
|
|
if team.Settings != nil {
|
|
var settings map[string]any
|
|
if json.Unmarshal(team.Settings, &settings) == nil {
|
|
if v, ok := settings["followup_interval_minutes"].(float64); ok && v > 0 {
|
|
return time.Duration(int(v)) * time.Minute
|
|
}
|
|
}
|
|
}
|
|
return defaultFollowupInterval
|
|
}
|
|
|
|
func (t *TaskTicker) pruneCooldowns() {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
|
|
now := time.Now()
|
|
for id, ts := range t.lastFollowupSent {
|
|
if now.Sub(ts) > 2*followupCooldown {
|
|
delete(t.lastFollowupSent, id)
|
|
}
|
|
}
|
|
}
|