跳到主要内容

延时队列

· 阅读需 15 分钟
ahKevinXy
作者

延时队列(Delayed Queue)是一种支持消息延迟消费的特殊队列,核心特性是:消息入队后不会立即被消费,而是等待指定的延迟时间后,才被推送到消费端处理。它区别于普通队列的“先进先出”,核心是按“延迟时间”排序并触发消费,是分布式系统中处理延时任务的核心组件。

一、延时队列核心概念

1. 核心定义

  • 延迟时间:消息的“触发时间”(如“5分钟后执行”或“2024-05-01 10:00:00执行”);
  • 触发条件:当系统时间达到消息的延迟时间阈值时,消息才会被消费;
  • 有序性:延时队列内部通常按消息的触发时间排序,保证先到期的消息先被消费。

2. 与普通队列的区别

特性普通队列延时队列
消费时机入队后立即可消费延迟时间到期后才可消费
排序规则按入队顺序(FIFO)按触发时间排序
核心场景即时任务处理延时/定时任务处理

二、延时队列的典型应用场景

延时队列解决了“非即时、定时/延时处理任务”的核心需求,是分布式系统中不可或缺的组件,典型场景包括:

1. 订单超时处理

  • 场景:电商订单创建后,若用户未支付,需在30分钟后自动取消订单、释放库存;
  • 实现:订单创建时,向延时队列发送“30分钟后取消订单”的消息,到期后消费消息执行取消逻辑。

2. 消息重试机制

  • 场景:接口调用失败时,需按“指数退避策略”重试(如10s后第一次重试、30s后第二次、1分钟后第三次);
  • 实现:调用失败后,向延时队列发送带重试时间的消息,到期后重新执行调用逻辑。

3. 定时通知/提醒

  • 场景:用户预约会议(如明天10点)、课程提醒(开课前15分钟)、账单到期提醒(到期前3天);
  • 实现:将通知消息按提醒时间送入延时队列,到期后推送短信/APP通知。

4. 缓存过期清理

  • 场景:自定义缓存(非Redis)需在指定时间后自动清理,释放内存;
  • 实现:缓存写入时,向延时队列发送“缓存过期清理”消息,到期后执行删除逻辑。

5. 分布式定时任务

  • 场景:替代单机定时任务(如crontab),实现分布式环境下的精准定时任务(如每天凌晨1点统计昨日数据);
  • 实现:将定时任务按执行时间送入延时队列,分布式消费节点到期后执行任务。

三、为什么需要延时队列?

手动实现“延时任务”(如轮询数据库)存在诸多问题,延时队列从根本上解决了这些痛点:

1. 避免低效轮询

  • 问题:若用“定时轮询数据库”实现订单超时处理,需每秒/每分钟查询所有未支付订单,判断是否超时,大量无效查询占用数据库资源;
  • 解决方案:延时队列仅在订单超时时刻触发消费,无无效轮询,资源利用率提升10倍以上。

2. 精准的延时控制

  • 问题:轮询方式的延时精度取决于轮询间隔(如轮询间隔1分钟,超时判断误差可达1分钟);
  • 解决方案:延时队列可精确到毫秒级触发,满足高精准度的延时需求。

3. 分布式扩展能力

  • 问题:单机定时任务(如Linux crontab、Go的time.Ticker)无法横向扩展,单机故障导致任务丢失;
  • 解决方案:延时队列支持分布式部署,消费节点可水平扩容,任务可持久化,避免单点故障。

4. 解耦系统逻辑

  • 问题:延时任务逻辑与主业务耦合(如订单创建时直接写“30分钟后取消”的定时逻辑),代码冗余且不易维护;
  • 解决方案:主业务只需向延时队列发送消息,消费逻辑独立维护,系统解耦。

四、Go语言实现延时队列的三种方案

Go实现延时队列有多种方式,从简单到复杂、从单机到分布式,以下是三种主流方案(按落地成本由低到高):

