跳到主要内容

sync

sync tree

├── atomic
│ ├── asm.s
│ ├── doc.go
│ ├── race.s
│ └── value.go
├── cond.go
├── map.go
├── mutex.go
├── once.go
├── pool.go
├── poolqueue.go
├── runtime2.go
├── runtime2_lockrank.go
├── runtime.go
├── rwmutex.go
└── waitgroup.go

RWMutex 多读写锁

信息

相对于Mutex 多了对读操作的锁定和解锁的操作。 除了实现Locker接口以外,还实现了Unlock、RLock和RUnlock接口 RWMutex是读写互斥锁。该锁可以被同时多个读取者持有或唯一个写入者持有。RWMutex可以创建为其他结构体的字段;零值为解锁状态。RWMutex类型的锁也和线程无关,可以由不同的线程加读取锁/写入和解读取锁/写入锁。

  • 一次只能有一个goroutine获取写锁。
  • 可以有多个goroutine同时获取读锁。
  • 只能同时获取读锁或者写锁,读和写是互斥的。

type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount int32 // number of pending readers
readerWait int32 // number of departing readers
}

const rwmutexMaxReaders = 1 << 30

//RLock方法将rw锁定为读取状态,禁止其他线程写入,但不禁止读取。
func (rw *RWMutex) RLock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
if atomic.AddInt32(&rw.readerCount, 1) < 0 {

runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}

//Runlock方法解除rw的读取锁状态,如果m未加读取锁会导致运行时错误。
func (rw *RWMutex) RUnlock() {
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {

rw.rUnlockSlow(r)
}
if race.Enabled {
race.Enable()
}
}

func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}

//Lock方法将rw锁定为写入状态,禁止其他线程读取或者写入。
func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// First, resolve competition with other writers.
rw.w.Lock()
// Announce to readers there is a pending writer.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}

//Unlock方法解除rw的写入锁状态,如果m未加写入锁会导致运行时错误。
func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Disable()
}


r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}

//Rlocker方法返回一个互斥锁,通过调用rw.Rlock和rw.Runlock实现了Locker接口。
func (rw *RWMutex) RLocker() Locker {
return (*rlocker)(rw)
}

type rlocker RWMutex

func (r *rlocker) Lock() { (*RWMutex)(r).RLock() }
func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() }

poolqueue

type poolDequeue struct {

headTail uint64


vals []eface
}
type eface struct {
typ, val unsafe.Pointer
}
const dequeueBits = 32
const dequeueLimit = (1 << dequeueBits) / 4
type dequeueNil *struct{}

func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
const mask = 1<<dequeueBits - 1
head = uint32((ptrs >> dequeueBits) & mask)
tail = uint32(ptrs & mask)
return
}
func (d *poolDequeue) pack(head, tail uint32) uint64 {
const mask = 1<<dequeueBits - 1
return (uint64(head) << dequeueBits) |
uint64(tail&mask)
}

func (d *poolDequeue) pushHead(val interface{}) bool {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {

return false
}
slot := &d.vals[head&uint32(len(d.vals)-1)]


typ := atomic.LoadPointer(&slot.typ)
if typ != nil {

return false
}


if val == nil {
val = dequeueNil(nil)
}
*(*interface{})(unsafe.Pointer(slot)) = val

atomic.AddUint64(&d.headTail, 1<<dequeueBits)
return true
}
func (d *poolDequeue) popHead() (interface{}, bool) {
var slot *eface
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if tail == head {

return nil, false
}


head--
ptrs2 := d.pack(head, tail)
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {

slot = &d.vals[head&uint32(len(d.vals)-1)]
break
}
}

val := *(*interface{})(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nil
}

*slot = eface{}
return val, true
}

func (d *poolDequeue) popTail() (interface{}, bool) {
var slot *eface
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if tail == head {

return nil, false
}

ptrs2 := d.pack(head, tail+1)
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
// Success.
slot = &d.vals[tail&uint32(len(d.vals)-1)]
break
}
}


val := *(*interface{})(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nil
}


slot.val = nil
atomic.StorePointer(&slot.typ, nil)


return val, true
}
type poolChain struct {

head *poolChainElt


tail *poolChainElt
}

type poolChainElt struct {
poolDequeue


next, prev *poolChainElt
}
func storePoolChainElt(pp **poolChainElt, v *poolChainElt) {
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(pp)), unsafe.Pointer(v))
}

func loadPoolChainElt(pp **poolChainElt) *poolChainElt {
return (*poolChainElt)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(pp))))
}

