task.go
1 // Copyright (c) 2024-2026 Tencent Zhuque Lab. All rights reserved. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 // 15 // Requirement: Any integration or derivative work must explicitly attribute 16 // Tencent Zhuque Lab (https://github.com/Tencent/AI-Infra-Guard) in its 17 // documentation or user interface, as detailed in the NOTICE file. 18 19 package websocket 20 21 import ( 22 "fmt" 23 "io" 24 "mime/multipart" 25 "net/http" 26 "regexp" 27 "strconv" 28 "strings" 29 30 "github.com/Tencent/AI-Infra-Guard/pkg/database" 31 32 "github.com/gin-gonic/gin" 33 "trpc.group/trpc-go/trpc-go/log" 34 ) 35 36 // TaskCreateRequest represents the request structure for creating a task 37 // Corresponds to the input when creating a task from the frontend 38 // Example: {"id":..., "sessionId":..., "task":..., ...} 39 type TaskCreateRequest struct { 40 ID string `json:"id" validate:"required"` // Message ID (dialog ID generated by frontend) - Required 41 SessionID string `json:"sessionId" validate:"required"` // Session ID (task ID) - Required 42 Username string `json:"username,omitempty"` // Username (optional, defaults to public user) 43 Task string `json:"taskType" validate:"required"` // Task type - Required 44 Timestamp int64 `json:"timestamp" validate:"required"` // Timestamp - Required 45 Content string `json:"content" validate:"required"` // Task content description - Required 46 Params map[string]interface{} `json:"params,omitempty"` // Task parameters - Optional 47 Attachments []string `json:"attachments,omitempty"` // Attachment list - Optional 48 CountryIsoCode string `json:"countryIsoCode,omitempty"` // Language identifier (optional) 49 } 50 51 // 通用事件消息体(SSE推送) 52 type TaskEventMessage struct { 53 ID string `json:"id"` // 事件ID - 必需 54 Type string `json:"type" validate:"required"` // 事件类型 - 必需 55 SessionID string `json:"sessionId" validate:"required"` // 会话ID - 必需 56 Timestamp int64 `json:"timestamp"` // 时间戳 - 必需 57 Event interface{} `json:"event" validate:"required"` // 事件数据 - 必需 58 } 59 60 // liveStatus 事件体 61 // {"type":"liveStatus", ...} 62 type LiveStatusEvent struct { 63 ID string `json:"id" validate:"required"` // 事件ID - 必需 64 Type string `json:"type" validate:"required"` // 事件类型 - 必需 65 Timestamp int64 `json:"timestamp" validate:"required"` // 时间戳 - 必需 66 Text string `json:"text" validate:"required"` // 状态文本 - 必需 67 } 68 69 // planUpdate 事件体 70 type PlanUpdateEvent struct { 71 ID string `json:"id" validate:"required"` // 事件ID - 必需 72 Type string `json:"type" validate:"required"` // 事件类型 - 必需 73 Timestamp int64 `json:"timestamp" validate:"required"` // 时间戳 - 必需 74 Tasks []PlanTaskItem `json:"tasks" validate:"required"` // 任务列表 - 必需 75 } 76 77 type PlanTaskItem struct { 78 StepID string `json:"stepId" validate:"required"` // 步骤ID - 必需 79 Status string `json:"status" validate:"required"` // 任务状态 - 必需 80 Title string `json:"title" validate:"required"` // 任务标题 - 必需 81 StartedAt int64 `json:"startedAt" validate:"required"` // 开始时间 - 必需 82 } 83 84 // newPlanStep 事件体 85 type NewPlanStepEvent struct { 86 ID string `json:"id" validate:"required"` // 事件ID - 必需 87 Type string `json:"type" validate:"required"` // 事件类型 - 必需 88 Timestamp int64 `json:"timestamp" validate:"required"` // 时间戳 - 必需 89 StepID string `json:"stepId" validate:"required"` // 步骤ID - 必需 90 Title string `json:"title" validate:"required"` // 步骤标题 - 必需 91 } 92 93 // statusUpdate 事件体 94 type StatusUpdateEvent struct { 95 ID string `json:"id" validate:"required"` // 事件ID - 必需 96 Type string `json:"type" validate:"required"` // 事件类型 - 必需 97 Timestamp int64 `json:"timestamp" validate:"required"` // 时间戳 - 必需 98 AgentStatus string `json:"agentStatus" validate:"required"` // Agent状态 - 必需 99 Brief string `json:"brief,omitempty"` // 简短描述 - 可选 100 Description string `json:"description,omitempty"` // 详细描述 - 可选 101 NoRender bool `json:"noRender,omitempty"` // 是否不渲染 - 可选 102 PlanStepID string `json:"planStepId,omitempty"` // 计划步骤ID - 可选 103 } 104 105 // toolUsed 事件体(支持多工具并行) 106 type ToolUsedEvent struct { 107 ID string `json:"id" validate:"required"` // 事件ID - 必需 108 Type string `json:"type" validate:"required"` // 事件类型 - 必需 109 Timestamp int64 `json:"timestamp" validate:"required"` // 时间戳 - 必需 110 Description string `json:"description" validate:"required"` // 描述 - 必需 111 PlanStepID string `json:"planStepId,omitempty"` // 计划步骤ID - 可选 112 StatusID string `json:"statusId,omitempty"` // 状态ID - 可选 113 Tools []ToolInfo `json:"tools" validate:"required"` // 工具列表 - 必需 114 Detail interface{} `json:"detail,omitempty"` // 详细信息 - 可选 115 } 116 117 // 工具信息 118 type ToolInfo struct { 119 ToolID string `json:"toolId" validate:"required"` // 工具ID - 必需 120 Tool string `json:"tool" validate:"required"` // 工具名称 - 必需 121 Status string `json:"status" validate:"required"` // 状态 - 必需 122 Brief string `json:"brief,omitempty"` // 简短描述 - 可选 123 Message interface{} `json:"message,omitempty"` // 消息 - 可选 124 Result string `json:"result,omitempty"` // 结果 - 可选 125 } 126 127 // actionLog 事件体 128 type ActionLogEvent struct { 129 ID string `json:"id" validate:"required"` // 事件ID - 必需 130 Type string `json:"type" validate:"required"` // 事件类型 - 必需 131 Timestamp int64 `json:"timestamp" validate:"required"` // 时间戳 - 必需 132 ActionID string `json:"actionId" validate:"required"` // 动作ID - 必需 133 Tool string `json:"tool" validate:"required"` // 工具名称 - 必需 134 PlanStepID string `json:"planStepId,omitempty"` // 计划步骤ID - 可选 135 ActionLog string `json:"actionLog" validate:"required"` // 动作日志 - 必需 136 } 137 138 // resultUpdate 事件体(任务完成结果) 139 type ResultUpdateEvent struct { 140 ID string `json:"id" validate:"required"` // 事件ID - 必需 141 Type string `json:"type" validate:"required"` // 事件类型 - 必需 142 Timestamp int64 `json:"timestamp" validate:"required"` // 时间戳 - 必需 143 Result interface{} `json:"result" validate:"required"` // 结果信息 - 必需(不同任务类型结果字段各不相同) 144 } 145 146 // 任务分配消息(Server -> Agent) 147 type TaskAssignMessage struct { 148 Type string `json:"type" validate:"required"` // 消息类型 - 必需 149 Content TaskContent `json:"content" validate:"required"` // 任务内容 - 必需 150 } 151 152 // 任务内容 153 type TaskContent struct { 154 SessionID string `json:"sessionId" validate:"required"` // 会话ID - 必需 155 TaskType string `json:"taskType" validate:"required"` // 任务类型 - 必需 156 Content string `json:"content" validate:"required"` // 任务内容 - 必需 157 Params map[string]interface{} `json:"params,omitempty"` // 任务参数 - 可选 158 Attachments []string `json:"attachments,omitempty"` // 附件列表 - 可选 159 Timeout int `json:"timeout,omitempty"` // 超时时间 - 可选 160 CountryIsoCode string `json:"countryIsoCode,omitempty"` // 语言标识 - 可选 161 } 162 163 // 任务更新请求结构体 164 type TaskUpdateRequest struct { 165 Title string `json:"title" validate:"required"` // 任务标题 - 必需 166 } 167 168 // 辅助函数:从gin context中获取trace_id 169 func getTraceID(c *gin.Context) string { 170 if traceID, exists := c.Get("trace_id"); exists { 171 if id, ok := traceID.(string); ok { 172 return id 173 } 174 } 175 return "unknown" 176 } 177 178 // isValidSessionID 验证会话ID格式 179 func isValidSessionID(sessionId string) bool { 180 // 只允许字母、数字、下划线、连字符 181 matched, _ := regexp.MatchString(`^[a-zA-Z0-9_-]+$`, sessionId) 182 return matched && len(sessionId) > 0 && len(sessionId) <= 50 183 } 184 185 // validateFileUpload 验证文件上传 186 func validateFileUpload(header *multipart.FileHeader) error { 187 // 1. 文件名安全验证 188 originalName := header.Filename 189 if originalName == "" { 190 return fmt.Errorf("文件名不能为空") 191 } 192 193 // 防止路径遍历攻击 194 if strings.Contains(originalName, "..") || strings.Contains(originalName, "/") || strings.Contains(originalName, "\\") { 195 return fmt.Errorf("文件名包含非法字符") 196 } 197 return nil 198 } 199 200 // validateTaskUpdateRequest 验证任务更新请求 201 func validateTaskUpdateRequest(req *TaskUpdateRequest) error { 202 if req.Title != "" { 203 // 清理和验证标题 204 req.Title = strings.TrimSpace(req.Title) 205 if len(req.Title) > 100 { 206 return fmt.Errorf("标题长度超过限制") 207 } 208 } 209 return nil 210 } 211 212 // SSE接口(实时事件推送) 213 func HandleTaskSSE(c *gin.Context, tm *TaskManager) { 214 traceID := getTraceID(c) 215 sessionId := c.Param("sessionId") 216 if sessionId == "" { 217 c.JSON(http.StatusBadRequest, gin.H{ 218 "status": 1, 219 "message": "sessionId不能为空", 220 "data": nil, 221 }) 222 return 223 } 224 225 // 验证sessionId格式 226 if !isValidSessionID(sessionId) { 227 c.JSON(http.StatusBadRequest, gin.H{ 228 "status": 1, 229 "message": "无效的sessionId格式", 230 "data": nil, 231 }) 232 return 233 } 234 235 // 验证任务是否存在 236 _, err := tm.taskStore.GetSession(sessionId) 237 if err != nil { 238 c.JSON(http.StatusNotFound, gin.H{ 239 "status": 1, 240 "message": "任务不存在", 241 "data": nil, 242 }) 243 return 244 } 245 246 username := c.GetString("username") 247 248 // 建立SSE连接 249 err = tm.EstablishSSEConnection(c.Writer, sessionId, username, traceID) 250 if err != nil { 251 log.Errorf("建立SSE连接失败: trace_id=%s, sessionId=%s, username=%s, error=%v", traceID, sessionId, username, err) 252 c.JSON(http.StatusInternalServerError, gin.H{ 253 "status": 1, 254 "message": "建立SSE连接失败: " + err.Error(), 255 "data": nil, 256 }) 257 return 258 } 259 260 log.Infof("SSE连接建立成功: trace_id=%s, sessionId=%s, username=%s", traceID, sessionId, username) 261 262 // 保持连接活跃,等待客户端断开 263 <-c.Request.Context().Done() 264 265 // 客户端断开连接时,清理SSE连接 266 tm.CloseSSESession(sessionId) 267 log.Infof("SSE连接已断开: trace_id=%s, sessionId=%s", traceID, sessionId) 268 } 269 270 // 新建任务接口 271 func HandleTaskCreate(c *gin.Context, tm *TaskManager) { 272 traceID := getTraceID(c) 273 var req TaskCreateRequest 274 log.Infof("开始创建任务: trace_id=%s, req=%+v", traceID, req) 275 if err := c.ShouldBindJSON(&req); err != nil { 276 c.JSON(http.StatusOK, gin.H{ 277 "status": 1, 278 "message": "参数错误", 279 "data": nil, 280 }) 281 return 282 } 283 284 // 验证sessionId格式 285 if !isValidSessionID(req.SessionID) { 286 c.JSON(http.StatusOK, gin.H{ 287 "status": 1, 288 "message": "无效的会话ID格式", 289 "data": nil, 290 }) 291 return 292 } 293 294 // 从中间件获取用户名 295 username := c.GetString("username") 296 297 // 设置用户名到请求中 298 req.Username = username 299 300 log.Infof("开始创建任务: trace_id=%s, sessionId=%s, username=%s, taskType=%s", traceID, req.SessionID, username, req.Task) 301 302 // 调用TaskManager 303 err := tm.AddTask(&req, traceID) 304 if err != nil { 305 log.Errorf("任务创建失败: trace_id=%s, sessionId=%s, username=%s, error=%v", traceID, req.SessionID, username, err) 306 c.JSON(http.StatusOK, gin.H{ 307 "status": 1, 308 "message": "任务创建失败: " + err.Error(), 309 "data": nil, 310 }) 311 return 312 } 313 314 log.Infof("任务创建成功: trace_id=%s, sessionId=%s, username=%s", traceID, req.SessionID, username) 315 316 // 生成任务标题 317 title := tm.generateTaskTitle(&req) 318 319 c.JSON(http.StatusOK, gin.H{ 320 "status": 0, 321 "message": "任务创建成功", 322 "data": gin.H{ 323 "sessionId": req.SessionID, 324 "title": title, 325 }, 326 }) 327 } 328 329 // 终止任务接口 330 func HandleTerminateTask(c *gin.Context, tm *TaskManager) { 331 traceID := getTraceID(c) 332 sessionId := c.Param("sessionId") 333 if sessionId == "" { 334 c.JSON(http.StatusOK, gin.H{ 335 "status": 1, 336 "message": "会话ID不能为空", 337 "data": nil, 338 }) 339 return 340 } 341 342 // 验证sessionId格式 343 if !isValidSessionID(sessionId) { 344 c.JSON(http.StatusOK, gin.H{ 345 "status": 1, 346 "message": "无效的会话ID格式", 347 "data": nil, 348 }) 349 return 350 } 351 352 // 从中间件获取用户名 353 username := c.GetString("username") 354 355 log.Infof("用户请求终止任务: trace_id=%s, sessionId=%s, username=%s", traceID, sessionId, username) 356 357 // 调用TaskManager(包含权限验证) 358 err := tm.TerminateTask(sessionId, username, traceID) 359 if err != nil { 360 log.Errorf("任务终止失败: trace_id=%s, sessionId=%s, username=%s, error=%v", traceID, sessionId, username, err) 361 c.JSON(http.StatusOK, gin.H{ 362 "status": 1, 363 "message": "任务终止失败: " + err.Error(), 364 "data": nil, 365 }) 366 return 367 } 368 369 log.Infof("任务终止成功: trace_id=%s, sessionId=%s, username=%s", traceID, sessionId, username) 370 371 c.JSON(http.StatusOK, gin.H{ 372 "status": 0, 373 "message": "任务终止成功", 374 "data": nil, 375 }) 376 } 377 378 // 更新任务信息接口 379 func HandleUpdateTask(c *gin.Context, tm *TaskManager) { 380 traceID := getTraceID(c) 381 sessionId := c.Param("sessionId") 382 if sessionId == "" { 383 c.JSON(http.StatusOK, gin.H{ 384 "status": 1, 385 "message": "会话ID不能为空", 386 "data": nil, 387 }) 388 return 389 } 390 391 // 验证sessionId格式 392 if !isValidSessionID(sessionId) { 393 c.JSON(http.StatusOK, gin.H{ 394 "status": 1, 395 "message": "无效的会话ID格式", 396 "data": nil, 397 }) 398 return 399 } 400 401 var req TaskUpdateRequest 402 if err := c.ShouldBindJSON(&req); err != nil { 403 c.JSON(http.StatusOK, gin.H{ 404 "status": 1, 405 "message": "参数错误: " + err.Error(), 406 "data": nil, 407 }) 408 return 409 } 410 411 // 验证任务更新请求 412 if err := validateTaskUpdateRequest(&req); err != nil { 413 c.JSON(http.StatusOK, gin.H{ 414 "status": 1, 415 "message": "请求参数验证失败: " + err.Error(), 416 "data": nil, 417 }) 418 return 419 } 420 421 // 从中间件获取用户名 422 username := c.GetString("username") 423 424 log.Infof("开始更新任务: trace_id=%s, sessionId=%s, username=%s", traceID, sessionId, username) 425 426 // 执行任务信息更新(包含权限验证) 427 err := tm.UpdateTask(sessionId, &req, username, traceID) 428 if err != nil { 429 log.Errorf("任务信息更新失败: trace_id=%s, sessionId=%s, username=%s, error=%v", traceID, sessionId, username, err) 430 c.JSON(http.StatusOK, gin.H{ 431 "status": 1, 432 "message": "任务信息更新失败: " + err.Error(), 433 "data": nil, 434 }) 435 return 436 } 437 438 log.Infof("任务信息更新成功: trace_id=%s, sessionId=%s, username=%s", traceID, sessionId, username) 439 440 c.JSON(http.StatusOK, gin.H{ 441 "status": 0, 442 "message": "任务信息更新成功", 443 "data": nil, 444 }) 445 } 446 447 // 删除任务接口 448 func HandleDeleteTask(c *gin.Context, tm *TaskManager) { 449 traceID := getTraceID(c) 450 sessionId := c.Param("sessionId") 451 if sessionId == "" { 452 c.JSON(http.StatusOK, gin.H{ 453 "status": 1, 454 "message": "会话ID不能为空", 455 "data": nil, 456 }) 457 return 458 } 459 460 // 验证sessionId格式 461 if !isValidSessionID(sessionId) { 462 c.JSON(http.StatusOK, gin.H{ 463 "status": 1, 464 "message": "无效的会话ID格式", 465 "data": nil, 466 }) 467 return 468 } 469 470 // 从中间件获取用户名 471 username := c.GetString("username") 472 473 log.Infof("开始删除任务: trace_id=%s, sessionId=%s, username=%s", traceID, sessionId, username) 474 475 // 执行任务删除(包含权限验证) 476 err := tm.DeleteTask(sessionId, username, traceID) 477 if err != nil { 478 log.Errorf("任务删除失败: trace_id=%s, sessionId=%s, username=%s, error=%v", traceID, sessionId, username, err) 479 c.JSON(http.StatusOK, gin.H{ 480 "status": 1, 481 "message": "任务删除失败: " + err.Error(), 482 "data": nil, 483 }) 484 return 485 } 486 487 log.Infof("任务删除成功: trace_id=%s, sessionId=%s, username=%s", traceID, sessionId, username) 488 489 c.JSON(http.StatusOK, gin.H{ 490 "status": 0, 491 "message": "任务删除成功", 492 "data": nil, 493 }) 494 } 495 496 // HandleUploadFile 文件上传接口 497 // @Summary Upload file 498 // @Description Upload a file for task processing. Supports various file formats including zip, json, txt, etc. 499 // @Description The uploaded file will be stored securely and can be referenced in task creation. 500 // @Tags taskapi 501 // @Accept multipart/form-data 502 // @Produce json 503 // @Param file formData file true "File to upload" example:"example.zip" 504 // @Success 200 {object} object{status=int,message=string,data=object{fileUrl=string,filename=string,size=int}} "File uploaded successfully" 505 // @Failure 400 {object} object{status=int,message=string,data=object} "Invalid file or upload parameters" 506 // @Failure 500 {object} object{status=int,message=string,data=object} "Internal server error" 507 // @Router /api/v1/app/taskapi/upload [post] 508 func HandleUploadFile(c *gin.Context, tm *TaskManager) { 509 traceID := getTraceID(c) 510 // 获取上传的文件 511 file, err := c.FormFile("file") 512 if err != nil { 513 c.JSON(http.StatusOK, gin.H{ 514 "status": 1, 515 "message": "获取上传文件失败: " + err.Error(), 516 "data": nil, 517 }) 518 return 519 } 520 521 // 验证文件,包含文件名和文件内容以及文件扩展的校验,不存在文件路径遍历风险 522 if err := validateFileUpload(file); err != nil { 523 c.JSON(http.StatusOK, gin.H{ 524 "status": 1, 525 "message": "文件验证失败: " + err.Error(), 526 "data": nil, 527 }) 528 return 529 } 530 531 username := c.GetString("username") 532 log.Infof("开始文件上传: trace_id=%s, filename=%s, size=%d, username=%s", traceID, file.Filename, file.Size, username) 533 534 // 执行文件上传 535 uploadResult, err := tm.UploadFile(file, traceID) 536 if err != nil { 537 log.Errorf("文件上传失败: trace_id=%s, filename=%s, username=%s, error=%v", traceID, file.Filename, username, err) 538 c.JSON(http.StatusOK, gin.H{ 539 "status": 1, 540 "message": "文件上传失败: " + err.Error(), 541 "data": nil, 542 }) 543 return 544 } 545 546 log.Infof("文件上传成功: trace_id=%s, filename=%s, fileUrl=%s, username=%s", traceID, file.Filename, uploadResult.FileURL, username) 547 548 c.JSON(http.StatusOK, gin.H{ 549 "status": 0, 550 "message": "文件上传成功", 551 "data": uploadResult, 552 }) 553 } 554 555 // HandleUploadFileChunk 分片上传接口 556 // @Summary Upload file chunk 557 // @Description Upload a file chunk for chunked file upload. Each chunk is stored temporarily until all chunks are merged. 558 // @Tags taskapi 559 // @Accept multipart/form-data 560 // @Produce json 561 // @Param fileId formData string true "Unique file identifier for grouping chunks" 562 // @Param filename formData string true "Original filename" 563 // @Param chunkIndex formData int true "Current chunk index (0-based)" 564 // @Param totalChunks formData int true "Total number of chunks" 565 // @Param chunk formData file true "File chunk data" 566 // @Success 200 {object} object{status=int,message=string,data=object{chunkIndex=int,totalChunks=int,message=string}} "Chunk uploaded successfully" 567 // @Failure 400 {object} object{status=int,message=string,data=object} "Invalid parameters" 568 // @Failure 500 {object} object{status=int,message=string,data=object} "Internal server error" 569 // @Router /api/v1/app/tasks/uploadChunk [post] 570 func HandleUploadFileChunk(c *gin.Context, tm *TaskManager) { 571 traceID := getTraceID(c) 572 username := c.GetString("username") 573 574 // 获取表单字段 575 fileID := c.PostForm("fileId") 576 filename := c.PostForm("filename") 577 chunkIndexStr := c.PostForm("chunkIndex") 578 totalChunksStr := c.PostForm("totalChunks") 579 580 // 验证必要参数 581 if fileID == "" || filename == "" || chunkIndexStr == "" || totalChunksStr == "" { 582 c.JSON(http.StatusOK, gin.H{ 583 "status": 1, 584 "message": "参数不完整: 需要 fileId, filename, chunkIndex, totalChunks", 585 "data": nil, 586 }) 587 return 588 } 589 590 // 验证fileID格式(防止路径遍历) 591 if strings.Contains(fileID, "..") || strings.Contains(fileID, "/") || strings.Contains(fileID, "\\") { 592 c.JSON(http.StatusOK, gin.H{ 593 "status": 1, 594 "message": "无效的fileId格式", 595 "data": nil, 596 }) 597 return 598 } 599 600 // 验证filename格式(防止路径遍历) 601 if strings.Contains(filename, "..") || strings.Contains(filename, "/") || strings.Contains(filename, "\\") { 602 c.JSON(http.StatusOK, gin.H{ 603 "status": 1, 604 "message": "文件名包含非法字符", 605 "data": nil, 606 }) 607 return 608 } 609 610 // 解析数值参数 611 chunkIndex, err := strconv.Atoi(chunkIndexStr) 612 if err != nil || chunkIndex < 0 { 613 c.JSON(http.StatusOK, gin.H{ 614 "status": 1, 615 "message": "无效的chunkIndex", 616 "data": nil, 617 }) 618 return 619 } 620 621 totalChunks, err := strconv.Atoi(totalChunksStr) 622 if err != nil || totalChunks <= 0 { 623 c.JSON(http.StatusOK, gin.H{ 624 "status": 1, 625 "message": "无效的totalChunks", 626 "data": nil, 627 }) 628 return 629 } 630 631 if chunkIndex >= totalChunks { 632 c.JSON(http.StatusOK, gin.H{ 633 "status": 1, 634 "message": "chunkIndex必须小于totalChunks", 635 "data": nil, 636 }) 637 return 638 } 639 640 // 获取分片文件 641 chunk, err := c.FormFile("chunk") 642 if err != nil { 643 c.JSON(http.StatusOK, gin.H{ 644 "status": 1, 645 "message": "获取分片数据失败: " + err.Error(), 646 "data": nil, 647 }) 648 return 649 } 650 651 // 读取分片数据 652 chunkFile, err := chunk.Open() 653 if err != nil { 654 c.JSON(http.StatusOK, gin.H{ 655 "status": 1, 656 "message": "打开分片文件失败: " + err.Error(), 657 "data": nil, 658 }) 659 return 660 } 661 defer chunkFile.Close() 662 663 chunkData, err := io.ReadAll(chunkFile) 664 if err != nil { 665 c.JSON(http.StatusOK, gin.H{ 666 "status": 1, 667 "message": "读取分片数据失败: " + err.Error(), 668 "data": nil, 669 }) 670 return 671 } 672 673 log.Infof("开始分片上传: trace_id=%s, fileId=%s, filename=%s, chunkIndex=%d/%d, size=%d, username=%s", 674 traceID, fileID, filename, chunkIndex+1, totalChunks, len(chunkData), username) 675 676 // 执行分片上传 677 result, err := tm.UploadFileChunk(fileID, filename, chunkIndex, totalChunks, chunkData, traceID) 678 if err != nil { 679 log.Errorf("分片上传失败: trace_id=%s, fileId=%s, chunkIndex=%d, username=%s, error=%v", 680 traceID, fileID, chunkIndex, username, err) 681 c.JSON(http.StatusOK, gin.H{ 682 "status": 1, 683 "message": "分片上传失败: " + err.Error(), 684 "data": nil, 685 }) 686 return 687 } 688 689 log.Infof("分片上传成功: trace_id=%s, fileId=%s, chunkIndex=%d/%d, username=%s", 690 traceID, fileID, chunkIndex+1, totalChunks, username) 691 692 c.JSON(http.StatusOK, gin.H{ 693 "status": 0, 694 "message": result.Message, 695 "data": result, 696 }) 697 } 698 699 // MergeChunksRequest 合并分片请求 700 type MergeChunksRequest struct { 701 FileID string `json:"fileId" binding:"required"` // 文件唯一标识 702 Filename string `json:"filename" binding:"required"` // 原始文件名 703 TotalChunks int `json:"totalChunks" binding:"required"` // 总分片数 704 FileSize int64 `json:"fileSize" binding:"required"` // 文件总大小 705 } 706 707 // HandleMergeFileChunks 合并分片接口 708 // @Summary Merge file chunks 709 // @Description Merge all uploaded chunks into a single file. Should be called after all chunks are uploaded. 710 // @Tags taskapi 711 // @Accept json 712 // @Produce json 713 // @Param request body MergeChunksRequest true "Merge request parameters" 714 // @Success 200 {object} object{status=int,message=string,data=object{filename=string,fileUrl=string,fileSize=int}} "File merged successfully" 715 // @Failure 400 {object} object{status=int,message=string,data=object} "Invalid parameters or missing chunks" 716 // @Failure 500 {object} object{status=int,message=string,data=object} "Internal server error" 717 // @Router /api/v1/app/tasks/mergeChunks [post] 718 func HandleMergeFileChunks(c *gin.Context, tm *TaskManager) { 719 traceID := getTraceID(c) 720 username := c.GetString("username") 721 722 var req MergeChunksRequest 723 if err := c.ShouldBindJSON(&req); err != nil { 724 c.JSON(http.StatusOK, gin.H{ 725 "status": 1, 726 "message": "参数错误: " + err.Error(), 727 "data": nil, 728 }) 729 return 730 } 731 732 // 验证参数 733 if req.FileID == "" || req.Filename == "" || req.TotalChunks <= 0 || req.FileSize <= 0 { 734 c.JSON(http.StatusOK, gin.H{ 735 "status": 1, 736 "message": "参数不完整", 737 "data": nil, 738 }) 739 return 740 } 741 742 // 验证fileID格式(防止路径遍历) 743 if strings.Contains(req.FileID, "..") || strings.Contains(req.FileID, "/") || strings.Contains(req.FileID, "\\") { 744 c.JSON(http.StatusOK, gin.H{ 745 "status": 1, 746 "message": "无效的fileId格式", 747 "data": nil, 748 }) 749 return 750 } 751 752 // 验证filename格式(防止路径遍历) 753 if strings.Contains(req.Filename, "..") || strings.Contains(req.Filename, "/") || strings.Contains(req.Filename, "\\") { 754 c.JSON(http.StatusOK, gin.H{ 755 "status": 1, 756 "message": "文件名包含非法字符", 757 "data": nil, 758 }) 759 return 760 } 761 762 log.Infof("开始合并分片: trace_id=%s, fileId=%s, filename=%s, totalChunks=%d, fileSize=%d, username=%s", 763 traceID, req.FileID, req.Filename, req.TotalChunks, req.FileSize, username) 764 765 // 执行分片合并 766 result, err := tm.MergeFileChunks(req.FileID, req.Filename, req.TotalChunks, req.FileSize, traceID) 767 if err != nil { 768 log.Errorf("分片合并失败: trace_id=%s, fileId=%s, username=%s, error=%v", 769 traceID, req.FileID, username, err) 770 c.JSON(http.StatusOK, gin.H{ 771 "status": 1, 772 "message": "文件合并失败: " + err.Error(), 773 "data": nil, 774 }) 775 return 776 } 777 778 log.Infof("分片合并成功: trace_id=%s, fileId=%s, filename=%s, fileUrl=%s, username=%s", 779 traceID, req.FileID, result.Filename, result.FileURL, username) 780 781 c.JSON(http.StatusOK, gin.H{ 782 "status": 0, 783 "message": "文件合并成功", 784 "data": result, 785 }) 786 } 787 788 // 获取任务列表接口 789 func HandleGetTaskList(c *gin.Context, tm *TaskManager) { 790 traceID := getTraceID(c) 791 // 从中间件获取用户名 792 username := c.GetString("username") 793 query := c.Query("q") 794 taskType := c.DefaultQuery("taskType", "") 795 var err error 796 797 log.Debugf("开始获取任务列表: trace_id=%s, username=%s, taskType=%s", traceID, username, taskType) 798 var results []map[string]interface{} 799 if query != "" { 800 log.Debugf("搜索参数: trace_id=%s, username=%s, query=%s, taskType=%s", traceID, username, query, taskType) 801 var searchParams database.SimpleSearchParams 802 803 // 从查询字符串获取搜索关键词和任务类型 804 searchParams.Query = query 805 searchParams.TaskType = taskType 806 searchParams.Page = 1 807 searchParams.PageSize = 999 808 // 调用TaskManager进行简化搜索 809 results, err = tm.SearchUserTasksSimple(username, searchParams, traceID) 810 if err != nil { 811 log.Errorf("搜索任务失败: trace_id=%s, username=%s, error=%v", traceID, username, err) 812 c.JSON(http.StatusOK, gin.H{ 813 "status": 1, 814 "message": "搜索任务失败: " + err.Error(), 815 "data": nil, 816 }) 817 return 818 } 819 820 } else { 821 // 获取用户的任务列表(支持taskType过滤) 822 results, err = tm.GetUserTasksByType(username, taskType, traceID) 823 if err != nil { 824 log.Errorf("获取任务列表失败: trace_id=%s, username=%s, error=%v", traceID, username, err) 825 c.JSON(http.StatusOK, gin.H{ 826 "status": 1, 827 "message": "获取任务列表失败: " + err.Error(), 828 "data": nil, 829 }) 830 return 831 } 832 } 833 834 log.Debugf("获取任务列表成功: trace_id=%s, username=%s, taskCount=%d", traceID, username, len(results)) 835 836 c.JSON(http.StatusOK, gin.H{ 837 "status": 0, 838 "message": "获取任务列表成功", 839 "data": gin.H{ 840 "tasks": results, 841 }, 842 }) 843 } 844 845 // HandleShare 分享任务 846 func HandleShare(c *gin.Context, tm *TaskManager) { 847 var params struct { 848 Session string `json:"sessionId" binding:"required"` 849 } 850 if err := c.ShouldBindJSON(¶ms); err != nil { 851 c.JSON(http.StatusBadRequest, gin.H{ 852 "status": 1, 853 "message": "参数错误: " + err.Error(), 854 "data": nil, 855 }) 856 return 857 } 858 if params.Session == "" { 859 c.JSON(http.StatusBadRequest, gin.H{ 860 "status": 1, 861 "message": "sessionId不能为空", 862 "data": nil, 863 }) 864 return 865 } 866 867 // 验证sessionId格式 868 if !isValidSessionID(params.Session) { 869 c.JSON(http.StatusBadRequest, gin.H{ 870 "status": 1, 871 "message": "无效的sessionId格式", 872 "data": nil, 873 }) 874 return 875 } 876 877 // 获取用户信息 878 username := c.GetString("username") 879 session, err := tm.taskStore.GetSession(params.Session) 880 if err != nil { 881 c.JSON(http.StatusInternalServerError, gin.H{ 882 "status": 1, 883 "message": "获取任务详情失败: " + err.Error(), 884 "data": nil, 885 }) 886 return 887 } 888 if username != session.Username { 889 c.JSON(http.StatusForbidden, gin.H{ 890 "status": 1, 891 "message": "无权限访问", 892 "data": nil, 893 }) 894 return 895 } 896 err = tm.taskStore.SetShare(params.Session, true) 897 if err != nil { 898 c.JSON(http.StatusInternalServerError, gin.H{ 899 "status": 1, 900 "message": "设置分享失败: " + err.Error(), 901 "data": nil, 902 }) 903 return 904 } 905 c.JSON(http.StatusOK, gin.H{ 906 "status": 0, 907 "message": "设置分享成功", 908 "data": nil, 909 }) 910 } 911 912 // HandleGetTaskDetail 获取任务详情 913 func HandleGetTaskDetail(c *gin.Context, tm *TaskManager) { 914 traceID := getTraceID(c) 915 sessionId := c.Param("sessionId") 916 if sessionId == "" { 917 c.JSON(http.StatusBadRequest, gin.H{ 918 "status": 1, 919 "message": "sessionId不能为空", 920 "data": nil, 921 }) 922 return 923 } 924 925 // 验证sessionId格式 926 if !isValidSessionID(sessionId) { 927 c.JSON(http.StatusBadRequest, gin.H{ 928 "status": 1, 929 "message": "无效的sessionId格式", 930 "data": nil, 931 }) 932 return 933 } 934 935 // 获取用户信息 936 username := c.GetString("username") 937 938 log.Infof("开始获取任务详情: trace_id=%s, sessionId=%s, username=%s", traceID, sessionId, username) 939 940 // 调用TaskManager获取任务详情 941 detail, err := tm.GetTaskDetail(sessionId, username, traceID) 942 if err != nil { 943 log.Errorf("获取任务详情失败: trace_id=%s, sessionId=%s, username=%s, error=%v", traceID, sessionId, username, err) 944 c.JSON(http.StatusInternalServerError, gin.H{ 945 "status": 1, 946 "message": "获取任务详情失败: " + err.Error(), 947 "data": nil, 948 }) 949 return 950 } 951 952 log.Infof("获取任务详情成功: trace_id=%s, sessionId=%s, username=%s", traceID, sessionId, username) 953 954 c.JSON(http.StatusOK, gin.H{ 955 "status": 0, 956 "message": "获取任务详情成功", 957 "data": detail, 958 }) 959 } 960 961 // HandleDownloadFile 文件下载接口 962 func HandleDownloadFile(c *gin.Context, tm *TaskManager) { 963 traceID := getTraceID(c) 964 sessionId := c.Param("sessionId") 965 if sessionId == "" { 966 c.JSON(http.StatusBadRequest, gin.H{ 967 "status": 1, 968 "message": "sessionId不能为空", 969 "data": nil, 970 }) 971 return 972 } 973 974 // 验证sessionId格式 975 if !isValidSessionID(sessionId) { 976 c.JSON(http.StatusBadRequest, gin.H{ 977 "status": 1, 978 "message": "无效的会话ID格式", 979 "data": nil, 980 }) 981 return 982 } 983 984 // 解析请求体 985 var req struct { 986 FileURL string `json:"fileUrl" binding:"required"` 987 } 988 if err := c.ShouldBindJSON(&req); err != nil { 989 c.JSON(http.StatusBadRequest, gin.H{ 990 "status": 1, 991 "message": "参数错误: " + err.Error(), 992 "data": nil, 993 }) 994 return 995 } 996 997 // 验证fileUrl格式 998 if req.FileURL == "" { 999 c.JSON(http.StatusBadRequest, gin.H{ 1000 "status": 1, 1001 "message": "文件URL不能为空", 1002 "data": nil, 1003 }) 1004 return 1005 } 1006 1007 // 获取用户信息 1008 username := c.GetString("username") 1009 1010 log.Infof("开始文件下载: trace_id=%s, sessionId=%s, fileUrl=%s, username=%s", traceID, sessionId, req.FileURL, username) 1011 1012 // 执行文件下载 1013 err := tm.DownloadFile(sessionId, req.FileURL, username, c, traceID) 1014 if err != nil { 1015 log.Errorf("文件下载失败: trace_id=%s, sessionId=%s, fileUrl=%s, username=%s, error=%v", traceID, sessionId, req.FileURL, username, err) 1016 // 根据错误类型返回不同的状态码 1017 switch err.Error() { 1018 case "任务不存在": 1019 c.JSON(http.StatusNotFound, gin.H{ 1020 "status": 1, 1021 "message": "任务不存在", 1022 "data": nil, 1023 }) 1024 case "文件不存在于此任务中": 1025 c.JSON(http.StatusNotFound, gin.H{ 1026 "status": 1, 1027 "message": "文件不存在于此任务中", 1028 "data": nil, 1029 }) 1030 case "文件不存在": 1031 c.JSON(http.StatusNotFound, gin.H{ 1032 "status": 1, 1033 "message": "文件不存在", 1034 "data": nil, 1035 }) 1036 default: 1037 c.JSON(http.StatusInternalServerError, gin.H{ 1038 "status": 1, 1039 "message": "文件下载失败: " + err.Error(), 1040 "data": nil, 1041 }) 1042 } 1043 return 1044 } 1045 1046 log.Infof("文件下载成功: trace_id=%s, sessionId=%s, fileUrl=%s, username=%s", traceID, sessionId, req.FileURL, username) 1047 1048 // 文件下载成功,响应头已在DownloadFile方法中设置 1049 }