### Implement Task and VM Lifecycle Management

Source: https://github.com/chaitin/monkeycode/blob/main/backend/pkg/lifecycle/README.md

Example implementation of creating a task with an associated VM and handling the VM-ready callback that triggers state transitions.

```go
// Create a task
func (a *TaskUsecase) Create(ctx context.Context, user *domain.User, req domain.CreateTaskReq, token string) (*domain.ProjectTask, error) {
	// 1. Create the task record
	pt, err := a.repo.Create(ctx, user, req, token, func(...) {
		// 2. Create the VM
		vm := a.taskflow.VirtualMachiner().Create(...)

		// 3. Temporarily store CreateTaskReq (expires after 10 minutes)
		reqKey := fmt.Sprintf("task:create_req:%s", task.ID)
		a.redis.Set(ctx, reqKey, createReq, 10*time.Minute)

		// 4. Initialize state (Transition automatically triggers the hooks)
		taskMeta := lifecycle.TaskMetadata{TaskID: t.ID, UserID: user.ID.String()}
		a.taskLifecycle.Transition(ctx, t.ID, lifecycle.TaskStatePending, taskMeta)

		vmMeta := lifecycle.VMMetadata{VMID: vm.ID, TaskID: t.ID.String(), UserID: user.ID.String()}
		a.vmLifecycle.Transition(ctx, vm.ID, lifecycle.VMStatePending, vmMeta)

		return vm, nil
	})
	// Propagate the repository error instead of dropping it
	return pt, err
}

// VM-ready callback
func (h *InternalHandler) VmReady(vmID string) error {
	// Transition the VM state, which automatically triggers the hook chain
	meta := lifecycle.VMMetadata{...}
	return h.vmLifecycle.Transition(ctx, vmID, lifecycle.VMStateRunning, meta)
}
```

--------------------------------

### MonkeyCode CLI Example

Source: https://github.com/chaitin/monkeycode/blob/main/frontend/public/ppt/index.html

Demonstrates how to use the MonkeyCode CLI to initiate a task and add dependencies to a project.

```bash
dev@monkey:~/app $ monkey task "Build a web-based game"
✓ Created the cloud development environment
✓ Read the code repository and README
✓ Plan split into 3 steps: terrain, interaction, preview
Modified file src/world/chunk.ts +126 -18 · 14.2k tokens · context 70%
dev@monkey:~/app $ pnpm add three zustand
✓ 2 packages added in 1.2s
```

--------------------------------

### Implement Custom Hook

Source: https://github.com/chaitin/monkeycode/blob/main/backend/pkg/lifecycle/README.md

Example implementation of a hook that performs actions upon state changes.
```go
type TaskNotifyHook struct {
	notify *dispatcher.Dispatcher
	logger *slog.Logger
}

func (h *TaskNotifyHook) Name() string  { return "task-notify-hook" }
func (h *TaskNotifyHook) Priority() int { return 50 }
func (h *TaskNotifyHook) Async() bool   { return true }

func (h *TaskNotifyHook) OnStateChange(ctx context.Context, taskID uuid.UUID, from, to TaskState, meta TaskMetadata) error {
	// Notification logic
	event := &NotifyEvent{
		EventType: consts.NotifyEventTaskCompleted,
		Payload:   map[string]any{"status": to, "task_id": taskID},
	}
	return h.notify.Publish(ctx, event)
}
```

--------------------------------

### Create README for Lifecycle Package

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/plans/2026-03-18-lifecycle-refactor.md

Create a README.md file for the lifecycle package to document its architecture, usage, and examples. This includes defining state types and metadata structures.

```markdown
# Lifecycle Package

A generic lifecycle management framework that supports custom state types and metadata.

## Architecture

```
┌─────────────────────────────────────────┐
│ LifecycleManager[S, M]                  │
│ - Register(hooks...)
│
│ - Transition(ctx, id, to, metadata)     │
│ - GetState(ctx, id)                     │
└─────────────────────────────────────────┘
                    │
                    │ triggers
                    ▼
┌─────────────────────────────────────────┐
│              Hook Pipeline              │
│  ┌─────────┐ ┌─────────┐ ┌─────────┐    │
│  │ Hook 1  │→│ Hook 2  │→│ Hook 3  │    │
│  │ (sync)  │ │ (async) │ │ (async) │    │
│  └─────────┘ └─────────┘ └─────────┘    │
└─────────────────────────────────────────┘
```

## Usage Example

### Define State Types

```go
type OrderState string

const (
	OrderStatePending   OrderState = "pending"
	OrderStatePaid      OrderState = "paid"
	OrderStateShipped   OrderState = "shipped"
	OrderStateDelivered OrderState = "delivered"
)

type OrderMetadata struct {
	OrderID string
	UserID  string
	Amount  float64
}
```
```

--------------------------------

### Custom State Transitions

Source: https://github.com/chaitin/monkeycode/blob/main/backend/pkg/lifecycle/README.md

Example of how to define custom state transitions for a lifecycle using `WithTransitions`.

```APIDOC
## Custom State Transitions

### Order State Example

```go
// Order State Example
type OrderState string

const (
	OrderStatePending   OrderState = "pending"
	OrderStatePaid      OrderState = "paid"
	OrderStateShipped   OrderState = "shipped"
	OrderStateDelivered OrderState = "delivered"
	OrderStateRefunded  OrderState = "refunded"
)

