发布于 

基于 Redis 的通用接口排队服务 🪜

对于计算密集型的业务接口,使用排队机制可以有效地管理请求并避免系统过载

核心作用

  1. 限流限速保护:排队服务会控制请求进入业务接口的速率,确保不会超过业务接口的 QPS 上限,从而避免业务接口被过多请求压垮
  2. 请求排队:公平排队,所有请求都会按顺序先入队,之后在允许的范围内逐个进入业务接口,获取响应(支持在线和离线)
  3. 优先级管理:如 VIP 用户可以在任务队列中优先执行(即“插队”),可以根据任务的优先级来动态设置,从而控制任务的处理顺序
  4. 超时和失败处理:对于排队超时的请求,排队服务可以返回超时错误或直接丢弃,避免请求长时间占用系统资源。此外,当业务接口因故障或流量过高无法响应时,排队服务也可以处理失败请求,防止服务完全不可用

时序图

sequenceDiagram
    participant User as 前端
    participant QueueService as 排队服务
    participant Redis as Redis 队列
    participant Business as 业务接口

    User->>QueueService: submitTask(用户提交任务)
    QueueService->>Redis: 将任务存入队列
    Note right of Redis: 队列存储任务
    loop 控制QPS
        QueueService->>Redis: 从队列取出任务
        alt 未超出QPS
            QueueService->>Business: 调用业务接口
            Business-->>QueueService: 返回处理结果
            QueueService->>Redis: 更新任务状态和结果数据
        else 超出QPS
            QueueService->>Redis: 保持任务在队列中,并退避等待
        end
    end
    User->>QueueService: queryTask (查询任务状态)
    QueueService->>Redis: 获取任务状态和结果数据
    Redis-->>QueueService: 返回任务状态和结果数据
    QueueService-->>User: 返回任务状态和结果

技术栈

  • Go-zero + Redis + MySQL
  • Redis
    • List:用于任务队列(公平排队)
    • Sorted Set(ZSet):用于任务优先级管理
    • Hash:用于存储任务状态和执行结果,不支持为一个元素设置过期时间
    • String:用于存储任务状态和执行结果
    • TTL:过期时间管理
    • Sorted SetString:用于全局计数器(限流限速)

详细设计

优先级队列

  • 采用 Redis 的 ZSet 数据结构来实现优先级队列
  • ZSet 内部维护了元素的 score,并且它是按 score 排序的
  • 定义优先级
    • 优先级与 score 关联:通常,score 值越小,优先级越高
    • score = -priority * 1e9 + timestamp,其中 priority 根据用户类型进行设置
    • timestamp 仍然采用时间戳来确保相同优先级的任务按照提交时间排序(FIFO)
  • ZSet 中只保存任务的 TaskId,任务的详细数据序列化后保存在 String

任务状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
type TaskRequestDetails struct {
TargetURL string `json:"target_url"` // 业务接口 URL
ExtractedPath string `json:"extracted_path"` // 提取的路径
HTTPMethod string `json:"http_method"` // HTTP 方法
Headers map[string][]string `json:"headers"` // 请求头
RequestBody string `json:"request_body"` // 请求体
QueryParams url.Values `json:"query_params"` // 路由参数或查询参数
}

type TaskExecutionStatus struct {
Status string `json:"status"` // 任务的当前状态(如 "pending", "processing", "completed", "failed" 等)
Result string `json:"result"` // 接口的响应内容,包括成功结果或错误信息,作为 JSON 字符串存储
CreatedAt time.Time `json:"created_at"` // 任务的创建时间,即任务被添加到队列的时间
StartedAt time.Time `json:"started_at"` // 任务的开始时间,即任务正式处理的时间
EndedAt time.Time `json:"ended_at"` // 任务的结束时间,任务完成或失败的时间
}