func (c *poolChain) pushHead(val interface{}) {
d := c.head
if d == nil {

const initSize = 8 // Must be a power of 2
d = new(poolChainElt)
d.vals = make([]eface, initSize)
c.head = d
storePoolChainElt(&c.tail, d)
}

if d.pushHead(val) {
return
}


newSize := len(d.vals) * 2
if newSize >= dequeueLimit {

newSize = dequeueLimit
}

d2 := &poolChainElt{prev: d}
d2.vals = make([]eface, newSize)
c.head = d2
storePoolChainElt(&d.next, d2)
d2.pushHead(val)
}


func (c *poolChain) popHead() (interface{}, bool) {
d := c.head
for d != nil {
if val, ok := d.popHead(); ok {
return val, ok
}

d = loadPoolChainElt(&d.prev)
}
return nil, false
}

func (c *poolChain) popTail() (interface{}, bool) {
d := loadPoolChainElt(&c.tail)
if d == nil {
return nil, false
}

for {

d2 := loadPoolChainElt(&d.next)

if val, ok := d.popTail(); ok {
return val, ok
}

if d2 == nil {

return nil, false
}


if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {

storePoolChainElt(&d2.prev, nil)
}
d = d2
}
}

Map

信息
  • Load(key interface) (value interface,ok bool):通过参数key查询对应的value,如果不存在则返回nil;ok表示是否找到对应的值。
  • Store(key,value interface):该方法相当于对sync.Map的更新或新增,参数是键值对
  • LoadOrStore(key,value interface) (actual interface,loaded bool):该方法的参数为key和value。该方法会先根据参数key查找对应的value,如果找到则不修改原来的值并通过actual返回,并且loaded为true;如果通过key无法查找到对应的value,则存储key-value并且将存储的value通过actual返回,loaded为false。
  • Delete(key interface):通过key删除键值对
  • Range(f func(key,value interface) bool):遍历sync.Map的元素,注意for...range map是对内置map类型的用法,sync.Map需要使用单独的Range方法。
type Map struct {
mu Mutex

read atomic.Value


dirty map[interface{}]*entry


misses int
}
type readOnly struct {
m map[interface{}]*entry
amended bool
}

var expunged = unsafe.Pointer(new(interface{}))
type entry struct {

p unsafe.Pointer
}

func newEntry(i interface{}) *entry {
return &entry{p: unsafe.Pointer(&i)}
}

func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()

read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load()
}

func (e *entry) load() (value interface{}, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
return *(*interface{})(p), true
}
// Store sets the value for a key.
func (m *Map) Store(key, value interface{}) {
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}

m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {

m.dirty[key] = e
}
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
e.storeLocked(&value)
} else {
if !read.amended {
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}

func (e *entry) tryStore(i *interface{}) bool {
for {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
return true
}
}
}

func (e *entry) unexpungeLocked() (wasExpunged bool) {
return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}
func (e *entry) storeLocked(i *interface{}) {
atomic.StorePointer(&e.p, unsafe.Pointer(i))
}

func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {

read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
actual, loaded, ok := e.tryLoadOrStore(value)
if ok {
return actual, loaded
}
}

m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
m.dirty[key] = e
}
actual, loaded, _ = e.tryLoadOrStore(value)
} else if e, ok := m.dirty[key]; ok {
actual, loaded, _ = e.tryLoadOrStore(value)
m.missLocked()
} else {
if !read.amended {

m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
actual, loaded = value, false
}
m.mu.Unlock()

return actual, loaded
}
func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return nil, false, false
}
if p != nil {
return *(*interface{})(p), true, true
}

ic := i
for {
if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) {
return i, false, true
}
p = atomic.LoadPointer(&e.p)
if p == expunged {
return nil, false, false
}
if p != nil {
return *(*interface{})(p), true, true
}
}
}

func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
delete(m.dirty, key)
m.missLocked()
}
m.mu.Unlock()
}
if ok {
return e.delete()
}
return nil, false
}

func (m *Map) Delete(key interface{}) {
m.LoadAndDelete(key)
}

func (e *entry) delete() (value interface{}, ok bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return *(*interface{})(p), true
}
}
}

func (m *Map) Range(f func(key, value interface{}) bool) {

read, _ := m.read.Load().(readOnly)
if read.amended {

m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if read.amended {
read = readOnly{m: m.dirty}
m.read.Store(read)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}

for k, e := range read.m {
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) {
break
}
}
}

func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}

func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}

read, _ := m.read.Load().(readOnly)
m.dirty = make(map[interface{}]*entry, len(read.m))
for k, e := range read.m {
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}

func (e *entry) tryExpungeLocked() (isExpunged bool) {
p := atomic.LoadPointer(&e.p)
for p == nil {
if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
return true
}
p = atomic.LoadPointer(&e.p)
}
return p == expunged
}


Cond 条件变量

信息

