高性能协程池ants源码剖析
· 阅读需 10 分钟
简介
ants是什么
ants是一个高性能的 goroutine 池,实现了对大规模 goroutine 的调度管理、goroutine 复用,允许使用者在开发并发程序的时候限制 goroutine 数量,复用资源,达到更高效执行任务的效果。
功能特点
- 自动调度海量的 goroutines,复用 goroutines
- 定期清理过期的 goroutines,进一步节省资源
- 提供了大量有用的接口:任务提交、获取运行中的 goroutine 数量、动态调整 Pool 大小、释放 Pool、重启 Pool
- 优雅处理 panic,防止程序崩溃
- 资源复用,极大节省内存使用量;在大规模批量并发任务场景下比原生 goroutine 并发具有更高的性能
- 非阻塞机制
ants核心概念
- Pool :协程池
- WorkerArray:Pool池中的worker队列,存放所有的goWorker
- goWorker:运行任务的实际执行者,它启动一个 goroutine 来接受任务并执行函数调用。
Pool协程池
Pool结构
Ants 提供了两种Pool结构:Pool和PoolWithFunc ;但两者逻辑大致一样,本文着重介绍Pool的结构
// Pool accepts the tasks from client, it limits the total of goroutines to a given number by recycling goroutines.
type Pool struct {
// capacity of the pool, a negative value means that the capacity of pool is limitless, an infinite pool is used to
// avoid potential issue of endless blocking caused by nested usage of a pool: submitting a task to pool
// which submits a new task to the same pool.
// 协程池的容量
capacity int32
// running is the number of the currently running goroutines.
// 正在运行的goroutines的数量
running int32
// lock for protecting the worker queue.
// 锁,自旋锁,保护队列
lock sync.Locker
// workers is a slice that store the available workers.
// 存放池中所有的worker,workerArray包含可用workers队列和过期workers队列,只会从可用workers队列中取可用worker
workers workerArray
// state is used to notice the pool to closed itself.
// 记录池子的状态 (关闭、开启)
state int32
// cond for waiting to get an idle worker.
// 条件变量
cond *sync.Cond
// workerCache speeds up the obtainment of a usable worker in function:retrieveWorker.
// worker 对象池
workerCache sync.Pool
// waiting is the number of goroutines already been blocked on pool.Submit(), protected by pool.lock
//阻塞等待的任务量
waiting int32
// 清道夫,定时清理workerarray 队列中过期的worker
purgeDone int32
stopPurge context.CancelFunc
// 定时器 更新pool中now的字段
ticktockDone int32
stopTicktock context.CancelFunc
now atomic.Value
// 需要自定义加载的配置
options *Options
}
Pool创建
// NewPool generates an instance of ants pool.
func NewPool(size int, options ...Option) (*Pool, error) {
opts := loadOptions(options...) // 加载自定义的options中的配置
if size <= 0 {
size = -1
}
if !opts.DisablePurge {// 当 DisablePurge 为 true 时,worker 不会被清除并且是驻留的。
if expiry := opts.ExpiryDuration; expiry < 0 {
return nil, ErrInvalidPoolExpiry
} else if expiry == 0 {
opts.ExpiryDuration = DefaultCleanIntervalTime // 默认间隔时间1s
}
}
if opts.Logger == nil {
opts.Logger = defaultLogger
}
p := &Pool{
capacity: int32(size),
lock: syncx.NewSpinLock(),//自旋锁
options: opts,
}
p.workerCache.New = func() interface{} { //sync.pool 初始化
return &goWorker{
pool: p,
task: make(chan func(), workerChanCap),
}
}
if p.options.PreAlloc {
if size == -1 {
return nil, ErrInvalidPreAllocSize
}
p.workers = newWorkerArray(loopQueueType, size) //循环队列
} else {
p.workers = newWorkerArray(stackType, 0) //数组
}
p.cond = sync.NewCond(p.lock) // sync.cond初始化
p.goPurge()
p.goTicktock()
return p, nil
}
自旋锁SpinLock(重点)
思考:如何设计一种自旋锁,设计自旋锁时需要注意什么?
spinLock是基于CAS机制和指数退避算法实现的一种自旋锁
package sync
import (
"runtime"
"sync"
"sync/atomic"
)
type spinLock uint32 // 实现sync.Locker接口
const maxBackoff = 16 //最大的回避次数
func (sl *spinLock) Lock() {
backoff := 1
// 基于CAS机制,尝试获取锁,且使用指数退避算法来提供获取锁的成功率
for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
// Leverage the exponential backoff algorithm, see https://en.wikipedia.org/wiki/Exponential_backoff.
for i := 0; i < backoff; i++ {
//runtime.Gosched()函数功能:使当前goroutine让出CPU时间片(“回避”),让其他的goroutine获得执行的机会。当前的goroutine会在未来的某个时间点继续运行。
//注意:当一个goroutine发生阻塞,Go会自动地把与该goroutine处于同一系统线程的其他goroutines转移到另一个系统线程上去,以使这些goroutines不阻塞(从GMP模型角度来说,就是当与P绑定的M发生阻塞,P就与其解绑,然后与另一个空闲的M进行绑定 或者 去创建一个M进行绑定)。
runtime.Gosched()
}
if backoff < maxBackoff {
backoff <<= 1
}
}
}
func (sl *spinLock) Unlock() {
//原子操作,并发安全
atomic.StoreUint32((*uint32)(sl), 0)
}
// NewSpinLock instantiates a spin-lock.
func NewSpinLock() sync.Locker {
return new(spinLock)
}
sync.Locker
设计锁时必须实现该接口中的方法
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}
关键知识点
- sync.Locker接口
- 指数退避算法
- atomic 原子包中的方法了解
- runtime.Gosched()