type Task struct {
UserId int64 `json:"user_id"` // 用户 ID
TaskId string `json:"taskID"` // 任务的唯一标识符
TaskQueueKey string `json:"task_queue_Key"` // 任务队列的唯一标识符
Priority int64 `json:"priority"` // 任务的优先级
Type string `json:"type"` // 任务类型,如果是 offline 类型的任务,需要保存任务请求信息
Request TaskRequestDetails `json:"task_request"` // 任务请求信息
Execution TaskExecutionStatus `json:"task_status"` // 任务状态信息
}
  • 支持在线和离线排队任务,在线类型排队任务只需要 TargetURL 参数用于队列 key 的构建
  • 因为在线排队是给到一个前端向目标接口发起调用的许可,需要做好防止用户跳过排队直接向目标接口发起请求的措施(目前是在网关的鉴权服务做检查,前端需要把 taskId 加入请求头)

任务执行

  • 难点分析
    • 如何监听多个动态变化的 ZSet 任务队列(一个业务接口对应一个队列)
  • 解决方案
    • 使用 sync.Map 来存储当前的队列,并且在队列动态增加时,通过信号或事件通知来更新监听的队列
      • 在服务启动时,从 Redis 中加载一次所有的任务队列 Keys,并将它们添加到 sync.Map
      • 在服务运行时,使用 sync.Map 来维护当前正在监听的任务队列,避免持续的查询
    • 通过 channel 通知协程有新的队列加入或队列删除
    • 使用 BZPOPMIN 来阻塞并从 ZSet 中取出任务

