Files
goclaw/internal/tools/team_notify_queue.go
viettranx 014f74ec15 fix(agent): group session unresponsive during team task execution (#266)
Two fixes:

1. Remove assistant prefill from team task reminders. The injected
   [user]+[assistant]+[user] pattern caused LLMs to treat the canned
   ack as "turn complete", returning NO_REPLY for every user message
   in group sessions with active tasks. Reminders are now merged into
   the user message as prefix tags.

2. Add PeerKind propagation to team notification routing. TaskTicker
   and progress notifications were missing PeerKind on InboundMessage,
   causing them to route to phantom DM sessions instead of the correct
   group session. PeerKind is now carried through event payloads,
   notify queue metadata, and all inbound message publications.
2026-03-30 15:20:01 +07:00

104 lines
2.4 KiB
Go

package tools
import (
"strings"
"sync"
"time"
)
// NotifyRoutingMeta carries routing info for batched team notifications.
type NotifyRoutingMeta struct {
Mode string // "direct" or "leader"
Channel string
ChatID string
UserID string
LeadAgent string // agent key (only used in leader mode)
PeerKind string // "group" or "direct" — routes to correct session (#266)
}
// TeamNotifyQueue batches team task notifications per chat with debounce,
// following the same pattern as AnnounceQueue for subagent results.
type TeamNotifyQueue struct {
mu sync.Mutex
batches map[string]*notifyBatch // key: "teamID:chatID"
debounce time.Duration
cap int // immediate drain threshold
onDrain func(items []string, meta NotifyRoutingMeta)
}
type notifyBatch struct {
items []string
timer *time.Timer
meta NotifyRoutingMeta
}
// NewTeamNotifyQueue creates a notification queue with debounce and drain callback.
func NewTeamNotifyQueue(debounceMs int, onDrain func(items []string, meta NotifyRoutingMeta)) *TeamNotifyQueue {
if debounceMs <= 0 {
debounceMs = 2000
}
return &TeamNotifyQueue{
batches: make(map[string]*notifyBatch),
debounce: time.Duration(debounceMs) * time.Millisecond,
cap: 20,
onDrain: onDrain,
}
}
// Enqueue adds a formatted notification line to the batch for the given key.
// Resets debounce timer. Drains immediately if cap is reached.
func (q *TeamNotifyQueue) Enqueue(key string, content string, meta NotifyRoutingMeta) {
q.mu.Lock()
defer q.mu.Unlock()
b, ok := q.batches[key]
if !ok {
b = &notifyBatch{meta: meta}
q.batches[key] = b
}
b.items = append(b.items, content)
// Immediate drain if cap reached.
if len(b.items) >= q.cap {
if b.timer != nil {
b.timer.Stop()
}
items := b.items
bMeta := b.meta
delete(q.batches, key)
go q.drain(items, bMeta)
return
}
// Reset debounce timer.
if b.timer != nil {
b.timer.Stop()
}
b.timer = time.AfterFunc(q.debounce, func() {
q.mu.Lock()
b, ok := q.batches[key]
if !ok {
q.mu.Unlock()
return
}
items := b.items
bMeta := b.meta
delete(q.batches, key)
q.mu.Unlock()
q.drain(items, bMeta)
})
}
func (q *TeamNotifyQueue) drain(items []string, meta NotifyRoutingMeta) {
if len(items) == 0 || q.onDrain == nil {
return
}
q.onDrain(items, meta)
}
// FormatBatchedNotify joins notification lines into a single message.
func FormatBatchedNotify(items []string) string {
return strings.Join(items, "\n")
}