Files
goclaw/cmd/gateway_managed.go
viettranx 39bb90bd60 refactor(permissions): remove auto-add file writer, add config type constants
- Remove auto-add logic that granted file_writer permission to the first
  group/guild member who chatted with the bot
- Add ConfigTypeFileWriter and ConfigTypeHeartbeat constants, replace all
  hardcoded config_type strings across callers
- Add bootstrap exception: /addwriter and !addwriter allow first writer
  to be added when no writers exist yet
- Optimize writer commands: reuse cached ListFileWriters result for both
  permission check and last-writer guard, reducing DB queries per command
- Add freshness directive to file writer system prompt so bot prioritizes
  current list over stale references in conversation history
2026-04-01 13:59:56 +07:00

605 lines
21 KiB
Go

package cmd
import (
"context"
"encoding/json"
"log/slog"
"path/filepath"
"github.com/google/uuid"
"strings"
"github.com/nextlevelbuilder/goclaw/internal/agent"
"github.com/nextlevelbuilder/goclaw/internal/bus"
"github.com/nextlevelbuilder/goclaw/internal/config"
"github.com/nextlevelbuilder/goclaw/internal/edition"
httpapi "github.com/nextlevelbuilder/goclaw/internal/http"
kg "github.com/nextlevelbuilder/goclaw/internal/knowledgegraph"
mcpbridge "github.com/nextlevelbuilder/goclaw/internal/mcp"
"github.com/nextlevelbuilder/goclaw/internal/media"
"github.com/nextlevelbuilder/goclaw/internal/providers"
"github.com/nextlevelbuilder/goclaw/internal/sandbox"
"github.com/nextlevelbuilder/goclaw/internal/skills"
"github.com/nextlevelbuilder/goclaw/internal/store"
"github.com/nextlevelbuilder/goclaw/internal/store/pg"
"github.com/nextlevelbuilder/goclaw/internal/tools"
"github.com/nextlevelbuilder/goclaw/internal/tracing"
"github.com/nextlevelbuilder/goclaw/pkg/protocol"
)
// wireExtras wires components that require PG stores:
// agent resolver (lazy-creates Loops from DB), virtual FS interceptors, memory tools,
// and cache invalidation event subscribers.
// PG store creation and tracing are handled in gateway.go before this is called.
// Returns the ContextFileInterceptor so callers can pass it to AgentsMethods
// for immediate cache invalidation on agents.files.set.
func wireExtras(
stores *store.Stores,
agentRouter *agent.Router,
providerReg *providers.Registry,
msgBus *bus.MessageBus,
sessStore store.SessionStore,
toolsReg *tools.Registry,
toolPE *tools.PolicyEngine,
skillsLoader *skills.Loader,
hasMemory bool,
traceCollector *tracing.Collector,
workspace string,
injectionAction string,
appCfg *config.Config,
sandboxMgr sandbox.Manager,
redisClient any, // nil when built without -tags redis or when Redis is unconfigured
) (*tools.ContextFileInterceptor, *mcpbridge.Pool, *media.Store, tools.PostTurnProcessor) {
// 1. Build cache instances (in-memory or Redis depending on build tags)
agentCtxCache, userCtxCache := makeCaches(redisClient)
// 1a. Context file interceptor (created before resolver so callbacks can reference it)
var contextFileInterceptor *tools.ContextFileInterceptor
if stores.Agents != nil {
contextFileInterceptor = tools.NewContextFileInterceptor(stores.Agents, workspace, agentCtxCache, userCtxCache)
}
// 1c. Persistent media storage for cross-turn image/document access
mediaStore, err := media.NewStore(filepath.Join(workspace, ".media"))
if err != nil {
slog.Warn("media store creation failed, images will not persist across turns", "error", err)
}
// Wire media cleanup on session delete.
if mediaStore != nil {
if pgSess, ok := sessStore.(*pg.PGSessionStore); ok {
pgSess.OnDelete = func(sessionKey string) {
_ = mediaStore.DeleteSession(sessionKey)
}
}
// Register media analysis tools (need mediaStore for file access).
toolsReg.Register(tools.NewReadDocumentTool(providerReg, mediaStore))
toolsReg.Register(tools.NewReadAudioTool(providerReg, mediaStore))
toolsReg.Register(tools.NewReadVideoTool(providerReg, mediaStore))
toolsReg.Register(tools.NewCreateVideoTool(providerReg))
slog.Info("media tools registered", "tools", "read_document,read_audio,read_video,create_video")
}
// 1e. Wire secure CLI store into exec tool for credentialed exec
if stores.SecureCLI != nil {
if execTool, ok := toolsReg.Get("exec"); ok {
if et, ok := execTool.(*tools.ExecTool); ok {
et.SetSecureCLIStore(stores.SecureCLI)
}
}
}
// 2. Per-user profile + context file seeding callbacks
var ensureUserProfile agent.EnsureUserProfileFunc
var seedUserFiles agent.SeedUserFilesFunc
if stores.Agents != nil {
ensureUserProfile = buildEnsureUserProfile(stores.Agents)
seedUserFiles = buildSeedUserFiles(stores.Agents)
}
// 3. Context file loader callback: loads per-user context files dynamically
var contextFileLoader agent.ContextFileLoaderFunc
if contextFileInterceptor != nil {
contextFileLoader = buildContextFileLoader(contextFileInterceptor)
}
// 4. Compute global sandbox defaults for resolver
sandboxEnabled := sandboxMgr != nil
sandboxContainerDir := ""
sandboxWorkspaceAccess := ""
if sandboxEnabled {
sbCfg := appCfg.Agents.Defaults.Sandbox
if sbCfg != nil {
resolved := sbCfg.ToSandboxConfig()
sandboxContainerDir = resolved.ContainerWorkdir()
sandboxWorkspaceAccess = string(resolved.WorkspaceAccess)
}
}
// 5. Shared MCP connection pool (eliminates duplicate connections across agents)
var mcpPool *mcpbridge.Pool
if stores.MCP != nil {
mcpPool = mcpbridge.NewPool(mcpbridge.DefaultPoolConfig())
}
// 6. Set up agent resolver: lazy-creates Loops from DB
var skillAccessStore store.SkillAccessStore
if sas, ok := stores.Skills.(store.SkillAccessStore); ok {
skillAccessStore = sas
}
resolver := agent.NewManagedResolver(agent.ResolverDeps{
AgentStore: stores.Agents,
ProviderStore: stores.Providers,
ProviderReg: providerReg,
Bus: msgBus,
Sessions: sessStore,
Tools: toolsReg,
ToolPolicy: toolPE,
Skills: skillsLoader,
SkillAccessStore: skillAccessStore,
HasMemory: hasMemory,
TraceCollector: traceCollector,
EnsureUserProfile: ensureUserProfile,
SeedUserFiles: seedUserFiles,
ContextFileLoader: contextFileLoader,
BootstrapCleanup: buildBootstrapCleanup(stores.Agents),
CacheInvalidate: buildCacheInvalidate(contextFileInterceptor),
InjectionAction: injectionAction,
MaxMessageChars: appCfg.Gateway.MaxMessageChars,
CompactionCfg: appCfg.Agents.Defaults.Compaction,
ContextPruningCfg: appCfg.Agents.Defaults.ContextPruning,
SandboxEnabled: sandboxEnabled,
SandboxContainerDir: sandboxContainerDir,
SandboxWorkspaceAccess: sandboxWorkspaceAccess,
AgentLinkStore: stores.AgentLinks,
TeamStore: stores.Teams,
DataDir: workspace,
SecureCLIStore: stores.SecureCLI,
BuiltinToolStore: stores.BuiltinTools,
MCPStore: stores.MCP,
MCPPool: mcpPool,
ConfigPermStore: stores.ConfigPermissions,
MediaStore: mediaStore,
ModelPricing: appCfg.Telemetry.ModelPricing,
TracingStore: stores.Tracing,
MemoryStore: stores.Memory,
TenantStore: stores.Tenants,
BuiltinToolTenantCfgs: stores.BuiltinToolTenantCfgs,
SkillTenantCfgs: stores.SkillTenantCfgs,
Workspace: workspace,
OnEvent: func(event agent.AgentEvent) {
// Sign /v1/files/ and /v1/media/ URLs in content before delivery.
// Sessions store clean paths; signing happens only at delivery time.
secret := httpapi.FileSigningKey()
switch m := event.Payload.(type) {
case map[string]string:
if c, has := m["content"]; has && strings.Contains(c, "/v1/") {
m["content"] = httpapi.SignFileURLs(c, secret)
}
case map[string]any:
// Sign /v1/ URLs in content text (run.completed payload is map[string]any).
if c, ok := m["content"].(string); ok && strings.Contains(c, "/v1/") {
m["content"] = httpapi.SignFileURLs(c, secret)
}
// Convert media local paths → signed /v1/files/{full_path}?ft=hash
if rawMedia, ok := m["media"].([]agent.MediaResult); ok {
// Clone slice — the original is shared with RunResult.Media;
// mutating in-place corrupts paths for downstream consumers
// (announce queue, outbound channels) that expect local paths.
signed := make([]agent.MediaResult, len(rawMedia))
for i, mr := range rawMedia {
signed[i] = mr
// Use full path so backend resolves directly via os.Stat,
// no findInWorkspace fallback needed.
urlPath := strings.TrimPrefix(filepath.Clean(mr.Path), "/")
url := "/v1/files/" + urlPath
ft := httpapi.SignFileToken(url, secret, httpapi.FileTokenTTL)
signed[i].Path = url + "?ft=" + ft
}
m["media"] = signed
}
}
msgBus.Broadcast(bus.Event{
Name: protocol.EventAgent,
Payload: event,
TenantID: event.TenantID,
})
},
})
agentRouter.SetResolver(resolver)
// Wire virtual FS interceptors: route context + memory file reads/writes to DB.
// Share ONE ContextFileInterceptor instance between read_file and write_file
// so they share the same cache.
// Write-capable tools share a memory interceptor with optional KG extraction hook.
var writeMemIntc *tools.MemoryInterceptor
if stores.Memory != nil {
writeMemIntc = tools.NewMemoryInterceptor(stores.Memory, workspace)
// Hook KG extraction on memory writes if KG store is available
if stores.KnowledgeGraph != nil && stores.BuiltinTools != nil {
writeMemIntc.SetKGExtractFunc(buildKGExtractFunc(stores.KnowledgeGraph, stores.BuiltinTools, providerReg))
}
}
if readTool, ok := toolsReg.Get("read_file"); ok {
if ia, ok := readTool.(tools.InterceptorAware); ok {
if contextFileInterceptor != nil {
ia.SetContextFileInterceptor(contextFileInterceptor)
}
if stores.Memory != nil {
ia.SetMemoryInterceptor(tools.NewMemoryInterceptor(stores.Memory, workspace))
}
}
}
if writeTool, ok := toolsReg.Get("write_file"); ok {
if ia, ok := writeTool.(tools.InterceptorAware); ok {
if contextFileInterceptor != nil {
ia.SetContextFileInterceptor(contextFileInterceptor)
}
if writeMemIntc != nil {
ia.SetMemoryInterceptor(writeMemIntc)
}
}
}
if editTool, ok := toolsReg.Get("edit"); ok {
if ia, ok := editTool.(tools.InterceptorAware); ok {
if contextFileInterceptor != nil {
ia.SetContextFileInterceptor(contextFileInterceptor)
}
if writeMemIntc != nil {
ia.SetMemoryInterceptor(writeMemIntc)
}
}
}
if listTool, ok := toolsReg.Get("list_files"); ok {
if ia, ok := listTool.(tools.InterceptorAware); ok {
if stores.Memory != nil {
ia.SetMemoryInterceptor(tools.NewMemoryInterceptor(stores.Memory, workspace))
}
}
}
// Wire config perm store for file writer permission checks
if stores.ConfigPermissions != nil {
for _, toolName := range []string{"read_file", "write_file", "edit", "cron"} {
if t, ok := toolsReg.Get(toolName); ok {
if cpa, ok := t.(tools.ConfigPermAware); ok {
cpa.SetConfigPermStore(stores.ConfigPermissions)
}
}
}
if contextFileInterceptor != nil {
contextFileInterceptor.SetConfigPermStore(stores.ConfigPermissions)
}
}
// Wire memory store on memory tools (search + get)
if stores.Memory != nil {
if searchTool, ok := toolsReg.Get("memory_search"); ok {
if ms, ok := searchTool.(tools.MemoryStoreAware); ok {
ms.SetMemoryStore(stores.Memory)
}
}
if getTool, ok := toolsReg.Get("memory_get"); ok {
if ms, ok := getTool.(tools.MemoryStoreAware); ok {
ms.SetMemoryStore(stores.Memory)
}
}
slog.Info("memory layering enabled")
}
// Wire knowledge graph store on KG tool + hint in memory_search results
if stores.KnowledgeGraph != nil {
if kgTool, ok := toolsReg.Get("knowledge_graph_search"); ok {
if kgt, ok := kgTool.(*tools.KnowledgeGraphSearchTool); ok {
kgt.SetKGStore(stores.KnowledgeGraph)
}
}
// Enable KG hint in memory_search results
if searchTool, ok := toolsReg.Get("memory_search"); ok {
if mst, ok := searchTool.(*tools.MemorySearchTool); ok {
mst.SetHasKG(true)
}
}
slog.Info("knowledge graph tool wired (Postgres)")
}
// --- Cache invalidation event subscribers ---
// Context file cache: invalidate on agent/context data changes
if contextFileInterceptor != nil {
msgBus.Subscribe(bus.TopicCacheBootstrap, func(event bus.Event) {
if event.Name != protocol.EventCacheInvalidate {
return
}
payload, ok := event.Payload.(bus.CacheInvalidatePayload)
if !ok {
return
}
if payload.Kind == bus.CacheKindBootstrap || payload.Kind == bus.CacheKindAgent {
if payload.Key != "" {
agentID, err := uuid.Parse(payload.Key)
if err == nil {
contextFileInterceptor.InvalidateAgent(agentID)
}
} else {
contextFileInterceptor.InvalidateAll()
}
}
})
}
// Agent router: invalidate Loop cache on agent config changes
msgBus.Subscribe(bus.TopicCacheAgent, func(event bus.Event) {
if event.Name != protocol.EventCacheInvalidate {
return
}
payload, ok := event.Payload.(bus.CacheInvalidatePayload)
if !ok || payload.Kind != bus.CacheKindAgent {
return
}
if payload.Key != "" {
agentRouter.InvalidateAgent(payload.Key)
}
})
// Skills cache: bump version on skill changes
if stores.Skills != nil {
msgBus.Subscribe(bus.TopicCacheSkills, func(event bus.Event) {
if event.Name != protocol.EventCacheInvalidate {
return
}
payload, ok := event.Payload.(bus.CacheInvalidatePayload)
if !ok || payload.Kind != bus.CacheKindSkills {
return
}
stores.Skills.BumpVersion()
})
}
// Skill grants cache: invalidate all agent caches when grants change
msgBus.Subscribe(bus.TopicCacheSkillGrants, func(event bus.Event) {
if event.Name != protocol.EventCacheInvalidate {
return
}
payload, ok := event.Payload.(bus.CacheInvalidatePayload)
if !ok || payload.Kind != bus.CacheKindSkillGrants {
return
}
agentRouter.InvalidateAll()
})
// MCP cache: invalidate all agent caches when MCP servers/grants change
msgBus.Subscribe(bus.TopicCacheMCP, func(event bus.Event) {
if event.Name != protocol.EventCacheInvalidate {
return
}
payload, ok := event.Payload.(bus.CacheInvalidatePayload)
if !ok || payload.Kind != bus.CacheKindMCP {
return
}
agentRouter.InvalidateAll()
})
// Cron cache: invalidate job cache on cron changes
if ci, ok := stores.Cron.(store.CacheInvalidatable); ok {
msgBus.Subscribe(bus.TopicCacheCron, func(event bus.Event) {
if event.Name != protocol.EventCacheInvalidate {
return
}
payload, ok := event.Payload.(bus.CacheInvalidatePayload)
if !ok || payload.Kind != bus.CacheKindCron {
return
}
ci.InvalidateCache()
})
}
// Heartbeat cache: invalidate due cache on config changes
if hi, ok := stores.Heartbeats.(store.CacheInvalidatable); ok {
msgBus.Subscribe(bus.TopicCacheHeartbeat, func(event bus.Event) {
if event.Name != protocol.EventCacheInvalidate {
return
}
payload, ok := event.Payload.(bus.CacheInvalidatePayload)
if !ok || payload.Kind != bus.CacheKindHeartbeat {
return
}
hi.InvalidateCache()
})
}
// Config permissions cache: invalidate on grant/revoke changes
if pi, ok := stores.ConfigPermissions.(store.CacheInvalidatable); ok {
msgBus.Subscribe(bus.TopicCacheConfigPerms, func(event bus.Event) {
if event.Name != protocol.EventCacheInvalidate {
return
}
payload, ok := event.Payload.(bus.CacheInvalidatePayload)
if !ok || payload.Kind != bus.CacheKindConfigPerms {
return
}
pi.InvalidateCache()
})
}
// Builtin tools cache: re-apply disables on settings/enabled changes
if stores.BuiltinTools != nil {
msgBus.Subscribe(bus.TopicCacheBuiltinTools, func(event bus.Event) {
if event.Name != protocol.EventCacheInvalidate {
return
}
payload, ok := event.Payload.(bus.CacheInvalidatePayload)
if !ok || payload.Kind != bus.CacheKindBuiltinTools {
return
}
applyBuiltinToolDisables(context.Background(), stores.BuiltinTools, toolsReg)
agentRouter.InvalidateAll()
})
}
// Register team tools (team_tasks + workspace interceptor) if team store is available.
var postTurn tools.PostTurnProcessor
if stores.Teams != nil && stores.Agents != nil {
teamMgr := tools.NewTeamToolManager(stores.Teams, stores.Agents, msgBus, workspace)
postTurn = teamMgr
var teamPolicy tools.TeamActionPolicy = tools.FullTeamPolicy{}
if !edition.Current().TeamFullMode {
teamPolicy = tools.LiteTeamPolicy{}
}
toolsReg.Register(tools.NewTeamTasksTool(teamMgr, teamPolicy))
// Wire workspace interceptor into write_file so team workspace validation
// and event broadcasting happen transparently via existing file tools.
wsInterceptor := tools.NewWorkspaceInterceptor(teamMgr)
if writeTool, ok := toolsReg.Get("write_file"); ok {
if wia, ok := writeTool.(tools.WorkspaceInterceptorAware); ok {
wia.SetWorkspaceInterceptor(wsInterceptor)
}
}
slog.Info("team tools registered", "workspace", workspace)
// Team cache invalidation via pub/sub
msgBus.Subscribe(bus.TopicCacheTeam, func(event bus.Event) {
if event.Name != protocol.EventCacheInvalidate {
return
}
payload, ok := event.Payload.(bus.CacheInvalidatePayload)
if !ok || payload.Kind != bus.CacheKindTeam {
return
}
teamMgr.InvalidateTeam()
})
// Agent cache invalidation: clear TeamToolManager's agent lookup cache
// when agent data changes (update/delete via WS or HTTP).
msgBus.Subscribe("cache.agent.team_mgr", func(event bus.Event) {
if event.Name != protocol.EventCacheInvalidate {
return
}
payload, ok := event.Payload.(bus.CacheInvalidatePayload)
if !ok || payload.Kind != bus.CacheKindAgent {
return
}
teamMgr.InvalidateAgentCache()
})
slog.Info("team tools registered")
}
// User workspace cache: invalidate per-user workspace path on profile changes
msgBus.Subscribe(bus.TopicCacheUserWorkspace, func(event bus.Event) {
if event.Name != protocol.EventCacheInvalidate {
return
}
payload, ok := event.Payload.(bus.CacheInvalidatePayload)
if !ok || payload.Kind != bus.CacheKindUserWorkspace {
return
}
if payload.Key != "" {
agentRouter.InvalidateUserWorkspace(payload.Key)
}
})
// Provider cache: re-register ACP providers on create/update/delete
msgBus.Subscribe(bus.TopicCacheProvider, func(event bus.Event) {
if event.Name != protocol.EventCacheInvalidate {
return
}
payload, ok := event.Payload.(bus.CacheInvalidatePayload)
if !ok || payload.Kind != bus.CacheKindProvider {
return
}
if payload.Key == "" {
return
}
// Re-register from DB if provider still exists and is ACP type
provCtx := store.WithTenantID(context.Background(), event.TenantID)
p, err := stores.Providers.GetProviderByName(provCtx, payload.Key)
if err != nil {
// Provider was deleted or not found — already unregistered by handler
return
}
if p.ProviderType != store.ProviderACP {
return
}
// Unregister old instance (closes ProcessPool) then re-register
providerReg.Unregister(p.Name)
if p.Enabled {
registerACPFromDB(providerReg, *p)
}
})
slog.Info("resolver + interceptors + cache subscribers wired")
return contextFileInterceptor, mcpPool, mediaStore, postTurn
}
// kgSettings holds KG extraction settings from the builtin_tools table.
type kgSettings struct {
ExtractOnMemoryWrite bool `json:"extract_on_memory_write"`
ExtractionProvider string `json:"extraction_provider"`
ExtractionModel string `json:"extraction_model"`
MinConfidence float64 `json:"min_confidence"`
}
// buildKGExtractFunc returns a callback that extracts entities from memory content.
// Settings are read from the builtin_tools table on each invocation (not cached),
// so changes take effect immediately without restart.
func buildKGExtractFunc(kgStore store.KnowledgeGraphStore, bts store.BuiltinToolStore, providerReg *providers.Registry) tools.KGExtractFunc {
return func(ctx context.Context, agentID, userID, content string) {
slog.Info("kg extract: triggered", "agent", agentID, "user", userID, "content_len", len(content))
// Read settings from DB on each call so admin changes take effect immediately
raw, err := bts.GetSettings(ctx, "knowledge_graph_search")
if err != nil || raw == nil {
slog.Warn("kg extract: no settings found", "error", err)
return
}
var settings kgSettings
if err := json.Unmarshal(raw, &settings); err != nil {
slog.Warn("kg extract: invalid settings", "error", err)
return
}
if !settings.ExtractOnMemoryWrite || settings.ExtractionProvider == "" || settings.ExtractionModel == "" {
return
}
p, err := providerReg.Get(ctx, settings.ExtractionProvider)
if err != nil {
slog.Warn("kg extract: provider not found", "provider", settings.ExtractionProvider, "error", err)
return
}
extractor := kg.NewExtractor(p, settings.ExtractionModel, settings.MinConfidence)
result, err := extractor.Extract(ctx, content)
if err != nil {
slog.Warn("kg extract: extraction failed", "agent", agentID, "error", err)
return
}
if len(result.Entities) == 0 && len(result.Relations) == 0 {
return
}
for i := range result.Entities {
result.Entities[i].AgentID = agentID
result.Entities[i].UserID = userID
}
for i := range result.Relations {
result.Relations[i].AgentID = agentID
result.Relations[i].UserID = userID
}
entityIDs, err := kgStore.IngestExtraction(ctx, agentID, userID, result.Entities, result.Relations)
if err != nil {
slog.Warn("kg extract: ingest failed", "agent", agentID, "error", err)
return
}
slog.Info("kg extract: ingested from memory write", "agent", agentID, "entities", len(result.Entities), "relations", len(result.Relations))
// Run inline dedup on newly upserted entities (best-effort, non-blocking)
if len(entityIDs) > 0 {
merged, flagged, dedupErr := kgStore.DedupAfterExtraction(ctx, agentID, userID, entityIDs)
if dedupErr != nil {
slog.Warn("kg extract: dedup failed", "agent", agentID, "error", dedupErr)
} else if merged > 0 || flagged > 0 {
slog.Info("kg extract: dedup completed", "agent", agentID, "auto_merged", merged, "candidates_flagged", flagged)
}
}
}
}