customTransitions := map[OrderState][]OrderState{
	OrderStatePending:   {OrderStatePaid},
	OrderStatePaid:      {OrderStateShipped, OrderStateRefunded},
	OrderStateShipped:   {OrderStateDelivered},
	OrderStateDelivered: {},
}

mgr := lifecycle.NewManager[string, OrderState, lifecycle.OrderMetadata](
	redisClient,
	lifecycle.WithTransitions(customTransitions),
)
```
```

--------------------------------

### Redis Key Structure Example

Source: https://github.com/chaitin/monkeycode/blob/main/backend/pkg/lifecycle/README.md

Illustrates the structure of the Redis key used for lifecycle state, including its type and fields. Metadata is not persisted; it is passed in through the Transition method.
```redis
Key: lifecycle:{id}
Type: Hash
Fields:
  - state: current state (string)
  - from_state: previous state (string)
  - updated_at: update timestamp (milliseconds)

Example:
lifecycle:task-uuid-123
  state: "running"
  from_state: "pending"
  updated_at: 1710891245678
```

--------------------------------

### Command Line Interface Example

Source: https://github.com/chaitin/monkeycode/blob/main/frontend/public/ppt/index.html

Example of using the monkey command-line tool to create a task that adds a CSV export endpoint for user data.

```bash
$ monkey task "Add a user CSV export endpoint to the admin backend"
```

--------------------------------

### Define Test for Latest Round Start

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/plans/2026-04-01-stream-attach-latest-round.md

Test case structure for verifying the identification of the latest round start time.

```go
func TestFindLatestRoundStart(t *testing.T) {
	// case 1: returns the most recent user-input
	// case 2: returns the task creation time when there is no user-input
}
```

--------------------------------

### Typical Use Case: Task + VM Lifecycle Management

Source: https://github.com/chaitin/monkeycode/blob/main/backend/pkg/lifecycle/README.md

Illustrates a common workflow involving task creation, VM provisioning, and state transitions.

```APIDOC
## Typical Use Case: Task + VM Lifecycle Management

### Full Workflow

1. **TaskUsecase.Create()**
   - Create task record in DB.
   - Create VM via taskflow.
   - Store `CreateTaskReq` in Redis (10 min expiry, key: `task:create_req:{taskID}`).
   - `taskMgr.Transition(taskID, TaskStatePending, meta)`.
   - `vmMgr.Transition(vmID, VMStatePending, meta)`.

2. **VM Ready (InternalHandler.VmReady)**
   - `vmMgr.Transition(vmID, VMStateRunning, meta)`:
     - Triggers `VMTaskHook` (updates task status to Processing).
     - Triggers `TaskCreateHook` (reads `CreateTaskReq` from Redis, creates taskflow task, deletes key).
     - Triggers `VMNotifyHook` (sends VMReady notification).

3. **Task Completion**
   - `taskMgr.Transition(taskID, TaskStateSucceeded, meta)`:
     - Triggers `TaskNotifyHook` (sends TaskCompleted notification).
```

--------------------------------

### Simplify GetProjectTree Implementation

Source: https://github.com/chaitin/monkeycode/blob/main/docs/superpowers/specs/2026-03-27-git-platform-interface-abstraction-design.md

Demonstrates using the getClient factory to standardize the GetProjectTree method, eliminating the need for platform-specific conversion functions.

```go
func (u *ProjectUsecase) GetProjectTree(ctx context.Context, uid uuid.UUID, req *domain.GetProjectTreeReq) (domain.ProjectTree, error) {
	p, err := u.repo.Get(ctx, uid, req.ID)
	if err != nil {
		return nil, err
	}

	client, cc, err := u.getClient(p)
	if err != nil {
		return nil, err
	}

	ref := req.Ref
	if ref == "" {
		ref = cc.DefaultBranch
	}

	resp, err := client.Tree(ctx, &domain.TreeOptions{
		Token:     cc.Token,
		Owner:     cc.Owner,
		Repo:      cc.Repo,
		Ref:       ref,
		Path:      req.Path,
		Recursive: req.Recursive,
		InstallID: cc.InstallID,
	})
	if err != nil {
		return nil, errcode.ErrGitOperation.Wrap(err)
	}

	// Convert domain.TreeEntry → domain.ProjectTreeEntry directly; no platform-specific conversion function is needed
	return cvt.Iter(resp.Entries, func(_ int, e *domain.TreeEntry) *domain.ProjectTreeEntry {
		return &domain.ProjectTreeEntry{
			Mode:           e.Mode,
			Name:           e.Name,
			Path:           e.Path,
			Sha:            e.Sha,
			Size:           e.Size,
			LastModifiedAt: e.LastModifiedAt,
		}
	}), nil
}
```

--------------------------------

### Verify Functionality

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/plans/2026-03-18-lifecycle-refactor.md

Manual verification steps for service startup and task lifecycle updates.

```bash
# Start the service and create a task, then verify:
# 1. The task state is correct after creation
# 2. The task state updates automatically once the VM is created
# 3. Notifications are sent correctly
```

--------------------------------

### Define Loki Client Helper Function

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/plans/2026-04-01-stream-attach-latest-round.md

Signature of the helper function that determines the start time of the latest round.
```go
func (c *Client) FindLatestRoundStart(ctx context.Context, taskID string, taskCreatedAt, end time.Time) (time.Time, error)
```

--------------------------------

### Refresh VM Idle Timers

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/specs/2026-03-19-vm-idle-sleep-recycle-design.md

Refreshes a VM's sleep, notify, and recycle timers, debounced through Redis.

