【2026实战】Go语言实现AI Agent任务调度网关
✅ Hertz高性能HTTP服务✅ Redis + Asynq任务队列✅ Python服务客户端✅ 完整的任务生命周期管理✅ Docker + K8s部署下一篇,我们将实现Go与Python Agent的通信机制。← 5. Python与国产大模型深度集成 6.Go语言实现AI Agent任务调度网关(本文) 7. Go与Python Agent通信机制实现 →本文首发于CSDN,转载请注明出处。
·
系列第6篇:Python+Go构建企业级AI Agent实战指南(6/13)
标签: Go | Hertz | 任务调度 | 网关 | 高并发
一、开篇:为什么用Go做Agent基础设施?
Python是AI的"母语",但在企业级部署中,Go才是基础设施的"王者"。
字节跳动的实践数据:
- 40%的微服务采用Go开发
- AI服务网关全部使用Go
- 单机QPS提升3-5倍
Go的核心优势:
- 高并发:goroutine轻松处理百万连接
- 低延迟:GC优化后P99延迟<10ms
- 部署简单:单二进制文件,无依赖
- 云原生:Kubernetes原生支持
本文将用Go构建一个生产级的AI Agent任务调度网关。
二、技术选型:Hertz框架
2.1 为什么选择Hertz?
Hertz是字节跳动开源的高性能Go HTTP框架:
- 性能:比Gin快30%
- 生态:内置中间件、协议支持完善
- 生产验证:抖音、今日头条都在用
2.2 架构设计
┌─────────────────────────────────────────────────────────────┐
│ Agent任务调度网关 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 客户端请求 → Hertz Router → 中间件链 → Handler → Python服务 │
│ ↓ ↓ ↓ │
│ 路由分发 认证/限流 任务队列 │
│ 负载均衡 日志/监控 结果聚合 │
│ │
└─────────────────────────────────────────────────────────────┘
三、项目初始化
3.1 创建项目
# 创建项目目录
mkdir -p agent-gateway
cd agent-gateway
# 初始化Go模块
go mod init agent-gateway
# 安装依赖
go get github.com/cloudwego/hertz@latest
go get github.com/cloudwego/kitex@latest
go get github.com/redis/go-redis/v9
go get github.com/hibiken/asynq
3.2 项目结构
agent-gateway/
├── cmd/
│ └── server/
│ └── main.go # 入口
├── internal/
│ ├── handler/
│ │ ├── agent.go # Agent请求处理
│ │ └── health.go # 健康检查
│ ├── middleware/
│ │ ├── auth.go # 认证
│ │ ├── rate_limit.go # 限流
│ │ └── logger.go # 日志
│ ├── service/
│ │ ├── task_queue.go # 任务队列
│ │ └── python_client.go # Python服务客户端
│ └── config/
│ └── config.go # 配置
├── pkg/
│ └── utils/
│ └── response.go # 响应工具
├── go.mod
└── go.sum
四、核心代码实现
4.1 主入口
// cmd/server/main.go
package main
import (
"context"
"log"
"github.com/cloudwego/hertz/pkg/app"
"github.com/cloudwego/hertz/pkg/app/server"
"github.com/cloudwego/hertz/pkg/common/utils"
"github.com/cloudwego/hertz/pkg/protocol/consts"
"agent-gateway/internal/handler"
"agent-gateway/internal/middleware"
)
func main() {
// 创建Hertz服务器
h := server.Default(
server.WithHostPorts("0.0.0.0:8080"),
)
// 注册中间件
h.Use(middleware.Logger())
h.Use(middleware.Recovery())
h.Use(middleware.CORS())
h.Use(middleware.RateLimiter(100, 200)) // 100 QPS, burst 200
// 注册路由
registerRoutes(h)
log.Println("🚀 Agent Gateway starting on :8080")
h.Spin()
}
func registerRoutes(h *server.Hertz) {
// 健康检查
h.GET("/health", handler.HealthCheck)
// API v1
v1 := h.Group("/api/v1")
{
v1.Use(middleware.Auth())
// Agent相关
v1.POST("/agent/run", handler.RunAgent)
v1.GET("/agent/status/:taskId", handler.GetTaskStatus)
v1.GET("/agent/result/:taskId", handler.GetTaskResult)
// 批量任务
v1.POST("/agent/batch", handler.BatchRun)
// 任务管理
v1.DELETE("/agent/task/:taskId", handler.CancelTask)
v1.GET("/agent/tasks", handler.ListTasks)
}
// WebSocket实时通信
h.GET("/ws/agent", handler.AgentWebSocket)
}
4.2 任务队列服务
// internal/service/task_queue.go
package service
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/hibiken/asynq"
"github.com/redis/go-redis/v9"
)
const (
QueueDefault = "default"
QueueHigh = "high"
QueueLow = "low"
)
type TaskType string
const (
TaskTypeAgentRun TaskType = "agent:run"
TaskTypeAgentBatch TaskType = "agent:batch"
)
type TaskQueue struct {
client *asynq.Client
server *asynq.Server
redis *redis.Client
}
type AgentTask struct {
TaskID string `json:"task_id"`
AgentType string `json:"agent_type"`
Input map[string]interface{} `json:"input"`
UserID string `json:"user_id"`
Priority int `json:"priority"`
CreatedAt time.Time `json:"created_at"`
Timeout time.Duration `json:"timeout"`
}
func NewTaskQueue(redisAddr string) *TaskQueue {
redisOpt := asynq.RedisClientOpt{Addr: redisAddr}
client := asynq.NewClient(redisOpt)
server := asynq.NewServer(
redisOpt,
asynq.Config{
Concurrency: 10,
Queues: map[string]int{
QueueHigh: 6,
QueueDefault: 3,
QueueLow: 1,
},
},
)
rdb := redis.NewClient(&redis.Options{
Addr: redisAddr,
})
return &TaskQueue{
client: client,
server: server,
redis: rdb,
}
}
func (tq *TaskQueue) Enqueue(ctx context.Context, task *AgentTask) (string, error) {
payload, err := json.Marshal(task)
if err != nil {
return "", err
}
// 选择队列
queue := QueueDefault
if task.Priority > 5 {
queue = QueueHigh
} else if task.Priority < 3 {
queue = QueueLow
}
// 创建asynq任务
asynqTask := asynq.NewTask(string(TaskTypeAgentRun), payload)
// 设置选项
opts := []asynq.Option{
asynq.Queue(queue),
asynq.TaskID(task.TaskID),
asynq.Retention(24 * time.Hour),
}
if task.Timeout > 0 {
opts = append(opts, asynq.Timeout(task.Timeout))
}
info, err := tq.client.EnqueueContext(ctx, asynqTask, opts...)
if err != nil {
return "", err
}
// 存储任务元数据
taskKey := fmt.Sprintf("task:%s", task.TaskID)
tq.redis.HSet(ctx, taskKey, map[string]interface{}{
"status": "pending",
"created_at": task.CreatedAt.Format(time.RFC3339),
"agent_type": task.AgentType,
"user_id": task.UserID,
})
tq.redis.Expire(ctx, taskKey, 24*time.Hour)
return info.ID, nil
}
func (tq *TaskQueue) GetTaskStatus(ctx context.Context, taskID string) (map[string]string, error) {
taskKey := fmt.Sprintf("task:%s", taskID)
result, err := tq.redis.HGetAll(ctx, taskKey).Result()
if err != nil {
return nil, err
}
if len(result) == 0 {
return nil, fmt.Errorf("task not found: %s", taskID)
}
return result, nil
}
func (tq *TaskQueue) UpdateTaskStatus(ctx context.Context, taskID, status, result string) error {
taskKey := fmt.Sprintf("task:%s", taskID)
data := map[string]interface{}{
"status": status,
"updated_at": time.Now().Format(time.RFC3339),
}
if result != "" {
data["result"] = result
}
return tq.redis.HSet(ctx, taskKey, data).Err()
}
func (tq *TaskQueue) StartWorker(handler func(context.Context, *AgentTask) error) error {
mux := asynq.NewServeMux()
mux.HandleFunc(string(TaskTypeAgentRun), func(ctx context.Context, t *asynq.Task) error {
var task AgentTask
if err := json.Unmarshal(t.Payload(), &task); err != nil {
return err
}
// 更新状态为运行中
tq.UpdateTaskStatus(ctx, task.TaskID, "running", "")
// 执行任务
err := handler(ctx, &task)
// 更新状态
if err != nil {
tq.UpdateTaskStatus(ctx, task.TaskID, "failed", err.Error())
} else {
tq.UpdateTaskStatus(ctx, task.TaskID, "completed", "")
}
return err
})
return tq.server.Run(mux)
}
4.3 Python服务客户端
// internal/service/python_client.go
package service
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
)
type PythonClient struct {
baseURL string
httpClient *http.Client
}
type AgentRequest struct {
AgentType string `json:"agent_type"`
Input map[string]interface{} `json:"input"`
Context map[string]interface{} `json:"context,omitempty"`
}
type AgentResponse struct {
Success bool `json:"success"`
Result map[string]interface{} `json:"result,omitempty"`
Error string `json:"error,omitempty"`
}
func NewPythonClient(baseURL string) *PythonClient {
return &PythonClient{
baseURL: baseURL,
httpClient: &http.Client{
Timeout: 120 * time.Second,
},
}
}
func (c *PythonClient) RunAgent(ctx context.Context, req *AgentRequest) (*AgentResponse, error) {
url := fmt.Sprintf("%s/api/agent/run", c.baseURL)
payload, err := json.Marshal(req)
if err != nil {
return nil, err
}
httpReq, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(payload))
if err != nil {
return nil, err
}
httpReq.Header.Set("Content-Type", "application/json")
resp, err := c.httpClient.Do(httpReq)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("python service error: %s", string(body))
}
var result AgentResponse
if err := json.Unmarshal(body, &result); err != nil {
return nil, err
}
return &result, nil
}
4.4 Handler实现
// internal/handler/agent.go
package handler
import (
"context"
"net/http"
"time"
"github.com/cloudwego/hertz/pkg/app"
"github.com/google/uuid"
"agent-gateway/internal/service"
)
var (
taskQueue *service.TaskQueue
pythonClient *service.PythonClient
)
func InitServices(redisAddr, pythonAddr string) {
taskQueue = service.NewTaskQueue(redisAddr)
pythonClient = service.NewPythonClient(pythonAddr)
// 启动任务处理器
go taskQueue.StartWorker(processAgentTask)
}
func processAgentTask(ctx context.Context, task *service.AgentTask) error {
req := &service.AgentRequest{
AgentType: task.AgentType,
Input: task.Input,
}
resp, err := pythonClient.RunAgent(ctx, req)
if err != nil {
return err
}
if !resp.Success {
return fmt.Errorf(resp.Error)
}
// 存储结果
resultJSON, _ := json.Marshal(resp.Result)
taskQueue.UpdateTaskStatus(ctx, task.TaskID, "completed", string(resultJSON))
return nil
}
// RunAgent 提交Agent任务
func RunAgent(ctx context.Context, c *app.RequestContext) {
var req struct {
AgentType string `json:"agent_type"`
Input map[string]interface{} `json:"input"`
Priority int `json:"priority,omitempty"`
Timeout int `json:"timeout,omitempty"` // 秒
}
if err := c.Bind(&req); err != nil {
c.JSON(http.StatusBadRequest, utils.H{
"code": 400,
"message": "invalid request",
})
return
}
// 生成任务ID
taskID := uuid.New().String()
// 创建任务
task := &service.AgentTask{
TaskID: taskID,
AgentType: req.AgentType,
Input: req.Input,
UserID: c.GetString("user_id"),
Priority: req.Priority,
CreatedAt: time.Now(),
}
if req.Timeout > 0 {
task.Timeout = time.Duration(req.Timeout) * time.Second
}
// 入队
_, err := taskQueue.Enqueue(ctx, task)
if err != nil {
c.JSON(http.StatusInternalServerError, utils.H{
"code": 500,
"message": "failed to enqueue task",
})
return
}
c.JSON(http.StatusOK, utils.H{
"code": 0,
"message": "success",
"data": utils.H{
"task_id": taskID,
"status": "pending",
},
})
}
// GetTaskStatus 查询任务状态
func GetTaskStatus(ctx context.Context, c *app.RequestContext) {
taskID := c.Param("taskId")
status, err := taskQueue.GetTaskStatus(ctx, taskID)
if err != nil {
c.JSON(http.StatusNotFound, utils.H{
"code": 404,
"message": "task not found",
})
return
}
c.JSON(http.StatusOK, utils.H{
"code": 0,
"message": "success",
"data": status,
})
}
// GetTaskResult 获取任务结果
func GetTaskResult(ctx context.Context, c *app.RequestContext) {
taskID := c.Param("taskId")
status, err := taskQueue.GetTaskStatus(ctx, taskID)
if err != nil {
c.JSON(http.StatusNotFound, utils.H{
"code": 404,
"message": "task not found",
})
return
}
if status["status"] != "completed" {
c.JSON(http.StatusBadRequest, utils.H{
"code": 400,
"message": "task not completed",
"data": utils.H{
"status": status["status"],
},
})
return
}
// 解析结果
var result map[string]interface{}
json.Unmarshal([]byte(status["result"]), &result)
c.JSON(http.StatusOK, utils.H{
"code": 0,
"message": "success",
"data": result,
})
}
五、部署与运维
5.1 Docker部署
# Dockerfile
FROM golang:1.22-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -o server cmd/server/main.go
FROM alpine:latest
RUN apk --no-cache add ca-certificates
WORKDIR /root/
COPY --from=builder /app/server .
EXPOSE 8080
CMD ["./server"]
5.2 Kubernetes部署
# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: agent-gateway
spec:
replicas: 3
selector:
matchLabels:
app: agent-gateway
template:
metadata:
labels:
app: agent-gateway
spec:
containers:
- name: gateway
image: agent-gateway:latest
ports:
- containerPort: 8080
env:
- name: REDIS_ADDR
value: "redis:6379"
- name: PYTHON_SERVICE_ADDR
value: "python-agent:8000"
resources:
requests:
memory: "128Mi"
cpu: "100m"
limits:
memory: "512Mi"
cpu: "500m"
---
apiVersion: v1
kind: Service
metadata:
name: agent-gateway
spec:
selector:
app: agent-gateway
ports:
- port: 80
targetPort: 8080
type: LoadBalancer
六、结语
通过本文,你用Go构建了一个生产级的AI Agent任务调度网关:
- ✅ Hertz高性能HTTP服务
- ✅ Redis + Asynq任务队列
- ✅ Python服务客户端
- ✅ 完整的任务生命周期管理
- ✅ Docker + K8s部署
下一篇,我们将实现Go与Python Agent的通信机制。
系列文章导航: ← 5. Python与国产大模型深度集成 6. Go语言实现AI Agent任务调度网关(本文) 7. Go与Python Agent通信机制实现 →
本文首发于CSDN,转载请注明出处。
标签: Go | Hertz | 任务调度 | 网关 | 高并发
更多推荐




所有评论(0)