接口限速

  • 采用滑动窗口的限流策略(Sorted Set + Lua 脚本)
    • 利用 Redis 的有序集合(Sorted Set)数据结构来记录请求的时间戳
    • 并通过计算当前窗口内的请求数来进行限流
    • interval 内允许 rate 个请求(如 1s 内允许 300 个请求
  • 采用计数法的限流策略(String
    • 每个接口使用一个独立的 Redis String 键来计数
    • 键名格式可以设置为 rate_limit:<endpoint>,例如,rate_limit:/api/generate_imagerate_limit:/api/generate_video
    • 通过 Redis 的 INCR 命令对每个键的值进行计数,并设置一个过期时间(TTL),当过期后自动重置计数,实现滑动时间窗口的效果
  • 触发限流后的指数退避方案
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
    // 触发限流
if limited {
// 任务放回队列
_, err := z.Redis.ZAdd(ctx, task.TaskQueueKey, redis.Z{
Score: res.Score,
Member: res.Member,
}).Result()
if err != nil {
logx.Errorf("触发限流,任务还原失败: %v,被丢弃", err)
// 从任务状态 String 中删除任务状态,防止状态泄露
z.Redis.Del(ctx, taskKey)
continue
}
retryCount := 0
maxRetryCount := 5 // 最大重试次数
baseWaitTime := time.Second
for retryCount < maxRetryCount {
// 计算指数退避时间,最大为 TaskInterval
backoffTime := time.Duration(float64(baseWaitTime) * math.Pow(2, float64(retryCount)))
if backoffTime > TaskInterval {
backoffTime = TaskInterval
}
// 退避一段时间,等待下一个窗口
logx.Infof("限流触发,等待 %v 后重试,任务: %s", backoffTime, taskKey)
time.Sleep(backoffTime)
// 重新检查限流状态
//limited, err = z.RateLimiter.LimitDefault(ctx, rateLimitedKey)
now := time.Now()
limited, err = z.RateLimiter.Limit(ctx, rateLimitedKey, interval, rate, now)
logx.Infof("重新检查限流状态: %v", limited)
if err != nil {
logx.Errorf("限流判断失败: %v", err)
break
}
// 如果不再限流,退出重试循环
// 这边要注意,就算不再限流了,外层可能会因此又触发限流
if !limited {
// 删除内层循环检查限流为 false 情况时产生的元素
z.Redis.ZRem(ctx, rateLimitedKey, now.UnixMilli())
break
}
// 增加重试次数,准备下一次退避
retryCount++
}
continue
}
go func(task Task) {
// 执行对应的任务
err = z.Execute(&task)
if err != nil {
logx.Errorf("执行任务失败: %v", err)
}
}(task)
}
  • 针对接口的限速策略 - 通过查询接口配置表

生命周期

graph TD
    A[任务提交] -->|写入| B[Redis: pending 状态]
    B -->|过期时间 3h| E[任务自动过期]
    B -->|任务执行完成| C[任务结果写入]
    C -->|写入成功| D[Redis: completed 状态]
    D -->|过期时间缩短至 1h| F[Redis 自动过期]
    C -->|写入| G[MySQL: 持久化存储]
    G -->|保留 3 天| H[MySQL 定期清理]
    D -->|用户查询结果| I[Redis: 缩短过期时间 10s]
    I -->|Redis 过期后| J[删除 Redis 数据]
    I -->|用户查询后立即| K[删除 MySQL 数据]
    J -->|MySQL 数据保留 3d| H
    H -->|批量删除| L[清理过期 MySQL 数据]

Redis 和 MySQL 结合使用,并且通过设置合理的过期时间来控制任务数据的生命周期,在实际应用中,过期时间的管理和删除策略需要特别注意性能和空间管理,以确保系统不会因为过期数据而导致性能下降或资源浪费

  • 任务提交后
    • Redis String 的过期时间设置为 3h
  • 任务执行后,结果和状态写入 Redis String 和 MySQL 中
    • Redis String 的对应元素过期时间缩短为 1h
    • MySQL 只保留任务结果数据 3d
  • 用户查询到任务
    • 若任务还未执行完毕,正常返回状态
    • 若任务已经执行完毕
      • Redis 中存在
        • Redis String 的对应元素的过期时间缩短为 20s(允许短时间内的重试,还有网关的插队检查)
        • 可立即删除 MySQL 中的任务数据,减少存储压力
      • Redis 中不存在,查询 MySQL,异步延迟 10s 后删除对应记录
  • 用户如果已离线,不再查询任务结果
    • Redis String 如期自动过期删除
    • MySQL 定时检查删除保留超过 3 天的任务数据
      • 索引优化,created_at 加索引,避免全表扫描
      • 批量删除,如果数据量较大,采用批量删除的方式,避免锁表和性能压力

接口信息配置

表格里填写需要排队的接口信息

并发数为 C,表示系统当前可以同时处理的请求数

执行时间为 T,表示每个请求的平均处理时间(秒)

QPS为 Q,表示系统每秒钟能够处理的请求数,Q = C / T

用户预计排队等待时间 = 待处理任务数 / QPS × 任务执行时间

对外接口

提交任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
SubmitReq {
TargetUrl string `json:"target_url"` // 目标接口地址,完整的 url
HttpMethod string `json:"http_method"` // 请求方法,目前只考虑了 GET POST
Body string `json:"body"` // post 请求体 json 字符串,注意是带转义字符的字符串
Priority int64 `json:"priority"` // 优先级 > 0,数字越大优先级越高,例如,普通用户 0 VIP 2 SVIP 3
Timestamp int64 `json:"timestamp"` // 请求时间戳
TaskType string `json:"task_type"` // "online" 在线排队 "offline" 离线排队
}
SubmitResp {
TaskId string `json:"task_id"` // 任务Id
TaskStatus string `json:"task_status"` // 任务状态
CreatedAt string `json:"created_at"` // 任务创建时间
WaitNum int64 `json:"wait_num"` // 排队人数
}

查询任务

1
2
3
4
5
6
7
8
9
10
11
12
StatusReq {
TaskId string `form:"task_id"` // 任务Id
}
StatusResp {
TaskStatus string `json:"task_status"` // 任务状态
TaskResult string `json:"task_result"` // 任务结果,对应接口的响应体 json
WaitNum int64 `json:"wait_num"` // 排队人数
WaitTime int64 `json:"wait_time"` // 预计等待时间
CreatedAt string `json:"created_at"` // 任务创建时间
StartedAt string `json:"started_at"` // 任务开始时间
EndedAt string `json:"ended_at"` // 任务结束时间
}

取消任务

1
2
3
4
5
6
7
CancelReq {
TaskId string `json:"task_id"` // 任务Id
}
CancelResp {
TaskId string `json:"task_id"` // 任务Id
TaskStatus string `json:"task_status"` // 任务状态
}

安全考虑

  • Priority 可能会被用户篡改:采用前后端请求体对称加密
  • 跳过排队,直接访问目标接口:请求染色,配合网关服务做鉴权