```go
func (h *HostUsecase) RefreshIdleTimers(ctx context.Context, vmID string, payload *domain.VmIdleInfo) {
	// Debounce: refresh a given VM at most once every 30 seconds
	debounceKey := fmt.Sprintf("vm:idle:debounce:%s", vmID)
	if ok, _ := h.redis.SetNX(ctx, debounceKey, "1", 30*time.Second).Result(); !ok {
		return // already refreshed within the last 30 seconds
	}

	now := time.Now()
	h.vmSleepQueue.Enqueue(ctx, SLEEP_KEY, payload, now.Add(10*time.Minute), vmID)
	h.vmNotifyQueue.Enqueue(ctx, NOTIFY_KEY, payload, now.Add(7*24*time.Hour - 1*time.Hour), vmID)
	h.vmRecycleQueue.Enqueue(ctx, RECYCLE_KEY, payload, now.Add(7*24*time.Hour), vmID)
}
```

--------------------------------

### Report VM Activity

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/specs/2026-03-19-vm-idle-sleep-recycle-design.md

Call this method while handling TaskStream messages to report the activity time.

```go
connector.EventReport.ReportVMActivity(vmID, time.Now())
```

--------------------------------

### Define VM Sleeping Status Constants

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/specs/2026-03-19-vm-idle-sleep-recycle-design.md

Define the sleeping status in both MonkeyCode and taskflow so the two stay in sync.

```go
// MonkeyCode: pkg/taskflow/types.go (MonkeyCode's taskflow client package)
const (
	VirtualMachineStatusSleeping VirtualMachineStatus = "sleeping"
)

// taskflow: types/orchestrator.go
const (
	VirtualMachineStatusSleeping VirtualMachineStatus = "sleeping"
)
```

--------------------------------

### Refactor Create Method for Task Lifecycle

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/plans/2026-03-18-lifecycle-refactor.md

Demonstrates the refactored Create method in TaskUsecase.
It includes host status checks, prompt retrieval, task creation with pending status, VM creation, and state transitions using the new lifecycle managers for both task and VM.

```go
func (a *TaskUsecase) Create(ctx context.Context, user *domain.User, req domain.CreateTaskReq, token string) (*domain.ProjectTask, error) {
	// 1. Check whether the Host is online
	r, err := a.taskflow.Host().IsOnline(ctx, &taskflow.IsOnlineReq[string]{
		IDs: []string{req.HostID},
	})
	if err != nil {
		return nil, errcode.ErrHostOffline.Wrap(err)
	}
	if !r.OnlineMap[req.HostID] {
		return nil, errcode.ErrHostOffline
	}

	req.Now = time.Now()

	// 2. Fetch the system prompt
	if a.taskHook != nil && req.SystemPrompt == "" {
		if prompt, err := a.taskHook.GetSystemPrompt(ctx, req.Type, req.SubType); err == nil && prompt != "" {
			req.SystemPrompt = prompt
		}
	}

	var res *db.ProjectTask
	err = entx.WithTx2(ctx, a.db, func(tx *db.Tx) error {
		// ... existing logic: fetch Host, Model, Image

		// 3. Create the task (status: pending)
		tk, err := tx.Task.Create().
			SetID(id).
			SetKind(req.Type).
			SetSubType(req.SubType).
			SetContent(req.Content).
			SetUserID(u.ID).
			SetStatus(consts.TaskStatusPending). // initial status
			Save(ctx)
		if err != nil {
			return err
		}

		// 4. Create the VM
		vm, err := a.taskflow.VirtualMachiner().Create(ctx, &taskflow.CreateVirtualMachineReq{
			// ... existing parameters
		})
		if err != nil {
			return err
		}
		if vm == nil {
			return fmt.Errorf("vm is nil")
		}

		// 5. Transition the task state to Running (triggers Task Hooks)
		taskMeta := lifecycle.TaskMetadata{
			TaskID:  tk.ID.String(),
			UserID:  user.ID.String(),
			Project: req.Extra.ProjectID.String(),
		}
		if err := a.taskLifecycle.Transition(ctx, tk.ID.String(), lifecycle.TaskStateRunning, taskMeta); err != nil {
			a.logger.WarnContext(ctx, "task lifecycle transition failed", "error", err)
		}

		// 6. Transition the VM state to Running (triggers VM Hooks, which update the task status automatically)
		vmMeta := lifecycle.VMMetadata{
			VMID:   vm.ID,
			TaskID: tk.ID.String(),
			UserID: user.ID.String(),
		}
		if err := a.vmLifecycle.Transition(ctx, vm.ID, lifecycle.VMStateRunning, vmMeta); err != nil {
			a.logger.WarnContext(ctx, "vm lifecycle transition failed", "error", err)
		}

		// 7.
		// Create the MCP config and start Tasker (existing logic unchanged)
		mcps := []taskflow.McpServerConfig{...}
		if err := a.tasker.CreateTask(ctx, t.ID.String(), &domain.TaskSession{...}); err != nil {
			return err
		}

		return nil
	})
	if err != nil {
		a.logger.With("error", err, "req", req).ErrorContext(ctx, "failed to create task")
		return nil, err
	}

	result := cvt.From(res, &domain.ProjectTask{})

	// 8. Notify the TaskHook
	if a.taskHook != nil {
		if err := a.taskHook.OnTaskCreated(ctx, result);
```

--------------------------------

### Implement Error Handling Logic

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/specs/2026-03-30-control-websocket-design.md

When a taskflow call fails, return a call-response message containing the error information to the frontend.