方案1:基于堆(heap)的单机延时队列(轻量)

Go标准库container/heap提供了堆的实现,可基于“最小堆”(按触发时间排序)实现单机延时队列,核心思路:

  1. 用最小堆存储待执行的任务,堆顶是最早到期的任务;
  2. 启动一个协程循环检查堆顶任务,若已到期则执行,否则休眠至到期时间;
  3. 新任务入队时,插入堆中并重新排序。

完整实现代码

package main

import (
"container/heap"
"fmt"
"sync"
"time"
)

// 延时任务结构体
type DelayedTask struct {
ID string // 任务ID
ExecuteAt time.Time // 执行时间
Payload interface{} // 任务数据
Callback func(interface{}) // 任务执行的回调函数
index int // 堆中的索引(供heap包使用)
}

// 最小堆实现(按ExecuteAt升序)
type TaskHeap []*DelayedTask

// 实现heap.Interface接口
func (h TaskHeap) Len() int { return len(h) }
func (h TaskHeap) Less(i, j int) bool { return h[i].ExecuteAt.Before(h[j].ExecuteAt) }
func (h TaskHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
h[i].index = i
h[j].index = j
}

// Push 向堆中添加元素
func (h *TaskHeap) Push(x interface{}) {
n := len(*h)
task := x.(*DelayedTask)
task.index = n
*h = append(*h, task)
}

// Pop 从堆中取出最小元素(堆顶)
func (h *TaskHeap) Pop() interface{} {
old := *h
n := len(old)
task := old[n-1]
task.index = -1 // 标记为已移除
*h = old[0 : n-1]
return task
}

// DelayedQueue 单机延时队列
type DelayedQueue struct {
mu sync.Mutex
heap TaskHeap
quit chan struct{} // 退出信号
wakeup chan struct{} // 唤醒检查协程的信号
}

// NewDelayedQueue 创建延时队列实例
func NewDelayedQueue() *DelayedQueue {
dq := &DelayedQueue{
heap: make(TaskHeap, 0),
quit: make(chan struct{}),
wakeup: make(chan struct{}, 1), // 带缓冲,避免阻塞
}
heap.Init(&dq.heap)
// 启动任务检查协程
go dq.run()
return dq
}

// run 循环检查堆顶任务,执行到期任务
func (dq *DelayedQueue) run() {
for {
select {
case <-dq.quit:
return
default:
dq.mu.Lock()
// 堆为空,休眠等待唤醒
if dq.heap.Len() == 0 {
dq.mu.Unlock()
<-dq.wakeup
continue
}

// 获取堆顶任务
now := time.Now()
nextTask := dq.heap[0]
// 任务未到期,计算休眠时间
if nextTask.ExecuteAt.After(now) {
sleepDuration := nextTask.ExecuteAt.Sub(now)
dq.mu.Unlock()
// 休眠至任务到期或被新任务唤醒
select {
case <-dq.quit:
return
case <-dq.wakeup:
continue
case <-time.After(sleepDuration):
continue
}
}

// 任务到期,取出并执行
task := heap.Pop(&dq.heap).(*DelayedTask)
dq.mu.Unlock()

// 异步执行回调(避免阻塞队列)
go func(t *DelayedTask) {
t.Callback(t.Payload)
}(task)
}
}
}

// AddTask 添加延时任务
// delay: 延迟时间(如5*time.Minute)
// payload: 任务数据
// callback: 任务执行的回调函数
func (dq *DelayedQueue) AddTask(taskID string, delay time.Duration, payload interface{}, callback func(interface{})) {
dq.mu.Lock()
defer dq.mu.Unlock()

// 创建任务
task := &DelayedTask{
ID: taskID,
ExecuteAt: time.Now().Add(delay),
Payload: payload,
Callback: callback,
}
// 任务入堆
heap.Push(&dq.heap, task)

// 唤醒检查协程(避免协程休眠过久)
select {
case dq.wakeup <- struct{}{}:
default:
// 通道已满,无需重复发送
}
}