Cond实现了一个条件变量,一个线程集合地,供线程等待或者宣布某事件的发生。

每个Cond实例都有一个相关的锁(一般是Mutex或RWMutex类型的值),它必须在改变条件时或者调用Wait方法时保持锁定。Cond可以创建为其他结构体的字段,Cond在开始使用后不能被拷贝。

type Cond struct {
noCopy noCopy

L Locker

notify notifyList
checker copyChecker
}

//使用锁l创建一个*Cond。
func NewCond(l Locker) *Cond {
return &Cond{L: l}
}
func (c *Cond) Wait() {
c.checker.check()
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
//Signal唤醒等待c的一个线程(如果存在)。调用者在调用本方法时,建议(但并非必须)保持c.L的锁定。
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
//Broadcast唤醒所有等待c的线程。调用者在调用本方法时,建议(但并非必须)保持c.L的锁定。
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
type copyChecker uintptr

func (c *copyChecker) check() {
if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
panic("sync.Cond is copied")
}
}
type noCopy struct{}

func (*noCopy) Lock() {}
func (*noCopy) Unlock() {}

WaitGroup

信息

WaitGroup用于等待一组线程的结束。父线程调用Add方法来设定应等待的线程的数量。每个被等待的线程在结束时应调用Done方法。同时,主线程里可以调用Wait方法阻塞至所有线程结束。

type WaitGroup struct {
noCopy noCopy

state1 [3]uint32
}

func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}
//Add方法向内部计数加上delta,delta可以是负数;如果内部计数器变为0,Wait方法阻塞等待的所有线程都会释放,如果计数器小于0,方法panic。注意Add加上正数的调用应在Wait之前,否则Wait可能只会等待很少的线程。一般来说本方法应在创建新的线程或者其他应等待的事件之前调用。
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
if race.Enabled {
_ = *statep
if delta < 0 {

race.ReleaseMerge(unsafe.Pointer(wg))
}
race.Disable()
defer race.Enable()
}
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32)
w := uint32(state)
if race.Enabled && delta > 0 && v == int32(delta) {

race.Read(unsafe.Pointer(semap))
}
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
return
}

if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}

*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}

//Done方法减少WaitGroup计数器的值,应在线程的最后执行。
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
//Wait方法阻塞直到WaitGroup计数器减为0。
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
if race.Enabled {
_ = *statep
race.Disable()
}
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
w := uint32(state)
if v == 0 {

if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}

if atomic.CompareAndSwapUint64(statep, state, state+1) {
if race.Enabled && w == 0 {

race.Write(unsafe.Pointer(semap))
}
runtime_Semacquire(semap)
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
}
}

runtime

func runtime_Semacquire(s *uint32)

func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)


func runtime_Semrelease(s *uint32, handoff bool, skipframes int)


func runtime_notifyListAdd(l *notifyList) uint32


func runtime_notifyListWait(l *notifyList, t uint32)


func runtime_notifyListNotifyAll(l *notifyList)


func runtime_notifyListNotifyOne(l *notifyList)

func runtime_notifyListCheck(size uintptr)
func init() {
var n notifyList
runtime_notifyListCheck(unsafe.Sizeof(n))
}


func runtime_canSpin(i int) bool


func runtime_doSpin()

func runtime_nanotime() int64

Pool 对象池

提示

存入sync.Pool的对象可能会在不通知的情况下被释放,这一点一定要注意。比如一些socket长连接就不适合存入sync.Pool内。 Get和Put是Pool的两个公共方法。Put方法是向池中添加对象,Get方法是从池中获取对象,如果没有对象则调用New方法创建生成,如果未设置New则返回nil。 Pool是一个可以分别存取的临时对象的集合。

Pool中保存的任何item都可能随时不做通告的释放掉。如果Pool持有该对象的唯一引用,这个item就可能被回收。

Pool可以安全的被多个线程同时使用。

Pool的目的是缓存申请但未使用的item用于之后的重用,以减轻GC的压力。也就是说,让创建高效而线程安全的空闲列表更容易。但Pool并不适用于所有空闲列表。

Pool的合理用法是用于管理一组静静的被多个独立并发线程共享并可能重用的临时item。Pool提供了让多个线程分摊内存申请消耗的方法。

Pool的一个好例子在fmt包里。该Pool维护一个动态大小的临时输出缓存仓库。该仓库会在过载(许多线程活跃的打印时)增大,在沉寂时缩小。

另一方面,管理着短寿命对象的空闲列表不适合使用Pool,因为这种情况下内存申请消耗不能很好的分配。这时应该由这些对象自己实现空闲列表。

type Pool struct {
noCopy noCopy

local unsafe.Pointer
localSize uintptr

victim unsafe.Pointer
victimSize uintptr


New func() interface{}
}
type poolLocalInternal struct {
private interface{}
shared poolChain
}