```go
if err != nil {
	logger.With("error", err, "kind", m.Kind).WarnContext(ctx, "sync call failed")
	errData, _ := json.Marshal(map[string]string{"error": err.Error()})
	wsConn.WriteJSON(domain.TaskStream{
		Type:      consts.TaskStreamTypeCallResponse,
		Data:      errData,
		Kind:      m.Kind,
		Timestamp: time.Now().UnixMilli(),
	})
	return
}
```

--------------------------------

### GET /api/v1/users/tasks/control

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/specs/2026-03-30-control-websocket-design.md

Establishes a WebSocket connection for control messages. This endpoint is designed to maintain a persistent connection throughout the user's session, independent of task execution status.

```APIDOC
## GET /api/v1/users/tasks/control

### Description
Establishes a WebSocket connection for control messages. This endpoint is designed to maintain a persistent connection throughout the user's session, independent of task execution status. It supports synchronous operations like file browsing and diff viewing.

### Method
GET

### Endpoint
/api/v1/users/tasks/control

### Parameters
#### Query Parameters
- **taskId** (uuid) - Required - The unique identifier of the task associated with this control connection.

### Request Body
This endpoint does not have a request body.
Connection is established via WebSocket upgrade.

### Response
#### Success Response (101 Switching Protocols)
- The server upgrades the connection to WebSocket.

#### Response Example
(WebSocket connection established, no HTTP response body)

### WebSocket Message Protocol
Messages are exchanged using the `domain.TaskStream` structure.

#### Upstream (Frontend to Server)
| Type   | Kind                | Description                |
|--------|---------------------|----------------------------|
| `call` | `repo_file_changes` | Query changed file list    |
| `call` | `repo_file_list`    | List directory files       |
| `call` | `repo_read_file`    | Read file content          |
| `call` | `repo_file_diff`    | Get file diff              |
| `call` | `restart`           | Restart task (no response) |

#### Downstream (Server to Frontend)
| Type            | Kind            | Description                                          |
|-----------------|-----------------|------------------------------------------------------|
| `call-response` | `repo_*` series | Response to synchronous requests (except `restart`)  |
| `call-response` | (error)         | Error information from taskflow call                 |
| `ping`          | -               | Heartbeat message                                    |
```

--------------------------------

### Compile Backend Project

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/plans/2026-03-26-task-stop-recycle.md

Verify changes by compiling the backend package.

```bash
cd /Users/yoko/chaitin/ai/MonkeyCode/backend && go build ./...
```

--------------------------------

### Register and Unregister Task Aggregator in taskRecv

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/specs/2026-03-28-task-stream-realtime-design.md

Registers a new RoundAggregator for a task upon its start and unregisters it upon completion or error. This ensures the connector can manage and flush the correct aggregator.
```go
aggregator := NewRoundAggregator(token.TaskID.String(), a.logger, a.loki)
a.connector.RegisterTaskAggregator(token.TaskID.String(), aggregator)
defer a.connector.UnregisterTaskAggregator(token.TaskID.String())
```

--------------------------------

### Define VM Recycle Hook Constants and Struct

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/plans/2026-03-26-task-stop-recycle.md

Defines constants for the delay queue keys and the VMRecycleHook struct, which holds dependencies on the taskflow client, Redis, the host repository, and the delay queues. The hook uses these to clean up after a VM is recycled.

```go
package lifecycle

import (
	"context"
	"fmt"
	"log/slog"
	"time"

	"github.com/redis/go-redis/v9"
	"github.com/samber/do"

	"github.com/chaitin/MonkeyCode/backend/db"
	"github.com/chaitin/MonkeyCode/backend/domain"
	"github.com/chaitin/MonkeyCode/backend/pkg/delayqueue"
	"github.com/chaitin/MonkeyCode/backend/pkg/entx"
	"github.com/chaitin/MonkeyCode/backend/pkg/taskflow"
)

const (
	vmSleepQueueKey   = "vm:idle:sleep"
	vmNotifyQueueKey  = "vm:idle:notify"
	vmRecycleQueueKey = "vm:idle:recycle"
	vmExpireQueueKey  = "vm:expire"
)

// VMRecycleHook is the VM recycle hook; it deletes the VM, cleans up the queues and Redis keys, and marks the DB record
type VMRecycleHook struct {
	taskflow       taskflow.Clienter
	redis          *redis.Client
	hostRepo       domain.HostRepo
	vmSleepQueue   *delayqueue.VMSleepQueue
	vmNotifyQueue  *delayqueue.VMNotifyQueue
	vmRecycleQueue *delayqueue.VMRecycleQueue
	vmExpireQueue  *delayqueue.VMExpireQueue
	logger         *slog.Logger
}
```

--------------------------------

### Define ClientContext and Factory Method

Source: https://github.com/chaitin/monkeycode/blob/main/docs/superpowers/specs/2026-03-27-git-platform-interface-abstraction-design.md

