package delayqueue
import (
"context"
"sync"
"sync/atomic"
"time"
"github.com/jiaxwu/gommon/container/heap"
)
// entry is one element stored in the queue's heap: the caller's value paired
// with the instant at which it becomes available.
type entry[T any] struct {
// value is the payload handed back to the caller by Take, Pop and Peek.
value T
// expiration is the instant computed by Push as time.Now().Add(delay).
expiration time.Time
}
// DelayQueue stores values together with an expiration time. The entry that
// expires soonest sits at the head of the underlying heap (see the comparator
// installed by New).
type DelayQueue[T any] struct {
// h is the heap of pending entries; every method accesses it with mutex held.
h *heap.Heap[*entry[T]]
// mutex guards h.
mutex sync.Mutex
// sleeping is accessed through sync/atomic. Take stores 1 before it starts
// waiting; Push flips it 1 -> 0 before sending on wakeup, and Take also
// clears it when its timer fires.
sleeping int32
// wakeup is an unbuffered channel on which Push tells a waiting Take that a
// new head entry has been installed.
wakeup chan struct{}
}
// New returns an empty DelayQueue whose heap is ordered by expiration time,
// earliest first, and whose wakeup channel is unbuffered.
func New[T any]() *DelayQueue[T] {
	// earlier reports whether a expires strictly before b.
	earlier := func(a, b *entry[T]) bool {
		return a.expiration.Before(b.expiration)
	}

	q := new(DelayQueue[T])
	// nil is passed as heap.New's first argument (presumably the initial
	// backing slice — confirm against the heap package).
	q.h = heap.New(nil, earlier)
	q.wakeup = make(chan struct{})
	return q
}
// Push adds value to the queue; it becomes available once delay has elapsed
// (the expiration is time.Now().Add(delay), computed while the mutex is held).
//
// If the new entry ends up at the head of the heap and the sleeping flag is
// set, Push clears the flag and sends one signal on the unbuffered wakeup
// channel so the waiter can re-evaluate the head. The send is performed after
// the mutex has been released: it can block until the waiter receives, and
// holding the lock across it would stall every other queue operation (and
// deadlock the queue outright if the waiter never receives).
func (q *DelayQueue[T]) Push(value T, delay time.Duration) {
	// The closure keeps the critical section panic-safe via defer while still
	// letting the channel send happen outside the lock.
	notify := func() bool {
		q.mutex.Lock()
		defer q.mutex.Unlock()

		item := &entry[T]{
			value:      value,
			expiration: time.Now().Add(delay),
		}
		q.h.Push(item)

		// Only a new head can change how long a waiter should sleep. The CAS
		// makes this Push the unique owner of the wake-up for that wait.
		return q.h.Peek() == item && atomic.CompareAndSwapInt32(&q.sleeping, 1, 0)
	}()

	if notify {
		q.wakeup <- struct{}{}
	}
}
// Take blocks until the head entry has expired and returns its value with
// true. If ctx is done while Take is waiting, it returns the zero value and
// false. An entry that has already expired is returned even when ctx is
// already done, because the heap is checked before ctx.
//
// Waiting protocol: with the mutex held, Take stores 1 into q.sleeping before
// it waits. A Push that installs a new head flips the flag 1 -> 0 and then
// sends on the unbuffered q.wakeup. Whenever Take stops waiting for another
// reason (timer fired or ctx done) it swaps the flag to 0 itself; if the swap
// shows that a Push already claimed the flag, Take receives the pending
// wake-up so that Push is not left blocked on the send.
//
// NOTE(review): a single flag and channel are shared by all waiters, so the
// hand-off looks designed for one goroutine in Take at a time — confirm before
// calling Take concurrently.
func (q *DelayQueue[T]) Take(ctx context.Context) (T, bool) {
	for {
		var timer *time.Timer

		q.mutex.Lock()
		if !q.h.Empty() {
			head := q.h.Peek()
			now := time.Now()
			// Same expiry rule as Pop: an entry is available as soon as now
			// is not before its expiration.
			if !now.Before(head.expiration) {
				q.h.Pop()
				q.mutex.Unlock()
				return head.value, true
			}
			// NewTimer (rather than time.After) so the timer can be stopped
			// when the wait ends early.
			timer = time.NewTimer(head.expiration.Sub(now))
		}
		// Announce, still under the lock, that a Push installing a new head
		// must send a wake-up.
		atomic.StoreInt32(&q.sleeping, 1)
		q.mutex.Unlock()

		// A nil channel never becomes ready in a select, so an empty queue
		// simply waits for a wake-up or for ctx.
		var expired <-chan time.Time
		if timer != nil {
			expired = timer.C
		}

		select {
		case <-q.wakeup:
			// Push already cleared the flag; just re-evaluate the head.
			if timer != nil {
				timer.Stop()
			}
		case <-expired:
			if atomic.SwapInt32(&q.sleeping, 0) == 0 {
				// A Push claimed the flag concurrently and is sending.
				<-q.wakeup
			}
		case <-ctx.Done():
			if timer != nil {
				timer.Stop()
			}
			// Leave the queue in a clean state: clear the flag so later
			// Pushes do not signal a waiter that is gone, and release a Push
			// that has already committed to sending.
			if atomic.SwapInt32(&q.sleeping, 0) == 0 {
				<-q.wakeup
			}
			var zero T
			return zero, false
		}
	}
}
// Channel starts a goroutine that repeatedly calls Take(ctx) and forwards each
// value to the returned channel, which has a buffer of size elements. The
// channel is closed, and the goroutine exits, once Take reports false or ctx
// is done while a value is waiting to be delivered.
//
// A value that was taken from the queue but cannot be handed over because the
// channel is full at the moment ctx is done is discarded; without this the
// goroutine would block forever on a consumer that has stopped reading.
func (q *DelayQueue[T]) Channel(ctx context.Context, size int) <-chan T {
	out := make(chan T, size)
	go func() {
		defer close(out)
		for {
			value, ok := q.Take(ctx)
			if !ok {
				return
			}

			// Prefer delivery whenever it can happen immediately, so a value
			// is only dropped when the consumer is not keeping up.
			select {
			case out <- value:
				continue
			default:
			}

			select {
			case out <- value:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}
// Peek returns the value at the head of the queue without removing it and
// without checking whether it has expired. The second result is false, and
// the first is the zero value, when the queue is empty.
func (q *DelayQueue[T]) Peek() (T, bool) {
	q.mutex.Lock()
	defer q.mutex.Unlock()

	var zero T
	if q.h.Empty() {
		return zero, false
	}
	head := q.h.Peek()
	return head.value, true
}
// Pop removes and returns the head value if it has expired. It never blocks
// waiting for expiry: when the queue is empty or the head's expiration is
// still in the future it returns the zero value and false.
func (q *DelayQueue[T]) Pop() (T, bool) {
	q.mutex.Lock()
	defer q.mutex.Unlock()

	var zero T
	if q.h.Empty() {
		return zero, false
	}

	head := q.h.Peek()
	if notYetDue := head.expiration.After(time.Now()); notYetDue {
		return zero, false
	}

	q.h.Pop()
	return head.value, true
}
// Empty reports whether the queue currently holds no entries, expired or not.
func (q *DelayQueue[T]) Empty() bool {
	q.mutex.Lock()
	defer q.mutex.Unlock()

	isEmpty := q.h.Empty()
	return isEmpty
}