系列第6篇:Python+Go构建企业级AI Agent实战指南(6/13)

标签: Go | Hertz | 任务调度 | 网关 | 高并发


一、开篇:为什么用Go做Agent基础设施?

Python是AI的"母语",但在企业级部署中,Go才是基础设施的"王者"。

字节跳动的实践数据:

  • 40%的微服务采用Go开发
  • AI服务网关全部使用Go
  • 单机QPS提升3-5倍

Go的核心优势:

  1. 高并发:goroutine轻松处理百万连接
  2. 低延迟:Go 的并发垃圾回收停顿通常在亚毫秒级,有利于把服务的 P99 延迟控制在较低水平
  3. 部署简单:单二进制文件,无依赖
  4. 云原生:Kubernetes原生支持

本文将用Go构建一个生产级的AI Agent任务调度网关。


二、技术选型:Hertz框架

2.1 为什么选择Hertz?

Hertz是字节跳动开源的高性能Go HTTP框架:

  • 性能:比Gin快30%
  • 生态:内置中间件、协议支持完善
  • 生产验证:抖音、今日头条都在用

2.2 架构设计

┌─────────────────────────────────────────────────────────────┐
│                    Agent任务调度网关                          │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  客户端请求 → Hertz Router → 中间件链 → Handler → Python服务  │
│                  ↓              ↓         ↓                  │
│            路由分发         认证/限流    任务队列              │
│            负载均衡         日志/监控    结果聚合              │
│                                                             │
└─────────────────────────────────────────────────────────────┘

三、项目初始化

3.1 创建项目

# Create the project directory
mkdir -p agent-gateway
cd agent-gateway

# Initialize the Go module
go mod init agent-gateway

# Install dependencies
go get github.com/cloudwego/hertz@latest
go get github.com/cloudwego/kitex@latest
go get github.com/redis/go-redis/v9
go get github.com/hibiken/asynq
# internal/handler/agent.go generates task IDs with google/uuid
go get github.com/google/uuid

3.2 项目结构

agent-gateway/
├── cmd/
│   └── server/
│       └── main.go          # 入口
├── internal/
│   ├── handler/
│   │   ├── agent.go         # Agent请求处理
│   │   └── health.go        # 健康检查
│   ├── middleware/
│   │   ├── auth.go          # 认证
│   │   ├── rate_limit.go    # 限流
│   │   └── logger.go        # 日志
│   ├── service/
│   │   ├── task_queue.go    # 任务队列
│   │   └── python_client.go # Python服务客户端
│   └── config/
│       └── config.go        # 配置
├── pkg/
│   └── utils/
│       └── response.go      # 响应工具
├── go.mod
└── go.sum

四、核心代码实现

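4.1 主入口

注意:下面 main.go 的 import 中 context、app、utils、consts 并未被使用,Go 编译器会报 "imported and not used" 错误,实际使用时请删除这几行(或用 goimports 自动整理)。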

// cmd/server/main.go
package main

import (
	"context"
	"log"
	"os"
	"strings"

	"github.com/cloudwego/hertz/pkg/app"
	"github.com/cloudwego/hertz/pkg/app/server"
	"github.com/cloudwego/hertz/pkg/common/utils"
	"github.com/cloudwego/hertz/pkg/protocol/consts"

	"agent-gateway/internal/handler"
	"agent-gateway/internal/middleware"
)

func main() {
	// 创建Hertz服务器
	h := server.Default(
		server.WithHostPorts("0.0.0.0:8080"),
	)

	// 注册中间件
	h.Use(middleware.Logger())
	h.Use(middleware.Recovery())
	h.Use(middleware.CORS())
	h.Use(middleware.RateLimiter(100, 200)) // 100 QPS, burst 200

	// 注册路由
	registerRoutes(h)

	log.Println("🚀 Agent Gateway starting on :8080")
	h.Spin()
}

// registerRoutes mounts every HTTP endpoint on h.
//
// Registered directly on h (the /api/v1 Auth middleware does not apply):
//   - GET /health    health check
//   - GET /ws/agent  WebSocket channel for real-time agent traffic
//
// Registered under /api/v1, after middleware.Auth() is installed on that group:
//   - POST   /agent/run              submit one agent task
//   - GET    /agent/status/:taskId   query a task's status
//   - GET    /agent/result/:taskId   fetch a completed task's result
//   - POST   /agent/batch            submit a batch of tasks
//   - DELETE /agent/task/:taskId     cancel a task
//   - GET    /agent/tasks            list tasks
func registerRoutes(h *server.Hertz) {
	h.GET("/health", handler.HealthCheck)

	api := h.Group("/api/v1")
	api.Use(middleware.Auth())

	// Single-task lifecycle.
	api.POST("/agent/run", handler.RunAgent)
	api.GET("/agent/status/:taskId", handler.GetTaskStatus)
	api.GET("/agent/result/:taskId", handler.GetTaskResult)

	// Batch submission.
	api.POST("/agent/batch", handler.BatchRun)

	// Task management.
	api.DELETE("/agent/task/:taskId", handler.CancelTask)
	api.GET("/agent/tasks", handler.ListTasks)

	h.GET("/ws/agent", handler.AgentWebSocket)
}

4.2 任务队列服务

// internal/service/task_queue.go
package service

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/hibiken/asynq"
	"github.com/redis/go-redis/v9"
)

// Asynq queue names. NewTaskQueue gives the worker priority weights
// high:default:low = 6:3:1, and Enqueue picks a queue from AgentTask.Priority.
const (
	QueueDefault = "default"
	QueueHigh    = "high"
	QueueLow     = "low"
)

// TaskType is an asynq task type name; the worker's ServeMux routes on it.
type TaskType string

// Task types known to the gateway. In the code shown here only
// TaskTypeAgentRun is enqueued and handled; TaskTypeAgentBatch is declared
// but not referenced.
const (
	TaskTypeAgentRun   TaskType = "agent:run"
	TaskTypeAgentBatch TaskType = "agent:batch"
)

// TaskQueue bundles the asynq producer (client), the asynq worker (server)
// and a plain Redis client. The Redis client is used for the per-task
// "task:<id>" metadata hash.
type TaskQueue struct {
	client *asynq.Client
	server *asynq.Server
	redis  *redis.Client
}

// AgentTask is the JSON payload carried by each asynq "agent:run" task.
type AgentTask struct {
	// TaskID is also used as the asynq task ID and the metadata key suffix.
	TaskID      string                 `json:"task_id"`
	AgentType   string                 `json:"agent_type"`
	Input       map[string]interface{} `json:"input"`
	UserID      string                 `json:"user_id"`
	// Priority selects the queue in Enqueue: >5 high, <3 low, else default.
	Priority    int                    `json:"priority"`
	CreatedAt   time.Time              `json:"created_at"`
	// Timeout is the per-task asynq timeout; 0 means no explicit timeout is
	// set. time.Duration is an int64, so JSON carries it as nanoseconds.
	Timeout     time.Duration          `json:"timeout"`
}

// NewTaskQueue builds a TaskQueue whose asynq client, asynq worker server
// and plain Redis client all point at the Redis instance at redisAddr.
//
// The worker server processes up to 10 tasks concurrently, with queue
// priority weights high=6, default=3, low=1. This function performs no
// connectivity check.
func NewTaskQueue(redisAddr string) *TaskQueue {
	opt := asynq.RedisClientOpt{Addr: redisAddr}

	return &TaskQueue{
		client: asynq.NewClient(opt),
		server: asynq.NewServer(opt, asynq.Config{
			Concurrency: 10,
			Queues: map[string]int{
				QueueHigh:    6,
				QueueDefault: 3,
				QueueLow:     1,
			},
		}),
		redis: redis.NewClient(&redis.Options{Addr: redisAddr}),
	}
}

// Enqueue submits task to asynq and records its metadata in the Redis hash
// "task:<TaskID>".
//
// The queue is chosen from task.Priority: >5 goes to "high", <3 to "low",
// everything else to "default". task.TaskID is used as the asynq task ID.
// The asynq task is retained for 24h and the metadata hash expires after 24h.
// task.Timeout, when positive, becomes the asynq per-task timeout.
//
// It returns the asynq task ID (equal to task.TaskID), or the marshal or
// enqueue error.
func (tq *TaskQueue) Enqueue(ctx context.Context, task *AgentTask) (string, error) {
	payload, err := json.Marshal(task)
	if err != nil {
		return "", fmt.Errorf("encode task %s: %w", task.TaskID, err)
	}

	queue := QueueDefault
	switch {
	case task.Priority > 5:
		queue = QueueHigh
	case task.Priority < 3:
		queue = QueueLow
	}

	opts := []asynq.Option{
		asynq.Queue(queue),
		asynq.TaskID(task.TaskID),
		asynq.Retention(24 * time.Hour),
	}
	if task.Timeout > 0 {
		opts = append(opts, asynq.Timeout(task.Timeout))
	}

	info, err := tq.client.EnqueueContext(ctx, asynq.NewTask(string(TaskTypeAgentRun), payload), opts...)
	if err != nil {
		return "", fmt.Errorf("enqueue task %s: %w", task.TaskID, err)
	}

	// From this point a worker may already be running the task and writing
	// "running"/"completed" into the same hash. A plain HSET of
	// status=pending could roll that newer state back, so the initial status
	// uses HSETNX. The fields and the TTL go in one MULTI/EXEC.
	//
	// The write is best-effort: the task is already queued and will run, and
	// the worker writes status itself. Failing here would hide the task ID
	// from a caller whose task is in fact running.
	taskKey := fmt.Sprintf("task:%s", task.TaskID)
	_, _ = tq.redis.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
		pipe.HSetNX(ctx, taskKey, "status", "pending")
		pipe.HSet(ctx, taskKey, map[string]interface{}{
			"created_at": task.CreatedAt.Format(time.RFC3339),
			"agent_type": task.AgentType,
			"user_id":    task.UserID,
		})
		pipe.Expire(ctx, taskKey, 24*time.Hour)
		return nil
	})

	return info.ID, nil
}

// GetTaskStatus returns every field of the task's "task:<taskID>" metadata
// hash (status, timestamps, agent_type, user_id and any stored result).
//
// A Redis failure is returned as-is. A missing or empty hash yields a
// "task not found" error.
func (tq *TaskQueue) GetTaskStatus(ctx context.Context, taskID string) (map[string]string, error) {
	fields, err := tq.redis.HGetAll(ctx, fmt.Sprintf("task:%s", taskID)).Result()

	switch {
	case err != nil:
		return nil, err
	case len(fields) == 0:
		return nil, fmt.Errorf("task not found: %s", taskID)
	default:
		return fields, nil
	}
}

// UpdateTaskStatus writes status and an RFC 3339 "updated_at" timestamp
// (local time) into the task's "task:<taskID>" hash.
//
// A non-empty result is stored in the "result" field. An empty result
// leaves any previously stored result untouched. The Redis error, if any,
// is returned.
func (tq *TaskQueue) UpdateTaskStatus(ctx context.Context, taskID, status, result string) error {
	fields := make(map[string]interface{}, 3)
	fields["status"] = status
	fields["updated_at"] = time.Now().Format(time.RFC3339)
	if len(result) > 0 {
		fields["result"] = result
	}

	key := fmt.Sprintf("task:%s", taskID)
	return tq.redis.HSet(ctx, key, fields).Err()
}

// StartWorker runs the asynq worker server, dispatching "agent:run" tasks to
// handler. It blocks inside asynq's Server.Run and returns its error.
//
// Around each handler call it records "running", then either "completed" or
// "failed" in the task's metadata hash. For "failed", the error text is
// stored as the result. The handler's error is returned to asynq unchanged,
// so asynq's retry policy applies.
func (tq *TaskQueue) StartWorker(handler func(context.Context, *AgentTask) error) error {
	mux := asynq.NewServeMux()

	mux.HandleFunc(string(TaskTypeAgentRun), func(ctx context.Context, t *asynq.Task) error {
		var task AgentTask
		if err := json.Unmarshal(t.Payload(), &task); err != nil {
			// A payload that fails to decode will fail on every retry too.
			// Archive it instead of burning the retry budget.
			return fmt.Errorf("decode %s payload: %v: %w", t.Type(), err, asynq.SkipRetry)
		}

		// ctx carries the task deadline (asynq.Timeout). Once it expires, a
		// Redis write using ctx fails immediately, so a timed-out task would
		// stay "running" forever. Status writes therefore use a context that
		// keeps ctx's values but not its cancellation.
		statusCtx := context.WithoutCancel(ctx)

		// Status bookkeeping is best-effort: a Redis hiccup must not turn a
		// successful agent run into a failure (and a retry).
		_ = tq.UpdateTaskStatus(statusCtx, task.TaskID, "running", "")

		err := handler(ctx, &task)
		if err != nil {
			_ = tq.UpdateTaskStatus(statusCtx, task.TaskID, "failed", err.Error())
		} else {
			_ = tq.UpdateTaskStatus(statusCtx, task.TaskID, "completed", "")
		}
		return err
	})

	return tq.server.Run(mux)
}

4.3 Python服务客户端

// internal/service/python_client.go
package service

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"time"
)

// PythonClient calls the Python agent service over HTTP.
type PythonClient struct {
	// baseURL is the service root (scheme://host[:port]). Request paths such
	// as "/api/agent/run" are appended to it directly.
	baseURL    string
	httpClient *http.Client
}

// AgentRequest is the JSON body POSTed to the Python service's
// /api/agent/run endpoint.
type AgentRequest struct {
	AgentType string                 `json:"agent_type"`
	Input     map[string]interface{} `json:"input"`
	// Context is optional; it is omitted from the JSON when nil or empty.
	Context   map[string]interface{} `json:"context,omitempty"`
}

// AgentResponse is the JSON reply from /api/agent/run. Success=false with
// Error set is treated by callers as an agent-level failure.
type AgentResponse struct {
	Success bool                   `json:"success"`
	Result  map[string]interface{} `json:"result,omitempty"`
	Error   string                 `json:"error,omitempty"`
}

// NewPythonClient returns a client for the Python agent service rooted at
// baseURL (scheme://host[:port] without a trailing slash, because request
// paths are appended verbatim).
//
// Each request, including reading its response body, is limited to 120
// seconds.
func NewPythonClient(baseURL string) *PythonClient {
	const requestTimeout = 120 * time.Second

	c := &PythonClient{baseURL: baseURL}
	c.httpClient = &http.Client{Timeout: requestTimeout}
	return c
}

// RunAgent POSTs req as JSON to <baseURL>/api/agent/run and decodes the
// reply into an AgentResponse.
//
// It returns an error for a transport failure, for any non-200 status (the
// error includes the status code and at most 4 KiB of the response body),
// or for an undecodable body. A 200 reply with Success=false is NOT an
// error here; callers must check resp.Success.
func (c *PythonClient) RunAgent(ctx context.Context, req *AgentRequest) (*AgentResponse, error) {
	// Error responses are often HTML pages or tracebacks; cap how much of
	// one is buffered and copied into the error string.
	const maxErrorBody = 4 << 10

	url := fmt.Sprintf("%s/api/agent/run", c.baseURL)

	payload, err := json.Marshal(req)
	if err != nil {
		return nil, fmt.Errorf("encode agent request: %w", err)
	}

	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(payload))
	if err != nil {
		return nil, err
	}
	httpReq.Header.Set("Content-Type", "application/json")

	resp, err := c.httpClient.Do(httpReq)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// A read error is deliberately ignored: the status code alone is
		// enough to fail the call, and the body is only diagnostic.
		body, _ := io.ReadAll(io.LimitReader(resp.Body, maxErrorBody))
		return nil, fmt.Errorf("python service error: status %d: %s", resp.StatusCode, body)
	}

	var result AgentResponse
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return nil, fmt.Errorf("decode python service response: %w", err)
	}
	return &result, nil
}

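4.4 Handler实现

注意:agent.go 中还用到了 encoding/json、fmt 以及 Hertz 的 utils.H(github.com/cloudwego/hertz/pkg/common/utils),需要补充到 import 中,否则无法通过编译。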

// internal/handler/agent.go
package handler

import (
	"context"
	"net/http"
	"time"

	"github.com/cloudwego/hertz/pkg/app"
	"github.com/google/uuid"
	
	"agent-gateway/internal/service"
)

// Package-level dependencies shared by the HTTP handlers and the worker.
// They are nil until InitServices runs, and the handlers in this file
// dereference them, so InitServices must be called before serving traffic.
var (
	taskQueue    *service.TaskQueue
	pythonClient *service.PythonClient
)

// InitServices builds the task queue (Redis at redisAddr) and the Python
// agent client (base URL pythonAddr). It then starts the asynq worker in a
// background goroutine that processes tasks with processAgentTask.
//
// If the worker returns an error (for example, if the asynq server fails to
// start), the process panics. Otherwise the gateway would keep accepting
// tasks that are never executed and stay "pending" forever; crashing lets
// the supervisor (e.g. Kubernetes) restart it.
func InitServices(redisAddr, pythonAddr string) {
	taskQueue = service.NewTaskQueue(redisAddr)
	pythonClient = service.NewPythonClient(pythonAddr)

	go func() {
		if err := taskQueue.StartWorker(processAgentTask); err != nil {
			panic("agent worker stopped: " + err.Error())
		}
	}()
}

// processAgentTask is the worker callback for one agent task. It forwards
// the task to the Python agent service and, on success, stores the
// JSON-encoded result in the task's metadata hash with status "completed".
//
// It returns an error when the HTTP call fails, when the service replies
// Success=false, or when the result cannot be encoded or stored. The worker
// wrapper then records "failed" and hands the error back to asynq, which
// may retry.
func processAgentTask(ctx context.Context, task *service.AgentTask) error {
	resp, err := pythonClient.RunAgent(ctx, &service.AgentRequest{
		AgentType: task.AgentType,
		Input:     task.Input,
	})
	if err != nil {
		return err
	}

	if !resp.Success {
		// resp.Error is arbitrary text from the Python side. Using it as a
		// format string would mangle any '%' it contains.
		msg := resp.Error
		if msg == "" {
			msg = "agent reported failure without an error message"
		}
		return fmt.Errorf("%s", msg)
	}

	resultJSON, err := json.Marshal(resp.Result)
	if err != nil {
		return fmt.Errorf("encode agent result: %w", err)
	}
	// If the result cannot be persisted, report failure rather than letting
	// the task end up "completed" with no result to return.
	if err := taskQueue.UpdateTaskStatus(ctx, task.TaskID, "completed", string(resultJSON)); err != nil {
		return fmt.Errorf("store agent result: %w", err)
	}
	return nil
}

// RunAgent handles POST /api/v1/agent/run. It validates the JSON body,
// enqueues an asynchronous agent task and immediately returns the task ID.
//
// Request body:
//   - agent_type (string, required): which agent to run
//   - input (object): agent input, passed through unchanged
//   - priority (int, optional): >5 high queue, <3 low queue, else default
//   - timeout (int, optional): per-task timeout in seconds; <=0 sets none
//
// Responses: 200 with {task_id, status: "pending"}; 400 for a malformed
// body or missing agent_type; 500 if the task could not be enqueued.
func RunAgent(ctx context.Context, c *app.RequestContext) {
	var req struct {
		AgentType string                 `json:"agent_type"`
		Input     map[string]interface{} `json:"input"`
		Priority  int                    `json:"priority,omitempty"`
		Timeout   int                    `json:"timeout,omitempty"` // seconds
	}

	if err := c.Bind(&req); err != nil {
		c.JSON(http.StatusBadRequest, utils.H{
			"code":    400,
			"message": "invalid request",
		})
		return
	}

	// Reject a missing agent_type up front rather than queueing a task that
	// names no agent to run.
	if req.AgentType == "" {
		c.JSON(http.StatusBadRequest, utils.H{
			"code":    400,
			"message": "agent_type is required",
		})
		return
	}

	taskID := uuid.New().String()

	task := &service.AgentTask{
		TaskID:    taskID,
		AgentType: req.AgentType,
		Input:     req.Input,
		UserID:    c.GetString("user_id"),
		Priority:  req.Priority,
		CreatedAt: time.Now(),
	}

	if req.Timeout > 0 {
		task.Timeout = time.Duration(req.Timeout) * time.Second
	}

	if _, err := taskQueue.Enqueue(ctx, task); err != nil {
		c.JSON(http.StatusInternalServerError, utils.H{
			"code":    500,
			"message": "failed to enqueue task",
		})
		return
	}

	c.JSON(http.StatusOK, utils.H{
		"code":    0,
		"message": "success",
		"data": utils.H{
			"task_id": taskID,
			"status":  "pending",
		},
	})
}

// GetTaskStatus handles GET /api/v1/agent/status/:taskId and returns the
// task's metadata hash (status, timestamps, agent type, result, ...).
//
// Only the user who submitted the task may read it. The user_id recorded at
// enqueue time must match the caller's "user_id" context value. A missing
// task, a lookup failure and an ownership mismatch all produce the same
// 404, so the endpoint does not reveal which task IDs exist.
func GetTaskStatus(ctx context.Context, c *app.RequestContext) {
	taskID := c.Param("taskId")

	status, err := taskQueue.GetTaskStatus(ctx, taskID)
	// Task IDs are random UUIDs, but an unguessable ID is not access control
	// on its own: also enforce ownership.
	if err != nil || status["user_id"] != c.GetString("user_id") {
		c.JSON(http.StatusNotFound, utils.H{
			"code":    404,
			"message": "task not found",
		})
		return
	}

	c.JSON(http.StatusOK, utils.H{
		"code":    0,
		"message": "success",
		"data":    status,
	})
}

// GetTaskResult handles GET /api/v1/agent/result/:taskId and returns the
// decoded result object of a completed task.
//
// Responses:
//   - 404: the task is missing, the lookup failed, or it belongs to another
//     user (compared against the caller's "user_id" context value).
//   - 400: the task exists but is not "completed" (its current status is
//     included).
//   - 500: the stored result is not valid JSON.
//
// A completed task with no stored result yields data: null.
func GetTaskResult(ctx context.Context, c *app.RequestContext) {
	taskID := c.Param("taskId")

	status, err := taskQueue.GetTaskStatus(ctx, taskID)
	// Enforce ownership in addition to the unguessable task ID.
	if err != nil || status["user_id"] != c.GetString("user_id") {
		c.JSON(http.StatusNotFound, utils.H{
			"code":    404,
			"message": "task not found",
		})
		return
	}

	if status["status"] != "completed" {
		c.JSON(http.StatusBadRequest, utils.H{
			"code":    400,
			"message": "task not completed",
			"data": utils.H{
				"status": status["status"],
			},
		})
		return
	}

	var result map[string]interface{}
	if raw := status["result"]; raw != "" {
		// Previously a decode error was ignored and the client silently got
		// data: null; surface a corrupt result instead.
		if err := json.Unmarshal([]byte(raw), &result); err != nil {
			c.JSON(http.StatusInternalServerError, utils.H{
				"code":    500,
				"message": "invalid task result",
			})
			return
		}
	}

	c.JSON(http.StatusOK, utils.H{
		"code":    0,
		"message": "success",
		"data":    result,
	})
}

五、部署与运维

5.1 Docker部署

# Dockerfile
# Build stage: compile a static Linux binary.
FROM golang:1.22-alpine AS builder

WORKDIR /app
# Copy module files first so the dependency download layer is cached
# across source-only changes.
COPY go.mod go.sum ./
RUN go mod download

COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -trimpath -ldflags="-s -w" -o server ./cmd/server

# Runtime stage: pinned minimal base image, running as an unprivileged user
# (numeric UID so Kubernetes runAsNonRoot can verify it).
FROM alpine:3.20
RUN apk --no-cache add ca-certificates \
    && adduser -D -H -u 10001 app
WORKDIR /app

COPY --from=builder /app/server .

USER 10001
EXPOSE 8080
CMD ["./server"]

5.2 Kubernetes部署

# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: agent-gateway
spec:
  replicas: 3
  selector:
    matchLabels:
      app: agent-gateway
  template:
    metadata:
      labels:
        app: agent-gateway
    spec:
      containers:
      - name: gateway
        image: agent-gateway:latest
        ports:
        - containerPort: 8080
        env:
        - name: REDIS_ADDR
          value: "redis:6379"
        # The Python client builds request URLs as <this value>/api/agent/run,
        # so the value must include the scheme.
        - name: PYTHON_SERVICE_ADDR
          value: "http://python-agent:8000"
        # GET /health is registered outside the authenticated /api/v1 group.
        readinessProbe:
          httpGet:
            path: /health
            port: 8080
          periodSeconds: 10
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 5
          periodSeconds: 20
        resources:
          requests:
            memory: "128Mi"
            cpu: "100m"
          limits:
            memory: "512Mi"
            cpu: "500m"
---
apiVersion: v1
kind: Service
metadata:
  name: agent-gateway
spec:
  selector:
    app: agent-gateway
  ports:
  - port: 80
    targetPort: 8080
  type: LoadBalancer

六、结语

通过本文,你用Go构建了一个生产级的AI Agent任务调度网关:

  1. ✅ Hertz高性能HTTP服务
  2. ✅ Redis + Asynq任务队列
  3. ✅ Python服务客户端
  4. ✅ 完整的任务生命周期管理
  5. ✅ Docker + K8s部署

下一篇,我们将实现Go与Python Agent的通信机制。


系列文章导航: ← 5. Python与国产大模型深度集成 6. Go语言实现AI Agent任务调度网关(本文) 7. Go与Python Agent通信机制实现 →


本文首发于CSDN,转载请注明出处。

标签: Go | Hertz | 任务调度 | 网关 | 高并发

Logo

小龙虾开发者社区是 CSDN 旗下专注 OpenClaw 生态的官方阵地,聚焦技能开发、插件实践与部署教程,为开发者提供可直接落地的方案、工具与交流平台,助力高效构建与落地 AI 应用

更多推荐