Defines the ClientContext structure and the getClient factory method to abstract platform-specific client initialization.
```go
// ClientContext is the platform client context
type ClientContext struct {
	Owner         string
	Repo          string
	DefaultBranch string
	InstallID     int64
	Token         string
}

// getClient returns the client and context that match the project's platform
func (u *ProjectUsecase) getClient(p *db.Project) (domain.GitPlatformClient, *ClientContext, error) {
	gi := p.Edges.GitIdentity
	if gi == nil {
		return nil, nil, errcode.ErrGitOperation.Wrap(fmt.Errorf("project has no git identity"))
	}
	token := gi.AccessToken

	switch p.Platform {
	case consts.GitPlatformGithub:
		parsed, err := giturl.Parse(p.RepoURL)
		if err != nil {
			return nil, nil, err
		}
		return u.gh, &ClientContext{
			Owner:         parsed.Owner,
			Repo:          parsed.Repo,
			DefaultBranch: p.Branch,
			InstallID:     gi.InstallationID,
			Token:         token,
		}, nil
	case consts.GitPlatformGitLab:
		gl := u.getGitlabClientByBaseURL(gi.BaseURL)
		projectPath, _ := gitlab.ParseProjectPath(p.RepoURL)
		return gl, &ClientContext{
			Owner:         projectPath,
			DefaultBranch: p.Branch,
			Token:         token,
		}, nil
	case consts.GitPlatformGitea:
		owner, repo, _ := gitea.ParseRepoPath(p.RepoURL)
		return u.gta, &ClientContext{
			Owner:         owner,
			Repo:          repo,
			DefaultBranch: p.Branch,
			Token:         token,
		}, nil
	default:
		return nil, nil, errcode.ErrGitOperation.Wrap(fmt.Errorf("unsupported platform: %s", p.Platform))
	}
}
```

--------------------------------

### Initialize Manager and Register Hooks

Source: https://github.com/chaitin/monkeycode/blob/main/backend/pkg/lifecycle/README.md

Create a new lifecycle manager instance and register hooks that will execute during state transitions.
```go
import "github.com/chaitin/MonkeyCode/backend/pkg/lifecycle"

// Create the Manager (tasks use uuid.UUID as the ID)
taskMgr := lifecycle.NewManager[uuid.UUID, TaskState, TaskMetadata](
	redisClient,
	lifecycle.WithTransitions[uuid.UUID, TaskState, TaskMetadata](lifecycle.TaskTransitions()),
	lifecycle.WithLogger(logger),
)

// Register hooks (sorted automatically by priority)
taskMgr.Register(
	lifecycle.NewTaskNotifyHook(notifyDispatcher, logger),
	lifecycle.NewTaskCreateHook(redisClient, taskflowClient, logger),
)
```

--------------------------------

### Initialize GitLab Client with OAuth

Source: https://github.com/chaitin/monkeycode/blob/main/docs/superpowers/specs/2026-03-27-git-platform-interface-abstraction-design.md

Instantiate a GitLab client, enabling OAuth support via an option. This is used when authentication requires OAuth tokens.

```go
gl := gitlab.NewGitlab(baseURL, token, logger, gitlab.WithOAuth(true))
```

--------------------------------

### Run Unit Tests

Source: https://github.com/chaitin/monkeycode/blob/main/backend/pkg/lifecycle/README.md

Executes all unit tests within the pkg/lifecycle directory. Use the -v flag for verbose output.

```bash
go test ./pkg/lifecycle/... -v
```

--------------------------------

### Commit Changes

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/plans/2026-03-19-vm-idle-sleep-recycle.md

Git commands for committing the related file changes.

```bash
git add domain/host.go pkg/taskflow/types.go
git commit -m "feat(idle): define VmIdleInfo struct and sleeping status"
```

```bash
git add pkg/delayqueue/vmidlequeue.go
git commit -m "feat(idle): add idle queue factory functions"
```

--------------------------------

### Run Lifecycle Manager Tests

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/plans/2026-03-18-lifecycle-refactor.md

Command to execute all tests within the pkg/lifecycle directory using Go's testing framework.

```bash
cd /Users/yoko/chaitin/ai/MonkeyCode/backend
go test ./pkg/lifecycle/... \
  -v
```

--------------------------------

### Define the VM Activity Data Structure

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/specs/2026-03-19-vm-idle-sleep-recycle-design.md

Struct for recording VM activity information.

```go
// types/agent.go
type VMActivity struct {
	VMID         string    `json:"vm_id"`
	LastActiveAt time.Time `json:"last_active_at"`
}
```

--------------------------------

### Build and Commit TaskUsecase Changes

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/plans/2026-03-19-replace-tasker-with-lifecycle.md

Verify the build and commit the registration changes.

```bash
go build ./...
# Expected: PASS
```

```bash
git add biz/task/usecase/task.go
git commit -m "refactor(task): register TaskCreateHook in TaskUsecase"
```

--------------------------------

### Remove the CreateTask Call from TaskUsecase

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/plans/2026-03-19-replace-tasker-with-lifecycle.md

Remove the call to tasker.CreateTask in the Create method.

```go
// Delete this block
if err := a.tasker.CreateTask(ctx, t.ID.String(), &domain.TaskSession{...}); err != nil {
	return nil, err
}
```

--------------------------------

### Initialize and Register Hooks

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/plans/2026-03-18-lifecycle-refactor.md

Create a new LifecycleManager instance and register hooks for state changes.
```go
mgr := lifecycle.NewManager[OrderState, OrderMetadata](redis)
mgr.Register(
	&OrderNotifyHook{notify: dispatcher},
	&OrderAuditHook{audit: auditor},
)
```

--------------------------------

### Implement TaskCreateHook

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/plans/2026-03-19-replace-tasker-with-lifecycle.md

Create TaskCreateHook, which reads the stored request from Redis and creates the taskflow task when a task enters the Running state.