type poolLocal struct {
poolLocalInternal
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

func fastrand() uint32

var poolRaceHash [128]uint64

func poolRaceAddr(x interface{}) unsafe.Pointer {
ptr := uintptr((*[2]unsafe.Pointer)(unsafe.Pointer(&x))[1])
h := uint32((uint64(uint32(ptr)) * 0x85ebca6b) >> 16)
return unsafe.Pointer(&poolRaceHash[h%uint32(len(poolRaceHash))])
}
//Put方法将x放入池中。
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
if race.Enabled {
if fastrand()%4 == 0 {
// Randomly drop x on floor.
return
}
race.ReleaseMerge(poolRaceAddr(x))
race.Disable()
}
l, _ := p.pin()
if l.private == nil {
l.private = x
x = nil
}
if x != nil {
l.shared.pushHead(x)
}
runtime_procUnpin()
if race.Enabled {
race.Enable()
}
}


// Get方法从池中选择任意一个item,删除其在池中的引用计数,并提供给调用者。Get方法也可能选择无视内存池,将其当作空的。调用者不应认为Get的返回这和传递给Put的值之间有任何关系。
// 假使Get方法没有取得item:如p.New非nil,Get返回调用p.New的结果;否则返回nil。

func (p *Pool) Get() interface{} {
if race.Enabled {
race.Disable()
}
l, pid := p.pin()
x := l.private
l.private = nil
if x == nil {
x, _ = l.shared.popHead()
if x == nil {
x = p.getSlow(pid)
}
}
runtime_procUnpin()
if race.Enabled {
race.Enable()
if x != nil {
race.Acquire(poolRaceAddr(x))
}
}
if x == nil && p.New != nil {
x = p.New()
}
return x
}

func (p *Pool) getSlow(pid int) interface{} {

size := runtime_LoadAcquintptr(&p.localSize)
locals := p.local

for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}

size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil {
l.private = nil
return x
}
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}


atomic.StoreUintptr(&p.victimSize, 0)

return nil
}

func (p *Pool) pin() (*poolLocal, int) {
pid := runtime_procPin()

s := runtime_LoadAcquintptr(&p.localSize)
l := p.local
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
return p.pinSlow()
}
func (p *Pool) pinSlow() (*poolLocal, int) {

runtime_procUnpin()
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
pid := runtime_procPin()

s := p.localSize
l := p.local
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
if p.local == nil {
allPools = append(allPools, p)
}
.
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0]))
runtime_StoreReluintptr(&p.localSize, uintptr(size))
return &local[pid], pid
}

func poolCleanup() {

for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}

for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}

oldPools, allPools = allPools, nil
}

var (
allPoolsMu Mutex


allPools []*Pool


oldPools []*Pool
)

func init() {
runtime_registerPoolCleanup(poolCleanup)
}
func indexLocal(l unsafe.Pointer, i int) *poolLocal {
lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
return (*poolLocal)(lp)
}


func runtime_registerPoolCleanup(cleanup func())
func runtime_procPin() int
func runtime_procUnpin()


func runtime_LoadAcquintptr(ptr *uintptr) uintptr


func runtime_StoreReluintptr(ptr *uintptr, val uintptr) uintptr

Once

提示

once 只能执行一次 sync.Once就提供了延迟初始化的功能 done用来记录执行次数,用m来保证同一时刻只有一个goroutine在执行Do方法。在使用该结构体的时候先定义Once型变量:

 type Once struct {
done uint32
m Mutex
}
func (o *Once) Do(f func()) {

if atomic.LoadUint32(&o.done) == 0 {
o.doSlow(f)
}
}

func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}

Mutex 互斥锁

信息

一个互斥锁只能被一个goroutine锁定,锁定的是这个锁本身,而不是某段代码。如果给某段代码加上锁,相当于执行这段代码的时候就加了锁,别的程序执行这段代码只能等待前面的程序结束并把锁打开,如此往复,以此来实现对变量的共享使用 可以由不同的线程加锁和解锁 sync.Mutex的加锁和开锁必须是成对的,如果加锁后忘记开锁,那么程序会崩溃。

type Mutex struct {
state int32
sema uint32
}
//Lock方法锁住m,如果m已经加锁,则阻塞直到m解锁。
func (m *Mutex) Lock() {
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
m.lockSlow()
}

func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
new := old
if old&mutexStarving == 0 {
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}

if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}

//Unlock方法解锁m,如果m未加锁会导致运行时错误。锁和线程无关,可以由不同的线程加锁和解锁。
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}

// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}

func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}

new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
runtime_Semrelease(&m.sema, true, 1)
}
}