// Stop 停止延时队列
func (dq *DelayedQueue) Stop() {
close(dq.quit)
// 唤醒休眠的协程,确保退出
select {
case dq.wakeup <- struct{}{}:
default:
}
}

// 测试示例
func main() {
// 创建延时队列
dq := NewDelayedQueue()
defer dq.Stop()

// 回调函数:执行任务
callback := func(payload interface{}) {
fmt.Printf("[%s] 执行任务:%v\n", time.Now().Format("2006-01-02 15:04:05"), payload)
}

// 添加3个延时任务
fmt.Printf("[%s] 添加延时任务(2秒后执行)\n", time.Now().Format("2006-01-02 15:04:05"))
dq.AddTask("task1", 2*time.Second, "订单123456超时取消", callback)

fmt.Printf("[%s] 添加延时任务(1秒后执行)\n", time.Now().Format("2006-01-02 15:04:05"))
dq.AddTask("task2", 1*time.Second, "短信提醒:课程即将开始", callback)

fmt.Printf("[%s] 添加延时任务(3秒后执行)\n", time.Now().Format("2006-01-02 15:04:05"))
dq.AddTask("task3", 3*time.Second, "缓存key:user_10086过期清理", callback)

// 等待任务执行完成
time.Sleep(4 * time.Second)
}

代码解释

  1. TaskHeap:基于container/heap实现最小堆,按任务的ExecuteAt(执行时间)排序,保证堆顶是最早到期的任务;
  2. DelayedQueue:封装堆、互斥锁、退出信号和唤醒信号,保证并发安全;
  3. run方法:核心协程,循环检查堆顶任务:
    • 堆为空时,休眠等待wakeup信号;
    • 堆顶任务未到期时,休眠至到期时间(或被新任务唤醒);
    • 堆顶任务到期时,取出并异步执行回调;
  4. AddTask方法:添加任务到堆中,并唤醒检查协程(避免协程休眠过久)。

运行结果

[2024-05-01 10:00:00] 添加延时任务(2秒后执行)
[2024-05-01 10:00:00] 添加延时任务(1秒后执行)
[2024-05-01 10:00:00] 添加延时任务(3秒后执行)
[2024-05-01 10:00:01] 执行任务:短信提醒:课程即将开始
[2024-05-01 10:00:02] 执行任务:订单123456超时取消
[2024-05-01 10:00:03] 执行任务:缓存key:user_10086过期清理

方案2:基于Redis的分布式延时队列(中等成本)

单机队列无法满足分布式系统需求,Redis是实现分布式延时队列的主流方案,核心思路:

  1. 使用Redis的ZSET(有序集合)存储任务,score为任务的触发时间戳(毫秒);
  2. 消费端定时(如1秒)查询ZSETscore <= 当前时间戳的任务;
  3. 通过ZREM原子性取出任务(避免重复消费),执行任务逻辑;
  4. 可选:用Redis的LPUSH/RPOP做消费确认,保证任务至少执行一次。

Go实现代码(基于Redis)

package main

import (
"context"
"fmt"
"time"

"github.com/redis/go-redis/v9"
)

// RedisDelayedQueue 基于Redis的分布式延时队列
type RedisDelayedQueue struct {
client *redis.Client
queueKey string // ZSET键名
processingKey string // 处理中队列(避免重复消费)
ctx context.Context
}

// NewRedisDelayedQueue 创建Redis延时队列
func NewRedisDelayedQueue(addr, password string, db int, queueKey string) *RedisDelayedQueue {
client := redis.NewClient(&redis.Options{
Addr: addr,
Password: password,
DB: db,
})
return &RedisDelayedQueue{
client: client,
queueKey: queueKey,
processingKey: queueKey + "_processing",
ctx: context.Background(),
}
}