```go
package lifecycle

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"

	"github.com/google/uuid"
	"github.com/redis/go-redis/v9"

	"github.com/chaitin/MonkeyCode/backend/pkg/taskflow"
)

// TaskCreateHook reads CreateTaskReq from Redis and creates the taskflow task on TaskStateRunning
type TaskCreateHook struct {
	redis    *redis.Client
	taskflow taskflow.Clienter
	logger   *slog.Logger
}

// NewTaskCreateHook creates a TaskCreateHook
func NewTaskCreateHook(redis *redis.Client, taskflow taskflow.Clienter, logger *slog.Logger) *TaskCreateHook {
	return &TaskCreateHook{
		redis:    redis,
		taskflow: taskflow,
		logger:   logger.With("hook", "task-create-hook"),
	}
}

func (h *TaskCreateHook) Name() string  { return "task-create-hook" }
func (h *TaskCreateHook) Priority() int { return 80 }    // between VMTaskHook (100) and TaskNotifyHook (50)
func (h *TaskCreateHook) Async() bool   { return false } // runs synchronously

func (h *TaskCreateHook) OnStateChange(ctx context.Context, taskID uuid.UUID, from, to TaskState, metadata TaskMetadata) error {
	// Only create the taskflow task the first time the Running state is entered
	if to != TaskStateRunning {
		return nil
	}

	// Read CreateTaskReq from Redis
	reqKey := fmt.Sprintf("task:create_req:%s", taskID.String())
	val, err := h.redis.Get(ctx, reqKey).Result()
	if err == redis.Nil {
		h.logger.WarnContext(ctx, "CreateTaskReq not found in Redis (may be expired)", "task_id", taskID)
		return nil
	}
	if err != nil {
		return fmt.Errorf("failed to get CreateTaskReq from Redis: %w", err)
	}

	// Delete the key (remove it once consumed)
	h.redis.Del(ctx, reqKey)

	// Deserialize
	var createReq taskflow.CreateTaskReq
	if err := json.Unmarshal([]byte(val), &createReq); err != nil {
		return 
			fmt.Errorf("failed to unmarshal CreateTaskReq: %w", err)
	}

	h.logger.InfoContext(ctx, "creating taskflow task", "task_id", taskID)
	return h.taskflow.TaskManager().Create(ctx, createReq)
}
```

--------------------------------

### Define the VmIdleInfo Data Structure

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/specs/2026-03-19-vm-idle-sleep-recycle-design.md

Stores the metadata used for VM idle detection.

```go
// domain/host.go
type VmIdleInfo struct {
	UID    uuid.UUID `json:"uid"`
	VmID   string    `json:"vm_id"`
	HostID string    `json:"host_id"`
	EnvID  string    `json:"env_id"`
	TaskID string    `json:"task_id"` // associated task ID, used for notifications
	Name   string    `json:"name"`    // task name, used in the notification content
}
```

--------------------------------

### Define VM Idle Data Structure and Status

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/plans/2026-03-19-vm-idle-sleep-recycle.md

Defines the payload struct used by the idle queues and the virtual machine sleeping status constant.

```go
// VmIdleInfo is the idle queue payload
type VmIdleInfo struct {
	UID    uuid.UUID `json:"uid"`
	VmID   string    `json:"vm_id"`
	HostID string    `json:"host_id"`
	EnvID  string    `json:"env_id"`
}
```

```go
const VirtualMachineStatusSleeping VirtualMachineStatus = "sleeping"
```

--------------------------------

### Write Integration Tests for Lifecycle Hooks

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/plans/2026-03-18-lifecycle-refactor.md

Create integration tests in `pkg/lifecycle/integration_test.go` to verify the functionality of the new lifecycle hooks. These tests cover scenarios like VM state transitions and task notification events.
```go
// pkg/lifecycle/integration_test.go
package lifecycle

import (
	"context"
	"testing"

	"github.com/redis/go-redis/v9"
	"github.com/stretchr/testify/assert"
)

// TestIntegration_VMToTaskStatus verifies that a VM state change automatically updates the task status
func TestIntegration_VMToTaskStatus(t *testing.T) {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	defer rdb.Close()
	rdb.FlushAll(context.Background())

	// Create the VM Manager (with VMTaskHook)
	taskRepo := &mockTaskRepo{}
	mgr := NewManager[VMState, VMMetadata](rdb)
	mgr.Register(&VMTaskHook{taskRepo: taskRepo})

	ctx := context.Background()
	meta := VMMetadata{TaskID: "task-1", UserID: "user-1"}

	// VM: pending -> running
	err := mgr.Transition(ctx, "vm-1", VMStateRunning, meta)
	assert.NoError(t, err)

	// Verify: the task status was updated to Processing
	assert.Equal(t, consts.TaskStatusProcessing, taskRepo.lastStatus)

	// VM: running -> succeeded
	err = mgr.Transition(ctx, "vm-1", VMStateSucceeded, meta)
	assert.NoError(t, err)

	// Verify: the task status was updated to Finished
	assert.Equal(t, consts.TaskStatusFinished, taskRepo.lastStatus)
}

// TestIntegration_TaskNotifications verifies that a task state change sends a notification
func TestIntegration_TaskNotifications(t *testing.T) {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	defer rdb.Close()
	rdb.FlushAll(context.Background())

	notify := &mockDispatcher{}
	mgr := NewManager[TaskState, TaskMetadata](rdb)
	mgr.Register(&TaskNotifyHook{notify: notify})

	ctx := context.Background()
	meta := TaskMetadata{TaskID: "task-1", UserID: "user-1"}

	// Task: (empty) -> pending
	err := mgr.Transition(ctx, "task-1", TaskStatePending, meta)
	assert.NoError(t, err)

	// Verify: a TaskCreated notification was sent
	assert.Contains(t, notify.events, domain.NotifyEventTaskCreated)
}
```

