延时队列
· 阅读需 15 分钟
延时队列(Delayed Queue)是一种支持消息延迟消费的特殊队列,核心特性是:消息入队后不会立即被消费,而是等待指定的延迟时间后,才被推送到消费端处理。它区别于普通队列的“先进先出”,核心是按“延迟时间”排序并触发消费,是分布式系统中处理延时任务的核心组件。
一、延时队列核心概念
1. 核心定义
- 延迟时间:消息的“触发时间”(如“5分钟后执行”或“2024-05-01 10:00:00执行”);
- 触发条件:当系统时间达到消息的延迟时间阈值时,消息才会被消费;
- 有序性:延时队列内部通常按消息的触发时间排序,保证先到期的消息先被消费。
2. 与普通队列的区别
| 特性 | 普通队列 | 延时队列 |
|---|---|---|
| 消费时机 | 入队后立即可消费 | 延迟时间到期后才可消费 |
| 排序规则 | 按入队顺序(FIFO) | 按触发时间排序 |
| 核心场景 | 即时任务处理 | 延时/定时任务处理 |
二、延时队列的典型应用场景
延时队列解决了“非即时、定时/延时处理任务”的核心需求,是分布式系统中不可或缺的组件,典型场景包括:
1. 订单超时处理
- 场景:电商订单创建后,若用户未支付,需在30分钟后自动取消订单、释放库存;
- 实现:订单创建时,向延时队列发送“30分钟后取消订单”的消息,到期后消费消息执行取消逻辑。
2. 消息重试机制
- 场景:接口调用失败时,需按“指数退避策略”重试(如10s后第一次重试、30s后第二次、1分钟后第三次);
- 实现:调用失败后,向延时队列发送带重试时间的消息,到期后重新执行调用逻辑。
3. 定时通知/提醒
- 场景:用户预约会议(如明天10点)、课程提醒(开课前15分钟)、账单到期提醒(到期前3天);
- 实现:将通知消息按提醒时间送入延时队列,到期后推送短信/APP通知。
4. 缓存过期清理
- 场景:自定义缓存(非Redis)需在指定时间后自动清理,释放内存;
- 实现:缓存写入时,向延时队列发送“缓存过期清理”消息,到期后执行删除逻辑。
5. 分布式定时任务
- 场景:替代单机定时任务(如crontab),实现分布式环境下的精准定时任务(如每天凌晨1点统计昨日数据);
- 实现:将定时任务按执行时间送入延时队列,分布式消费节点到期后执行任务。
三、为什么需要延时队列?
手动实现“延时任务”(如轮询数据库)存在诸多问题,延时队列从根本上解决了这些痛点:
1. 避免低效轮询
- 问题:若用“定时轮询数据库”实现订单超时处理,需每秒/每分钟查询所有未支付订单,判断是否超时,大量无效查询占用数据库资源;
- 解决方案:延时队列仅在订单超时时刻触发消费,无无效轮询,资源利用率提升10倍以上。
2. 精准的延时控制
- 问题:轮询方式的延时精度取决于轮询间隔(如轮询间隔1分钟,超时判断误差可达1分钟);
- 解决方案:延时队列可精确到毫秒级触发,满足高精准度的延时需求。
3. 分布式扩展能力
- 问题:单机定时任务(如Linux crontab、Go的time.Ticker)无法横向扩展,单机故障导致任务丢失;
- 解决方案:延时队列支持分布式部署,消费节点可水平扩容,任务可持久化,避免单点故障。
4. 解耦系统逻辑
- 问题:延时任务逻辑与主业务耦合(如订单创建时直接写“30分钟后取消”的定时逻辑),代码冗余且不易维护;
- 解决方案:主业务只需向延时队列发送消息,消费逻辑独立维护,系统解耦。
四、Go语言实现延时队列的三种方案
Go实现延时队列有多种方式,从简单到复杂、从单机到分布式,以下是三种主流方案(按落地成本由低到高):
方案1:基于堆(heap)的单机延时队列(轻量)
Go标准库container/heap提供了堆的实现,可基于“最小堆”(按触发时间排序)实现单机延时队列,核心思路:
- 用最小堆存储待执行的任务,堆顶是最早到期的任务;
- 启动一个协程循环检查堆顶任务,若已到期则执行,否则休眠至到期时间;
- 新任务入队时,插入堆中并重新排序。
完整实现代码
package main
import (
"container/heap"
"fmt"
"sync"
"time"
)
// 延时任务结构体
type DelayedTask struct {
ID string // 任务ID
ExecuteAt time.Time // 执行时间
Payload interface{} // 任务数据
Callback func(interface{}) // 任务执行的回调函数
index int // 堆中的索引(供heap包使用)
}
// 最小堆实现(按ExecuteAt升序)
type TaskHeap []*DelayedTask
// 实现heap.Interface接口
func (h TaskHeap) Len() int { return len(h) }
func (h TaskHeap) Less(i, j int) bool { return h[i].ExecuteAt.Before(h[j].ExecuteAt) }
func (h TaskHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
h[i].index = i
h[j].index = j
}
// Push 向堆中添加元素
func (h *TaskHeap) Push(x interface{}) {
n := len(*h)
task := x.(*DelayedTask)
task.index = n
*h = append(*h, task)
}
// Pop 从堆中取出最小元素(堆顶)
func (h *TaskHeap) Pop() interface{} {
old := *h
n := len(old)
task := old[n-1]
task.index = -1 // 标记为已移除
*h = old[0 : n-1]
return task
}
// DelayedQueue 单机延时队列
type DelayedQueue struct {
mu sync.Mutex
heap TaskHeap
quit chan struct{} // 退出信号
wakeup chan struct{} // 唤醒检查协程的信号
}
// NewDelayedQueue 创建延时队列实例
func NewDelayedQueue() *DelayedQueue {
dq := &DelayedQueue{
heap: make(TaskHeap, 0),
quit: make(chan struct{}),
wakeup: make(chan struct{}, 1), // 带缓冲,避免阻塞
}
heap.Init(&dq.heap)
// 启动任务检查协程
go dq.run()
return dq
}
// run 循环检查堆顶任务,执行到期任务
func (dq *DelayedQueue) run() {
for {
select {
case <-dq.quit:
return
default:
dq.mu.Lock()
// 堆为空,休眠等待唤醒
if dq.heap.Len() == 0 {
dq.mu.Unlock()
<-dq.wakeup
continue
}
// 获取堆顶任务
now := time.Now()
nextTask := dq.heap[0]
// 任务未到期,计算休眠时间
if nextTask.ExecuteAt.After(now) {
sleepDuration := nextTask.ExecuteAt.Sub(now)
dq.mu.Unlock()
// 休眠至任务到期或被新任务唤醒
select {
case <-dq.quit:
return
case <-dq.wakeup:
continue
case <-time.After(sleepDuration):
continue
}
}
// 任务到期,取出并执行
task := heap.Pop(&dq.heap).(*DelayedTask)
dq.mu.Unlock()
// 异步执行回调(避免阻塞队列)
go func(t *DelayedTask) {
t.Callback(t.Payload)
}(task)
}
}
}
// AddTask 添加延时任务
// delay: 延迟时间(如5*time.Minute)
// payload: 任务数据
// callback: 任务执行的回调函数
func (dq *DelayedQueue) AddTask(taskID string, delay time.Duration, payload interface{}, callback func(interface{})) {
dq.mu.Lock()
defer dq.mu.Unlock()
// 创建任务
task := &DelayedTask{
ID: taskID,
ExecuteAt: time.Now().Add(delay),
Payload: payload,
Callback: callback,
}
// 任务入堆
heap.Push(&dq.heap, task)
// 唤醒检查协程(避免协程休眠过久)
select {
case dq.wakeup <- struct{}{}:
default:
// 通道已满,无需重复发送
}
}
// Stop 停止延时队列
func (dq *DelayedQueue) Stop() {
close(dq.quit)
// 唤醒休眠的协程,确保退出
select {
case dq.wakeup <- struct{}{}:
default:
}
}
// 测试示例
func main() {
// 创建延时队列
dq := NewDelayedQueue()
defer dq.Stop()
// 回调函数:执行任务
callback := func(payload interface{}) {
fmt.Printf("[%s] 执行任务:%v\n", time.Now().Format("2006-01-02 15:04:05"), payload)
}
// 添加3个延时任务
fmt.Printf("[%s] 添加延时任务(2秒后执行)\n", time.Now().Format("2006-01-02 15:04:05"))
dq.AddTask("task1", 2*time.Second, "订单123456超时取消", callback)
fmt.Printf("[%s] 添加延时任务(1秒后执行)\n", time.Now().Format("2006-01-02 15:04:05"))
dq.AddTask("task2", 1*time.Second, "短信提醒:课程即将开始", callback)
fmt.Printf("[%s] 添加延时任务(3秒后执行)\n", time.Now().Format("2006-01-02 15:04:05"))
dq.AddTask("task3", 3*time.Second, "缓存key:user_10086过期清理", callback)
// 等待任务执行完成
time.Sleep(4 * time.Second)
}
代码解释
- TaskHeap:基于
container/heap实现最小堆,按任务的ExecuteAt(执行时间)排序,保证堆顶是最早到期的任务; - DelayedQueue:封装堆、互斥锁、退出信号和唤醒信号,保证并发安全;
- run方法:核心协程,循环检查堆顶任务:
- 堆为空时,休眠等待
wakeup信号; - 堆顶任务未到期时,休眠至到期时间(或被新任务唤醒);
- 堆顶任务到期时,取出并异步执行回调;
- 堆为空时,休眠等待
- AddTask方法:添加任务到堆中,并唤醒检查协程(避免协程休眠过久)。
运行结果
[2024-05-01 10:00:00] 添加延时任务(2秒后执行)
[2024-05-01 10:00:00] 添加延时任务(1秒后执行)
[2024-05-01 10:00:00] 添加延时任务(3秒后执行)
[2024-05-01 10:00:01] 执行任务:短信提醒:课程即将开始
[2024-05-01 10:00:02] 执行任务:订单123456超时取消
[2024-05-01 10:00:03] 执行任务:缓存key:user_10086过期清理
方案2:基于Redis的分布式延时队列(中等成本)
单机队列无法满足分布式系统需求,Redis是实现分布式延时队列的主流方案,核心思路:
- 使用Redis的
ZSET(有序集合)存储任务,score为任务的触发时间戳(毫秒); - 消费端定时(如1秒)查询
ZSET中score <= 当前时间戳的任务; - 通过
ZREM原子性取出任务(避免重复消费),执行任务逻辑; - 可选:用Redis的
LPUSH/RPOP做消费确认,保证任务至少执行一次。
Go实现代码(基于Redis)
package main
import (
"context"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
// RedisDelayedQueue 基于Redis的分布式延时队列
type RedisDelayedQueue struct {
client *redis.Client
queueKey string // ZSET键名
processingKey string // 处理中队列(避免重复消费)
ctx context.Context
}
// NewRedisDelayedQueue 创建Redis延时队列
func NewRedisDelayedQueue(addr, password string, db int, queueKey string) *RedisDelayedQueue {
client := redis.NewClient(&redis.Options{
Addr: addr,
Password: password,
DB: db,
})
return &RedisDelayedQueue{
client: client,
queueKey: queueKey,
processingKey: queueKey + "_processing",
ctx: context.Background(),
}
}
// AddTask 添加延时任务
// delay: 延迟时间
// taskID: 任务唯一ID
// payload: 任务数据
func (rdq *RedisDelayedQueue) AddTask(taskID string, delay time.Duration, payload string) error {
// 计算触发时间戳(毫秒)
executeAt := time.Now().Add(delay).UnixMilli()
// 将任务添加到ZSET,score为触发时间戳
return rdq.client.ZAdd(rdq.ctx, rdq.queueKey, redis.Z{
Score: float64(executeAt),
Member: taskID + "|" + payload, // 拼接taskID和payload
}).Err()
}
// Consume 消费任务(阻塞循环)
// interval: 轮询间隔
// handler: 任务处理函数
func (rdq *RedisDelayedQueue) Consume(interval time.Duration, handler func(taskID, payload string) error) error {
for {
// 1. 查询并取出到期的任务(ZRANGEBYSCORE + ZREM 保证原子性)
now := time.Now().UnixMilli()
// 取出score <= now的前10条任务(批量处理)
tasks, err := rdq.client.ZRangeByScore(rdq.ctx, rdq.queueKey, &redis.ZRangeBy{
Min: "0",
Max: fmt.Sprintf("%d", now),
Offset: 0,
Count: 10,
}).Result()
if err != nil {
fmt.Printf("查询到期任务失败:%v\n", err)
time.Sleep(interval)
continue
}
// 2. 处理每个任务
for _, task := range tasks {
// 拆分taskID和payload
sepIndex := -1
for i, c := range task {
if c == '|' {
sepIndex = i
break
}
}
if sepIndex == -1 {
fmt.Printf("任务格式错误:%s\n", task)
// 删除无效任务
rdq.client.ZRem(rdq.ctx, rdq.queueKey, task)
continue
}
taskID := task[:sepIndex]
payload := task[sepIndex+1:]
// 原子性删除任务(避免重复消费)
delCount, err := rdq.client.ZRem(rdq.ctx, rdq.queueKey, task).Result()
if err != nil || delCount == 0 {
continue // 任务已被其他消费节点处理
}
// 3. 执行任务处理函数
if err := handler(taskID, payload); err != nil {
fmt.Printf("处理任务失败:%v,任务ID:%s\n", err, taskID)
// 失败重试:重新添加到队列(指数退避)
rdq.AddTask(taskID, 2*interval, payload)
}
}
// 4. 休眠至下一轮轮询
time.Sleep(interval)
}
}
// 测试示例
func main() {
// 创建Redis延时队列
rdq := NewRedisDelayedQueue("127.0.0.1:6379", "", 0, "delayed_queue")
// 添加任务
err := rdq.AddTask("order_123456", 2*time.Second, "cancel_order|123456")
if err != nil {
panic(err)
}
fmt.Printf("[%s] 添加任务:order_123456(2秒后执行)\n", time.Now().Format("2006-01-02 15:04:05"))
// 启动消费协程
go func() {
rdq.Consume(1*time.Second, func(taskID, payload string) error {
fmt.Printf("[%s] 执行任务:ID=%s, 数据=%s\n", time.Now().Format("2006-01-02 15:04:05"), taskID, payload)
return nil
})
}()
// 等待任务执行
time.Sleep(3 * time.Second)
}
核心要点
- ZSET 存储:利用ZSET的
score排序特性,按触发时间戳存储任务; - 原子性消费:通过
ZRem删除任务,保证同一任务仅被一个消费节点处理; - 失败重试:任务处理失败时,重新添加到队列(可结合指数退避策略);
- 批量处理:每次查询10条任务,提升消费效率。
方案3:基于消息队列(如RabbitMQ/RocketMQ)的延时队列(高可用)
专业消息队列原生支持延时队列,适合高可用、高并发的生产环境:
- RabbitMQ:通过“死信交换机(DLX)+ 过期时间(TTL)”实现;
- RocketMQ:原生支持延时消息(固定延时级别:1s/5s/10s/30s/1m/5m等);
- Kafka:通过时间轮算法或外部组件(如Kafka Connect)实现。
RabbitMQ实现思路(Go示例)
- 创建普通队列A,设置TTL(消息过期时间)和死信交换机DLX;
- 消息发送到队列A,过期后自动转发到死信队列B;
- 消费端监听死信队列B,处理延时任务。
package main
import (
"context"
"fmt"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
// 连接RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
panic(err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
panic(err)
}
defer ch.Close()
// 1. 声明死信交换机和死信队列
dlxExchange := "delayed_exchange"
dlxQueue := "delayed_queue"
// 声明死信交换机
err = ch.ExchangeDeclare(
dlxExchange,
"direct",
true, // durable
false, // auto-delete
false, // internal
false, // no-wait
nil,
)
// 声明死信队列
dlxQueueArgs := amqp.Table{}
dlxQueue, err := ch.QueueDeclare(
dlxQueue,
true, // durable
false, // auto-delete
false, // exclusive
false, // no-wait
dlxQueueArgs,
)
// 绑定死信队列到死信交换机
err = ch.QueueBind(
dlxQueue.Name,
"delayed_key",
dlxExchange,
false,
nil,
)
// 2. 声明普通队列(带TTL和死信配置)
normalQueue := "normal_queue"
normalQueueArgs := amqp.Table{
"x-dead-letter-exchange": dlxExchange, // 死信交换机
"x-dead-letter-routing-key": "delayed_key", // 死信路由键
"x-message-ttl": 2000, // 消息过期时间(2秒,单位毫秒)
}
_, err = ch.QueueDeclare(
normalQueue,
true,
false,
false,
false,
normalQueueArgs,
)
// 3. 发送延时消息(到普通队列)
msg := "cancel_order|123456"
err = ch.PublishWithContext(
context.Background(),
"", // 交换机
normalQueue, // 队列名
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(msg),
},
)
fmt.Printf("[%s] 发送延时消息:%s\n", time.Now().Format("2006-01-02 15:04:05"), msg)
// 4. 消费死信队列(处理延时任务)
msgs, err := ch.Consume(
dlxQueue.Name,
"",
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil,
)
go func() {
for d := range msgs {
fmt.Printf("[%s] 执行延时任务:%s\n", time.Now().Format("2006-01-02 15:04:05"), string(d.Body))
}
}()
// 等待任务执行
time.Sleep(3 * time.Second)
}
五、三种实现方案对比与选型建议
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 基于堆(单机) | 轻量、无依赖、精准延时 | 不支持分布式、无持久化、单机故障丢失 | 单机、轻量、临时延时任务 |
| 基于Redis | 分布式、易扩展、成本低 | 轮询有轻微延时、依赖Redis | 分布式、中小规模延时任务 |
| 基于MQ | 高可用、高并发、原生支持 | 部署复杂、成本高(如RabbitMQ需维护) | 生产环境、高可用、高并发场景 |
选型原则
- 单机场景:优先选“基于堆”的实现,简单高效;
- 分布式小规模:选Redis实现,成本低、易扩展;
- 生产环境大规模:选RabbitMQ/RocketMQ,保证高可用和高并发。
总结
- 延时队列的核心是按触发时间延迟消费消息,解决了定时/延时任务的精准、高效处理问题;
- 典型应用包括订单超时、消息重试、定时通知、缓存清理等,核心价值是避免轮询、精准延时、解耦系统;
- Go实现延时队列有三种主流方案:
- 单机场景用
container/heap实现最小堆,轻量无依赖; - 分布式中小规模用Redis ZSET,成本低易扩展;
- 生产环境大规模用RabbitMQ/RocketMQ,保证高可用。
- 单机场景用
- 选型时需结合“分布式需求、可用性要求、部署成本”综合考虑,优先选择成熟的中间件(Redis/MQ)而非重复造轮子。