// AddTask 添加延时任务
// delay: 延迟时间
// taskID: 任务唯一ID
// payload: 任务数据
func (rdq *RedisDelayedQueue) AddTask(taskID string, delay time.Duration, payload string) error {
// 计算触发时间戳(毫秒)
executeAt := time.Now().Add(delay).UnixMilli()
// 将任务添加到ZSET,score为触发时间戳
return rdq.client.ZAdd(rdq.ctx, rdq.queueKey, redis.Z{
Score: float64(executeAt),
Member: taskID + "|" + payload, // 拼接taskID和payload
}).Err()
}

// Consume 消费任务(阻塞循环)
// interval: 轮询间隔
// handler: 任务处理函数
func (rdq *RedisDelayedQueue) Consume(interval time.Duration, handler func(taskID, payload string) error) error {
for {
// 1. 查询并取出到期的任务(ZRANGEBYSCORE + ZREM 保证原子性)
now := time.Now().UnixMilli()
// 取出score <= now的前10条任务(批量处理)
tasks, err := rdq.client.ZRangeByScore(rdq.ctx, rdq.queueKey, &redis.ZRangeBy{
Min: "0",
Max: fmt.Sprintf("%d", now),
Offset: 0,
Count: 10,
}).Result()
if err != nil {
fmt.Printf("查询到期任务失败:%v\n", err)
time.Sleep(interval)
continue
}

// 2. 处理每个任务
for _, task := range tasks {
// 拆分taskID和payload
sepIndex := -1
for i, c := range task {
if c == '|' {
sepIndex = i
break
}
}
if sepIndex == -1 {
fmt.Printf("任务格式错误:%s\n", task)
// 删除无效任务
rdq.client.ZRem(rdq.ctx, rdq.queueKey, task)
continue
}
taskID := task[:sepIndex]
payload := task[sepIndex+1:]

// 原子性删除任务(避免重复消费)
delCount, err := rdq.client.ZRem(rdq.ctx, rdq.queueKey, task).Result()
if err != nil || delCount == 0 {
continue // 任务已被其他消费节点处理
}

// 3. 执行任务处理函数
if err := handler(taskID, payload); err != nil {
fmt.Printf("处理任务失败:%v,任务ID:%s\n", err, taskID)
// 失败重试:重新添加到队列(指数退避)
rdq.AddTask(taskID, 2*interval, payload)
}
}

// 4. 休眠至下一轮轮询
time.Sleep(interval)
}
}

// 测试示例
func main() {
// 创建Redis延时队列
rdq := NewRedisDelayedQueue("127.0.0.1:6379", "", 0, "delayed_queue")

// 添加任务
err := rdq.AddTask("order_123456", 2*time.Second, "cancel_order|123456")
if err != nil {
panic(err)
}
fmt.Printf("[%s] 添加任务:order_123456(2秒后执行)\n", time.Now().Format("2006-01-02 15:04:05"))

// 启动消费协程
go func() {
rdq.Consume(1*time.Second, func(taskID, payload string) error {
fmt.Printf("[%s] 执行任务:ID=%s, 数据=%s\n", time.Now().Format("2006-01-02 15:04:05"), taskID, payload)
return nil
})
}()

// 等待任务执行
time.Sleep(3 * time.Second)
}

核心要点

  1. ZSET 存储:利用ZSET的score排序特性,按触发时间戳存储任务;
  2. 原子性消费:通过ZRem删除任务,保证同一任务仅被一个消费节点处理;
  3. 失败重试:任务处理失败时,重新添加到队列(可结合指数退避策略);
  4. 批量处理:每次查询10条任务,提升消费效率。

方案3:基于消息队列(如RabbitMQ/RocketMQ)的延时队列(高可用)

专业消息队列原生支持延时队列,适合高可用、高并发的生产环境:

  • RabbitMQ:通过“死信交换机(DLX)+ 过期时间(TTL)”实现;
  • RocketMQ:原生支持延时消息(固定延时级别:1s/5s/10s/30s/1m/5m等);
  • Kafka:通过时间轮算法或外部组件(如Kafka Connect)实现。