--------------------------------

### Build and Commit Commands

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/plans/2026-03-26-task-stop-recycle.md

Standard commands for verifying compilation and committing changes.

```bash
cd /Users/yoko/chaitin/ai/MonkeyCode/backend && go build ./...
```

```bash
git add backend/pkg/lifecycle/vmrecyclehook.go
git commit -m "feat: add VMRecycleHook for VM cleanup on recycled state"
```

```bash
git add backend/pkg/register.go
git commit -m "feat: register VMRecycleHook in VM lifecycle manager"
```

```bash
git add backend/biz/task/usecase/task.go
git commit -m "feat: recycle VM on task stop and delete via lifecycle"
```

--------------------------------

### Implement Attach Stream Logic

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/plans/2026-04-01-stream-attach-latest-round.md

Logic for replaying historical logs and conditionally consuming live streams.

```go
attachNow := time.Now()
roundStart, err := h.loki.FindLatestRoundStart(...)
ended := h.replayLatestRoundHistory(..., roundStart, attachNow)
if !ended {
	h.consumeLiveStream(...)
}
```

--------------------------------

### Verify Build

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/plans/2026-03-18-lifecycle-refactor.md

Command to verify that the project compiles after DI changes.

```bash
cd /Users/yoko/chaitin/ai/MonkeyCode/backend
go build ./...
# Expected: PASS
```

--------------------------------

### Remove the Tasker Dependency from TaskUsecase

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/plans/2026-03-19-replace-tasker-with-lifecycle.md

Remove the tasker field and its injection logic from the TaskUsecase struct.

```go
type TaskUsecase struct {
	// Remove: tasker *tasker.Tasker[*domain.TaskSession]
}
```

```go
// Remove: tasker: do.MustInvoke[*tasker.Tasker[*domain.TaskSession]](i),
// Remove: go u.tasker.StartGroupConsumers(...)
```

--------------------------------

### Run Tests for VM Hooks

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/plans/2026-03-18-lifecycle-refactor.md

Command to execute the Go tests for the VM hooks in the specified directory. This verifies the correct implementation of task status updates and notifications.
```bash
cd /Users/yoko/chaitin/ai/MonkeyCode/backend
go test ./pkg/lifecycle/... -v -run TestVM
# Expected: PASS
```

--------------------------------

### Update NewHostUsecase Constructor

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/plans/2026-03-19-vm-idle-sleep-recycle.md

Inject the new queue dependencies and start the consumer goroutines.

```go
func NewHostUsecase(i *do.Injector) (domain.HostUsecase, error) {
	h := &HostUsecase{
		cfg:            do.MustInvoke[*config.Config](i),
		redis:          do.MustInvoke[*redis.Client](i),
		taskflow:       do.MustInvoke[taskflow.Clienter](i),
		logger:         do.MustInvoke[*slog.Logger](i).With("module", "HostUsecase"),
		repo:           do.MustInvoke[domain.HostRepo](i),
		userRepo:       do.MustInvoke[domain.UserRepo](i),
		vmSleepQueue:   do.MustInvoke[*delayqueue.VMSleepQueue](i),
		vmNotifyQueue:  do.MustInvoke[*delayqueue.VMNotifyQueue](i),
		vmRecycleQueue: do.MustInvoke[*delayqueue.VMRecycleQueue](i),
	}

	if pc, err := do.Invoke[domain.PrivilegeChecker](i); err == nil {
		h.privilegeChecker = pc
	}

	go h.vmSleepConsumer()
	go h.vmNotifyConsumer()
	go h.vmRecycleConsumer()

	return h, nil
}
```

--------------------------------

### Data Flow for mode=new (Modified)

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/specs/2026-03-28-task-stream-realtime-design.md

Illustrates the data flow for the 'new' mode after modifications. User input triggers task creation, followed by a WebSocket subscription to the real-time task live stream. Agent execution broadcasts chunks via gRPC to taskflow, which then pushes them to the WebSocket.
```text
User input → MonkeyCode → taskflow (create task)
    ↓
MonkeyCode subscribes to /internal/ws/task-live?id=xxx&flush=false
    ↓
Agent execution → gRPC → taskflow taskRecv
    ├─ Broadcast raw chunk → WebSocket → MonkeyCode → frontend
    └─ aggregator → loki (background storage; does not affect real-time push)
```

--------------------------------

### Run and Commit Unit Tests

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/plans/2026-03-19-replace-tasker-with-lifecycle.md

Execute the unit tests and commit the test file.

```bash
go test ./pkg/lifecycle/... -v -run TestTaskCreateHook
# Expected: PASS
```

```bash
git add pkg/lifecycle/task_create_hook_test.go
git commit -m "test(lifecycle): add unit test for TaskCreateHook"
```

--------------------------------

### Build and Verification Commands

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/plans/2026-03-19-replace-tasker-with-lifecycle.md

Build commands used to verify the code changes.

```bash
cd /Users/yoko/chaitin/ai/MonkeyCode/backend
go build ./biz/task/...
```

```bash
go build ./pkg/lifecycle/...
```

--------------------------------

### Initiating Real-time Stream in Stream Handler (New Mode)

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/specs/2026-03-28-task-stream-realtime-design.md

In the 'new' mode, after receiving the first 'user-input' message, this code starts a real-time stream subscription using taskflow.TaskLive. It replaces the previous tailLogs functionality with a call to subscribeRealtimeStream.

```go
if !realtimeStarted && m.Type == consts.TaskStreamTypeUserInput {
	realtimeStarted = true
	go h.subscribeRealtimeStream(ctx, cancel, wsConn, logger, task.ID.String())
}
```

--------------------------------

### Implement vmRecycleConsumer

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/plans/2026-03-19-vm-idle-sleep-recycle.md

Consumes recycle tasks to delete VMs and mark them as recycled in the database.

```go
func (h *HostUsecase) vmRecycleConsumer() {
	logger := h.logger.With("fn", "vmRecycleConsumer")
	for {
		err := h.vmRecycleQueue.StartConsumer(context.Background(), VM_RECYCLE_QUEUE_KEY, func(ctx context.Context, job *delayqueue.Job[*domain.VmIdleInfo]) error {
			innerLogger := logger.With("job", job)
			innerLogger.InfoContext(ctx, "vm recycle triggered")

			ctx = entx.SkipSoftDelete(ctx)
			vm, err := h.repo.GetVirtualMachine(ctx, job.Payload.VmID)
			if err != nil {
				innerLogger.ErrorContext(ctx, "failed to get vm", "error", err)
				return nil
			}

			if err := h.taskflow.VirtualMachiner().Delete(ctx, &taskflow.DeleteVirtualMachineReq{
				UserID: vm.UserID.String(),
				HostID: vm.HostID,
				ID:     vm.EnvironmentID,
			}); err != nil {
				innerLogger.ErrorContext(ctx, "failed to delete vm", "error", err)
			}

			if err := h.repo.UpdateVirtualMachine(ctx, vm.ID, func(vmuo *db.VirtualMachineUpdateOne) error {
				vmuo.SetIsRecycled(true)
				return nil
			}); err != nil {
```

--------------------------------

### Implement VM Recycle Hook Interface Methods

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/plans/2026-03-26-task-stop-recycle.md

Implements the Hook interface methods (Name, Priority, Async) for the VMRecycleHook. These methods define the hook's identity, execution priority, and whether it runs asynchronously.

```go
func (h *VMRecycleHook) Name() string  { return "vm-recycle-hook" }
func (h *VMRecycleHook) Priority() int { return 100 }
func (h *VMRecycleHook) Async() bool   { return false }
```

--------------------------------

### TaskLive Subscription and Consumption in Attach Mode

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/specs/2026-03-28-task-stream-realtime-design.md

This function demonstrates the 'attach' mode stream handling. It first subscribes to the real-time stream with flush=true, then replays historical logs from Loki, and finally consumes the live stream if the task has not ended. This ensures no data is lost during the transition.

```go
func (h *TaskHandler) attachStream(ctx, cancel, wsConn, logger, task) {
	taskCreatedAt := time.Unix(task.CreatedAt, 0)
	tailStart := h.findTailStart(ctx, task.ID.String(), taskCreatedAt)
	hasMore := tailStart.After(taskCreatedAt)
	h.writeCursor(wsConn, tailStart, hasMore)

	// Step 1: subscribe to the live stream first (flush=true)
	liveCh, cleanup, err := h.taskflow.TaskLiveSubscribe(ctx, task.ID.String(), true)
	if err != nil {
		// Fallback: keep using loki tail
		h.tailLogs(ctx, cancel, wsConn, logger, task.ID.String(), tailStart)
		return
	}
	defer cleanup()

	// Step 2: replay history from loki
	ended := h.replayLokiHistory(ctx, wsConn, logger, task.ID.String(), tailStart)
	if ended {
		return
	}

	// Step 3: consume the live stream
	h.consumeLiveStream(ctx, cancel, wsConn, logger, liveCh)
}
```

--------------------------------

### Add Temporary Redis Storage for CreateTaskReq

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/plans/2026-03-19-replace-tasker-with-lifecycle.md

After the task is created, serialize CreateTaskReq and store it in Redis with a 10-minute expiration.

```go
// After creating the task, temporarily store CreateTaskReq (expires after 10 minutes)
createReq := &taskflow.CreateTaskReq{
	ID:           t.ID,
	VMID:         vm.ID,
	Text:         req.Content,
	SystemPrompt: req.SystemPrompt,
	CodingAgent:  coding,
	LLM: taskflow.LLM{
		ApiKey:  m.APIKey,
		BaseURL: m.BaseURL,
		Model:   m.Model,
	},
	Configs:    configs,
	McpConfigs: mcps,
}

reqKey := fmt.Sprintf("task:create_req:%s", t.ID.String())
if err := a.redis.Set(ctx, reqKey, createReq, 10*time.Minute).Err(); err != nil {
	return nil, err
}
```

--------------------------------

### Implement vmNotifyConsumer

Source: https://github.com/chaitin/monkeycode/blob/main/backend/docs/superpowers/plans/2026-03-19-vm-idle-sleep-recycle.md

Consumes notification tasks for VM idle warnings.

```go
func (h *HostUsecase) vmNotifyConsumer() {
	logger := h.logger.With("fn", "vmNotifyConsumer")
	for {
		err := h.vmNotifyQueue.StartConsumer(context.Background(), VM_NOTIFY_QUEUE_KEY, func(ctx context.Context, job *delayqueue.Job[*domain.VmIdleInfo]) error {
			logger.InfoContext(ctx, "vm recycle notify triggered", "vmID", job.Payload.VmID)
			// Send the recycle warning through the Notify module (per task)
			// TODO: integrate with the existing Notify module
			return nil
		})
		logger.Warn("notify consumer error, retrying...", "error", err)
		time.Sleep(10 * time.Second)
	}
}
```