RabbitMQ实现思路(Go示例)

  1. 创建普通队列A,设置TTL(消息过期时间)和死信交换机DLX;
  2. 消息发送到队列A,过期后自动转发到死信队列B;
  3. 消费端监听死信队列B,处理延时任务。
package main

import (
"context"
"fmt"
"time"

amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
// 连接RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
panic(err)
}
defer conn.Close()

ch, err := conn.Channel()
if err != nil {
panic(err)
}
defer ch.Close()

// 1. 声明死信交换机和死信队列
dlxExchange := "delayed_exchange"
dlxQueue := "delayed_queue"
// 声明死信交换机
err = ch.ExchangeDeclare(
dlxExchange,
"direct",
true, // durable
false, // auto-delete
false, // internal
false, // no-wait
nil,
)
// 声明死信队列
dlxQueueArgs := amqp.Table{}
dlxQueue, err := ch.QueueDeclare(
dlxQueue,
true, // durable
false, // auto-delete
false, // exclusive
false, // no-wait
dlxQueueArgs,
)
// 绑定死信队列到死信交换机
err = ch.QueueBind(
dlxQueue.Name,
"delayed_key",
dlxExchange,
false,
nil,
)

// 2. 声明普通队列(带TTL和死信配置)
normalQueue := "normal_queue"
normalQueueArgs := amqp.Table{
"x-dead-letter-exchange": dlxExchange, // 死信交换机
"x-dead-letter-routing-key": "delayed_key", // 死信路由键
"x-message-ttl": 2000, // 消息过期时间(2秒,单位毫秒)
}
_, err = ch.QueueDeclare(
normalQueue,
true,
false,
false,
false,
normalQueueArgs,
)

// 3. 发送延时消息(到普通队列)
msg := "cancel_order|123456"
err = ch.PublishWithContext(
context.Background(),
"", // 交换机
normalQueue, // 队列名
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(msg),
},
)
fmt.Printf("[%s] 发送延时消息:%s\n", time.Now().Format("2006-01-02 15:04:05"), msg)

// 4. 消费死信队列(处理延时任务)
msgs, err := ch.Consume(
dlxQueue.Name,
"",
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil,
)
go func() {
for d := range msgs {
fmt.Printf("[%s] 执行延时任务:%s\n", time.Now().Format("2006-01-02 15:04:05"), string(d.Body))
}
}()

// 等待任务执行
time.Sleep(3 * time.Second)
}

五、三种实现方案对比与选型建议

方案优点缺点适用场景
基于堆(单机)轻量、无依赖、精准延时不支持分布式、无持久化、单机故障丢失单机、轻量、临时延时任务
基于Redis分布式、易扩展、成本低轮询有轻微延时、依赖Redis分布式、中小规模延时任务
基于MQ高可用、高并发、原生支持部署复杂、成本高(如RabbitMQ需维护)生产环境、高可用、高并发场景

选型原则

  1. 单机场景:优先选“基于堆”的实现,简单高效;
  2. 分布式小规模:选Redis实现,成本低、易扩展;
  3. 生产环境大规模:选RabbitMQ/RocketMQ,保证高可用和高并发。

总结

  1. 延时队列的核心是按触发时间延迟消费消息,解决了定时/延时任务的精准、高效处理问题;
  2. 典型应用包括订单超时、消息重试、定时通知、缓存清理等,核心价值是避免轮询、精准延时、解耦系统;
  3. Go实现延时队列有三种主流方案:
    • 单机场景用container/heap实现最小堆,轻量无依赖;
    • 分布式中小规模用Redis ZSET,成本低易扩展;
    • 生产环境大规模用RabbitMQ/RocketMQ,保证高可用。
  4. 选型时需结合“分布式需求、可用性要求、部署成本”综合考虑,优先选择成熟的中间件(Redis/MQ)而非重复造轮子。